Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/system-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
persist-credentials: false
repository: 'DataDog/system-tests'
# Automatically managed, use scripts/update-system-tests-version to update
ref: '6be22a6418b86c3be4920790dfa121149aed2903'
ref: '58543d139f808509bc70e7ea4c7d969557ba3757'

- name: Download wheels to binaries directory
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
Expand Down Expand Up @@ -90,7 +90,7 @@ jobs:
persist-credentials: false
repository: 'DataDog/system-tests'
# Automatically managed, use scripts/update-system-tests-version to update
ref: '6be22a6418b86c3be4920790dfa121149aed2903'
ref: '58543d139f808509bc70e7ea4c7d969557ba3757'

- name: Build runner
uses: ./.github/actions/install_runner
Expand Down Expand Up @@ -275,7 +275,7 @@ jobs:
persist-credentials: false
repository: 'DataDog/system-tests'
# Automatically managed, use scripts/update-system-tests-version to update
ref: '6be22a6418b86c3be4920790dfa121149aed2903'
ref: '58543d139f808509bc70e7ea4c7d969557ba3757'
- name: Download wheels to binaries directory
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
Expand Down
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ variables:
DD_VPA_TEMPLATE: "vpa-template-cpu-p70-10percent-2x-oom-min-cap"
# CI_DEBUG_SERVICES: "true"
# Automatically managed, use scripts/update-system-tests-version to update
SYSTEM_TESTS_REF: "6be22a6418b86c3be4920790dfa121149aed2903"
SYSTEM_TESTS_REF: "58543d139f808509bc70e7ea4c7d969557ba3757"

