-
Notifications
You must be signed in to change notification settings - Fork 11
feat: serialize LangChain multimodal content to ingest content blocks #487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| import inspect | ||
| import json | ||
| import logging | ||
| import os | ||
| import time | ||
| import uuid | ||
| from datetime import datetime | ||
|
|
@@ -30,7 +31,7 @@ | |
| TracesIngestRequest, | ||
| TraceUpdateRequest, | ||
| ) | ||
| from galileo.traces import Traces | ||
| from galileo.traces import IngestTraces, Traces | ||
| from galileo.utils.decorators import ( | ||
| async_warn_catch_exception, | ||
| nop_async, | ||
|
|
@@ -153,6 +154,7 @@ class GalileoLogger(TracesLogger): | |
|
|
||
| _logger = logging.getLogger("galileo.logger") | ||
| _traces_client: Optional["Traces"] = None | ||
| _ingest_client: Optional["IngestTraces"] = None | ||
| _task_handler: ThreadPoolTaskHandler | ||
| _trace_completion_submitted: bool | ||
|
|
||
|
|
@@ -305,6 +307,11 @@ def __init__( | |
| self._traces_client = Traces(project_id=self.project_id, log_stream_id=self.log_stream_id) | ||
| elif self.experiment_id: | ||
| self._traces_client = Traces(project_id=self.project_id, experiment_id=self.experiment_id) | ||
|
|
||
| if os.environ.get("GALILEO_INGEST_URL"): | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Only created when this URL is set |
||
| self._ingest_client = IngestTraces( | ||
| project_id=self.project_id, log_stream_id=self.log_stream_id, experiment_id=self.experiment_id | ||
| ) | ||
| else: | ||
| # ingestion_hook path: Traces client not created eagerly. | ||
| # If the user later calls ingest_traces(), it will be created lazily. | ||
|
|
@@ -474,6 +481,8 @@ def _ingest_trace_streaming(self, trace: Trace, is_complete: bool = False) -> No | |
| ) | ||
| @retry_on_transient_http_error | ||
| async def ingest_traces_with_backoff(request: Any) -> None: | ||
| if self._ingest_client: | ||
| return await self._ingest_client.ingest_traces(request) | ||
| return await self._traces_client.ingest_traces(request) | ||
|
|
||
| self._task_handler.submit_task( | ||
|
|
@@ -1837,6 +1846,8 @@ async def _flush_batch(self) -> list[Trace]: | |
| await self._ingestion_hook(traces_ingest_request) | ||
| else: | ||
| self._ingestion_hook(traces_ingest_request) | ||
| elif self._ingest_client: | ||
| await self._ingest_client.ingest_traces(traces_ingest_request) | ||
| else: | ||
| await self._traces_client.ingest_traces(traces_ingest_request) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,14 @@ | ||
| import logging | ||
| import os | ||
| from typing import Any, Optional | ||
| from uuid import UUID | ||
|
|
||
| import httpx | ||
|
|
||
| from galileo.config import GalileoPythonConfig | ||
| from galileo.constants.routes import Routes | ||
| from galileo.schema.trace import ( | ||
| LoggingMethod, | ||
| LogRecordsSearchRequest, | ||
| SessionCreateRequest, | ||
| SpansIngestRequest, | ||
|
|
@@ -19,6 +23,8 @@ | |
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
| INGEST_SERVICE_TIMEOUT_SECONDS = 120.0 | ||
|
|
||
|
|
||
| class Traces: | ||
| """ | ||
|
|
@@ -159,3 +165,62 @@ async def get_span(self, span_id: str) -> dict[str, str]: | |
| return await self._make_async_request( | ||
| RequestMethod.GET, endpoint=Routes.span.format(project_id=self.project_id, span_id=span_id) | ||
| ) | ||
|
|
||
|
|
||
| class IngestTraces: | ||
| """Client for the orbit ingest service (``/ingest/traces/:project_id``). | ||
|
|
||
| The ingest service accepts multimodal content blocks natively and | ||
| runs on a separate URL from the main Galileo API. | ||
|
|
||
| The service URL is resolved from ``GALILEO_INGEST_URL`` env var. | ||
| If not set, it falls back to ``{api_url}/ingest/traces/{project_id}``. | ||
| """ | ||
|
|
||
| def __init__(self, project_id: str, log_stream_id: Optional[str] = None, experiment_id: Optional[str] = None): | ||
| self.config = GalileoPythonConfig.get() | ||
| self.project_id = project_id | ||
| self.log_stream_id = log_stream_id | ||
| self.experiment_id = experiment_id | ||
|
|
||
| if self.log_stream_id is None and self.experiment_id is None: | ||
| raise ValueError("log_stream_id or experiment_id must be set") | ||
|
|
||
| def _get_ingest_base_url(self) -> str: | ||
| explicit = os.environ.get("GALILEO_INGEST_URL") | ||
| if explicit: | ||
| return explicit.rstrip("/") | ||
| return str(self.config.api_url or self.config.console_url).rstrip("/") | ||
|
|
||
| def _get_auth_headers(self) -> dict[str, str]: | ||
| headers: dict[str, str] = {"Content-Type": "application/json", "X-Galileo-SDK": get_sdk_header()} | ||
| if self.config.api_key: | ||
| headers["Galileo-API-Key"] = self.config.api_key.get_secret_value() | ||
| elif self.config.jwt_token: | ||
| headers["Authorization"] = f"Bearer {self.config.jwt_token.get_secret_value()}" | ||
| return headers | ||
|
|
||
| @async_warn_catch_exception(logger=_logger) | ||
| async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dict[str, Any]: | ||
| if self.experiment_id: | ||
| traces_ingest_request.experiment_id = UUID(self.experiment_id) | ||
| elif self.log_stream_id: | ||
|
Comment on lines
+203
to
+207
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Finding type:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will clean this up in a follow-up, keeping it as-is for now.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Understood, we can leave it as-is for now and plan the cleanup for a follow-up; I’ll remember this once the PR is merged. |
||
| traces_ingest_request.log_stream_id = UUID(self.log_stream_id) | ||
|
|
||
| traces_ingest_request.logging_method = LoggingMethod.python_client | ||
|
|
||
| base_url = self._get_ingest_base_url() | ||
| url = f"{base_url}{Routes.ingest_traces.format(project_id=self.project_id)}" | ||
| json_body = traces_ingest_request.model_dump(mode="json") | ||
|
|
||
| _logger.info( | ||
| "Sending traces to ingest service", | ||
| extra={"url": url, "project_id": self.project_id, "num_traces": len(traces_ingest_request.traces)}, | ||
| ) | ||
|
|
||
| async with httpx.AsyncClient( | ||
| timeout=httpx.Timeout(INGEST_SERVICE_TIMEOUT_SECONDS, connect=10.0), verify=self.config.ssl_context | ||
| ) as client: | ||
| response = await client.post(url, json=json_body, headers=self._get_auth_headers()) | ||
| response.raise_for_status() | ||
| return response.json() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |
| from pydantic import BaseModel | ||
|
|
||
| from galileo.utils.dependencies import is_langchain_available, is_langgraph_available, is_proto_plus_available | ||
| from galileo_core.schemas.shared.content_blocks import DataContentBlock, IngestContentBlock, TextContentBlock | ||
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -42,10 +43,79 @@ def _serialize_zoned_datetime(v: dt.datetime) -> str: | |
|
|
||
|
|
||
| def map_langchain_role(role: str) -> str: | ||
| role_map = {"ai": "assistant", "human": "user"} | ||
| # Non-chunk types like "system", "tool", "function", "chat" pass through unchanged. | ||
| # Chunk classes set type to the class name (e.g. "AIMessageChunk") so we map those too. | ||
| role_map = { | ||
| "ai": "assistant", | ||
| "AIMessageChunk": "assistant", | ||
| "human": "user", | ||
| "HumanMessageChunk": "user", | ||
| "SystemMessageChunk": "system", | ||
| "ToolMessageChunk": "tool", | ||
| "FunctionMessageChunk": "function", | ||
| "ChatMessageChunk": "chat", | ||
| } | ||
| return role_map.get(role, role) | ||
|
|
||
|
|
||
| # LangChain multimodal message format mapping. | ||
| # See https://python.langchain.com/docs/concepts/multimodality/ | ||
| # LangChain uses {"type": "<modality>_url", "<modality>_url": {"url": "..."}} for media, | ||
| # and {"type": "text", "text": "..."} for text segments. | ||
| _LANGCHAIN_TYPE_TO_MODALITY = { | ||
| "image_url": "image", | ||
| "audio_url": "audio", | ||
| "video_url": "video", | ||
| "document_url": "document", | ||
| "input_image": "image", | ||
| "input_audio": "audio", | ||
| } | ||
|
Comment on lines
+61
to
+72
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Finding type: Want Baz to fix this for you? Activate Fixer. Other fix methods: Prompt for AI Agents:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Intentional. The schema is being widened across the stack (galileo-core, orbit, API). The openapi.yaml here is auto-generated from the API via scripts/import-openapi-yaml.sh, so it'll be updated once the API PR lands and we re-import.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Got it, thanks for the clarification on the intentional schema widening and OpenAPI auto-generation; I’ll remember this behavior and treat it as expected once the PR is merged. |
||
|
|
||
|
|
||
| def _convert_langchain_content_block(block: dict) -> IngestContentBlock: | ||
| """Convert a single LangChain content block dict to a Galileo ingest content block. | ||
|
|
||
| LangChain multimodal format (https://python.langchain.com/docs/concepts/multimodality/): | ||
| {"type": "text", "text": "hello"} | ||
| {"type": "image_url", "image_url": {"url": "https://..."}} | ||
| {"type": "input_audio", "input_audio": {"data": "base64...", "format": "wav"}} | ||
|
|
||
| Returns either a TextContentBlock or DataContentBlock instance. | ||
| """ | ||
| block_type = block.get("type", "") | ||
| if block_type == "text" or (not block_type and "text" in block): | ||
| return TextContentBlock(text=block.get("text", "")) | ||
| modality = _LANGCHAIN_TYPE_TO_MODALITY.get(block_type) | ||
| if modality: | ||
| nested = block.get(block_type, {}) | ||
| if isinstance(nested, dict): | ||
| url = nested.get("url", "") | ||
| raw_data = nested.get("data", "") | ||
| fmt = nested.get("format", "") | ||
| else: | ||
| url = str(nested) | ||
| raw_data = "" | ||
| fmt = "" | ||
| kwargs: dict[str, Any] = {"type": modality} | ||
| if url: | ||
| if url.startswith("data:"): | ||
| kwargs["base64"] = url | ||
| else: | ||
| kwargs["url"] = url | ||
| elif raw_data: | ||
| mime = f"{modality}/{fmt}" if fmt else modality | ||
| kwargs["base64"] = f"data:{mime};base64,{raw_data}" | ||
| return DataContentBlock(**kwargs) | ||
| return TextContentBlock(text=str(block)) | ||
|
|
||
|
|
||
| def _normalize_multimodal_content(dumped: dict) -> None: | ||
| """If ``dumped["content"]`` is a list of dicts, convert each to an ingest content block in-place.""" | ||
| content = dumped.get("content") | ||
| if isinstance(content, list) and content and isinstance(content[0], dict): | ||
| dumped["content"] = [_convert_langchain_content_block(b) for b in content] | ||
|
|
||
|
|
||
| class EventSerializer(JSONEncoder): | ||
| """Custom JSON encoder to assist in the serialization of a wide range of objects.""" | ||
|
|
||
|
|
@@ -96,6 +166,14 @@ def default(self, obj: Any) -> Any: | |
| return self.default(obj.message) | ||
| if isinstance(obj, LLMResult): | ||
| return self.default(obj.generations[0]) | ||
| # LangChain message type multimodal audit (all 12 types accounted for): | ||
| # Branch 1 (AIMessageChunk, AIMessage): explicit — tool_calls + multimodal | ||
| # Branch 2 (ToolMessage, ToolMessageChunk via inheritance): explicit — status + multimodal | ||
| # Branch 3 (BaseMessage catch-all): HumanMessage, HumanMessageChunk, | ||
| # SystemMessage, SystemMessageChunk, ChatMessage, ChatMessageChunk, | ||
| # FunctionMessage (deprecated), FunctionMessageChunk (deprecated) | ||
| # _normalize_multimodal_content() is called in every branch so list[dict] | ||
| # content is converted to IngestContentBlock for all message types. | ||
| if isinstance(obj, (AIMessageChunk, AIMessage)): | ||
| # Map the `type` to `role`. | ||
| if hasattr(obj, "model_dump"): | ||
|
|
@@ -105,12 +183,7 @@ def default(self, obj: Any) -> Any: | |
| else: | ||
| # Fallback to using the dict method if model_dump is not available i.e pydantic v1 | ||
| dumped = obj.dict(include={"content", "type", "additional_kwargs", "tool_calls"}) | ||
| content = dumped.get("content") | ||
| if isinstance(content, list): | ||
| # Responses API returns content as a list of dicts | ||
| # Convert list content to string format for consistency | ||
| if content and isinstance(content[0], dict): | ||
| dumped["content"] = content[0].get("text", "") | ||
| _normalize_multimodal_content(dumped) | ||
| dumped["role"] = map_langchain_role(dumped.pop("type")) | ||
| additional_kwargs = dumped.pop("additional_kwargs", {}) | ||
| # Check both direct attribute and additional_kwargs for tool_calls | ||
|
|
@@ -156,6 +229,7 @@ def default(self, obj: Any) -> Any: | |
| else: | ||
| # Fallback to using the dict method if model_dump is not available i.e pydantic v1 | ||
| dumped = obj.dict(include={"content", "type", "status", "tool_call_id"}) | ||
| _normalize_multimodal_content(dumped) | ||
| dumped["role"] = map_langchain_role(dumped.pop("type")) | ||
| return dumped | ||
| if isinstance(obj, BaseMessage): | ||
|
|
@@ -165,6 +239,7 @@ def default(self, obj: Any) -> Any: | |
| else: | ||
| # Fallback to using the dict method if model_dump is not available i.e pydantic v1 | ||
| dumped = obj.dict(include={"content", "type"}) | ||
| _normalize_multimodal_content(dumped) | ||
| dumped["role"] = map_langchain_role(dumped.pop("type")) | ||
| return dumped | ||
|
|
||
|
|
@@ -212,6 +287,9 @@ def default(self, obj: Any) -> Any: | |
| return f"<{obj.__name__}>" | ||
| return f"<{obj.__name__}>" | ||
|
|
||
| if isinstance(obj, (TextContentBlock, DataContentBlock)): | ||
| return obj.model_dump(mode="json", exclude_none=True) | ||
|
|
||
| if isinstance(obj, BaseModel): | ||
| if hasattr(obj, "model_dump"): | ||
| return self.default( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.