diff --git a/packages/datadog-instrumentations/src/google-cloud-pubsub.js b/packages/datadog-instrumentations/src/google-cloud-pubsub.js index 30348115f48..49a0ad72c92 100644 --- a/packages/datadog-instrumentations/src/google-cloud-pubsub.js +++ b/packages/datadog-instrumentations/src/google-cloud-pubsub.js @@ -5,6 +5,7 @@ const { addHook } = require('./helpers/instrument') const shimmer = require('../../datadog-shimmer') +const { storage } = require('../../datadog-core') const log = require('../../dd-trace/src/log') // Auto-load push subscription plugin to enable pubsub.delivery spans for push subscriptions @@ -24,6 +25,8 @@ const receiveStartCh = channel('apm:google-cloud-pubsub:receive:start') const receiveFinishCh = channel('apm:google-cloud-pubsub:receive:finish') const receiveErrorCh = channel('apm:google-cloud-pubsub:receive:error') +const ackContextMap = new Map() + const publisherMethods = [ 'createTopic', 'updateTopic', @@ -74,7 +77,67 @@ function wrapMethod (method) { return function (request) { if (!requestStartCh.hasSubscribers) return method.apply(this, arguments) + // For acknowledge/modifyAckDeadline, try to restore span context from stored map + let restoredStore = null + const isAckOperation = api === 'acknowledge' || api === 'modifyAckDeadline' + if (isAckOperation && request && request.ackIds && request.ackIds.length > 0) { + // Try to find a stored context for any of these ack IDs + for (const ackId of request.ackIds) { + const storedContext = ackContextMap.get(ackId) + if (storedContext) { + restoredStore = storedContext + break + } + } + + if (api === 'acknowledge') { + request.ackIds.forEach(ackId => { + if (ackContextMap.has(ackId)) { + ackContextMap.delete(ackId) + } + }) + } + } + const ctx = { request, api, projectId: this.auth._cachedProjectId } + + if (restoredStore) { + const parentSpan = restoredStore.span + if (parentSpan) { + ctx.parentSpan = parentSpan + } + return storage('legacy').run(restoredStore, () => { + return requestStartCh.runStores(ctx, () => { + const cb = arguments[arguments.length - 1] + + if (typeof cb === 'function') { + arguments[arguments.length - 1] = shimmer.wrapFunction(cb, cb => function (error) { + if (error) { + ctx.error = error + requestErrorCh.publish(ctx) + } + return requestFinishCh.runStores(ctx, cb, this, ...arguments) + }) + return method.apply(this, arguments) + } + + return method.apply(this, arguments) + .then( + response => { + requestFinishCh.publish(ctx) + return response + }, + error => { + ctx.error = error + requestErrorCh.publish(ctx) + requestFinishCh.publish(ctx) + throw error + } + ) + }) + }) + } + return requestStartCh.runStores(ctx, () => { const cb = arguments[arguments.length - 1] @@ -120,7 +183,8 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'] }, (obj) => { shimmer.wrap(Subscription.prototype, 'emit', emit => function (eventName, message) { if (eventName !== 'message' || !message) return emit.apply(this, arguments) - const ctx = {} + const store = storage('legacy').getStore() + const ctx = { message, store } try { return emit.apply(this, arguments) } catch (err) { @@ -133,6 +197,29 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'] }, (obj) => { return obj }) +addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/subscriber.js' }, (obj) => { + const Message = obj.Message + + if (Message && Message.prototype && Message.prototype.ack) { + shimmer.wrap(Message.prototype, 'ack', originalAck => function () { + const currentStore = storage('legacy').getStore() + const activeSpan = currentStore && currentStore.span + + if (activeSpan) { + const storeWithSpanContext = { ...currentStore, span: activeSpan } + + if (this.ackId) { + ackContextMap.set(this.ackId, storeWithSpanContext) + } + } + + return originalAck.apply(this, arguments) + }) + } + + return obj +}) + addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/lease-manager.js' }, (obj) => { const LeaseManager = obj.LeaseManager const ctx = {} diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/client.js b/packages/datadog-plugin-google-cloud-pubsub/src/client.js index 162031b999a..fa43b08542d 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/client.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/client.js @@ -12,7 +12,8 @@ class GoogleCloudPubsubClientPlugin extends ClientPlugin { if (api === 'publish') return - this.startSpan(this.operationName(), { + const explicitParent = ctx.parentSpan + const spanOptions = { service: this.config.service || this.serviceName(), resource: [api, request.name].filter(Boolean).join(' '), kind: this.constructor.kind, @@ -20,7 +21,13 @@ class GoogleCloudPubsubClientPlugin extends ClientPlugin { 'pubsub.method': api, 'gcloud.project_id': projectId } - }, ctx) + } + + if (explicitParent) { + spanOptions.childOf = explicitParent.context() + } + + this.startSpan(this.operationName(), spanOptions, ctx) return ctx.currentStore } diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index f494d9ffcdc..4485e0e4686 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -9,36 +9,141 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { bindStart (ctx) { const { request, api, projectId } = ctx - if (api !== 'publish') return const messages = request.messages || [] const topic = request.topic + const messageCount = messages.length + const hasTraceContext = messages[0]?.attributes?.['x-datadog-trace-id'] + + // Collect span links from messages 2-N (skip first - it becomes parent) + const spanLinkData = hasTraceContext + ? messages.slice(1).map(msg => this._extractSpanLink(msg.attributes)).filter(Boolean) + : [] + + const firstAttrs = messages[0]?.attributes + const parentData = firstAttrs?.['x-datadog-trace-id'] && firstAttrs['x-datadog-parent-id'] + ? { + traceId: firstAttrs['x-datadog-trace-id'], + spanId: firstAttrs['x-datadog-parent-id'], + traceIdUpper: firstAttrs['_dd.p.tid'], + samplingPriority: firstAttrs['x-datadog-sampling-priority'] + } + : null + const topicName = topic.split('/').pop() || topic - const span = this.startSpan({ // TODO: rename + const batchSpan = this.startSpan({ + childOf: parentData ? this._extractParentContext(parentData) : undefined, resource: `${api} to Topic ${topicName}`, meta: { 'gcloud.project_id': projectId, - 'pubsub.method': api, // TODO: remove - 'pubsub.topic': topic + 'pubsub.method': api, + 'pubsub.topic': topic, + 'span.kind': 'producer', + '_dd.base_service': this.tracer._service, + '_dd.serviceoverride.type': 'integration', + 'pubsub.linked_message_count': spanLinkData.length || undefined, + operation: messageCount > 1 ? 'batched.pubsub.request' : 'pubsub.request' + }, + metrics: { + 'pubsub.batch.message_count': messageCount, + 'pubsub.batch': messageCount > 1 ? true : undefined } }, ctx) - for (const msg of messages) { - if (!msg.attributes) { - msg.attributes = {} + const spanCtx = batchSpan.context() + const batchTraceId = spanCtx.toTraceId() + const batchSpanId = spanCtx.toSpanId() + const batchTraceIdUpper = spanCtx._trace.tags['_dd.p.tid'] + const batchTraceIdHex = BigInt(batchTraceId).toString(16).padStart(16, '0') + const batchSpanIdHex = BigInt(batchSpanId).toString(16).padStart(16, '0') + + if (spanLinkData.length) { + batchSpan.setTag('_dd.span_links', JSON.stringify( + spanLinkData.map(link => ({ + trace_id: link.traceId, + span_id: link.spanId, + flags: link.samplingPriority || 0 + })) + )) + } + + messages.forEach((msg, i) => { + msg.attributes = msg.attributes || {} + + if (!hasTraceContext) { + this.tracer.inject(batchSpan, 'text_map', msg.attributes) } - this.tracer.inject(span, 'text_map', msg.attributes) + + Object.assign(msg.attributes, { + '_dd.pubsub_request.trace_id': batchTraceIdHex, + '_dd.pubsub_request.span_id': batchSpanIdHex, + '_dd.batch.size': String(messageCount), + '_dd.batch.index': String(i), + 'gcloud.project_id': projectId, + 'pubsub.topic': topic, + 'x-dd-publish-start-time': String(Math.floor(batchSpan._startTime)) + }) + + if (batchTraceIdUpper) { + msg.attributes['_dd.pubsub_request.p.tid'] = batchTraceIdUpper + } + if (this.config.dsmEnabled) { - const payloadSize = getHeadersSize(msg) - const dataStreamsContext = this.tracer - .setCheckpoint(['direction:out', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize) + const dataStreamsContext = this.tracer.setCheckpoint( + ['direction:out', `topic:${topic}`, 'type:google-pubsub'], + batchSpan, + getHeadersSize(msg) + ) DsmPathwayCodec.encode(dataStreamsContext, msg.attributes) } - } + }) + ctx.batchSpan = batchSpan return ctx.currentStore } + + bindFinish (ctx) { + if (ctx.batchSpan && !ctx.batchSpan._duration) ctx.batchSpan.finish() + return super.bindFinish(ctx) + } + + bindError (ctx) { + if (ctx.error && ctx.batchSpan) { + ctx.batchSpan.setTag('error', ctx.error) + ctx.batchSpan.finish() + } + return ctx.parentStore + } + + _extractSpanLink (attrs) { + if (!attrs?.['x-datadog-trace-id'] || !attrs['x-datadog-parent-id']) return null + + const lowerHex = BigInt(attrs['x-datadog-trace-id']).toString(16).padStart(16, '0') + const spanIdHex = BigInt(attrs['x-datadog-parent-id']).toString(16).padStart(16, '0') + const traceIdHex = attrs['_dd.p.tid'] + ? attrs['_dd.p.tid'] + lowerHex + : lowerHex.padStart(32, '0') + + return { + traceId: traceIdHex, + spanId: spanIdHex, + samplingPriority: attrs['x-datadog-sampling-priority'] + ? Number.parseInt(attrs['x-datadog-sampling-priority'], 10) + : undefined + } + } + + _extractParentContext (data) { + const carrier = { + 'x-datadog-trace-id': data.traceId, + 'x-datadog-parent-id': data.spanId + } + if (data.traceIdUpper) carrier['_dd.p.tid'] = data.traceIdUpper + if (data.samplingPriority) carrier['x-datadog-sampling-priority'] = String(data.samplingPriority) + + return this.tracer.extract('text_map', carrier) + } } module.exports = GoogleCloudPubsubProducerPlugin diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription.js b/packages/datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription.js index f368d0170ee..11cfdd02bc5 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription.js @@ -71,11 +71,14 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin { } _createDeliverySpan (messageData, parentContext, tracer) { - const { message, subscription, topicName } = messageData + const { message, subscription, topicName, attrs } = messageData const subscriptionName = subscription.split('/').pop() || subscription + const publishStartTime = attrs['x-dd-publish-start-time'] + const startTime = publishStartTime ? Number.parseInt(publishStartTime, 10) : undefined const span = tracer._tracer.startSpan('pubsub.delivery', { childOf: parentContext, + startTime, integrationName: 'google-cloud-pubsub', tags: { 'span.kind': 'consumer', @@ -93,10 +96,35 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin { }) span.setTag('resource.name', `Push Subscription ${subscriptionName}`) + this._addBatchMetadata(span, attrs) return span } + _addBatchMetadata (span, attrs) { + const batchSize = attrs['_dd.batch.size'] + const batchIndex = attrs['_dd.batch.index'] + + if (batchSize && batchIndex !== undefined) { + const size = Number.parseInt(batchSize, 10) + const index = Number.parseInt(batchIndex, 10) + + span.setTag('pubsub.batch.message_count', size) + span.setTag('pubsub.batch.message_index', index) + span.setTag('pubsub.batch.description', `Message ${index + 1} of ${size}`) + + const requestTraceId = attrs['_dd.pubsub_request.trace_id'] + const requestSpanId = attrs['_dd.pubsub_request.span_id'] + + if (requestTraceId) { + span.setTag('pubsub.batch.request_trace_id', requestTraceId) + } + if (requestSpanId) { + span.setTag('pubsub.batch.request_span_id', requestSpanId) + } + } + } + _extractProjectTopic (attrs, subscription) { const topicName = attrs['pubsub.topic'] const projectId = subscription.match(/projects\/([^\\/]+)\/subscriptions/)