Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 81 additions & 9 deletions hippocortex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,63 @@
from __future__ import annotations

import logging
from dataclasses import dataclass
from dataclasses import dataclass, field
from time import perf_counter

from hippocortex.config import HippoConfig
from hippocortex.embedders.base import Embedder
from hippocortex.embedders.dummy_embedder import DummyEmbedder
from hippocortex.hippo.episodic_store import SQLiteEpisodicStore
from hippocortex.observability import configure_json_logger
from hippocortex.registry import get_consolidation_strategy, get_router_strategy, get_storage_backend, register_defaults
from hippocortex.telemetry import NoOpTelemetry, Telemetry
from hippocortex.types import ConsolidationOutput, ContextPack
from hippocortex.working_memory import WorkingMemory


logger = logging.getLogger(__name__)
configure_json_logger(logger)


class CortexAPI:
    """Thin facade over the SDK's semantic search, adding telemetry and logging."""

    def __init__(self, sdk: "HippoCortex") -> None:
        self._sdk = sdk

    def search(
        self,
        agent_id: str,
        query: str,
        k: int = 5,
        filters: dict | None = None,
        session_id: str | None = None,
        request_id: str | None = None,
    ):
        """Embed *query*, query the semantic store, and record observability data.

        Returns the hit list exactly as produced by the semantic store.
        Emits `search.duration_ms` and `search.hit_rate` observations tagged
        by agent, plus a structured "search_completed" log line carrying the
        optional session/request correlation ids.
        """
        started = perf_counter()
        vector = self._sdk.embedder.embed_text(query)
        hits = self._sdk.semantic_store.search(agent_id=agent_id, query_vector=vector, k=k, filters=filters)
        duration_ms = (perf_counter() - started) * 1000
        # Guard k=0 so the hit-rate computation never divides by zero.
        hit_rate = len(hits) / k if k > 0 else 0.0
        tags = {"agent_id": agent_id}
        self._sdk.telemetry.observe("hippocortex.search.duration_ms", duration_ms, tags=tags)
        self._sdk.telemetry.observe("hippocortex.search.hit_rate", hit_rate, tags=tags)
        logger.info(
            "search_completed",
            extra={
                "agent_id": agent_id,
                "session_id": session_id,
                "request_id": request_id,
                "duration_ms": duration_ms,
                "hit_rate": hit_rate,
            },
        )
        return hits


@dataclass
class HippoCortex:
config: HippoConfig
embedder: Embedder
telemetry: Telemetry = field(default_factory=NoOpTelemetry)

def __post_init__(self) -> None:
register_defaults()
Expand All @@ -40,7 +70,7 @@ def __post_init__(self) -> None:
f"model.embedding_dim {self.config.model.embedding_dim}"
)

