Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/galileo/constants/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ class Routes(str, Enum):

sessions = "/v2/projects/{project_id}/sessions"
sessions_search = "/v2/projects/{project_id}/sessions/search"

ingest_traces = "/ingest/traces/{project_id}"
ingest_spans = "/ingest/spans/{project_id}"
64 changes: 44 additions & 20 deletions src/galileo/logger/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import inspect
import json
import logging
import os
import time
import uuid
from datetime import datetime
Expand Down Expand Up @@ -31,7 +32,7 @@
TracesIngestRequest,
TraceUpdateRequest,
)
from galileo.traces import Traces
from galileo.traces import IngestTraces, Traces
from galileo.utils.decorators import (
async_warn_catch_exception,
nop_async,
Expand Down Expand Up @@ -153,6 +154,7 @@ class GalileoLogger(TracesLogger):

_logger = logging.getLogger("galileo.logger")
_traces_client: Optional["Traces"] = None
_ingest_client: Optional["IngestTraces"] = None
_task_handler: ThreadPoolTaskHandler
_trace_completion_submitted: bool

Expand Down Expand Up @@ -305,9 +307,14 @@ def __init__(
self._traces_client = Traces(project_id=self.project_id, log_stream_id=self.log_stream_id)
elif self.experiment_id:
self._traces_client = Traces(project_id=self.project_id, experiment_id=self.experiment_id)

if os.environ.get("GALILEO_INGEST_URL"):
if self.log_stream_id:
self._ingest_client = IngestTraces(project_id=self.project_id, log_stream_id=self.log_stream_id)
elif self.experiment_id:
self._ingest_client = IngestTraces(project_id=self.project_id, experiment_id=self.experiment_id)
else:
# ingestion_hook path: Traces client not created eagerly.
# If the user later calls ingest_traces(), it will be created lazily.
# ingestion_hook path: clients not created eagerly.
self._traces_client = None

# If continuing an existing distributed trace, create local stubs instead of
Expand Down Expand Up @@ -348,19 +355,33 @@ def _init_log_stream(self) -> None:
else:
self.log_stream_id = log_stream_obj.id

def _create_traces_client(self) -> Traces:
"""Lazily create a Traces client when needed (e.g. ingestion_hook users calling ingest_traces)."""
self._logger.info("Creating Traces client lazily for explicit ingest_traces call.")
def _ensure_project_and_log_stream(self) -> None:
    """Lazily resolve the logging destination.

    Fills in ``project_id`` (via ``_init_project``) when missing, then
    resolves a log stream when neither ``log_stream_id`` nor
    ``experiment_id`` has been set yet.
    """
    if not self.project_id:
        self._init_project()
    has_destination = bool(self.log_stream_id or self.experiment_id)
    if not has_destination:
        self._init_log_stream()

def _create_traces_client(self) -> Traces:
    """Build a ``Traces`` client on demand.

    Used on the ingestion_hook path, where no client is created eagerly
    in ``__init__``.

    Raises:
        GalileoLoggerException: if neither a log stream nor an experiment
            could be resolved.
    """
    self._logger.info("Creating Traces client lazily.")
    self._ensure_project_and_log_stream()
    common = {"project_id": self.project_id}
    if self.log_stream_id:
        return Traces(log_stream_id=self.log_stream_id, **common)
    if self.experiment_id:
        return Traces(experiment_id=self.experiment_id, **common)
    raise GalileoLoggerException("Cannot create Traces client: no log_stream_id or experiment_id available.")

def _create_ingest_client(self) -> IngestTraces:
    """Build an ``IngestTraces`` client on demand.

    The client targets the dedicated ingest service (see
    ``GALILEO_INGEST_URL`` handling in ``IngestTraces``).

    Raises:
        GalileoLoggerException: if neither a log stream nor an experiment
            could be resolved.
    """
    self._logger.info("Creating IngestTraces client lazily.")
    self._ensure_project_and_log_stream()
    common = {"project_id": self.project_id}
    if self.log_stream_id:
        return IngestTraces(log_stream_id=self.log_stream_id, **common)
    if self.experiment_id:
        return IngestTraces(experiment_id=self.experiment_id, **common)
    raise GalileoLoggerException("Cannot create IngestTraces client: no log_stream_id or experiment_id available.")

def _init_distributed_trace_stubs(self) -> None:
"""
Initialize local stub objects for distributed tracing. To only be used in distributed mode.
Expand Down Expand Up @@ -514,7 +535,8 @@ def _ingest_trace_streaming(self, trace: Trace, is_complete: bool = False) -> No
)
@retry_on_transient_http_error
async def ingest_traces_with_backoff(request: Any) -> None:
return await self._traces_client.ingest_traces(request)
client = self._ingest_client or self._traces_client
return await client.ingest_traces(request)

self._task_handler.submit_task(
task_id, lambda: ingest_traces_with_backoff(traces_ingest_request), dependent_on_prev=False
Expand Down Expand Up @@ -564,7 +586,8 @@ def _ingest_span_streaming(self, span: Span) -> None:
)
@retry_on_transient_http_error
async def ingest_spans_with_backoff(request: Any) -> None:
return await self._traces_client.ingest_spans(request)
client = self._ingest_client or self._traces_client
return await client.ingest_spans(request)

self._task_handler.submit_task(
task_id, lambda: ingest_spans_with_backoff(spans_ingest_request), dependent_on_prev=False
Expand Down Expand Up @@ -1008,13 +1031,11 @@ def add_single_llm_span_trace(
if self.current_parent() is not None:
raise ValueError("A trace cannot be created within a parent trace or span, it must always be the root.")

# Trace-level I/O is serialized to string; the child LoggedLlmSpan
# preserves the full structured/multimodal content.
trace = LoggedTrace(
input=serialize_to_str(input),
redacted_input=serialize_to_str(redacted_input) if redacted_input else None,
output=serialize_to_str(output),
redacted_output=serialize_to_str(redacted_output) if redacted_output else None,
input=input,
redacted_input=redacted_input,
output=output,
redacted_output=redacted_output,
name=name,
created_at=created_at,
user_metadata=metadata,
Expand Down Expand Up @@ -1901,7 +1922,8 @@ async def _flush_batch(self) -> list[LoggedTrace]:
else:
self._ingestion_hook(traces_ingest_request)
else:
await self._traces_client.ingest_traces(traces_ingest_request)
client = self._ingest_client or self._traces_client
await client.ingest_traces(traces_ingest_request)
Comment on lines +1925 to +1926
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flush in batch mode can keep using the traces client instead of the dedicated ingest client when GALILEO_INGEST_URL is set after constructing GalileoLogger. Flush selects client via client = self._ingest_client or self._traces_client and immediately calls await client.ingest_traces(...) (lines 1929-1930), but _ingest_client is only created in __init__ so it stays None; can we call _create_ingest_client() before selecting the client (as async_ingest_traces does around 2124-2130) and apply the same lazy-creation fix to other similar call sites?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In src/galileo/logger/logger.py around lines 1929-1930 (method _flush_batch), the code
picks client = self._ingest_client or self._traces_client and then calls await
client.ingest_traces(...). This fails if GALILEO_INGEST_URL was set after the
GalileoLogger was constructed because _ingest_client was never created. Change the
selection to lazily create the ingest client first (e.g. if self._ingest_client is None
and os.environ.get('GALILEO_INGEST_URL'): self._ingest_client =
self._create_ingest_client()), then set client = self._ingest_client or
self._traces_client (and ensure _traces_client is created if needed). Also update the
similar client-selection logic in _ingest_span_streaming (lines ~550-593) and
_ingest_trace_streaming (lines ~516-542) to lazily create _ingest_client before using it
so the dedicated ingest service is used if the env var is present.


self._logger.info(f"Successfully flushed {trace_count} {'trace' if trace_count == 1 else 'traces'}.")

Expand Down Expand Up @@ -2095,9 +2117,13 @@ async def async_ingest_traces(self, ingest_request: TracesIngestRequest) -> None

Can be used in combination with the `ingestion_hook` to ingest modified traces.
"""
if self._traces_client is None:
self._traces_client = self._create_traces_client()
await self._traces_client.ingest_traces(ingest_request)
if self._ingest_client is None and os.environ.get("GALILEO_INGEST_URL"):
self._ingest_client = self._create_ingest_client()
client = self._ingest_client or self._traces_client
if client is None:
client = self._create_traces_client()
self._traces_client = client
await client.ingest_traces(ingest_request)

@nop_sync
@warn_catch_exception(exceptions=(Exception,))
Expand All @@ -2107,6 +2133,4 @@ def ingest_traces(self, ingest_request: TracesIngestRequest) -> None:

Can be used in combination with the `ingestion_hook` to ingest modified traces.
"""
if self._traces_client is None:
self._traces_client = self._create_traces_client()
return async_run(self.async_ingest_traces(ingest_request))
97 changes: 97 additions & 0 deletions src/galileo/traces.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import logging
import os
from typing import Any, Optional
from uuid import UUID

import httpx

from galileo.config import GalileoPythonConfig
from galileo.constants.routes import Routes
from galileo.schema.trace import (
LoggingMethod,
LogRecordsSearchRequest,
SessionCreateRequest,
SpansIngestRequest,
Expand Down Expand Up @@ -159,3 +163,96 @@ async def get_span(self, span_id: str) -> dict[str, str]:
return await self._make_async_request(
RequestMethod.GET, endpoint=Routes.span.format(project_id=self.project_id, span_id=span_id)
)


class IngestTraces:
"""Client for the dedicated ingest service.

Sends traces directly to the ingest service which may run at a
separate URL from the main Galileo API. The service URL is resolved
from the ``GALILEO_INGEST_URL`` environment variable; when unset it
falls back to the configured ``api_url``.

Parameters
----------
project_id : str
The project to ingest into.
log_stream_id : Optional[str]
Log stream id (mutually exclusive with *experiment_id*).
experiment_id : Optional[str]
Experiment id (mutually exclusive with *log_stream_id*).
"""

def __init__(
    self, project_id: str, log_stream_id: Optional[str] = None, experiment_id: Optional[str] = None
) -> None:
    """Capture the ingest destination and resolve SDK configuration.

    Raises:
        ValueError: if neither ``log_stream_id`` nor ``experiment_id``
            is provided.
    """
    # Config lookup happens first so a misconfigured SDK surfaces before
    # the destination-validation error.
    self.config = GalileoPythonConfig.get()
    self.project_id = project_id
    self.log_stream_id = log_stream_id
    self.experiment_id = experiment_id

    if log_stream_id is None and experiment_id is None:
        raise ValueError("log_stream_id or experiment_id must be set")

def _get_ingest_base_url(self) -> str:
    """Return the ingest service base URL without a trailing slash.

    ``GALILEO_INGEST_URL`` wins when set (and non-empty); otherwise fall
    back to the configured API URL, then the console URL.
    """
    base = os.environ.get("GALILEO_INGEST_URL") or str(self.config.api_url or self.config.console_url)
    return base.rstrip("/")

def _get_auth_headers(self) -> dict[str, str]:
    """Build request headers for the ingest service.

    Prefers API-key auth; falls back to a JWT bearer token when no API
    key is configured. Secrets are unwrapped only at send time.
    """
    headers: dict[str, str] = {"Content-Type": "application/json", "X-Galileo-SDK": get_sdk_header()}
    api_key = self.config.api_key
    if api_key:
        headers["Galileo-API-Key"] = api_key.get_secret_value()
    elif self.config.jwt_token:
        headers["Authorization"] = f"Bearer {self.config.jwt_token.get_secret_value()}"
    return headers

@async_warn_catch_exception(logger=_logger)
async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dict[str, Any]:
    """POST a batch of traces to the dedicated ingest service.

    Stamps the request with this client's experiment/log-stream id and
    ``LoggingMethod.python_client``, then sends it to the URL resolved by
    ``_get_ingest_base_url``.

    Args:
        traces_ingest_request: The batch of traces to ingest; mutated in
            place with destination ids and logging method.

    Returns:
        The JSON body of the ingest service response.

    Raises:
        httpx.HTTPStatusError: if the service responds with 4xx/5xx
            (subject to the ``async_warn_catch_exception`` decorator).
    """
    if self.experiment_id:
        traces_ingest_request.experiment_id = UUID(self.experiment_id)
    elif self.log_stream_id:
        traces_ingest_request.log_stream_id = UUID(self.log_stream_id)

    traces_ingest_request.logging_method = LoggingMethod.python_client

    base_url = self._get_ingest_base_url()
    url = f"{base_url}{Routes.ingest_traces.format(project_id=self.project_id)}"
    json_body = traces_ingest_request.model_dump(mode="json")
    # Non-sensitive context reused by the start/completion/failure logs.
    log_context = {"url": url, "project_id": self.project_id, "num_traces": len(traces_ingest_request.traces)}

    _logger.info("Sending traces to ingest service", extra=log_context)

    # httpx default timeout is 5s for connect/read/write/pool
    # (see https://www.python-httpx.org/advanced/timeouts/)
    async with httpx.AsyncClient(verify=self.config.ssl_context) as client:
        try:
            response = await client.post(url, json=json_body, headers=self._get_auth_headers())
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            # Completion (failure) log: status code only — never bodies or auth headers.
            _logger.error(
                "Failed to send traces to ingest service",
                extra={**log_context, "status_code": e.response.status_code},
            )
            raise
        except Exception as e:
            # Transport-level failure (timeout, DNS, TLS, ...); message trimmed.
            _logger.error("Failed to send traces to ingest service", extra={**log_context, "error": str(e)[:200]})
            raise
        # Completion (success) log required by the logging lifecycle guidelines.
        _logger.info(
            "Finished sending traces to ingest service",
            extra={**log_context, "status_code": response.status_code},
        )
        return response.json()

@async_warn_catch_exception(logger=_logger)
async def ingest_spans(self, spans_ingest_request: SpansIngestRequest) -> dict[str, Any]:
    """POST a batch of spans to the dedicated ingest service.

    Stamps the request with this client's experiment/log-stream id and
    ``LoggingMethod.python_client``, then sends it to the URL resolved by
    ``_get_ingest_base_url``.

    Args:
        spans_ingest_request: The batch of spans to ingest; mutated in
            place with destination ids and logging method.

    Returns:
        The JSON body of the ingest service response.

    Raises:
        httpx.HTTPStatusError: if the service responds with 4xx/5xx
            (subject to the ``async_warn_catch_exception`` decorator).
    """
    if self.experiment_id:
        spans_ingest_request.experiment_id = UUID(self.experiment_id)
    elif self.log_stream_id:
        spans_ingest_request.log_stream_id = UUID(self.log_stream_id)

    spans_ingest_request.logging_method = LoggingMethod.python_client

    base_url = self._get_ingest_base_url()
    url = f"{base_url}{Routes.ingest_spans.format(project_id=self.project_id)}"
    json_body = spans_ingest_request.model_dump(mode="json")
    # Non-sensitive context reused by the start/completion/failure logs.
    log_context = {"url": url, "project_id": self.project_id, "num_spans": len(spans_ingest_request.spans)}

    _logger.info("Sending spans to ingest service", extra=log_context)

    # httpx default timeout is 5s for connect/read/write/pool
    # (see https://www.python-httpx.org/advanced/timeouts/)
    async with httpx.AsyncClient(verify=self.config.ssl_context) as client:
        try:
            response = await client.post(url, json=json_body, headers=self._get_auth_headers())
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            # Completion (failure) log: status code only — never bodies or auth headers.
            _logger.error(
                "Failed to send spans to ingest service",
                extra={**log_context, "status_code": e.response.status_code},
            )
            raise
        except Exception as e:
            # Transport-level failure (timeout, DNS, TLS, ...); message trimmed.
            _logger.error("Failed to send spans to ingest service", extra={**log_context, "error": str(e)[:200]})
            raise
        # Completion (success) log required by the logging lifecycle guidelines.
        _logger.info(
            "Finished sending spans to ingest service",
            extra={**log_context, "status_code": response.status_code},
        )
        return response.json()
Loading
Loading