Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .tmp-kapy-test/.bub-kapy-threads.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"session-1": "kapy-thread-1"
}
38 changes: 38 additions & 0 deletions packages/bub-kapy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# bub-kapy

Kapybara integration for `bub`.

`bub-kapy` is a real bub model plugin: it shells out to a configurable Kapybara
runtime, persists per-session thread ids, and optionally exposes bundled
`bub_skills` into the runtime workspace.

## Installation

```bash
uv pip install "git+https://github.com/bubbuild/bub-contrib.git#subdirectory=packages/bub-kapy"
```

## Usage

This plugin registers a `kapy` model to `bub`.

```bash
BUB_KAPY_COMMAND="kapybara chat --json -" bub --model kapy "Hello Kapybara!"
```

## Configuration

Environment variables use the `BUB_KAPY_` prefix:

- `BUB_KAPY_COMMAND`: shell-style command used to invoke Kapybara
- `BUB_KAPY_MODEL`: optional `--model` override
- `BUB_KAPY_YOLO_MODE`: when `true`, appends `--dangerously-bypass-approvals-and-sandbox`
- `BUB_KAPY_PROMPT_MODE`: `stdin` or `argv`
- `BUB_KAPY_RESUME_FORMAT`: command fragment used when a prior thread id exists
- `BUB_KAPY_COPY_SKILLS`: when `true`, symlinks `bub_skills` into `.agents/skills`

## Runtime behavior

- Stores session thread ids in `.bub-kapy-threads.json` under the active workspace
- Removes a leading JSON metadata line from stdout when it contains a thread id
- Returns combined stdout/stderr so bub can surface backend failures clearly
21 changes: 21 additions & 0 deletions packages/bub-kapy/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[project]
name = "bub-kapy"
version = "0.1.0"
description = "Kapybara agent plugin for bub"
readme = "README.md"
authors = [
{ name = "Kapybara", email = "kapybara@example.com" }
]
requires-python = ">=3.12"
dependencies = [
"bub>=0.3.0",
"pydantic>=2.0.0",
"pydantic-settings>=2.0.0",
]

[project.entry-points.bub]
kapy = "bub_kapy.plugin"

[build-system]
requires = ["uv_build>=0.9.7,<0.10.0"]
build-backend = "uv_build"
3 changes: 3 additions & 0 deletions packages/bub-kapy/src/bub_kapy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .plugin import KapyModel, run_model

__all__ = ["KapyModel", "run_model"]
212 changes: 212 additions & 0 deletions packages/bub-kapy/src/bub_kapy/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
from __future__ import annotations

import asyncio
import contextlib
import json
import shlex
from collections.abc import AsyncIterator, Generator
from pathlib import Path

from bub import hookimpl
from bub.types import State
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

THREADS_FILE = ".bub-kapy-threads.json"


def workspace_from_state(state: State) -> Path:
raw = state.get("_runtime_workspace")
if isinstance(raw, str) and raw.strip():
return Path(raw).expanduser().resolve()
return Path.cwd().resolve()


def _load_thread_id(session_id: str, state: State) -> str | None:
workspace = workspace_from_state(state)
threads_file = workspace / THREADS_FILE
with contextlib.suppress(FileNotFoundError):
with threads_file.open() as f:
threads = json.load(f)
value = threads.get(session_id)
if isinstance(value, str) and value.strip():
return value
return None


def _save_thread_id(session_id: str, thread_id: str, state: State) -> None:
workspace = workspace_from_state(state)
threads_file = workspace / THREADS_FILE
if threads_file.exists():
with threads_file.open() as f:
threads = json.load(f)
else:
threads = {}
threads[session_id] = thread_id
with threads_file.open("w") as f:
json.dump(threads, f, indent=2, sort_keys=True)


def _copy_bub_skills(workspace: Path) -> list[Path]:
with contextlib.suppress(ImportError):
import bub_skills

workspace.joinpath(".agents/skills").mkdir(parents=True, exist_ok=True)
collected_symlinks: list[Path] = []
for skill_root in bub_skills.__path__:
for skill_dir in Path(skill_root).iterdir():
if skill_dir.joinpath("SKILL.md").is_file():
symlink_path = workspace / ".agents/skills" / skill_dir.name
if not symlink_path.exists():
symlink_path.symlink_to(skill_dir, target_is_directory=True)
collected_symlinks.append(symlink_path)
return collected_symlinks
return []


@contextlib.contextmanager
def with_bub_skills(workspace: Path, enabled: bool) -> Generator[None, None, None]:
if not enabled:
yield
return
skills = _copy_bub_skills(workspace)
try:
yield
finally:
for skill in skills:
with contextlib.suppress(OSError):
skill.unlink()


class KapySettings(BaseSettings):
"""Configuration for the Kapybara bub plugin."""

model_config = SettingsConfigDict(
env_prefix="BUB_KAPY_", env_file=".env", extra="ignore"
)

command: str = Field(
default="kapybara chat --json -",
description="Shell-style command used to invoke the Kapybara agent.",
)
model: str | None = Field(default=None)
yolo_mode: bool = Field(default=False)
prompt_mode: str = Field(default="stdin")
resume_format: str = Field(default="resume {thread_id}")
bubble_wrap_prompt: bool = Field(default=True)
copy_skills: bool = Field(default=True)


kapy_settings = KapySettings()


def _build_command(
prompt: str, session_id: str, state: State, settings: KapySettings
) -> tuple[list[str], bytes | None]:
workspace = workspace_from_state(state)
thread_id = _load_thread_id(session_id, state)
command = shlex.split(settings.command)
if not command:
raise ValueError("BUB_KAPY_COMMAND must not be empty.")