self.hippo = SQLiteEpisodicStore(self.config.storage.db_path)
self.hippo = SQLiteEpisodicStore(self.config.storage.db_path, telemetry=self.telemetry)
semantic_backend = self.config.storage.semantic_store_backend.lower()
self.semantic_store = get_storage_backend(semantic_backend, self.config, self.embedder.dimension)
self.cortex = CortexAPI(self)
Expand All @@ -56,22 +86,64 @@ def default(cls, config: HippoConfig | None = None, embedder: Embedder | None =
emb = embedder or DummyEmbedder(dimension=cfg.model.embedding_dim)
return cls(config=cfg, embedder=emb)

def consolidate(self, agent_id: str, session_id: str | None = None, strategy: str = "replay_v1") -> ConsolidationOutput:
def consolidate(
self,
agent_id: str,
session_id: str | None = None,
strategy: str = "replay_v1",
request_id: str | None = None,
) -> ConsolidationOutput:
started = perf_counter()
consolidator = self._consolidators.get(strategy)
if consolidator is None:
consolidator = get_consolidation_strategy(strategy, self.config)
self._consolidators[strategy] = consolidator
episodes = consolidator.select_episodes(self.hippo, agent_id=agent_id, session_id=session_id)
return consolidator.run(agent_id=agent_id, episodes=episodes, embedder=self.embedder, semantic_store=self.semantic_store)

def build_context(self, agent_id: str, session_id: str, user_message: str, max_tokens: int) -> ContextPack:
output = consolidator.run(agent_id=agent_id, episodes=episodes, embedder=self.embedder, semantic_store=self.semantic_store)
duration_ms = (perf_counter() - started) * 1000
self.telemetry.observe("hippocortex.consolidation.duration_ms", duration_ms, tags={"agent_id": agent_id, "strategy": strategy})
logger.info(
"consolidation_completed",
extra={
"agent_id": agent_id,
"session_id": session_id,
"request_id": request_id,
"duration_ms": duration_ms,
},
)
return output

def build_context(
self,
agent_id: str,
session_id: str,
user_message: str,
max_tokens: int,
request_id: str | None = None,
) -> ContextPack:
decision = self.router.route(user_message=user_message, max_tokens=max_tokens)
logger.info(
"router_decision",
extra={
"agent_id": agent_id,
"session_id": session_id,
"request_id": request_id,
"intent": decision.intent,
"explain": decision.explain,
},
)
recent_events = self.hippo.list_events(agent_id=agent_id, session_id=session_id, limit=self.config.runtime.working_memory_turns)
selected_recent = self.working_memory.select_recent(recent_events, token_budget=decision.working_memory_tokens)

semantic_notes = []
if decision.intent in {"semantic", "hybrid"}:
semantic_notes = self.cortex.search(agent_id=agent_id, query=user_message, k=5)
semantic_notes = self.cortex.search(
agent_id=agent_id,
session_id=session_id,
query=user_message,
k=5,
request_id=request_id,
)

highlights = []
if decision.include_highlights:
Expand Down
20 changes: 19 additions & 1 deletion hippocortex/hippo/episodic_store.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
from __future__ import annotations

import json
import logging
import sqlite3
from datetime import datetime
from pathlib import Path

from hippocortex.observability import configure_json_logger
from hippocortex.telemetry import NoOpTelemetry, Telemetry
from hippocortex.types import EventRecord


class SQLiteEpisodicStore:
    def __init__(self, db_path: str, telemetry: Telemetry | None = None) -> None:
        """Open (creating if needed) the episodic event store at *db_path*.

        *telemetry* defaults to NoOpTelemetry so metric calls are always safe
        even when the caller wires no backend.
        """
        self.db_path = db_path
        self.telemetry = telemetry or NoOpTelemetry()
        self.logger = logging.getLogger("hippocortex.observability")
        configure_json_logger(self.logger)
        # Ensure the parent directory exists; a plain statement replaces the
        # original side-effecting conditional expression.
        parent = Path(db_path).parent
        if parent != Path("."):
            parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

Expand Down Expand Up @@ -48,6 +54,7 @@ def add_event(
content: str,
metadata: dict | None = None,
importance: float = 0.5,
request_id: str | None = None,
) -> EventRecord:
event = EventRecord(
agent_id=agent_id,
Expand Down Expand Up @@ -76,6 +83,17 @@ def add_event(
)
conn.commit()
event.id = int(cur.lastrowid)
self.telemetry.increment("hippocortex.events.write", value=1, tags={"agent_id": agent_id, "session_id": session_id})
self.logger.info(
"event_written",
extra={
"agent_id": agent_id,
"session_id": session_id,
"request_id": request_id,
"metric": "hippocortex.events.write",
"value": 1,
},
)
return event

def list_events(self, agent_id: str, session_id: str | None = None, limit: int = 50) -> list[EventRecord]:
Expand Down
33 changes: 33 additions & 0 deletions hippocortex/observability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from __future__ import annotations

import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single line of JSON.

    Correlation fields (agent_id/session_id/request_id) are always present,
    defaulting to null; other structured fields appear only when the record
    actually carries them.
    """

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "agent_id": getattr(record, "agent_id", None),
            "session_id": getattr(record, "session_id", None),
            "request_id": getattr(record, "request_id", None),
        }
        optional_keys = ("intent", "metric", "value", "duration_ms", "hit_rate", "explain")
        payload.update({key: getattr(record, key) for key in optional_keys if hasattr(record, key)})
        return json.dumps(payload, ensure_ascii=False)


def configure_json_logger(logger: logging.Logger) -> None:
    """Idempotently wire *logger* for structured JSON output.

    A logger that already has handlers is left untouched, so repeated calls
    never stack duplicate handlers.
    """
    if not logger.handlers:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(JsonFormatter())
        logger.addHandler(stream_handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False

17 changes: 15 additions & 2 deletions hippocortex/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ def route(self, user_message: str, max_tokens: int) -> RoutingDecision:
semantic_keywords = ["remember", "history", "why", "pattern", "summarize", "context"]
episodic_keywords = ["just now", "latest", "last message", "session"]

semantic_score = sum(1 for k in semantic_keywords if k in lowered)
episodic_score = sum(1 for k in episodic_keywords if k in lowered)
matched_semantic = [k for k in semantic_keywords if k in lowered]
matched_episodic = [k for k in episodic_keywords if k in lowered]
semantic_score = len(matched_semantic)
episodic_score = len(matched_episodic)

if semantic_score > episodic_score:
intent = "semantic"
Expand All @@ -26,4 +28,15 @@ def route(self, user_message: str, max_tokens: int) -> RoutingDecision:
working_memory_tokens=wm_tokens,
semantic_tokens=semantic_tokens,
include_highlights=intent != "semantic",
explain={
"matched_keywords": {
"semantic": matched_semantic,
"episodic": matched_episodic,
},
"scores": {"semantic": semantic_score, "episodic": episodic_score},
"budget_allocation": {
"working_memory_tokens": wm_tokens,
"semantic_tokens": semantic_tokens,
},
},
)
36 changes: 36 additions & 0 deletions hippocortex/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Protocol


class Telemetry(Protocol):
    """Structural (duck-typed) interface for metric backends.

    Any object providing these two methods satisfies the protocol; no
    inheritance is required.
    """

    # Add *value* to the named counter; *tags* qualify the metric series.
    def increment(self, metric: str, value: int = 1, tags: dict[str, str] | None = None) -> None: ...

    # Record one sample *value* for the named distribution (e.g. timings, ratios).
    def observe(self, metric: str, value: float, tags: dict[str, str] | None = None) -> None: ...


@dataclass
class NoOpTelemetry:
"""Default telemetry backend that safely drops all metrics."""

def increment(self, metric: str, value: int = 1, tags: dict[str, str] | None = None) -> None:
return None

def observe(self, metric: str, value: float, tags: dict[str, str] | None = None) -> None:
return None


@dataclass
class InMemoryTelemetry:
"""Testing helper backend that records every metric call."""

counters: list[tuple[str, int, dict[str, str]]] = field(default_factory=list)
observations: list[tuple[str, float, dict[str, str]]] = field(default_factory=list)

def increment(self, metric: str, value: int = 1, tags: dict[str, str] | None = None) -> None:
self.counters.append((metric, value, tags or {}))

def observe(self, metric: str, value: float, tags: dict[str, str] | None = None) -> None:
self.observations.append((metric, value, tags or {}))

1 change: 1 addition & 0 deletions hippocortex/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class RoutingDecision(BaseModel):
working_memory_tokens: int
semantic_tokens: int
include_highlights: bool = True
explain: dict[str, Any] = Field(default_factory=dict)


class ConsolidationOutput(BaseModel):
Expand Down
77 changes: 77 additions & 0 deletions tests/test_observability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import logging

from hippocortex import DummyEmbedder, HippoConfig, HippoCortex
from hippocortex.config import ModelConfig, StorageConfig
from hippocortex.telemetry import InMemoryTelemetry


def _build_sdk(tmp_path):
    """Build an SDK backed by a temp SQLite file plus a recording telemetry.

    Returns the (sdk, telemetry) pair so tests can assert on captured metrics.
    """
    telemetry = InMemoryTelemetry()
    config = HippoConfig(
        storage=StorageConfig(db_path=str(tmp_path / "memory.db")),
        model=ModelConfig(embedding_dim=8),
    )
    sdk = HippoCortex.default(config=config, embedder=DummyEmbedder(dimension=8))
    # Swap the recording backend in after construction, on both the SDK and
    # the episodic store (the store captured its telemetry at __init__ time).
    sdk.telemetry = telemetry
    sdk.hippo.telemetry = telemetry
    return sdk, telemetry


def test_router_explain_contains_keywords_and_budget():
    """A routing decision must expose its keyword matches and token budget."""
    sdk = HippoCortex.default()
    decision = sdk.router.route("summarize latest history", max_tokens=100)
    for key in ("matched_keywords", "budget_allocation"):
        assert key in decision.explain


def test_observability_logs_include_required_fields(tmp_path):
    """Every log record emitted by the SDK must carry the correlation ids."""
    sdk, _ = _build_sdk(tmp_path)
    captured = []

    class _Recorder(logging.Handler):
        def emit(self, record):
            captured.append(record)

    recorder = _Recorder()
    sdk_logger = logging.getLogger("hippocortex")
    store_logger = logging.getLogger("hippocortex.observability")
    sdk_logger.addHandler(recorder)
    store_logger.addHandler(recorder)

    try:
        sdk.hippo.add_event("agent-1", "session-1", "user", "I like tea", request_id="req-1")
        sdk.consolidate(agent_id="agent-1", session_id="session-1", request_id="req-1")
        sdk.build_context(
            agent_id="agent-1",
            session_id="session-1",
            user_message="summarize history",
            max_tokens=100,
            request_id="req-1",
        )
    finally:
        # Always detach the capture handler so other tests see a clean logger.
        sdk_logger.removeHandler(recorder)
        store_logger.removeHandler(recorder)

    assert captured
    for record in captured:
        for attr in ("agent_id", "session_id", "request_id"):
            assert hasattr(record, attr)


def test_telemetry_metrics_emitted(tmp_path):
    """Event writes, consolidation, and search must all emit their metrics."""
    sdk, telemetry = _build_sdk(tmp_path)

    sdk.hippo.add_event("agent-1", "session-1", "user", "I like tea", request_id="req-1")
    sdk.consolidate(agent_id="agent-1", session_id="session-1", request_id="req-1")
    sdk.cortex.search(agent_id="agent-1", session_id="session-1", query="tea", k=5, request_id="req-1")

    recorded_counters = {name for name, _, _ in telemetry.counters}
    assert "hippocortex.events.write" in recorded_counters

    recorded_observations = {name for name, _, _ in telemetry.observations}
    expected = (
        "hippocortex.consolidation.duration_ms",
        "hippocortex.search.duration_ms",
        "hippocortex.search.hit_rate",
    )
    for metric in expected:
        assert metric in recorded_observations