diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java index 310a79460b..29fc505e2c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java @@ -52,6 +52,7 @@ import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.AuthenticationUtil; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.ChannelFutures; @@ -329,7 +330,13 @@ public void addTxnMarkersToBrokerQueue(String transactionalId, addressFuture.whenComplete((address, throwable) -> { if (throwable != null) { - log.warn("Failed to find broker for topic partition {}", topicPartition, throwable); + if (throwable instanceof PulsarClientException.LookupException + || throwable.getCause() instanceof PulsarClientException.LookupException) { + log.warn("Failed to find broker for topic partition {} - {}", topicPartition, + throwable.toString()); + } else { + log.warn("Failed to find broker for topic partition {}", topicPartition, throwable); + } unknownBrokerTopicList.add(topicPartition); addFuture.complete(null); return;