Skip to content

Better stop flow for AI bridge#101

Open
batuhan wants to merge 7 commits intomainfrom
batuhan/stop
Open

Better stop flow for AI bridge#101
batuhan wants to merge 7 commits intomainfrom
batuhan/stop

Conversation

@batuhan
Copy link
Copy Markdown
Member

@batuhan batuhan commented Apr 5, 2026

No description provided.

batuhan added 4 commits April 5, 2026 15:54
Refactor and extend abort/stop behavior to support targeted user stops. Introduces userStopRequest/plan/result types and handleUserStop/executeUserStopPlan to resolve and execute room-wide, active-turn, or queued-turn stops. Replaces direct abortRoom calls with handleUserStop in command and message handlers.

Adds assistantStopMetadata and propagates it into streamingState and UI message metadata (including response status mapping). Tracks room run targets (source/initial events) and binds streaming state to room runs. Implements queue operations to drain or remove pending items by source event and finalizes stopped queue items, preserving ACK reaction removal and session notifications. Adjusts streaming finish logic to treat cancelled vs stop reasons appropriately. Includes unit tests for plan resolution, queued removal, and metadata emission.
Move pendingQueueHasSourceEvent into pending_queue.go and implement proper locking (pendingQueuesMu and queue.mu) to safely inspect queue items. Simplify drainPendingQueue to delete the queue map entry and return its items directly. Remove the duplicate helper from abort_helpers.go. Improve formatAbortNotice by capitalizing each sentence part and joining them with ". " for clearer messages. Remove redundant run field assignments in roomRunTarget. Adjust finishStreamingWithFailure to fall through from "cancelled" to "stop" so cancelled streams call End like stop cases and remove some redundant nil checks. These changes tidy concurrency handling, clarify abort messaging, and simplify streaming error handling.
Change streamingState.stop to an atomic.Pointer[assistantStopMetadata] and update all callsites to use .Load()/.Store() to avoid races. Fix pending queue drain to lock the queue when accessing items to prevent concurrent access. Improve room run logic to prefer current state.turn when present and store stop metadata atomically when marking a run stopped. Use utf8 + unicode for correct Unicode-aware capitalization in abort notices and update tests to store stop metadata via the new atomic API.
Move AckReactionRemoveAfter checks into removePendingAckReactions and remove duplicate guarded calls throughout the codebase, so callers simply invoke the removal and the function decides whether to act. Simplify pending queue management by replacing clearPendingQueue usage with drainPendingQueue and delete the pendingQueueHasSourceEvent helper. Adjust stop-plan handling to speculatively return queued stops and add a fallback in executeUserStopPlan to convert a queued plan to no-match if nothing was drained. Update tests to reflect the new speculative behavior and the fallback.
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 5, 2026

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Flexible user stop flow: stop specific queued turns, active turns, sub-agents, or entire rooms.
    • Stop metadata added to UI/stream messages (reason, scope, target, requested-via).
  • Bug Fixes

    • More consistent removal/draining of pending queue items and ack reactions across flows.
    • Stream finalization and response-status now reliably reflect cancelled/stopped outcomes.
  • Tests

    • New unit tests for stop resolution, execution, queue mutations, and UI metadata.

Walkthrough

Implements a structured user-stop pipeline: resolve a stop plan, execute targeted cancellations for active runs, queued turns, or whole rooms, propagate stop metadata into streaming state and UI metadata, add queue-drain/removal helpers, and add unit tests for stop behaviors. (34 words)

Changes

