Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 83 additions & 49 deletions src/buildkite_test_collector/collector/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .instant import Instant
from .run_env import RunEnv

JsonValue = Union[str, int, float, bool, 'JsonDict', Tuple['JsonValue']]
JsonValue = Union[str, int, float, bool, "JsonDict", Tuple["JsonValue"]]
JsonDict = Dict[str, JsonValue]

# pylint: disable=C0103 disable=W0622 disable=R0913
Expand All @@ -24,6 +24,7 @@ class TestResultPassed:
@dataclass(frozen=True)
class TestResultFailed:
"""Represents a failed test result"""

failure_reason: Optional[str]
failure_expanded: Optional[Iterable[Mapping[str, Iterable[str]]]] = None

Expand All @@ -40,19 +41,51 @@ class TestSpan:

Buildkite Test Analtics supports some basic tracing to allow insight into
the runtime performance your tests.

The detail field should be a dict matching the expected format for each section type:
- sql: {"query": str}
- annotation: {"content": str}
- http: {"method": str, "url": str, "lib": str}
- sleep: no detail required

See: https://buildkite.com/docs/test-engine/test-collection/importing-json#json-test-results-data-reference-detail-objects # pylint: disable=C0301
"""
section: Literal['http', 'sql', 'sleep', 'annotation']

section: Literal["http", "sql", "sleep", "annotation"]
duration: timedelta
start_at: Optional[Instant] = None
end_at: Optional[Instant] = None
detail: Optional[str] = None
detail: Optional[Dict[str, str]] = None

def __post_init__(self):
"""Validate detail structure matches the section type requirements"""
if self.section == "sleep":
# Sleep spans don't need detail, so no validation is required
return

if self.detail is None:
raise TypeError(
"detail is requred for 'sql', 'annotation' and 'http' spans"
)

if not isinstance(self.detail, dict):
raise TypeError(f"detail must be a dict, got {type(self.detail).__name__}")

if self.section == "sql":
if "query" not in self.detail:
raise ValueError("SQL span detail must contain 'query' field")
elif self.section == "annotation":
if "content" not in self.detail:
raise ValueError("Annotation span detail must contain 'content' field")
elif self.section == "http":
required = {"method", "url", "lib"}
missing = required - set(self.detail.keys())
if missing:
raise ValueError(f"HTTP span detail missing required fields: {missing}")

def as_json(self, started_at: Instant) -> JsonDict:
"""Convert this span into a Dict for eventual serialisation into JSON"""
attrs = {
"section": self.section,
"duration": self.duration.total_seconds()
}
attrs = {"section": self.section, "duration": self.duration.total_seconds()}

if self.detail is not None:
attrs["detail"] = self.detail
Expand All @@ -75,24 +108,25 @@ class TestHistory:
the runtime performance your tests. This object is the top-level of that
tracing tree.
"""

start_at: Optional[Instant] = None
end_at: Optional[Instant] = None
duration: Optional[timedelta] = None
children: List['TestSpan'] = ()
children: List["TestSpan"] = ()

def is_finished(self) -> bool:
"""Is there an end_at time present?"""
return self.end_at is not None

def push_span(self, span: TestSpan) -> 'TestHistory':
def push_span(self, span: TestSpan) -> "TestHistory":
"""Add a new span to the children"""
return replace(self, children=self.children + tuple([span]))

def as_json(self, started_at: Instant) -> JsonDict:
"""Convert this trace into a Dict for eventual serialisation into JSON"""
attrs = {
"section": "top",
"children": list(map(lambda span: span.as_json(started_at), self.children))
"children": list(map(lambda span: span.as_json(started_at), self.children)),
}

if self.start_at is not None:
Expand All @@ -110,6 +144,7 @@ def as_json(self, started_at: Instant) -> JsonDict:
@dataclass(frozen=True)
class TestData:
"""An individual test execution"""

# 8 attributes for this class seems reasonable
# pylint: disable=too-many-instance-attributes
id: UUID
Expand All @@ -118,28 +153,30 @@ class TestData:
history: TestHistory
location: Optional[str] = None
file_name: Optional[str] = None
tags: Dict[str,str] = field(default_factory=dict)
result: Union[TestResultPassed, TestResultFailed,
TestResultSkipped, None] = None
tags: Dict[str, str] = field(default_factory=dict)
result: Union[TestResultPassed, TestResultFailed, TestResultSkipped, None] = None

@classmethod
def start(cls, id: UUID,
*,
scope: str,
name: str,
location: Optional[str] = None,
file_name: Optional[str] = None) -> 'TestData':
def start(
cls,
id: UUID,
*,
scope: str,
name: str,
location: Optional[str] = None,
file_name: Optional[str] = None,
) -> "TestData":
"""Build a new instance with it's start_at time set to now"""
return cls(
id=id,
scope=scope,
name=name,
location=location,
file_name=file_name,
history=TestHistory(start_at=Instant.now())
history=TestHistory(start_at=Instant.now()),
)

def tag_execution(self, key: str, val: str) -> 'TestData':
def tag_execution(self, key: str, val: str) -> "TestData":
"""Set tag to test execution"""
if not isinstance(key, str) or not isinstance(val, str):
raise TypeError("Expected string for key and value")
Expand All @@ -148,35 +185,37 @@ def tag_execution(self, key: str, val: str) -> 'TestData':
new_tags[key] = val
return replace(self, tags=new_tags)

