diff --git a/src/pubsub/src/publisher/publisher.rs b/src/pubsub/src/publisher/publisher.rs index e721259928..2fe699ee41 100644 --- a/src/pubsub/src/publisher/publisher.rs +++ b/src/pubsub/src/publisher/publisher.rs @@ -413,7 +413,8 @@ mod tests { // If we hold on to the handles returned from the publisher, it should // be safe to drop the publisher and .await on the handles. let mut mock = MockGapicPublisher::new(); - mock.expect_publish().return_once(publish_ok); + mock.expect_publish().times(2).returning(publish_ok); + let client = GapicPublisher::from_stub(mock); let publisher = PublisherPartialBuilder::new(client, TOPIC.to_string()) .set_message_count_threshold(1000_u32) @@ -424,13 +425,19 @@ mod tests { let messages = [ PubsubMessage::new().set_data("hello"), PubsubMessage::new().set_data("world"), + PubsubMessage::new() + .set_data("hello") + .set_ordering_key("key"), + PubsubMessage::new() + .set_data("world") + .set_ordering_key("key"), ]; let mut handles = Vec::new(); for msg in messages { let handle = publisher.publish(msg.clone()); handles.push((msg, handle)); } - drop(publisher); // This should trigger the batch to send, no delay. + drop(publisher); // This should trigger the publisher to send all pending messages. for (id, rx) in handles.into_iter() { let got = rx.await.expect("expected message id"); diff --git a/src/pubsub/src/publisher/worker.rs b/src/pubsub/src/publisher/worker.rs index fcb2e46925..801e0cbbc3 100644 --- a/src/pubsub/src/publisher/worker.rs +++ b/src/pubsub/src/publisher/worker.rs @@ -164,18 +164,8 @@ impl Worker { } } None => { - // The sender has been dropped send batch and stop running. - // This isn't guaranteed to execute if a user does not .await on the - // corresponding PublishHandles for the batch and the program ends. - let mut flush_set = JoinSet::new(); - for (_, batch_worker) in batch_workers.iter_mut() { - let (tx, rx) = oneshot::channel(); - batch_worker - .send(ToBatchWorker::Flush(tx)) - .expect(batch_worker_error_msg); - flush_set.spawn(rx); - } - flush_set.join_all().await; + // By dropping the BatchWorker Senders, they will individually handle the + // shutdown procedures. break; } } @@ -247,6 +237,28 @@ impl BatchWorker { } } + // Flush the pending messages by sending the messages in sequential batches. + async fn flush_sequential(&mut self, mut inflight: JoinSet>) { + self.handle_inflight_join(inflight.join_next().await); + while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { + self.move_to_batch(); + self.pending_batch + .flush(self.client.clone(), self.topic.clone(), &mut inflight); + self.handle_inflight_join(inflight.join_next().await); + } + } + + // Flush the pending batch and pending messages by sending remaining + // messages in concurrent batches. + async fn flush_concurrent(&mut self, mut inflight: JoinSet>) { + while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { + self.move_to_batch(); + self.pending_batch + .flush(self.client.clone(), self.topic.clone(), &mut inflight); + } + inflight.join_all().await; + } + /// The main loop of the batch worker. /// /// This method concurrently handles the following events: @@ -288,11 +300,7 @@ impl BatchWorker { }, Some(ToBatchWorker::Flush(tx)) => { // Send all the batches concurrently. - while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { - self.move_to_batch(); - self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight); - } - inflight.join_all().await; + self.flush_concurrent(inflight).await; inflight = JoinSet::new(); let _ = tx.send(()); }, @@ -300,7 +308,9 @@ impl BatchWorker { // Nothing to resume as we do not pause without ordering key. } None => { - // TODO(#4012): Add shutdown procedure for BatchWorker. + // This isn't guaranteed to execute if a user does not .await on the + // corresponding PublishHandles. + self.flush_concurrent(inflight).await; break; } } @@ -347,7 +357,8 @@ impl BatchWorker { self.paused = false; } None => { - // TODO(#4012): Add shutdown procedure for BatchWorker. + // There should be no pending messages and messages in the pending batch as + // it was already handled when this was paused. break; } } @@ -373,20 +384,17 @@ impl BatchWorker { } }, Some(ToBatchWorker::Flush(tx)) => { - // Send batches sequentially. - self.handle_inflight_join(inflight.join_next().await); - while !self.pending_batch.is_empty() || !self.pending_msgs.is_empty() { - self.move_to_batch(); - self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight); - self.handle_inflight_join(inflight.join_next().await); - } + self.flush_sequential(inflight).await; + inflight = JoinSet::new(); let _ = tx.send(()); }, Some(ToBatchWorker::ResumePublish()) => { // Nothing to resume as we are not paused. }, None => { - // TODO(#4012): Add shutdown procedure for BatchWorker. + // This isn't guaranteed to execute if a user does not .await on the + // corresponding PublishHandles. + self.flush_sequential(inflight).await; break; } }