Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,28 @@ members = [


[dependencies]
tokio-rustls = { version = "0.26.1" }
tokio-rustls = { version = "0.26.4" }
rustls-pemfile = "2.2.0"
rabbitmq-stream-protocol = { version = "0.9", path = "protocol" }
tokio = { version = "1.29.1", features = ["full"] }
tokio-util = { version = "0.7.3", features = ["codec"] }
bytes = "1.0.0"
pin-project = { version = "1.0.0" }
tokio-stream = "0.1.11"
futures = "0.3.0"
url = "2.2.2"
tokio = { version = "1.48.0", features = ["full"] }
tokio-util = { version = "0.7.16", features = ["codec"] }
bytes = "1.10.1"
pin-project = { version = "1.1.10" }
tokio-stream = "0.1.17"
futures = "0.3.31"
url = "2.5.7"
tracing = "0.1"
thiserror = "2.0"
async-trait = "0.1.51"
async-trait = "0.1.89"
rand = "0.8"
dashmap = "6.1.0"
murmur3 = "0.5.2"
serde = { version = "1.0", features = ["derive"], optional = true }

[dev-dependencies]
tracing-subscriber = "0.3.1"
fake = { version = "4.2.0", features = ['derive'] }
chrono = "0.4.26"
tracing-subscriber = "0.3.20"
fake = { version = "4.4.0", features = ['derive'] }
chrono = "0.4.42"
serde_json = "1.0"
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
10 changes: 5 additions & 5 deletions benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ edition = "2021"
[dependencies]

rabbitmq-stream-client = { path = "../" }
tracing-subscriber = "0.3.1"
tracing-subscriber = "0.3.20"
tracing = "0.1"
tokio = { version = "1.29.1", features = ["full"] }
clap = { version = "4.0.22", features = ["derive"] }
futures = "0.3.0"
async-trait = "0.1.51"
tokio = { version = "1.48.0", features = ["full"] }
clap = { version = "4.5.51", features = ["derive"] }
futures = "0.3.31"
async-trait = "0.1.89"
16 changes: 7 additions & 9 deletions examples/ha_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl MyHAProducer {
ProducerPublishError::Timeout | ProducerPublishError::Closed => {
Box::pin(self.send_with_confirm(message)).await
}
_ => return Err(e),
_ => Err(e),
},
}
}
Expand All @@ -114,14 +114,12 @@ async fn ensure_stream_exists(environment: &Environment, stream: &str) -> Rabbit
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
panic!("Error creating stream: {:?} {:?}", stream, err);
}
if let Err(StreamCreateError::Create { stream, status }) = create_response {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
panic!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions examples/send_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
if let Err(StreamCreateError::Create { stream, status }) = create_response {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions examples/send_with_confirm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
if let Err(StreamCreateError::Create { stream, status }) = create_response {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions examples/simple-consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.create(stream)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
if let Err(StreamCreateError::Create { stream, status }) = create_response {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions examples/superstreams/receive_super_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.create_super_stream(super_stream, 3, None)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
if let Err(StreamCreateError::Create { stream, status }) = create_response {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions examples/superstreams/send_super_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.create_super_stream(super_stream, 3, None)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
if let Err(StreamCreateError::Create { stream, status }) = create_response {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions examples/superstreams/send_super_stream_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.create_super_stream(super_stream, 3, None)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
if let Err(StreamCreateError::Create { stream, status }) = create_response {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions examples/superstreams/send_super_stream_routing_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.create_super_stream(super_stream, 3, Some(routing_key))
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
if let Err(StreamCreateError::Create { stream, status }) = create_response {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions examples/tls_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}

async fn start_publisher(
env: Environment,
stream: &String,
) -> Result<(), Box<dyn std::error::Error>> {
async fn start_publisher(env: Environment, stream: &str) -> Result<(), Box<dyn std::error::Error>> {
let _ = env.stream_creator().create(stream).await;

let producer = env.producer().batch_size(BATCH_SIZE).build(stream).await?;
Expand Down
10 changes: 5 additions & 5 deletions protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ wasm-bindgen = ["uuid/js"]

[dependencies]
byteorder = "1"
ordered-float = "4.1.0"
ordered-float = "4.6.0"
uuid = "1"
chrono = "0.4.26"
num_enum = "0.7.0"
chrono = "0.4.42"
num_enum = "0.7.5"
derive_more = { version = "2.0.1", features = ["full"] }

[dev-dependencies]
pretty_assertions = "1.2.0"
fake = { version = "4.0", features = [ "derive", "chrono", "uuid" ] }
pretty_assertions = "1.4.1"
fake = { version = "4.4", features = [ "derive", "chrono", "uuid" ] }
3 changes: 1 addition & 2 deletions protocol/src/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ mod tests {
use crate::{
codec::{Decoder, Encoder},
commands::{
close::CloseResponse, consumer_update::ConsumerUpdateCommand,
consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand,
close::CloseResponse, consumer_update::ConsumerUpdateCommand, deliver::DeliverCommand,
exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
heart_beat::HeartbeatResponse, metadata::MetadataResponse,
metadata_update::MetadataUpdateCommand, open::OpenResponse,
Expand Down
27 changes: 7 additions & 20 deletions src/stream_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,16 @@ impl StreamCreator {
number_of_partitions: usize,
binding_keys: Option<Vec<String>>,
) -> Result<(), StreamCreateError> {
let mut partitions_names = Vec::with_capacity(number_of_partitions);
let mut new_binding_keys: Vec<String> = Vec::with_capacity(number_of_partitions);

if binding_keys.is_none() {
for i in 0..number_of_partitions {
new_binding_keys.push(i.to_string());
partitions_names.push(super_stream.to_owned() + "-" + i.to_string().as_str())
}
} else {
new_binding_keys = binding_keys.unwrap();
for binding_key in new_binding_keys.iter() {
partitions_names.push(super_stream.to_owned() + "-" + binding_key)
}
}
let binding_keys = binding_keys
.unwrap_or_else(|| (0..number_of_partitions).map(|i| i.to_string()).collect());
let partition_names = binding_keys
.iter()
.map(|binding_key| format!("{super_stream}-{binding_key}"))
.collect();

let client = self.env.create_client().await?;
let response = client
.create_super_stream(
super_stream,
partitions_names,
new_binding_keys,
self.options,
)
.create_super_stream(super_stream, partition_names, binding_keys, self.options)
.await?;
client.close().await?;

Expand Down
9 changes: 9 additions & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use rabbitmq_stream_protocol::ResponseCode;
use serde::Deserialize;
use tokio::sync::Semaphore;

#[allow(unused)]
pub struct TestClient {
pub client: Client,
pub stream: String,
Expand All @@ -24,6 +25,7 @@ impl Drop for Countdown {
}
}

#[allow(unused)]
impl Countdown {
pub fn new(n: u32) -> (Self, impl Future + Send) {
let sem = Arc::new(Semaphore::new(0));
Expand All @@ -41,14 +43,17 @@ pub struct TestEnvironment {
pub env: Environment,
pub stream: String,
pub super_stream: String,
#[allow(unused)]
pub partitions: Vec<String>,
}

impl TestClient {
#[allow(unused)]
pub async fn create() -> TestClient {
Self::create_with_option(ClientOptions::default()).await
}

#[allow(unused)]
pub async fn create_with_option(options: ClientOptions) -> TestClient {
let stream: String = Faker.fake();
let client = Client::connect(options).await.unwrap();
Expand All @@ -64,6 +69,7 @@ impl TestClient {
}
}

#[allow(unused)]
pub async fn create_super_stream() -> TestClient {
let super_stream: String = Faker.fake();
let client = Client::connect(ClientOptions::default()).await.unwrap();
Expand Down Expand Up @@ -104,6 +110,7 @@ impl Drop for TestClient {
}
}

#[allow(unused)]
impl TestEnvironment {
pub async fn create() -> TestEnvironment {
let stream: String = Faker.fake();
Expand Down Expand Up @@ -203,6 +210,7 @@ pub async fn list_http_connection() -> Vec<RabbitConnection> {
.unwrap()
}

#[allow(unused)]
pub async fn wait_for_named_connection(connection_name: String) -> RabbitConnection {
let mut max = 10;
while max > 0 {
Expand All @@ -219,6 +227,7 @@ pub async fn wait_for_named_connection(connection_name: String) -> RabbitConnect
panic!("Connection not found. timeout");
}

#[allow(unused)]
pub async fn drop_connection(connection: RabbitConnection) {
reqwest::Client::new()
.delete(format!(
Expand Down
Loading
Loading