Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion apps/aggregator/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,23 @@ PORT=3001
INGESTOR_WS_URL=ws://localhost:3000
INGESTOR_HTTP_URL=http://localhost:3000

# Redis (optional; used for caching/session when set; health check verifies connectivity)
# Redis Storage Configuration
# When REDIS_URL is set, StorageService persists aggregated prices to Redis.
# When unset, StorageService runs in no-op mode (prices are only held in memory).
# REDIS_URL=redis://localhost:6379

# TTL in seconds for the latest price key (price:{prefix}:latest:{SYMBOL}).
# After this many seconds the key expires automatically. Default: 300 (5 minutes).
# REDIS_PRICE_TTL_SECONDS=300

# Maximum number of historical price entries retained per symbol in the sorted set.
# Entries older than this limit are pruned automatically on each write. Default: 100.
# REDIS_HISTORY_MAX_ENTRIES=100

# Prefix for all Redis keys. Useful for namespace isolation in shared Redis instances.
# Keys will be: {prefix}:latest:{SYMBOL}, {prefix}:history:{SYMBOL}, {prefix}:symbols
# Default: price
# REDIS_KEY_PREFIX=price

# Signer Service (for publishing aggregated data)
SIGNER_URL=http://localhost:3002
67 changes: 63 additions & 4 deletions apps/aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ The Aggregator service is responsible for calculating a single consensus price p
| `/ready` | GET | Readiness probe for Kubernetes. Same checks as `/health`; returns 200 when the service can accept traffic. |
| `/live` | GET | Liveness probe for Kubernetes. Returns 200 when the process is alive (no dependency checks). |
| `/status` | GET | Detailed system information: uptime, memory usage, dependency check results, and version. |
| `/metrics` | GET | Prometheus metrics in [exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/). Scrape this endpoint for aggregation count, latency, errors, and default Node.js metrics. |
| `/metrics` | GET | Prometheus metrics in [exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/). Scrape this endpoint for aggregation count, latency, errors, storage cache hits/misses, and default Node.js metrics. |
| `/debug/prices` | GET | Last aggregated and normalized prices held in memory. Useful for debugging without hitting external systems. |

**Health checks**: When `REDIS_URL` or `INGESTOR_URL` are set, the health check verifies connectivity. If a configured dependency is unreachable, `/health` and `/ready` return 503. If not set, that dependency is skipped (not included in the check).
Expand All @@ -48,14 +48,21 @@ aggregator/
│ │ ├── normalized-price.interface.ts
│ │ ├── aggregated-price.interface.ts
│ │ ├── aggregator.interface.ts
│ │ └── aggregation-config.interface.ts
│ │ ├── aggregation-config.interface.ts
│ │ └── storage-options.interface.ts
│ ├── strategies/
│ │ └── aggregators/ # Aggregation strategy implementations
│ │ ├── weighted-average.aggregator.ts
│ │ ├── median.aggregator.ts
│ │ └── trimmed-mean.aggregator.ts
│ ├── services/
│ │ └── aggregation.service.ts # Main aggregation service
│ │ ├── aggregation.service.ts # Main aggregation service
│ │ ├── data-reception.service.ts
│ │ ├── normalization.service.ts
│ │ └── storage.service.ts # Redis persistence layer
│ ├── modules/
│ │ ├── normalization.module.ts
│ │ └── storage.module.ts # Redis storage module
│ ├── health/ # Health checks (Terminus)
│ │ ├── health.controller.ts
│ │ └── indicators/
Expand All @@ -68,10 +75,61 @@ aggregator/
│ │ ├── debug.controller.ts
│ │ └── debug.service.ts
│ ├── config/
│ │ └── source-weights.config.ts # Weight configuration
│ │ ├── source-weights.config.ts # Weight configuration
│ │ └── redis.config.ts # Redis config factory
│ └── app.module.ts
```

## Redis Storage

The aggregator uses Redis to persist aggregated prices for fast access by downstream services (e.g., the API service). Redis is **optional** — when `REDIS_URL` is not set, the service runs in no-op mode and all storage operations are silently skipped.

### Quick Start

```bash
# Start Redis with Docker
docker compose up -d redis

