diff --git a/src/metrics/index.ts b/src/metrics/index.ts new file mode 100644 index 0000000..29819d4 --- /dev/null +++ b/src/metrics/index.ts @@ -0,0 +1,15 @@ +export { MetricStore } from './store.js' +export { RingBuffer } from './ring-buffer.js' +export { + MetricSampleSchema, + MetricSampleInputSchema, + MetricUnitSchema, + MetricLabelsSchema, + type MetricSample, + type MetricSampleInput, + type MetricUnit, + type MetricLabels, + type MetricQuery, + type AggregateFunction, + type AggregateResult, +} from './types.js' diff --git a/src/metrics/ring-buffer.test.ts b/src/metrics/ring-buffer.test.ts new file mode 100644 index 0000000..036d18b --- /dev/null +++ b/src/metrics/ring-buffer.test.ts @@ -0,0 +1,80 @@ +import { describe, it, expect, beforeEach } from 'vitest' +import { RingBuffer } from './ring-buffer.js' +import type { MetricSample } from './types.js' + +function makeSample(value: number, offsetMs = 0): MetricSample { + return { + metric_name: 'test.metric', + value, + unit: 'count', + timestamp: new Date(1_000_000 + offsetMs), + labels: {}, + } +} + +describe('RingBuffer', () => { + it('should reject non-positive capacity', () => { + expect(() => new RingBuffer(0)).toThrow(RangeError) + expect(() => new RingBuffer(-1)).toThrow(RangeError) + expect(() => new RingBuffer(1.5)).toThrow(RangeError) + }) + + it('should report size 0 when empty', () => { + const buf = new RingBuffer(4) + expect(buf.size).toBe(0) + expect(buf.isFull).toBe(false) + expect(buf.toArray()).toEqual([]) + }) + + it('should grow size as samples are pushed until capacity', () => { + const buf = new RingBuffer(3) + buf.push(makeSample(1)) + expect(buf.size).toBe(1) + buf.push(makeSample(2)) + buf.push(makeSample(3)) + expect(buf.size).toBe(3) + expect(buf.isFull).toBe(true) + }) + + it('should preserve insertion order when buffer is not full', () => { + const buf = new RingBuffer(5) + buf.push(makeSample(10, 0)) + buf.push(makeSample(20, 1)) + buf.push(makeSample(30, 2)) + 
expect(buf.toArray().map(s => s.value)).toEqual([10, 20, 30]) + }) + + it('should overwrite oldest entry when full', () => { + const buf = new RingBuffer(3) + buf.push(makeSample(1)) + buf.push(makeSample(2)) + buf.push(makeSample(3)) + buf.push(makeSample(4)) // overwrites value=1 + expect(buf.size).toBe(3) + expect(buf.toArray().map(s => s.value)).toEqual([2, 3, 4]) + }) + + it('should maintain oldest-first order after multiple overwrites', () => { + const buf = new RingBuffer(3) + for (let i = 1; i <= 7; i++) buf.push(makeSample(i)) + // After 7 pushes into capacity-3 buffer, last 3 are 5, 6, 7 + expect(buf.toArray().map(s => s.value)).toEqual([5, 6, 7]) + }) + + it('should return empty array after clear', () => { + const buf = new RingBuffer(3) + buf.push(makeSample(1)) + buf.push(makeSample(2)) + buf.clear() + expect(buf.size).toBe(0) + expect(buf.toArray()).toEqual([]) + }) + + it('should accept new pushes after clear', () => { + const buf = new RingBuffer(2) + buf.push(makeSample(1)) + buf.clear() + buf.push(makeSample(99)) + expect(buf.toArray().map(s => s.value)).toEqual([99]) + }) +}) diff --git a/src/metrics/ring-buffer.ts b/src/metrics/ring-buffer.ts new file mode 100644 index 0000000..27458f1 --- /dev/null +++ b/src/metrics/ring-buffer.ts @@ -0,0 +1,59 @@ +import type { MetricSample } from './types.js' + +/** + * Fixed-capacity circular buffer for MetricSample. + * + * Writes are O(1). When the buffer is full the oldest slot is overwritten. + * This caps memory at O(capacity) regardless of ingest rate — safe for a + * long-running process with no external TSDB dependency at MVP stage. 
+ */ +export class RingBuffer { + private readonly slots: Array<MetricSample | undefined> + private head = 0 // next write position + private length = 0 // number of valid entries + + constructor(private readonly capacity: number) { + if (capacity < 1 || !Number.isInteger(capacity)) { + throw new RangeError(`RingBuffer capacity must be a positive integer, got ${capacity}`) + } + this.slots = new Array(capacity) + } + + get size(): number { + return this.length + } + + get isFull(): boolean { + return this.length === this.capacity + } + + push(sample: MetricSample): void { + this.slots[this.head] = sample + this.head = (this.head + 1) % this.capacity + if (this.length < this.capacity) this.length++ + } + + /** + * Returns all valid samples ordered oldest→newest. + * Allocates a new array on each call; callers that need high-frequency + * iteration should query via MetricStore instead. + */ + toArray(): MetricSample[] { + if (this.length === 0) return [] + + // When the buffer is not yet full, valid data lives in slots[0..length-1]. + // Once full, head points to the oldest entry (it was just overwritten). + const out: MetricSample[] = new Array(this.length) + const oldest = this.isFull ? 
this.head : 0 + for (let i = 0; i < this.length; i++) { + out[i] = this.slots[(oldest + i) % this.capacity] as MetricSample + } + return out + } + + clear(): void { + this.head = 0 + this.length = 0 + this.slots.fill(undefined) + } +} diff --git a/src/metrics/store.test.ts b/src/metrics/store.test.ts new file mode 100644 index 0000000..5399efb --- /dev/null +++ b/src/metrics/store.test.ts @@ -0,0 +1,179 @@ +import { describe, it, expect, beforeEach } from 'vitest' +import { MetricStore } from './store.js' + +function ts(offsetMs: number): Date { + return new Date(1_700_000_000_000 + offsetMs) +} + +describe('MetricStore.record', () => { + it('should store a sample and return it with a timestamp', () => { + const store = new MetricStore() + const sample = store.record({ metric_name: 'api.latency', value: 42, unit: 'ms' }) + expect(sample.metric_name).toBe('api.latency') + expect(sample.value).toBe(42) + expect(sample.timestamp).toBeInstanceOf(Date) + expect(store.size).toBe(1) + }) + + it('should use provided timestamp when supplied', () => { + const store = new MetricStore() + const t = ts(0) + const sample = store.record({ metric_name: 'x', value: 1, unit: 'count', timestamp: t }) + expect(sample.timestamp).toEqual(t) + }) + + it('should throw when metric_name is empty', () => { + const store = new MetricStore() + expect(() => store.record({ metric_name: '', value: 1, unit: 'count' })).toThrow() + }) + + it('should throw when value is NaN', () => { + const store = new MetricStore() + expect(() => store.record({ metric_name: 'x', value: NaN, unit: 'count' })).toThrow() + }) + + it('should throw when value is Infinity', () => { + const store = new MetricStore() + expect(() => store.record({ metric_name: 'x', value: Infinity, unit: 'ms' })).toThrow() + }) + + it('should throw on an unrecognised unit', () => { + const store = new MetricStore() + expect(() => + store.record({ metric_name: 'x', value: 1, unit: 'furlongs' as never }) + ).toThrow() + }) +}) + 
+describe('MetricStore.query', () => { + let store: MetricStore + + beforeEach(() => { + store = new MetricStore() + store.record({ metric_name: 'cpu', value: 10, unit: 'percent', timestamp: ts(0), labels: { host: 'a' } }) + store.record({ metric_name: 'cpu', value: 20, unit: 'percent', timestamp: ts(100), labels: { host: 'b' } }) + store.record({ metric_name: 'cpu', value: 30, unit: 'percent', timestamp: ts(200), labels: { host: 'a' } }) + store.record({ metric_name: 'mem', value: 50, unit: 'bytes', timestamp: ts(50), labels: { host: 'a' } }) + store.record({ metric_name: 'latency', value: 99, unit: 'ms', timestamp: ts(150), labels: { host: 'a' } }) + }) + + it('should return all samples when no filter is applied', () => { + expect(store.query()).toHaveLength(5) + }) + + it('should filter by metric_name', () => { + const results = store.query({ metric_name: 'cpu' }) + expect(results).toHaveLength(3) + expect(results.every(s => s.metric_name === 'cpu')).toBe(true) + }) + + it('should filter by from timestamp (inclusive)', () => { + const results = store.query({ metric_name: 'cpu', from: ts(100) }) + expect(results.map(s => s.value).sort()).toEqual([20, 30]) + }) + + it('should filter by to timestamp (inclusive)', () => { + const results = store.query({ metric_name: 'cpu', to: ts(100) }) + expect(results.map(s => s.value).sort()).toEqual([10, 20]) + }) + + it('should filter by a closed time range', () => { + const results = store.query({ from: ts(50), to: ts(150) }) + // mem@ts(50), cpu@ts(100), latency@ts(150) + expect(results).toHaveLength(3) + }) + + it('should filter by a single label', () => { + const results = store.query({ metric_name: 'cpu', labels: { host: 'a' } }) + expect(results).toHaveLength(2) + expect(results.every(s => s.labels['host'] === 'a')).toBe(true) + }) + + it('should return an empty array when no sample matches labels', () => { + expect(store.query({ labels: { host: 'z' } })).toEqual([]) + }) + + it('should return results newest-first', () 
=> { + const results = store.query({ metric_name: 'cpu' }) + expect(results[0]?.value).toBe(30) // ts(200) is newest + expect(results[2]?.value).toBe(10) // ts(0) is oldest + }) + + it('should honour limit', () => { + expect(store.query({ limit: 2 })).toHaveLength(2) + }) +}) + +describe('MetricStore.aggregate', () => { + let store: MetricStore + + beforeEach(() => { + store = new MetricStore() + ;[10, 20, 30, 40, 50, 60, 70, 80, 90, 100].forEach((v, i) => { + store.record({ metric_name: 'req.duration', value: v, unit: 'ms', timestamp: ts(i * 100) }) + }) + }) + + it('should return null when no samples match', () => { + expect(store.aggregate('nonexistent', 'avg')).toBeNull() + }) + + it('should compute avg correctly', () => { + const result = store.aggregate('req.duration', 'avg') + expect(result?.value).toBe(55) // (10+20+...+100)/10 + }) + + it('should compute min correctly', () => { + expect(store.aggregate('req.duration', 'min')?.value).toBe(10) + }) + + it('should compute max correctly', () => { + expect(store.aggregate('req.duration', 'max')?.value).toBe(100) + }) + + it('should compute sum correctly', () => { + expect(store.aggregate('req.duration', 'sum')?.value).toBe(550) + }) + + it('should compute count correctly', () => { + expect(store.aggregate('req.duration', 'count')?.value).toBe(10) + }) + + it('should compute p95 correctly', () => { + // p95 of [10..100] (10 values) → ceil(10*0.95)=10 → sorted[9]=100 + expect(store.aggregate('req.duration', 'p95')?.value).toBe(100) + }) + + it('should compute p99 correctly', () => { + expect(store.aggregate('req.duration', 'p99')?.value).toBe(100) + }) + + it('should populate sample_count, from, and to fields', () => { + const result = store.aggregate('req.duration', 'avg') + expect(result?.sample_count).toBe(10) + expect(result?.from).toBeInstanceOf(Date) + expect(result?.to).toBeInstanceOf(Date) + }) + + it('should apply time-range filter before aggregating', () => { + // Only ts(0)..ts(200) → values 10, 20, 30 
+ const result = store.aggregate('req.duration', 'avg', { from: ts(0), to: ts(200) }) + expect(result?.value).toBe(20) + expect(result?.sample_count).toBe(3) + }) +}) + +describe('MetricStore capacity eviction', () => { + it('should evict oldest samples when capacity is exceeded', () => { + const store = new MetricStore({ capacity: 3 }) + store.record({ metric_name: 'm', value: 1, unit: 'count', timestamp: ts(0) }) + store.record({ metric_name: 'm', value: 2, unit: 'count', timestamp: ts(1) }) + store.record({ metric_name: 'm', value: 3, unit: 'count', timestamp: ts(2) }) + store.record({ metric_name: 'm', value: 4, unit: 'count', timestamp: ts(3) }) + + const results = store.query({ metric_name: 'm' }) + expect(results).toHaveLength(3) + // Newest-first: 4, 3, 2; value=1 was evicted + expect(results.map(s => s.value)).toEqual([4, 3, 2]) + }) +}) diff --git a/src/metrics/store.ts b/src/metrics/store.ts new file mode 100644 index 0000000..01acf53 --- /dev/null +++ b/src/metrics/store.ts @@ -0,0 +1,156 @@ +import { RingBuffer } from './ring-buffer.js' +import { + MetricSampleInputSchema, + type AggregateFunction, + type AggregateResult, + type MetricLabels, + type MetricQuery, + type MetricSample, + type MetricSampleInput, +} from './types.js' + +export interface MetricStoreOptions { + /** Max samples kept in memory. Oldest are dropped when exceeded. Default: 50_000 */ + capacity?: number +} + +/** + * In-memory time-series store backed by a ring buffer. + * + * Chosen over InfluxDB/TimescaleDB for the MVP because it requires zero + * infrastructure and the agent-runtime already runs in a constrained + * environment. The interface is designed so a TimescaleDB adapter can be + * swapped in behind the same API once query volumes justify it. + */ +export class MetricStore { + private readonly buffer: RingBuffer + + constructor(options: MetricStoreOptions = {}) { + this.buffer = new RingBuffer(options.capacity ?? 
50_000) + } + + // ── Write ────────────────────────────────────────────────────────────────── + + /** + * Record a metric sample. Timestamp defaults to now. + * Throws a ZodError if the input fails validation. + */ + record(input: MetricSampleInput): MetricSample { + const parsed = MetricSampleInputSchema.parse(input) + const sample: MetricSample = { + ...parsed, + timestamp: parsed.timestamp ?? new Date(), + } + this.buffer.push(sample) + return sample + } + + // ── Read ─────────────────────────────────────────────────────────────────── + + /** + * Query samples with optional name, time-range, and label filters. + * Results are returned newest-first. + */ + query(q: MetricQuery = {}): MetricSample[] { + let samples = this.buffer.toArray() + + if (q.metric_name !== undefined) { + samples = samples.filter(s => s.metric_name === q.metric_name) + } + if (q.from !== undefined) { + const from = q.from + samples = samples.filter(s => s.timestamp >= from) + } + if (q.to !== undefined) { + const to = q.to + samples = samples.filter(s => s.timestamp <= to) + } + if (q.labels !== undefined) { + const filterLabels = q.labels + samples = samples.filter(s => labelsMatch(s.labels, filterLabels)) + } + + // Newest-first to match typical dashboard/API expectations + samples.reverse() + + if (q.limit !== undefined && q.limit > 0) { + samples = samples.slice(0, q.limit) + } + + return samples + } + + /** + * Compute a single aggregate statistic over a filtered set of samples. + * Returns null when there are no matching samples. 
+ */ + aggregate( + metric_name: string, + fn: AggregateFunction, + filter: Omit<MetricQuery, 'metric_name'> = {}, + ): AggregateResult | null { + const samples = this.query({ ...filter, metric_name }) + if (samples.length === 0) return null + + const values = samples.map(s => s.value) + const timestamps = samples.map(s => s.timestamp) + + return { + metric_name, + fn, + value: computeAggregate(values, fn), + sample_count: samples.length, + from: min(timestamps), + to: max(timestamps), + } + } + + /** Number of samples currently in the buffer. */ + get size(): number { + return this.buffer.size + } + + /** Wipe all stored samples — useful in tests. */ + clear(): void { + this.buffer.clear() + } +} + +// ── Helpers ────────────────────────────────────────────────────────────────── + +function labelsMatch(sample: MetricLabels, filter: MetricLabels): boolean { + for (const [k, v] of Object.entries(filter)) { + if (sample[k] !== v) return false + } + return true +} + +function computeAggregate(values: number[], fn: AggregateFunction): number { + switch (fn) { + case 'count': return values.length + case 'sum': return values.reduce((a, b) => a + b, 0) + case 'min': return Math.min(...values) + case 'max': return Math.max(...values) + case 'avg': return values.reduce((a, b) => a + b, 0) / values.length + case 'p95': return percentile(values, 0.95) + case 'p99': return percentile(values, 0.99) + } +} + +/** Nearest-rank percentile. Sorts a copy — does not mutate the input. */ +function percentile(values: number[], p: number): number { + const sorted = [...values].sort((a, b) => a - b) + // Clamp to valid index range + const idx = Math.max(0, Math.ceil(sorted.length * p) - 1) + return sorted[idx] as number +} + +function min(dates: Date[]): Date | null { + if (dates.length === 0) return null + return dates.reduce((a, b) => (a < b ? a : b)) +} + +function max(dates: Date[]): Date | null { + if (dates.length === 0) return null + return dates.reduce((a, b) => (a > b ? 
a : b)) +} diff --git a/src/metrics/types.ts b/src/metrics/types.ts new file mode 100644 index 0000000..7cf2649 --- /dev/null +++ b/src/metrics/types.ts @@ -0,0 +1,61 @@ +import { z } from 'zod' + +// Supported units — extensible via union expansion, not a free string, +// so dashboards and aggregation logic can make unit-aware decisions. +export const MetricUnitSchema = z.enum([ + 'ms', // milliseconds (latency, duration) + 'bytes', // memory, payload size + 'count', // events, items + 'percent', // utilisation, error rate + 'requests_per_sec', // throughput + 'errors_per_sec', // error rate + 'seconds', // coarse durations +]) + +export type MetricUnit = z.infer<typeof MetricUnitSchema> + +// Labels are string→string pairs for slicing/dicing — same model as +// Prometheus labels. Kept flat so ring-buffer serialisation stays cheap. +export const MetricLabelsSchema = z.record(z.string(), z.string()) +export type MetricLabels = z.infer<typeof MetricLabelsSchema> + +export const MetricSampleSchema = z.object({ + metric_name: z.string().min(1).max(255), + value: z.number().finite(), + unit: MetricUnitSchema, + timestamp: z.date(), + labels: MetricLabelsSchema.default({}), +}) + +export type MetricSample = z.infer<typeof MetricSampleSchema> + +// Input type — timestamp defaults to now when omitted +export const MetricSampleInputSchema = MetricSampleSchema.extend({ + timestamp: z.date().optional(), +}) + +export type MetricSampleInput = z.infer<typeof MetricSampleInputSchema> + +// Query filters passed to MetricStore.query() +export interface MetricQuery { + metric_name?: string + /** inclusive lower bound */ + from?: Date + /** inclusive upper bound */ + to?: Date + /** all provided labels must match exactly */ + labels?: MetricLabels + /** max samples to return, applied after filtering, newest-first */ + limit?: number +} + +export type AggregateFunction = 'avg' | 'min' | 'max' | 'sum' | 'count' | 'p95' | 'p99' + +export interface AggregateResult { + metric_name: string + fn: AggregateFunction + value: number + sample_count: number + from: Date | null + to: Date | null +}