
Docker Peer Discovery

Distributed systems can be quite challenging. For example, in my distributed systems course at university, I had to develop a distributed system that is fault tolerant: the backend had to run in three containers in parallel and had to survive the failure of one container.

I decided to develop a collaborative markdown editor, where a frontend can connect to any one of the backends and receives the changes from all other frontends. For this to work, the backends have to synchronize the changes between all instances. But how do they know about each other?

Meet:

Docker Compose and DNS

For each Docker Compose service, you can define how many containers are started by setting the scale parameter:

services:
  backend:
    image: backend
    scale: 3
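
Running docker compose up then brings up all three containers at once (assuming the compose project directory is called app, Compose names them app-backend-1 to app-backend-3):

$ docker compose up -d
[+] Running 3/3
 ✔ Container app-backend-1  Started
 ✔ Container app-backend-2  Started
 ✔ Container app-backend-3  Started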

This will create three containers of the backend service, and for all other containers, the backend is reachable under the hostname backend. This is the basis of my peer discovery algorithm, because a DNS query for backend returns the IPs of all backend containers:

$ nslookup backend
Server:         127.0.0.11
Address:        127.0.0.11#53

Non-authoritative answer:
Name:   backend
Address: 172.18.0.2
Name:   backend
Address: 172.18.0.3
Name:   backend
Address: 172.18.0.4
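
In Rust, this lookup doesn't even need a dedicated DNS library: the standard library's ToSocketAddrs triggers exactly this query. Here is a minimal sketch of the get_peer_ips helper used later (the port 0 is only a placeholder, we just want the IPs):

use std::net::{IpAddr, ToSocketAddrs};

// Resolve the service name against Docker's embedded DNS server
// and collect the IPs of all containers behind it.
fn get_peer_ips() -> std::io::Result<Vec<IpAddr>> {
    let ips = ("backend", 0)
        .to_socket_addrs()?
        .map(|addr| addr.ip())
        .collect();
    Ok(ips)
}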

Connecting to all Nodes

Now we know the IPs of all other instances and just need to create the connections. I decided to use WebSockets because they allow bidirectional communication. If every instance simply connected to all others, there would be two connections between each pair of nodes. To prevent this, I use an algorithm that filters the IP addresses so only one connection is established per pair:

  1. Get all IPs of the backend service

  2. Only keep the IPs that are greater than the node's own IP

  3. Connect to these IPs

  4. Remove all connections whose peer doesn't exist any more

As you can see in the diagram below, this prevents duplicate connections but still ensures that all nodes are connected.

[Diagram: each node only connects to the peers with higher IPs, so every pair of nodes shares exactly one connection]

In Rust, this can be implemented like this:

use std::{collections::HashMap, net::IpAddr, time::Duration};
use tokio::{task::JoinHandle, time::sleep};

let own_ip = get_own_ip();

// keep the map of connections across iterations, otherwise every
// pass would forget which peers are already connected
let mut peers: HashMap<IpAddr, JoinHandle<()>> = HashMap::new();

loop {
    tracing::info!("Create new connections");

    let peer_ips = get_peer_ips().unwrap();

    // keep only IPs greater than our own IP
    let peer_ips = peer_ips
        .into_iter()
        .filter(|ip| ip > &own_ip)
        .collect::<Vec<IpAddr>>();

    // abort and drop all connections whose peer doesn't exist any more
    peers.retain(|ip, handle| {
        let alive = peer_ips.contains(ip);
        if !alive {
            handle.abort();
        }
        alive
    });

    // start a new task for every new IP
    tracing::info!("Connect with new peers: {:?}", peer_ips);
    for ip in peer_ips.iter() {
        if !peers.contains_key(ip) {
            peers.insert(*ip, tokio::spawn(connect_with_peer(*ip)));
        }
    }

    // sleep until the next check
    sleep(Duration::from_secs(10)).await;
}

This code gets the node's own IP and all IPs of the backend service. It then filters out its own IP and all IPs that are smaller. For each remaining IP, a WebSocket connection task is spawned. Running this loop every 10 seconds makes sure that connections to new containers are created and that stale connections are removed.
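
The snippet relies on two helpers that are not shown here. For get_own_ip, one possible approach (not necessarily what the full project does) is the classic UDP trick: "connect" a UDP socket to an arbitrary outside address, which sends no packet but makes the OS pick the local address for that route:

use std::net::{IpAddr, UdpSocket};

// Determine the container's own IP: connecting a UDP socket performs
// no I/O, but makes the OS choose the local address for that route.
fn get_own_ip() -> IpAddr {
    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    socket.connect("8.8.8.8:80").unwrap();
    socket.local_addr().unwrap().ip()
}

connect_with_peer is the client side of the WebSocket connection. A sketch using the tokio-tungstenite crate (the /ws path and port 5000 match the server from the next section):

use std::net::IpAddr;

use futures_util::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};

// Open a WebSocket connection to a peer and log every text message.
async fn connect_with_peer(ip: IpAddr) {
    let url = format!("ws://{}:5000/ws", ip);
    match connect_async(url).await {
        Ok((mut stream, _response)) => {
            tracing::info!("Connected to peer {}", ip);
            while let Some(Ok(msg)) = stream.next().await {
                if let Message::Text(text) = msg {
                    tracing::info!("Received from {}: {}", ip, text);
                }
            }
        }
        Err(e) => tracing::warn!("Could not connect to {}: {}", ip, e),
    }
}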

Handling the connection

As the second part, I created a WebSocket server with axum. In this example, it just prints out all received messages and replies with a pong when a ping is received:

use std::net::SocketAddr;

use axum::{
    body::Bytes,
    extract::{
        ws::{Message, Utf8Bytes, WebSocket, WebSocketUpgrade},
        ConnectInfo,
    },
    response::IntoResponse,
    routing::get,
    Router,
};

pub async fn start_server() {
    let app = Router::new()
        .route("/", get(root_handler))
        .route("/ws", get(websocket_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:5000").await.unwrap();

    axum::serve(
        listener,
        app.into_make_service_with_connect_info::<SocketAddr>(),
    )
    .await
    .unwrap();
}

// Minimal handler for the root route (stub for this example)
async fn root_handler() -> &'static str {
    "Backend is running"
}

async fn websocket_handler(
    ws: WebSocketUpgrade,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| handle_socket(socket, addr))
}

async fn handle_socket(mut socket: WebSocket, addr: SocketAddr) {
    let client_ip = addr.ip().to_string();

    tracing::info!("WebSocket connection with {} established", client_ip);
    if let Err(e) = socket
        .send(axum::extract::ws::Message::Text(Utf8Bytes::from_static(
            "Connected",
        )))
        .await
    {
        tracing::warn!("Error sending message: {}", e);
        return;
    }

    while let Some(Ok(msg)) = socket.recv().await {
        match msg {
            Message::Text(text) => {
                tracing::info!("Received text from {}: {}", client_ip, text);
            }
            Message::Ping(_) => {
                tracing::info!("Received ping from {}", client_ip);
                socket.send(Message::Pong(Bytes::new())).await.unwrap();
            }
            Message::Close(_) => {
                tracing::info!("Received close from {}", client_ip);
                break;
            }
            _ => {}
        }
    }
    tracing::info!("Connection closed with {}", client_ip);
}
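
If port 5000 is published to the host, you can poke the server by hand with a tool like websocat: it prints the Connected greeting, and the server logs everything you type:

$ websocat ws://localhost:5000/ws
Connected
hello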

Conclusion

With this setup, all backends are connected and can exchange messages. This is perfect for distributed applications, but also works well for IoT devices. The only prerequisite is a way to get the IPs of all nodes. I used the Docker Compose DNS feature, but other methods are also possible.

The full example code is available on GitLab. Check it out and test it if you want.