Skip to content

Commit 92f42d8

Browse files
committed
feat(sampler): use a reservoir sampler
1 parent 24e2114 commit 92f42d8

File tree

4 files changed

+94
-26
lines changed

4 files changed

+94
-26
lines changed

lib/agent/index.js

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ var assign = require('lodash.assign')
88
var CollectorApi = require('./api')
99
var Metrics = require('./metrics')
1010
var consts = require('../consts')
11+
var ReservoirSampler = require('./reservoir_sampler')
1112

1213
var REQUEST_ID = 'request-id'
1314
var SPAN_ID = 'span-id'
14-
var MUST_COLLECT_LIMIT = 20
15+
var MUST_COLLECT_LIMIT = 100
1516

1617
function Agent (options) {
1718
debug('Agent is initializing...')
@@ -27,7 +28,6 @@ function Agent (options) {
2728

2829
// init required variables
2930
this.partials = {}
30-
this.spans = []
3131

3232
this.apmMetrics = Metrics.apm.create({
3333
collectorApi: this.collectorApi,
@@ -57,6 +57,8 @@ function Agent (options) {
5757
_this.serviceKey = serviceKey
5858
_this.start()
5959
})
60+
61+
this.reservoirSampler = new ReservoirSampler(MUST_COLLECT_LIMIT)
6062
}
6163

6264
Agent.prototype.start = function () {
@@ -132,11 +134,7 @@ Agent.prototype.serverSend = function (data) {
132134
span.isForceSampled = span.isForceSampled || data.mustCollect === consts.MUST_COLLECT.ERROR
133135

134136
if (span.isForceSampled) {
135-
this.mustCollectCount += 1
136-
}
137-
138-
if (span.isForceSampled && (this.mustCollectCount <= MUST_COLLECT_LIMIT)) {
139-
this.spans.push(span)
137+
this.reservoirSampler.addReturnsSuccess(span)
140138
}
141139

142140
this.rpmMetrics.addResponseTime(data.responseTime)
@@ -251,7 +249,7 @@ Agent.prototype.onCrash = function (data) {
251249
}
252250
})
253251

254-
this.spans.push(span)
252+
this.reservoirSampler.addReturnsSuccess(span)
255253

256254
this._send({
257255
isSync: true
@@ -386,19 +384,18 @@ Agent.prototype.getMicrotime = function () {
386384

387385
Agent.prototype._send = function (options) {
388386
debug('sending logs to the trace service')
389-
if (this.spans.length > 0) {
387+
var spans = this.reservoirSampler.getItems()
388+
if (spans.length > 0) {
390389
var dataBag = {
391390
sample: {
392391
rate: 1,
393392
totalRequestCount: 1
394393
},
395-
spans: this.spans
394+
spans: spans
396395
}
397396

398397
this.totalRequestCount = 0
399-
400-
this.spans = []
401-
this.mustCollectCount = 0
398+
this.reservoirSampler = new ReservoirSampler(MUST_COLLECT_LIMIT)
402399
this.collectorApi.sendSamples(dataBag, options && options.isSync)
403400
}
404401
}

lib/agent/index.spec.js

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ describe('The Trace agent', function () {
151151
expect(rpmMetrics.addStatusCode).to.have.been.calledWith(statusCode)
152152
expect(agent.totalRequestCount).to.eql(1)
153153
expect(agent.partials).to.eql({})
154-
expect(agent.spans).to.eql([{
154+
expect(agent.reservoirSampler.getItems()).to.eql([{
155155
requestId: transactionId,
156156
isForceSampled: true,
157157
isSampled: false,
@@ -195,7 +195,7 @@ describe('The Trace agent', function () {
195195
})
196196

197197
expect(agent.totalRequestCount).to.eql(1)
198-
expect(agent.spans).to.eql([])
198+
expect(agent.reservoirSampler.getItems()).to.eql([])
199199
expect(agent.partials[transactionId]).to.eql({
200200
requestId: transactionId,
201201
isForceSampled: false,
@@ -237,7 +237,7 @@ describe('The Trace agent', function () {
237237
this.sandbox.stub(agent, 'getSpanId').returns(spanId)
238238
agent.report('obviously', { a: 'mock' })
239239
expect(agent.totalRequestCount).to.eql(1)
240-
expect(agent.spans).to.eql([])
240+
expect(agent.reservoirSampler.getItems()).to.eql([])
241241
expect(agent.partials[transactionId]).to.eql({
242242
requestId: transactionId,
243243
isForceSampled: false,
@@ -281,7 +281,7 @@ describe('The Trace agent', function () {
281281

282282
agent.reportError('mocking a', new Error('with style'))
283283
expect(agent.totalRequestCount).to.eql(1)
284-
expect(agent.spans).to.eql([])
284+
expect(agent.reservoirSampler.getItems()).to.eql([])
285285
expect(agent.partials[transactionId]).to.eql({
286286
requestId: transactionId,
287287
isForceSampled: false,
@@ -311,17 +311,13 @@ describe('The Trace agent', function () {
311311
})
312312

313313
it('passes sample and span data to the API client', function () {
314-
agent.spans = [
315-
1,
316-
2,
317-
3
318-
]
314+
agent.reservoirSampler.addReturnsSuccess(1)
319315
agent.totalRequestCount = 6
320316
agent._send()
321317

322318
expect(agent.totalRequestCount).to.eql(0)
323319
expect(collectorApi.sendSamples).to.be.calledWith({
324-
spans: [1, 2, 3],
320+
spans: [1],
325321
sample: {
326322
rate: 1,
327323
totalRequestCount: 1
@@ -331,7 +327,9 @@ describe('The Trace agent', function () {
331327

332328
it('limits must-collect count to 20', function () {
333329
var statusCode = 200
334-
agent.mustCollectCount = 20
330+
for (var i = 0; i < 100; i += 1) {
331+
agent.reservoirSampler.addReturnsSuccess(1)
332+
}
335333
agent.serverReceive({
336334
id: transactionId,
337335
spanId: spanId,
@@ -349,7 +347,7 @@ describe('The Trace agent', function () {
349347
mustCollect: '1'
350348
})
351349

352-
expect(agent.spans).to.eql([])
353-
expect(agent.mustCollectCount).to.eql(21)
350+
expect(agent.reservoirSampler.getItems().length).to.eql(100)
351+
expect(agent.reservoirSampler.itemsSeen).to.eql(101)
354352
})
355353
})

lib/agent/reservoir_sampler.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
function Reservoir (limit) {
2+
this.MAX_ITEMS = limit || 10
3+
4+
this.itemsSeen = 0
5+
this.data = []
6+
}
7+
8+
Reservoir.prototype.addReturnsSuccess = function (item) {
9+
var inserted = true
10+
if (this.itemsSeen < this.MAX_ITEMS) {
11+
this.data.push(item)
12+
inserted = true
13+
} else {
14+
inserted = this._replaceSampledItemReturnsSuccess(item)
15+
}
16+
this.itemsSeen++
17+
return inserted
18+
}
19+
20+
Reservoir.prototype.getItems = function () {
21+
return this.data
22+
}
23+
24+
Reservoir.prototype._replaceSampledItemReturnsSuccess = function (item) {
25+
var toReplaceIndex = Math.floor(Math.random() * (this.itemsSeen + 2))
26+
if (toReplaceIndex < this.MAX_ITEMS) {
27+
this.data[toReplaceIndex] = item
28+
return true
29+
}
30+
return false
31+
}
32+
33+
module.exports = Reservoir
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
var expect = require('chai').expect
2+
var ReservoirSampler = require('./reservoir_sampler')
3+
4+
describe('The Reservoir Sampler', function () {
5+
var itemToAdd = {
6+
name: 'John Coin'
7+
}
8+
9+
describe('when seen count is below the size limit', function () {
10+
it('adds the item', function () {
11+
var sampler = new ReservoirSampler(3)
12+
var isAdded = sampler.addReturnsSuccess(itemToAdd)
13+
14+
expect(isAdded).to.eql(true)
15+
expect(sampler.getItems()).to.eql([itemToAdd])
16+
})
17+
})
18+
19+
describe('when seen count is above the size limit', function () {
20+
it('adds the item if selected', function () {
21+
this.sandbox.stub(Math, 'random', function () {
22+
return 0
23+
})
24+
var sampler = new ReservoirSampler(1)
25+
sampler.itemsSeen = 1
26+
var isAdded = sampler.addReturnsSuccess(itemToAdd)
27+
expect(isAdded).to.eql(true)
28+
})
29+
30+
it('discards the item if not selected', function () {
31+
this.sandbox.stub(Math, 'random', function () {
32+
return 1
33+
})
34+
var sampler = new ReservoirSampler(1)
35+
sampler.itemsSeen = 1
36+
var isAdded = sampler.addReturnsSuccess(itemToAdd)
37+
expect(isAdded).to.eql(false)
38+
})
39+
})
40+
})

0 commit comments

Comments
 (0)