From f33cef54d62644b3b23f437c1618d9c40ec38118 Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Wed, 1 Apr 2026 17:26:20 -0600 Subject: [PATCH 01/10] Wire up output schema --- products/tasks/backend/api.py | 12 +++++++++-- products/tasks/backend/models.py | 35 +++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/products/tasks/backend/api.py b/products/tasks/backend/api.py index fa416696acbe..52a0ce092d44 100644 --- a/products/tasks/backend/api.py +++ b/products/tasks/backend/api.py @@ -14,6 +14,7 @@ from django.utils import timezone import requests as http_requests +import jsonschema import posthoganalytics from drf_spectacular.utils import OpenApiResponse, extend_schema from rest_framework import status, viewsets @@ -507,7 +508,7 @@ def perform_create(self, serializer): ) def set_output(self, request, pk=None, **kwargs): task_run = cast(TaskRun, self.get_object()) - + task = cast(Task, task_run.task) output_data = request.data.get("output", {}) if not isinstance(output_data, dict): return Response( @@ -515,7 +516,14 @@ def set_output(self, request, pk=None, **kwargs): status=status.HTTP_400_BAD_REQUEST, ) - # TODO: Validate output data according to schema for the task type. + if task.json_schema: + try: + jsonschema.validate(instance=output_data, schema=task.json_schema) + except jsonschema.ValidationError as e: + return Response( + ErrorResponseSerializer({"error": f"Output validation error: {e.message}"}).data, + status=status.HTTP_400_BAD_REQUEST, + ) task_run.output = output_data task_run.save(update_fields=["output", "updated_at"]) self._post_slack_update_for_pr(task_run) diff --git a/products/tasks/backend/models.py b/products/tasks/backend/models.py index 0f16f4a1bf17..2712fbd9e4f4 100644 --- a/products/tasks/backend/models.py +++ b/products/tasks/backend/models.py @@ -6,6 +6,11 @@ import secrets from typing import TYPE_CHECKING, Literal, Optional +from django.db.models.signals import post_save +from django.dispatch import receiver + +from pydantic import BaseModel + if TYPE_CHECKING: from products.slack_app.backend.slack_thread import SlackThreadContext @@ -224,6 +229,7 @@ def create_and_run( start_workflow: bool = True, posthog_mcp_scopes: PosthogMcpScopes = "full", branch: str | None = None, + output_schema: BaseModel | None = None, ) -> "Task": from products.tasks.backend.temporal.client import execute_task_processing_workflow @@ -234,7 +240,6 @@ def create_and_run( github_integration = Integration.objects.filter(team=team, kind="github").first() if not github_integration: raise ValueError(f"Team {team.id} does not have a GitHub integration") - task = Task.objects.create( team=team, title=title, @@ -243,6 +248,7 @@ def create_and_run( created_by=created_by, github_integration=github_integration, repository=repository, + json_schema=output_schema.model_json_schema() if output_schema else None, ) extra_state: dict[str, str] | None = None @@ -488,6 +494,20 @@ def mark_completed(self): {"duration_seconds": self._duration_seconds()}, ) + def track_structured_result(self): + """Track a structured result event with properties from the run output.""" + if not self.output: + return + + try: + self.capture_event("task_run_structured_result", {"result": self.output}) + except Exception as e: + logger.warning( + "task_run.track_structured_result_failed", + task_run_id=str(self.id), + error=str(e), + ) + def mark_failed(self, error: str): """Mark the progress as failed with an error message.""" self.status = self.Status.FAILED @@ -796,3 +816,16 @@ class Meta: def __str__(self): return f"{self.user} redeemed {self.invite_code}" + + +@receiver(post_save, sender=TaskRun) +def track_task_run_completion(sender, instance: TaskRun, created: bool, **kwargs): + try: + if not created and instance.state == TaskRun.Status.COMPLETED and instance.output and instance.task.json_schema: + instance.track_structured_result() + except Exception as e: + logger.warning( + "task_run.track_structured_result_failed", + task_run_id=str(instance.id), + error=str(e), + ) From 89139708ae4419ea3ffcf934fec07be5c8958009 Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Thu, 2 Apr 2026 19:13:22 -0600 Subject: [PATCH 02/10] Modify output_schema to accept either a pydantic model or json schema --- products/tasks/backend/models.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/products/tasks/backend/models.py b/products/tasks/backend/models.py index df7d3572e26f..6c7f28b4b7e4 100644 --- a/products/tasks/backend/models.py +++ b/products/tasks/backend/models.py @@ -39,6 +39,12 @@ LogLevel = Literal["debug", "info", "warn", "error"] +def resolve_schema(schema: BaseModel | dict) -> dict: + if isinstance(schema, BaseModel): + return schema.model_json_schema() + return schema + + class Task(DeletedMetaFields, models.Model): class OriginProduct(models.TextChoices): ERROR_TRACKING = "error_tracking", "Error Tracking" @@ -231,7 +237,7 @@ def create_and_run( branch: str | None = None, sandbox_environment_id: str | None = None, internal: bool = False, - output_schema: BaseModel | None = None, + output_schema: BaseModel | dict | None = None, ) -> "Task": from products.tasks.backend.temporal.client import execute_task_processing_workflow @@ -258,7 +264,7 @@ def create_and_run( github_integration=github_integration, repository=repository, internal=internal, - json_schema=output_schema.model_json_schema() if output_schema else None, + json_schema=resolve_schema(output_schema) if output_schema else None, ) extra_state: dict[str, str] | None = None From 5df7b9017417d75e11250277ede383c241702b10 Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Thu, 2 Apr 2026 19:15:45 -0600 Subject: [PATCH 03/10] Push json schema to agent configuration --- .../process_task/activities/get_task_processing_context.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/products/tasks/backend/temporal/process_task/activities/get_task_processing_context.py b/products/tasks/backend/temporal/process_task/activities/get_task_processing_context.py index 23302bbbc1ce..b735dd257a79 100644 --- a/products/tasks/backend/temporal/process_task/activities/get_task_processing_context.py +++ b/products/tasks/backend/temporal/process_task/activities/get_task_processing_context.py @@ -38,6 +38,7 @@ class TaskProcessingContext: _branch: str | None = None sandbox_environment_name: str | None = None allowed_domains: list[str] | None = None + json_schema: dict | None = None @property def mode(self) -> str: @@ -164,4 +165,5 @@ def get_task_processing_context(input: GetTaskProcessingContextInput) -> TaskPro _branch=task_run.branch, sandbox_environment_name=sandbox_environment_name, allowed_domains=allowed_domains, + json_schema=task.json_schema, ) From b27092892a6639f9e8be31b0419c64b9e287e7ca Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Mon, 6 Apr 2026 18:30:45 -0600 Subject: [PATCH 04/10] Use validated_request and implement task completion on output --- products/tasks/backend/api.py | 11 ++-- products/tasks/backend/serializers.py | 77 +++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/products/tasks/backend/api.py b/products/tasks/backend/api.py index 52a0ce092d44..f1b5ea71e63a 100644 --- a/products/tasks/backend/api.py +++ b/products/tasks/backend/api.py @@ -58,6 +58,7 @@ TaskRunRelayMessageRequestSerializer, TaskRunRelayMessageResponseSerializer, TaskRunSessionLogsQuerySerializer, + TaskRunSetOutputRequestSerializer, TaskRunUpdateSerializer, TaskSerializer, ) @@ -492,7 +493,7 @@ def perform_create(self, serializer): serializer.save(team=self.team, task=task) @validated_request( - request_serializer=None, + request_serializer=TaskRunSetOutputRequestSerializer, responses={ 200: OpenApiResponse(response=TaskRunDetailSerializer, description="Run with updated output"), 404: OpenApiResponse(description="Run not found"), @@ -509,12 +510,7 @@ def perform_create(self, serializer): def set_output(self, request, pk=None, **kwargs): task_run = cast(TaskRun, self.get_object()) task = cast(Task, task_run.task) - output_data = request.data.get("output", {}) - if not isinstance(output_data, dict): - return Response( - ErrorResponseSerializer({"error": "output must be a dictionary"}).data, - status=status.HTTP_400_BAD_REQUEST, - ) + output_data = request.validated_request["output"] if task.json_schema: try: @@ -526,6 +522,7 @@ def set_output(self, request, pk=None, **kwargs): ) task_run.output = output_data task_run.save(update_fields=["output", "updated_at"]) + self._signal_workflow_completion(task_run, TaskRun.Status.COMPLETED, None) self._post_slack_update_for_pr(task_run) return Response(TaskRunDetailSerializer(task_run, context=self.get_serializer_context()).data) diff --git a/products/tasks/backend/serializers.py b/products/tasks/backend/serializers.py index c4152f6b5f05..57147ad373f2 100644 --- a/products/tasks/backend/serializers.py +++ b/products/tasks/backend/serializers.py @@ -217,6 +217,12 @@ def update(self, instance, validated_data): return super().update(instance, validated_data) +class TaskRunSetOutputRequestSerializer(serializers.Serializer): + output = serializers.JSONField( + help_text="Output data from the run. Validated against the task's json_schema if one is set." + ) + + class ErrorResponseSerializer(serializers.Serializer): error = serializers.CharField(help_text="Error message") @@ -354,6 +360,77 @@ class ConnectionTokenResponseSerializer(serializers.Serializer): token = serializers.CharField(help_text="JWT token for authenticating with the sandbox") +class CreateAndRunRequestSerializer(serializers.Serializer): + """Request body for creating a task and immediately running it.""" + + title = serializers.CharField( + max_length=255, + required=False, + allow_blank=True, + default="", + help_text="Task title. If blank, one is generated from the description.", + ) + description = serializers.CharField( + required=True, + help_text="Task description / prompt for the agent.", + ) + origin_product = serializers.ChoiceField( + choices=Task.OriginProduct.choices, + required=False, + default=Task.OriginProduct.USER_CREATED, + help_text="Which product originated this task.", + ) + repository = serializers.CharField( + max_length=255, + required=False, + allow_blank=True, + allow_null=True, + default=None, + help_text="GitHub repository in 'org/repo' format.", + ) + mode = serializers.ChoiceField( + choices=["interactive", "background"], + required=False, + default="background", + help_text="Execution mode for the run.", + ) + branch = serializers.CharField( + required=False, + allow_null=True, + default=None, + max_length=255, + help_text="Git branch to checkout in the sandbox.", + ) + create_pr = serializers.BooleanField( + required=False, + default=True, + help_text="Whether the agent should create a pull request.", + ) + json_schema = serializers.JSONField( + required=False, + default=None, + help_text="JSON Schema (or Pydantic model schema) for structured output validation.", + ) + internal = serializers.BooleanField( + required=False, + default=False, + help_text="If true, the task is for internal use and hidden from end users.", + ) + sandbox_environment_id = serializers.UUIDField( + required=False, + default=None, + help_text="Optional sandbox environment to apply for this run.", + ) + + def validate_repository(self, value): + if not value: + return value + parts = value.split("/") + if len(parts) != 2 or not parts[0] or not parts[1]: + raise serializers.ValidationError("Repository must be in the format organization/repository") + return value.lower() + + class TaskRunCreateRequestSerializer(serializers.Serializer): """Request body for creating a new task run""" From 74220f81f3db4919c8ec15dc2befd226523484ba Mon Sep 17 00:00:00 2001 From: "tests-posthog[bot]" <250237707+tests-posthog[bot]@users.noreply.github.com> Date: Tue, 7 Apr 2026 00:39:58 +0000 Subject: [PATCH 05/10] chore: update OpenAPI generated types --- products/tasks/frontend/generated/api.schemas.ts | 5 +++++ products/tasks/frontend/generated/api.ts | 4 ++++ services/mcp/src/api/generated.ts | 5 +++++ 3 files changed, 14 insertions(+) diff --git a/products/tasks/frontend/generated/api.schemas.ts b/products/tasks/frontend/generated/api.schemas.ts index 4d21a3d118b7..56692437dbca 100644 --- a/products/tasks/frontend/generated/api.schemas.ts +++ b/products/tasks/frontend/generated/api.schemas.ts @@ -568,6 +568,11 @@ export interface TaskRunRelayMessageResponseApi { relay_id?: string } +export interface PatchedTaskRunSetOutputRequestApi { + /** Output data from the run. Validated against the task's json_schema if one is set. */ + output?: unknown +} + /** * * `needs_setup` - needs_setup * `detected` - detected diff --git a/products/tasks/frontend/generated/api.ts b/products/tasks/frontend/generated/api.ts index 202b61e9fb31..11146aaae526 100644 --- a/products/tasks/frontend/generated/api.ts +++ b/products/tasks/frontend/generated/api.ts @@ -15,6 +15,7 @@ import type { PaginatedTaskListApi, PaginatedTaskRunDetailListApi, PatchedTaskApi, + PatchedTaskRunSetOutputRequestApi, PatchedTaskRunUpdateApi, RepositoryReadinessResponseApi, SandboxEnvironmentApi, @@ -556,11 +557,14 @@ export const tasksRunsSetOutputPartialUpdate = async ( projectId: string, taskId: string, id: string, + patchedTaskRunSetOutputRequestApi: PatchedTaskRunSetOutputRequestApi, options?: RequestInit ): Promise => { return apiMutator(getTasksRunsSetOutputPartialUpdateUrl(projectId, taskId, id), { ...options, method: 'PATCH', + headers: { 'Content-Type': 'application/json', ...options?.headers }, + body: JSON.stringify(patchedTaskRunSetOutputRequestApi), }) } diff --git a/services/mcp/src/api/generated.ts b/services/mcp/src/api/generated.ts index fbca7f68539d..a4223da1be07 100644 --- a/services/mcp/src/api/generated.ts +++ b/services/mcp/src/api/generated.ts @@ -25694,6 +25694,11 @@ export namespace Schemas { readonly created_by?: UserBasic; } + export interface PatchedTaskRunSetOutputRequest { + /** Output data from the run. Validated against the task's json_schema if one is set. */ + output?: unknown; + } + /** * * `not_started` - not_started * `queued` - queued From 209584f050ca48c685e3eb6c47899e196abf0157 Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Mon, 6 Apr 2026 18:49:00 -0600 Subject: [PATCH 06/10] Remove external serializer --- products/tasks/backend/serializers.py | 71 --------------------------- 1 file changed, 71 deletions(-) diff --git a/products/tasks/backend/serializers.py b/products/tasks/backend/serializers.py index 57147ad373f2..2d55e3862a3a 100644 --- a/products/tasks/backend/serializers.py +++ b/products/tasks/backend/serializers.py @@ -360,77 +360,6 @@ class ConnectionTokenResponseSerializer(serializers.Serializer): token = serializers.CharField(help_text="JWT token for authenticating with the sandbox") -class CreateAndRunRequestSerializer(serializers.Serializer): - """Request body for creating a task and immediately running it.""" - - title = serializers.CharField( - max_length=255, - required=False, - allow_blank=True, - default="", - help_text="Task title. If blank, one is generated from the description.", - ) - description = serializers.CharField( - required=True, - help_text="Task description / prompt for the agent.", - ) - origin_product = serializers.ChoiceField( - choices=Task.OriginProduct.choices, - required=False, - default=Task.OriginProduct.USER_CREATED, - help_text="Which product originated this task.", - ) - repository = serializers.CharField( - max_length=255, - required=False, - allow_blank=True, - allow_null=True, - default=None, - help_text="GitHub repository in 'org/repo' format.", - ) - mode = serializers.ChoiceField( - choices=["interactive", "background"], - required=False, - default="background", - help_text="Execution mode for the run.", - ) - branch = serializers.CharField( - required=False, - allow_null=True, - default=None, - max_length=255, - help_text="Git branch to checkout in the sandbox.", - ) - create_pr = serializers.BooleanField( - required=False, - default=True, - help_text="Whether the agent should create a pull request.", - ) - json_schema = serializers.JSONField( - required=False, - default=None, - help_text="JSON Schema (or Pydantic model schema) for structured output validation.", - ) - internal = serializers.BooleanField( - required=False, - default=False, - help_text="If true, the task is for internal use and hidden from end users.", - ) - sandbox_environment_id = serializers.UUIDField( - required=False, - default=None, - help_text="Optional sandbox environment to apply for this run.", - ) - - def validate_repository(self, value): - if not value: - return value - parts = value.split("/") - if len(parts) != 2 or not parts[0] or not parts[1]: - raise serializers.ValidationError("Repository must be in the format organization/repository") - return value.lower() - - class TaskRunCreateRequestSerializer(serializers.Serializer): """Request body for creating a new task run""" From 990122608a201c197040edcf240b6eb89d7cd577 Mon Sep 17 00:00:00 2001 From: ryans-posthog Date: Mon, 6 Apr 2026 19:30:44 -0600 Subject: [PATCH 07/10] Update products/tasks/backend/models.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- products/tasks/backend/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/products/tasks/backend/models.py b/products/tasks/backend/models.py index 30433b341309..b32a48c1fba8 100644 --- a/products/tasks/backend/models.py +++ b/products/tasks/backend/models.py @@ -858,7 +858,7 @@ def __str__(self): @receiver(post_save, sender=TaskRun) def track_task_run_completion(sender, instance: TaskRun, created: bool, **kwargs): try: - if not created and instance.state == TaskRun.Status.COMPLETED and instance.output and instance.task.json_schema: + if not created and instance.status == TaskRun.Status.COMPLETED and instance.output and instance.task.json_schema: instance.track_structured_result() except Exception as e: logger.warning( From ad072b0de58939e0e87fffcfd6970bfaaaa7929f Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Mon, 6 Apr 2026 19:32:09 -0600 Subject: [PATCH 08/10] Address issue with double exception handling --- products/tasks/backend/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/products/tasks/backend/models.py b/products/tasks/backend/models.py index b32a48c1fba8..2342a407555d 100644 --- a/products/tasks/backend/models.py +++ b/products/tasks/backend/models.py @@ -862,7 +862,7 @@ def track_task_run_completion(sender, instance: TaskRun, created: bool, **kwargs instance.track_structured_result() except Exception as e: logger.warning( - "task_run.track_structured_result_failed", + "task_run.track_task_run_completion_failed", task_run_id=str(instance.id), error=str(e), ) From b3b8ce16e138e764c14035bcef25c6f13c811ea2 Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Mon, 6 Apr 2026 19:33:56 -0600 Subject: [PATCH 09/10] Fix indention issue --- products/tasks/backend/models.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/products/tasks/backend/models.py b/products/tasks/backend/models.py index 2342a407555d..9dc9c6882e9b 100644 --- a/products/tasks/backend/models.py +++ b/products/tasks/backend/models.py @@ -858,7 +858,12 @@ def __str__(self): @receiver(post_save, sender=TaskRun) def track_task_run_completion(sender, instance: TaskRun, created: bool, **kwargs): try: - if not created and instance.status == TaskRun.Status.COMPLETED and instance.output and instance.task.json_schema: + if ( + not created + and instance.status == TaskRun.Status.COMPLETED + and instance.output + and instance.task.json_schema + ): instance.track_structured_result() except Exception as e: logger.warning( From 0a74873b6bbaa264d6df0f4859c8add8255d33ce Mon Sep 17 00:00:00 2001 From: ryans-posthog Date: Tue, 7 Apr 2026 00:18:44 -0600 Subject: [PATCH 10/10] Update products/tasks/backend/api.py --- products/tasks/backend/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/products/tasks/backend/api.py b/products/tasks/backend/api.py index f1b5ea71e63a..df87f0cc0682 100644 --- a/products/tasks/backend/api.py +++ b/products/tasks/backend/api.py @@ -510,7 +510,7 @@ def perform_create(self, serializer): def set_output(self, request, pk=None, **kwargs): task_run = cast(TaskRun, self.get_object()) task = cast(Task, task_run.task) - output_data = request.validated_request["output"] + output_data = request.validated_data["output"] if task.json_schema: try: