From 99c706a546452e2b3081adc7ce44c882efe7dc07 Mon Sep 17 00:00:00 2001 From: Graham Binns Date: Wed, 29 Oct 2025 17:06:25 +0000 Subject: [PATCH 1/2] Upgrade dependencies where doing so won't introduce breaking changes. --- Cargo.toml | 24 ++++++++++++------------ benchmark/Cargo.toml | 10 +++++----- protocol/Cargo.toml | 10 +++++----- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dc121266..6b60bc68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,28 +22,28 @@ members = [ [dependencies] -tokio-rustls = { version = "0.26.1" } +tokio-rustls = { version = "0.26.4" } rustls-pemfile = "2.2.0" rabbitmq-stream-protocol = { version = "0.9", path = "protocol" } -tokio = { version = "1.29.1", features = ["full"] } -tokio-util = { version = "0.7.3", features = ["codec"] } -bytes = "1.0.0" -pin-project = { version = "1.0.0" } -tokio-stream = "0.1.11" -futures = "0.3.0" -url = "2.2.2" +tokio = { version = "1.48.0", features = ["full"] } +tokio-util = { version = "0.7.16", features = ["codec"] } +bytes = "1.10.1" +pin-project = { version = "1.1.10" } +tokio-stream = "0.1.17" +futures = "0.3.31" +url = "2.5.7" tracing = "0.1" thiserror = "2.0" -async-trait = "0.1.51" +async-trait = "0.1.89" rand = "0.8" dashmap = "6.1.0" murmur3 = "0.5.2" serde = { version = "1.0", features = ["derive"], optional = true } [dev-dependencies] -tracing-subscriber = "0.3.1" -fake = { version = "4.2.0", features = ['derive'] } -chrono = "0.4.26" +tracing-subscriber = "0.3.20" +fake = { version = "4.4.0", features = ['derive'] } +chrono = "0.4.42" serde_json = "1.0" reqwest = { version = "0.12", features = ["json"] } serde = { version = "1.0", features = ["derive"] } diff --git a/benchmark/Cargo.toml b/benchmark/Cargo.toml index da50371e..38eea6d7 100644 --- a/benchmark/Cargo.toml +++ b/benchmark/Cargo.toml @@ -8,9 +8,9 @@ edition = "2021" [dependencies] rabbitmq-stream-client = { path = "../" } -tracing-subscriber = "0.3.1" +tracing-subscriber = "0.3.20" tracing = "0.1" -tokio = { version = "1.29.1", features = ["full"] } -clap = { version = "4.0.22", features = ["derive"] } -futures = "0.3.0" -async-trait = "0.1.51" +tokio = { version = "1.48.0", features = ["full"] } +clap = { version = "4.5.51", features = ["derive"] } +futures = "0.3.31" +async-trait = "0.1.89" diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index 7468399c..b0d08dab 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -12,12 +12,12 @@ wasm-bindgen = ["uuid/js"] [dependencies] byteorder = "1" -ordered-float = "4.1.0" +ordered-float = "4.6.0" uuid = "1" -chrono = "0.4.26" -num_enum = "0.7.0" +chrono = "0.4.42" +num_enum = "0.7.5" derive_more = { version = "2.0.1", features = ["full"] } [dev-dependencies] -pretty_assertions = "1.2.0" -fake = { version = "4.0", features = [ "derive", "chrono", "uuid" ] } +pretty_assertions = "1.4.1" +fake = { version = "4.4", features = [ "derive", "chrono", "uuid" ] } From 3ef4728f4e83453620191240f9bfc3df8aa0d381 Mon Sep 17 00:00:00 2001 From: Graham Binns Date: Thu, 30 Oct 2025 11:30:29 +0000 Subject: [PATCH 2/2] Fix a number of lints raised by clippy when running tests. --- examples/ha_producer.rs | 16 +++++------ examples/send_async.rs | 14 +++++----- examples/send_with_confirm.rs | 14 +++++----- examples/simple-consumer.rs | 14 +++++----- examples/superstreams/receive_super_stream.rs | 14 +++++----- examples/superstreams/send_super_stream.rs | 14 +++++----- .../superstreams/send_super_stream_hash.rs | 14 +++++----- .../send_super_stream_routing_key.rs | 14 +++++----- examples/tls_producer.rs | 5 +--- protocol/src/response/mod.rs | 3 +-- src/stream_creator.rs | 27 +++++-------------- tests/common.rs | 9 +++++++ tests/consumer_test.rs | 20 +++++++------- tests/producer_test.rs | 13 +++++---- 14 files changed, 83 insertions(+), 108 deletions(-) diff --git a/examples/ha_producer.rs b/examples/ha_producer.rs index bbcd29fb..ba65ee37 100644 --- a/examples/ha_producer.rs +++ b/examples/ha_producer.rs @@ -101,7 +101,7 @@ impl MyHAProducer { ProducerPublishError::Timeout | ProducerPublishError::Closed => { Box::pin(self.send_with_confirm(message)).await } - _ => return Err(e), + _ => Err(e), }, } } @@ -114,14 +114,12 @@ async fn ensure_stream_exists(environment: &Environment, stream: &str) -> Rabbit .create(stream) .await; - if let Err(e) = create_response { - if let StreamCreateError::Create { stream, status } = e { - match status { - // we can ignore this error because the stream already exists - ResponseCode::StreamAlreadyExists => {} - err => { - panic!("Error creating stream: {:?} {:?}", stream, err); - } + if let Err(StreamCreateError::Create { stream, status }) = create_response { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + panic!("Error creating stream: {:?} {:?}", stream, err); } } } diff --git a/examples/send_async.rs b/examples/send_async.rs index 2e99ad4c..70670659 100644 --- a/examples/send_async.rs +++ b/examples/send_async.rs @@ -35,14 +35,12 @@ async fn main() -> Result<(), Box> { .create(stream) .await; - if let Err(e) = create_response { - if let StreamCreateError::Create { stream, status } = e { - match status { - // we can ignore this error because the stream already exists - ResponseCode::StreamAlreadyExists => {} - err => { - println!("Error creating stream: {:?} {:?}", stream, err); - } + if let Err(StreamCreateError::Create { stream, status }) = create_response { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); } } } diff --git a/examples/send_with_confirm.rs b/examples/send_with_confirm.rs index 9162c3dc..a3948ae0 100644 --- a/examples/send_with_confirm.rs +++ b/examples/send_with_confirm.rs @@ -13,14 +13,12 @@ async fn main() -> Result<(), Box> { .create(stream) .await; - if let Err(e) = create_response { - if let StreamCreateError::Create { stream, status } = e { - match status { - // we can ignore this error because the stream already exists - ResponseCode::StreamAlreadyExists => {} - err => { - println!("Error creating stream: {:?} {:?}", stream, err); - } + if let Err(StreamCreateError::Create { stream, status }) = create_response { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); } } } diff --git a/examples/simple-consumer.rs b/examples/simple-consumer.rs index 01ded614..5a0d5a9a 100644 --- a/examples/simple-consumer.rs +++ b/examples/simple-consumer.rs @@ -14,14 +14,12 @@ async fn main() -> Result<(), Box> { .create(stream) .await; - if let Err(e) = create_response { - if let StreamCreateError::Create { stream, status } = e { - match status { - // we can ignore this error because the stream already exists - ResponseCode::StreamAlreadyExists => {} - err => { - println!("Error creating stream: {:?} {:?}", stream, err); - } + if let Err(StreamCreateError::Create { stream, status }) = create_response { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); } } } diff --git a/examples/superstreams/receive_super_stream.rs b/examples/superstreams/receive_super_stream.rs index 6adbc2c9..3a9472ef 100644 --- a/examples/superstreams/receive_super_stream.rs +++ b/examples/superstreams/receive_super_stream.rs @@ -17,14 +17,12 @@ async fn main() -> Result<(), Box> { .create_super_stream(super_stream, 3, None) .await; - if let Err(e) = create_response { - if let StreamCreateError::Create { stream, status } = e { - match status { - // we can ignore this error because the stream already exists - ResponseCode::StreamAlreadyExists => {} - err => { - println!("Error creating stream: {:?} {:?}", stream, err); - } + if let Err(StreamCreateError::Create { stream, status }) = create_response { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); } } } diff --git a/examples/superstreams/send_super_stream.rs b/examples/superstreams/send_super_stream.rs index a0318b90..ba46d9c9 100644 --- a/examples/superstreams/send_super_stream.rs +++ b/examples/superstreams/send_super_stream.rs @@ -52,14 +52,12 @@ async fn main() -> Result<(), Box> { .create_super_stream(super_stream, 3, None) .await; - if let Err(e) = create_response { - if let StreamCreateError::Create { stream, status } = e { - match status { - // we can ignore this error because the stream already exists - ResponseCode::StreamAlreadyExists => {} - err => { - println!("Error creating stream: {:?} {:?}", stream, err); - } + if let Err(StreamCreateError::Create { stream, status }) = create_response { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); } } } diff --git a/examples/superstreams/send_super_stream_hash.rs b/examples/superstreams/send_super_stream_hash.rs index 088f1607..d9a0f865 100644 --- a/examples/superstreams/send_super_stream_hash.rs +++ b/examples/superstreams/send_super_stream_hash.rs @@ -54,14 +54,12 @@ async fn main() -> Result<(), Box> { .create_super_stream(super_stream, 3, None) .await; - if let Err(e) = create_response { - if let StreamCreateError::Create { stream, status } = e { - match status { - // we can ignore this error because the stream already exists - ResponseCode::StreamAlreadyExists => {} - err => { - println!("Error creating stream: {:?} {:?}", stream, err); - } + if let Err(StreamCreateError::Create { stream, status }) = create_response { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); } } } diff --git a/examples/superstreams/send_super_stream_routing_key.rs b/examples/superstreams/send_super_stream_routing_key.rs index 12145d7d..3701e8ba 100644 --- a/examples/superstreams/send_super_stream_routing_key.rs +++ b/examples/superstreams/send_super_stream_routing_key.rs @@ -54,14 +54,12 @@ async fn main() -> Result<(), Box> { .create_super_stream(super_stream, 3, Some(routing_key)) .await; - if let Err(e) = create_response { - if let StreamCreateError::Create { stream, status } = e { - match status { - // we can ignore this error because the stream already exists - ResponseCode::StreamAlreadyExists => {} - err => { - println!("Error creating stream: {:?} {:?}", stream, err); - } + if let Err(StreamCreateError::Create { stream, status }) = create_response { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); } } } diff --git a/examples/tls_producer.rs b/examples/tls_producer.rs index dbe8b4b9..1672d1dd 100644 --- a/examples/tls_producer.rs +++ b/examples/tls_producer.rs @@ -50,10 +50,7 @@ async fn main() -> Result<(), Box> { Ok(()) } -async fn start_publisher( - env: Environment, - stream: &String, -) -> Result<(), Box> { +async fn start_publisher(env: Environment, stream: &str) -> Result<(), Box> { let _ = env.stream_creator().create(stream).await; let producer = env.producer().batch_size(BATCH_SIZE).build(stream).await?; diff --git a/protocol/src/response/mod.rs b/protocol/src/response/mod.rs index 4bda7679..e60ea947 100644 --- a/protocol/src/response/mod.rs +++ b/protocol/src/response/mod.rs @@ -229,8 +229,7 @@ mod tests { use crate::{ codec::{Decoder, Encoder}, commands::{ - close::CloseResponse, consumer_update::ConsumerUpdateCommand, - consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand, + close::CloseResponse, consumer_update::ConsumerUpdateCommand, deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse, metadata_update::MetadataUpdateCommand, open::OpenResponse, diff --git a/src/stream_creator.rs b/src/stream_creator.rs index 5625aee4..f0584c53 100644 --- a/src/stream_creator.rs +++ b/src/stream_creator.rs @@ -40,29 +40,16 @@ impl StreamCreator { number_of_partitions: usize, binding_keys: Option>, ) -> Result<(), StreamCreateError> { - let mut partitions_names = Vec::with_capacity(number_of_partitions); - let mut new_binding_keys: Vec = Vec::with_capacity(number_of_partitions); - - if binding_keys.is_none() { - for i in 0..number_of_partitions { - new_binding_keys.push(i.to_string()); - partitions_names.push(super_stream.to_owned() + "-" + i.to_string().as_str()) - } - } else { - new_binding_keys = binding_keys.unwrap(); - for binding_key in new_binding_keys.iter() { - partitions_names.push(super_stream.to_owned() + "-" + binding_key) - } - } + let binding_keys = binding_keys + .unwrap_or_else(|| (0..number_of_partitions).map(|i| i.to_string()).collect()); + let partition_names = binding_keys + .iter() + .map(|binding_key| format!("{super_stream}-{binding_key}")) + .collect(); let client = self.env.create_client().await?; let response = client - .create_super_stream( - super_stream, - partitions_names, - new_binding_keys, - self.options, - ) + .create_super_stream(super_stream, partition_names, binding_keys, self.options) .await?; client.close().await?; diff --git a/tests/common.rs b/tests/common.rs index d23d7ab3..bfd5d1e6 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -8,6 +8,7 @@ use rabbitmq_stream_protocol::ResponseCode; use serde::Deserialize; use tokio::sync::Semaphore; +#[allow(unused)] pub struct TestClient { pub client: Client, pub stream: String, @@ -24,6 +25,7 @@ impl Drop for Countdown { } } +#[allow(unused)] impl Countdown { pub fn new(n: u32) -> (Self, impl Future + Send) { let sem = Arc::new(Semaphore::new(0)); @@ -41,14 +43,17 @@ pub struct TestEnvironment { pub env: Environment, pub stream: String, pub super_stream: String, + #[allow(unused)] pub partitions: Vec, } impl TestClient { + #[allow(unused)] pub async fn create() -> TestClient { Self::create_with_option(ClientOptions::default()).await } + #[allow(unused)] pub async fn create_with_option(options: ClientOptions) -> TestClient { let stream: String = Faker.fake(); let client = Client::connect(options).await.unwrap(); @@ -64,6 +69,7 @@ impl TestClient { } } + #[allow(unused)] pub async fn create_super_stream() -> TestClient { let super_stream: String = Faker.fake(); let client = Client::connect(ClientOptions::default()).await.unwrap(); @@ -104,6 +110,7 @@ impl Drop for TestClient { } } +#[allow(unused)] impl TestEnvironment { pub async fn create() -> TestEnvironment { let stream: String = Faker.fake(); @@ -203,6 +210,7 @@ pub async fn list_http_connection() -> Vec { .unwrap() } +#[allow(unused)] pub async fn wait_for_named_connection(connection_name: String) -> RabbitConnection { let mut max = 10; while max > 0 { @@ -219,6 +227,7 @@ pub async fn wait_for_named_connection(connection_name: String) -> RabbitConnect panic!("Connection not found. timeout"); } +#[allow(unused)] pub async fn drop_connection(connection: RabbitConnection) { reqwest::Client::new() .delete(format!( diff --git a/tests/consumer_test.rs b/tests/consumer_test.rs index 4ec97ed1..e0ba6ca4 100644 --- a/tests/consumer_test.rs +++ b/tests/consumer_test.rs @@ -107,7 +107,7 @@ async fn super_stream_consumer_test() { let mut received_messages = 0; let handle = super_stream_consumer.handle(); - while let Some(_) = super_stream_consumer.next().await { + while super_stream_consumer.next().await.is_some() { received_messages += 1; if received_messages == 10 { break; @@ -481,7 +481,7 @@ async fn consumer_test_with_filtering() { .lock() .await .iter() - .filter(|item| item == &&"filtering") + .filter(|item| item == &"filtering") .collect::>() .len(); @@ -562,7 +562,7 @@ async fn super_stream_consumer_test_with_filtering() { .lock() .await .iter() - .filter(|item| item == &&"filtering") + .filter(|item| item == &"filtering") .collect::>() .len(); @@ -654,7 +654,7 @@ async fn consumer_test_with_filtering_match_unfiltered() { .lock() .await .iter() - .filter(|item| item == &&"1") + .filter(|item| item == &"1") .collect::>() .len(); @@ -728,7 +728,7 @@ async fn super_stream_single_active_consumer_test() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let Some(_) = super_stream_consumer.next().await { + while super_stream_consumer.next().await.is_some() { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -742,7 +742,7 @@ async fn super_stream_single_active_consumer_test() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let Some(_) = super_stream_consumer_2.next().await { + while super_stream_consumer_2.next().await.is_some() { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -756,7 +756,7 @@ async fn super_stream_single_active_consumer_test() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let Some(_) = super_stream_consumer_3.next().await { + while super_stream_consumer_3.next().await.is_some() { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -887,7 +887,7 @@ async fn super_stream_single_active_consumer_test_with_callback() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let Some(_) = super_stream_consumer.next().await { + while super_stream_consumer.next().await.is_some() { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -901,7 +901,7 @@ async fn super_stream_single_active_consumer_test_with_callback() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let Some(_) = super_stream_consumer_2.next().await { + while super_stream_consumer_2.next().await.is_some() { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -915,7 +915,7 @@ async fn super_stream_single_active_consumer_test_with_callback() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let Some(_) = super_stream_consumer_3.next().await { + while super_stream_consumer_3.next().await.is_some() { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); diff --git a/tests/producer_test.rs b/tests/producer_test.rs index 99a464e3..0ae13313 100644 --- a/tests/producer_test.rs +++ b/tests/producer_test.rs @@ -251,12 +251,15 @@ async fn producer_send_with_callback_can_drop() { // A non copy structure struct Foo; - let f = Foo; + let f = Foo {}; producer .send( Message::builder().body(b"message".to_vec()).build(), move |_| { - // this callback is an FnOnce, so we can drop a value here + // This callback is an FnOnce, so we can drop a value here + // Foo doesn't actually implement Drop, so Clippy will complain + // if we don't squelch it. + #[allow(clippy::drop_non_drop)] drop(f); async {} }, @@ -676,11 +679,7 @@ async fn super_stream_producer_send_filtering_message() { .build(); let closed = super_stream_producer.send(message, |_| async move {}).await; - - match closed { - Ok(_) => assert!(true), - Err(_) => assert!(false), - } + assert!(closed.is_ok()) } #[tokio::test(flavor = "multi_thread")]