Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Node
node_modules
.npmrc
**/node_modules
apps/*/node_modules
packages/*/node_modules
.pnpm-store

# Python
Expand Down
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ test-results/*
.env.testing
.env.dev
docker/.env.system
.env.system
.env.system
docker/docker-compose.override.yml
docker/start.ps1
docker/stop.ps1
121 changes: 121 additions & 0 deletions WARP.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# WARP.md

This file provides guidance to WARP (warp.dev) when working with code in this repository.

# Project Structure

This is a monorepo managed by **pnpm**.

- **apps/**
- `web`: React/Vite application (Frontend).
- `desktop`: Electron application (Desktop Client).
- `background-jobs`: Node.js service (Express, BullMQ) that orchestrates video processing and communicates with the Python service.
- **packages/**
- `prisma`: Database schema, migrations, and client generation.
- `shared`: Shared utilities, types, constants, and services (including `pythonService`).
- `ui`: Shared UI component library.
- **python/**: Python service for AI/ML tasks (Video Analysis, Transcription, Face Recognition).
- **docker/**: Docker Compose configuration and Dockerfiles.

# Development Setup

## Prerequisites
- Node.js (v22+)
- pnpm (v10+)
- Python (3.11+)
- Docker & Docker Compose (Recommended for running services like Postgres, Redis, ChromaDB)

## Initial Setup
1. Install dependencies:
```bash
pnpm install
```
2. Set up environment variables:
- Copy `.env.example` to `.env` in the root.
- Configure `HOST_MEDIA_PATH`, `GEMINI_API_KEY` (if used), etc.
3. Start infrastructure services (Postgres, Redis, ChromaDB):
```bash
docker compose -f docker/docker-compose.yml up -d postgres redis chroma
```
4. Initialize Database:
```bash
pnpm prisma generate
pnpm prisma migrate dev
pnpm prisma seed
```

## Python Environment (Local Development)
If running `background-jobs` locally (not in Docker), you must set up the Python environment:

1. Create a virtual environment in `python/venv` (or similar):
```bash
cd python
python -m venv venv
# Activate venv (Windows: venv\Scripts\activate, Unix: source venv/bin/activate)
pip install -r requirements.txt
cd ..
```
2. Update `.env` to point to the local Python setup:
```ini
PYTHON_SCRIPT="./python/analysis_service.py"
VENV_PATH="./python/venv"
PYTHON_PORT="8765"
```

# Common Commands

## Running Applications
- **Web App**:
```bash
pnpm --filter web dev
```
- **Desktop App**:
```bash
pnpm --filter desktop dev
```
- **Background Jobs**:
```bash
pnpm --filter background-jobs dev
```

## Database Management (via `packages/prisma`)
- Generate Client: `pnpm prisma generate`
- Run Migrations: `pnpm prisma migrate dev`
- Seed Database: `pnpm prisma seed`
- Open Studio: `pnpm prisma studio`

## Testing
- Run all tests:
```bash
pnpm test
```
- Run specific package tests:
```bash
pnpm --filter web test
pnpm --filter desktop test
pnpm --filter shared test
```

## Linting & Formatting
- Lint: `pnpm --filter <package> lint`
- Format: `pnpm --filter <package> format` (or `prettier --write .`)

# Architecture & Key Components

## Video Processing Pipeline
1. **Ingestion**: `background-jobs` watches `HOST_MEDIA_PATH` or receives upload events.
2. **Queue**: BullMQ manages processing jobs (`video-indexing`).
3. **Orchestration**: `videoIndexer.ts` in `background-jobs` coordinates the workflow.
4. **Analysis**: `pythonService` (in `packages/shared`) spawns the Python process and communicates via **WebSockets** (`ws://localhost:8765`).
- **Transcription**: OpenAI Whisper.
- **Vision**: OpenCV, YOLO, Gemini (optional).
- **Embeddings**: Stored in ChromaDB for semantic search.

## Communication
- **Node <-> Python**: WebSocket connection. The Node.js service manages the Python process lifecycle (spawn/kill).
- **Frontend <-> Backend**: REST API (Express) and likely WebSockets or Polling for job status.

## Database
- **Postgres**: Stores metadata, video info, jobs, scenes, and faces. Managed by Prisma.
- **ChromaDB**: Vector database for semantic search embeddings.
- **Redis**: Backend for BullMQ job queues.
7 changes: 4 additions & 3 deletions apps/background-jobs/src/jobs/videoIndexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,10 @@ async function processVideo(job: Job<{ videoPath: string; jobId: string; forceRe
export const videoIndexerWorker = new Worker('video-indexing', processVideo, {
connection,
concurrency: 1,
lockDuration: 15 * 60 * 1000,
stalledInterval: 5 * 60 * 1000,
maxStalledCount: 2,
// Extended timeouts for long videos (up to 4 hours)
lockDuration: 4 * 60 * 60 * 1000, // 4 hours - job can run this long
stalledInterval: 30 * 60 * 1000, // Check for stalled jobs every 30 minutes
maxStalledCount: 3, // Allow 3 stall events before marking failed
})

videoIndexerWorker.on('failed', async (job: Job | undefined, err: Error) => {
Expand Down
55 changes: 54 additions & 1 deletion apps/background-jobs/src/queue.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,64 @@
import { Queue } from 'bullmq'
import { config } from './config'
import IORedis from 'ioredis'
import { logger } from '@shared/services/logger'

// Resilient Redis connection with exponential backoff
// Addresses Docker Desktop networking instability on Windows/WSL2
export const connection = new IORedis({
host: config.redisHost,
port: config.redisPort,
maxRetriesPerRequest: null,
maxRetriesPerRequest: null, // Required by BullMQ

// Connection resilience settings
retryStrategy: (times: number) => {
// Exponential backoff: 100ms, 200ms, 400ms, ... up to 30s max
const delay = Math.min(Math.pow(2, times) * 100, 30000)
logger.warn(`Redis connection retry #${times}, next attempt in ${delay}ms`)
return delay
},

// Reconnect on error
reconnectOnError: (err: Error) => {
const targetErrors = ['READONLY', 'ETIMEDOUT', 'ECONNRESET', 'ECONNREFUSED']
if (targetErrors.some(e => err.message.includes(e))) {
logger.warn(`Redis reconnecting due to: ${err.message}`)
return true // Reconnect for these errors
}
return false
},

// Connection timeouts
connectTimeout: 30000, // 30s to establish connection
commandTimeout: 60000, // 60s for commands (long jobs)

// Keep-alive settings
keepAlive: 30000, // Send TCP keepalive every 30s

// Auto-reconnect
enableReadyCheck: true,
enableOfflineQueue: true, // Queue commands while disconnected
})

// Log connection events
connection.on('connect', () => {
logger.info('Redis connected')
})

connection.on('ready', () => {
logger.info('Redis ready')
})

connection.on('error', (err) => {
logger.error(`Redis error: ${err.message}`)
})

connection.on('close', () => {
logger.warn('Redis connection closed')
})

connection.on('reconnecting', () => {
logger.info('Redis reconnecting...')
})

export const videoQueue = new Queue('video-indexing', {
Expand Down
26 changes: 16 additions & 10 deletions apps/background-jobs/src/routes/folders.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,52 @@
import express from 'express'
import { prisma } from '../services/db'
import { findVideoFiles } from '@shared/utils/videos';
import { findVideoFiles, findAudioFiles } from '@shared/utils/videos';
import { watchFolder } from '../watcher'
import { videoQueue } from 'src/queue'
import { getVideosNotEmbedded } from '@shared/services/vectorDb';

const router = express.Router()

function isAudioFolder(p: string): boolean {
const name = p.toLowerCase()
return name.includes('/x_audio') || name.endsWith('/audio') || name.includes('xaudio')
}

router.post('/trigger', async (req, res) => {
const { folderPath } = req.body
if (!folderPath) return res.status(400).json({ error: 'folderPath required' })

try {
const videos = await findVideoFiles(folderPath)
const uniqueVideos = await getVideosNotEmbedded(videos.map((video) => video.path))
const useAudio = isAudioFolder(folderPath)
const mediaFiles = useAudio ? await findAudioFiles(folderPath) : await findVideoFiles(folderPath)
const uniqueMedia = await getVideosNotEmbedded(mediaFiles.map((f) => f.path))
const folder = await prisma.folder.update({
where: { path: folderPath },
data: {
videoCount: uniqueVideos.length,
videoCount: uniqueMedia.length,
lastScanned: new Date(),
},
})

watchFolder(folderPath)

for (const video of uniqueVideos) {
for (const mediaPath of uniqueMedia) {
const job = await prisma.job.upsert({
where: { videoPath: video, id: '' },
where: { videoPath: mediaPath, id: '' },
create: {
videoPath: video,
videoPath: mediaPath,
userId: folder?.userId,
folderId: folder.id,
},
update: { folderId: folder.id},
})
await videoQueue.add('index-video', { videoPath: video, jobId: job.id, folderId: folder.id })
await videoQueue.add('index-video', { videoPath: mediaPath, jobId: job.id, folderId: folder.id })
}

res.json({
message: 'Folder added and videos queued for processing',
message: useAudio ? 'Audio folder scanned and files queued' : 'Folder added and videos queued for processing',
folder,
queuedVideos: uniqueVideos.length,
queuedFiles: uniqueMedia.length,
})
} catch (error) {
console.error(error)
Expand Down
33 changes: 27 additions & 6 deletions apps/background-jobs/src/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,36 @@ import chokidar from 'chokidar'
import path from 'path'
import { videoQueue } from './queue.js'
import { prisma } from './services/db.js'
import { SUPPORTED_VIDEO_EXTENSIONS } from '@shared/constants/index'
import { SUPPORTED_VIDEO_EXTENSIONS, SUPPORTED_AUDIO_EXTENSIONS, WATCHER_IGNORED } from '@shared/constants/index'
import { logger } from '@shared/services/logger.js'

function isAudioFolder(p: string): boolean {
const name = p.toLowerCase()
return name.includes('/x_audio') || name.endsWith('/audio') || name.includes('xaudio')
}

export function watchFolder(folderPath: string) {
const watcher = chokidar.watch(folderPath, { ignored: /^\./, persistent: true, ignoreInitial: true })
const useAudio = isAudioFolder(folderPath)
const extPattern = useAudio ? SUPPORTED_AUDIO_EXTENSIONS : SUPPORTED_VIDEO_EXTENSIONS

const watcher = chokidar.watch(folderPath, {
ignored: (p: string) => WATCHER_IGNORED.some((re) => re.test(p)),
persistent: true,
ignoreInitial: true,
depth: 0, // top-level only; avoids heavy recursion (Downloads subdirs, Syncthing internals)
awaitWriteFinish: { stabilityThreshold: 3000, pollInterval: 100 },
ignorePermissionErrors: true,
atomic: true,
})

watcher.on('error', (error) => {
logger.error(`Watcher error for ${folderPath}: ${error.message}`)
// Do not crash. For EIO/ENOMEM we keep the watcher alive; new file events may still arrive.
})

watcher.on('add', async (filePath) => {
try {
if (!SUPPORTED_VIDEO_EXTENSIONS.test(filePath)) return
if (!extPattern.test(filePath)) return

const folder = await prisma.folder.findFirst({
where: {
Expand All @@ -25,7 +46,7 @@ export function watchFolder(folderPath: string) {
})
await videoQueue.add('index-video', { videoPath: filePath, jobId: job.id, folderId: folder.id })
} catch (error) {
console.error('Error adding new video file while watching for new folder changes: ', error)
logger.error(`Error adding file from watcher for ${folderPath}: ${(error as Error).message}`)
}
})
}
Expand All @@ -41,12 +62,12 @@ export async function initializeWatchers() {
watchFolder(folder.path)
}
} catch (error) {
console.error('Failed to initialize watchers:', error)
logger.error(`Failed to initialize watchers: ${(error as Error).message}`)
}
}

export function stopWatcher(folderPath: string) {
const watcher = chokidar.watch(folderPath, { ignored: /^\./, persistent: true, ignoreInitial: true })
const watcher = chokidar.watch(folderPath, { ignored: (p: string) => WATCHER_IGNORED.some((re) => re.test(p)), persistent: true, ignoreInitial: true })
if (watcher) {
watcher.close()
}
Expand Down
3 changes: 2 additions & 1 deletion apps/desktop/electron.vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ export default defineConfig({
input: {
main: resolve(__dirname, 'lib/main/main.ts'),
},
external: ['chromadb', '@shared', 'onnxruntime-node', '@ffmpeg-installer/ffmpeg', '@ffprobe-installer/ffprobe', 'sharp', 'egm96-universal'],
external: ['chromadb', '@shared', 'onnxruntime-node', '@ffmpeg-installer/ffmpeg', '@ffprobe-installer/ffprobe', 'sharp', 'egm96-universal', 'node-llama-cpp'],
},
outDir: 'dist',
},
},
preload: {
Expand Down
4 changes: 2 additions & 2 deletions apps/desktop/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
"name": "desktop",
"version": "0.1.1",
"description": "AI-Powered Video Indexing and Search",
"main": "./out/main/main.js",
"main": "./dist/main.js",
"license": "MIT",
"author": {
"name": "ilias",
"url": "https://github.com/iliashad"
},
"scripts": {
"dev": "pnpm run build:shared && cross-env ELECTRON_DISABLE_SANDBOX=1 electron-vite dev -w",
"dev": "cross-env ELECTRON_DISABLE_SANDBOX=1 electron-vite dev -w --config electron.vite.config.ts",
"format": "prettier --write .",
"lint": "eslint . --ext .ts,.tsx --fix",
"start": "electron-vite preview",
Expand Down
Loading