Skip to content

Commit 66b9cab

Browse files
authored
Consumer connected only to the followers (#352)
* Consumer connected only to the followers * Rename method to LookupLeaderOrRandomReplicasConnection --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent c3d8e55 commit 66b9cab

File tree

7 files changed

+88
-31
lines changed

7 files changed

+88
-31
lines changed

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,4 +301,4 @@ static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Clie
301301
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
302302
static RabbitMQ.Stream.Client.Reliable.ReliableBase.RandomWait() -> System.Threading.Tasks.Task
303303
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
304-
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
304+
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderOrRandomReplicasConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public static async Task<IConsumer> Create(
202202
)
203203
{
204204
var client = await RoutingHelper<Routing>
205-
.LookupRandomConnection(clientParameters, metaStreamInfo, config.Pool, logger)
205+
.LookupLeaderOrRandomReplicasConnection(clientParameters, metaStreamInfo, config.Pool, logger)
206206
.ConfigureAwait(false);
207207
var consumer = new RawConsumer((Client)client, config, logger);
208208
await consumer.Init().ConfigureAwait(false);

RabbitMQ.Stream.Client/RoutingClient.cs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -67,23 +67,27 @@ internal static async Task<IClient> LookupConnection(
6767
// In this case we just return the node (leader for producer, random for consumer)
6868
// since there is not load balancer configuration
6969

70-
return await routing.CreateClient(clientParameters with
71-
{
72-
Endpoint = endPointNoLb,
73-
ClientProvidedName = clientParameters.ClientProvidedName
74-
}, broker, logger)
70+
return await routing
71+
.CreateClient(
72+
clientParameters with
73+
{
74+
Endpoint = endPointNoLb,
75+
ClientProvidedName = clientParameters.ClientProvidedName
76+
}, broker, logger)
7577
.ConfigureAwait(false);
7678
}
7779

7880
// here it means that there is a AddressResolver configuration
7981
// so there is a load-balancer or proxy we need to get the right connection
8082
// as first we try with the first node given from the LB
8183
var endPoint = clientParameters.AddressResolver.EndPoint;
82-
var client = await routing.CreateClient(clientParameters with
83-
{
84-
Endpoint = endPoint,
85-
ClientProvidedName = clientParameters.ClientProvidedName
86-
}, broker, logger)
84+
var client = await routing
85+
.CreateClient(
86+
clientParameters with
87+
{
88+
Endpoint = endPoint,
89+
ClientProvidedName = clientParameters.ClientProvidedName
90+
}, broker, logger)
8791
.ConfigureAwait(false);
8892

8993
var advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
@@ -95,11 +99,13 @@ internal static async Task<IClient> LookupConnection(
9599
attemptNo++;
96100
await client.Close("advertised_host or advertised_port doesn't match").ConfigureAwait(false);
97101

98-
client = await routing.CreateClient(clientParameters with
99-
{
100-
Endpoint = endPoint,
101-
ClientProvidedName = clientParameters.ClientProvidedName
102-
}, broker, logger)
102+
client = await routing
103+
.CreateClient(
104+
clientParameters with
105+
{
106+
Endpoint = endPoint,
107+
ClientProvidedName = clientParameters.ClientProvidedName
108+
}, broker, logger)
103109
.ConfigureAwait(false);
104110

105111
advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
@@ -175,13 +181,20 @@ await LookupConnection(clientParameters, metaDataInfo.Leader, MaxAttempts(metaDa
175181
}
176182

177183
/// <summary>
178-
/// Gets a random connection. The consumer can connect to a replica or leader.
184+
/// Gets a random connection a random replica.
185+
/// If the replicas are not available it will connect to the leader.
179186
/// </summary>
180-
public static async Task<IClient> LookupRandomConnection(ClientParameters clientParameters,
187+
public static async Task<IClient> LookupLeaderOrRandomReplicasConnection(ClientParameters clientParameters,
181188
StreamInfo metaDataInfo, ConnectionsPool pool, ILogger logger = null)
182189
{
183-
var brokers = new List<Broker>() { metaDataInfo.Leader };
190+
var brokers = new List<Broker>();
191+
if (metaDataInfo.Replicas is { Count: <= 0 })
192+
{
193+
brokers.Add(metaDataInfo.Leader);
194+
}
195+
184196
brokers.AddRange(metaDataInfo.Replicas);
197+
185198
var exceptions = new List<Exception>();
186199
var br = brokers.OrderBy(x => Random.Shared.Next()).ToList();
187200

Tests/ConnectionsPoolTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ public async void RoutingShouldReturnTwoConnectionsGivenOneItemPerConnection()
8080
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("localhost", 3939),
8181
new List<Broker>());
8282
var pool = new ConnectionsPool(0, 1);
83-
var c1 = await RoutingHelper<PoolRouting>.LookupRandomConnection(clientParameters, metaDataInfo, pool);
83+
var c1 = await RoutingHelper<PoolRouting>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, pool);
8484
c1.Consumers.Add(1, default);
85-
var c2 = await RoutingHelper<PoolRouting>.LookupRandomConnection(clientParameters, metaDataInfo, pool);
85+
var c2 = await RoutingHelper<PoolRouting>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo, pool);
8686
c2.Consumers.Add(1, default);
8787
// here we have two different connections
8888
// and must be different since we have only one id per connection

Tests/UnitTests.cs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public Task<IClient> CreateClient(ClientParameters clientParameters, Broker brok
107107
public bool ValidateDns { get; set; } = false;
108108
}
109109

110-
public class ReplicaRouting : IRouting
110+
public class LeaderRouting : IRouting
111111
{
112112
public Task<IClient> CreateClient(ClientParameters clientParameters, Broker broker, ILogger logger = null)
113113
{
@@ -125,6 +125,25 @@ public Task<IClient> CreateClient(ClientParameters clientParameters, Broker brok
125125
public bool ValidateDns { get; set; } = false;
126126
}
127127

128+
public class ReplicaseRouting : IRouting
129+
{
130+
public Task<IClient> CreateClient(ClientParameters clientParameters, Broker broker, ILogger logger = null)
131+
{
132+
var fake = new FakeClient(clientParameters)
133+
{
134+
ConnectionProperties = new Dictionary<string, string>()
135+
{
136+
137+
["advertised_port"] = "5553",
138+
["advertised_host"] = "replica2"
139+
}
140+
};
141+
return Task.FromResult<IClient>(fake);
142+
}
143+
144+
public bool ValidateDns { get; set; } = false;
145+
}
146+
128147
// This class is only for unit tests
129148
public class UnitTests
130149
{
@@ -141,7 +160,7 @@ public async Task GiveProperExceptionWhenUnableToConnect()
141160
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("localhost", 3939),
142161
new List<Broker>());
143162
await Assert.ThrowsAsync<AggregateException>(() =>
144-
RoutingHelper<Routing>.LookupRandomConnection(clientParameters, metaDataInfo,
163+
RoutingHelper<Routing>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo,
145164
new ConnectionsPool(1, 1)));
146165
}
147166

@@ -214,14 +233,34 @@ public void RandomReplicaLeader()
214233
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("leader", 5552),
215234
new List<Broker>());
216235
var client =
217-
RoutingHelper<ReplicaRouting>.LookupRandomConnection(clientParameters, metaDataInfo,
236+
RoutingHelper<LeaderRouting>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo,
218237
new ConnectionsPool(1, 1));
219238
Assert.Equal("5552", client.Result.ConnectionProperties["advertised_port"]);
220239
var res = (client.Result.ConnectionProperties["advertised_host"] == "leader" ||
221240
client.Result.ConnectionProperties["advertised_host"] == "replica");
222241
Assert.True(res);
223242
}
224243

244+
[Fact]
245+
public void RandomOnlyReplicaIfThereAre()
246+
{
247+
// this test is not completed yet should add also some replicas
248+
var addressResolver = new AddressResolver(new IPEndPoint(IPAddress.Parse("192.168.10.99"), 5552));
249+
var clientParameters = new ClientParameters() { AddressResolver = addressResolver, };
250+
var metaDataInfo = new StreamInfo("stream",
251+
ResponseCode.Ok, new Broker("leader", 5552),
252+
new List<Broker>()
253+
{
254+
new Broker("replica2", 5553),
255+
});
256+
var client =
257+
RoutingHelper<ReplicaseRouting>.LookupLeaderOrRandomReplicasConnection(clientParameters, metaDataInfo,
258+
new ConnectionsPool(1, 1));
259+
Assert.Equal("5553", client.Result.ConnectionProperties["advertised_port"]);
260+
var res = (client.Result.ConnectionProperties["advertised_host"] == "replica2");
261+
Assert.True(res);
262+
}
263+
225264
[Fact]
226265
public void CompressUnCompressShouldHaveTheSize()
227266
{

docs/ReliableClient/Program.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
var rClient = RClient.Start(new RClient.Config()
99
{
1010
ProducersPerConnection = 2,
11-
ConsumersPerConnection = 2,
12-
Host = "Node0",
13-
Port = 5553,
11+
ConsumersPerConnection = 100,
12+
Host = "localhost",
13+
Port = 5552,
1414
LoadBalancer = true,
15-
SuperStream = true,
16-
Streams = 1,
17-
Producers = 1,
15+
SuperStream = false,
16+
Streams = 10,
17+
Producers = 4,
1818
MessagesPerProducer = 50_000_000,
1919
Consumers = 4
2020
// Username = "test",

docs/ReliableClient/RClient.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ public static async Task Start(Config config)
9191
AddressResolver = resolver,
9292
UserName = config.Username,
9393
Password = config.Password,
94+
ConnectionPoolConfig = new ConnectionPoolConfig()
95+
{
96+
ProducersPerConnection = config.ProducersPerConnection,
97+
ConsumersPerConnection = config.ConsumersPerConnection,
98+
},
9499
Endpoints = new List<EndPoint>() {resolver.EndPoint}
95100
};
96101
}

0 commit comments

Comments
 (0)