diff --git a/packages/dd-trace/src/exporters/agent/writer.js b/packages/dd-trace/src/exporters/agent/writer.js
index 7a17299c73e..23c0f503ce4 100644
--- a/packages/dd-trace/src/exporters/agent/writer.js
+++ b/packages/dd-trace/src/exporters/agent/writer.js
@@ -20,9 +20,28 @@ class Writer extends BaseWriter {
     this._headers = headers
     this._config = config
     this._encoder = new AgentEncoder(this)
+    this._retryQueue = []
+    this._retryTimer = null
+    this._maxRetryQueueSize = config.maxRetryQueueSize || 100
+    this._maxRetryAttempts = config.maxRetryAttempts || 3
+    this._baseRetryDelay = config.baseRetryDelay || 1000 // 1 second
   }
 
   _sendPayload (data, count, done) {
+    this._sendPayloadWithRetry(data, count, this._wrapDoneCallback(done), 0)
+  }
+
+  _wrapDoneCallback (done) {
+    let doneCalled = false
+    return () => {
+      if (!doneCalled) {
+        doneCalled = true
+        done()
+      }
+    }
+  }
+
+  _sendPayloadWithRetry (data, count, done, retryAttempt = 0) {
     runtimeMetrics.increment(`${METRIC_PREFIX}.requests`, true)
 
     const { _headers, _lookup, _protocolVersion, _url } = this
@@ -41,6 +60,19 @@ class Writer extends BaseWriter {
 
       startupLog({ agentError: err })
 
+      // Handle 429 (rate limit) responses with retry logic
+      if (status === 429) {
+        if (retryAttempt < this._maxRetryAttempts) {
+          this._scheduleRetry(data, count, done, retryAttempt)
+          return
+        }
+        // Max retries exceeded, drop the payload
+        log.errorWithoutTelemetry('Maximum retry attempts reached for 429 response, dropping payload')
+        runtimeMetrics.increment(`${METRIC_PREFIX}.retries.dropped`, true)
+        done()
+        return
+      }
+
       if (err) {
         log.errorWithoutTelemetry('Error sending payload to the agent (status code: %s)', err.status, err)
         done()
@@ -49,6 +81,11 @@
 
       log.debug('Response from the agent: %s', res)
 
+      // Track successful retry if this was a retry attempt
+      if (retryAttempt > 0) {
+        runtimeMetrics.increment(`${METRIC_PREFIX}.retries.success`, true)
+      }
+
       try {
         this._prioritySampler.update(JSON.parse(res).rate_by_service)
       } catch (e) {
@@ -60,6 +97,109 @@
       done()
     })
   }
+
+  _scheduleRetry (data, count, done, retryAttempt) {
+    // Check if queue is full
+    if (this._retryQueue.length >= this._maxRetryQueueSize) {
+      log.errorWithoutTelemetry('Retry queue is full, dropping payload')
+      runtimeMetrics.increment(`${METRIC_PREFIX}.retries.dropped`, true)
+      done()
+      return
+    }
+
+    // Calculate exponential backoff delay
+    const delay = this._baseRetryDelay * (2 ** retryAttempt)
+    const scheduledAt = Date.now() + delay
+
+    // Track retry metrics
+    runtimeMetrics.increment(`${METRIC_PREFIX}.retries.scheduled`, true)
+    runtimeMetrics.increment(`${METRIC_PREFIX}.retries.by.attempt`, `attempt:${retryAttempt + 1}`, true)
+
+    log.debug(`Scheduling retry attempt ${retryAttempt + 1} in ${delay}ms`)
+
+    // Add to retry queue with scheduled time
+    this._retryQueue.push({
+      data,
+      count,
+      done,
+      retryAttempt: retryAttempt + 1,
+      scheduledAt
+    })
+
+    // Start processing if not already running
+    this._scheduleNextRetry()
+  }
+
+  _scheduleNextRetry () {
+    // Clear any existing timer
+    if (this._retryTimer) {
+      clearTimeout(this._retryTimer)
+      this._retryTimer = null
+    }
+
+    if (this._retryQueue.length === 0) {
+      return
+    }
+
+    // Find the next payload that's ready to send
+    const now = Date.now()
+    let readyIndex = -1
+
+    for (let i = 0; i < this._retryQueue.length; i++) {
+      if (this._retryQueue[i].scheduledAt <= now) {
+        readyIndex = i
+        break
+      }
+    }
+
+    if (readyIndex >= 0) {
+      // Process ready payload immediately
+      const payload = this._retryQueue.splice(readyIndex, 1)[0]
+
+      try {
+        this._sendPayloadWithRetry(
+          payload.data,
+          payload.count,
+          payload.done,
+          payload.retryAttempt
+        )
+      } catch (err) {
+        log.errorWithoutTelemetry('Error processing retry', err)
+        runtimeMetrics.increment(`${METRIC_PREFIX}.retries.dropped`, true)
+        payload.done()
+      }
+
+      // Schedule next immediately if there are more items
+      if (this._retryQueue.length > 0) {
+        setImmediate(() => this._scheduleNextRetry())
+      }
+    } else {
+      // Nothing ready yet, wait for the earliest one
+      const earliestTime = Math.min(...this._retryQueue.map(p => p.scheduledAt))
+      const delay = Math.max(0, earliestTime - now)
+
+      this._retryTimer = setTimeout(() => {
+        this._retryTimer = null
+        this._scheduleNextRetry()
+      }, delay)
+    }
+  }
+
+  _destroy () {
+    // Clear pending timer
+    if (this._retryTimer) {
+      clearTimeout(this._retryTimer)
+      this._retryTimer = null
+    }
+
+    // Drain queue and call done callbacks
+    while (this._retryQueue.length > 0) {
+      const payload = this._retryQueue.shift()
+      log.debug('Dropping queued retry due to writer cleanup')
+      runtimeMetrics.increment(`${METRIC_PREFIX}.retries.dropped`, true)
+      payload.done()
+    }
+  }
 }
 
 function setHeader (headers, key, value) {
@@ -91,6 +231,7 @@ function makeRequest (version, data, count, url, headers, lookup, needsStartupLo
   setHeader(options.headers, 'Datadog-Meta-Lang', 'nodejs')
   setHeader(options.headers, 'Datadog-Meta-Lang-Version', process.version)
   setHeader(options.headers, 'Datadog-Meta-Lang-Interpreter', process.jsEngine || 'v8')
+  setHeader(options.headers, 'Datadog-Send-Real-Http-Status', 'true')
 
   log.debug('Request to the agent: %j', options)
 
diff --git a/packages/dd-trace/test/exporters/agent/writer.spec.js b/packages/dd-trace/test/exporters/agent/writer.spec.js
index bb5278eb054..5486a958f1d 100644
--- a/packages/dd-trace/test/exporters/agent/writer.spec.js
+++ b/packages/dd-trace/test/exporters/agent/writer.spec.js
@@ -122,7 +122,8 @@ function describeWriter (protocolVersion) {
           'Datadog-Meta-Lang-Version': process.version,
           'Datadog-Meta-Lang-Interpreter': 'v8',
           'Datadog-Meta-Tracer-Version': 'tracerVersion',
-          'X-Datadog-Trace-Count': '2'
+          'X-Datadog-Trace-Count': '2',
+          'Datadog-Send-Real-Http-Status': 'true'
         },
         lookup: undefined
       })
