Skip to content
Open
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
5924b2f
fix: produce memory leak
JotaDobleEse Nov 27, 2024
fef94ae
chore(deps): bump image-size from 1.1.1 to 1.2.1 in /website
dependabot[bot] Apr 7, 2025
7d30bca
chore(deps): bump @babel/runtime from 7.23.8 to 7.27.0 in /website
dependabot[bot] Apr 7, 2025
be40314
chore(deps): bump @babel/helpers from 7.23.8 to 7.27.0 in /website
dependabot[bot] Apr 7, 2025
a270c05
chore(deps): bump express in /src/KafkaFlow.Admin.Dashboard/ClientApp
dependabot[bot] Apr 7, 2025
f31db0a
chore(deps): bump webpack from 5.89.0 to 5.98.0 in /website
dependabot[bot] Apr 7, 2025
e49244a
chore(deps): bump ws, engine.io and socket.io-adapter
dependabot[bot] Apr 7, 2025
72efee4
chore(deps): bump estree-util-value-to-estree in /website
dependabot[bot] Apr 8, 2025
f94e6e6
chore(deps): bump @babel/helpers
dependabot[bot] Apr 8, 2025
68fb86b
chore(deps): bump http-proxy-middleware
dependabot[bot] Apr 16, 2025
c8214b6
feat: upgrade to dotnet8.0
MiguelCosta Nov 21, 2024
4872764
feat: upgrade to Confluent.Kafka.* 2.8.0
MiguelCosta Feb 6, 2025
aedc918
feat: updat nuget packages
MiguelCosta Mar 13, 2025
8e88d9a
docs: add guide to migrate from v3 to v4
MiguelCosta Mar 13, 2025
0bc471f
chore: support null tombstones
AlexeyRaga Mar 30, 2025
c683ddf
fix: replace messaging.type with messaging.system
simaoribeiro Apr 24, 2025
40e5e84
chore(deps): bump http-proxy-middleware from 2.0.7 to 2.0.9 in /website
dependabot[bot] Apr 24, 2025
286dd54
chore(deps): bump cookie, socket.io and express
dependabot[bot] Apr 7, 2025
42dfc67
Merge branch 'master' into fix/produce-memory-leak
JotaDobleEse Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using System;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaFlow.Authentication;
using KafkaFlow.Configuration;
using System;
using System.Text;
using System.Threading.Tasks;

namespace KafkaFlow.Producers;

Expand Down Expand Up @@ -354,20 +354,31 @@ private void InternalProduce(
var localProducer = this.EnsureProducer();
var message = CreateMessage(context);

if (partition.HasValue)
try
{
if (partition.HasValue)
{
localProducer.Produce(
new TopicPartition(context.ProducerContext.Topic, partition.Value),
message,
Handler);

return;
}

localProducer.Produce(
new Confluent.Kafka.TopicPartition(context.ProducerContext.Topic, partition.Value),
message,
Handler);

return;
}

localProducer.Produce(
context.ProducerContext.Topic,
message,
Handler);
catch (Exception ex)
{
DeliveryReport<byte[], byte[]> report = new()
{
Error = new Error(ErrorCode.Local_Fatal, ex.Message, isFatal: true),
};
Handler(report);
}

void Handler(DeliveryReport<byte[], byte[]> report)
{
Expand Down