Skip to content

feat: add IngestTraces client for dedicated ingest service#501

Open
pratduv wants to merge 2 commits intosc-58230/local-ingest-modelsfrom
sc-58230/ingest-client
Open

feat: add IngestTraces client for dedicated ingest service#501
pratduv wants to merge 2 commits intosc-58230/local-ingest-modelsfrom
sc-58230/ingest-client

Conversation

@pratduv
Copy link
Contributor

@pratduv pratduv commented Mar 13, 2026

User description

Summary

  • Adds IngestTraces client class that communicates directly with the Go ingest service via httpx, bypassing the auto-generated API client
  • Conditionally activated when GALILEO_INGEST_URL is set; falls back to the existing Traces client otherwise
  • Wires the new client into all GalileoLogger ingestion paths (batch flush, streaming traces, streaming spans)

Changes

New: IngestTraces client (src/galileo/traces.py)

  • Standalone httpx.AsyncClient-based client targeting POST /ingest/traces/{project_id} and POST /ingest/spans/{project_id}
  • Resolves base URL from GALILEO_INGEST_URL env var, falling back to api_url
  • Handles auth headers (API key or JWT) independently
  • Uses @async_warn_catch_exception for resilient telemetry semantics

Modified: GalileoLogger (src/galileo/logger/logger.py)

  • Added _ingest_client field, created only when GALILEO_INGEST_URL is set
  • All ingestion call sites use self._ingest_client or self._traces_client fallback pattern
  • Refactored init into _ensure_project_and_log_stream, _create_traces_client, and _create_ingest_client for lazy creation

New routes (src/galileo/constants/routes.py)

  • ingest_traces = "/ingest/traces/{project_id}"
  • ingest_spans = "/ingest/spans/{project_id}"

Tests (tests/test_ingest_traces_client.py)

  • Constructor validation, URL resolution, auth header generation
  • respx-mocked HTTP tests for ingest_traces and ingest_spans
  • Logger integration: verifies client wiring for batch and streaming modes

Why

The Go ingest service runs at a separate URL from the main API and exposes raw HTTP endpoints (no OpenAPI client). This PR adds a thin httpx client so the SDK can route ingestion traffic directly to it when configured, without changing any behavior for users who don't set the env var.

Test plan

  • All new tests in test_ingest_traces_client.py pass
  • Pre-commit hooks (ruff, mypy) pass
  • Verify existing test suite still passes in CI

Fixes sc-58541

Made with Cursor


Generated description

Below is a concise technical summary of the changes proposed in this PR:
Enable direct ingestion through the new IngestTraces client that targets dedicated ingest endpoints via httpx, respects GALILEO_INGEST_URL, and handles auth/logging metadata so traffic can bypass the autogenerated API client when needed. Wire GalileoLogger ingestion paths to prefer this client, lazily creating it while preserving existing Traces fallbacks for batch and streaming flows.

TopicDetails
Logger wiring Refactor GalileoLogger so every ingestion path prefers the ingest client when configured, lazily creates clients via shared helpers, and keeps the Traces client as a fallback.
Modified files (1)
  • src/galileo/logger/logger.py
Latest Contributors(2)
UserCommitDate
calebe.sep@hotmail.comfix-auto-convert-non-s...March 02, 2026
mason@galileo.aifeat-allow-session-met...February 13, 2026
Ingest client flow Introduce IngestTraces to call the dedicated ingest endpoints, resolve the service URL from GALILEO_INGEST_URL, attach auth and SDK headers, and cover its behavior with HTTP mocks and constructor/auth URL tests.
Modified files (3)
  • src/galileo/constants/routes.py
  • src/galileo/traces.py
  • tests/test_ingest_traces_client.py
Latest Contributors(2)
UserCommitDate
vamaq@users.noreply.gi...fix-Define-explicit-er...February 03, 2026
nachiket@galileo.aifeat-Upgrade-linters-f...September 24, 2025
This pull request is reviewed by Baz. Review like a pro on (Baz).

