Skip to content

Commit 0e44efb

Browse files
author
Leonidas Loucas
committed
test: Add first test, and timeout so we dont hang on bad test
1 parent 4d2f7da commit 0e44efb

File tree

4 files changed

+86
-3
lines changed

4 files changed

+86
-3
lines changed

rust/lib/srpc/client/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,4 @@ path = "examples/rust_client_example2.rs"
4343
required-features = []
4444

4545
[dev-dependencies]
46-
tokio = { version = "1", features = ["full", "macros"] }
46+
tokio = { version = "1", features = ["full", "macros", "test-util"] }

rust/lib/srpc/client/src/lib.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use tokio_util::codec::{FramedRead, LinesCodec};
1515
use tracing::debug;
1616

1717
mod chunk_limiter;
18+
#[cfg(test)]
19+
mod tests;
1820

1921
// Custom error type
2022
#[derive(Debug)]
@@ -40,13 +42,19 @@ pub struct ClientConfig {
4042
pub struct ReceiveOptions {
4143
channel_buffer_size: usize,
4244
max_chunk_size: usize,
45+
read_next_line_duration: Duration,
4346
}
4447

4548
impl ReceiveOptions {
46-
pub fn new(channel_buffer_size: usize, max_chunk_size: usize) -> Self {
49+
pub fn new(
50+
channel_buffer_size: usize,
51+
max_chunk_size: usize,
52+
read_next_line_duration: Duration,
53+
) -> Self {
4754
ReceiveOptions {
4855
channel_buffer_size,
4956
max_chunk_size,
57+
read_next_line_duration,
5058
}
5159
}
5260
}
@@ -56,6 +64,7 @@ impl Default for ReceiveOptions {
5664
ReceiveOptions {
5765
channel_buffer_size: 100,
5866
max_chunk_size: 16384,
67+
read_next_line_duration: Duration::from_secs(10),
5968
}
6069
}
6170
}
@@ -251,14 +260,15 @@ where
251260
let stream = Arc::clone(&self.stream);
252261
let (tx, rx) = mpsc::channel(opts.channel_buffer_size);
253262
let max_chunk_size = opts.max_chunk_size;
263+
let read_next_line_duration = opts.read_next_line_duration;
254264

255265
tokio::spawn(async move {
256266
let mut guard = stream.lock().await;
257267
let limited_reader = ChunkLimiter::new(&mut *guard, max_chunk_size);
258268
let buf_reader = BufReader::new(limited_reader);
259269
let mut framed = FramedRead::new(buf_reader, LinesCodec::new());
260270

261-
while let Some(line_res) = framed.next().await {
271+
while let Ok(Some(line_res)) = timeout(read_next_line_duration, framed.next()).await {
262272
let line_res = line_res.map_err(|e| Box::new(e) as Box<dyn Error + Send>);
263273

264274
match line_res {

rust/lib/srpc/client/src/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mod lib;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use std::error::Error;
2+
3+
use crate::{ClientConfig, ConnectedClient, ReceiveOptions};
4+
5+
use tokio::{
6+
io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream},
7+
sync::mpsc,
8+
};
9+
10+
fn setup_test_client() -> (ConnectedClient<DuplexStream>, DuplexStream) {
11+
let (client_stream, server_stream) = duplex(1024);
12+
13+
let config = ClientConfig::new("example.com", 443, "/", "", "");
14+
(ConnectedClient::new(config, client_stream), server_stream)
15+
}
16+
17+
fn one_message() -> impl FnMut(&str) -> bool {
18+
let mut first_call = true;
19+
move |_msg: &str| {
20+
if first_call {
21+
first_call = false;
22+
true
23+
} else {
24+
false
25+
}
26+
}
27+
}
28+
29+
async fn check_message(
30+
server_message: &str,
31+
rx: &mut mpsc::Receiver<Result<String, Box<dyn Error + Send>>>,
32+
) {
33+
if let Some(Ok(received_msg)) = rx.recv().await {
34+
assert_eq!(received_msg, server_message.trim());
35+
} else {
36+
panic!("Did not receive expected message from server");
37+
}
38+
}
39+
40+
async fn check_server(
41+
client_message: &str,
42+
server_stream: &mut DuplexStream,
43+
) -> Result<(), Box<dyn Error>> {
44+
let mut server_buf = vec![0u8; client_message.len()];
45+
server_stream.read_exact(&mut server_buf).await?;
46+
assert_eq!(&server_buf, client_message.as_bytes());
47+
Ok(())
48+
}
49+
50+
#[tokio::test(start_paused = true)]
51+
async fn test_connected_client_send_and_receive() -> Result<(), Box<dyn Error>> {
52+
let (connected_client, mut server_stream) = setup_test_client();
53+
54+
let client_message = "Hello from client\n";
55+
connected_client.send_message(client_message).await?;
56+
57+
check_server(client_message, &mut server_stream).await?;
58+
59+
let server_message = "Hello from server\n";
60+
server_stream.write_all(server_message.as_bytes()).await?;
61+
62+
let should_continue = one_message();
63+
64+
let opts = ReceiveOptions::default();
65+
let mut rx = connected_client
66+
.receive_message(false, should_continue, &opts)
67+
.await?;
68+
69+
check_message(server_message, &mut rx).await;
70+
71+
Ok(())
72+
}

0 commit comments

Comments
 (0)