Cohort / File(s) Summary
Core stop pipeline
bridges/ai/abort_helpers.go
Replace ad-hoc abort with typed stop planning/execution (userStopRequest, userStopPlan, userStopResult), add resolveUserStopPlan, executeUserStopPlan, finalizeStoppedQueueItems, handleUserStop, buildStopMetadata; adjust when Meta.AbortedLastRun, savePortalQuiet(...,"stop"), and notifySessionMutation are set.
Stop tests
bridges/ai/abort_helpers_test.go
Add unit tests covering plan resolution and execution for room-wide/active/queued/no-match cases, queue removal/drain behavior, and UI metadata metadata.stop and response_status for cancelled turns.
Request routing
bridges/ai/commands_parity.go, bridges/ai/handlematrix.go
Stop command and text-trigger now construct userStopRequest and call handleUserStop; system notices are formatted from userStopResult instead of old abortRoom.
Pending queue & helpers
bridges/ai/pending_queue.go, bridges/ai/client.go, bridges/ai/internal_dispatch.go
Add pendingMessage.sourceEventID(), drainPendingQueue, removePendingQueueBySourceEvent; change clearPendingQueue signature to accept ctx; revise snapshot copying; always call removePendingAckReactions at dispatch/cleanup sites and tighten its guard.
Room run & streaming state
bridges/ai/room_runs.go, bridges/ai/streaming_state.go, bridges/ai/streaming_init.go
Extend roomRunState with bound *streamingState, stop *assistantStopMetadata, and cached target IDs; add bindRoomRunState, roomRunTarget, markRoomRunStopped; bind run state during streaming init; add atomic stop to streamingState.
Subagent stop flow
bridges/ai/subagent_registry.go
stopSubagentRuns now accepts ctx and uses drainPendingQueue + finalizeStoppedQueueItems to compute stopped counts instead of snapshot+clear.
Streaming finish & status
bridges/ai/streaming_error_handling.go, bridges/ai/turn_data.go, bridges/ai/streaming_error_handling_test.go, bridges/ai/turn_data_test.go
Refactor finish logic to map "cancelled""stop" when stop metadata present; canonicalResponseStatus returns "cancelled" immediately if state.stop set. Add tests asserting cancelled/stop precedence and behavior.
UI metadata
bridges/ai/ui_message_metadata.go
Add assistantStopMetadata and include as optional Stop in assistantTurnMetadata; populate from state.stop.Load() in metadata builder.

Sequence Diagram

