From cb8994f428f7e9379c47c2f76194ae8e3f201f31 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 16 Dec 2022 10:17:26 +0100 Subject: [PATCH 1/2] [transactions] Reduce spammy log during broker shutdown --- .../transaction/TransactionMarkerChannelManager.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java index 310a79460b..708a7fea9b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java @@ -52,6 +52,7 @@ import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.AuthenticationUtil; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.ChannelFutures; @@ -329,7 +330,13 @@ public void addTxnMarkersToBrokerQueue(String transactionalId, addressFuture.whenComplete((address, throwable) -> { if (throwable != null) { - log.warn("Failed to find broker for topic partition {}", topicPartition, throwable); + if (throwable instanceof PulsarClientException.LookupException + || throwable.getCause() instanceof PulsarClientException.LookupException) { + log.warn("Failed to find broker for topic partition {} - {}", topicPartition, + throwable + ""); + } else { + log.warn("Failed to find broker for topic partition {}", topicPartition, throwable); + } unknownBrokerTopicList.add(topicPartition); addFuture.complete(null); return; From f2823f4df13eb8281993014c8ebc42a488a031b9 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Thu, 15 Jun 2023 17:03:41 +0800 Subject: [PATCH 2/2] Replace to to string --- .../transaction/TransactionMarkerChannelManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java index 708a7fea9b..29fc505e2c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java @@ -333,7 +333,7 @@ public void addTxnMarkersToBrokerQueue(String transactionalId, if (throwable instanceof PulsarClientException.LookupException || throwable.getCause() instanceof PulsarClientException.LookupException) { log.warn("Failed to find broker for topic partition {} - {}", topicPartition, - throwable + ""); + throwable.toString()); } else { log.warn("Failed to find broker for topic partition {}", topicPartition, throwable); }