diff --git a/products/tasks/backend/api.py b/products/tasks/backend/api.py index 982702379a56..4a4c64df1d25 100644 --- a/products/tasks/backend/api.py +++ b/products/tasks/backend/api.py @@ -30,6 +30,7 @@ from posthog.event_usage import groups from posthog.permissions import APIScopePermission from posthog.rate_limit import CodeInviteThrottle +from posthog.renderers import ServerSentEventRenderer from posthog.storage import object_storage from ee.hogai.utils.aio import async_to_sync @@ -441,6 +442,7 @@ def partial_update(self, request, *args, **kwargs): task_run.completed_at = timezone.now() task_run.save() + task_run.publish_stream_state_event() # Signal Temporal and post Slack updates after commit to avoid # holding the row lock during external calls. @@ -564,6 +566,7 @@ def set_output(self, request, pk=None, **kwargs): # TODO: Validate output data according to schema for the task type. task_run.output = output_data task_run.save(update_fields=["output", "updated_at"]) + task_run.publish_stream_state_event() self._post_slack_update_for_pr(task_run) return Response(TaskRunDetailSerializer(task_run, context=self.get_serializer_context()).data) @@ -1022,6 +1025,8 @@ def session_logs(self, request, pk=None, **kwargs): response = JsonResponse([], safe=False) response["X-Total-Count"] = "0" response["X-Filtered-Count"] = "0" + response["X-Matching-Count"] = "0" + response["X-Has-More"] = "false" response["Cache-Control"] = "no-cache" response["Server-Timing"] = timer.to_header_string() return response @@ -1045,6 +1050,7 @@ def session_logs(self, request, pk=None, **kwargs): event_types_str = params.get("event_types") exclude_types_str = params.get("exclude_types") limit = params.get("limit", 1000) + offset = params.get("offset", 0) event_types = {t.strip() for t in event_types_str.split(",") if t.strip()} if event_types_str else None exclude_types = {t.strip() for t in exclude_types_str.split(",") if t.strip()} if exclude_types_str else None @@ -1074,37 +1080,58 @@ def session_logs(self, request, pk=None, **kwargs): filtered.append(entry) - if len(filtered) >= limit: - break + matching_count = len(filtered) + page = filtered[offset : offset + limit] + has_more = offset + len(page) < matching_count - response = JsonResponse(filtered, safe=False) + response = JsonResponse(page, safe=False) response["X-Total-Count"] = str(total_count) - response["X-Filtered-Count"] = str(len(filtered)) + response["X-Filtered-Count"] = str(matching_count) + response["X-Matching-Count"] = str(matching_count) + response["X-Has-More"] = "true" if has_more else "false" response["Cache-Control"] = "no-cache" response["Server-Timing"] = timer.to_header_string() return response - @action(detail=True, methods=["get"], url_path="stream", required_scopes=["task:read"]) + @staticmethod + def _format_sse_event(data: dict, *, event_id: str | None = None, event_name: str | None = None) -> bytes: + parts: list[str] = [] + if event_name: + parts.append(f"event: {event_name}") + if event_id: + parts.append(f"id: {event_id}") + parts.append(f"data: {json.dumps(data)}") + return ("\n".join(parts) + "\n\n").encode() + + @action( + detail=True, + methods=["get"], + url_path="stream", + required_scopes=["task:read"], + renderer_classes=[ServerSentEventRenderer], + ) def stream(self, request, pk=None, **kwargs): task_run = cast(TaskRun, self.get_object()) stream_key = get_task_run_stream_key(str(task_run.id)) + last_event_id = request.headers.get("Last-Event-ID") + start_latest = request.GET.get("start") == "latest" + format_sse_event = self._format_sse_event async def async_stream() -> AsyncGenerator[bytes, None]: redis_stream = TaskRunRedisStream(stream_key) if not await redis_stream.wait_for_stream(): - yield b'event: error\ndata: {"error":"Stream not available"}\n\n' + yield format_sse_event({"error": "Stream not available"}, event_name="error") return + + start_id = last_event_id or "0" + if not last_event_id and start_latest: + start_id = await redis_stream.get_latest_stream_id() or "0" try: - async for event in redis_stream.read_stream(): - yield f"data: {json.dumps(event)}\n\n".encode() + async for event_id, event in redis_stream.read_stream_entries(start_id=start_id): + yield format_sse_event(event, event_id=event_id) except TaskRunStreamError as e: - logger.error( - "TaskRunRedisStream error for stream %s: %s", - stream_key, - e, - exc_info=True, - ) - yield b'event: error\ndata: {"error": "Stream error"}\n\n' + logger.error("TaskRunRedisStream error for stream %s: %s", stream_key, e, exc_info=True) + yield format_sse_event({"error": str(e)}, event_name="error") return StreamingHttpResponse( async_stream() if settings.SERVER_GATEWAY_INTERFACE == "ASGI" else async_to_sync(lambda: async_stream()), diff --git a/products/tasks/backend/models.py b/products/tasks/backend/models.py index 9a9c89f82ad4..19c79d5c8597 100644 --- a/products/tasks/backend/models.py +++ b/products/tasks/backend/models.py @@ -4,7 +4,7 @@ import uuid import string import secrets -from typing import TYPE_CHECKING, Literal, Optional +from typing import TYPE_CHECKING, Any, Literal, Optional if TYPE_CHECKING: from products.slack_app.backend.slack_thread import SlackThreadContext @@ -28,6 +28,7 @@ from posthog.temporal.oauth import PosthogMcpScopes from products.tasks.backend.constants import DEFAULT_TRUSTED_DOMAINS +from products.tasks.backend.stream.redis_stream import publish_task_run_stream_event logger = structlog.get_logger(__name__) @@ -199,6 +200,7 @@ def create_run( state=state, branch=branch, ) + task_run.publish_stream_state_event() self.capture_event( "task_run_created", { @@ -513,6 +515,7 @@ def mark_completed(self): self.status = self.Status.COMPLETED self.completed_at = timezone.now() self.save(update_fields=["status", "completed_at"]) + self.publish_stream_state_event() self.capture_event( "task_run_completed", {"duration_seconds": self._duration_seconds()}, @@ -524,6 +527,7 @@ def mark_failed(self, error: str): self.error_message = error self.completed_at = timezone.now() self.save(update_fields=["status", "error_message", "completed_at"]) + self.publish_stream_state_event() self.capture_event( "task_run_failed", { @@ -532,6 +536,26 @@ def mark_failed(self, error: str): }, ) + def build_stream_state_event(self) -> dict[str, Any]: + return { + "type": "task_run_state", + "run_id": str(self.id), + "task_id": str(self.task_id), + "status": self.status, + "stage": self.stage, + "output": self.output, + "branch": self.branch, + "error_message": self.error_message, + "updated_at": self.updated_at.isoformat() if self.updated_at else None, + "completed_at": self.completed_at.isoformat() if self.completed_at else None, + } + + def publish_stream_event(self, event: dict[str, Any]) -> None: + publish_task_run_stream_event(str(self.id), event) + + def publish_stream_state_event(self) -> None: + self.publish_stream_event(self.build_stream_state_event()) + def emit_console_event(self, level: LogLevel, message: str) -> None: """Emit a console-style log event in ACP notification format.""" event = { @@ -548,6 +572,7 @@ def emit_console_event(self, level: LogLevel, message: str) -> None: }, } self.append_log([event]) + self.publish_stream_event(event) def emit_sandbox_output(self, stdout: str, stderr: str, exit_code: int) -> None: """Emit sandbox execution output as ACP notification.""" @@ -566,6 +591,7 @@ def emit_sandbox_output(self, stdout: str, stderr: str, exit_code: int) -> None: }, } self.append_log([event]) + self.publish_stream_event(event) @property def is_terminal(self) -> bool: diff --git a/products/tasks/backend/serializers.py b/products/tasks/backend/serializers.py index 7ca6e304d3b9..c391786f2f83 100644 --- a/products/tasks/backend/serializers.py +++ b/products/tasks/backend/serializers.py @@ -495,6 +495,12 @@ class TaskRunSessionLogsQuerySerializer(serializers.Serializer): max_value=5000, help_text="Maximum number of entries to return (default 1000, max 5000)", ) + offset = serializers.IntegerField( + required=False, + default=0, + min_value=0, + help_text="Zero-based offset into the filtered log entries", + ) class SandboxEnvironmentSerializer(serializers.ModelSerializer): diff --git a/products/tasks/backend/stream/redis_stream.py b/products/tasks/backend/stream/redis_stream.py index 5ddad245aeea..1ff5148c78e0 100644 --- a/products/tasks/backend/stream/redis_stream.py +++ b/products/tasks/backend/stream/redis_stream.py @@ -7,12 +7,15 @@ import structlog import redis.exceptions as redis_exceptions +from asgiref.sync import async_to_sync from posthog.redis import get_async_client logger = structlog.get_logger(__name__) -TASK_RUN_STREAM_MAX_LENGTH = 2000 +# Keep enough live history for users who open an in-progress run late while +# still bounding Redis growth for streams with a one-hour TTL. +TASK_RUN_STREAM_MAX_LENGTH = 20_000 TASK_RUN_STREAM_TIMEOUT = 60 * 60 # 60 minutes TASK_RUN_STREAM_PREFIX = "task-run-stream:" TASK_RUN_STREAM_READ_COUNT = 16 @@ -20,6 +23,12 @@ DATA_KEY = b"data" +def _normalize_stream_id(stream_id: str | bytes) -> str: + if isinstance(stream_id, bytes): + return stream_id.decode("utf-8") + return stream_id + + class TaskRunStreamError(Exception): pass @@ -76,15 +85,33 @@ async def wait_for_stream(self) -> bool: await asyncio.sleep(delay) delay = min(delay + delay_increment, max_delay) + async def get_latest_stream_id(self) -> str | None: + """Return the latest stream ID if the stream has any events.""" + messages = await self._redis_client.xrevrange(self._stream_key, count=1) + if not messages: + return None + stream_id, _message = messages[0] + return _normalize_stream_id(stream_id) + async def read_stream( self, start_id: str = "0", block_ms: int = 100, count: Optional[int] = TASK_RUN_STREAM_READ_COUNT, ) -> AsyncGenerator[dict, None]: + async for _stream_id, data in self.read_stream_entries(start_id=start_id, block_ms=block_ms, count=count): + yield data + + async def read_stream_entries( + self, + start_id: str = "0", + block_ms: int = 100, + count: Optional[int] = TASK_RUN_STREAM_READ_COUNT, + ) -> AsyncGenerator[tuple[str, dict], None]: """Read events from the Redis stream. - Yields parsed JSON dicts. Stops when a complete sentinel is received. + Yields Redis stream IDs and parsed JSON dicts. + Stops when a complete sentinel is received. Raises TaskRunStreamError on error sentinel or timeout. """ current_id = start_id @@ -106,7 +133,8 @@ async def read_stream( for _, stream_messages in messages: for stream_id, message in stream_messages: - current_id = stream_id + normalized_stream_id = _normalize_stream_id(stream_id) + current_id = normalized_stream_id raw = message.get(DATA_KEY, b"") data = json.loads(raw) @@ -117,7 +145,7 @@ async def read_stream( elif status == "error": raise TaskRunStreamError(data.get("error", "Unknown stream error")) else: - yield data + yield normalized_stream_id, data except (TaskRunStreamError, GeneratorExit): raise @@ -128,15 +156,22 @@ async def read_stream( except redis_exceptions.RedisError: raise TaskRunStreamError("Stream read error") - async def write_event(self, event: dict) -> None: - """Write a single event to the stream.""" + async def write_event(self, event: dict) -> str: + """Write a single event to the stream. + + Refreshes TTL on every write (sliding window) so long-running tasks + don't expire mid-stream. This is especially important for the sync + publish path (publish_task_run_stream_event) which bypasses initialize(). + """ raw = json.dumps(event) - await self._redis_client.xadd( + stream_id = await self._redis_client.xadd( self._stream_key, {DATA_KEY: raw}, maxlen=self._max_length, approximate=True, ) + await self._redis_client.expire(self._stream_key, self._timeout) + return _normalize_stream_id(stream_id) async def mark_complete(self) -> None: """Write a completion sentinel to signal end of stream.""" @@ -153,3 +188,21 @@ async def delete_stream(self) -> bool: except Exception: logger.exception("task_run_stream_delete_failed", stream_key=self._stream_key) return False + + +def publish_task_run_stream_event(run_id: str, event: dict) -> str | None: + """Synchronously publish a task-run event to Redis. + + This is intended for sync Django model/view code that needs to mirror + user-visible task-run events into the live SSE stream. + """ + + async def _publish() -> str: + redis_stream = TaskRunRedisStream(get_task_run_stream_key(run_id)) + return await redis_stream.write_event(event) + + try: + return async_to_sync(_publish)() + except Exception: + logger.exception("task_run_stream_publish_failed", run_id=run_id) + return None diff --git a/products/tasks/backend/temporal/process_task/activities/tests/test_update_task_run_status.py b/products/tasks/backend/temporal/process_task/activities/tests/test_update_task_run_status.py index 844db2446761..983a7bbee95a 100644 --- a/products/tasks/backend/temporal/process_task/activities/tests/test_update_task_run_status.py +++ b/products/tasks/backend/temporal/process_task/activities/tests/test_update_task_run_status.py @@ -1,4 +1,5 @@ import pytest +from unittest.mock import patch from asgiref.sync import async_to_sync @@ -45,6 +46,15 @@ def test_updates_error_message(self, activity_environment, test_task_run): test_task_run.refresh_from_db() assert test_task_run.error_message == error_msg + @pytest.mark.django_db + @patch("products.tasks.backend.models.TaskRun.publish_stream_state_event") + def test_publishes_stream_state_event(self, mock_publish_stream_state_event, activity_environment, test_task_run): + input_data = UpdateTaskRunStatusInput(run_id=str(test_task_run.id), status=TaskRun.Status.IN_PROGRESS) + + async_to_sync(activity_environment.run)(update_task_run_status, input_data) + + mock_publish_stream_state_event.assert_called_once() + @pytest.mark.django_db def test_handles_non_existent_task_run(self, activity_environment): non_existent_run_id = "550e8400-e29b-41d4-a716-446655440000" diff --git a/products/tasks/backend/temporal/process_task/activities/update_task_run_status.py b/products/tasks/backend/temporal/process_task/activities/update_task_run_status.py index 6e0d9c42f11b..dc221fa3dfa7 100644 --- a/products/tasks/backend/temporal/process_task/activities/update_task_run_status.py +++ b/products/tasks/backend/temporal/process_task/activities/update_task_run_status.py @@ -43,6 +43,7 @@ def update_task_run_status(input: UpdateTaskRunStatusInput) -> None: task_run.completed_at = timezone.now() task_run.save(update_fields=["status", "error_message", "completed_at", "updated_at"]) + task_run.publish_stream_state_event() log_with_activity_context( "Task run status updated", diff --git a/products/tasks/backend/tests/test_api.py b/products/tasks/backend/tests/test_api.py index f6c11f14ad00..09507002eda9 100644 --- a/products/tasks/backend/tests/test_api.py +++ b/products/tasks/backend/tests/test_api.py @@ -1,6 +1,8 @@ import json import time import uuid +import asyncio +import threading from typing import ClassVar from unittest.mock import MagicMock, patch @@ -19,6 +21,11 @@ from products.tasks.backend.models import CodeInvite, CodeInviteRedemption, SandboxEnvironment, Task, TaskRun from products.tasks.backend.services.connection_token import get_sandbox_jwt_public_key +from products.tasks.backend.stream.redis_stream import ( + TaskRunRedisStream, + get_task_run_stream_key, + publish_task_run_stream_event, +) from products.tasks.backend.temporal.process_task.utils import get_cached_github_user_token # Test RSA private key for JWT tests (RS256) @@ -642,6 +649,22 @@ def test_internal_field_defaults_to_false_on_create(self): class TestTaskRunAPI(BaseTaskAPITest): + @patch("products.tasks.backend.models.TaskRun.publish_stream_state_event") + @patch("products.tasks.backend.api.TaskRunViewSet._signal_workflow_completion") + def test_update_run_status_publishes_stream_state_event(self, mock_signal, mock_publish_stream_state_event): + task = self.create_task() + run = TaskRun.objects.create(task=task, team=self.team, status=TaskRun.Status.IN_PROGRESS) + + response = self.client.patch( + f"/api/projects/@current/tasks/{task.id}/runs/{run.id}/", + {"status": "completed"}, + format="json", + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + mock_signal.assert_called_once() + mock_publish_stream_state_event.assert_called_once() + @patch("products.tasks.backend.api.TaskRunViewSet._signal_workflow_completion") def test_update_run_status_to_completed_signals_workflow(self, mock_signal): task = self.create_task() @@ -866,6 +889,20 @@ def test_set_output_with_pr_url_posts_slack_update_when_mapping_exists(self, moc self.assertEqual(input_arg.slack_thread_context["channel"], "C123") self.assertEqual(input_arg.slack_thread_context["thread_ts"], "1234.5678") + @patch("products.tasks.backend.models.TaskRun.publish_stream_state_event") + def test_set_output_publishes_stream_state_event(self, mock_publish_stream_state_event): + task = self.create_task() + run = TaskRun.objects.create(task=task, team=self.team, status=TaskRun.Status.IN_PROGRESS) + + response = self.client.patch( + f"/api/projects/@current/tasks/{task.id}/runs/{run.id}/set_output/", + {"output": {"pr_url": "https://github.com/org/repo/pull/1"}}, + format="json", + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + mock_publish_stream_state_event.assert_called_once() + @patch("products.tasks.backend.temporal.process_task.activities.post_slack_update.post_slack_update") def test_partial_update_with_pr_url_posts_slack_update_when_mapping_exists(self, mock_post_slack_update): from posthog.models.integration import Integration @@ -1460,7 +1497,28 @@ def test_session_logs_limit(self): data = response.json() self.assertEqual(len(data), 3) self.assertEqual(response["X-Total-Count"], "10") - self.assertEqual(response["X-Filtered-Count"], "3") + self.assertEqual(response["X-Filtered-Count"], "10") + self.assertEqual(response["X-Matching-Count"], "10") + self.assertEqual(response["X-Has-More"], "true") + + def test_session_logs_offset(self): + task = self.create_task() + run = TaskRun.objects.create(task=task, team=self.team, status=TaskRun.Status.IN_PROGRESS) + + entries = [self._make_session_update_entry("user_message", f"2026-01-01T00:00:{i:02d}Z") for i in range(6)] + self._seed_log(task, run, entries) + + response = self.client.get(self._events_url(task, run) + "?limit=2&offset=2") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + data = response.json() + self.assertEqual(len(data), 2) + self.assertEqual(data[0]["timestamp"], "2026-01-01T00:00:02Z") + self.assertEqual(data[1]["timestamp"], "2026-01-01T00:00:03Z") + self.assertEqual(response["X-Total-Count"], "6") + self.assertEqual(response["X-Filtered-Count"], "6") + self.assertEqual(response["X-Matching-Count"], "6") + self.assertEqual(response["X-Has-More"], "true") def test_session_logs_combined_filters(self): """Test after + event_types + limit together.""" @@ -1524,6 +1582,111 @@ def test_session_logs_server_timing_header(self): self.assertIn("filter", response["Server-Timing"]) +class TestTaskRunStreamAPI(BaseTaskAPITest): + def _stream_url(self, task: Task, run: TaskRun, suffix: str = "") -> str: + return f"/api/projects/@current/tasks/{task.id}/runs/{run.id}/stream/{suffix}" + + def _mark_stream_complete(self, run: TaskRun) -> None: + async def _mark() -> None: + redis_stream = TaskRunRedisStream(get_task_run_stream_key(str(run.id))) + await redis_stream.mark_complete() + + asyncio.run(_mark()) + + def _read_stream_ids(self, run: TaskRun) -> list[str]: + async def _read() -> list[str]: + redis_stream = TaskRunRedisStream(get_task_run_stream_key(str(run.id))) + messages = await redis_stream._redis_client.xrange(get_task_run_stream_key(str(run.id))) + return [msg_id.decode("utf-8") if isinstance(msg_id, bytes) else msg_id for msg_id, _ in messages] + + return asyncio.run(_read()) + + def _collect_sse_events(self, response) -> list[dict]: + content = b"".join(response.streaming_content).decode("utf-8") + events: list[dict] = [] + for block in [part.strip() for part in content.split("\n\n") if part.strip()]: + event_name = None + event_id = None + data = None + for line in block.splitlines(): + if line.startswith("event: "): + event_name = line[7:] + elif line.startswith("id: "): + event_id = line[4:] + elif line.startswith("data: "): + data = json.loads(line[6:]) + events.append({"event": event_name, "id": event_id, "data": data}) + return events + + def test_stream_replays_events_with_ids(self): + task = self.create_task() + run = task.create_run() + run.emit_console_event("info", "hello") + self._mark_stream_complete(run) + + response = self.client.get(self._stream_url(task, run), HTTP_ACCEPT="text/event-stream") + + self.assertEqual(response.status_code, status.HTTP_200_OK) + events = self._collect_sse_events(response) + self.assertGreaterEqual(len(events), 2) + self.assertEqual(events[0]["data"]["type"], "task_run_state") + self.assertIsNotNone(events[0]["id"]) + self.assertEqual(events[1]["data"]["notification"]["method"], "_posthog/console") + + def test_stream_resumes_from_last_event_id(self): + task = self.create_task() + run = task.create_run() + run.emit_console_event("info", "first") + run.emit_sandbox_output("stdout", "stderr", 0) + stream_ids = self._read_stream_ids(run) + self._mark_stream_complete(run) + + response = self.client.get( + self._stream_url(task, run), + HTTP_LAST_EVENT_ID=stream_ids[1], + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + events = self._collect_sse_events(response) + self.assertEqual(len(events), 1) + self.assertEqual(events[0]["data"]["notification"]["method"], "_posthog/sandbox_output") + + def test_stream_start_latest_only_yields_new_events(self): + task = self.create_task() + run = task.create_run() + + def publish_future_events() -> None: + time.sleep(0.05) + publish_task_run_stream_event( + str(run.id), + { + "type": "notification", + "timestamp": "2026-01-01T00:00:01Z", + "notification": { + "jsonrpc": "2.0", + "method": "_posthog/console", + "params": { + "sessionId": str(run.id), + "level": "info", + "message": "late hello", + }, + }, + }, + ) + self._mark_stream_complete(run) + + publisher = threading.Thread(target=publish_future_events) + publisher.start() + response = self.client.get(self._stream_url(task, run) + "?start=latest") + events = self._collect_sse_events(response) + publisher.join() + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertGreaterEqual(len(events), 1) + self.assertTrue(all(event["data"]["notification"]["method"] == "_posthog/console" for event in events), events) + self.assertEqual(events[-1]["data"]["notification"]["params"]["message"], "late hello") + + class TestTasksAPIPermissions(BaseTaskAPITest): def setUp(self): super().setUp() diff --git a/products/tasks/backend/tests/test_models.py b/products/tasks/backend/tests/test_models.py index d6f4df9b2bae..adaef5af8310 100644 --- a/products/tasks/backend/tests/test_models.py +++ b/products/tasks/backend/tests/test_models.py @@ -372,6 +372,17 @@ def test_str_representation(self): ) self.assertEqual(str(run), "Run for Test Task - In Progress") + @patch("products.tasks.backend.models.publish_task_run_stream_event") + def test_create_run_seeds_stream_state_event(self, mock_publish_stream_event): + run = self.task.create_run(branch="main") + + mock_publish_stream_event.assert_called_once() + call_args = mock_publish_stream_event.call_args + self.assertEqual(call_args.args[0], str(run.id)) + self.assertEqual(call_args.args[1]["type"], "task_run_state") + self.assertEqual(call_args.args[1]["status"], TaskRun.Status.QUEUED) + self.assertEqual(call_args.args[1]["branch"], "main") + def test_append_log_to_empty(self): run = TaskRun.objects.create( task=self.task, @@ -567,6 +578,20 @@ def test_emit_console_event_acp_format(self): self.assertEqual(entry["notification"]["params"]["level"], "info") self.assertEqual(entry["notification"]["params"]["message"], "Test message") + @patch("products.tasks.backend.models.publish_task_run_stream_event") + def test_emit_console_event_publishes_to_stream(self, mock_publish_stream_event): + run = TaskRun.objects.create( + task=self.task, + team=self.team, + ) + + run.emit_console_event("info", "Test message") + + mock_publish_stream_event.assert_called_once() + call_args = mock_publish_stream_event.call_args + self.assertEqual(call_args.args[0], str(run.id)) + self.assertEqual(call_args.args[1]["notification"]["method"], "_posthog/console") + @parameterized.expand( [ (0, "stdout output", "stderr output"), @@ -595,6 +620,20 @@ def test_emit_sandbox_output_acp_format(self, exit_code, stdout, stderr): self.assertEqual(entry["notification"]["params"]["stderr"], stderr) self.assertEqual(entry["notification"]["params"]["exitCode"], exit_code) + @patch("products.tasks.backend.models.publish_task_run_stream_event") + def test_emit_sandbox_output_publishes_to_stream(self, mock_publish_stream_event): + run = TaskRun.objects.create( + task=self.task, + team=self.team, + ) + + run.emit_sandbox_output("stdout output", "stderr output", 0) + + mock_publish_stream_event.assert_called_once() + call_args = mock_publish_stream_event.call_args + self.assertEqual(call_args.args[0], str(run.id)) + self.assertEqual(call_args.args[1]["notification"]["method"], "_posthog/sandbox_output") + @parameterized.expand( [ ("background_mode", {"mode": "background"}, True), diff --git a/products/tasks/frontend/generated/api.schemas.ts b/products/tasks/frontend/generated/api.schemas.ts index 59a3277a35f7..8a6d5df701e2 100644 --- a/products/tasks/frontend/generated/api.schemas.ts +++ b/products/tasks/frontend/generated/api.schemas.ts @@ -770,6 +770,11 @@ export type TasksRunsSessionLogsRetrieveParams = { * @maximum 5000 */ limit?: number + /** + * Zero-based offset into the filtered log entries + * @minimum 0 + */ + offset?: number } export type TasksRepositoryReadinessRetrieveParams = { diff --git a/services/mcp/src/api/generated.ts b/services/mcp/src/api/generated.ts index e5b9306a5568..bc7113b17864 100644 --- a/services/mcp/src/api/generated.ts +++ b/services/mcp/src/api/generated.ts @@ -35978,6 +35978,11 @@ export namespace Schemas { * @maximum 5000 */ limit?: number; + /** + * Zero-based offset into the filtered log entries + * @minimum 0 + */ + offset?: number; }; export type TasksRepositoryReadinessRetrieveParams = {