diff --git a/packages/app/src/index.ts b/packages/app/src/index.ts index a212689c..8272d046 100644 --- a/packages/app/src/index.ts +++ b/packages/app/src/index.ts @@ -39,7 +39,7 @@ import {Pool, SocketEvent, isRelayEvent, netContext} from "@welshman/net" import {pubkey, unwrapAndStore} from "./session.js" import {repository, tracker} from "./core.js" import {getRelays, loadRelay} from "./relays.js" -import {trackRelayStats, getRelayQuality} from "./relayStats.js" +import {trackRelayStats, getRelayQuality, getRelayPrior} from "./relayStats.js" import {deriveRelayList, getRelayList} from "./relayLists.js" import {deriveBlockedRelayList, getBlockedRelayList} from "./blockedRelayLists.js" import {deriveMessagingRelayList, getMessagingRelayList} from "./messagingRelayLists.js" @@ -103,6 +103,7 @@ export const derivePubkeyRelays = (pubkey: string, mode?: RelayMode) => { routerContext.getUserPubkey = () => pubkey.get() routerContext.getPubkeyRelays = getPubkeyRelays routerContext.getRelayQuality = getRelayQuality +routerContext.getRelayPrior = getRelayPrior routerContext.getDefaultRelays = _relayGetter() routerContext.getIndexerRelays = _relayGetter() routerContext.getSearchRelays = _relayGetter(r => r?.supported_nips?.includes?.("50")) diff --git a/packages/app/src/relayStats.ts b/packages/app/src/relayStats.ts index ae87743a..1b6bada0 100644 --- a/packages/app/src/relayStats.ts +++ b/packages/app/src/relayStats.ts @@ -26,6 +26,9 @@ export type RelayStats = { publish_failure_count: number eose_count: number notice_count: number + alpha?: number + beta?: number + last_delivery_update?: number } export const makeRelayStats = (url: string): RelayStats => ({ @@ -105,6 +108,71 @@ export const getRelayQuality = (url: string) => { return 0.7 } +// Thompson Sampling priors for relay delivery scoring + +const THOMPSON_DECAY = 0.95 +const THOMPSON_DECAY_INTERVAL = HOUR + +/** Sanitize a stored prior param: non-finite or non-positive values reset to 1 (uniform). */ +const sanitizePrior = (value: number | undefined) => { + if (value == null || !Number.isFinite(value) || value <= 0) return 1 + return value +} + +/** Decay raw stored alpha/beta to their effective values at the current time. */ +const decayPrior = (stats: RelayStats) => { + const elapsed = now() - (stats.last_delivery_update ?? stats.first_seen) + const intervals = elapsed / THOMPSON_DECAY_INTERVAL + const decay = Math.pow(THOMPSON_DECAY, intervals) + return { + alpha: 1 + (sanitizePrior(stats.alpha) - 1) * decay, + beta: 1 + (sanitizePrior(stats.beta) - 1) * decay, + } +} + +/** + * Retrieve Thompson Sampling priors for a relay (global per-relay, not per-pubkey). + * Returns undefined when no delivery data exists, letting the Router fall back to Math.random(). + * Applies time-based exponential decay to prevent prior ossification. + */ +export const getRelayPrior = (url: string) => { + const stats = getRelayStats(url) + if (!stats?.alpha || !stats?.beta) return undefined + + const {alpha, beta} = decayPrior(stats) + + if (!Number.isFinite(alpha) || !Number.isFinite(beta) || alpha <= 0 || beta <= 0) return undefined + if (alpha < 1.01 && beta < 1.01) return undefined + + return {alpha, beta} +} + +/** + * Record relay delivery outcome for Thompson Sampling. + * Call after observing how many events a relay delivered vs expected. + * Priors are global per-relay — the Router's scoreRelay doesn't know + * the pubkey context, so this captures "is this relay reliable?" overall. + * @param url - Relay URL + * @param delivered - Number of events the relay returned (>= 0) + * @param expected - Number of events expected (from baseline or other relays) + */ +export const recordRelayDelivery = (url: string, delivered: number, expected: number) => { + if (expected <= 0 || !Number.isFinite(delivered) || !Number.isFinite(expected) || delivered < 0) + return + const fraction = Math.min(delivered / expected, 1) + updateRelayStats([ + url, + stats => { + // Decay stored priors before adding new observation so that stale + // values don't snap back after idle periods. + const decayed = decayPrior(stats) + stats.alpha = decayed.alpha + fraction + stats.beta = decayed.beta + (1 - fraction) + stats.last_delivery_update = now() + }, + ]) +} + // Utilities for syncing stats from connections to relays type RelayStatsUpdate = [string, (stats: RelayStats) => void] diff --git a/packages/lib/__tests__/Beta.test.ts b/packages/lib/__tests__/Beta.test.ts new file mode 100644 index 00000000..538553f8 --- /dev/null +++ b/packages/lib/__tests__/Beta.test.ts @@ -0,0 +1,162 @@ +import {describe, it, expect, vi} from "vitest" +import {sampleBeta} from "../src/Beta" + +// Mirrors the decay logic from @welshman/app relayStats.ts for integration testing +const HOUR = 3600 +const THOMPSON_DECAY = 0.95 +const THOMPSON_DECAY_INTERVAL = HOUR + +function decayPrior(alpha: number, beta: number, elapsedSeconds: number) { + const intervals = elapsedSeconds / THOMPSON_DECAY_INTERVAL + const decay = Math.pow(THOMPSON_DECAY, intervals) + return { + alpha: 1 + (alpha - 1) * decay, + beta: 1 + (beta - 1) * decay, + } +} + +describe("Beta", () => { + describe("sampleBeta", () => { + it("should return values in [0, 1] for uniform prior", () => { + for (let i = 0; i < 100; i++) { + const value = sampleBeta(1, 1) + expect(value).toBeGreaterThanOrEqual(0) + expect(value).toBeLessThanOrEqual(1) + } + }) + + it("should concentrate near 1 for strong success prior", () => { + const samples = Array.from({length: 1000}, () => sampleBeta(100, 1)) + const mean = samples.reduce((a, b) => a + b, 0) / samples.length + expect(mean).toBeGreaterThan(0.95) + }) + + it("should concentrate near 0 for strong failure prior", () => { + const samples = Array.from({length: 1000}, () => sampleBeta(1, 100)) + const mean = samples.reduce((a, b) => a + b, 0) / samples.length + expect(mean).toBeLessThan(0.05) + }) + + it("should produce mean ≈ alpha / (alpha + beta)", () => { + const alpha = 3 + const beta = 7 + const expected = alpha / (alpha + beta) // 0.3 + const samples = Array.from({length: 10000}, () => sampleBeta(alpha, beta)) + const mean = samples.reduce((a, b) => a + b, 0) / samples.length + expect(Math.abs(mean - expected)).toBeLessThan(0.05) + }) + + it("should handle very small alpha/beta without crashing", () => { + for (let i = 0; i < 100; i++) { + const value = sampleBeta(0.1, 0.1) + expect(value).toBeGreaterThanOrEqual(0) + expect(value).toBeLessThanOrEqual(1) + } + }) + + it("should throw on zero, negative, NaN, and Infinity params", () => { + expect(() => sampleBeta(0, 1)).toThrow(RangeError) + expect(() => sampleBeta(1, 0)).toThrow(RangeError) + expect(() => sampleBeta(-1, 1)).toThrow(RangeError) + expect(() => sampleBeta(1, -1)).toThrow(RangeError) + expect(() => sampleBeta(NaN, 1)).toThrow(RangeError) + expect(() => sampleBeta(1, NaN)).toThrow(RangeError) + expect(() => sampleBeta(Infinity, 1)).toThrow(RangeError) + expect(() => sampleBeta(1, Infinity)).toThrow(RangeError) + }) + + it("should produce deterministic results with seeded RNG", () => { + let seed = 42 + const seededRng = () => { + seed = (seed * 16807 + 0) % 2147483647 + return seed / 2147483647 + } + + seed = 42 + const a = sampleBeta(2, 5, seededRng) + + seed = 42 + const b = sampleBeta(2, 5, seededRng) + + expect(a).toBe(b) + }) + + it("should return rng() directly for uniform prior (fast path)", () => { + const rng = vi.fn(() => 0.42) + const result = sampleBeta(1, 1, rng) + expect(result).toBe(0.42) + expect(rng).toHaveBeenCalledTimes(1) + }) + }) + + describe("Thompson Sampling decay integration", () => { + it("should decay priors toward uniform after long idle", () => { + const storedAlpha = 100 + const storedBeta = 5 + const oneWeek = 7 * 24 * HOUR + + const {alpha, beta} = decayPrior(storedAlpha, storedBeta, oneWeek) + // After a week of decay at 0.95/hour (0.95^168 ≈ 0.00018), priors near uniform + expect(alpha).toBeLessThan(1.02) + expect(beta).toBeLessThan(1.01) + }) + + it("should preserve priors with zero elapsed time", () => { + const {alpha, beta} = decayPrior(50, 10, 0) + expect(alpha).toBe(50) + expect(beta).toBe(10) + }) + + it("decay-then-update should not restore stale confidence", () => { + // Simulate: relay had alpha=100, idle for a week, then one new success + const storedAlpha = 100 + const storedBeta = 5 + const oneWeek = 7 * 24 * HOUR + + // Correct behavior: decay first, then add observation + const decayed = decayPrior(storedAlpha, storedBeta, oneWeek) + const updatedAlpha = decayed.alpha + 1 // one success + + // Updated alpha should be near 2 (decayed ~1 + 1 success), NOT near 101 + expect(updatedAlpha).toBeLessThan(3) + expect(updatedAlpha).toBeGreaterThan(1) + // Beta should also remain near-uniform after decay + expect(decayed.beta).toBeLessThan(1.01) + }) + + it("decayed priors should produce valid sampleBeta input", () => { + // Even heavily decayed, alpha/beta stay >= 1 + const {alpha, beta} = decayPrior(1000, 1000, 100 * 24 * HOUR) + expect(alpha).toBeGreaterThanOrEqual(1) + expect(beta).toBeGreaterThanOrEqual(1) + + // Should not throw + const sample = sampleBeta(alpha, beta) + expect(sample).toBeGreaterThanOrEqual(0) + expect(sample).toBeLessThanOrEqual(1) + }) + }) + + describe("router defensive fallback contract", () => { + it("sampleBeta should throw on invalid params so router can catch", () => { + // Router wraps sampleBeta in try/catch — verify the throw contract + expect(() => sampleBeta(-1, 2)).toThrow(RangeError) + expect(() => sampleBeta(2, -1)).toThrow(RangeError) + expect(() => sampleBeta(0, 0)).toThrow(RangeError) + expect(() => sampleBeta(NaN, NaN)).toThrow(RangeError) + }) + + it("router scoring pattern should fall back on bad priors", () => { + // Simulates the exact pattern in router's scoreRelay + const badPrior = {alpha: -1, beta: 2} + let sample: number + try { + sample = sampleBeta(badPrior.alpha, badPrior.beta) + } catch { + sample = Math.random() + } + expect(sample).toBeGreaterThanOrEqual(0) + expect(sample).toBeLessThanOrEqual(1) + }) + }) +}) diff --git a/packages/lib/src/Beta.ts b/packages/lib/src/Beta.ts new file mode 100644 index 00000000..e9252688 --- /dev/null +++ b/packages/lib/src/Beta.ts @@ -0,0 +1,83 @@ +/** Clamp rng() to avoid 0 (which breaks Math.log / Math.pow). */ +const EPS = Number.MIN_VALUE + +function rngPos(rng: () => number): number { + return Math.max(rng(), EPS) +} + +/** + * Sample from a Beta(alpha, beta) distribution. + * Returns a value in [0, 1]. + * + * Uses Jöhnk's algorithm when both params < 1, gamma sampling otherwise. + * When alpha=1 and beta=1 (uniform prior), returns rng() directly. + * + * @param alpha - Shape parameter (> 0) + * @param beta - Shape parameter (> 0) + * @param rng - Random number generator returning values in (0, 1). Defaults to Math.random. + */ +export function sampleBeta(alpha: number, beta: number, rng: () => number = Math.random): number { + if (!Number.isFinite(alpha) || !Number.isFinite(beta) || alpha <= 0 || beta <= 0) { + throw new RangeError( + `sampleBeta requires alpha > 0 and beta > 0, got alpha=${alpha}, beta=${beta}`, + ) + } + + // For alpha=1, beta=1 (uniform prior), just return rng() + if (alpha === 1 && beta === 1) return rng() + + // Jöhnk's algorithm for both params < 1 + if (alpha < 1 && beta < 1) { + while (true) { + const u = rngPos(rng) + const v = rngPos(rng) + const x = Math.pow(u, 1 / alpha) + const y = Math.pow(v, 1 / beta) + if (x + y <= 1) { + if (x + y > 0) return x / (x + y) + // Handle underflow by taking logs + const logX = Math.log(u) / alpha + const logY = Math.log(v) / beta + const logM = logX > logY ? logX : logY + return Math.exp(logX - logM) / (Math.exp(logX - logM) + Math.exp(logY - logM)) + } + } + } + + // For larger alpha/beta, use gamma sampling approach + const x = sampleGamma(alpha, rng) + const y = sampleGamma(beta, rng) + return x / (x + y) +} + +/** + * Sample from a Gamma(shape, 1) distribution using Marsaglia and Tsang's method. + */ +function sampleGamma(shape: number, rng: () => number): number { + if (!Number.isFinite(shape) || shape <= 0) { + throw new RangeError(`sampleGamma requires shape > 0, got shape=${shape}`) + } + + if (shape < 1) { + // Boost: Gamma(shape) = Gamma(shape+1) * U^(1/shape) + return sampleGamma(shape + 1, rng) * Math.pow(rngPos(rng), 1 / shape) + } + + const d = shape - 1 / 3 + const c = 1 / Math.sqrt(9 * d) + + while (true) { + let x: number + let v: number + do { + // Box-Muller for normal sample + x = Math.sqrt(-2 * Math.log(rngPos(rng))) * Math.cos(2 * Math.PI * rng()) + v = 1 + c * x + } while (v <= 0) + + v = v * v * v + const u = rng() + if (u < 1 - 0.0331 * (x * x) * (x * x)) return d * v + if (Math.log(u) < 0.5 * x * x + d * (1 - v + Math.log(v))) return d * v + } +} diff --git a/packages/lib/src/index.ts b/packages/lib/src/index.ts index a1f86b35..ff9e01d8 100644 --- a/packages/lib/src/index.ts +++ b/packages/lib/src/index.ts @@ -1,3 +1,4 @@ +export * from "./Beta.js" export * from "./Deferred.js" export * from "./Emitter.js" export * from "./LRUCache.js" diff --git a/packages/router/src/index.ts b/packages/router/src/index.ts index a9c507e2..ba26cea5 100644 --- a/packages/router/src/index.ts +++ b/packages/router/src/index.ts @@ -12,6 +12,7 @@ import { add, take, chunks, + sampleBeta, } from "@welshman/lib" import { getFilterId, @@ -84,6 +85,14 @@ export type RouterOptions = { */ getRelayQuality?: (url: string) => number + /** + * Retrieves Thompson Sampling priors for a relay (global per-relay, not per-pubkey). + * When provided, the Router uses Beta-distributed sampling instead of + * uniform random, biasing toward relays with better delivery history. + * Return undefined for relays with no history (falls back to Math.random). + */ + getRelayPrior?: (url: string) => {alpha: number; beta: number} | undefined + /** * Retrieves the limit setting, which is the maximum number of relays that should be * returned from getUrls and getSelections. @@ -310,7 +319,7 @@ export class RouterScenario { const limit = this.getLimit() const fallbackPolicy = this.getPolicy() const relayWeights = new Map() - const {getRelayQuality} = this.router.options + const {getRelayQuality, getRelayPrior} = this.router.options const {allowOnion, allowLocal, allowInsecure} = this.options for (const {weight, relays} of this.selections) { @@ -327,11 +336,18 @@ export class RouterScenario { const scoreRelay = (relay: string) => { const weight = relayWeights.get(relay)! const quality = getRelayQuality ? getRelayQuality(relay) : 1 + let sample: number + try { + const prior = getRelayPrior?.(relay) + sample = prior ? sampleBeta(prior.alpha, prior.beta) : Math.random() + } catch { + sample = Math.random() + } // Log the weight, since it's a straight count which ends up over-weighting hubs. - // Also add some random noise so that we'll occasionally pick lower quality/less - // popular relays. - return -(quality * inc(Math.log(weight)) * Math.random()) + // When delivery priors exist, Beta sampling biases toward relays that actually + // delivered events; otherwise uniform random provides exploration. + return -(quality * inc(Math.log(weight)) * sample) } const relays = take(