Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 193 additions & 1 deletion graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from typing import Optional, List, TypedDict, Annotated, Literal, TypeVar, Type, Protocol, cast, Any, Tuple, NotRequired, Iterable, Callable, Generator, Awaitable, Coroutine
from typing import Optional, List, TypedDict, Annotated, Literal, TypeVar, Type, Protocol, cast, Any, Tuple, NotRequired, Iterable, Generic, Callable, Generator, Awaitable, Coroutine
from langchain_core.messages import ToolMessage, AnyMessage, SystemMessage, HumanMessage, BaseMessage, AIMessage, RemoveMessage
from langchain_core.tools import InjectedToolCallId, BaseTool
from langchain_core.language_models.base import LanguageModelInput
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.runnables import Runnable
from langgraph.graph import StateGraph, MessagesState
from langgraph.graph.state import CompiledStateGraph
from langgraph.types import Checkpointer
from langgraph._internal._typing import StateLike
from langgraph.types import Command
from langgraph.prebuilt import ToolNode
Expand Down Expand Up @@ -409,6 +411,196 @@ def __call__(

SplitTool = tuple[dict[str, Any], BaseTool]

_BStateT = TypeVar("_BStateT", bound=MessagesState | None)
_BStateBind = TypeVar("_BStateBind", bound=MessagesState)

_BContextT = TypeVar("_BContextT", bound=StateLike | None)
_BContextBind = TypeVar("_BContextBind", bound=StateLike)

_BInputT = TypeVar("_BInputT", bound=FlowInput | None)
_BInputTBind = TypeVar("_BInputTBind", bound=FlowInput)


class TemplateLoader(Protocol):
def __call__(self, template_name: str, **kwargs: Any) -> str: ...


class Builder(
Copy link
Copy Markdown

@certo-gco certo-gco Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that you plan to use this class as:

instance = Builder()
new_instance = instance.with_state(state)
# do something with new_instance
new_instance_with_context = new_instance.with_context(another_context)

If that's the case, new_instance_with_context would "inherit" the original new_instance's attributes by reference, due to _copy_{untyped,typed}_to_ which instantiate an empty Builder() and populate it by reference. This pattern does not preserve immutability across instances, which is what you want I suppose, that is, I guess you want new_instance_with_context to be a proxy of new_instance, with the former getting the attributes updates of the latter by reference.

Stated differently, with your current pattern (simplified):

builder1 = Builder(list, str, str).with_state(["foo", "bar"])
builder2 = builder1.with_context("some context")
builder1._state_class.append("bogus")
print(builder2._state_class)   # <-- this prints ["foo", "bar", "bogus"]

As a side note, with your current pattern, if you chain the methods "à la rust" like:

foo = Builder().with_state(state).with_context(context).with_input(input).build()

you'd instantiate 4 different objects, all of them eligible for GC, because in the end build only returns a Tuple of different objects.

My humble opinion is that you could get rid of the _copy_* pattern by simply doing something like:

def with_state(self, t: type[_BStateBind]) -> "Builder[_BStateBind, _BContextT, _BInputT]":
    self._state_class = t
    return self

if you don't need immutability. This would reduce the factory-like pattern boiler plate but also, more importantly, every time you add a new field, you don't need to remember to update these methods.

If instead you really intended to have immutable objects, then you could simply use copy for shallow copies or, better, dataclasses with .replace().

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I do need immutability. there are many places where I build some "common" parameters of a builder and then use it multiple times to fill in the details. For example:

basic_builder = Builder().with_input(FlowInput).with_loader(load_jinja_template)

with_project_source = basic_builder.with_tools(fs_tools(...))

cvl_builder = basic_builder.with_tools([cvl_tools(...)]

I then use cvl_builder to build multiple workflows with different state types. Doing the changes by reference as you describe would be a disaster.

The reason for doing the explicit copying is that I believe it makes the type system happy. if I do copy or replace I don't think that let's me change the type parameters of the class I'm copying. I could always throw a type: ignore on there, but I prefer to avoid that where possible. Am I wrong?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then in that case beware of the fact that Python objects (except for a few basic types such as strings and ints) are mutable by default. If a subset of builders inherit some attribute which is, say, dict-like or list-like, any modification to such attributes in basic_builder would propagate downstream (see my former example with builder1 and builder2).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right. the only reference type I use is the tools list, which believe I am careful to copy.

Generic[_BStateT, _BContextT, _BInputT]
):
def __init__(self):
self._initial_prompt : str | None = None
self._sys_prompt : str | None = None

self._summary_config : SummaryConfig[_BStateT] | None = None
self._unbound_llm : BaseChatModel | None = None

self._state_class: type[_BStateT] | None = None
self._input_type : type[_BInputT] | None = None
self._context_type : type[_BContextT] | None = None
self._output_key : str | None = None
self._tools : list[BaseTool | SplitTool] = []
self._loader : TemplateLoader | None = None

def _copy_untyped_to_(self, other: "Builder[Any, Any, Any]"):
other._initial_prompt = self._initial_prompt
other._sys_prompt = self._sys_prompt
other._unbound_llm = self._unbound_llm
other._output_key = self._output_key
other._tools.extend(self._tools)
other._loader = self._loader

def _copy_typed_to(self, other: "Builder[_BStateT, _BContextT, _BInputT]"):
other._state_class = self._state_class
other._context_type = self._context_type
other._input_type = self._input_type
other._summary_config = self._summary_config

def with_state(self, t: type[_BStateBind]) -> "Builder[_BStateBind, _BContextT, _BInputT]":
to_ret: "Builder[_BStateBind, _BContextT, _BInputT]" = Builder()
self._copy_untyped_to_(to_ret)
to_ret._state_class = t
to_ret._context_type = self._context_type
to_ret._input_type = self._input_type
return to_ret

def with_context(self, t: type[_BContextBind]) -> "Builder[_BStateT, _BContextBind, _BInputT]":
to_ret: "Builder[_BStateT, _BContextBind, _BInputT]" = Builder()
self._copy_untyped_to_(to_ret)
to_ret._state_class = self._state_class
to_ret._context_type = t
to_ret._input_type = self._input_type
to_ret._summary_config = self._summary_config
return to_ret

def with_input(self, t: type[_BInputTBind]) -> "Builder[_BStateT, _BContextT, _BInputTBind]":
to_ret: "Builder[_BStateT, _BContextT, _BInputTBind]" = Builder()
self._copy_untyped_to_(to_ret)
to_ret._state_class = self._state_class
to_ret._context_type = self._context_type
to_ret._input_type = t
to_ret._summary_config = self._summary_config
return to_ret

def with_initial_prompt(self, prompt: str) -> "Builder[_BStateT, _BContextT, _BInputT]":
to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder()
self._copy_untyped_to_(to_ret)
self._copy_typed_to(to_ret)
to_ret._initial_prompt = prompt
return to_ret

def with_initial_prompt_template(self, template: str, **kwargs) -> "Builder[_BStateT, _BContextT, _BInputT]":
if self._loader is None:
raise ValueError("No loader configured. Use with_loader first.")
return self.with_initial_prompt(self._loader(template, **kwargs))

def with_sys_prompt(self, prompt: str) -> "Builder[_BStateT, _BContextT, _BInputT]":
to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder()
self._copy_untyped_to_(to_ret)
self._copy_typed_to(to_ret)
to_ret._sys_prompt = prompt
return to_ret

def with_sys_prompt_template(self, template: str, **kwargs) -> "Builder[_BStateT, _BContextT, _BInputT]":
if self._loader is None:
raise ValueError("No loader configured. Use with_loader first.")
return self.with_sys_prompt(self._loader(template, **kwargs))

def with_loader(self, loader: TemplateLoader) -> "Builder[_BStateT, _BContextT, _BInputT]":
to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder()
self._copy_untyped_to_(to_ret)
self._copy_typed_to(to_ret)
to_ret._loader = loader
return to_ret

def with_llm(self, llm: BaseChatModel) -> "Builder[_BStateT, _BContextT, _BInputT]":
to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder()
self._copy_untyped_to_(to_ret)
self._copy_typed_to(to_ret)
to_ret._unbound_llm = llm
return to_ret

def with_output_key(self, key: str) -> "Builder[_BStateT, _BContextT, _BInputT]":
to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder()
self._copy_untyped_to_(to_ret)
self._copy_typed_to(to_ret)
to_ret._output_key = key
return to_ret

def with_summary_config(self, config: SummaryConfig[_BStateT]) -> "Builder[_BStateT, _BContextT, _BInputT]":
to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder()
self._copy_untyped_to_(to_ret)
self._copy_typed_to(to_ret)
to_ret._summary_config = config
return to_ret

def with_default_summarizer(self, *, max_messages: int = 20, enabled: bool = True) -> "Builder[_BStateT, _BContextT, _BInputT]":
return self.with_summary_config(SummaryConfig(max_messages=max_messages, enabled=enabled))

def with_tools(self, l: Iterable[BaseTool | SplitTool]) -> "Builder[_BStateT, _BContextT, _BInputT]":
to_ret: "Builder[_BStateT, _BContextT, _BInputT]" = Builder()
self._copy_typed_to(to_ret)
self._copy_untyped_to_(to_ret)
to_ret._tools.extend(l)
return to_ret

def _build_internal(self, r: _ResultFact, i: _InitialFact, s: _SummarizerFact) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", BoundLLM]: #type: ignore
if self._state_class is None:
raise ValueError("state_class is required")
if self._input_type is None:
raise ValueError("input_type is required")
if self._sys_prompt is None:
raise ValueError("sys_prompt is required")
if self._initial_prompt is None:
raise ValueError("initial_prompt is required")
if self._output_key is None:
raise ValueError("output_key is required")
if self._unbound_llm is None:
raise ValueError("unbound_llm is required")

return _build_workflow(
state_class=self._state_class, #type: ignore
input_type=self._input_type, #type: ignore
tools_list=self._tools,
sys_prompt=self._sys_prompt,
initial_prompt=self._initial_prompt,
output_key=self._output_key,
unbound_llm=self._unbound_llm,
context_schema=self._context_type,
summary_config=self._summary_config, #type: ignore
init_fact=i,
result_fact=r,
summary_fact=s,
output_schema=None
)

def build(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", BoundLLM]: #type: ignore
return self._build_internal(
s=get_summarizer,
i=initial_node,
r=tool_result_generator
)

def build_async(self) -> Tuple["StateGraph[_BStateT, _BContextT, _BInputT, Any]", BoundLLM]: #type: ignore
return self._build_internal(
s=get_async_summarizer,
i=async_initial_node,
r=async_tool_result_generator
)

def compile_async(
self, *,
checkpointer: Checkpointer = None
) -> CompiledStateGraph[_BStateT, _BContextT, _BInputT, Any]: #type: ignore
return self.build_async()[0].compile(
checkpointer=checkpointer
)

def compile(self, checkpointer: Checkpointer = None) -> CompiledStateGraph[_BStateT, _BContextT, _BInputT, Any]: #type: ignore
return self.build()[0].compile(
checkpointer=checkpointer
)

def build_workflow(
state_class: Type[StateT],
input_type: Type[InputState],
Expand Down
2 changes: 1 addition & 1 deletion summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from typing import Generic, TypeVar

StateT = TypeVar("StateT")
StateT = TypeVar("StateT", contravariant=True)

logger = logging.getLogger(__name__)

Expand Down
73 changes: 2 additions & 71 deletions tools/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def result_tool_generator(
outkey: str,
result_schema: type[M],
doc: str,
validator: tuple[type[ST], Callable[[ST, M, str], ValidationResult]]
validator: tuple[type[ST], Callable[[ST, M, str], ValidationResult]] | Callable[[M, str], ValidationResult] | None = None
) -> BaseTool:
"""
Generates a tool that can be used to complete a workflow
Expand All @@ -53,61 +53,12 @@ def result_tool_generator(
"""
...

@overload
def result_tool_generator(
outkey: str,
result_schema: type[M],
doc: str,
validator: Callable[[M, str], ValidationResult]
) -> BaseTool:
"""
Generates a tool that can be used to complete a workflow
Args:
outkey (str): The name of the key in the state which holds the result, and whose presence signals
completion
result_schema (type[M]): A BaseModel type which is the type of the completed state. Each field of this
basemodel becomes a field in the generated tool schema, and so these fields SHOULD have string descriptions.
doc (str): The documentation to use for the generated tool
validator (Callable[[M, str], ValidationResult]): A validator which simply accepts the resultant basemodel
and the current tool call id, and return None if there is no issue, otherwise it may return a string
(which is returned as the result of the tool call WITHOUT setting outkey), or it may return an arbitrary command.

Returns:
BaseTool: The generated result tool
"""
...


@overload
def result_tool_generator(
outkey: str,
result_schema: tuple[type[R], str],
doc: str,
validator: Callable[[R, str], ValidationResult]
) -> BaseTool:
"""
Generates a tool that can be used to complete a workflow
Args:
outkey (str): The name of the key in the state which holds the result, and whose presence signals
completion
result_schema (tuple[type[R], str]): A tuple of the desired result type, and a description of what the output
should be.
doc (str): The documentation to use for the generated tool
validator (Callable[[R, str], ValidationResult]): A validator which simply accepts the resultant value
and the current tool call id, and return None if there is no issue, otherwise it may return a string
(which is returned as the result of the tool call WITHOUT setting outkey), or it may return an arbitrary command.

Returns:
BaseTool: The generated result tool
"""
...

@overload
def result_tool_generator(
outkey: str,
result_schema: tuple[type[R], str],
doc: str,
validator: tuple[type[ST], Callable[[ST, R, str], ValidationResult]]
validator: tuple[type[ST], Callable[[ST, R, str], ValidationResult]] | Callable[[R, str], ValidationResult] | None = None
) -> BaseTool:
"""
Generates a tool that can be used to complete a workflow
Expand All @@ -127,26 +78,6 @@ def result_tool_generator(
"""
...

@overload
def result_tool_generator(
outkey: str,
result_schema: type[BaseModel] | tuple[type, str],
doc: str,
) -> BaseTool:
"""
Generates a tool that can be used to complete a workflow
Args:
outkey (str): The name of the key in the state which holds the result, and whose presence signals
completion
result_schema (type[BaseModel] | tuple[type, str]): Either a BaseModel type (where each field becomes
a field in the generated tool schema) or a tuple of the desired result type and description.
doc (str): The documentation to use for the generated tool

Returns:
BaseTool: The generated result tool
"""
...

def result_tool_generator(
outkey: str,
result_schema: type[BaseModel] | tuple[type, str],
Expand Down
71 changes: 71 additions & 0 deletions tools/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import Generic, TypeVar, Annotated, Any

from pydantic import BaseModel

from langchain_core.tools import InjectedToolCallId
from langgraph.prebuilt import InjectedState
from langgraph.types import Command
from langchain_core.tools import StructuredTool, BaseTool

ST = TypeVar("ST")

T_RES = TypeVar("T_RES", bound=str | Command)

class WithInjectedState(BaseModel, Generic[ST]):
state: Annotated[ST, InjectedState]

class WithInjectedId(BaseModel):
tool_call_id: Annotated[str, InjectedToolCallId]

class WithImplementation(BaseModel, Generic[T_RES]):
def run(self) -> T_RES:
"""Override this method to implement the tool logic."""
raise NotImplementedError("Subclasses must implement run()")

@classmethod
def as_tool(
cls,
name: str
) -> BaseTool:
impl_method = getattr(cls, "run")

# Simple wrapper - just accept kwargs, instantiate model, call method
def wrapper(**kwargs: Any) -> Any:
instance = cls(**kwargs)
return impl_method(instance)

return StructuredTool.from_function(
func=wrapper,
args_schema=cls,
description=cls.__doc__,
name=name,
)

class WithAsyncImplementation(BaseModel, Generic[T_RES]):
async def run(self) -> T_RES:
"""Override this method to implement the tool logic."""
raise NotImplementedError("Subclasses must implement run()")

@classmethod
def as_tool(
cls,
name: str
) -> BaseTool:
impl_method = getattr(cls, "run")

# Simple wrapper - just accept kwargs, instantiate model, call method
async def wrapper(**kwargs: Any) -> Any:
instance = cls(**kwargs)
d = await impl_method(instance)
return d

return StructuredTool.from_function(
coroutine=wrapper,
args_schema=cls,
description=cls.__doc__,
name=name,
)


class InjectAll(WithInjectedState[ST], WithInjectedId):
pass
Loading