Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/pubsub/src/publisher/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ mod tests {
// If we hold on to the handles returned from the publisher, it should
// be safe to drop the publisher and .await on the handles.
let mut mock = MockGapicPublisher::new();
mock.expect_publish().return_once(publish_ok);
mock.expect_publish().times(2).returning(publish_ok);

let client = GapicPublisher::from_stub(mock);
let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string())
.set_message_count_threshold(1000_u32)
Expand All @@ -424,13 +425,19 @@ mod tests {
let messages = [
PubsubMessage::new().set_data("hello"),
PubsubMessage::new().set_data("world"),
PubsubMessage::new()
.set_data("hello")
.set_ordering_key("key"),
PubsubMessage::new()
.set_data("world")
.set_ordering_key("key"),
];
let mut handles = Vec::new();
for msg in messages {
let handle = publisher.publish(msg.clone());
handles.push((msg, handle));
}
drop(publisher); // This should trigger the batch to send, no delay.
drop(publisher); // This should trigger the publisher to send all pending messages.

for (id, rx) in handles.into_iter() {
let got = rx.await.expect("expected message id");
Expand Down
62 changes: 35 additions & 27 deletions src/pubsub/src/publisher/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,8 @@ impl Worker {
}
}
None => {
// The sender has been dropped send batch and stop running.
// This isn't guaranteed to execute if a user does not .await on the
// corresponding PublishHandles for the batch and the program ends.
let mut flush_set = JoinSet::new();
for (_, batch_worker) in batch_workers.iter_mut() {
let (tx, rx) = oneshot::channel();
batch_worker
.send(ToBatchWorker::Flush(tx))
.expect(batch_worker_error_msg);
flush_set.spawn(rx);
}
flush_set.join_all().await;
// By dropping the BatchWorker Senders, they will individually handle the
// shutdown procedures.
break;
}
}
Expand Down Expand Up @@ -247,6 +237,28 @@ impl BatchWorker {
}
}

// Flush the pending messages by sending the messages in sequential batches.
async fn flush_sequential(&mut self, mut inflight: JoinSet<Result<(), gax::error::Error>>) {
self.handle_inflight_join(inflight.join_next().await);
while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() {
self.move_to_batch();
self.pending_batch
.flush(self.client.clone(), self.topic.clone(), &mut inflight);
self.handle_inflight_join(inflight.join_next().await);
}
}

// Flush the pending batch and pending messages by sending remaining
// messages in concurrent batches.
async fn flush_concurrent(&mut self, mut inflight: JoinSet<Result<(), gax::error::Error>>) {
while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() {
self.move_to_batch();
self.pending_batch
.flush(self.client.clone(), self.topic.clone(), &mut inflight);
}
inflight.join_all().await;
}

/// The main loop of the batch worker.
///
/// This method concurrently handles the following events:
Expand Down Expand Up @@ -288,19 +300,17 @@ impl BatchWorker {
},
Some(ToBatchWorker::Flush(tx)) => {
// Send all the batches concurrently.
while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() {
self.move_to_batch();
self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
}
inflight.join_all().await;
self.flush_concurrent(inflight).await;
inflight = JoinSet::new();
let _ = tx.send(());
},
Some(ToBatchWorker::ResumePublish()) => {
// Nothing to resume as we do not pause without ordering key.
}
None => {
// TODO(#4012): Add shutdown procedure for BatchWorker.
// This isn't guaranteed to execute if a user does not .await on the
// corresponding PublishHandles.
self.flush_concurrent(inflight).await;
break;
}
}
Expand Down Expand Up @@ -347,7 +357,8 @@ impl BatchWorker {
self.paused = false;
}
None => {
// TODO(#4012): Add shutdown procedure for BatchWorker.
// There should be no pending messages and messages in the pending batch as
// it was already handled when this was paused.
break;
}
}
Expand All @@ -373,20 +384,17 @@ impl BatchWorker {
}
},
Some(ToBatchWorker::Flush(tx)) => {
// Send batches sequentially.
self.handle_inflight_join(inflight.join_next().await);
while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() {
self.move_to_batch();
self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
self.handle_inflight_join(inflight.join_next().await);
}
self.flush_sequential(inflight).await;
inflight = JoinSet::new();
let _ = tx.send(());
},
Some(ToBatchWorker::ResumePublish()) => {
// Nothing to resume as we are not paused.
},
None => {
// TODO(#4012): Add shutdown procedure for BatchWorker.
// This isn't guaranteed to execute if a user does not .await on the
// corresponding PublishHandles.
self.flush_sequential(inflight).await;
break;
}
}
Expand Down
Loading