Skip to content

Commit 8e3014c

Browse files
committed
feat: add ack context map and producer improvements for batching
- Add ack context map to preserve trace context across batched acknowledges
- Update producer to use batchSpan._startTime for accurate publish time
- Add explicit parent span support in client plugin
- Wrap Message.ack() to store context before batched gRPC acknowledge
- Update Subscription.emit to properly handle storage context
- Sync auto-load improvements from Branch 1
1 parent 68122e8 commit 8e3014c

File tree

3 files changed

+130
-11
lines changed

3 files changed

+130
-11
lines changed

packages/datadog-instrumentations/src/google-cloud-pubsub.js

Lines changed: 118 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@ const {
55
addHook
66
} = require('./helpers/instrument')
77
const shimmer = require('../../datadog-shimmer')
8+
const { storage } = require('../../datadog-core')
89

910
// Auto-load push subscription plugin for push subscribers
1011
// This ensures pubsub.delivery spans work even if app doesn't import @google-cloud/pubsub
12+
// The plugin's constructor subscribes to apm:http:server:request:start immediately (no tracer needed)
13+
// The tracer is only used at request time when delivery spans are created
1114
try {
12-
const tracer = require('../../dd-trace')
1315
const PushSubscriptionPlugin = require('../../datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription')
14-
if (tracer._tracer) {
15-
const handler = new PushSubscriptionPlugin(tracer._tracer, {})
16-
handler.configure({})
17-
}
16+
const pushPlugin = new PushSubscriptionPlugin(null, {})
17+
// Plugin instance kept alive to maintain channel subscription
18+
pushPlugin.configure({})
1819
} catch {
1920
// Silent - push subscription plugin is optional
2021
}
@@ -27,6 +28,10 @@ const receiveStartCh = channel('apm:google-cloud-pubsub:receive:start')
2728
const receiveFinishCh = channel('apm:google-cloud-pubsub:receive:finish')
2829
const receiveErrorCh = channel('apm:google-cloud-pubsub:receive:error')
2930