sequenceDiagram
    participant User as User
    participant Handler as Stop Handler
    participant Client as AIClient
    participant Queue as Pending Queue
    participant Streaming as Streaming State
    participant UI as UI Metadata

    User->>Handler: trigger stop (command or text)
    Handler->>Client: handleUserStop(userStopRequest)
    Client->>Client: resolveUserStopPlan()
    alt room-wide
        Client->>Client: mark room run stopped / cancel run
        Client->>Queue: drainPendingQueue(roomID)
    else active-turn
        Client->>Client: cancel active run
        Client->>Streaming: markRoomRunStopped(metadata)
    else queued-turn
        Client->>Queue: removePendingQueueBySourceEvent(sourceEventID)
    else no-match
        Client->>Client: return no-match result
    end
    Client->>Client: finalizeStoppedQueueItems()
    Client->>Streaming: store stop metadata (atomic)
    Client->>UI: buildStreamUIMessage with stop metadata
    Handler->>User: send formatted abort notice
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive No pull request description was provided by the author, making it impossible to evaluate whether it relates to the changeset. Add a pull request description explaining the stop flow improvements, the new planning/execution pipeline, and the rationale for the refactoring.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Better stop flow for AI bridge' directly and specifically describes the primary change: refactoring the stop/abort logic in the AI bridge with a structured planning/execution pipeline.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch batuhan/stop

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@bridges/ai/abort_helpers.go`:
- Around line 159-165: The active-stop branch can race with the run disappearing
so you must not report “Stopped that turn” when nothing was persisted or
cancelled; after calling oc.markRoomRunStopped and oc.cancelRoomRun in the
stopPlanKindActive branch, re-check whether the target run still exists (e.g.,
re-run resolveUserStopPlan for the same plan/roomID) or, if
markRoomRunStopped/cancelRoomRun returned false/no-op, set result.Plan.Kind =
stopPlanKindNoMatch so formatAbortNotice will produce the downgraded message;
update the stopPlanKindActive branch around markRoomRunStopped, cancelRoomRun,
and result.SubagentsStopped to flip to stopPlanKindNoMatch on no-op/race.

In `@bridges/ai/client.go`:
- Around line 669-672: The defer currently guarded by hasDBMessage prevents
removePendingAckReactions from running for direct-dispatch debounced messages;
remove the hasDBMessage conditional and call
oc.removePendingAckReactions(oc.backgroundContext(ctx), portal,
queueItem.pending) unconditionally in that defer (the removePendingAckReactions
implementation already internally guards), so ack reactions are always cleared
regardless of the hasDBMessage path.

In `@bridges/ai/pending_queue.go`:
- Around line 88-109: The drainPendingQueue/clearPendingQueue duo currently
removes queued items from oc.pendingQueues (in drainPendingQueue) but does not
perform item finalization (removing ack reactions and notifying rooms), so
callers like clearPendingQueue and code in subagent_registry.go/client.go can
silently drop items; update the API so draining also finalizes each
pendingQueueItem: either merge finalize logic into drainPendingQueue (and have
clearPendingQueue call it) or add a new DrainAndFinalizePendingQueue helper that
calls oc.stopQueueTyping(roomID) and then iterates the returned
[]pendingQueueItem to run the existing finalization steps (remove ack reactions
and send queue-rejected statuses used by the logic in abort_helpers.go),
ensuring callers use this single helper instead of clearPendingQueue to avoid
silent drops.

In `@bridges/ai/streaming_error_handling.go`:
- Around line 53-60: The code currently falls through from the "cancelled" case
and calls state.turn.End(msgconv.MapFinishReason(reason)), which maps
"cancelled" to "other"; change this by special-casing "cancelled" so that when
reason == "cancelled" you call state.turn.End with an explicit cancellation
finish reason (do not use msgconv.MapFinishReason for that path) — use the
appropriate cancellation constant or the raw "cancelled" value as accepted by
state.turn.End; keep the existing behavior for other reasons (i.e., continue to
call msgconv.MapFinishReason(reason) in the "stop" branch).

In `@bridges/ai/subagent_registry.go`:
- Around line 46-47: The code calls drainPendingQueue(run.ChildRoomID) and
counts drained items as "stopped" but drainPendingQueue only removes queues
without per-item cleanup; instead, call finalizeStoppedQueueItems(ctx, drained)
(or the appropriate helper that performs rejection/ack cleanup) for the returned
items before incrementing stopped, and update the caller (executeUserStopPlan in
abort_helpers.go) to thread the current ctx through so finalizeStoppedQueueItems
receives the same context; ensure you reference drainPendingQueue,
finalizeStoppedQueueItems, and executeUserStopPlan when making these edits.

In `@bridges/ai/turn_data.go`:
- Around line 72-76: The cancel check is incorrectly gated by responseID, so
change the logic in the function handling turn status (using state.responseID
and state.stop.Load()) to evaluate state.stop.Load() first and return
"cancelled" immediately if non-nil, regardless of responseID; keep the
responseID trim check only for the later finishReason/status inference (e.g.,
preserve the existing responseID-based early return for in-progress detection
but after the stop check). Ensure references to state.stop.Load(),
state.responseID, and any finishReason inference logic remain unchanged except
for reordering so explicit local cancellation wins.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f167e069-efa4-4bbb-a051-dd68b86305ba

📥 Commits

Reviewing files that changed from the base of the PR and between 1e3a245 and ed72a84.

📒 Files selected for processing (13)
  • bridges/ai/abort_helpers.go
  • bridges/ai/abort_helpers_test.go
  • bridges/ai/client.go
  • bridges/ai/commands_parity.go
  • bridges/ai/handlematrix.go
  • bridges/ai/pending_queue.go
  • bridges/ai/room_runs.go
  • bridges/ai/streaming_error_handling.go
  • bridges/ai/streaming_init.go
  • bridges/ai/streaming_state.go
  • bridges/ai/subagent_registry.go
  • bridges/ai/turn_data.go
  • bridges/ai/ui_message_metadata.go
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: build-docker
  • GitHub Check: build-agentremote-docker (amd64)
  • GitHub Check: build-agentremote-docker (arm64)
  • GitHub Check: Lint
  • GitHub Check: build-docker
  • GitHub Check: Lint
🔇 Additional comments (2)
bridges/ai/abort_helpers_test.go (1)

14-159: Good coverage of the new stop flow.

These tests pin the plan-selection behavior, targeted queued removal, and the metadata.stop / response_status == "cancelled" contract, which should make regressions in this refactor much easier to catch.

bridges/ai/room_runs.go (1)

103-118: Nice stop-metadata handoff into late-bound run state.

Copying run.stop into state.stop in bindRoomRunState closes the case where a user stop lands before the streaming state is attached, and caching the targeting fields here keeps later lookups cheap.

Also applies to: 135-146

batuhan added 2 commits April 5, 2026 16:48
Pass context through pending-queue and subagent stop helpers (clearPendingQueue, stopSubagentRuns, finalizeStoppedQueueItems) and always finalize/drain pending items when clearing queues. Fix executeUserStopPlan logic to mark active stops before cancelling and fall back to a no-match when an active stop is a no-op. Ensure removePendingAckReactions is always invoked in goroutine cleanup. Adjust finishStreamingWithFailure to properly end turns on cancelled streams without falling through, and prefer explicit stop flag in canonicalResponseStatus. Add tests covering the no-op active stop fallback, cancelled finish behavior, and canonicalResponseStatus preference.
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
bridges/ai/streaming_error_handling_test.go (1)

104-106: Tighten the cancellation assertion to validate semantics, not just non-nil.

At Line 104, asserting only err != nil can pass for unrelated errors. Consider asserting errors.Is(err, context.Canceled) to make this test stricter.

Suggested test refinement
  if err == nil {
  	t.Fatal("expected wrapped cancellation error")
  }
+ if !errors.Is(err, context.Canceled) {
+ 	t.Fatalf("expected error wrapping context.Canceled, got %v", err)
+ }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@bridges/ai/streaming_error_handling_test.go` around lines 104 - 106, The test
currently only checks that err != nil; tighten it to assert the cancellation
semantics by using errors.Is(err, context.Canceled) (or equivalent) instead of a
nil check so the test validates a wrapped cancellation error; update the
assertion around the err variable in the test (where err is produced) to use
errors.Is(err, context.Canceled) and keep the existing failure message
descriptive.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@bridges/ai/pending_queue.go`:
- Around line 96-106: The deletion of a pendingQueue from oc.pendingQueues while
producers may still hold a pointer can drop concurrent enqueues; in the delete
paths (around the block using oc.pendingQueuesMu, pendingQueues map and
queue.mu) acquire the queue.mu before removing the map entry (or otherwise
ensure the queue remains registered until no writer can hold the pointer) so
that AIClient.enqueuePendingItem() and AIClient.getPendingQueue() cannot append
to an orphaned queue; update both the delete sites (the shown block and the
similar one at 135-139) to lock queue.mu, perform the map delete while holding
that lock, then release locks in the correct order to prevent lost items.
- Around line 123-145: After removing items from queue.items, ensure
pendingQueue.lastItem is cleared if it pointed to a removed (tail) item: check
queue.lastItem != nil and if queue.lastItem.sourceEventID() == sourceEventID (or
otherwise matches an item in the removed slice) then set queue.lastItem = nil
(or assign it to the new tail from kept if you prefer preserving context); this
prevents AIClient.takePendingQueueDispatchCandidate()/snapshot.lastItem from
using a removed/stopped event as the next dispatch context.

---

Nitpick comments:
In `@bridges/ai/streaming_error_handling_test.go`:
- Around line 104-106: The test currently only checks that err != nil; tighten
it to assert the cancellation semantics by using errors.Is(err,
context.Canceled) (or equivalent) instead of a nil check so the test validates a
wrapped cancellation error; update the assertion around the err variable in the
test (where err is produced) to use errors.Is(err, context.Canceled) and keep
the existing failure message descriptive.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1f1e0f1b-989c-4888-b423-86cefcd77750

📥 Commits

Reviewing files that changed from the base of the PR and between 02c907a and a242d26.

📒 Files selected for processing (11)
  • bridges/ai/abort_helpers.go
  • bridges/ai/abort_helpers_test.go
  • bridges/ai/client.go
  • bridges/ai/commands_parity.go
  • bridges/ai/internal_dispatch.go
  • bridges/ai/pending_queue.go
  • bridges/ai/streaming_error_handling.go
  • bridges/ai/streaming_error_handling_test.go
  • bridges/ai/subagent_registry.go
  • bridges/ai/turn_data.go
  • bridges/ai/turn_data_test.go
🚧 Files skipped from review as they are similar to previous changes (7)
  • bridges/ai/turn_data.go
  • bridges/ai/commands_parity.go
  • bridges/ai/client.go
  • bridges/ai/streaming_error_handling.go
  • bridges/ai/subagent_registry.go
  • bridges/ai/abort_helpers.go
  • bridges/ai/abort_helpers_test.go
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: build-agentremote-docker (arm64)
  • GitHub Check: build-agentremote-docker (amd64)
  • GitHub Check: Lint
  • GitHub Check: build-docker
  • GitHub Check: build-docker
  • GitHub Check: Lint
🔇 Additional comments (3)
bridges/ai/turn_data_test.go (1)

104-111: Good regression coverage for explicit stop precedence.

This test cleanly locks in the expected "cancelled" status when stop metadata is present without requiring a response ID.

bridges/ai/streaming_error_handling_test.go (2)

8-13: Imports look appropriate and are used by the new cancellation-path test.


108-112: Nice check on terminal metadata mapping for cancelled flows.

Verifying metadata["finish_reason"] == "cancelled" is the right assertion for this pathway.

Rework pending queue locking and item housekeeping to avoid races and stale pointers. getPendingQueue now locks the queue.mu before releasing pendingQueuesMu and consistently applies settings for both new and existing queues. drainPendingQueue clears queue.items and lastItem when removing a queue. removePendingQueueBySourceEvent reassigns lastItem to the new tail if the removed item was the last. enqueuePendingItem removed a now-redundant explicit lock (the returned queue is already locked). Added a unit test to verify lastItem is cleared/reassigned, and tightened an error assertion in the streaming test to use errors.Is for wrapped cancellations.
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
bridges/ai/streaming_error_handling_test.go (1)

109-111: Make metadata assertion failure clearer.

Consider asserting the metadata type assertion result before reading finish_reason, so failures are more diagnostic when the snapshot shape changes.

Proposed test assertion tightening
-	metadata, _ := message["metadata"].(map[string]any)
-	if metadata["finish_reason"] != "cancelled" {
-		t.Fatalf("expected cancelled finish_reason, got %#v", metadata["finish_reason"])
-	}
+	metadata, ok := message["metadata"].(map[string]any)
+	if !ok {
+		t.Fatalf("expected metadata map, got %T", message["metadata"])
+	}
+	if metadata["finish_reason"] != "cancelled" {
+		t.Fatalf("expected cancelled finish_reason, got %#v", metadata["finish_reason"])
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@bridges/ai/streaming_error_handling_test.go` around lines 109 - 111, The test
currently does an unchecked type assertion for metadata then reads
metadata["finish_reason"], which makes failures cryptic; update the test to
assert the type assertion succeeded (capture the comma-ok like metadata, ok :=
message["metadata"].(map[string]any) and t.Fatalf if !ok, printing the actual
type/value), then assert metadata["finish_reason"] == "cancelled" — this ensures
the failure message clearly shows when the snapshot shape changes and points to
the unexpected metadata value or type.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@bridges/ai/streaming_error_handling_test.go`:
- Around line 109-111: The test currently does an unchecked type assertion for
metadata then reads metadata["finish_reason"], which makes failures cryptic;
update the test to assert the type assertion succeeded (capture the comma-ok
like metadata, ok := message["metadata"].(map[string]any) and t.Fatalf if !ok,
printing the actual type/value), then assert metadata["finish_reason"] ==
"cancelled" — this ensures the failure message clearly shows when the snapshot
shape changes and points to the unexpected metadata value or type.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 681a68ff-711f-4f24-a45d-65c87579a63b

📥 Commits

Reviewing files that changed from the base of the PR and between a242d26 and bf332af.

📒 Files selected for processing (3)
  • bridges/ai/pending_queue.go
  • bridges/ai/queue_status_test.go
  • bridges/ai/streaming_error_handling_test.go
✅ Files skipped from review due to trivial changes (1)
  • bridges/ai/queue_status_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • bridges/ai/pending_queue.go
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: build-docker
  • GitHub Check: build-agentremote-docker (arm64)
  • GitHub Check: build-agentremote-docker (amd64)
  • GitHub Check: Lint
  • GitHub Check: Lint
  • GitHub Check: build-docker
🔇 Additional comments (2)
bridges/ai/streaming_error_handling_test.go (2)

8-13: Imports are correctly added and used.

Both zerolog and streamui are used by the new cancellation-path test and keep the test setup explicit.


90-113: Good coverage for cancelled finish propagation.

This test correctly validates both error wrapping (errors.Is) and UI metadata propagation of finish_reason: "cancelled" for the cancellation path.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

1 participant