diff --git a/src/pubsub/src/publisher/worker.rs b/src/pubsub/src/publisher/worker.rs index 801e0cbbc3..6f98f4946f 100644 --- a/src/pubsub/src/publisher/worker.rs +++ b/src/pubsub/src/publisher/worker.rs @@ -216,11 +216,17 @@ impl BatchWorker { || self.pending_batch.size() >= self.batching_options.byte_threshold } - // Move pending messages to the pending batch respecting batch thresholds. - pub(crate) fn move_to_batch(&mut self) { + // Move pending messages to the pending batch respecting batch thresholds + // and flush the batch if it is full. + pub(crate) fn move_to_batch_and_maybe_send( + &mut self, + inflight: &mut JoinSet>, + ) { while let Some(publish) = self.pending_msgs.pop_front() { self.pending_batch.push(publish); if self.at_batch_threshold() { + self.pending_batch + .flush(self.client.clone(), self.topic.clone(), inflight); break; } } @@ -240,19 +246,26 @@ impl BatchWorker { // Flush the pending messages by sending the messages in sequential batches. async fn flush_sequential(&mut self, mut inflight: JoinSet>) { self.handle_inflight_join(inflight.join_next().await); - while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { - self.move_to_batch(); + while !self.pending_msgs.is_empty() { + self.move_to_batch_and_maybe_send(&mut inflight); + self.handle_inflight_join(inflight.join_next().await); + } + // The final batch may not have been full, send it. + if !self.pending_batch.is_empty() { self.pending_batch .flush(self.client.clone(), self.topic.clone(), &mut inflight); - self.handle_inflight_join(inflight.join_next().await); } + self.handle_inflight_join(inflight.join_next().await); } // Flush the pending batch and pending messages by sending remaining // messages in concurrent batches. async fn flush_concurrent(&mut self, mut inflight: JoinSet>) { - while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { - self.move_to_batch(); + while !self.pending_msgs.is_empty() { + self.move_to_batch_and_maybe_send(&mut inflight); + } + // The final batch may not have been full, send it. + if !self.pending_batch.is_empty() { self.pending_batch .flush(self.client.clone(), self.topic.clone(), &mut inflight); } @@ -293,13 +306,9 @@ impl BatchWorker { match msg { Some(ToBatchWorker::Publish(msg)) => { self.pending_msgs.push_back(msg); - self.move_to_batch(); - if self.at_batch_threshold() { - self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight); - } + self.move_to_batch_and_maybe_send(&mut inflight); }, Some(ToBatchWorker::Flush(tx)) => { - // Send all the batches concurrently. self.flush_concurrent(inflight).await; inflight = JoinSet::new(); let _ = tx.send(()); @@ -367,20 +376,14 @@ impl BatchWorker { tokio::select! { join = inflight.join_next(), if !inflight.is_empty() => { self.handle_inflight_join(join); - self.move_to_batch(); - if self.at_batch_threshold() { - self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight); - } + self.move_to_batch_and_maybe_send(&mut inflight); } msg = self.rx.recv() => { match msg { Some(ToBatchWorker::Publish(msg)) => { self.pending_msgs.push_back(msg); if inflight.is_empty() { - self.move_to_batch(); - if self.at_batch_threshold() { - self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight); - } + self.move_to_batch_and_maybe_send(&mut inflight); } }, Some(ToBatchWorker::Flush(tx)) => {