Skip to content

Commit 2aa720d

Browse files
committed
feat(kafka): added option to disable downstream suppression for ignored endpoints (#1652)
- To disable downstream suppression for ignored endpoints, set the environment variable: INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION - Currently applicable for kafka only
1 parent c21c717 commit 2aa720d

File tree

6 files changed

+245
-11
lines changed

6 files changed

+245
-11
lines changed

packages/collector/test/tracing/messaging/kafkajs/test.js

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,6 +1100,185 @@ mochaSuiteFn('tracing/kafkajs', function () {
11001100
});
11011101
});
11021102
});
1103+
1104+
describe('when downstream suppression is disabled via INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION', function () {
1105+
describe('when send method is ignored', function () {
1106+
before(async () => {
1107+
consumerControls = new ProcessControls({
1108+
appPath: path.join(__dirname, 'consumer'),
1109+
useGlobalAgent: true
1110+
});
1111+
1112+
producerControls = new ProcessControls({
1113+
appPath: path.join(__dirname, 'producer'),
1114+
useGlobalAgent: true,
1115+
env: {
1116+
// basic ignoring config for send
1117+
INSTANA_IGNORE_ENDPOINTS: 'kafka:send',
1118+
INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION: true
1119+
}
1120+
});
1121+
1122+
await consumerControls.startAndWaitForAgentConnection();
1123+
await producerControls.startAndWaitForAgentConnection();
1124+
});
1125+
1126+
beforeEach(async () => {
1127+
await agentControls.clearReceivedTraceData();
1128+
});
1129+
1130+
after(async () => {
1131+
await producerControls.stop();
1132+
await consumerControls.stop();
1133+
});
1134+
1135+
it('should ignore only the kafka call send and should trace downstream calls', async () => {
1136+
const message = {
1137+
key: 'someKey',
1138+
value: 'someMessage'
1139+
};
1140+
await producerControls.sendRequest({
1141+
method: 'POST',
1142+
path: '/send-messages',
1143+
simple: true,
1144+
body: JSON.stringify(message),
1145+
headers: {
1146+
'Content-Type': 'application/json'
1147+
}
1148+
});
1149+
1150+
await consumerControls.sendRequest({
1151+
method: 'GET',
1152+
path: '/health',
1153+
simple: true
1154+
});
1155+
1156+
await delay(200);
1157+
await retry(async () => {
1158+
const spans = await agentControls.getSpans();
1159+
1160+
// 2 x HTTP server (1 x consumer, 1 x producer)
1161+
// 3 x HTTP client (1 x producer)(2 x consumer)
1162+
// 2 x Kafka consume span (consumer)
1163+
expect(spans).to.have.lengthOf(7);
1164+
1165+
// Flow: HTTP entry (producer) (traced)
1166+
// ├── Kafka Produce (ignored)
1167+
// │ └── Kafka Consume (traced) → HTTP (traced)
1168+
// └── HTTP exit (traced)
1169+
//
1170+
// HTTP entry (consumer) (traced)
1171+
const kafkaConsumerSpan = spans.find(span => span.n === 'kafka' && span.k === 1);
1172+
const producerHttpSpan = spans.find(
1173+
span => span.n === 'node.http.server' && span.k === 1 && span.data.http.url === '/send-messages'
1174+
);
1175+
const producerHttpExitSpan = spans.find(span => span.n === 'node.http.client' && span.k === 2);
1176+
const consumerHttpSpan = spans.find(
1177+
span => span.n === 'node.http.server' && span.k === 1 && span.data.http.url === '/health'
1178+
);
1179+
1180+
expect(kafkaConsumerSpan).to.exist;
1181+
expect(kafkaConsumerSpan.data.kafka).to.include({
1182+
service: 'test-topic-1',
1183+
access: 'consume'
1184+
});
1185+
expect(producerHttpSpan).to.exist;
1186+
expect(producerHttpExitSpan).to.exist;
1187+
expect(consumerHttpSpan).to.exist;
1188+
});
1189+
});
1190+
});
1191+
1192+
describe('when consume method is ignored', function () {
1193+
before(async () => {
1194+
consumerControls = new ProcessControls({
1195+
appPath: path.join(__dirname, 'consumer'),
1196+
useGlobalAgent: true,
1197+
env: {
1198+
// basic ignoring config for consume
1199+
INSTANA_IGNORE_ENDPOINTS: 'kafka:consume',
1200+
INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION: true
1201+
}
1202+
});
1203+
1204+
producerControls = new ProcessControls({
1205+
appPath: path.join(__dirname, 'producer'),
1206+
useGlobalAgent: true
1207+
});
1208+
1209+
await consumerControls.startAndWaitForAgentConnection();
1210+
await producerControls.startAndWaitForAgentConnection();
1211+
});
1212+
1213+
beforeEach(async () => {
1214+
await agentControls.clearReceivedTraceData();
1215+
});
1216+
1217+
after(async () => {
1218+
await producerControls.stop();
1219+
await consumerControls.stop();
1220+
});
1221+
1222+
it('should ignore consume spans while trace downstream calls', async () => {
1223+
const message = {
1224+
key: 'someKey',
1225+
value: 'someMessage'
1226+
};
1227+
await producerControls.sendRequest({
1228+
method: 'POST',
1229+
path: '/send-messages',
1230+
simple: true,
1231+
body: JSON.stringify(message),
1232+
headers: {
1233+
'Content-Type': 'application/json'
1234+
}
1235+
});
1236+
1237+
await consumerControls.sendRequest({
1238+
method: 'GET',
1239+
path: '/health',
1240+
simple: true
1241+
});
1242+
1243+
await delay(200);
1244+
await retry(async () => {
1245+
const spans = await agentControls.getSpans();
1246+
1247+
// 2 x HTTP server (1 x consumer, 1 x producer)
1248+
// 3 x HTTP client (1 x producer)(2 x consumer)
1249+
// 1 x Kafka produce
1250+
expect(spans).to.have.lengthOf(6);
1251+
1252+
// Flow: HTTP entry (producer) (traced)
1253+
// ├── Kafka Produce (traced)
1254+
// │ └── Kafka Consume (ignored) → HTTP (traced)
1255+
// └── HTTP exit (traced)
1256+
//
1257+
// HTTP entry (consumer) (traced)
1258+
const kafkaConsumerSpan = spans.find(span => span.n === 'kafka' && span.k === 1);
1259+
const kafkaProducerSpan = spans.find(span => span.n === 'kafka' && span.k === 2);
1260+
const producerHttpSpan = spans.find(
1261+
span => span.n === 'node.http.server' && span.k === 1 && span.data.http.url === '/send-messages'
1262+
);
1263+
const producerHttpExitSpan = spans.find(span => span.n === 'node.http.client' && span.k === 2);
1264+
const consumerHttpSpan = spans.find(
1265+
span => span.n === 'node.http.server' && span.k === 1 && span.data.http.url === '/health'
1266+
);
1267+
1268+
expect(kafkaConsumerSpan).not.exist;
1269+
expect(kafkaProducerSpan).to.exist;
1270+
1271+
expect(kafkaProducerSpan.data.kafka).to.include({
1272+
service: 'test-topic-1',
1273+
access: 'send'
1274+
});
1275+
expect(producerHttpSpan).to.exist;
1276+
expect(producerHttpExitSpan).to.exist;
1277+
expect(consumerHttpSpan).to.exist;
1278+
});
1279+
});
1280+
});
1281+
});
11031282
});
11041283

