Skip to content

Commit 57872a1

Browse files
authored
Merge branch 'master' into dependabot/npm_and_yarn/src/KafkaFlow.Admin.Dashboard/ClientApp/webpack-dev-middleware-and-angular-builders/custom-webpack-and-angular-devkit/build-angular-6.1.2
2 parents 5621632 + 3a801a3 commit 57872a1

File tree

23 files changed

+137
-76
lines changed

23 files changed

+137
-76
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
init_broker:
44
@echo command | date
55
@echo Initializing Kafka broker
6-
docker-compose -f docker-compose.yml up -d
6+
docker compose -f docker-compose.yml up -d
77

88
shutdown_broker:
99
@echo command | date
1010
@echo Shutting down kafka broker
11-
docker-compose -f docker-compose.yml down
11+
docker compose -f docker-compose.yml down
1212

1313
restore:
1414
dotnet restore KafkaFlow.sln
@@ -26,5 +26,5 @@ integration_tests:
2626
@echo command | date
2727
make init_broker
2828
@echo Running integration tests
29-
dotnet test tests/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj -c Release --framework netcoreapp3.1 --logger "console;verbosity=detailed"
29+
dotnet test tests/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj -c Release --logger "console;verbosity=detailed"
3030
make shutdown_broker

docker-compose.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
version: '3'
2-
31
services:
42
zookeeper:
53
image: confluentinc/cp-zookeeper:7.2.1

src/KafkaFlow.LogHandler.Console/KafkaFlow.LogHandler.Console.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
</ItemGroup>
1111

1212
<ItemGroup>
13-
<PackageReference Include="System.Text.Json" Version="6.0.8" />
13+
<PackageReference Include="System.Text.Json" Version="6.0.11" />
1414
</ItemGroup>
1515

1616
</Project>

src/KafkaFlow.LogHandler.Microsoft/KafkaFlow.LogHandler.Microsoft.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
</ItemGroup>
1313

1414
<ItemGroup>
15-
<PackageReference Include="System.Text.Json" Version="6.0.8" />
15+
<PackageReference Include="System.Text.Json" Version="6.0.11" />
1616
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
1717
</ItemGroup>
1818

src/KafkaFlow.Serializer.JsonCore/KafkaFlow.Serializer.JsonCore.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<PackageReference Include="System.Text.Json" Version="6.0.8" />
11+
<PackageReference Include="System.Text.Json" Version="6.0.11" />
1212
</ItemGroup>
1313

1414
<ItemGroup>

src/KafkaFlow/Batching/BatchConsumeMiddleware.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public BatchConsumeMiddleware(
3838

3939
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
4040
{
41-
await _dispatchSemaphore.WaitAsync();
41+
await _dispatchSemaphore.WaitAsync().ConfigureAwait(false);
4242

4343
try
4444
{
@@ -59,7 +59,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
5959

6060
if (_batch.Count >= _batchSize)
6161
{
62-
await this.TriggerDispatchAndWaitAsync();
62+
await this.TriggerDispatchAndWaitAsync().ConfigureAwait(false);
6363
}
6464
}
6565

@@ -72,11 +72,11 @@ public void Dispose()
7272

7373
private async Task TriggerDispatchAndWaitAsync()
7474
{
75-
await _dispatchSemaphore.WaitAsync();
75+
await _dispatchSemaphore.WaitAsync().ConfigureAwait(false);
7676
_dispatchTokenSource?.Cancel();
7777
_dispatchSemaphore.Release();
7878

79-
await (_dispatchTask ?? Task.CompletedTask);
79+
await (_dispatchTask ?? Task.CompletedTask).ConfigureAwait(false);
8080
}
8181

8282
private void ScheduleExecution(IMessageContext context, MiddlewareDelegate next)
@@ -92,7 +92,7 @@ private void ScheduleExecution(IMessageContext context, MiddlewareDelegate next)
9292

9393
private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate next)
9494
{
95-
await _dispatchSemaphore.WaitAsync();
95+
await _dispatchSemaphore.WaitAsync().ConfigureAwait(false);
9696

9797
_dispatchTokenSource.Dispose();
9898
_dispatchTokenSource = null;

src/KafkaFlow/Clusters/ClusterManager.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public async Task<IEnumerable<TopicPartitionOffset>> GetConsumerGroupOffsetsAsyn
8585

8686
foreach (var name in topicsName)
8787
{
88-
topicsMetadata.Add((name, await this.GetTopicMetadataAsync(name)));
88+
topicsMetadata.Add((name, await this.GetTopicMetadataAsync(name).ConfigureAwait(false)));
8989
}
9090

9191
var topics =
@@ -98,7 +98,7 @@ public async Task<IEnumerable<TopicPartitionOffset>> GetConsumerGroupOffsetsAsyn
9898
.ToList();
9999

100100
var result = await _lazyAdminClient.Value.ListConsumerGroupOffsetsAsync(
101-
new[] { new ConsumerGroupTopicPartitions(consumerGroup, topics) });
101+
new[] { new ConsumerGroupTopicPartitions(consumerGroup, topics) }).ConfigureAwait(false);
102102

103103
if (!result.Any())
104104
{
@@ -125,7 +125,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable<TopicConfiguration> configu
125125
})
126126
.ToArray();
127127

