From d2c9832d80e448e71223a58ee13bdb7912d13c3d Mon Sep 17 00:00:00 2001 From: John Toman Date: Mon, 2 Feb 2026 13:41:14 -0800 Subject: [PATCH 1/5] api changes --- graph.py | 162 ++++++++++++++++++++++++++++++++- summary.py | 2 +- tools/schemas.py | 45 ++++++++++ tools/vfs.py | 230 +++++++++++++++++++++++++++++++++++------------ 4 files changed, 379 insertions(+), 60 deletions(-) create mode 100644 tools/schemas.py diff --git a/graph.py b/graph.py index 0946e89..e104bd7 100644 --- a/graph.py +++ b/graph.py @@ -13,7 +13,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from typing import Optional, List, TypedDict, Annotated, Literal, TypeVar, Type, Protocol, cast, Any, Tuple, NotRequired, Iterable, Callable, Generator, Awaitable, Coroutine +from typing import Optional, List, TypedDict, Annotated, Literal, TypeVar, Type, Protocol, cast, Any, Tuple, NotRequired, Iterable, Generic, Callable, Generator, Awaitable, Coroutine from langchain_core.messages import ToolMessage, AnyMessage, SystemMessage, HumanMessage, BaseMessage, AIMessage, RemoveMessage from langchain_core.tools import InjectedToolCallId, BaseTool from langchain_core.language_models.base import LanguageModelInput @@ -409,6 +409,166 @@ def __call__( SplitTool = tuple[dict[str, Any], BaseTool] +_BStateT = TypeVar("_BStateT", bound=MessagesState | None) +_BStateBind = TypeVar("_BStateBind", bound=MessagesState) + +_BContextT = TypeVar("_BContextT", bound=StateLike | None) +_BContextBind = TypeVar("_BContextBind", bound=StateLike) + +_BInputT = TypeVar("_BInputT", bound=FlowInput | None) +_BInputTBind = TypeVar("_BInputTBind", bound=FlowInput) + + +class TemplateLoader(Protocol): + def __call__(self, template_name: str, **kwargs: Any) -> str: ... + + +class Builder( + Generic[_BStateT, _BContextT, _BInputT] +): + def __init__(self): + self._initial_prompt : str | None = None + self._sys_prompt : str | None = None + + self._summary_config : SummaryConfig[_BStateT] | None = None + self._unbound_llm : BaseChatModel | None = None + + self._state_class: type[_BStateT] | None = None + self._input_type : type[_BInputT] | None = None + self._context_type : type[_BContextT] | None = None + self._output_key : str | None = None + self._tools : list[BaseTool | SplitTool] = [] + self._loader : TemplateLoader | None = None + + def _copy_untyped_to_(self, other: "Builder[Any, Any, Any]"): + other._initial_prompt = self._initial_prompt + other._sys_prompt = self._sys_prompt + other._unbound_llm = self._unbound_llm + other._output_key = self._output_key + other._tools.extend(self._tools) + other._loader = self._loader + + def _copy_typed_to(self, other: "Builder[_BStateT, _BContextT, _BInputT]"): + other._state_class = self._state_class + other._context_type = self._context_type + other._input_type = self._input_type + other._summary_config = self._summary_config + + def with_state(self, t: type[_BStateBind]) -> "Builder[_BStateBind, _BContextT, _BInputT]": + to_ret: "Builder[_BStateBind, _BContextT, _BInputT]" = Builder() + self._copy_untyped_to_(to_ret) + to_ret._state_class = t + to_ret._context_type = self._context_type + to_ret._input_type = self._input_type + return to_ret + + def with_context(self, t: type[_BContextBind]) -> "Builder[_BStateT, _BContextBind, _BInputT]": + to_ret: "Builder[_BStateT, _BContextBind, _BInputT]" = Builder() + self._copy_untyped_to_(to_ret) + to_ret._state_class = self._state_class + to_ret._context_type = t + to_ret._input_type = self._input_type + to_ret._summary_config = self._summary_config + return to_ret + + def with_input(self, t: type[_BInputTBind]) -> "Builder[_BStateT, _BContextT, _BInputTBind]": + to_ret: "Builder[_BStateT, _BContextT, _BInputTBind]" = Builder() + self._copy_untyped_to_(to_ret) + to_ret._state_class = self._state_class + to_ret._context_type = self._context_type + to_ret._input_type = t + to_ret._summary_config = self._summary_config + return to_ret + + def with_initial_prompt(self, prompt: str) -> "Builder[_BStateT, _BContextT, _BInputT]": + to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder() + self._copy_untyped_to_(to_ret) + self._copy_typed_to(to_ret) + to_ret._initial_prompt = prompt + return to_ret + + def with_initial_prompt_template(self, template: str, **kwargs) -> "Builder[_BStateT, _BContextT, _BInputT]": + if self._loader is None: + raise ValueError("No loader configured. Use with_loader first.") + return self.with_initial_prompt(self._loader(template, **kwargs)) + + def with_sys_prompt(self, prompt: str) -> "Builder[_BStateT, _BContextT, _BInputT]": + to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder() + self._copy_untyped_to_(to_ret) + self._copy_typed_to(to_ret) + to_ret._sys_prompt = prompt + return to_ret + + def with_sys_prompt_template(self, template: str, **kwargs) -> "Builder[_BStateT, _BContextT, _BInputT]": + if self._loader is None: + raise ValueError("No loader configured. Use with_loader first.") + return self.with_sys_prompt(self._loader(template, **kwargs)) + + def with_loader(self, loader: TemplateLoader) -> "Builder[_BStateT, _BContextT, _BInputT]": + to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder() + self._copy_untyped_to_(to_ret) + self._copy_typed_to(to_ret) + to_ret._loader = loader + return to_ret + + def with_llm(self, llm: BaseChatModel) -> "Builder[_BStateT, _BContextT, _BInputT]": + to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder() + self._copy_untyped_to_(to_ret) + self._copy_typed_to(to_ret) + to_ret._unbound_llm = llm + return to_ret + + def with_output_key(self, key: str) -> "Builder[_BStateT, _BContextT, _BInputT]": + to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder() + self._copy_untyped_to_(to_ret) + self._copy_typed_to(to_ret) + to_ret._output_key = key + return to_ret + + def with_summary_config(self, config: SummaryConfig[_BStateT]) -> "Builder[_BStateT, _BContextT, _BInputT]": + to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder() + self._copy_untyped_to_(to_ret) + self._copy_typed_to(to_ret) + to_ret._summary_config = config + return to_ret + + def with_default_summarizer(self, *, max_messages: int = 20, enabled: bool = True) -> "Builder[_BStateT, _BContextT, _BInputT]": + return self.with_summary_config(SummaryConfig(max_messages=max_messages, enabled=enabled)) + + def with_tools(self, l: Iterable[BaseTool | SplitTool]) -> "Builder[_BStateT, _BContextT, _BInputT]": + to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder() + self._copy_typed_to(to_ret) + self._copy_untyped_to_(to_ret) + to_ret._tools.extend(l) + return to_ret + + def build(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", BoundLLM]: #type: ignore + if self._state_class is None: + raise ValueError("state_class is required") + if self._input_type is None: + raise ValueError("input_type is required") + if self._sys_prompt is None: + raise ValueError("sys_prompt is required") + if self._initial_prompt is None: + raise ValueError("initial_prompt is required") + if self._output_key is None: + raise ValueError("output_key is required") + if self._unbound_llm is None: + raise ValueError("unbound_llm is required") + + return build_workflow( + state_class=self._state_class, #type: ignore + input_type=self._input_type, #type: ignore + tools_list=self._tools, + sys_prompt=self._sys_prompt, + initial_prompt=self._initial_prompt, + output_key=self._output_key, + unbound_llm=self._unbound_llm, + context_schema=self._context_type, + summary_config=self._summary_config, #type: ignore + ) + + def build_workflow( state_class: Type[StateT], input_type: Type[InputState], diff --git a/summary.py b/summary.py index 4bd0b7c..bfb5182 100644 --- a/summary.py +++ b/summary.py @@ -17,7 +17,7 @@ from typing import Generic, TypeVar -StateT = TypeVar("StateT") +StateT = TypeVar("StateT", contravariant=True) logger = logging.getLogger(__name__) diff --git a/tools/schemas.py b/tools/schemas.py new file mode 100644 index 0000000..c98abbb --- /dev/null +++ b/tools/schemas.py @@ -0,0 +1,45 @@ +from typing import Generic, TypeVar, Annotated, Any + +from pydantic import BaseModel + +from langchain_core.tools import InjectedToolCallId +from langgraph.prebuilt import InjectedState +from langgraph.types import Command +from langchain_core.tools import StructuredTool, BaseTool + +ST = TypeVar("ST") + +T_RES = TypeVar("T_RES", bound=str | Command) + +class WithInjectedState(BaseModel, Generic[ST]): + state: Annotated[ST, InjectedState] + +class WithInjectedId(BaseModel): + tool_call_id: Annotated[str, InjectedToolCallId] + +class WithImplementation(BaseModel, Generic[T_RES]): + def run(self) -> T_RES: + """Override this method to implement the tool logic.""" + raise NotImplementedError("Subclasses must implement run()") + + @classmethod + def as_tool( + cls, + name: str + ) -> BaseTool: + impl_method = getattr(cls, "run") + + # Simple wrapper - just accept kwargs, instantiate model, call method + def wrapper(**kwargs: Any) -> Any: + instance = cls(**kwargs) + return impl_method(instance) + + return StructuredTool.from_function( + func=wrapper, + args_schema=cls, + description=cls.__doc__, + name=name, + ) + +class InjectAll(WithInjectedState[ST], WithInjectedId): + pass diff --git a/tools/vfs.py b/tools/vfs.py index 55bdea5..afbc2e5 100644 --- a/tools/vfs.py +++ b/tools/vfs.py @@ -27,9 +27,84 @@ from langchain_core.tools.base import BaseTool from langgraph.prebuilt import InjectedState from langgraph.types import Command +from ..graph import FlowInput from ..graph import tool_output + +def _copy_base_doc[T](cls: T) -> T: + """Decorator to copy __doc__ from the first base class.""" + for base in cls.__bases__: # type: ignore + if base.__doc__: + cls.__doc__ = base.__doc__ + break + return cls + + +# returns true if the file is okay to access +def _make_checker(patt: str | None) -> Callable[[str], bool]: + if patt is None: + return lambda f_name: True + match = re.compile(patt) + return lambda f_name: match.fullmatch(f_name) is None + + +def _grep_impl( + search_string: str, + file_contents: Iterator[tuple[str, str]], + check_allowed: Callable[[str], bool] +) -> str: + """ + Generic grep implementation over file contents. + + Args: + search_string: Regex pattern to search for + file_contents: Iterator of (filename, content) tuples + check_allowed: Filter function for allowed filenames + + Returns: + Newline-separated list of matching filenames, or error message + """ + try: + pattern = re.compile(search_string) + except Exception: + return "Illegal pattern name, check your syntax and try again." + + matches: list[str] = [] + for filename, content in file_contents: + if not check_allowed(filename): + continue + if pattern.search(content) is not None: + matches.append(filename) + + return "\n".join(matches) + + +class _GetFileSchemaBase(BaseModel): + """ + Read the contents of the VFS at some relative path. + + If the path doesn't exist, this function returns "File not found" + """ + path: str = Field(description="The relative path of the file on the VFS. IMPORTANT: Do NOT include a leading `./` it is implied") + + +class _ListFileSchemaBase(BaseModel): + """ + Lists all file contents of the VFS, including in any subdirectories. Directory entries are *not* included. + Each file in the VFS has its own line in the output, any empty lines should be ignored. + """ + pass + + +class _GrepFileSchemaBase(BaseModel): + """ + Search for a specific string in the files on the VFS. Returns a list of + file names which contain the query somewhere in their contents. Matching + file names are output one per line. Empty lines should be ignored. + """ + search_string: str = Field(description="The query string to search for provided as a python regex. Thus, you must escape any special characters (like [, |, etc.)") + def merge_vfs(left: dict[str, str], right: dict[str, str]) -> dict[str, str]: new_left = left.copy() for (f_name, cont) in right.items(): @@ -40,6 +115,9 @@ def merge_vfs(left: dict[str, str], right: dict[str, str]) -> dict[str, str]: class VFSState(TypedDict): vfs: Annotated[dict[str, str], merge_vfs] +class VFSInput(FlowInput): + vfs: dict[str, str] + InputType = TypeVar("InputType", bound=VFSState) StateVar = TypeVar("StateVar", contravariant=True) @@ -170,15 +248,8 @@ class PutFileSchema(BaseModel): PutFileSchema.__doc__ = pf_doc - # returns true if the file is okay to put or get - def make_checker(patt: str | None) -> Callable[[str], bool]: - if patt is None: - return lambda f_name: True - match = re.compile(patt) - return lambda f_name: match.fullmatch(f_name) is None - - put_filter = make_checker(conf.get("forbidden_write")) - get_filter = make_checker(conf.get("forbidden_read")) + put_filter = _make_checker(conf.get("forbidden_write")) + get_filter = _make_checker(conf.get("forbidden_read")) @tool(args_schema=PutFileSchema) def put_file( @@ -196,13 +267,9 @@ def put_file( ) @inject(doc_extra=conf.get('get_doc_extra')) - class GetFileSchema(BaseModel): - """ - Read the contents of the VFS at some relative path. - - If the path doesn't exist, this function returns "File not found" - """ - path: str = Field(description="The relative path of the file on the VFS. IMPORTANT: Do NOT include a leading `./` it is implied") + @_copy_base_doc + class GetFileSchema(_GetFileSchemaBase): + pass @tool(args_schema=GetFileSchema) def get_file( @@ -233,11 +300,8 @@ def list_underlying() -> Sequence[str]: return [str(f.relative_to(base)) for f in base.rglob("*") if f.is_file()] @inject() - class ListFileSchema(BaseModel): - """ - Lists all file contents of the VFS, including in any subdirectories. Directory entries are *not* included. - Each file in the VFS has its own line in the output, any empty lines should be ignored. - """ + @_copy_base_doc + class ListFileSchema(_ListFileSchemaBase): pass @tool(args_schema=ListFileSchema) @@ -256,49 +320,32 @@ def list_files( return "\n".join(to_ret) @inject() - class GrepFileSchema(BaseModel): - """ - Search for a specific string in the files on the VFS. Returns a list of - file names which contain the query somewhere in their contents. Matching - file names are output one per line. Empty lines should be ignored. - """ - - search_string: str = Field(description="The query string to search for provided as a python regex. Thus, you must escape any special characters (like [, |, etc.)") + @_copy_base_doc + class GrepFileSchema(_GrepFileSchemaBase): + pass @tool(args_schema=GrepFileSchema) def grep_files( state: Annotated[InputType, InjectedState], search_string: str ) -> str: - comp: re.Pattern - try: - comp = re.compile(search_string) - except Exception: - return "Illegal pattern name, check your syntax and try again." - - matches: list[str] = [] - - for (k, v) in state["vfs"].items(): - if not get_filter(k): - continue - if comp.search(v) is not None: - matches.append(k) - - if (layer := conf.get("fs_layer", None)) is not None: - p = pathlib.Path(layer) - for f in p.rglob("*"): - if not f.is_file(): - continue - rel_name = str(f.relative_to(p)) - if not get_filter(rel_name): - continue - if rel_name in state["vfs"]: - continue - if not comp.search(f.read_text()): - continue - matches.append(rel_name) - - return "\n".join(matches) + def file_contents() -> Iterator[tuple[str, str]]: + for (k, v) in state["vfs"].items(): + yield (k, v) + if (layer := conf.get("fs_layer", None)) is not None: + p = pathlib.Path(layer) + for f in p.rglob("*"): + if not f.is_file(): + continue + rel_name = str(f.relative_to(p)) + if rel_name in state["vfs"]: + continue + try: + yield (rel_name, f.read_text()) + except Exception: + continue + + return _grep_impl(search_string, file_contents(), get_filter) tools: list[BaseTool] = [get_file, list_files, grep_files] if not conf["immutable"]: @@ -307,3 +354,70 @@ def grep_files( materializer = _VFSAccess[InputType](conf=conf) return (tools, materializer) + + +def fs_tools(fs_layer: str, forbidden_read: str | None = None) -> list[BaseTool]: + """ + Create stateless file system tools that operate directly on a directory. + + Unlike vfs_tools, these tools don't use langgraph state - they simply + read from the provided filesystem path. Useful for immutable file access + where no VFS overlay is needed. + + Args: + fs_layer: Path to the directory to expose + forbidden_read: Optional regex pattern for paths that cannot be read + + Returns: + List of tools: [get_file, list_files, grep_files] + """ + base_path = pathlib.Path(fs_layer) + check_allowed = _make_checker(forbidden_read) + + @cache + def list_all_files() -> Sequence[str]: + return [str(f.relative_to(base_path)) for f in base_path.rglob("*") if f.is_file()] + + @_copy_base_doc + class GetFileSchema(_GetFileSchemaBase): + pass + + @tool(args_schema=GetFileSchema) + def get_file(path: str) -> str: + if not check_allowed(path): + return "File not found" + child = base_path / path + if child.is_file(): + try: + return child.read_text() + except Exception: + return "File not found" + return "File not found" + + @_copy_base_doc + class ListFileSchema(_ListFileSchemaBase): + pass + + @tool(args_schema=ListFileSchema) + def list_files() -> str: + return "\n".join(f for f in list_all_files() if check_allowed(f)) + + @_copy_base_doc + class GrepFileSchema(_GrepFileSchemaBase): + pass + + @tool(args_schema=GrepFileSchema) + def grep_files(search_string: str) -> str: + def file_contents() -> Iterator[tuple[str, str]]: + for f in base_path.rglob("*"): + if not f.is_file(): + continue + rel_name = str(f.relative_to(base_path)) + try: + yield (rel_name, f.read_text()) + except Exception: + continue + + return _grep_impl(search_string, file_contents(), check_allowed) + + return [get_file, list_files, grep_files] From 8cc3251c41788160fcdc284e9a30be867f4514ae Mon Sep 17 00:00:00 2001 From: John Toman Date: Mon, 2 Feb 2026 16:24:54 -0800 Subject: [PATCH 2/5] Simplification --- tools/results.py | 73 ++---------------------------------------------- 1 file changed, 2 insertions(+), 71 deletions(-) diff --git a/tools/results.py b/tools/results.py index 70ab37a..eed4929 100644 --- a/tools/results.py +++ b/tools/results.py @@ -34,7 +34,7 @@ def result_tool_generator( outkey: str, result_schema: type[M], doc: str, - validator: tuple[type[ST], Callable[[ST, M, str], ValidationResult]] + validator: tuple[type[ST], Callable[[ST, M, str], ValidationResult]] | Callable[[M, str], ValidationResult] | None = None ) -> BaseTool: """ Generates a tool that can be used to complete a workflow @@ -53,61 +53,12 @@ def result_tool_generator( """ ... -@overload -def result_tool_generator( - outkey: str, - result_schema: type[M], - doc: str, - validator: Callable[[M, str], ValidationResult] -) -> BaseTool: - """ - Generates a tool that can be used to complete a workflow - Args: - outkey (str): The name of the key in the state which holds the result, and whose presence signals - completion - result_schema (type[M]): A BaseModel type which is the type of the completed state. Each field of this - basemodel becomes a field in the generated tool schema, and so these fields SHOULD have string descriptions. - doc (str): The documentation to use for the generated tool - validator (Callable[[M, str], ValidationResult]): A validator which simply accepts the resultant basemodel - and the current tool call id, and return None if there is no issue, otherwise it may return a string - (which is returned as the result of the tool call WITHOUT setting outkey), or it may return an arbitrary command. - - Returns: - BaseTool: The generated result tool - """ - ... - - @overload def result_tool_generator( outkey: str, result_schema: tuple[type[R], str], doc: str, - validator: Callable[[R, str], ValidationResult] -) -> BaseTool: - """ - Generates a tool that can be used to complete a workflow - Args: - outkey (str): The name of the key in the state which holds the result, and whose presence signals - completion - result_schema (tuple[type[R], str]): A tuple of the desired result type, and a description of what the output - should be. - doc (str): The documentation to use for the generated tool - validator (Callable[[R, str], ValidationResult]): A validator which simply accepts the resultant value - and the current tool call id, and return None if there is no issue, otherwise it may return a string - (which is returned as the result of the tool call WITHOUT setting outkey), or it may return an arbitrary command. - - Returns: - BaseTool: The generated result tool - """ - ... - -@overload -def result_tool_generator( - outkey: str, - result_schema: tuple[type[R], str], - doc: str, - validator: tuple[type[ST], Callable[[ST, R, str], ValidationResult]] + validator: tuple[type[ST], Callable[[ST, R, str], ValidationResult]] | Callable[[R, str], ValidationResult] | None = None ) -> BaseTool: """ Generates a tool that can be used to complete a workflow @@ -127,26 +78,6 @@ def result_tool_generator( """ ... -@overload -def result_tool_generator( - outkey: str, - result_schema: type[BaseModel] | tuple[type, str], - doc: str, -) -> BaseTool: - """ - Generates a tool that can be used to complete a workflow - Args: - outkey (str): The name of the key in the state which holds the result, and whose presence signals - completion - result_schema (type[BaseModel] | tuple[type, str]): Either a BaseModel type (where each field becomes - a field in the generated tool schema) or a tuple of the desired result type and description. - doc (str): The documentation to use for the generated tool - - Returns: - BaseTool: The generated result tool - """ - ... - def result_tool_generator( outkey: str, result_schema: type[BaseModel] | tuple[type, str], From 75a6997cd2e3722d39c50449974e7dfbb5abf7b1 Mon Sep 17 00:00:00 2001 From: John Toman Date: Tue, 3 Feb 2026 10:26:09 -0800 Subject: [PATCH 3/5] async builders --- graph.py | 22 +++++++++++++++++++--- tools/schemas.py | 25 +++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/graph.py b/graph.py index e104bd7..97ca4af 100644 --- a/graph.py +++ b/graph.py @@ -541,8 +541,8 @@ def with_tools(self, l: Iterable[BaseTool | SplitTool]) -> "Builder[_BStateT, _B self._copy_untyped_to_(to_ret) to_ret._tools.extend(l) return to_ret - - def build(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", BoundLLM]: #type: ignore + + def _build_internal(self, r: _ResultFact, i: _InitialFact, s: _SummarizerFact) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", BoundLLM]: #type: ignore if self._state_class is None: raise ValueError("state_class is required") if self._input_type is None: @@ -556,7 +556,7 @@ def build(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", Boun if self._unbound_llm is None: raise ValueError("unbound_llm is required") - return build_workflow( + return _build_workflow( state_class=self._state_class, #type: ignore input_type=self._input_type, #type: ignore tools_list=self._tools, @@ -566,8 +566,24 @@ def build(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", Boun unbound_llm=self._unbound_llm, context_schema=self._context_type, summary_config=self._summary_config, #type: ignore + init_fact=i, + result_fact=r, + summary_fact=s ) + def build(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", BoundLLM]: #type: ignore + return self._build_internal( + s=get_summarizer, + i=initial_node, + r=tool_result_generator + ) + + def build_async(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", BoundLLM]: #type: ignore + return self._build_internal( + s=get_async_summarizer, + i=async_initial_node, + r=async_tool_result_generator + ) def build_workflow( state_class: Type[StateT], diff --git a/tools/schemas.py b/tools/schemas.py index c98abbb..8157515 100644 --- a/tools/schemas.py +++ b/tools/schemas.py @@ -40,6 +40,31 @@ def wrapper(**kwargs: Any) -> Any: description=cls.__doc__, name=name, ) + +class WithAsyncImplementation(BaseModel, Generic[T_RES]): + async def run(self) -> T_RES: + """Override this method to implement the tool logic.""" + raise NotImplementedError("Subclasses must implement run()") + + @classmethod + def as_tool( + cls, + name: str + ) -> BaseTool: + impl_method = getattr(cls, "run") + + # Simple wrapper - just accept kwargs, instantiate model, call method + def wrapper(**kwargs: Any) -> Any: + instance = cls(**kwargs) + return impl_method(instance) + + return StructuredTool.from_function( + func=wrapper, + args_schema=cls, + description=cls.__doc__, + name=name, + ) + class InjectAll(WithInjectedState[ST], WithInjectedId): pass From a04e46e3de727d8734d57458c734fe4a70d08742 Mon Sep 17 00:00:00 2001 From: John Toman Date: Wed, 4 Feb 2026 15:06:08 -0800 Subject: [PATCH 4/5] sigh --- graph.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/graph.py b/graph.py index 97ca4af..80a150a 100644 --- a/graph.py +++ b/graph.py @@ -568,7 +568,8 @@ def _build_internal(self, r: _ResultFact, i: _InitialFact, s: _SummarizerFact) - summary_config=self._summary_config, #type: ignore init_fact=i, result_fact=r, - summary_fact=s + summary_fact=s, + output_schema=None ) def build(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", BoundLLM]: #type: ignore From 0beea5860fcb393eaac38ba34ba177b4170fe03c Mon Sep 17 00:00:00 2001 From: John Toman Date: Wed, 11 Feb 2026 14:24:38 -0800 Subject: [PATCH 5/5] Bug fixes, QOL * Use actual async implementation (haha woops) * Make things optional to more resilient to the llm messing up * Add fast path for just compiling the graph --- graph.py | 15 +++++++++++++++ tools/schemas.py | 7 ++++--- tools/vfs.py | 41 +++++++++++++++++++++++------------------ 3 files changed, 42 insertions(+), 21 deletions(-) diff --git a/graph.py b/graph.py index 80a150a..23537d9 100644 --- a/graph.py +++ b/graph.py @@ -20,6 +20,8 @@ from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.runnables import Runnable from langgraph.graph import StateGraph, MessagesState +from langgraph.graph.state import CompiledStateGraph +from langgraph.types import Checkpointer from langgraph._internal._typing import StateLike from langgraph.types import Command from langgraph.prebuilt import ToolNode @@ -585,6 +587,19 @@ def build_async(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]" i=async_initial_node, r=async_tool_result_generator ) + + def compile_async( + self, *, + checkpointer: Checkpointer = None + ) -> CompiledStateGraph[_BStateT, _BContextT, _BInputT, Any]: #type: ignore + return self.build_async()[0].compile( + checkpointer=checkpointer + ) + + def compile(self, checkpointer: Checkpointer = None) -> CompiledStateGraph[_BStateT, _BContextT, _BInputT, Any]: #type: ignore + return self.build()[0].compile( + checkpointer=checkpointer + ) def build_workflow( state_class: Type[StateT], diff --git a/tools/schemas.py b/tools/schemas.py index 8157515..9be7168 100644 --- a/tools/schemas.py +++ b/tools/schemas.py @@ -54,12 +54,13 @@ def as_tool( impl_method = getattr(cls, "run") # Simple wrapper - just accept kwargs, instantiate model, call method - def wrapper(**kwargs: Any) -> Any: + async def wrapper(**kwargs: Any) -> Any: instance = cls(**kwargs) - return impl_method(instance) + d = await impl_method(instance) + return d return StructuredTool.from_function( - func=wrapper, + coroutine=wrapper, args_schema=cls, description=cls.__doc__, name=name, diff --git a/tools/vfs.py b/tools/vfs.py index 78e846f..c78504c 100644 --- a/tools/vfs.py +++ b/tools/vfs.py @@ -48,6 +48,20 @@ def _make_checker(patt: str | None) -> Callable[[str], bool]: match = re.compile(patt) return lambda f_name: match.fullmatch(f_name) is None +class FileRange(BaseModel): + start_line: int = Field(description="The line to start reading from; lines are numbered starting from 1.") + end_line: int = Field(description="The line to read until EXCLUSIVE.") + +def _get_file(cont: str | None, range: FileRange | None) -> str: + if cont is None: + return "File not found" + if not range: + return cont + start = range.start_line - 1 + to_ret = cont.splitlines()[start:range.end_line - 1] + return "\n".join(to_ret) + + def _grep_impl( search_string: str, @@ -78,8 +92,8 @@ def _grep_impl( match_set = None if not match_in else set(match_in) should_search = \ - lambda _: True if match_set is None else \ - lambda f: f in match_set + (lambda _: True) if match_set is None else \ + (lambda f: f in match_set) for (k, v) in file_contents: if not should_search(k): @@ -103,9 +117,6 @@ def _grep_impl( return "\n".join(matched_lines) -class FileRange(BaseModel): - start_line: int = Field(description="The line to start reading from; lines are numbered starting from 1.") - end_line: int = Field(description="The line to read until EXCLUSIVE.") class _GetFileSchemaBase(BaseModel): """ @@ -114,7 +125,7 @@ class _GetFileSchemaBase(BaseModel): If the path doesn't exist, this function returns "File not found". """ path: str = Field(description="The relative path of the file on the VFS. IMPORTANT: Do NOT include a leading `./` it is implied") - range: FileRange | None = Field(description="If set, (start, end) indicates to return lines starting from line `start` (lines are 1 indexed) until `end` (exclusive). If unset, the entire file is returned.") + range: FileRange | None = Field(description="If set, (start, end) indicates to return lines starting from line `start` (lines are 1 indexed) until `end` (exclusive). If unset, the entire file is returned.", default=None) class _ListFileSchemaBase(BaseModel): @@ -335,17 +346,11 @@ class GetFileSchema(_GetFileSchemaBase): @tool(args_schema=GetFileSchema) def get_file( path: str, - range: FileRange | None, - state: Annotated[InputType, InjectedState] + state: Annotated[InputType, InjectedState], + range: FileRange | None = None ) -> str: cont = _get_content(state, path) - if cont is None: - return "File not found" - if not range: - return cont - start = range.start_line - 1 - to_ret = cont.splitlines()[start:range.end_line - 1] - return "\n".join(to_ret) + return _get_file(cont, range) @cache def list_underlying() -> Sequence[str]: @@ -436,13 +441,13 @@ class GetFileSchema(_GetFileSchemaBase): pass @tool(args_schema=GetFileSchema) - def get_file(path: str) -> str: + def get_file(path: str, range: FileRange | None = None) -> str: if not check_allowed(path): return "File not found" child = base_path / path if child.is_file(): try: - return child.read_text() + return _get_file(child.read_text(), range) except Exception: return "File not found" return "File not found" @@ -463,7 +468,7 @@ class GrepFileSchema(_GrepFileSchemaBase): def grep_files( search_string: str, matching_lines: bool, - match_in: list[str] | None + match_in: list[str] | None = None ) -> str: def file_contents() -> Iterator[tuple[str, str]]: for f in base_path.rglob("*"):