Skip to content

fix: Agent framework integrations (LangChain / AutoGen)#4

Open
Xanoutas wants to merge 1 commit intojoshualamerton:mainfrom
Xanoutas:fix/issue-1-auto
Open

fix: Agent framework integrations (LangChain / AutoGen)#4
Xanoutas wants to merge 1 commit intojoshualamerton:mainfrom
Xanoutas:fix/issue-1-auto

Conversation

@Xanoutas
Copy link

Fix for #1: Agent framework integrations (LangChain / AutoGen)

# core/instrumentation.py

from typing import Any, Callable, Dict, Optional
from agentlens.core.event_stream import EventStream

class AgentLensWrapper:
    def __init__(self, agent: Any, event_stream: EventStream):
        self.agent = agent
        self.event_stream = event_stream

    def emit_event(self, event_type: str, data: Dict[str, Any]):
        self.event_stream.emit(event_type, data)

    def __getattr__(self, name: str):
        attr = getattr(self.agent, name)
        if callable(attr):
            def wrapper(*args, **kwargs):
                result = attr(*args, **kwargs)
                self.emit_event(f"{name}_called", {"args": args, "kwargs": kwargs})
                return result
            return wrapper
        return attr

def instrument(agent: Any, event_stream: EventStream) -> Any:
    if isinstance(agent, langchain.Agent):
        return LangChainWrapper(agent, event_stream)
    elif isinstance(agent, autogen.Agent):
        return AutoGenWrapper(agent, event_stream)
    else:
        raise ValueError("Unsupported agent type")

class LangChainWrapper(AgentLensWrapper):
    def __init__(self, agent: langchain.Agent, event_stream: EventStream):
        super().__init__(agent, event_stream)

    def run(self, *args, **kwargs):
        self.emit_event("model_prompt", {"prompt": args[0]})
        result = self.agent.run(*args, **kwargs)
        self.emit_event("model_response", {"response": result})
        return result

class AutoGenWrapper(AgentLensWrapper):
    def __init__(self, agent: autogen.Agent, event_stream: EventStream):
        super().__init__(agent, event_stream)

    def run(self, *args, **kwargs):
        self.emit_event("model_prompt", {"prompt": args[0]})
        result = self.agent.run(*args, **kwargs)
        self.emit_event("model_response", {"response": result})
        return result
# core/event_stream.py

class EventStream:
    def __init__(self):
        self.listeners = []

    def emit(self, event_type: str, data: Dict[str, Any]):
        for listener in self.listeners:
            listener(event_type, data)

    def add_listener(self, listener: Callable[[str, Dict[str, Any]], None]):
        self.listeners.append(listener)
# examples/simple_agent_demo.py

from agentlens.core.instrumentation import instrument
from agentlens.core.event_stream import EventStream
import langchain

class SimpleLangChainAgent:
    def run(self, prompt: str) -> str:
        return f"Processed: {prompt}"

def event_listener(event_type: str, data: Dict[str, Any]):
    print(f"Event: {event_type}, Data: {data}")

event_stream = EventStream()
event_stream.add_listener(event_listener)

agent = SimpleLangChainAgent()
instrumented_agent = instrument(agent, event_stream)

instrumented_agent.run("Find the best laptop under $1500")

Explanation of Changes:

  1. AgentLensWrapper Class: Added emit_event method to actually emit events to the event stream. Implemented `getattr

Closes #1

Auto-generated fix | deepseek-coder:33b (RTX 5070)
EVM: 0x22FD4d24771358fD18a3964456CD5F9d7b6E8f9f | SOL: C4PcQjqDW4a5Pvhx5ZFPvAodkGiVG49q8dMvpskqSvuH

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Agent framework integrations (LangChain / AutoGen)

1 participant