Skip to content

Commit a691f3c

Browse files
committed
Refactor queue checks, abort text, and streaming errors
Move pendingQueueHasSourceEvent into pending_queue.go and implement proper locking (pendingQueuesMu and queue.mu) to safely inspect queue items. Simplify drainPendingQueue to delete the queue map entry and return its items directly. Remove the duplicate helper from abort_helpers.go. Improve formatAbortNotice by capitalizing each sentence part and joining them with ". " for clearer messages. Remove redundant run field assignments in roomRunTarget. Adjust finishStreamingWithFailure to fall through from "cancelled" to "stop" so cancelled streams call End like stop cases and remove some redundant nil checks. These changes tidy concurrency handling, clarify abort messaging, and simplify streaming error handling.
1 parent 9c8e01d commit a691f3c

4 files changed

Lines changed: 30 additions & 35 deletions

File tree

bridges/ai/abort_helpers.go

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -75,31 +75,15 @@ func formatAbortNotice(result userStopResult) string {
7575
if len(parts) == 0 {
7676
return "No active or queued turns to stop."
7777
}
78-
suffix := ""
79-
if len(parts) > 1 {
80-
suffix = " " + strings.Join(parts[1:], ". ") + "."
78+
for i := range parts {
79+
parts[i] = strings.ToUpper(parts[i][:1]) + parts[i][1:]
8180
}
82-
return strings.ToUpper(parts[0][:1]) + parts[0][1:] + "." + suffix
81+
return strings.Join(parts, ". ") + "."
8382
default:
8483
return "No active or queued turns to stop."
8584
}
8685
}
8786

88-
func (oc *AIClient) pendingQueueHasSourceEvent(roomID id.RoomID, sourceEventID id.EventID) bool {
89-
if oc == nil || roomID == "" || sourceEventID == "" {
90-
return false
91-
}
92-
queue := oc.getQueueSnapshot(roomID)
93-
if queue == nil {
94-
return false
95-
}
96-
for _, item := range queue.items {
97-
if item.pending.sourceEventID() == sourceEventID {
98-
return true
99-
}
100-
}
101-
return false
102-
}
10387

10488
func buildStopMetadata(plan userStopPlan, req userStopRequest) *assistantStopMetadata {
10589
return &assistantStopMetadata{

bridges/ai/pending_queue.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,20 +106,36 @@ func (oc *AIClient) drainPendingQueue(roomID id.RoomID) []pendingQueueItem {
106106
return nil
107107
}
108108
delete(oc.pendingQueues, roomID)
109+
items := queue.items
109110
oc.pendingQueuesMu.Unlock()
110111

111-
queue.mu.Lock()
112-
items := slices.Clone(queue.items)
113-
queue.items = nil
114-
queue.summaryLines = nil
115-
queue.droppedCount = 0
116-
queue.lastItem = nil
117-
queue.mu.Unlock()
118-
119112
oc.stopQueueTyping(roomID)
120113
return items
121114
}
122115

116+
func (oc *AIClient) pendingQueueHasSourceEvent(roomID id.RoomID, sourceEventID id.EventID) bool {
117+
if oc == nil || roomID == "" || sourceEventID == "" {
118+
return false
119+
}
120+
oc.pendingQueuesMu.Lock()
121+
queue := oc.pendingQueues[roomID]
122+
if queue == nil {
123+
oc.pendingQueuesMu.Unlock()
124+
return false
125+
}
126+
queue.mu.Lock()
127+
found := false
128+
for _, item := range queue.items {
129+
if item.pending.sourceEventID() == sourceEventID {
130+
found = true
131+
break
132+
}
133+
}
134+
queue.mu.Unlock()
135+
oc.pendingQueuesMu.Unlock()
136+
return found
137+
}
138+
123139
func (oc *AIClient) removePendingQueueBySourceEvent(roomID id.RoomID, sourceEventID id.EventID) []pendingQueueItem {
124140
if oc == nil || roomID == "" || sourceEventID == "" {
125141
return nil

bridges/ai/room_runs.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,6 @@ func (oc *AIClient) roomRunTarget(roomID id.RoomID) (turnID string, sourceEventI
137137
turnID = state.turn.ID()
138138
sourceEventID = state.sourceEventID()
139139
initialEventID = state.turn.InitialEventID()
140-
run.turnID = turnID
141-
run.sourceEvent = sourceEventID
142-
run.initialEvent = initialEventID
143140
return turnID, sourceEventID, initialEventID, state
144141
}
145142

bridges/ai/streaming_error_handling.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,13 @@ func (oc *AIClient) finishStreamingWithFailure(
5353
switch reason {
5454
case "cancelled":
5555
state.writer().Abort(ctx, "cancelled")
56-
if state != nil && state.turn != nil {
57-
state.turn.End(msgconv.MapFinishReason(reason))
58-
}
56+
fallthrough
5957
case "stop":
60-
if state != nil && state.turn != nil {
58+
if state.turn != nil {
6159
state.turn.End(msgconv.MapFinishReason(reason))
6260
}
6361
default:
64-
if state != nil && state.turn != nil {
62+
if state.turn != nil {
6563
state.turn.EndWithError(err.Error())
6664
}
6765
}

0 commit comments

Comments
 (0)