Skip to content
This repository was archived by the owner on Aug 17, 2020. It is now read-only.

Commit baf0236

Browse files
authored
Recorder buffer refactor (#177)
* refactor of the recorder to split buffer * changes based in the review
1 parent fd13fea commit baf0236

File tree

1 file changed

+102
-77
lines changed

1 file changed

+102
-77
lines changed

agent/recorder.go

Lines changed: 102 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ type (
3838
debugMode bool
3939
metadata map[string]interface{}
4040

41-
spans []tracer.RawSpan
41+
payloadSpans []PayloadSpan
42+
payloadEvents []PayloadEvent
4243

4344
flushFrequency time.Duration
4445
url string
@@ -62,6 +63,9 @@ type (
6263
testSpansNotSent int64
6364
testSpansRejected int64
6465
}
66+
67+
PayloadSpan map[string]interface{}
68+
PayloadEvent map[string]interface{}
6569
)
6670

6771
func NewSpanRecorder(agent *Agent) *SpanRecorder {
@@ -87,7 +91,7 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) {
8791
if !r.t.Alive() {
8892
atomic.AddInt64(&r.stats.totalSpans, 1)
8993
atomic.AddInt64(&r.stats.spansRejected, 1)
90-
if isTestSpan(span) {
94+
if isTestSpan(span.Tags) {
9195
atomic.AddInt64(&r.stats.totalTestSpans, 1)
9296
atomic.AddInt64(&r.stats.testSpansRejected, 1)
9397
}
@@ -98,15 +102,18 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) {
98102
}
99103

100104
func (r *SpanRecorder) loop() error {
105+
defer func() {
106+
r.logger.Println("recorder has been stopped.")
107+
}()
101108
ticker := time.NewTicker(1 * time.Second)
102109
cTime := time.Now()
103110
for {
104111
select {
105112
case <-ticker.C:
106-
hasSpans := r.hasSpans()
107-
if hasSpans || time.Now().Sub(cTime) >= r.getFlushFrequency() {
113+
hasPayloadData := r.hasPayloadData()
114+
if hasPayloadData || time.Now().Sub(cTime) >= r.getFlushFrequency() {
108115
if r.debugMode {
109-
if hasSpans {
116+
if hasPayloadData {
110117
r.logger.Println("Ticker: Sending by buffer")
111118
} else {
112119
r.logger.Println("Ticker: Sending by time")
@@ -135,26 +142,18 @@ func (r *SpanRecorder) loop() error {
135142
// Sends the spans in the buffer to Scope
136143
func (r *SpanRecorder) sendSpans() (error, bool) {
137144
atomic.AddInt64(&r.stats.sendSpansCalls, 1)
138-
spans := r.popSpans()
139-
140145
const batchSize = 1000
141-
batchLength := len(spans) / batchSize
142-
143-
r.logger.Printf("sending %d spans in %d batches", len(spans), batchLength+1)
144-
145146
var lastError error
146-
for b := 0; b <= batchLength; b++ {
147-
var batch []tracer.RawSpan
148-
// We extract the batch of spans to be send
149-
if b == batchLength {
150-
// If we are in the last batch, we select the remaining spans
151-
batch = spans[b*batchSize:]
152-
} else {
153-
batch = spans[b*batchSize : ((b + 1) * batchSize)]
147+
for {
148+
spans, spMore, spTotal := r.popPayloadSpan(batchSize)
149+
events, evMore, evTotal := r.popPayloadEvents(batchSize)
150+
151+
payload := map[string]interface{}{
152+
"metadata": r.metadata,
153+
"spans": spans,
154+
"events": events,
155+
tags.AgentID: r.agentId,
154156
}
155-
156-
payload := r.getPayload(batch)
157-
158157
buf, err := encodePayload(payload)
159158
if err != nil {
160159
atomic.AddInt64(&r.stats.sendSpansKo, 1)
@@ -163,15 +162,13 @@ func (r *SpanRecorder) sendSpans() (error, bool) {
163162
}
164163

165164
var testSpans int64
166-
for _, span := range batch {
165+
for _, span := range spans {
167166
if isTestSpan(span) {
168167
testSpans++
169168
}
170169
}
171170

172-
if batchLength > 0 {
173-
r.logger.Printf("sending batch %d with %d spans", b+1, len(batch))
174-
}
171+
r.logger.Printf("sending %d/%d spans with %d/%d events", len(spans), spTotal, len(events), evTotal)
175172
statusCode, err := r.callIngest(buf)
176173
if err != nil {
177174
atomic.AddInt64(&r.stats.sendSpansKo, 1)
@@ -186,6 +183,10 @@ func (r *SpanRecorder) sendSpans() (error, bool) {
186183
return err, true
187184
}
188185
lastError = err
186+
187+
if !spMore && !evMore {
188+
break
189+
}
189190
}
190191
return lastError, false
191192
}
@@ -302,53 +303,45 @@ func (r *SpanRecorder) callIngest(payload *bytes.Buffer) (statusCode int, err er
302303
return statusCode, lastError
303304
}
304305

305-
// Combines `rawSpans` and `metadata` into a payload that the Scope backend can process
306-
func (r *SpanRecorder) getPayload(rawSpans []tracer.RawSpan) map[string]interface{} {
307-
spans := []map[string]interface{}{}
308-
events := []map[string]interface{}{}
309-
for _, span := range rawSpans {
310-
var parentSpanID string
311-
if span.ParentSpanID != 0 {
312-
parentSpanID = fmt.Sprintf("%x", span.ParentSpanID)
306+
// Get payload components
307+
func (r *SpanRecorder) getPayloadComponents(span tracer.RawSpan) (PayloadSpan, []PayloadEvent) {
308+
events := make([]PayloadEvent, 0)
309+
var parentSpanID string
310+
if span.ParentSpanID != 0 {
311+
parentSpanID = fmt.Sprintf("%x", span.ParentSpanID)
312+
}
313+
payloadSpan := PayloadSpan{
314+
"context": map[string]interface{}{
315+
"trace_id": fmt.Sprintf("%x", span.Context.TraceID),
316+
"span_id": fmt.Sprintf("%x", span.Context.SpanID),
317+
"baggage": span.Context.Baggage,
318+
},
319+
"parent_span_id": parentSpanID,
320+
"operation": span.Operation,
321+
"start": r.applyNTPOffset(span.Start).Format(time.RFC3339Nano),
322+
"duration": span.Duration.Nanoseconds(),
323+
"tags": span.Tags,
324+
}
325+
for _, event := range span.Logs {
326+
var fields = make(map[string]interface{})
327+
for _, field := range event.Fields {
328+
fields[field.Key()] = field.Value()
329+
}
330+
eventId, err := uuid.NewRandom()
331+
if err != nil {
332+
panic(err)
313333
}
314-
spans = append(spans, map[string]interface{}{
334+
events = append(events, PayloadEvent{
315335
"context": map[string]interface{}{
316336
"trace_id": fmt.Sprintf("%x", span.Context.TraceID),
317337
"span_id": fmt.Sprintf("%x", span.Context.SpanID),
318-
"baggage": span.Context.Baggage,
338+
"event_id": eventId.String(),
319339
},
320-
"parent_span_id": parentSpanID,
321-
"operation": span.Operation,
322-
"start": r.applyNTPOffset(span.Start).Format(time.RFC3339Nano),
323-
"duration": span.Duration.Nanoseconds(),
324-
"tags": span.Tags,
340+
"timestamp": r.applyNTPOffset(event.Timestamp).Format(time.RFC3339Nano),
341+
"fields": fields,
325342
})
326-
for _, event := range span.Logs {
327-
var fields = make(map[string]interface{})
328-
for _, field := range event.Fields {
329-
fields[field.Key()] = field.Value()
330-
}
331-
eventId, err := uuid.NewRandom()
332-
if err != nil {
333-
panic(err)
334-
}
335-
events = append(events, map[string]interface{}{
336-
"context": map[string]interface{}{
337-
"trace_id": fmt.Sprintf("%x", span.Context.TraceID),
338-
"span_id": fmt.Sprintf("%x", span.Context.SpanID),
339-
"event_id": eventId.String(),
340-
},
341-
"timestamp": r.applyNTPOffset(event.Timestamp).Format(time.RFC3339Nano),
342-
"fields": fields,
343-
})
344-
}
345-
}
346-
return map[string]interface{}{
347-
"metadata": r.metadata,
348-
"spans": spans,
349-
"events": events,
350-
tags.AgentID: r.agentId,
351343
}
344+
return payloadSpan, events
352345
}
353346

354347
// Encodes `payload` using msgpack and compress it with gzip
@@ -379,32 +372,64 @@ func (r *SpanRecorder) getFlushFrequency() time.Duration {
379372
}
380373

381374
// Gets if there any span available to be send
382-
func (r *SpanRecorder) hasSpans() bool {
375+
func (r *SpanRecorder) hasPayloadData() bool {
383376
r.RLock()
384377
defer r.RUnlock()
385-
return len(r.spans) > 0
378+
return len(r.payloadSpans) > 0 || len(r.payloadEvents) > 0
379+
}
380+
381+
// Gets a number of payload spans from buffer
382+
func (r *SpanRecorder) popPayloadSpan(count int) ([]PayloadSpan, bool, int) {
383+
r.Lock()
384+
defer r.Unlock()
385+
var spans []PayloadSpan
386+
length := len(r.payloadSpans)
387+
if length <= count || count == -1 {
388+
spans = r.payloadSpans
389+
if spans == nil {
390+
spans = make([]PayloadSpan, 0)
391+
}
392+
r.payloadSpans = nil
393+
return spans, false, length
394+
}
395+
spans = r.payloadSpans[:count]
396+
r.payloadSpans = r.payloadSpans[count:]
397+
return spans, true, length
386398
}
387399

388-
// Gets the spans to be send and clears the buffer
389-
func (r *SpanRecorder) popSpans() []tracer.RawSpan {
400+
// Gets a number of payload events from buffer
401+
func (r *SpanRecorder) popPayloadEvents(count int) ([]PayloadEvent, bool, int) {
390402
r.Lock()
391403
defer r.Unlock()
392-
spans := r.spans
393-
r.spans = nil
394-
return spans
404+
var events []PayloadEvent
405+
length := len(r.payloadEvents)
406+
if length <= count || count == -1 {
407+
events = r.payloadEvents
408+
if events == nil {
409+
events = make([]PayloadEvent, 0)
410+
}
411+
r.payloadEvents = nil
412+
return events, false, length
413+
}
414+
events = r.payloadEvents[:count]
415+
r.payloadEvents = r.payloadEvents[count:]
416+
return events, true, length
395417
}
396418

397419
// Adds a span to the buffer
398420
func (r *SpanRecorder) addSpan(span tracer.RawSpan) {
399421
r.Lock()
400422
defer r.Unlock()
401-
r.spans = append(r.spans, span)
423+
payloadSpan, payloadEvents := r.getPayloadComponents(span)
424+
r.payloadSpans = append(r.payloadSpans, payloadSpan)
425+
r.payloadEvents = append(r.payloadEvents, payloadEvents...)
426+
402427
atomic.AddInt64(&r.stats.totalSpans, 1)
403-
if isTestSpan(span) {
428+
if isTestSpan(span.Tags) {
404429
atomic.AddInt64(&r.stats.totalTestSpans, 1)
405430
}
406431
}
407432

408-
func isTestSpan(span tracer.RawSpan) bool {
409-
return span.Tags["span.kind"] == "test"
433+
func isTestSpan(tags map[string]interface{}) bool {
434+
return tags["span.kind"] == "test"
410435
}

0 commit comments

Comments
 (0)