128-
await _lazyAdminClient.Value.CreateTopicsAsync(topics);
128+
await _lazyAdminClient.Value.CreateTopicsAsync(topics).ConfigureAwait(false);
129129
}
130130
catch (CreateTopicsException exception)
131131
{

src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,13 @@ public IClusterConfigurationBuilder AddConsumer(Action<IConsumerConfigurationBui
9898

9999
public IClusterConfigurationBuilder OnStopping(Action<IDependencyResolver> handler)
100100
{
101-
_onStoppingHandler = handler;
101+
_onStoppingHandler += handler;
102102
return this;
103103
}
104104

105105
public IClusterConfigurationBuilder OnStarted(Action<IDependencyResolver> handler)
106106
{
107-
_onStartedHandler = handler;
107+
_onStartedHandler += handler;
108108
return this;
109109
}
110110

src/KafkaFlow/Consumers/Consumer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
163163
try
164164
{
165165
this.EnsureConsumer();
166-
await _flowManager.BlockHeartbeat(cancellationToken);
166+
await _flowManager.BlockHeartbeat(cancellationToken).ConfigureAwait(false);
167167
return _consumer.Consume(cancellationToken);
168168
}
169169
catch (OperationCanceledException)
@@ -176,7 +176,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
176176
"Max Poll Interval Exceeded",
177177
new { this.Configuration.ConsumerName });
178178

179-
await _maxPollIntervalExceeded.FireAsync();
179+
await _maxPollIntervalExceeded.FireAsync().ConfigureAwait(false);
180180
}
181181
catch (KafkaException ex) when (ex.Error.IsFatal)
182182
{
@@ -187,7 +187,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
187187

188188
this.InvalidateConsumer();
189189

190-
await Task.Delay(5000, cancellationToken);
190+
await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
191191
}
192192
catch (Exception ex)
193193
{
@@ -242,11 +242,11 @@ private void EnsureConsumer()
242242
.SetPartitionsRevokedHandler(
243243
(consumer, partitions) =>
244244
{
245+
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
245246
this.Assignment = new List<TopicPartition>();
246247
this.Subscription = new List<string>();
247248
_currentPartitionsOffsets.Clear();
248249
_flowManager.Stop();
249-
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
250250
})
251251
.SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
252252
.SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)));

src/KafkaFlow/Consumers/ConsumerManager.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,14 @@ private void StartEvaluateWorkerCountTimer() => _evaluateWorkersCountTimer?.Chan
7070

7171
private async Task EvaluateWorkersCountAsync()
7272
{
73-
var newWorkersCount = await this.CalculateWorkersCount(this.Consumer.Assignment);
73+
var newWorkersCount = await this.CalculateWorkersCount(this.Consumer.Assignment).ConfigureAwait(false);
7474

7575
if (newWorkersCount == this.WorkerPool.CurrentWorkersCount)
7676
{
7777
return;
7878
}
7979

80-
await this.ChangeWorkersCountAsync(newWorkersCount);
80+
await this.ChangeWorkersCountAsync(newWorkersCount).ConfigureAwait(false);
8181
}
8282

8383
private async Task ChangeWorkersCountAsync(int workersCount)
@@ -86,10 +86,10 @@ private async Task ChangeWorkersCountAsync(int workersCount)
8686
{
8787
this.StopEvaluateWorkerCountTimer();
8888

89-
await this.Feeder.StopAsync();
90-
await this.WorkerPool.StopAsync();
89+
await this.Feeder.StopAsync().ConfigureAwait(false);
90+
await this.WorkerPool.StopAsync().ConfigureAwait(false);
9191

92-
await this.WorkerPool.StartAsync(this.Consumer.Assignment, workersCount);
92+
await this.WorkerPool.StartAsync(this.Consumer.Assignment, workersCount).ConfigureAwait(false);
9393
this.Feeder.Start();
9494

9595
this.StartEvaluateWorkerCountTimer();
@@ -155,7 +155,8 @@ private async Task<int> CalculateWorkersCount(IEnumerable<Confluent.Kafka.TopicP
155155
.Select(x => x.Partition.Value)
156156
.ToList()))
157157
.ToList()),
158-
_dependencyResolver);
158+
_dependencyResolver)
159+
.ConfigureAwait(false);
159160
}
160161
catch (Exception e)
161162
{

0 commit comments

Comments
 (0)