From 11590db85bd994039d16e0b8544cd6a7508e97f1 Mon Sep 17 00:00:00 2001 From: BlindOP Date: Thu, 28 Aug 2025 18:17:53 +0300 Subject: [PATCH 1/2] Fixed forwarding of StatusChanged event --- .../Reliable/DeduplicatingProducer.cs | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs b/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs index 3b2c94e0..62a51a52 100644 --- a/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs @@ -39,22 +39,7 @@ public static async Task Create(DeduplicatingProducerConf var x = new DeduplicatingProducer() { _producer = await Producer - .Create( - new ProducerConfig(producerConfig.StreamSystem, producerConfig.Stream) - { - _reference = producerConfig.Reference, - ConfirmationHandler = producerConfig.ConfirmationHandler, - ReconnectStrategy = producerConfig.ReconnectStrategy, - ClientProvidedName = producerConfig.ClientProvidedName, - SuperStreamConfig = producerConfig.SuperStreamConfig, - MaxInFlight = producerConfig.MaxInFlight, - MessagesBufferSize = producerConfig.MessagesBufferSize, - TimeoutMessageAfter = producerConfig.TimeoutMessageAfter, - Filter = producerConfig.Filter, - Identifier = producerConfig.Identifier, - ResourceAvailableReconnectStrategy = producerConfig.ResourceAvailableReconnectStrategy, - - }, logger) + .Create(producerConfig, logger) .ConfigureAwait(false) }; return x; From 84994892c97317daffb33a7bab7c9b4d05e90c61 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 1 Sep 2025 11:52:03 +0200 Subject: [PATCH 2/2] add tests Signed-off-by: Gabriele Santomaggio --- Tests/DeduplicationProducerTests.cs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/Tests/DeduplicationProducerTests.cs b/Tests/DeduplicationProducerTests.cs index 9d5c8168..763e6fb0 100644 --- a/Tests/DeduplicationProducerTests.cs +++ b/Tests/DeduplicationProducerTests.cs @@ -3,6 +3,7 @@ // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. using System; +using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -80,16 +81,22 @@ public async Task DeduplicationInActionSendingTheSameIdMessagesWontStore() SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); var testPassed = new TaskCompletionSource(); const ulong TotalMessages = 1000UL; - var p = await DeduplicatingProducer.Create( - new DeduplicatingProducerConfig(system, stream, "my_producer_reference") + var dupConfig = new DeduplicatingProducerConfig(system, stream, "my_producer_reference") + { + ConfirmationHandler = async confirmation => { - ConfirmationHandler = async confirmation => - { - if (confirmation.PublishingId == TotalMessages) - testPassed.SetResult(TotalMessages); - await Task.CompletedTask; - }, - }); + if (confirmation.PublishingId == TotalMessages) + testPassed.SetResult(TotalMessages); + await Task.CompletedTask; + }, + }; + var statusInfoReceived = new List(); + dupConfig.StatusChanged += (status) => { statusInfoReceived.Add(status); }; + + var p = await DeduplicatingProducer.Create(dupConfig); + + Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From); + Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To); // first send and the messages are stored for (ulong i = 1; i <= TotalMessages; i++) {