diff --git a/RabbitMQ.Stream.Client/AddressResolver.cs b/RabbitMQ.Stream.Client/AddressResolver.cs index bee715a2..4889d0b4 100644 --- a/RabbitMQ.Stream.Client/AddressResolver.cs +++ b/RabbitMQ.Stream.Client/AddressResolver.cs @@ -6,7 +6,7 @@ namespace RabbitMQ.Stream.Client { - public class AddressResolver + public class AddressResolver : IAddressResolver { public AddressResolver(EndPoint endPoint) { @@ -16,5 +16,6 @@ public AddressResolver(EndPoint endPoint) public EndPoint EndPoint { get; set; } public bool Enabled { get; set; } + public EndPoint Resolve(string address, int host) => EndPoint; } } diff --git a/RabbitMQ.Stream.Client/AddressResolverDynamic.cs b/RabbitMQ.Stream.Client/AddressResolverDynamic.cs new file mode 100644 index 00000000..da36f44c --- /dev/null +++ b/RabbitMQ.Stream.Client/AddressResolverDynamic.cs @@ -0,0 +1,22 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System; +using System.Net; + +namespace RabbitMQ.Stream.Client; + +public class AddressResolverDynamic : IAddressResolver +{ + private readonly Func _resolveFunction; + + public AddressResolverDynamic(Func resolveFunction) + { + _resolveFunction = resolveFunction; + Enabled = true; + } + + public bool Enabled { get; set; } + public EndPoint Resolve(string address, int host) => _resolveFunction(address, host); +} diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 7402c3f1..7aa0539f 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -72,7 +72,7 @@ public string ClientProvidedName /// public SslOption Ssl { get; set; } = new SslOption(); - public AddressResolver AddressResolver { get; set; } = null; + public IAddressResolver AddressResolver { get; set; } = null; public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain; diff --git a/RabbitMQ.Stream.Client/IAddressResolver.cs b/RabbitMQ.Stream.Client/IAddressResolver.cs new file mode 100644 index 00000000..d15d82b1 --- /dev/null +++ b/RabbitMQ.Stream.Client/IAddressResolver.cs @@ -0,0 +1,13 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System.Net; + +namespace RabbitMQ.Stream.Client; + +public interface IAddressResolver +{ + public bool Enabled { get; } + public EndPoint Resolve(string address, int host); +} diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index 53c35247..48a735c0 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -79,12 +79,6 @@ override RabbitMQ.Stream.Client.Reliable.Producer.Close() -> System.Threading.Ta override RabbitMQ.Stream.Client.Reliable.Producer.ToString() -> string RabbitMQ.Stream.Client.AbstractEntity RabbitMQ.Stream.Client.AbstractEntity.AbstractEntity() -> void -RabbitMQ.Stream.Client.AddressResolver -RabbitMQ.Stream.Client.AddressResolver.AddressResolver(System.Net.EndPoint endPoint) -> void -RabbitMQ.Stream.Client.AddressResolver.Enabled.get -> bool -RabbitMQ.Stream.Client.AddressResolver.Enabled.set -> void -RabbitMQ.Stream.Client.AddressResolver.EndPoint.get -> System.Net.EndPoint -RabbitMQ.Stream.Client.AddressResolver.EndPoint.set -> void RabbitMQ.Stream.Client.AMQP.AmqpParseException RabbitMQ.Stream.Client.AMQP.AmqpParseException.AmqpParseException(string s) -> void RabbitMQ.Stream.Client.AMQP.AmqpWireFormatting @@ -187,8 +181,6 @@ RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary properties, System.Func deliverHandler, System.Func> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)> RabbitMQ.Stream.Client.ClientParameters -RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver -RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void RabbitMQ.Stream.Client.ClientParameters.ClientProvidedName.get -> string RabbitMQ.Stream.Client.ClientParameters.ClientProvidedName.set -> void RabbitMQ.Stream.Client.ClientParameters.Endpoint.get -> System.Net.EndPoint @@ -724,7 +716,7 @@ RabbitMQ.Stream.Client.StreamSystem.QueryPartition(string superStream) -> System RabbitMQ.Stream.Client.StreamSystem.QuerySequence(string reference, string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.StreamExists(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystemConfig -RabbitMQ.Stream.Client.StreamSystemConfig.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver +RabbitMQ.Stream.Client.StreamSystemConfig.AddressResolver.get -> RabbitMQ.Stream.Client.IAddressResolver RabbitMQ.Stream.Client.StreamSystemConfig.AddressResolver.set -> void RabbitMQ.Stream.Client.StreamSystemConfig.ClientProvidedName.get -> string RabbitMQ.Stream.Client.StreamSystemConfig.ClientProvidedName.set -> void diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index f5cc5948..ef3e41e5 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -22,6 +22,18 @@ RabbitMQ.Stream.Client.AbstractEntity.Shutdown(RabbitMQ.Stream.Client.EntityComm RabbitMQ.Stream.Client.AbstractEntity.ThrowIfClosed() -> void RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken RabbitMQ.Stream.Client.AbstractEntity.UpdateStatusToClosed() -> void +RabbitMQ.Stream.Client.AddressResolver +RabbitMQ.Stream.Client.AddressResolver.AddressResolver(System.Net.EndPoint endPoint) -> void +RabbitMQ.Stream.Client.AddressResolver.Enabled.get -> bool +RabbitMQ.Stream.Client.AddressResolver.Enabled.set -> void +RabbitMQ.Stream.Client.AddressResolver.EndPoint.get -> System.Net.EndPoint +RabbitMQ.Stream.Client.AddressResolver.EndPoint.set -> void +RabbitMQ.Stream.Client.AddressResolver.Resolve(string address, int host) -> System.Net.EndPoint +RabbitMQ.Stream.Client.AddressResolverDynamic +RabbitMQ.Stream.Client.AddressResolverDynamic.AddressResolverDynamic(System.Func resolveFunction) -> void +RabbitMQ.Stream.Client.AddressResolverDynamic.Enabled.get -> bool +RabbitMQ.Stream.Client.AddressResolverDynamic.Enabled.set -> void +RabbitMQ.Stream.Client.AddressResolverDynamic.Resolve(string address, int host) -> System.Net.EndPoint RabbitMQ.Stream.Client.AlreadyClosedException RabbitMQ.Stream.Client.AlreadyClosedException.AlreadyClosedException(string s) -> void RabbitMQ.Stream.Client.AuthMechanism @@ -53,6 +65,8 @@ RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IO RabbitMQ.Stream.Client.Client.SuperStreamExists(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.IAddressResolver +RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler @@ -147,6 +161,9 @@ RabbitMQ.Stream.Client.FlowControl.Strategy.get -> RabbitMQ.Stream.Client.Consum RabbitMQ.Stream.Client.FlowControl.Strategy.set -> void RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> RabbitMQ.Stream.Client.HeartBeatHandler.HeartBeatHandler(System.Func> sendHeartbeatFunc, System.Func> close, int heartbeat, Microsoft.Extensions.Logging.ILogger logger = null) -> void +RabbitMQ.Stream.Client.IAddressResolver +RabbitMQ.Stream.Client.IAddressResolver.Enabled.get -> bool +RabbitMQ.Stream.Client.IAddressResolver.Resolve(string address, int host) -> System.Net.EndPoint RabbitMQ.Stream.Client.IClient.ClientId.get -> string RabbitMQ.Stream.Client.IClient.ClientId.init -> void RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary diff --git a/RabbitMQ.Stream.Client/RoutingClient.cs b/RabbitMQ.Stream.Client/RoutingClient.cs index dd131b3f..3e53c625 100644 --- a/RabbitMQ.Stream.Client/RoutingClient.cs +++ b/RabbitMQ.Stream.Client/RoutingClient.cs @@ -80,7 +80,7 @@ clientParameters with // here it means that there is a AddressResolver configuration // so there is a load-balancer or proxy we need to get the right connection // as first we try with the first node given from the LB - var endPoint = clientParameters.AddressResolver.EndPoint; + var endPoint = clientParameters.AddressResolver.Resolve(broker.Host, (int)broker.Port); var client = await routing .CreateClient( clientParameters with diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 77748720..4d7a94a4 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -40,7 +40,7 @@ internal void Validate() public IList Endpoints { get; set; } = new List { new IPEndPoint(IPAddress.Loopback, 5552) }; - public AddressResolver AddressResolver { get; set; } + public IAddressResolver AddressResolver { get; set; } public string ClientProvidedName { get; set; } = "dotnet-stream-locator"; public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;