diff --git a/communication/src/networking.rs b/communication/src/networking.rs index f06efe68e..5782291e7 100644 --- a/communication/src/networking.rs +++ b/communication/src/networking.rs @@ -148,22 +148,38 @@ pub fn start_connections(addresses: Arc>, my_index: usize, noisy: bo /// Result contains connections `[my_index + 1, addresses.len() - 1]`. pub fn await_connections(addresses: Arc>, my_index: usize, noisy: bool) -> Result>> { let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect(); - let listener = TcpListener::bind(&addresses[my_index][..])?; - - for _ in (my_index + 1) .. addresses.len() { - let mut stream = listener.accept()?.0; - stream.set_nodelay(true).expect("set_nodelay call failed"); - let mut buffer = [0u8;16]; - stream.read_exact(&mut buffer)?; - let mut cursor = io::Cursor::new(buffer); - let magic = cursor.read_u64::().expect("failed to decode magic"); - if magic != HANDSHAKE_MAGIC { - return Err(io::Error::new(io::ErrorKind::InvalidData, - "received incorrect timely handshake")); + + // We may have multiple addresses to bind to, and will listen on each of them until all received. + let listeners = addresses[my_index].split_whitespace().map(|addr| TcpListener::bind(addr)).collect::>>()?; + for listener in listeners.iter() { listener.set_nonblocking(true).expect("Couldn't set nonblocking"); } + + // Until we have all intended connections, poll each listener, sleeping briefly if none have accepted a new stream. + while results.iter().any(Option::is_none) { + let mut received = false; + for listener in listeners.iter() { + match listener.accept() { + Ok((mut stream, _)) => { + stream.set_nodelay(true).expect("set_nodelay call failed"); + let mut buffer = [0u8;16]; + stream.read_exact(&mut buffer)?; + let mut cursor = io::Cursor::new(buffer); + let magic = cursor.read_u64::().expect("failed to decode magic"); + if magic != HANDSHAKE_MAGIC { + return Err(io::Error::new(io::ErrorKind::InvalidData, + "received incorrect timely handshake")); + } + let identifier = cursor.read_u64::().expect("failed to decode worker index") as usize; + results[identifier - my_index - 1] = Some(stream); + if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); } + received = true; + } + Err(e) => { if e.kind() != io::ErrorKind::WouldBlock { return Err(e); } } + } + } + if !received { + println!("awaiting connections (at {:?}/{:?})", results.iter().filter(|x| x.is_some()).count(), results.len()); + sleep(Duration::from_secs(1)); } - let identifier = cursor.read_u64::().expect("failed to decode worker index") as usize; - results[identifier - my_index - 1] = Some(stream); - if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); } } Ok(results)