Skip to content

Commit 02f3865

Browse files
author
Alexander Shuvalov
committed
feat: add librdkafka log handler
1 parent 1b269b8 commit 02f3865

File tree

4 files changed

+49
-3
lines changed

4 files changed

+49
-3
lines changed

src/KafkaFlow/Clusters/ClusterManager.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Confluent.Kafka.Admin;
88
using KafkaFlow.Authentication;
99
using KafkaFlow.Configuration;
10+
using KafkaFlow.Extensions;
1011

1112
namespace KafkaFlow.Clusters;
1213

@@ -33,7 +34,8 @@ public ClusterManager(ILogHandler logHandler, ClusterConfiguration configuration
3334

3435
config.ReadSecurityInformationFrom(configuration);
3536

36-
var adminClientBuilder = new AdminClientBuilder(config);
37+
var adminClientBuilder = new AdminClientBuilder(config)
38+
.SetLogHandler((_, log) => _logHandler.Log(log));
3739

3840
var security = configuration.GetSecurityInformation();
3941

src/KafkaFlow/Consumers/Consumer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ private void EnsureConsumer()
253253
.SetPartitionsAssignedHandler(FirePartitionsAssignedHandlers)
254254
.SetPartitionsRevokedHandler(FirePartitionRevokedHandlers)
255255
.SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
256-
.SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)));
256+
.SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)))
257+
.SetLogHandler((_, log) => _logHandler.Log(log));
257258

258259
var security = this.Configuration.ClusterConfiguration.GetSecurityInformation();
259260

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using System;
2+
using Confluent.Kafka;
3+
4+
namespace KafkaFlow.Extensions;
5+
6+
internal static class LogHandlerExtensions
7+
{
8+
public static void Log(this ILogHandler logHandler, LogMessage logMessage)
9+
{
10+
var message = "librdkafka log";
11+
var data = new
12+
{
13+
Level = logMessage.Level.ToString(),
14+
logMessage.Name,
15+
logMessage.Facility,
16+
logMessage.Message,
17+
};
18+
19+
switch (logMessage.Level)
20+
{
21+
case SyslogLevel.Emergency:
22+
case SyslogLevel.Alert:
23+
case SyslogLevel.Critical:
24+
case SyslogLevel.Error:
25+
logHandler.Error(message, null, data);
26+
break;
27+
case SyslogLevel.Warning:
28+
logHandler.Warning(message, null, data);
29+
break;
30+
case SyslogLevel.Notice:
31+
case SyslogLevel.Info:
32+
logHandler.Info(message, data);
33+
break;
34+
case SyslogLevel.Debug:
35+
logHandler.Verbose(message, data);
36+
break;
37+
default:
38+
throw new ArgumentOutOfRangeException(nameof(logMessage.Level), logMessage.Level, null);
39+
}
40+
}
41+
}

src/KafkaFlow/Producers/MessageProducer.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Confluent.Kafka;
55
using KafkaFlow.Authentication;
66
using KafkaFlow.Configuration;
7+
using KafkaFlow.Extensions;
78

89
namespace KafkaFlow.Producers;
910

@@ -280,7 +281,8 @@ private IProducer<byte[], byte[]> EnsureProducer()
280281
});
281282
}
282283
}
283-
});
284+
})
285+
.SetLogHandler((_, log) => _logHandler.Log(log));
284286

285287
var security = _configuration.Cluster.GetSecurityInformation();
286288

0 commit comments

Comments
 (0)