Skip to content

Commit 433f337

Browse files
committed
feat: allow multiple OnStarted and OnStopping callback
1 parent 1bc8d2c commit 433f337

File tree

2 files changed

+68
-11
lines changed

2 files changed

+68
-11
lines changed

src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,13 @@ public IClusterConfigurationBuilder AddConsumer(Action<IConsumerConfigurationBui
9898

9999
public IClusterConfigurationBuilder OnStopping(Action<IDependencyResolver> handler)
100100
{
101-
_onStoppingHandler = handler;
101+
_onStoppingHandler += handler;
102102
return this;
103103
}
104104

105105
public IClusterConfigurationBuilder OnStarted(Action<IDependencyResolver> handler)
106106
{
107-
_onStartedHandler = handler;
107+
_onStartedHandler += handler;
108108
return this;
109109
}
110110

tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class GlobalEventsTest
2626
private readonly Fixture _fixture = new();
2727
private string _topic;
2828
private bool _isPartitionAssigned;
29+
private IKafkaBus _bus;
2930

3031
[TestInitialize]
3132
public void Setup()
@@ -35,6 +36,52 @@ public void Setup()
3536
MessageStorage.Clear();
3637
}
3738

39+
[TestMethod]
40+
public async Task OnStarted_RegisterMultipleOnStartedCallbacks_AllAreCalled()
41+
{
42+
// Arrange
43+
const int ExpectedOnStartedCount = 2;
44+
var countOnStarted = 0;
45+
46+
// Act
47+
await this.GetServiceProviderAsync(
48+
observers => { },
49+
this.ConfigureConsumer<GzipMiddleware>,
50+
this.ConfigureProducer<ProtobufNetSerializer>,
51+
cluster =>
52+
{
53+
cluster.OnStarted(_ => countOnStarted++);
54+
cluster.OnStarted(_ => countOnStarted++);
55+
});
56+
57+
// Assert
58+
Assert.AreEqual(ExpectedOnStartedCount, countOnStarted);
59+
}
60+
61+
[TestMethod]
62+
public async Task OnStopping_RegisterMultipleOnStoppingCallbacks_AllAreCalled()
63+
{
64+
// Arrange
65+
const int ExpectedOnStoppingCount = 2;
66+
var countOnStopping = 0;
67+
68+
// Act
69+
await this.GetServiceProviderAsync(
70+
observers => { },
71+
this.ConfigureConsumer<GzipMiddleware>,
72+
this.ConfigureProducer<ProtobufNetSerializer>,
73+
cluster =>
74+
{
75+
cluster.OnStopping(_ => countOnStopping++);
76+
cluster.OnStopping(_ => countOnStopping++);
77+
});
78+
79+
await _bus?.StopAsync();
80+
81+
// Assert
82+
Assert.AreEqual(ExpectedOnStoppingCount, countOnStopping);
83+
}
84+
3885
[TestMethod]
3986
public async Task SubscribeGlobalEvents_AllEvents_TriggeredCorrectly()
4087
{
@@ -241,10 +288,25 @@ private void ConfigureProducer<T>(IProducerConfigurationBuilder producerConfigur
241288
private async Task<IServiceProvider> GetServiceProviderAsync(
242289
Action<IGlobalEvents> configureGlobalEvents,
243290
Action<IConsumerConfigurationBuilder> consumerConfiguration,
244-
Action<IProducerConfigurationBuilder> producerConfiguration)
291+
Action<IProducerConfigurationBuilder> producerConfiguration,
292+
Action<IClusterConfigurationBuilder> builderConfiguration = null)
245293
{
246294
_isPartitionAssigned = false;
247295

296+
var clusterBuilderAction = (HostBuilderContext context, IClusterConfigurationBuilder cluster) =>
297+
{
298+
cluster
299+
.WithBrokers(context.Configuration.GetValue<string>("Kafka:Brokers").Split(';'))
300+
.CreateTopicIfNotExists(_topic, 1, 1)
301+
.AddProducer<JsonProducer2>(producerConfiguration)
302+
.AddConsumer(consumerConfiguration);
303+
};
304+
305+
clusterBuilderAction += (_, cluster) =>
306+
{
307+
builderConfiguration?.Invoke(cluster);
308+
};
309+
248310
var builder = Host
249311
.CreateDefaultBuilder()
250312
.ConfigureAppConfiguration(
@@ -262,12 +324,7 @@ private async Task<IServiceProvider> GetServiceProviderAsync(
262324
services.AddKafka(
263325
kafka => kafka
264326
.UseLogHandler<TraceLogHandler>()
265-
.AddCluster(
266-
cluster => cluster
267-
.WithBrokers(context.Configuration.GetValue<string>("Kafka:Brokers").Split(';'))
268-
.CreateTopicIfNotExists(_topic, 1, 1)
269-
.AddProducer<JsonProducer2>(producerConfiguration)
270-
.AddConsumer(consumerConfiguration))
327+
.AddCluster(cluster => { clusterBuilderAction.Invoke(context, cluster); })
271328
.SubscribeGlobalEvents(configureGlobalEvents)))
272329
.UseDefaultServiceProvider(
273330
(_, options) =>
@@ -277,8 +334,8 @@ private async Task<IServiceProvider> GetServiceProviderAsync(
277334
});
278335

279336
var host = builder.Build();
280-
var bus = host.Services.CreateKafkaBus();
281-
bus.StartAsync().GetAwaiter().GetResult();
337+
_bus = host.Services.CreateKafkaBus();
338+
_bus.StartAsync().GetAwaiter().GetResult();
282339

283340
await this.WaitForPartitionAssignmentAsync();
284341

0 commit comments

Comments
 (0)