Skip to content

Commit dd8a4ce

Browse files
authored
Check the entity status during the close (#353)
* Check the entity status when it is closed on the disconnection. When the entity is closed normally, and there is some network problem, the client could receive a closed unexpected even if the entity is closed normally. A double-check helps to handle this edge case. --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 66b9cab commit dd8a4ce

File tree

6 files changed

+41
-29
lines changed

6 files changed

+41
-29
lines changed

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
219219
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
220220
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
221221
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer
222+
RabbitMQ.Stream.Client.Reliable.ReliableBase.IsClosedNormally() -> bool
223+
RabbitMQ.Stream.Client.Reliable.ReliableBase.IsClosedNormally(string closeReason) -> bool
222224
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus newStatus, RabbitMQ.Stream.Client.Reliable.ChangeStatusReason reason, string partition = null) -> void
223225
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
224226
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.get -> string

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
using System.Collections.Concurrent;
66
using System.Threading.Tasks;
7-
using Microsoft.Extensions.Logging;
87

98
namespace RabbitMQ.Stream.Client.Reliable;
109

@@ -60,18 +59,17 @@ private async Task<IConsumer> StandardConsumer(bool boot)
6059
Identifier = _consumerConfig.Identifier,
6160
ConnectionClosedHandler = async (closeReason) =>
6261
{
63-
if (closeReason == ConnectionClosedReason.Normal)
64-
{
65-
// we don't update the status here since it happens when Close() is called in a normal way
66-
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
62+
if (IsClosedNormally(closeReason))
6763
return;
68-
}
6964

7065
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
7166
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
7267
},
7368
MetadataHandler = async _ =>
7469
{
70+
if (IsClosedNormally())
71+
return;
72+
7573
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
7674
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
7775
},
@@ -128,12 +126,8 @@ private async Task<IConsumer> SuperConsumer(bool boot)
128126
ConnectionClosedHandler = async (closeReason, partitionStream) =>
129127
{
130128
await RandomWait().ConfigureAwait(false);
131-
if (closeReason == ConnectionClosedReason.Normal)
132-
{
133-
// we don't update the status here since it happens when Close() is called in a normal way
134-
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
129+
if (IsClosedNormally(closeReason))
135130
return;
136-
}
137131

138132
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
139133
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
@@ -143,6 +137,9 @@ await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
143137
MetadataHandler = async update =>
144138
{
145139
await RandomWait().ConfigureAwait(false);
140+
if (IsClosedNormally())
141+
return;
142+
146143
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
147144
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
148145
ChangeStatusReason.MetaDataUpdate)

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System.Threading.Tasks;
6-
using Microsoft.Extensions.Logging;
76

87
namespace RabbitMQ.Stream.Client.Reliable;
98

@@ -48,12 +47,8 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
4847
ConnectionClosedHandler = async (closeReason, partitionStream) =>
4948
{
5049
await RandomWait().ConfigureAwait(false);
51-
if (closeReason == ConnectionClosedReason.Normal)
52-
{
53-
BaseLogger.LogDebug("{Identity} is closed normally", ToString());
50+
if (IsClosedNormally(closeReason))
5451
return;
55-
}
56-
5752
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
5853
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
5954
ChangeStatusReason.UnexpectedlyDisconnected)
@@ -62,6 +57,9 @@ await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
6257
MetadataHandler = async update =>
6358
{
6459
await RandomWait().ConfigureAwait(false);
60+
if (IsClosedNormally())
61+
return;
62+
6563
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
6664
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r,
6765
ChangeStatusReason.MetaDataUpdate)
@@ -105,17 +103,17 @@ private async Task<IProducer> StandardProducer()
105103
MetadataHandler = async _ =>
106104
{
107105
await RandomWait().ConfigureAwait(false);
106+
if (IsClosedNormally())
107+
return;
108+
108109
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
109110
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
110111
},
111112
ConnectionClosedHandler = async (closeReason) =>
112113
{
113114
await RandomWait().ConfigureAwait(false);
114-
if (closeReason == ConnectionClosedReason.Normal)
115-
{
116-
BaseLogger.LogDebug("{Identity} is closed normally", ToString());
115+
if (IsClosedNormally(closeReason))
117116
return;
118-
}
119117

120118
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
121119
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,21 @@ protected static async Task RandomWait()
114114
await Task.Delay(Consts.RandomMid()).ConfigureAwait(false);
115115
}
116116

117+
protected bool IsClosedNormally(string closeReason)
118+
{
119+
if (closeReason != ConnectionClosedReason.Normal && !CompareStatus(ReliableEntityStatus.Closed))
120+
return false;
121+
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
122+
return true;
123+
}
124+
protected bool IsClosedNormally()
125+
{
126+
if (!CompareStatus(ReliableEntityStatus.Closed))
127+
return false;
128+
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
129+
return true;
130+
}
131+
117132
protected void UpdateStatus(ReliableEntityStatus newStatus,
118133
ChangeStatusReason reason, string partition = null)
119134
{

docs/ReliableClient/Program.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@
77

88
var rClient = RClient.Start(new RClient.Config()
99
{
10-
ProducersPerConnection = 2,
10+
ProducersPerConnection = 100,
1111
ConsumersPerConnection = 100,
12-
Host = "localhost",
12+
Host = "node0",
1313
Port = 5552,
1414
LoadBalancer = true,
1515
SuperStream = false,
16-
Streams = 10,
17-
Producers = 4,
16+
Streams = 3,
17+
Producers = 10,
1818
MessagesPerProducer = 50_000_000,
19-
Consumers = 4
20-
// Username = "test",
21-
// Password = "test"
19+
Consumers = 10,
20+
Username = "test",
21+
Password = "test"
2222
});
2323

2424
await rClient.ConfigureAwait(false);

docs/ReliableClient/RClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis
261261
Properties = new Properties() {MessageId = $"hello{i}"}
262262
};
263263
await MaybeSend(producer, message, publishEvent).ConfigureAwait(false);
264-
await Task.Delay(500).ConfigureAwait(false);
264+
await Task.Delay(20).ConfigureAwait(false);
265265
Interlocked.Increment(ref totalSent);
266266
}
267267
});

0 commit comments

Comments
 (0)