Skip to content
This repository was archived by the owner on Aug 17, 2020. It is now read-only.

Commit 5a51a81

Browse files
committed
recorder concurrency support using channels.
1 parent b1a706b commit 5a51a81

File tree

5 files changed

+146
-56
lines changed

5 files changed

+146
-56
lines changed

agent/agent.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type (
4646
recorder *SpanRecorder
4747
recorderFilename string
4848
flushFrequency time.Duration
49+
concurrencyLevel int
4950

5051
optionalRecorders []tracer.SpanRecorder
5152

@@ -62,8 +63,8 @@ type (
6263
var (
6364
version = "0.1.13-pre5"
6465

65-
testingModeFrequency = time.Second
66-
nonTestingModeFrequency = time.Minute
66+
testingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequencyInTestMode.Value) * time.Millisecond
67+
nonTestingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequency.Value) * time.Millisecond
6768
)
6869

6970
func WithApiKey(apiKey string) Option {
@@ -185,6 +186,7 @@ func NewAgent(options ...Option) (*Agent, error) {
185186
agent.userAgent = fmt.Sprintf("scope-agent-go/%s", agent.version)
186187
agent.panicAsFail = false
187188
agent.failRetriesCount = 0
189+
agent.concurrencyLevel = env.ScopeTracerDispatcherConcurrencyLevel.Value
188190

189191
for _, opt := range options {
190192
opt(agent)

agent/recorder.go

Lines changed: 114 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ type (
4848
logger *log.Logger
4949
stats *RecorderStats
5050
statsOnce sync.Once
51+
52+
concurrencyLevel int
53+
workerJobs chan *workerJob
54+
workerResults chan *workerResult
5155
}
5256
RecorderStats struct {
5357
totalSpans int64
@@ -66,6 +70,18 @@ type (
6670

6771
PayloadSpan map[string]interface{}
6872
PayloadEvent map[string]interface{}
73+
74+
workerJob struct {
75+
spans []PayloadSpan
76+
totalSpans int
77+
events []PayloadEvent
78+
totalEvents int
79+
}
80+
workerResult struct {
81+
workerId int
82+
error error
83+
shouldExit bool
84+
}
6985
)
7086

7187
func NewSpanRecorder(agent *Agent) *SpanRecorder {
@@ -79,9 +95,12 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder {
7995
r.metadata = agent.metadata
8096
r.logger = agent.logger
8197
r.flushFrequency = agent.flushFrequency
98+
r.concurrencyLevel = agent.concurrencyLevel
8299
r.url = agent.getUrl("api/agent/ingest")
83100
r.client = &http.Client{}
84101
r.stats = &RecorderStats{}
102+
r.logger.Printf("recorder frequency: %v", agent.flushFrequency)
103+
r.logger.Printf("recorder concurrency level: %v", agent.concurrencyLevel)
85104
r.t.Go(r.loop)
86105
return r
87106
}
@@ -103,8 +122,18 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) {
103122

104123
func (r *SpanRecorder) loop() error {
105124
defer func() {
125+
close(r.workerJobs)
126+
close(r.workerResults)
106127
r.logger.Println("recorder has been stopped.")
107128
}()
129+
130+
// start workers
131+
r.workerJobs = make(chan *workerJob, r.concurrencyLevel)
132+
r.workerResults = make(chan *workerResult, r.concurrencyLevel)
133+
for i := 0; i < r.concurrencyLevel; i++ {
134+
go r.worker(i + 1)
135+
}
136+
108137
ticker := time.NewTicker(1 * time.Second)
109138
cTime := time.Now()
110139
for {
@@ -144,51 +173,104 @@ func (r *SpanRecorder) sendSpans() (error, bool) {
144173
atomic.AddInt64(&r.stats.sendSpansCalls, 1)
145174
const batchSize = 1000
146175
var lastError error
176+
var jobs int
147177
for {
148178
spans, spMore, spTotal := r.popPayloadSpan(batchSize)
149179
events, evMore, evTotal := r.popPayloadEvents(batchSize)
150180

151-
payload := map[string]interface{}{
152-
"metadata": r.metadata,
153-
"spans": spans,
154-
"events": events,
155-
tags.AgentID: r.agentId,
181+
r.workerJobs <- &workerJob{
182+
spans: spans,
183+
totalSpans: spTotal,
184+
events: events,
185+
totalEvents: evTotal,
156186
}
157-
buf, err := encodePayload(payload)
158-
if err != nil {
159-
atomic.AddInt64(&r.stats.sendSpansKo, 1)
160-
atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans)))
161-
return err, false
162-
}
163-
164-
var testSpans int64
165-
for _, span := range spans {
166-
if isTestSpan(span) {
167-
testSpans++
187+
jobs++
188+
189+
if len(r.workerResults) > 0 {
190+
// We check if a previous result call the cancellation of the send
191+
result := <-r.workerResults
192+
lastError = result.error
193+
jobs--
194+
if result.shouldExit {
195+
r.logger.Printf("worker %d: received a should exit response", result.workerId)
196+
for i := 0; i < jobs; i++ {
197+
<-r.workerResults
198+
}
199+
return result.error, result.shouldExit
168200
}
169201
}
170202

171-
r.logger.Printf("sending %d/%d spans with %d/%d events", len(spans), spTotal, len(events), evTotal)
172-
statusCode, err := r.callIngest(buf)
173-
if err != nil {
174-
atomic.AddInt64(&r.stats.sendSpansKo, 1)
175-
atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans)))
176-
atomic.AddInt64(&r.stats.testSpansNotSent, testSpans)
177-
} else {
178-
atomic.AddInt64(&r.stats.sendSpansOk, 1)
179-
atomic.AddInt64(&r.stats.spansSent, int64(len(spans)))
180-
atomic.AddInt64(&r.stats.testSpansSent, testSpans)
203+
if !spMore && !evMore {
204+
break
181205
}
182-
if statusCode == 401 {
183-
return err, true
206+
}
207+
shouldExit := false
208+
for i := 0; i < jobs; i++ {
209+
result := <-r.workerResults
210+
lastError = result.error
211+
if result.shouldExit {
212+
r.logger.Printf("worker %d: received a should exit response", result.workerId)
213+
shouldExit = true
184214
}
185-
lastError = err
215+
}
216+
return lastError, shouldExit
217+
}
186218

187-
if !spMore && !evMore {
188-
break
219+
func (r *SpanRecorder) worker(id int) {
220+
for {
221+
select {
222+
case j, ok := <-r.workerJobs:
223+
if !ok {
224+
if r.debugMode {
225+
r.logger.Printf("exiting from worker: %d", id)
226+
}
227+
return
228+
}
229+
230+
payload := map[string]interface{}{
231+
"metadata": r.metadata,
232+
"spans": j.spans,
233+
"events": j.events,
234+
tags.AgentID: r.agentId,
235+
}
236+
237+
buf, err := encodePayload(payload)
238+
if err != nil {
239+
atomic.AddInt64(&r.stats.sendSpansKo, 1)
240+
atomic.AddInt64(&r.stats.spansNotSent, int64(len(j.spans)))
241+
r.workerResults <- &workerResult{
242+
workerId: id,
243+
error: err,
244+
shouldExit: false,
245+
}
246+
continue
247+
}
248+
249+
var testSpans int64
250+
for _, span := range j.spans {
251+
if isTestSpan(span) {
252+
testSpans++
253+
}
254+
}
255+
256+
r.logger.Printf("worker %d: sending %d/%d spans with %d/%d events", id, len(j.spans), j.totalSpans, len(j.events), j.totalEvents)
257+
statusCode, err := r.callIngest(buf)
258+
if err != nil {
259+
atomic.AddInt64(&r.stats.sendSpansKo, 1)
260+
atomic.AddInt64(&r.stats.spansNotSent, int64(len(j.spans)))
261+
atomic.AddInt64(&r.stats.testSpansNotSent, testSpans)
262+
} else {
263+
atomic.AddInt64(&r.stats.sendSpansOk, 1)
264+
atomic.AddInt64(&r.stats.spansSent, int64(len(j.spans)))
265+
atomic.AddInt64(&r.stats.testSpansSent, testSpans)
266+
}
267+
r.workerResults <- &workerResult{
268+
workerId: id,
269+
error: err,
270+
shouldExit: statusCode == 401,
271+
}
189272
}
190273
}
191-
return lastError, false
192274
}
193275

194276
// Stop recorder

env/vars.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,27 @@ package env
33
import "go.undefinedlabs.com/scopeagent/tags"
44

55
var (
6-
ScopeDsn = newStringEnvVar("", "SCOPE_DSN")
7-
ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY")
8-
ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT")
9-
ScopeService = newStringEnvVar("default", "SCOPE_SERVICE")
10-
ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY")
11-
ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA")
12-
ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH")
13-
ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT")
14-
ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH")
15-
ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG")
16-
ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER")
17-
ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE")
18-
ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_TESTING_FAIL_RETRIES")
19-
ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_TESTING_PANIC_AS_FAIL")
20-
ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION")
21-
ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA")
22-
ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS")
23-
ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE")
24-
ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES")
25-
ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE")
6+
ScopeDsn = newStringEnvVar("", "SCOPE_DSN")
7+
ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY")
8+
ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT")
9+
ScopeService = newStringEnvVar("default", "SCOPE_SERVICE")
10+
ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY")
11+
ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA")
12+
ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH")
13+
ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT")
14+
ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH")
15+
ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG")
16+
ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER")
17+
ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE")
18+
ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_TESTING_FAIL_RETRIES")
19+
ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_TESTING_PANIC_AS_FAIL")
20+
ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION")
21+
ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA")
22+
ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS")
23+
ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE")
24+
ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES")
25+
ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE")
26+
ScopeTracerDispatcherHealthcheckFrequency = newIntEnvVar(60000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY")
27+
ScopeTracerDispatcherHealthcheckFrequencyInTestMode = newIntEnvVar(1000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY_IN_TESTMODE")
28+
ScopeTracerDispatcherConcurrencyLevel = newIntEnvVar(5, "SCOPE_TRACER_DISPATCHER_CONCURRENCY_LEVEL")
2629
)

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ require (
1616
github.com/undefinedlabs/go-mpatch v0.0.0-20200122175732-0044123dbb98
1717
github.com/vmihailenco/msgpack v4.0.4+incompatible
1818
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 // indirect
19+
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
1920
golang.org/x/net v0.0.0-20200301022130-244492dfa37a
2021
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
2122
google.golang.org/appengine v1.6.5 // indirect
2223
google.golang.org/grpc v1.27.1
2324
gopkg.in/src-d/go-git.v4 v4.13.1
2425
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
26+
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
2527
)

init.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"testing"
1212

1313
"go.undefinedlabs.com/scopeagent/agent"
14+
"go.undefinedlabs.com/scopeagent/errors"
1415
"go.undefinedlabs.com/scopeagent/instrumentation"
1516
"go.undefinedlabs.com/scopeagent/instrumentation/logging"
1617
scopetesting "go.undefinedlabs.com/scopeagent/instrumentation/testing"
@@ -52,11 +53,11 @@ func Run(m *testing.M, opts ...agent.Option) int {
5253
os.Exit(1)
5354
}()
5455
reflection.AddPanicHandler(func(e interface{}) {
55-
instrumentation.Logger().Printf("Panic handler triggered by: %v,\nFlushing agent, sending partial results...", e)
56+
instrumentation.Logger().Printf("Panic handler triggered by: %v.\nFlushing agent, sending partial results...", errors.GetCurrentError(e).ErrorStack())
5657
newAgent.Flush()
5758
})
5859
reflection.AddOnPanicExitHandler(func(e interface{}) {
59-
instrumentation.Logger().Printf("Process is going to end by: %v,\nStopping agent...", e)
60+
instrumentation.Logger().Printf("Process is going to end by: %v,\nStopping agent...", errors.GetCurrentError(e).ErrorStack())
6061
scopetesting.PanicAllRunningTests(e, 3)
6162
newAgent.Stop()
6263
})

0 commit comments

Comments
 (0)