# Add to apps/aggregator/.env
REDIS_URL=redis://localhost:6379
```

### Redis Key Schema

| Key Pattern | Type | TTL | Purpose |
|-------------|------|-----|---------|
| `price:latest:{SYMBOL}` | STRING (JSON) | `REDIS_PRICE_TTL_SECONDS` (default 300 s) | Latest `AggregatedPrice` per symbol, expires automatically |
| `price:history:{SYMBOL}` | SORTED SET, score = `computedAt` ms | None (bounded by max-entries trim) | Time-series history, up to `REDIS_HISTORY_MAX_ENTRIES` entries |
| `price:symbols` | SET | None | Index of all symbols that have been stored |

The prefix `price` can be overridden with `REDIS_KEY_PREFIX` for namespace isolation in shared Redis instances.

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `REDIS_URL` | _(unset)_ | Redis connection URL. When absent, StorageService runs in no-op mode. |
| `REDIS_PRICE_TTL_SECONDS` | `300` | Seconds before a `latest` price key expires. Default is 5 minutes. |
| `REDIS_HISTORY_MAX_ENTRIES` | `100` | Maximum history entries kept per symbol. Older entries are pruned automatically on write. |
| `REDIS_KEY_PREFIX` | `price` | Prefix for all Redis keys. Useful for separating environments. |

### Graceful Degradation

When `REDIS_URL` is not configured:
- `StorageService.onModuleInit()` logs a warning and sets `client = null`.
- All public methods (`storePrice`, `getLatestPrice`, `getPriceHistory`, `storePriceBatch`, `getLatestPriceBatch`, `getTrackedSymbols`, `deleteSymbol`) return safe empty values immediately.
- The service logs and never throws — a Redis failure never interrupts price aggregation.

### Prometheus Metrics (Storage)

| Metric | Type | Description |
|--------|------|-------------|
| `aggregator_storage_cache_hits_total` | Counter | Total Redis read cache hits |
| `aggregator_storage_cache_misses_total` | Counter | Total Redis read cache misses |
| `aggregator_storage_operation_duration_seconds` | Histogram (labels: `operation`) | Duration of Redis `read`/`write` operations in seconds |

These metrics are available at `GET /metrics` alongside existing aggregation metrics.

## Aggregation Methods

### Weighted Average
Expand Down Expand Up @@ -387,6 +445,7 @@ npm run test:cov
All components have comprehensive test coverage (>85%):

- ✅ `aggregation.service.spec.ts` - 50+ test cases
- ✅ `storage.service.spec.ts` - 40+ test cases (mocked ioredis)
- ✅ `weighted-average.aggregator.spec.ts` - 15+ test cases
- ✅ `median.aggregator.spec.ts` - 18+ test cases
- ✅ `trimmed-mean.aggregator.spec.ts` - 20+ test cases
Expand Down
3 changes: 1 addition & 2 deletions apps/aggregator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@
"@nestjs/platform-express": "^10.0.0",
"@oracle-stocks/shared": "*",
"@nestjs/terminus": "^10.0.0",
"axios": "^1.6.0",
"axios": "^1.13.4",
"ioredis": "^5.3.2",
"prom-client": "^15.1.0",
"axios": "^1.13.4",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.3",
"reflect-metadata": "^0.1.13",
Expand Down
20 changes: 11 additions & 9 deletions apps/aggregator/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import { Module } from '@nestjs/common';
import { NormalizationModule } from './modules/normalization.module';
import { HttpModule } from '@nestjs/axios';
import { ConfigModule } from '@nestjs/config';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { NormalizationModule } from './modules/normalization.module';
import { StorageModule } from './modules/storage.module';
import { HealthModule } from './health/health.module';
import { MetricsModule } from './metrics/metrics.module';
import { DebugModule } from './debug/debug.module';
import { DataReceptionService } from './services/data-reception.service';
import { AggregationService } from './services/aggregation.service';
import { WeightedAverageAggregator } from './strategies/aggregators/weighted-average.aggregator';
import { MedianAggregator } from './strategies/aggregators/median.aggregator';
import { TrimmedMeanAggregator } from './strategies/aggregators/trimmed-mean.aggregator';
import { HealthModule } from './health/health.module';
import { MetricsModule } from './metrics/metrics.module';
import { DebugModule } from './debug/debug.module';

@Module({
imports: [
NormalizationModule,
ConfigModule.forRoot({ isGlobal: true, envFilePath: '.env' }),
NormalizationModule,
HealthModule,
MetricsModule,
DebugModule,
ConfigModule.forRoot({
isGlobal: true,
}),
StorageModule,
HttpModule,
EventEmitterModule.forRoot(),
],
Expand All @@ -32,4 +34,4 @@ import { DebugModule } from './debug/debug.module';
],
exports: [AggregationService],
})
export class AppModule { }
export class AppModule {}
14 changes: 14 additions & 0 deletions apps/aggregator/src/config/redis.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { registerAs } from '@nestjs/config';

/**
* Redis configuration factory.
* Reads REDIS_* environment variables and exposes them as a typed config namespace.
*
* Usage: configService.get<ReturnType<typeof redisConfig>>('redis')
*/
export const redisConfig = registerAs('redis', () => ({
url: process.env.REDIS_URL ?? '',
priceTtlSeconds: parseInt(process.env.REDIS_PRICE_TTL_SECONDS ?? '300', 10),
historyMaxEntries: parseInt(process.env.REDIS_HISTORY_MAX_ENTRIES ?? '100', 10),
keyPrefix: process.env.REDIS_KEY_PREFIX ?? 'price',
}));
33 changes: 33 additions & 0 deletions apps/aggregator/src/interfaces/storage-options.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Options for StorageModule / StorageService configuration.
* All fields are optional; defaults are safe for a local Redis instance.
*/
export interface StorageModuleOptions {
/** Redis connection URL. If absent, StorageService runs in no-op mode. */
redisUrl?: string;

/** TTL in seconds for price:latest keys. Default: 300 (5 minutes). */
priceTtlSeconds?: number;

/**
* Maximum number of history entries kept per symbol in the Sorted Set.
* Older entries are removed by ZREMRANGEBYRANK after each write.
* Default: 100.
*/
historyMaxEntries?: number;

/**
* Prefix for all Redis keys. Default: "price".
* Useful for namespace isolation in shared Redis instances.
* Keys will be: {prefix}:latest:{SYMBOL}, {prefix}:history:{SYMBOL}, {prefix}:symbols
*/
keyPrefix?: string;
}

/** Resolved, fully-defaulted options used internally by StorageService. */
export interface ResolvedStorageOptions {
redisUrl: string;
priceTtlSeconds: number;
historyMaxEntries: number;
keyPrefix: string;
}
26 changes: 26 additions & 0 deletions apps/aggregator/src/metrics/metrics.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ export class MetricsService {
/** Throughput: aggregations per symbol (optional dimension) */
readonly aggregationsBySymbol: Counter<string>;

/** Number of cache hits on Redis price reads */
readonly storageCacheHits: Counter<string>;

/** Number of cache misses on Redis price reads */
readonly storageCacheMisses: Counter<string>;

/** Storage operation duration (read/write) in seconds */
readonly storageOperationDuration: Histogram<string>;

constructor() {
this.register = new Registry();
this.aggregationCount = new Counter({
Expand Down Expand Up @@ -53,6 +62,23 @@ export class MetricsService {
labelNames: ['symbol', 'method'],
registers: [this.register],
});
this.storageCacheHits = new Counter({
name: 'aggregator_storage_cache_hits_total',
help: 'Total cache hits on Redis price reads',
registers: [this.register],
});
this.storageCacheMisses = new Counter({
name: 'aggregator_storage_cache_misses_total',
help: 'Total cache misses on Redis price reads',
registers: [this.register],
});
this.storageOperationDuration = new Histogram({
name: 'aggregator_storage_operation_duration_seconds',
help: 'Storage (Redis) operation duration in seconds',
labelNames: ['operation'],
buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
registers: [this.register],
});
collectDefaultMetrics({ register: this.register, prefix: 'aggregator_' });
}

Expand Down
20 changes: 20 additions & 0 deletions apps/aggregator/src/modules/storage.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { StorageService } from '../services/storage.service';
import { MetricsModule } from '../metrics/metrics.module';

/**
* StorageModule
*
* Provides and exports StorageService for Redis-backed price persistence.
* Import this module wherever StorageService is needed.
*/
@Module({
imports: [
ConfigModule, // StorageService reads REDIS_* vars via ConfigService
MetricsModule, // StorageService optionally injects MetricsService
],
providers: [StorageService],
exports: [StorageService],
})
export class StorageModule {}
5 changes: 5 additions & 0 deletions apps/aggregator/src/services/aggregation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { TrimmedMeanAggregator } from '../strategies/aggregators/trimmed-mean.ag
import { getSourceWeight } from '../config/source-weights.config';
import { MetricsService } from '../metrics/metrics.service';
import { DebugService } from '../debug/debug.service';
import { StorageService } from './storage.service';

/**
* Configuration options for the aggregation service
Expand Down Expand Up @@ -43,6 +44,7 @@ export class AggregationService {
constructor(
@Optional() private readonly metricsService?: MetricsService,
@Optional() private readonly debugService?: DebugService,
@Optional() private readonly storageService?: StorageService,
) {
// Initialize all aggregation strategies
this.aggregators = new Map<string, IAggregator>();
Expand Down Expand Up @@ -150,6 +152,9 @@ export class AggregationService {
symbol,
(Date.now() - startTime) / 1000,
);
this.storageService?.storePrice(result).catch((err) =>
this.logger.error(`StorageService.storePrice failed: ${err.message}`),
);
return result;
} catch (err) {
this.metricsService?.recordError(method);
Expand Down
Loading