Skip to content

Commit 3654fa6

Browse files
committed
feat(dashboard): add stats endpoint to compute stats server-side and avoid the log row limit
1 parent 2cee30f commit 3654fa6

File tree

5 files changed

+411
-231
lines changed

5 files changed

+411
-231
lines changed
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
import { db } from '@sim/db'
2+
import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { and, eq, sql } from 'drizzle-orm'
5+
import { type NextRequest, NextResponse } from 'next/server'
6+
import { z } from 'zod'
7+
import { getSession } from '@/lib/auth'
8+
import { generateRequestId } from '@/lib/core/utils/request'
9+
import { buildFilterConditions, LogFilterParamsSchema } from '@/lib/logs/filters'
10+
11+
const logger = createLogger('LogsStatsAPI')
12+
13+
export const revalidate = 0
14+
15+
const StatsQueryParamsSchema = LogFilterParamsSchema.extend({
16+
segmentCount: z.coerce.number().optional().default(72),
17+
})
18+
19+
export interface SegmentStats {
20+
timestamp: string
21+
totalExecutions: number
22+
successfulExecutions: number
23+
avgDurationMs: number
24+
}
25+
26+
export interface WorkflowStats {
27+
workflowId: string
28+
workflowName: string
29+
segments: SegmentStats[]
30+
overallSuccessRate: number
31+
totalExecutions: number
32+
totalSuccessful: number
33+
}
34+
35+
export interface DashboardStatsResponse {
36+
workflows: WorkflowStats[]
37+
aggregateSegments: SegmentStats[]
38+
totalRuns: number
39+
totalErrors: number
40+
avgLatency: number
41+
timeBounds: {
42+
start: string
43+
end: string
44+
}
45+
segmentMs: number
46+
}
47+
48+
export async function GET(request: NextRequest) {
49+
const requestId = generateRequestId()
50+
51+
try {
52+
const session = await getSession()
53+
if (!session?.user?.id) {
54+
logger.warn(`[${requestId}] Unauthorized logs stats access attempt`)
55+
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
56+
}
57+
58+
const userId = session.user.id
59+
60+
try {
61+
const { searchParams } = new URL(request.url)
62+
const params = StatsQueryParamsSchema.parse(Object.fromEntries(searchParams.entries()))
63+
64+
const workspaceFilter = eq(workflowExecutionLogs.workspaceId, params.workspaceId)
65+
66+
const commonFilters = buildFilterConditions(params, { useSimpleLevelFilter: true })
67+
const whereCondition = commonFilters ? and(workspaceFilter, commonFilters) : workspaceFilter
68+
69+
const boundsQuery = await db
70+
.select({
71+
minTime: sql<string>`MIN(${workflowExecutionLogs.startedAt})`,
72+
maxTime: sql<string>`MAX(${workflowExecutionLogs.startedAt})`,
73+
})
74+
.from(workflowExecutionLogs)
75+
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
76+
.innerJoin(
77+
permissions,
78+
and(
79+
eq(permissions.entityType, 'workspace'),
80+
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
81+
eq(permissions.userId, userId)
82+
)
83+
)
84+
.where(whereCondition)
85+
86+
const bounds = boundsQuery[0]
87+
const now = new Date()
88+
89+
let startTime: Date
90+
let endTime: Date
91+
92+
if (!bounds?.minTime || !bounds?.maxTime) {
93+
endTime = now
94+
startTime = new Date(now.getTime() - 24 * 60 * 60 * 1000)
95+
} else {
96+
startTime = new Date(bounds.minTime)
97+
endTime = new Date(Math.max(new Date(bounds.maxTime).getTime(), now.getTime()))
98+
}
99+
100+
const totalMs = Math.max(1, endTime.getTime() - startTime.getTime())
101+
const segmentMs = Math.max(60000, Math.floor(totalMs / params.segmentCount))
102+
103+
const statsQuery = await db
104+
.select({
105+
workflowId: workflowExecutionLogs.workflowId,
106+
workflowName: workflow.name,
107+
segmentIndex:
108+
sql<number>`FLOOR(EXTRACT(EPOCH FROM (${workflowExecutionLogs.startedAt} - ${startTime}::timestamp)) * 1000 / ${segmentMs})`.as(
109+
'segment_index'
110+
),
111+
totalExecutions: sql<number>`COUNT(*)`.as('total_executions'),
112+
successfulExecutions:
113+
sql<number>`COUNT(*) FILTER (WHERE ${workflowExecutionLogs.level} != 'error')`.as(
114+
'successful_executions'
115+
),
116+
avgDurationMs:
117+
sql<number>`COALESCE(AVG(${workflowExecutionLogs.totalDurationMs}) FILTER (WHERE ${workflowExecutionLogs.totalDurationMs} > 0), 0)`.as(
118+
'avg_duration_ms'
119+
),
120+
})
121+
.from(workflowExecutionLogs)
122+
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
123+
.innerJoin(
124+
permissions,
125+
and(
126+
eq(permissions.entityType, 'workspace'),
127+
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
128+
eq(permissions.userId, userId)
129+
)
130+
)
131+
.where(whereCondition)
132+
.groupBy(
133+
workflowExecutionLogs.workflowId,
134+
workflow.name,
135+
sql`FLOOR(EXTRACT(EPOCH FROM (${workflowExecutionLogs.startedAt} - ${startTime}::timestamp)) * 1000 / ${segmentMs})`
136+
)
137+
.orderBy(workflowExecutionLogs.workflowId, sql`segment_index`)
138+
139+
const workflowMap = new Map<
140+
string,
141+
{
142+
workflowId: string
143+
workflowName: string
144+
segments: Map<number, SegmentStats>
145+
totalExecutions: number
146+
totalSuccessful: number
147+
}
148+
>()
149+
150+
for (const row of statsQuery) {
151+
const segmentIndex = Math.min(
152+
params.segmentCount - 1,
153+
Math.max(0, Math.floor(Number(row.segmentIndex)))
154+
)
155+
156+
if (!workflowMap.has(row.workflowId)) {
157+
workflowMap.set(row.workflowId, {
158+
workflowId: row.workflowId,
159+
workflowName: row.workflowName,
160+
segments: new Map(),
161+
totalExecutions: 0,
162+
totalSuccessful: 0,
163+
})
164+
}
165+
166+
const wf = workflowMap.get(row.workflowId)!
167+
wf.totalExecutions += Number(row.totalExecutions)
168+
wf.totalSuccessful += Number(row.successfulExecutions)
169+
170+
const existing = wf.segments.get(segmentIndex)
171+
if (existing) {
172+
existing.totalExecutions += Number(row.totalExecutions)
173+
existing.successfulExecutions += Number(row.successfulExecutions)
174+
existing.avgDurationMs = (existing.avgDurationMs + Number(row.avgDurationMs || 0)) / 2
175+
} else {
176+
wf.segments.set(segmentIndex, {
177+
timestamp: new Date(startTime.getTime() + segmentIndex * segmentMs).toISOString(),
178+
totalExecutions: Number(row.totalExecutions),
179+
successfulExecutions: Number(row.successfulExecutions),
180+
avgDurationMs: Number(row.avgDurationMs || 0),
181+
})
182+
}
183+
}
184+
185+
const workflows: WorkflowStats[] = []
186+
for (const wf of workflowMap.values()) {
187+
const segments: SegmentStats[] = []
188+
for (let i = 0; i < params.segmentCount; i++) {
189+
const existing = wf.segments.get(i)
190+
if (existing) {
191+
segments.push(existing)
192+
} else {
193+
segments.push({
194+
timestamp: new Date(startTime.getTime() + i * segmentMs).toISOString(),
195+
totalExecutions: 0,
196+
successfulExecutions: 0,
197+
avgDurationMs: 0,
198+
})
199+
}
200+
}
201+
202+
workflows.push({
203+
workflowId: wf.workflowId,
204+
workflowName: wf.workflowName,
205+
segments,
206+
totalExecutions: wf.totalExecutions,
207+
totalSuccessful: wf.totalSuccessful,
208+
overallSuccessRate:
209+
wf.totalExecutions > 0 ? (wf.totalSuccessful / wf.totalExecutions) * 100 : 100,
210+
})
211+
}
212+
213+
workflows.sort((a, b) => {
214+
const errA = a.overallSuccessRate < 100 ? 1 - a.overallSuccessRate / 100 : 0
215+
const errB = b.overallSuccessRate < 100 ? 1 - b.overallSuccessRate / 100 : 0
216+
if (errA !== errB) return errB - errA
217+
return a.workflowName.localeCompare(b.workflowName)
218+
})
219+
220+
const aggregateSegments: SegmentStats[] = []
221+
let totalRuns = 0
222+
let totalErrors = 0
223+
let weightedLatencySum = 0
224+
let latencyCount = 0
225+
226+
for (let i = 0; i < params.segmentCount; i++) {
227+
let segTotal = 0
228+
let segSuccess = 0
229+
let segWeightedLatency = 0
230+
let segLatencyCount = 0
231+
232+
for (const wf of workflows) {
233+
const seg = wf.segments[i]
234+
segTotal += seg.totalExecutions
235+
segSuccess += seg.successfulExecutions
236+
if (seg.avgDurationMs > 0 && seg.totalExecutions > 0) {
237+
segWeightedLatency += seg.avgDurationMs * seg.totalExecutions
238+
segLatencyCount += seg.totalExecutions
239+
}
240+
}
241+
242+
totalRuns += segTotal
243+
totalErrors += segTotal - segSuccess
244+
weightedLatencySum += segWeightedLatency
245+
latencyCount += segLatencyCount
246+
247+
aggregateSegments.push({
248+
timestamp: new Date(startTime.getTime() + i * segmentMs).toISOString(),
249+
totalExecutions: segTotal,
250+
successfulExecutions: segSuccess,
251+
avgDurationMs: segLatencyCount > 0 ? segWeightedLatency / segLatencyCount : 0,
252+
})
253+
}
254+
255+
const avgLatency = latencyCount > 0 ? weightedLatencySum / latencyCount : 0
256+
257+
const response: DashboardStatsResponse = {
258+
workflows,
259+
aggregateSegments,
260+
totalRuns,
261+
totalErrors,
262+
avgLatency,
263+
timeBounds: {
264+
start: startTime.toISOString(),
265+
end: endTime.toISOString(),
266+
},
267+
segmentMs,
268+
}
269+
270+
return NextResponse.json(response, { status: 200 })
271+
} catch (validationError) {
272+
if (validationError instanceof z.ZodError) {
273+
logger.warn(`[${requestId}] Invalid logs stats request parameters`, {
274+
errors: validationError.errors,
275+
})
276+
return NextResponse.json(
277+
{
278+
error: 'Invalid request parameters',
279+
details: validationError.errors,
280+
},
281+
{ status: 400 }
282+
)
283+
}
284+
throw validationError
285+
}
286+
} catch (error: any) {
287+
logger.error(`[${requestId}] logs stats fetch error`, error)
288+
return NextResponse.json({ error: error.message }, { status: 500 })
289+
}
290+
}

0 commit comments

Comments
 (0)