@pratduv pratduv requested a review from a team as a code owner March 13, 2026 03:48
@pratduv pratduv requested review from anurag-lang and removed request for a team and anurag-lang March 13, 2026 03:48
Comment on lines +215 to +219
async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dict[str, Any]:
if self.experiment_id:
traces_ingest_request.experiment_id = UUID(self.experiment_id)
elif self.log_stream_id:
traces_ingest_request.log_stream_id = UUID(self.log_stream_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IngestTraces.ingest_traces repeats almost the same flow as ingest_spans: set the experiment/log stream IDs, flip on LoggingMethod.python_client, build the ingest URL, log the payload size, and post via an httpx.AsyncClient. The only real differences between the two methods are which route/attribute they use and the log message. Any future change to headers, timeout, metrics, or logging therefore needs to be made twice in blocks that are otherwise identical. Can we extract a shared helper (e.g. _post_to_ingest(endpoint: str, payload_attr: str, log_message: str, request: BaseModel)) that handles the ID injection, LoggingMethod assignment, URL construction, httpx request, and logging, and then let ingest_traces/ingest_spans simply call it with the different route and attribute? That keeps the helper consistent (shown below) and leaves the call sites tiny:

async def _post_to_ingest(...):
    if self.experiment_id:
        request.experiment_id = UUID(self.experiment_id)
    elif self.log_stream_id:
        request.log_stream_id = UUID(self.log_stream_id)
    request.logging_method = LoggingMethod.python_client
    url = f"{self._get_ingest_base_url()}{endpoint}"
    _logger.info(log_message, extra={"url": url, "project_id": self.project_id, "num_items": len(getattr(request, payload_attr))})
    async with httpx.AsyncClient(...) as client:
        response = await client.post(url, json=request.model_dump(mode="json"), headers=self._get_auth_headers())
        response.raise_for_status()
        return response.json()
async def ingest_traces(...):
    return await self._post_to_ingest(Routes.ingest_traces.format(...), "traces", "Sending traces to ingest service", traces_ingest_request)

async def ingest_spans(...):
    return await self._post_to_ingest(Routes.ingest_spans.format(...), "spans", "Sending spans to ingest service", spans_ingest_request)

This avoids the same control flow being duplicated across the two methods.

Finding type: Code Dedup and Conventions | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Comment on lines +227 to +231
_logger.info(
"Sending traces to ingest service",
extra={"url": url, "project_id": self.project_id, "num_traces": len(traces_ingest_request.traces)},
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ingest_traces logs the "Sending traces to ingest service" start event but never logs success or failure after httpx.AsyncClient.post/response.raise_for_status. This violates AGENTS.md's "Logging & sensitive-data handling" lifecycle requirement and leaves ingest writes unobservable. Can we add a completion log (success or failure with non-sensitive context) immediately after the POST/response.raise_for_status?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In src/galileo/traces.py around lines 227 to 231, the ingest_traces method only logs the
start of the request but never logs success or failure after the HTTP POST. Modify
ingest_traces to wrap the httpx post/response.raise_for_status sequence in a try/except:
after a successful response.raise_for_status() emit an _logger.info() completion log
with non-sensitive context (url, project_id, num_traces, response.status_code); on
exceptions (catch httpx.HTTPStatusError and a generic Exception) emit an _logger.error()
with safe context (url, project_id, num_traces, status code or exception message
trimmed) without logging request/response bodies or auth headers, then re-raise the
exception to preserve existing error behavior.

Comment on lines +252 to +256
_logger.info(
"Sending spans to ingest service",
extra={"url": url, "project_id": self.project_id, "num_spans": len(spans_ingest_request.spans)},
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ingest_spans only logs 'Sending spans to ingest service' before the HTTP POST and never emits a completion log. AGENTS.md requires start+completion lifecycle logs so operators lack success/failure context; can we add a completion log after response.raise_for_status()/on success that doesn't include sensitive data?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In src/galileo/traces.py around lines 252 to 256, the ingest_spans method only logs the
start of the HTTP request and never emits a completion/success log. Add a safe
completion log immediately after response.raise_for_status() and before returning
response.json(), e.g. call _logger.info("Finished sending spans to ingest service",
extra={"url": url, "project_id": self.project_id, "num_spans":
len(spans_ingest_request.spans), "status_code": response.status_code}). Do not log
response body, headers, auth, or any sensitive data. Keep the rest of the method
unchanged.

@pratduv pratduv force-pushed the sc-58230/local-ingest-models branch from 1a29b4f to 2405b0e Compare March 13, 2026 04:12
Adds a new IngestTraces client that talks directly to the Go ingest
service via httpx, activated when GALILEO_INGEST_URL is set. Falls
back to the existing Traces client (main API) otherwise.

[sc-58541]
@pratduv pratduv force-pushed the sc-58230/ingest-client branch from cce7d18 to 4d196b7 Compare March 13, 2026 04:16
Comment on lines +1929 to +1930
client = self._ingest_client or self._traces_client
await client.ingest_traces(traces_ingest_request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flush in batch mode can keep using the traces client instead of the dedicated ingest client when GALILEO_INGEST_URL is set after constructing GalileoLogger. Flush selects client via client = self._ingest_client or self._traces_client and immediately calls await client.ingest_traces(...) (lines 1929-1930), but _ingest_client is only created in __init__ so it stays None; can we call _create_ingest_client() before selecting the client (as async_ingest_traces does around 2124-2130) and apply the same lazy-creation fix to other similar call sites?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In src/galileo/logger/logger.py around lines 1929-1930 (method _flush_batch), the code
picks client = self._ingest_client or self._traces_client and then calls await
client.ingest_traces(...). This fails if GALILEO_INGEST_URL was set after the
GalileoLogger was constructed because _ingest_client was never created. Change the
selection to lazily create the ingest client first (e.g. if self._ingest_client is None
and os.environ.get('GALILEO_INGEST_URL'): self._ingest_client =
self._create_ingest_client()), then set client = self._ingest_client or
self._traces_client (and ensure _traces_client is created if needed). Also update the
similar client-selection logic in _ingest_span_streaming (lines ~550-593) and
_ingest_trace_streaming (lines ~516-542) to lazily create _ingest_client before using it
so the dedicated ingest service is used if the env var is present.

Comment on lines +35 to +39
@patch("galileo.traces.GalileoPythonConfig")
def test_accepts_log_stream_id(self, mock_config_class) -> None:
mock_config_class.get.return_value = Mock()

client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_accepts_log_stream_id and other methods in TestIngestTracesInit omit the sentence-case # Given:/When:/Then: comments required by AGENTS.md, so these tests deviate from the documented testing convention and reduce clarity. Can we add non-empty sentence-case # Given:/When:/Then: comments to each test in tests/?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In tests/test_ingest_traces_client.py around lines 35 to 39 (and similarly for the other
test methods in this file such as those in TestIngestTracesInit and the request test
classes), the test methods lack the required sentence-case `# Given:/When:/Then:`
comments per AGENTS.md. Edit each test function to add three short, sentence-case
comments: `# Given:` describing the test setup, `# When:` describing the action under
test, and `# Then:` describing the expected outcome (do not leave any of them empty).
Keep existing code and assertions unchanged aside from inserting these comments in the
appropriate places immediately above the relevant code blocks.

Comment on lines +148 to +153
@respx.mock
@pytest.mark.asyncio
async def test_ingest_traces_posts_to_correct_url(self, client, monkeypatch) -> None:
# Given: an ingest URL is configured
monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestIngestTracesRequest uses @respx.mock/httpx instead of the shared mock_request fixture required by AGENTS.md (lines 207–235) and tests/conftest.py. This bypasses the shared setup (including --disable-socket) and breaks the documented mocking convention; can we refactor these tests to use mock_request and the accompanying mocks instead of wiring respx manually?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In tests/test_ingest_traces_client.py around lines 148 to 153, the tests in
TestIngestTracesRequest use @respx.mock and respx.post to stub HTTP calls which violates
AGENTS.md and bypasses the shared mock_request fixture. Refactor these tests to remove
@respx.mock and respx usage: have each async test accept the mock_request fixture (add
mock_request to the parameter list), use that fixture to register the expected ingest
URL and canned httpx response (matching the JSON bodies used today), and assert route
calls via the mock_request tracking API instead of respx. Apply the same replacement to
the other tests in TestIngestTracesRequest and TestIngestSpansRequest that use respx so
all HTTP interactions reuse mock_request and respect --disable-socket.

@pratduv pratduv force-pushed the sc-58230/local-ingest-models branch 6 times, most recently from c9eb90e to f2c95c6 Compare March 14, 2026 00:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant