diff --git a/src/galileo/constants/routes.py b/src/galileo/constants/routes.py index 75ceb2e9..e2b6040d 100644 --- a/src/galileo/constants/routes.py +++ b/src/galileo/constants/routes.py @@ -23,3 +23,6 @@ class Routes(str, Enum): sessions = "/v2/projects/{project_id}/sessions" sessions_search = "/v2/projects/{project_id}/sessions/search" + + ingest_traces = "/ingest/traces/{project_id}" + ingest_spans = "/ingest/spans/{project_id}" diff --git a/src/galileo/logger/logger.py b/src/galileo/logger/logger.py index 6dfc5235..c6b8f1c5 100644 --- a/src/galileo/logger/logger.py +++ b/src/galileo/logger/logger.py @@ -4,6 +4,7 @@ import inspect import json import logging +import os import time import uuid from datetime import datetime @@ -31,7 +32,7 @@ TracesIngestRequest, TraceUpdateRequest, ) -from galileo.traces import Traces +from galileo.traces import IngestTraces, Traces from galileo.utils.decorators import ( async_warn_catch_exception, nop_async, @@ -153,6 +154,7 @@ class GalileoLogger(TracesLogger): _logger = logging.getLogger("galileo.logger") _traces_client: Optional["Traces"] = None + _ingest_client: Optional["IngestTraces"] = None _task_handler: ThreadPoolTaskHandler _trace_completion_submitted: bool @@ -305,9 +307,14 @@ def __init__( self._traces_client = Traces(project_id=self.project_id, log_stream_id=self.log_stream_id) elif self.experiment_id: self._traces_client = Traces(project_id=self.project_id, experiment_id=self.experiment_id) + + if os.environ.get("GALILEO_INGEST_URL"): + if self.log_stream_id: + self._ingest_client = IngestTraces(project_id=self.project_id, log_stream_id=self.log_stream_id) + elif self.experiment_id: + self._ingest_client = IngestTraces(project_id=self.project_id, experiment_id=self.experiment_id) else: - # ingestion_hook path: Traces client not created eagerly. - # If the user later calls ingest_traces(), it will be created lazily. + # ingestion_hook path: clients not created eagerly. self._traces_client = None # If continuing an existing distributed trace, create local stubs instead of @@ -348,19 +355,33 @@ def _init_log_stream(self) -> None: else: self.log_stream_id = log_stream_obj.id - def _create_traces_client(self) -> Traces: - """Lazily create a Traces client when needed (e.g. ingestion_hook users calling ingest_traces).""" - self._logger.info("Creating Traces client lazily for explicit ingest_traces call.") + def _ensure_project_and_log_stream(self) -> None: + """Ensure project_id and log_stream_id/experiment_id are initialized.""" if not self.project_id: self._init_project() if not (self.log_stream_id or self.experiment_id): self._init_log_stream() + + def _create_traces_client(self) -> Traces: + """Lazily create a Traces client when needed (e.g. ingestion_hook users calling ingest_traces).""" + self._logger.info("Creating Traces client lazily.") + self._ensure_project_and_log_stream() if self.log_stream_id: return Traces(project_id=self.project_id, log_stream_id=self.log_stream_id) if self.experiment_id: return Traces(project_id=self.project_id, experiment_id=self.experiment_id) raise GalileoLoggerException("Cannot create Traces client: no log_stream_id or experiment_id available.") + def _create_ingest_client(self) -> IngestTraces: + """Lazily create an IngestTraces client for the dedicated ingest service.""" + self._logger.info("Creating IngestTraces client lazily.") + self._ensure_project_and_log_stream() + if self.log_stream_id: + return IngestTraces(project_id=self.project_id, log_stream_id=self.log_stream_id) + if self.experiment_id: + return IngestTraces(project_id=self.project_id, experiment_id=self.experiment_id) + raise GalileoLoggerException("Cannot create IngestTraces client: no log_stream_id or experiment_id available.") + def _init_distributed_trace_stubs(self) -> None: """ Initialize local stub objects for distributed tracing. To only be used in distributed mode. @@ -514,7 +535,8 @@ def _ingest_trace_streaming(self, trace: Trace, is_complete: bool = False) -> No ) @retry_on_transient_http_error async def ingest_traces_with_backoff(request: Any) -> None: - return await self._traces_client.ingest_traces(request) + client = self._ingest_client or self._traces_client + return await client.ingest_traces(request) self._task_handler.submit_task( task_id, lambda: ingest_traces_with_backoff(traces_ingest_request), dependent_on_prev=False @@ -564,7 +586,8 @@ def _ingest_span_streaming(self, span: Span) -> None: ) @retry_on_transient_http_error async def ingest_spans_with_backoff(request: Any) -> None: - return await self._traces_client.ingest_spans(request) + client = self._ingest_client or self._traces_client + return await client.ingest_spans(request) self._task_handler.submit_task( task_id, lambda: ingest_spans_with_backoff(spans_ingest_request), dependent_on_prev=False @@ -1008,13 +1031,11 @@ def add_single_llm_span_trace( if self.current_parent() is not None: raise ValueError("A trace cannot be created within a parent trace or span, it must always be the root.") - # Trace-level I/O is serialized to string; the child LoggedLlmSpan - # preserves the full structured/multimodal content. trace = LoggedTrace( - input=serialize_to_str(input), - redacted_input=serialize_to_str(redacted_input) if redacted_input else None, - output=serialize_to_str(output), - redacted_output=serialize_to_str(redacted_output) if redacted_output else None, + input=input, + redacted_input=redacted_input, + output=output, + redacted_output=redacted_output, name=name, created_at=created_at, user_metadata=metadata, @@ -1901,7 +1922,8 @@ async def _flush_batch(self) -> list[LoggedTrace]: else: self._ingestion_hook(traces_ingest_request) else: - await self._traces_client.ingest_traces(traces_ingest_request) + client = self._ingest_client or self._traces_client + await client.ingest_traces(traces_ingest_request) self._logger.info(f"Successfully flushed {trace_count} {'trace' if trace_count == 1 else 'traces'}.") @@ -2095,9 +2117,13 @@ async def async_ingest_traces(self, ingest_request: TracesIngestRequest) -> None Can be used in combination with the `ingestion_hook` to ingest modified traces. """ - if self._traces_client is None: - self._traces_client = self._create_traces_client() - await self._traces_client.ingest_traces(ingest_request) + if self._ingest_client is None and os.environ.get("GALILEO_INGEST_URL"): + self._ingest_client = self._create_ingest_client() + client = self._ingest_client or self._traces_client + if client is None: + client = self._create_traces_client() + self._traces_client = client + await client.ingest_traces(ingest_request) @nop_sync @warn_catch_exception(exceptions=(Exception,)) @@ -2107,6 +2133,4 @@ def ingest_traces(self, ingest_request: TracesIngestRequest) -> None: Can be used in combination with the `ingestion_hook` to ingest modified traces. """ - if self._traces_client is None: - self._traces_client = self._create_traces_client() return async_run(self.async_ingest_traces(ingest_request)) diff --git a/src/galileo/traces.py b/src/galileo/traces.py index bd9e2f81..32e9ec9a 100644 --- a/src/galileo/traces.py +++ b/src/galileo/traces.py @@ -1,10 +1,14 @@ import logging +import os from typing import Any, Optional from uuid import UUID +import httpx + from galileo.config import GalileoPythonConfig from galileo.constants.routes import Routes from galileo.schema.trace import ( + LoggingMethod, LogRecordsSearchRequest, SessionCreateRequest, SpansIngestRequest, @@ -159,3 +163,96 @@ async def get_span(self, span_id: str) -> dict[str, str]: return await self._make_async_request( RequestMethod.GET, endpoint=Routes.span.format(project_id=self.project_id, span_id=span_id) ) + + +class IngestTraces: + """Client for the dedicated ingest service. + + Sends traces directly to the ingest service which may run at a + separate URL from the main Galileo API. The service URL is resolved + from the ``GALILEO_INGEST_URL`` environment variable; when unset it + falls back to the configured ``api_url``. + + Parameters + ---------- + project_id : str + The project to ingest into. + log_stream_id : Optional[str] + Log stream id (mutually exclusive with *experiment_id*). + experiment_id : Optional[str] + Experiment id (mutually exclusive with *log_stream_id*). + """ + + def __init__( + self, project_id: str, log_stream_id: Optional[str] = None, experiment_id: Optional[str] = None + ) -> None: + self.config = GalileoPythonConfig.get() + self.project_id = project_id + self.log_stream_id = log_stream_id + self.experiment_id = experiment_id + + if self.log_stream_id is None and self.experiment_id is None: + raise ValueError("log_stream_id or experiment_id must be set") + + def _get_ingest_base_url(self) -> str: + explicit = os.environ.get("GALILEO_INGEST_URL") + if explicit: + return explicit.rstrip("/") + api_url = self.config.api_url or self.config.console_url + return str(api_url).rstrip("/") + + def _get_auth_headers(self) -> dict[str, str]: + headers: dict[str, str] = {"Content-Type": "application/json", "X-Galileo-SDK": get_sdk_header()} + if self.config.api_key: + headers["Galileo-API-Key"] = self.config.api_key.get_secret_value() + elif self.config.jwt_token: + headers["Authorization"] = f"Bearer {self.config.jwt_token.get_secret_value()}" + return headers + + @async_warn_catch_exception(logger=_logger) + async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dict[str, Any]: + if self.experiment_id: + traces_ingest_request.experiment_id = UUID(self.experiment_id) + elif self.log_stream_id: + traces_ingest_request.log_stream_id = UUID(self.log_stream_id) + + traces_ingest_request.logging_method = LoggingMethod.python_client + + base_url = self._get_ingest_base_url() + url = f"{base_url}{Routes.ingest_traces.format(project_id=self.project_id)}" + json_body = traces_ingest_request.model_dump(mode="json") + + _logger.info( + "Sending traces to ingest service", + extra={"url": url, "project_id": self.project_id, "num_traces": len(traces_ingest_request.traces)}, + ) + + # httpx default timeout is 5s for connect/read/write/pool + # (see https://www.python-httpx.org/advanced/timeouts/) + async with httpx.AsyncClient(verify=self.config.ssl_context) as client: + response = await client.post(url, json=json_body, headers=self._get_auth_headers()) + response.raise_for_status() + return response.json() + + @async_warn_catch_exception(logger=_logger) + async def ingest_spans(self, spans_ingest_request: SpansIngestRequest) -> dict[str, Any]: + if self.experiment_id: + spans_ingest_request.experiment_id = UUID(self.experiment_id) + elif self.log_stream_id: + spans_ingest_request.log_stream_id = UUID(self.log_stream_id) + + spans_ingest_request.logging_method = LoggingMethod.python_client + + base_url = self._get_ingest_base_url() + url = f"{base_url}{Routes.ingest_spans.format(project_id=self.project_id)}" + json_body = spans_ingest_request.model_dump(mode="json") + + _logger.info( + "Sending spans to ingest service", + extra={"url": url, "project_id": self.project_id, "num_spans": len(spans_ingest_request.spans)}, + ) + + async with httpx.AsyncClient(verify=self.config.ssl_context) as client: + response = await client.post(url, json=json_body, headers=self._get_auth_headers()) + response.raise_for_status() + return response.json() diff --git a/tests/test_ingest_traces_client.py b/tests/test_ingest_traces_client.py new file mode 100644 index 00000000..7722a2a0 --- /dev/null +++ b/tests/test_ingest_traces_client.py @@ -0,0 +1,384 @@ +"""Tests for IngestTraces client (dedicated ingest service).""" + +import logging +from unittest.mock import AsyncMock, Mock, patch +from uuid import UUID + +import httpx +import pytest +import respx + +from galileo.logger import GalileoLogger +from galileo.schema.logged import LoggedLlmSpan, LoggedTrace +from galileo.schema.trace import LoggingMethod, SpansIngestRequest, TracesIngestRequest +from galileo.traces import IngestTraces +from galileo.utils.headers_data import get_package_version +from tests.testutils.setup import setup_mock_logstreams_client, setup_mock_projects_client, setup_mock_traces_client + +LOGGER = logging.getLogger(__name__) + +PROJECT_ID = "6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9a" +LOG_STREAM_ID = "6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9b" +EXPERIMENT_ID = "6c4e3f7e-4a9a-4e7e-8c1f-3a9a3a9a3a9d" +INGEST_URL = "https://ingest.example.com" + + +class TestIngestTracesInit: + @patch("galileo.traces.GalileoPythonConfig") + def test_requires_log_stream_or_experiment(self, mock_config_class) -> None: + mock_config_class.get.return_value = Mock() + + # When/Then: missing both raises ValueError + with pytest.raises(ValueError, match="log_stream_id or experiment_id must be set"): + IngestTraces(project_id=PROJECT_ID) + + @patch("galileo.traces.GalileoPythonConfig") + def test_accepts_log_stream_id(self, mock_config_class) -> None: + mock_config_class.get.return_value = Mock() + + client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + assert client.project_id == PROJECT_ID + assert client.log_stream_id == LOG_STREAM_ID + + @patch("galileo.traces.GalileoPythonConfig") + def test_accepts_experiment_id(self, mock_config_class) -> None: + mock_config_class.get.return_value = Mock() + + client = IngestTraces(project_id=PROJECT_ID, experiment_id=EXPERIMENT_ID) + assert client.experiment_id == EXPERIMENT_ID + + +class TestIngestBaseUrl: + @patch("galileo.traces.GalileoPythonConfig") + def test_uses_galileo_ingest_url_when_set(self, mock_config_class, monkeypatch) -> None: + mock_config_class.get.return_value = Mock() + monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL) + + client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + assert client._get_ingest_base_url() == INGEST_URL + + @patch("galileo.traces.GalileoPythonConfig") + def test_strips_trailing_slash(self, mock_config_class, monkeypatch) -> None: + mock_config_class.get.return_value = Mock() + monkeypatch.setenv("GALILEO_INGEST_URL", f"{INGEST_URL}/") + + client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + assert client._get_ingest_base_url() == INGEST_URL + + @patch("galileo.traces.GalileoPythonConfig") + def test_falls_back_to_api_url(self, mock_config_class, monkeypatch) -> None: + monkeypatch.delenv("GALILEO_INGEST_URL", raising=False) + mock_config = Mock() + mock_config.api_url = "https://api.galileo.ai" + mock_config.console_url = "https://console.galileo.ai" + mock_config_class.get.return_value = mock_config + + client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + assert client._get_ingest_base_url() == "https://api.galileo.ai" + + @patch("galileo.traces.GalileoPythonConfig") + def test_falls_back_to_console_url_when_no_api_url(self, mock_config_class, monkeypatch) -> None: + monkeypatch.delenv("GALILEO_INGEST_URL", raising=False) + mock_config = Mock() + mock_config.api_url = None + mock_config.console_url = "https://console.galileo.ai" + mock_config_class.get.return_value = mock_config + + client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + assert client._get_ingest_base_url() == "https://console.galileo.ai" + + +class TestAuthHeaders: + @patch("galileo.traces.GalileoPythonConfig") + def test_uses_api_key_when_available(self, mock_config_class) -> None: + mock_config = Mock() + mock_config.api_key.get_secret_value.return_value = "my-api-key" + mock_config.jwt_token = None + mock_config_class.get.return_value = mock_config + + client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + headers = client._get_auth_headers() + + assert headers["Galileo-API-Key"] == "my-api-key" + assert "Authorization" not in headers + + @patch("galileo.traces.GalileoPythonConfig") + def test_uses_jwt_when_no_api_key(self, mock_config_class) -> None: + mock_config = Mock() + mock_config.api_key = None + mock_config.jwt_token.get_secret_value.return_value = "my-jwt-token" + mock_config_class.get.return_value = mock_config + + client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + headers = client._get_auth_headers() + + assert headers["Authorization"] == "Bearer my-jwt-token" + assert "Galileo-API-Key" not in headers + + @patch("galileo.traces.GalileoPythonConfig") + def test_includes_sdk_header(self, mock_config_class) -> None: + mock_config = Mock() + mock_config.api_key = None + mock_config.jwt_token = None + mock_config_class.get.return_value = mock_config + + client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + headers = client._get_auth_headers() + + assert headers["Content-Type"] == "application/json" + assert headers["X-Galileo-SDK"].startswith(f"galileo-python/{get_package_version()}") + + +class TestIngestTracesRequest: + @pytest.fixture + def mock_config(self): + with patch("galileo.traces.GalileoPythonConfig") as mock_config_class: + mock_config = Mock() + mock_config.api_key.get_secret_value.return_value = "test-key" + mock_config.jwt_token = None + mock_config.ssl_context = True + mock_config.api_url = "https://api.galileo.ai" + mock_config_class.get.return_value = mock_config + yield mock_config + + @pytest.fixture + def client(self, mock_config): + return IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + + @respx.mock + @pytest.mark.asyncio + async def test_ingest_traces_posts_to_correct_url(self, client, monkeypatch) -> None: + # Given: an ingest URL is configured + monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL) + + expected_url = f"{INGEST_URL}/ingest/traces/{PROJECT_ID}" + route = respx.post(expected_url).mock(return_value=httpx.Response(200, json={"status": "ok"})) + + trace = LoggedTrace(input="hello", output="world") + request = TracesIngestRequest(traces=[trace]) + + # When: ingesting traces + result = await client.ingest_traces(request) + + # Then: request hits the ingest service URL + assert route.called + assert result == {"status": "ok"} + + @respx.mock + @pytest.mark.asyncio + async def test_ingest_traces_sets_logging_method(self, client, monkeypatch) -> None: + monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL) + + expected_url = f"{INGEST_URL}/ingest/traces/{PROJECT_ID}" + route = respx.post(expected_url).mock(return_value=httpx.Response(200, json={})) + + trace = LoggedTrace(input="hello") + request = TracesIngestRequest(traces=[trace]) + + await client.ingest_traces(request) + + # Then: logging_method was set to python_client + assert request.logging_method == LoggingMethod.python_client + sent_body = route.calls[0].request.content + assert b"python_client" in sent_body + + @respx.mock + @pytest.mark.asyncio + async def test_ingest_traces_sets_log_stream_id(self, client, monkeypatch) -> None: + monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL) + expected_url = f"{INGEST_URL}/ingest/traces/{PROJECT_ID}" + respx.post(expected_url).mock(return_value=httpx.Response(200, json={})) + + trace = LoggedTrace(input="hello") + request = TracesIngestRequest(traces=[trace]) + + await client.ingest_traces(request) + + # Then: log_stream_id was populated from the client + assert request.log_stream_id == UUID(LOG_STREAM_ID) + + @respx.mock + @pytest.mark.asyncio + async def test_ingest_traces_sets_experiment_id(self, mock_config, monkeypatch) -> None: + monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL) + client = IngestTraces(project_id=PROJECT_ID, experiment_id=EXPERIMENT_ID) + + expected_url = f"{INGEST_URL}/ingest/traces/{PROJECT_ID}" + respx.post(expected_url).mock(return_value=httpx.Response(200, json={})) + + trace = LoggedTrace(input="hello") + request = TracesIngestRequest(traces=[trace]) + + await client.ingest_traces(request) + + assert request.experiment_id == UUID(EXPERIMENT_ID) + + @respx.mock + @pytest.mark.asyncio + async def test_ingest_traces_falls_back_to_api_url(self, client, monkeypatch) -> None: + # Given: no GALILEO_INGEST_URL set + monkeypatch.delenv("GALILEO_INGEST_URL", raising=False) + + expected_url = f"https://api.galileo.ai/ingest/traces/{PROJECT_ID}" + route = respx.post(expected_url).mock(return_value=httpx.Response(200, json={"ok": True})) + + trace = LoggedTrace(input="hello") + request = TracesIngestRequest(traces=[trace]) + + result = await client.ingest_traces(request) + + assert route.called + assert result == {"ok": True} + + +class TestIngestSpansRequest: + @pytest.fixture + def mock_config(self): + with patch("galileo.traces.GalileoPythonConfig") as mock_config_class: + mock_config = Mock() + mock_config.api_key.get_secret_value.return_value = "test-key" + mock_config.jwt_token = None + mock_config.ssl_context = True + mock_config.api_url = "https://api.galileo.ai" + mock_config_class.get.return_value = mock_config + yield mock_config + + @pytest.fixture + def client(self, mock_config): + return IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID) + + @respx.mock + @pytest.mark.asyncio + async def test_ingest_spans_posts_to_correct_url(self, client, monkeypatch) -> None: + monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL) + + expected_url = f"{INGEST_URL}/ingest/spans/{PROJECT_ID}" + route = respx.post(expected_url).mock(return_value=httpx.Response(200, json={"status": "ok"})) + + span = LoggedLlmSpan(input="hello", output="world", model="gpt-4o") + request = SpansIngestRequest(spans=[span], trace_id=UUID(PROJECT_ID), parent_id=UUID(LOG_STREAM_ID)) + + result = await client.ingest_spans(request) + + assert route.called + assert result == {"status": "ok"} + + @respx.mock + @pytest.mark.asyncio + async def test_ingest_spans_sets_log_stream_id(self, client, monkeypatch) -> None: + monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL) + expected_url = f"{INGEST_URL}/ingest/spans/{PROJECT_ID}" + respx.post(expected_url).mock(return_value=httpx.Response(200, json={})) + + span = LoggedLlmSpan(input="hello", output="world", model="gpt-4o") + request = SpansIngestRequest(spans=[span], trace_id=UUID(PROJECT_ID), parent_id=UUID(LOG_STREAM_ID)) + + await client.ingest_spans(request) + + assert request.log_stream_id == UUID(LOG_STREAM_ID) + assert request.logging_method == LoggingMethod.python_client + + +class TestLoggerIngestClientWiring: + """Test that GalileoLogger creates and uses IngestTraces when GALILEO_INGEST_URL is set.""" + + @patch("galileo.logger.logger.LogStreams") + @patch("galileo.logger.logger.Projects") + @patch("galileo.logger.logger.Traces") + def test_no_ingest_client_without_env_var( + self, mock_traces_client: Mock, mock_projects_client: Mock, mock_logstreams_client: Mock, monkeypatch + ) -> None: + # Given: GALILEO_INGEST_URL is not set + monkeypatch.delenv("GALILEO_INGEST_URL", raising=False) + setup_mock_traces_client(mock_traces_client) + setup_mock_projects_client(mock_projects_client) + setup_mock_logstreams_client(mock_logstreams_client) + + # When: creating a logger + logger = GalileoLogger(project="my_project", log_stream="my_log_stream", mode="batch") + + # Then: ingest client is not created + assert logger._ingest_client is None + assert logger._traces_client is not None + + @patch("galileo.logger.logger.IngestTraces") + @patch("galileo.logger.logger.LogStreams") + @patch("galileo.logger.logger.Projects") + @patch("galileo.logger.logger.Traces") + def test_ingest_client_created_with_env_var( + self, + mock_traces_client: Mock, + mock_projects_client: Mock, + mock_logstreams_client: Mock, + mock_ingest_client: Mock, + monkeypatch, + ) -> None: + # Given: GALILEO_INGEST_URL is set + monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL) + setup_mock_traces_client(mock_traces_client) + setup_mock_projects_client(mock_projects_client) + setup_mock_logstreams_client(mock_logstreams_client) + + # When: creating a logger + logger = GalileoLogger(project="my_project", log_stream="my_log_stream", mode="batch") + + # Then: ingest client is created + assert logger._ingest_client is not None + mock_ingest_client.assert_called_once() + + @patch("galileo.logger.logger.IngestTraces") + @patch("galileo.logger.logger.LogStreams") + @patch("galileo.logger.logger.Projects") + @patch("galileo.logger.logger.Traces") + def test_flush_uses_ingest_client_when_available( + self, + mock_traces_client: Mock, + mock_projects_client: Mock, + mock_logstreams_client: Mock, + mock_ingest_client: Mock, + monkeypatch, + ) -> None: + # Given: GALILEO_INGEST_URL is set, logger has a trace + monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL) + mock_traces_instance = setup_mock_traces_client(mock_traces_client) + setup_mock_projects_client(mock_projects_client) + setup_mock_logstreams_client(mock_logstreams_client) + + mock_ingest_instance = Mock() + mock_ingest_instance.ingest_traces = AsyncMock(return_value={}) + mock_ingest_client.return_value = mock_ingest_instance + + logger = GalileoLogger(project="my_project", log_stream="my_log_stream", mode="batch") + logger.start_trace(input="hello") + logger.add_llm_span(input="hello", output="world", model="gpt-4o") + logger.conclude(output="world") + + # When: flushing + logger.flush() + + # Then: ingest client was used, not the traces client + mock_ingest_instance.ingest_traces.assert_called_once() + mock_traces_instance.ingest_traces.assert_not_called() + + @patch("galileo.logger.logger.LogStreams") + @patch("galileo.logger.logger.Projects") + @patch("galileo.logger.logger.Traces") + def test_flush_falls_back_to_traces_client_without_env_var( + self, mock_traces_client: Mock, mock_projects_client: Mock, mock_logstreams_client: Mock, monkeypatch + ) -> None: + # Given: GALILEO_INGEST_URL is not set + monkeypatch.delenv("GALILEO_INGEST_URL", raising=False) + mock_traces_instance = setup_mock_traces_client(mock_traces_client) + setup_mock_projects_client(mock_projects_client) + setup_mock_logstreams_client(mock_logstreams_client) + + logger = GalileoLogger(project="my_project", log_stream="my_log_stream", mode="batch") + logger.start_trace(input="hello") + logger.add_llm_span(input="hello", output="world", model="gpt-4o") + logger.conclude(output="world") + + # When: flushing + logger.flush() + + # Then: traces client was used (fallback) + mock_traces_instance.ingest_traces.assert_called_once() diff --git a/tests/test_logger_batch.py b/tests/test_logger_batch.py index f7e5c64a..fca1969e 100644 --- a/tests/test_logger_batch.py +++ b/tests/test_logger_batch.py @@ -10,10 +10,13 @@ import pytest from galileo.logger import GalileoLogger +from galileo.schema.content_blocks import DataContentBlock, TextContentBlock from galileo.schema.logged import LoggedTrace +from galileo.schema.message import LoggedMessage from galileo.schema.metrics import LocalMetricConfig from galileo.schema.trace import TracesIngestRequest from galileo_core.schemas.logging.agent import AgentType +from galileo_core.schemas.logging.llm import MessageRole from galileo_core.schemas.logging.span import AgentSpan, LlmSpan, RetrieverSpan, Span, ToolSpan, WorkflowSpan from galileo_core.schemas.logging.step import Metrics from galileo_core.schemas.logging.trace import Trace @@ -21,6 +24,7 @@ from galileo_core.schemas.protect.payload import Payload from galileo_core.schemas.protect.response import Response, TraceMetadata from galileo_core.schemas.shared.document import Document +from galileo_core.schemas.shared.multimodal import ContentModality from tests.testutils.setup import ( setup_mock_experiments_client, setup_mock_logstreams_client, @@ -1589,6 +1593,44 @@ def test_add_single_llm_span_trace_ingestion( assert logger._parent_stack == deque() +@patch("galileo.logger.logger.LogStreams") +@patch("galileo.logger.logger.Projects") +@patch("galileo.logger.logger.Traces") +def test_multimodal_input_not_stringified_at_trace_level( + mock_traces_client: Mock, mock_projects_client: Mock, mock_logstreams_client: Mock +) -> None: + """Multimodal content must be preserved at trace level, not serialized to string.""" + mock_traces_client_instance = setup_mock_traces_client(mock_traces_client) + setup_mock_projects_client(mock_projects_client) + setup_mock_logstreams_client(mock_logstreams_client) + + logger = GalileoLogger(project="my_project", log_stream="my_log_stream") + + # Given: multimodal message input with text + image content blocks + messages = [ + LoggedMessage( + content=[ + TextContentBlock(text="Describe this image"), + DataContentBlock(modality=ContentModality.image, url="https://example.com/img.png"), + ], + role=MessageRole.user, + ) + ] + logger.start_trace(input=messages) + logger.add_llm_span(input=messages, output="A sunset", model="gpt-4o") + logger.conclude("A sunset") + logger.flush() + + # Then: trace.input is the message list, not a stringified version + payload: TracesIngestRequest = mock_traces_client_instance.ingest_traces.call_args.args[0] + trace = payload.traces[0] + assert not isinstance(trace.input, str), "trace input should not be stringified" + assert isinstance(trace.input, list) + assert isinstance(trace.input[0], LoggedMessage) + assert trace.input[0].content[0].text == "Describe this image" + assert trace.input[0].content[1].modality == ContentModality.image + + @patch("galileo.logger.logger.LogStreams") @patch("galileo.logger.logger.Projects") @patch("galileo.logger.logger.Traces")