11051284
function resetMessages(consumer) {

packages/core/src/tracing/cls.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ let serviceName;
2929
let processIdentityProvider = null;
3030
/** @type {Boolean} */
3131
let allowRootExitSpan;
32+
/** @type {Boolean} */
33+
let ignoreEndpointsDisableDownStreamSuppression;
3234

3335
/*
3436
* Access the Instana namespace in continuation local storage.
@@ -52,6 +54,7 @@ function init(config, _processIdentityProvider) {
5254
}
5355
processIdentityProvider = _processIdentityProvider;
5456
allowRootExitSpan = config?.tracing?.allowRootExitSpan;
57+
ignoreEndpointsDisableDownStreamSuppression = config?.tracing?.ignoreEndpointsDisableSuppression;
5558
}
5659

5760
class InstanaSpan {
@@ -149,6 +152,15 @@ class InstanaSpan {
149152
writable: true,
150153
enumerable: false
151154
});
155+
// This property "shouldSuppressDownstream" currently applicable only for ignoring endpoints.
156+
// When a span is ignored, downstream suppression is automatically enabled.
157+
// However, if required, downstream suppression propagation can be explicitly disabled
158+
// via the env variable `INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION`.
159+
Object.defineProperty(this, 'shouldSuppressDownstream', {
160+
value: false,
161+
writable: true,
162+
enumerable: false
163+
});
152164
}
153165

154166
/**
@@ -241,7 +253,12 @@ class InstanaIgnoredSpan extends InstanaSpan {
241253
*/
242254
constructor(name, data) {
243255
super(name, data);
256+
244257
this.isIgnored = true;
258+
// By default, downstream suppression for ignoring endpoints is enabled.
259+
// If the environment variable `INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION` is set,
260+
// should not suppress the downstream calls.
261+
this.shouldSuppressDownstream = !ignoreEndpointsDisableDownStreamSuppression;
245262
}
246263

247264
transmit() {
@@ -439,7 +456,10 @@ function setIgnoredSpan({ spanName, kind, traceId, parentId, data = {} }) {
439456

440457
// For entry spans, we need to retain suppression information to ensure that
441458
// tracing is suppressed for all internal (!) subsequent outgoing (exit) calls.
442-
setTracingLevel('0');
459+
// By default, downstream suppression for ignored spans is enabled.
460+
// If the environment variable `INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION is set,
461+
// should not suppress the downstream calls.
462+
if (span.shouldSuppressDownstream) setTracingLevel('0');
443463
}
444464

445465
// Set the span object as the currently active span in the active CLS context and also add a cleanup hook for when

packages/core/src/tracing/instrumentation/messaging/kafkaJs.js

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ function instrumentedSend(ctx, originalSend, originalArgs, topic, messages) {
9292
});
9393
if (Array.isArray(messages)) {
9494
span.b = { s: messages.length };
95-
setTraceHeaders({ messages, span, ignored: span.isIgnored });
95+
setTraceHeaders({ messages, span });
9696
}
9797
span.stack = tracingUtil.getStackTrace(instrumentedSend);
9898

@@ -165,7 +165,10 @@ function instrumentedSendBatch(ctx, originalSendBatch, originalArgs, topicMessag
165165

166166
span.stack = tracingUtil.getStackTrace(instrumentedSend);
167167
topicMessages.forEach(topicMessage => {
168-
setTraceHeaders({ messages: topicMessage.messages, span, ignored: span.isIgnored });
168+
setTraceHeaders({
169+
messages: topicMessage.messages,
170+
span
171+
});
169172
});
170173

171174
if (messageCount > 0) {
@@ -453,9 +456,9 @@ function removeInstanaHeadersFromMessage(message) {
453456
}
454457
}
455458

456-
function setTraceHeaders({ messages, span, ignored }) {
457-
if (ignored) {
458-
// If the span is ignored, suppress trace propagation to downstream services.
459+
function setTraceHeaders({ messages, span }) {
460+
if (span.shouldSuppressDownstream) {
461+
// Suppress trace propagation to downstream services.
459462
addTraceLevelSuppressionToAllMessages(messages);
460463
} else {
461464
// Otherwise, inject the trace context into the headers for propagation.

packages/core/src/tracing/instrumentation/messaging/rdkafka.js

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,10 @@ function instrumentedProduce(ctx, originalProduce, originalArgs) {
153153

154154
span.stack = tracingUtil.getStackTrace(instrumentedProduce, 1);
155155

156-
originalArgs[6] = setTraceHeaders({ headers: originalArgs[6], span, ignored: span.isIgnored });
156+
originalArgs[6] = setTraceHeaders({
157+
headers: originalArgs[6],
158+
span
159+
});
157160

158161
if (deliveryCb) {
159162
ctx.once('delivery-report', function instanaDeliveryReportListener(err) {
@@ -384,9 +387,9 @@ function logDeprecationKafkaAvroMessage() {
384387
'[Deprecation Warning] The support for kafka-avro library is deprecated and might be removed in the next major release. See https://github.com/waldophotos/kafka-avro/issues/120'
385388
);
386389
}
387-
function setTraceHeaders({ headers, span, ignored }) {
388-
if (ignored) {
389-
// If the span is ignored, suppress trace propagation to downstream services.
390+
function setTraceHeaders({ headers, span }) {
391+
if (span.shouldSuppressDownstream) {
392+
// Suppress trace propagation to downstream services.
390393
return addTraceLevelSuppression(headers);
391394
} else {
392395
// Otherwise, inject the trace context into the headers for propagation.

packages/core/src/util/normalizeConfig.js

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const configNormalizers = require('./configNormalizers');
2727
* @property {KafkaTracingOptions} [kafka]
2828
* @property {boolean} [allowRootExitSpan]
2929
* @property {import('../tracing').IgnoreEndpoints} [ignoreEndpoints]
30+
* @property {boolean} [ignoreEndpointsDisableSuppression]
3031
*/
3132

3233
/**
@@ -124,7 +125,8 @@ const defaults = {
124125
kafka: {
125126
traceCorrelation: true
126127
},
127-
ignoreEndpoints: {}
128+
ignoreEndpoints: {},
129+
ignoreEndpointsDisableSuppression: false
128130
},
129131
secrets: {
130132
matcherMode: 'contains-ignore-case',
@@ -243,6 +245,7 @@ function normalizeTracingConfig(config) {
243245
normalizeTracingKafka(config);
244246
normalizeAllowRootExitSpan(config);
245247
normalizeIgnoreEndpoints(config);
248+
normalizeIgnoreEndpointsDisableSuppression(config);
246249
}
247250

248251
/**
@@ -745,3 +748,18 @@ function normalizeIgnoreEndpoints(config) {
745748
return;
746749
}
747750
}
751+
752+
/**
753+
* @param {InstanaConfig} config
754+
*/
755+
function normalizeIgnoreEndpointsDisableSuppression(config) {
756+
if (process.env['INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION'] === 'true') {
757+
logger.info(
758+
'Disabling downstream suppression for ignoring endpoints feature as it is explicitly disabled via environment variable "INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION".'
759+
);
760+
config.tracing.ignoreEndpointsDisableSuppression = true;
761+
return;
762+
}
763+
764+
config.tracing.ignoreEndpointsDisableSuppression = defaults.tracing.ignoreEndpointsDisableSuppression;
765+
}

packages/core/test/util/normalizeConfig_test.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,17 @@ describe('util.normalizeConfig', () => {
646646
const config = normalizeConfig();
647647
expect(config.tracing.ignoreEndpoints).to.deep.equal({});
648648
});
649+
650+
it('should return false when INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION is not set', () => {
651+
const config = normalizeConfig();
652+
expect(config.tracing.ignoreEndpointsDisableSuppression).to.deep.equal(false);
653+
});
654+
655+
it('should return true when INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION is set', () => {
656+
process.env.INSTANA_IGNORE_ENDPOINTS_DISABLE_SUPPRESSION = true;
657+
const config = normalizeConfig();
658+
expect(config.tracing.ignoreEndpointsDisableSuppression).to.deep.equal(true);
659+
});
649660
});
650661

651662
function checkDefaults(config) {

0 commit comments

Comments
 (0)