Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions src/pubsub/src/publisher/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,17 @@ impl BatchWorker {
|| self.pending_batch.size() >= self.batching_options.byte_threshold
}

// Move pending messages to the pending batch respecting batch thresholds.
pub(crate) fn move_to_batch(&mut self) {
// Move pending messages to the pending batch respecting batch thresholds
// and flush the batch if it is full.
pub(crate) fn move_to_batch_and_maybe_send(
&mut self,
inflight: &mut JoinSet<Result<(), gax::error::Error>>,
) {
while let Some(publish) = self.pending_msgs.pop_front() {
self.pending_batch.push(publish);
if self.at_batch_threshold() {
self.pending_batch
.flush(self.client.clone(), self.topic.clone(), inflight);
break;
}
}
Expand All @@ -240,19 +246,26 @@ impl BatchWorker {
// Flush the pending messages by sending the messages in sequential batches.
async fn flush_sequential(&mut self, mut inflight: JoinSet<Result<(), gax::error::Error>>) {
self.handle_inflight_join(inflight.join_next().await);
while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() {
self.move_to_batch();
while !self.pending_msgs.is_empty() {
self.move_to_batch_and_maybe_send(&mut inflight);
self.handle_inflight_join(inflight.join_next().await);
}
// The final batch may not have been full, send it.
if !self.pending_batch.is_empty() {
self.pending_batch
.flush(self.client.clone(), self.topic.clone(), &mut inflight);
self.handle_inflight_join(inflight.join_next().await);
}
self.handle_inflight_join(inflight.join_next().await);
}

// Flush the pending batch and pending messages by sending remaining
// messages in concurrent batches.
async fn flush_concurrent(&mut self, mut inflight: JoinSet<Result<(), gax::error::Error>>) {
while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() {
self.move_to_batch();
while !self.pending_msgs.is_empty() {
self.move_to_batch_and_maybe_send(&mut inflight);
}
// The final batch may not have been full, send it.
if !self.pending_batch.is_empty() {
self.pending_batch
.flush(self.client.clone(), self.topic.clone(), &mut inflight);
}
Expand Down Expand Up @@ -293,13 +306,9 @@ impl BatchWorker {
match msg {
Some(ToBatchWorker::Publish(msg)) => {
self.pending_msgs.push_back(msg);
self.move_to_batch();
if self.at_batch_threshold() {
self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
}
self.move_to_batch_and_maybe_send(&mut inflight);
},
Some(ToBatchWorker::Flush(tx)) => {
// Send all the batches concurrently.
self.flush_concurrent(inflight).await;
inflight = JoinSet::new();
let _ = tx.send(());
Expand Down Expand Up @@ -367,20 +376,14 @@ impl BatchWorker {
tokio::select! {
join = inflight.join_next(), if !inflight.is_empty() => {
self.handle_inflight_join(join);
self.move_to_batch();
if self.at_batch_threshold() {
self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
}
self.move_to_batch_and_maybe_send(&mut inflight);
}
msg = self.rx.recv() => {
match msg {
Some(ToBatchWorker::Publish(msg)) => {
self.pending_msgs.push_back(msg);
if inflight.is_empty() {
self.move_to_batch();
if self.at_batch_threshold() {
self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
}
self.move_to_batch_and_maybe_send(&mut inflight);
}
},
Some(ToBatchWorker::Flush(tx)) => {
Expand Down
Loading