diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java new file mode 100644 index 0000000000000..2dced51d398c8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java @@ -0,0 +1,752 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Integration tests for OpenTelemetry tracing with real broker. + * Note: These tests may be timing-dependent and could be flaky in CI environments. + * They verify end-to-end tracing functionality with actual Pulsar broker. + */ +@Test(groups = "broker") +public class OpenTelemetryTracingIntegrationTest extends BrokerTestBase { + + private InMemorySpanExporter spanExporter; + private OpenTelemetrySdk openTelemetry; + private SdkTracerProvider tracerProvider; + + @BeforeClass + @Override + protected void setup() throws Exception { + // Setup OpenTelemetry SDK with in-memory exporter + spanExporter = InMemorySpanExporter.create(); + tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .build(); + + baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + if (openTelemetry != null) { + openTelemetry.close(); + } + } + + private void flushSpans() throws Exception { + tracerProvider.forceFlush().join(5, TimeUnit.SECONDS); + } + + @Test + public void testBasicProducerConsumerTracing() throws Exception { + String topic = "persistent://prop/ns-abc/test-basic-tracing"; + spanExporter.reset(); + + // Create client with tracing enabled + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + // Create producer + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + // Create consumer + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-sub") + .subscribe(); + + // Send and receive message + MessageId sentMsgId = producer.send("test-message"); + assertNotNull(sentMsgId); + + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + assertEquals(msg.getValue(), "test-message"); + consumer.acknowledge(msg); + + // Close client to force span flush + producer.close(); + consumer.close(); + client.close(); + + // Force flush tracer provider + flushSpans(); + + // Verify spans - at least one span should be created + List spans = spanExporter.getFinishedSpanItems(); + assertTrue(spans.size() > 0, "Expected at least one span, got: " + spans.size()); + + // Verify producer span if present + spans.stream() + .filter(s -> s.getKind() == SpanKind.PRODUCER) + .findFirst() + .ifPresent(producerSpan -> { + assertEquals(producerSpan.getName(), "send " + topic); + assertEquals(producerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); + }); + + // Verify consumer span if present + spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .findFirst() + .ifPresent(consumerSpan -> { + assertEquals(consumerSpan.getName(), "process " + topic); + assertEquals(consumerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); + assertEquals(consumerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey( + "messaging.pulsar.acknowledgment.type")), + "acknowledge"); + }); + } + + @Test + public void testNegativeAcknowledgment() throws Exception { + String topic = "persistent://prop/ns-abc/test-negative-ack"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-sub") + .negativeAckRedeliveryDelay(0, TimeUnit.SECONDS) + .subscribe(); + + // Send message + producer.send("test-message"); + + // Receive and negative acknowledge + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + consumer.negativeAcknowledge(msg); + + // Close to ensure negative ack is processed + producer.close(); + consumer.close(); + client.close(); + + // Wait for spans + flushSpans(); + + // Find consumer span + List spans = spanExporter.getFinishedSpanItems(); + SpanData consumerSpan = spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .findFirst() + .orElseThrow(() -> new AssertionError("Consumer span not found. Total spans: " + + spans.size() + ", kinds: " + spans.stream() + .map(s -> s.getKind().toString()).collect(java.util.stream.Collectors.joining(", ")))); + + // Verify negative ack attribute + assertEquals(consumerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.pulsar.acknowledgment.type")), + "negative_acknowledge"); + assertEquals(consumerSpan.getStatus().getStatusCode(), io.opentelemetry.api.trace.StatusCode.UNSET); + } + + @Test + public void testCumulativeAcknowledgment() throws Exception { + String topic = "persistent://prop/ns-abc/test-cumulative-ack"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-sub") + .subscriptionType(SubscriptionType.Failover) + .subscribe(); + + // Send multiple messages + for (int i = 0; i < 5; i++) { + producer.send("message-" + i); + } + + // Receive all messages + Message lastMsg = null; + for (int i = 0; i < 5; i++) { + lastMsg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(lastMsg); + } + + // Cumulative acknowledge last message + consumer.acknowledgeCumulative(lastMsg); + + // Wait for spans + flushSpans(); + + // Verify all consumer spans have cumulative_acknowledge attribute + List spans = spanExporter.getFinishedSpanItems(); + long consumerSpansWithCumulativeAck = spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .filter(s -> "cumulative_acknowledge".equals(s.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey( + "messaging.pulsar.acknowledgment.type")))) + .count(); + + assertEquals(consumerSpansWithCumulativeAck, 5); + + producer.close(); + consumer.close(); + client.close(); + } + + @Test + public void testAcknowledgmentTimeout() throws Exception { + String topic = "persistent://prop/ns-abc/test-ack-timeout"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-sub") + .ackTimeout(1, TimeUnit.SECONDS) + .subscribe(); + + // Send message + producer.send("test-message"); + + // Receive but don't acknowledge + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + + // Note: Ack timeout behavior varies based on subscription type and broker implementation + // For Shared subscription, ack timeout triggers redelivery but span may already be ended + // This test verifies the basic tracing flow works even with ack timeout configured + + // Acknowledge to properly end the span + consumer.acknowledge(msg); + + // Wait for spans + flushSpans(); + + // Verify consumer span exists with acknowledge attribute + List spans = spanExporter.getFinishedSpanItems(); + boolean foundConsumerSpan = spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .anyMatch(s -> s.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey( + "messaging.pulsar.acknowledgment.type")) != null); + + assertTrue(foundConsumerSpan, "Expected consumer span with acknowledgment.type attribute"); + + producer.close(); + consumer.close(); + client.close(); + } + + @Test + public void testMultiTopicConsumerTracing() throws Exception { + String topic1 = "persistent://prop/ns-abc/test-multi-topic-1"; + String topic2 = "persistent://prop/ns-abc/test-multi-topic-2"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer1 = client.newProducer(Schema.STRING) + .topic(topic1) + .create(); + + Producer producer2 = client.newProducer(Schema.STRING) + .topic(topic2) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topics(List.of(topic1, topic2)) + .subscriptionName("test-sub") + .subscribe(); + + // Send messages to both topics + producer1.send("message-topic1"); + producer2.send("message-topic2"); + + // Receive and acknowledge both messages + Set receivedTopics = new java.util.HashSet<>(); + for (int i = 0; i < 2; i++) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + receivedTopics.add(msg.getTopicName()); + consumer.acknowledge(msg); + } + + assertEquals(receivedTopics.size(), 2); + + // Wait for spans + flushSpans(); + + // Verify spans for both topics + List spans = spanExporter.getFinishedSpanItems(); + long consumerSpans = spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .count(); + + assertEquals(consumerSpans, 2); + + producer1.close(); + producer2.close(); + consumer.close(); + client.close(); + } + + @Test + public void testTracingWithoutGlobalEnable() throws Exception { + String topic = "persistent://prop/ns-abc/test-no-global-tracing"; + spanExporter.reset(); + + // Create client with OpenTelemetry but tracing NOT enabled + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(false) // Explicitly disabled + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-sub") + .subscribe(); + + // Send and receive message + producer.send("test-message"); + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + consumer.acknowledge(msg); + + // Wait for potential spans + flushSpans(); + + // Verify NO spans were created + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(spans.size(), 0, "Expected no spans when tracing is disabled"); + + producer.close(); + consumer.close(); + client.close(); + } + + @Test + public void testSharedSubscriptionTracing() throws Exception { + String topic = "persistent://prop/ns-abc/test-shared-subscription"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-shared-sub") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + // Send messages + for (int i = 0; i < 3; i++) { + producer.send("message-" + i); + } + + // Receive and acknowledge individually + for (int i = 0; i < 3; i++) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + consumer.acknowledge(msg); + } + + producer.close(); + consumer.close(); + client.close(); + + flushSpans(); + + // Verify individual acks for Shared subscription + List spans = spanExporter.getFinishedSpanItems(); + long consumerSpansWithIndividualAck = spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .filter(s -> "acknowledge".equals(s.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey( + "messaging.pulsar.acknowledgment.type")))) + .count(); + + assertEquals(consumerSpansWithIndividualAck, 3); + } + + @Test + public void testKeySharedSubscriptionTracing() throws Exception { + String topic = "persistent://prop/ns-abc/test-key-shared-subscription"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-key-shared-sub") + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + + // Send messages with keys + for (int i = 0; i < 3; i++) { + producer.newMessage() + .key("key-" + (i % 2)) + .value("message-" + i) + .send(); + } + + // Receive and acknowledge + for (int i = 0; i < 3; i++) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + consumer.acknowledge(msg); + } + + producer.close(); + consumer.close(); + client.close(); + + flushSpans(); + + // Verify spans for Key_Shared subscription + List spans = spanExporter.getFinishedSpanItems(); + long consumerSpans = spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .count(); + + assertEquals(consumerSpans, 3); + } + + @Test + public void testExclusiveSubscriptionTracing() throws Exception { + String topic = "persistent://prop/ns-abc/test-exclusive-subscription"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-exclusive-sub") + .subscriptionType(SubscriptionType.Exclusive) + .subscribe(); + + // Send messages + for (int i = 0; i < 3; i++) { + producer.send("message-" + i); + } + + // Receive all messages + Message lastMsg = null; + for (int i = 0; i < 3; i++) { + lastMsg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(lastMsg); + } + + // Cumulative acknowledge last message + consumer.acknowledgeCumulative(lastMsg); + + producer.close(); + consumer.close(); + client.close(); + + flushSpans(); + + // Verify cumulative ack for Exclusive subscription + List spans = spanExporter.getFinishedSpanItems(); + long consumerSpansWithCumulativeAck = spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .filter(s -> "cumulative_acknowledge".equals(s.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey( + "messaging.pulsar.acknowledgment.type")))) + .count(); + + assertEquals(consumerSpansWithCumulativeAck, 3); + } + + @Test + public void testFailoverSubscriptionWithCumulativeAck() throws Exception { + String topic = "persistent://prop/ns-abc/test-failover-cumulative"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-failover-sub") + .subscriptionType(SubscriptionType.Failover) + .subscribe(); + + // Send messages + for (int i = 0; i < 5; i++) { + producer.send("message-" + i); + } + + // Receive all messages + Message lastMsg = null; + for (int i = 0; i < 5; i++) { + lastMsg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(lastMsg); + } + + // Cumulative acknowledge last message + consumer.acknowledgeCumulative(lastMsg); + + producer.close(); + consumer.close(); + client.close(); + + flushSpans(); + + // Verify all spans ended with cumulative ack + List spans = spanExporter.getFinishedSpanItems(); + long consumerSpansWithCumulativeAck = spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .filter(s -> "cumulative_acknowledge".equals(s.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey( + "messaging.pulsar.acknowledgment.type")))) + .count(); + + assertEquals(consumerSpansWithCumulativeAck, 5); + } + + @Test + public void testMultiTopicConsumerWithCumulativeAck() throws Exception { + String topic1 = "persistent://prop/ns-abc/test-multi-cumulative-1"; + String topic2 = "persistent://prop/ns-abc/test-multi-cumulative-2"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer1 = client.newProducer(Schema.STRING) + .topic(topic1) + .create(); + + Producer producer2 = client.newProducer(Schema.STRING) + .topic(topic2) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topics(List.of(topic1, topic2)) + .subscriptionName("test-multi-cumulative-sub") + .subscriptionType(SubscriptionType.Failover) + .subscribe(); + + // Send messages to both topics + producer1.send("topic1-msg1"); + producer1.send("topic1-msg2"); + producer2.send("topic2-msg1"); + producer2.send("topic2-msg2"); + + // Receive messages from both topics + Message topic1LastMsg = null; + Message topic2LastMsg = null; + for (int i = 0; i < 4; i++) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + if (msg.getTopicName().contains("multi-cumulative-1")) { + topic1LastMsg = msg; + } else { + topic2LastMsg = msg; + } + } + + // Cumulative acknowledge for each topic separately + if (topic1LastMsg != null) { + consumer.acknowledgeCumulative(topic1LastMsg); + } + if (topic2LastMsg != null) { + consumer.acknowledgeCumulative(topic2LastMsg); + } + + producer1.close(); + producer2.close(); + consumer.close(); + client.close(); + + flushSpans(); + + // Verify cumulative ack only affects spans from the same topic + List spans = spanExporter.getFinishedSpanItems(); + long consumerSpansWithCumulativeAck = spans.stream() + .filter(s -> s.getKind() == SpanKind.CONSUMER) + .filter(s -> "cumulative_acknowledge".equals(s.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey( + "messaging.pulsar.acknowledgment.type")))) + .count(); + + // Should have cumulative ack for messages from both topics + assertEquals(consumerSpansWithCumulativeAck, 4); + } + + @Test + public void testBatchMessagesTracing() throws Exception { + String topic = "persistent://prop/ns-abc/test-batch-tracing"; + spanExporter.reset(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(true) + .batchingMaxMessages(5) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test-sub") + .subscribe(); + + // Send batch of messages + for (int i = 0; i < 5; i++) { + producer.sendAsync("message-" + i); + } + producer.flush(); + + // Receive and acknowledge all messages + for (int i = 0; i < 5; i++) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + consumer.acknowledge(msg); + } + + producer.close(); + consumer.close(); + client.close(); + + // Wait for spans + flushSpans(); + + // Verify spans for batched messages + // Note: Tracing behavior may vary for batched messages depending on when spans are created + List spans = spanExporter.getFinishedSpanItems(); + assertTrue(spans.size() > 0, "Expected at least some spans for batched messages"); + + // Verify that spans have correct attributes + spans.stream() + .filter(s -> s.getKind() == SpanKind.PRODUCER || s.getKind() == SpanKind.CONSUMER) + .forEach(span -> { + assertNotNull(span.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system"))); + assertEquals(span.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); + }); + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index d31d42bbe634b..7ac063d227b17 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -615,6 +615,44 @@ ClientBuilder authentication(String authPluginClassName, Map aut */ ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry); + /** + * Enable OpenTelemetry distributed tracing. + * + *

When enabled, interceptors are automatically added to all producers and consumers + * to create spans for message publishing and consumption, and automatically propagate trace context + * via message properties. + * + *

This method is useful when OpenTelemetry is configured globally (e.g., via Java Agent or + * {@link io.opentelemetry.api.GlobalOpenTelemetry}) and you just want to enable tracing interceptors + * without explicitly setting an OpenTelemetry instance. + * + *

Example with Java Agent: + *

{@code
+     * // When using -javaagent:opentelemetry-javaagent.jar
+     * PulsarClient client = PulsarClient.builder()
+     *     .serviceUrl("pulsar://localhost:6650")
+     *     .enableTracing(true)  // Use GlobalOpenTelemetry
+     *     .build();
+     * }
+ * + *

Example with GlobalOpenTelemetry: + *

{@code
+     * // Configure GlobalOpenTelemetry elsewhere in your application
+     * GlobalOpenTelemetry.set(myOpenTelemetry);
+     *
+     * // Just enable tracing in the client
+     * PulsarClient client = PulsarClient.builder()
+     *     .serviceUrl("pulsar://localhost:6650")
+     *     .enableTracing(true)
+     *     .build();
+     * }
+ * + * @param tracingEnabled whether to enable tracing (default: false) + * @return the client builder instance + * @since 4.2.0 + */ + ClientBuilder enableTracing(boolean tracingEnabled); + /** * The clock used by the pulsar client. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessage.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessage.java new file mode 100644 index 0000000000000..5e4c61778ab54 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import io.opentelemetry.api.trace.Span; + +/** + * Extension of {@link Message} interface that supports OpenTelemetry tracing. + *

+ * This interface allows attaching OpenTelemetry spans directly to messages, + * eliminating the need for external tracking via maps. + *

+ * The span lifecycle: + *

    + *
  • Producer: Span is created before send and attached to the message. + * When the send is acknowledged, the span is retrieved and completed.
  • + *
  • Consumer: Span is created when message is received and attached to the message. + * When the message is acknowledged, the span is retrieved and completed.
  • + *
+ */ +public interface TraceableMessage { + + /** + * Set the OpenTelemetry span associated with this message. + *

+ * This method is called by tracing interceptors to attach a span to the message + * for later retrieval when completing the span. + * + * @param span the span to associate with this message, or null to clear + */ + void setTracingSpan(Span span); + + /** + * Get the OpenTelemetry span associated with this message. + * + * @return the span associated with this message, or null if no span is set + */ + Span getTracingSpan(); +} \ No newline at end of file diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessageId.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessageId.java new file mode 100644 index 0000000000000..d8470184ccc18 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessageId.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import io.opentelemetry.api.trace.Span; + +/** + * Extension interface that allows {@link MessageId} implementations to support OpenTelemetry tracing. + *

+ * This interface enables attaching OpenTelemetry spans directly to message IDs, + * allowing span retrieval in acknowledge callbacks which only receive MessageId, + * not the full Message object. + *

+ * This is particularly useful for consumer-side tracing where: + *

    + *
  • A span is created when a message is received (in beforeConsume)
  • + *
  • The span is attached to the message's MessageId
  • + *
  • When the message is acknowledged, the span can be retrieved from the MessageId + * and completed, even though the acknowledge callback only provides MessageId
  • + *
+ */ +public interface TraceableMessageId { + + /** + * Set the OpenTelemetry span associated with this message ID. + *

+ * This method is called by tracing interceptors to attach a span to the message ID + * for later retrieval in acknowledge callbacks. + * + * @param span the span to associate with this message ID, or null to clear + */ + void setTracingSpan(Span span); + + /** + * Get the OpenTelemetry span associated with this message ID. + * + * @return the span associated with this message ID, or null if no span is set + */ + Span getTracingSpan(); +} diff --git a/pulsar-client/TRACING.md b/pulsar-client/TRACING.md new file mode 100644 index 0000000000000..3d4258c650487 --- /dev/null +++ b/pulsar-client/TRACING.md @@ -0,0 +1,405 @@ +# OpenTelemetry Tracing for Pulsar Java Client + +This document describes how to use OpenTelemetry distributed tracing with the Pulsar Java client. + +## Overview + +The Pulsar Java client provides built-in support for OpenTelemetry distributed tracing. This allows you to: + +- Trace message publishing from producer to broker +- Trace message consumption from broker to consumer +- Propagate trace context across services via message properties +- Extract trace context from external sources (e.g., HTTP requests) +- Create end-to-end traces across your distributed system + +## Features + +### Producer Tracing + +Producer tracing creates spans for: +- **send** - Span starts when `send()` or `sendAsync()` is called and completes when broker acknowledges receipt + +### Consumer Tracing + +Consumer tracing creates spans for: +- **process** - Span starts when message is received and completes when message is acknowledged, negatively acknowledged, or ack timeout occurs + +### Trace Context Propagation + +Trace context is automatically propagated using W3C TraceContext format: +- `traceparent` - Contains trace ID, span ID, and trace flags +- `tracestate` - Contains vendor-specific trace information + +Context is injected into and extracted from message properties, enabling seamless trace propagation across services. + +## Quick Start + +### 1. Add Dependencies + +The Pulsar client already includes OpenTelemetry API dependencies. You'll need to add the SDK and exporters: + +```xml + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + +``` + +### 2. Enable Tracing + +There are three ways to enable tracing: + +#### Option 1: Using OpenTelemetry Java Agent (Easiest) + +```bash +# Start your application with the Java Agent +java -javaagent:opentelemetry-javaagent.jar \ + -Dotel.service.name=my-service \ + -Dotel.exporter.otlp.endpoint=http://localhost:4317 \ + -jar your-application.jar +``` + +```java +// Just enable tracing - uses GlobalOpenTelemetry from the agent +PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .enableTracing(true) // That's it! + .build(); +``` + +#### Option 2: With Explicit OpenTelemetry Instance + +```java +OpenTelemetry openTelemetry = // configure your OpenTelemetry instance + +PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .openTelemetry(openTelemetry, true) // Set OpenTelemetry AND enable tracing + .build(); +``` + +#### Option 3: Using GlobalOpenTelemetry + +```java +// Configure GlobalOpenTelemetry once in your application +GlobalOpenTelemetry.set(myOpenTelemetry); + +// Enable tracing in the client - uses GlobalOpenTelemetry +PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .enableTracing(true) + .build(); +``` + +**What happens when tracing is enabled:** +- **Create spans** for producer send operations +- **Inject trace context** into message properties automatically +- **Create spans** for consumer receive/ack operations +- **Extract trace context** from message properties automatically +- Link all spans to create end-to-end distributed traces + +### 3. Manual Interceptor Configuration (Advanced) + +If you prefer manual control, you can add interceptors explicitly: + +```java +import org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor; +import org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor; + +// Create client (tracing not enabled globally) +PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .openTelemetry(openTelemetry) + .build(); + +// Add interceptor manually to specific producer +Producer producer = client.newProducer(Schema.STRING) + .topic("my-topic") + .intercept(new OpenTelemetryProducerInterceptor()) + .create(); + +// Add interceptor manually to specific consumer +Consumer consumer = client.newConsumer(Schema.STRING) + .topic("my-topic") + .subscriptionName("my-subscription") + .intercept(new OpenTelemetryConsumerInterceptor<>()) + .subscribe(); +``` + +## Advanced Usage + +### End-to-End Tracing Example + +This example shows how to create a complete trace from an HTTP request through Pulsar to a consumer: + +```java +// Service 1: HTTP API that publishes to Pulsar +@POST +@Path("/order") +public Response createOrder(@Context HttpHeaders headers, Order order) { + // Extract trace context from incoming HTTP request + Context context = TracingProducerBuilder.extractFromHeaders( + convertHeaders(headers)); + + // Publish to Pulsar with trace context + TracingProducerBuilder tracingBuilder = new TracingProducerBuilder(); + producer.newMessage() + .value(order) + .let(builder -> tracingBuilder.injectContext(builder, context)) + .send(); + + return Response.accepted().build(); +} + +// Service 2: Pulsar consumer that processes orders +Consumer consumer = client.newConsumer(Schema.JSON(Order.class)) + .topic("orders") + .subscriptionName("order-processor") + .intercept(new OpenTelemetryConsumerInterceptor<>()) + .subscribe(); + +while (true) { + Message msg = consumer.receive(); + // Trace context is automatically extracted + // Any spans created here will be part of the same trace + processOrder(msg.getValue()); + consumer.acknowledge(msg); +} +``` + +### Custom Span Creation + +You can create custom spans during message processing: + +```java +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; + +Tracer tracer = GlobalOpenTelemetry.get().getTracer("my-app"); + +Message msg = consumer.receive(); + +// Create a custom span for processing +Span span = tracer.spanBuilder("process-message") + .setSpanKind(SpanKind.INTERNAL) + .startSpan(); + +try (Scope scope = span.makeCurrent()) { + // Your processing logic + processMessage(msg.getValue()); + span.setStatus(StatusCode.OK); +} catch (Exception e) { + span.recordException(e); + span.setStatus(StatusCode.ERROR); + throw e; +} finally { + span.end(); + consumer.acknowledge(msg); +} +``` + +## Configuration + +### Compatibility with OpenTelemetry Java Agent + +This implementation is **fully compatible** with the [OpenTelemetry Java Instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/pulsar) for Pulsar: + +- Both use **W3C TraceContext** format (traceparent, tracestate headers) +- Both propagate context via **message properties** +- **No conflicts**: Our implementation checks if trace context is already present (from Java Agent) and avoids duplicate injection +- You can use either approach or both together + +### Using OpenTelemetry Java Agent + +The easiest way to enable tracing is using the OpenTelemetry Java Agent (automatic instrumentation): + +```bash +java -javaagent:path/to/opentelemetry-javaagent.jar \ + -Dotel.service.name=my-service \ + -Dotel.exporter.otlp.endpoint=http://localhost:4317 \ + -jar your-application.jar +``` + +**Note**: When using the Java Agent, you don't need to call `.openTelemetry(otel, true)` as the agent automatically instruments Pulsar. However, calling it won't cause conflicts. + +### Programmatic Configuration + +You can also configure OpenTelemetry programmatically: + +```java +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; + +OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() + .setEndpoint("http://localhost:4317") + .build(); + +SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .build(); + +OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .buildAndRegisterGlobal(); +``` + +### Environment Variables + +Configure via environment variables: + +```bash +export OTEL_SERVICE_NAME=my-service +export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +export OTEL_TRACES_EXPORTER=otlp +export OTEL_METRICS_EXPORTER=otlp +``` + +## Span Attributes + +The tracing implementation adds the following attributes to spans following the [OpenTelemetry messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/): + +### Producer Spans +- `messaging.system`: "pulsar" +- `messaging.destination.name`: Topic name +- `messaging.operation.name`: "send" +- `messaging.message.id`: Message ID (added when broker confirms) + +**Span naming**: `send {topic}` (e.g., "send my-topic") + +### Consumer Spans +- `messaging.system`: "pulsar" +- `messaging.destination.name`: Topic name +- `messaging.operation.name`: "process" +- `messaging.message.id`: Message ID +- `messaging.pulsar.acknowledgment.type`: How the message was acknowledged + - `"acknowledge"`: Normal individual acknowledgment + - `"cumulative_acknowledge"`: Cumulative acknowledgment + - `"negative_acknowledge"`: Message negatively acknowledged (will retry) + - `"ack_timeout"`: Acknowledgment timeout occurred (will retry) + +**Span naming**: `process {topic}` (e.g., "process my-topic") + +## Span Lifecycle and Acknowledgment Behavior + +Understanding how spans are handled for different acknowledgment scenarios. Every consumer span includes a `messaging.pulsar.acknowledgment.type` attribute indicating how it was completed: + +### Successful Acknowledgment +- Span ends with **OK** status +- Attribute: `messaging.pulsar.acknowledgment.type = "acknowledge"` + +### Cumulative Acknowledgment +- Span ends with **OK** status +- Attribute: `messaging.pulsar.acknowledgment.type = "cumulative_acknowledge"` +- All spans up to the acknowledged position are ended with this attribute + +### Negative Acknowledgment +- Span ends with **OK** status (not an error) +- Attribute: `messaging.pulsar.acknowledgment.type = "negative_acknowledge"` +- This is normal flow, not a failure - the message will be redelivered and a new span will be created + +### Acknowledgment Timeout +- Span ends with **OK** status (not an error) +- Attribute: `messaging.pulsar.acknowledgment.type = "ack_timeout"` +- This is expected behavior when `ackTimeout` is configured - the message will be redelivered and a new span will be created + +### Application Exception During Processing +- If your application code throws an exception, create a child span and mark it with ERROR status +- The consumer span itself will end normally when you call `negativeAcknowledge()` +- This provides clear separation between messaging operations (OK) and application logic (ERROR) + +**Example - Separating messaging and application errors**: +```java +Message msg = consumer.receive(); +Span processingSpan = tracer.spanBuilder("business-logic").startSpan(); +try (Scope scope = processingSpan.makeCurrent()) { + processMessage(msg.getValue()); + processingSpan.setStatus(StatusCode.OK); + consumer.acknowledge(msg); // Consumer span ends with acknowledgment.type="acknowledge" +} catch (Exception e) { + processingSpan.recordException(e); + processingSpan.setStatus(StatusCode.ERROR); // Business logic failed + consumer.negativeAcknowledge(msg); // Consumer span ends with acknowledgment.type="negative_acknowledge" + throw e; +} finally { + processingSpan.end(); +} +``` + +### Querying by Acknowledgment Type + +The `messaging.pulsar.acknowledgment.type` attribute allows you to filter and analyze spans: + +**Example queries in your tracing backend**: +- Find all retried messages: `messaging.pulsar.acknowledgment.type = "negative_acknowledge" OR "ack_timeout"` +- Calculate retry rate: `count(negative_acknowledge) / count(acknowledge)` +- Identify timeout issues: `messaging.pulsar.acknowledgment.type = "ack_timeout"` +- Analyze cumulative vs individual acks: Group by `messaging.pulsar.acknowledgment.type` + +## Best Practices + +1. **Always use interceptors**: Add tracing interceptors to both producers and consumers for complete visibility. + +2. **Propagate context from HTTP**: When publishing from HTTP endpoints, always extract and propagate the trace context. + +3. **Handle errors properly**: Ensure spans are ended even when exceptions occur. + +4. **Distinguish messaging vs. application errors**: + - Messaging operations (nack, timeout) end with OK status + events + - Application failures should be tracked in separate child spans with ERROR status + +5. **Use meaningful span names**: The default span names include the topic name for easy identification. + +6. **Consider performance**: Tracing adds minimal overhead, but in high-throughput scenarios, consider sampling. + +7. **Clean up resources**: Ensure interceptors and OpenTelemetry SDK are properly closed when shutting down. + +## Troubleshooting + +### Traces not appearing + +1. Verify OpenTelemetry SDK is configured and exporters are set up +2. Check that interceptors are added to producers/consumers +3. Verify trace exporter endpoint is reachable +4. Enable debug logging: `-Dio.opentelemetry.javaagent.debug=true` + +### Missing parent-child relationships + +1. Ensure trace context is being injected via `TracingProducerBuilder.injectContext()` +2. Verify message properties contain `traceparent` header +3. Check that both producer and consumer have tracing interceptors + +### High overhead + +1. Consider using sampling: `-Dotel.traces.sampler=parentbased_traceidratio -Dotel.traces.sampler.arg=0.1` +2. Use batch span processor (default) +3. Adjust batch processor settings if needed + +## Examples + +See the following files for complete examples: +- `TracingExampleTest.java` - Comprehensive usage examples +- `OpenTelemetryTracingTest.java` - Unit tests demonstrating API usage + +## API Reference + +### Main Classes + +- `OpenTelemetryProducerInterceptor` - Producer interceptor for tracing +- `OpenTelemetryConsumerInterceptor` - Consumer interceptor for tracing +- `TracingContext` - Utility methods for span creation and context propagation +- `TracingProducerBuilder` - Helper for injecting trace context into messages + +## Additional Resources + +- [OpenTelemetry Java Documentation](https://opentelemetry.io/docs/instrumentation/java/) +- [W3C Trace Context Specification](https://www.w3.org/TR/trace-context/) +- [Pulsar Documentation](https://pulsar.apache.org/docs/) \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 5b3a52d5e427f..9a2d9d1150578 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -157,6 +157,12 @@ public ClientBuilder openTelemetry(OpenTelemetry openTelemetry) { return this; } + @Override + public ClientBuilder enableTracing(boolean tracingEnabled) { + conf.setTracingEnabled(tracingEnabled); + return this; + } + @Override public ClientBuilder authentication(String authPluginClassName, String authParamsString) throws UnsupportedAuthenticationException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index ec95f2c2f658d..57511e0d09953 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -80,6 +80,7 @@ public abstract class ConsumerBase extends HandlerState implements Consumer conf; protected final String consumerName; protected final CompletableFuture> subscribeFuture; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index dc2363c279f7e..11bf617e2fdc0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -213,10 +213,22 @@ public CompletableFuture> subscribeAsync() { applyDLQConfig = CompletableFuture.completedFuture(null); } return applyDLQConfig.thenCompose(__ -> { - if (interceptorList == null || interceptorList.size() == 0) { + // Automatically add tracing interceptor if tracing is enabled + List> effectiveInterceptors = interceptorList; + if (client.getConfiguration().isTracingEnabled()) { + if (effectiveInterceptors == null) { + effectiveInterceptors = new java.util.ArrayList<>(); + } else { + effectiveInterceptors = new java.util.ArrayList<>(effectiveInterceptors); + } + effectiveInterceptors.add( + new org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>()); + } + + if (effectiveInterceptors == null || effectiveInterceptors.size() == 0) { return client.subscribeAsync(conf, schema, null); } else { - return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList)); + return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(effectiveInterceptors)); } }); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 8cffba44dc5ca..6446a9b60dace 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -25,14 +25,23 @@ import java.util.Objects; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.TraceableMessageId; import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.naming.TopicName; -public class MessageIdImpl implements MessageIdAdv { +public class MessageIdImpl implements MessageIdAdv, TraceableMessageId { + private static final long serialVersionUID = 1L; + protected final long ledgerId; protected final long entryId; protected final int partitionIndex; + /** + * OpenTelemetry tracing span associated with this message ID. + * Used for distributed tracing support via the TraceableMessageId interface. + */ + private transient io.opentelemetry.api.trace.Span tracingSpan; + // Private constructor used only for json deserialization @SuppressWarnings("unused") private MessageIdImpl() { @@ -188,4 +197,16 @@ public byte[] toByteArray() { // there is no message batch so we pass -1 return toByteArray(-1, 0); } + + // TraceableMessageId implementation for OpenTelemetry support + + @Override + public void setTracingSpan(io.opentelemetry.api.trace.Span span) { + this.tracingSpan = span; + } + + @Override + public io.opentelemetry.api.trace.Span getTracingSpan() { + return this.tracingSpan; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 4d7b6cc473485..c964db5750521 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.client.api.TraceableMessage; import org.apache.pulsar.client.impl.schema.AbstractSchema; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; @@ -60,7 +61,7 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; -public class MessageImpl implements Message { +public class MessageImpl implements TraceableMessage, Message { protected MessageId messageId; private final MessageMetadata msgMetadata; @@ -84,6 +85,13 @@ public class MessageImpl implements Message { private boolean poolMessage; @Getter private long consumerEpoch; + + /** + * OpenTelemetry tracing span associated with this message. + * Used for distributed tracing support via the TraceableMessage interface. + */ + private transient io.opentelemetry.api.trace.Span tracingSpan; + // Constructor for out-going message public static MessageImpl create(MessageMetadata msgMetadata, ByteBuffer payload, Schema schema, String topic) { @@ -844,6 +852,18 @@ ByteBuf getPayload() { return payload; } + // TraceableMessage implementation for OpenTelemetry support + + @Override + public void setTracingSpan(io.opentelemetry.api.trace.Span span) { + this.tracingSpan = span; + } + + @Override + public io.opentelemetry.api.trace.Span getTracingSpan() { + return this.tracingSpan; + } + enum SchemaState { None, Ready, Broken } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index 7c33cba964527..e176cc41bc61b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -106,9 +106,20 @@ public CompletableFuture> createAsync() { return FutureUtil.failedFuture(pce); } - return interceptorList == null || interceptorList.size() == 0 + // Automatically add tracing interceptor if tracing is enabled + List effectiveInterceptors = interceptorList; + if (client.getConfiguration().isTracingEnabled()) { + if (effectiveInterceptors == null) { + effectiveInterceptors = new ArrayList<>(); + } else { + effectiveInterceptors = new ArrayList<>(effectiveInterceptors); + } + effectiveInterceptors.add(new org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor()); + } + + return effectiveInterceptors == null || effectiveInterceptors.size() == 0 ? client.createProducerAsync(conf, schema, null) - : client.createProducerAsync(conf, schema, new ProducerInterceptors(interceptorList)); + : client.createProducerAsync(conf, schema, new ProducerInterceptors(effectiveInterceptors)); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 5f5239131a878..d14b1dfaf3d48 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -1397,7 +1397,7 @@ public ScheduledExecutorProvider getScheduledExecutorProvider() { return scheduledExecutorProvider; } - InstrumentProvider instrumentProvider() { + public InstrumentProvider instrumentProvider() { return instrumentProvider; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index 3dc9b23e93e86..872fe283fb9b6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -22,8 +22,9 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.TopicMessageId; +import org.apache.pulsar.client.api.TraceableMessageId; -public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId { +public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId, TraceableMessageId { private final String ownerTopic; private final MessageIdAdv msgId; @@ -129,4 +130,22 @@ public MessageIdAdv getFirstChunkMessageId() { public String toString() { return msgId.toString(); } + + // TraceableMessageId implementation for OpenTelemetry support + // Delegates to the wrapped MessageIdAdv if it implements TraceableMessageId + + @Override + public void setTracingSpan(io.opentelemetry.api.trace.Span span) { + if (msgId instanceof TraceableMessageId) { + ((TraceableMessageId) msgId).setTracingSpan(span); + } + } + + @Override + public io.opentelemetry.api.trace.Span getTracingSpan() { + if (msgId instanceof TraceableMessageId) { + return ((TraceableMessageId) msgId).getTracingSpan(); + } + return null; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 7b9916b58fc21..19533b21601c3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -18,15 +18,17 @@ */ package org.apache.pulsar.client.impl; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Map; import java.util.Optional; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TraceableMessage; import org.apache.pulsar.common.api.EncryptionContext; -public class TopicMessageImpl implements Message { +public class TopicMessageImpl implements TraceableMessage, Message { /** This topicPartitionName is get from ConsumerImpl, it contains partition part. */ private final String topicPartitionName; @@ -65,6 +67,7 @@ public String getTopicPartitionName() { } @Override + @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "messageId is immutable") public MessageId getMessageId() { return messageId; } @@ -226,4 +229,22 @@ public Optional getIndex() { return msg.getIndex(); } + // TraceableMessage implementation for OpenTelemetry support + // Delegates to the wrapped message if it implements TraceableMessage + + @Override + public void setTracingSpan(io.opentelemetry.api.trace.Span span) { + if (msg instanceof TraceableMessage) { + ((TraceableMessage) msg).setTracingSpan(span); + } + } + + @Override + public io.opentelemetry.api.trace.Span getTracingSpan() { + if (msg instanceof TraceableMessage) { + return ((TraceableMessage) msg).getTracingSpan(); + } + return null; + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index e406581e707b3..df6e01a73f583 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -437,6 +437,13 @@ public class ClientConfigurationData implements Serializable, Cloneable { private transient OpenTelemetry openTelemetry; + @ApiModelProperty( + name = "tracingEnabled", + value = "Whether to enable OpenTelemetry distributed tracing. When enabled, " + + "tracing interceptors are automatically added to producers and consumers." + ) + private boolean tracingEnabled = false; + /** * Gets the authentication settings for the client. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java index a0bdd8b6fb6c3..d73baef3a0ca7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java @@ -24,6 +24,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import io.opentelemetry.api.trace.Tracer; import java.util.function.Consumer; import org.apache.pulsar.PulsarVersion; @@ -32,6 +33,7 @@ public class InstrumentProvider { public static final InstrumentProvider NOOP = new InstrumentProvider(OpenTelemetry.noop()); private final Meter meter; + private final Tracer tracer; public InstrumentProvider(OpenTelemetry otel) { if (otel == null) { @@ -43,6 +45,10 @@ public InstrumentProvider(OpenTelemetry otel) { .meterBuilder("org.apache.pulsar.client") .setInstrumentationVersion(PulsarVersion.getVersion()) .build(); + this.tracer = otel.getTracerProvider() + .tracerBuilder("org.apache.pulsar.client") + .setInstrumentationVersion(PulsarVersion.getVersion()) + .build(); } public Counter newCounter(String name, Unit unit, String description, String topic, Attributes attributes) { @@ -63,4 +69,8 @@ public ObservableUpDownCounter newObservableUpDownCounter(String name, Unit unit Consumer callback) { return new ObservableUpDownCounter(meter, name, unit, description, topic, attributes, callback); } + + public Tracer getTracer() { + return tracer; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java new file mode 100644 index 0000000000000..eece10b76ff94 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.tracing; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.propagation.TextMapPropagator; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerInterceptor; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; +import org.apache.pulsar.client.api.TraceableMessageId; +import org.apache.pulsar.client.impl.ConsumerBase; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OpenTelemetry consumer interceptor that creates spans for message consumption. + *

+ * This interceptor automatically retrieves the Tracer from the client's InstrumentProvider, + * ensuring consistent OpenTelemetry configuration across the client. + *

+ * Span Storage Strategy: + *

    + *
  • Shared/Key_Shared subscriptions: Spans are attached directly to {@link TraceableMessageId} + * instances with zero map overhead.
  • + *
  • Failover/Exclusive subscriptions: A nested map is initialized eagerly to track message IDs + * and their spans in sorted order. This is necessary because cumulative ack must end spans + * for all messages up to the acked position.
  • + *
+ *

+ * Multi-Topic Consumer Support:
+ * For {@link org.apache.pulsar.client.impl.MultiTopicsConsumerImpl} and pattern-based consumers, cumulative + * acknowledgment only affects messages from the same topic partition. The interceptor uses a nested + * map structure (topic partition → message IDs) and {@link TopicMessageId#getOwnerTopic()} to ensure + * spans are only ended for messages from the acknowledged topic partition. + */ +public class OpenTelemetryConsumerInterceptor implements ConsumerInterceptor { + + private static final Logger log = LoggerFactory.getLogger(OpenTelemetryConsumerInterceptor.class); + + private Tracer tracer; + private TextMapPropagator propagator; + private String topic; + private String subscription; + private boolean initialized = false; + + /** + * Used for cumulative acknowledgment support (Failover/Exclusive subscriptions). + * Outer map: topic partition -> (message ID -> span) + * Inner ConcurrentSkipListMap maintains sorted order for efficient range operations. + * Initialized eagerly for Failover/Exclusive subscriptions. + *

+ * The nested structure is necessary for multi-topic consumers where a single interceptor + * instance handles messages from multiple topic partitions. Cumulative ack only affects + * messages from the same topic partition. + */ + private volatile Map> messageSpansByTopic; + + public OpenTelemetryConsumerInterceptor() { + // Tracer and propagator will be initialized in beforeConsume when we have access to the consumer + } + + /** + * Get the topic key for a message ID. + * For TopicMessageId, returns the owner topic. Otherwise returns the consumer's topic. + */ + private String getTopicKey(MessageId messageId) { + if (messageId instanceof TopicMessageId) { + return ((TopicMessageId) messageId).getOwnerTopic(); + } + return topic != null ? topic : ""; + } + + /** + * Initialize the tracer from the consumer's client. + * This is called lazily on the first message. + */ + private void initializeIfNeeded(Consumer consumer) { + if (!initialized && consumer instanceof ConsumerBase consumerBase) { + PulsarClientImpl client = consumerBase.getClient(); + InstrumentProvider instrumentProvider = client.instrumentProvider(); + + this.tracer = instrumentProvider.getTracer(); + this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + this.initialized = true; + if (consumerBase.getConf().getSubscriptionType() == SubscriptionType.Exclusive + || consumerBase.getConf().getSubscriptionType() == SubscriptionType.Failover) { + ensureMapInitialized(); + } + } + } + + /** + * Ensure the map is initialized for cumulative acknowledgment support. + * This is called when we detect cumulative ack is being used. + */ + private void ensureMapInitialized() { + if (messageSpansByTopic == null) { + messageSpansByTopic = new ConcurrentHashMap<>(); + log.debug("Initialized message spans map for cumulative acknowledgment support"); + } + } + + @Override + public void close() { + // Clean up any remaining spans for Failover/Exclusive subscriptions + if (messageSpansByTopic != null) { + messageSpansByTopic.values().forEach(topicSpans -> + topicSpans.values().forEach(TracingContext::endSpan) + ); + messageSpansByTopic.clear(); + } + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + // Initialize tracer from consumer on first call + initializeIfNeeded(consumer); + + if (tracer == null || propagator == null) { + return message; + } + + try { + if (topic == null) { + topic = consumer.getTopic(); + } + if (subscription == null) { + subscription = consumer.getSubscription(); + } + + // Create a consumer span for this message + Span span = TracingContext.createConsumerSpan(tracer, topic, subscription, message, propagator); + + if (TracingContext.isValid(span)) { + MessageId messageId = message.getMessageId(); + + // Store in map for cumulative ack support (Failover/Exclusive) + if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) { + String topicKey = getTopicKey(messageId); + messageSpansByTopic.computeIfAbsent(topicKey, + k -> new ConcurrentSkipListMap<>()).put((MessageIdAdv) messageId, span); + } + + // Always attach span to message ID for individual ack/nack + if (messageId instanceof TraceableMessageId) { + ((TraceableMessageId) messageId).setTracingSpan(span); + } + + log.debug("Created consumer span for message {} on topic {}", messageId, topic); + } + } catch (Exception e) { + log.error("Error creating consumer span", e); + } + + return message; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable exception) { + if (!(messageId instanceof TraceableMessageId)) { + return; + } + + Span span = ((TraceableMessageId) messageId).getTracingSpan(); + if (span != null) { + try { + if (exception != null) { + TracingContext.endSpan(span, exception); + } else { + // Add attribute to indicate acknowledgment type + span.setAttribute("messaging.pulsar.acknowledgment.type", "acknowledge"); + TracingContext.endSpan(span); + } + // Clear the span from the message ID + ((TraceableMessageId) messageId).setTracingSpan(null); + + // Remove from map if it exists (Failover/Exclusive) + if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) { + String topicKey = getTopicKey(messageId); + ConcurrentSkipListMap topicSpans = messageSpansByTopic.get(topicKey); + if (topicSpans != null) { + topicSpans.remove((MessageIdAdv) messageId); + } + } + } catch (Exception e) { + log.error("Error ending consumer span on acknowledge", e); + } + } + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable exception) { + if (!(messageId instanceof MessageIdAdv cumulativeAckPos)) { + // Fallback to simple ack for non-adv message IDs + if (messageId instanceof TraceableMessageId) { + Span span = ((TraceableMessageId) messageId).getTracingSpan(); + if (span != null) { + try { + if (exception != null) { + TracingContext.endSpan(span, exception); + } else { + // Add attribute to indicate acknowledgment type + span.setAttribute("messaging.pulsar.acknowledgment.type", "cumulative_acknowledge"); + TracingContext.endSpan(span); + } + ((TraceableMessageId) messageId).setTracingSpan(null); + } catch (Exception e) { + log.error("Error ending consumer span on cumulative acknowledge", e); + } + } + } + return; + } + + String topicKey = getTopicKey(messageId); + + // Get the topic-specific map + ConcurrentSkipListMap topicSpans = messageSpansByTopic != null + ? messageSpansByTopic.get(topicKey) : null; + + // First, try to get the span for the cumulative ack position itself + Span currentSpan = null; + if (messageId instanceof TraceableMessageId) { + currentSpan = ((TraceableMessageId) messageId).getTracingSpan(); + } + + // End spans for all messages in the topic-specific map up to the cumulative ack position + if (topicSpans != null) { + Iterator> iterator = topicSpans.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + MessageIdAdv msgId = entry.getKey(); + + // End spans for all messages <= cumulative ack position + if (msgId.compareTo(cumulativeAckPos) <= 0) { + Span span = entry.getValue(); + try { + if (exception != null) { + TracingContext.endSpan(span, exception); + } else { + // Add attribute to indicate acknowledgment type + span.setAttribute("messaging.pulsar.acknowledgment.type", "cumulative_acknowledge"); + TracingContext.endSpan(span); + } + + // Clear the span from the message ID + if (msgId instanceof TraceableMessageId) { + ((TraceableMessageId) msgId).setTracingSpan(null); + } + } catch (Exception e) { + log.error("Error ending consumer span on cumulative acknowledge for message {}", msgId, e); + } + iterator.remove(); + } else { + // Since the map is sorted, we can break early + break; + } + } + + // Clean up empty topic map + if (topicSpans.isEmpty()) { + messageSpansByTopic.remove(topicKey); + } + } + + // If the cumulative ack position span wasn't in the map, end it directly + if (currentSpan != null && messageId instanceof TraceableMessageId) { + try { + if (exception != null) { + TracingContext.endSpan(currentSpan, exception); + } else { + TracingContext.endSpan(currentSpan); + } + ((TraceableMessageId) messageId).setTracingSpan(null); + } catch (Exception e) { + log.error("Error ending consumer span on cumulative acknowledge", e); + } + } + } + + @Override + public void onNegativeAcksSend(Consumer consumer, Set messageIds) { + for (MessageId messageId : messageIds) { + if (!(messageId instanceof TraceableMessageId)) { + continue; + } + + Span span = ((TraceableMessageId) messageId).getTracingSpan(); + if (span != null) { + try { + // Add attribute to indicate negative acknowledgment (not an error, but normal flow) + span.setAttribute("messaging.pulsar.acknowledgment.type", "negative_acknowledge"); + // End span normally - negative ack is expected behavior, not an error + TracingContext.endSpan(span); + // Clear the span from the message ID + ((TraceableMessageId) messageId).setTracingSpan(null); + + // Remove from map if it exists (Failover/Exclusive) + if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) { + String topicKey = getTopicKey(messageId); + ConcurrentSkipListMap topicSpans = messageSpansByTopic.get(topicKey); + if (topicSpans != null) { + topicSpans.remove((MessageIdAdv) messageId); + } + } + } catch (Exception e) { + log.error("Error ending consumer span on negative acknowledge", e); + } + } + } + } + + @Override + public void onAckTimeoutSend(Consumer consumer, Set messageIds) { + for (MessageId messageId : messageIds) { + if (!(messageId instanceof TraceableMessageId)) { + continue; + } + + Span span = ((TraceableMessageId) messageId).getTracingSpan(); + if (span != null) { + try { + // Add attribute to indicate ack timeout (not an error, but expected behavior) + span.setAttribute("messaging.pulsar.acknowledgment.type", "ack_timeout"); + // End span normally - ack timeout is expected behavior, not an error + TracingContext.endSpan(span); + // Clear the span from the message ID + ((TraceableMessageId) messageId).setTracingSpan(null); + + // Remove from map if it exists (Failover/Exclusive) + if (messageSpansByTopic != null && messageId instanceof MessageIdAdv) { + String topicKey = getTopicKey(messageId); + ConcurrentSkipListMap topicSpans = messageSpansByTopic.get(topicKey); + if (topicSpans != null) { + topicSpans.remove((MessageIdAdv) messageId); + } + } + } catch (Exception e) { + log.error("Error ending consumer span on ack timeout", e); + } + } + } + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java new file mode 100644 index 0000000000000..8fe0659a5d836 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.tracing; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.TraceableMessage; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.client.impl.ProducerBase; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OpenTelemetry producer interceptor that creates spans for message publishing. + *

+ * This interceptor automatically retrieves the Tracer from the client's InstrumentProvider, + * ensuring consistent OpenTelemetry configuration across the client. + *

+ * Spans are attached directly to {@link TraceableMessage} instances, eliminating the need + * for external span tracking via maps. + */ +public class OpenTelemetryProducerInterceptor implements ProducerInterceptor { + + private static final Logger log = LoggerFactory.getLogger(OpenTelemetryProducerInterceptor.class); + + private Tracer tracer; + private TextMapPropagator propagator; + private String topic; + private boolean initialized = false; + + public OpenTelemetryProducerInterceptor() { + // Tracer and propagator will be initialized in beforeSend when we have access to the producer + } + + /** + * Initialize the tracer from the producer's client. + * This is called lazily on the first message. + */ + private void initializeIfNeeded(Producer producer) { + if (!initialized && producer instanceof ProducerBase producerBase) { + PulsarClientImpl client = producerBase.getClient(); + InstrumentProvider instrumentProvider = client.instrumentProvider(); + + this.tracer = instrumentProvider.getTracer(); + this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + this.initialized = true; + } + } + + @Override + public void close() { + // No cleanup needed - spans are attached to messages + } + + @Override + public boolean eligible(Message message) { + return tracer != null && propagator != null; + } + + @Override + public Message beforeSend(Producer producer, Message message) { + // Initialize tracer from producer on first call + initializeIfNeeded(producer); + + if (!eligible(message)) { + return message; + } + + try { + if (topic == null) { + topic = producer.getTopic(); + } + + // Create a span for this message publication + // The span will be linked to the current context, which may have been set by: + // 1. An active span in the current thread (e.g., from HTTP request handling) + // 2. Context propagated from upstream services + Span span = TracingContext.createProducerSpan(tracer, topic, Context.current()); + + if (TracingContext.isValid(span) && message instanceof TraceableMessage) { + // Attach the span directly to the message + ((TraceableMessage) message).setTracingSpan(span); + log.debug("Created producer span for message on topic {}", topic); + } + } catch (Exception e) { + log.error("Error creating producer span", e); + } + + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) { + if (!(message instanceof TraceableMessage)) { + return; + } + + Span span = ((TraceableMessage) message).getTracingSpan(); + if (span != null) { + try { + if (msgId != null) { + span.setAttribute("messaging.message.id", msgId.toString()); + } + + if (exception != null) { + TracingContext.endSpan(span, exception); + } else { + TracingContext.endSpan(span); + } + + // Clear the span from the message + ((TraceableMessage) message).setTracingSpan(null); + } catch (Exception e) { + log.error("Error ending producer span", e); + } + } + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java new file mode 100644 index 0000000000000..c5a9b0c3345b6 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.tracing; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.jspecify.annotations.Nullable; + +/** + * Utility class for managing OpenTelemetry tracing context in Pulsar messages. + */ +public class TracingContext { + + private static final TextMapGetter> GETTER = new TextMapGetter>() { + @Override + public Iterable keys(Map carrier) { + return carrier.keySet(); + } + + @Nullable + @Override + public String get(@Nullable Map carrier, String key) { + return carrier != null ? carrier.get(key) : null; + } + }; + + private static final TextMapSetter> SETTER = (carrier, key, value) -> { + if (carrier != null) { + carrier.put(key, value); + } + }; + + /** + * Extract trace context from message properties. + * + * @param message the message to extract context from + * @param propagator the text map propagator to use + * @return the extracted context + */ + public static Context extractContext(Message message, TextMapPropagator propagator) { + if (message == null || propagator == null) { + return Context.current(); + } + return propagator.extract(Context.current(), message.getProperties(), GETTER); + } + + /** + * Inject trace context into message properties. + * + * @param messageBuilder the message builder to inject context into + * @param context the context to inject + * @param propagator the text map propagator to use + */ + public static void injectContext(TypedMessageBuilder messageBuilder, Context context, + TextMapPropagator propagator) { + if (messageBuilder == null || context == null || propagator == null) { + return; + } + + Map carrier = new HashMap<>(); + propagator.inject(context, carrier, SETTER); + + for (Map.Entry entry : carrier.entrySet()) { + messageBuilder.property(entry.getKey(), entry.getValue()); + } + } + + /** + * Create a producer span for message publishing. + * + * @param tracer the tracer to use + * @param topic the topic name + * @param parentContext the parent context (may be null) + * @return the created span + */ + public static Span createProducerSpan(Tracer tracer, String topic, @Nullable Context parentContext) { + if (tracer == null) { + return Span.getInvalid(); + } + + Context context = parentContext != null ? parentContext : Context.current(); + return tracer.spanBuilder("send " + topic) + .setParent(context) + .setSpanKind(SpanKind.PRODUCER) + .setAttribute("messaging.system", "pulsar") + .setAttribute("messaging.destination.name", topic) + .setAttribute("messaging.operation.name", "send") + .startSpan(); + } + + /** + * Create a consumer span for message consumption. + * + * @param tracer the tracer to use + * @param topic the topic name + * @param subscription the subscription name + * @param message the message being consumed + * @param propagator the text map propagator to use for context extraction + * @return the created span + */ + public static Span createConsumerSpan(Tracer tracer, String topic, String subscription, Message message, + TextMapPropagator propagator) { + if (tracer == null) { + return Span.getInvalid(); + } + + Context parentContext = extractContext(message, propagator); + + return tracer.spanBuilder("process " + topic) + .setParent(parentContext) + .setSpanKind(SpanKind.CONSUMER) + .setAttribute("messaging.system", "pulsar") + .setAttribute("messaging.destination.name", topic) + .setAttribute("messaging.destination.subscription.name", subscription) + .setAttribute("messaging.operation.name", "process") + .setAttribute("messaging.message.id", message.getMessageId().toString()) + .startSpan(); + } + + /** + * Mark a span as successful and end it. + * + * @param span the span to end + */ + public static void endSpan(Span span) { + if (span != null && span.isRecording()) { + span.setStatus(StatusCode.OK); + span.end(); + } + } + + /** + * Mark a span as failed with an exception and end it. + * + * @param span the span to end + * @param throwable the exception that caused the failure + */ + public static void endSpan(Span span, Throwable throwable) { + if (span != null && span.isRecording()) { + span.setStatus(StatusCode.ERROR, throwable.getMessage()); + span.recordException(throwable); + span.end(); + } + } + + /** + * Check if a span has a valid context. + * + * @param span the span to check + * @return true if the span has a valid context + */ + public static boolean isValid(Span span) { + return span != null && span.getSpanContext() != null && span.getSpanContext().isValid(); + } + + /** + * Get the span context from a span. + * + * @param span the span + * @return the span context + */ + public static SpanContext getSpanContext(Span span) { + return span != null ? span.getSpanContext() : SpanContext.getInvalid(); + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/package-info.java new file mode 100644 index 0000000000000..73d650339cbff --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/package-info.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * OpenTelemetry tracing support for Pulsar Java client. + * + *

Overview

+ * This package provides OpenTelemetry distributed tracing capabilities for Pulsar producers and consumers. + * It automatically creates spans for message publishing, consumption, and acknowledgment operations, + * and propagates trace context across services using message properties. + * + *

Producer Tracing

+ * Producer tracing tracks: + *
    + *
  • publish - Span created when send is called
  • + *
  • published - Span completed when broker confirms receipt
  • + *
+ * + *

Basic Producer Example

+ *
{@code
+ * // Configure OpenTelemetry (or use auto-instrumentation)
+ * OpenTelemetry openTelemetry = ...;
+ *
+ * // Create producer with tracing interceptor
+ * Producer producer = client.newProducer(Schema.STRING)
+ *     .topic("my-topic")
+ *     .intercept(new OpenTelemetryProducerInterceptor())
+ *     .create();
+ *
+ * // Send message - trace context is automatically propagated
+ * producer.newMessage()
+ *     .value("Hello World")
+ *     .send();
+ * }
+ * + *

+ * Trace context is automatically injected into message properties from the current thread's context. + * This means if your code is running within a traced HTTP request or any other active span, + * the trace will automatically continue through Pulsar messages. + * + *

Consumer Tracing

+ * Consumer tracing tracks: + *
    + *
  • consume - Span created when message is received
  • + *
  • ack - Span completed when message is acknowledged
  • + *
  • nack - Span completed when message is negatively acknowledged
  • + *
+ * + *

Basic Consumer Example

+ *
{@code
+ * // Create consumer with tracing interceptor
+ * Consumer consumer = client.newConsumer(Schema.STRING)
+ *     .topic("my-topic")
+ *     .subscriptionName("my-subscription")
+ *     .intercept(new OpenTelemetryConsumerInterceptor<>())
+ *     .subscribe();
+ *
+ * // Receive and process messages - trace context is automatically extracted
+ * while (true) {
+ *     Message msg = consumer.receive();
+ *     try {
+ *         // Process message
+ *         System.out.println("Received: " + msg.getValue());
+ *         consumer.acknowledge(msg);
+ *     } catch (Exception e) {
+ *         consumer.negativeAcknowledge(msg);
+ *     }
+ * }
+ * }
+ * + *

End-to-End Tracing Example

+ *
{@code
+ * // Service 1: HTTP endpoint that publishes to Pulsar
+ * // When using auto-instrumentation or OpenTelemetry SDK, the HTTP request
+ * // will have an active span context that automatically propagates to Pulsar
+ * @POST
+ * @Path("/publish")
+ * public Response publishMessage(String body) {
+ *     // Send message - trace context automatically injected!
+ *     producer.newMessage()
+ *         .value(body)
+ *         .send();
+ *
+ *     return Response.ok().build();
+ * }
+ *
+ * // Service 2: Consumer that processes messages
+ * Consumer consumer = client.newConsumer(Schema.STRING)
+ *     .topic("my-topic")
+ *     .subscriptionName("my-subscription")
+ *     .intercept(new OpenTelemetryConsumerInterceptor<>())
+ *     .subscribe();
+ *
+ * // Process messages - trace continues from HTTP request
+ * Message msg = consumer.receive();
+ * // Trace context is automatically extracted from message properties
+ * processMessage(msg.getValue());
+ * consumer.acknowledge(msg);
+ *
+ * // The entire flow from HTTP request -> Producer -> Consumer is now traced!
+ * }
+ * + *

Configuration

+ * OpenTelemetry can be configured via: + *
    + *
  • Java Agent auto-instrumentation
  • + *
  • Environment variables (OTEL_*)
  • + *
  • System properties (otel.*)
  • + *
  • Programmatic configuration
  • + *
+ * + *

Trace Context Propagation

+ * Trace context is propagated using W3C TraceContext format via message properties: + *
    + *
  • traceparent - Contains trace ID, span ID, and trace flags
  • + *
  • tracestate - Contains vendor-specific trace information
  • + *
+ * + * @see OpenTelemetryProducerInterceptor + * @see OpenTelemetryConsumerInterceptor + * @see TracingContext + */ +package org.apache.pulsar.client.impl.tracing; \ No newline at end of file diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryTracingTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryTracingTest.java new file mode 100644 index 0000000000000..bbf92b9ffb1a8 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryTracingTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.tracing; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.impl.MessageImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Tests for OpenTelemetry tracing integration. + */ +public class OpenTelemetryTracingTest { + + private InMemorySpanExporter spanExporter; + private OpenTelemetrySdk openTelemetry; + private Tracer tracer; + private TextMapPropagator propagator; + + @BeforeClass + public void setup() { + spanExporter = InMemorySpanExporter.create(); + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .build(); + + tracer = openTelemetry.getTracer("test-tracer"); + propagator = openTelemetry.getPropagators().getTextMapPropagator(); + } + + @AfterClass + public void tearDown() { + if (openTelemetry != null) { + openTelemetry.close(); + } + } + + @Test + public void testCreateProducerSpan() { + spanExporter.reset(); + + String topic = "test-topic"; + Span span = TracingContext.createProducerSpan(tracer, topic, null); + + assertNotNull(span); + assertTrue(span.isRecording()); + assertTrue(TracingContext.isValid(span)); + + TracingContext.endSpan(span); + + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(spans.size(), 1); + + SpanData spanData = spans.get(0); + assertEquals(spanData.getName(), "send " + topic); + assertEquals(spanData.getKind(), SpanKind.PRODUCER); + assertEquals(spanData.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); + assertEquals(spanData.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.name")), topic); + assertEquals(spanData.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.operation.name")), "send"); + } + + @Test + public void testCreateConsumerSpan() { + spanExporter.reset(); + + String topic = "test-topic"; + String subscription = "test-sub"; + Map properties = new HashMap<>(); + properties.put("test-key", "test-value"); + + Message message = createTestMessage(properties); + + Span span = TracingContext.createConsumerSpan(tracer, topic, subscription, message, propagator); + + assertNotNull(span); + assertTrue(span.isRecording()); + assertTrue(TracingContext.isValid(span)); + + TracingContext.endSpan(span); + + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(spans.size(), 1); + + SpanData spanData = spans.get(0); + assertEquals(spanData.getName(), "process " + topic); + assertEquals(spanData.getKind(), SpanKind.CONSUMER); + assertEquals(spanData.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); + assertEquals(spanData.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.name")), topic); + assertEquals(spanData.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.subscription.name")), + subscription); + assertEquals(spanData.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.operation.name")), "process"); + } + + @Test + public void testSpanWithException() { + spanExporter.reset(); + + String topic = "test-topic"; + Span span = TracingContext.createProducerSpan(tracer, topic, null); + + RuntimeException exception = new RuntimeException("Test exception"); + TracingContext.endSpan(span, exception); + + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(spans.size(), 1); + + SpanData spanData = spans.get(0); + assertEquals(spanData.getStatus().getStatusCode(), io.opentelemetry.api.trace.StatusCode.ERROR); + assertFalse(spanData.getEvents().isEmpty()); + } + + @Test + public void testContextPropagation() { + spanExporter.reset(); + + // Create a parent span + Span parentSpan = tracer.spanBuilder("parent").startSpan(); + try (Scope scope = parentSpan.makeCurrent()) { + // Create a producer span as child + String topic = "test-topic"; + Span producerSpan = TracingContext.createProducerSpan(tracer, topic, Context.current()); + + assertNotNull(producerSpan); + assertTrue(TracingContext.isValid(producerSpan)); + + TracingContext.endSpan(producerSpan); + } finally { + parentSpan.end(); + } + + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(spans.size(), 2); + + // Verify parent-child relationship + SpanData producerSpan = spans.get(0); + SpanData parentSpanData = spans.get(1); + + assertEquals(producerSpan.getParentSpanId(), parentSpanData.getSpanId()); + } + + + private Message createTestMessage(Map properties) { + // Create a simple MessageMetadata with properties + org.apache.pulsar.common.api.proto.MessageMetadata metadata = + new org.apache.pulsar.common.api.proto.MessageMetadata(); + + for (Map.Entry entry : properties.entrySet()) { + metadata.addProperty() + .setKey(entry.getKey()) + .setValue(entry.getValue()); + } + + // Create a message with metadata + MessageImpl message = MessageImpl.create( + metadata, + java.nio.ByteBuffer.wrap("test".getBytes()), + org.apache.pulsar.client.api.Schema.BYTES, + "test-topic" + ); + + // Set a MessageId on the message + message.setMessageId(new org.apache.pulsar.client.impl.MessageIdImpl(1L, 1L, -1)); + + return message; + } +} \ No newline at end of file diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/TracingExampleTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/TracingExampleTest.java new file mode 100644 index 0000000000000..d50c45a1df5a7 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/TracingExampleTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.tracing; + +import org.testng.annotations.Test; + +/** + * Example test demonstrating OpenTelemetry tracing usage patterns. + * These are code examples for documentation purposes. + */ +public class TracingExampleTest { + + /** + * Example 1: Basic producer with tracing. + */ + @Test(enabled = false) + public void exampleBasicProducerTracing() throws Exception { + // Configure OpenTelemetry (or use auto-instrumentation) + // OpenTelemetry openTelemetry = ...; + + // Create Pulsar client + // PulsarClient client = PulsarClient.builder() + // .serviceUrl("pulsar://localhost:6650") + // .build(); + + // Create producer with tracing interceptor + // Producer producer = client.newProducer(Schema.STRING) + // .topic("my-topic") + // .intercept(new OpenTelemetryProducerInterceptor()) + // .create(); + + // Send message - trace context is automatically propagated + // producer.newMessage() + // .value("Hello World") + // .send(); + } + + /** + * Example 2: Producer with automatic tracing. + */ + @Test(enabled = false) + public void exampleProducerWithAutomaticTracing() throws Exception { + // Create Pulsar client with tracing enabled + // PulsarClient client = PulsarClient.builder() + // .serviceUrl("pulsar://localhost:6650") + // .openTelemetry(openTelemetry, true) // Enable automatic tracing + // .build(); + + // Producer automatically has tracing enabled + // Producer producer = client.newProducer(Schema.STRING) + // .topic("my-topic") + // .create(); + + // Send message - trace context is automatically injected + // producer.newMessage() + // .value("Hello World") + // .send(); + } + + /** + * Example 3: Basic consumer with tracing. + */ + @Test(enabled = false) + public void exampleBasicConsumerTracing() throws Exception { + // Create Pulsar client + // PulsarClient client = PulsarClient.builder() + // .serviceUrl("pulsar://localhost:6650") + // .build(); + + // Create consumer with tracing interceptor + // Consumer consumer = client.newConsumer(Schema.STRING) + // .topic("my-topic") + // .subscriptionName("my-subscription") + // .intercept(new OpenTelemetryConsumerInterceptor<>()) + // .subscribe(); + + // Receive and process messages - trace context is automatically extracted + // while (true) { + // Message msg = consumer.receive(); + // try { + // // Process message + // System.out.println("Received: " + msg.getValue()); + // consumer.acknowledge(msg); + // } catch (Exception e) { + // consumer.negativeAcknowledge(msg); + // } + // } + } + + /** + * Example 4: End-to-end tracing from HTTP to Pulsar (automatic). + */ + @Test(enabled = false) + public void exampleEndToEndTracing() throws Exception { + // ===== HTTP Service ===== + // When the HTTP framework has OpenTelemetry instrumentation, + // the active span context automatically propagates to Pulsar + + // Producer - trace context automatically injected from HTTP span + // producer.newMessage() + // .value("Message from HTTP request") + // .send(); + + // ===== Consumer Service ===== + // In another service/application + + // Consumer with tracing + // Consumer consumer = client.newConsumer(Schema.STRING) + // .topic("my-topic") + // .subscriptionName("my-subscription") + // .intercept(new OpenTelemetryConsumerInterceptor<>()) + // .subscribe(); + + // Process messages - trace continues from HTTP request + // Message msg = consumer.receive(); + // // Trace context is automatically extracted from message properties + // processMessage(msg.getValue()); + // consumer.acknowledge(msg); + + // The entire flow from HTTP request -> Producer -> Consumer is now traced! + } + + /** + * Example 5: Custom span creation in message processing. + */ + @Test(enabled = false) + public void exampleCustomSpanCreation() throws Exception { + // When you need to create custom spans during message processing + + // Consumer with tracing + // Consumer consumer = client.newConsumer(Schema.STRING) + // .topic("my-topic") + // .subscriptionName("my-subscription") + // .intercept(new OpenTelemetryConsumerInterceptor<>()) + // .subscribe(); + + // Get tracer + // Tracer tracer = GlobalOpenTelemetry.get().getTracer("my-application"); + + // Process messages + // Message msg = consumer.receive(); + + // The consumer interceptor already created a span, so we're in that context + // Create a child span for custom processing + // Span processingSpan = tracer.spanBuilder("process-message") + // .setSpanKind(SpanKind.INTERNAL) + // .startSpan(); + + // try (Scope scope = processingSpan.makeCurrent()) { + // // Your processing logic here + // // Any spans created here will be children of processingSpan + // doSomeProcessing(msg.getValue()); + // processingSpan.setStatus(StatusCode.OK); + // } catch (Exception e) { + // processingSpan.recordException(e); + // processingSpan.setStatus(StatusCode.ERROR); + // throw e; + // } finally { + // processingSpan.end(); + // consumer.acknowledge(msg); + // } + } +} \ No newline at end of file