Skip to content

Commit 5af72ea

Browse files
authored
feat(dashboard): added stats endpoint to compute stats on server side and avoid limit (#2823)
* feat(dashboard): add a stats endpoint that computes statistics server-side, avoiding the client-side row limit * update query
1 parent 4899c28 commit 5af72ea

File tree

5 files changed

+418
-231
lines changed

5 files changed

+418
-231
lines changed
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
import { db } from '@sim/db'
2+
import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { and, eq, sql } from 'drizzle-orm'
5+
import { type NextRequest, NextResponse } from 'next/server'
6+
import { z } from 'zod'
7+
import { getSession } from '@/lib/auth'
8+
import { generateRequestId } from '@/lib/core/utils/request'
9+
import { buildFilterConditions, LogFilterParamsSchema } from '@/lib/logs/filters'
10+
11+
// Module-scoped logger; all entries from this route are tagged 'LogsStatsAPI'.
const logger = createLogger('LogsStatsAPI')

// Opt this route out of Next.js caching so stats always reflect current data.
export const revalidate = 0
14+
15+
const StatsQueryParamsSchema = LogFilterParamsSchema.extend({
16+
segmentCount: z.coerce.number().optional().default(72),
17+
})
18+
19+
/** Aggregated execution stats for a single time bucket (segment). */
export interface SegmentStats {
  /** ISO-8601 start time of this segment. */
  timestamp: string
  /** Executions whose startedAt falls inside this segment. */
  totalExecutions: number
  /** Executions whose level is not 'error'. */
  successfulExecutions: number
  /** Count-weighted mean duration in ms over executions with duration > 0; 0 when none. */
  avgDurationMs: number
}
25+
26+
/** Per-workflow statistics: a dense segment array plus overall totals. */
export interface WorkflowStats {
  workflowId: string
  workflowName: string
  /** Exactly `segmentCount` entries; buckets with no executions are zero-filled. */
  segments: SegmentStats[]
  /** Success percentage in [0, 100]; reported as 100 when there were no executions. */
  overallSuccessRate: number
  totalExecutions: number
  totalSuccessful: number
}
34+
35+
/** 200-response payload: per-workflow stats plus workspace-wide aggregates. */
export interface DashboardStatsResponse {
  /** Sorted by descending error rate, ties broken alphabetically by name. */
  workflows: WorkflowStats[]
  /** Workspace-wide per-segment totals summed across all workflows. */
  aggregateSegments: SegmentStats[]
  totalRuns: number
  /** Runs counted as errors (totalRuns minus successful runs). */
  totalErrors: number
  /** Count-weighted mean latency in ms across all segments; 0 when no data. */
  avgLatency: number
  /** ISO-8601 bounds of the charted time window. */
  timeBounds: {
    start: string
    end: string
  }
  /** Width of each segment in milliseconds (floored at 60000). */
  segmentMs: number
}
47+
48+
/**
 * GET handler computing dashboard execution statistics server-side, so the
 * client does not need to fetch raw log rows (and hit row limits) to chart
 * per-workflow and aggregate trends.
 *
 * Flow:
 *  1. Authenticate the session (401 if absent).
 *  2. Parse/validate query params with StatsQueryParamsSchema (400 on failure).
 *  3. Query min/max startedAt to establish the time window.
 *  4. Aggregate per (workflow, time-bucket) counts and avg duration in SQL.
 *  5. Densify buckets, sort workflows, compute workspace-wide aggregates.
 *
 * Access control: both queries inner-join `permissions` on
 * (entityType='workspace', entityId=log.workspaceId, userId), so rows are
 * only visible when the caller holds a permission row for that workspace.
 *
 * Responses: 200 DashboardStatsResponse · 400 invalid params · 401 unauthenticated ·
 * 500 unexpected error (NOTE(review): error.message is echoed to the client — confirm
 * this cannot leak internals).
 */
export async function GET(request: NextRequest) {
  const requestId = generateRequestId()

  try {
    const session = await getSession()
    if (!session?.user?.id) {
      logger.warn(`[${requestId}] Unauthorized logs stats access attempt`)
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    const userId = session.user.id

    // Inner try: Zod validation errors become a 400; anything else is
    // rethrown to the outer catch, which answers 500.
    try {
      const { searchParams } = new URL(request.url)
      const params = StatsQueryParamsSchema.parse(Object.fromEntries(searchParams.entries()))

      // Always scope to the requested workspace, then AND in whatever
      // optional filters (level, triggers, time range, …) the shared helper builds.
      const workspaceFilter = eq(workflowExecutionLogs.workspaceId, params.workspaceId)

      const commonFilters = buildFilterConditions(params, { useSimpleLevelFilter: true })
      const whereCondition = commonFilters ? and(workspaceFilter, commonFilters) : workspaceFilter

      // Pass 1: min/max startedAt over the matching, permission-visible logs,
      // used to size the time axis.
      const boundsQuery = await db
        .select({
          minTime: sql<string>`MIN(${workflowExecutionLogs.startedAt})`,
          maxTime: sql<string>`MAX(${workflowExecutionLogs.startedAt})`,
        })
        .from(workflowExecutionLogs)
        .innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
        .innerJoin(
          permissions,
          and(
            eq(permissions.entityType, 'workspace'),
            eq(permissions.entityId, workflowExecutionLogs.workspaceId),
            eq(permissions.userId, userId)
          )
        )
        .where(whereCondition)

      const bounds = boundsQuery[0]
      const now = new Date()

      let startTime: Date
      let endTime: Date

      if (!bounds?.minTime || !bounds?.maxTime) {
        // No matching logs: fall back to the trailing 24 hours so the
        // response still carries a well-formed (all-zero) time axis.
        endTime = now
        startTime = new Date(now.getTime() - 24 * 60 * 60 * 1000)
      } else {
        startTime = new Date(bounds.minTime)
        // Extend the window to "now" so the newest buckets reach the present.
        endTime = new Date(Math.max(new Date(bounds.maxTime).getTime(), now.getTime()))
      }

      // Bucket width, floored at one minute. For short windows this means the
      // effective bucket count is below segmentCount; out-of-range SQL bucket
      // indices are clamped when folding rows below.
      const totalMs = Math.max(1, endTime.getTime() - startTime.getTime())
      const segmentMs = Math.max(60000, Math.floor(totalMs / params.segmentCount))

      // Pass 2: per (workflow, bucket) aggregation pushed down into SQL.
      const statsQuery = await db
        .select({
          workflowId: workflowExecutionLogs.workflowId,
          workflowName: workflow.name,
          // Bucket index = milliseconds elapsed since startTime / bucket width.
          segmentIndex:
            sql<number>`FLOOR(EXTRACT(EPOCH FROM (${workflowExecutionLogs.startedAt} - ${startTime}::timestamp)) * 1000 / ${segmentMs})`.as(
              'segment_index'
            ),
          totalExecutions: sql<number>`COUNT(*)`.as('total_executions'),
          // "Success" is any execution whose level is not 'error'.
          successfulExecutions:
            sql<number>`COUNT(*) FILTER (WHERE ${workflowExecutionLogs.level} != 'error')`.as(
              'successful_executions'
            ),
          // Average only rows with a positive duration; COALESCE gives 0 when none qualify.
          avgDurationMs:
            sql<number>`COALESCE(AVG(${workflowExecutionLogs.totalDurationMs}) FILTER (WHERE ${workflowExecutionLogs.totalDurationMs} > 0), 0)`.as(
              'avg_duration_ms'
            ),
        })
        .from(workflowExecutionLogs)
        .innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
        .innerJoin(
          permissions,
          and(
            eq(permissions.entityType, 'workspace'),
            eq(permissions.entityId, workflowExecutionLogs.workspaceId),
            eq(permissions.userId, userId)
          )
        )
        .where(whereCondition)
        .groupBy(
          workflowExecutionLogs.workflowId,
          workflow.name,
          // The bucket expression is repeated verbatim rather than referencing
          // the select alias.
          sql`FLOOR(EXTRACT(EPOCH FROM (${workflowExecutionLogs.startedAt} - ${startTime}::timestamp)) * 1000 / ${segmentMs})`
        )
        .orderBy(workflowExecutionLogs.workflowId, sql`segment_index`)

      // Fold SQL rows into per-workflow sparse bucket maps keyed by index.
      const workflowMap = new Map<
        string,
        {
          workflowId: string
          workflowName: string
          segments: Map<number, SegmentStats>
          totalExecutions: number
          totalSuccessful: number
        }
      >()

      for (const row of statsQuery) {
        // Clamp to [0, segmentCount): the 60s floor on segmentMs (and rows at
        // the exact window edge) can yield indices outside the axis.
        const segmentIndex = Math.min(
          params.segmentCount - 1,
          Math.max(0, Math.floor(Number(row.segmentIndex)))
        )

        if (!workflowMap.has(row.workflowId)) {
          workflowMap.set(row.workflowId, {
            workflowId: row.workflowId,
            workflowName: row.workflowName,
            segments: new Map(),
            totalExecutions: 0,
            totalSuccessful: 0,
          })
        }

        const wf = workflowMap.get(row.workflowId)!
        wf.totalExecutions += Number(row.totalExecutions)
        wf.totalSuccessful += Number(row.successfulExecutions)

        const existing = wf.segments.get(segmentIndex)
        if (existing) {
          // Two SQL buckets clamped onto the same index: merge counts and
          // recombine the averages as a count-weighted mean.
          const oldTotal = existing.totalExecutions
          const newTotal = oldTotal + Number(row.totalExecutions)
          existing.totalExecutions = newTotal
          existing.successfulExecutions += Number(row.successfulExecutions)
          existing.avgDurationMs =
            newTotal > 0
              ? (existing.avgDurationMs * oldTotal +
                  Number(row.avgDurationMs || 0) * Number(row.totalExecutions)) /
                newTotal
              : 0
        } else {
          wf.segments.set(segmentIndex, {
            timestamp: new Date(startTime.getTime() + segmentIndex * segmentMs).toISOString(),
            totalExecutions: Number(row.totalExecutions),
            successfulExecutions: Number(row.successfulExecutions),
            avgDurationMs: Number(row.avgDurationMs || 0),
          })
        }
      }

      // Densify: emit exactly segmentCount segments per workflow, zero-filling
      // gaps so clients can chart positionally without re-indexing.
      const workflows: WorkflowStats[] = []
      for (const wf of workflowMap.values()) {
        const segments: SegmentStats[] = []
        for (let i = 0; i < params.segmentCount; i++) {
          const existing = wf.segments.get(i)
          if (existing) {
            segments.push(existing)
          } else {
            segments.push({
              timestamp: new Date(startTime.getTime() + i * segmentMs).toISOString(),
              totalExecutions: 0,
              successfulExecutions: 0,
              avgDurationMs: 0,
            })
          }
        }

        workflows.push({
          workflowId: wf.workflowId,
          workflowName: wf.workflowName,
          segments,
          totalExecutions: wf.totalExecutions,
          totalSuccessful: wf.totalSuccessful,
          // No executions counts as a 100% success rate.
          overallSuccessRate:
            wf.totalExecutions > 0 ? (wf.totalSuccessful / wf.totalExecutions) * 100 : 100,
        })
      }

      // Order by descending error rate so troubled workflows surface first;
      // break ties alphabetically for a stable listing.
      workflows.sort((a, b) => {
        const errA = a.overallSuccessRate < 100 ? 1 - a.overallSuccessRate / 100 : 0
        const errB = b.overallSuccessRate < 100 ? 1 - b.overallSuccessRate / 100 : 0
        if (errA !== errB) return errB - errA
        return a.workflowName.localeCompare(b.workflowName)
      })

      // Workspace-wide aggregates, recomputed from the densified per-workflow
      // segments so they agree exactly with what the client charts.
      const aggregateSegments: SegmentStats[] = []
      let totalRuns = 0
      let totalErrors = 0
      let weightedLatencySum = 0
      let latencyCount = 0

      for (let i = 0; i < params.segmentCount; i++) {
        let segTotal = 0
        let segSuccess = 0
        let segWeightedLatency = 0
        let segLatencyCount = 0

        for (const wf of workflows) {
          const seg = wf.segments[i]
          segTotal += seg.totalExecutions
          segSuccess += seg.successfulExecutions
          // Only segments with real latency data contribute to the weighted mean.
          if (seg.avgDurationMs > 0 && seg.totalExecutions > 0) {
            segWeightedLatency += seg.avgDurationMs * seg.totalExecutions
            segLatencyCount += seg.totalExecutions
          }
        }

        totalRuns += segTotal
        totalErrors += segTotal - segSuccess
        weightedLatencySum += segWeightedLatency
        latencyCount += segLatencyCount

        aggregateSegments.push({
          timestamp: new Date(startTime.getTime() + i * segmentMs).toISOString(),
          totalExecutions: segTotal,
          successfulExecutions: segSuccess,
          avgDurationMs: segLatencyCount > 0 ? segWeightedLatency / segLatencyCount : 0,
        })
      }

      const avgLatency = latencyCount > 0 ? weightedLatencySum / latencyCount : 0

      const response: DashboardStatsResponse = {
        workflows,
        aggregateSegments,
        totalRuns,
        totalErrors,
        avgLatency,
        timeBounds: {
          start: startTime.toISOString(),
          end: endTime.toISOString(),
        },
        segmentMs,
      }

      return NextResponse.json(response, { status: 200 })
    } catch (validationError) {
      // Malformed query string → 400 with Zod's per-field details.
      if (validationError instanceof z.ZodError) {
        logger.warn(`[${requestId}] Invalid logs stats request parameters`, {
          errors: validationError.errors,
        })
        return NextResponse.json(
          {
            error: 'Invalid request parameters',
            details: validationError.errors,
          },
          { status: 400 }
        )
      }
      // Not a validation failure — let the outer handler turn it into a 500.
      throw validationError
    }
  } catch (error: any) {
    logger.error(`[${requestId}] logs stats fetch error`, error)
    return NextResponse.json({ error: error.message }, { status: 500 })
  }
}

0 commit comments

Comments
 (0)