def finish(self) -> 'TestData':
def finish(self) -> "TestData":
"""Set the end_at and duration on this test"""
if self.is_finished():
return self

end_at = Instant.now()
duration = end_at - self.history.start_at
return replace(self, history=replace(self.history,
end_at=end_at,
duration=duration))
return replace(
self, history=replace(self.history, end_at=end_at, duration=duration)
)

def passed(self) -> 'TestData':
def passed(self) -> "TestData":
"""Mark this test as passed"""
return replace(self, result=TestResultPassed())

def failed(self, failure_reason=None, failure_expanded=None) -> 'TestData':
def failed(self, failure_reason=None, failure_expanded=None) -> "TestData":
"""Mark this test as failed"""
result = TestResultFailed(failure_reason=failure_reason, failure_expanded=failure_expanded)
result = TestResultFailed(
failure_reason=failure_reason, failure_expanded=failure_expanded
)
return replace(self, result=result)

def skipped(self) -> 'TestData':
def skipped(self) -> "TestData":
"""Mark this test as skipped"""
return replace(self, result=TestResultSkipped())

def is_finished(self) -> bool:
"""Does this test have an end_at time?"""
return self.history and self.history.is_finished()

def push_span(self, span: TestSpan) -> 'TestData':
def push_span(self, span: TestSpan) -> "TestData":
"""Add a span to the test history"""
return replace(self, history=self.history.push_span(span))

Expand All @@ -188,7 +227,7 @@ def as_json(self, started_at: Instant) -> JsonDict:
"name": self.name,
"location": self.location,
"file_name": self.file_name,
"history": self.history.as_json(started_at)
"history": self.history.as_json(started_at),
}

if len(self.tags) > 0:
Expand All @@ -213,60 +252,55 @@ def as_json(self, started_at: Instant) -> JsonDict:
@dataclass(frozen=True)
class Payload:
"""The full test analytics payload"""

run_env: RunEnv
data: Tuple[TestData]
started_at: Optional[Instant]
finished_at: Optional[Instant]

@classmethod
def init(cls, run_env: RunEnv) -> 'Payload':
def init(cls, run_env: RunEnv) -> "Payload":
"""Create a new instance of payload with the provided RunEnv"""

return cls(
run_env=run_env,
data=(),
started_at=None,
finished_at=None
)
return cls(run_env=run_env, data=(), started_at=None, finished_at=None)

def as_json(self) -> JsonDict:
"""Convert into a Dict suitable for eventual serialisation to JSON"""
finished_data = list(filter(
lambda td: td.is_finished(),
self.data
))
finished_data = list(filter(lambda td: td.is_finished(), self.data))

if len(finished_data) < len(self.data):
logger.warning('Unexpected unfinished test data, skipping unfinished test records...')
logger.warning(
"Unexpected unfinished test data, skipping unfinished test records..."
)

return {
"format": "json",
"run_env": self.run_env.as_json(),
"data": tuple(map(lambda td: td.as_json(self.started_at), finished_data)),
}

def push_test_data(self, report: TestData) -> 'Payload':
def push_test_data(self, report: TestData) -> "Payload":
"""Append a test-data to the payload"""
return replace(self, data=self.data + tuple([report]))

def is_started(self) -> bool:
"""Returns true of the payload has been started"""
return self.started_at is not None

def started(self) -> 'Payload':
def started(self) -> "Payload":
"""Mark the payload as started (ie the suite has started)"""
return replace(self, started_at=Instant.now())

def into_batches(self, batch_size=100) -> Tuple['Payload']:
def into_batches(self, batch_size=100) -> Tuple["Payload"]:
"""Convert the payload into a collection of payloads based on the batch size"""
return self.__into_batches(self.data, tuple(), batch_size)

def __into_batches(self, data, batches, batch_size) -> Tuple['Payload']:
def __into_batches(self, data, batches, batch_size) -> Tuple["Payload"]:
if len(data) <= batch_size:
return batches + tuple([replace(self, data=data)])

next_batch = data[0:batch_size]
next_data = data[batch_size:]
return self.__into_batches(
next_data,
batches + tuple([replace(self, data=next_batch)]), batch_size)
next_data, batches + tuple([replace(self, data=next_batch)]), batch_size
)
15 changes: 13 additions & 2 deletions src/buildkite_test_collector/pytest_plugin/span_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,28 @@ def record(self, span: TestSpan) -> None:

@contextmanager
def measure(self, section: Literal['http', 'sql', 'sleep', 'annotation'],
detail: Optional[str] = None) -> Any:
detail: Optional[dict] = None) -> Any:
"""
Measure the execution time of some code and record it as a span.

The detail parameter should be a dict matching the expected format for each section type:
- sql: {"query": str}
- annotation: {"content": str}
- http: {"method": str, "url": str, "lib": str}
- sleep: no detail required

Example:

.. code-block:: python

def test_measure_http_request(spans):
with spans.measure('http', 'The koan of Github'):
detail = {'method': 'GET', 'url': 'https://api.github.com/zen', 'lib': 'requests'}
with spans.measure('http', detail):
requests.get("https://api.github.com/zen")

def test_measure_sql_query(spans):
with spans.measure('sql', {'query': 'SELECT * FROM users'}):
db.execute('SELECT * FROM users')
"""
start_at = Instant.now()
try:
Expand Down
Loading