diff --git a/apps/aggregator/.env.example b/apps/aggregator/.env.example index 67e0b3c..4d3cf72 100644 --- a/apps/aggregator/.env.example +++ b/apps/aggregator/.env.example @@ -5,8 +5,23 @@ PORT=3001 INGESTOR_WS_URL=ws://localhost:3000 INGESTOR_HTTP_URL=http://localhost:3000 -# Redis (optional; used for caching/session when set; health check verifies connectivity) +# Redis Storage Configuration +# When REDIS_URL is set, StorageService persists aggregated prices to Redis. +# When unset, StorageService runs in no-op mode (prices are only held in memory). # REDIS_URL=redis://localhost:6379 +# TTL in seconds for the latest price key ({prefix}:latest:{SYMBOL}). +# After this many seconds the key expires automatically. Default: 300 (5 minutes). +# REDIS_PRICE_TTL_SECONDS=300 + +# Maximum number of historical price entries retained per symbol in the sorted set. +# Entries older than this limit are pruned automatically on each write. Default: 100. +# REDIS_HISTORY_MAX_ENTRIES=100 + +# Prefix for all Redis keys. Useful for namespace isolation in shared Redis instances. +# Keys will be: {prefix}:latest:{SYMBOL}, {prefix}:history:{SYMBOL}, {prefix}:symbols +# Default: price +# REDIS_KEY_PREFIX=price + # Signer Service (for publishing aggregated data) SIGNER_URL=http://localhost:3002 diff --git a/apps/aggregator/README.md b/apps/aggregator/README.md index 9c054d7..9272501 100644 --- a/apps/aggregator/README.md +++ b/apps/aggregator/README.md @@ -34,7 +34,7 @@ The Aggregator service is responsible for calculating a single consensus price p | `/ready` | GET | Readiness probe for Kubernetes. Same checks as `/health`; returns 200 when the service can accept traffic. | | `/live` | GET | Liveness probe for Kubernetes. Returns 200 when the process is alive (no dependency checks). | | `/status` | GET | Detailed system information: uptime, memory usage, dependency check results, and version. 
| -| `/metrics` | GET | Prometheus metrics in [exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/). Scrape this endpoint for aggregation count, latency, errors, and default Node.js metrics. | +| `/metrics` | GET | Prometheus metrics in [exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/). Scrape this endpoint for aggregation count, latency, errors, storage cache hits/misses, and default Node.js metrics. | | `/debug/prices` | GET | Last aggregated and normalized prices held in memory. Useful for debugging without hitting external systems. | **Health checks**: When `REDIS_URL` or `INGESTOR_URL` are set, the health check verifies connectivity. If a configured dependency is unreachable, `/health` and `/ready` return 503. If not set, that dependency is skipped (not included in the check). @@ -48,14 +48,21 @@ aggregator/ │ │ ├── normalized-price.interface.ts │ │ ├── aggregated-price.interface.ts │ │ ├── aggregator.interface.ts -│ │ └── aggregation-config.interface.ts +│ │ ├── aggregation-config.interface.ts +│ │ └── storage-options.interface.ts │ ├── strategies/ │ │ └── aggregators/ # Aggregation strategy implementations │ │ ├── weighted-average.aggregator.ts │ │ ├── median.aggregator.ts │ │ └── trimmed-mean.aggregator.ts │ ├── services/ -│ │ └── aggregation.service.ts # Main aggregation service +│ │ ├── aggregation.service.ts # Main aggregation service +│ │ ├── data-reception.service.ts +│ │ ├── normalization.service.ts +│ │ └── storage.service.ts # Redis persistence layer +│ ├── modules/ +│ │ ├── normalization.module.ts +│ │ └── storage.module.ts # Redis storage module │ ├── health/ # Health checks (Terminus) │ │ ├── health.controller.ts │ │ └── indicators/ @@ -68,10 +75,61 @@ aggregator/ │ │ ├── debug.controller.ts │ │ └── debug.service.ts │ ├── config/ -│ │ └── source-weights.config.ts # Weight configuration +│ │ ├── source-weights.config.ts # Weight configuration +│ │ └── redis.config.ts # Redis config 
factory │ └── app.module.ts ``` +## Redis Storage + +The aggregator uses Redis to persist aggregated prices for fast access by downstream services (e.g., the API service). Redis is **optional** — when `REDIS_URL` is not set, the service runs in no-op mode and all storage operations are silently skipped. + +### Quick Start + +```bash +# Start Redis with Docker +docker compose up -d redis + +# Add to apps/aggregator/.env +REDIS_URL=redis://localhost:6379 +``` + +### Redis Key Schema + +| Key Pattern | Type | TTL | Purpose | +|-------------|------|-----|---------| +| `price:latest:{SYMBOL}` | STRING (JSON) | `REDIS_PRICE_TTL_SECONDS` (default 300 s) | Latest `AggregatedPrice` per symbol, expires automatically | +| `price:history:{SYMBOL}` | SORTED SET, score = `computedAt` ms | None (bounded by max-entries trim) | Time-series history, up to `REDIS_HISTORY_MAX_ENTRIES` entries | +| `price:symbols` | SET | None | Index of all symbols that have been stored | + +The prefix `price` can be overridden with `REDIS_KEY_PREFIX` for namespace isolation in shared Redis instances. + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `REDIS_URL` | _(unset)_ | Redis connection URL. When absent, StorageService runs in no-op mode. | +| `REDIS_PRICE_TTL_SECONDS` | `300` | Seconds before a `latest` price key expires. Default is 5 minutes. | +| `REDIS_HISTORY_MAX_ENTRIES` | `100` | Maximum history entries kept per symbol. Older entries are pruned automatically on write. | +| `REDIS_KEY_PREFIX` | `price` | Prefix for all Redis keys. Useful for separating environments. | + +### Graceful Degradation + +When `REDIS_URL` is not configured: +- `StorageService.onModuleInit()` logs a warning and sets `client = null`. +- All public methods (`storePrice`, `getLatestPrice`, `getPriceHistory`, `storePriceBatch`, `getLatestPriceBatch`, `getTrackedSymbols`, `deleteSymbol`) return safe empty values immediately. 
+- The service logs and never throws — a Redis failure never interrupts price aggregation. + +### Prometheus Metrics (Storage) + +| Metric | Type | Description | +|--------|------|-------------| +| `aggregator_storage_cache_hits_total` | Counter | Total Redis read cache hits | +| `aggregator_storage_cache_misses_total` | Counter | Total Redis read cache misses | +| `aggregator_storage_operation_duration_seconds` | Histogram (labels: `operation`) | Duration of Redis `read`/`write` operations in seconds | + +These metrics are available at `GET /metrics` alongside existing aggregation metrics. + ## Aggregation Methods ### Weighted Average @@ -387,6 +445,7 @@ npm run test:cov All components have comprehensive test coverage (>85%): - ✅ `aggregation.service.spec.ts` - 50+ test cases +- ✅ `storage.service.spec.ts` - 40+ test cases (mocked ioredis) - ✅ `weighted-average.aggregator.spec.ts` - 15+ test cases - ✅ `median.aggregator.spec.ts` - 18+ test cases - ✅ `trimmed-mean.aggregator.spec.ts` - 20+ test cases diff --git a/apps/aggregator/package.json b/apps/aggregator/package.json index 463083e..ce9048a 100644 --- a/apps/aggregator/package.json +++ b/apps/aggregator/package.json @@ -23,10 +23,9 @@ "@nestjs/platform-express": "^10.0.0", "@oracle-stocks/shared": "*", "@nestjs/terminus": "^10.0.0", - "axios": "^1.6.0", + "axios": "^1.13.4", "ioredis": "^5.3.2", "prom-client": "^15.1.0", - "axios": "^1.13.4", "class-transformer": "^0.5.1", "class-validator": "^0.14.3", "reflect-metadata": "^0.1.13", diff --git a/apps/aggregator/src/app.module.ts b/apps/aggregator/src/app.module.ts index 071749a..03d141d 100644 --- a/apps/aggregator/src/app.module.ts +++ b/apps/aggregator/src/app.module.ts @@ -1,24 +1,26 @@ import { Module } from '@nestjs/common'; -import { NormalizationModule } from './modules/normalization.module'; +import { HttpModule } from '@nestjs/axios'; import { ConfigModule } from '@nestjs/config'; +import { EventEmitterModule } from '@nestjs/event-emitter'; +import { 
NormalizationModule } from './modules/normalization.module'; +import { StorageModule } from './modules/storage.module'; +import { HealthModule } from './health/health.module'; +import { MetricsModule } from './metrics/metrics.module'; +import { DebugModule } from './debug/debug.module'; +import { DataReceptionService } from './services/data-reception.service'; import { AggregationService } from './services/aggregation.service'; import { WeightedAverageAggregator } from './strategies/aggregators/weighted-average.aggregator'; import { MedianAggregator } from './strategies/aggregators/median.aggregator'; import { TrimmedMeanAggregator } from './strategies/aggregators/trimmed-mean.aggregator'; -import { HealthModule } from './health/health.module'; -import { MetricsModule } from './metrics/metrics.module'; -import { DebugModule } from './debug/debug.module'; @Module({ imports: [ - NormalizationModule, ConfigModule.forRoot({ isGlobal: true, envFilePath: '.env' }), + NormalizationModule, HealthModule, MetricsModule, DebugModule, - ConfigModule.forRoot({ - isGlobal: true, - }), + StorageModule, HttpModule, EventEmitterModule.forRoot(), ], @@ -32,4 +34,4 @@ import { DebugModule } from './debug/debug.module'; ], exports: [AggregationService], }) -export class AppModule { } +export class AppModule {} diff --git a/apps/aggregator/src/config/redis.config.ts b/apps/aggregator/src/config/redis.config.ts new file mode 100644 index 0000000..99b938c --- /dev/null +++ b/apps/aggregator/src/config/redis.config.ts @@ -0,0 +1,14 @@ +import { registerAs } from '@nestjs/config'; + +/** + * Redis configuration factory. + * Reads REDIS_* environment variables and exposes them as a typed config namespace. + * + * Usage: configService.get<ConfigType<typeof redisConfig>>('redis') + */ +export const redisConfig = registerAs('redis', () => ({ + url: process.env.REDIS_URL ?? '', + priceTtlSeconds: parseInt(process.env.REDIS_PRICE_TTL_SECONDS ?? '300', 10), + historyMaxEntries: parseInt(process.env.REDIS_HISTORY_MAX_ENTRIES ?? 
'100', 10), + keyPrefix: process.env.REDIS_KEY_PREFIX ?? 'price', +})); diff --git a/apps/aggregator/src/interfaces/storage-options.interface.ts b/apps/aggregator/src/interfaces/storage-options.interface.ts new file mode 100644 index 0000000..682f7f1 --- /dev/null +++ b/apps/aggregator/src/interfaces/storage-options.interface.ts @@ -0,0 +1,33 @@ +/** + * Options for StorageModule / StorageService configuration. + * All fields are optional; defaults are safe for a local Redis instance. + */ +export interface StorageModuleOptions { + /** Redis connection URL. If absent, StorageService runs in no-op mode. */ + redisUrl?: string; + + /** TTL in seconds for price:latest keys. Default: 300 (5 minutes). */ + priceTtlSeconds?: number; + + /** + * Maximum number of history entries kept per symbol in the Sorted Set. + * Older entries are removed by ZREMRANGEBYRANK after each write. + * Default: 100. + */ + historyMaxEntries?: number; + + /** + * Prefix for all Redis keys. Default: "price". + * Useful for namespace isolation in shared Redis instances. + * Keys will be: {prefix}:latest:{SYMBOL}, {prefix}:history:{SYMBOL}, {prefix}:symbols + */ + keyPrefix?: string; +} + +/** Resolved, fully-defaulted options used internally by StorageService. 
*/ +export interface ResolvedStorageOptions { + redisUrl: string; + priceTtlSeconds: number; + historyMaxEntries: number; + keyPrefix: string; +} diff --git a/apps/aggregator/src/metrics/metrics.service.ts b/apps/aggregator/src/metrics/metrics.service.ts index 4d0e8c3..2ac3344 100644 --- a/apps/aggregator/src/metrics/metrics.service.ts +++ b/apps/aggregator/src/metrics/metrics.service.ts @@ -26,6 +26,15 @@ export class MetricsService { /** Throughput: aggregations per symbol (optional dimension) */ readonly aggregationsBySymbol: Counter; + /** Number of cache hits on Redis price reads */ + readonly storageCacheHits: Counter; + + /** Number of cache misses on Redis price reads */ + readonly storageCacheMisses: Counter; + + /** Storage operation duration (read/write) in seconds */ + readonly storageOperationDuration: Histogram; + constructor() { this.register = new Registry(); this.aggregationCount = new Counter({ @@ -53,6 +62,23 @@ export class MetricsService { labelNames: ['symbol', 'method'], registers: [this.register], }); + this.storageCacheHits = new Counter({ + name: 'aggregator_storage_cache_hits_total', + help: 'Total cache hits on Redis price reads', + registers: [this.register], + }); + this.storageCacheMisses = new Counter({ + name: 'aggregator_storage_cache_misses_total', + help: 'Total cache misses on Redis price reads', + registers: [this.register], + }); + this.storageOperationDuration = new Histogram({ + name: 'aggregator_storage_operation_duration_seconds', + help: 'Storage (Redis) operation duration in seconds', + labelNames: ['operation'], + buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1], + registers: [this.register], + }); collectDefaultMetrics({ register: this.register, prefix: 'aggregator_' }); } diff --git a/apps/aggregator/src/modules/storage.module.ts b/apps/aggregator/src/modules/storage.module.ts new file mode 100644 index 0000000..cbb8140 --- /dev/null +++ b/apps/aggregator/src/modules/storage.module.ts @@ -0,0 +1,20 @@ 
+import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { StorageService } from '../services/storage.service'; +import { MetricsModule } from '../metrics/metrics.module'; + +/** + * StorageModule + * + * Provides and exports StorageService for Redis-backed price persistence. + * Import this module wherever StorageService is needed. + */ +@Module({ + imports: [ + ConfigModule, // StorageService reads REDIS_* vars via ConfigService + MetricsModule, // StorageService optionally injects MetricsService + ], + providers: [StorageService], + exports: [StorageService], +}) +export class StorageModule {} diff --git a/apps/aggregator/src/services/aggregation.service.ts b/apps/aggregator/src/services/aggregation.service.ts index 289ec81..a52bf42 100644 --- a/apps/aggregator/src/services/aggregation.service.ts +++ b/apps/aggregator/src/services/aggregation.service.ts @@ -8,6 +8,7 @@ import { TrimmedMeanAggregator } from '../strategies/aggregators/trimmed-mean.ag import { getSourceWeight } from '../config/source-weights.config'; import { MetricsService } from '../metrics/metrics.service'; import { DebugService } from '../debug/debug.service'; +import { StorageService } from './storage.service'; /** * Configuration options for the aggregation service @@ -43,6 +44,7 @@ export class AggregationService { constructor( @Optional() private readonly metricsService?: MetricsService, @Optional() private readonly debugService?: DebugService, + @Optional() private readonly storageService?: StorageService, ) { // Initialize all aggregation strategies this.aggregators = new Map(); @@ -150,6 +152,9 @@ export class AggregationService { symbol, (Date.now() - startTime) / 1000, ); + this.storageService?.storePrice(result).catch((err) => + this.logger.error(`StorageService.storePrice failed: ${err.message}`), + ); return result; } catch (err) { this.metricsService?.recordError(method); diff --git a/apps/aggregator/src/services/storage.service.spec.ts 
b/apps/aggregator/src/services/storage.service.spec.ts new file mode 100644 index 0000000..061b0ab --- /dev/null +++ b/apps/aggregator/src/services/storage.service.spec.ts @@ -0,0 +1,662 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ConfigService } from '@nestjs/config'; +import { StorageService } from './storage.service'; +import { MetricsService } from '../metrics/metrics.service'; +import { AggregatedPrice } from '../interfaces/aggregated-price.interface'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeMockAggregatedPrice(symbol = 'AAPL'): AggregatedPrice { + return { + symbol, + price: 182.35, + method: 'weighted-average', + confidence: 88, + metrics: { + standardDeviation: 0.12, + spread: 0.5, + sourceCount: 4, + variance: 0.0144, + }, + startTimestamp: 1700000000000, + endTimestamp: 1700000030000, + sources: ['finnhub', 'alpha_vantage'], + computedAt: 1700000030000, + }; +} + +// --------------------------------------------------------------------------- +// Mock pipeline and Redis client +// --------------------------------------------------------------------------- + +const mockPipelineExec = jest.fn().mockResolvedValue([]); + +const mockPipeline = { + set: jest.fn().mockReturnThis(), + zadd: jest.fn().mockReturnThis(), + zremrangebyrank: jest.fn().mockReturnThis(), + sadd: jest.fn().mockReturnThis(), + del: jest.fn().mockReturnThis(), + srem: jest.fn().mockReturnThis(), + exec: mockPipelineExec, +}; + +const mockOn = jest.fn(); +const mockDisconnect = jest.fn(); +const mockGet = jest.fn(); +const mockMget = jest.fn(); +const mockZrangebyscore = jest.fn(); +const mockSmembers = jest.fn(); +const mockPipelineFactory = jest.fn().mockReturnValue(mockPipeline); + +const mockRedisInstance = { + on: mockOn, + disconnect: mockDisconnect, + get: mockGet, + mget: mockMget, + zrangebyscore: 
mockZrangebyscore, + smembers: mockSmembers, + pipeline: mockPipelineFactory, +}; + +// Mock ioredis so StorageService receives our fake client. +// Must be declared before the module is loaded by the test runner. +// We use a factory that returns mockRedisInstance for every `new Redis(...)` call. +jest.mock('ioredis', () => { + // Track constructor calls for assertion in tests + const MockRedis = jest.fn().mockImplementation(() => mockRedisInstance); + (MockRedis as any).default = MockRedis; + return MockRedis; +}); + +// Capture constructor call args for retryStrategy tests +const RedisMock = jest.requireMock('ioredis'); + +// --------------------------------------------------------------------------- +// Mock MetricsService +// --------------------------------------------------------------------------- + +const mockCacheHitsInc = jest.fn(); +const mockCacheMissesInc = jest.fn(); +const mockDurationObserve = jest.fn(); + +const mockMetricsService = { + storageCacheHits: { inc: mockCacheHitsInc }, + storageCacheMisses: { inc: mockCacheMissesInc }, + storageOperationDuration: { observe: mockDurationObserve }, +}; + +// --------------------------------------------------------------------------- +// Factory helpers +// --------------------------------------------------------------------------- + +async function buildService(redisUrl: string | undefined): Promise<StorageService> { + const configGetFn = jest.fn((key: string): string | undefined => { + if (key === 'REDIS_URL') return redisUrl; + if (key === 'REDIS_PRICE_TTL_SECONDS') return '300'; + if (key === 'REDIS_HISTORY_MAX_ENTRIES') return '100'; + if (key === 'REDIS_KEY_PREFIX') return 'price'; + return undefined; + }); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + StorageService, + { provide: ConfigService, useValue: { get: configGetFn } }, + { provide: MetricsService, useValue: mockMetricsService }, + ], + }).compile(); + + const service = module.get(StorageService); + 
service.onModuleInit(); + return service; +} + +async function buildServiceNoMetrics(redisUrl: string | undefined): Promise<StorageService> { + const configGetFn = jest.fn((key: string): string | undefined => { + if (key === 'REDIS_URL') return redisUrl; + if (key === 'REDIS_PRICE_TTL_SECONDS') return '300'; + if (key === 'REDIS_HISTORY_MAX_ENTRIES') return '100'; + if (key === 'REDIS_KEY_PREFIX') return 'price'; + return undefined; + }); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + StorageService, + { provide: ConfigService, useValue: { get: configGetFn } }, + ], + }).compile(); + + const service = module.get(StorageService); + service.onModuleInit(); + return service; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('StorageService', () => { + beforeEach(() => { + jest.clearAllMocks(); + mockPipelineExec.mockResolvedValue([]); + }); + + // ------------------------------------------------------------------------- + // 1. 
Initialization + // ------------------------------------------------------------------------- + + describe('initialization', () => { + it('should be defined', async () => { + const service = await buildService('redis://localhost:6379'); + expect(service).toBeDefined(); + }); + + it('should create a Redis client when REDIS_URL is set', async () => { + await buildService('redis://localhost:6379'); + expect(RedisMock).toHaveBeenCalledWith('redis://localhost:6379', expect.objectContaining({ + maxRetriesPerRequest: null, + enableReadyCheck: true, + lazyConnect: false, + })); + }); + + it('should NOT create a Redis client when REDIS_URL is absent', async () => { + await buildService(undefined); + expect(RedisMock).not.toHaveBeenCalled(); + }); + + it('should attach event listeners on the Redis client', async () => { + await buildService('redis://localhost:6379'); + expect(mockOn).toHaveBeenCalledWith('connect', expect.any(Function)); + expect(mockOn).toHaveBeenCalledWith('ready', expect.any(Function)); + expect(mockOn).toHaveBeenCalledWith('error', expect.any(Function)); + expect(mockOn).toHaveBeenCalledWith('close', expect.any(Function)); + expect(mockOn).toHaveBeenCalledWith('reconnecting', expect.any(Function)); + }); + + it('should configure retryStrategy that backs off and caps at 3 seconds', async () => { + await buildService('redis://localhost:6379'); + const calls = RedisMock.mock.calls as unknown[][]; + const options = calls[0][1] as any; + expect(options.retryStrategy).toBeDefined(); + // Back-off increases with attempts (delay = times * 100, capped at 3000ms) + expect(options.retryStrategy(1)).toBe(100); + expect(options.retryStrategy(10)).toBe(1000); + expect(options.retryStrategy(20)).toBe(2000); + // Returns null (stop retrying) after more than 20 attempts + expect(options.retryStrategy(21)).toBeNull(); + expect(options.retryStrategy(30)).toBeNull(); + }); + }); + + // ------------------------------------------------------------------------- + // 2. 
onModuleDestroy + // ------------------------------------------------------------------------- + + describe('onModuleDestroy', () => { + it('should disconnect the Redis client', async () => { + const service = await buildService('redis://localhost:6379'); + service.onModuleDestroy(); + expect(mockDisconnect).toHaveBeenCalled(); + }); + + it('should not throw when client is null (no-op mode)', async () => { + const service = await buildService(undefined); + expect(() => service.onModuleDestroy()).not.toThrow(); + }); + }); + + // ------------------------------------------------------------------------- + // 3. storePrice + // ------------------------------------------------------------------------- + + describe('storePrice', () => { + it('should write the latest key with correct TTL', async () => { + const service = await buildService('redis://localhost:6379'); + const price = makeMockAggregatedPrice('AAPL'); + await service.storePrice(price); + + expect(mockPipeline.set).toHaveBeenCalledWith( + 'price:latest:AAPL', + JSON.stringify(price), + 'EX', + 300, + ); + }); + + it('should add to history sorted set with computedAt as score', async () => { + const service = await buildService('redis://localhost:6379'); + const price = makeMockAggregatedPrice('AAPL'); + await service.storePrice(price); + + expect(mockPipeline.zadd).toHaveBeenCalledWith( + 'price:history:AAPL', + price.computedAt, + JSON.stringify(price), + ); + }); + + it('should trim history with ZREMRANGEBYRANK using correct rank bounds', async () => { + const service = await buildService('redis://localhost:6379'); + const price = makeMockAggregatedPrice('AAPL'); + await service.storePrice(price); + + // Keep newest 100 entries: remove rank 0 to -(100+1) = -101 + expect(mockPipeline.zremrangebyrank).toHaveBeenCalledWith( + 'price:history:AAPL', + 0, + -(100 + 1), + ); + }); + + it('should add symbol to the symbols index set', async () => { + const service = await buildService('redis://localhost:6379'); + 
const price = makeMockAggregatedPrice('AAPL'); + await service.storePrice(price); + + expect(mockPipeline.sadd).toHaveBeenCalledWith('price:symbols', 'AAPL'); + }); + + it('should execute the pipeline', async () => { + const service = await buildService('redis://localhost:6379'); + const price = makeMockAggregatedPrice('AAPL'); + await service.storePrice(price); + + expect(mockPipelineExec).toHaveBeenCalled(); + }); + + it('should uppercase the symbol in all keys', async () => { + const service = await buildService('redis://localhost:6379'); + const price = makeMockAggregatedPrice('aapl'); + await service.storePrice(price); + + expect(mockPipeline.set).toHaveBeenCalledWith( + 'price:latest:AAPL', + expect.any(String), + 'EX', + expect.any(Number), + ); + expect(mockPipeline.zadd).toHaveBeenCalledWith( + 'price:history:AAPL', + expect.any(Number), + expect.any(String), + ); + }); + + it('should be a no-op when client is null (no-op mode)', async () => { + const service = await buildService(undefined); + await expect(service.storePrice(makeMockAggregatedPrice())).resolves.toBeUndefined(); + expect(mockPipelineFactory).not.toHaveBeenCalled(); + }); + + it('should record write duration metric', async () => { + const service = await buildService('redis://localhost:6379'); + await service.storePrice(makeMockAggregatedPrice()); + expect(mockDurationObserve).toHaveBeenCalledWith({ operation: 'write' }, expect.any(Number)); + }); + }); + + // ------------------------------------------------------------------------- + // 4. 
getLatestPrice + // ------------------------------------------------------------------------- + + describe('getLatestPrice', () => { + it('should return null in no-op mode', async () => { + const service = await buildService(undefined); + await expect(service.getLatestPrice('AAPL')).resolves.toBeNull(); + }); + + it('should return null on cache miss', async () => { + const service = await buildService('redis://localhost:6379'); + mockGet.mockResolvedValueOnce(null); + await expect(service.getLatestPrice('AAPL')).resolves.toBeNull(); + }); + + it('should return parsed AggregatedPrice on cache hit', async () => { + const service = await buildService('redis://localhost:6379'); + const price = makeMockAggregatedPrice('AAPL'); + mockGet.mockResolvedValueOnce(JSON.stringify(price)); + + const result = await service.getLatestPrice('AAPL'); + expect(result).toEqual(price); + }); + + it('should call GET with the correct key', async () => { + const service = await buildService('redis://localhost:6379'); + mockGet.mockResolvedValueOnce(null); + await service.getLatestPrice('GOOGL'); + expect(mockGet).toHaveBeenCalledWith('price:latest:GOOGL'); + }); + + it('should increment cache hits metric on hit', async () => { + const service = await buildService('redis://localhost:6379'); + const price = makeMockAggregatedPrice(); + mockGet.mockResolvedValueOnce(JSON.stringify(price)); + + await service.getLatestPrice('AAPL'); + expect(mockCacheHitsInc).toHaveBeenCalledWith(1); + expect(mockCacheMissesInc).not.toHaveBeenCalled(); + }); + + it('should increment cache misses metric on miss', async () => { + const service = await buildService('redis://localhost:6379'); + mockGet.mockResolvedValueOnce(null); + + await service.getLatestPrice('AAPL'); + expect(mockCacheMissesInc).toHaveBeenCalledWith(1); + expect(mockCacheHitsInc).not.toHaveBeenCalled(); + }); + + it('should record read duration metric', async () => { + const service = await buildService('redis://localhost:6379'); + 
mockGet.mockResolvedValueOnce(null); + await service.getLatestPrice('AAPL'); + expect(mockDurationObserve).toHaveBeenCalledWith({ operation: 'read' }, expect.any(Number)); + }); + + it('should not throw when MetricsService is not injected', async () => { + const service = await buildServiceNoMetrics('redis://localhost:6379'); + mockGet.mockResolvedValueOnce(null); + await expect(service.getLatestPrice('AAPL')).resolves.toBeNull(); + }); + }); + + // ------------------------------------------------------------------------- + // 5. getPriceHistory + // ------------------------------------------------------------------------- + + describe('getPriceHistory', () => { + it('should return empty array in no-op mode', async () => { + const service = await buildService(undefined); + await expect(service.getPriceHistory('AAPL')).resolves.toEqual([]); + }); + + it('should call ZRANGEBYSCORE with correct key and default range', async () => { + const service = await buildService('redis://localhost:6379'); + mockZrangebyscore.mockResolvedValueOnce([]); + await service.getPriceHistory('AAPL'); + expect(mockZrangebyscore).toHaveBeenCalledWith('price:history:AAPL', 0, '+inf'); + }); + + it('should call ZRANGEBYSCORE with custom time range', async () => { + const service = await buildService('redis://localhost:6379'); + mockZrangebyscore.mockResolvedValueOnce([]); + await service.getPriceHistory('AAPL', 1000, 2000); + expect(mockZrangebyscore).toHaveBeenCalledWith('price:history:AAPL', 1000, 2000); + }); + + it('should pass LIMIT clause when limit argument is provided', async () => { + const service = await buildService('redis://localhost:6379'); + mockZrangebyscore.mockResolvedValueOnce([]); + await service.getPriceHistory('AAPL', 0, '+inf', 10); + expect(mockZrangebyscore).toHaveBeenCalledWith( + 'price:history:AAPL', + 0, + '+inf', + 'LIMIT', + 0, + 10, + ); + }); + + it('should parse and return all JSON members', async () => { + const service = await 
buildService('redis://localhost:6379'); + const p1 = makeMockAggregatedPrice('AAPL'); + const p2 = { ...p1, computedAt: p1.computedAt + 1000 }; + mockZrangebyscore.mockResolvedValueOnce([JSON.stringify(p1), JSON.stringify(p2)]); + + const result = await service.getPriceHistory('AAPL'); + expect(result).toHaveLength(2); + expect(result[0]).toEqual(p1); + expect(result[1]).toEqual(p2); + }); + + it('should return empty array when ZRANGEBYSCORE returns no results', async () => { + const service = await buildService('redis://localhost:6379'); + mockZrangebyscore.mockResolvedValueOnce([]); + await expect(service.getPriceHistory('AAPL')).resolves.toEqual([]); + }); + + it('should record read duration metric', async () => { + const service = await buildService('redis://localhost:6379'); + mockZrangebyscore.mockResolvedValueOnce([]); + await service.getPriceHistory('AAPL'); + expect(mockDurationObserve).toHaveBeenCalledWith({ operation: 'read' }, expect.any(Number)); + }); + }); + + // ------------------------------------------------------------------------- + // 6. 
storePriceBatch + // ------------------------------------------------------------------------- + + describe('storePriceBatch', () => { + it('should be a no-op in no-op mode', async () => { + const service = await buildService(undefined); + await expect( + service.storePriceBatch([makeMockAggregatedPrice()]), + ).resolves.toBeUndefined(); + expect(mockPipelineFactory).not.toHaveBeenCalled(); + }); + + it('should be a no-op when prices array is empty', async () => { + const service = await buildService('redis://localhost:6379'); + await service.storePriceBatch([]); + expect(mockPipelineFactory).not.toHaveBeenCalled(); + }); + + it('should pipeline writes for each price', async () => { + const service = await buildService('redis://localhost:6379'); + const p1 = makeMockAggregatedPrice('AAPL'); + const p2 = makeMockAggregatedPrice('GOOGL'); + await service.storePriceBatch([p1, p2]); + + expect(mockPipeline.set).toHaveBeenCalledTimes(2); + expect(mockPipeline.zadd).toHaveBeenCalledTimes(2); + expect(mockPipeline.zremrangebyrank).toHaveBeenCalledTimes(2); + expect(mockPipeline.sadd).toHaveBeenCalledTimes(2); + expect(mockPipelineExec).toHaveBeenCalledTimes(1); + }); + + it('should use correct keys for each symbol in the batch', async () => { + const service = await buildService('redis://localhost:6379'); + const p1 = makeMockAggregatedPrice('AAPL'); + const p2 = makeMockAggregatedPrice('TSLA'); + await service.storePriceBatch([p1, p2]); + + expect(mockPipeline.set).toHaveBeenCalledWith( + 'price:latest:AAPL', + expect.any(String), + 'EX', + 300, + ); + expect(mockPipeline.set).toHaveBeenCalledWith( + 'price:latest:TSLA', + expect.any(String), + 'EX', + 300, + ); + }); + + it('should record write duration metric', async () => { + const service = await buildService('redis://localhost:6379'); + await service.storePriceBatch([makeMockAggregatedPrice()]); + expect(mockDurationObserve).toHaveBeenCalledWith({ operation: 'write' }, expect.any(Number)); + }); + }); + + // 
------------------------------------------------------------------------- + // 7. getLatestPriceBatch + // ------------------------------------------------------------------------- + + describe('getLatestPriceBatch', () => { + it('should return empty map in no-op mode', async () => { + const service = await buildService(undefined); + const result = await service.getLatestPriceBatch(['AAPL', 'GOOGL']); + expect(result.size).toBe(0); + }); + + it('should return empty map when symbols array is empty', async () => { + const service = await buildService('redis://localhost:6379'); + const result = await service.getLatestPriceBatch([]); + expect(result.size).toBe(0); + expect(mockMget).not.toHaveBeenCalled(); + }); + + it('should use MGET for a single round-trip', async () => { + const service = await buildService('redis://localhost:6379'); + mockMget.mockResolvedValueOnce([null, null]); + await service.getLatestPriceBatch(['AAPL', 'GOOGL']); + expect(mockMget).toHaveBeenCalledWith('price:latest:AAPL', 'price:latest:GOOGL'); + }); + + it('should return only symbols present in Redis (filter nulls)', async () => { + const service = await buildService('redis://localhost:6379'); + const price = makeMockAggregatedPrice('AAPL'); + mockMget.mockResolvedValueOnce([JSON.stringify(price), null]); + + const result = await service.getLatestPriceBatch(['AAPL', 'GOOGL']); + expect(result.size).toBe(1); + expect(result.get('AAPL')).toEqual(price); + expect(result.has('GOOGL')).toBe(false); + }); + + it('should record a hit metric for each found symbol', async () => { + const service = await buildService('redis://localhost:6379'); + const price = makeMockAggregatedPrice('AAPL'); + mockMget.mockResolvedValueOnce([JSON.stringify(price), JSON.stringify(price)]); + + await service.getLatestPriceBatch(['AAPL', 'MSFT']); + expect(mockCacheHitsInc).toHaveBeenCalledTimes(2); + }); + + it('should record a miss metric for each symbol not in cache', async () => { + const service = await 
buildService('redis://localhost:6379'); + mockMget.mockResolvedValueOnce([null, null]); + + await service.getLatestPriceBatch(['AAPL', 'MSFT']); + expect(mockCacheMissesInc).toHaveBeenCalledTimes(2); + }); + + it('should record read duration metric', async () => { + const service = await buildService('redis://localhost:6379'); + mockMget.mockResolvedValueOnce([]); + await service.getLatestPriceBatch(['AAPL']); + expect(mockDurationObserve).toHaveBeenCalledWith({ operation: 'read' }, expect.any(Number)); + }); + }); + + // ------------------------------------------------------------------------- + // 8. getTrackedSymbols + // ------------------------------------------------------------------------- + + describe('getTrackedSymbols', () => { + it('should return empty array in no-op mode', async () => { + const service = await buildService(undefined); + await expect(service.getTrackedSymbols()).resolves.toEqual([]); + }); + + it('should call SMEMBERS on the symbols key', async () => { + const service = await buildService('redis://localhost:6379'); + mockSmembers.mockResolvedValueOnce(['AAPL', 'GOOGL']); + const result = await service.getTrackedSymbols(); + expect(mockSmembers).toHaveBeenCalledWith('price:symbols'); + expect(result).toEqual(['AAPL', 'GOOGL']); + }); + + it('should return empty array when no symbols have been tracked', async () => { + const service = await buildService('redis://localhost:6379'); + mockSmembers.mockResolvedValueOnce([]); + await expect(service.getTrackedSymbols()).resolves.toEqual([]); + }); + }); + + // ------------------------------------------------------------------------- + // 9. 
deleteSymbol + // ------------------------------------------------------------------------- + + describe('deleteSymbol', () => { + it('should be a no-op in no-op mode', async () => { + const service = await buildService(undefined); + await expect(service.deleteSymbol('AAPL')).resolves.toBeUndefined(); + expect(mockPipelineFactory).not.toHaveBeenCalled(); + }); + + it('should DEL the latest key', async () => { + const service = await buildService('redis://localhost:6379'); + await service.deleteSymbol('AAPL'); + expect(mockPipeline.del).toHaveBeenCalledWith('price:latest:AAPL'); + }); + + it('should DEL the history key', async () => { + const service = await buildService('redis://localhost:6379'); + await service.deleteSymbol('AAPL'); + expect(mockPipeline.del).toHaveBeenCalledWith('price:history:AAPL'); + }); + + it('should SREM the symbol from the symbols index', async () => { + const service = await buildService('redis://localhost:6379'); + await service.deleteSymbol('AAPL'); + expect(mockPipeline.srem).toHaveBeenCalledWith('price:symbols', 'AAPL'); + }); + + it('should execute the pipeline', async () => { + const service = await buildService('redis://localhost:6379'); + await service.deleteSymbol('AAPL'); + expect(mockPipelineExec).toHaveBeenCalled(); + }); + + it('should uppercase the symbol', async () => { + const service = await buildService('redis://localhost:6379'); + await service.deleteSymbol('aapl'); + expect(mockPipeline.del).toHaveBeenCalledWith('price:latest:AAPL'); + expect(mockPipeline.del).toHaveBeenCalledWith('price:history:AAPL'); + }); + }); + + // ------------------------------------------------------------------------- + // 10. 
Key prefix customization + // ------------------------------------------------------------------------- + + describe('key prefix', () => { + it('should use a custom key prefix from config', async () => { + const configGetFn = jest.fn((key: string): string | undefined => { + if (key === 'REDIS_URL') return 'redis://localhost:6379'; + if (key === 'REDIS_KEY_PREFIX') return 'oracle'; + if (key === 'REDIS_PRICE_TTL_SECONDS') return '300'; + if (key === 'REDIS_HISTORY_MAX_ENTRIES') return '100'; + return undefined; + }); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + StorageService, + { provide: ConfigService, useValue: { get: configGetFn } }, + { provide: MetricsService, useValue: mockMetricsService }, + ], + }).compile(); + + const service = module.get(StorageService); + service.onModuleInit(); + + const price = makeMockAggregatedPrice('AAPL'); + await service.storePrice(price); + + expect(mockPipeline.set).toHaveBeenCalledWith( + 'oracle:latest:AAPL', + expect.any(String), + 'EX', + expect.any(Number), + ); + expect(mockPipeline.zadd).toHaveBeenCalledWith( + 'oracle:history:AAPL', + expect.any(Number), + expect.any(String), + ); + expect(mockPipeline.sadd).toHaveBeenCalledWith('oracle:symbols', 'AAPL'); + }); + }); +}); diff --git a/apps/aggregator/src/services/storage.service.ts b/apps/aggregator/src/services/storage.service.ts new file mode 100644 index 0000000..eafb642 --- /dev/null +++ b/apps/aggregator/src/services/storage.service.ts @@ -0,0 +1,321 @@ +import { + Injectable, + Logger, + Optional, + OnModuleInit, + OnModuleDestroy, +} from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; +import { AggregatedPrice } from '../interfaces/aggregated-price.interface'; +import { MetricsService } from '../metrics/metrics.service'; +import { ResolvedStorageOptions } from '../interfaces/storage-options.interface'; + +/** + * StorageService + * + * Persists aggregated prices to Redis using 
three data structures: + * + * STRING {prefix}:latest:{SYMBOL} Latest AggregatedPrice (JSON), with TTL + * ZSET {prefix}:history:{SYMBOL} Time-series (score = computedAt ms), bounded length + * SET {prefix}:symbols Index of all tracked symbols + * + * When REDIS_URL is not set the service operates in no-op mode: + * all public methods return safe empty values without throwing. + * + * Auto-reconnection is handled by ioredis retryStrategy with + * exponential back-off (up to 3 s, max 20 attempts). + * + * maxRetriesPerRequest is set to null so commands issued while + * Redis is temporarily unreachable are queued internally instead of + * failing immediately — fulfilling requirement #6 (auto-reconnection). + */ +@Injectable() +export class StorageService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(StorageService.name); + private client: Redis | null = null; + private opts: ResolvedStorageOptions; + + constructor( + private readonly configService: ConfigService, + @Optional() private readonly metricsService?: MetricsService, + ) {} + + // --------------------------------------------------------------------------- + // Lifecycle + // --------------------------------------------------------------------------- + + onModuleInit(): void { + this.opts = this.resolveOptions(); + if (!this.opts.redisUrl) { + this.logger.warn('REDIS_URL not set — StorageService running in no-op mode'); + return; + } + this.connect(); + } + + onModuleDestroy(): void { + if (this.client) { + this.client.disconnect(); + this.logger.log('Redis connection closed on shutdown'); + } + } + + // --------------------------------------------------------------------------- + // Connection management (private) + // --------------------------------------------------------------------------- + + private resolveOptions(): ResolvedStorageOptions { + return { + redisUrl: this.configService.get('REDIS_URL') ?? 
'', + priceTtlSeconds: parseInt( + this.configService.get('REDIS_PRICE_TTL_SECONDS') ?? '300', + 10, + ), + historyMaxEntries: parseInt( + this.configService.get('REDIS_HISTORY_MAX_ENTRIES') ?? '100', + 10, + ), + keyPrefix: this.configService.get('REDIS_KEY_PREFIX') ?? 'price', + }; + } + + private connect(): void { + this.client = new Redis(this.opts.redisUrl, { + // null = queue commands during reconnect rather than failing immediately + maxRetriesPerRequest: null, + enableReadyCheck: true, + lazyConnect: false, + retryStrategy: (times: number): number | null => { + if (times > 20) { + this.logger.error(`Redis retry limit reached after ${times} attempts — giving up`); + return null; + } + const delay = Math.min(times * 100, 3000); + this.logger.warn(`Redis reconnect attempt ${times}, retrying in ${delay}ms`); + return delay; + }, + }); + + this.client.on('connect', () => { + this.logger.log('Redis connected'); + }); + + this.client.on('ready', () => { + this.logger.log('Redis ready'); + }); + + this.client.on('error', (err: Error) => { + this.logger.error(`Redis error: ${err.message}`); + }); + + this.client.on('close', () => { + this.logger.warn('Redis connection closed'); + }); + + this.client.on('reconnecting', () => { + this.logger.log('Redis reconnecting...'); + }); + } + + // --------------------------------------------------------------------------- + // Key builders (private) + // --------------------------------------------------------------------------- + + private latestKey(symbol: string): string { + return `${this.opts.keyPrefix}:latest:${symbol.toUpperCase()}`; + } + + private historyKey(symbol: string): string { + return `${this.opts.keyPrefix}:history:${symbol.toUpperCase()}`; + } + + private symbolsKey(): string { + return `${this.opts.keyPrefix}:symbols`; + } + + // --------------------------------------------------------------------------- + // Metrics helpers (private) + // 
--------------------------------------------------------------------------- + + private recordCacheAccess(hit: boolean): void { + if (!this.metricsService) return; + if (hit) { + this.metricsService.storageCacheHits.inc(1); + } else { + this.metricsService.storageCacheMisses.inc(1); + } + } + + private recordDuration(operation: string, durationMs: number): void { + this.metricsService?.storageOperationDuration.observe( + { operation }, + durationMs / 1000, + ); + } + + // --------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------- + + /** + * Store one aggregated price. + * + * Executes a single pipeline (one network round-trip) that: + * 1. Sets the latest price key with TTL (STRING + EXPIRE). + * 2. Appends to the history sorted set (ZADD, score = computedAt). + * 3. Trims history to the configured max entries (ZREMRANGEBYRANK). + * 4. Records the symbol in the global index (SADD). + */ + async storePrice(price: AggregatedPrice): Promise { + if (!this.client) return; + + const start = Date.now(); + const symbol = price.symbol.toUpperCase(); + const serialized = JSON.stringify(price); + + const pipeline = this.client.pipeline(); + pipeline.set(this.latestKey(symbol), serialized, 'EX', this.opts.priceTtlSeconds); + pipeline.zadd(this.historyKey(symbol), price.computedAt, serialized); + // Keep only the newest historyMaxEntries — remove all entries below that rank + pipeline.zremrangebyrank(this.historyKey(symbol), 0, -(this.opts.historyMaxEntries + 1)); + pipeline.sadd(this.symbolsKey(), symbol); + + await pipeline.exec(); + this.recordDuration('write', Date.now() - start); + this.logger.debug(`Stored price for ${symbol}`); + } + + /** + * Retrieve the latest aggregated price for a symbol. + * + * Returns null if the key does not exist, has expired, or Redis is unavailable. 
+ */ + async getLatestPrice(symbol: string): Promise { + if (!this.client) return null; + + const start = Date.now(); + const raw = await this.client.get(this.latestKey(symbol)); + const hit = raw !== null; + + this.recordCacheAccess(hit); + this.recordDuration('read', Date.now() - start); + + if (!raw) return null; + return JSON.parse(raw) as AggregatedPrice; + } + + /** + * Retrieve price history for a symbol within an optional time range. + * + * Uses ZRANGEBYSCORE on the history sorted set (scored by computedAt ms). + * + * @param symbol Trading symbol (e.g., 'AAPL') + * @param fromMs Start timestamp inclusive, Unix ms. Default: 0 (all history) + * @param toMs End timestamp inclusive, Unix ms or '+inf'. Default: '+inf' + * @param limit Maximum number of entries returned. Default: all + */ + async getPriceHistory( + symbol: string, + fromMs: number = 0, + toMs: number | '+inf' = '+inf', + limit?: number, + ): Promise { + if (!this.client) return []; + + const start = Date.now(); + let raw: string[]; + + if (limit !== undefined) { + raw = await this.client.zrangebyscore( + this.historyKey(symbol), + fromMs, + toMs, + 'LIMIT', + 0, + limit, + ); + } else { + raw = await this.client.zrangebyscore(this.historyKey(symbol), fromMs, toMs); + } + + this.recordDuration('read', Date.now() - start); + return raw.map((r) => JSON.parse(r) as AggregatedPrice); + } + + /** + * Store latest prices for multiple symbols in a single pipeline (batch write). + * + * Executes one network round-trip regardless of how many symbols are provided. 
+ */ + async storePriceBatch(prices: AggregatedPrice[]): Promise { + if (!this.client || prices.length === 0) return; + + const start = Date.now(); + const pipeline = this.client.pipeline(); + + for (const price of prices) { + const symbol = price.symbol.toUpperCase(); + const serialized = JSON.stringify(price); + pipeline.set(this.latestKey(symbol), serialized, 'EX', this.opts.priceTtlSeconds); + pipeline.zadd(this.historyKey(symbol), price.computedAt, serialized); + pipeline.zremrangebyrank(this.historyKey(symbol), 0, -(this.opts.historyMaxEntries + 1)); + pipeline.sadd(this.symbolsKey(), symbol); + } + + await pipeline.exec(); + this.recordDuration('write', Date.now() - start); + this.logger.debug(`Batch stored ${prices.length} prices`); + } + + /** + * Retrieve latest prices for multiple symbols (batch read). + * + * Uses MGET for a single network round-trip. + * Symbols with no data (expired or never written) are omitted from the result. + */ + async getLatestPriceBatch(symbols: string[]): Promise> { + if (!this.client || symbols.length === 0) return new Map(); + + const start = Date.now(); + const upperSymbols = symbols.map((s) => s.toUpperCase()); + const keys = upperSymbols.map((s) => this.latestKey(s)); + const values = await this.client.mget(...keys); + + const results = new Map(); + for (let i = 0; i < upperSymbols.length; i++) { + const raw = values[i]; + this.recordCacheAccess(raw !== null); + if (raw) { + results.set(upperSymbols[i], JSON.parse(raw) as AggregatedPrice); + } + } + + this.recordDuration('read', Date.now() - start); + return results; + } + + /** + * Return all symbols that have been stored at least once. + */ + async getTrackedSymbols(): Promise { + if (!this.client) return []; + return this.client.smembers(this.symbolsKey()); + } + + /** + * Delete all data for a symbol (latest key, history sorted set, symbol index entry). + * Primarily used for testing and administrative operations. 
+ */ + async deleteSymbol(symbol: string): Promise { + if (!this.client) return; + + const s = symbol.toUpperCase(); + const pipeline = this.client.pipeline(); + pipeline.del(this.latestKey(s)); + pipeline.del(this.historyKey(s)); + pipeline.srem(this.symbolsKey(), s); + await pipeline.exec(); + this.logger.debug(`Deleted all data for ${s}`); + } +} diff --git a/apps/ingestor/src/modules/prices.module.ts b/apps/ingestor/src/modules/prices.module.ts index d496a4f..ed44a90 100644 --- a/apps/ingestor/src/modules/prices.module.ts +++ b/apps/ingestor/src/modules/prices.module.ts @@ -1,8 +1,9 @@ import { Module } from '@nestjs/common'; import { PricesController } from '../controllers/prices.controller'; import { PriceFetcherService } from '../services/price-fetcher.service'; -import { StockService } from '../services/stock.service'; -import { FinnhubAdapter } from '../providers/finnhub.adapter'; +import { SchedulerService } from '../services/scheduler.service'; +import { StockService } from '../services/stock.service'; +import { FinnhubAdapter } from '../providers/finnhub.adapter'; @Module({ controllers: [PricesController], diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6073cef --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,27 @@ +version: '3.9' + +services: + redis: + image: redis:7.2-alpine + container_name: neko-oracle-redis + ports: + - '6379:6379' + volumes: + - redis_data:/data + command: > + redis-server + --save 60 1 + --loglevel notice + --maxmemory 256mb + --maxmemory-policy allkeys-lru + healthcheck: + test: ['CMD', 'redis-cli', 'ping'] + interval: 10s + timeout: 5s + retries: 5 + start_period: 5s + restart: unless-stopped + +volumes: + redis_data: + driver: local