Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions communication/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,38 @@
/// Result contains connections `[my_index + 1, addresses.len() - 1]`.
pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
let listener = TcpListener::bind(&addresses[my_index][..])?;

for _ in (my_index + 1) .. addresses.len() {
let mut stream = listener.accept()?.0;
stream.set_nodelay(true).expect("set_nodelay call failed");
let mut buffer = [0u8;16];
stream.read_exact(&mut buffer)?;
let mut cursor = io::Cursor::new(buffer);
let magic = cursor.read_u64::<ByteOrder>().expect("failed to decode magic");
if magic != HANDSHAKE_MAGIC {
return Err(io::Error::new(io::ErrorKind::InvalidData,
"received incorrect timely handshake"));

// We may have multiple addresses to bind to, and will listen on each of them until all received.
let listeners = addresses[my_index].split_whitespace().map(|addr| TcpListener::bind(addr)).collect::<Result<Vec<_>>>()?;

Check warning on line 153 in communication/src/networking.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

redundant closure
for listener in listeners.iter() { listener.set_nonblocking(true).expect("Couldn't set nonblocking"); }

// Until we have all intended connections, poll each listener, sleeping briefly if none have accepted a new stream.
while results.iter().any(Option::is_none) {
let mut received = false;
for listener in listeners.iter() {
match listener.accept() {
Ok((mut stream, _)) => {
stream.set_nodelay(true).expect("set_nodelay call failed");
let mut buffer = [0u8;16];
stream.read_exact(&mut buffer)?;
let mut cursor = io::Cursor::new(buffer);
let magic = cursor.read_u64::<ByteOrder>().expect("failed to decode magic");
if magic != HANDSHAKE_MAGIC {
return Err(io::Error::new(io::ErrorKind::InvalidData,
"received incorrect timely handshake"));
}
let identifier = cursor.read_u64::<ByteOrder>().expect("failed to decode worker index") as usize;
results[identifier - my_index - 1] = Some(stream);
if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
received = true;
}
Err(e) => { if e.kind() != io::ErrorKind::WouldBlock { return Err(e); } }
}
}
if !received {
println!("awaiting connections (at {:?}/{:?})", results.iter().filter(|x| x.is_some()).count(), results.len());
sleep(Duration::from_secs(1));
}
let identifier = cursor.read_u64::<ByteOrder>().expect("failed to decode worker index") as usize;
results[identifier - my_index - 1] = Some(stream);
if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
}

Ok(results)
Expand Down
Loading