From 1eb382755cbd61bcf5f57c148b7c292a5a32d194 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Tue, 27 Jan 2026 18:22:57 +0000 Subject: [PATCH 1/4] refactor(pubsub): move shutdown logic into BatchWorker --- src/pubsub/src/publisher/publisher.rs | 12 ++++- src/pubsub/src/publisher/worker.rs | 63 +++++++++++++++------------ 2 files changed, 46 insertions(+), 29 deletions(-) diff --git a/src/pubsub/src/publisher/publisher.rs b/src/pubsub/src/publisher/publisher.rs index e721259928..edee7503d9 100644 --- a/src/pubsub/src/publisher/publisher.rs +++ b/src/pubsub/src/publisher/publisher.rs @@ -413,7 +413,9 @@ mod tests { // If we hold on to the handles returned from the publisher, it should // be safe to drop the publisher and .await on the handles. let mut mock = MockGapicPublisher::new(); - mock.expect_publish().return_once(publish_ok); + // mock.expect_publish().returning(publish_ok); + mock.expect_publish().times(2).returning(publish_ok); + let client = GapicPublisher::from_stub(mock); let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()) .set_message_count_threshold(1000_u32) @@ -424,13 +426,19 @@ mod tests { let messages = [ PubsubMessage::new().set_data("hello"), PubsubMessage::new().set_data("world"), + PubsubMessage::new() + .set_data("hello") + .set_ordering_key("key"), + PubsubMessage::new() + .set_data("world") + .set_ordering_key("key"), ]; let mut handles = Vec::new(); for msg in messages { let handle = publisher.publish(msg.clone()); handles.push((msg, handle)); } - drop(publisher); // This should trigger the batch to send, no delay. + drop(publisher); // This should trigger the publisher to send all pending messages. for (id, rx) in handles.into_iter() { let got = rx.await.expect("expected message id"); diff --git a/src/pubsub/src/publisher/worker.rs b/src/pubsub/src/publisher/worker.rs index fcb2e46925..4f8f784da6 100644 --- a/src/pubsub/src/publisher/worker.rs +++ b/src/pubsub/src/publisher/worker.rs @@ -164,18 +164,8 @@ impl Worker { } } None => { - // The sender has been dropped send batch and stop running. - // This isn't guaranteed to execute if a user does not .await on the - // corresponding PublishHandles for the batch and the program ends. - let mut flush_set = JoinSet::new(); - for (_, batch_worker) in batch_workers.iter_mut() { - let (tx, rx) = oneshot::channel(); - batch_worker - .send(ToBatchWorker::Flush(tx)) - .expect(batch_worker_error_msg); - flush_set.spawn(rx); - } - flush_set.join_all().await; + // By dropping the BatchWorker Senders, they will individually handle the + // shutdown procedures. break; } } @@ -247,6 +237,29 @@ impl BatchWorker { } } + // Flush the pending batch and pending messages by sending and awaiting Publish + // sequentially. + async fn flush_sequential(&mut self, mut inflight: JoinSet>) { + self.handle_inflight_join(inflight.join_next().await); + while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { + self.move_to_batch(); + self.pending_batch + .flush(self.client.clone(), self.topic.clone(), &mut inflight); + self.handle_inflight_join(inflight.join_next().await); + } + } + + // Flush the pending batch and pending messages by sending Publish + // concurrent. + async fn flush_concurrent(&mut self, mut inflight: JoinSet>) { + while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { + self.move_to_batch(); + self.pending_batch + .flush(self.client.clone(), self.topic.clone(), &mut inflight); + } + inflight.join_all().await; + } + /// The main loop of the batch worker. /// /// This method concurrently handles the following events: @@ -288,11 +301,7 @@ impl BatchWorker { }, Some(ToBatchWorker::Flush(tx)) => { // Send all the batches concurrently. - while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { - self.move_to_batch(); - self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight); - } - inflight.join_all().await; + self.flush_concurrent(inflight).await; inflight = JoinSet::new(); let _ = tx.send(()); }, @@ -300,7 +309,9 @@ impl BatchWorker { // Nothing to resume as we do not pause without ordering key. } None => { - // TODO(#4012): Add shutdown procedure for BatchWorker. + // This isn't guaranteed to execute if a user does not .await on the + // corresponding PublishHandles for the batch and the program ends. + self.flush_concurrent(inflight).await; break; } } @@ -347,7 +358,8 @@ impl BatchWorker { self.paused = false; } None => { - // TODO(#4012): Add shutdown procedure for BatchWorker. + // There should be no pending messages and messages in the pending batch as + // it was already handled when this was paused. break; } } @@ -373,20 +385,17 @@ impl BatchWorker { } }, Some(ToBatchWorker::Flush(tx)) => { - // Send batches sequentially. - self.handle_inflight_join(inflight.join_next().await); - while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { - self.move_to_batch(); - self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight); - self.handle_inflight_join(inflight.join_next().await); - } + self.flush_sequential(inflight).await; + inflight = JoinSet::new(); let _ = tx.send(()); }, Some(ToBatchWorker::ResumePublish()) => { // Nothing to resume as we are not paused. }, None => { - // TODO(#4012): Add shutdown procedure for BatchWorker. + // This isn't guaranteed to execute if a user does not .await on the + // corresponding PublishHandles for the batch and the program ends. + self.flush_sequential(inflight).await; break; } } From a20e5915a24017c5eaff1d7de1841a36489b172a Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Tue, 27 Jan 2026 18:24:25 +0000 Subject: [PATCH 2/4] fix comment --- src/pubsub/src/publisher/publisher.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pubsub/src/publisher/publisher.rs b/src/pubsub/src/publisher/publisher.rs index edee7503d9..2fe699ee41 100644 --- a/src/pubsub/src/publisher/publisher.rs +++ b/src/pubsub/src/publisher/publisher.rs @@ -413,7 +413,6 @@ mod tests { // If we hold on to the handles returned from the publisher, it should // be safe to drop the publisher and .await on the handles. let mut mock = MockGapicPublisher::new(); - // mock.expect_publish().returning(publish_ok); mock.expect_publish().times(2).returning(publish_ok); let client = GapicPublisher::from_stub(mock); From afe0c60d2e025642a8599f0d60bffc0677e6df46 Mon Sep 17 00:00:00 2001 From: Phong Chuong Date: Tue, 27 Jan 2026 18:29:43 +0000 Subject: [PATCH 3/4] fix comments --- src/pubsub/src/publisher/worker.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pubsub/src/publisher/worker.rs b/src/pubsub/src/publisher/worker.rs index 4f8f784da6..f10ba5e7a1 100644 --- a/src/pubsub/src/publisher/worker.rs +++ b/src/pubsub/src/publisher/worker.rs @@ -310,7 +310,7 @@ impl BatchWorker { } None => { // This isn't guaranteed to execute if a user does not .await on the - // corresponding PublishHandles for the batch and the program ends. + // corresponding PublishHandles. self.flush_concurrent(inflight).await; break; } @@ -394,7 +394,7 @@ impl BatchWorker { }, None => { // This isn't guaranteed to execute if a user does not .await on the - // corresponding PublishHandles for the batch and the program ends. + // corresponding PublishHandles. self.flush_sequential(inflight).await; break; } From b6368c5e2478697dd8e953f34e93921e61d262f6 Mon Sep 17 00:00:00 2001 From: Phong Chuong <147636638+PhongChuong@users.noreply.github.com> Date: Tue, 27 Jan 2026 20:07:11 -0500 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Suzy Mueller --- src/pubsub/src/publisher/worker.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/pubsub/src/publisher/worker.rs b/src/pubsub/src/publisher/worker.rs index f10ba5e7a1..801e0cbbc3 100644 --- a/src/pubsub/src/publisher/worker.rs +++ b/src/pubsub/src/publisher/worker.rs @@ -237,8 +237,7 @@ impl BatchWorker { } } - // Flush the pending batch and pending messages by sending and awaiting Publish - // sequentially. + // Flush the pending messages by sending the messages in sequential batches. async fn flush_sequential(&mut self, mut inflight: JoinSet>) { self.handle_inflight_join(inflight.join_next().await); while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { @@ -249,8 +248,8 @@ impl BatchWorker { } } - // Flush the pending batch and pending messages by sending Publish - // concurrent. + // Flush the pending batch and pending messages by sending remaining + // messages in concurrent batches. async fn flush_concurrent(&mut self, mut inflight: JoinSet>) { while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { self.move_to_batch();