-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevents.ts
More file actions
90 lines (75 loc) · 2.73 KB
/
events.ts
File metadata and controls
90 lines (75 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import type { ChildProcess } from "node:child_process";
import { EventEmitter } from "node:events";
/** Stream label attached to a captured output line. */
type OutputStream = "stderr" | "stdout";

/**
 * Payload shapes for the events carried by the task event bus, keyed by
 * event name.
 */
export interface TaskEvents {
  /** One output line for the run identified by `runId`. */
  output: { line: string; runId: number; stream: OutputStream };
  /** Task queue contents and the running task id (`null` is allowed). */
  "queue:changed": { queue: string[]; runningTaskId: null | string };
  /** One output line for the review identified by `reviewId`. */
  "review:output": { line: string; reviewId: number; stream: OutputStream };
  /** Review queue contents and the running review id (`null` is allowed). */
  "review:queue": { queue: number[]; runningReviewId: null | number };
  /** Status string for a review; the set of valid values is not defined here. */
  "review:status": { reviewId: number; status: string };
  /** Status string for a run; the set of valid values is not defined here. */
  "status:changed": { runId: number; status: string };
}
/** Listener signature for the `Name` event declared in `TaskEvents`. */
type TaskEventListener<Name extends keyof TaskEvents> = (
  data: TaskEvents[Name],
) => void;

/**
 * `EventEmitter` whose `emit`, `off` and `on` only accept event names declared
 * in `TaskEvents`, paired with the payload type for that name. Every call is
 * forwarded unchanged to the base class; other inherited `EventEmitter`
 * methods are not re-typed here.
 */
class TaskEventBus extends EventEmitter {
  /**
   * Emits `event` with a single payload argument.
   *
   * @returns whatever `EventEmitter.emit` returns (`true` when the event had
   * listeners, `false` otherwise).
   */
  emit<Name extends keyof TaskEvents>(
    event: Name,
    data: TaskEvents[Name],
  ): boolean {
    const hadListeners = super.emit(event, data);
    return hadListeners;
  }

  /** Removes a listener previously registered for `event`; returns `this`. */
  off<Name extends keyof TaskEvents>(
    event: Name,
    listener: TaskEventListener<Name>,
  ): this {
    return super.off(event, listener);
  }

  /** Registers `listener` for `event`; returns `this`. */
  on<Name extends keyof TaskEvents>(
    event: Name,
    listener: TaskEventListener<Name>,
  ): this {
    return super.on(event, listener);
  }
}
/** Module-level event bus instance on which the `TaskEvents` events are emitted. */
export const taskEvents: TaskEventBus = new TaskEventBus();
/**
 * Child processes keyed by run id, kept so the process for a run can be looked
 * up for cancellation.
 */
export const activeProcesses: Map<number, ChildProcess> = new Map();
/** Maximum number of lines retained per id in the output buffers. */
const OUTPUT_BUFFER_SIZE = 1000;

/** Most recent output lines for each run, keyed by run id. */
export const outputBuffer: Map<number, string[]> = new Map();

/**
 * Appends `line` to the buffered output of `runId`, creating the buffer on
 * first use and discarding the oldest lines once more than
 * `OUTPUT_BUFFER_SIZE` are held.
 *
 * @param runId - id of the run the line belongs to
 * @param line - output line to record
 */
export function appendOutput(runId: number, line: string): void {
  const known = outputBuffer.get(runId);
  const lines = known || [];
  if (lines !== known) {
    // First line for this run: register the freshly created array.
    outputBuffer.set(runId, lines);
  }
  lines.push(line);

  // Trim from the front, in place, so the stored array instance never changes.
  const overflow = lines.length - OUTPUT_BUFFER_SIZE;
  if (overflow > 0) {
    lines.splice(0, overflow);
  }
}
/**
 * Publishes a system message for a run: the message is prefixed with
 * `[system] `, stored through `appendOutput`, and then emitted as an
 * `output` event labelled with the `stdout` stream.
 *
 * @param runId - id of the run the message belongs to
 * @param message - text to publish, without the `[system]` prefix
 */
export function emitSystemLine(runId: number, message: string): void {
  const payload: TaskEvents["output"] = {
    line: `[system] ${message}`,
    runId,
    stream: "stdout",
  };
  // Buffered before emitting, so the line is already stored when listeners run.
  appendOutput(runId, payload.line);
  taskEvents.emit("output", payload);
}
/** Ids of the runs that have been cancelled. */
export const cancelledRuns: Set<number> = new Set();
/**
 * Child processes keyed by review id, kept so the process for a review can be
 * looked up for cancellation.
 */
export const activeReviewProcesses: Map<number, ChildProcess> = new Map();
/**
 * Most recent output lines for each review, keyed by review id.
 * `appendReviewOutput` keeps each list to at most `OUTPUT_BUFFER_SIZE` lines.
 */
export const reviewOutputBuffer: Map<number, string[]> = new Map();
/**
 * Appends `line` to the buffered output of `reviewId`, creating the buffer on
 * first use and discarding the oldest lines once more than
 * `OUTPUT_BUFFER_SIZE` are held.
 *
 * @param reviewId - id of the review the line belongs to
 * @param line - output line to record
 */
export function appendReviewOutput(reviewId: number, line: string): void {
  const known = reviewOutputBuffer.get(reviewId);
  const lines = known || [];
  if (lines !== known) {
    // First line for this review: register the freshly created array.
    reviewOutputBuffer.set(reviewId, lines);
  }
  lines.push(line);

  // Trim from the front, in place, so the stored array instance never changes.
  const overflow = lines.length - OUTPUT_BUFFER_SIZE;
  if (overflow > 0) {
    lines.splice(0, overflow);
  }
}
/**
 * Publishes a system message for a review: the message is prefixed with
 * `[system] `, stored through `appendReviewOutput`, and then emitted as a
 * `review:output` event labelled with the `stdout` stream.
 *
 * @param reviewId - id of the review the message belongs to
 * @param message - text to publish, without the `[system]` prefix
 */
export function emitReviewSystemLine(reviewId: number, message: string): void {
  const payload: TaskEvents["review:output"] = {
    line: `[system] ${message}`,
    reviewId,
    stream: "stdout",
  };
  // Buffered before emitting, so the line is already stored when listeners run.
  appendReviewOutput(reviewId, payload.line);
  taskEvents.emit("review:output", payload);
}