@@ -145,12 +146,22 @@ function describeWriter (protocolVersion) {
           'Datadog-Meta-Lang-Version': process.version,
           'Datadog-Meta-Lang-Interpreter': 'v8',
           'Datadog-Meta-Tracer-Version': 'tracerVersion',
-          'X-Datadog-Trace-Count': '2'
+          'X-Datadog-Trace-Count': '2',
+          'Datadog-Send-Real-Http-Status': 'true'
        })
        done()
      })
    })
 
+    it('should include Datadog-Send-Real-Http-Status header', (done) => {
+      encoder.count.returns(2)
+      encoder.makePayload.returns([Buffer.from('data')])
+      writer.flush(() => {
+        expect(request.getCall(0).args[1].headers['Datadog-Send-Real-Http-Status']).to.equal('true')
+        done()
+      })
+    })
+
    it('should log request errors', done => {
      const error = new Error('boom')
      error.status = 42
@@ -194,6 +205,290 @@ function describeWriter (protocolVersion) {
        })
      })
    })
+
+    context('with 429 (rate limit) responses', () => {
+      let clock
+
+      beforeEach(() => {
+        clock = sinon.useFakeTimers()
+      })
+
+      afterEach(() => {
+        clock.restore()
+      })
+
+      it('should retry on 429 response with exponential backoff', (done) => {
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        let callCount = 0
+        request.callsFake((data, options, callback) => {
+          callCount++
+          if (callCount < 3) {
+            // First two calls return 429
+            callback(null, null, 429)
+          } else {
+            // Third call succeeds
+            callback(null, response, 200)
+          }
+        })
+
+        writer.flush(() => {
+          expect(callCount).to.equal(3)
+          done()
+        })
+
+        // Fast-forward through retry delays
+        clock.tick(1000) // First retry after 1s
+        clock.tick(2000) // Second retry after 2s
+      })
+
+      it('should drop payload after max retry attempts', (done) => {
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        // Always return 429
+        request.yields(null, null, 429)
+
+        writer.flush(() => {
+          // Should have attempted 4 times total (initial + 3 retries)
+          expect(request.callCount).to.equal(4)
+          expect(log.errorWithoutTelemetry).to.have.been.calledWith(
+            'Maximum retry attempts reached for 429 response, dropping payload'
+          )
+          done()
+        })
+
+        // Fast-forward through all retry delays
+        clock.tick(1000) // First retry
+        clock.tick(2000) // Second retry
+        clock.tick(4000) // Third retry
+      })
+
+      it('should handle multiple concurrent 429 responses', (done) => {
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        let callCount = 0
+        request.callsFake((data, options, callback) => {
+          callCount++
+          if (callCount <= 5) {
+            // First 5 calls return 429
+            callback(null, null, 429)
+          } else {
+            // Subsequent calls succeed
+            callback(null, response, 200)
+          }
+        })
+
+        let completed = 0
+        const total = 3
+
+        for (let i = 0; i < total; i++) {
+          writer.flush(() => {
+            completed++
+            if (completed === total) {
+              // All flushes should complete
+              expect(completed).to.equal(total)
+              done()
+            }
+          })
+        }
+
+        // Fast-forward through retry delays
+        for (let i = 0; i < 10; i++) {
+          clock.tick(1000)
+        }
+      })
+
+      it('should drop payloads when retry queue is full', (done) => {
+        const config = { maxRetryQueueSize: 5 }
+        writer = new Writer({ url, prioritySampler, protocolVersion, config })
+
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        // Always return 429
+        request.yields(null, null, 429)
+
+        let completed = 0
+        const total = 10 // More than maxRetryQueueSize
+
+        for (let i = 0; i < total; i++) {
+          writer.flush(() => {
+            completed++
+            if (completed === total) {
+              // Should have dropped some payloads
+              expect(log.errorWithoutTelemetry).to.have.been.calledWith(
+                'Retry queue is full, dropping payload'
+              )
+              done()
+            }
+          })
+        }
+
+        // Fast-forward to process initial requests
+        clock.tick(100)
+      })
+
+      it('should call done callback only once per flush', (done) => {
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        let callCount = 0
+        request.callsFake((data, options, callback) => {
+          callCount++
+          if (callCount === 1) {
+            callback(null, null, 429)
+          } else {
+            callback(null, response, 200)
+          }
+        })
+
+        let doneCallCount = 0
+        writer.flush(() => {
+          doneCallCount++
+          // Should only be called once even though retry happened
+          expect(doneCallCount).to.equal(1)
+          done()
+        })
+
+        clock.tick(1000) // First retry
+      })
+
+      it('should handle errors during retry processing', (done) => {
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        let callCount = 0
+        request.callsFake((data, options, callback) => {
+          callCount++
+          if (callCount === 1) {
+            callback(null, null, 429)
+          } else if (callCount === 2) {
+            // Simulate synchronous error during retry
+            throw new Error('Retry processing error')
+          } else {
+            callback(null, response, 200)
+          }
+        })
+
+        writer.flush(() => {
+          expect(log.errorWithoutTelemetry).to.have.been.calledWith(
+            'Error processing retry',
+            sinon.match.instanceOf(Error)
+          )
+          done()
+        })
+
+        clock.tick(1000) // First retry (will throw)
+      })
+
+      it('should use configurable retry parameters', (done) => {
+        const config = {
+          maxRetryAttempts: 2,
+          baseRetryDelay: 500,
+          maxRetryQueueSize: 50
+        }
+        writer = new Writer({ url, prioritySampler, protocolVersion, config })
+
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        // Always return 429
+        request.yields(null, null, 429)
+
+        writer.flush(() => {
+          // Should have attempted 3 times total (initial + 2 retries)
+          expect(request.callCount).to.equal(3)
+          done()
+        })
+
+        // Fast-forward using custom delay
+        clock.tick(500) // First retry
+        clock.tick(1000) // Second retry
+      })
+
+      it('should properly schedule retries with correct timing', (done) => {
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        const retryTimes = []
+        let callCount = 0
+
+        request.callsFake((data, options, callback) => {
+          callCount++
+          retryTimes.push(Date.now())
+
+          if (callCount < 4) {
+            callback(null, null, 429)
+          } else {
+            callback(null, response, 200)
+          }
+        })
+
+        writer.flush(() => {
+          // Verify exponential backoff timing: initial, +1s, +2s, +4s
+          expect(retryTimes[1] - retryTimes[0]).to.equal(1000) // First retry
+          expect(retryTimes[2] - retryTimes[1]).to.equal(2000) // Second retry
+          expect(retryTimes[3] - retryTimes[2]).to.equal(4000) // Third retry
+          done()
+        })
+
+        clock.tick(1000) // First retry
+        clock.tick(2000) // Second retry
+        clock.tick(4000) // Third retry
+      })
+    })
+
+    context('cleanup and shutdown', () => {
+      it('should clean up pending retries on destroy', (done) => {
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        // Return 429 to queue retries
+        request.yields(null, null, 429)
+
+        let doneCallCount = 0
+        writer.flush(() => {
+          doneCallCount++
+        })
+        writer.flush(() => {
+          doneCallCount++
+        })
+
+        // Destroy before retries process
+        setImmediate(() => {
+          writer._destroy()
+
+          // Both done callbacks should have been called
+          expect(doneCallCount).to.equal(2)
+          expect(log.debug).to.have.been.calledWith('Dropping queued retry due to writer cleanup')
+          done()
+        })
+      })
+
+      it('should clear pending timers on destroy', () => {
+        const clock = sinon.useFakeTimers()
+
+        encoder.count.returns(2)
+        encoder.makePayload.returns([Buffer.from('data')])
+
+        // Return 429 to schedule retries
+        request.yields(null, null, 429)
+
+        writer.flush(() => {})
+
+        // Verify timer is scheduled
+        expect(writer._retryTimer).to.not.be.null
+
+        // Destroy should clear timer
+        writer._destroy()
+        expect(writer._retryTimer).to.be.null
+
+        clock.restore()
+      })
+    })
  })
 }