diff --git a/README.md b/README.md index 7a6c760..d7a3b7e 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ # openclaw-superpowers -**49 ready-to-use skills that make your AI agent autonomous, self-healing, and self-improving.** +**52 ready-to-use skills that make your AI agent autonomous, self-healing, and self-improving.** -[![Skills](https://img.shields.io/badge/skills-49-blue)](#skills-included) +[![Skills](https://img.shields.io/badge/skills-52-blue)](#skills-included) [![Security](https://img.shields.io/badge/security_skills-6-green)](#security--guardrails) -[![Cron](https://img.shields.io/badge/cron_scheduled-15-orange)](#openclaw-native-33-skills) -[![Scripts](https://img.shields.io/badge/companion_scripts-20-purple)](#companion-scripts) +[![Cron](https://img.shields.io/badge/cron_scheduled-16-orange)](#openclaw-native-36-skills) +[![Scripts](https://img.shields.io/badge/companion_scripts-23-purple)](#companion-scripts) [![License: MIT](https://img.shields.io/badge/license-MIT-yellow.svg)](LICENSE) A plug-and-play skill library for [OpenClaw](https://github.com/openclaw/openclaw) — the open-source AI agent runtime. Gives your agent structured thinking, security guardrails, persistent memory, cron scheduling, self-recovery, and the ability to write its own new skills during conversation. @@ -20,13 +20,13 @@ Built for developers who want their AI agent to run autonomously 24/7, not just Most AI agent frameworks give you a chatbot that forgets everything between sessions. OpenClaw is different — it runs persistently, handles multi-hour tasks, and has native cron scheduling. But out of the box, it doesn't know *how* to use those capabilities well. 
-**openclaw-superpowers bridges that gap.** Install 49 skills in one command, and your agent immediately knows how to: +**openclaw-superpowers bridges that gap.** Install 52 skills in one command, and your agent immediately knows how to: - **Think before it acts** — brainstorming, planning, and systematic debugging skills prevent the "dive in and break things" failure mode - **Protect itself** — 6 security skills detect prompt injection, block dangerous actions, audit installed code, and scan for leaked credentials - **Run unattended** — 12 cron-scheduled skills handle memory cleanup, health checks, budget tracking, and community monitoring while you sleep - **Recover from failures** — self-recovery, loop-breaking, and task handoff skills keep long-running work alive across crashes and restarts -- **Never forget** — DAG-based memory compaction, integrity checking, and context scoring ensure the agent preserves critical information even in month-long conversations +- **Never forget** — DAG-based memory compaction, integrity checking, context scoring, and SQLite session persistence ensure the agent preserves critical information even in month-long conversations - **Improve itself** — the agent can write new skills during normal conversation using `create-skill`, encoding your preferences as permanent behaviors --- @@ -51,7 +51,7 @@ cd ~/.openclaw/extensions/superpowers && ./install.sh openclaw gateway restart ``` -`install.sh` symlinks all 49 skills, creates state directories for stateful skills, and registers cron jobs — everything in one step. That's it. Your agent now has superpowers. +`install.sh` symlinks all 52 skills, creates state directories for stateful skills, and registers cron jobs — everything in one step. That's it. Your agent now has superpowers. --- @@ -79,7 +79,7 @@ Methodology skills that work in any AI agent runtime. 
Adapted from [obra/superpo | `skill-conflict-detector` | Detects name shadowing and description-overlap conflicts between installed skills | `detect.py` | | `skill-portability-checker` | Validates OS/binary dependencies in companion scripts; catches non-portable calls | `check.py` | -### OpenClaw-Native (33 skills) +### OpenClaw-Native (36 skills) Skills that require OpenClaw's persistent runtime — cron scheduling, session state, or long-running execution. These are the skills that make a 24/7 autonomous agent actually work reliably. @@ -118,6 +118,9 @@ Skills that require OpenClaw's persistent runtime — cron scheduling, session s | `context-assembly-scorer` | Scores how well current context represents full conversation; detects blind spots | every 4h | `score.py` | | `compaction-resilience-guard` | Monitors compaction for failures; enforces normal → aggressive → deterministic fallback chain | — | `guard.py` | | `memory-integrity-checker` | Validates summary DAGs for orphans, circular refs, token inflation, broken lineage | Sundays 3am | `integrity.py` | +| `session-persistence` | Imports session transcripts into SQLite with FTS5 full-text search; queryable message history | every 15 min | `persist.py` | +| `dag-recall` | Walks the memory DAG to recall detailed context on demand — query, expand, and assemble cited answers | — | `recall.py` | +| `expansion-grant-guard` | YAML-based delegation grant ledger — scoped permission grants with token budgets and auto-expiry | — | `guard.py` | ### Community (1 skill) @@ -148,13 +151,15 @@ Six skills form a defense-in-depth security layer for autonomous agents: | Feature | openclaw-superpowers | obra/superpowers | Custom prompts | |---|---|---|---| -| Skills included | **49** | 8 | 0 | +| Skills included | **52** | 8 | 0 | | Self-modifying (agent writes new skills) | Yes | No | No | -| Cron scheduling | **15 scheduled skills** | No | No | +| Cron scheduling | **16 scheduled skills** | No | No | | Persistent state across 
sessions | **YAML state schemas** | No | No | | Security guardrails | **6 defense-in-depth skills** | No | No | -| Companion scripts with CLI | **20 scripts** | No | No | +| Companion scripts with CLI | **23 scripts** | No | No | | Memory graph / knowledge graph | Yes | No | No | +| SQLite session persistence + FTS5 search | Yes | No | No | +| Sub-agent recall with token-budgeted grants | Yes | No | No | | MCP server health monitoring | Yes | No | No | | API spend tracking & budget enforcement | Yes | No | No | | Community feature radar (Reddit scanning) | Yes | No | No | @@ -175,7 +180,7 @@ Six skills form a defense-in-depth security layer for autonomous agents: │ │ │ ├── SKILL.md │ │ │ └── TEMPLATE.md │ │ └── ... -│ ├── openclaw-native/ # 33 persistent-runtime skills +│ ├── openclaw-native/ # 36 persistent-runtime skills │ │ ├── memory-graph-builder/ │ │ │ ├── SKILL.md # Skill definition + YAML frontmatter │ │ │ ├── STATE_SCHEMA.yaml # State shape (committed, versioned) @@ -198,7 +203,7 @@ Six skills form a defense-in-depth security layer for autonomous agents: Skills marked with a script ship a small executable alongside their `SKILL.md`: -- **20 Python scripts** (`run.py`, `audit.py`, `check.py`, `guard.py`, `bridge.py`, `onboard.py`, `sync.py`, `doctor.py`, `loadout.py`, `governor.py`, `detect.py`, `test.py`, `radar.py`, `graph.py`, `optimize.py`, `compact.py`, `intercept.py`, `score.py`, `integrity.py`) — run directly to manipulate state, generate reports, or trigger actions. No extra dependencies; `pyyaml` is optional but recommended. +- **23 Python scripts** (`run.py`, `audit.py`, `check.py`, `guard.py`, `bridge.py`, `onboard.py`, `sync.py`, `doctor.py`, `loadout.py`, `governor.py`, `detect.py`, `test.py`, `radar.py`, `graph.py`, `optimize.py`, `compact.py`, `intercept.py`, `score.py`, `integrity.py`, `persist.py`, `recall.py`) — run directly to manipulate state, generate reports, or trigger actions. 
No extra dependencies; `pyyaml` is optional but recommended. - **`vet.sh`** — Pure bash scanner; runs on any system with grep. - Every script supports `--help` and `--format json`. Dry-run mode available on scripts that make changes. - See the `example-state.yaml` in each skill directory for sample state and a commented walkthrough of cron behaviour. @@ -235,3 +240,4 @@ Skills marked with a script ship a small executable alongside their `SKILL.md`: - **[openclaw/openclaw](https://github.com/openclaw/openclaw)** — the open-source AI agent runtime - **[obra/superpowers](https://github.com/obra/superpowers)** — Jesse Vincent's skills framework; core skills adapted under MIT license - **[OpenLobster](https://github.com/Neirth/OpenLobster)** — inspiration for memory graph, config encryption auditing, tool-description scoring, and MCP health monitoring +- **[lossless-claw](https://github.com/Martian-Engineering/lossless-claw)** — inspiration for DAG-based memory compaction, session persistence, sub-agent recall, and delegation grants diff --git a/skills/openclaw-native/dag-recall/SKILL.md b/skills/openclaw-native/dag-recall/SKILL.md new file mode 100644 index 0000000..4b38a94 --- /dev/null +++ b/skills/openclaw-native/dag-recall/SKILL.md @@ -0,0 +1,137 @@ +--- +name: dag-recall +version: "1.0" +category: openclaw-native +description: Walks the memory DAG to recall detailed context on demand — query, expand, and assemble cited answers from hierarchical summaries without re-reading raw transcripts. +stateful: true +--- + +# DAG Recall + +## What it does + +When the agent needs to recall something from past sessions, reading raw transcripts is expensive and often exceeds context limits. DAG Recall walks the hierarchical summary DAG built by memory-dag-compactor — starting from high-level (d2/d3) nodes, expanding into detailed (d0/d1) children — and assembles a focused, cited answer. 
+ +Inspired by [lossless-claw](https://github.com/Martian-Engineering/lossless-claw)'s sub-agent recall pattern, where a lightweight agent fetches and expands nodes on demand rather than loading entire conversation histories. + +## When to invoke + +- When the agent asks "what did we decide about X?" or "how did we implement Y?" +- When context about a past session is needed but the transcript isn't loaded +- When searching MEMORY.md returns only high-level summaries that need expansion +- Before starting work that depends on decisions or patterns from earlier sessions + +## How to use + +```bash +python3 recall.py --query "how did we handle auth migration" # Walk DAG + assemble answer +python3 recall.py --query "deploy process" --depth 2 # Limit expansion depth +python3 recall.py --query "API keys" --top 5 # Return top 5 matching nodes +python3 recall.py --expand s-d1-003 # Expand a specific node +python3 recall.py --trace s-d0-012 # Show full ancestor chain +python3 recall.py --recent --hours 48 # Recall from recent nodes only +python3 recall.py --status # Last recall summary +python3 recall.py --format json # Machine-readable output +``` + +## Recall algorithm + +1. **Search** — FTS5 query across all DAG node summaries +2. **Rank** — Score by relevance × recency × depth (deeper = more detailed = higher score for recall) +3. **Expand** — For each top-N match, walk to children (lower depth = more detail) +4. **Assemble** — Combine expanded content into a coherent answer with node citations +5. 
**Cache** — Store the assembled answer for fast re-retrieval + +### Expansion strategy + +``` +Query: "auth migration" + ↓ +d3 node: "Infrastructure & Auth overhaul Q1" (score: 0.72) + → expand d2: "Auth migration week of Feb 10" (score: 0.89) + → expand d1: "Migrated JWT signing from HS256 to RS256" (score: 0.95) + → expand d0: [raw operational detail — returned as-is] +``` + +Expansion stops when: +- Target depth reached (default: expand to d0) +- Token budget exhausted (default: 4000 tokens) +- No children exist (leaf node) + +## DAG structure expected + +Reads from `~/.openclaw/lcm-dag/` (same directory as memory-dag-compactor): + +``` +~/.openclaw/lcm-dag/ +├── index.json # Node metadata: id, depth, summary, children, created_at +├── nodes/ +│ ├── s-d0-001.md # Leaf node (operational detail) +│ ├── s-d1-001.md # Condensed summary +│ ├── s-d2-001.md # Arc summary +│ └── s-d3-001.md # Durable summary +└── fts.db # FTS5 index over node summaries +``` + +## Procedure + +**Step 1 — Query the DAG** + +```bash +python3 recall.py --query "how did we handle the database migration" +``` + +Searches the FTS5 index, ranks results, expands top matches, and assembles a cited answer: + +``` +Recall: "how did we handle the database migration" — 3 sources + + We migrated the database schema using Alembic with a blue-green + deployment strategy. The key decisions were: + + 1. Zero-downtime migration using shadow tables [s-d1-003] + 2. Rollback script tested against staging first [s-d0-012] + 3. Data backfill ran as async job over 2 hours [s-d0-015] + + Sources: + [s-d1-003] "Database migration — shadow table approach" (Feb 12) + [s-d0-012] "Alembic rollback script for users table" (Feb 12) + [s-d0-015] "Async backfill job for legacy records" (Feb 13) +``` + +**Step 2 — Expand a specific node** + +```bash +python3 recall.py --expand s-d1-003 +``` + +Shows the full content of a node and lists its children for further expansion. 
+ +**Step 3 — Trace lineage** + +```bash +python3 recall.py --trace s-d0-012 +``` + +Shows the full ancestor chain from leaf to root, revealing how detail connects to high-level themes. + +## Integration with other skills + +- **memory-dag-compactor**: Produces the DAG that this skill reads — must be run first +- **session-persistence**: Alternative data source — recall can fall back to SQLite search when DAG nodes are insufficient +- **context-assembly-scorer**: Recall results feed into context assembly scoring +- **memory-integrity-checker**: Ensures DAG is structurally sound before recall walks it + +## State + +Recall history and cache stored in `~/.openclaw/skill-state/dag-recall/state.yaml`. + +Fields: `last_query`, `last_query_at`, `cache_size`, `total_recalls`, `recall_history`. + +## Notes + +- Uses Python's built-in `sqlite3` and `json` modules — no external dependencies +- FTS5 used for search when available; falls back to substring matching +- Token budget prevents runaway expansion on large DAGs +- Cache is LRU with configurable max size (default: 50 entries) +- If DAG doesn't exist yet, prints a helpful message pointing to memory-dag-compactor diff --git a/skills/openclaw-native/dag-recall/STATE_SCHEMA.yaml b/skills/openclaw-native/dag-recall/STATE_SCHEMA.yaml new file mode 100644 index 0000000..ecd3dd6 --- /dev/null +++ b/skills/openclaw-native/dag-recall/STATE_SCHEMA.yaml @@ -0,0 +1,26 @@ +version: "1.0" +description: Recall query history, cache stats, and expansion tracking. 
+fields: + last_query: + type: string + description: Most recent recall query text + last_query_at: + type: datetime + cache_size: + type: integer + description: Number of cached recall results + total_recalls: + type: integer + description: Lifetime recall count + avg_sources_per_recall: + type: number + description: Average number of DAG nodes cited per recall + recall_history: + type: list + description: Rolling log of recent recalls (last 20) + items: + query: { type: string } + recalled_at: { type: datetime } + sources_used: { type: integer } + tokens_assembled: { type: integer } + cache_hit: { type: boolean } diff --git a/skills/openclaw-native/dag-recall/example-state.yaml b/skills/openclaw-native/dag-recall/example-state.yaml new file mode 100644 index 0000000..1787893 --- /dev/null +++ b/skills/openclaw-native/dag-recall/example-state.yaml @@ -0,0 +1,62 @@ +# Example runtime state for dag-recall +last_query: "how did we handle the auth migration" +last_query_at: "2026-03-16T10:32:15.000000" +cache_size: 12 +total_recalls: 47 +avg_sources_per_recall: 3.2 +recall_history: + - query: "how did we handle the auth migration" + recalled_at: "2026-03-16T10:32:15.000000" + sources_used: 4 + tokens_assembled: 1820 + cache_hit: false + - query: "deploy process" + recalled_at: "2026-03-16T09:15:03.000000" + sources_used: 3 + tokens_assembled: 1240 + cache_hit: false + - query: "deploy process" + recalled_at: "2026-03-16T09:45:22.000000" + sources_used: 3 + tokens_assembled: 1240 + cache_hit: true +# ── Walkthrough ────────────────────────────────────────────────────────────── +# python3 recall.py --query "how did we handle the auth migration" +# +# Recall: "how did we handle the auth migration" — 4 sources +# +# [s-d1-003] (summary) Migrated JWT signing from HS256 to RS256 +# with key rotation plan... +# [s-d0-012] (detail) Created migration script for auth_keys table... +# [s-d0-015] (detail) Updated environment variables for RS256 public... 
+# [s-d0-018] (detail) Added rollback procedure in deploy/auth-rollback.sh... +# +# Sources: +# [s-d1-003] "Migrated JWT signing from HS256 to RS2..." (2026-02-10) +# [s-d0-012] "Created migration script for auth_keys..." (2026-02-10) +# [s-d0-015] "Updated environment variables for RS25..." (2026-02-11) +# [s-d0-018] "Added rollback procedure in deploy/aut..." (2026-02-11) +# +# python3 recall.py --trace s-d0-012 +# +# Trace: s-d0-012 → root (3 nodes) +# +# s-d0-012 (d0 — detail) +# Created migration script for auth_keys table +# Created: 2026-02-10 +# └── s-d1-003 (d1 — summary) +# JWT signing migration HS256 → RS256 +# Created: 2026-02-10 +# └── s-d2-001 (d2 — arc) +# Auth & Infrastructure overhaul Feb 2026 +# Created: 2026-02-14 +# +# python3 recall.py --status +# +# DAG Recall Status +# ────────────────────────────────────────────────── +# Last query: how did we handle the auth migration +# Last query at: 2026-03-16T10:32:15.000000 +# Total recalls: 47 +# Cache size: 12 / 50 +# DAG nodes: 24 diff --git a/skills/openclaw-native/dag-recall/recall.py b/skills/openclaw-native/dag-recall/recall.py new file mode 100755 index 0000000..2c32956 --- /dev/null +++ b/skills/openclaw-native/dag-recall/recall.py @@ -0,0 +1,599 @@ +#!/usr/bin/env python3 +"""DAG Recall — walk the memory DAG to recall detailed context on demand. + +Query, expand, and assemble cited answers from hierarchical summaries. 
+ +Usage: + python3 recall.py --query "auth migration" # Search + expand + assemble + python3 recall.py --query "deploy" --depth 2 # Limit expansion depth + python3 recall.py --query "API" --top 5 # Top 5 matches + python3 recall.py --expand s-d1-003 # Expand a specific node + python3 recall.py --trace s-d0-012 # Ancestor chain to root + python3 recall.py --recent --hours 48 # Recent nodes only + python3 recall.py --status # Last recall summary + python3 recall.py --format json # Machine-readable output +""" + +import argparse +import json +import os +import sqlite3 +import sys +from collections import OrderedDict +from datetime import datetime, timedelta, timezone +from pathlib import Path + +# ── Paths ──────────────────────────────────────────────────────────────────── + +OPENCLAW_DIR = Path.home() / ".openclaw" +DAG_DIR = OPENCLAW_DIR / "lcm-dag" +INDEX_PATH = DAG_DIR / "index.json" +NODES_DIR = DAG_DIR / "nodes" +FTS_DB_PATH = DAG_DIR / "fts.db" +STATE_DIR = OPENCLAW_DIR / "skill-state" / "dag-recall" +STATE_PATH = STATE_DIR / "state.yaml" + +DEFAULT_TOKEN_BUDGET = 4000 +DEFAULT_TOP_N = 3 +DEFAULT_MAX_DEPTH = 0 # expand all the way to d0 +DEFAULT_CACHE_SIZE = 50 +CHARS_PER_TOKEN = 4 # rough estimate + + +# ── Index loading ──────────────────────────────────────────────────────────── + +def load_index(): + """Load the DAG index (node metadata).""" + if not INDEX_PATH.exists(): + return None + with open(INDEX_PATH) as f: + return json.load(f) + + +def load_node_content(node_id): + """Read the full content of a DAG node file.""" + node_path = NODES_DIR / f"{node_id}.md" + if not node_path.exists(): + return None + return node_path.read_text() + + +def estimate_tokens(text): + """Rough token estimate.""" + return len(text) // CHARS_PER_TOKEN + + +# ── FTS5 search ────────────────────────────────────────────────────────────── + +def search_fts(query, limit=20): + """Search DAG node summaries using FTS5.""" + if not FTS_DB_PATH.exists(): + return 
search_fallback(query, limit) + try: + conn = sqlite3.connect(str(FTS_DB_PATH)) + cur = conn.cursor() + # Check if FTS table exists + cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='nodes_fts'") + if not cur.fetchone(): + conn.close() + return search_fallback(query, limit) + cur.execute( + "SELECT node_id, snippet(nodes_fts, 1, '>>>', '<<<', '...', 40), rank " + "FROM nodes_fts WHERE nodes_fts MATCH ? ORDER BY rank LIMIT ?", + (query, limit), + ) + results = [ + {"node_id": row[0], "snippet": row[1], "score": -row[2]} + for row in cur.fetchall() + ] + conn.close() + return results + except Exception: + return search_fallback(query, limit) + + +def search_fallback(query, limit=20): + """Fallback: substring search across index summaries.""" + index = load_index() + if not index or "nodes" not in index: + return [] + terms = query.lower().split() + results = [] + for node in index["nodes"]: + summary = node.get("summary", "").lower() + score = sum(1 for t in terms if t in summary) + if score > 0: + results.append({ + "node_id": node["id"], + "snippet": node.get("summary", "")[:120], + "score": score, + }) + results.sort(key=lambda x: x["score"], reverse=True) + return results[:limit] + + +# ── Scoring ────────────────────────────────────────────────────────────────── + +def score_results(results, index): + """Re-score results factoring in depth and recency.""" + if not index or "nodes" not in index: + return results + + node_map = {n["id"]: n for n in index["nodes"]} + now = datetime.now(timezone.utc) + + for r in results: + meta = node_map.get(r["node_id"], {}) + depth = meta.get("depth", 0) + + # Deeper nodes (more detail) get bonus for recall + depth_bonus = max(0, 3 - depth) * 0.3 # d0=0.9, d1=0.6, d2=0.3, d3=0 + + # Recency bonus + recency_bonus = 0 + created = meta.get("created_at") + if created: + try: + ct = datetime.fromisoformat(created.replace("Z", "+00:00")) + days_old = (now - ct).days + recency_bonus = max(0, 1.0 - days_old / 90) 
# linear decay over 90 days + except Exception: + pass + + r["final_score"] = r["score"] + depth_bonus + recency_bonus + r["depth"] = depth + + results.sort(key=lambda x: x["final_score"], reverse=True) + return results + + +# ── Expansion ──────────────────────────────────────────────────────────────── + +def expand_node(node_id, index, target_depth=0, token_budget=DEFAULT_TOKEN_BUDGET): + """Expand a node by walking to its children, collecting content.""" + if not index or "nodes" not in index: + return [] + + node_map = {n["id"]: n for n in index["nodes"]} + collected = [] + tokens_used = 0 + + def walk(nid, budget_remaining): + nonlocal tokens_used + if budget_remaining <= 0: + return + + meta = node_map.get(nid) + if not meta: + return + + content = load_node_content(nid) + if not content: + content = meta.get("summary", "") + + t = estimate_tokens(content) + if t > budget_remaining: + # Truncate to fit budget + char_limit = budget_remaining * CHARS_PER_TOKEN + content = content[:char_limit] + "..." + t = budget_remaining + + collected.append({ + "node_id": nid, + "depth": meta.get("depth", 0), + "content": content, + "tokens": t, + "created_at": meta.get("created_at", ""), + }) + tokens_used += t + + # Expand children if above target depth + current_depth = meta.get("depth", 0) + if current_depth > target_depth: + children = meta.get("children", []) + for child_id in children: + if tokens_used >= token_budget: + break + walk(child_id, token_budget - tokens_used) + + walk(node_id, token_budget) + return collected + + +# ── Assembly ───────────────────────────────────────────────────────────────── + +def assemble_answer(query, expanded_nodes): + """Assemble expanded node content into a cited answer.""" + if not expanded_nodes: + return "No relevant information found in the memory DAG." 
+ + lines = [] + sources = [] + for node in expanded_nodes: + nid = node["node_id"] + content = node["content"].strip() + depth = node["depth"] + created = node.get("created_at", "unknown")[:10] + depth_label = {0: "detail", 1: "summary", 2: "arc", 3: "durable"}.get(depth, f"d{depth}") + + lines.append(f" [{nid}] ({depth_label}) {content[:200]}") + sources.append(f" [{nid}] \"{content[:60]}...\" ({created})") + + answer = f"Recall: \"{query}\" — {len(expanded_nodes)} sources\n\n" + answer += "\n".join(lines) + answer += "\n\n Sources:\n" + answer += "\n".join(sources) + return answer + + +# ── Trace ──────────────────────────────────────────────────────────────────── + +def trace_ancestors(node_id, index): + """Walk from a node up to its ancestors (parents).""" + if not index or "nodes" not in index: + return [] + + # Build reverse parent map + parent_map = {} + for node in index["nodes"]: + for child_id in node.get("children", []): + parent_map[child_id] = node["id"] + + chain = [] + current = node_id + visited = set() + while current and current not in visited: + visited.add(current) + node_map = {n["id"]: n for n in index["nodes"]} + meta = node_map.get(current) + if not meta: + break + content = load_node_content(current) + chain.append({ + "node_id": current, + "depth": meta.get("depth", 0), + "summary": meta.get("summary", ""), + "content_preview": (content or "")[:200], + "created_at": meta.get("created_at", ""), + }) + current = parent_map.get(current) + + return chain + + +# ── Cache (LRU) ────────────────────────────────────────────────────────────── + +class RecallCache: + """Simple LRU cache for recall results.""" + + def __init__(self, max_size=DEFAULT_CACHE_SIZE): + self.max_size = max_size + self.cache = OrderedDict() + self._load() + + def _cache_path(self): + return STATE_DIR / "cache.json" + + def _load(self): + p = self._cache_path() + if p.exists(): + try: + data = json.loads(p.read_text()) + for k, v in data.items(): + self.cache[k] = v + 
except Exception: + pass + + def _save(self): + STATE_DIR.mkdir(parents=True, exist_ok=True) + # Keep only max_size entries + while len(self.cache) > self.max_size: + self.cache.popitem(last=False) + self._cache_path().write_text(json.dumps(dict(self.cache), indent=2)) + + def get(self, query): + key = query.lower().strip() + if key in self.cache: + self.cache.move_to_end(key) + return self.cache[key] + return None + + def put(self, query, result): + key = query.lower().strip() + self.cache[key] = result + self.cache.move_to_end(key) + self._save() + + def size(self): + return len(self.cache) + + +# ── State management ───────────────────────────────────────────────────────── + +def load_state(): + if STATE_PATH.exists(): + import re + state = {} + text = STATE_PATH.read_text() + for line in text.splitlines(): + line = line.strip() + if line.startswith("#") or not line: + continue + m = re.match(r'^(\w[\w_]*):\s*(.*)', line) + if m: + state[m.group(1)] = m.group(2).strip().strip('"') + return state + return {} + + +def save_state(state): + STATE_DIR.mkdir(parents=True, exist_ok=True) + lines = [] + for k, v in state.items(): + if isinstance(v, list): + lines.append(f"{k}:") + for item in v: + if isinstance(item, dict): + lines.append(f" - {json.dumps(item)}") + else: + lines.append(f" - {item}") + else: + lines.append(f"{k}: \"{v}\"" if isinstance(v, str) else f"{k}: {v}") + STATE_PATH.write_text("\n".join(lines) + "\n") + + +def update_state_after_recall(query, sources_used, tokens_assembled, cache_hit): + state = load_state() + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + state["last_query"] = query + state["last_query_at"] = now + total = int(state.get("total_recalls", 0)) + 1 + state["total_recalls"] = str(total) + cache = RecallCache() + state["cache_size"] = str(cache.size()) + save_state(state) + + +# ── Commands ───────────────────────────────────────────────────────────────── + +def cmd_query(args): + """Search DAG, expand matches, 
assemble cited answer.""" + index = load_index() + if index is None: + print("DAG index not found at", INDEX_PATH) + print("Run memory-dag-compactor first: python3 compact.py --compact") + return 1 + + cache = RecallCache() + + # Check cache + if not args.no_cache: + cached = cache.get(args.query) + if cached: + print(cached["answer"]) + update_state_after_recall(args.query, cached["sources"], cached["tokens"], True) + return 0 + + # Search + results = search_fts(args.query, limit=args.top * 3) + if not results: + print(f"No results found for: \"{args.query}\"") + return 0 + + # Score + results = score_results(results, index) + top_results = results[:args.top] + + # Expand + all_expanded = [] + budget = args.token_budget + for r in top_results: + expanded = expand_node( + r["node_id"], index, + target_depth=args.depth, + token_budget=budget, + ) + for e in expanded: + budget -= e["tokens"] + all_expanded.extend(expanded) + if budget <= 0: + break + + # Deduplicate + seen = set() + unique = [] + for e in all_expanded: + if e["node_id"] not in seen: + seen.add(e["node_id"]) + unique.append(e) + + # Assemble + answer = assemble_answer(args.query, unique) + + if args.format == "json": + out = { + "query": args.query, + "sources": len(unique), + "tokens_assembled": sum(e["tokens"] for e in unique), + "nodes": unique, + } + print(json.dumps(out, indent=2)) + else: + print(answer) + + # Cache + state + total_tokens = sum(e["tokens"] for e in unique) + cache.put(args.query, {"answer": answer, "sources": len(unique), "tokens": total_tokens}) + update_state_after_recall(args.query, len(unique), total_tokens, False) + return 0 + + +def cmd_expand(args): + """Expand a specific node, showing content and children.""" + index = load_index() + if index is None: + print("DAG index not found. 
Run memory-dag-compactor first.") + return 1 + + node_map = {n["id"]: n for n in index.get("nodes", [])} + meta = node_map.get(args.expand) + if not meta: + print(f"Node not found: {args.expand}") + return 1 + + content = load_node_content(args.expand) + children = meta.get("children", []) + depth = meta.get("depth", 0) + + if args.format == "json": + print(json.dumps({ + "node_id": args.expand, + "depth": depth, + "summary": meta.get("summary", ""), + "content": content, + "children": children, + "created_at": meta.get("created_at", ""), + }, indent=2)) + else: + depth_label = {0: "detail", 1: "summary", 2: "arc", 3: "durable"}.get(depth, f"d{depth}") + print(f"Node: {args.expand} (depth {depth} — {depth_label})") + print(f"Created: {meta.get('created_at', 'unknown')}") + print(f"Summary: {meta.get('summary', 'n/a')}") + print(f"Children: {len(children)}") + if children: + for c in children: + cmeta = node_map.get(c, {}) + print(f" → {c} ({cmeta.get('summary', '')[:60]})") + print() + print("Content:") + print(content or "(empty)") + return 0 + + +def cmd_trace(args): + """Show full ancestor chain from node to root.""" + index = load_index() + if index is None: + print("DAG index not found. 
Run memory-dag-compactor first.") + return 1 + + chain = trace_ancestors(args.trace, index) + if not chain: + print(f"Node not found: {args.trace}") + return 1 + + if args.format == "json": + print(json.dumps(chain, indent=2)) + else: + print(f"Trace: {args.trace} → root ({len(chain)} nodes)") + print() + for i, node in enumerate(chain): + indent = " " * i + depth_label = {0: "detail", 1: "summary", 2: "arc", 3: "durable"}.get( + node["depth"], f"d{node['depth']}" + ) + print(f"{indent}{'└── ' if i > 0 else ''}{node['node_id']} (d{node['depth']} — {depth_label})") + print(f"{indent} {node['summary'][:80]}") + if node["created_at"]: + print(f"{indent} Created: {node['created_at'][:10]}") + return 0 + + +def cmd_recent(args): + """Show nodes created within the specified time window.""" + index = load_index() + if index is None: + print("DAG index not found. Run memory-dag-compactor first.") + return 1 + + cutoff = datetime.now(timezone.utc) - timedelta(hours=args.hours) + recent = [] + for node in index.get("nodes", []): + created = node.get("created_at") + if created: + try: + ct = datetime.fromisoformat(created.replace("Z", "+00:00")) + if ct >= cutoff: + recent.append(node) + except Exception: + pass + + recent.sort(key=lambda n: n.get("created_at", ""), reverse=True) + + if args.format == "json": + print(json.dumps(recent, indent=2)) + else: + print(f"Recent nodes (last {args.hours}h): {len(recent)}") + print() + for node in recent: + depth_label = {0: "detail", 1: "summary", 2: "arc", 3: "durable"}.get( + node.get("depth", 0), "?" 
+ ) + print(f" {node['id']} d{node.get('depth', '?')} ({depth_label}) {node.get('created_at', '')[:16]}") + print(f" {node.get('summary', '')[:80]}") + return 0 + + +def cmd_status(args): + """Print last recall summary.""" + state = load_state() + cache = RecallCache() + + if args.format == "json": + state["cache_size"] = cache.size() + print(json.dumps(state, indent=2)) + else: + print("DAG Recall Status") + print("─" * 50) + print(f" Last query: {state.get('last_query', 'none')}") + print(f" Last query at: {state.get('last_query_at', 'never')}") + print(f" Total recalls: {state.get('total_recalls', 0)}") + print(f" Cache size: {cache.size()} / {DEFAULT_CACHE_SIZE}") + # Check if DAG exists + if INDEX_PATH.exists(): + index = load_index() + n = len(index.get("nodes", [])) if index else 0 + print(f" DAG nodes: {n}") + else: + print(f" DAG: not found ({DAG_DIR})") + return 0 + + +# ── Main ───────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description="DAG Recall — walk the memory DAG to recall context") + parser.add_argument("--query", type=str, help="Search query to recall information about") + parser.add_argument("--expand", type=str, help="Expand a specific node by ID") + parser.add_argument("--trace", type=str, help="Trace ancestor chain for a node") + parser.add_argument("--recent", action="store_true", help="Show recent nodes") + parser.add_argument("--status", action="store_true", help="Show recall status") + parser.add_argument("--depth", type=int, default=DEFAULT_MAX_DEPTH, + help=f"Target expansion depth (default: {DEFAULT_MAX_DEPTH} = expand to leaf)") + parser.add_argument("--top", type=int, default=DEFAULT_TOP_N, + help=f"Number of top results to expand (default: {DEFAULT_TOP_N})") + parser.add_argument("--token-budget", type=int, default=DEFAULT_TOKEN_BUDGET, + help=f"Max tokens to assemble (default: {DEFAULT_TOKEN_BUDGET})") + parser.add_argument("--hours", type=int, default=24, 
help="Hours window for --recent") + parser.add_argument("--no-cache", action="store_true", help="Skip cache lookup") + parser.add_argument("--format", choices=["text", "json"], default="text") + + args = parser.parse_args() + + if args.query: + return cmd_query(args) + elif args.expand: + return cmd_expand(args) + elif args.trace: + return cmd_trace(args) + elif args.recent: + return cmd_recent(args) + elif args.status: + return cmd_status(args) + else: + parser.print_help() + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/skills/openclaw-native/expansion-grant-guard/SKILL.md b/skills/openclaw-native/expansion-grant-guard/SKILL.md new file mode 100644 index 0000000..237064f --- /dev/null +++ b/skills/openclaw-native/expansion-grant-guard/SKILL.md @@ -0,0 +1,142 @@ +--- +name: expansion-grant-guard +version: "1.0" +category: openclaw-native +description: YAML-based delegation grant ledger — issues, validates, and tracks scoped permission grants for sub-agent expansions with token budgets and auto-expiry. +stateful: true +--- + +# Expansion Grant Guard + +## What it does + +When an agent delegates work to a sub-agent (or expands context via DAG recall), it needs a controlled way to grant scoped permissions. Expansion Grant Guard maintains a YAML-based grant ledger that issues time-limited, token-budgeted grants — ensuring sub-agent operations stay within defined boundaries. + +Inspired by [lossless-claw](https://github.com/Martian-Engineering/lossless-claw)'s delegation grant system, where a parent agent issues a signed grant specifying what a sub-agent can access, how many tokens it may consume, and when the grant expires. 
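The core of the scheme is a grant record that is only usable while it is active, unexpired, and under its token budget. A minimal sketch of that validity check (plain Python; the field names mirror the grant structure documented below, while the function name is illustrative rather than the shipped `guard.py` API):

```python
from datetime import datetime, timedelta, timezone

def is_grant_valid(grant, now=None):
    """A grant is usable only while active, unexpired, and under budget."""
    now = now or datetime.now(timezone.utc)
    return (
        grant["status"] == "active"
        and now < grant["expires_at"]
        and grant["tokens_consumed"] < grant["token_budget"]
    )

issued = datetime(2026, 3, 16, 10, 0, tzinfo=timezone.utc)
grant = {
    "status": "active",
    "expires_at": issued + timedelta(minutes=30),  # 30-minute TTL
    "token_budget": 4000,
    "tokens_consumed": 1250,
}
print(is_grant_valid(grant, now=issued + timedelta(minutes=10)))  # True
print(is_grant_valid(grant, now=issued + timedelta(minutes=40)))  # False (expired)
```

The same three-way check (status, expiry, budget) is what `--validate` reports, which is why a grant can flip from valid to invalid without any explicit revocation.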
+ + ## When to invoke + + - Before any sub-agent expansion or delegation — issue a grant first + - When a sub-agent requests resources — validate the grant before proceeding + - When checking token budgets — verify remaining budget in the grant + - Periodically to clean up expired grants — auto-expiry sweep + + ## How to use + + ```bash + python3 guard.py --issue --scope "dag-recall" --budget 4000 --ttl 30 # Issue a grant + python3 guard.py --validate <grant-id> # Check if grant is valid + python3 guard.py --consume <grant-id> --tokens 500 # Record token usage + python3 guard.py --revoke <grant-id> # Revoke a grant early + python3 guard.py --list # List all active grants + python3 guard.py --sweep # Clean up expired grants + python3 guard.py --audit # Full audit log + python3 guard.py --stats # Grant statistics + python3 guard.py --status # Current status summary + python3 guard.py --format json # Machine-readable output + ``` + + ## Grant structure + + Each grant is a YAML entry in the ledger: + + ```yaml + grant_id: "g-20260316-001" + scope: "dag-recall" # What the grant allows + issued_at: "2026-03-16T10:00:00Z" + expires_at: "2026-03-16T10:30:00Z" + token_budget: 4000 # Max tokens allowed + tokens_consumed: 1250 # Tokens used so far + status: active # active | expired | revoked | exhausted + issuer: "parent-session" + metadata: + query: "auth migration" + reason: "Recalling auth decisions for new implementation" + ``` + + ## Grant lifecycle + + ``` + Issue → Active → { Expired | Revoked | Exhausted } + │ │ + │ ├─ tokens_consumed < budget → still active + │ ├─ tokens_consumed >= budget → exhausted + │ ├─ now > expires_at → expired + │ └─ explicit revoke → revoked + │ + └─ Validation checks: status=active AND not expired AND budget remaining + ``` + + ## Procedure + + **Step 1 — Issue a grant before expansion** + + ```bash + python3 guard.py --issue --scope "dag-recall" --budget 4000 --ttl 30 + ``` + + Output: + ``` + Grant Issued + ───────────────────────────────────────────── + Grant ID: g-20260316-001 + Scope: dag-recall +
Token budget: 4,000 + Expires: 2026-03-16T10:30:00Z (in 30 min) + Status: active +``` + +**Step 2 — Validate before consuming resources** + +```bash +python3 guard.py --validate g-20260316-001 +``` + +Returns status, remaining budget, and time until expiry. Non-zero exit code if invalid. + +**Step 3 — Record token consumption** + +```bash +python3 guard.py --consume g-20260316-001 --tokens 1250 +``` + +Deducts from the grant's remaining budget. Fails if exceeding budget. + +**Step 4 — Sweep expired grants** + +```bash +python3 guard.py --sweep +``` + +Marks all expired grants and cleans up the active list. + +## Scope types + +- `dag-recall` — Permission to walk DAG nodes and assemble answers +- `session-search` — Permission to search session persistence database +- `file-access` — Permission to read externalized large files +- `context-expand` — Permission to expand context window with external data +- `tool-invoke` — Permission to invoke external tools/MCP servers +- Custom scopes accepted — any string is valid + +## Integration with other skills + +- **dag-recall**: Should issue a grant before expanding DAG nodes; checks budget during expansion +- **session-persistence**: Grant-gated search — validate grant before querying message database +- **large-file-interceptor**: Grant-gated file restore — validate before loading large files back +- **context-assembly-scorer**: Token budget tracking feeds into assembly scoring + +## State + +Grant ledger and audit log stored in `~/.openclaw/skill-state/expansion-grant-guard/state.yaml`. + +Fields: `active_grants`, `total_issued`, `total_expired`, `total_revoked`, `total_exhausted`, `total_tokens_granted`, `total_tokens_consumed`, `grant_history`. 
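Grant IDs in the ledger follow a simple date-plus-counter scheme; a sketch of the format (illustrative helper, not the shipped `guard.py` function):

```python
from datetime import datetime, timezone

def format_grant_id(counter, when=None):
    # g-<UTC date>-<zero-padded counter>, e.g. g-20260316-001
    when = when or datetime.now(timezone.utc)
    return f"g-{when.strftime('%Y%m%d')}-{counter:03d}"

print(format_grant_id(3, datetime(2026, 3, 16, tzinfo=timezone.utc)))  # g-20260316-003
```

Because the counter lives in the ledger and only ever increments, IDs sort chronologically both within and across days.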
+ + ## Notes + + - Uses Python's built-in modules only — no external dependencies + - Grant IDs embed the UTC date plus a monotonically increasing counter + - Ledger is serialized as JSON (a strict subset of YAML) in `ledger.yaml` and rewritten on each change — safe for concurrent reads + - Expired grants kept in history for audit; active list stays clean after sweep + - Default TTL is 30 minutes; max TTL is 24 hours + - Token budget is advisory — enforcement depends on consuming skill cooperation diff --git a/skills/openclaw-native/expansion-grant-guard/STATE_SCHEMA.yaml b/skills/openclaw-native/expansion-grant-guard/STATE_SCHEMA.yaml new file mode 100644 index 0000000..f1d586e --- /dev/null +++ b/skills/openclaw-native/expansion-grant-guard/STATE_SCHEMA.yaml @@ -0,0 +1,38 @@ +version: "1.0" +description: Grant ledger, consumption tracking, and audit history. +fields: + active_grants: + type: list + description: Currently active grants + items: + grant_id: { type: string } + scope: { type: string } + issued_at: { type: datetime } + expires_at: { type: datetime } + token_budget: { type: integer } + tokens_consumed: { type: integer } + status: { type: enum, values: [active, expired, revoked, exhausted] } + issuer: { type: string } + total_issued: + type: integer + total_expired: + type: integer + total_revoked: + type: integer + total_exhausted: + type: integer + total_tokens_granted: + type: integer + total_tokens_consumed: + type: integer + grant_history: + type: list + description: Rolling log of completed grants (last 50) + items: + grant_id: { type: string } + scope: { type: string } + issued_at: { type: datetime } + closed_at: { type: datetime } + token_budget: { type: integer } + tokens_consumed: { type: integer } + final_status: { type: string } diff --git a/skills/openclaw-native/expansion-grant-guard/example-state.yaml b/skills/openclaw-native/expansion-grant-guard/example-state.yaml new file mode 100644 index 0000000..df92bff --- /dev/null +++ b/skills/openclaw-native/expansion-grant-guard/example-state.yaml @@ -0,0 +1,75 @@ +# Example
runtime state for expansion-grant-guard +active_grants: + - grant_id: "g-20260316-003" + scope: "dag-recall" + issued_at: "2026-03-16T10:30:00Z" + expires_at: "2026-03-16T11:00:00Z" + token_budget: 4000 + tokens_consumed: 1250 + status: active + issuer: "parent-session" + - grant_id: "g-20260316-004" + scope: "session-search" + issued_at: "2026-03-16T10:45:00Z" + expires_at: "2026-03-16T11:15:00Z" + token_budget: 2000 + tokens_consumed: 0 + status: active + issuer: "parent-session" +total_issued: 12 +total_expired: 6 +total_revoked: 1 +total_exhausted: 3 +total_tokens_granted: 48000 +total_tokens_consumed: 28750 +grant_history: + - grant_id: "g-20260316-001" + scope: "dag-recall" + issued_at: "2026-03-16T09:00:00Z" + closed_at: "2026-03-16T09:22:15Z" + token_budget: 4000 + tokens_consumed: 3840 + final_status: exhausted + - grant_id: "g-20260316-002" + scope: "context-expand" + issued_at: "2026-03-16T09:30:00Z" + closed_at: "2026-03-16T10:00:00Z" + token_budget: 8000 + tokens_consumed: 2100 + final_status: expired +# ── Walkthrough ────────────────────────────────────────────────────────────── +# python3 guard.py --issue --scope "dag-recall" --budget 4000 --ttl 30 +# +# Grant Issued +# ───────────────────────────────────────────── +# Grant ID: g-20260316-003 +# Scope: dag-recall +# Token budget: 4,000 +# Expires: 2026-03-16T11:00:00Z (in 30 min) +# Status: active +# +# python3 guard.py --validate g-20260316-003 +# +# ✓ Grant: g-20260316-003 +# Status: active +# Scope: dag-recall +# Tokens remaining: 2,750 / 4,000 +# Expires in: 18.5 min +# +# python3 guard.py --consume g-20260316-003 --tokens 500 +# +# Consumed 500 tokens from g-20260316-003 +# Remaining: 2,250 / 4,000 +# +# python3 guard.py --stats +# +# Expansion Grant Statistics +# ────────────────────────────────────────────── +# Active grants: 2 +# Total issued: 12 +# Total expired: 6 +# Total revoked: 1 +# Total exhausted: 3 +# Tokens granted: 48,000 +# Tokens consumed: 28,750 +# Utilization: 59.9% diff --git 
a/skills/openclaw-native/expansion-grant-guard/guard.py b/skills/openclaw-native/expansion-grant-guard/guard.py new file mode 100755 index 0000000..191d4f0 --- /dev/null +++ b/skills/openclaw-native/expansion-grant-guard/guard.py @@ -0,0 +1,504 @@ +#!/usr/bin/env python3 +"""Expansion Grant Guard — YAML-based delegation grant ledger. + +Issues, validates, and tracks scoped permission grants for sub-agent +expansions with token budgets and auto-expiry. + +Usage: + python3 guard.py --issue --scope "dag-recall" --budget 4000 --ttl 30 + python3 guard.py --validate + python3 guard.py --consume --tokens 500 + python3 guard.py --revoke + python3 guard.py --list + python3 guard.py --sweep + python3 guard.py --audit + python3 guard.py --stats + python3 guard.py --status + python3 guard.py --format json +""" + +import argparse +import json +import os +import re +import sys +from datetime import datetime, timedelta, timezone +from pathlib import Path + +# ── Paths ──────────────────────────────────────────────────────────────────── + +OPENCLAW_DIR = Path.home() / ".openclaw" +STATE_DIR = OPENCLAW_DIR / "skill-state" / "expansion-grant-guard" +LEDGER_PATH = STATE_DIR / "ledger.yaml" +STATE_PATH = STATE_DIR / "state.yaml" + +DEFAULT_TTL_MINUTES = 30 +MAX_TTL_MINUTES = 1440 # 24 hours +DEFAULT_BUDGET = 4000 +MAX_HISTORY = 50 + + +# ── YAML-lite read/write ──────────────────────────────────────────────────── + +def load_ledger(): + """Load the grant ledger from YAML-like file.""" + if not LEDGER_PATH.exists(): + return {"grants": [], "counter": 0} + try: + text = LEDGER_PATH.read_text() + data = json.loads(text) # stored as JSON for reliability + return data + except Exception: + return {"grants": [], "counter": 0} + + +def save_ledger(ledger): + """Save ledger to disk.""" + STATE_DIR.mkdir(parents=True, exist_ok=True) + LEDGER_PATH.write_text(json.dumps(ledger, indent=2, default=str)) + + +def load_state(): + """Load aggregate state.""" + if STATE_PATH.exists(): + try: + return 
json.loads(STATE_PATH.read_text()) + except Exception: + pass + return { + "total_issued": 0, + "total_expired": 0, + "total_revoked": 0, + "total_exhausted": 0, + "total_tokens_granted": 0, + "total_tokens_consumed": 0, + "grant_history": [], + } + + +def save_state(state): + STATE_DIR.mkdir(parents=True, exist_ok=True) + # Trim history + if len(state.get("grant_history", [])) > MAX_HISTORY: + state["grant_history"] = state["grant_history"][-MAX_HISTORY:] + STATE_PATH.write_text(json.dumps(state, indent=2, default=str)) + + +# ── Grant operations ───────────────────────────────────────────────────────── + +def generate_grant_id(ledger): + """Generate a sequential grant ID for today.""" + counter = ledger.get("counter", 0) + 1 + ledger["counter"] = counter + today = datetime.now(timezone.utc).strftime("%Y%m%d") + return f"g-{today}-{counter:03d}" + + +def now_utc(): + return datetime.now(timezone.utc) + + +def parse_iso(s): + """Parse ISO datetime string.""" + try: + return datetime.fromisoformat(s.replace("Z", "+00:00")) + except Exception: + return None + + +def is_expired(grant): + """Check if a grant has expired.""" + expires = parse_iso(grant.get("expires_at", "")) + if expires and now_utc() > expires: + return True + return False + + +def is_exhausted(grant): + """Check if token budget is consumed.""" + return grant.get("tokens_consumed", 0) >= grant.get("token_budget", 0) + + +def effective_status(grant): + """Compute effective status considering time and budget.""" + if grant.get("status") in ("revoked",): + return "revoked" + if is_expired(grant): + return "expired" + if is_exhausted(grant): + return "exhausted" + return grant.get("status", "active") + + +# ── Commands ───────────────────────────────────────────────────────────────── + +def cmd_issue(args): + """Issue a new grant.""" + ledger = load_ledger() + state = load_state() + + ttl = min(args.ttl, MAX_TTL_MINUTES) + budget = args.budget + + grant_id = generate_grant_id(ledger) + issued = now_utc() + 
expires = issued + timedelta(minutes=ttl) + + grant = { + "grant_id": grant_id, + "scope": args.scope, + "issued_at": issued.isoformat(), + "expires_at": expires.isoformat(), + "token_budget": budget, + "tokens_consumed": 0, + "status": "active", + "issuer": args.issuer or "parent-session", + "metadata": {}, + } + if args.reason: + grant["metadata"]["reason"] = args.reason + + ledger["grants"].append(grant) + save_ledger(ledger) + + state["total_issued"] = state.get("total_issued", 0) + 1 + state["total_tokens_granted"] = state.get("total_tokens_granted", 0) + budget + save_state(state) + + if args.format == "json": + print(json.dumps(grant, indent=2)) + else: + print("Grant Issued") + print("─" * 50) + print(f" Grant ID: {grant_id}") + print(f" Scope: {args.scope}") + print(f" Token budget: {budget:,}") + print(f" Expires: {expires.isoformat()} (in {ttl} min)") + print(f" Status: active") + return 0 + + +def cmd_validate(args): + """Validate a grant — check active, not expired, budget remaining.""" + ledger = load_ledger() + grant = None + for g in ledger["grants"]: + if g["grant_id"] == args.validate: + grant = g + break + + if not grant: + print(f"Grant not found: {args.validate}") + return 1 + + status = effective_status(grant) + remaining = grant["token_budget"] - grant["tokens_consumed"] + expires = parse_iso(grant["expires_at"]) + ttl_remaining = (expires - now_utc()).total_seconds() / 60 if expires else 0 + + valid = status == "active" + + if args.format == "json": + print(json.dumps({ + "grant_id": grant["grant_id"], + "valid": valid, + "status": status, + "scope": grant["scope"], + "tokens_remaining": max(0, remaining), + "minutes_remaining": max(0, round(ttl_remaining, 1)), + }, indent=2)) + else: + icon = "✓" if valid else "✗" + print(f"{icon} Grant: {grant['grant_id']}") + print(f" Status: {status}") + print(f" Scope: {grant['scope']}") + print(f" Tokens remaining: {max(0, remaining):,} / {grant['token_budget']:,}") + if ttl_remaining > 0: + print(f" 
Expires in: {max(0, round(ttl_remaining, 1))} min") + else: + print(f" Expired: {grant['expires_at']}") + + return 0 if valid else 1 + + +def cmd_consume(args): + """Record token consumption against a grant.""" + ledger = load_ledger() + state = load_state() + + grant = None + for g in ledger["grants"]: + if g["grant_id"] == args.consume: + grant = g + break + + if not grant: + print(f"Grant not found: {args.consume}") + return 1 + + status = effective_status(grant) + if status != "active": + print(f"Grant {args.consume} is {status} — cannot consume") + return 1 + + remaining = grant["token_budget"] - grant["tokens_consumed"] + if args.tokens > remaining: + print(f"Insufficient budget: requested {args.tokens}, remaining {remaining}") + return 1 + + grant["tokens_consumed"] += args.tokens + state["total_tokens_consumed"] = state.get("total_tokens_consumed", 0) + args.tokens + + # Check if now exhausted + if grant["tokens_consumed"] >= grant["token_budget"]: + grant["status"] = "exhausted" + state["total_exhausted"] = state.get("total_exhausted", 0) + 1 + close_grant(state, grant, "exhausted") + + save_ledger(ledger) + save_state(state) + + new_remaining = grant["token_budget"] - grant["tokens_consumed"] + if args.format == "json": + print(json.dumps({ + "grant_id": grant["grant_id"], + "tokens_consumed": args.tokens, + "tokens_remaining": new_remaining, + "status": effective_status(grant), + }, indent=2)) + else: + print(f"Consumed {args.tokens:,} tokens from {grant['grant_id']}") + print(f" Remaining: {new_remaining:,} / {grant['token_budget']:,}") + if new_remaining == 0: + print(f" Status: exhausted") + return 0 + + +def cmd_revoke(args): + """Revoke a grant early.""" + ledger = load_ledger() + state = load_state() + + grant = None + for g in ledger["grants"]: + if g["grant_id"] == args.revoke: + grant = g + break + + if not grant: + print(f"Grant not found: {args.revoke}") + return 1 + + if grant["status"] == "revoked": + print(f"Grant {args.revoke} is already 
revoked") + return 0 + + grant["status"] = "revoked" + state["total_revoked"] = state.get("total_revoked", 0) + 1 + close_grant(state, grant, "revoked") + + save_ledger(ledger) + save_state(state) + + if args.format == "json": + print(json.dumps({"grant_id": grant["grant_id"], "status": "revoked"}, indent=2)) + else: + print(f"Revoked grant: {grant['grant_id']}") + return 0 + + +def cmd_list(args): + """List active grants.""" + ledger = load_ledger() + active = [g for g in ledger["grants"] if effective_status(g) == "active"] + + if args.format == "json": + print(json.dumps(active, indent=2)) + else: + print(f"Active Grants: {len(active)}") + print("─" * 60) + if not active: + print(" No active grants") + for g in active: + remaining = g["token_budget"] - g["tokens_consumed"] + expires = parse_iso(g["expires_at"]) + ttl = (expires - now_utc()).total_seconds() / 60 if expires else 0 + print(f" {g['grant_id']} scope={g['scope']} " + f"tokens={remaining:,}/{g['token_budget']:,} " + f"expires={max(0, round(ttl))}min") + return 0 + + +def cmd_sweep(args): + """Clean up expired grants.""" + ledger = load_ledger() + state = load_state() + swept = 0 + + for g in ledger["grants"]: + if g["status"] == "active" and is_expired(g): + g["status"] = "expired" + state["total_expired"] = state.get("total_expired", 0) + 1 + close_grant(state, g, "expired") + swept += 1 + + save_ledger(ledger) + save_state(state) + + if args.format == "json": + print(json.dumps({"swept": swept}, indent=2)) + else: + print(f"Sweep complete: {swept} grants expired") + return 0 + + +def cmd_audit(args): + """Show full audit log.""" + state = load_state() + history = state.get("grant_history", []) + + if args.format == "json": + print(json.dumps(history, indent=2)) + else: + print(f"Grant Audit Log: {len(history)} entries") + print("─" * 65) + for h in reversed(history[-20:]): + print(f" {h['grant_id']} scope={h['scope']} " + f"status={h['final_status']} " + 
f"tokens={h['tokens_consumed']}/{h['token_budget']} " + f"closed={h.get('closed_at', '?')[:16]}") + return 0 + + +def cmd_stats(args): + """Show grant statistics.""" + state = load_state() + ledger = load_ledger() + active_count = sum(1 for g in ledger["grants"] if effective_status(g) == "active") + + if args.format == "json": + stats = { + "active_grants": active_count, + "total_issued": state.get("total_issued", 0), + "total_expired": state.get("total_expired", 0), + "total_revoked": state.get("total_revoked", 0), + "total_exhausted": state.get("total_exhausted", 0), + "total_tokens_granted": state.get("total_tokens_granted", 0), + "total_tokens_consumed": state.get("total_tokens_consumed", 0), + } + print(json.dumps(stats, indent=2)) + else: + print("Expansion Grant Statistics") + print("─" * 50) + print(f" Active grants: {active_count}") + print(f" Total issued: {state.get('total_issued', 0)}") + print(f" Total expired: {state.get('total_expired', 0)}") + print(f" Total revoked: {state.get('total_revoked', 0)}") + print(f" Total exhausted: {state.get('total_exhausted', 0)}") + tg = state.get('total_tokens_granted', 0) + tc = state.get('total_tokens_consumed', 0) + print(f" Tokens granted: {tg:,}") + print(f" Tokens consumed: {tc:,}") + if tg > 0: + pct = round(tc / tg * 100, 1) + print(f" Utilization: {pct}%") + return 0 + + +def cmd_status(args): + """Show current status summary.""" + state = load_state() + ledger = load_ledger() + active = [g for g in ledger["grants"] if effective_status(g) == "active"] + + if args.format == "json": + print(json.dumps({ + "active_grants": len(active), + "total_issued": state.get("total_issued", 0), + "total_tokens_consumed": state.get("total_tokens_consumed", 0), + }, indent=2)) + else: + print("Expansion Grant Guard — Status") + print("─" * 50) + print(f" Active grants: {len(active)}") + print(f" Total issued: {state.get('total_issued', 0)}") + print(f" Tokens consumed: {state.get('total_tokens_consumed', 0):,}") + if 
active: + print() + for g in active[:5]: + remaining = g["token_budget"] - g["tokens_consumed"] + print(f" {g['grant_id']} {g['scope']} {remaining:,} tokens remaining") + return 0 + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +def close_grant(state, grant, final_status): + """Record a closed grant in history.""" + entry = { + "grant_id": grant["grant_id"], + "scope": grant["scope"], + "issued_at": grant["issued_at"], + "closed_at": now_utc().isoformat(), + "token_budget": grant["token_budget"], + "tokens_consumed": grant["tokens_consumed"], + "final_status": final_status, + } + history = state.get("grant_history", []) + history.append(entry) + state["grant_history"] = history + + +# ── Main ───────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser( + description="Expansion Grant Guard — delegation grant ledger" + ) + parser.add_argument("--issue", action="store_true", help="Issue a new grant") + parser.add_argument("--validate", type=str, help="Validate a grant by ID") + parser.add_argument("--consume", type=str, help="Consume tokens from a grant") + parser.add_argument("--revoke", type=str, help="Revoke a grant") + parser.add_argument("--list", action="store_true", help="List active grants") + parser.add_argument("--sweep", action="store_true", help="Clean up expired grants") + parser.add_argument("--audit", action="store_true", help="Show audit log") + parser.add_argument("--stats", action="store_true", help="Grant statistics") + parser.add_argument("--status", action="store_true", help="Current status") + # Issue options + parser.add_argument("--scope", type=str, default="general", help="Grant scope") + parser.add_argument("--budget", type=int, default=DEFAULT_BUDGET, help="Token budget") + parser.add_argument("--ttl", type=int, default=DEFAULT_TTL_MINUTES, help="TTL in minutes") + parser.add_argument("--issuer", type=str, help="Grant issuer identifier") + 
parser.add_argument("--reason", type=str, help="Reason for grant") + # Consume options + parser.add_argument("--tokens", type=int, default=0, help="Tokens to consume") + # Output + parser.add_argument("--format", choices=["text", "json"], default="text") + + args = parser.parse_args() + + if args.issue: + return cmd_issue(args) + elif args.validate: + return cmd_validate(args) + elif args.consume: + return cmd_consume(args) + elif args.revoke: + return cmd_revoke(args) + elif args.list: + return cmd_list(args) + elif args.sweep: + return cmd_sweep(args) + elif args.audit: + return cmd_audit(args) + elif args.stats: + return cmd_stats(args) + elif args.status: + return cmd_status(args) + else: + parser.print_help() + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/skills/openclaw-native/session-persistence/SKILL.md b/skills/openclaw-native/session-persistence/SKILL.md new file mode 100644 index 0000000..d53fb4f --- /dev/null +++ b/skills/openclaw-native/session-persistence/SKILL.md @@ -0,0 +1,106 @@ +--- +name: session-persistence +version: "1.0" +category: openclaw-native +description: Imports OpenClaw session transcripts into a local SQLite database with FTS5 full-text search — the agent never loses a message, even after context compaction or session rollover. +stateful: true +cron: "*/15 * * * *" +--- + +# Session Persistence + +## What it does + +OpenClaw stores session data in JSONL files that are difficult to search and easy to lose track of. Session Persistence imports every message into a local SQLite database with full-text search, making the agent's entire history queryable — across all sessions, all channels, all time. + +Inspired by [lossless-claw](https://github.com/Martian-Engineering/lossless-claw)'s SQLite message persistence layer, which stores every message with sequence numbers, token counts, and structured message parts. 
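The search layer can be exercised standalone; a minimal sketch of FTS5 matching with the LIKE fallback the skill describes (in-memory database, sample rows invented for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
try:
    conn.execute("CREATE VIRTUAL TABLE msg_fts USING fts5(content)")
    fts = True
except sqlite3.OperationalError:  # FTS5 not compiled into this sqlite build
    conn.execute("CREATE TABLE msg_fts (content TEXT)")
    fts = False

rows = [("Migrated the API endpoints from v1 to v2",),
        ("Let's start the API migration",),
        ("Unrelated deploy notes",)]
conn.executemany("INSERT INTO msg_fts (content) VALUES (?)", rows)

if fts:
    # FTS5 matches whole tokens — "migration" does not match "Migrated"
    hits = conn.execute(
        "SELECT content FROM msg_fts WHERE msg_fts MATCH ?", ("migration",)
    ).fetchall()
else:
    hits = conn.execute(
        "SELECT content FROM msg_fts WHERE content LIKE ?", ("%migration%",)
    ).fetchall()
print(len(hits))  # 1
```

Note the behavioural difference the fallback introduces: FTS5 is token-based and ranked, while LIKE is a plain substring scan, so result sets can differ between the two paths.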
+ + ## When to invoke + + - Automatically every 15 minutes (cron) — incremental import of new messages + - When the agent needs to search past conversations — use `--search` + - After a crash or session rollover — verify all messages are persisted + - For analytics — message counts, session timelines, activity patterns + + ## How to use + + ```bash + python3 persist.py --import # Import new messages from session files + python3 persist.py --import --source <dir> # Import from a specific directory + python3 persist.py --search "auth migration" # FTS5 full-text search + python3 persist.py --search "deploy" --role user # Search only user messages + python3 persist.py --recent --hours 24 # Messages from the last 24 hours + python3 persist.py --conversation <id> # Dump a full conversation + python3 persist.py --stats # Database statistics + python3 persist.py --export --format jsonl # Export back to JSONL + python3 persist.py --status # Last import summary + python3 persist.py --format json # Machine-readable output + ``` + + ## Database schema + + Stored at `~/.openclaw/lcm-db/messages.db`: + + ```sql + conversations — id, channel, started_at, last_message_at, message_count + messages — id, conversation_id, seq, role, content, token_estimate, created_at + messages_fts — FTS5 virtual table over messages.content for fast search + import_log — id, imported_at, conversations_added, messages_added, source + ``` + + ## Cron wakeup behaviour + + Every 15 minutes: + + 1. Scan session directory for JSONL files + 2. For each file, check `last_imported_seq` to skip already-imported messages + 3. Parse new messages and insert into SQLite + 4. Update FTS5 index + 5. Update import log and state + + ## Procedure + + **Step 1 — Initial import** + + ```bash + python3 persist.py --import + ``` + + First run imports all existing session files. Subsequent runs are incremental — only new messages since last import.
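Incremental import reduces to a per-conversation cursor comparison; a sketch of the idea (JSONL field names assumed to match the message schema, helper name illustrative):

```python
import json

def new_messages(jsonl_lines, cursor):
    """Yield only messages whose seq is greater than the stored cursor."""
    for line in jsonl_lines:
        msg = json.loads(line)
        if msg["seq"] > cursor:
            yield msg

lines = [json.dumps({"seq": i, "role": "user", "content": f"m{i}"})
         for i in range(1, 6)]
first = list(new_messages(lines, cursor=0))   # initial import: everything
second = list(new_messages(lines, cursor=5))  # re-run: nothing new
print(len(first), len(second))  # 5 0
```

Because the cursor is per conversation, a re-run after a crash simply re-scans the files and skips everything at or below the last imported sequence number, which is what makes the import idempotent.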
+ +**Step 2 — Search your history** + +```bash +python3 persist.py --search "how did we handle the database migration" +``` + +FTS5 provides ranked results across all sessions and time periods. Results include conversation ID, timestamp, role, and content snippet. + +**Step 3 — Analyze patterns** + +```bash +python3 persist.py --stats +``` + +Shows total messages, conversations, date ranges, messages per role, and activity timeline. + +## Integration with other skills + +- **memory-dag-compactor**: Can use SQLite messages as source data instead of MEMORY.md, bringing architecture closer to lossless-claw +- **dag-recall**: Search results feed into DAG expansion for detailed recall +- **context-assembly-scorer**: Uses message database to measure true coverage + +## State + +Import tracking and database stats stored in `~/.openclaw/skill-state/session-persistence/state.yaml`. +Database stored at `~/.openclaw/lcm-db/messages.db`. + +Fields: `last_import_at`, `db_path`, `total_messages`, `total_conversations`, `import_history`. + +## Notes + +- Uses Python's built-in `sqlite3` module — no external dependencies +- FTS5 used when available; falls back to LIKE queries otherwise +- Idempotent: safe to re-run; tracks per-conversation sequence numbers +- Import lag: up to 15 minutes behind real-time (cron interval) +- Database is local-only — never committed to the repo diff --git a/skills/openclaw-native/session-persistence/STATE_SCHEMA.yaml b/skills/openclaw-native/session-persistence/STATE_SCHEMA.yaml new file mode 100644 index 0000000..5671897 --- /dev/null +++ b/skills/openclaw-native/session-persistence/STATE_SCHEMA.yaml @@ -0,0 +1,23 @@ +version: "1.0" +description: Session import tracking, database stats, and import history. 
+fields: + last_import_at: + type: datetime + db_path: + type: string + default: "~/.openclaw/lcm-db/messages.db" + total_messages: + type: integer + total_conversations: + type: integer + conversation_cursors: + type: object + description: Per-conversation last imported sequence number + import_history: + type: list + description: Rolling log of past imports (last 20) + items: + imported_at: { type: datetime } + conversations_added: { type: integer } + messages_added: { type: integer } + source: { type: string } diff --git a/skills/openclaw-native/session-persistence/example-state.yaml b/skills/openclaw-native/session-persistence/example-state.yaml new file mode 100644 index 0000000..fa11d02 --- /dev/null +++ b/skills/openclaw-native/session-persistence/example-state.yaml @@ -0,0 +1,56 @@ +# Example runtime state for session-persistence +last_import_at: "2026-03-16T14:15:03.000000" +db_path: "/Users/you/.openclaw/lcm-db/messages.db" +total_messages: 4832 +total_conversations: 23 +conversation_cursors: + session-abc123: 342 + session-def456: 128 + session-ghi789: 56 +import_history: + - imported_at: "2026-03-16T14:15:03.000000" + conversations_added: 0 + messages_added: 18 + source: default + - imported_at: "2026-03-16T14:00:02.000000" + conversations_added: 1 + messages_added: 45 + source: default + - imported_at: "2026-03-16T13:45:01.000000" + conversations_added: 0 + messages_added: 12 + source: default +# ── Walkthrough ────────────────────────────────────────────────────────────── +# Cron runs every 15 min: python3 persist.py --import +# +# Session Import — 2026-03-16 14:15 +# ────────────────────────────────────────────────── +# Files scanned: 23 +# Conversations added: 0 +# Messages imported: 18 +# Database: /Users/you/.openclaw/lcm-db/messages.db +# +# python3 persist.py --search "API migration" +# +# Search: "API migration" — 5 results +# ─────────────────────────────────────────────────────── +# [assistant] 2026-03-14T10: session-abc1... 
+#             "Migrated the API endpoints from v1 to v2..."
+#
+#   [     user] 2026-03-14T09:  session-abc1...
+#             "Let's start the API migration..."
+#
+# python3 persist.py --stats
+#
+# Session Persistence Stats
+# ──────────────────────────────────────────────────
+#   Messages:        4,832
+#   Conversations:   23
+#   Database size:   2,340 KB
+#   Date range:      2026-02-01 → 2026-03-16
+#
+#   By role:
+#           user: 1,245
+#      assistant: 2,891
+#         system: 412
+#           tool: 284
diff --git a/skills/openclaw-native/session-persistence/persist.py b/skills/openclaw-native/session-persistence/persist.py
new file mode 100755
index 0000000..f422c00
--- /dev/null
+++ b/skills/openclaw-native/session-persistence/persist.py
@@ -0,0 +1,544 @@
+#!/usr/bin/env python3
+"""
+Session Persistence for openclaw-superpowers.
+
+Imports OpenClaw session transcripts into SQLite with FTS5 full-text search.
+
+Usage:
+    python3 persist.py --import
+    python3 persist.py --import --source DIR
+    python3 persist.py --search "query"
+    python3 persist.py --search "query" --role user
+    python3 persist.py --recent --hours 24
+    python3 persist.py --conversation ID
+    python3 persist.py --stats
+    python3 persist.py --export
+    python3 persist.py --status
+    python3 persist.py --stats --format json
+"""
+
+import argparse
+import json
+import os
+import re
+import sqlite3
+import sys
+from datetime import datetime, timedelta
+from pathlib import Path
+
+try:
+    import yaml
+    HAS_YAML = True
+except ImportError:
+    HAS_YAML = False
+
+OPENCLAW_DIR = Path(os.environ.get("OPENCLAW_HOME", Path.home() / ".openclaw"))
+STATE_FILE = OPENCLAW_DIR / "skill-state" / "session-persistence" / "state.yaml"
+DB_DIR = OPENCLAW_DIR / "lcm-db"
+DB_PATH = DB_DIR / "messages.db"
+SESSION_DIRS = [
+    OPENCLAW_DIR / "sessions",
+    OPENCLAW_DIR / "data" / "sessions",
+    Path.home() / ".config" / "openclaw" / "sessions",
+]
+MAX_HISTORY = 20
+
+
+# ── State helpers ────────────────────────────────────────────────────────────
+
+def load_state() -> dict:
+    if not STATE_FILE.exists():
+        return {"conversation_cursors": {}, "import_history": [],
+                "total_messages": 0, "total_conversations": 0}
+    try:
+        text = STATE_FILE.read_text()
+        # JSON is a subset of YAML, so fallback files written by
+        # save_state() still parse here once PyYAML is installed.
+        return (yaml.safe_load(text) or {}) if HAS_YAML else (json.loads(text) or {})
+    except Exception:
+        return {"conversation_cursors": {}, "import_history": [],
+                "total_messages": 0, "total_conversations": 0}
+
+
+def save_state(state: dict) -> None:
+    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
+    with open(STATE_FILE, "w") as f:
+        if HAS_YAML:
+            yaml.dump(state, f, default_flow_style=False, allow_unicode=True)
+        else:
+            # Without PyYAML, persist as JSON so import cursors survive
+            # restarts; otherwise every run would re-scan from scratch.
+            json.dump(state, f, indent=2, default=str)
+
+
+def estimate_tokens(text: str) -> int:
+    return len(text) // 4
+
+
+# ── Database ─────────────────────────────────────────────────────────────────
+
+def init_db() -> sqlite3.Connection:
+    """Initialize SQLite database with schema."""
+    DB_DIR.mkdir(parents=True, exist_ok=True)
+    conn = sqlite3.connect(str(DB_PATH))
+    conn.row_factory = sqlite3.Row
+    conn.execute("PRAGMA journal_mode=WAL")
+    conn.execute("PRAGMA foreign_keys=ON")
+
+    conn.executescript("""
+        CREATE TABLE IF NOT EXISTS conversations (
+            id TEXT PRIMARY KEY,
+            channel TEXT DEFAULT '',
+            started_at TEXT,
+            last_message_at TEXT,
+            message_count INTEGER DEFAULT 0
+        );
+
+        CREATE TABLE IF NOT EXISTS messages (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            conversation_id TEXT REFERENCES conversations(id),
+            seq INTEGER,
+            role TEXT,
+            content TEXT,
+            token_estimate INTEGER,
+            created_at TEXT,
+            UNIQUE(conversation_id, seq)
+        );
+
+        CREATE TABLE IF NOT EXISTS import_log (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            imported_at TEXT,
+            conversations_added INTEGER,
+            messages_added INTEGER,
+            source TEXT
+        );
+
+        CREATE INDEX IF NOT EXISTS idx_messages_conversation
+            ON messages(conversation_id, seq);
+        CREATE INDEX IF NOT EXISTS idx_messages_role
+            ON messages(role);
+        CREATE INDEX IF NOT EXISTS idx_messages_created
+            ON messages(created_at);
+    """)
+
+    # Try to create FTS5 table (may fail if FTS5 not available)
+    try:
+        conn.execute("""
+            CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts
+            USING fts5(content, content='messages', content_rowid='id')
+        """)
+    except sqlite3.OperationalError:
+        pass  # FTS5 not available — will use LIKE fallback
+
+    conn.commit()
+    return conn
+
+
+def has_fts5(conn: sqlite3.Connection) -> bool:
+    """Check if FTS5 table exists."""
+    try:
+        conn.execute("SELECT count(*) FROM messages_fts LIMIT 1")
+        return True
+    except sqlite3.OperationalError:
+        return False
+
+
+def sync_fts(conn: sqlite3.Connection) -> None:
+    """Rebuild FTS5 index from messages table."""
+    if not has_fts5(conn):
+        return
+    try:
+        conn.execute("INSERT INTO messages_fts(messages_fts) VALUES('rebuild')")
+        conn.commit()
+    except sqlite3.OperationalError:
+        pass
+
+
+# ── JSONL parsing ────────────────────────────────────────────────────────────
+
+def find_session_files(source_dir: Path | None = None) -> list[Path]:
+    """Find all JSONL session files."""
+    dirs = [source_dir] if source_dir else SESSION_DIRS
+    files = []
+    for d in dirs:
+        if not d.exists():
+            continue
+        files.extend(d.rglob("*.jsonl"))
+        files.extend(d.rglob("*.json"))
+    return sorted(set(files))
+
+
+def parse_jsonl_file(path: Path) -> list[dict]:
+    """Parse a JSONL session file into messages."""
+    messages = []
+    try:
+        text = path.read_text(errors="replace")
+    except (PermissionError, OSError):
+        return []
+
+    for line_num, line in enumerate(text.split("\n")):
+        line = line.strip()
+        if not line:
+            continue
+        try:
+            data = json.loads(line)
+        except json.JSONDecodeError:
+            continue
+
+        # Extract message from various formats
+        msg = None
+        if "message" in data and isinstance(data["message"], dict):
+            msg = data["message"]
+        elif "role" in data and "content" in data:
+            msg = data
+        elif "type" in data and data["type"] in ("user", "assistant"):
+            msg = data.get("message", data)
+
+        if not msg or "role" not in msg:
+            continue
+
+        content = msg.get("content", "")
+        if isinstance(content, list):
+            # Handle structured content blocks
+            text_parts = []
+ for block in content: + if isinstance(block, dict) and block.get("type") == "text": + text_parts.append(block.get("text", "")) + elif isinstance(block, str): + text_parts.append(block) + content = "\n".join(text_parts) + + if not content or not isinstance(content, str): + continue + + messages.append({ + "role": msg["role"], + "content": content, + "created_at": data.get("timestamp", data.get("created_at", + datetime.now().isoformat())), + "seq": line_num, + }) + + return messages + + +def conversation_id_from_path(path: Path) -> str: + """Generate a conversation ID from the file path.""" + return path.stem + + +# ── Commands ───────────────────────────────────────────────────────────────── + +def cmd_import(state: dict, source: str | None, fmt: str) -> None: + conn = init_db() + now = datetime.now().isoformat() + source_dir = Path(source) if source else None + + files = find_session_files(source_dir) + if not files: + dirs_checked = [source_dir] if source_dir else SESSION_DIRS + print("No session files found. 
Searched:")
+        for d in dirs_checked:
+            print(f"  {d}")
+        return
+
+    cursors = state.get("conversation_cursors") or {}
+    total_convos_added = 0
+    total_msgs_added = 0
+
+    for fpath in files:
+        conv_id = conversation_id_from_path(fpath)
+        messages = parse_jsonl_file(fpath)
+        if not messages:
+            continue
+
+        last_seq = cursors.get(conv_id, -1)
+        new_messages = [m for m in messages if m["seq"] > last_seq]
+        if not new_messages:
+            continue
+
+        # Ensure conversation exists
+        existing = conn.execute("SELECT id FROM conversations WHERE id=?",
+                                (conv_id,)).fetchone()
+        if not existing:
+            conn.execute(
+                "INSERT INTO conversations (id, channel, started_at, last_message_at, message_count) "
+                "VALUES (?, ?, ?, ?, 0)",
+                (conv_id, "", new_messages[0]["created_at"], new_messages[-1]["created_at"])
+            )
+            total_convos_added += 1
+
+        # INSERT OR IGNORE never raises on duplicate keys, so count actual
+        # inserts via rowcount; totals stay accurate when rows already exist.
+        inserted = 0
+        for msg in new_messages:
+            cur = conn.execute(
+                "INSERT OR IGNORE INTO messages "
+                "(conversation_id, seq, role, content, token_estimate, created_at) "
+                "VALUES (?, ?, ?, ?, ?, ?)",
+                (conv_id, msg["seq"], msg["role"], msg["content"],
+                 estimate_tokens(msg["content"]), msg["created_at"])
+            )
+            inserted += cur.rowcount
+        total_msgs_added += inserted
+
+        # Update conversation stats
+        conn.execute(
+            "UPDATE conversations SET last_message_at=?, message_count=message_count+? WHERE id=?",
+            (new_messages[-1]["created_at"], inserted, conv_id)
+        )
+        cursors[conv_id] = max(m["seq"] for m in new_messages)
+
+    conn.execute(
+        "INSERT INTO import_log (imported_at, conversations_added, messages_added, source) "
+        "VALUES (?, ?, ?, ?)",
+        (now, total_convos_added, total_msgs_added, str(source_dir or "default"))
+    )
+    conn.commit()
+    sync_fts(conn)
+    conn.close()
+
+    # Update state
+    state["conversation_cursors"] = cursors
+    state["last_import_at"] = now
+    state["total_messages"] = (state.get("total_messages") or 0) + total_msgs_added
+    state["total_conversations"] = (state.get("total_conversations") or 0) + total_convos_added
+    state["db_path"] = str(DB_PATH)
+
+    history = state.get("import_history") or []
+    history.insert(0, {
+        "imported_at": now, "conversations_added": total_convos_added,
+        "messages_added": total_msgs_added, "source": str(source_dir or "default"),
+    })
+    state["import_history"] = history[:MAX_HISTORY]
+    save_state(state)
+
+    if fmt == "json":
+        print(json.dumps({"files_scanned": len(files), "conversations_added": total_convos_added,
+                          "messages_added": total_msgs_added}, indent=2))
+    else:
+        print(f"\nSession Import — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
+        print("-" * 50)
+        print(f"  Files scanned:        {len(files)}")
+        print(f"  Conversations added:  {total_convos_added}")
+        print(f"  Messages imported:    {total_msgs_added}")
+        print(f"  Database:             {DB_PATH}")
+        print()
+
+
+def cmd_search(query: str, role: str | None, fmt: str) -> None:
+    if not DB_PATH.exists():
+        print("Database not found. Run --import first.")
+        sys.exit(1)
+
+    conn = sqlite3.connect(str(DB_PATH))
+    conn.row_factory = sqlite3.Row
+
+    results = []
+    if has_fts5(conn):
+        # Sanitize FTS5 query
+        safe_query = re.sub(r'[^\w\s]', ' ', query)
+        # NB: once the FTS table is aliased, MATCH must use the alias,
+        # not the original table name.
+        sql = """
+            SELECT m.conversation_id, m.seq, m.role, m.content, m.created_at,
+                   rank
+            FROM messages_fts fts
+            JOIN messages m ON fts.rowid = m.id
+            WHERE fts MATCH ?
+ """ + params = [safe_query] + if role: + sql += " AND m.role = ?" + params.append(role) + sql += " ORDER BY rank LIMIT 20" + + try: + results = conn.execute(sql, params).fetchall() + except sqlite3.OperationalError: + results = [] + + if not results: + # LIKE fallback + sql = "SELECT conversation_id, seq, role, content, created_at FROM messages WHERE content LIKE ?" + params = [f"%{query}%"] + if role: + sql += " AND role = ?" + params.append(role) + sql += " ORDER BY created_at DESC LIMIT 20" + results = conn.execute(sql, params).fetchall() + + conn.close() + + if fmt == "json": + print(json.dumps({"query": query, "results": [dict(r) for r in results]}, indent=2)) + else: + print(f"\nSearch: \"{query}\" — {len(results)} results") + print("-" * 55) + for r in results: + content = r["content"][:120].replace("\n", " ") + ts = (r["created_at"] or "")[:16] + print(f" [{r['role']:>9}] {ts} {r['conversation_id'][:12]}...") + print(f" \"{content}...\"") + print() + + +def cmd_recent(hours: int, fmt: str) -> None: + if not DB_PATH.exists(): + print("Database not found. Run --import first.") + sys.exit(1) + + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + cutoff = (datetime.now() - timedelta(hours=hours)).isoformat() + + results = conn.execute( + "SELECT conversation_id, seq, role, content, created_at FROM messages " + "WHERE created_at >= ? ORDER BY created_at DESC LIMIT 50", + (cutoff,) + ).fetchall() + conn.close() + + if fmt == "json": + print(json.dumps({"hours": hours, "messages": [dict(r) for r in results]}, indent=2)) + else: + print(f"\nRecent Messages (last {hours}h) — {len(results)} messages") + print("-" * 55) + for r in results: + content = r["content"][:100].replace("\n", " ") + ts = (r["created_at"] or "")[:16] + print(f" [{r['role']:>9}] {ts} \"{content}...\"") + print() + + +def cmd_conversation(conv_id: str, fmt: str) -> None: + if not DB_PATH.exists(): + print("Database not found. 
Run --import first.") + sys.exit(1) + + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + results = conn.execute( + "SELECT seq, role, content, created_at FROM messages " + "WHERE conversation_id = ? ORDER BY seq", (conv_id,) + ).fetchall() + conn.close() + + if not results: + # Try partial match + conn2 = sqlite3.connect(str(DB_PATH)) + conn2.row_factory = sqlite3.Row + results = conn2.execute( + "SELECT seq, role, content, created_at FROM messages " + "WHERE conversation_id LIKE ? ORDER BY seq", (f"%{conv_id}%",) + ).fetchall() + conn2.close() + + if fmt == "json": + print(json.dumps({"conversation": conv_id, "messages": [dict(r) for r in results]}, indent=2)) + else: + print(f"\nConversation: {conv_id} — {len(results)} messages") + print("-" * 55) + for r in results: + content = r["content"][:200].replace("\n", " ") + ts = (r["created_at"] or "")[:16] + print(f" [{r['seq']:>4}] [{r['role']:>9}] {ts}") + print(f" {content}") + print() + + +def cmd_stats(fmt: str) -> None: + if not DB_PATH.exists(): + print("Database not found. 
Run --import first.") + sys.exit(1) + + conn = sqlite3.connect(str(DB_PATH)) + total_msgs = conn.execute("SELECT count(*) FROM messages").fetchone()[0] + total_convos = conn.execute("SELECT count(*) FROM conversations").fetchone()[0] + roles = conn.execute("SELECT role, count(*) as cnt FROM messages GROUP BY role").fetchall() + date_range = conn.execute( + "SELECT min(created_at), max(created_at) FROM messages" + ).fetchone() + db_size = DB_PATH.stat().st_size if DB_PATH.exists() else 0 + conn.close() + + if fmt == "json": + print(json.dumps({ + "total_messages": total_msgs, "total_conversations": total_convos, + "roles": {r[0]: r[1] for r in roles}, + "earliest": date_range[0], "latest": date_range[1], + "db_size_bytes": db_size, + }, indent=2)) + else: + print(f"\nSession Persistence Stats") + print("-" * 50) + print(f" Messages: {total_msgs:,}") + print(f" Conversations: {total_convos}") + print(f" Database size: {db_size / 1024:.0f} KB") + if date_range[0]: + print(f" Date range: {date_range[0][:10]} → {date_range[1][:10]}") + print(f"\n By role:") + for r in roles: + print(f" {r[0]:>12}: {r[1]:,}") + print() + + +def cmd_export(fmt: str) -> None: + if not DB_PATH.exists(): + print("Database not found. 
Run --import first.") + sys.exit(1) + + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + messages = conn.execute( + "SELECT conversation_id, seq, role, content, token_estimate, created_at " + "FROM messages ORDER BY conversation_id, seq" + ).fetchall() + conn.close() + + for m in messages: + print(json.dumps(dict(m))) + + +def cmd_status(state: dict) -> None: + last = state.get("last_import_at", "never") + total_msgs = state.get("total_messages", 0) + total_convos = state.get("total_conversations", 0) + print(f"\nSession Persistence — Last import: {last}") + print(f" {total_msgs:,} messages | {total_convos} conversations") + print(f" Database: {state.get('db_path', str(DB_PATH))}") + history = state.get("import_history") or [] + if history: + h = history[0] + print(f" Last: +{h.get('messages_added', 0)} messages, " + f"+{h.get('conversations_added', 0)} conversations") + print() + + +def main(): + parser = argparse.ArgumentParser(description="Session Persistence") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--import", dest="do_import", action="store_true", + help="Import new messages from session files") + group.add_argument("--search", type=str, metavar="QUERY", help="FTS5 full-text search") + group.add_argument("--recent", action="store_true", help="Recent messages") + group.add_argument("--conversation", type=str, metavar="ID", help="Dump a conversation") + group.add_argument("--stats", action="store_true", help="Database statistics") + group.add_argument("--export", action="store_true", help="Export to JSONL") + group.add_argument("--status", action="store_true", help="Last import summary") + parser.add_argument("--source", type=str, metavar="DIR", help="Session files directory") + parser.add_argument("--role", type=str, help="Filter by role (user/assistant)") + parser.add_argument("--hours", type=int, default=24, help="Hours for --recent (default: 24)") + parser.add_argument("--format", 
choices=["text", "json"], default="text") + args = parser.parse_args() + + state = load_state() + if args.do_import: + cmd_import(state, args.source, args.format) + elif args.search: + cmd_search(args.search, args.role, args.format) + elif args.recent: + cmd_recent(args.hours, args.format) + elif args.conversation: + cmd_conversation(args.conversation, args.format) + elif args.stats: + cmd_stats(args.format) + elif args.export: + cmd_export(args.format) + elif args.status: + cmd_status(state) + + +if __name__ == "__main__": + main()
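
The SKILL.md integration notes above say that other skills (memory-dag-compactor, dag-recall, context-assembly-scorer) can read the message database directly. A minimal sketch of such a consumer, assuming only the `messages` table layout that `persist.py` creates; the function name `recent_user_text` is hypothetical, and the demo runs against an in-memory copy of the schema rather than the real `~/.openclaw/lcm-db/messages.db`:

```python
import sqlite3

# Same "messages" table layout that persist.py creates.
SCHEMA = """
CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    conversation_id TEXT,
    seq INTEGER,
    role TEXT,
    content TEXT,
    token_estimate INTEGER,
    created_at TEXT,
    UNIQUE(conversation_id, seq)
);
"""


def recent_user_text(conn: sqlite3.Connection, limit: int = 50) -> list:
    """Newest user-authored messages, raw text only (e.g. as compaction input)."""
    rows = conn.execute(
        "SELECT content FROM messages WHERE role = 'user' "
        "ORDER BY created_at DESC, seq DESC LIMIT ?",
        (limit,),
    ).fetchall()
    return [r[0] for r in rows]


# Demo data; a real consumer would instead open the shared DB read-only:
#   sqlite3.connect("file:" + db_path + "?mode=ro", uri=True)
conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany(
    "INSERT INTO messages (conversation_id, seq, role, content, token_estimate, created_at) "
    "VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("session-abc", 0, "user", "Let's start the API migration", 7, "2026-03-14T09:00"),
        ("session-abc", 1, "assistant", "Migrating endpoints now", 6, "2026-03-14T09:01"),
        ("session-abc", 2, "user", "Ship it", 2, "2026-03-14T09:05"),
    ],
)
print(recent_user_text(conn, limit=2))  # newest user messages first
```

Opening the database with `mode=ro` keeps consumers from contending with the cron-driven importer; since `persist.py` enables WAL mode, readers and the single writer can run concurrently.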