Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 88 additions & 1 deletion packages/datadog-instrumentations/src/google-cloud-pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
addHook
} = require('./helpers/instrument')
const shimmer = require('../../datadog-shimmer')
const { storage } = require('../../datadog-core')
const log = require('../../dd-trace/src/log')

// Auto-load push subscription plugin to enable pubsub.delivery spans for push subscriptions
Expand All @@ -24,6 +25,8 @@ const receiveStartCh = channel('apm:google-cloud-pubsub:receive:start')
const receiveFinishCh = channel('apm:google-cloud-pubsub:receive:finish')
const receiveErrorCh = channel('apm:google-cloud-pubsub:receive:error')

const ackContextMap = new Map()

const publisherMethods = [
'createTopic',
'updateTopic',
Expand Down Expand Up @@ -74,7 +77,67 @@ function wrapMethod (method) {
return function (request) {
if (!requestStartCh.hasSubscribers) return method.apply(this, arguments)

// For acknowledge/modifyAckDeadline, try to restore span context from stored map
let restoredStore = null
const isAckOperation = api === 'acknowledge' || api === 'modifyAckDeadline'
if (isAckOperation && request && request.ackIds && request.ackIds.length > 0) {
// Try to find a stored context for any of these ack IDs
for (const ackId of request.ackIds) {
const storedContext = ackContextMap.get(ackId)
if (storedContext) {
restoredStore = storedContext
break
}
}

if (api === 'acknowledge') {
request.ackIds.forEach(ackId => {
if (ackContextMap.has(ackId)) {
ackContextMap.delete(ackId)
}
})
}
}

const ctx = { request, api, projectId: this.auth._cachedProjectId }

if (restoredStore) {
const parentSpan = restoredStore.span
if (parentSpan) {
ctx.parentSpan = parentSpan
}
return storage('legacy').run(restoredStore, () => {
return requestStartCh.runStores(ctx, () => {
const cb = arguments[arguments.length - 1]

if (typeof cb === 'function') {
arguments[arguments.length - 1] = shimmer.wrapFunction(cb, cb => function (error) {
if (error) {
ctx.error = error
requestErrorCh.publish(ctx)
}
return requestFinishCh.runStores(ctx, cb, this, ...arguments)
})
return method.apply(this, arguments)
}

return method.apply(this, arguments)
.then(
response => {
requestFinishCh.publish(ctx)
return response
},
error => {
ctx.error = error
requestErrorCh.publish(ctx)
requestFinishCh.publish(ctx)
throw error
}
)
})
})
}

return requestStartCh.runStores(ctx, () => {
const cb = arguments[arguments.length - 1]

Expand Down Expand Up @@ -120,7 +183,8 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'] }, (obj) => {
shimmer.wrap(Subscription.prototype, 'emit', emit => function (eventName, message) {
if (eventName !== 'message' || !message) return emit.apply(this, arguments)

const ctx = {}
const store = storage('legacy').getStore()
const ctx = { message, store }
try {
return emit.apply(this, arguments)
} catch (err) {
Expand All @@ -133,6 +197,29 @@ addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'] }, (obj) => {
return obj
})

addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/subscriber.js' }, (obj) => {
const Message = obj.Message

if (Message && Message.prototype && Message.prototype.ack) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (Message && Message.prototype && Message.prototype.ack) {
if (Message?.prototype?.ack) {

shimmer.wrap(Message.prototype, 'ack', originalAck => function () {
const currentStore = storage('legacy').getStore()
const activeSpan = currentStore && currentStore.span

if (activeSpan) {
const storeWithSpanContext = { ...currentStore, span: activeSpan }

if (this.ackId) {
ackContextMap.set(this.ackId, storeWithSpanContext)
}
}

return originalAck.apply(this, arguments)
})
}

return obj
})

addHook({ name: '@google-cloud/pubsub', versions: ['>=1.2'], file: 'build/src/lease-manager.js' }, (obj) => {
const LeaseManager = obj.LeaseManager
const ctx = {}
Expand Down
11 changes: 9 additions & 2 deletions packages/datadog-plugin-google-cloud-pubsub/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,22 @@ class GoogleCloudPubsubClientPlugin extends ClientPlugin {

if (api === 'publish') return

this.startSpan(this.operationName(), {
const explicitParent = ctx.parentSpan
const spanOptions = {
service: this.config.service || this.serviceName(),
resource: [api, request.name].filter(Boolean).join(' '),
kind: this.constructor.kind,
meta: {
'pubsub.method': api,
'gcloud.project_id': projectId
}
}, ctx)
}

if (explicitParent) {
spanOptions.childOf = explicitParent.context()
}

this.startSpan(this.operationName(), spanOptions, ctx)

return ctx.currentStore
}
Expand Down
129 changes: 117 additions & 12 deletions packages/datadog-plugin-google-cloud-pubsub/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,141 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {

bindStart (ctx) {
const { request, api, projectId } = ctx

if (api !== 'publish') return

const messages = request.messages || []
const topic = request.topic
const messageCount = messages.length
const hasTraceContext = messages[0]?.attributes?.['x-datadog-trace-id']

// Collect span links from messages 2-N (skip first - it becomes parent)
const spanLinkData = hasTraceContext
? messages.slice(1).map(msg => this._extractSpanLink(msg.attributes)).filter(Boolean)
: []

const firstAttrs = messages[0]?.attributes
const parentData = firstAttrs?.['x-datadog-trace-id'] && firstAttrs['x-datadog-parent-id']
? {
traceId: firstAttrs['x-datadog-trace-id'],
spanId: firstAttrs['x-datadog-parent-id'],
traceIdUpper: firstAttrs['_dd.p.tid'],
samplingPriority: firstAttrs['x-datadog-sampling-priority']
}
: null

const topicName = topic.split('/').pop() || topic
const span = this.startSpan({ // TODO: rename
const batchSpan = this.startSpan({
childOf: parentData ? this._extractParentContext(parentData) : undefined,
resource: `${api} to Topic ${topicName}`,
meta: {
'gcloud.project_id': projectId,
'pubsub.method': api, // TODO: remove
'pubsub.topic': topic
'pubsub.method': api,
'pubsub.topic': topic,
'span.kind': 'producer',
'_dd.base_service': this.tracer._service,
'_dd.serviceoverride.type': 'integration',
'pubsub.linked_message_count': spanLinkData.length || undefined,
operation: messageCount > 1 ? 'batched.pubsub.request' : 'pubsub.request'
},
metrics: {
'pubsub.batch.message_count': messageCount,
'pubsub.batch': messageCount > 1 ? true : undefined
}
}, ctx)

for (const msg of messages) {
if (!msg.attributes) {
msg.attributes = {}
const spanCtx = batchSpan.context()
const batchTraceId = spanCtx.toTraceId()
const batchSpanId = spanCtx.toSpanId()
const batchTraceIdUpper = spanCtx._trace.tags['_dd.p.tid']
const batchTraceIdHex = BigInt(batchTraceId).toString(16).padStart(16, '0')
const batchSpanIdHex = BigInt(batchSpanId).toString(16).padStart(16, '0')

if (spanLinkData.length) {
batchSpan.setTag('_dd.span_links', JSON.stringify(
spanLinkData.map(link => ({
trace_id: link.traceId,
span_id: link.spanId,
flags: link.samplingPriority || 0
}))
))
}

messages.forEach((msg, i) => {
msg.attributes = msg.attributes || {}

if (!hasTraceContext) {
this.tracer.inject(batchSpan, 'text_map', msg.attributes)
}
this.tracer.inject(span, 'text_map', msg.attributes)

Object.assign(msg.attributes, {
'_dd.pubsub_request.trace_id': batchTraceIdHex,
'_dd.pubsub_request.span_id': batchSpanIdHex,
'_dd.batch.size': String(messageCount),
'_dd.batch.index': String(i),
'gcloud.project_id': projectId,
'pubsub.topic': topic,
'x-dd-publish-start-time': String(Math.floor(batchSpan._startTime))
})

if (batchTraceIdUpper) {
msg.attributes['_dd.pubsub_request.p.tid'] = batchTraceIdUpper
}

if (this.config.dsmEnabled) {
const payloadSize = getHeadersSize(msg)
const dataStreamsContext = this.tracer
.setCheckpoint(['direction:out', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize)
const dataStreamsContext = this.tracer.setCheckpoint(
['direction:out', `topic:${topic}`, 'type:google-pubsub'],
batchSpan,
getHeadersSize(msg)
)
DsmPathwayCodec.encode(dataStreamsContext, msg.attributes)
}
}
})

ctx.batchSpan = batchSpan
return ctx.currentStore
}

bindFinish (ctx) {
if (ctx.batchSpan && !ctx.batchSpan._duration) ctx.batchSpan.finish()
return super.bindFinish(ctx)
}

bindError (ctx) {
if (ctx.error && ctx.batchSpan) {
ctx.batchSpan.setTag('error', ctx.error)
ctx.batchSpan.finish()
}
return ctx.parentStore
}

_extractSpanLink (attrs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (!attrs?.['x-datadog-trace-id'] || !attrs['x-datadog-parent-id']) return null

const lowerHex = BigInt(attrs['x-datadog-trace-id']).toString(16).padStart(16, '0')
const spanIdHex = BigInt(attrs['x-datadog-parent-id']).toString(16).padStart(16, '0')
const traceIdHex = attrs['_dd.p.tid']
? attrs['_dd.p.tid'] + lowerHex
: lowerHex.padStart(32, '0')

return {
traceId: traceIdHex,
spanId: spanIdHex,
samplingPriority: attrs['x-datadog-sampling-priority']
? Number.parseInt(attrs['x-datadog-sampling-priority'], 10)
: undefined
}
}

_extractParentContext (data) {
const carrier = {
'x-datadog-trace-id': data.traceId,
'x-datadog-parent-id': data.spanId
}
if (data.traceIdUpper) carrier['_dd.p.tid'] = data.traceIdUpper
if (data.samplingPriority) carrier['x-datadog-sampling-priority'] = String(data.samplingPriority)

return this.tracer.extract('text_map', carrier)
}
}

module.exports = GoogleCloudPubsubProducerPlugin
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
}

_createDeliverySpan (messageData, parentContext, tracer) {
const { message, subscription, topicName } = messageData
const { message, subscription, topicName, attrs } = messageData
const subscriptionName = subscription.split('/').pop() || subscription
const publishStartTime = attrs['x-dd-publish-start-time']
const startTime = publishStartTime ? Number.parseInt(publishStartTime, 10) : undefined

const span = tracer._tracer.startSpan('pubsub.delivery', {
childOf: parentContext,
startTime,
integrationName: 'google-cloud-pubsub',
tags: {
'span.kind': 'consumer',
Expand All @@ -93,10 +96,35 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
})

span.setTag('resource.name', `Push Subscription ${subscriptionName}`)
this._addBatchMetadata(span, attrs)

return span
}

_addBatchMetadata (span, attrs) {
const batchSize = attrs['_dd.batch.size']
const batchIndex = attrs['_dd.batch.index']

if (batchSize && batchIndex !== undefined) {
const size = Number.parseInt(batchSize, 10)
const index = Number.parseInt(batchIndex, 10)

span.setTag('pubsub.batch.message_count', size)
span.setTag('pubsub.batch.message_index', index)
span.setTag('pubsub.batch.description', `Message ${index + 1} of ${size}`)

const requestTraceId = attrs['_dd.pubsub_request.trace_id']
const requestSpanId = attrs['_dd.pubsub_request.span_id']

if (requestTraceId) {
span.setTag('pubsub.batch.request_trace_id', requestTraceId)
}
if (requestSpanId) {
span.setTag('pubsub.batch.request_span_id', requestSpanId)
}
}
}

_extractProjectTopic (attrs, subscription) {
const topicName = attrs['pubsub.topic']
const projectId = subscription.match(/projects\/([^\\/]+)\/subscriptions/)
Expand Down
Loading