From 9f87c796035f42c27d4526e484137b1ba3e16447 Mon Sep 17 00:00:00 2001 From: Makisuo Date: Sat, 4 Apr 2026 23:09:45 +0200 Subject: [PATCH 1/7] init-resend setup --- apps/alerting/src/index.ts | 72 ++- apps/alerting/tsconfig.json | 1 + apps/api/package.json | 2 + apps/api/src/alerting.ts | 2 + apps/api/src/http.ts | 2 + apps/api/src/routes/digest.http.ts | 48 ++ apps/api/src/services/DigestService.ts | 477 ++++++++++++++++++ apps/api/src/services/EmailService.ts | 101 ++++ apps/api/src/services/Env.ts | 10 + apps/api/tsconfig.json | 1 + bun.lock | 2 + .../db/drizzle/0011_digest_subscriptions.sql | 16 + packages/db/drizzle/meta/_journal.json | 7 + packages/db/src/schema/digest.ts | 35 ++ packages/db/src/schema/index.ts | 1 + packages/domain/src/http/api.ts | 2 + packages/domain/src/http/digest.ts | 95 ++++ packages/domain/src/http/index.ts | 1 + packages/email/package.json | 2 +- packages/email/tsconfig.json | 1 + 20 files changed, 864 insertions(+), 14 deletions(-) create mode 100644 apps/api/src/routes/digest.http.ts create mode 100644 apps/api/src/services/DigestService.ts create mode 100644 apps/api/src/services/EmailService.ts create mode 100644 packages/db/drizzle/0011_digest_subscriptions.sql create mode 100644 packages/db/src/schema/digest.ts create mode 100644 packages/domain/src/http/digest.ts diff --git a/apps/alerting/src/index.ts b/apps/alerting/src/index.ts index df0f860e..1202e159 100644 --- a/apps/alerting/src/index.ts +++ b/apps/alerting/src/index.ts @@ -1,5 +1,5 @@ import { BunRuntime } from "@effect/platform-bun" -import { AlertRuntime, AlertsService, Database, Env, makeTelemetryLayer, OrgTinybirdSettingsService, QueryEngineService, TinybirdService } from "@maple/api/alerting" +import { AlertRuntime, AlertsService, Database, DigestService, EmailService, Env, makeTelemetryLayer, OrgTinybirdSettingsService, QueryEngineService, TinybirdService } from "@maple/api/alerting" import { Cause, Duration, Effect, Layer, Schedule } from "effect" const DatabaseLive = Database.Default.pipe( @@ -38,24 +38,36 @@ const AlertsServiceLive = AlertsService.Live.pipe( Layer.provide(AlertsDependenciesLive), ) +const EmailServiceLive = EmailService.Live.pipe( + Layer.provide(Env.Default), +) + +const DigestDependenciesLive = Layer.mergeAll( + BaseLive, + TinybirdServiceLive, + EmailServiceLive, +) + +const DigestServiceLive = DigestService.Live.pipe( + Layer.provide(DigestDependenciesLive), +) + const TelemetryLive = makeTelemetryLayer("alerting") -const program = Effect.gen(function* () { +const alertLoop = Effect.gen(function* () { const alerts = yield* AlertsService - yield* Effect.logInfo("Alerting worker started") - yield* alerts.runSchedulerTick().pipe( Effect.tap((result) => - Effect.logInfo("Alerting worker tick complete").pipe( - Effect.annotateLogs({ - evaluatedCount: result.evaluatedCount, - processedCount: result.processedCount, - evaluationFailureCount: result.evaluationFailureCount, - deliveryFailureCount: result.deliveryFailureCount, - }), - ), + Effect.logInfo("Alerting worker tick complete").pipe( + Effect.annotateLogs({ + evaluatedCount: result.evaluatedCount, + processedCount: result.processedCount, + evaluationFailureCount: result.evaluationFailureCount, + deliveryFailureCount: result.deliveryFailureCount, + }), ), + ), Effect.catchCause((cause) => Effect.logError("Alerting worker tick failed").pipe( Effect.annotateLogs({ error: Cause.pretty(cause) }), @@ -67,8 +79,42 @@ const program = Effect.gen(function* () { ), ), ) +}) + +const digestLoop = Effect.gen(function* () { + const digest = yield* DigestService + + yield* digest.runDigestTick().pipe( + Effect.tap((result) => + Effect.logInfo("Digest tick complete").pipe( + Effect.annotateLogs({ + sentCount: result.sentCount, + errorCount: result.errorCount, + skipped: result.skipped, + }), + ), + ), + Effect.catchCause((cause) => + Effect.logError("Digest tick failed").pipe( + Effect.annotateLogs({ error: Cause.pretty(cause) }), + ), + ), + Effect.repeat( + Schedule.spaced(Duration.minutes(15)).pipe( + Schedule.jittered, + ), + ), + ) +}) + +const program = Effect.gen(function* () { + yield* Effect.logInfo("Alerting worker started") + + // Run digest loop detached, alert loop in foreground + yield* digestLoop.pipe(Effect.ignore, Effect.forkDetach) + yield* alertLoop }).pipe( - Effect.provide(AlertsServiceLive), + Effect.provide(Layer.mergeAll(AlertsServiceLive, DigestServiceLive)), Effect.provide(TelemetryLive), ) diff --git a/apps/alerting/tsconfig.json b/apps/alerting/tsconfig.json index 844424ca..253f11ea 100644 --- a/apps/alerting/tsconfig.json +++ b/apps/alerting/tsconfig.json @@ -3,6 +3,7 @@ "compilerOptions": { "target": "ES2022", "module": "ESNext", + "jsx": "react-jsx", "lib": ["ES2022", "DOM", "DOM.Iterable"], "types": ["bun"], "moduleResolution": "bundler", diff --git a/apps/api/package.json b/apps/api/package.json index bd4ecc39..ce234564 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -23,6 +23,8 @@ "@libsql/client": "0.15.15", "@maple/db": "workspace:*", "@maple/domain": "workspace:*", + "@maple/email": "workspace:*", + "@react-email/components": "^1.0.11", "@maple/query-engine": "workspace:*", "@tinybirdco/sdk": "catalog:tinybird", "autumn-js": "^1.1.2", diff --git a/apps/api/src/alerting.ts b/apps/api/src/alerting.ts index d341b664..d2e0faca 100644 --- a/apps/api/src/alerting.ts +++ b/apps/api/src/alerting.ts @@ -1,5 +1,7 @@ export { AlertRuntime, AlertsService } from "./services/AlertsService"; export { Database } from "./services/DatabaseLive"; +export { DigestService } from "./services/DigestService"; +export { EmailService } from "./services/EmailService"; export { Env } from "./services/Env"; export { OrgTinybirdSettingsService } from "./services/OrgTinybirdSettingsService"; export { QueryEngineService } from "./services/QueryEngineService"; diff --git a/apps/api/src/http.ts b/apps/api/src/http.ts index 7431dd84..013b6509 100644 --- a/apps/api/src/http.ts +++ b/apps/api/src/http.ts @@ -6,6 +6,7 @@ import { HttpAlertsLive } from "./routes/alerts.http"; import { HttpAuthLive, HttpAuthPublicLive } from "./routes/auth.http"; import { HttpCloudflareLogpushLive } from "./routes/cloudflare-logpush.http"; import { HttpDashboardsLive } from "./routes/dashboards.http"; +import { HttpDigestLive } from "./routes/digest.http"; import { HttpIngestKeysLive } from "./routes/ingest-keys.http"; import { HttpObservabilityLive } from "./routes/observability.http"; import { HttpOrgTinybirdSettingsLive } from "./routes/org-tinybird-settings.http"; @@ -21,6 +22,7 @@ export const HttpApiRoutes = HttpApiBuilder.layer(MapleApi).pipe( Layer.provide(HttpAlertsLive), Layer.provide(HttpCloudflareLogpushLive), Layer.provide(HttpDashboardsLive), + Layer.provide(HttpDigestLive), Layer.provide(HttpIngestKeysLive), Layer.provide(HttpObservabilityLive), Layer.provide(HttpOrgTinybirdSettingsLive), diff --git a/apps/api/src/routes/digest.http.ts b/apps/api/src/routes/digest.http.ts new file mode 100644 index 00000000..b9627005 --- /dev/null +++ b/apps/api/src/routes/digest.http.ts @@ -0,0 +1,48 @@ +import { HttpApiBuilder } from "effect/unstable/httpapi" +import { CurrentTenant, MapleApi } from "@maple/domain/http" +import { Effect } from "effect" +import { DigestService } from "../services/DigestService" + +export const HttpDigestLive = HttpApiBuilder.group( + MapleApi, + "digest", + (handlers) => + Effect.gen(function* () { + const digest = yield* DigestService + + return handlers + .handle("getSubscription", () => + Effect.gen(function* () { + const tenant = yield* CurrentTenant.Context + return yield* digest.getSubscription(tenant.orgId, tenant.userId) + }), + ) + .handle("upsertSubscription", ({ payload }) => + Effect.gen(function* () { + const tenant = yield* CurrentTenant.Context + return yield* digest.upsertSubscription( + tenant.orgId, + tenant.userId, + { + email: payload.email, + enabled: payload.enabled, + dayOfWeek: payload.dayOfWeek, + timezone: payload.timezone, + }, + ) + }), + ) + .handle("deleteSubscription", () => + Effect.gen(function* () { + const tenant = yield* CurrentTenant.Context + yield* digest.deleteSubscription(tenant.orgId, tenant.userId) + }), + ) + .handle("preview", () => + Effect.gen(function* () { + const tenant = yield* CurrentTenant.Context + return yield* digest.preview(tenant.orgId) + }), + ) + }), +) diff --git a/apps/api/src/services/DigestService.ts b/apps/api/src/services/DigestService.ts new file mode 100644 index 00000000..d31b8d18 --- /dev/null +++ b/apps/api/src/services/DigestService.ts @@ -0,0 +1,477 @@ +import { digestSubscriptions } from "@maple/db" +import type { + DigestSubscriptionResponse, + DigestPreviewResponse, +} from "@maple/domain/http" +import { + DigestNotConfiguredError, + DigestPersistenceError, +} from "@maple/domain/http" +import type { OrgId, UserId } from "@maple/domain/http" +import { render } from "@react-email/components" +import { and, eq } from "drizzle-orm" +import { Cause, Effect, Layer, ServiceMap } from "effect" +import { WeeklyDigest, type WeeklyDigestProps } from "@maple/email/weekly-digest" +import { Database } from "./DatabaseLive" +import { EmailService } from "./EmailService" +import { Env } from "./Env" +import { TinybirdService } from "./TinybirdService" + +const toPersistenceError = (error: unknown) => + new DigestPersistenceError({ + message: error instanceof Error ? error.message : "Digest persistence error", + }) + +export class DigestService extends ServiceMap.Service()( + "DigestService", + { + make: Effect.gen(function* () { + const database = yield* Database + const email = yield* EmailService + const env = yield* Env + const tinybird = yield* TinybirdService + + const getSubscription = Effect.fn("DigestService.getSubscription")( + function* (orgId: OrgId, userId: UserId) { + const rows = yield* database + .execute((db) => + db + .select() + .from(digestSubscriptions) + .where( + and( + eq(digestSubscriptions.orgId, orgId), + eq(digestSubscriptions.userId, userId), + ), + ) + .limit(1), + ) + .pipe(Effect.mapError(toPersistenceError)) + + const row = rows[0] + if (!row) { + return yield* Effect.fail( + new DigestPersistenceError({ + message: "No digest subscription found", + }), + ) + } + + return rowToResponse(row) + }, + ) + + const upsertSubscription = Effect.fn( + "DigestService.upsertSubscription", + )(function* ( + orgId: OrgId, + userId: UserId, + input: { + email: string + enabled?: boolean + dayOfWeek?: number + timezone?: string + }, + ) { + const now = Date.now() + const id = crypto.randomUUID() + + yield* database + .execute((db) => + db + .insert(digestSubscriptions) + .values({ + id, + orgId, + userId, + email: input.email, + enabled: input.enabled === false ? 0 : 1, + dayOfWeek: input.dayOfWeek ?? 1, + timezone: input.timezone ?? "UTC", + createdAt: now, + updatedAt: now, + }) + .onConflictDoUpdate({ + target: [digestSubscriptions.orgId, digestSubscriptions.userId], + set: { + email: input.email, + enabled: input.enabled === false ? 0 : 1, + ...(input.dayOfWeek != null + ? { dayOfWeek: input.dayOfWeek } + : {}), + ...(input.timezone != null + ? { timezone: input.timezone } + : {}), + updatedAt: now, + }, + }), + ) + .pipe(Effect.mapError(toPersistenceError)) + + return yield* getSubscription(orgId, userId) + }) + + const deleteSubscription = Effect.fn( + "DigestService.deleteSubscription", + )(function* (orgId: OrgId, userId: UserId) { + yield* database + .execute((db) => + db + .delete(digestSubscriptions) + .where( + and( + eq(digestSubscriptions.orgId, orgId), + eq(digestSubscriptions.userId, userId), + ), + ), + ) + .pipe(Effect.mapError(toPersistenceError)) + }) + + const generateDigestData = Effect.fn( + "DigestService.generateDigestData", + )(function* (orgId: OrgId) { + const now = new Date() + const currentEnd = now.toISOString() + const currentStart = new Date( + now.getTime() - 7 * 24 * 60 * 60 * 1000, + ).toISOString() + const previousStart = new Date( + now.getTime() - 14 * 24 * 60 * 60 * 1000, + ).toISOString() + + const systemTenant = { + orgId, + userId: "system-digest" as UserId, + roles: ["root"] as any, + authMode: "self_hosted" as const, + } + + // Query all data in parallel + const [ + currentOverview, + previousOverview, + currentErrors, + currentUsage, + previousUsage, + topErrors, + ] = yield* Effect.all( + [ + tinybird.query(systemTenant, { + pipe: "service_overview", + params: { start_time: currentStart, end_time: currentEnd }, + }), + tinybird.query(systemTenant, { + pipe: "service_overview", + params: { start_time: previousStart, end_time: currentStart }, + }), + tinybird.query(systemTenant, { + pipe: "errors_summary", + params: { start_time: currentStart, end_time: currentEnd }, + }), + tinybird.query(systemTenant, { + pipe: "get_service_usage", + params: { start_time: currentStart, end_time: currentEnd }, + }), + tinybird.query(systemTenant, { + pipe: "get_service_usage", + params: { start_time: previousStart, end_time: currentStart }, + }), + tinybird.query(systemTenant, { + pipe: "errors_by_type", + params: { + start_time: currentStart, + end_time: currentEnd, + limit: 5, + }, + }), + ], + { concurrency: 6 }, + ).pipe( + Effect.mapError( + () => + new DigestPersistenceError({ + message: "Failed to fetch digest data from Tinybird", + }), + ), + ) + + // Aggregate service health + const curOverviewData = currentOverview.data as Array> + const prevOverviewData = previousOverview.data as Array> + + const totalRequests = curOverviewData.reduce( + (sum, s) => sum + (Number(s.total_count) || 0), + 0, + ) + const prevTotalRequests = prevOverviewData.reduce( + (sum, s) => sum + (Number(s.total_count) || 0), + 0, + ) + + const totalErrors = curOverviewData.reduce( + (sum, s) => sum + (Number(s.error_count) || 0), + 0, + ) + const prevTotalErrors = prevOverviewData.reduce( + (sum, s) => sum + (Number(s.error_count) || 0), + 0, + ) + + // Weighted avg P95 + const avgP95 = + totalRequests > 0 + ? curOverviewData.reduce( + (sum, s) => + sum + + (Number(s.p95_duration_ms) || 0) * + (Number(s.total_count) || 0), + 0, + ) / totalRequests + : 0 + const prevAvgP95 = + prevTotalRequests > 0 + ? prevOverviewData.reduce( + (sum, s) => + sum + + (Number(s.p95_duration_ms) || 0) * + (Number(s.total_count) || 0), + 0, + ) / prevTotalRequests + : 0 + + // Data volume + const curUsageData = currentUsage.data as Array> + const prevUsageData = previousUsage.data as Array> + const sumUsage = (data: Array>) => ({ + logs: data.reduce((s, r) => s + (Number(r.log_count) || 0), 0), + traces: data.reduce((s, r) => s + (Number(r.trace_count) || 0), 0), + metrics: data.reduce( + (s, r) => + s + + (Number(r.sum_metric_count) || 0) + + (Number(r.gauge_metric_count) || 0) + + (Number(r.histogram_metric_count) || 0) + + (Number(r.exp_histogram_metric_count) || 0), + 0, + ), + totalBytes: data.reduce( + (s, r) => + s + + (Number(r.log_size_bytes) || 0) + + (Number(r.trace_size_bytes) || 0) + + (Number(r.sum_metric_size_bytes) || 0) + + (Number(r.gauge_metric_size_bytes) || 0) + + (Number(r.histogram_metric_size_bytes) || 0) + + (Number(r.exp_histogram_metric_size_bytes) || 0), + 0, + ), + }) + const curUsage = sumUsage(curUsageData) + const prevUsage = sumUsage(prevUsageData) + + const delta = (cur: number, prev: number) => + prev === 0 ? (cur > 0 ? 100 : 0) : ((cur - prev) / prev) * 100 + + const formatDate = (d: Date) => + d.toLocaleDateString("en-US", { month: "short", day: "numeric" }) + + const services = curOverviewData + .sort( + (a, b) => + (Number(b.total_count) || 0) - (Number(a.total_count) || 0), + ) + .slice(0, 10) + .map((s) => ({ + name: String(s.service_name), + requests: Number(s.total_count) || 0, + errorRate: + (Number(s.total_count) || 0) > 0 + ? ((Number(s.error_count) || 0) / + (Number(s.total_count) || 0)) * + 100 + : 0, + p95Ms: Number(s.p95_duration_ms) || 0, + })) + + const errorsData = (topErrors.data as Array>) + .slice(0, 5) + .map((e) => ({ + message: String(e.error_fingerprint || e.status_message || "Unknown"), + count: Number(e.error_count) || 0, + })) + + const startDate = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000) + + const props: WeeklyDigestProps = { + orgName: orgId, + dateRange: { + start: formatDate(startDate), + end: formatDate(now), + }, + summary: { + requests: { + value: totalRequests, + delta: delta(totalRequests, prevTotalRequests), + }, + errors: { + value: totalErrors, + delta: delta(totalErrors, prevTotalErrors), + }, + p95Latency: { + valueMs: avgP95, + delta: delta(avgP95, prevAvgP95), + }, + dataVolume: { + valueBytes: curUsage.totalBytes, + delta: delta(curUsage.totalBytes, prevUsage.totalBytes), + }, + }, + services, + topErrors: errorsData, + ingestion: curUsage, + dashboardUrl: `${env.MAPLE_APP_BASE_URL}`, + unsubscribeUrl: `${env.MAPLE_APP_BASE_URL}/settings`, + } + + return props + }) + + const renderDigestHtml = Effect.fn("DigestService.renderDigestHtml")( + function* (props: WeeklyDigestProps) { + return yield* Effect.tryPromise({ + try: () => render(WeeklyDigest(props)), + catch: (error) => + new DigestPersistenceError({ + message: + error instanceof Error + ? error.message + : "Failed to render digest email", + }), + }) + }, + ) + + const preview = Effect.fn("DigestService.preview")(function* ( + orgId: OrgId, + ) { + const props = yield* generateDigestData(orgId) + const html = yield* renderDigestHtml(props) + return { html } as DigestPreviewResponse + }) + + const runDigestTick = Effect.fn("DigestService.runDigestTick")( + function* () { + if (!email.isConfigured) { + return { sentCount: 0, errorCount: 0, skipped: true } + } + + const now = Date.now() + const sevenDaysAgo = now - 7 * 24 * 60 * 60 * 1000 + const currentDayOfWeek = new Date().getUTCDay() + + // Find subscriptions due for sending + const subs = yield* database + .execute((db) => + db + .select() + .from(digestSubscriptions) + .where(eq(digestSubscriptions.enabled, 1)), + ) + .pipe(Effect.mapError(toPersistenceError)) + + const dueSubs = subs.filter( + (s) => + s.dayOfWeek === currentDayOfWeek && + (s.lastSentAt == null || s.lastSentAt < sevenDaysAgo), + ) + + if (dueSubs.length === 0) { + return { sentCount: 0, errorCount: 0, skipped: false } + } + + // Group by org to avoid duplicate Tinybird queries + const byOrg = new Map() + for (const sub of dueSubs) { + const existing = byOrg.get(sub.orgId) ?? [] + existing.push(sub) + byOrg.set(sub.orgId, existing) + } + + let sentCount = 0 + let errorCount = 0 + + for (const [orgId, orgSubs] of byOrg) { + const sendForOrg = Effect.gen(function* () { + const props = yield* generateDigestData(orgId as OrgId) + const html = yield* renderDigestHtml(props) + + for (const sub of orgSubs) { + yield* email + .send( + sub.email, + `Maple Weekly Digest — ${props.dateRange.start} to ${props.dateRange.end}`, + html, + ) + .pipe( + Effect.tap(() => + database.execute((db) => + db + .update(digestSubscriptions) + .set({ lastSentAt: Date.now() }) + .where(eq(digestSubscriptions.id, sub.id)), + ), + ), + Effect.match({ + onSuccess: () => { sentCount++ }, + onFailure: () => { errorCount++ }, + }), + ) + } + }).pipe( + Effect.catchCause((cause) => + Effect.logError("Digest failed for org").pipe( + Effect.annotateLogs({ orgId, error: Cause.pretty(cause) }), + Effect.tap(() => + Effect.sync(() => { errorCount += orgSubs.length }), + ), + ), + ), + ) + + yield* sendForOrg + } + + return { sentCount, errorCount, skipped: false } + }, + ) + + return { + getSubscription, + upsertSubscription, + deleteSubscription, + preview, + runDigestTick, + } + }), + }, +) { + static readonly layer = Layer.effect(this, this.make) + static readonly Live = this.layer + static readonly Default = this.layer +} + +function rowToResponse( + row: typeof digestSubscriptions.$inferSelect, +): DigestSubscriptionResponse { + return { + id: row.id, + email: row.email, + enabled: row.enabled === 1, + dayOfWeek: row.dayOfWeek, + timezone: row.timezone, + lastSentAt: row.lastSentAt ?? null, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + } as DigestSubscriptionResponse +} diff --git a/apps/api/src/services/EmailService.ts b/apps/api/src/services/EmailService.ts new file mode 100644 index 00000000..368bb5a6 --- /dev/null +++ b/apps/api/src/services/EmailService.ts @@ -0,0 +1,101 @@ +import { Duration, Effect, Layer, Option, Redacted, Schema, ServiceMap } from "effect" +import { Env } from "./Env" + +export class EmailDeliveryError extends Schema.TaggedErrorClass()( + "@maple/errors/EmailDeliveryError", + { + message: Schema.String, + }, +) {} + +export interface EmailServiceShape { + readonly isConfigured: boolean + readonly send: ( + to: string, + subject: string, + html: string, + ) => Effect.Effect +} + +const EMAIL_TIMEOUT = Duration.seconds(15) + +export class EmailService extends ServiceMap.Service()( + "EmailService", + { + make: Effect.gen(function* () { + const env = yield* Env + const apiKey = env.RESEND_API_KEY + const fromEmail = env.RESEND_FROM_EMAIL + + const isConfigured = Option.isSome(apiKey) + + const send = Effect.fn("EmailService.send")(function* ( + to: string, + subject: string, + html: string, + ) { + if (Option.isNone(apiKey)) { + return yield* Effect.fail( + new EmailDeliveryError({ + message: "Email not configured: RESEND_API_KEY is not set", + }), + ) + } + + const response = yield* Effect.tryPromise({ + try: () => + fetch("https://api.resend.com/emails", { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${Redacted.value(apiKey.value)}`, + }, + body: JSON.stringify({ + from: fromEmail, + to: [to], + subject, + html, + }), + }), + catch: (error) => + new EmailDeliveryError({ + message: error instanceof Error + ? error.message + : "Resend API request failed", + }), + }).pipe( + Effect.timeoutOrElse({ + duration: EMAIL_TIMEOUT, + onTimeout: () => + Effect.fail( + new EmailDeliveryError({ + message: "Resend API request timed out after 15s", + }), + ), + }), + ) + + if (!response.ok) { + const body = yield* Effect.tryPromise({ + try: () => response.text(), + catch: () => + new EmailDeliveryError({ + message: `Resend API returned ${response.status}`, + }), + }) + return yield* Effect.fail( + new EmailDeliveryError({ + message: `Resend API returned ${response.status}: ${body}`, + }), + ) + } + }) + + return { isConfigured, send } + }), + }, +) { + static readonly layer = Layer.effect(this, this.make) + static readonly Live = this.layer + static readonly Default = this.layer +} diff --git a/apps/api/src/services/Env.ts b/apps/api/src/services/Env.ts index 6a5e6498..953cb1f1 100644 --- a/apps/api/src/services/Env.ts +++ b/apps/api/src/services/Env.ts @@ -21,6 +21,8 @@ export interface EnvShape { readonly AUTUMN_SECRET_KEY: Option.Option> readonly SD_INTERNAL_TOKEN: Option.Option> readonly INTERNAL_SERVICE_TOKEN: Option.Option> + readonly RESEND_API_KEY: Option.Option> + readonly RESEND_FROM_EMAIL: string } export class Env extends ServiceMap.Service()("Env", { @@ -77,6 +79,12 @@ export class Env extends ServiceMap.Service()("Env", { INTERNAL_SERVICE_TOKEN: yield* Config.option( Config.redacted("INTERNAL_SERVICE_TOKEN"), ), + RESEND_API_KEY: yield* Config.option( + Config.redacted("RESEND_API_KEY"), + ), + RESEND_FROM_EMAIL: yield* Config.string("RESEND_FROM_EMAIL").pipe( + Config.withDefault("Maple "), + ), } as const; const normalizedEnv = { @@ -90,6 +98,8 @@ export class Env extends ServiceMap.Service()("Env", { AUTUMN_SECRET_KEY: normalizeOptionalSecret(env.AUTUMN_SECRET_KEY), SD_INTERNAL_TOKEN: normalizeOptionalSecret(env.SD_INTERNAL_TOKEN), INTERNAL_SERVICE_TOKEN: normalizeOptionalSecret(env.INTERNAL_SERVICE_TOKEN), + RESEND_API_KEY: normalizeOptionalSecret(env.RESEND_API_KEY), + RESEND_FROM_EMAIL: env.RESEND_FROM_EMAIL, } as const const authMode = normalizedEnv.MAPLE_AUTH_MODE.toLowerCase() diff --git a/apps/api/tsconfig.json b/apps/api/tsconfig.json index 1e8e6077..0460cf5e 100644 --- a/apps/api/tsconfig.json +++ b/apps/api/tsconfig.json @@ -3,6 +3,7 @@ "compilerOptions": { "target": "ES2022", "module": "ESNext", + "jsx": "react-jsx", "lib": ["ES2022", "DOM", "DOM.Iterable"], "types": ["bun", "node"], "moduleResolution": "bundler", diff --git a/bun.lock b/bun.lock index d46cb5dc..45b0d058 100644 --- a/bun.lock +++ b/bun.lock @@ -35,7 +35,9 @@ "@libsql/client": "0.15.15", "@maple/db": "workspace:*", "@maple/domain": "workspace:*", + "@maple/email": "workspace:*", "@maple/query-engine": "workspace:*", + "@react-email/components": "^1.0.11", "@tinybirdco/sdk": "catalog:tinybird", "autumn-js": "^1.1.2", "date-fns": "^4.1.0", diff --git a/packages/db/drizzle/0011_digest_subscriptions.sql b/packages/db/drizzle/0011_digest_subscriptions.sql new file mode 100644 index 00000000..56a74cb4 --- /dev/null +++ b/packages/db/drizzle/0011_digest_subscriptions.sql @@ -0,0 +1,16 @@ +CREATE TABLE `digest_subscriptions` ( + `id` text PRIMARY KEY NOT NULL, + `org_id` text NOT NULL, + `user_id` text NOT NULL, + `email` text NOT NULL, + `enabled` integer DEFAULT 1 NOT NULL, + `day_of_week` integer DEFAULT 1 NOT NULL, + `timezone` text DEFAULT 'UTC' NOT NULL, + `last_sent_at` integer, + `created_at` integer NOT NULL, + `updated_at` integer NOT NULL +); +--> statement-breakpoint +CREATE UNIQUE INDEX `digest_subscriptions_org_user_idx` ON `digest_subscriptions` (`org_id`,`user_id`); +--> statement-breakpoint +CREATE INDEX `digest_subscriptions_org_enabled_idx` ON `digest_subscriptions` (`org_id`,`enabled`); diff --git a/packages/db/drizzle/meta/_journal.json b/packages/db/drizzle/meta/_journal.json index 9ad32622..912589f3 100644 --- a/packages/db/drizzle/meta/_journal.json +++ b/packages/db/drizzle/meta/_journal.json @@ -78,6 +78,13 @@ "when": 1774400000000, "tag": "0010_exclude_service_names", "breakpoints": true + }, + { + "idx": 11, + "version": "7", + "when": 1775300000000, + "tag": "0011_digest_subscriptions", + "breakpoints": true } ] } diff --git a/packages/db/src/schema/digest.ts b/packages/db/src/schema/digest.ts new file mode 100644 index 00000000..cddd6487 --- /dev/null +++ b/packages/db/src/schema/digest.ts @@ -0,0 +1,35 @@ +import { + index, + integer, + sqliteTable, + text, + uniqueIndex, +} from "drizzle-orm/sqlite-core" + +export const digestSubscriptions = sqliteTable( + "digest_subscriptions", + { + id: text("id").notNull().primaryKey(), + orgId: text("org_id").notNull(), + userId: text("user_id").notNull(), + email: text("email").notNull(), + enabled: integer("enabled", { mode: "number" }).notNull().default(1), + dayOfWeek: integer("day_of_week", { mode: "number" }).notNull().default(1), + timezone: text("timezone").notNull().default("UTC"), + lastSentAt: integer("last_sent_at", { mode: "number" }), + createdAt: integer("created_at", { mode: "number" }).notNull(), + updatedAt: integer("updated_at", { mode: "number" }).notNull(), + }, + (table) => [ + uniqueIndex("digest_subscriptions_org_user_idx").on( + table.orgId, + table.userId, + ), + index("digest_subscriptions_org_enabled_idx").on( + table.orgId, + table.enabled, + ), + ], +) + +export type DigestSubscriptionRow = typeof digestSubscriptions.$inferSelect diff --git a/packages/db/src/schema/index.ts b/packages/db/src/schema/index.ts index 33fe3469..23e5d928 100644 --- a/packages/db/src/schema/index.ts +++ b/packages/db/src/schema/index.ts @@ -2,6 +2,7 @@ export * from "./alerts"; export * from "./api-keys"; export * from "./cloudflare-logpush-connectors"; export * from "./dashboards"; +export * from "./digest"; export * from "./org-ingest-keys"; export * from "./org-tinybird-settings"; export * from "./scrape-targets"; diff --git a/packages/domain/src/http/api.ts b/packages/domain/src/http/api.ts index 65f9e797..3ceb1fc1 100644 --- a/packages/domain/src/http/api.ts +++ b/packages/domain/src/http/api.ts @@ -4,6 +4,7 @@ import { AlertsApiGroup } from "./alerts"; import { AuthApiGroup, AuthPublicApiGroup } from "./auth"; import { CloudflareLogpushApiGroup } from "./cloudflare-logpush"; import { DashboardsApiGroup } from "./dashboards"; +import { DigestApiGroup } from "./digest"; import { IngestKeysApiGroup } from "./ingest-keys"; import { ObservabilityApiGroup } from "./observability"; import { OrgTinybirdSettingsApiGroup } from "./org-tinybird-settings"; @@ -19,6 +20,7 @@ export class MapleApi extends HttpApi.make("MapleApi") .add(AlertsApiGroup) .add(CloudflareLogpushApiGroup) .add(DashboardsApiGroup) + .add(DigestApiGroup) .add(IngestKeysApiGroup) .add(ObservabilityApiGroup) .add(OrgTinybirdSettingsApiGroup) diff --git a/packages/domain/src/http/digest.ts b/packages/domain/src/http/digest.ts new file mode 100644 index 00000000..ca008a9a --- /dev/null +++ b/packages/domain/src/http/digest.ts @@ -0,0 +1,95 @@ +import { HttpApiEndpoint, HttpApiGroup } from "effect/unstable/httpapi" +import { Schema } from "effect" +import { Authorization } from "./current-tenant" + +export const DigestSubscriptionId = Schema.String.check(Schema.isUUID()).pipe( + Schema.brand("@maple/DigestSubscriptionId"), + Schema.annotate({ + identifier: "@maple/DigestSubscriptionId", + title: "Digest Subscription ID", + }), +) +export type DigestSubscriptionId = Schema.Schema.Type< + typeof DigestSubscriptionId +> + +export class DigestSubscriptionResponse extends Schema.Class( + "DigestSubscriptionResponse", +)({ + id: DigestSubscriptionId, + email: Schema.String, + enabled: Schema.Boolean, + dayOfWeek: Schema.Number, + timezone: Schema.String, + lastSentAt: Schema.NullOr(Schema.Number), + createdAt: Schema.Number, + updatedAt: Schema.Number, +}) {} + +export class UpsertDigestSubscriptionRequest extends Schema.Class( + "UpsertDigestSubscriptionRequest", +)({ + email: Schema.String, + enabled: Schema.optional(Schema.Boolean), + dayOfWeek: Schema.optional(Schema.Number), + timezone: Schema.optional(Schema.String), +}) {} + +export class DigestPreviewResponse extends Schema.Class( + "DigestPreviewResponse", +)({ + html: Schema.String, +}) {} + +export class DigestPersistenceError extends Schema.TaggedErrorClass()( + "@maple/http/errors/DigestPersistenceError", + { + message: Schema.String, + }, + { httpApiStatus: 503 }, +) {} + +export class DigestNotFoundError extends Schema.TaggedErrorClass()( + "@maple/http/errors/DigestNotFoundError", + { + message: Schema.String, + }, + { httpApiStatus: 404 }, +) {} + +export class DigestNotConfiguredError extends Schema.TaggedErrorClass()( + "@maple/http/errors/DigestNotConfiguredError", + { + message: Schema.String, + }, + { httpApiStatus: 501 }, +) {} + +export class DigestApiGroup extends HttpApiGroup.make("digest") + .add( + HttpApiEndpoint.get("getSubscription", "/", { + success: DigestSubscriptionResponse, + error: DigestPersistenceError, + }), + ) + .add( + HttpApiEndpoint.post("upsertSubscription", "/", { + payload: UpsertDigestSubscriptionRequest, + success: DigestSubscriptionResponse, + error: DigestPersistenceError, + }), + ) + .add( + HttpApiEndpoint.delete("deleteSubscription", "/", { + success: Schema.Void, + error: DigestPersistenceError, + }), + ) + .add( + HttpApiEndpoint.post("preview", "/preview", { + success: DigestPreviewResponse, + error: [DigestPersistenceError, DigestNotConfiguredError], + }), + ) + .prefix("/api/digest") + .middleware(Authorization) {} diff --git a/packages/domain/src/http/index.ts b/packages/domain/src/http/index.ts index 87b96858..36225bb8 100644 --- a/packages/domain/src/http/index.ts +++ b/packages/domain/src/http/index.ts @@ -5,6 +5,7 @@ export * from "./auth"; export * from "./cloudflare-logpush"; export * as CurrentTenant from "./current-tenant"; export * from "./dashboards"; +export * from "./digest"; export * from "./ingest-keys"; export * from "./org-tinybird-settings"; export * from "../primitives"; diff --git a/packages/email/package.json b/packages/email/package.json index 3a287b47..5052830f 100644 --- a/packages/email/package.json +++ b/packages/email/package.json @@ -3,7 +3,7 @@ "private": true, "type": "module", "exports": { - "./*": "./src/*" + "./weekly-digest": "./src/weekly-digest.tsx" }, "scripts": { "dev:email": "email dev", diff --git a/packages/email/tsconfig.json b/packages/email/tsconfig.json index 7e4b9b52..61473db7 100644 --- a/packages/email/tsconfig.json +++ b/packages/email/tsconfig.json @@ -1,5 +1,6 @@ { "include": ["**/*.ts", "**/*.tsx"], + "exclude": ["**/render-preview*"], "compilerOptions": { "target": "ES2022", "module": "ESNext", From 158e662bfa1ce38076bdd1654e0256b72fb40237 Mon Sep 17 00:00:00 2001 From: Makisuo Date: Mon, 6 Apr 2026 23:25:53 +0200 Subject: [PATCH 2/7] fix --- apps/api/src/index.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 5fe605b3..7317efce 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -12,6 +12,8 @@ import { AuthorizationLive } from "./services/AuthorizationLive"; import { CloudflareLogpushService } from "./services/CloudflareLogpushService"; import { DashboardPersistenceService } from "./services/DashboardPersistenceService"; import { Database } from "./services/DatabaseLive"; +import { DigestService } from "./services/DigestService"; +import { EmailService } from "./services/EmailService"; import { Env } from "./services/Env"; import { OrgIngestKeysService } from "./services/OrgIngestKeysService"; import { OrgTinybirdSettingsService } from "./services/OrgTinybirdSettingsService"; @@ -62,11 +64,20 @@ const AlertsServiceLive = AlertsService.layer.pipe( Layer.provideMerge(Layer.mergeAll(CoreServicesLive, QueryEngineServiceLive, AlertRuntime.Default)), ) +const EmailServiceLive = EmailService.layer.pipe( + Layer.provide(Env.layer), +) + +const DigestServiceLive = DigestService.layer.pipe( + Layer.provideMerge(Layer.mergeAll(InfraLive, TinybirdServiceLive, EmailServiceLive)), +) + const MainLive = Layer.mergeAll( CoreServicesLive, TinybirdServiceLive, QueryEngineServiceLive, AlertsServiceLive, + DigestServiceLive, ) const AllRoutes = Layer.mergeAll( From d243b519f75ed227b7f5537b2f38e6b09d693091 Mon Sep 17 00:00:00 2001 From: Makisuo Date: Mon, 6 Apr 2026 23:49:39 +0200 Subject: [PATCH 3/7] fix --- apps/alerting/src/index.ts | 20 +- apps/api/src/index.ts | 10 +- apps/api/src/services/DigestService.ts | 223 ++++++++++++------ apps/api/src/services/EmailService.ts | 17 +- apps/api/src/services/Env.ts | 4 +- .../settings/notifications-section.tsx | 96 ++++++++ apps/web/src/routes/settings.tsx | 7 +- packages/domain/src/http/digest.ts | 14 +- 8 files changed, 297 insertions(+), 94 deletions(-) create mode 100644 apps/web/src/components/settings/notifications-section.tsx diff --git a/apps/alerting/src/index.ts b/apps/alerting/src/index.ts index 1202e159..d8f32cb0 100644 --- a/apps/alerting/src/index.ts +++ b/apps/alerting/src/index.ts @@ -38,7 +38,7 @@ const AlertsServiceLive = AlertsService.Live.pipe( Layer.provide(AlertsDependenciesLive), ) -const EmailServiceLive = EmailService.Live.pipe( +const EmailServiceLive = EmailService.Default.pipe( Layer.provide(Env.Default), ) @@ -48,7 +48,7 @@ const DigestDependenciesLive = Layer.mergeAll( EmailServiceLive, ) -const DigestServiceLive = DigestService.Live.pipe( +const DigestServiceLive = DigestService.Default.pipe( Layer.provide(DigestDependenciesLive), ) @@ -111,11 +111,21 @@ const program = Effect.gen(function* () { yield* Effect.logInfo("Alerting worker started") // Run digest loop detached, alert loop in foreground - yield* digestLoop.pipe(Effect.ignore, Effect.forkDetach) + yield* digestLoop.pipe( + Effect.catchCause((cause) => + Effect.logError("Digest loop terminated unexpectedly").pipe( + Effect.annotateLogs({ error: Cause.pretty(cause) }), + ), + ), + Effect.forkDetach, + ) yield* alertLoop }).pipe( - Effect.provide(Layer.mergeAll(AlertsServiceLive, DigestServiceLive)), - Effect.provide(TelemetryLive), + Effect.provide( + Layer.mergeAll(AlertsServiceLive, DigestServiceLive).pipe( + Layer.provide(TelemetryLive), + ), + ), ) BunRuntime.runMain(program) diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 7317efce..b7b0af58 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -37,7 +37,7 @@ const DocsRoute = HttpApiScalar.layer(MapleApi, { }); const InfraLive = Database.layer.pipe( - Layer.provideMerge(Env.layer), + Layer.provideMerge(Env.Default), ) const CoreServicesLive = Layer.mergeAll( @@ -64,11 +64,11 @@ const AlertsServiceLive = AlertsService.layer.pipe( Layer.provideMerge(Layer.mergeAll(CoreServicesLive, QueryEngineServiceLive, AlertRuntime.Default)), ) -const EmailServiceLive = EmailService.layer.pipe( - Layer.provide(Env.layer), +const EmailServiceLive = EmailService.Default.pipe( + Layer.provide(Env.Default), ) -const DigestServiceLive = DigestService.layer.pipe( +const DigestServiceLive = DigestService.Default.pipe( Layer.provideMerge(Layer.mergeAll(InfraLive, TinybirdServiceLive, EmailServiceLive)), ) @@ -116,7 +116,7 @@ const RuntimeLive = Layer.mergeAll( const app = HttpRouter.serve(AllRoutes).pipe( Layer.provide(RuntimeLive), Layer.provide(MainLive), - Layer.provide(AuthorizationLive.pipe(Layer.provideMerge(Env.layer))), + Layer.provide(AuthorizationLive.pipe(Layer.provideMerge(Env.Default))), ); BunRuntime.runMain(app.pipe(Layer.launch as never)); diff --git a/apps/api/src/services/DigestService.ts b/apps/api/src/services/DigestService.ts index d31b8d18..4e5ced57 100644 --- a/apps/api/src/services/DigestService.ts +++ b/apps/api/src/services/DigestService.ts @@ -1,27 +1,63 @@ import { digestSubscriptions } from "@maple/db" -import type { - DigestSubscriptionResponse, - DigestPreviewResponse, -} from "@maple/domain/http" import { DigestNotConfiguredError, + DigestNotFoundError, DigestPersistenceError, + DigestPreviewResponse, + DigestRenderError, + DigestSubscriptionId, + DigestSubscriptionResponse, + OrgId, + UserId, + RoleName, } from "@maple/domain/http" -import type { OrgId, UserId } from "@maple/domain/http" +import type { OrgId as OrgIdType, RoleName as RoleNameType } from "@maple/domain/http" import { render } from "@react-email/components" import { and, eq } from "drizzle-orm" -import { Cause, Effect, Layer, ServiceMap } from "effect" +import { Array as Arr, Cause, Effect, Layer, ServiceMap } from "effect" import { WeeklyDigest, type WeeklyDigestProps } from "@maple/email/weekly-digest" import { Database } from "./DatabaseLive" import { EmailService } from "./EmailService" import { Env } from "./Env" import { TinybirdService } from "./TinybirdService" +const SYSTEM_DIGEST_USER = UserId.makeUnsafe("system-digest") +const ROOT_ROLE = RoleName.makeUnsafe("root") + const toPersistenceError = (error: unknown) => new DigestPersistenceError({ - message: error instanceof Error ? error.message : "Digest persistence error", + message: error instanceof Error ? `${error.message}` : `Digest persistence error: ${String(error)}`, }) +/** Row shapes for Tinybird query responses */ +interface ServiceOverviewRow { + service_name: string + total_count: number + error_count: number + p95_duration_ms: number +} + +interface ServiceUsageRow { + log_count: number + trace_count: number + sum_metric_count: number + gauge_metric_count: number + histogram_metric_count: number + exp_histogram_metric_count: number + log_size_bytes: number + trace_size_bytes: number + sum_metric_size_bytes: number + gauge_metric_size_bytes: number + histogram_metric_size_bytes: number + exp_histogram_metric_size_bytes: number +} + +interface ErrorsByTypeRow { + error_fingerprint?: string + status_message?: string + error_count: number +} + export class DigestService extends ServiceMap.Service()( "DigestService", { @@ -33,6 +69,9 @@ export class DigestService extends ServiceMap.Service()( const getSubscription = Effect.fn("DigestService.getSubscription")( function* (orgId: OrgId, userId: UserId) { + yield* Effect.annotateCurrentSpan("orgId", orgId) + yield* Effect.annotateCurrentSpan("userId", userId) + const rows = yield* database .execute((db) => db @@ -50,11 +89,9 @@ export class DigestService extends ServiceMap.Service()( const row = rows[0] if (!row) { - return yield* Effect.fail( - new DigestPersistenceError({ - message: "No digest subscription found", - }), - ) + return yield* new DigestNotFoundError({ + message: "No digest subscription found", + }) } return rowToResponse(row) @@ -73,6 +110,9 @@ export class DigestService extends ServiceMap.Service()( timezone?: string }, ) { + yield* Effect.annotateCurrentSpan("orgId", orgId) + yield* Effect.annotateCurrentSpan("userId", userId) + const now = Date.now() const id = crypto.randomUUID() @@ -114,6 +154,9 @@ export class DigestService extends ServiceMap.Service()( const deleteSubscription = Effect.fn( "DigestService.deleteSubscription", )(function* (orgId: OrgId, userId: UserId) { + yield* Effect.annotateCurrentSpan("orgId", orgId) + yield* Effect.annotateCurrentSpan("userId", userId) + yield* database .execute((db) => db @@ -131,6 +174,8 @@ export class DigestService extends ServiceMap.Service()( const generateDigestData = Effect.fn( "DigestService.generateDigestData", )(function* (orgId: OrgId) { + yield* Effect.annotateCurrentSpan("orgId", orgId) + const now = new Date() const currentEnd = now.toISOString() const currentStart = new Date( @@ -142,8 +187,8 @@ export class DigestService extends ServiceMap.Service()( const systemTenant = { orgId, - userId: "system-digest" as UserId, - roles: ["root"] as any, + userId: SYSTEM_DIGEST_USER, + roles: [ROOT_ROLE] as ReadonlyArray, authMode: "self_hosted" as const, } @@ -189,16 +234,18 @@ export class DigestService extends ServiceMap.Service()( { concurrency: 6 }, ).pipe( Effect.mapError( - () => + (error) => new DigestPersistenceError({ - message: "Failed to fetch digest data from Tinybird", + message: `Failed to fetch digest data from Tinybird: ${error instanceof Error ? error.message : String(error)}`, }), ), ) + void currentErrors + // Aggregate service health - const curOverviewData = currentOverview.data as Array> - const prevOverviewData = previousOverview.data as Array> + const curOverviewData = currentOverview.data as Array + const prevOverviewData = previousOverview.data as Array const totalRequests = curOverviewData.reduce( (sum, s) => sum + (Number(s.total_count) || 0), @@ -241,9 +288,9 @@ export class DigestService extends ServiceMap.Service()( : 0 // Data volume - const curUsageData = currentUsage.data as Array> - const prevUsageData = previousUsage.data as Array> - const sumUsage = (data: Array>) => ({ + const curUsageData = currentUsage.data as Array + const prevUsageData = previousUsage.data as Array + const sumUsage = (data: Array) => ({ logs: data.reduce((s, r) => s + (Number(r.log_count) || 0), 0), traces: data.reduce((s, r) => s + (Number(r.trace_count) || 0), 0), metrics: data.reduce( @@ -294,7 +341,7 @@ export class DigestService extends ServiceMap.Service()( p95Ms: Number(s.p95_duration_ms) || 0, })) - const errorsData = (topErrors.data as Array>) + const errorsData = (topErrors.data as Array) .slice(0, 5) .map((e) => ({ message: String(e.error_fingerprint || e.status_message || "Unknown"), @@ -334,6 +381,18 @@ export class DigestService extends ServiceMap.Service()( unsubscribeUrl: `${env.MAPLE_APP_BASE_URL}/settings`, } + yield* Effect.annotateCurrentSpan("totalRequests", totalRequests) + yield* Effect.annotateCurrentSpan("totalErrors", totalErrors) + yield* Effect.annotateCurrentSpan("serviceCount", services.length) + yield* Effect.logInfo("Digest data generated").pipe( + Effect.annotateLogs({ + orgId, + totalRequests, + totalErrors, + serviceCount: services.length, + }), + ) + return props }) @@ -342,7 +401,7 @@ export class DigestService extends ServiceMap.Service()( return yield* Effect.tryPromise({ try: () => render(WeeklyDigest(props)), catch: (error) => - new DigestPersistenceError({ + new DigestRenderError({ message: error instanceof Error ? error.message @@ -355,9 +414,17 @@ export class DigestService extends ServiceMap.Service()( const preview = Effect.fn("DigestService.preview")(function* ( orgId: OrgId, ) { + yield* Effect.annotateCurrentSpan("orgId", orgId) + + if (!email.isConfigured) { + return yield* new DigestNotConfiguredError({ + message: "Email delivery is not configured", + }) + } + const props = yield* generateDigestData(orgId) const html = yield* renderDigestHtml(props) - return { html } as DigestPreviewResponse + return new DigestPreviewResponse({ html }) }) const runDigestTick = Effect.fn("DigestService.runDigestTick")( @@ -391,56 +458,66 @@ export class DigestService extends ServiceMap.Service()( } // Group by org to avoid duplicate Tinybird queries - const byOrg = new Map() - for (const sub of dueSubs) { - const existing = byOrg.get(sub.orgId) ?? [] - existing.push(sub) - byOrg.set(sub.orgId, existing) - } - - let sentCount = 0 - let errorCount = 0 - - for (const [orgId, orgSubs] of byOrg) { - const sendForOrg = Effect.gen(function* () { - const props = yield* generateDigestData(orgId as OrgId) - const html = yield* renderDigestHtml(props) - - for (const sub of orgSubs) { - yield* email - .send( - sub.email, - `Maple Weekly Digest — ${props.dateRange.start} to ${props.dateRange.end}`, - html, - ) - .pipe( - Effect.tap(() => - database.execute((db) => - db - .update(digestSubscriptions) - .set({ lastSentAt: Date.now() }) - .where(eq(digestSubscriptions.id, sub.id)), + const byOrg = Arr.groupBy(dueSubs, (s) => s.orgId) + + const results = yield* Effect.forEach( + Object.entries(byOrg), + ([rawOrgId, orgSubs]) => + Effect.gen(function* () { + const orgId = OrgId.makeUnsafe(rawOrgId) + const props = yield* generateDigestData(orgId) + const html = yield* renderDigestHtml(props) + + const sendResults = yield* Effect.forEach( + orgSubs, + (sub) => + email + .send( + sub.email, + `Maple Weekly Digest — ${props.dateRange.start} to ${props.dateRange.end}`, + html, + ) + .pipe( + Effect.tap(() => + database.execute((db) => + db + .update(digestSubscriptions) + .set({ lastSentAt: Date.now() }) + .where(eq(digestSubscriptions.id, sub.id)), + ), + ), + Effect.match({ + onSuccess: () => ({ sent: true }), + onFailure: () => ({ sent: false }), + }), ), - ), - Effect.match({ - onSuccess: () => { sentCount++ }, - onFailure: () => { errorCount++ }, + { concurrency: 1 }, + ) + + return sendResults + }).pipe( + Effect.catchCause((cause) => + Effect.logError("Digest failed for org").pipe( + Effect.annotateLogs({ + orgId: rawOrgId, + error: Cause.pretty(cause), }), - ) - } - }).pipe( - Effect.catchCause((cause) => - Effect.logError("Digest failed for org").pipe( - Effect.annotateLogs({ orgId, error: Cause.pretty(cause) }), - Effect.tap(() => - Effect.sync(() => { errorCount += orgSubs.length }), + Effect.map(() => + orgSubs.map(() => ({ sent: false })), + ), ), ), ), - ) + { concurrency: 1 }, + ) - yield* sendForOrg - } + const allResults = results.flat() + const sentCount = allResults.filter((r) => r.sent).length + const errorCount = allResults.filter((r) => !r.sent).length + + yield* Effect.annotateCurrentSpan("sentCount", sentCount) + yield* Effect.annotateCurrentSpan("errorCount", errorCount) + yield* Effect.annotateCurrentSpan("orgCount", Object.keys(byOrg).length) return { sentCount, errorCount, skipped: false } }, @@ -456,16 +533,14 @@ export class DigestService extends ServiceMap.Service()( }), }, ) { - static readonly layer = Layer.effect(this, this.make) - static readonly Live = this.layer - static readonly Default = this.layer + static readonly Default = Layer.effect(this, this.make) } function rowToResponse( row: typeof digestSubscriptions.$inferSelect, ): DigestSubscriptionResponse { - return { - id: row.id, + return new DigestSubscriptionResponse({ + id: DigestSubscriptionId.makeUnsafe(row.id), email: row.email, enabled: row.enabled === 1, dayOfWeek: row.dayOfWeek, @@ -473,5 +548,5 @@ function rowToResponse( lastSentAt: row.lastSentAt ?? null, createdAt: row.createdAt, updatedAt: row.updatedAt, - } as DigestSubscriptionResponse + }) } diff --git a/apps/api/src/services/EmailService.ts b/apps/api/src/services/EmailService.ts index 368bb5a6..340373a8 100644 --- a/apps/api/src/services/EmailService.ts +++ b/apps/api/src/services/EmailService.ts @@ -34,6 +34,10 @@ export class EmailService extends ServiceMap.Service response.text(), @@ -83,19 +89,24 @@ export class EmailService extends ServiceMap.Service()("Env", { return normalizedEnv; }), }) { - static readonly layer = Layer.effect(this, this.make) - static readonly Live = this.layer - static readonly Default = this.layer + static readonly Default = Layer.effect(this, this.make) } diff --git a/apps/web/src/components/settings/notifications-section.tsx b/apps/web/src/components/settings/notifications-section.tsx new file mode 100644 index 00000000..0d9d1dd4 --- /dev/null +++ b/apps/web/src/components/settings/notifications-section.tsx @@ -0,0 +1,96 @@ +import { useEffect, useState } from "react" +import { Exit } from "effect" +import { toast } from "sonner" +import { Result, useAtomRefresh, useAtomSet, useAtomValue } from "@/lib/effect-atom" +import { MapleApiAtomClient } from "@/lib/services/common/atom-client" +import { useUser } from "@clerk/tanstack-react-start" + +import { Switch } from "@maple/ui/components/ui/switch" +import { Skeleton } from "@maple/ui/components/ui/skeleton" +import { EnvelopeIcon } from "@/components/icons" +import { cn } from "@maple/ui/utils" + +export function NotificationsSection() { + const { user } = useUser() + const email = user?.primaryEmailAddress?.emailAddress ?? "" + + const subscriptionQueryAtom = MapleApiAtomClient.query("digest", "getSubscription", {}) + const subscriptionResult = useAtomValue(subscriptionQueryAtom) + const refreshSubscription = useAtomRefresh(subscriptionQueryAtom) + + const upsertMutation = useAtomSet( + MapleApiAtomClient.mutation("digest", "upsertSubscription"), + { mode: "promiseExit" }, + ) + + const [enabled, setEnabled] = useState(true) + const [initialized, setInitialized] = useState(false) + const [isSaving, setIsSaving] = useState(false) + + useEffect(() => { + if (initialized) return + if (Result.isSuccess(subscriptionResult)) { + setEnabled(subscriptionResult.value.enabled) + setInitialized(true) + } else if (!Result.isInitial(subscriptionResult)) { + // No subscription yet — default to enabled (opt-out model) + setEnabled(true) + setInitialized(true) + } + }, [subscriptionResult, initialized]) + + async function handleToggle(checked: boolean) { + if (!email) return + + setEnabled(checked) + setIsSaving(true) + + const result = await upsertMutation({ + email, + enabled: checked, + }) + + if (Exit.isSuccess(result)) { + refreshSubscription() + toast.success(checked ? "Weekly digest enabled" : "Weekly digest disabled") + } else { + toast.error("Failed to update notification preferences") + setEnabled(!checked) + } + setIsSaving(false) + } + + if (!initialized) { + return ( +
+ +
+ ) + } + + return ( +
+
+
+
+ +
+
+

Email

+

Weekly digest via email

+
+
+ +
+
+ ) +} diff --git a/apps/web/src/routes/settings.tsx b/apps/web/src/routes/settings.tsx index c98cbdd5..9af230d7 100644 --- a/apps/web/src/routes/settings.tsx +++ b/apps/web/src/routes/settings.tsx @@ -14,6 +14,7 @@ import { IngestionSection } from "@/components/settings/ingestion-section" import { ApiKeysSection } from "@/components/settings/api-keys-section" import { McpSection } from "@/components/settings/mcp-section" import { ConnectorsSection } from "@/components/settings/connectors-section" +import { NotificationsSection } from "@/components/settings/notifications-section" import { OrgTinybirdSettingsSection } from "@/components/settings/org-tinybird-settings-section" import { hasBringYourOwnCloudAddOn } from "@/lib/billing/plan-gating" import { MapleApiAtomClient } from "@/lib/services/common/atom-client" @@ -21,6 +22,7 @@ import { UserIcon, ServerIcon, KeyIcon, + BellIcon, CreditCardIcon, DatabaseIcon, CodeIcon, @@ -28,7 +30,7 @@ import { } from "@/components/icons" import { cn } from "@maple/ui/utils" -const tabValues = ["members", "ingestion", "api-keys", "mcp", "connectors", "billing", "data-platform"] as const +const tabValues = ["members", "ingestion", "api-keys", "mcp", "connectors", "notifications", "billing", "data-platform"] as const type SettingsTab = (typeof tabValues)[number] const SettingsSearch = Schema.Struct({ @@ -53,6 +55,7 @@ const allNavItems: NavItem[] = [ { id: "api-keys", label: "API Keys", icon: KeyIcon }, { id: "mcp", label: "MCP", icon: CodeIcon }, { id: "connectors", label: "Connectors", icon: DatabaseIcon }, + { id: "notifications", label: "Notifications", icon: BellIcon }, { id: "billing", label: "Billing", icon: CreditCardIcon }, { id: "data-platform", label: "Data Platform", icon: DatabaseIcon }, ] @@ -96,6 +99,7 @@ const tabLabels: Record = { "api-keys": "API Keys", mcp: "MCP", connectors: "Connectors", + notifications: "Notifications", billing: "Billing", "data-platform": "Data Platform", } @@ -179,6 +183,7 @@ export function SettingsPage() { {activeTab === "api-keys" && } {activeTab === "mcp" && } {activeTab === "connectors" && } + {activeTab === "notifications" && } {activeTab === "billing" && } {activeTab === "data-platform" && ( diff --git a/packages/domain/src/http/digest.ts b/packages/domain/src/http/digest.ts index ca008a9a..8fcb895a 100644 --- a/packages/domain/src/http/digest.ts +++ b/packages/domain/src/http/digest.ts @@ -65,18 +65,26 @@ export class DigestNotConfiguredError extends Schema.TaggedErrorClass()( + "@maple/http/errors/DigestRenderError", + { + message: Schema.String, + }, + { httpApiStatus: 500 }, +) {} + export class DigestApiGroup extends HttpApiGroup.make("digest") .add( HttpApiEndpoint.get("getSubscription", "/", { success: DigestSubscriptionResponse, - error: DigestPersistenceError, + error: [DigestPersistenceError, DigestNotFoundError], }), ) .add( HttpApiEndpoint.post("upsertSubscription", "/", { payload: UpsertDigestSubscriptionRequest, success: DigestSubscriptionResponse, - error: DigestPersistenceError, + error: [DigestPersistenceError, DigestNotFoundError], }), ) .add( @@ -88,7 +96,7 @@ export class DigestApiGroup extends HttpApiGroup.make("digest") .add( HttpApiEndpoint.post("preview", "/preview", { success: DigestPreviewResponse, - error: [DigestPersistenceError, DigestNotConfiguredError], + error: [DigestPersistenceError, DigestNotConfiguredError, DigestRenderError], }), ) .prefix("/api/digest") From 79d30f08a709b7e5c8b9a01d93f2fa85fcab4b93 Mon Sep 17 00:00:00 2001 From: Makisuo Date: Tue, 7 Apr 2026 00:23:42 +0200 Subject: [PATCH 4/7] fix --- apps/api/src/services/DigestService.ts | 100 +++++++++--------- .../settings/notifications-section.tsx | 13 ++- apps/web/src/routes/settings.tsx | 2 +- 3 files changed, 58 insertions(+), 57 deletions(-) diff --git a/apps/api/src/services/DigestService.ts b/apps/api/src/services/DigestService.ts index 4e5ced57..480e273a 100644 --- a/apps/api/src/services/DigestService.ts +++ b/apps/api/src/services/DigestService.ts @@ -29,33 +29,35 @@ const toPersistenceError = (error: unknown) => message: error instanceof Error ? `${error.message}` : `Digest persistence error: ${String(error)}`, }) -/** Row shapes for Tinybird query responses */ +/** Row shapes matching query engine output (camelCase from CH DSL) */ interface ServiceOverviewRow { - service_name: string - total_count: number - error_count: number - p95_duration_ms: number + serviceName: string + throughput: number + errorCount: number + p95LatencyMs: number } interface ServiceUsageRow { - log_count: number - trace_count: number - sum_metric_count: number - gauge_metric_count: number - histogram_metric_count: number - exp_histogram_metric_count: number - log_size_bytes: number - trace_size_bytes: number - sum_metric_size_bytes: number - gauge_metric_size_bytes: number - histogram_metric_size_bytes: number - exp_histogram_metric_size_bytes: number + serviceName: string + totalLogCount: number + totalLogSizeBytes: number + totalTraceCount: number + totalTraceSizeBytes: number + totalSumMetricCount: number + totalSumMetricSizeBytes: number + totalGaugeMetricCount: number + totalGaugeMetricSizeBytes: number + totalHistogramMetricCount: number + totalHistogramMetricSizeBytes: number + totalExpHistogramMetricCount: number + totalExpHistogramMetricSizeBytes: number + totalSizeBytes: number } interface ErrorsByTypeRow { - error_fingerprint?: string - status_message?: string - error_count: number + errorType: string + sampleMessage: string + count: number } export class DigestService extends ServiceMap.Service()( @@ -248,20 +250,20 @@ export class DigestService extends ServiceMap.Service()( const prevOverviewData = previousOverview.data as Array const totalRequests = curOverviewData.reduce( - (sum, s) => sum + (Number(s.total_count) || 0), + (sum, s) => sum + (Number(s.throughput) || 0), 0, ) const prevTotalRequests = prevOverviewData.reduce( - (sum, s) => sum + (Number(s.total_count) || 0), + (sum, s) => sum + (Number(s.throughput) || 0), 0, ) const totalErrors = curOverviewData.reduce( - (sum, s) => sum + (Number(s.error_count) || 0), + (sum, s) => sum + (Number(s.errorCount) || 0), 0, ) const prevTotalErrors = prevOverviewData.reduce( - (sum, s) => sum + (Number(s.error_count) || 0), + (sum, s) => sum + (Number(s.errorCount) || 0), 0, ) @@ -271,8 +273,8 @@ export class DigestService extends ServiceMap.Service()( ? curOverviewData.reduce( (sum, s) => sum + - (Number(s.p95_duration_ms) || 0) * - (Number(s.total_count) || 0), + (Number(s.p95LatencyMs) || 0) * + (Number(s.throughput) || 0), 0, ) / totalRequests : 0 @@ -281,8 +283,8 @@ export class DigestService extends ServiceMap.Service()( ? prevOverviewData.reduce( (sum, s) => sum + - (Number(s.p95_duration_ms) || 0) * - (Number(s.total_count) || 0), + (Number(s.p95LatencyMs) || 0) * + (Number(s.throughput) || 0), 0, ) / prevTotalRequests : 0 @@ -291,26 +293,26 @@ export class DigestService extends ServiceMap.Service()( const curUsageData = currentUsage.data as Array const prevUsageData = previousUsage.data as Array const sumUsage = (data: Array) => ({ - logs: data.reduce((s, r) => s + (Number(r.log_count) || 0), 0), - traces: data.reduce((s, r) => s + (Number(r.trace_count) || 0), 0), + logs: data.reduce((s, r) => s + (Number(r.totalLogCount) || 0), 0), + traces: data.reduce((s, r) => s + (Number(r.totalTraceCount) || 0), 0), metrics: data.reduce( (s, r) => s + - (Number(r.sum_metric_count) || 0) + - (Number(r.gauge_metric_count) || 0) + - (Number(r.histogram_metric_count) || 0) + - (Number(r.exp_histogram_metric_count) || 0), + (Number(r.totalSumMetricCount) || 0) + + (Number(r.totalGaugeMetricCount) || 0) + + (Number(r.totalHistogramMetricCount) || 0) + + (Number(r.totalExpHistogramMetricCount) || 0), 0, ), totalBytes: data.reduce( (s, r) => s + - (Number(r.log_size_bytes) || 0) + - (Number(r.trace_size_bytes) || 0) + - (Number(r.sum_metric_size_bytes) || 0) + - (Number(r.gauge_metric_size_bytes) || 0) + - (Number(r.histogram_metric_size_bytes) || 0) + - (Number(r.exp_histogram_metric_size_bytes) || 0), + (Number(r.totalLogSizeBytes) || 0) + + (Number(r.totalTraceSizeBytes) || 0) + + (Number(r.totalSumMetricSizeBytes) || 0) + + (Number(r.totalGaugeMetricSizeBytes) || 0) + + (Number(r.totalHistogramMetricSizeBytes) || 0) + + (Number(r.totalExpHistogramMetricSizeBytes) || 0), 0, ), }) @@ -326,26 +328,26 @@ export class DigestService extends ServiceMap.Service()( const services = curOverviewData .sort( (a, b) => - (Number(b.total_count) || 0) - (Number(a.total_count) || 0), + (Number(b.throughput) || 0) - (Number(a.throughput) || 0), ) .slice(0, 10) .map((s) => ({ - name: String(s.service_name), - requests: Number(s.total_count) || 0, + name: String(s.serviceName), + requests: Number(s.throughput) || 0, errorRate: - (Number(s.total_count) || 0) > 0 - ? ((Number(s.error_count) || 0) / - (Number(s.total_count) || 0)) * + (Number(s.throughput) || 0) > 0 + ? ((Number(s.errorCount) || 0) / + (Number(s.throughput) || 0)) * 100 : 0, - p95Ms: Number(s.p95_duration_ms) || 0, + p95Ms: Number(s.p95LatencyMs) || 0, })) const errorsData = (topErrors.data as Array) .slice(0, 5) .map((e) => ({ - message: String(e.error_fingerprint || e.status_message || "Unknown"), - count: Number(e.error_count) || 0, + message: String(e.errorType || e.sampleMessage || "Unknown"), + count: Number(e.count) || 0, })) const startDate = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000) diff --git a/apps/web/src/components/settings/notifications-section.tsx b/apps/web/src/components/settings/notifications-section.tsx index 0d9d1dd4..6d3c460e 100644 --- a/apps/web/src/components/settings/notifications-section.tsx +++ b/apps/web/src/components/settings/notifications-section.tsx @@ -3,7 +3,8 @@ import { Exit } from "effect" import { toast } from "sonner" import { Result, useAtomRefresh, useAtomSet, useAtomValue } from "@/lib/effect-atom" import { MapleApiAtomClient } from "@/lib/services/common/atom-client" -import { useUser } from "@clerk/tanstack-react-start" +import { UpsertDigestSubscriptionRequest } from "@maple/domain/http" +import { useUser } from "@clerk/clerk-react" import { Switch } from "@maple/ui/components/ui/switch" import { Skeleton } from "@maple/ui/components/ui/skeleton" @@ -12,7 +13,7 @@ import { cn } from "@maple/ui/utils" export function NotificationsSection() { const { user } = useUser() - const email = user?.primaryEmailAddress?.emailAddress ?? "" + const email = user?.primaryEmailAddress?.emailAddress const subscriptionQueryAtom = MapleApiAtomClient.query("digest", "getSubscription", {}) const subscriptionResult = useAtomValue(subscriptionQueryAtom) @@ -33,7 +34,6 @@ export function NotificationsSection() { setEnabled(subscriptionResult.value.enabled) setInitialized(true) } else if (!Result.isInitial(subscriptionResult)) { - // No subscription yet — default to enabled (opt-out model) setEnabled(true) setInitialized(true) } @@ -46,8 +46,7 @@ export function NotificationsSection() { setIsSaving(true) const result = await upsertMutation({ - email, - enabled: checked, + payload: new UpsertDigestSubscriptionRequest({ email, enabled: checked }), }) if (Exit.isSuccess(result)) { @@ -60,7 +59,7 @@ export function NotificationsSection() { setIsSaving(false) } - if (!initialized) { + if (!initialized || !user) { return (
@@ -88,7 +87,7 @@ export function NotificationsSection() {
diff --git a/apps/web/src/routes/settings.tsx b/apps/web/src/routes/settings.tsx index 9af230d7..2f1d3783 100644 --- a/apps/web/src/routes/settings.tsx +++ b/apps/web/src/routes/settings.tsx @@ -119,7 +119,7 @@ export function SettingsPage() { // Build visible nav items based on permissions const visibleItems = allNavItems.filter((item) => { - if (item.id === "members" || item.id === "billing") return isClerkAuthEnabled + if (item.id === "members" || item.id === "billing" || item.id === "notifications") return isClerkAuthEnabled if (item.id === "data-platform") return canAccessDataPlatform return true }) From 64aab9ce702e2da558f0355da9ef9fb62c3fbe06 Mon Sep 17 00:00:00 2001 From: Makisuo Date: Tue, 7 Apr 2026 00:28:28 +0200 Subject: [PATCH 5/7] dtuff --- apps/api/src/services/DigestService.ts | 12 ++++--- apps/api/src/services/TinybirdService.ts | 2 +- .../settings/notifications-section.tsx | 32 +++++++++++++++++++ 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/apps/api/src/services/DigestService.ts b/apps/api/src/services/DigestService.ts index 480e273a..ddb4741b 100644 --- a/apps/api/src/services/DigestService.ts +++ b/apps/api/src/services/DigestService.ts @@ -179,13 +179,15 @@ export class DigestService extends ServiceMap.Service()( yield* Effect.annotateCurrentSpan("orgId", orgId) const now = new Date() - const currentEnd = now.toISOString() - const currentStart = new Date( + const toClickHouseDateTime = (d: Date) => + d.toISOString().replace("T", " ").replace(/\.\d{3}Z$/, "") + const currentEnd = toClickHouseDateTime(now) + const currentStart = toClickHouseDateTime(new Date( now.getTime() - 7 * 24 * 60 * 60 * 1000, - ).toISOString() - const previousStart = new Date( + )) + const previousStart = toClickHouseDateTime(new Date( now.getTime() - 14 * 24 * 60 * 60 * 1000, - ).toISOString() + )) const systemTenant = { orgId, diff --git a/apps/api/src/services/TinybirdService.ts b/apps/api/src/services/TinybirdService.ts index df6acab3..d85fe79b 100644 --- a/apps/api/src/services/TinybirdService.ts +++ b/apps/api/src/services/TinybirdService.ts @@ -104,7 +104,7 @@ export class TinybirdService extends ServiceMap.Service toTinybirdQueryError(pipe, error), }).pipe( Effect.tapError((error) => - Effect.logError("TinybirdService.executeSql failed", { pipe, error: String(error) }), + Effect.logError("TinybirdService.executeSql failed", { pipe, error: String(error), message: error.message, sql: truncateSql(sql) }), ), ) diff --git a/apps/web/src/components/settings/notifications-section.tsx b/apps/web/src/components/settings/notifications-section.tsx index 6d3c460e..4506934c 100644 --- a/apps/web/src/components/settings/notifications-section.tsx +++ b/apps/web/src/components/settings/notifications-section.tsx @@ -6,6 +6,7 @@ import { MapleApiAtomClient } from "@/lib/services/common/atom-client" import { UpsertDigestSubscriptionRequest } from "@maple/domain/http" import { useUser } from "@clerk/clerk-react" +import { Button } from "@maple/ui/components/ui/button" import { Switch } from "@maple/ui/components/ui/switch" import { Skeleton } from "@maple/ui/components/ui/skeleton" import { EnvelopeIcon } from "@/components/icons" @@ -27,6 +28,12 @@ export function NotificationsSection() { const [enabled, setEnabled] = useState(true) const [initialized, setInitialized] = useState(false) const [isSaving, setIsSaving] = useState(false) + const [isPreviewing, setIsPreviewing] = useState(false) + + const previewMutation = useAtomSet( + MapleApiAtomClient.mutation("digest", "preview"), + { mode: "promiseExit" }, + ) useEffect(() => { if (initialized) return @@ -59,6 +66,21 @@ export function NotificationsSection() { setIsSaving(false) } + async function handlePreview() { + setIsPreviewing(true) + const result = await previewMutation({}) + if (Exit.isSuccess(result)) { + const win = window.open("", "_blank") + if (win) { + win.document.write(result.value.html) + win.document.close() + } + } else { + toast.error("Failed to generate digest preview") + } + setIsPreviewing(false) + } + if (!initialized || !user) { return (
@@ -90,6 +112,16 @@ export function NotificationsSection() { disabled={isSaving || !email} />
+ {enabled && ( + + )} ) } From 1a1cae6bf4c0025acf33f97e53243514aba6f417 Mon Sep 17 00:00:00 2001 From: Makisuo Date: Tue, 7 Apr 2026 00:44:11 +0200 Subject: [PATCH 6/7] stuff --- apps/api/src/services/DigestService.ts | 69 +++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/apps/api/src/services/DigestService.ts b/apps/api/src/services/DigestService.ts index ddb4741b..d9006bd1 100644 --- a/apps/api/src/services/DigestService.ts +++ b/apps/api/src/services/DigestService.ts @@ -12,9 +12,10 @@ import { RoleName, } from "@maple/domain/http" import type { OrgId as OrgIdType, RoleName as RoleNameType } from "@maple/domain/http" +import { createClerkClient } from "@clerk/backend" import { render } from "@react-email/components" import { and, eq } from "drizzle-orm" -import { Array as Arr, Cause, Effect, Layer, ServiceMap } from "effect" +import { Array as Arr, Cause, Effect, Layer, Option, Redacted, ServiceMap } from "effect" import { WeeklyDigest, type WeeklyDigestProps } from "@maple/email/weekly-digest" import { Database } from "./DatabaseLive" import { EmailService } from "./EmailService" @@ -431,12 +432,78 @@ export class DigestService extends ServiceMap.Service()( return new DigestPreviewResponse({ html }) }) + const ensureSubscriptions = Effect.fn("DigestService.ensureSubscriptions")( + function* () { + if (env.MAPLE_AUTH_MODE.toLowerCase() !== "clerk") return + if (Option.isNone(env.CLERK_SECRET_KEY)) return + + const clerk = createClerkClient({ + secretKey: Redacted.value(env.CLERK_SECRET_KEY.value), + }) + + const orgs = Option.isSome(env.MAPLE_ORG_ID_OVERRIDE) + ? [{ id: env.MAPLE_ORG_ID_OVERRIDE.value }] + : (yield* Effect.tryPromise({ + try: () => clerk.organizations.getOrganizationList({ limit: 100 }), + catch: () => new DigestPersistenceError({ message: "Failed to list Clerk organizations" }), + })).data + + for (const org of orgs) { + const members = yield* Effect.tryPromise({ + try: () => clerk.organizations.getOrganizationMembershipList({ + organizationId: org.id, + limit: 100, + }), + catch: () => new DigestPersistenceError({ message: `Failed to list Clerk members for org ${org.id}` }), + }) + + const now = Date.now() + for (const member of members.data) { + const memberEmail = member.publicUserData?.identifier + const memberUserId = member.publicUserData?.userId + if (!memberEmail || !memberUserId) continue + + yield* database + .execute((db) => + db + .insert(digestSubscriptions) + .values({ + id: crypto.randomUUID(), + orgId: org.id, + userId: memberUserId, + email: memberEmail, + enabled: 1, + dayOfWeek: 1, + timezone: "UTC", + createdAt: now, + updatedAt: now, + }) + .onConflictDoNothing({ + target: [digestSubscriptions.orgId, digestSubscriptions.userId], + }), + ) + .pipe(Effect.mapError(toPersistenceError)) + } + } + + yield* Effect.logInfo("Digest subscriptions seeded from Clerk") + }, + ) + const runDigestTick = Effect.fn("DigestService.runDigestTick")( function* () { if (!email.isConfigured) { return { sentCount: 0, errorCount: 0, skipped: true } } + yield* ensureSubscriptions().pipe( + Effect.catchCause((cause) => + Effect.logWarning("Failed to seed digest subscriptions").pipe( + Effect.annotateLogs({ error: Cause.pretty(cause) }), + ), + ), + ) + const now = Date.now() const sevenDaysAgo = now - 7 * 24 * 60 * 60 * 1000 const currentDayOfWeek = new Date().getUTCDay() From e29d58772adfccf735f6309ce41fc232d3894f1a Mon Sep 17 00:00:00 2001 From: Makisuo Date: Tue, 7 Apr 2026 01:11:07 +0200 Subject: [PATCH 7/7] s --- apps/api/src/services/QueryEngineService.ts | 84 +++++++++++ apps/web/src/api/tinybird/custom-charts.ts | 133 +++++++++++------- .../api/tinybird/timeseries-adapters.test.ts | 98 +++++++------ apps/web/src/api/tinybird/traces.test.ts | 114 ++++++++------- .../widgets/list-widget.test.ts | 108 ++++++++------ packages/domain/src/query-engine.ts | 1 + 6 files changed, 346 insertions(+), 192 deletions(-) diff --git a/apps/api/src/services/QueryEngineService.ts b/apps/api/src/services/QueryEngineService.ts index 74783832..d7d5d8c8 100644 --- a/apps/api/src/services/QueryEngineService.ts +++ b/apps/api/src/services/QueryEngineService.ts @@ -306,6 +306,64 @@ function groupTimeSeriesRows( + rows: ReadonlyArray, + fillOptions?: BucketFillOptions, +): Array { + const emptyMetrics: Record = { + count: 0, + avg_duration: 0, + p50_duration: 0, + p95_duration: 0, + p99_duration: 0, + error_rate: 0, + apdex: 0, + } + const bucketMap = new Map>() + const bucketOrder: string[] = fillOptions + ? buildBucketTimeline(fillOptions.startMs, fillOptions.endMs, fillOptions.bucketSeconds) + : [] + + for (const row of rows) { + const bucket = normalizeBucket(row.bucket) + bucketMap.set(bucket, { + count: Number(row.count), + avg_duration: Number(row.avgDuration), + p50_duration: Number(row.p50Duration), + p95_duration: Number(row.p95Duration), + p99_duration: Number(row.p99Duration), + error_rate: Number(row.errorRate), + apdex: Number(row.apdexScore), + }) + if (!fillOptions) { + bucketOrder.push(bucket) + } + } + + if (fillOptions) { + for (const bucket of bucketOrder) { + if (!bucketMap.has(bucket)) { + bucketMap.set(bucket, { ...emptyMetrics }) + } + } + } + + return bucketOrder.map((bucket) => ({ + bucket, + series: bucketMap.get(bucket)!, + })) +} + function collapseMetricTimeseriesRows( rows: ReadonlyArray, metric: Extract["metric"], @@ -685,6 +743,32 @@ export const makeQueryEngineExecute = (tinybird: QueryEngineTinybird) => if (request.query.source === "traces" && request.query.kind === "timeseries") { const opts = extractTracesOpts(request.query.filters as Record) + + if (request.query.allMetrics) { + const rows = yield* executeCHQuery( + tinybird, + tenant, + CH.tracesTimeseriesQuery({ + ...opts, + metric: request.query.metric, + allMetrics: true, + needsSampling: false, + groupBy: request.query.groupBy as string[] | undefined, + apdexThresholdMs: request.query.metric === "apdex" ? request.query.apdexThresholdMs : undefined, + }), + { orgId: tenant.orgId, startTime: request.startTime, endTime: request.endTime, bucketSeconds: bucketSeconds! }, + "Failed to execute traces all-metrics timeseries query", + ) + + return new QueryEngineExecuteResponse({ + result: { + kind: "timeseries", + source: "traces", + data: groupAllMetricsTimeSeriesRows(rows, fillOptions), + }, + }) + } + const rows = yield* executeCHQuery( tinybird, tenant, diff --git a/apps/web/src/api/tinybird/custom-charts.ts b/apps/web/src/api/tinybird/custom-charts.ts index 4b88cdd4..6e8811a3 100644 --- a/apps/web/src/api/tinybird/custom-charts.ts +++ b/apps/web/src/api/tinybird/custom-charts.ts @@ -542,18 +542,57 @@ function makeTracesTimeseriesRequest( }) } -function extractSingleSeries( +function makeAllMetricsTimeseriesRequest( + opts: { + startTime?: string + endTime?: string + bucketSeconds: number + serviceName?: string + rootSpansOnly?: boolean + environments?: string[] + commitShas?: string[] + }, +) { + return new QueryEngineExecuteRequest({ + startTime: opts.startTime ?? "2020-01-01 00:00:00", + endTime: opts.endTime ?? "2099-12-31 23:59:59", + query: { + kind: "timeseries" as const, + source: "traces" as const, + metric: "count" as const, + allMetrics: true, + filters: { + serviceName: opts.serviceName, + rootSpansOnly: opts.rootSpansOnly ?? true, + environments: opts.environments, + commitShas: opts.commitShas, + }, + bucketSeconds: opts.bucketSeconds, + }, + }) +} + +interface AllMetricsPoint { + count: number + errorRate: number + p50: number + p95: number + p99: number +} + +function extractAllMetricsSeries( response: { result: { kind: string; data: ReadonlyArray<{ bucket: string; series: Record }> } }, -): Map { - const map = new Map() +): Map { + const map = new Map() if (response.result.kind !== "timeseries") return map - for (const point of (response.result as any).data) { - // Sum across all series keys (there's typically one or "all") - let total = 0 - for (const v of Object.values(point.series)) { - total += Number(v) - } - map.set(point.bucket, total) + for (const point of response.result.data) { + map.set(point.bucket, { + count: point.series.count ?? 0, + errorRate: point.series.error_rate ?? 0, + p50: point.series.p50_duration ?? 0, + p95: point.series.p95_duration ?? 0, + p99: point.series.p99_duration ?? 0, + }) } return map } @@ -579,36 +618,29 @@ const getCustomChartServiceDetailEffect = Effect.fn("QueryEngine.getCustomChartS rootSpansOnly: true, } - const [countRes, errorRateRes, p50Res, p95Res, p99Res, metricsResult] = yield* Effect.all([ - executeQueryEngine("queryEngine.serviceDetail.count", makeTracesTimeseriesRequest("count", reqOpts)), - executeQueryEngine("queryEngine.serviceDetail.errorRate", makeTracesTimeseriesRequest("error_rate", reqOpts)), - executeQueryEngine("queryEngine.serviceDetail.p50", makeTracesTimeseriesRequest("p50_duration", reqOpts)), - executeQueryEngine("queryEngine.serviceDetail.p95", makeTracesTimeseriesRequest("p95_duration", reqOpts)), - executeQueryEngine("queryEngine.serviceDetail.p99", makeTracesTimeseriesRequest("p99_duration", reqOpts)), + const [allMetricsRes, metricsResult] = yield* Effect.all([ + executeQueryEngine("queryEngine.serviceDetail.allMetrics", makeAllMetricsTimeseriesRequest(reqOpts)), querySpanMetricsCalls({ service: input.serviceName, start_time: input.startTime, end_time: input.endTime, bucket_seconds: bucketSeconds, }), - ], { concurrency: 6 }) + ], { concurrency: 2 }) - const countMap = extractSingleSeries(countRes as any) - const errorRateMap = extractSingleSeries(errorRateRes as any) - const p50Map = extractSingleSeries(p50Res as any) - const p95Map = extractSingleSeries(p95Res as any) - const p99Map = extractSingleSeries(p99Res as any) + const allMetrics = extractAllMetricsSeries(allMetricsRes as any) const metricsMap = new Map( metricsResult.data.map((r) => [toIsoBucket(String(r.bucket)), Number(r.sumValue)]), ) const allBuckets = new Set() - for (const k of countMap.keys()) allBuckets.add(k) + for (const k of allMetrics.keys()) allBuckets.add(k) for (const k of metricsMap.keys()) allBuckets.add(k) const points = [...allBuckets].sort().map((bucket): ServiceDetailTimeSeriesPoint => { - const rawCount = countMap.get(bucket) ?? 0 + const m = allMetrics.get(bucket) + const rawCount = m?.count ?? 0 const metricsThroughput = metricsMap.get(bucket) if (metricsThroughput != null && metricsThroughput > 0) { @@ -618,10 +650,10 @@ const getCustomChartServiceDetailEffect = Effect.fn("QueryEngine.getCustomChartS tracedThroughput: rawCount, hasSampling: true, samplingWeight: rawCount > 0 ? metricsThroughput / rawCount : 1, - errorRate: errorRateMap.get(bucket) ?? 0, - p50LatencyMs: p50Map.get(bucket) ?? 0, - p95LatencyMs: p95Map.get(bucket) ?? 0, - p99LatencyMs: p99Map.get(bucket) ?? 0, + errorRate: m?.errorRate ?? 0, + p50LatencyMs: m?.p50 ?? 0, + p95LatencyMs: m?.p95 ?? 0, + p99LatencyMs: m?.p99 ?? 0, } } @@ -631,10 +663,10 @@ const getCustomChartServiceDetailEffect = Effect.fn("QueryEngine.getCustomChartS tracedThroughput: rawCount, hasSampling: false, samplingWeight: 1, - errorRate: errorRateMap.get(bucket) ?? 0, - p50LatencyMs: p50Map.get(bucket) ?? 0, - p95LatencyMs: p95Map.get(bucket) ?? 0, - p99LatencyMs: p99Map.get(bucket) ?? 0, + errorRate: m?.errorRate ?? 0, + p50LatencyMs: m?.p50 ?? 0, + p95LatencyMs: m?.p95 ?? 0, + p99LatencyMs: m?.p99 ?? 0, } }) @@ -680,24 +712,16 @@ const getOverviewTimeSeriesEffect = Effect.fn("QueryEngine.getOverviewTimeSeries environments: input.environments, } - const [countRes, errorRateRes, p50Res, p95Res, p99Res, metricsResult] = yield* Effect.all([ - executeQueryEngine("queryEngine.overview.count", makeTracesTimeseriesRequest("count", reqOpts)), - executeQueryEngine("queryEngine.overview.errorRate", makeTracesTimeseriesRequest("error_rate", reqOpts)), - executeQueryEngine("queryEngine.overview.p50", makeTracesTimeseriesRequest("p50_duration", reqOpts)), - executeQueryEngine("queryEngine.overview.p95", makeTracesTimeseriesRequest("p95_duration", reqOpts)), - executeQueryEngine("queryEngine.overview.p99", makeTracesTimeseriesRequest("p99_duration", reqOpts)), + const [allMetricsRes, metricsResult] = yield* Effect.all([ + executeQueryEngine("queryEngine.overview.allMetrics", makeAllMetricsTimeseriesRequest(reqOpts)), querySpanMetricsCalls({ start_time: input.startTime, end_time: input.endTime, bucket_seconds: bucketSeconds, }), - ], { concurrency: 6 }) + ], { concurrency: 2 }) - const countMap = extractSingleSeries(countRes as any) - const errorRateMap = extractSingleSeries(errorRateRes as any) - const p50Map = extractSingleSeries(p50Res as any) - const p95Map = extractSingleSeries(p95Res as any) - const p99Map = extractSingleSeries(p99Res as any) + const allMetrics = extractAllMetricsSeries(allMetricsRes as any) // SpanMetrics: aggregate across all services per bucket const metricsMap = new Map() @@ -707,11 +731,12 @@ const getOverviewTimeSeriesEffect = Effect.fn("QueryEngine.getOverviewTimeSeries } const allBuckets = new Set() - for (const k of countMap.keys()) allBuckets.add(k) + for (const k of allMetrics.keys()) allBuckets.add(k) for (const k of metricsMap.keys()) allBuckets.add(k) const points = [...allBuckets].sort().map((bucket): ServiceDetailTimeSeriesPoint => { - const rawCount = countMap.get(bucket) ?? 0 + const m = allMetrics.get(bucket) + const rawCount = m?.count ?? 0 const metricsThroughput = metricsMap.get(bucket) if (metricsThroughput != null && metricsThroughput > 0) { @@ -721,10 +746,10 @@ const getOverviewTimeSeriesEffect = Effect.fn("QueryEngine.getOverviewTimeSeries tracedThroughput: rawCount, hasSampling: true, samplingWeight: rawCount > 0 ? metricsThroughput / rawCount : 1, - errorRate: errorRateMap.get(bucket) ?? 0, - p50LatencyMs: p50Map.get(bucket) ?? 0, - p95LatencyMs: p95Map.get(bucket) ?? 0, - p99LatencyMs: p99Map.get(bucket) ?? 0, + errorRate: m?.errorRate ?? 0, + p50LatencyMs: m?.p50 ?? 0, + p95LatencyMs: m?.p95 ?? 0, + p99LatencyMs: m?.p99 ?? 0, } } @@ -734,10 +759,10 @@ const getOverviewTimeSeriesEffect = Effect.fn("QueryEngine.getOverviewTimeSeries tracedThroughput: rawCount, hasSampling: false, samplingWeight: 1, - errorRate: errorRateMap.get(bucket) ?? 0, - p50LatencyMs: p50Map.get(bucket) ?? 0, - p95LatencyMs: p95Map.get(bucket) ?? 0, - p99LatencyMs: p99Map.get(bucket) ?? 0, + errorRate: m?.errorRate ?? 0, + p50LatencyMs: m?.p50 ?? 0, + p95LatencyMs: m?.p95 ?? 0, + p99LatencyMs: m?.p99 ?? 0, } }) diff --git a/apps/web/src/api/tinybird/timeseries-adapters.test.ts b/apps/web/src/api/tinybird/timeseries-adapters.test.ts index 939cedf3..48511feb 100644 --- a/apps/web/src/api/tinybird/timeseries-adapters.test.ts +++ b/apps/web/src/api/tinybird/timeseries-adapters.test.ts @@ -1,5 +1,18 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { Effect } from "effect"; + +const executeQueryEngineMock = vi.fn() +const runTinybirdQueryMock = vi.fn() + +vi.mock("@/api/tinybird/effect-utils", () => ({ + TinybirdDateTimeString: {}, + TinybirdQueryError: class extends Error { _tag = "TinybirdQueryError" }, + decodeInput: (_schema: unknown, data: unknown) => Effect.succeed(data), + invalidTinybirdInput: () => Effect.fail(new Error("invalid")), + executeQueryEngine: (...args: unknown[]) => executeQueryEngineMock(...args), + runTinybirdQuery: (...args: unknown[]) => runTinybirdQueryMock(...args), +})) + import { getCustomChartServiceDetail, getCustomChartServiceSparklines, @@ -7,40 +20,37 @@ import { } from "@/api/tinybird/custom-charts"; import { getServiceApdexTimeSeries } from "@/api/tinybird/services"; -const customTracesTimeseriesMock = vi.fn(); -const serviceApdexTimeseriesMock = vi.fn(); +function tsResponse(data: Array<{ bucket: string; series: Record }>) { + return Effect.succeed({ result: { kind: "timeseries", source: "traces", data } }) +} -vi.mock("@/lib/tinybird", () => ({ - getTinybird: () => ({ - query: { - custom_traces_timeseries: customTracesTimeseriesMock, - service_apdex_time_series: serviceApdexTimeseriesMock, - }, - }), -})); +const emptyTs = () => tsResponse([]) describe("timeseries adapters", () => { beforeEach(() => { - customTracesTimeseriesMock.mockReset(); - serviceApdexTimeseriesMock.mockReset(); - }); + executeQueryEngineMock.mockReset() + runTinybirdQueryMock.mockReset() + }) it("fills overview/detail buckets without flattening existing points", async () => { - customTracesTimeseriesMock.mockReturnValue( - Effect.succeed({ - data: [ - { - bucket: "2026-01-01 00:00:00", - groupName: "all", + const bucket = "2026-01-01T00:00:00.000Z" + + executeQueryEngineMock.mockImplementation((operation: string) => { + if (operation.includes("spanMetricsCalls")) return emptyTs() + if (operation.includes("allMetrics")) { + return tsResponse([{ + bucket, + series: { count: 10, - errorRate: 2, - p50Duration: 11, - p95Duration: 20, - p99Duration: 30, + error_rate: 2, + p50_duration: 11, + p95_duration: 20, + p99_duration: 30, }, - ], - }), - ); + }]) + } + return emptyTs() + }) const overview = await Effect.runPromise( getOverviewTimeSeries({ @@ -80,24 +90,22 @@ describe("timeseries adapters", () => { }); it("fills service sparklines per service across the selected timeline", async () => { - customTracesTimeseriesMock.mockReturnValue( - Effect.succeed({ - data: [ - { - bucket: "2026-01-01 00:00:00", - groupName: "checkout", - count: 3, - errorRate: 1, - }, - { - bucket: "2026-01-01 00:10:00", - groupName: "checkout", - count: 5, - errorRate: 0, - }, - ], - }), - ); + executeQueryEngineMock.mockImplementation((operation: string) => { + if (operation.includes("spanMetricsCalls")) return emptyTs() + if (operation.includes("count")) { + return tsResponse([ + { bucket: "2026-01-01T00:00:00.000Z", series: { checkout: 3 } }, + { bucket: "2026-01-01T00:10:00.000Z", series: { checkout: 5 } }, + ]) + } + if (operation.includes("error")) { + return tsResponse([ + { bucket: "2026-01-01T00:00:00.000Z", series: { checkout: 1 } }, + { bucket: "2026-01-01T00:10:00.000Z", series: { checkout: 0 } }, + ]) + } + return emptyTs() + }) const response = await Effect.runPromise( getCustomChartServiceSparklines({ @@ -127,7 +135,7 @@ describe("timeseries adapters", () => { }); it("fills service apdex buckets while preserving real values", async () => { - serviceApdexTimeseriesMock.mockReturnValue( + runTinybirdQueryMock.mockReturnValue( Effect.succeed({ data: [ { diff --git a/apps/web/src/api/tinybird/traces.test.ts b/apps/web/src/api/tinybird/traces.test.ts index d41688de..d2dfa8da 100644 --- a/apps/web/src/api/tinybird/traces.test.ts +++ b/apps/web/src/api/tinybird/traces.test.ts @@ -1,29 +1,39 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { Effect } from "effect"; -const tinybirdQueryMocks = { - list_traces: vi.fn<() => Effect.Effect<{ data: Array> }, never, never>>( - () => Effect.succeed({ data: [] }), - ), - traces_facets: vi.fn(() => Effect.succeed({ data: [] })), - traces_duration_stats: vi.fn(() => Effect.succeed({ data: [] })), - span_hierarchy: vi.fn(() => Effect.succeed({ data: [] })), - span_attribute_keys: vi.fn(() => Effect.succeed({ data: [] })), - span_attribute_values: vi.fn(() => Effect.succeed({ data: [] })), -}; +const executeQueryEngineMock = vi.fn() -vi.mock("@/lib/tinybird", () => ({ - getTinybird: () => ({ query: tinybirdQueryMocks }), -})); +vi.mock("@/api/tinybird/effect-utils", async () => { + const actual = await vi.importActual( + "@/api/tinybird/effect-utils", + ) + return { + ...actual, + executeQueryEngine: (...args: unknown[]) => executeQueryEngineMock(...args), + } +}) import { getTracesDurationStats, getTracesFacets, listTraces } from "@/api/tinybird/traces"; describe("tinybird traces attribute filter params", () => { beforeEach(() => { - for (const mock of Object.values(tinybirdQueryMocks)) { - mock.mockClear(); - } - }); + executeQueryEngineMock.mockReset() + executeQueryEngineMock.mockImplementation((operation: string) => { + if (operation.includes("Facets")) { + return Effect.succeed({ result: { kind: "facets", source: "traces", data: [] } }) + } + if (operation.includes("DurationStats") || operation.includes("Stats")) { + return Effect.succeed({ + result: { + kind: "stats", + source: "traces", + data: { minDurationMs: 0, maxDurationMs: 0, p50DurationMs: 0, p95DurationMs: 0 }, + }, + }) + } + return Effect.succeed({ result: { kind: "list", source: "traces", data: [] } }) + }) + }) it("forwards basic filter params to list_traces", async () => { await Effect.runPromise( @@ -35,10 +45,11 @@ describe("tinybird traces attribute filter params", () => { }), ); - expect(tinybirdQueryMocks.list_traces).toHaveBeenCalledWith( + expect(executeQueryEngineMock).toHaveBeenCalledWith( + "queryEngine.listTraces", expect.objectContaining({ - start_time: "2026-02-01 00:00:00", - end_time: "2026-02-01 01:00:00", + startTime: "2026-02-01 00:00:00", + endTime: "2026-02-01 01:00:00", }), ); }); @@ -53,16 +64,18 @@ describe("tinybird traces attribute filter params", () => { }), ); - expect(tinybirdQueryMocks.traces_facets).toHaveBeenCalledWith( + expect(executeQueryEngineMock).toHaveBeenCalledWith( + "queryEngine.getTracesFacets", expect.objectContaining({ - start_time: "2026-02-01 00:00:00", - end_time: "2026-02-01 01:00:00", + startTime: "2026-02-01 00:00:00", + endTime: "2026-02-01 01:00:00", }), ); - expect(tinybirdQueryMocks.traces_duration_stats).toHaveBeenCalledWith( + expect(executeQueryEngineMock).toHaveBeenCalledWith( + "queryEngine.getTracesDurationStats", expect.objectContaining({ - start_time: "2026-02-01 00:00:00", - end_time: "2026-02-01 01:00:00", + startTime: "2026-02-01 00:00:00", + endTime: "2026-02-01 01:00:00", }), ); }); @@ -77,35 +90,40 @@ describe("tinybird traces attribute filter params", () => { }), ); - expect(tinybirdQueryMocks.traces_duration_stats).toHaveBeenCalledWith( + expect(executeQueryEngineMock).toHaveBeenCalledWith( + "queryEngine.getTracesDurationStats", expect.objectContaining({ - start_time: "2026-02-01 00:00:00", - end_time: "2026-02-01 01:00:00", + startTime: "2026-02-01 00:00:00", + endTime: "2026-02-01 01:00:00", }), ); }); it("builds a curated rootSpan summary for overview rows", async () => { - tinybirdQueryMocks.list_traces.mockReturnValueOnce( + executeQueryEngineMock.mockReturnValueOnce( Effect.succeed({ - data: [ - { - traceId: "trace-1", - startTime: "2026-02-01 00:00:00", - endTime: "2026-02-01 00:00:02", - durationMicros: 2000000, - spanCount: 3, - services: ["checkout", "payments"], - rootSpanName: "GET", - rootSpanKind: "SPAN_KIND_SERVER", - rootSpanStatusCode: "Ok", - rootHttpMethod: "GET", - rootHttpRoute: "/checkout", - rootHttpStatusCode: "200", - hasError: 0, - }, - ], - } as { data: Array> }), + result: { + kind: "list", + source: "traces", + data: [ + { + traceId: "trace-1", + timestamp: "2026-02-01 00:00:00", + durationMs: 2000, + serviceName: "checkout", + spanName: "GET", + spanKind: "SPAN_KIND_SERVER", + statusCode: "Ok", + hasError: 0, + spanAttributes: { + "http.method": "GET", + "http.route": "/checkout", + "http.status_code": "200", + }, + }, + ], + }, + }), ); const response = await Effect.runPromise( diff --git a/apps/web/src/components/dashboard-builder/widgets/list-widget.test.ts b/apps/web/src/components/dashboard-builder/widgets/list-widget.test.ts index 9ebb5f08..46c85755 100644 --- a/apps/web/src/components/dashboard-builder/widgets/list-widget.test.ts +++ b/apps/web/src/components/dashboard-builder/widgets/list-widget.test.ts @@ -1,51 +1,69 @@ import { describe, expect, it, vi } from "vitest"; import { Effect } from "effect"; -// --- Mock tinybird BEFORE importing anything that uses it --- -const tinybirdQueryMocks = { - list_traces: vi.fn(() => - Effect.succeed({ - data: [ - { - traceId: "t1", - startTime: "2026-03-28 00:00:00", - endTime: "2026-03-28 00:00:01", - durationMicros: 142000, - spanCount: 3, - services: ["api-gw", "user-svc"], - rootSpanName: "GET /api/users", - rootSpanKind: "SERVER", - rootSpanStatusCode: "Ok", - rootHttpMethod: "GET", - rootHttpRoute: "/api/users", - rootHttpStatusCode: "200", - hasError: 0, - }, - ], - }), - ), - list_logs: vi.fn(() => - Effect.succeed({ - data: [ - { - timestamp: "2026-03-28 12:00:00", - severityText: "ERROR", - severityNumber: 17, - serviceName: "api-gw", - body: "Connection refused", - traceId: "t1", - spanId: "s1", - logAttributes: '{"http.method":"GET"}', - resourceAttributes: '{"service.version":"1.0"}', - }, - ], - }), - ), -}; - -vi.mock("@/lib/tinybird", () => ({ - getTinybird: () => ({ query: tinybirdQueryMocks }), -})); +// --- Mock effect-utils BEFORE importing anything that uses it --- +const executeQueryEngineMock = vi.fn() +const runTinybirdQueryMock = vi.fn() + +vi.mock("@/api/tinybird/effect-utils", async () => { + const actual = await vi.importActual( + "@/api/tinybird/effect-utils", + ) + return { + ...actual, + executeQueryEngine: (...args: unknown[]) => executeQueryEngineMock(...args), + runTinybirdQuery: (...args: unknown[]) => runTinybirdQueryMock(...args), + } +}) + +// Default mock implementations +executeQueryEngineMock.mockImplementation((operation: string) => { + if (operation.includes("listTraces")) { + return Effect.succeed({ + result: { + kind: "list", + source: "traces", + data: [ + { + traceId: "t1", + timestamp: "2026-03-28 00:00:00", + durationMs: 142, + serviceName: "api-gw", + spanName: "GET /api/users", + spanKind: "SERVER", + statusCode: "Ok", + hasError: 0, + spanAttributes: { + "http.method": "GET", + "http.route": "/api/users", + "http.status_code": "200", + }, + }, + ], + }, + }) + } + // Default: empty list + return Effect.succeed({ result: { kind: "list", source: "traces", data: [] } }) +}) + +runTinybirdQueryMock.mockImplementation(() => + Effect.succeed({ + data: [ + { + timestamp: "2026-03-28 12:00:00", + severityText: "ERROR", + severityNumber: 17, + serviceName: "api-gw", + body: "Connection refused", + traceId: "t1", + spanId: "s1", + logAttributes: '{"http.method":"GET"}', + resourceAttributes: '{"service.version":"1.0"}', + }, + ], + }), +) // --- Now import production code --- import { listTraces } from "@/api/tinybird/traces"; diff --git a/packages/domain/src/query-engine.ts b/packages/domain/src/query-engine.ts index c3dede8e..80f91101 100644 --- a/packages/domain/src/query-engine.ts +++ b/packages/domain/src/query-engine.ts @@ -91,6 +91,7 @@ export const TracesTimeseriesQuery = Schema.Struct({ kind: Schema.Literal("timeseries"), source: Schema.Literal("traces"), metric: TracesMetric, + allMetrics: Schema.optional(Schema.Boolean), apdexThresholdMs: Schema.optional( Schema.Number.check(Schema.isFinite(), Schema.isGreaterThan(0)), ),