Skip to content

Commit 68122e8

Browse files
committed
feat: add producer-side batch message handling with span linking
- Collect span links from messages 2-N (first becomes parent) - Extract parent context from first message trace context - Create pubsub.request span with span links metadata - Inject batch metadata into all messages (_dd.pubsub_request.*, _dd.batch.*) - Add 128-bit trace ID support (_dd.p.tid) - Add operation tag for batched vs single requests
1 parent 05f7c87 commit 68122e8

File tree

1 file changed

+124
-12
lines changed
  • packages/datadog-plugin-google-cloud-pubsub/src

1 file changed

+124
-12
lines changed

packages/datadog-plugin-google-cloud-pubsub/src/producer.js

Lines changed: 124 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,148 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
99

1010
bindStart (ctx) {
1111
const { request, api, projectId } = ctx
12-
1312
if (api !== 'publish') return
1413

1514
const messages = request.messages || []
1615
const topic = request.topic
16+
const messageCount = messages.length
17+
const hasTraceContext = messages[0]?.attributes?.['x-datadog-trace-id']
18+
19+
// Collect span links from messages 2-N (skip first - it becomes parent)
20+
const spanLinkData = hasTraceContext
21+
? messages.slice(1).map(msg => this._extractSpanLink(msg.attributes)).filter(Boolean)
22+
: []
23+
24+
// Extract parent from first message
25+
const firstAttrs = messages[0]?.attributes
26+
const parentData = firstAttrs?.['x-datadog-trace-id'] && firstAttrs['x-datadog-parent-id']
27+
? {
28+
traceId: firstAttrs['x-datadog-trace-id'],
29+
spanId: firstAttrs['x-datadog-parent-id'],
30+
traceIdUpper: firstAttrs['_dd.p.tid'],
31+
samplingPriority: firstAttrs['x-datadog-sampling-priority']
32+
}
33+
: null
34+
35+
// Create pubsub.request span
1736
const topicName = topic.split('/').pop() || topic
18-
const span = this.startSpan({ // TODO: rename
37+
const batchSpan = this.startSpan({
38+
childOf: parentData ? this._extractParentContext(parentData) : undefined,
1939
resource: `${api} to Topic ${topicName}`,
2040
meta: {
2141
'gcloud.project_id': projectId,
22-
'pubsub.method': api, // TODO: remove
23-
'pubsub.topic': topic
42+
'pubsub.method': api,
43+
'pubsub.topic': topic,
44+
'span.kind': 'producer',
45+
'_dd.base_service': this.tracer._service,
46+
'_dd.serviceoverride.type': 'integration',
47+
'pubsub.linked_message_count': spanLinkData.length || undefined,
48+
operation: messageCount > 1 ? 'batched.pubsub.request' : 'pubsub.request'
49+
},
50+
metrics: {
51+
'pubsub.batch.message_count': messageCount,
52+
'pubsub.batch': messageCount > 1 ? true : undefined
2453
}
2554
}, ctx)
2655

27-
for (const msg of messages) {
28-
if (!msg.attributes) {
29-
msg.attributes = {}
56+
const spanCtx = batchSpan.context()
57+
const batchTraceId = spanCtx.toTraceId()
58+
const batchSpanId = spanCtx.toSpanId()
59+
const batchTraceIdUpper = spanCtx._trace.tags['_dd.p.tid']
60+
61+
// Convert to hex for storage (simpler, used directly by span links)
62+
const batchTraceIdHex = BigInt(batchTraceId).toString(16).padStart(16, '0')
63+
const batchSpanIdHex = BigInt(batchSpanId).toString(16).padStart(16, '0')
64+
65+
// Add span links as metadata
66+
if (spanLinkData.length) {
67+
batchSpan.setTag('_dd.span_links', JSON.stringify(
68+
spanLinkData.map(link => ({
69+
trace_id: link.traceId,
70+
span_id: link.spanId,
71+
flags: link.samplingPriority || 0
72+
}))
73+
))
74+
}
75+
76+
// Add metadata to all messages
77+
messages.forEach((msg, i) => {
78+
msg.attributes = msg.attributes || {}
79+
80+
if (!hasTraceContext) {
81+
this.tracer.inject(batchSpan, 'text_map', msg.attributes)
82+
}
83+
84+
Object.assign(msg.attributes, {
85+
'_dd.pubsub_request.trace_id': batchTraceIdHex,
86+
'_dd.pubsub_request.span_id': batchSpanIdHex,
87+
'_dd.batch.size': String(messageCount),
88+
'_dd.batch.index': String(i),
89+
'gcloud.project_id': projectId,
90+
'pubsub.topic': topic
91+
})
92+
93+
if (batchTraceIdUpper) {
94+
msg.attributes['_dd.pubsub_request.p.tid'] = batchTraceIdUpper
3095
}
31-
this.tracer.inject(span, 'text_map', msg.attributes)
96+
97+
msg.attributes['x-dd-publish-start-time'] ??= String(Date.now())
98+
3299
if (this.config.dsmEnabled) {
33-
const payloadSize = getHeadersSize(msg)
34-
const dataStreamsContext = this.tracer
35-
.setCheckpoint(['direction:out', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize)
100+
const dataStreamsContext = this.tracer.setCheckpoint(
101+
['direction:out', `topic:${topic}`, 'type:google-pubsub'],
102+
batchSpan,
103+
getHeadersSize(msg)
104+
)
36105
DsmPathwayCodec.encode(dataStreamsContext, msg.attributes)
37106
}
38-
}
107+
})
39108

109+
ctx.batchSpan = batchSpan
40110
return ctx.currentStore
41111
}
112+
113+
bindFinish (ctx) {
114+
if (ctx.batchSpan && !ctx.batchSpan._duration) ctx.batchSpan.finish()
115+
return super.bindFinish(ctx)
116+
}
117+
118+
bindError (ctx) {
119+
if (ctx.error && ctx.batchSpan) {
120+
ctx.batchSpan.setTag('error', ctx.error)
121+
ctx.batchSpan.finish()
122+
}
123+
return super.bindError(ctx)
124+
}
125+
126+
_extractSpanLink (attrs) {
127+
if (!attrs?.['x-datadog-trace-id'] || !attrs['x-datadog-parent-id']) return null
128+
129+
const lowerHex = BigInt(attrs['x-datadog-trace-id']).toString(16).padStart(16, '0')
130+
const spanIdHex = BigInt(attrs['x-datadog-parent-id']).toString(16).padStart(16, '0')
131+
const traceIdHex = attrs['_dd.p.tid']
132+
? attrs['_dd.p.tid'] + lowerHex
133+
: lowerHex.padStart(32, '0')
134+
135+
return {
136+
traceId: traceIdHex,
137+
spanId: spanIdHex,
138+
samplingPriority: attrs['x-datadog-sampling-priority']
139+
? Number.parseInt(attrs['x-datadog-sampling-priority'], 10)
140+
: undefined
141+
}
142+
}
143+
144+
_extractParentContext (data) {
145+
const carrier = {
146+
'x-datadog-trace-id': data.traceId,
147+
'x-datadog-parent-id': data.spanId
148+
}
149+
if (data.traceIdUpper) carrier['_dd.p.tid'] = data.traceIdUpper
150+
if (data.samplingPriority) carrier['x-datadog-sampling-priority'] = String(data.samplingPriority)
151+
152+
return this.tracer.extract('text_map', carrier)
153+
}
42154
}
43155

44156
module.exports = GoogleCloudPubsubProducerPlugin

0 commit comments

Comments
 (0)