Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func (agent *agentS) Ready() bool {

// SendMetrics sends collected entity data to the host agent
func (agent *agentS) SendMetrics(data acceptor.Metrics) error {
agent.mu.RLock()
defer agent.mu.RUnlock()

pid, err := strconv.Atoi(agent.agentComm.from.EntityID)
if err != nil && agent.agentComm.from.EntityID != "" {
agent.logger.Debug("agent got malformed PID %q", agent.agentComm.from.EntityID)
Expand All @@ -159,7 +162,11 @@ func (agent *agentS) SendMetrics(data acceptor.Metrics) error {
}

agent.logger.Error("failed to send metrics to the host agent: ", err)

// We need to release the read lock before calling reset() which acquires a write lock
agent.mu.RUnlock()
agent.reset()
agent.mu.RLock()

return err
}
Expand All @@ -186,6 +193,9 @@ func (agent *agentS) SendEvent(event *EventData) error {

// SendSpans sends collected spans to the host agent
func (agent *agentS) SendSpans(spans []Span) error {
agent.mu.RLock()
defer agent.mu.RUnlock()

for i := range spans {
spans[i].From = agent.agentComm.from
}
Expand All @@ -202,7 +212,11 @@ func (agent *agentS) SendSpans(spans []Span) error {
return nil
} else {
agent.logger.Error("failed to send spans to the host agent: ", err)

// We need to release the read lock before calling reset() which acquires a write lock
agent.mu.RUnlock()
agent.reset()
agent.mu.RLock()
}

return err
Expand All @@ -221,6 +235,9 @@ type hostAgentProfile struct {

// SendProfiles sends profile data to the agent
func (agent *agentS) SendProfiles(profiles []autoprofile.Profile) error {
agent.mu.RLock()
defer agent.mu.RUnlock()

agentProfiles := make([]hostAgentProfile, 0, len(profiles))
for _, p := range profiles {
agentProfiles = append(agentProfiles, hostAgentProfile{p, agent.agentComm.from.EntityID})
Expand All @@ -233,7 +250,11 @@ func (agent *agentS) SendProfiles(profiles []autoprofile.Profile) error {
}

agent.logger.Error("failed to send profile data to the host agent: ", err)

// We need to release the read lock before calling reset() which acquires a write lock
agent.mu.RUnlock()
agent.reset()
agent.mu.RLock()

return err
}
Expand Down
14 changes: 11 additions & 3 deletions delayed_spans.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,24 @@ func (ds *delayedSpans) flush() {
case s := <-ds.spans:
t, ok := s.Tracer().(Tracer)
if !ok {
sensor.logger.Debug("span tracer has unexpected type")
muSensor.Lock()
if sensor != nil {
sensor.logger.Debug("span tracer has unexpected type")
}
muSensor.Unlock()
continue
}

if err := ds.processSpan(s, t.Options()); err != nil {
sensor.logger.Debug("error while processing spans:", err.Error())
muSensor.Lock()
if sensor != nil {
sensor.logger.Debug("error while processing spans:", err.Error())
}
muSensor.Unlock()
continue
}

if sensor.Agent().Ready() {
if isAgentReady() {
s.tracer.recorder.RecordSpan(s)
} else {
ds.append(s)
Expand Down
2 changes: 1 addition & 1 deletion delayed_spans_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestPartiallyFlushDelayedSpans(t *testing.T) {

notReadyAfter := maxDelayedSpans / 10
sensor.agent = &eventuallyNotReadyClient{
notReadyAfter: uint64(notReadyAfter),
notReadyAfter: uint64(notReadyAfter * 2),
}

delayed.flush()
Expand Down
6 changes: 5 additions & 1 deletion event.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func sendEvent(event *EventData) {

// we do fire & forget here, because the whole pid dance isn't necessary to send events
go func() {
_ = safeSensor().Agent().SendEvent(event)
muSensor.Lock()
if sensor != nil {
_ = sensor.Agent().SendEvent(event)
}
muSensor.Unlock()
}()
}
70 changes: 57 additions & 13 deletions recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,20 @@ type Recorder struct {
func NewRecorder() *Recorder {
r := &Recorder{}

ticker := time.NewTicker(1 * time.Second)
// Create a reference to r that will be captured by the goroutine

go func() {
ticker := time.NewTicker(1 * time.Second)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move ticker creation inside the goroutine to avoid race conditions, to ensures the ticker is only accessible from this goroutine

defer ticker.Stop() // Ensure ticker is stopped when goroutine exits

for range ticker.C {

if isAgentReady() {
go func() {
go func(*Recorder) {
if err := r.Flush(context.Background()); err != nil {
sensor.logger.Error("failed to flush the spans: ", err.Error())
}
}()
}(r)
}
}
}()
Expand All @@ -60,32 +64,51 @@ func NewTestRecorder() *Recorder {
// RecordSpan accepts spans to be recorded and added to the span queue
// for eventual reporting to the host agent.
func (r *Recorder) RecordSpan(span *spanS) {
// Get all sensor-related values under a single lock to minimize contention
muSensor.Lock()
if sensor == nil {
muSensor.Unlock()
return
}

agentReady := sensor != nil && sensor.Agent().Ready()
maxBufferedSpans := sensor.options.MaxBufferedSpans
forceTransmissionAt := sensor.options.ForceTransmissionStartingAt
logger := sensor.logger
muSensor.Unlock()

// If we're not announced and not in test mode then just
// return
if !r.testMode && !sensor.Agent().Ready() {
if !r.testMode && !agentReady {
return
}

r.Lock()
defer r.Unlock()

if len(r.spans) == sensor.options.MaxBufferedSpans {
if len(r.spans) == maxBufferedSpans {
r.spans = r.spans[1:]
}

r.spans = append(r.spans, newSpan(span))

if r.testMode || !sensor.Agent().Ready() {
if r.testMode || !agentReady {
return
}

if len(r.spans) >= sensor.options.ForceTransmissionStartingAt {
sensor.logger.Debug("forcing ", len(r.spans), "span(s) to the agent")
go func() {
if err := r.Flush(context.Background()); err != nil {
sensor.logger.Error("failed to flush the spans: ", err.Error())
if len(r.spans) >= forceTransmissionAt {
logger.Debug("forcing ", len(r.spans), "span(s) to the agent")
// Create a reference to r for this goroutine to avoid race conditions
rec := r
go func(recorder *Recorder) {
if err := recorder.Flush(context.Background()); err != nil {
muSensor.Lock()
if sensor != nil {
sensor.logger.Error("failed to flush the spans: ", err.Error())
}
muSensor.Unlock()
}
}()
}(rec)
}
}

Expand Down Expand Up @@ -114,12 +137,33 @@ func (r *Recorder) GetQueuedSpans() []Span {

// Flush sends queued spans to the agent
func (r *Recorder) Flush(ctx context.Context) error {
// For test mode, we don't want to actually send spans
if r.testMode {
return nil
}

// Check if agent is ready before getting and clearing spans
muSensor.Lock()
if sensor == nil {
muSensor.Unlock()
return nil
}

agent := sensor.Agent()
agentReady := agent.Ready()
muSensor.Unlock()

// If agent is not ready, don't flush spans
if !agentReady {
return nil
}

spansToSend := r.GetQueuedSpans()
if len(spansToSend) == 0 {
return nil
}

if err := sensor.Agent().SendSpans(spansToSend); err != nil {
if err := agent.SendSpans(spansToSend); err != nil {
r.Lock()
defer r.Unlock()

Expand Down
Loading
Loading