default:
interruptible: true
Expand Down
5 changes: 3 additions & 2 deletions ddtrace/internal/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,16 @@ def flush(self, max_length: int) -> dict:
"""
Flush the endpoints to a payload, returning the first `max` endpoints.
"""
endpoints_snapshot = tuple(self.endpoints)
if max_length >= len(self.endpoints):
res = {
"is_first": self.is_first,
"endpoints": [dataclasses.asdict(ep, dict_factory=_dict_factory) for ep in self.endpoints],
"endpoints": [dataclasses.asdict(ep, dict_factory=_dict_factory) for ep in endpoints_snapshot],
}
self.reset()
return res
else:
batch = [self.endpoints.pop() for _ in range(max_length)]
batch = tuple(self.endpoints.pop() for _ in range(max_length))
res = {
"is_first": self.is_first,
"endpoints": [dataclasses.asdict(ep, dict_factory=_dict_factory) for ep in batch],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
Fix a potential race condition in the tracer.
179 changes: 179 additions & 0 deletions tests/internal/test_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import threading
from time import sleep

import pytest

from ddtrace.internal.endpoints import HttpEndPointsCollection


@pytest.fixture
def collection():
coll = HttpEndPointsCollection()
coll.reset()
yield coll
coll.reset()


def test_flush_uses_tuple_snapshot(collection):
"""Test that flush() operates on a tuple snapshot, not the original set."""
collection.add_endpoint("GET", "/api/users")
collection.add_endpoint("POST", "/api/users")
collection.add_endpoint("DELETE", "/api/users/123")

assert len(collection.endpoints) == 3
result = collection.flush(max_length=10)
assert result["is_first"] is True
assert len(result["endpoints"]) == 3
assert len(collection.endpoints) == 0


def test_flush_snapshot_prevents_modification_during_iteration(collection):
"""Test that modifying self.endpoints during flush iteration doesn't cause RuntimeError."""
collection.add_endpoint("GET", "/api/v1")
collection.add_endpoint("POST", "/api/v2")
collection.add_endpoint("PUT", "/api/v3")

initial_count = len(collection.endpoints)
assert initial_count == 3
result = collection.flush(max_length=10)

assert len(result["endpoints"]) == initial_count
assert result["is_first"] is True


def test_concurrent_add_during_flush_does_not_break_iteration(collection):
"""Test that adding endpoints from another thread during flush doesn't cause RuntimeError."""
for i in range(5):
collection.add_endpoint("GET", f"/api/endpoint{i}")

assert len(collection.endpoints) == 5

flush_completed = threading.Event()
flush_result = {}
exception_caught = []

def flush_thread():
try:
result = collection.flush(max_length=10)
flush_result["data"] = result
flush_completed.set()
except Exception as e:
exception_caught.append(e)
flush_completed.set()

def add_thread():
sleep(0.001)

# Try to modify the set while flush might be iterating
for i in range(5, 10):
collection.add_endpoint("POST", f"/api/new{i}")
sleep(0.001)

t1 = threading.Thread(target=flush_thread)
t2 = threading.Thread(target=add_thread)

t1.start()
t2.start()

t1.join(timeout=2.0)
t2.join(timeout=2.0)

assert flush_completed.is_set(), "Flush did not complete"
assert len(exception_caught) == 0, f"Exception occurred during flush: {exception_caught}"
assert "data" in flush_result, "Flush did not return a result"

result = flush_result["data"]
assert "endpoints" in result
assert "is_first" in result


def test_flush_with_partial_batch(collection):
"""Test that flush creates a tuple snapshot even when using pop() for partial batches."""
for i in range(10):
collection.add_endpoint("GET", f"/api/endpoint{i}")

assert len(collection.endpoints) == 10

result = collection.flush(max_length=5)

assert len(result["endpoints"]) == 5
assert result["is_first"] is True
assert len(collection.endpoints) == 5

result2 = collection.flush(max_length=10)
assert len(result2["endpoints"]) == 5
assert result2["is_first"] is False # Not first anymore

assert len(collection.endpoints) == 0


def test_partial_flush_with_concurrent_modification(collection):
"""Test that partial flush (max_length < size) is safe from race conditions."""
for i in range(10):
collection.add_endpoint("GET", f"/api/endpoint{i}")

assert len(collection.endpoints) == 10

flush_completed = threading.Event()
flush_result = {}
exception_caught = []

def flush_thread():
try:
# Partial flush - this should trigger the else branch at line 118
result = collection.flush(max_length=5)
flush_result["data"] = result
flush_completed.set()
except Exception as e:
exception_caught.append(e)
flush_completed.set()

def add_thread():
sleep(0.001)
# Try to modify the set while flush might be iterating
for i in range(10, 15):
collection.add_endpoint("POST", f"/api/new{i}")
sleep(0.001)

t1 = threading.Thread(target=flush_thread)
t2 = threading.Thread(target=add_thread)

t1.start()
t2.start()

t1.join(timeout=2.0)
t2.join(timeout=2.0)

assert flush_completed.is_set(), "Flush did not complete"
assert len(exception_caught) == 0, f"Exception occurred during flush: {exception_caught}"
assert "data" in flush_result, "Flush did not return a result"

result = flush_result["data"]
assert len(result["endpoints"]) == 5
assert "is_first" in result


def test_http_endpoint_hash_consistency(collection):
"""Test that HttpEndPoint hashing works correctly for set operations."""
collection.add_endpoint("GET", "/api/test")
collection.add_endpoint("GET", "/api/test")
assert len(collection.endpoints) == 1

collection.add_endpoint("POST", "/api/test")
collection.add_endpoint("GET", "/api/other")
assert len(collection.endpoints) == 3


def test_snapshot_is_tuple_type(collection):
"""Verify that the snapshot created in flush is actually a tuple."""
collection.add_endpoint("GET", "/test")
collection.add_endpoint("POST", "/test")
assert isinstance(collection.endpoints, set)

result = collection.flush(max_length=10)
assert len(result["endpoints"]) == 2

for ep in result["endpoints"]:
assert isinstance(ep, dict)
assert "method" in ep
assert "path" in ep
Loading