if thread_id and settings.resume_format.strip():
command.extend(shlex.split(settings.resume_format.format(thread_id=thread_id)))
if settings.model:
command.extend(["--model", settings.model])
if settings.yolo_mode:
command.append("--dangerously-bypass-approvals-and-sandbox")

prompt_text = prompt
if settings.bubble_wrap_prompt:
prompt_text = (
"You are Kapybara operating through bub. Reply as Kapybara and treat "
"this as the active user request.\n\n"
f"{prompt}"
)

if settings.prompt_mode == "argv":
command.append(prompt_text)
stdin = None
elif settings.prompt_mode == "stdin":
stdin = prompt_text.encode()
else:
raise ValueError("BUB_KAPY_PROMPT_MODE must be either 'stdin' or 'argv'.")

if "-" in command and stdin is None:
command.remove("-")

# Resolve relative executable paths from the runtime workspace.
if command[0].startswith("."):
command[0] = str((workspace / command[0]).resolve())
return command, stdin


def _extract_thread_id(output: str) -> tuple[str | None, str]:
lines = output.splitlines()
if not lines:
return None, output
first_line = lines[0].strip()
if not first_line:
return None, output
with contextlib.suppress(json.JSONDecodeError):
payload = json.loads(first_line)
if isinstance(payload, dict):
for key in ("thread_id", "session_id", "conversation_id"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
remaining = "\n".join(lines[1:]).strip()
return value, remaining
return None, output


async def _invoke_kapybara(prompt: str, session_id: str, state: State) -> str:
workspace = workspace_from_state(state)
try:
command, stdin = _build_command(prompt, session_id, state, kapy_settings)
except ValueError as exc:
return f"bub-kapy configuration error: {exc}"

try:
with with_bub_skills(workspace, kapy_settings.copy_skills):
process = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.PIPE if stdin is not None else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(workspace),
)
stdout, stderr = await process.communicate(stdin)
except FileNotFoundError:
return (
"bub-kapy could not start the Kapybara runtime. "
f"Configured command not found: {command[0]!r}. "
"Set BUB_KAPY_COMMAND to the installed Kapybara CLI."
)

stdout_text = stdout.decode() if stdout else ""
stderr_text = stderr.decode() if stderr else ""

thread_id, cleaned_stdout = _extract_thread_id(stdout_text)
if thread_id:
_save_thread_id(session_id, thread_id, state)

output_blocks = [block for block in (cleaned_stdout, stderr_text.strip()) if block]
if process.returncode:
output_blocks.append(f"Kapybara process exited with code {process.returncode}.")
return "\n".join(output_blocks).strip()


class KapyModel:
"""Compatibility wrapper for direct tests and ad-hoc use."""

def __init__(self, session_id: str = "default", state: State | None = None) -> None:
self.session_id = session_id
self.state = state or {}

async def run_model(self, prompt: str) -> AsyncIterator[str]:
yield await _invoke_kapybara(prompt, self.session_id, self.state)


@hookimpl
async def run_model(prompt: str, session_id: str, state: State) -> str:
return await _invoke_kapybara(prompt, session_id, state)
1 change: 1 addition & 0 deletions packages/bub-kapy/src/bub_kapy/py.typed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

69 changes: 69 additions & 0 deletions packages/bub-kapy/tests/test_smoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import asyncio
import json
import os
import sys
import types
from pathlib import Path
from unittest.mock import patch

# Add src to path for direct testing without install.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))

bub_module = types.ModuleType("bub")
bub_module.hookimpl = lambda func: func
sys.modules.setdefault("bub", bub_module)

bub_types_module = types.ModuleType("bub.types")
bub_types_module.State = dict
sys.modules.setdefault("bub.types", bub_types_module)

from bub_kapy import plugin


class FakeProcess:
def __init__(self, stdout: bytes, stderr: bytes = b"", returncode: int = 0) -> None:
self._stdout = stdout
self._stderr = stderr
self.returncode = returncode

async def communicate(self, stdin: bytes | None = None) -> tuple[bytes, bytes]:
assert stdin is not None
assert b"Test prompt" in stdin
return self._stdout, self._stderr


async def test_kapy_model() -> None:
workspace = Path(os.getcwd()) / ".tmp-kapy-test"
workspace.mkdir(exist_ok=True)
state = {"_runtime_workspace": str(workspace)}

created_commands: list[list[str]] = []

async def fake_exec(*args, **kwargs):
created_commands.append(list(args))
return FakeProcess(
b'{"thread_id":"kapy-thread-1"}\nKapybara reply',
b"",
0,
)

with patch.object(plugin.kapy_settings, "command", "kapybara chat --json -"), patch.object(
plugin.kapy_settings, "resume_format", "resume {thread_id}"
), patch.object(plugin.kapy_settings, "copy_skills", False), patch(
"bub_kapy.plugin.asyncio.create_subprocess_exec", new=fake_exec
):
model = plugin.KapyModel(session_id="session-1", state=state)
results = []
async for chunk in model.run_model("Test prompt"):
results.append(chunk)

output = "".join(results)
assert output == "Kapybara reply"
assert created_commands == [["kapybara", "chat", "--json", "-"]]

threads_file = workspace / plugin.THREADS_FILE
assert json.loads(threads_file.read_text()) == {"session-1": "kapy-thread-1"}


if __name__ == "__main__":
asyncio.run(test_kapy_model())
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ requires-python = ">=3.12"
dependencies = [
"bub",
"bub-codex",
"bub-kapy",
"bub-discord",
"bub-schedule",
"bub-tg-feed",
Expand All @@ -19,5 +20,6 @@ members = ["packages/*"]
bub = { git = "https://github.com/bubbuild/bub.git" }
bub-tg-feed = { workspace = true }
bub-codex = { workspace = true }
bub-kapy = { workspace = true }
bub-discord = { workspace = true }
bub-schedule = { workspace = true }