diff --git a/src/auth-migration.ts b/src/auth-migration.ts new file mode 100644 index 0000000..d515696 --- /dev/null +++ b/src/auth-migration.ts @@ -0,0 +1,330 @@ +import { ClickHouseService } from './clickhouse.ts'; +import { ClickHouseConfig } from './types.ts'; + +/** + * AuthMigrationService handles database schema migrations for user authentication + * Creates and manages users, api_keys tables and updates existing events table + */ +export class AuthMigrationService { + private clickhouse: ClickHouseService; + private config: ClickHouseConfig; + + constructor(clickhouse: ClickHouseService) { + this.clickhouse = clickhouse; + this.config = clickhouse.config; + } + + /** + * Run all authentication-related migrations + */ + async runAuthMigrations(): Promise { + console.log('Starting authentication database migrations...'); + + try { + await this.createUsersTable(); + await this.createApiKeysTable(); + await this.updateEventsTableWithUserId(); + + console.log('Authentication migrations completed successfully'); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error(`Authentication migration failed: ${errorMessage}`); + } + } + + /** + * Create the users table for storing user accounts + */ + async createUsersTable(): Promise { + const systemDatabase = + this.config.systemDatabase || this.config.database || 'default'; + + const createUsersTableQuery = ` + CREATE TABLE IF NOT EXISTS ${systemDatabase}.users ( + id String DEFAULT generateUUIDv4(), + email String, + password_hash String, + created_at DateTime DEFAULT now(), + updated_at DateTime DEFAULT now() + ) + ENGINE = MergeTree() + ORDER BY id + SETTINGS index_granularity = 8192 + `; + + try { + await this.clickhouse.queryDatabase( + systemDatabase, + createUsersTableQuery + ); + console.log( + `Users table created successfully in database '${systemDatabase}'` + ); + + // Create unique index on email for fast lookups and uniqueness enforcement + const createEmailIndexQuery = ` + CREATE INDEX IF NOT EXISTS idx_users_email ON ${systemDatabase}.users (email) TYPE bloom_filter GRANULARITY 1 + `; + + try { + await this.clickhouse.queryDatabase( + systemDatabase, + createEmailIndexQuery + ); + console.log('Email index created for users table'); + } catch (indexError) { + // Index creation might fail in some ClickHouse versions, but table creation succeeded + console.warn( + 'Could not create email index (this is not critical):', + indexError + ); + } + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error(`Failed to create users table: ${errorMessage}`); + } + } + + /** + * Create the api_keys table for storing user API keys + */ + async createApiKeysTable(): Promise { + const systemDatabase = + this.config.systemDatabase || this.config.database || 'default'; + + const createApiKeysTableQuery = ` + CREATE TABLE IF NOT EXISTS ${systemDatabase}.api_keys ( + id String DEFAULT generateUUIDv4(), + user_id String, + key_hash String, + name String DEFAULT '', + created_at DateTime DEFAULT now(), + last_used_at Nullable(DateTime) + ) + ENGINE = MergeTree() + ORDER BY (user_id, id) + SETTINGS index_granularity = 8192 + `; + + try { + await this.clickhouse.queryDatabase( + systemDatabase, + createApiKeysTableQuery + ); + console.log( + `API keys table created successfully in database '${systemDatabase}'` + ); + + // Create index on key_hash for fast API key validation + const createKeyHashIndexQuery = ` + CREATE INDEX IF NOT EXISTS idx_api_keys_hash ON ${systemDatabase}.api_keys (key_hash) TYPE bloom_filter GRANULARITY 1 + `; + + try { + await this.clickhouse.queryDatabase( + systemDatabase, + createKeyHashIndexQuery + ); + console.log('Key hash index created for api_keys table'); + } catch (indexError) { + // Index creation might fail in some ClickHouse versions, but table creation succeeded + console.warn( + 'Could not create key hash index (this is not critical):', + indexError + ); + } + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error(`Failed to create api_keys table: ${errorMessage}`); + } + } + + /** + * Update existing events table to include user_id column + */ + async updateEventsTableWithUserId(): Promise { + const systemDatabase = + this.config.systemDatabase || this.config.database || 'default'; + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any + const tableName = (this.config as any).tableName || 'events'; + + try { + // Check if user_id column already exists + const checkColumnQuery = ` + SELECT name + FROM system.columns + WHERE database = '${systemDatabase}' + AND table = '${tableName}' + AND name = 'user_id' + `; + + const existingColumns = await this.clickhouse.queryDatabaseJSON( + 'system', + checkColumnQuery + ); + + if (existingColumns.length > 0) { + console.log(`Column 'user_id' already exists in table '${tableName}'`); + return; + } + + // Add user_id column to existing events table + const addColumnQuery = ` + ALTER TABLE ${systemDatabase}.${tableName} + ADD COLUMN IF NOT EXISTS user_id Nullable(String) + `; + + await this.clickhouse.queryDatabase(systemDatabase, addColumnQuery); + console.log(`Added 'user_id' column to events table '${tableName}'`); + + // Create index on user_id for efficient user-specific queries + const createUserIdIndexQuery = ` + CREATE INDEX IF NOT EXISTS idx_events_user_id ON ${systemDatabase}.${tableName} (user_id) TYPE bloom_filter GRANULARITY 1 + `; + + try { + await this.clickhouse.queryDatabase( + systemDatabase, + createUserIdIndexQuery + ); + console.log('User ID index created for events table'); + } catch (indexError) { + // Index creation might fail in some ClickHouse versions, but column addition succeeded + console.warn( + 'Could not create user_id index (this is not critical):', + indexError + ); + } + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error( + `Failed to update events table with user_id column: ${errorMessage}` + ); + } + } + + /** + * Verify that all authentication tables exist and have correct schema + */ + async verifyAuthSchema(): Promise<{ + usersTableExists: boolean; + apiKeysTableExists: boolean; + eventsHasUserId: boolean; + errors: string[]; + }> { + const systemDatabase = + this.config.systemDatabase || this.config.database || 'default'; + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any + const tableName = (this.config as any).tableName || 'events'; + const errors: string[] = []; + + let usersTableExists = false; + let apiKeysTableExists = false; + let eventsHasUserId = false; + + try { + // Check users table + const usersTableQuery = ` + SELECT 1 FROM system.tables + WHERE database = '${systemDatabase}' AND name = 'users' + LIMIT 1 + `; + const usersResult = await this.clickhouse.queryDatabaseJSON( + 'system', + usersTableQuery + ); + usersTableExists = usersResult.length > 0; + + if (!usersTableExists) { + errors.push('Users table does not exist'); + } + + // Check api_keys table + const apiKeysTableQuery = ` + SELECT 1 FROM system.tables + WHERE database = '${systemDatabase}' AND name = 'api_keys' + LIMIT 1 + `; + const apiKeysResult = await this.clickhouse.queryDatabaseJSON( + 'system', + apiKeysTableQuery + ); + apiKeysTableExists = apiKeysResult.length > 0; + + if (!apiKeysTableExists) { + errors.push('API keys table does not exist'); + } + + // Check events table has user_id column + const eventsColumnQuery = ` + SELECT name + FROM system.columns + WHERE database = '${systemDatabase}' + AND table = '${tableName}' + AND name = 'user_id' + `; + const eventsColumnResult = await this.clickhouse.queryDatabaseJSON( + 'system', + eventsColumnQuery + ); + eventsHasUserId = eventsColumnResult.length > 0; + + if (!eventsHasUserId) { + errors.push(`Events table '${tableName}' does not have user_id column`); + } + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + errors.push(`Schema verification failed: ${errorMessage}`); + } + + return { + usersTableExists, + apiKeysTableExists, + eventsHasUserId, + errors, + }; + } + + /** + * Drop all authentication tables (use with caution - for testing/rollback only) + */ + async dropAuthTables(): Promise { + const systemDatabase = + this.config.systemDatabase || this.config.database || 'default'; + + console.warn( + 'Dropping authentication tables - this will delete all user data!' + ); + + try { + // Drop api_keys table first (has foreign key reference to users) + await this.clickhouse.queryDatabase( + systemDatabase, + `DROP TABLE IF EXISTS ${systemDatabase}.api_keys` + ); + console.log('Dropped api_keys table'); + + // Drop users table + await this.clickhouse.queryDatabase( + systemDatabase, + `DROP TABLE IF EXISTS ${systemDatabase}.users` + ); + console.log('Dropped users table'); + + // Note: We don't drop the user_id column from events table as it might contain data + console.log('Authentication tables dropped successfully'); + console.warn( + 'Note: user_id column in events table was not removed to preserve data' + ); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error(`Failed to drop authentication tables: ${errorMessage}`); + } + } +} diff --git a/src/clickhouse.ts b/src/clickhouse.ts new file mode 100644 index 0000000..10c3e42 --- /dev/null +++ b/src/clickhouse.ts @@ -0,0 +1,1353 @@ +import { ClickHouseClient, createClient } from '@clickhouse/client'; +import { + ClickHouseConfig, + DashboardStats, + EventData, + UserContext, +} from './types.ts'; + +// Import broadcast functions for real-time updates +// eslint-disable-next-line @typescript-eslint/no-explicit-any +let broadcastEvent: ((event: any) => void) | null = null; +// eslint-disable-next-line @typescript-eslint/no-explicit-any +let broadcastStats: ((stats: any) => void) | null = null; + +// Dynamic import to avoid circular dependency +try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const wsModule = await import('../routes/api/ws.ts'); + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + broadcastEvent = wsModule.broadcastEvent; + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + broadcastStats = wsModule.broadcastStats; +} catch (error) { + console.log('WebSocket module not available for broadcasting'); +} + +export class ClickHouseService { + config: ClickHouseConfig; + private client: ClickHouseClient; + private eventBuffer: EventData[] = []; + private userEventBuffers: Map = new Map(); + private bufferSize = 100; + private flushInterval = 5000; // 5 seconds + private statsInterval = 10000; // 10 seconds + private userTableCache: Map = new Map(); + private flushIntervalId?: number; + private statsIntervalId?: number; + + constructor(config: ClickHouseConfig) { + this.config = config; + + // Create ClickHouse client with official SDK + this.client = createClient({ + url: `http://${config.host}:${config.port}`, + username: config.username, + password: config.password, + database: config.systemDatabase || config.database || 'default', + request_timeout: 30000, + compression: { + response: true, + request: true, + }, + }); + + this.startBufferFlush(); + this.startStatsUpdates(); + } + + /** + * Sanitizes user ID to be safe for use in ClickHouse table names + * Replaces special characters with underscores and ensures valid identifier format + */ + private sanitizeUserId(userId: string): string { + if (!userId || typeof userId !== 'string') { + throw new Error('User ID must be a non-empty string'); + } + + // Trim whitespace + let sanitized = userId.trim(); + + if (sanitized.length === 0) { + throw new Error('User ID cannot be empty or only whitespace'); + } + + // Replace special characters with underscores + // Keep only alphanumeric characters and underscores + sanitized = sanitized.replace(/[^a-zA-Z0-9_]/g, '_'); + + // Clean up multiple consecutive underscores + sanitized = sanitized.replace(/_+/g, '_'); + + // Remove trailing underscores + sanitized = sanitized.replace(/_+$/, ''); + + // Remove leading underscores + sanitized = sanitized.replace(/^_+/, ''); + + // If we're left with nothing but had content before, create a fallback + if (sanitized.length === 0) { + sanitized = 'user'; + } + + // Ensure it doesn't start with a number (ClickHouse identifier requirement) + if (/^[0-9]/.test(sanitized)) { + sanitized = 'u_' + sanitized; + } + + // Limit length to avoid ClickHouse identifier limits (max 127 characters) + // Reserve space for table prefix, so limit user part to 100 characters + if (sanitized.length > 100) { + sanitized = sanitized.substring(0, 100); + // Remove trailing underscore if truncation created one + sanitized = sanitized.replace(/_+$/, ''); + } + + // Final validation - must not be empty after sanitization + if (sanitized.length === 0) { + sanitized = 'user'; + } + + return sanitized; + } + + /** + * Generates a consistent table name for a user + * Uses the configured table prefix and sanitized user ID + */ + getUserTableName(userId: string): string { + // Check cache first for performance + if (this.userTableCache.has(userId)) { + return this.userTableCache.get(userId)!; + } + + const sanitizedUserId = this.sanitizeUserId(userId); + const tablePrefix = this.config.tablePrefix; + + // Handle empty table prefix case - check for empty string specifically + const tableName = + tablePrefix !== undefined && tablePrefix !== '' + ? `${tablePrefix}_${sanitizedUserId}` + : sanitizedUserId; + + // Validate final table name length + if (tableName.length > 127) { + throw new Error( + `Generated table name exceeds ClickHouse identifier limit: ${tableName}` + ); + } + + // Validate table name format (ClickHouse identifier rules) + if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(tableName)) { + throw new Error( + `Generated table name is not a valid ClickHouse identifier: ${tableName}` + ); + } + + // Cache the result + this.userTableCache.set(userId, tableName); + + return tableName; + } + + /** + * Validates if a string is a valid ClickHouse identifier + */ + private isValidClickHouseIdentifier(identifier: string): boolean { + if (!identifier || identifier.length === 0 || identifier.length > 127) { + return false; + } + + // Must start with letter or underscore, followed by letters, numbers, or underscores + return /^[a-zA-Z_][a-zA-Z0-9_]*$/.test(identifier); + } + + /** + * Execute a query against a specific database + */ + async queryDatabase(database: string, query: string): Promise { + try { + const result = await this.client.query({ + query, + format: 'TabSeparated', + clickhouse_settings: { + database, + }, + }); + return await result.text(); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error( + `Query failed on database '${database}': ${errorMessage}` + ); + } + } + + /** + * Execute a query against a specific database and return JSON results + */ + async queryDatabaseJSON( + database: string, + query: string, + queryParams?: Record + ): Promise { + try { + const result = await this.client.query({ + query, + format: 'JSONEachRow', + query_params: queryParams, + clickhouse_settings: { + database, + }, + }); + return await result.json(); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error( + `Query failed on database '${database}': ${errorMessage}` + ); + } + } + + /** + * Legacy query method - uses system database by default + */ + async query(query: string): Promise { + const database = + this.config.systemDatabase || this.config.database || 'default'; + return this.queryDatabase(database, query); + } + + /** + * Legacy queryJSON method - uses system database by default + */ + async queryJSON(query: string): Promise { + const database = + this.config.systemDatabase || this.config.database || 'default'; + return this.queryDatabaseJSON(database, query); + } + + /** + * Initialize the user database for storing user-specific tables + * Creates the database if it doesn't exist + */ + async initializeUserDatabase(): Promise { + const userDatabase = this.config.userDatabase || 'user_events'; + + try { + // Create the user database if it doesn't exist + const createDbQuery = `CREATE DATABASE IF NOT EXISTS ${userDatabase}`; + await this.client.command({ + query: createDbQuery, + }); + + console.log(`User database '${userDatabase}' initialized successfully`); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + console.error( + `Failed to initialize user database '${userDatabase}': ${errorMessage}` + ); + } + } + + /** + * Creates a user-specific table with the standard events schema + * Returns the table name that was created + */ + async createUserTable(userId: string): Promise { + if (!userId || typeof userId !== 'string') { + throw new Error('User ID must be a non-empty string'); + } + + const tableName = this.getUserTableName(userId); + const userDatabase = this.config.userDatabase || 'user_events'; + + try { + // Ensure user database exists first + await this.initializeUserDatabase(); + + const createQuery = ` + CREATE TABLE IF NOT EXISTS ${userDatabase}.${tableName} ( + id String DEFAULT generateUUIDv4(), + timestamp DateTime DEFAULT now(), + data JSON, + source String DEFAULT '', + ip String DEFAULT '', + user_agent String DEFAULT '', + created_at DateTime DEFAULT now() + ) + ENGINE = MergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY (timestamp, id) + SETTINGS index_granularity = 8192 + `; + + await this.client.command({ + query: createQuery, + clickhouse_settings: { + database: userDatabase, + }, + }); + + console.log( + `User table '${tableName}' created successfully in database '${userDatabase}'` + ); + + return tableName; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error( + `Failed to create user table for user '${userId}': ${errorMessage}` + ); + } + } + + /** + * Ensures a user table exists, creating it if necessary + * Returns the table name + */ + async ensureUserTable(userId: string): Promise { + if (!userId || typeof userId !== 'string') { + throw new Error('User ID must be a non-empty string'); + } + + const tableName = this.getUserTableName(userId); + const userDatabase = this.config.userDatabase || 'user_events'; + + try { + // Check if table already exists + const checkQuery = `SELECT 1 FROM system.tables WHERE database = '${userDatabase}' AND name = '${tableName}' LIMIT 1`; + const result = await this.client.query({ + query: checkQuery, + format: 'JSONEachRow', + clickhouse_settings: { + database: 'system', + }, + }); + const rows = await result.json(); + + if (rows.length > 0) { + // Table exists, return the name + return tableName; + } + + // Table doesn't exist, create it + return await this.createUserTable(userId); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error( + `Failed to ensure user table for user '${userId}': ${errorMessage}` + ); + } + } + + /** + * Initialize system database and legacy table (for backward compatibility) + */ + async initialize(): Promise { + // Use legacy tableName if available, otherwise use a default + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any + const tableName = (this.config as any).tableName || 'events'; + const systemDatabase = + this.config.systemDatabase || this.config.database || 'default'; + + const createQuery = ` + CREATE TABLE IF NOT EXISTS ${tableName} ( + id String DEFAULT generateUUIDv4(), + timestamp DateTime DEFAULT now(), + data JSON, + source String DEFAULT '', + ip String DEFAULT '', + user_agent String DEFAULT '', + created_at DateTime DEFAULT now() + ) + ENGINE = MergeTree() + PARTITION BY toYYYYMM(timestamp) + ORDER BY (timestamp, id) + SETTINGS index_granularity = 8192 + `; + + await this.client.command({ + query: createQuery, + clickhouse_settings: { + database: systemDatabase, + }, + }); + console.log(`System database '${systemDatabase}' initialized successfully`); + } + + async insertEvent(event: EventData, userId?: string): Promise { + if (userId) { + return this.insertEventForUser(userId, event); + } + + const formattedEvent = { + id: event.id || crypto.randomUUID(), + timestamp: event.timestamp || new Date().toISOString(), + data: JSON.stringify(event.data), + source: event.source || '', + ip: event.ip || '', + user_agent: event.user_agent || '', + }; + + // Store the formatted event with stringified data for ClickHouse + this.eventBuffer.push(formattedEvent as any); + + // Broadcast new event to connected clients + if (broadcastEvent) { + broadcastEvent({ + id: formattedEvent.id, + timestamp: formattedEvent.timestamp, + data: event.data, // Send original data, not stringified + source: formattedEvent.source, + ip: formattedEvent.ip, + }); + } + + if (this.eventBuffer.length >= this.bufferSize) { + await this.flushBuffer(); + } + } + + /** + * Insert event into user-specific table with buffering + */ + async insertEventForUser(userId: string, event: EventData): Promise { + if (!userId || typeof userId !== 'string') { + throw new Error('User ID must be a non-empty string'); + } + + // Ensure user table exists + await this.ensureUserTable(userId); + + const formattedEvent = { + id: event.id || crypto.randomUUID(), + timestamp: event.timestamp || new Date().toISOString(), + data: JSON.stringify(event.data), + source: event.source || '', + ip: event.ip || '', + user_agent: event.user_agent || '', + }; + + // Get or create user-specific buffer + if (!this.userEventBuffers.has(userId)) { + this.userEventBuffers.set(userId, []); + } + + const userBuffer = this.userEventBuffers.get(userId)!; + userBuffer.push(formattedEvent as any); + + // Broadcast new event to connected clients + if (broadcastEvent) { + broadcastEvent({ + id: formattedEvent.id, + timestamp: formattedEvent.timestamp, + data: event.data, // Send original data, not stringified + source: formattedEvent.source, + ip: formattedEvent.ip, + userId: userId, // Include user context in broadcast + }); + } + + // Flush user buffer if it reaches the buffer size + if (userBuffer.length >= this.bufferSize) { + await this.flushUserBuffer(userId); + } + } + + private async flushBuffer(): Promise { + if (this.eventBuffer.length === 0) return; + + const events = this.eventBuffer.splice(0); + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any + const tableName = (this.config as any).tableName || 'events'; + + try { + await this.client.insert({ + table: tableName, + values: events, + format: 'JSONEachRow', + }); + } catch (error) { + console.error('Buffer flush error:', error); + this.eventBuffer.unshift(...events); + } + } + + /** + * Flush events from a specific user's buffer to their table + */ + private async flushUserBuffer(userId: string): Promise { + const userBuffer = this.userEventBuffers.get(userId); + if (!userBuffer || userBuffer.length === 0) return; + + const events = userBuffer.splice(0); + const tableName = this.getUserTableName(userId); + const userDatabase = this.config.userDatabase || 'user_events'; + + try { + await this.client.insert({ + table: `${userDatabase}.${tableName}`, + values: events, + format: 'JSONEachRow', + }); + } catch (error) { + console.error(`User buffer flush error for user '${userId}':`, error); + userBuffer.unshift(...events); + } + } + + private startBufferFlush(): void { + this.flushIntervalId = setInterval(() => { + // Flush legacy buffer + this.flushBuffer().catch(console.error); + + // Flush all user buffers + this.flushAllUserBuffers().catch(console.error); + }, this.flushInterval); + } + + /** + * Flush all user buffers that have pending events + */ + private async flushAllUserBuffers(): Promise { + const flushPromises: Promise[] = []; + + for (const userId of this.userEventBuffers.keys()) { + const userBuffer = this.userEventBuffers.get(userId); + if (userBuffer && userBuffer.length > 0) { + flushPromises.push(this.flushUserBuffer(userId)); + } + } + + if (flushPromises.length > 0) { + await Promise.allSettled(flushPromises); + } + } + + private startStatsUpdates(): void { + this.statsIntervalId = setInterval(async () => { + try { + if (broadcastStats) { + const stats = await this.getStats(); + broadcastStats(stats); + } + } catch (error) { + console.error('Stats broadcast error:', error); + } + }, this.statsInterval); + } + + /** + * Cleanup method to clear intervals and prevent memory leaks + * Should be called when the service is no longer needed + */ + async cleanup(): Promise { + if (this.flushIntervalId !== undefined) { + clearInterval(this.flushIntervalId); + this.flushIntervalId = undefined; + } + if (this.statsIntervalId !== undefined) { + clearInterval(this.statsIntervalId); + this.statsIntervalId = undefined; + } + + // Close the ClickHouse client connection + await this.client.close(); + } + + async getStats(userId?: string): Promise { + if (userId) { + return this.getStatsForUser(userId); + } + + // Legacy behavior - query system/shared table + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any + const tableName = (this.config as any).tableName || 'events'; + const queries = [ + `SELECT count() as total FROM ${tableName}`, + `SELECT count() as today FROM ${tableName} WHERE toDate(timestamp) = today()`, + `SELECT count() as hour FROM ${tableName} WHERE timestamp >= now() - INTERVAL 1 HOUR`, + `SELECT formatReadableSize(sum(length(data))) as size FROM ${tableName}`, + `SELECT id, timestamp, data, source, ip FROM ${tableName} ORDER BY timestamp DESC LIMIT 10`, + ]; + + const [total, today, hour, size, recent] = await Promise.all([ + this.queryJSON(queries[0]), + this.queryJSON(queries[1]), + this.queryJSON(queries[2]), + this.queryJSON(queries[3]), + this.queryJSON(queries[4]), + ]); + + return { + totalEvents: total[0]?.total || 0, + eventsToday: today[0]?.today || 0, + eventsLastHour: hour[0]?.hour || 0, + dataSize: size[0]?.size || '0 B', + recentEvents: recent.map((r) => ({ + id: r.id, + timestamp: r.timestamp, + data: typeof r.data === 'string' ? JSON.parse(r.data) : r.data, + source: r.source, + ip: r.ip, + })), + }; + } + + /** + * Get statistics for a specific user from their dedicated table + */ + async getStatsForUser(userId: string): Promise { + if (!userId || typeof userId !== 'string') { + throw new Error('User ID must be a non-empty string'); + } + + // Ensure user table exists + await this.ensureUserTable(userId); + + const tableName = this.getUserTableName(userId); + const userDatabase = this.config.userDatabase || 'user_events'; + + const queries = [ + `SELECT count() as total FROM ${userDatabase}.${tableName}`, + `SELECT count() as today FROM ${userDatabase}.${tableName} WHERE toDate(timestamp) = today()`, + `SELECT count() as hour FROM ${userDatabase}.${tableName} WHERE timestamp >= now() - INTERVAL 1 HOUR`, + `SELECT formatReadableSize(sum(length(data))) as size FROM ${userDatabase}.${tableName}`, + `SELECT id, timestamp, data, source, ip FROM ${userDatabase}.${tableName} ORDER BY timestamp DESC LIMIT 10`, + ]; + + try { + const [total, today, hour, size, recent] = await Promise.all([ + this.queryDatabaseJSON(userDatabase, queries[0]), + this.queryDatabaseJSON(userDatabase, queries[1]), + this.queryDatabaseJSON(userDatabase, queries[2]), + this.queryDatabaseJSON(userDatabase, queries[3]), + this.queryDatabaseJSON(userDatabase, queries[4]), + ]); + + return { + totalEvents: total[0]?.total || 0, + eventsToday: today[0]?.today || 0, + eventsLastHour: hour[0]?.hour || 0, + dataSize: size[0]?.size || '0 B', + recentEvents: recent.map((r) => ({ + id: r.id, + timestamp: r.timestamp, + data: typeof r.data === 'string' ? JSON.parse(r.data) : r.data, + source: r.source, + ip: r.ip, + })), + }; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error( + `Failed to get stats for user '${userId}': ${errorMessage}` + ); + } + } + + /** + * Query a user-specific table with custom SQL + */ + async queryUserTable(userId: string, query: string): Promise { + if (!userId || typeof userId !== 'string') { + throw new Error('User ID must be a non-empty string'); + } + + if (!query || typeof query !== 'string') { + throw new Error('Query must be a non-empty string'); + } + + // Ensure user table exists + await this.ensureUserTable(userId); + + const tableName = this.getUserTableName(userId); + const userDatabase = this.config.userDatabase || 'user_events'; + + try { + // Replace table placeholder in query if present + const processedQuery = query.replace( + /\{userTable\}/g, + `${userDatabase}.${tableName}` + ); + + return await this.queryDatabaseJSON(userDatabase, processedQuery); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error( + `Failed to query user table for user '${userId}': ${errorMessage}` + ); + } + } + + /** + * List all user tables in the user database for administrative operations + * Returns an array of table names that match the user table pattern + */ + async listUserTables(): Promise { + const userDatabase = this.config.userDatabase || 'user_events'; + const tablePrefix = this.config.tablePrefix || 'events_user'; + + try { + // Query system.tables to find all tables in the user database that match our pattern + const query = ` + SELECT name + FROM system.tables + WHERE database = '${userDatabase}' + AND name LIKE '${tablePrefix}_%' + ORDER BY name + `; + + const result = await this.client.query({ + query, + format: 'JSONEachRow', + clickhouse_settings: { + database: 'system', + }, + }); + const rows = await result.json(); + return rows.map((row: any) => row.name); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error(`Failed to list user tables: ${errorMessage}`); + } + } + + /** + * Drop a user's table for cleanup operations + * This permanently deletes all data for the specified user + */ + async dropUserTable(userId: string): Promise { + if (!userId || typeof userId !== 'string') { + throw new Error('User ID must be a non-empty string'); + } + + const tableName = this.getUserTableName(userId); + const userDatabase = this.config.userDatabase || 'user_events'; + + try { + // Check if table exists before attempting to drop + const checkQuery = `SELECT 1 FROM system.tables WHERE database = '${userDatabase}' AND name = '${tableName}' LIMIT 1`; + const result = await this.client.query({ + query: checkQuery, + format: 'JSONEachRow', + clickhouse_settings: { + database: 'system', + }, + }); + const exists = await result.json(); + + if (exists.length === 0) { + throw new Error( + `User table '${tableName}' does not exist in database '${userDatabase}'` + ); + } + + // Drop the table + const dropQuery = `DROP TABLE ${userDatabase}.${tableName}`; + await this.client.command({ + query: dropQuery, + clickhouse_settings: { + database: userDatabase, + }, + }); + + // Clear the table name from cache + this.userTableCache.delete(userId); + + // Clear user buffer if it exists + this.userEventBuffers.delete(userId); + + console.log( + `User table '${tableName}' dropped successfully from database '${userDatabase}'` + ); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error( + `Failed to drop user table for user '${userId}': ${errorMessage}` + ); + } + } + + /** + * Validate the integrity of a user's table for data verification + * Checks table structure, data consistency, and returns validation results + */ + async validateUserTableIntegrity(userId: string): Promise<{ + isValid: boolean; + tableExists: boolean; + schemaValid: boolean; + recordCount: number; + issues: string[]; + }> { + if (!userId || typeof userId !== 'string') { + throw new Error('User ID must be a non-empty string'); + } + + const tableName = this.getUserTableName(userId); + const userDatabase = this.config.userDatabase || 'user_events'; + const issues: string[] = []; + let isValid = true; + let tableExists = false; + let schemaValid = false; + let recordCount = 0; + + try { + // Check if table exists + const existsQuery = `SELECT 1 FROM system.tables WHERE database = '${userDatabase}' AND name = '${tableName}' LIMIT 1`; + const existsResult = await this.client.query({ + query: existsQuery, + format: 'JSONEachRow', + clickhouse_settings: { + database: 'system', + }, + }); + const existsRows = await existsResult.json(); + tableExists = existsRows.length > 0; + + if (!tableExists) { + issues.push( + `Table '${tableName}' does not exist in database '${userDatabase}'` + ); + isValid = false; + return { isValid, tableExists, schemaValid, recordCount, issues }; + } + + // Validate table schema + const schemaQuery = ` + SELECT name, type + FROM system.columns + WHERE database = '${userDatabase}' AND table = '${tableName}' + ORDER BY name + `; + const schemaResult = await this.client.query({ + query: schemaQuery, + format: 'JSONEachRow', + clickhouse_settings: { + database: 'system', + }, + }); + const schemaRows = await schemaResult.json(); + + // Expected columns and their types + const expectedSchema = { + id: 'String', + timestamp: 'DateTime', + data: "Object('json')", + source: 'String', + ip: 'String', + user_agent: 'String', + created_at: 'DateTime', + }; + + // Check if all expected columns exist with correct types + const actualColumns = new Map( + schemaRows.map((col: any) => [col.name, col.type]) + ); + + for (const [columnName, expectedType] of Object.entries(expectedSchema)) { + if (!actualColumns.has(columnName)) { + issues.push(`Missing column: ${columnName}`); + isValid = false; + } else { + const actualType = actualColumns.get(columnName); + // Allow some flexibility in type matching (e.g., Object('json') vs JSON) + if ( + actualType !== expectedType && + !( + columnName === 'data' && + (actualType === 'JSON' || actualType?.includes('json')) + ) + ) { + issues.push( + `Column '${columnName}' has type '${actualType}', expected '${expectedType}'` + ); + isValid = false; + } + } + } + + // Check for unexpected columns + for (const columnName of actualColumns.keys()) { + if (!expectedSchema.hasOwnProperty(columnName)) { + issues.push(`Unexpected column: ${columnName}`); + // This is not necessarily invalid, just noteworthy + } + } + + schemaValid = issues.length === 0; + + // Get record count + const countQuery = `SELECT count() as count FROM ${userDatabase}.${tableName}`; + const countResult = await this.queryDatabaseJSON( + userDatabase, + countQuery + ); + recordCount = countResult[0]?.count || 0; + + // Check for data consistency issues + if (recordCount > 0) { + // Check for null IDs + const nullIdQuery = `SELECT count() as count FROM ${userDatabase}.${tableName} WHERE id = ''`; + const nullIdResult = await this.queryDatabaseJSON( + userDatabase, + nullIdQuery + ); + const nullIdCount = nullIdResult[0]?.count || 0; + + if (nullIdCount > 0) { + issues.push(`Found ${nullIdCount} records with empty ID`); + isValid = false; + } + + // Check for invalid timestamps + const invalidTimestampQuery = `SELECT count() as count FROM ${userDatabase}.${tableName} WHERE timestamp = '1970-01-01 00:00:00'`; + const invalidTimestampResult = await this.queryDatabaseJSON( + userDatabase, + invalidTimestampQuery + ); + const invalidTimestampCount = invalidTimestampResult[0]?.count || 0; + + if (invalidTimestampCount > 0) { + issues.push( + `Found ${invalidTimestampCount} records with invalid timestamps` + ); + isValid = false; + } + + // Check for malformed JSON data + try { + const jsonCheckQuery = `SELECT count() as count FROM ${userDatabase}.${tableName} WHERE NOT isValidJSON(data)`; + const jsonCheckResult = await this.queryDatabaseJSON( + userDatabase, + jsonCheckQuery + ); + const invalidJsonCount = jsonCheckResult[0]?.count || 0; + + if (invalidJsonCount > 0) { + issues.push( + `Found ${invalidJsonCount} records with invalid JSON data` + ); + isValid = false; + } + } catch (error) { + // isValidJSON might not be available in all ClickHouse versions + issues.push( + 'Could not validate JSON data format (isValidJSON function not available)' + ); + } + } + + return { + isValid, + tableExists, + schemaValid, + recordCount, + issues, + }; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + issues.push(`Validation error: ${errorMessage}`); + return { + isValid: false, + tableExists, + schemaValid, + recordCount, + issues, + }; + } + } + + /** + * Get comprehensive statistics about user table usage + * Returns information about all user tables and their sizes + */ + async getUserTableStatistics(): Promise<{ + totalTables: number; + totalEvents: number; + totalSize: string; + tables: Array<{ + tableName: string; + userId: string; + eventCount: number; + dataSize: string; + lastActivity: string | null; + }>; + }> { + const userDatabase = this.config.userDatabase || 'user_events'; + const tablePrefix = this.config.tablePrefix || 'events_user'; + + try { + // Get all user tables + const userTables = await this.listUserTables(); + + const tableStats = []; + let totalEvents = 0; + let totalSizeBytes = 0; + + for (const tableName of userTables) { + try { + // Extract user ID from table name + const userId = tableName.startsWith(tablePrefix + '_') + ? tableName.substring(tablePrefix.length + 1) + : tableName; + + // Get table statistics + const statsQueries = [ + `SELECT count() as count FROM ${userDatabase}.${tableName}`, + `SELECT sum(length(data)) as size FROM ${userDatabase}.${tableName}`, + `SELECT max(timestamp) as last_activity FROM ${userDatabase}.${tableName}`, + ]; + + const [countResult, sizeResult, activityResult] = await Promise.all([ + this.queryDatabaseJSON(userDatabase, statsQueries[0]), + this.queryDatabaseJSON(userDatabase, statsQueries[1]), + this.queryDatabaseJSON(userDatabase, statsQueries[2]), + ]); + + const eventCount = countResult[0]?.count || 0; + const sizeBytes = sizeResult[0]?.size || 0; + const lastActivity = activityResult[0]?.last_activity || null; + + totalEvents += eventCount; + totalSizeBytes += sizeBytes; + + // Format size for display + const formatSize = (bytes: number): string => { + if (bytes === 0) return '0 B'; + const k = 1024; + const sizes = ['B', 'KB', 'MB', 'GB', 'TB']; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + return ( + parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i] + ); + }; + + tableStats.push({ + tableName, + userId, + eventCount, + dataSize: formatSize(sizeBytes), + lastActivity, + }); + } catch (error) { + console.error(`Error getting stats for table ${tableName}:`, error); + // Continue with other tables even if one fails + tableStats.push({ + tableName, + userId: 'unknown', + eventCount: 0, + dataSize: '0 B', + lastActivity: null, + }); + } + } + + // Format total size + const formatTotalSize = (bytes: number): string => { + if (bytes === 0) return '0 B'; + const k = 1024; + const sizes = ['B', 'KB', 'MB', 'GB', 'TB']; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i]; + }; + + return { + totalTables: userTables.length, + totalEvents, + totalSize: formatTotalSize(totalSizeBytes), + tables: tableStats.sort((a, b) => b.eventCount - a.eventCount), // Sort by event count descending + }; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error(`Failed to get user table statistics: ${errorMessage}`); + } + } + + /** + * Ping the ClickHouse server to check connectivity + */ + async ping(): Promise { + try { + const result = await this.client.query({ + query: 'SELECT 1', + format: 'JSONEachRow', + }); + const rows = (await result.json()) as Array<{ '1': number }>; + return rows.length > 0 && rows[0]['1'] === 1; + } catch (error) { + console.error('ClickHouse ping failed:', error); + return false; + } + } + + /** + * Get server information + */ + async getServerInfo(): Promise<{ + version: string; + uptime: number; + timezone: string; + }> { + try { + const queries = [ + 'SELECT version() as version', + 'SELECT uptime() as uptime', + 'SELECT timezone() as timezone', + ]; + + const [versionResult, uptimeResult, timezoneResult] = await Promise.all([ + this.client.query({ query: queries[0], format: 'JSONEachRow' }), + this.client.query({ query: queries[1], format: 'JSONEachRow' }), + this.client.query({ query: queries[2], format: 'JSONEachRow' }), + ]); + + const [versionRows, uptimeRows, timezoneRows] = await Promise.all([ + versionResult.json() as Promise>, + uptimeResult.json() as Promise>, + timezoneResult.json() as Promise>, + ]); + + return { + version: versionRows[0]?.version || 'unknown', + uptime: uptimeRows[0]?.uptime || 0, + timezone: timezoneRows[0]?.timezone || 'unknown', + }; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error(`Failed to get server info: ${errorMessage}`); + } + } +} + +// Export a singleton instance for backward compatibility +let clickHouseInstance: ClickHouseService | null = null; + +export function getClickHouseService( + config?: ClickHouseConfig +): ClickHouseService { + if (!clickHouseInstance && config) { + clickHouseInstance = new ClickHouseService(config); + } else if (!clickHouseInstance) { + throw new Error( + 'ClickHouse service not initialized. Provide config on first call.' + ); + } + return clickHouseInstance; +} + +export function resetClickHouseService(): void { + if (clickHouseInstance) { + clickHouseInstance.cleanup(); + clickHouseInstance = null; + } +} +/** + + * UserContextManager handles user-specific operations and context management + * Provides a higher-level interface for managing user tables and contexts + */ +export class UserContextManager { + private clickhouse: ClickHouseService; + private contextCache: Map = new Map(); + private inFlightRequests: Map> = + new Map(); + + constructor(clickhouse: ClickHouseService) { + this.clickhouse = clickhouse; + } + + /** + * Validates user ID using the same validation logic as sanitizeUserId + * Throws consistent error messages for invalid user IDs + */ + private validateAndSanitizeUserId(userId: string): void { + // Consistent validation that matches sanitizeUserId requirements + if (!userId || typeof userId !== 'string') { + throw new Error('User ID must be a non-empty string'); + } + + const trimmed = userId.trim(); + if (trimmed.length === 0) { + throw new Error('User ID cannot be empty or only whitespace'); + } + + // Validate maximum length to prevent excessively long user IDs + // sanitizeUserId limits to 100 characters after processing, so reject anything > 200 before processing + if (trimmed.length > 200) { + throw new Error( + `User ID exceeds maximum allowed length of 200 characters (got ${trimmed.length})` + ); + } + } + + /** + * Legacy validation method - kept for backward compatibility + * @deprecated Use validateAndSanitizeUserId for consistent error handling + */ + validateUserId(userId: string): boolean { + try { + this.validateAndSanitizeUserId(userId); + return true; + } catch { + return false; + } + } + + /** + * Ensures a user context exists, creating the user table if necessary + * Returns the user context with table information + */ + async ensureUserContext(userId: string): Promise { + // Consolidated validation with consistent error messages + this.validateAndSanitizeUserId(userId); + + // Check cache first + if (this.contextCache.has(userId)) { + return this.contextCache.get(userId)!; + } + + try { + // Ensure user table exists + const tableName = await this.clickhouse.ensureUserTable(userId); + const userDatabase = this.clickhouse.config.userDatabase || 'user_events'; + + const context: UserContext = { + userId, + tableName, + database: userDatabase, + }; + + // Cache the context + this.contextCache.set(userId, context); + + return context; + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + throw new Error( + `Failed to ensure user context for '${userId}': ${errorMessage}` + ); + } + } + + /** + * Gets the user context if it exists, without creating it + * Returns null if the context doesn't exist + * Uses async locking to prevent duplicate database queries for concurrent requests + */ + async getUserContext(userId: string): Promise { + // Validate user ID first + try { + this.validateAndSanitizeUserId(userId); + } catch { + return null; + } + + // Check cache first + if (this.contextCache.has(userId)) { + return this.contextCache.get(userId)!; + } + + // Check if there's already an in-flight request for this user + // This prevents duplicate database queries for concurrent calls + const existingRequest = this.inFlightRequests.get(userId); + if (existingRequest) { + return existingRequest; + } + + // Create and track the database query promise + const requestPromise = this.fetchUserContextFromDatabase(userId); + this.inFlightRequests.set(userId, requestPromise); + + try { + const result = await requestPromise; + return result; + } finally { + // Clean up the in-flight request tracking + this.inFlightRequests.delete(userId); + } + } + + /** + * Internal method to fetch user context from database + * Separated to enable proper async locking + */ + private async fetchUserContextFromDatabase( + userId: string + ): Promise { + try { + // Check if user table exists without creating it + const tableName = this.clickhouse.getUserTableName(userId); + const userDatabase = this.clickhouse.config.userDatabase || 'user_events'; + + // Check if table actually exists in ClickHouse + const checkQuery = `SELECT 1 FROM system.tables WHERE database = '${userDatabase}' AND name = '${tableName}' LIMIT 1`; + const result = await this.clickhouse.queryDatabaseJSON( + 'system', + checkQuery + ); + + if (result.length > 0) { + const context: UserContext = { + userId, + tableName, + database: userDatabase, + }; + + // Cache the context + this.contextCache.set(userId, context); + + return context; + } + + return null; + } catch (error) { + console.error(`Error checking user context for '${userId}':`, error); + return null; + } + } + + /** + * Clears the context cache + * Useful for testing or when you want to force fresh lookups + */ + clearCache(): void { + this.contextCache.clear(); + } + + /** + * Removes a specific user's context from the cache + */ + clearUserCache(userId: string): void { + this.contextCache.delete(userId); + } + + /** + * Gets all cached user contexts + * Useful for debugging or administrative operations + */ + getCachedContexts(): UserContext[] { + return Array.from(this.contextCache.values()); + } + + /** + * Gets the number of cached contexts + */ + getCacheSize(): number { + return this.contextCache.size; + } +} diff --git a/src/documentation.ts b/src/documentation.ts new file mode 100644 index 0000000..f992acb --- /dev/null +++ b/src/documentation.ts @@ -0,0 +1,805 @@ +/// + +import type { EventData } from "./types.ts"; + +// Documentation content structure interfaces +export interface CodeExample { + title: string; + language: string; + code: string; + description?: string; +} + +export interface ApiEndpoint { + method: string; + path: string; + description: string; + requestBody?: any; + responseBody?: any; + statusCodes: { code: number; description: string }[]; + headers?: { name: string; required: boolean; description: string }[]; +} + +export interface ErrorCode { + code: number; + message: string; + description: string; + solution: string; +} + +export interface TroubleshootingItem { + problem: string; + solution: string; + codeExample?: string; +} + +export interface DocumentationContent { + title: string; + description: string; + quickStart: CodeExample[]; + apiEndpoints: ApiEndpoint[]; + integrationExamples: CodeExample[]; + errorCodes: ErrorCode[]; + troubleshooting: TroubleshootingItem[]; +} + +// Helper function to get dynamic host URL +export function getHostUrl(): string { + try { + return Deno.env.get("HOST_URL") || "http://localhost:8000"; + } catch { + return "http://localhost:8000"; + } +} + +// Function to generate API examples with dynamic host URL +export function getQuickStartExamples(hostUrl?: string): CodeExample[] { + const baseUrl = hostUrl || getHostUrl(); + + return [ + { + title: "Single Event Tracking", + language: "bash", + code: `curl -X POST ${baseUrl}/api/track \\ + -H "Content-Type: application/json" \\ + -H "x-user-id: your-user-id" \\ + -d '{ + "event": "page_view", + "properties": { + "page": "/home", + "title": "Home Page", + "timestamp": "2024-01-15T10:30:00Z" + } + }'`, + description: + "Track a single event with user context and custom properties", + }, + { + title: "Batch Event Tracking", + language: "bash", + code: `curl -X POST ${baseUrl}/api/track \\ + -H "Content-Type: application/json" \\ + -H "x-user-id: your-user-id" \\ + -d '[ + { + "event": "button_click", + "properties": { + "button_id": "signup", + "page": "/landing" + } + }, + { + "event": "form_submit", + "properties": { + "form_id": "newsletter", + "email": "user@example.com" + } + } + ]'`, + description: + "Track multiple events in a single request for better performance", + }, + { + title: "Success Response", + language: "json", + code: `{ + "success": true, + "processed": 1, + "userId": "your-user-id" +}`, + description: "Expected response format for successful event tracking", + }, + { + title: "Error Response", + language: "json", + code: `{ + "error": "Bad Request", + "message": "Invalid event data format", + "statusCode": 400 +}`, + description: "Example error response when request format is invalid", + }, + ]; +} + +// Constants for API examples and documentation content +export const API_EXAMPLES: CodeExample[] = getQuickStartExamples(); + +export const INTEGRATION_EXAMPLES: CodeExample[] = [ + { + title: "JavaScript Fetch API - Single Event", + language: "javascript", + code: `// Single event tracking with error handling +async function trackEvent(eventData, userId) { + try { + const response = await fetch('/api/track', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-user-id': userId + }, + body: JSON.stringify(eventData) + }); + + if (!response.ok) { + const errorData = await response.json(); + throw new Error(\`HTTP \${response.status}: \${errorData.message || errorData.error}\`); + } + + const result = await response.json(); + console.log('Event tracked successfully:', result); + return result; + } catch (error) { + console.error('Failed to track event:', error); + throw error; + } +} + +// Usage example +trackEvent({ + event: 'page_view', + properties: { + page: '/dashboard', + title: 'User Dashboard', + referrer: document.referrer, + timestamp: new Date().toISOString() + } +}, 'user-123');`, + description: "Basic client-side event tracking with proper error handling", + }, + { + title: "JavaScript Fetch API - Batch Events", + language: "javascript", + code: `// Batch event tracking for better performance +async function trackBatchEvents(events, userId) { + try { + const response = await fetch('/api/track', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-user-id': userId + }, + body: JSON.stringify(events) + }); + + if (!response.ok) { + const errorData = await response.json(); + throw new Error(\`HTTP \${response.status}: \${errorData.message || errorData.error}\`); + } + + const result = await response.json(); + console.log(\`Successfully tracked \${result.processed} events\`); + return result; + } catch (error) { + console.error('Failed to track batch events:', error); + throw error; + } +} + +// Usage example - tracking multiple user interactions +const userEvents = [ + { + event: 'button_click', + properties: { + button_id: 'cta-signup', + page: '/landing', + position: 'header' + } + }, + { + event: 'form_view', + properties: { + form_id: 'signup-form', + step: 1 + } + }, + { + event: 'form_submit', + properties: { + form_id: 'signup-form', + success: true, + email: 'user@example.com' + } + } +]; + +trackBatchEvents(userEvents, 'user-456');`, + description: + "Batch event tracking for improved performance and reduced network requests", + }, + { + title: "Browser Client-side with Retry Logic", + language: "javascript", + code: `class BrowserEventTracker { + constructor(userId, options = {}) { + this.userId = userId; + this.baseUrl = options.baseUrl || ''; + this.maxRetries = options.maxRetries || 3; + this.retryDelay = options.retryDelay || 1000; + this.eventQueue = []; + this.isOnline = navigator.onLine; + + // Listen for online/offline events + window.addEventListener('online', () => { + this.isOnline = true; + this.flushQueue(); + }); + + window.addEventListener('offline', () => { + this.isOnline = false; + }); + } + + async track(eventData) { + if (!this.isOnline) { + this.eventQueue.push(eventData); + console.log('Offline: Event queued for later'); + return; + } + + return this.trackWithRetry(eventData); + } + + async trackWithRetry(eventData, attempt = 1) { + try { + const response = await fetch(\`\${this.baseUrl}/api/track\`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-user-id': this.userId + }, + body: JSON.stringify(eventData) + }); + + if (!response.ok) { + throw new Error(\`HTTP \${response.status}\`); + } + + return await response.json(); + } catch (error) { + if (attempt < this.maxRetries) { + console.log(\`Retry attempt \${attempt} failed, retrying...\`); + await this.delay(this.retryDelay * Math.pow(2, attempt - 1)); + return this.trackWithRetry(eventData, attempt + 1); + } else { + console.error('Max retries exceeded:', error); + // Queue for later if all retries failed + this.eventQueue.push(eventData); + throw error; + } + } + } + + async flushQueue() { + if (this.eventQueue.length === 0) return; + + const events = [...this.eventQueue]; + this.eventQueue = []; + + try { + await this.trackWithRetry(events); + console.log(\`Flushed \${events.length} queued events\`); + } catch (error) { + // Re-queue events if flush fails + this.eventQueue.unshift(...events); + } + } + + delay(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); + } +} + +// Usage +const tracker = new BrowserEventTracker('user-789', { + maxRetries: 5, + retryDelay: 500 +}); + +// Track events with automatic retry and offline queueing +tracker.track({ + event: 'user_interaction', + properties: { + action: 'scroll', + position: window.scrollY, + timestamp: Date.now() + } +});`, + description: + "Browser-optimized tracker with offline support, retry logic, and event queueing", + }, + { + title: "Node.js Server-side Implementation", + language: "javascript", + code: `const https = require('https'); +const http = require('http'); + +class ServerEventTracker { + constructor(baseUrl, userId, options = {}) { + this.baseUrl = baseUrl; + this.userId = userId; + this.maxRetries = options.maxRetries || 3; + this.timeout = options.timeout || 5000; + this.isHttps = baseUrl.startsWith('https'); + } + + async track(eventData) { + return this.makeRequest(eventData); + } + + async trackBatch(events) { + return this.makeRequest(events); + } + + makeRequest(data) { + const postData = JSON.stringify(data); + const url = new URL(\`\${this.baseUrl}/api/track\`); + + const options = { + hostname: url.hostname, + port: url.port || (this.isHttps ? 443 : 80), + path: url.pathname, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-user-id': this.userId, + 'Content-Length': Buffer.byteLength(postData) + }, + timeout: this.timeout + }; + + return new Promise((resolve, reject) => { + const client = this.isHttps ? https : http; + + const req = client.request(options, (res) => { + let body = ''; + res.on('data', chunk => body += chunk); + res.on('end', () => { + try { + const result = JSON.parse(body); + if (res.statusCode >= 200 && res.statusCode < 300) { + resolve(result); + } else { + reject(new Error(\`HTTP \${res.statusCode}: \${result.message || result.error}\`)); + } + } catch (error) { + reject(new Error(\`Invalid JSON response: \${body}\`)); + } + }); + }); + + req.on('error', reject); + req.on('timeout', () => { + req.destroy(); + reject(new Error('Request timeout')); + }); + + req.write(postData); + req.end(); + }); + } + + async trackWithRetry(eventData) { + let lastError; + + for (let attempt = 1; attempt <= this.maxRetries; attempt++) { + try { + return await this.track(eventData); + } catch (error) { + lastError = error; + if (attempt < this.maxRetries) { + const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000); + console.log(\`Attempt \${attempt} failed, retrying in \${delay}ms...\`); + await new Promise(resolve => setTimeout(resolve, delay)); + } + } + } + + throw lastError; + } +} + +// Usage examples +const tracker = new ServerEventTracker('http://localhost:8000', 'server-user-123', { + maxRetries: 5, + timeout: 10000 +}); + +// Track single server-side event +tracker.trackWithRetry({ + event: 'api_request', + properties: { + endpoint: '/api/users', + method: 'GET', + response_time: 150, + status_code: 200, + user_agent: req.headers['user-agent'] + } +}).catch(console.error); + +// Track batch of server events +const serverEvents = [ + { + event: 'database_query', + properties: { + query_type: 'SELECT', + table: 'users', + duration: 45 + } + }, + { + event: 'cache_hit', + properties: { + key: 'user:123', + ttl: 3600 + } + } +]; + +tracker.trackBatch(serverEvents).catch(console.error);`, + description: + "Production-ready Node.js server-side implementation with timeout handling and retry logic", + }, + { + title: "TypeScript with Advanced Error Handling", + language: "typescript", + code: `interface TrackingEvent { + event: string; + properties: Record; + timestamp?: string; +} + +interface TrackingResponse { + success: boolean; + processed: number; + userId: string; +} + +interface TrackerOptions { + maxRetries?: number; + retryDelay?: number; + timeout?: number; + onError?: (error: Error, event: TrackingEvent | TrackingEvent[]) => void; + onSuccess?: (response: TrackingResponse) => void; +} + +class TypeScriptEventTracker { + private baseUrl: string; + private userId: string; + private options: Required; + + constructor(baseUrl: string, userId: string, options: TrackerOptions = {}) { + this.baseUrl = baseUrl; + this.userId = userId; + this.options = { + maxRetries: options.maxRetries ?? 3, + retryDelay: options.retryDelay ?? 1000, + timeout: options.timeout ?? 5000, + onError: options.onError ?? (() => {}), + onSuccess: options.onSuccess ?? (() => {}) + }; + } + + async track(event: TrackingEvent): Promise { + return this.trackWithRetry(event); + } + + async trackBatch(events: TrackingEvent[]): Promise { + if (events.length === 0) { + throw new Error('Cannot track empty event batch'); + } + return this.trackWithRetry(events); + } + + private async trackWithRetry( + data: TrackingEvent | TrackingEvent[] + ): Promise { + let lastError: Error; + + for (let attempt = 1; attempt <= this.options.maxRetries; attempt++) { + try { + const response = await this.makeRequest(data); + this.options.onSuccess(response); + return response; + } catch (error) { + lastError = error as Error; + + if (attempt < this.options.maxRetries && this.isRetryableError(error as Error)) { + const delay = this.options.retryDelay * Math.pow(2, attempt - 1); + await this.delay(delay); + continue; + } + + this.options.onError(lastError, data); + throw lastError; + } + } + + throw lastError!; + } + + private async makeRequest(data: TrackingEvent | TrackingEvent[]): Promise { + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), this.options.timeout); + + try { + const response = await fetch(\`\${this.baseUrl}/api/track\`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-user-id': this.userId + }, + body: JSON.stringify(data), + signal: controller.signal + }); + + clearTimeout(timeoutId); + + if (!response.ok) { + const errorData = await response.json().catch(() => ({})); + throw new Error(\`HTTP \${response.status}: \${errorData.message || errorData.error || 'Unknown error'}\`); + } + + return await response.json(); + } catch (error) { + clearTimeout(timeoutId); + if (error.name === 'AbortError') { + throw new Error('Request timeout'); + } + throw error; + } + } + + private isRetryableError(error: Error): boolean { + // Retry on network errors, timeouts, and 5xx server errors + return error.message.includes('timeout') || + error.message.includes('network') || + error.message.includes('fetch') || + /HTTP 5\d\d/.test(error.message); + } + + private delay(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } +} + +// Usage examples +const tracker = new TypeScriptEventTracker('http://localhost:8000', 'ts-user-456', { + maxRetries: 5, + retryDelay: 500, + timeout: 8000, + onError: (error, event) => { + console.error('Tracking failed:', error.message); + // Could send to error reporting service + }, + onSuccess: (response) => { + console.log(\`Successfully tracked \${response.processed} events\`); + } +}); + +// Single event with type safety +const userEvent: TrackingEvent = { + event: 'feature_used', + properties: { + feature_name: 'advanced_search', + user_tier: 'premium', + search_query: 'typescript event tracking', + results_count: 42 + }, + timestamp: new Date().toISOString() +}; + +tracker.track(userEvent); + +// Batch events with type safety +const analyticsEvents: TrackingEvent[] = [ + { + event: 'page_load', + properties: { + page: '/analytics', + load_time: 1250, + user_agent: navigator.userAgent + } + }, + { + event: 'widget_interaction', + properties: { + widget_type: 'chart', + action: 'filter_applied', + filter_value: 'last_30_days' + } + } +]; + +tracker.trackBatch(analyticsEvents);`, + description: + "Enterprise-grade TypeScript implementation with comprehensive error handling, timeouts, and type safety", + }, +]; + +export const ERROR_CODES: ErrorCode[] = [ + { + code: 400, + message: "Bad Request", + description: "The request body is malformed or missing required fields", + solution: + "Ensure your request body is valid JSON and includes all required event properties", + }, + { + code: 401, + message: "Unauthorized", + description: "Missing or invalid user identification", + solution: + "Include a valid x-user-id, user-id, x-api-key, or authorization header", + }, + { + code: 405, + message: "Method Not Allowed", + description: "HTTP method not supported for this endpoint", + solution: "Use POST method for /api/track endpoint", + }, + { + code: 500, + message: "Internal Server Error", + description: "Server encountered an error processing the request", + solution: + "Check your request format and try again. Contact support if the issue persists", + }, +]; + +export const TROUBLESHOOTING_ITEMS: TroubleshootingItem[] = [ + { + problem: "CORS errors when tracking from browser", + solution: + "The API supports CORS for all origins. Ensure you're making the request to the correct endpoint and include proper headers.", + codeExample: `// Ensure proper headers are included +fetch('/api/track', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-user-id': 'your-user-id' + }, + body: JSON.stringify(eventData) +});`, + }, + { + problem: "401 Unauthorized error", + solution: + "User identification is required. Include one of the supported user ID headers.", + codeExample: `// Any of these headers will work: +headers: { + 'x-user-id': 'user-123', // Preferred + 'user-id': 'user-123', // Alternative + 'x-api-key': 'api-key-456', // For API keys + 'authorization': 'Bearer token' // For JWT tokens +}`, + }, + { + problem: "Events not appearing in dashboard", + solution: + "Ensure you're using the same user ID for tracking and dashboard viewing. Events are user-specific.", + codeExample: `// Make sure the user ID is consistent +const userId = 'user-123'; +// Use the same userId for both tracking and dashboard access`, + }, + { + problem: "Large payload errors", + solution: + "Break large batches into smaller chunks. The API supports batch processing but has size limits.", + codeExample: `// Process events in batches of 100 +const batchSize = 100; +for (let i = 0; i < events.length; i += batchSize) { + const batch = events.slice(i, i + batchSize); + await trackEvents(batch); +}`, + }, + { + problem: "Network timeout issues", + solution: + "Implement retry logic with exponential backoff for network resilience.", + codeExample: `async function trackWithRetry(eventData, maxRetries = 3) { + for (let i = 0; i < maxRetries; i++) { + try { + return await trackEvent(eventData); + } catch (error) { + if (i === maxRetries - 1) throw error; + await new Promise(resolve => + setTimeout(resolve, Math.pow(2, i) * 1000) + ); + } + } +}`, + }, +]; + +export const API_ENDPOINTS: ApiEndpoint[] = [ + { + method: "POST", + path: "/api/track", + description: "Track single or multiple events for a specific user", + requestBody: { + single: { + event: "string", + properties: "Record", + timestamp: "string (optional)", + }, + batch: [ + { + event: "string", + properties: "Record", + timestamp: "string (optional)", + }, + ], + }, + responseBody: { + success: true, + processed: "number", + userId: "string", + }, + statusCodes: [ + { code: 200, description: "Events successfully processed" }, + { code: 400, description: "Invalid request body or user ID" }, + { code: 401, description: "Missing user identification" }, + { code: 500, description: "Server error during processing" }, + ], + headers: [ + { + name: "x-user-id", + required: true, + description: "User identifier (preferred)", + }, + { + name: "user-id", + required: false, + description: "Alternative user identifier", + }, + { + name: "x-api-key", + required: false, + description: "API key as user identifier", + }, + { + name: "authorization", + required: false, + description: "Bearer token as user identifier", + }, + { + name: "Content-Type", + required: true, + description: "Must be application/json", + }, + ], + }, +]; + +// Main documentation content structure +export const DOCUMENTATION_CONTENT: DocumentationContent = { + title: "Event Tracking API Documentation", + description: + "A high-performance event tracking and analytics service built with Fresh and ClickHouse. Track user events, analyze behavior, and monitor your application in real-time.", + quickStart: API_EXAMPLES, + apiEndpoints: API_ENDPOINTS, + integrationExamples: INTEGRATION_EXAMPLES, + errorCodes: ERROR_CODES, + troubleshooting: TROUBLESHOOTING_ITEMS, +}; diff --git a/src/migration.ts b/src/migration.ts new file mode 100644 index 0000000..bef7426 --- /dev/null +++ b/src/migration.ts @@ -0,0 +1,651 @@ +import { ClickHouseService } from "./clickhouse.ts"; +import { ClickHouseConfig, EventData } from "./types.ts"; +import { AuthMigrationService } from "./auth-migration.ts"; + +export interface MigrationResult { + success: boolean; + migratedUsers: string[]; + totalEvents: number; + migratedEvents: number; + errors: string[]; + duration: number; +} + +export interface ValidationResult { + valid: boolean; + totalEventsInSharedTable: number; + totalEventsInUserTables: number; + userTableCounts: Record; + missingEvents: number; + errors: string[]; +} + +export interface MigrationProgress { + currentUser: string; + processedUsers: number; + totalUsers: number; + processedEvents: number; + totalEvents: number; + startTime: number; + estimatedTimeRemaining?: number; +} + +/** + * MigrationService handles the migration of data from shared tables to user-specific tables + * Provides data integrity validation and rollback capabilities + */ +export class MigrationService { + private clickhouse: ClickHouseService; + private config: ClickHouseConfig; + private batchSize: number = 1000; + private progressCallback?: (progress: MigrationProgress) => void; + + constructor(clickhouse: ClickHouseService) { + this.clickhouse = clickhouse; + this.config = clickhouse.config; + } + + /** + * Sets the batch size for migration operations + */ + setBatchSize(size: number): void { + if (size <= 0) { + throw new Error("Batch size must be greater than 0"); + } + this.batchSize = size; + } + + /** + * Sets a progress callback function to monitor migration progress + */ + setProgressCallback(callback: (progress: MigrationProgress) => void): void { + this.progressCallback = callback; + } + + /** + * Main migration method that transfers data from shared table to user-specific tables + * Preserves all historical events and maintains data integrity + */ + async migrateToUserTables(): Promise { + const startTime = Date.now(); + const result: MigrationResult = { + success: false, + migratedUsers: [], + totalEvents: 0, + migratedEvents: 0, + errors: [], + duration: 0, + }; + + try { + console.log( + "Starting migration from shared table to user-specific tables...", + ); + + // Step 1: Validate prerequisites + await this.validatePrerequisites(); + + // Step 2: Get unique user IDs from shared table + const userIds = await this.getUniqueUserIds(); + console.log(`Found ${userIds.length} unique users to migrate`); + + if (userIds.length === 0) { + console.log("No users found in shared table, migration complete"); + result.success = true; + result.duration = Date.now() - startTime; + return result; + } + + // Step 3: Get total event count for progress tracking + result.totalEvents = await this.getTotalEventCount(); + console.log(`Total events to migrate: ${result.totalEvents}`); + + // Step 4: Migrate data for each user + let processedEvents = 0; + for (let i = 0; i < userIds.length; i++) { + const userId = userIds[i]; + + try { + console.log(`Migrating user ${i + 1}/${userIds.length}: ${userId}`); + + // Report progress + if (this.progressCallback) { + this.progressCallback({ + currentUser: userId, + processedUsers: i, + totalUsers: userIds.length, + processedEvents, + totalEvents: result.totalEvents, + startTime, + estimatedTimeRemaining: this.calculateEstimatedTime( + startTime, + i, + userIds.length, + ), + }); + } + + const migratedCount = await this.migrateUserData(userId); + result.migratedUsers.push(userId); + processedEvents += migratedCount; + result.migratedEvents += migratedCount; + + console.log( + `Successfully migrated ${migratedCount} events for user: ${userId}`, + ); + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + const userError = + `Failed to migrate user '${userId}': ${errorMessage}`; + console.error(userError); + result.errors.push(userError); + } + } + + // Step 5: Validate migration integrity + console.log("Validating migration integrity..."); + const validation = await this.validateMigration(); + + if (!validation.valid) { + result.errors.push("Migration validation failed"); + result.errors.push(...validation.errors); + throw new Error("Migration validation failed"); + } + + result.success = true; + console.log( + `Migration completed successfully. Migrated ${result.migratedEvents} events for ${result.migratedUsers.length} users`, + ); + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + console.error("Migration failed:", errorMessage); + result.errors.push(errorMessage); + result.success = false; + } finally { + result.duration = Date.now() - startTime; + } + + return result; + } + + /** + * Validates that all prerequisites for migration are met + */ + private async validatePrerequisites(): Promise { + // Check if user database exists + const userDatabase = this.config.userDatabase || "user_events"; + try { + await this.clickhouse.queryDatabase(userDatabase, "SELECT 1 LIMIT 1"); + } catch (error) { + throw new Error( + `User database '${userDatabase}' is not accessible. Please ensure it exists and is properly configured.`, + ); + } + + // Check if shared table exists and has user_id column + const sharedTableName = (this.config as any).tableName || "events"; + const systemDatabase = this.config.systemDatabase || this.config.database || + "default"; + + try { + const columns = await this.clickhouse.queryDatabaseJSON( + "system", + `SELECT name FROM system.columns WHERE database = '${systemDatabase}' AND table = '${sharedTableName}'`, + ); + + const columnNames = columns.map((col) => col.name); + if (!columnNames.includes("user_id")) { + throw new Error( + `Shared table '${sharedTableName}' does not have a 'user_id' column. Migration requires this column to identify user data.`, + ); + } + } catch (error) { + if (error instanceof Error && error.message.includes("user_id")) { + throw error; + } + throw new Error( + `Cannot access shared table '${sharedTableName}' in database '${systemDatabase}'. Please verify the table exists.`, + ); + } + } + + /** + * Gets unique user IDs from the shared events table + */ + private async getUniqueUserIds(): Promise { + const sharedTableName = (this.config as any).tableName || "events"; + const systemDatabase = this.config.systemDatabase || this.config.database || + "default"; + + try { + const query = ` + SELECT DISTINCT user_id + FROM ${systemDatabase}.${sharedTableName} + WHERE user_id IS NOT NULL AND user_id != '' + ORDER BY user_id + `; + + const result = await this.clickhouse.queryDatabaseJSON( + systemDatabase, + query, + ); + return result + .map((row) => row.user_id) + .filter((id) => id && typeof id === "string"); + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + throw new Error(`Failed to get unique user IDs: ${errorMessage}`); + } + } + + /** + * Gets the total count of events in the shared table + */ + private async getTotalEventCount(): Promise { + const sharedTableName = (this.config as any).tableName || "events"; + const systemDatabase = this.config.systemDatabase || this.config.database || + "default"; + + try { + const query = + `SELECT count() as total FROM ${systemDatabase}.${sharedTableName} WHERE user_id IS NOT NULL AND user_id != ''`; + const result = await this.clickhouse.queryDatabaseJSON( + systemDatabase, + query, + ); + return result[0]?.total || 0; + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + throw new Error(`Failed to get total event count: ${errorMessage}`); + } + } + + /** + * Migrates data for a specific user from shared table to their user-specific table + */ + private async migrateUserData(userId: string): Promise { + if (!userId || typeof userId !== "string") { + throw new Error("User ID must be a non-empty string"); + } + + // Ensure user table exists + await this.clickhouse.ensureUserTable(userId); + + const sharedTableName = (this.config as any).tableName || "events"; + const systemDatabase = this.config.systemDatabase || this.config.database || + "default"; + const userTableName = this.clickhouse.getUserTableName(userId); + const userDatabase = this.config.userDatabase || "user_events"; + + let totalMigrated = 0; + let offset = 0; + + try { + while (true) { + // Get batch of events for this user + // Use parameterized query to prevent SQL injection + const selectQuery = ` + SELECT id, timestamp, data, source, ip, user_agent, created_at + FROM ${systemDatabase}.${sharedTableName} + WHERE user_id = {userId:String} + ORDER BY timestamp, id + LIMIT ${this.batchSize} OFFSET ${offset} + `; + + const events = await this.clickhouse.queryDatabaseJSON( + systemDatabase, + selectQuery, + { userId }, + ); + + if (events.length === 0) { + break; // No more events for this user + } + + // Insert events into user-specific table + if (events.length > 0) { + try { + // Use the ClickHouse service's insert method by creating a batch insert query + const insertQuery = + `INSERT INTO ${userDatabase}.${userTableName} FORMAT JSONEachRow`; + const eventData = events + .map((event) => + JSON.stringify({ + id: event.id, + timestamp: event.timestamp, + data: typeof event.data === "string" + ? event.data + : JSON.stringify(event.data), + source: event.source || "", + ip: event.ip || "", + user_agent: event.user_agent || "", + created_at: event.created_at, + }) + ) + .join("\n"); + + // Use direct HTTP call for batch insert (this is how ClickHouse batch inserts work) + const url = + `http://${this.config.host}:${this.config.port}/?query=${ + encodeURIComponent( + insertQuery, + ) + }`; + + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...this.getAuthHeaders(), + }, + body: eventData, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Failed to insert batch for user '${userId}': ${response.statusText} - ${errorText}`, + ); + } + + totalMigrated += events.length; + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + throw new Error( + `Failed to insert events for user '${userId}': ${errorMessage}`, + ); + } + } + + offset += this.batchSize; + } + + return totalMigrated; + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + throw new Error( + `Failed to migrate data for user '${userId}': ${errorMessage}`, + ); + } + } + + /** + * Validates the integrity of the migration by comparing event counts + */ + async validateMigration(): Promise { + const result: ValidationResult = { + valid: false, + totalEventsInSharedTable: 0, + totalEventsInUserTables: 0, + userTableCounts: {}, + missingEvents: 0, + errors: [], + }; + + try { + // Get total events in shared table + result.totalEventsInSharedTable = await this.getTotalEventCount(); + + // Get unique user IDs + const userIds = await this.getUniqueUserIds(); + + // Count events in each user table + let totalInUserTables = 0; + for (const userId of userIds) { + try { + const userTableName = this.clickhouse.getUserTableName(userId); + const userDatabase = this.config.userDatabase || "user_events"; + + const countQuery = + `SELECT count() as total FROM ${userDatabase}.${userTableName}`; + const countResult = await this.clickhouse.queryDatabaseJSON( + userDatabase, + countQuery, + ); + const userEventCount = countResult[0]?.total || 0; + + result.userTableCounts[userId] = userEventCount; + totalInUserTables += userEventCount; + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + result.errors.push( + `Failed to count events for user '${userId}': ${errorMessage}`, + ); + } + } + + result.totalEventsInUserTables = totalInUserTables; + result.missingEvents = result.totalEventsInSharedTable - + result.totalEventsInUserTables; + + // Validation passes if event counts match + result.valid = result.missingEvents === 0 && result.errors.length === 0; + + if (!result.valid) { + if (result.missingEvents > 0) { + result.errors.push( + `Missing ${result.missingEvents} events in user tables`, + ); + } else if (result.missingEvents < 0) { + result.errors.push( + `Extra ${Math.abs(result.missingEvents)} events in user tables`, + ); + } + } + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + result.errors.push(`Validation failed: ${errorMessage}`); + } + + return result; + } + + /** + * Verifies data integrity for a specific user by comparing sample data + */ + async verifyUserDataIntegrity(userId: string): Promise { + if (!userId || typeof userId !== "string") { + throw new Error("User ID must be a non-empty string"); + } + + try { + const sharedTableName = (this.config as any).tableName || "events"; + const systemDatabase = this.config.systemDatabase || + this.config.database || "default"; + const userTableName = this.clickhouse.getUserTableName(userId); + const userDatabase = this.config.userDatabase || "user_events"; + + // Get sample of events from shared table + // Use parameterized query to prevent SQL injection + const sharedQuery = ` + SELECT id, timestamp, data, source, ip, user_agent + FROM ${systemDatabase}.${sharedTableName} + WHERE user_id = {userId:String} + ORDER BY timestamp, id + LIMIT 100 + `; + + // Get sample of events from user table + const userQuery = ` + SELECT id, timestamp, data, source, ip, user_agent + FROM ${userDatabase}.${userTableName} + ORDER BY timestamp, id + LIMIT 100 + `; + + const [sharedEvents, userEvents] = await Promise.all([ + this.clickhouse.queryDatabaseJSON(systemDatabase, sharedQuery, { + userId, + }), + this.clickhouse.queryDatabaseJSON(userDatabase, userQuery), + ]); + + // Compare sample data + if (sharedEvents.length !== userEvents.length) { + console.warn( + `Sample size mismatch for user '${userId}': shared=${sharedEvents.length}, user=${userEvents.length}`, + ); + return false; + } + + for (let i = 0; i < sharedEvents.length; i++) { + const shared = sharedEvents[i]; + const user = userEvents[i]; + + if ( + shared.id !== user.id || + shared.timestamp !== user.timestamp || + shared.source !== user.source || + shared.ip !== user.ip || + shared.user_agent !== user.user_agent + ) { + console.warn(`Data mismatch for user '${userId}' at index ${i}`); + return false; + } + + // Compare data field (handle JSON string vs object) + const sharedData = typeof shared.data === "string" + ? shared.data + : JSON.stringify(shared.data); + const userData = typeof user.data === "string" + ? user.data + : JSON.stringify(user.data); + + if (sharedData !== userData) { + console.warn( + `Data field mismatch for user '${userId}' at index ${i}`, + ); + return false; + } + } + + return true; + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + console.error( + `Failed to verify data integrity for user '${userId}': ${errorMessage}`, + ); + return false; + } + } + + /** + * Rollback migration by dropping all user tables (use with caution) + */ + async rollbackMigration(): Promise { + console.warn( + "Starting migration rollback - this will drop all user tables!", + ); + + try { + const userIds = await this.getUniqueUserIds(); + const userDatabase = this.config.userDatabase || "user_events"; + + for (const userId of userIds) { + try { + const userTableName = this.clickhouse.getUserTableName(userId); + const dropQuery = + `DROP TABLE IF EXISTS ${userDatabase}.${userTableName}`; + await this.clickhouse.queryDatabase(userDatabase, dropQuery); + console.log(`Dropped user table: ${userTableName}`); + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + console.error( + `Failed to drop table for user '${userId}': ${errorMessage}`, + ); + } + } + + console.log("Migration rollback completed"); + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + throw new Error(`Rollback failed: ${errorMessage}`); + } + } + + /** + * Lists all user tables in the user database + */ + async listUserTables(): Promise { + try { + const userDatabase = this.config.userDatabase || "user_events"; + const tablePrefix = this.config.tablePrefix || "events_user"; + + const query = ` + SELECT name + FROM system.tables + WHERE database = '${userDatabase}' + AND name LIKE '${tablePrefix}_%' + ORDER BY name + `; + + const result = await this.clickhouse.queryDatabaseJSON("system", query); + return result.map((row) => row.name); + } catch (error) { + const errorMessage = error instanceof Error + ? error.message + : String(error); + throw new Error(`Failed to list user tables: ${errorMessage}`); + } + } + + /** + * Gets authentication headers for ClickHouse requests + */ + private getAuthHeaders(): Record { + const headers: Record = {}; + + if (this.config.username && this.config.password) { + const credentials = btoa( + `${this.config.username}:${this.config.password}`, + ); + headers["Authorization"] = `Basic ${credentials}`; + } else if (this.config.username) { + headers["X-ClickHouse-User"] = this.config.username; + if (this.config.password) { + headers["X-ClickHouse-Key"] = this.config.password; + } + } + + return headers; + } + + /** + * Calculates estimated time remaining for migration + */ + private calculateEstimatedTime( + startTime: number, + processed: number, + total: number, + ): number | undefined { + if (processed === 0) return undefined; + + const elapsed = Date.now() - startTime; + const rate = processed / elapsed; + const remaining = total - processed; + + return remaining / rate; + } +} diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..f5ac7e9 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,147 @@ +/// + +export interface EventData { + id?: string; + timestamp?: string | Date; + data: Record; + source?: string; + ip?: string; + user_agent?: string; +} + +export interface ClickHouseConfig { + host: string; + port: number; + // Legacy properties for backward compatibility + database?: string; + tableName?: string; + // New properties for user separation + systemDatabase?: string; // For system tables + userDatabase?: string; // For user-specific tables + username: string; + password: string; + tablePrefix?: string; // Default: "events_user" +} + +export interface UserTableConfig { + userId: string; + tableName: string; + database: string; +} + +export interface UserContext { + userId: string; + tableName: string; + database: string; +} + +export interface DashboardStats { + totalEvents: number; + eventsToday: number; + eventsLastHour: number; + dataSize: string; + recentEvents: EventData[]; +} + +// Authentication types +export interface User { + id: string; + email: string; + created_at: string; + updated_at: string; +} + +export interface ApiKey { + id: string; + user_id: string; + name: string; + key?: string; // Only returned on creation + created_at: string; + last_used_at?: string; +} + +export interface Session { + id: string; + user_id: string; + created_at: Date; + expires_at: Date; +} + +// Request/Response types for authentication +export interface CreateUserRequest { + email: string; + password: string; +} + +export interface SignInRequest { + email: string; + password: string; +} + +export interface CreateApiKeyRequest { + name?: string; +} + +export interface UpdateApiKeyRequest { + name: string; +} + +// Authentication response types +export interface AuthResponse { + success: boolean; + message?: string; + user?: User; +} + +export interface ApiKeyResponse { + success: boolean; + message?: string; + apiKey?: ApiKey; +} + +export interface ApiKeysListResponse { + success: boolean; + message?: string; + apiKeys?: Omit[]; +} + +// Middleware state for authentication +export interface AuthState { + user?: User; + session?: Session; +} + +// Database row types (internal use) +export interface UserRow { + id: string; + email: string; + password_hash: string; + created_at: string; + updated_at: string; +} + +export interface ApiKeyRow { + id: string; + user_id: string; + key_hash: string; + name: string; + created_at: string; + last_used_at?: string; +} + +export function getClickHouseConfig(): ClickHouseConfig { + const config = { + host: Deno.env.get("CLICKHOUSE_HOST") || "localhost", + port: parseInt(Deno.env.get("CLICKHOUSE_PORT") || "8123"), + systemDatabase: Deno.env.get("CLICKHOUSE_SYSTEM_DATABASE") || "default", + userDatabase: Deno.env.get("CLICKHOUSE_USER_DATABASE") || "user_events", + username: Deno.env.get("CLICKHOUSE_USER") || "default", + password: Deno.env.get("CLICKHOUSE_PASSWORD") || "", + tablePrefix: Deno.env.get("CLICKHOUSE_TABLE_PREFIX") || "events_user", + }; + + if (Deno.env.get("DEBUG") === "true") { + console.debug("ClickHouse config:", config); + } + return config; +}