diff --git a/hippocortex/__init__.py b/hippocortex/__init__.py index ffd8282..f56a346 100644 --- a/hippocortex/__init__.py +++ b/hippocortex/__init__.py @@ -1,33 +1,63 @@ from __future__ import annotations import logging -from dataclasses import dataclass +from dataclasses import dataclass, field +from time import perf_counter from hippocortex.config import HippoConfig from hippocortex.embedders.base import Embedder from hippocortex.embedders.dummy_embedder import DummyEmbedder from hippocortex.hippo.episodic_store import SQLiteEpisodicStore +from hippocortex.observability import configure_json_logger from hippocortex.registry import get_consolidation_strategy, get_router_strategy, get_storage_backend, register_defaults +from hippocortex.telemetry import NoOpTelemetry, Telemetry from hippocortex.types import ConsolidationOutput, ContextPack from hippocortex.working_memory import WorkingMemory logger = logging.getLogger(__name__) +configure_json_logger(logger) class CortexAPI: def __init__(self, sdk: "HippoCortex") -> None: self._sdk = sdk - def search(self, agent_id: str, query: str, k: int = 5, filters: dict | None = None): + def search( + self, + agent_id: str, + query: str, + k: int = 5, + filters: dict | None = None, + session_id: str | None = None, + request_id: str | None = None, + ): + started = perf_counter() vector = self._sdk.embedder.embed_text(query) - return self._sdk.semantic_store.search(agent_id=agent_id, query_vector=vector, k=k, filters=filters) + hits = self._sdk.semantic_store.search(agent_id=agent_id, query_vector=vector, k=k, filters=filters) + duration_ms = (perf_counter() - started) * 1000 + hit_rate = len(hits) / k if k > 0 else 0.0 + tags = {"agent_id": agent_id} + self._sdk.telemetry.observe("hippocortex.search.duration_ms", duration_ms, tags=tags) + self._sdk.telemetry.observe("hippocortex.search.hit_rate", hit_rate, tags=tags) + logger.info( + "search_completed", + extra={ + "agent_id": agent_id, + "session_id": session_id, + "request_id": request_id, + "duration_ms": duration_ms, + "hit_rate": hit_rate, + }, + ) + return hits @dataclass class HippoCortex: config: HippoConfig embedder: Embedder + telemetry: Telemetry = field(default_factory=NoOpTelemetry) def __post_init__(self) -> None: register_defaults() @@ -40,7 +70,7 @@ def __post_init__(self) -> None: f"model.embedding_dim {self.config.model.embedding_dim}" ) - self.hippo = SQLiteEpisodicStore(self.config.storage.db_path) + self.hippo = SQLiteEpisodicStore(self.config.storage.db_path, telemetry=self.telemetry) semantic_backend = self.config.storage.semantic_store_backend.lower() self.semantic_store = get_storage_backend(semantic_backend, self.config, self.embedder.dimension) self.cortex = CortexAPI(self) @@ -56,22 +86,64 @@ def default(cls, config: HippoConfig | None = None, embedder: Embedder | None = emb = embedder or DummyEmbedder(dimension=cfg.model.embedding_dim) return cls(config=cfg, embedder=emb) - def consolidate(self, agent_id: str, session_id: str | None = None, strategy: str = "replay_v1") -> ConsolidationOutput: + def consolidate( + self, + agent_id: str, + session_id: str | None = None, + strategy: str = "replay_v1", + request_id: str | None = None, + ) -> ConsolidationOutput: + started = perf_counter() consolidator = self._consolidators.get(strategy) if consolidator is None: consolidator = get_consolidation_strategy(strategy, self.config) self._consolidators[strategy] = consolidator episodes = consolidator.select_episodes(self.hippo, agent_id=agent_id, session_id=session_id) - return consolidator.run(agent_id=agent_id, episodes=episodes, embedder=self.embedder, semantic_store=self.semantic_store) - - def build_context(self, agent_id: str, session_id: str, user_message: str, max_tokens: int) -> ContextPack: + output = consolidator.run(agent_id=agent_id, episodes=episodes, embedder=self.embedder, semantic_store=self.semantic_store) + duration_ms = (perf_counter() - started) * 1000 + self.telemetry.observe("hippocortex.consolidation.duration_ms", duration_ms, tags={"agent_id": agent_id, "strategy": strategy}) + logger.info( + "consolidation_completed", + extra={ + "agent_id": agent_id, + "session_id": session_id, + "request_id": request_id, + "duration_ms": duration_ms, + }, + ) + return output + + def build_context( + self, + agent_id: str, + session_id: str, + user_message: str, + max_tokens: int, + request_id: str | None = None, + ) -> ContextPack: decision = self.router.route(user_message=user_message, max_tokens=max_tokens) + logger.info( + "router_decision", + extra={ + "agent_id": agent_id, + "session_id": session_id, + "request_id": request_id, + "intent": decision.intent, + "explain": decision.explain, + }, + ) recent_events = self.hippo.list_events(agent_id=agent_id, session_id=session_id, limit=self.config.runtime.working_memory_turns) selected_recent = self.working_memory.select_recent(recent_events, token_budget=decision.working_memory_tokens) semantic_notes = [] if decision.intent in {"semantic", "hybrid"}: - semantic_notes = self.cortex.search(agent_id=agent_id, query=user_message, k=5) + semantic_notes = self.cortex.search( + agent_id=agent_id, + session_id=session_id, + query=user_message, + k=5, + request_id=request_id, + ) highlights = [] if decision.include_highlights: diff --git a/hippocortex/hippo/episodic_store.py b/hippocortex/hippo/episodic_store.py index 75e5174..a198b5a 100644 --- a/hippocortex/hippo/episodic_store.py +++ b/hippocortex/hippo/episodic_store.py @@ -1,16 +1,22 @@ from __future__ import annotations import json +import logging import sqlite3 from datetime import datetime from pathlib import Path +from hippocortex.observability import configure_json_logger +from hippocortex.telemetry import NoOpTelemetry, Telemetry from hippocortex.types import EventRecord class SQLiteEpisodicStore: - def __init__(self, db_path: str) -> None: + def __init__(self, db_path: str, telemetry: Telemetry | None = None) -> None: self.db_path = db_path + self.telemetry = telemetry or NoOpTelemetry() + self.logger = logging.getLogger("hippocortex.observability") + configure_json_logger(self.logger) Path(db_path).parent.mkdir(parents=True, exist_ok=True) if Path(db_path).parent != Path(".") else None self._init_db() @@ -48,6 +54,7 @@ def add_event( content: str, metadata: dict | None = None, importance: float = 0.5, + request_id: str | None = None, ) -> EventRecord: event = EventRecord( agent_id=agent_id, @@ -76,6 +83,17 @@ def add_event( ) conn.commit() event.id = int(cur.lastrowid) + self.telemetry.increment("hippocortex.events.write", value=1, tags={"agent_id": agent_id, "session_id": session_id}) + self.logger.info( + "event_written", + extra={ + "agent_id": agent_id, + "session_id": session_id, + "request_id": request_id, + "metric": "hippocortex.events.write", + "value": 1, + }, + ) return event def list_events(self, agent_id: str, session_id: str | None = None, limit: int = 50) -> list[EventRecord]: diff --git a/hippocortex/observability.py b/hippocortex/observability.py new file mode 100644 index 0000000..e1bcccc --- /dev/null +++ b/hippocortex/observability.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import json +import logging +from datetime import datetime, timezone + + +class JsonFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + payload = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "level": record.levelname, + "logger": record.name, + "message": record.getMessage(), + "agent_id": getattr(record, "agent_id", None), + "session_id": getattr(record, "session_id", None), + "request_id": getattr(record, "request_id", None), + } + for key in ("intent", "metric", "value", "duration_ms", "hit_rate", "explain"): + if hasattr(record, key): + payload[key] = getattr(record, key) + return json.dumps(payload, ensure_ascii=False) + + +def configure_json_logger(logger: logging.Logger) -> None: + if logger.handlers: + return + handler = logging.StreamHandler() + handler.setFormatter(JsonFormatter()) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + logger.propagate = False + diff --git a/hippocortex/router.py b/hippocortex/router.py index 4d2cd29..37af8a8 100644 --- a/hippocortex/router.py +++ b/hippocortex/router.py @@ -9,8 +9,10 @@ def route(self, user_message: str, max_tokens: int) -> RoutingDecision: semantic_keywords = ["remember", "history", "why", "pattern", "summarize", "context"] episodic_keywords = ["just now", "latest", "last message", "session"] - semantic_score = sum(1 for k in semantic_keywords if k in lowered) - episodic_score = sum(1 for k in episodic_keywords if k in lowered) + matched_semantic = [k for k in semantic_keywords if k in lowered] + matched_episodic = [k for k in episodic_keywords if k in lowered] + semantic_score = len(matched_semantic) + episodic_score = len(matched_episodic) if semantic_score > episodic_score: intent = "semantic" @@ -26,4 +28,15 @@ def route(self, user_message: str, max_tokens: int) -> RoutingDecision: working_memory_tokens=wm_tokens, semantic_tokens=semantic_tokens, include_highlights=intent != "semantic", + explain={ + "matched_keywords": { + "semantic": matched_semantic, + "episodic": matched_episodic, + }, + "scores": {"semantic": semantic_score, "episodic": episodic_score}, + "budget_allocation": { + "working_memory_tokens": wm_tokens, + "semantic_tokens": semantic_tokens, + }, + }, ) diff --git a/hippocortex/telemetry.py b/hippocortex/telemetry.py new file mode 100644 index 0000000..ffd1ea0 --- /dev/null +++ b/hippocortex/telemetry.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Protocol + + +class Telemetry(Protocol): + def increment(self, metric: str, value: int = 1, tags: dict[str, str] | None = None) -> None: ... + + def observe(self, metric: str, value: float, tags: dict[str, str] | None = None) -> None: ... + + +@dataclass +class NoOpTelemetry: + """Default telemetry backend that safely drops all metrics.""" + + def increment(self, metric: str, value: int = 1, tags: dict[str, str] | None = None) -> None: + return None + + def observe(self, metric: str, value: float, tags: dict[str, str] | None = None) -> None: + return None + + +@dataclass +class InMemoryTelemetry: + """Testing helper backend that records every metric call.""" + + counters: list[tuple[str, int, dict[str, str]]] = field(default_factory=list) + observations: list[tuple[str, float, dict[str, str]]] = field(default_factory=list) + + def increment(self, metric: str, value: int = 1, tags: dict[str, str] | None = None) -> None: + self.counters.append((metric, value, tags or {})) + + def observe(self, metric: str, value: float, tags: dict[str, str] | None = None) -> None: + self.observations.append((metric, value, tags or {})) + diff --git a/hippocortex/types.py b/hippocortex/types.py index cec01da..1a7db85 100644 --- a/hippocortex/types.py +++ b/hippocortex/types.py @@ -65,6 +65,7 @@ class RoutingDecision(BaseModel): working_memory_tokens: int semantic_tokens: int include_highlights: bool = True + explain: dict[str, Any] = Field(default_factory=dict) class ConsolidationOutput(BaseModel): diff --git a/tests/test_observability.py b/tests/test_observability.py new file mode 100644 index 0000000..edaff0a --- /dev/null +++ b/tests/test_observability.py @@ -0,0 +1,77 @@ +import logging + +from hippocortex import DummyEmbedder, HippoConfig, HippoCortex +from hippocortex.config import ModelConfig, StorageConfig +from hippocortex.telemetry import InMemoryTelemetry + + +def _build_sdk(tmp_path): + db_path = tmp_path / "memory.db" + telemetry = InMemoryTelemetry() + sdk = HippoCortex.default( + config=HippoConfig( + storage=StorageConfig(db_path=str(db_path)), + model=ModelConfig(embedding_dim=8), + ), + embedder=DummyEmbedder(dimension=8), + ) + sdk.telemetry = telemetry + sdk.hippo.telemetry = telemetry + return sdk, telemetry + + +def test_router_explain_contains_keywords_and_budget(): + decision = HippoCortex.default().router.route("summarize latest history", max_tokens=100) + assert "matched_keywords" in decision.explain + assert "budget_allocation" in decision.explain + + +def test_observability_logs_include_required_fields(tmp_path): + sdk, _ = _build_sdk(tmp_path) + records = [] + + class CaptureHandler(logging.Handler): + def emit(self, record): + records.append(record) + + handler = CaptureHandler() + logger = logging.getLogger("hippocortex") + obs_logger = logging.getLogger("hippocortex.observability") + logger.addHandler(handler) + obs_logger.addHandler(handler) + + try: + sdk.hippo.add_event("agent-1", "session-1", "user", "I like tea", request_id="req-1") + sdk.consolidate(agent_id="agent-1", session_id="session-1", request_id="req-1") + sdk.build_context( + agent_id="agent-1", + session_id="session-1", + user_message="summarize history", + max_tokens=100, + request_id="req-1", + ) + finally: + logger.removeHandler(handler) + obs_logger.removeHandler(handler) + + assert records + for record in records: + assert hasattr(record, "agent_id") + assert hasattr(record, "session_id") + assert hasattr(record, "request_id") + + +def test_telemetry_metrics_emitted(tmp_path): + sdk, telemetry = _build_sdk(tmp_path) + + sdk.hippo.add_event("agent-1", "session-1", "user", "I like tea", request_id="req-1") + sdk.consolidate(agent_id="agent-1", session_id="session-1", request_id="req-1") + sdk.cortex.search(agent_id="agent-1", session_id="session-1", query="tea", k=5, request_id="req-1") + + counter_names = [name for name, _, _ in telemetry.counters] + assert "hippocortex.events.write" in counter_names + + observation_names = [name for name, _, _ in telemetry.observations] + assert "hippocortex.consolidation.duration_ms" in observation_names + assert "hippocortex.search.duration_ms" in observation_names + assert "hippocortex.search.hit_rate" in observation_names