buildwithalex

dev topics

Fly's distributed systems challenge - Echo

|

Fly’s distributed systems challenge, Gossip Glomers, is a series of challenges built on top of a platform called Maelstrom.

To start with, we’re only going to look at the first part of the challenge, where our node implementation has to respond to an ‘echo’ message that Maelstrom sends. The node has to reply with an ‘echo_ok’ message. The original challenge is meant to be implemented in Go, but let’s see how far we can get by using Rust.

I’ve decided to name my node implementation ‘icebreaker’.

lib.rs
  
use std::io::Write;
use anyhow::Context;

use serde::{Deserialize, Serialize};

/// Envelope for a single protocol message exchanged between nodes.
///
/// The type parameter is the payload carried inside [`Body`]; it is flattened
/// into the body when (de)serialized.
#[derive(Serialize, Deserialize, Debug)]
pub struct Message<Payload> {
    /// Identifier of the sender.
    pub src: String,
    /// Identifier of the recipient.
    pub dest: String,
    /// Message ids plus the type-specific payload.
    pub body: Body<Payload>,
}

impl Message<Payload> {
    /// Creates the skeleton of a response to this message.
    ///
    /// Source and destination are swapped, `in_reply_to` is set to this
    /// message's `msg_id`, and `id` becomes the response's own `msg_id`.
    /// The payload is copied verbatim from the request, so callers are
    /// expected to overwrite it with the actual response payload.
    pub fn reply(&self, id: Option<usize>) -> Self {
        let body = Body {
            msg_id: id,
            in_reply_to: self.body.msg_id,
            payload: self.body.payload.clone(),
        };

        Self {
            src: self.dest.clone(),
            dest: self.src.clone(),
            body,
        }
    }

    /// Serializes this message as one line of JSON on `out`.
    ///
    /// Stdout is locked for the duration of the write so the JSON document
    /// and its terminating newline are emitted together.
    ///
    /// # Errors
    ///
    /// Returns an error if serializing the message or writing the newline
    /// fails.
    pub fn send(self, out: &mut std::io::Stdout) -> anyhow::Result<()> {
        let mut writer = out.lock();

        serde_json::to_writer(&mut writer, &self)
            .context("failed to write to stdout")?;
        writer.write_all(b"\n")?;

        Ok(())
    }
}

/// Body of a [`Message`]: optional message ids plus the typed payload.
#[derive(Serialize, Deserialize, Debug)]
pub struct Body<Payload> {
    pub msg_id: Option<usize>,      // a unique integer identifier
    pub in_reply_to: Option<usize>, // for req/response, the msgId of the request
    /// Type-specific content. Because of `flatten`, the payload's fields
    /// (including its `type` tag) sit directly in the body object instead of
    /// being nested under a `payload` key.
    #[serde(flatten)]
    pub payload: Payload,
}

/// The message kinds this node understands.
///
/// Serialized as an internally tagged enum: the variant name, converted to
/// snake_case, is stored under the `type` key (e.g. `EchoOk` becomes
/// `"type": "echo_ok"`).
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum Payload {
    /// Initialization request; used by `Node::new` to build the node.
    Init {
        /// Id assigned to the receiving node.
        node_id: String,
        /// List of node ids supplied with the init message
        /// (presumably every node in the cluster — confirm against the protocol docs).
        node_ids: Vec<String>,
    },
    /// Acknowledgement of an `Init` message.
    InitOk,
    /// Request asking the node to echo a string back.
    Echo {
        /// Text to be echoed.
        echo: String,
    },
    /// Response to `Echo`.
    EchoOk {
        /// The text taken from the `Echo` request.
        echo: String,
    },
}

/// State of this node, populated from the `Init` payload.
#[derive(Debug)]
pub struct Node {
    /// This node's own id, copied from the init message.
    pub node_id: String,
    /// The node ids listed in the init message.
    pub node_ids: Vec<String>,
}

impl Node {
    /// Builds a node from an `init` message by copying its `node_id` and
    /// `node_ids`.
    ///
    /// # Panics
    ///
    /// Panics if the payload of `msg` is not [`Payload::Init`].
    pub fn new(msg: &Message<Payload>) -> Self {
        match &msg.body.payload {
            Payload::Init { node_id, node_ids } => Self {
                node_id: node_id.clone(),
                node_ids: node_ids.clone(),
            },
            _ => panic!("Payload is not of type init"),
        }
    }

    /// Handles one incoming message, writing any response to `out`.
    ///
    /// `Echo` requests are answered with an `EchoOk` carrying the same text.
    /// Any other payload is reported on stderr and otherwise ignored.
    ///
    /// # Errors
    ///
    /// Returns an error if the response cannot be written to `out`.
    pub fn handle(&self, msg: &Message<Payload>, mut out: std::io::Stdout) -> anyhow::Result<()> {
        use std::sync::atomic::{AtomicUsize, Ordering};

        // `msg_id` is meant to be unique per message, so hand out increasing
        // ids instead of a constant. The counter starts at 1 because the
        // `init_ok` reply sent by the binary already uses id 0. `handle` only
        // has `&self`, hence the function-local atomic; `Relaxed` is enough
        // because only uniqueness of the values matters.
        static NEXT_MSG_ID: AtomicUsize = AtomicUsize::new(1);

        let mut reply = msg.reply(Some(NEXT_MSG_ID.fetch_add(1, Ordering::Relaxed)));
        match reply.body.payload {
            Payload::Echo { echo } => {
                reply.body.payload = Payload::EchoOk { echo };
                reply.send(&mut out)?;
            }
            // Diagnostics must go to stderr: stdout carries the JSON messages
            // written by `Message::send`, and stray text would corrupt it.
            _ => eprintln!("unknown msg type {:?}", msg.body.payload),
        }
        Ok(())
    }
}
echo.rs
  
use anyhow::Context;
use icebreaker::*;
use std::io::BufRead;

fn main() -> anyhow::Result<()> {
    let mut lines = std::io::stdin().lines();
    let first_line = lines
        .next()
        .expect("no message received")
        .context("failed to read message from stdin")?;
    drop(lines);

    let init_msg: Message<Payload> = serde_json::from_str(&first_line)?;

    let node = Node::new(&init_msg);

    let mut init_reply = init_msg.reply(Some(0));
    init_reply.body.payload = Payload::InitOk;

    init_reply
        .send(&mut std::io::stdout())
        .context("failed to send reply")?;

    let (tx, rx) = std::sync::mpsc::channel();

    let join_handle = std::thread::spawn(move || {
        let stdin_lock = std::io::stdin().lock();
        for line in stdin_lock.lines() {
            let line = line.context("failed to read line from stdin")?;
            let message: Message<Payload> =
                serde_json::from_str(&line).context("failed to deserialize input")?;

            let _  = tx.send(message);
        }

        return Ok::<_, anyhow::Error>(());
    });

    for message in rx {
        node.handle(&message, std::io::stdout()).context("message handling by node failed")?;
    }

    join_handle.join().expect("thread failed").context("thread failed")?;

    Ok(())
}