From 5818f14721887f05fd9ededf317bfbf4b16e0a92 Mon Sep 17 00:00:00 2001 From: Lester Haynes Date: Thu, 19 Oct 2023 14:37:49 -0700 Subject: [PATCH 1/3] Customize kafka instrumentation --- .../KafkaClientsInstrumentationModule.java | 5 +- .../v0_11/KafkaConsumerInstrumentation.java | 48 +--- ...iceCallExecutorServiceInstrumentation.java | 44 ++++ .../kafkaclients/v0_11/TracingQueue.java | 212 ++++++++++++++++++ ...rackingConsumerWrapperInstrumentation.java | 57 +++++ ...WrappedExecutorServiceInstrumentation.java | 39 ++++ 6 files changed, 359 insertions(+), 46 deletions(-) create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ServiceCallExecutorServiceInstrumentation.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingQueue.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TrackingConsumerWrapperInstrumentation.java create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/WrappedExecutorServiceInstrumentation.java diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java index 4b71df3df6ef..659f1efb6335 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java @@ -29,6 +29,9 @@ public List typeInstrumentations() { return asList( new KafkaProducerInstrumentation(), new KafkaConsumerInstrumentation(), - new ConsumerRecordsInstrumentation()); + new ConsumerRecordsInstrumentation(), + new ServiceCallExecutorServiceInstrumentation(), + //new WrappedExecutorServiceInstrumentation(), + new TrackingConsumerWrapperInstrumentation()); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java index 5a7f2aad4440..6e64d13c22e7 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java @@ -7,13 +7,8 @@ import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerReceiveInstrumenter; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.enhanceConfig; -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.returns; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; @@ -23,10 +18,6 @@ import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -38,49 +29,16 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return named("org.apache.kafka.clients.consumer.KafkaConsumer"); + //return nameContains("Consumer").or( + return named("com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl"); } @Override public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod( - isConstructor().and(takesArgument(0, Map.class)), - this.getClass().getName() + "$ConstructorMapAdvice"); - transformer.applyAdviceToMethod( - isConstructor().and(takesArgument(0, Properties.class)), - this.getClass().getName() + "$ConstructorPropertiesAdvice"); - transformer.applyAdviceToMethod( - named("poll") - .and(isPublic()) - .and(takesArguments(1)) - .and(takesArgument(0, long.class).or(takesArgument(0, Duration.class))) - .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))), + transformer.applyAdviceToMethod(named("poll").and(isPublic()), this.getClass().getName() + "$PollAdvice"); } - @SuppressWarnings("unused") - public static class ConstructorMapAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) Map config) { - // ensure config is a mutable map - if (config.getClass() != HashMap.class) { - config = new HashMap<>(config); - } - enhanceConfig(config); - } - } - - @SuppressWarnings("unused") - public static class ConstructorPropertiesAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Argument(0) Properties config) { - enhanceConfig(config); - } - } - @SuppressWarnings("unused") public static class PollAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ServiceCallExecutorServiceInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ServiceCallExecutorServiceInstrumentation.java new file mode 100644 index 000000000000..3bc5df414a27 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ServiceCallExecutorServiceInstrumentation.java @@ -0,0 +1,44 @@ +package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; + +import java.util.concurrent.ExecutorService; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.implementation.bytecode.assign.Assigner; +import net.bytebuddy.matcher.ElementMatcher; + + +public class ServiceCallExecutorServiceInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("com.linkedin.container.servicecall.ServiceCallExecutorService"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(isConstructor()), + ServiceCallExecutorServiceInstrumentation.class.getName() + "$ConstructorAdvice"); + } + + @SuppressWarnings("unused") + public static class ConstructorAdvice { + @Advice.OnMethodExit + public static void wrap( + @Advice.FieldValue(value = "_threadType") Object threadType, + @Advice.This( + typing = Assigner.Typing.DYNAMIC, readOnly = false) ExecutorService executorService) { + if (((Enum) threadType).name().equals("QUEUE")) { + executorService = Context.taskWrapping(executorService); + } + } + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingQueue.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingQueue.java new file mode 100644 index 000000000000..633fd1fc0dae --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingQueue.java @@ -0,0 +1,212 @@ +package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; +import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; +import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; +import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerProcessInstrumenter; + + +@SuppressWarnings("serial") +public final class TracingQueue extends LinkedBlockingQueue> { + private final LinkedBlockingQueue> delegate; + + @Nullable + private KafkaProcessRequest currentRequest; + @Nullable + private Context currentContext; + @Nullable + private Scope currentScope; + private TracingQueue(LinkedBlockingQueue> delegate) { + this.delegate = delegate; + } + + public static LinkedBlockingQueue> wrap( + LinkedBlockingQueue> delegate) { + System.out.println("Are you ready to wrap?"); + return new TracingQueue<>(delegate); + } + + @Override + public boolean add(ConsumerRecord e) { + return delegate.add(e); + } + + @Override + public boolean offer(ConsumerRecord e) { + return delegate.offer(e); + } + + @Override + public boolean offer(ConsumerRecord e, long timeout, TimeUnit unit) throws InterruptedException { + return delegate.offer(e, timeout, unit); + } + + @Override + public ConsumerRecord remove() { + return delegate.remove(); + } + + @Override + public boolean remove(Object o) { + return delegate.remove(o); + } + + @Override + public void put(ConsumerRecord e) throws InterruptedException { + delegate.put(e); + } + + @Override + public ConsumerRecord poll() { + closeScopeAndEndSpan(); + ConsumerRecord next = delegate.poll(); + this.handlePoll(next); + return next; + } + + @Override + public ConsumerRecord poll(long timeout, TimeUnit unit) throws InterruptedException { + closeScopeAndEndSpan(); + ConsumerRecord next = delegate.poll(timeout, unit); + this.handlePoll(next); + return next; + } + + @Override + public ConsumerRecord element() { + return delegate.element(); + } + + @Override + public ConsumerRecord peek() { + return delegate.peek(); + } + + @Override + public ConsumerRecord take() throws InterruptedException { + closeScopeAndEndSpan(); + ConsumerRecord next = delegate.take(); + this.handlePoll(next); + return next; + } + + private void handlePoll(ConsumerRecord record) { + if (record != null && KafkaClientsConsumerProcessTracing.wrappingEnabled()) { + //String msg = String.format("Starting span for record %s", record); + //LOGGER.info(msg); + //System.out.println(msg); + KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(record); + Context receiveContext = consumerContext.getContext(); + Context parentContext = receiveContext != null ? receiveContext : Context.current(); + currentRequest = KafkaProcessRequest.create(consumerContext, record); + currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest); + currentScope = currentContext.makeCurrent(); + } + } + + @Override + public int remainingCapacity() { + return delegate.remainingCapacity(); + } + + @Override + public boolean containsAll(Collection c) { + return delegate.containsAll(c); + } + + @Override + public boolean addAll(Collection> c) { + return delegate.addAll(c); + } + + @Override + public boolean removeAll(Collection c) { + return delegate.removeAll(c); + } + + @Override + public boolean retainAll(Collection c) { + return delegate.retainAll(c); + } + + @Override + public void clear() { + delegate.clear(); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return delegate.contains(o); + } + + @Override + public Iterator> iterator() { + return delegate.iterator(); + } + + @Override + public Object[] toArray() { + return delegate.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return delegate.toArray(a); + } + + @Override + public int drainTo(Collection> c) { + return delegate.drainTo(c); + } + + @Override + public int drainTo(Collection> c, int maxElements) { + return delegate.drainTo(c, maxElements); + } + + private void closeScopeAndEndSpan() { + if (currentScope != null) { + //String msg = String.format("Ending span for previous record %s", _currentRequest.getRecord()); + //LOGGER.info(msg); + //System.out.println(msg); + currentScope.close(); + consumerProcessInstrumenter().end(currentContext, currentRequest, null, null); + currentScope = null; + currentRequest = null; + currentContext = null; + } + } + + private void writeObject(ObjectOutputStream stream) + throws IOException { + stream.defaultWriteObject(); + } + + private void readObject(ObjectInputStream stream) + throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TrackingConsumerWrapperInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TrackingConsumerWrapperInstrumentation.java new file mode 100644 index 000000000000..2a82b2ec442f --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TrackingConsumerWrapperInstrumentation.java @@ -0,0 +1,57 @@ +package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; + + +public class TrackingConsumerWrapperInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("com.linkedin.tracker.consumer.TrackingConsumerWrapper"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("subscribe")), + TrackingConsumerWrapperInstrumentation.class.getName() + "$SubscribeAdvice"); + } + + @SuppressWarnings("unused") + public static class SubscribeAdvice { + @Advice.OnMethodExit + public static void wrap( + @Advice.FieldValue(readOnly = false, value = "_topicRecordQueues") + ConcurrentHashMap>>> recordQueues) { + + // it's important not to suppress consumer span creation here because this instrumentation can + // leak the context and so there may be a leaked consumer span in the context, in which + // case it's important to overwrite the leaked span instead of suppressing the correct span + // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) + + System.out.println("SubscribeAdvice was called!"); + for (Map.Entry>>> entry : recordQueues.entrySet()) { + String topic = entry.getKey(); + List>> queues = entry.getValue(); + for (int i = 0; i < queues.size(); i++) { + LinkedBlockingQueue> toWrap = queues.get(i); + System.out.println("Wrapping this queue " + toWrap + " for topic " + topic); + queues.set(i, TracingQueue.wrap(toWrap)); + System.out.println("Wrapped queue " + toWrap + " for topic " + topic); + } + } + } + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/WrappedExecutorServiceInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/WrappedExecutorServiceInstrumentation.java new file mode 100644 index 000000000000..2d14bb87b7ea --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/WrappedExecutorServiceInstrumentation.java @@ -0,0 +1,39 @@ +package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.nameContains; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; + +import java.util.concurrent.ExecutorService; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + + +public class WrappedExecutorServiceInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return nameContains("WrappedExecutorService"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("getServiceCallExecutorService")), + WrappedExecutorServiceInstrumentation.class.getName() + "$GetAdvice"); + } + + @SuppressWarnings("unused") + public static class GetAdvice { + @Advice.OnMethodExit + public static void wrap( + @Advice.Return(readOnly = false) ExecutorService executorService) { + executorService = Context.taskWrapping(executorService); + } + } +} From f49a1a09da5a38b337270a108d55281c5962bd3e Mon Sep 17 00:00:00 2001 From: Lester Haynes Date: Sun, 22 Oct 2023 21:32:45 -0700 Subject: [PATCH 2/3] tweaks --- .../kafkaclients/v0_11/KafkaClientsInstrumentationModule.java | 3 +-- .../kafkaclients/v0_11/KafkaConsumerInstrumentation.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java index 659f1efb6335..91a2e64b7485 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java @@ -29,9 +29,8 @@ public List typeInstrumentations() { return asList( new KafkaProducerInstrumentation(), new KafkaConsumerInstrumentation(), - new ConsumerRecordsInstrumentation(), + //new ConsumerRecordsInstrumentation(), new ServiceCallExecutorServiceInstrumentation(), - //new WrappedExecutorServiceInstrumentation(), new TrackingConsumerWrapperInstrumentation()); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java index 6e64d13c22e7..6cab15979818 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java @@ -29,8 +29,7 @@ public class KafkaConsumerInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - //return nameContains("Consumer").or( - return named("com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl"); + return named("com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl"); } @Override From cc0e1ad70b88d01221407dcaca9dbb7e80f5619d Mon Sep 17 00:00:00 2001 From: Lester Haynes Date: Tue, 24 Oct 2023 13:26:49 -0700 Subject: [PATCH 3/3] more tweaks --- .../v0_11/ConsumerRecordsInstrumentation.java | 2 +- .../KafkaClientsInstrumentationModule.java | 2 +- .../kafkaclients/v0_11/TracingIterator.java | 19 ++++++++++++++++--- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java index 5ba44989bedc..110823c1bbf1 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java @@ -106,7 +106,7 @@ public static void wrap( // case it's important to overwrite the leaked span instead of suppressing the correct span // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); - iterator = TracingIterator.wrap(iterator, consumerContext); + iterator = TracingIterator.wrap(iterator, consumerContext, true); } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java index 91a2e64b7485..cf248204971a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java @@ -29,7 +29,7 @@ public List typeInstrumentations() { return asList( new KafkaProducerInstrumentation(), new KafkaConsumerInstrumentation(), - //new ConsumerRecordsInstrumentation(), + new ConsumerRecordsInstrumentation(), new ServiceCallExecutorServiceInstrumentation(), new TrackingConsumerWrapperInstrumentation()); } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java index 957439a4e721..d1d7f5f3347f 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java @@ -21,6 +21,7 @@ public class TracingIterator implements Iterator> { private final Iterator> delegateIterator; private final Context parentContext; private final KafkaConsumerContext consumerContext; + private final boolean endTraceImmediately; /* * Note: this may potentially create problems if this iterator is used from different threads. But @@ -31,19 +32,28 @@ public class TracingIterator implements Iterator> { @Nullable private Scope currentScope; private TracingIterator( - Iterator> delegateIterator, KafkaConsumerContext consumerContext) { + Iterator> delegateIterator, KafkaConsumerContext consumerContext, + boolean endTraceImmediately) { this.delegateIterator = delegateIterator; Context receiveContext = consumerContext.getContext(); // use the receive CONSUMER as parent if it's available this.parentContext = receiveContext != null ? receiveContext : Context.current(); this.consumerContext = consumerContext; + this.endTraceImmediately = endTraceImmediately; } - public static Iterator> wrap( + + public static Iterator> wrap( Iterator> delegateIterator, KafkaConsumerContext consumerContext) { + return wrap(delegateIterator, consumerContext, false); + } + + public static Iterator> wrap( + Iterator> delegateIterator, KafkaConsumerContext consumerContext, + boolean endTraceImmediately) { if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingIterator<>(delegateIterator, consumerContext); + return new TracingIterator<>(delegateIterator, consumerContext, endTraceImmediately); } return delegateIterator; } @@ -70,6 +80,9 @@ public ConsumerRecord next() { currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest); currentScope = currentContext.makeCurrent(); } + if (endTraceImmediately) { + closeScopeAndEndSpan(); + } return next; }