31+
// Module-level map: ackId -> async-storage store (including its span) captured when Message.ack() is called
32+
// This allows us to restore the correct trace context when the batched gRPC call happens
33+
const ackContextMap = new Map()
34+
3035
const publisherMethods = [
3136
'createTopic',
3237
'updateTopic',
@@ -77,7 +82,75 @@ function wrapMethod (method) {
7782
return function (request) {
7883
if (!requestStartCh.hasSubscribers) return method.apply(this, arguments)
7984

85+
// For acknowledge/modifyAckDeadline, try to restore span context from stored map
86+
let restoredStore = null
87+
if (api === 'acknowledge' || api === 'modifyAckDeadline') {
88+
if (request && request.ackIds && request.ackIds.length > 0) {
89+
// Try to find a stored context for any of these ack IDs
90+
for (const ackId of request.ackIds) {
91+
const storedContext = ackContextMap.get(ackId)
92+
if (storedContext) {
93+
restoredStore = storedContext
94+
break
95+
}
96+
}
97+
98+
// Clean up ackIds from the map ONLY for acknowledge, not modifyAckDeadline
99+
// modifyAckDeadline happens first (lease extension) and acknowledge happens later, so entries must survive until acknowledge
100+
if (api === 'acknowledge') {
101+
request.ackIds.forEach(ackId => {
102+
if (ackContextMap.has(ackId)) {
103+
ackContextMap.delete(ackId)
104+
}
105+
})
106+
}
107+
}
108+
}
109+
80110
const ctx = { request, api, projectId: this.auth._cachedProjectId }
111+
112+
// If we have a restored context, run in that context
113+
if (restoredStore) {
114+
// CRITICAL: Add the parent span to ctx so the plugin uses it as parent
115+
const parentSpan = restoredStore.span
116+
if (parentSpan) {
117+
ctx.parentSpan = parentSpan
118+
}
119+
const self = this
120+
const args = arguments
121+
return storage('legacy').run(restoredStore, () => {
122+
return requestStartCh.runStores(ctx, () => {
123+
const cb = args[args.length - 1]
124+
125+
if (typeof cb === 'function') {
126+
args[args.length - 1] = shimmer.wrapFunction(cb, cb => function (error) {
127+
if (error) {
128+
ctx.error = error
129+
requestErrorCh.publish(ctx)
130+
}
131+
return requestFinishCh.runStores(ctx, cb, this, ...arguments)
132+
})
133+
return method.apply(self, args)
134+
}
135+
136+
return method.apply(self, args)
137+
.then(
138+
response => {
139+
requestFinishCh.publish(ctx)
140+
return response
141+
},
142+
error => {
143+
ctx.error = error
144+
requestErrorCh.publish(ctx)
145+
requestFinishCh.publish(ctx)
146+
throw error
147+
}
148+
)
149+
})
150+
})
151+
}
152+
153+
// Otherwise run normally
81154
return requestStartCh.runStores(ctx, () => {
82155
const cb = arguments[arguments.length - 1]
83156

@@ -123,7 +196,12 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'] }, (obj) => {
123196
shimmer.wrap(Subscription.prototype, 'emit', emit => function (eventName, message) {
124197
if (eventName !== 'message' || !message) return emit.apply(this, arguments)
125198

126-
const ctx = {}
199+
// Get the current async context store (should contain the pubsub.delivery span)
200+
const store = storage('legacy').getStore()
201+
202+
// If we have a span in the store, the context is properly set up
203+
// The user's message handler will now run in this context and see the active span
204+
const ctx = { message, store }
127205
try {
128206
return emit.apply(this, arguments)
129207
} catch (err) {
@@ -136,6 +214,40 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'] }, (obj) => {
136214
return obj
137215
})
138216

217+
// Wrap Message.ack() - must hook build/src/subscriber.js directly since Message is not exported from the main module
218+
addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/subscriber.js' }, (obj) => {
219+
const Message = obj.Message
220+
221+
222+
if (Message && Message.prototype && Message.prototype.ack) {
223+
shimmer.wrap(Message.prototype, 'ack', originalAck => function () {
224+
// Capture the current active span and create a store with its context
225+
const currentStore = storage('legacy').getStore()
226+
const activeSpan = currentStore && currentStore.span
227+
228+
if (activeSpan) {
229+
230+
// CRITICAL: We must store a context that reflects the span's actual trace
231+
// The span might have been created with a custom parent (reparented to pubsub.request)
232+
// but the async storage might still contain the original context.
233+
// So we save a shallow copy of the current store with `span` set explicitly to the active span, keyed by ackId.
234+
const storeWithSpanContext = { ...currentStore, span: activeSpan }
235+
236+
if (this.ackId) {
237+
ackContextMap.set(this.ackId, storeWithSpanContext)
238+
}
239+
} else {
240+
}
241+
242+
return originalAck.apply(this, arguments)
243+
})
244+
245+
} else {
246+
}
247+
248+
return obj
249+
})
250+
139251
addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/lease-manager.js' }, (obj) => {
140252
const LeaseManager = obj.LeaseManager
141253
const ctx = {}

packages/datadog-plugin-google-cloud-pubsub/src/client.js

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,23 @@ class GoogleCloudPubsubClientPlugin extends ClientPlugin {
1212

1313
if (api === 'publish') return
1414

15-
this.startSpan(this.operationName(), {
15+
const explicitParent = ctx.parentSpan // From restored context in wrapMethod
16+
const spanOptions = {
1617
service: this.config.service || this.serviceName(),
1718
resource: [api, request.name].filter(Boolean).join(' '),
1819
kind: this.constructor.kind,
1920
meta: {
2021
'pubsub.method': api,
2122
'gcloud.project_id': projectId
2223
}
23-
}, ctx)
24+
}
25+
26+
// If we have an explicit parent span (from restored context), use it
27+
if (explicitParent) {
28+
spanOptions.childOf = explicitParent.context()
29+
}
30+
31+
this.startSpan(this.operationName(), spanOptions, ctx)
2432

2533
return ctx.currentStore
2634
}

packages/datadog-plugin-google-cloud-pubsub/src/producer.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,14 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
8787
'_dd.batch.size': String(messageCount),
8888
'_dd.batch.index': String(i),
8989
'gcloud.project_id': projectId,
90-
'pubsub.topic': topic
90+
'pubsub.topic': topic,
91+
'x-dd-publish-start-time': String(Math.floor(batchSpan._startTime))
9192
})
9293

9394
if (batchTraceIdUpper) {
9495
msg.attributes['_dd.pubsub_request.p.tid'] = batchTraceIdUpper
9596
}
9697

97-
msg.attributes['x-dd-publish-start-time'] ??= String(Date.now())
98-
9998
if (this.config.dsmEnabled) {
10099
const dataStreamsContext = this.tracer.setCheckpoint(
101100
['direction:out', `topic:${topic}`, 'type:google-pubsub'],

0 commit comments

Comments
 (0)