@@ -3,6 +3,7 @@ package agent
33import (
44 "bytes"
55 "compress/gzip"
6+ "context"
67 "crypto/x509"
78 "errors"
89 "fmt"
@@ -16,6 +17,7 @@ import (
1617
1718 "github.com/google/uuid"
1819 "github.com/vmihailenco/msgpack"
20+ "golang.org/x/sync/semaphore"
1921 "gopkg.in/tomb.v2"
2022
2123 "go.undefinedlabs.com/scopeagent/tags"
4547 concurrencyLevel int
4648 url string
4749 client * http.Client
50+ s * semaphore.Weighted
4851
4952 logger * log.Logger
5053 stats * RecorderStats
@@ -84,7 +87,7 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder {
8487 r .url = agent .getUrl ("api/agent/ingest" )
8588 r .client = & http.Client {}
8689 r .stats = & RecorderStats {}
87-
90+ r . s = semaphore . NewWeighted ( int64 ( r . concurrencyLevel ))
8891 r .logger .Printf ("recorder frequency: %v" , agent .flushFrequency )
8992 r .logger .Printf ("recorder concurrency level: %v" , agent .concurrencyLevel )
9093 r .t .Go (r .loop )
@@ -146,48 +149,91 @@ func (r *SpanRecorder) loop() error {
146149
147150// Sends the spans in the buffer to Scope
148151func (r * SpanRecorder ) sendSpans () (error , bool ) {
152+ defer func () {
153+ // Acquire every concurrency slot so that all previously started goroutines have finished before leaving this function
154+ if r .s .Acquire (context .Background (), int64 (r .concurrencyLevel )) == nil {
155+ r .s .Release (int64 (r .concurrencyLevel ))
156+ }
157+ }()
149158 atomic .AddInt64 (& r .stats .sendSpansCalls , 1 )
150159 const batchSize = 1000
160+
161+ // Local mutex to synchronize go routines and avoid race conditions in lastError var
162+ var lastErrorMutex sync.Mutex
151163 var lastError error
164+ getLastError := func () error {
165+ lastErrorMutex .Lock ()
166+ defer lastErrorMutex .Unlock ()
167+ return lastError
168+ }
169+ setLastError := func (err error ) {
170+ lastErrorMutex .Lock ()
171+ defer lastErrorMutex .Unlock ()
172+ lastError = err
173+ }
174+
175+ var shouldCancel int32
176+
152177 for {
153178 spans , spMore , spTotal := r .popPayloadSpan (batchSize )
154179 events , evMore , evTotal := r .popPayloadEvents (batchSize )
155180
156- payload := map [string ]interface {}{
157- "metadata" : r .metadata ,
158- "spans" : spans ,
159- "events" : events ,
160- tags .AgentID : r .agentId ,
161- }
162- buf , err := encodePayload (payload )
181+ // Acquire one concurrency slot; if the concurrency level has been reached, wait until a slot is released
182+ err := r .s .Acquire (context .Background (), 1 )
163183 if err != nil {
164184 atomic .AddInt64 (& r .stats .sendSpansKo , 1 )
165185 atomic .AddInt64 (& r .stats .spansNotSent , int64 (len (spans )))
166186 return err , false
167187 }
188+ // Once a slot has been acquired, check whether a previously started goroutine has set shouldCancel
189+ if atomic .LoadInt32 (& shouldCancel ) > 0 {
190+ return getLastError (), true
191+ }
168192
169- var testSpans int64
170- for _ , span := range spans {
171- if isTestSpan (span ) {
172- testSpans ++
193+ go func (sp []PayloadSpan , ev []PayloadEvent , spTotalCount , evTotalCount int ) {
194+ defer r .s .Release (1 )
195+ payload := map [string ]interface {}{
196+ "metadata" : r .metadata ,
197+ "spans" : sp ,
198+ "events" : ev ,
199+ tags .AgentID : r .agentId ,
200+ }
201+ buf , err := encodePayload (payload )
202+ if err != nil {
203+ atomic .AddInt64 (& r .stats .sendSpansKo , 1 )
204+ atomic .AddInt64 (& r .stats .spansNotSent , int64 (len (sp )))
205+ setLastError (err )
206+ return
207+ }
208+ var testSpans int64
209+ for _ , span := range sp {
210+ if isTestSpan (span ) {
211+ testSpans ++
212+ }
173213 }
174- }
175214
176- r .logger .Printf ("sending %d/%d spans with %d/%d events" , len (spans ), spTotal , len (events ), evTotal )
177- statusCode , err := r .callIngest (buf )
178- if err != nil {
179- atomic .AddInt64 (& r .stats .sendSpansKo , 1 )
180- atomic .AddInt64 (& r .stats .spansNotSent , int64 (len (spans )))
181- atomic .AddInt64 (& r .stats .testSpansNotSent , testSpans )
182- } else {
183- atomic .AddInt64 (& r .stats .sendSpansOk , 1 )
184- atomic .AddInt64 (& r .stats .spansSent , int64 (len (spans )))
185- atomic .AddInt64 (& r .stats .testSpansSent , testSpans )
186- }
187- if statusCode == 401 {
188- return err , true
189- }
190- lastError = err
215+ if len (sp ) == 0 && len (ev ) == 0 {
216+ r .logger .Print ("sending health check" )
217+ } else {
218+ r .logger .Printf ("sending %d/%d spans with %d/%d events" , len (sp ), spTotalCount , len (ev ), evTotalCount )
219+ }
220+ statusCode , err := r .callIngest (buf )
221+ if err != nil {
222+ atomic .AddInt64 (& r .stats .sendSpansKo , 1 )
223+ atomic .AddInt64 (& r .stats .spansNotSent , int64 (len (spans )))
224+ atomic .AddInt64 (& r .stats .testSpansNotSent , testSpans )
225+ setLastError (err )
226+ } else {
227+ atomic .AddInt64 (& r .stats .sendSpansOk , 1 )
228+ atomic .AddInt64 (& r .stats .spansSent , int64 (len (spans )))
229+ atomic .AddInt64 (& r .stats .testSpansSent , testSpans )
230+ }
231+ if statusCode == 401 {
232+ atomic .AddInt32 (& shouldCancel , 1 )
233+ return
234+ }
235+
236+ }(spans , events , spTotal , evTotal )
191237
192238 if ! spMore && ! evMore {
193239 break
0 commit comments