From c8bc611d82194a55caeb5b869b52b29ef8810729 Mon Sep 17 00:00:00 2001 From: Nicolas Meienberger Date: Fri, 2 Jan 2026 14:13:45 +0100 Subject: [PATCH 1/4] feat: restore progress --- app/client/components/restore-form.tsx | 3 + app/client/components/restore-progress.tsx | 92 +++++ app/client/hooks/use-server-events.ts | 34 +- app/schemas/events-dto.ts | 38 ++ app/server/core/events.ts | 8 +- .../modules/events/events.controller.ts | 41 +- .../repositories/repositories.controller.ts | 2 + .../repositories/repositories.service.ts | 317 +++++++++++++--- app/server/utils/restic.ts | 355 ++++++++++++++---- 9 files changed, 753 insertions(+), 137 deletions(-) create mode 100644 app/client/components/restore-progress.tsx diff --git a/app/client/components/restore-form.tsx b/app/client/components/restore-form.tsx index 77025816..f9cd2c36 100644 --- a/app/client/components/restore-form.tsx +++ b/app/client/components/restore-form.tsx @@ -9,6 +9,7 @@ import { Label } from "~/client/components/ui/label"; import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "~/client/components/ui/select"; import { PathSelector } from "~/client/components/path-selector"; import { FileTree } from "~/client/components/file-tree"; +import { RestoreProgress } from "~/client/components/restore-progress"; import { listSnapshotFilesOptions, restoreSnapshotMutation } from "~/client/api-client/@tanstack/react-query.gen"; import { useFileBrowser } from "~/client/hooks/use-file-browser"; import { OVERWRITE_MODES, type OverwriteMode } from "~/schemas/restic"; @@ -168,6 +169,8 @@ export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: Re
+ {isRestoring && } + Restore Location diff --git a/app/client/components/restore-progress.tsx b/app/client/components/restore-progress.tsx new file mode 100644 index 00000000..9383a8d6 --- /dev/null +++ b/app/client/components/restore-progress.tsx @@ -0,0 +1,92 @@ +import { useEffect, useState } from "react"; +import { ByteSize } from "~/client/components/bytes-size"; +import { Card } from "~/client/components/ui/card"; +import { Progress } from "~/client/components/ui/progress"; +import { useServerEvents } from "~/client/hooks/use-server-events"; +import type { RestoreCompletedEventDto, RestoreProgressEventDto } from "~/schemas/events-dto"; +import { formatBytes } from "~/utils/format-bytes"; +import { formatDuration } from "~/utils/utils"; + +type Props = { + repositoryId: string; + snapshotId: string; +}; + +export const RestoreProgress = ({ repositoryId, snapshotId }: Props) => { + const { addEventListener } = useServerEvents(); + const [progress, setProgress] = useState(null); + + useEffect(() => { + const unsubscribe = addEventListener("restore:progress", (data) => { + const progressData = data as RestoreProgressEventDto; + if (progressData.repositoryId === repositoryId && progressData.snapshotId === snapshotId) { + setProgress(progressData); + } + }); + + const unsubscribeComplete = addEventListener("restore:completed", (data) => { + const completedData = data as RestoreCompletedEventDto; + if (completedData.repositoryId === repositoryId && completedData.snapshotId === snapshotId) { + setProgress(null); + } + }); + + return () => { + unsubscribe(); + unsubscribeComplete(); + }; + }, [addEventListener, repositoryId, snapshotId]); + + if (!progress) { + return ( + +
+
+ Restore in progress +
+ + ); + } + + const percentDone = Math.round(progress.percent_done * 100); + const speed = formatBytes(progress.bytes_done / progress.seconds_elapsed); + + return ( + +
+
+
+ Restore in progress +
+ {percentDone}% +
+ + + +
+
+

Files

+

+ {progress.files_done.toLocaleString()} / {progress.total_files.toLocaleString()} +

+
+
+

Data

+

+ / +

+
+
+

Elapsed

+

{formatDuration(progress.seconds_elapsed)}

+
+
+

Speed

+

+ {progress.seconds_elapsed > 0 ? `${speed.text} ${speed.unit}/s` : "Calculating..."} +

+
+
+ + ); +}; diff --git a/app/client/hooks/use-server-events.ts b/app/client/hooks/use-server-events.ts index 28eefd16..0c1eb46e 100644 --- a/app/client/hooks/use-server-events.ts +++ b/app/client/hooks/use-server-events.ts @@ -1,9 +1,12 @@ -import { useCallback, useEffect, useRef } from "react"; import { useQueryClient } from "@tanstack/react-query"; +import { useCallback, useEffect, useRef } from "react"; import type { BackupCompletedEventDto, BackupProgressEventDto, BackupStartedEventDto, + RestoreCompletedEventDto, + RestoreProgressEventDto, + RestoreStartedEventDto, } from "~/schemas/events-dto"; type ServerEventType = @@ -17,6 +20,9 @@ type ServerEventType = | "volume:updated" | "mirror:started" | "mirror:completed" + | "restore:started" + | "restore:progress" + | "restore:completed" | "doctor:started" | "doctor:completed" | "doctor:cancelled"; @@ -161,6 +167,32 @@ export function useServerEvents() { }); }); + eventSource.addEventListener("restore:started", (e) => { + const data = JSON.parse(e.data) as RestoreStartedEventDto; + console.info("[SSE] Restore started:", data); + + handlersRef.current.get("restore:started")?.forEach((handler) => { + handler(data); + }); + }); + + eventSource.addEventListener("restore:progress", (e) => { + const data = JSON.parse(e.data) as RestoreProgressEventDto; + + handlersRef.current.get("restore:progress")?.forEach((handler) => { + handler(data); + }); + }); + + eventSource.addEventListener("restore:completed", (e) => { + const data = JSON.parse(e.data) as RestoreCompletedEventDto; + console.info("[SSE] Restore completed:", data); + + handlersRef.current.get("restore:completed")?.forEach((handler) => { + handler(data); + }); + }); + eventSource.addEventListener("doctor:started", (e) => { const data = JSON.parse(e.data) as DoctorEvent; console.info("[SSE] Doctor started:", data); diff --git a/app/schemas/events-dto.ts b/app/schemas/events-dto.ts index 4ff9b02f..f8ce1ed1 100644 --- a/app/schemas/events-dto.ts +++ b/app/schemas/events-dto.ts @@ -2,6 +2,7 @@ import { type } from "arktype"; import { resticBackupProgressMetricsSchema, resticBackupRunSummarySchema } from "~/schemas/restic-dto"; export const backupEventStatusSchema = type("'success' | 'error' | 'stopped' | 'warning'"); +export const restoreEventStatusSchema = type("'success' | 'error'"); const backupEventBaseSchema = type({ scheduleId: "number", @@ -13,6 +14,20 @@ const organizationScopedSchema = type({ organizationId: "string", }); +const restoreEventBaseSchema = type({ + repositoryId: "string", + snapshotId: "string", +}); + +const restoreProgressMetricsSchema = type({ + seconds_elapsed: "number", + percent_done: "number", + total_files: "number", + files_done: "number", + total_bytes: "number", + bytes_done: "number", +}); + export const backupStartedEventSchema = backupEventBaseSchema; export const backupProgressEventSchema = backupEventBaseSchema.and(resticBackupProgressMetricsSchema); @@ -24,16 +39,39 @@ export const backupCompletedEventSchema = backupEventBaseSchema.and( }), ); +export const restoreStartedEventSchema = restoreEventBaseSchema; + +export const restoreProgressEventSchema = restoreEventBaseSchema.and(restoreProgressMetricsSchema); + +export const restoreCompletedEventSchema = restoreEventBaseSchema.and( + type({ + status: restoreEventStatusSchema, + error: "string?", + }), +); + export const serverBackupStartedEventSchema = organizationScopedSchema.and(backupStartedEventSchema); export const serverBackupProgressEventSchema = organizationScopedSchema.and(backupProgressEventSchema); export const serverBackupCompletedEventSchema = organizationScopedSchema.and(backupCompletedEventSchema); +export const serverRestoreStartedEventSchema = organizationScopedSchema.and(restoreStartedEventSchema); + +export const serverRestoreProgressEventSchema = organizationScopedSchema.and(restoreProgressEventSchema); + +export const serverRestoreCompletedEventSchema = organizationScopedSchema.and(restoreCompletedEventSchema); + export type BackupEventStatusDto = typeof backupEventStatusSchema.infer; export type BackupStartedEventDto = typeof backupStartedEventSchema.infer; export type BackupProgressEventDto = typeof backupProgressEventSchema.infer; export type BackupCompletedEventDto = typeof backupCompletedEventSchema.infer; +export type RestoreStartedEventDto = typeof restoreStartedEventSchema.infer; +export type RestoreProgressEventDto = typeof restoreProgressEventSchema.infer; +export type RestoreCompletedEventDto = typeof restoreCompletedEventSchema.infer; export type ServerBackupStartedEventDto = typeof serverBackupStartedEventSchema.infer; export type ServerBackupProgressEventDto = typeof serverBackupProgressEventSchema.infer; export type ServerBackupCompletedEventDto = typeof serverBackupCompletedEventSchema.infer; +export type ServerRestoreStartedEventDto = typeof serverRestoreStartedEventSchema.infer; +export type ServerRestoreProgressEventDto = typeof serverRestoreProgressEventSchema.infer; +export type ServerRestoreCompletedEventDto = typeof serverRestoreCompletedEventSchema.infer; diff --git a/app/server/core/events.ts b/app/server/core/events.ts index 45da39f6..c6869dd5 100644 --- a/app/server/core/events.ts +++ b/app/server/core/events.ts @@ -1,11 +1,14 @@ import { EventEmitter } from "node:events"; import type { TypedEmitter } from "tiny-typed-emitter"; -import type { DoctorResult } from "~/schemas/restic"; import type { ServerBackupCompletedEventDto, ServerBackupProgressEventDto, ServerBackupStartedEventDto, + ServerRestoreCompletedEventDto, + ServerRestoreProgressEventDto, + ServerRestoreStartedEventDto, } from "~/schemas/events-dto"; +import type { DoctorResult } from "~/schemas/restic"; /** * Event payloads for the SSE system @@ -14,6 +17,9 @@ interface ServerEvents { "backup:started": (data: ServerBackupStartedEventDto) => void; "backup:progress": (data: ServerBackupProgressEventDto) => void; "backup:completed": (data: ServerBackupCompletedEventDto) => void; + "restore:started": (data: ServerRestoreStartedEventDto) => void; + "restore:progress": (data: ServerRestoreProgressEventDto) => void; + "restore:completed": (data: ServerRestoreCompletedEventDto) => void; "mirror:started": (data: { organizationId: string; scheduleId: number; diff --git a/app/server/modules/events/events.controller.ts b/app/server/modules/events/events.controller.ts index 3d7a778b..5ab49af1 100644 --- a/app/server/modules/events/events.controller.ts +++ b/app/server/modules/events/events.controller.ts @@ -1,14 +1,17 @@ import { Hono } from "hono"; import { streamSSE } from "hono/streaming"; -import { logger } from "../../utils/logger"; -import { serverEvents } from "../../core/events"; -import { requireAuth } from "../auth/auth.middleware"; -import type { DoctorResult } from "~/schemas/restic"; import type { ServerBackupCompletedEventDto, ServerBackupProgressEventDto, ServerBackupStartedEventDto, + ServerRestoreCompletedEventDto, + ServerRestoreProgressEventDto, + ServerRestoreStartedEventDto, } from "~/schemas/events-dto"; +import type { DoctorResult } from "~/schemas/restic"; +import { serverEvents } from "../../core/events"; +import { logger } from "../../utils/logger"; +import { requireAuth } from "../auth/auth.middleware"; export const eventsController = new Hono().use(requireAuth).get("/", (c) => { logger.info("Client connected to SSE endpoint"); @@ -96,6 +99,30 @@ export const eventsController = new Hono().use(requireAuth).get("/", (c) => { }); }; + const onRestoreStarted = async (data: ServerRestoreStartedEventDto) => { + if (data.organizationId !== organizationId) return; + await stream.writeSSE({ + data: JSON.stringify(data), + event: "restore:started", + }); + }; + + const onRestoreProgress = async (data: ServerRestoreProgressEventDto) => { + if (data.organizationId !== organizationId) return; + await stream.writeSSE({ + data: JSON.stringify(data), + event: "restore:progress", + }); + }; + + const onRestoreCompleted = async (data: ServerRestoreCompletedEventDto) => { + if (data.organizationId !== organizationId) return; + await stream.writeSSE({ + data: JSON.stringify(data), + event: "restore:completed", + }); + }; + const onDoctorStarted = async (data: { organizationId: string; repositoryId: string; repositoryName: string }) => { if (data.organizationId !== organizationId) return; await stream.writeSSE({ @@ -139,6 +166,9 @@ export const eventsController = new Hono().use(requireAuth).get("/", (c) => { serverEvents.on("volume:updated", onVolumeUpdated); serverEvents.on("mirror:started", onMirrorStarted); serverEvents.on("mirror:completed", onMirrorCompleted); + serverEvents.on("restore:started", onRestoreStarted); + serverEvents.on("restore:progress", onRestoreProgress); + serverEvents.on("restore:completed", onRestoreCompleted); serverEvents.on("doctor:started", onDoctorStarted); serverEvents.on("doctor:completed", onDoctorCompleted); serverEvents.on("doctor:cancelled", onDoctorCancelled); @@ -159,6 +189,9 @@ export const eventsController = new Hono().use(requireAuth).get("/", (c) => { serverEvents.off("volume:updated", onVolumeUpdated); serverEvents.off("mirror:started", onMirrorStarted); serverEvents.off("mirror:completed", onMirrorCompleted); + serverEvents.off("restore:started", onRestoreStarted); + serverEvents.off("restore:progress", onRestoreProgress); + serverEvents.off("restore:completed", onRestoreCompleted); serverEvents.off("doctor:started", onDoctorStarted); serverEvents.off("doctor:completed", onDoctorCompleted); serverEvents.off("doctor:cancelled", onDoctorCancelled); diff --git a/app/server/modules/repositories/repositories.controller.ts b/app/server/modules/repositories/repositories.controller.ts index c4c1b8fd..6074fb3f 100644 --- a/app/server/modules/repositories/repositories.controller.ts +++ b/app/server/modules/repositories/repositories.controller.ts @@ -143,6 +143,8 @@ export const repositoriesController = new Hono() summary: snapshot.summary, }; + c.header("Cache-Control", "max-age=300, stale-while-revalidate=600"); + return c.json(response, 200); }) .get( diff --git a/app/server/modules/repositories/repositories.service.ts b/app/server/modules/repositories/repositories.service.ts index 20e735c7..12ad36dc 100644 --- a/app/server/modules/repositories/repositories.service.ts +++ b/app/server/modules/repositories/repositories.service.ts @@ -1,29 +1,43 @@ import crypto from "node:crypto"; -import { and, eq } from "drizzle-orm"; -import { BadRequestError, ConflictError, InternalServerError, NotFoundError } from "http-errors-enhanced"; -import { db } from "../../db/db"; -import { repositoriesTable } from "../../db/schema"; -import { toMessage } from "../../utils/errors"; -import { generateShortId } from "../../utils/id"; -import { restic, buildEnv, buildRepoUrl, addCommonArgs, cleanupTemporaryKeys } from "../../utils/restic"; -import { safeSpawn } from "../../utils/spawn"; -import { cryptoUtils } from "../../utils/crypto"; -import { cache } from "../../utils/cache"; -import { repoMutex } from "../../core/repository-mutex"; import { type } from "arktype"; +import { and, eq } from "drizzle-orm"; +import { + BadRequestError, + ConflictError, + InternalServerError, + NotFoundError, +} from "http-errors-enhanced"; import { - repositoryConfigSchema, type CompressionMode, type OverwriteMode, type RepositoryConfig, + repositoryConfigSchema, } from "~/schemas/restic"; -import { getOrganizationId } from "~/server/core/request-context"; import { serverEvents } from "~/server/core/events"; -import { executeDoctor } from "./doctor"; +import { getOrganizationId } from "~/server/core/request-context"; import { logger } from "~/server/utils/logger"; -import { parseRetentionCategories, type RetentionCategory } from "~/server/utils/retention-categories"; +import { + parseRetentionCategories, + type RetentionCategory, +} from "~/server/utils/retention-categories"; +import { repoMutex } from "../../core/repository-mutex"; +import { db } from "../../db/db"; +import { repositoriesTable } from "../../db/schema"; +import { cache } from "../../utils/cache"; +import { cryptoUtils } from "../../utils/crypto"; +import { toMessage } from "../../utils/errors"; +import { generateShortId } from "../../utils/id"; +import { + addCommonArgs, + buildEnv, + buildRepoUrl, + cleanupTemporaryKeys, + restic, +} from "../../utils/restic"; +import { safeSpawn } from "../../utils/spawn"; import { backupsService } from "../backups/backups.service"; import type { UpdateRepositoryBody } from "./repositories.dto"; +import { executeDoctor } from "./doctor"; const runningDoctors = new Map(); @@ -31,22 +45,31 @@ const findRepository = async (idOrShortId: string) => { const organizationId = getOrganizationId(); return await db.query.repositoriesTable.findFirst({ where: { - AND: [{ OR: [{ id: idOrShortId }, { shortId: idOrShortId }] }, { organizationId }], + AND: [ + { OR: [{ id: idOrShortId }, { shortId: idOrShortId }] }, + { organizationId }, + ], }, }); }; const listRepositories = async () => { const organizationId = getOrganizationId(); - const repositories = await db.query.repositoriesTable.findMany({ where: { organizationId } }); + const repositories = await db.query.repositoriesTable.findMany({ + where: { organizationId }, + }); return repositories; }; -const encryptConfig = async (config: RepositoryConfig): Promise => { +const encryptConfig = async ( + config: RepositoryConfig, +): Promise => { const encryptedConfig: Record = { ...config }; if (config.customPassword) { - encryptedConfig.customPassword = await cryptoUtils.sealSecret(config.customPassword); + encryptedConfig.customPassword = await cryptoUtils.sealSecret( + config.customPassword, + ); } if (config.cacert) { @@ -56,25 +79,39 @@ const encryptConfig = async (config: RepositoryConfig): Promise { @@ -199,7 +250,10 @@ const deleteRepository = async (id: string) => { await db .delete(repositoriesTable) .where( - and(eq(repositoriesTable.id, repository.id), eq(repositoriesTable.organizationId, repository.organizationId)), + and( + eq(repositoriesTable.id, repository.id), + eq(repositoriesTable.organizationId, repository.organizationId), + ), ); cache.delByPrefix(`snapshots:${repository.id}:`); @@ -223,7 +277,8 @@ const listSnapshots = async (id: string, backupId?: string) => { } const cacheKey = `snapshots:${repository.id}:${backupId || "all"}`; - const cached = cache.get>>(cacheKey); + const cached = + cache.get>>(cacheKey); if (cached) { return cached; } @@ -233,7 +288,10 @@ const listSnapshots = async (id: string, backupId?: string) => { let snapshots = []; if (backupId) { - snapshots = await restic.snapshots(repository.config, { tags: [backupId], organizationId }); + snapshots = await restic.snapshots(repository.config, { + tags: [backupId], + organizationId, + }); } else { snapshots = await restic.snapshots(repository.config, { organizationId }); } @@ -264,9 +322,26 @@ const listSnapshotFiles = async ( const cacheKey = `ls:${repository.id}:${snapshotId}:${path || "root"}:${offset}:${limit}`; type LsResult = { - snapshot: { id: string; short_id: string; time: string; hostname: string; paths: string[] } | null; - nodes: { name: string; type: string; path: string; size?: number; mode?: number }[]; - pagination: { offset: number; limit: number; total: number; hasMore: boolean }; + snapshot: { + id: string; + short_id: string; + time: string; + hostname: string; + paths: string[]; + } | null; + nodes: { + name: string; + type: string; + path: string; + size?: number; + mode?: number; + }[]; + pagination: { + offset: number; + limit: number; + total: number; + hasMore: boolean; + }; }; const cached = cache.get(cacheKey); if (cached?.snapshot) { @@ -280,9 +355,18 @@ const listSnapshotFiles = async ( }; } - const releaseLock = await repoMutex.acquireShared(repository.id, `ls:${snapshotId}`); + const releaseLock = await repoMutex.acquireShared( + repository.id, + `ls:${snapshotId}`, + ); try { - const result = await restic.ls(repository.config, snapshotId, organizationId, path, { offset, limit }); + const result = await restic.ls( + repository.config, + snapshotId, + organizationId, + path, + { offset, limit }, + ); if (!result.snapshot) { throw new NotFoundError("Snapshot not found or empty"); @@ -332,9 +416,50 @@ const restoreSnapshot = async ( const target = options?.targetPath || "/"; - const releaseLock = await repoMutex.acquireShared(repository.id, `restore:${snapshotId}`); + const releaseLock = await repoMutex.acquireShared( + repository.id, + `restore:${snapshotId}`, + ); try { - const result = await restic.restore(repository.config, snapshotId, target, { ...options, organizationId }); + serverEvents.emit("restore:started", { + organizationId, + repositoryId: repository.id, + snapshotId, + }); + + const result = await restic.restore(repository.config, snapshotId, target, { + ...options, + organizationId, + onProgress: (progress) => { + const { + seconds_elapsed, + percent_done, + total_files, + files_done, + total_bytes, + bytes_done, + } = progress; + + serverEvents.emit("restore:progress", { + organizationId, + repositoryId: repository.id, + snapshotId, + seconds_elapsed, + percent_done, + total_files, + files_done, + total_bytes, + bytes_done, + }); + }, + }); + + serverEvents.emit("restore:completed", { + organizationId, + repositoryId: repository.id, + snapshotId, + status: "success", + }); return { success: true, @@ -342,6 +467,15 @@ const restoreSnapshot = async ( filesRestored: result.files_restored, filesSkipped: result.files_skipped, }; + } catch (error) { + serverEvents.emit("restore:completed", { + organizationId, + repositoryId: repository.id, + snapshotId, + status: "error", + error: toMessage(error), + }); + throw error; } finally { releaseLock(); } @@ -356,10 +490,14 @@ const getSnapshotDetails = async (id: string, snapshotId: string) => { } const cacheKey = `snapshots:${repository.id}:all`; - let snapshots = cache.get>>(cacheKey); + let snapshots = + cache.get>>(cacheKey); if (!snapshots) { - const releaseLock = await repoMutex.acquireShared(repository.id, `snapshot_details:${snapshotId}`); + const releaseLock = await repoMutex.acquireShared( + repository.id, + `snapshot_details:${snapshotId}`, + ); try { snapshots = await restic.snapshots(repository.config, { organizationId }); cache.set(cacheKey, snapshots); @@ -368,7 +506,9 @@ const getSnapshotDetails = async (id: string, snapshotId: string) => { } } - const snapshot = snapshots.find((snap) => snap.id === snapshotId || snap.short_id === snapshotId); + const snapshot = snapshots.find( + (snap) => snap.id === snapshotId || snap.short_id === snapshotId, + ); if (!snapshot) { void refreshSnapshots(id).catch(() => {}); @@ -389,7 +529,9 @@ const checkHealth = async (repositoryId: string) => { const releaseLock = await repoMutex.acquireExclusive(repository.id, "check"); try { - const { hasErrors, error } = await restic.check(repository.config, { organizationId }); + const { hasErrors, error } = await restic.check(repository.config, { + organizationId, + }); await db .update(repositoriesTable) @@ -399,7 +541,10 @@ const checkHealth = async (repositoryId: string) => { lastError: error, }) .where( - and(eq(repositoriesTable.id, repository.id), eq(repositoriesTable.organizationId, repository.organizationId)), + and( + eq(repositoriesTable.id, repository.id), + eq(repositoriesTable.organizationId, repository.organizationId), + ), ); return { lastError: error }; @@ -422,7 +567,10 @@ const startDoctor = async (id: string) => { const abortController = new AbortController(); try { - await db.update(repositoriesTable).set({ status: "doctor" }).where(eq(repositoriesTable.id, repository.id)); + await db + .update(repositoriesTable) + .set({ status: "doctor" }) + .where(eq(repositoriesTable.id, repository.id)); serverEvents.emit("doctor:started", { organizationId: repository.organizationId, @@ -436,7 +584,12 @@ const startDoctor = async (id: string) => { throw error; } - executeDoctor(repository.id, repository.config, repository.name, abortController.signal) + executeDoctor( + repository.id, + repository.config, + repository.name, + abortController.signal, + ) .catch((error) => { logger.error(`Doctor background task failed: ${toMessage(error)}`); }) @@ -456,14 +609,20 @@ const cancelDoctor = async (id: string) => { const abortController = runningDoctors.get(repository.id); if (!abortController) { - await db.update(repositoriesTable).set({ status: "unknown" }).where(eq(repositoriesTable.id, repository.id)); + await db + .update(repositoriesTable) + .set({ status: "unknown" }) + .where(eq(repositoriesTable.id, repository.id)); throw new ConflictError("No doctor operation is currently running"); } abortController.abort(); runningDoctors.delete(repository.id); - await db.update(repositoriesTable).set({ status: "unknown" }).where(eq(repositoriesTable.id, repository.id)); + await db + .update(repositoriesTable) + .set({ status: "unknown" }) + .where(eq(repositoriesTable.id, repository.id)); serverEvents.emit("doctor:cancelled", { organizationId: repository.organizationId, @@ -482,7 +641,10 @@ const deleteSnapshot = async (id: string, snapshotId: string) => { throw new NotFoundError("Repository not found"); } - const releaseLock = await repoMutex.acquireExclusive(repository.id, `delete:${snapshotId}`); + const releaseLock = await repoMutex.acquireExclusive( + repository.id, + `delete:${snapshotId}`, + ); try { await restic.deleteSnapshot(repository.config, snapshotId, organizationId); cache.delByPrefix(`snapshots:${repository.id}:`); @@ -500,9 +662,16 @@ const deleteSnapshots = async (id: string, snapshotIds: string[]) => { throw new NotFoundError("Repository not found"); } - const releaseLock = await repoMutex.acquireExclusive(repository.id, `delete:bulk`); + const releaseLock = await repoMutex.acquireExclusive( + repository.id, + `delete:bulk`, + ); try { - await restic.deleteSnapshots(repository.config, snapshotIds, organizationId); + await restic.deleteSnapshots( + repository.config, + snapshotIds, + organizationId, + ); cache.delByPrefix(`snapshots:${repository.id}:`); for (const snapshotId of snapshotIds) { cache.delByPrefix(`ls:${repository.id}:${snapshotId}:`); @@ -524,9 +693,17 @@ const tagSnapshots = async ( throw new NotFoundError("Repository not found"); } - const releaseLock = await repoMutex.acquireExclusive(repository.id, `tag:bulk`); + const releaseLock = await repoMutex.acquireExclusive( + repository.id, + `tag:bulk`, + ); try { - await restic.tagSnapshots(repository.config, snapshotIds, tags, organizationId); + await restic.tagSnapshots( + repository.config, + snapshotIds, + tags, + organizationId, + ); cache.delByPrefix(`snapshots:${repository.id}:`); for (const snapshotId of snapshotIds) { cache.delByPrefix(`ls:${repository.id}:${snapshotId}:`); @@ -549,7 +726,9 @@ const refreshSnapshots = async (id: string) => { const releaseLock = await repoMutex.acquireShared(repository.id, "refresh"); try { - const snapshots = await restic.snapshots(repository.config, { organizationId }); + const snapshots = await restic.snapshots(repository.config, { + organizationId, + }); const cacheKey = `snapshots:${repository.id}:all`; cache.set(cacheKey, snapshots); @@ -676,14 +855,24 @@ const execResticCommand = async ( } addCommonArgs(resticArgs, env, repository.config); - const result = await safeSpawn({ command: "restic", args: resticArgs, env, signal, onStdout, onStderr }); + const result = await safeSpawn({ + command: "restic", + args: resticArgs, + env, + signal, + onStdout, + onStderr, + }); await cleanupTemporaryKeys(env); return { exitCode: result.exitCode }; }; -const getRetentionCategories = async (repositoryId: string, scheduleId?: string) => { +const getRetentionCategories = async ( + repositoryId: string, + scheduleId?: string, +) => { if (!scheduleId) { return new Map(); } @@ -707,11 +896,15 @@ const getRetentionCategories = async (repositoryId: string, scheduleId?: string) const { repository } = repositoryResult; - const dryRunResults = await restic.forget(repository.config, schedule.retentionPolicy, { - tag: scheduleId, - organizationId: getOrganizationId(), - dryRun: true, - }); + const dryRunResults = await restic.forget( + repository.config, + schedule.retentionPolicy, + { + tag: scheduleId, + organizationId: getOrganizationId(), + dryRun: true, + }, + ); if (!dryRunResults.data) { return new Map(); diff --git a/app/server/utils/restic.ts b/app/server/utils/restic.ts index f51569ea..15676ade 100644 --- a/app/server/utils/restic.ts +++ b/app/server/utils/restic.ts @@ -1,28 +1,38 @@ import crypto from "node:crypto"; import fs from "node:fs/promises"; -import path from "node:path"; import os from "node:os"; -import { throttle } from "es-toolkit"; +import path from "node:path"; import { type } from "arktype"; -import { RESTIC_PASS_FILE, DEFAULT_EXCLUDES, RESTIC_CACHE_DIR } from "../core/constants"; -import { config as appConfig } from "../core/config"; -import { logger } from "./logger"; -import { cryptoUtils } from "./crypto"; -import type { RetentionPolicy } from "../modules/backups/backups.dto"; -import { safeSpawn, exec } from "./spawn"; -import type { CompressionMode, RepositoryConfig, OverwriteMode, BandwidthLimit } from "~/schemas/restic"; +import { throttle } from "es-toolkit"; +import type { + BandwidthLimit, + CompressionMode, + OverwriteMode, + RepositoryConfig, +} from "~/schemas/restic"; import { + type ResticBackupProgressDto, + type ResticRestoreOutputDto, + type ResticSnapshotSummaryDto, resticBackupOutputSchema, resticBackupProgressSchema, resticRestoreOutputSchema, resticSnapshotSummarySchema, - type ResticBackupProgressDto, - type ResticRestoreOutputDto, - type ResticSnapshotSummaryDto, } from "~/schemas/restic-dto"; -import { ResticError } from "./errors"; +import { config as appConfig } from "../core/config"; +import { + DEFAULT_EXCLUDES, + REPOSITORY_BASE, + RESTIC_CACHE_DIR, + RESTIC_PASS_FILE, +} from "../core/constants"; import { db } from "../db/db"; +import type { RetentionPolicy } from "../modules/backups/backups.dto"; +import { cryptoUtils } from "./crypto"; +import { ResticError } from "./errors"; import { safeJsonParse } from "./json"; +import { logger } from "./logger"; +import { exec, safeSpawn } from "./spawn"; const snapshotInfoSchema = type({ gid: "number?", @@ -62,25 +72,37 @@ export const buildRepoUrl = (config: RepositoryConfig): string => { case "sftp": return `sftp:${config.user}@${config.host}:${config.path}`; default: { - throw new Error(`Unsupported repository backend: ${JSON.stringify(config)}`); + throw new Error( + `Unsupported repository backend: ${JSON.stringify(config)}`, + ); } } }; -export const buildEnv = async (config: RepositoryConfig, organizationId: string) => { +export const buildEnv = async ( + config: RepositoryConfig, + organizationId: string, +) => { const env: Record = { RESTIC_CACHE_DIR, PATH: process.env.PATH || "/usr/local/bin:/usr/bin:/bin", }; if (config.isExistingRepository && config.customPassword) { - const decryptedPassword = await cryptoUtils.resolveSecret(config.customPassword); - const passwordFilePath = path.join("/tmp", `zerobyte-pass-${crypto.randomBytes(8).toString("hex")}.txt`); + const decryptedPassword = await cryptoUtils.resolveSecret( + config.customPassword, + ); + const passwordFilePath = path.join( + "/tmp", + `zerobyte-pass-${crypto.randomBytes(8).toString("hex")}.txt`, + ); await fs.writeFile(passwordFilePath, decryptedPassword, { mode: 0o600 }); env.RESTIC_PASSWORD_FILE = passwordFilePath; } else { - const org = await db.query.organization.findFirst({ where: { id: organizationId } }); + const org = await db.query.organization.findFirst({ + where: { id: organizationId }, + }); if (!org) { throw new Error(`Organization ${organizationId} not found`); @@ -88,10 +110,17 @@ export const buildEnv = async (config: RepositoryConfig, organizationId: string) const metadata = org.metadata; if (!metadata?.resticPassword) { - throw new Error(`Restic password not configured for organization ${organizationId}`); + throw new Error( + `Restic password not configured for organization ${organizationId}`, + ); } else { - const decryptedPassword = await cryptoUtils.resolveSecret(metadata.resticPassword); - const passwordFilePath = path.join("/tmp", `zerobyte-pass-${crypto.randomBytes(8).toString("hex")}.txt`); + const decryptedPassword = await cryptoUtils.resolveSecret( + metadata.resticPassword, + ); + const passwordFilePath = path.join( + "/tmp", + `zerobyte-pass-${crypto.randomBytes(8).toString("hex")}.txt`, + ); await fs.writeFile(passwordFilePath, decryptedPassword, { mode: 0o600 }); env.RESTIC_PASSWORD_FILE = passwordFilePath; } @@ -99,8 +128,12 @@ export const buildEnv = async (config: RepositoryConfig, organizationId: string) switch (config.backend) { case "s3": - env.AWS_ACCESS_KEY_ID = await cryptoUtils.resolveSecret(config.accessKeyId); - env.AWS_SECRET_ACCESS_KEY = await cryptoUtils.resolveSecret(config.secretAccessKey); + env.AWS_ACCESS_KEY_ID = await cryptoUtils.resolveSecret( + config.accessKeyId, + ); + env.AWS_SECRET_ACCESS_KEY = await cryptoUtils.resolveSecret( + config.secretAccessKey, + ); // Huawei OBS requires virtual-hosted style access if (config.endpoint.includes("myhuaweicloud")) { @@ -108,22 +141,35 @@ export const buildEnv = async (config: RepositoryConfig, organizationId: string) } break; case "r2": - env.AWS_ACCESS_KEY_ID = await cryptoUtils.resolveSecret(config.accessKeyId); - env.AWS_SECRET_ACCESS_KEY = await cryptoUtils.resolveSecret(config.secretAccessKey); + env.AWS_ACCESS_KEY_ID = await cryptoUtils.resolveSecret( + config.accessKeyId, + ); + env.AWS_SECRET_ACCESS_KEY = await cryptoUtils.resolveSecret( + config.secretAccessKey, + ); env.AWS_REGION = "auto"; env.AWS_S3_FORCE_PATH_STYLE = "true"; break; case "gcs": { - const decryptedCredentials = await cryptoUtils.resolveSecret(config.credentialsJson); - const credentialsPath = path.join("/tmp", `zerobyte-gcs-${crypto.randomBytes(8).toString("hex")}.json`); - await fs.writeFile(credentialsPath, decryptedCredentials, { mode: 0o600 }); + const decryptedCredentials = await cryptoUtils.resolveSecret( + config.credentialsJson, + ); + const credentialsPath = path.join( + "/tmp", + `zerobyte-gcs-${crypto.randomBytes(8).toString("hex")}.json`, + ); + await fs.writeFile(credentialsPath, decryptedCredentials, { + mode: 0o600, + }); env.GOOGLE_PROJECT_ID = config.projectId; env.GOOGLE_APPLICATION_CREDENTIALS = credentialsPath; break; } case "azure": { env.AZURE_ACCOUNT_NAME = config.accountName; - env.AZURE_ACCOUNT_KEY = await cryptoUtils.resolveSecret(config.accountKey); + env.AZURE_ACCOUNT_KEY = await cryptoUtils.resolveSecret( + config.accountKey, + ); if (config.endpointSuffix) { env.AZURE_ENDPOINT_SUFFIX = config.endpointSuffix; } @@ -131,16 +177,23 @@ export const buildEnv = async (config: RepositoryConfig, organizationId: string) } case "rest": { if (config.username) { - env.RESTIC_REST_USERNAME = await cryptoUtils.resolveSecret(config.username); + env.RESTIC_REST_USERNAME = await cryptoUtils.resolveSecret( + config.username, + ); } if (config.password) { - env.RESTIC_REST_PASSWORD = await cryptoUtils.resolveSecret(config.password); + env.RESTIC_REST_PASSWORD = await cryptoUtils.resolveSecret( + config.password, + ); } break; } case "sftp": { const decryptedKey = await cryptoUtils.resolveSecret(config.privateKey); - const keyPath = path.join("/tmp", `zerobyte-ssh-${crypto.randomBytes(8).toString("hex")}`); + const keyPath = path.join( + "/tmp", + `zerobyte-ssh-${crypto.randomBytes(8).toString("hex")}`, + ); let normalizedKey = decryptedKey.replace(/\r\n/g, "\n"); if (!normalizedKey.endsWith("\n")) { @@ -148,8 +201,12 @@ export const buildEnv = async (config: RepositoryConfig, organizationId: string) } if (normalizedKey.includes("ENCRYPTED")) { - logger.error("SFTP: Private key appears to be passphrase-protected. Please use an unencrypted key."); - throw new Error("Passphrase-protected SSH keys are not supported. Please provide an unencrypted private key."); + logger.error( + "SFTP: Private key appears to be passphrase-protected. Please use an unencrypted key.", + ); + throw new Error( + "Passphrase-protected SSH keys are not supported. Please provide an unencrypted private key.", + ); } await fs.writeFile(keyPath, normalizedKey, { mode: 0o600 }); @@ -184,12 +241,25 @@ export const buildEnv = async (config: RepositoryConfig, organizationId: string) ]; if (config.skipHostKeyCheck || !config.knownHosts) { - sshArgs.push("-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"); + sshArgs.push( + "-o", + "StrictHostKeyChecking=no", + "-o", + "UserKnownHostsFile=/dev/null", + ); } else if (config.knownHosts) { - const knownHostsPath = path.join("/tmp", `zerobyte-known-hosts-${crypto.randomBytes(8).toString("hex")}`); + const knownHostsPath = path.join( + "/tmp", + `zerobyte-known-hosts-${crypto.randomBytes(8).toString("hex")}`, + ); await fs.writeFile(knownHostsPath, config.knownHosts, { mode: 0o600 }); env._SFTP_KNOWN_HOSTS_PATH = knownHostsPath; - sshArgs.push("-o", "StrictHostKeyChecking=yes", "-o", `UserKnownHostsFile=${knownHostsPath}`); + sshArgs.push( + "-o", + "StrictHostKeyChecking=yes", + "-o", + `UserKnownHostsFile=${knownHostsPath}`, + ); } if (config.port && config.port !== 22) { @@ -204,7 +274,10 @@ export const buildEnv = async (config: RepositoryConfig, organizationId: string) if (config.cacert) { const decryptedCert = await cryptoUtils.resolveSecret(config.cacert); - const certPath = path.join("/tmp", `zerobyte-cacert-${crypto.randomBytes(8).toString("hex")}.pem`); + const certPath = path.join( + "/tmp", + `zerobyte-cacert-${crypto.randomBytes(8).toString("hex")}.pem`, + ); await fs.writeFile(certPath, decryptedCert, { mode: 0o600 }); env.RESTIC_CACERT = certPath; } @@ -216,7 +289,11 @@ export const buildEnv = async (config: RepositoryConfig, organizationId: string) return env; }; -const init = async (config: RepositoryConfig, organizationId: string, options?: { timeoutMs?: number }) => { +const init = async ( + config: RepositoryConfig, + organizationId: string, + options?: { timeoutMs?: number }, +) => { const repoUrl = buildRepoUrl(config); logger.info(`Initializing restic repository at ${repoUrl}...`); @@ -226,7 +303,12 @@ const init = async (config: RepositoryConfig, organizationId: string, options?: const args = ["init", "--repo", repoUrl]; addCommonArgs(args, env, config); - const res = await exec({ command: "restic", args, env, timeout: options?.timeoutMs ?? 20000 }); + const res = await exec({ + command: "restic", + args, + env, + timeout: options?.timeoutMs ?? 20000, + }); await cleanupTemporaryKeys(env); if (res.exitCode !== 0) { @@ -256,7 +338,13 @@ const backup = async ( const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, options?.organizationId); - const args: string[] = ["--repo", repoUrl, "backup", "--compression", options?.compressionMode ?? "auto"]; + const args: string[] = [ + "--repo", + repoUrl, + "backup", + "--compression", + options?.compressionMode ?? "auto", + ]; if (options?.oneFileSystem) { args.push("--one-file-system"); @@ -274,7 +362,9 @@ const backup = async ( let includeFile: string | null = null; if (options?.include && options.include.length > 0) { - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-restic-include-")); + const tmp = await fs.mkdtemp( + path.join(os.tmpdir(), "zerobyte-restic-include-"), + ); includeFile = path.join(tmp, `include.txt`); await fs.writeFile(includeFile, options.include.join("\n"), "utf-8"); @@ -290,7 +380,9 @@ const backup = async ( let excludeFile: string | null = null; if (options?.exclude && options.exclude.length > 0) { - const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-restic-exclude-")); + const tmp = await fs.mkdtemp( + path.join(os.tmpdir(), "zerobyte-restic-exclude-"), + ); excludeFile = path.join(tmp, `exclude.txt`); await fs.writeFile(excludeFile, options.exclude.join("\n"), "utf-8"); @@ -379,6 +471,17 @@ const backup = async ( return { result, exitCode: res.exitCode }; }; +const restoreProgressSchema = type({ + message_type: "'status'", + seconds_elapsed: "number", + percent_done: "number", + total_files: "number", + files_done: "number", + total_bytes: "number", + bytes_done: "number", +}); + +export type RestoreProgress = typeof restoreProgressSchema.infer; const restore = async ( config: RepositoryConfig, snapshotId: string, @@ -390,17 +493,30 @@ const restore = async ( excludeXattr?: string[]; delete?: boolean; overwrite?: OverwriteMode; + onProgress?: (progress: RestoreProgress) => void; + signal?: AbortSignal; }, ) => { const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, options.organizationId); - const args: string[] = ["--repo", repoUrl, "restore", snapshotId, "--target", target]; + const args: string[] = [ + "--repo", + repoUrl, + "restore", + snapshotId, + "--target", + target, + ]; if (options?.overwrite) { args.push("--overwrite", options.overwrite); } + if (options?.delete) { + args.push("--delete"); + } + if (options?.include?.length) { for (const pattern of options.include) { args.push("--include", pattern); @@ -421,8 +537,32 @@ const restore = async ( addCommonArgs(args, env, config); + const streamProgress = throttle((data: string) => { + if (options?.onProgress) { + try { + const jsonData = JSON.parse(data); + const progress = restoreProgressSchema(jsonData); + if (!(progress instanceof type.errors)) { + options.onProgress(progress); + } + } catch (_) { + // Ignore JSON parse errors for non-JSON lines + } + } + }, 1000); + logger.debug(`Executing: restic ${args.join(" ")}`); - const res = await safeSpawn({ command: "restic", args, env }); + const res = await safeSpawn({ + command: "restic", + args, + env, + signal: options?.signal, + onStdout: (data) => { + if (options?.onProgress) { + streamProgress(data); + } + }, + }); await cleanupTemporaryKeys(env); @@ -432,21 +572,27 @@ const restore = async ( } const lastLine = res.summary.trim(); - let summaryLine = ""; + let summaryLine: unknown = {}; try { - const resSummary = JSON.parse(lastLine ?? "{}"); - summaryLine = resSummary; + summaryLine = JSON.parse(lastLine ?? "{}"); } catch (_) { - logger.warn("Failed to parse restic backup output JSON summary.", lastLine); - summaryLine = "{}"; + logger.warn( + "Failed to parse restic restore output JSON summary.", + lastLine, + ); + summaryLine = {}; } - logger.debug(`Restic restore output last line: ${summaryLine}`); + logger.debug( + `Restic restore output last line: ${JSON.stringify(summaryLine)}`, + ); const result = resticRestoreOutputSchema(summaryLine); if (result instanceof type.errors) { logger.warn(`Restic restore output validation failed: ${result.summary}`); - logger.info(`Restic restore completed for snapshot ${snapshotId} to target ${target}`); + logger.info( + `Restic restore completed for snapshot ${snapshotId} to target ${target}`, + ); const fallback: ResticRestoreOutputDto = { message_type: "summary" as const, total_files: 0, @@ -465,7 +611,10 @@ const restore = async ( return result; }; -const snapshots = async (config: RepositoryConfig, options: { tags?: string[]; organizationId: string }) => { +const snapshots = async ( + config: RepositoryConfig, + options: { tags?: string[]; organizationId: string }, +) => { const { tags, organizationId } = options; const repoUrl = buildRepoUrl(config); @@ -492,8 +641,12 @@ const snapshots = async (config: RepositoryConfig, options: { tags?: string[]; o const result = snapshotInfoSchema.array()(JSON.parse(res.stdout)); if (result instanceof type.errors) { - logger.error(`Restic snapshots output validation failed: ${result.summary}`); - throw new Error(`Restic snapshots output validation failed: ${result.summary}`); + logger.error( + `Restic snapshots output validation failed: ${result.summary}`, + ); + throw new Error( + `Restic snapshots output validation failed: ${result.summary}`, + ); } return result; @@ -540,7 +693,15 @@ const forget = async ( const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, extra.organizationId); - const args: string[] = ["--repo", repoUrl, "forget", "--group-by", "tags", "--tag", extra.tag]; + const args: string[] = [ + "--repo", + repoUrl, + "forget", + "--group-by", + "tags", + "--tag", + extra.tag, + ]; if (extra.dryRun) { args.push("--dry-run", "--no-lock"); @@ -583,12 +744,18 @@ const forget = async ( } const lines = res.stdout.split("\n").filter((line) => line.trim()); - const result = extra.dryRun ? safeJsonParse(lines.at(-1) ?? "[]") : null; + const result = extra.dryRun + ? safeJsonParse(lines.at(-1) ?? "[]") + : null; return { success: true, data: result }; }; -const deleteSnapshots = async (config: RepositoryConfig, snapshotIds: string[], organizationId: string) => { +const deleteSnapshots = async ( + config: RepositoryConfig, + snapshotIds: string[], + organizationId: string, +) => { const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, organizationId); @@ -596,7 +763,13 @@ const deleteSnapshots = async (config: RepositoryConfig, snapshotIds: string[], throw new Error("No snapshot IDs provided for deletion."); } - const args: string[] = ["--repo", repoUrl, "forget", ...snapshotIds, "--prune"]; + const args: string[] = [ + "--repo", + repoUrl, + "forget", + ...snapshotIds, + "--prune", + ]; addCommonArgs(args, env, config); const res = await exec({ command: "restic", args, env }); @@ -610,7 +783,11 @@ const deleteSnapshots = async (config: RepositoryConfig, snapshotIds: string[], return { success: true }; }; -const deleteSnapshot = async (config: RepositoryConfig, snapshotId: string, organizationId: string) => { +const deleteSnapshot = async ( + config: RepositoryConfig, + snapshotId: string, + organizationId: string, +) => { return deleteSnapshots(config, [snapshotId], organizationId); }; @@ -777,14 +954,22 @@ const ls = async ( }; }; -const unlock = async (config: RepositoryConfig, options: { signal?: AbortSignal; organizationId: string }) => { +const unlock = async ( + config: RepositoryConfig, + options: { signal?: AbortSignal; organizationId: string }, +) => { const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, options.organizationId); const args = ["unlock", "--repo", repoUrl, "--remove-all"]; addCommonArgs(args, env, config); - const res = await exec({ command: "restic", args, env, signal: options?.signal }); + const res = await exec({ + command: "restic", + args, + env, + signal: options?.signal, + }); await cleanupTemporaryKeys(env); if (options?.signal?.aborted) { @@ -816,12 +1001,22 @@ const check = async ( addCommonArgs(args, env, config); - const res = await exec({ command: "restic", args, env, signal: options?.signal }); + const res = await exec({ + command: "restic", + args, + env, + signal: options?.signal, + }); await cleanupTemporaryKeys(env); if (options?.signal?.aborted) { logger.warn("Restic check was aborted by signal."); - return { success: false, hasErrors: true, output: "", error: "Operation aborted" }; + return { + success: false, + hasErrors: true, + output: "", + error: "Operation aborted", + }; } const { stdout, stderr } = res; @@ -847,14 +1042,22 @@ const check = async ( }; }; -const repairIndex = async (config: RepositoryConfig, options: { signal?: AbortSignal; organizationId: string }) => { +const repairIndex = async ( + config: RepositoryConfig, + options: { signal?: AbortSignal; organizationId: string }, +) => { const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, options.organizationId); const args = ["repair", "index", "--repo", repoUrl]; addCommonArgs(args, env, config); - const res = await exec({ command: "restic", args, env, signal: options?.signal }); + const res = await exec({ + command: "restic", + args, + env, + signal: options?.signal, + }); await cleanupTemporaryKeys(env); if (options?.signal?.aborted) { @@ -894,7 +1097,13 @@ const copy = async ( RESTIC_FROM_PASSWORD_FILE: sourceEnv.RESTIC_PASSWORD_FILE, }; - const args: string[] = ["--repo", destRepoUrl, "copy", "--from-repo", sourceRepoUrl]; + const args: string[] = [ + "--repo", + destRepoUrl, + "copy", + "--from-repo", + sourceRepoUrl, + ]; if (options.tag) { args.push("--tag", options.tag); @@ -1023,7 +1232,12 @@ export const restic = { }; export const cleanupTemporaryKeys = async (env: Record) => { - const keysToClean = ["_SFTP_KEY_PATH", "_SFTP_KNOWN_HOSTS_PATH", "RESTIC_CACERT", "GOOGLE_APPLICATION_CREDENTIALS"]; + const keysToClean = [ + "_SFTP_KEY_PATH", + "_SFTP_KNOWN_HOSTS_PATH", + "RESTIC_CACERT", + "GOOGLE_APPLICATION_CREDENTIALS", + ]; for (const key of keysToClean) { if (env[key]) { @@ -1032,7 +1246,10 @@ export const cleanupTemporaryKeys = async (env: Record) => { } // Clean up custom password files - if (env.RESTIC_PASSWORD_FILE && env.RESTIC_PASSWORD_FILE !== RESTIC_PASS_FILE) { + if ( + env.RESTIC_PASSWORD_FILE && + env.RESTIC_PASSWORD_FILE !== RESTIC_PASS_FILE + ) { await fs.unlink(env.RESTIC_PASSWORD_FILE).catch(() => {}); } }; From a91f9949669d83a99b72de206f0a5c08e8f20606 Mon Sep 17 00:00:00 2001 From: Nicolas Meienberger Date: Fri, 13 Feb 2026 22:45:29 +0100 Subject: [PATCH 2/4] feat: keep restore progress on reload --- app/client/components/restore-form.tsx | 99 ++++++++++++++++++++-- app/client/components/restore-progress.tsx | 6 +- 2 files changed, 92 insertions(+), 13 deletions(-) diff --git a/app/client/components/restore-form.tsx b/app/client/components/restore-form.tsx index f9cd2c36..b6877566 100644 --- a/app/client/components/restore-form.tsx +++ b/app/client/components/restore-form.tsx @@ -1,18 +1,28 @@ -import { useCallback, useState } from "react"; +import { useCallback, useEffect, useState } from "react"; import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; -import { toast } from "sonner"; import { ChevronDown, FileIcon, FolderOpen, RotateCcw } from "lucide-react"; import { Button } from "~/client/components/ui/button"; import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "~/client/components/ui/card"; import { Input } from "~/client/components/ui/input"; import { Label } from "~/client/components/ui/label"; import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "~/client/components/ui/select"; +import { + AlertDialog, + AlertDialogAction, + AlertDialogContent, + AlertDialogDescription, + AlertDialogFooter, + AlertDialogHeader, + AlertDialogTitle, +} from "~/client/components/ui/alert-dialog"; import { PathSelector } from "~/client/components/path-selector"; import { FileTree } from "~/client/components/file-tree"; import { RestoreProgress } from "~/client/components/restore-progress"; import { listSnapshotFilesOptions, restoreSnapshotMutation } from "~/client/api-client/@tanstack/react-query.gen"; import { useFileBrowser } from "~/client/hooks/use-file-browser"; +import { useServerEvents } from "~/client/hooks/use-server-events"; import { OVERWRITE_MODES, type OverwriteMode } from "~/schemas/restic"; +import type { RestoreCompletedEventDto, RestoreProgressEventDto } from "~/schemas/events-dto"; import type { Repository, Snapshot } from "~/client/lib/types"; import { handleRepositoryError } from "~/client/lib/errors"; import { useNavigate } from "@tanstack/react-router"; @@ -28,6 +38,7 @@ interface RestoreFormProps { export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: RestoreFormProps) { const navigate = useNavigate(); + const { addEventListener } = useServerEvents(); const queryClient = useQueryClient(); const volumeBasePath = snapshot.paths[0]?.match(/^(.*?_data)(\/|$)/)?.[1] || "/"; @@ -37,9 +48,48 @@ export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: Re const [overwriteMode, setOverwriteMode] = useState("always"); const [showAdvanced, setShowAdvanced] = useState(false); const [excludeXattr, setExcludeXattr] = useState(""); + const [isRestoreActive, setIsRestoreActive] = useState(false); + const [restoreResult, setRestoreResult] = useState(null); + const [showRestoreResultAlert, setShowRestoreResultAlert] = useState(false); const [selectedPaths, setSelectedPaths] = useState>(new Set()); + useEffect(() => { + const unsubscribeStarted = addEventListener("restore:started", (data) => { + const startedData = data as { + repositoryId: string; + snapshotId: string; + }; + if (startedData.repositoryId === repository.id && startedData.snapshotId === snapshotId) { + setIsRestoreActive(true); + setRestoreResult(null); + setShowRestoreResultAlert(false); + } + }); + + const unsubscribeProgress = addEventListener("restore:progress", (data) => { + const progressData = data as RestoreProgressEventDto; + if (progressData.repositoryId === repository.id && progressData.snapshotId === snapshotId) { + setIsRestoreActive(true); + } + }); + + const unsubscribeCompleted = addEventListener("restore:completed", (data) => { + const completedData = data as RestoreCompletedEventDto; + if (completedData.repositoryId === repository.id && completedData.snapshotId === snapshotId) { + setIsRestoreActive(false); + setRestoreResult(completedData); + setShowRestoreResultAlert(true); + } + }); + + return () => { + unsubscribeStarted(); + unsubscribeProgress(); + unsubscribeCompleted(); + }; + }, [addEventListener, repository.id, snapshotId]); + const { data: filesData, isLoading: filesLoading } = useQuery({ ...listSnapshotFilesOptions({ path: { id: repository.id, snapshotId }, @@ -98,11 +148,8 @@ export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: Re const { mutate: restoreSnapshot, isPending: isRestoring } = useMutation({ ...restoreSnapshotMutation(), - onSuccess: () => { - toast.success("Restore completed"); - void navigate({ to: returnPath }); - }, onError: (error) => { + setIsRestoreActive(false); handleRepositoryError("Restore failed", error, repository.id); }, }); @@ -119,6 +166,10 @@ export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: Re const pathsArray = Array.from(selectedPaths); const includePaths = pathsArray.map((path) => addBasePath(path)); + setIsRestoreActive(true); + setRestoreResult(null); + setShowRestoreResultAlert(false); + restoreSnapshot({ path: { id: repository.id }, body: { @@ -141,7 +192,19 @@ export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: Re restoreSnapshot, ]); + const acknowledgeRestoreResult = useCallback(() => { + setShowRestoreResultAlert(false); + setRestoreResult(null); + }, []); + + const handleResultAlertOpenChange = useCallback((open: boolean) => { + if (open) { + setShowRestoreResultAlert(true); + } + }, []); + const canRestore = restoreLocation === "original" || customTargetPath.trim(); + const isRestoreRunning = isRestoring || isRestoreActive; return (
@@ -156,9 +219,9 @@ export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: Re -

Speed

-

- {progress.seconds_elapsed > 0 ? `${speed.text} ${speed.unit}/s` : "Calculating..."} -

+

{speed ? `${speed.text} ${speed.unit}/s` : "Calculating..."}

From 00ddfe7f30f20e842d611969de20e7c087e294eb Mon Sep 17 00:00:00 2001 From: Nicolas Meienberger Date: Fri, 13 Feb 2026 23:12:24 +0100 Subject: [PATCH 3/4] refactor: centralize sse event types --- app/client/components/restore-form.tsx | 27 +- app/client/components/restore-progress.tsx | 21 +- app/client/hooks/use-server-events.ts | 283 +++++----------- .../components/backup-progress-card.tsx | 11 +- .../components/schedule-mirrors-config.tsx | 6 +- app/schemas/events-dto.ts | 9 +- app/schemas/server-events.ts | 64 ++++ app/server/core/events.ts | 57 +--- .../repositories/repositories.controller.ts | 2 - .../repositories/repositories.service.ts | 253 +++------------ app/server/utils/restic.ts | 301 ++++-------------- 11 files changed, 286 insertions(+), 748 deletions(-) create mode 100644 app/schemas/server-events.ts diff --git a/app/client/components/restore-form.tsx b/app/client/components/restore-form.tsx index b6877566..4fcc334d 100644 --- a/app/client/components/restore-form.tsx +++ b/app/client/components/restore-form.tsx @@ -1,4 +1,4 @@ -import { useCallback, useEffect, useState } from "react"; +import { useCallback, useEffect, useRef, useState } from "react"; import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; import { ChevronDown, FileIcon, FolderOpen, RotateCcw } from "lucide-react"; import { Button } from "~/client/components/ui/button"; @@ -20,9 +20,8 @@ import { FileTree } from "~/client/components/file-tree"; import { RestoreProgress } from "~/client/components/restore-progress"; import { listSnapshotFilesOptions, restoreSnapshotMutation } from "~/client/api-client/@tanstack/react-query.gen"; import { useFileBrowser } from "~/client/hooks/use-file-browser"; -import { useServerEvents } from "~/client/hooks/use-server-events"; +import { type RestoreCompletedEvent, useServerEvents } from "~/client/hooks/use-server-events"; import { OVERWRITE_MODES, type OverwriteMode } from "~/schemas/restic"; -import type { RestoreCompletedEventDto, RestoreProgressEventDto } from "~/schemas/events-dto"; import type { Repository, Snapshot } from "~/client/lib/types"; import { handleRepositoryError } from "~/client/lib/errors"; import { useNavigate } from "@tanstack/react-router"; @@ -49,34 +48,34 @@ export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: Re const [showAdvanced, setShowAdvanced] = useState(false); const [excludeXattr, setExcludeXattr] = useState(""); const [isRestoreActive, setIsRestoreActive] = useState(false); - const [restoreResult, setRestoreResult] = useState(null); + const [restoreResult, setRestoreResult] = useState(null); const [showRestoreResultAlert, setShowRestoreResultAlert] = useState(false); + const restoreCompletedRef = useRef(false); const [selectedPaths, setSelectedPaths] = useState>(new Set()); useEffect(() => { - const unsubscribeStarted = addEventListener("restore:started", (data) => { - const startedData = data as { - repositoryId: string; - snapshotId: string; - }; + const unsubscribeStarted = addEventListener("restore:started", (startedData) => { if (startedData.repositoryId === repository.id && startedData.snapshotId === snapshotId) { + restoreCompletedRef.current = false; setIsRestoreActive(true); setRestoreResult(null); setShowRestoreResultAlert(false); } }); - const unsubscribeProgress = addEventListener("restore:progress", (data) => { - const progressData = data as RestoreProgressEventDto; + const unsubscribeProgress = addEventListener("restore:progress", (progressData) => { if (progressData.repositoryId === repository.id && progressData.snapshotId === snapshotId) { + if (restoreCompletedRef.current) { + return; + } setIsRestoreActive(true); } }); - const unsubscribeCompleted = addEventListener("restore:completed", (data) => { - const completedData = data as RestoreCompletedEventDto; + const unsubscribeCompleted = addEventListener("restore:completed", (completedData) => { if (completedData.repositoryId === repository.id && completedData.snapshotId === snapshotId) { + restoreCompletedRef.current = true; setIsRestoreActive(false); setRestoreResult(completedData); setShowRestoreResultAlert(true); @@ -149,6 +148,7 @@ export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: Re const { mutate: restoreSnapshot, isPending: isRestoring } = useMutation({ ...restoreSnapshotMutation(), onError: (error) => { + restoreCompletedRef.current = true; setIsRestoreActive(false); handleRepositoryError("Restore failed", error, repository.id); }, @@ -166,6 +166,7 @@ export function RestoreForm({ snapshot, repository, snapshotId, returnPath }: Re const pathsArray = Array.from(selectedPaths); const includePaths = pathsArray.map((path) => addBasePath(path)); + restoreCompletedRef.current = false; setIsRestoreActive(true); setRestoreResult(null); setShowRestoreResultAlert(false); diff --git a/app/client/components/restore-progress.tsx b/app/client/components/restore-progress.tsx index e5a80f1f..1fa63eeb 100644 --- a/app/client/components/restore-progress.tsx +++ b/app/client/components/restore-progress.tsx @@ -2,8 +2,7 @@ import { useEffect, useState } from "react"; import { ByteSize } from "~/client/components/bytes-size"; import { Card } from "~/client/components/ui/card"; import { Progress } from "~/client/components/ui/progress"; -import { useServerEvents } from "~/client/hooks/use-server-events"; -import type { RestoreCompletedEventDto, RestoreProgressEventDto } from "~/schemas/events-dto"; +import { type RestoreProgressEvent, useServerEvents } from "~/client/hooks/use-server-events"; import { formatBytes } from "~/utils/format-bytes"; import { formatDuration } from "~/utils/utils"; @@ -14,18 +13,16 @@ type Props = { export const RestoreProgress = ({ repositoryId, snapshotId }: Props) => { const { addEventListener } = useServerEvents(); - const [progress, setProgress] = useState(null); + const [progress, setProgress] = useState(null); useEffect(() => { - const unsubscribe = addEventListener("restore:progress", (data) => { - const progressData = data as RestoreProgressEventDto; + const unsubscribe = addEventListener("restore:progress", (progressData) => { if (progressData.repositoryId === repositoryId && progressData.snapshotId === snapshotId) { setProgress(progressData); } }); - const unsubscribeComplete = addEventListener("restore:completed", (data) => { - const completedData = data as RestoreCompletedEventDto; + const unsubscribeComplete = addEventListener("restore:completed", (completedData) => { if (completedData.repositoryId === repositoryId && completedData.snapshotId === snapshotId) { setProgress(null); } @@ -49,7 +46,7 @@ export const RestoreProgress = ({ repositoryId, snapshotId }: Props) => { } const percentDone = Math.round(progress.percent_done * 100); - const speed = progress.seconds_elapsed > 0 ? formatBytes(progress.bytes_done / progress.seconds_elapsed) : null; + const speed = progress.seconds_elapsed > 0 ? formatBytes(progress.bytes_restored / progress.seconds_elapsed) : null; return ( @@ -67,13 +64,17 @@ export const RestoreProgress = ({ repositoryId, snapshotId }: Props) => {

Files

- {progress.files_done.toLocaleString()} / {progress.total_files.toLocaleString()} + {progress.files_restored.toLocaleString()} +  /  + {progress.total_files.toLocaleString()}

Data

- / + +  /  +

diff --git a/app/client/hooks/use-server-events.ts b/app/client/hooks/use-server-events.ts index 0c1eb46e..df3a82ce 100644 --- a/app/client/hooks/use-server-events.ts +++ b/app/client/hooks/use-server-events.ts @@ -1,62 +1,38 @@ import { useQueryClient } from "@tanstack/react-query"; import { useCallback, useEffect, useRef } from "react"; -import type { - BackupCompletedEventDto, - BackupProgressEventDto, - BackupStartedEventDto, - RestoreCompletedEventDto, - RestoreProgressEventDto, - RestoreStartedEventDto, -} from "~/schemas/events-dto"; - -type ServerEventType = - | "connected" - | "heartbeat" - | "backup:started" - | "backup:progress" - | "backup:completed" - | "volume:mounted" - | "volume:unmounted" - | "volume:updated" - | "mirror:started" - | "mirror:completed" - | "restore:started" - | "restore:progress" - | "restore:completed" - | "doctor:started" - | "doctor:completed" - | "doctor:cancelled"; - -export interface VolumeEvent { - volumeName: string; -} - -export interface MirrorEvent { - scheduleId: number; - repositoryId: string; - repositoryName: string; - status?: "success" | "error" | "in_progress"; - error?: string; -} - -export interface DoctorEvent { - repositoryId: string; - repositoryName: string; - error?: string; -} - -export interface DoctorCompletedEvent extends DoctorEvent { - success: boolean; - completedAt: number; - steps: Array<{ - step: string; - success: boolean; - output: string | null; - error: string | null; - }>; -} - -type EventHandler = (data: unknown) => void; +import { serverEventNames, type ServerEventPayloadMap } from "~/schemas/server-events"; + +type LifecycleEventPayloadMap = { + connected: { type: "connected"; timestamp: number }; + heartbeat: { timestamp: number }; +}; + +type ServerEventsPayloadMap = LifecycleEventPayloadMap & ServerEventPayloadMap; +type ServerEventType = keyof ServerEventsPayloadMap; + +type EventHandler = (data: ServerEventsPayloadMap[T]) => void; +type EventHandlerSet = Set>; +type EventHandlerMap = { + [K in ServerEventType]?: EventHandlerSet; +}; + +const invalidatingEvents = new Set([ + "backup:completed", + "volume:updated", + "volume:status_changed", + "mirror:completed", + "doctor:started", + "doctor:completed", + "doctor:cancelled", +]); + +export type RestoreEvent = ServerEventsPayloadMap["restore:started"] | ServerEventsPayloadMap["restore:completed"]; +export type RestoreProgressEvent = ServerEventsPayloadMap["restore:progress"]; +export type RestoreCompletedEvent = ServerEventsPayloadMap["restore:completed"]; +export type BackupProgressEvent = ServerEventsPayloadMap["backup:progress"]; + +const parseEventData = (event: Event): ServerEventsPayloadMap[T] => + JSON.parse((event as MessageEvent).data) as ServerEventsPayloadMap[T]; /** * Hook to listen to Server-Sent Events (SSE) from the backend @@ -65,166 +41,51 @@ type EventHandler = (data: unknown) => void; export function useServerEvents() { const queryClient = useQueryClient(); const eventSourceRef = useRef(null); - const handlersRef = useRef>>(new Map()); + const handlersRef = useRef({}); + const emit = useCallback((eventName: T, data: ServerEventsPayloadMap[T]) => { + const handlers = handlersRef.current[eventName] as EventHandlerSet | undefined; + handlers?.forEach((handler) => { + handler(data); + }); + }, []); useEffect(() => { const eventSource = new EventSource("/api/v1/events"); eventSourceRef.current = eventSource; - eventSource.addEventListener("connected", () => { + eventSource.addEventListener("connected", (event) => { + const data = parseEventData<"connected">(event); console.info("[SSE] Connected to server events"); + emit("connected", data); }); - eventSource.addEventListener("heartbeat", () => {}); - - eventSource.addEventListener("backup:started", (e) => { - const data = JSON.parse(e.data) as BackupStartedEventDto; - console.info("[SSE] Backup started:", data); - - handlersRef.current.get("backup:started")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("backup:progress", (e) => { - const data = JSON.parse(e.data) as BackupProgressEventDto; - - handlersRef.current.get("backup:progress")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("backup:completed", (e) => { - const data = JSON.parse(e.data) as BackupCompletedEventDto; - console.info("[SSE] Backup completed:", data); - - void queryClient.invalidateQueries(); - void queryClient.refetchQueries(); - - handlersRef.current.get("backup:completed")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("volume:mounted", (e) => { - const data = JSON.parse(e.data) as VolumeEvent; - console.info("[SSE] Volume mounted:", data); - - handlersRef.current.get("volume:mounted")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("volume:unmounted", (e) => { - const data = JSON.parse(e.data) as VolumeEvent; - console.info("[SSE] Volume unmounted:", data); - - handlersRef.current.get("volume:unmounted")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("volume:updated", (e) => { - const data = JSON.parse(e.data) as VolumeEvent; - console.info("[SSE] Volume updated:", data); - - void queryClient.invalidateQueries(); - - handlersRef.current.get("volume:updated")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("volume:status_changed", (e) => { - const data = JSON.parse(e.data) as VolumeEvent; - console.info("[SSE] Volume status updated:", data); - - void queryClient.invalidateQueries(); - - handlersRef.current.get("volume:updated")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("mirror:started", (e) => { - const data = JSON.parse(e.data) as MirrorEvent; - console.info("[SSE] Mirror copy started:", data); - - handlersRef.current.get("mirror:started")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("mirror:completed", (e) => { - const data = JSON.parse(e.data) as MirrorEvent; - console.info("[SSE] Mirror copy completed:", data); - - // Invalidate queries to refresh mirror status in the UI - void queryClient.invalidateQueries(); - - handlersRef.current.get("mirror:completed")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("restore:started", (e) => { - const data = JSON.parse(e.data) as RestoreStartedEventDto; - console.info("[SSE] Restore started:", data); - - handlersRef.current.get("restore:started")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("restore:progress", (e) => { - const data = JSON.parse(e.data) as RestoreProgressEventDto; - - handlersRef.current.get("restore:progress")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("restore:completed", (e) => { - const data = JSON.parse(e.data) as RestoreCompletedEventDto; - console.info("[SSE] Restore completed:", data); - - handlersRef.current.get("restore:completed")?.forEach((handler) => { - handler(data); - }); - }); - - eventSource.addEventListener("doctor:started", (e) => { - const data = JSON.parse(e.data) as DoctorEvent; - console.info("[SSE] Doctor started:", data); - - void queryClient.invalidateQueries(); - - handlersRef.current.get("doctor:started")?.forEach((handler) => { - handler(data); - }); + eventSource.addEventListener("heartbeat", (event) => { + emit("heartbeat", parseEventData<"heartbeat">(event)); }); - eventSource.addEventListener("doctor:completed", (e) => { - const data = JSON.parse(e.data) as DoctorCompletedEvent; - console.info("[SSE] Doctor completed:", data); + for (const eventName of serverEventNames) { + eventSource.addEventListener(eventName, (event) => { + const data = parseEventData(event); + console.info(`[SSE] ${eventName}:`, data); - void queryClient.invalidateQueries(); + if (invalidatingEvents.has(eventName)) { + void queryClient.invalidateQueries(); + } - handlersRef.current.get("doctor:completed")?.forEach((handler) => { - handler(data); - }); - }); + if (eventName === "backup:completed") { + void queryClient.refetchQueries(); + } - eventSource.addEventListener("doctor:cancelled", (e) => { - const data = JSON.parse(e.data) as DoctorEvent; - console.info("[SSE] Doctor cancelled:", data); + if (eventName === "volume:status_changed") { + const statusData = data as ServerEventsPayloadMap["volume:status_changed"]; + emit("volume:status_changed", statusData); + emit("volume:updated", statusData); + return; + } - void queryClient.invalidateQueries(); - - handlersRef.current.get("doctor:cancelled")?.forEach((handler) => { - handler(data); + emit(eventName, data); }); - }); + } eventSource.onerror = (error) => { console.error("[SSE] Connection error:", error); @@ -235,16 +96,20 @@ export function useServerEvents() { eventSource.close(); eventSourceRef.current = null; }; - }, [queryClient]); + }, [emit, queryClient]); - const addEventListener = useCallback((event: ServerEventType, handler: EventHandler) => { - if (!handlersRef.current.has(event)) { - handlersRef.current.set(event, new Set()); - } - handlersRef.current.get(event)?.add(handler); + const addEventListener = useCallback((eventName: T, handler: EventHandler) => { + const existingHandlers = handlersRef.current[eventName] as EventHandlerSet | undefined; + const eventHandlers = existingHandlers ?? new Set>(); + eventHandlers.add(handler); + handlersRef.current[eventName] = eventHandlers as EventHandlerMap[T]; return () => { - handlersRef.current.get(event)?.delete(handler); + const handlers = handlersRef.current[eventName] as EventHandlerSet | undefined; + handlers?.delete(handler); + if (handlers && handlers.size === 0) { + delete handlersRef.current[eventName]; + } }; }, []); diff --git a/app/client/modules/backups/components/backup-progress-card.tsx b/app/client/modules/backups/components/backup-progress-card.tsx index 62cfb2d3..19df47b9 100644 --- a/app/client/modules/backups/components/backup-progress-card.tsx +++ b/app/client/modules/backups/components/backup-progress-card.tsx @@ -2,8 +2,7 @@ import { useEffect, useState } from "react"; import { ByteSize } from "~/client/components/bytes-size"; import { Card } from "~/client/components/ui/card"; import { Progress } from "~/client/components/ui/progress"; -import { useServerEvents } from "~/client/hooks/use-server-events"; -import type { BackupProgressEventDto } from "~/schemas/events-dto"; +import { type BackupProgressEvent, useServerEvents } from "~/client/hooks/use-server-events"; import { formatDuration } from "~/utils/utils"; import { formatBytes } from "~/utils/format-bytes"; @@ -13,18 +12,16 @@ type Props = { export const BackupProgressCard = ({ scheduleId }: Props) => { const { addEventListener } = useServerEvents(); - const [progress, setProgress] = useState(null); + const [progress, setProgress] = useState(null); useEffect(() => { - const unsubscribe = addEventListener("backup:progress", (data) => { - const progressData = data as BackupProgressEventDto; + const unsubscribe = addEventListener("backup:progress", (progressData) => { if (progressData.scheduleId === scheduleId) { setProgress(progressData); } }); - const unsubscribeComplete = addEventListener("backup:completed", (data) => { - const completedData = data as { scheduleId: number }; + const unsubscribeComplete = addEventListener("backup:completed", (completedData) => { if (completedData.scheduleId === scheduleId) { setProgress(null); } diff --git a/app/client/modules/backups/components/schedule-mirrors-config.tsx b/app/client/modules/backups/components/schedule-mirrors-config.tsx index 3fa4e940..00563381 100644 --- a/app/client/modules/backups/components/schedule-mirrors-config.tsx +++ b/app/client/modules/backups/components/schedule-mirrors-config.tsx @@ -99,8 +99,7 @@ export const ScheduleMirrorsConfig = ({ scheduleId, primaryRepositoryId, reposit }, [compatibility]); useEffect(() => { - const unsubscribeStarted = addEventListener("mirror:started", (data) => { - const event = data as { scheduleId: number; repositoryId: string }; + const unsubscribeStarted = addEventListener("mirror:started", (event) => { if (event.scheduleId !== scheduleId) return; setAssignments((prev) => { const next = new Map(prev); @@ -111,8 +110,7 @@ export const ScheduleMirrorsConfig = ({ scheduleId, primaryRepositoryId, reposit }); }); - const unsubscribeCompleted = addEventListener("mirror:completed", (data) => { - const event = data as { scheduleId: number; repositoryId: string; status?: "success" | "error"; error?: string }; + const unsubscribeCompleted = addEventListener("mirror:completed", (event) => { if (event.scheduleId !== scheduleId) return; setAssignments((prev) => { const next = new Map(prev); diff --git a/app/schemas/events-dto.ts b/app/schemas/events-dto.ts index f8ce1ed1..6e7ccca7 100644 --- a/app/schemas/events-dto.ts +++ b/app/schemas/events-dto.ts @@ -23,9 +23,9 @@ const restoreProgressMetricsSchema = type({ seconds_elapsed: "number", percent_done: "number", total_files: "number", - files_done: "number", + files_restored: "number", total_bytes: "number", - bytes_done: "number", + bytes_restored: "number", }); export const backupStartedEventSchema = backupEventBaseSchema; @@ -44,10 +44,7 @@ export const restoreStartedEventSchema = restoreEventBaseSchema; export const restoreProgressEventSchema = restoreEventBaseSchema.and(restoreProgressMetricsSchema); export const restoreCompletedEventSchema = restoreEventBaseSchema.and( - type({ - status: restoreEventStatusSchema, - error: "string?", - }), + type({ status: restoreEventStatusSchema, error: "string?" }), ); export const serverBackupStartedEventSchema = organizationScopedSchema.and(backupStartedEventSchema); diff --git a/app/schemas/server-events.ts b/app/schemas/server-events.ts new file mode 100644 index 00000000..5397a267 --- /dev/null +++ b/app/schemas/server-events.ts @@ -0,0 +1,64 @@ +import type { + ServerBackupCompletedEventDto, + ServerBackupProgressEventDto, + ServerBackupStartedEventDto, + ServerRestoreCompletedEventDto, + ServerRestoreProgressEventDto, + ServerRestoreStartedEventDto, +} from "~/schemas/events-dto"; +import type { DoctorResult } from "~/schemas/restic"; + +const payload = () => undefined as unknown as T; + +/** + * Single runtime registry for all broadcastable server events. + * Used as source-of-truth for both event names and payload typing. + */ +export const serverEventPayloads = { + "backup:started": payload(), + "backup:progress": payload(), + "backup:completed": payload(), + "restore:started": payload(), + "restore:progress": payload(), + "restore:completed": payload(), + "mirror:started": payload<{ + organizationId: string; + scheduleId: number; + repositoryId: string; + repositoryName: string; + }>(), + "mirror:completed": payload<{ + organizationId: string; + scheduleId: number; + repositoryId: string; + repositoryName: string; + status: "success" | "error"; + error?: string; + }>(), + "volume:mounted": payload<{ organizationId: string; volumeName: string }>(), + "volume:unmounted": payload<{ organizationId: string; volumeName: string }>(), + "volume:updated": payload<{ organizationId: string; volumeName: string }>(), + "volume:status_changed": payload<{ organizationId: string; volumeName: string; status: string }>(), + "doctor:started": payload<{ organizationId: string; repositoryId: string; repositoryName: string }>(), + "doctor:completed": payload< + { + organizationId: string; + repositoryId: string; + repositoryName: string; + } & DoctorResult + >(), + "doctor:cancelled": payload<{ + organizationId: string; + repositoryId: string; + repositoryName: string; + error?: string; + }>(), +} as const; + +export type ServerEventPayloadMap = typeof serverEventPayloads; + +export type ServerEventHandlers = { + [K in keyof ServerEventPayloadMap]: (data: ServerEventPayloadMap[K]) => void; +}; + +export const serverEventNames = Object.keys(serverEventPayloads) as Array; diff --git a/app/server/core/events.ts b/app/server/core/events.ts index c6869dd5..5b4d026b 100644 --- a/app/server/core/events.ts +++ b/app/server/core/events.ts @@ -1,61 +1,10 @@ import { EventEmitter } from "node:events"; import type { TypedEmitter } from "tiny-typed-emitter"; -import type { - ServerBackupCompletedEventDto, - ServerBackupProgressEventDto, - ServerBackupStartedEventDto, - ServerRestoreCompletedEventDto, - ServerRestoreProgressEventDto, - ServerRestoreStartedEventDto, -} from "~/schemas/events-dto"; -import type { DoctorResult } from "~/schemas/restic"; - -/** - * Event payloads for the SSE system - */ -interface ServerEvents { - "backup:started": (data: ServerBackupStartedEventDto) => void; - "backup:progress": (data: ServerBackupProgressEventDto) => void; - "backup:completed": (data: ServerBackupCompletedEventDto) => void; - "restore:started": (data: ServerRestoreStartedEventDto) => void; - "restore:progress": (data: ServerRestoreProgressEventDto) => void; - "restore:completed": (data: ServerRestoreCompletedEventDto) => void; - "mirror:started": (data: { - organizationId: string; - scheduleId: number; - repositoryId: string; - repositoryName: string; - }) => void; - "mirror:completed": (data: { - organizationId: string; - scheduleId: number; - repositoryId: string; - repositoryName: string; - status: "success" | "error"; - error?: string; - }) => void; - "volume:mounted": (data: { organizationId: string; volumeName: string }) => void; - "volume:unmounted": (data: { organizationId: string; volumeName: string }) => void; - "volume:updated": (data: { organizationId: string; volumeName: string }) => void; - "volume:status_changed": (data: { organizationId: string; volumeName: string; status: string }) => void; - "doctor:started": (data: { organizationId: string; repositoryId: string; repositoryName: string }) => void; - "doctor:completed": ( - data: { - organizationId: string; - repositoryId: string; - repositoryName: string; - } & DoctorResult, - ) => void; - "doctor:cancelled": (data: { - organizationId: string; - repositoryId: string; - repositoryName: string; - error?: string; - }) => void; -} +import type { ServerEventHandlers } from "~/schemas/server-events"; +export type { ServerEventHandlers, ServerEventPayloadMap } from "~/schemas/server-events"; /** * Global event emitter for server-side events * Use this to emit events that should be broadcasted to connected clients via SSE */ -export const serverEvents = new EventEmitter() as TypedEmitter; +export const serverEvents = new EventEmitter() as TypedEmitter; diff --git a/app/server/modules/repositories/repositories.controller.ts b/app/server/modules/repositories/repositories.controller.ts index 6074fb3f..c4c1b8fd 100644 --- a/app/server/modules/repositories/repositories.controller.ts +++ b/app/server/modules/repositories/repositories.controller.ts @@ -143,8 +143,6 @@ export const repositoriesController = new Hono() summary: snapshot.summary, }; - c.header("Cache-Control", "max-age=300, stale-while-revalidate=600"); - return c.json(response, 200); }) .get( diff --git a/app/server/modules/repositories/repositories.service.ts b/app/server/modules/repositories/repositories.service.ts index 12ad36dc..e813a95e 100644 --- a/app/server/modules/repositories/repositories.service.ts +++ b/app/server/modules/repositories/repositories.service.ts @@ -16,10 +16,7 @@ import { import { serverEvents } from "~/server/core/events"; import { getOrganizationId } from "~/server/core/request-context"; import { logger } from "~/server/utils/logger"; -import { - parseRetentionCategories, - type RetentionCategory, -} from "~/server/utils/retention-categories"; +import { parseRetentionCategories, type RetentionCategory } from "~/server/utils/retention-categories"; import { repoMutex } from "../../core/repository-mutex"; import { db } from "../../db/db"; import { repositoriesTable } from "../../db/schema"; @@ -27,13 +24,7 @@ import { cache } from "../../utils/cache"; import { cryptoUtils } from "../../utils/crypto"; import { toMessage } from "../../utils/errors"; import { generateShortId } from "../../utils/id"; -import { - addCommonArgs, - buildEnv, - buildRepoUrl, - cleanupTemporaryKeys, - restic, -} from "../../utils/restic"; +import { addCommonArgs, buildEnv, buildRepoUrl, cleanupTemporaryKeys, restic } from "../../utils/restic"; import { safeSpawn } from "../../utils/spawn"; import { backupsService } from "../backups/backups.service"; import type { UpdateRepositoryBody } from "./repositories.dto"; @@ -45,31 +36,22 @@ const findRepository = async (idOrShortId: string) => { const organizationId = getOrganizationId(); return await db.query.repositoriesTable.findFirst({ where: { - AND: [ - { OR: [{ id: idOrShortId }, { shortId: idOrShortId }] }, - { organizationId }, - ], + AND: [{ OR: [{ id: idOrShortId }, { shortId: idOrShortId }] }, { organizationId }], }, }); }; const listRepositories = async () => { const organizationId = getOrganizationId(); - const repositories = await db.query.repositoriesTable.findMany({ - where: { organizationId }, - }); + const repositories = await db.query.repositoriesTable.findMany({ where: { organizationId } }); return repositories; }; -const encryptConfig = async ( - config: RepositoryConfig, -): Promise => { +const encryptConfig = async (config: RepositoryConfig): Promise => { const encryptedConfig: Record = { ...config }; if (config.customPassword) { - encryptedConfig.customPassword = await cryptoUtils.sealSecret( - config.customPassword, - ); + encryptedConfig.customPassword = await cryptoUtils.sealSecret(config.customPassword); } if (config.cacert) { @@ -79,39 +61,25 @@ const encryptConfig = async ( switch (config.backend) { case "s3": case "r2": - encryptedConfig.accessKeyId = await cryptoUtils.sealSecret( - config.accessKeyId, - ); - encryptedConfig.secretAccessKey = await cryptoUtils.sealSecret( - config.secretAccessKey, - ); + encryptedConfig.accessKeyId = await cryptoUtils.sealSecret(config.accessKeyId); + encryptedConfig.secretAccessKey = await cryptoUtils.sealSecret(config.secretAccessKey); break; case "gcs": - encryptedConfig.credentialsJson = await cryptoUtils.sealSecret( - config.credentialsJson, - ); + encryptedConfig.credentialsJson = await cryptoUtils.sealSecret(config.credentialsJson); break; case "azure": - encryptedConfig.accountKey = await cryptoUtils.sealSecret( - config.accountKey, - ); + encryptedConfig.accountKey = await cryptoUtils.sealSecret(config.accountKey); break; case "rest": if (config.username) { - encryptedConfig.username = await cryptoUtils.sealSecret( - config.username, - ); + encryptedConfig.username = await cryptoUtils.sealSecret(config.username); } if (config.password) { - encryptedConfig.password = await cryptoUtils.sealSecret( - config.password, - ); + encryptedConfig.password = await cryptoUtils.sealSecret(config.password); } break; case "sftp": - encryptedConfig.privateKey = await cryptoUtils.sealSecret( - config.privateKey, - ); + encryptedConfig.privateKey = await cryptoUtils.sealSecret(config.privateKey); break; } @@ -193,9 +161,7 @@ const createRepository = async (name: string, config: RepositoryConfig, compress error = result.error; } else { - const initResult = await restic.init(encryptedConfig, organizationId, { - timeoutMs: 20000, - }); + const initResult = await restic.init(encryptedConfig, organizationId, { timeoutMs: 20000 }); error = initResult.error; } @@ -203,12 +169,7 @@ const createRepository = async (name: string, config: RepositoryConfig, compress await db .update(repositoriesTable) .set({ status: "healthy", lastChecked: Date.now(), lastError: null }) - .where( - and( - eq(repositoriesTable.id, id), - eq(repositoriesTable.organizationId, organizationId), - ), - ); + .where(and(eq(repositoriesTable.id, id), eq(repositoriesTable.organizationId, organizationId))); return { repository: created, status: 201 }; } @@ -216,16 +177,9 @@ const createRepository = async (name: string, config: RepositoryConfig, compress const errorMessage = toMessage(error); await db .delete(repositoriesTable) - .where( - and( - eq(repositoriesTable.id, id), - eq(repositoriesTable.organizationId, organizationId), - ), - ); + .where(and(eq(repositoriesTable.id, id), eq(repositoriesTable.organizationId, organizationId))); - throw new InternalServerError( - `Failed to initialize repository: ${errorMessage}`, - ); + throw new InternalServerError(`Failed to initialize repository: ${errorMessage}`); }; const getRepository = async (id: string) => { @@ -250,10 +204,7 @@ const deleteRepository = async (id: string) => { await db .delete(repositoriesTable) .where( - and( - eq(repositoriesTable.id, repository.id), - eq(repositoriesTable.organizationId, repository.organizationId), - ), + and(eq(repositoriesTable.id, repository.id), eq(repositoriesTable.organizationId, repository.organizationId)), ); cache.delByPrefix(`snapshots:${repository.id}:`); @@ -277,8 +228,7 @@ const listSnapshots = async (id: string, backupId?: string) => { } const cacheKey = `snapshots:${repository.id}:${backupId || "all"}`; - const cached = - cache.get>>(cacheKey); + const cached = cache.get>>(cacheKey); if (cached) { return cached; } @@ -288,10 +238,7 @@ const listSnapshots = async (id: string, backupId?: string) => { let snapshots = []; if (backupId) { - snapshots = await restic.snapshots(repository.config, { - tags: [backupId], - organizationId, - }); + snapshots = await restic.snapshots(repository.config, { tags: [backupId], organizationId }); } else { snapshots = await restic.snapshots(repository.config, { organizationId }); } @@ -322,26 +269,9 @@ const listSnapshotFiles = async ( const cacheKey = `ls:${repository.id}:${snapshotId}:${path || "root"}:${offset}:${limit}`; type LsResult = { - snapshot: { - id: string; - short_id: string; - time: string; - hostname: string; - paths: string[]; - } | null; - nodes: { - name: string; - type: string; - path: string; - size?: number; - mode?: number; - }[]; - pagination: { - offset: number; - limit: number; - total: number; - hasMore: boolean; - }; + snapshot: { id: string; short_id: string; time: string; hostname: string; paths: string[] } | null; + nodes: { name: string; type: string; path: string; size?: number; mode?: number }[]; + pagination: { offset: number; limit: number; total: number; hasMore: boolean }; }; const cached = cache.get(cacheKey); if (cached?.snapshot) { @@ -355,18 +285,9 @@ const listSnapshotFiles = async ( }; } - const releaseLock = await repoMutex.acquireShared( - repository.id, - `ls:${snapshotId}`, - ); + const releaseLock = await repoMutex.acquireShared(repository.id, `ls:${snapshotId}`); try { - const result = await restic.ls( - repository.config, - snapshotId, - organizationId, - path, - { offset, limit }, - ); + const result = await restic.ls(repository.config, snapshotId, organizationId, path, { offset, limit }); if (!result.snapshot) { throw new NotFoundError("Snapshot not found or empty"); @@ -416,10 +337,7 @@ const restoreSnapshot = async ( const target = options?.targetPath || "/"; - const releaseLock = await repoMutex.acquireShared( - repository.id, - `restore:${snapshotId}`, - ); + const releaseLock = await repoMutex.acquireShared(repository.id, `restore:${snapshotId}`); try { serverEvents.emit("restore:started", { organizationId, @@ -431,25 +349,11 @@ const restoreSnapshot = async ( ...options, organizationId, onProgress: (progress) => { - const { - seconds_elapsed, - percent_done, - total_files, - files_done, - total_bytes, - bytes_done, - } = progress; - serverEvents.emit("restore:progress", { organizationId, repositoryId: repository.id, snapshotId, - seconds_elapsed, - percent_done, - total_files, - files_done, - total_bytes, - bytes_done, + ...progress, }); }, }); @@ -490,14 +394,10 @@ const getSnapshotDetails = async (id: string, snapshotId: string) => { } const cacheKey = `snapshots:${repository.id}:all`; - let snapshots = - cache.get>>(cacheKey); + let snapshots = cache.get>>(cacheKey); if (!snapshots) { - const releaseLock = await repoMutex.acquireShared( - repository.id, - `snapshot_details:${snapshotId}`, - ); + const releaseLock = await repoMutex.acquireShared(repository.id, `snapshot_details:${snapshotId}`); try { snapshots = await restic.snapshots(repository.config, { organizationId }); cache.set(cacheKey, snapshots); @@ -506,9 +406,7 @@ const getSnapshotDetails = async (id: string, snapshotId: string) => { } } - const snapshot = snapshots.find( - (snap) => snap.id === snapshotId || snap.short_id === snapshotId, - ); + const snapshot = snapshots.find((snap) => snap.id === snapshotId || snap.short_id === snapshotId); if (!snapshot) { void refreshSnapshots(id).catch(() => {}); @@ -529,9 +427,7 @@ const checkHealth = async (repositoryId: string) => { const releaseLock = await repoMutex.acquireExclusive(repository.id, "check"); try { - const { hasErrors, error } = await restic.check(repository.config, { - organizationId, - }); + const { hasErrors, error } = await restic.check(repository.config, { organizationId }); await db .update(repositoriesTable) @@ -541,10 +437,7 @@ const checkHealth = async (repositoryId: string) => { lastError: error, }) .where( - and( - eq(repositoriesTable.id, repository.id), - eq(repositoriesTable.organizationId, repository.organizationId), - ), + and(eq(repositoriesTable.id, repository.id), eq(repositoriesTable.organizationId, repository.organizationId)), ); return { lastError: error }; @@ -567,10 +460,7 @@ const startDoctor = async (id: string) => { const abortController = new AbortController(); try { - await db - .update(repositoriesTable) - .set({ status: "doctor" }) - .where(eq(repositoriesTable.id, repository.id)); + await db.update(repositoriesTable).set({ status: "doctor" }).where(eq(repositoriesTable.id, repository.id)); serverEvents.emit("doctor:started", { organizationId: repository.organizationId, @@ -584,12 +474,7 @@ const startDoctor = async (id: string) => { throw error; } - executeDoctor( - repository.id, - repository.config, - repository.name, - abortController.signal, - ) + executeDoctor(repository.id, repository.config, repository.name, abortController.signal) .catch((error) => { logger.error(`Doctor background task failed: ${toMessage(error)}`); }) @@ -609,20 +494,14 @@ const cancelDoctor = async (id: string) => { const abortController = runningDoctors.get(repository.id); if (!abortController) { - await db - .update(repositoriesTable) - .set({ status: "unknown" }) - .where(eq(repositoriesTable.id, repository.id)); + await db.update(repositoriesTable).set({ status: "unknown" }).where(eq(repositoriesTable.id, repository.id)); throw new ConflictError("No doctor operation is currently running"); } abortController.abort(); runningDoctors.delete(repository.id); - await db - .update(repositoriesTable) - .set({ status: "unknown" }) - .where(eq(repositoriesTable.id, repository.id)); + await db.update(repositoriesTable).set({ status: "unknown" }).where(eq(repositoriesTable.id, repository.id)); serverEvents.emit("doctor:cancelled", { organizationId: repository.organizationId, @@ -641,10 +520,7 @@ const deleteSnapshot = async (id: string, snapshotId: string) => { throw new NotFoundError("Repository not found"); } - const releaseLock = await repoMutex.acquireExclusive( - repository.id, - `delete:${snapshotId}`, - ); + const releaseLock = await repoMutex.acquireExclusive(repository.id, `delete:${snapshotId}`); try { await restic.deleteSnapshot(repository.config, snapshotId, organizationId); cache.delByPrefix(`snapshots:${repository.id}:`); @@ -662,16 +538,9 @@ const deleteSnapshots = async (id: string, snapshotIds: string[]) => { throw new NotFoundError("Repository not found"); } - const releaseLock = await repoMutex.acquireExclusive( - repository.id, - `delete:bulk`, - ); + const releaseLock = await repoMutex.acquireExclusive(repository.id, `delete:bulk`); try { - await restic.deleteSnapshots( - repository.config, - snapshotIds, - organizationId, - ); + await restic.deleteSnapshots(repository.config, snapshotIds, organizationId); cache.delByPrefix(`snapshots:${repository.id}:`); for (const snapshotId of snapshotIds) { cache.delByPrefix(`ls:${repository.id}:${snapshotId}:`); @@ -693,17 +562,9 @@ const tagSnapshots = async ( throw new NotFoundError("Repository not found"); } - const releaseLock = await repoMutex.acquireExclusive( - repository.id, - `tag:bulk`, - ); + const releaseLock = await repoMutex.acquireExclusive(repository.id, `tag:bulk`); try { - await restic.tagSnapshots( - repository.config, - snapshotIds, - tags, - organizationId, - ); + await restic.tagSnapshots(repository.config, snapshotIds, tags, organizationId); cache.delByPrefix(`snapshots:${repository.id}:`); for (const snapshotId of snapshotIds) { cache.delByPrefix(`ls:${repository.id}:${snapshotId}:`); @@ -726,9 +587,7 @@ const refreshSnapshots = async (id: string) => { const releaseLock = await repoMutex.acquireShared(repository.id, "refresh"); try { - const snapshots = await restic.snapshots(repository.config, { - organizationId, - }); + const snapshots = await restic.snapshots(repository.config, { organizationId }); const cacheKey = `snapshots:${repository.id}:all`; cache.set(cacheKey, snapshots); @@ -855,24 +714,14 @@ const execResticCommand = async ( } addCommonArgs(resticArgs, env, repository.config); - const result = await safeSpawn({ - command: "restic", - args: resticArgs, - env, - signal, - onStdout, - onStderr, - }); + const result = await safeSpawn({ command: "restic", args: resticArgs, env, signal, onStdout, onStderr }); await cleanupTemporaryKeys(env); return { exitCode: result.exitCode }; }; -const getRetentionCategories = async ( - repositoryId: string, - scheduleId?: string, -) => { +const getRetentionCategories = async (repositoryId: string, scheduleId?: string) => { if (!scheduleId) { return new Map(); } @@ -896,15 +745,11 @@ const getRetentionCategories = async ( const { repository } = repositoryResult; - const dryRunResults = await restic.forget( - repository.config, - schedule.retentionPolicy, - { - tag: scheduleId, - organizationId: getOrganizationId(), - dryRun: true, - }, - ); + const dryRunResults = await restic.forget(repository.config, schedule.retentionPolicy, { + tag: scheduleId, + organizationId: getOrganizationId(), + dryRun: true, + }); if (!dryRunResults.data) { return new Map(); diff --git a/app/server/utils/restic.ts b/app/server/utils/restic.ts index 15676ade..5299b1a1 100644 --- a/app/server/utils/restic.ts +++ b/app/server/utils/restic.ts @@ -4,12 +4,7 @@ import os from "node:os"; import path from "node:path"; import { type } from "arktype"; import { throttle } from "es-toolkit"; -import type { - BandwidthLimit, - CompressionMode, - OverwriteMode, - RepositoryConfig, -} from "~/schemas/restic"; +import type { BandwidthLimit, CompressionMode, OverwriteMode, RepositoryConfig } from "~/schemas/restic"; import { type ResticBackupProgressDto, type ResticRestoreOutputDto, @@ -20,12 +15,7 @@ import { resticSnapshotSummarySchema, } from "~/schemas/restic-dto"; import { config as appConfig } from "../core/config"; -import { - DEFAULT_EXCLUDES, - REPOSITORY_BASE, - RESTIC_CACHE_DIR, - RESTIC_PASS_FILE, -} from "../core/constants"; +import { DEFAULT_EXCLUDES, RESTIC_CACHE_DIR, RESTIC_PASS_FILE } from "../core/constants"; import { db } from "../db/db"; import type { RetentionPolicy } from "../modules/backups/backups.dto"; import { cryptoUtils } from "./crypto"; @@ -72,37 +62,25 @@ export const buildRepoUrl = (config: RepositoryConfig): string => { case "sftp": return `sftp:${config.user}@${config.host}:${config.path}`; default: { - throw new Error( - `Unsupported repository backend: ${JSON.stringify(config)}`, - ); + throw new Error(`Unsupported repository backend: ${JSON.stringify(config)}`); } } }; -export const buildEnv = async ( - config: RepositoryConfig, - organizationId: string, -) => { +export const buildEnv = async (config: RepositoryConfig, organizationId: string) => { const env: Record = { RESTIC_CACHE_DIR, PATH: process.env.PATH || "/usr/local/bin:/usr/bin:/bin", }; if (config.isExistingRepository && config.customPassword) { - const decryptedPassword = await cryptoUtils.resolveSecret( - config.customPassword, - ); - const passwordFilePath = path.join( - "/tmp", - `zerobyte-pass-${crypto.randomBytes(8).toString("hex")}.txt`, - ); + const decryptedPassword = await cryptoUtils.resolveSecret(config.customPassword); + const passwordFilePath = path.join("/tmp", `zerobyte-pass-${crypto.randomBytes(8).toString("hex")}.txt`); await fs.writeFile(passwordFilePath, decryptedPassword, { mode: 0o600 }); env.RESTIC_PASSWORD_FILE = passwordFilePath; } else { - const org = await db.query.organization.findFirst({ - where: { id: organizationId }, - }); + const org = await db.query.organization.findFirst({ where: { id: organizationId } }); if (!org) { throw new Error(`Organization ${organizationId} not found`); @@ -110,17 +88,10 @@ export const buildEnv = async ( const metadata = org.metadata; if (!metadata?.resticPassword) { - throw new Error( - `Restic password not configured for organization ${organizationId}`, - ); + throw new Error(`Restic password not configured for organization ${organizationId}`); } else { - const decryptedPassword = await cryptoUtils.resolveSecret( - metadata.resticPassword, - ); - const passwordFilePath = path.join( - "/tmp", - `zerobyte-pass-${crypto.randomBytes(8).toString("hex")}.txt`, - ); + const decryptedPassword = await cryptoUtils.resolveSecret(metadata.resticPassword); + const passwordFilePath = path.join("/tmp", `zerobyte-pass-${crypto.randomBytes(8).toString("hex")}.txt`); await fs.writeFile(passwordFilePath, decryptedPassword, { mode: 0o600 }); env.RESTIC_PASSWORD_FILE = passwordFilePath; } @@ -128,12 +99,8 @@ export const buildEnv = async ( switch (config.backend) { case "s3": - env.AWS_ACCESS_KEY_ID = await cryptoUtils.resolveSecret( - config.accessKeyId, - ); - env.AWS_SECRET_ACCESS_KEY = await cryptoUtils.resolveSecret( - config.secretAccessKey, - ); + env.AWS_ACCESS_KEY_ID = await cryptoUtils.resolveSecret(config.accessKeyId); + env.AWS_SECRET_ACCESS_KEY = await cryptoUtils.resolveSecret(config.secretAccessKey); // Huawei OBS requires virtual-hosted style access if (config.endpoint.includes("myhuaweicloud")) { @@ -141,35 +108,22 @@ export const buildEnv = async ( } break; case "r2": - env.AWS_ACCESS_KEY_ID = await cryptoUtils.resolveSecret( - config.accessKeyId, - ); - env.AWS_SECRET_ACCESS_KEY = await cryptoUtils.resolveSecret( - config.secretAccessKey, - ); + env.AWS_ACCESS_KEY_ID = await cryptoUtils.resolveSecret(config.accessKeyId); + env.AWS_SECRET_ACCESS_KEY = await cryptoUtils.resolveSecret(config.secretAccessKey); env.AWS_REGION = "auto"; env.AWS_S3_FORCE_PATH_STYLE = "true"; break; case "gcs": { - const decryptedCredentials = await cryptoUtils.resolveSecret( - config.credentialsJson, - ); - const credentialsPath = path.join( - "/tmp", - `zerobyte-gcs-${crypto.randomBytes(8).toString("hex")}.json`, - ); - await fs.writeFile(credentialsPath, decryptedCredentials, { - mode: 0o600, - }); + const decryptedCredentials = await cryptoUtils.resolveSecret(config.credentialsJson); + const credentialsPath = path.join("/tmp", `zerobyte-gcs-${crypto.randomBytes(8).toString("hex")}.json`); + await fs.writeFile(credentialsPath, decryptedCredentials, { mode: 0o600 }); env.GOOGLE_PROJECT_ID = config.projectId; env.GOOGLE_APPLICATION_CREDENTIALS = credentialsPath; break; } case "azure": { env.AZURE_ACCOUNT_NAME = config.accountName; - env.AZURE_ACCOUNT_KEY = await cryptoUtils.resolveSecret( - config.accountKey, - ); + env.AZURE_ACCOUNT_KEY = await cryptoUtils.resolveSecret(config.accountKey); if (config.endpointSuffix) { env.AZURE_ENDPOINT_SUFFIX = config.endpointSuffix; } @@ -177,23 +131,16 @@ export const buildEnv = async ( } case "rest": { if (config.username) { - env.RESTIC_REST_USERNAME = await cryptoUtils.resolveSecret( - config.username, - ); + env.RESTIC_REST_USERNAME = await cryptoUtils.resolveSecret(config.username); } if (config.password) { - env.RESTIC_REST_PASSWORD = await cryptoUtils.resolveSecret( - config.password, - ); + env.RESTIC_REST_PASSWORD = await cryptoUtils.resolveSecret(config.password); } break; } case "sftp": { const decryptedKey = await cryptoUtils.resolveSecret(config.privateKey); - const keyPath = path.join( - "/tmp", - `zerobyte-ssh-${crypto.randomBytes(8).toString("hex")}`, - ); + const keyPath = path.join("/tmp", `zerobyte-ssh-${crypto.randomBytes(8).toString("hex")}`); let normalizedKey = decryptedKey.replace(/\r\n/g, "\n"); if (!normalizedKey.endsWith("\n")) { @@ -201,12 +148,8 @@ export const buildEnv = async ( } if (normalizedKey.includes("ENCRYPTED")) { - logger.error( - "SFTP: Private key appears to be passphrase-protected. Please use an unencrypted key.", - ); - throw new Error( - "Passphrase-protected SSH keys are not supported. Please provide an unencrypted private key.", - ); + logger.error("SFTP: Private key appears to be passphrase-protected. Please use an unencrypted key."); + throw new Error("Passphrase-protected SSH keys are not supported. Please provide an unencrypted private key."); } await fs.writeFile(keyPath, normalizedKey, { mode: 0o600 }); @@ -241,25 +184,12 @@ export const buildEnv = async ( ]; if (config.skipHostKeyCheck || !config.knownHosts) { - sshArgs.push( - "-o", - "StrictHostKeyChecking=no", - "-o", - "UserKnownHostsFile=/dev/null", - ); + sshArgs.push("-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"); } else if (config.knownHosts) { - const knownHostsPath = path.join( - "/tmp", - `zerobyte-known-hosts-${crypto.randomBytes(8).toString("hex")}`, - ); + const knownHostsPath = path.join("/tmp", `zerobyte-known-hosts-${crypto.randomBytes(8).toString("hex")}`); await fs.writeFile(knownHostsPath, config.knownHosts, { mode: 0o600 }); env._SFTP_KNOWN_HOSTS_PATH = knownHostsPath; - sshArgs.push( - "-o", - "StrictHostKeyChecking=yes", - "-o", - `UserKnownHostsFile=${knownHostsPath}`, - ); + sshArgs.push("-o", "StrictHostKeyChecking=yes", "-o", `UserKnownHostsFile=${knownHostsPath}`); } if (config.port && config.port !== 22) { @@ -274,10 +204,7 @@ export const buildEnv = async ( if (config.cacert) { const decryptedCert = await cryptoUtils.resolveSecret(config.cacert); - const certPath = path.join( - "/tmp", - `zerobyte-cacert-${crypto.randomBytes(8).toString("hex")}.pem`, - ); + const certPath = path.join("/tmp", `zerobyte-cacert-${crypto.randomBytes(8).toString("hex")}.pem`); await fs.writeFile(certPath, decryptedCert, { mode: 0o600 }); env.RESTIC_CACERT = certPath; } @@ -289,11 +216,7 @@ export const buildEnv = async ( return env; }; -const init = async ( - config: RepositoryConfig, - organizationId: string, - options?: { timeoutMs?: number }, -) => { +const init = async (config: RepositoryConfig, organizationId: string, options?: { timeoutMs?: number }) => { const repoUrl = buildRepoUrl(config); logger.info(`Initializing restic repository at ${repoUrl}...`); @@ -303,12 +226,7 @@ const init = async ( const args = ["init", "--repo", repoUrl]; addCommonArgs(args, env, config); - const res = await exec({ - command: "restic", - args, - env, - timeout: options?.timeoutMs ?? 20000, - }); + const res = await exec({ command: "restic", args, env, timeout: options?.timeoutMs ?? 20000 }); await cleanupTemporaryKeys(env); if (res.exitCode !== 0) { @@ -338,13 +256,7 @@ const backup = async ( const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, options?.organizationId); - const args: string[] = [ - "--repo", - repoUrl, - "backup", - "--compression", - options?.compressionMode ?? "auto", - ]; + const args: string[] = ["--repo", repoUrl, "backup", "--compression", options?.compressionMode ?? "auto"]; if (options?.oneFileSystem) { args.push("--one-file-system"); @@ -362,9 +274,7 @@ const backup = async ( let includeFile: string | null = null; if (options?.include && options.include.length > 0) { - const tmp = await fs.mkdtemp( - path.join(os.tmpdir(), "zerobyte-restic-include-"), - ); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-restic-include-")); includeFile = path.join(tmp, `include.txt`); await fs.writeFile(includeFile, options.include.join("\n"), "utf-8"); @@ -380,9 +290,7 @@ const backup = async ( let excludeFile: string | null = null; if (options?.exclude && options.exclude.length > 0) { - const tmp = await fs.mkdtemp( - path.join(os.tmpdir(), "zerobyte-restic-exclude-"), - ); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "zerobyte-restic-exclude-")); excludeFile = path.join(tmp, `exclude.txt`); await fs.writeFile(excludeFile, options.exclude.join("\n"), "utf-8"); @@ -409,6 +317,8 @@ const backup = async ( const progress = resticBackupProgressSchema(jsonData); if (!(progress instanceof type.errors)) { options.onProgress(progress); + } else { + logger.error(progress.summary); } } catch (_) { // Ignore JSON parse errors for non-JSON lines @@ -472,13 +382,13 @@ const backup = async ( }; const restoreProgressSchema = type({ - message_type: "'status'", + message_type: "'status' | 'summary'", seconds_elapsed: "number", - percent_done: "number", + percent_done: "number = 0", total_files: "number", - files_done: "number", - total_bytes: "number", - bytes_done: "number", + files_restored: "number = 0", + total_bytes: "number = 0", + bytes_restored: "number = 0", }); export type RestoreProgress = typeof restoreProgressSchema.infer; @@ -500,23 +410,12 @@ const restore = async ( const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, options.organizationId); - const args: string[] = [ - "--repo", - repoUrl, - "restore", - snapshotId, - "--target", - target, - ]; + const args: string[] = ["--repo", repoUrl, "restore", snapshotId, "--target", target]; if (options?.overwrite) { args.push("--overwrite", options.overwrite); } - if (options?.delete) { - args.push("--delete"); - } - if (options?.include?.length) { for (const pattern of options.include) { args.push("--include", pattern); @@ -544,6 +443,8 @@ const restore = async ( const progress = restoreProgressSchema(jsonData); if (!(progress instanceof type.errors)) { options.onProgress(progress); + } else { + logger.error(progress.summary); } } catch (_) { // Ignore JSON parse errors for non-JSON lines @@ -576,23 +477,16 @@ const restore = async ( try { summaryLine = JSON.parse(lastLine ?? "{}"); } catch (_) { - logger.warn( - "Failed to parse restic restore output JSON summary.", - lastLine, - ); + logger.warn("Failed to parse restic restore output JSON summary.", lastLine); summaryLine = {}; } - logger.debug( - `Restic restore output last line: ${JSON.stringify(summaryLine)}`, - ); + logger.debug(`Restic restore output last line: ${JSON.stringify(summaryLine)}`); const result = resticRestoreOutputSchema(summaryLine); if (result instanceof type.errors) { logger.warn(`Restic restore output validation failed: ${result.summary}`); - logger.info( - `Restic restore completed for snapshot ${snapshotId} to target ${target}`, - ); + logger.info(`Restic restore completed for snapshot ${snapshotId} to target ${target}`); const fallback: ResticRestoreOutputDto = { message_type: "summary" as const, total_files: 0, @@ -611,10 +505,7 @@ const restore = async ( return result; }; -const snapshots = async ( - config: RepositoryConfig, - options: { tags?: string[]; organizationId: string }, -) => { +const snapshots = async (config: RepositoryConfig, options: { tags?: string[]; organizationId: string }) => { const { tags, organizationId } = options; const repoUrl = buildRepoUrl(config); @@ -641,12 +532,8 @@ const snapshots = async ( const result = snapshotInfoSchema.array()(JSON.parse(res.stdout)); if (result instanceof type.errors) { - logger.error( - `Restic snapshots output validation failed: ${result.summary}`, - ); - throw new Error( - `Restic snapshots output validation failed: ${result.summary}`, - ); + logger.error(`Restic snapshots output validation failed: ${result.summary}`); + throw new Error(`Restic snapshots output validation failed: ${result.summary}`); } return result; @@ -693,15 +580,7 @@ const forget = async ( const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, extra.organizationId); - const args: string[] = [ - "--repo", - repoUrl, - "forget", - "--group-by", - "tags", - "--tag", - extra.tag, - ]; + const args: string[] = ["--repo", repoUrl, "forget", "--group-by", "tags", "--tag", extra.tag]; if (extra.dryRun) { args.push("--dry-run", "--no-lock"); @@ -744,18 +623,12 @@ const forget = async ( } const lines = res.stdout.split("\n").filter((line) => line.trim()); - const result = extra.dryRun - ? safeJsonParse(lines.at(-1) ?? "[]") - : null; + const result = extra.dryRun ? safeJsonParse(lines.at(-1) ?? "[]") : null; return { success: true, data: result }; }; -const deleteSnapshots = async ( - config: RepositoryConfig, - snapshotIds: string[], - organizationId: string, -) => { +const deleteSnapshots = async (config: RepositoryConfig, snapshotIds: string[], organizationId: string) => { const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, organizationId); @@ -763,13 +636,7 @@ const deleteSnapshots = async ( throw new Error("No snapshot IDs provided for deletion."); } - const args: string[] = [ - "--repo", - repoUrl, - "forget", - ...snapshotIds, - "--prune", - ]; + const args: string[] = ["--repo", repoUrl, "forget", ...snapshotIds, "--prune"]; addCommonArgs(args, env, config); const res = await exec({ command: "restic", args, env }); @@ -783,11 +650,7 @@ const deleteSnapshots = async ( return { success: true }; }; -const deleteSnapshot = async ( - config: RepositoryConfig, - snapshotId: string, - organizationId: string, -) => { +const deleteSnapshot = async (config: RepositoryConfig, snapshotId: string, organizationId: string) => { return deleteSnapshots(config, [snapshotId], organizationId); }; @@ -954,22 +817,14 @@ const ls = async ( }; }; -const unlock = async ( - config: RepositoryConfig, - options: { signal?: AbortSignal; organizationId: string }, -) => { +const unlock = async (config: RepositoryConfig, options: { signal?: AbortSignal; organizationId: string }) => { const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, options.organizationId); const args = ["unlock", "--repo", repoUrl, "--remove-all"]; addCommonArgs(args, env, config); - const res = await exec({ - command: "restic", - args, - env, - signal: options?.signal, - }); + const res = await exec({ command: "restic", args, env, signal: options?.signal }); await cleanupTemporaryKeys(env); if (options?.signal?.aborted) { @@ -1001,22 +856,12 @@ const check = async ( addCommonArgs(args, env, config); - const res = await exec({ - command: "restic", - args, - env, - signal: options?.signal, - }); + const res = await exec({ command: "restic", args, env, signal: options?.signal }); await cleanupTemporaryKeys(env); if (options?.signal?.aborted) { logger.warn("Restic check was aborted by signal."); - return { - success: false, - hasErrors: true, - output: "", - error: "Operation aborted", - }; + return { success: false, hasErrors: true, output: "", error: "Operation aborted" }; } const { stdout, stderr } = res; @@ -1042,22 +887,14 @@ const check = async ( }; }; -const repairIndex = async ( - config: RepositoryConfig, - options: { signal?: AbortSignal; organizationId: string }, -) => { +const repairIndex = async (config: RepositoryConfig, options: { signal?: AbortSignal; organizationId: string }) => { const repoUrl = buildRepoUrl(config); const env = await buildEnv(config, options.organizationId); const args = ["repair", "index", "--repo", repoUrl]; addCommonArgs(args, env, config); - const res = await exec({ - command: "restic", - args, - env, - signal: options?.signal, - }); + const res = await exec({ command: "restic", args, env, signal: options?.signal }); await cleanupTemporaryKeys(env); if (options?.signal?.aborted) { @@ -1097,13 +934,7 @@ const copy = async ( RESTIC_FROM_PASSWORD_FILE: sourceEnv.RESTIC_PASSWORD_FILE, }; - const args: string[] = [ - "--repo", - destRepoUrl, - "copy", - "--from-repo", - sourceRepoUrl, - ]; + const args: string[] = ["--repo", destRepoUrl, "copy", "--from-repo", sourceRepoUrl]; if (options.tag) { args.push("--tag", options.tag); @@ -1232,12 +1063,7 @@ export const restic = { }; export const cleanupTemporaryKeys = async (env: Record) => { - const keysToClean = [ - "_SFTP_KEY_PATH", - "_SFTP_KNOWN_HOSTS_PATH", - "RESTIC_CACERT", - "GOOGLE_APPLICATION_CREDENTIALS", - ]; + const keysToClean = ["_SFTP_KEY_PATH", "_SFTP_KNOWN_HOSTS_PATH", "RESTIC_CACERT", "GOOGLE_APPLICATION_CREDENTIALS"]; for (const key of keysToClean) { if (env[key]) { @@ -1246,10 +1072,7 @@ export const cleanupTemporaryKeys = async (env: Record) => { } // Clean up custom password files - if ( - env.RESTIC_PASSWORD_FILE && - env.RESTIC_PASSWORD_FILE !== RESTIC_PASS_FILE - ) { + if (env.RESTIC_PASSWORD_FILE && env.RESTIC_PASSWORD_FILE !== RESTIC_PASS_FILE) { await fs.unlink(env.RESTIC_PASSWORD_FILE).catch(() => {}); } }; From d528c5f5181c52c3e5e83dfdf0463979fe33008d Mon Sep 17 00:00:00 2001 From: Nicolas Meienberger Date: Sat, 14 Feb 2026 12:21:39 +0100 Subject: [PATCH 4/4] refactor(sse): generic handler factory --- .../modules/events/events.controller.ts | 224 ++++-------------- 1 file changed, 51 insertions(+), 173 deletions(-) diff --git a/app/server/modules/events/events.controller.ts b/app/server/modules/events/events.controller.ts index 5ab49af1..762051f2 100644 --- a/app/server/modules/events/events.controller.ts +++ b/app/server/modules/events/events.controller.ts @@ -1,17 +1,36 @@ import { Hono } from "hono"; import { streamSSE } from "hono/streaming"; -import type { - ServerBackupCompletedEventDto, - ServerBackupProgressEventDto, - ServerBackupStartedEventDto, - ServerRestoreCompletedEventDto, - ServerRestoreProgressEventDto, - ServerRestoreStartedEventDto, -} from "~/schemas/events-dto"; -import type { DoctorResult } from "~/schemas/restic"; import { serverEvents } from "../../core/events"; import { logger } from "../../utils/logger"; import { requireAuth } from "../auth/auth.middleware"; +import type { ServerEventPayloadMap } from "~/schemas/server-events"; + +type OrganizationScopedEvent = { + [EventName in keyof ServerEventPayloadMap]: ServerEventPayloadMap[EventName] extends { + organizationId: string; + } + ? EventName + : never; +}[keyof ServerEventPayloadMap]; + +const broadcastEvents = [ + "backup:started", + "backup:progress", + "backup:completed", + "volume:mounted", + "volume:unmounted", + "volume:updated", + "mirror:started", + "mirror:completed", + "restore:started", + "restore:progress", + "restore:completed", + "doctor:started", + "doctor:completed", + "doctor:cancelled", +] as const satisfies OrganizationScopedEvent[]; + +type BroadcastEvent = (typeof broadcastEvents)[number]; export const eventsController = new Hono().use(requireAuth).get("/", (c) => { logger.info("Client connected to SSE endpoint"); @@ -23,155 +42,27 @@ export const eventsController = new Hono().use(requireAuth).get("/", (c) => { event: "connected", }); - const onBackupStarted = async (data: ServerBackupStartedEventDto) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "backup:started", - }); - }; - - const onBackupProgress = async (data: ServerBackupProgressEventDto) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "backup:progress", - }); - }; - - const onBackupCompleted = async (data: ServerBackupCompletedEventDto) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "backup:completed", - }); - }; - - const onVolumeMounted = async (data: { organizationId: string; volumeName: string }) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "volume:mounted", - }); - }; - - const onVolumeUnmounted = async (data: { organizationId: string; volumeName: string }) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "volume:unmounted", - }); - }; - - const onVolumeUpdated = async (data: { organizationId: string; volumeName: string }) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "volume:updated", - }); - }; - - const onMirrorStarted = async (data: { - organizationId: string; - scheduleId: number; - repositoryId: string; - repositoryName: string; - }) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "mirror:started", - }); - }; - - const onMirrorCompleted = async (data: { - organizationId: string; - scheduleId: number; - repositoryId: string; - repositoryName: string; - status: "success" | "error"; - error?: string; - }) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "mirror:completed", - }); - }; - - const onRestoreStarted = async (data: ServerRestoreStartedEventDto) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "restore:started", - }); - }; - - const onRestoreProgress = async (data: ServerRestoreProgressEventDto) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "restore:progress", - }); - }; - - const onRestoreCompleted = async (data: ServerRestoreCompletedEventDto) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "restore:completed", - }); - }; - - const onDoctorStarted = async (data: { organizationId: string; repositoryId: string; repositoryName: string }) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "doctor:started", - }); - }; - - const onDoctorCompleted = async ( - data: { - organizationId: string; - repositoryId: string; - repositoryName: string; - } & DoctorResult, - ) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "doctor:completed", - }); + const createOrganizationEventHandler = (event: EventName) => { + return async (data: ServerEventPayloadMap[EventName]) => { + if (data.organizationId !== organizationId) return; + await stream.writeSSE({ + data: JSON.stringify(data), + event, + }); + }; }; - const onDoctorCancelled = async (data: { - organizationId: string; - repositoryId: string; - repositoryName: string; - error?: string; - }) => { - if (data.organizationId !== organizationId) return; - await stream.writeSSE({ - data: JSON.stringify(data), - event: "doctor:cancelled", - }); - }; + const eventHandlers = broadcastEvents.reduce( + (handlers, event) => { + handlers[event] = createOrganizationEventHandler(event); + return handlers; + }, + {} as { [EventName in BroadcastEvent]: (data: ServerEventPayloadMap[EventName]) => Promise }, + ); - serverEvents.on("backup:started", onBackupStarted); - serverEvents.on("backup:progress", onBackupProgress); - serverEvents.on("backup:completed", onBackupCompleted); - serverEvents.on("volume:mounted", onVolumeMounted); - serverEvents.on("volume:unmounted", onVolumeUnmounted); - serverEvents.on("volume:updated", onVolumeUpdated); - serverEvents.on("mirror:started", onMirrorStarted); - serverEvents.on("mirror:completed", onMirrorCompleted); - serverEvents.on("restore:started", onRestoreStarted); - serverEvents.on("restore:progress", onRestoreProgress); - serverEvents.on("restore:completed", onRestoreCompleted); - serverEvents.on("doctor:started", onDoctorStarted); - serverEvents.on("doctor:completed", onDoctorCompleted); - serverEvents.on("doctor:cancelled", onDoctorCancelled); + for (const event of broadcastEvents) { + serverEvents.on(event, eventHandlers[event]); + } let keepAlive = true; let cleanedUp = false; @@ -181,20 +72,10 @@ export const eventsController = new Hono().use(requireAuth).get("/", (c) => { cleanedUp = true; c.req.raw.signal.removeEventListener("abort", onRequestAbort); - serverEvents.off("backup:started", onBackupStarted); - serverEvents.off("backup:progress", onBackupProgress); - serverEvents.off("backup:completed", onBackupCompleted); - serverEvents.off("volume:mounted", onVolumeMounted); - serverEvents.off("volume:unmounted", onVolumeUnmounted); - serverEvents.off("volume:updated", onVolumeUpdated); - serverEvents.off("mirror:started", onMirrorStarted); - serverEvents.off("mirror:completed", onMirrorCompleted); - serverEvents.off("restore:started", onRestoreStarted); - serverEvents.off("restore:progress", onRestoreProgress); - serverEvents.off("restore:completed", onRestoreCompleted); - serverEvents.off("doctor:started", onDoctorStarted); - serverEvents.off("doctor:completed", onDoctorCompleted); - serverEvents.off("doctor:cancelled", onDoctorCancelled); + + for (const event of broadcastEvents) { + serverEvents.off(event, eventHandlers[event]); + } } function handleDisconnect() { @@ -214,10 +95,7 @@ export const eventsController = new Hono().use(requireAuth).get("/", (c) => { try { while (keepAlive && !c.req.raw.signal.aborted && !stream.aborted) { - await stream.writeSSE({ - data: JSON.stringify({ timestamp: Date.now() }), - event: "heartbeat", - }); + await stream.writeSSE({ data: JSON.stringify({ timestamp: Date.now() }), event: "heartbeat" }); await stream.sleep(5000); } } finally {