refactor(tests): split suites by execution level and speed up CI #1609
Conversation
@claude review
```
def count_test_functions(path: Path) -> int:
    module = ast.parse(path.read_text(encoding="utf-8"))
    return sum(
        1
        for node in module.body
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        and node.name.startswith("test_")
    )


def estimate_weight(path: Path) -> int:
```
count_test_functions misses class-based test methods
count_test_functions inspects only module.body (top-level nodes), so any test defined as a method inside a class TestFoo block is invisible to the weight estimator. A file that uses class-based tests would be assigned weight 1 (the max(count, 1) floor), potentially skewing shard balance as the e2e suite grows.
A minimal fix:
```
def count_test_functions(path: Path) -> int:
    module = ast.parse(path.read_text(encoding="utf-8"))
    count = 0
    for node in ast.walk(module):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name.startswith("test_"):
            count += 1
    return count
```

Using ast.walk visits all nested scopes including class bodies.
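The difference is easy to check in isolation. A minimal sketch (hypothetical test source, stdlib ast only) counts the same module both ways:

```python
import ast

# Hypothetical test-file contents mixing top-level and class-based tests.
SOURCE = """
def test_top_level():
    pass

class TestFoo:
    def test_method(self):
        pass

    async def test_async_method(self):
        pass
"""

def count_top_level(module: ast.Module) -> int:
    # Mirrors the buggy version: only direct children of the module.
    return sum(
        1
        for node in module.body
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        and node.name.startswith("test_")
    )

def count_all(module: ast.Module) -> int:
    # Mirrors the fix: ast.walk visits nested scopes, including class bodies.
    return sum(
        1
        for node in ast.walk(module)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        and node.name.startswith("test_")
    )

module = ast.parse(SOURCE)
print(count_top_level(module))  # 1: class-based tests are invisible
print(count_all(module))        # 3: all tests counted
```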
```
        return result

    if monotonic() >= deadline:
        return result

sleep(interval_seconds)

if monotonic() >= deadline and last_error is not None:
```
Stale error raised after a successful-but-not-ready result
At the bottom of the loop:
```
sleep(interval_seconds)

if monotonic() >= deadline and last_error is not None:
    raise last_error
```

last_error is only cleared when a non-retryable exception is re-raised; it is never reset after a successful call. If a prior iteration set last_error (e.g. a NotFoundError) and a later iteration returns a result that fails is_result_ready, the code sleeps, the deadline passes, and the old NotFoundError is raised, even though the operation succeeded. The caller sees a confusing, stale error rather than the (admittedly not-yet-ready) result.
Fix: reset last_error in the else branch after a successful operation:
```
else:
    last_error = None  # successful call; discard any prior error
    if is_result_ready is None or is_result_ready(result):
        return result
    if monotonic() >= deadline:
        return result
```

```
def run(self) -> None:
    while self.running:
        task = self._queue.get()

        if task is _SHUTDOWN_SENTINEL:
            self._queue.task_done()
            continue

        logger.debug(
            f"PromptCacheRefreshConsumer processing task, {self._identifier}"
        )
        try:
            task()
        # Task failed, but we still consider it processed
        except Exception as e:
            logger.warning(
                f"PromptCacheRefreshConsumer encountered an error, cache was not refreshed: {self._identifier}, {e}"
            )

        self._queue.task_done()

def pause(self) -> None:
    """Pause the consumer."""
    self.running = False
    self._queue.put(_SHUTDOWN_SENTINEL)
```
PromptCacheRefreshConsumer shutdown is unsafe when threads > 1
shutdown() calls consumer.pause() once per consumer; each pause() sets self.running = False and puts one _SHUTDOWN_SENTINEL into the shared queue. With N consumers sharing one queue there are N sentinels, but nothing prevents consumer 0 from draining all N sentinels before consumer 1 ever wakes up—leaving consumer 1 blocked on the blocking queue.get() indefinitely, and causing consumer.join() to deadlock.
In practice DEFAULT_PROMPT_CACHE_REFRESH_WORKERS = 1 keeps this safe today, but PromptCache(max_prompt_refresh_workers=10) is used in test_prompt_atexit.py, so the multi-consumer path is exercised in tests. A safe pattern is to give each consumer its own private queue or to use threading.Event for the stop signal and keep a short timeout on queue.get() as a fallback.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: eb98e3ad52
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```
LangfuseResourceManager.reset()
yield
LangfuseResourceManager.reset()
```
Avoid global reset fixture for all suites
Calling LangfuseResourceManager.reset() before every test invalidates any module-level Langfuse() instances created at import time (for example tests/live_provider/test_openai.py), so later langfuse.flush() calls operate on a shutdown resource manager while instrumentation paths (get_client(...)) create a new one; this can leave events unflushed and make live-provider assertions intermittently fail due to eventual-consistency timing. Limit this autouse reset to suites that only use per-test clients, or re-initialize module-level clients after reset.
💡 Codex Review
Reviewed commit: 6fed925e43
```
if task is _SHUTDOWN_SENTINEL:
    self._queue.task_done()
    continue
```
Exit prompt refresh worker on shutdown sentinel
In PromptCacheRefreshConsumer.run, consuming _SHUTDOWN_SENTINEL currently does continue, which allows a non-paused worker to eat a sentinel intended to wake another paused worker. With max_prompt_refresh_workers > 1, this race can leave a paused worker blocked forever on queue.get(), and PromptCacheTaskManager.shutdown() can hang indefinitely on join(). This is reachable through the public PromptCache(max_prompt_refresh_workers=...) constructor and can freeze test/process shutdown.
```
if monotonic() >= deadline and last_error is not None:
    raise last_error
```
Don't raise stale retry error after a later successful call
retry_until_ready can raise an old last_error even after operation() has started succeeding: if an early retryable error is stored, then a later call returns a not-ready result and the deadline is crossed after sleep, the function raises the stale exception from line 63 instead of returning the latest result. This makes eventual-consistency reads (for example list/get_many waits) fail with misleading NotFoundError even when the entity is now visible.
```
else:
    if is_result_ready is None or is_result_ready(result):
        return result

    if monotonic() >= deadline:
        return result

sleep(interval_seconds)

if monotonic() >= deadline and last_error is not None:
    raise last_error
```
🔴 In retry_until_ready (tests/support/retry.py:54-64), last_error is set when a retryable exception occurs but is never cleared when a subsequent call succeeds. If an early attempt fails (setting last_error), a later attempt succeeds but is_result_ready returns False, and then sleep(interval_seconds) pushes monotonic() past the deadline, the final guard on lines 63-64 raises the stale error from the earlier failure instead of returning the valid result. Fix: add last_error = None at the start of the else block.
Extended reasoning...
The bug is real and introduced by the new tests/support/retry.py file in this PR.
What the bug is: retry_until_ready maintains a last_error variable that is set whenever a retryable exception is caught (lines 46-50). The else branch — executed when operation() succeeds — never resets last_error to None. A stale exception from an earlier failed attempt therefore persists even after the operation has successfully returned a result in a later attempt.
The specific code path: (1) Attempt N raises a retryable error → last_error = error_N. (2) Attempt M (M > N) succeeds; the else block runs. (3) is_result_ready(result) returns False. (4) The inline deadline check at line 58 (if monotonic() >= deadline: return result) is False — still within deadline. (5) sleep(interval_seconds) at line 61 runs and its duration pushes monotonic() past the deadline. (6) The final guard at lines 63-64 fires: monotonic() >= deadline and last_error is not None is True → raises the stale error_N instead of returning the valid result from step 2.
Why existing code does not prevent it: The only safeguard is the inline deadline check at line 58, which returns the result early when is_result_ready fails and the deadline is already exceeded. But that check cannot protect against the sleep call itself crossing the deadline — a window that is exactly as wide as interval_seconds (default 0.25 s, or 0.5 s in CI).
Impact: An e2e test that eventually obtains a valid API response can spuriously fail with a LangfuseNotFoundError that was raised minutes earlier in the same retry loop. The failure is intermittent and misleading because the error message describes a problem that had already resolved.
Step-by-step proof (timeline with timeout_seconds=1.0, interval_seconds=0.25):
- T+0.00s: Attempt 1 raises
NotFoundError→last_error = err1. - T+0.25s:
sleepcompletes;monotonic() = 0.25 < 1.0so we retry. - T+0.25s: Attempt 2 succeeds;
elseblock runs;is_result_ready→False;monotonic() = 0.25 < 1.0so we do NOT return early;last_erroris stillerr1. - T+0.25s..T+1.01s:
sleep(0.25)runs (system slow, actual sleep = 0.76s). - T+1.01s: Final guard:
monotonic() >= deadlineisTrueANDlast_error is not None→ raiseserr1instead of returning the valid result from Attempt 2.
Fix: Add last_error = None at the start of the else block (immediately after line 54). This ensures a successful operation always clears any prior saved error, so the final guard can only re-raise a currently-relevant error.
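The whole failure mode and fix fit in a self-contained sketch (a paraphrase of the helper as described above, not the actual tests/support/retry.py source; LookupError stands in for NotFoundError):

```python
from time import monotonic, sleep

def retry_until_ready(operation, is_result_ready=None,
                      timeout_seconds=1.0, interval_seconds=0.01,
                      retryable=(LookupError,)):
    # Paraphrased retry loop with the fix applied: a successful call
    # always clears last_error, so the final guard can never re-raise
    # a stale exception from an earlier attempt.
    deadline = monotonic() + timeout_seconds
    last_error = None
    result = None
    while True:
        try:
            result = operation()
        except retryable as error:
            last_error = error
            if monotonic() >= deadline:
                raise
        else:
            last_error = None  # the fix
            if is_result_ready is None or is_result_ready(result):
                return result
            if monotonic() >= deadline:
                return result
        sleep(interval_seconds)
        if monotonic() >= deadline and last_error is not None:
            raise last_error

calls = {"n": 0}
def operation():
    calls["n"] += 1
    if calls["n"] == 1:
        raise LookupError("not visible yet")  # early retryable failure
    return "created-but-not-ready"

# Attempt 1 fails, later attempts succeed but never become "ready";
# with the fix the not-ready result is returned, not the stale error.
outcome = retry_until_ready(operation, is_result_ready=lambda r: False,
                            timeout_seconds=0.05)
print(outcome)  # created-but-not-ready
```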
```
- name: Run mypy type checking
  run: uv run --frozen mypy langfuse --no-error-summary

ci:
  runs-on: ubuntu-latest
unit-tests:
  runs-on: blacksmith-4vcpu-ubuntu-2404
  timeout-minutes: 30
  env:
    LANGFUSE_BASE_URL: "http://localhost:3000"
    LANGFUSE_PUBLIC_KEY: "pk-lf-1234567890"
    LANGFUSE_SECRET_KEY: "sk-lf-1234567890"
    OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
    # SERPAPI_API_KEY: ${{ secrets.SERPAPI_API_KEY }}
    HUGGINGFACEHUB_API_TOKEN: ${{ secrets.HUGGINGFACEHUB_API_TOKEN }}
    ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
    LANGFUSE_PUBLIC_KEY: "pk-lf-test"
    LANGFUSE_SECRET_KEY: "sk-lf-test"
    OPENAI_API_KEY: "test-openai-key"
  strategy:
    fail-fast: false
    matrix:
```
🟡 The new unit-tests job uses unpinned mutable action tags (actions/checkout@v3 and astral-sh/setup-uv@v7) while every other job in the same workflow file — linting, type-checking, and e2e-tests — uses pinned SHA hashes, creating an unintentional supply-chain regression. Fix by replacing those two steps with the same pinned references already used in the e2e-tests job (actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd and astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8).
Extended reasoning...
What the bug is and how it manifests
The unit-tests job introduced by this PR uses two mutable action tags:
```
- uses: actions/checkout@v3
- uses: astral-sh/setup-uv@v7
```

A mutable tag can be force-pushed to point at any commit at any time. If either tag is redirected to malicious code, every CI run of the unit-test matrix (Python 3.10–3.14, five parallel jobs) will silently execute that code.
The specific code path that triggers it
See .github/workflows/ci.yml, unit-tests job, steps block (lines ~75-82 in the current file). The original ci: job used actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6. The refactor replaced the old job with a new unit-tests job but did not carry forward the pinned SHA. The e2e-tests job created in the same PR correctly pins both actions, as do linting and type-checking.
Why existing code does not prevent it
GitHub Actions resolves mutable tags at job execution time, not at PR merge time. There is no framework-level mechanism in this repo that enforces SHA pinning; it is purely a convention. Because only this one new job was written with tags instead of SHAs, the enforcement gap was invisible during review.
Impact
A supply-chain compromise via a poisoned tag would allow an attacker to execute arbitrary code in the CI runner for every Python version in the matrix. The practical blast radius is limited here because the unit-tests job deliberately uses only placeholder credentials (pk-lf-test, sk-lf-test, test-openai-key) — no real secrets are injected. However, the runner itself could still be used as a pivot point or to exfiltrate build artifacts, and the inconsistency is an unintentional regression that violates the repo's established security posture.
Additionally, the setup-uv version tag differs: v7 in unit-tests vs v8 (pinned) in e2e-tests. Both specify version: "0.11.2" for the uv binary, so the functional impact is likely zero today, but the mismatch makes it harder to reason about toolchain consistency.
How to fix it
Replace the two unpinned steps in the unit-tests job with the same pinned references used in e2e-tests:

```
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
- uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8
```

Step-by-step proof of the inconsistency
- Open .github/workflows/ci.yml in this PR.
- linting job, steps 1–2: checkout@de0fac2e... and setup-uv@cec20831... — pinned.
- type-checking job, steps 1–2: same pinned SHAs.
- e2e-tests job, steps 1–2: same pinned SHAs.
- unit-tests job, steps 1–2: checkout@v3 and setup-uv@v7 — mutable tags.
- The diff shows the original ci: job used actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6, confirming the regression was introduced by this PR when the job was split and rewritten.
uv run --frozen pytest tests/e2e -m "serial_e2e"

# Live provider tests
uv run --frozen pytest -n 4 --dist worksteal tests/live_provider -m "live_provider"

# Single test
uv run --frozen pytest tests/unit/test_resource_manager.py::test_pause_signals_score_consumer_shutdown
```
🟡 The single-test example in AGENTS.md (line 66) references test_pause_signals_score_consumer_shutdown, which does not exist; the actual tests added by this PR are test_score_ingestion_consumer_pause_wakes_blocked_thread and test_media_upload_consumer_pause_wakes_blocked_thread. Running the documented command will fail with pytest exit code 4 (no tests collected), misleading any developer who follows the example.
Extended reasoning...
What the bug is and how it manifests
The single-test example in AGENTS.md line 66 reads:
uv run --frozen pytest tests/unit/test_resource_manager.py::test_pause_signals_score_consumer_shutdown
The test name test_pause_signals_score_consumer_shutdown does not exist anywhere in the repository. Running this command returns pytest exit code 4 (no tests were collected).
The specific code path that triggers it
The two regression tests actually added in tests/unit/test_resource_manager.py by this PR are:
- test_score_ingestion_consumer_pause_wakes_blocked_thread
- test_media_upload_consumer_pause_wakes_blocked_thread
Neither matches the name used in the AGENTS.md example.
Why existing code does not prevent it
Documentation examples are not validated by CI or linters. The unit-test suite passes because pytest simply ignores a node-id it cannot collect when invoked as a standalone command, so no CI check catches the mismatch.
Impact
Any agent or developer who copies the example command to run the new shutdown regression tests will see a silent no-tests-ran result. This could mask a broken test environment or lead someone to believe the tests passed when they were never executed.
How to fix it
Replace the nonexistent test name with one (or both) of the real test names, for example:
uv run --frozen pytest tests/unit/test_resource_manager.py::test_score_ingestion_consumer_pause_wakes_blocked_thread
Step-by-step proof
- This PR adds tests/unit/test_resource_manager.py containing test_score_ingestion_consumer_pause_wakes_blocked_thread and test_media_upload_consumer_pause_wakes_blocked_thread.
- This PR also adds AGENTS.md with the example referencing test_pause_signals_score_consumer_shutdown.
- Grep of test_resource_manager.py confirms zero occurrences of the string test_pause_signals_score_consumer_shutdown.
- Running the documented command: pytest collects 0 items and exits with code 4.
- The developer sees no failures but also no passing tests - the example silently does nothing useful.
💡 Codex Review
Reviewed commit: 170beb8201
```
if upload_job is _SHUTDOWN_SENTINEL:
    self._queue.task_done()
    return
```
Exit media worker when consuming shutdown sentinel
With LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT > 1, this sentinel path can be consumed by a different worker than the one being paused, and because it just returns (without forcing that worker to stop), the same active worker can consume multiple sentinels while another paused worker stays blocked on queue.get(). During LangfuseResourceManager._stop_and_join_consumer_threads(), that blocked worker can make join() hang indefinitely. This is a regression from the previous timeout-based wakeup and is reachable via the public media thread-count configuration.
```
with self._lock:
    for key in list(self._cache):
        if key.startswith(prompt_name):
            del self._cache[key]
```
🟣 Pre-existing bug: PromptCache.invalidate(prompt_name) uses key.startswith(prompt_name) at line 181, which over-invalidates cache entries when one prompt name is a prefix of another. For example, invalidating prompt "foo" also silently evicts the "foo-bar-label:production" entry because "foo-bar-label:production".startswith("foo") is True. The PR directly touches this method (adding the RLock wrapper) without correcting the prefix check; a correct fix must match against name + "-label:" or name + "-version:" rather than a bare name prefix.
Extended reasoning...
What the bug is and how it manifests
PromptCache.invalidate(prompt_name) iterates over all cached keys and deletes any whose string representation starts with prompt_name. Cache keys are produced by generate_cache_key() with the format "{name}-label:{L}" or "{name}-version:{N}" (the name and the qualifier are joined with "-"). When two prompt names share a prefix relationship — e.g. "summarize" and "summarize-brief" — the over-broad check causes invalidate("summarize") to also delete "summarize-brief-label:production", silently evicting a cache entry the caller never intended to touch.
The specific code path that triggers it
PromptCache.invalidate at lines 179-182 of langfuse/_utils/prompt_cache.py:
```
for key in list(self._cache):
    if key.startswith(prompt_name):  # too broad
        del self._cache[key]
```
generate_cache_key("summarize-brief", version=None, label=None) produces "summarize-brief-label:production". "summarize-brief-label:production".startswith("summarize") is True, so a call to invalidate("summarize") removes it.
Why existing code does not prevent it
The startswith check has no awareness of the "-" separator that separates the prompt name from the qualifier. Any prompt name that is a byte-level prefix of another (without even requiring a hyphen boundary) will trigger false matches. The RLock wrapper added in this PR serializes access correctly but leaves the matching predicate unchanged.
Impact
Any deployment that uses prompt names where one is a string prefix of another (a common pattern: "my-prompt" / "my-prompt-v2", or "summarize" / "summarize-brief") will silently lose cache entries on every call to update_prompt (which calls invalidate). The evicted prompt will be fetched from the server on the next get_prompt call — an unnecessary round-trip that also resets the TTL.
How to fix it
Because the separator character "-" can also appear in prompt names, even startswith(prompt_name + "-") is insufficient (e.g. invalidating "foo" with startswith("foo-") still matches "foo-bar-label:production"). A correct fix anchors to the known qualifier tokens produced by generate_cache_key:
```
for key in list(self._cache):
    if key.startswith(prompt_name + "-label:") or key.startswith(prompt_name + "-version:"):
        del self._cache[key]
```
Step-by-step proof
- Two prompts are created: "foo" and "foo-bar".
- generate_cache_key("foo", version=None, label=None) -> "foo-label:production".
- generate_cache_key("foo-bar", version=None, label=None) -> "foo-bar-label:production".
- Both entries are populated in _cache.
- update_prompt(name="foo", ...) calls invalidate("foo").
- Loop checks "foo-label:production".startswith("foo") -> True (correct eviction).
- Loop checks "foo-bar-label:production".startswith("foo") -> True (spurious eviction).
- The "foo-bar" entry is deleted; the next get_prompt("foo-bar") triggers an unintended server round-trip.
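The anchored predicate can be checked standalone (a plain dict stands in for the cache; the key format mirrors generate_cache_key as described above):

```python
def invalidate(cache: dict, prompt_name: str) -> None:
    # Anchor on the qualifier tokens, so "foo" cannot match "foo-bar-...".
    for key in list(cache):
        if key.startswith(prompt_name + "-label:") or key.startswith(
            prompt_name + "-version:"
        ):
            del cache[key]

cache = {
    "foo-label:production": 1,
    "foo-version:3": 2,
    "foo-bar-label:production": 3,
}
invalidate(cache, "foo")
print(sorted(cache))  # ['foo-bar-label:production']: the sibling survives
```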
```
continue

logger.debug(
    f"PromptCacheRefreshConsumer processing task, {self._identifier}"
```
🔴 In PromptCacheRefreshConsumer.run() (langfuse/_utils/prompt_cache.py:54-57), consuming a _SHUTDOWN_SENTINEL does task_done() then continue, which re-checks while self.running — if the consumer that stole the sentinel still has running=True, it loops back and blocks on queue.get() again. With N consumers sharing one queue and shutdown() putting exactly N sentinels, a non-paused consumer can steal a sentinel meant for a paused consumer, leaving the paused consumer permanently blocked on get() and causing consumer.join() to hang. Fix: change continue to break so that receiving any sentinel always exits the loop unconditionally.
Extended reasoning...
What the bug is and how it manifests
In PromptCacheRefreshConsumer.run(), the sentinel-handling branch looks like:
```
if task is _SHUTDOWN_SENTINEL:
    self._queue.task_done()
    continue  # loops back to the top of while self.running:
```
After task_done() the code hits continue, which returns control to the top of the while self.running: loop. If self.running is still True at that point (because this consumer stole a sentinel that was intended for a different, already-paused consumer), it falls through to self._queue.get() again and blocks indefinitely. The sentinel has been consumed but the consumer is still running.
The specific code path that triggers it
shutdown() calls consumer.pause() once per consumer. Each pause() sets self.running = False and puts exactly one _SHUTDOWN_SENTINEL into the shared queue. With N consumers the queue contains exactly N sentinels. However, the sentinels are indistinguishable: any consumer that is unblocked can dequeue any sentinel. If a consumer with running=True dequeues a sentinel, it calls task_done() + continue, re-evaluates while self.running: True, then blocks again on get(). The sentinel is gone. The consumer whose pause() put that sentinel never receives it.
Why existing code does not prevent it
The while self.running: check at the top of the loop is the only exit condition besides exhausting the queue. Because self.running is set by pause() on the calling thread and the loop body re-checks it each iteration, a consumer that has not yet had its own pause() called will see running=True even after consuming a sentinel that was never meant for it. The RLock added in this PR serialises cache access correctly but does not affect the consumer loop.
Concrete deadlock scenario (N=2 workers)
- consumer_0 and consumer_1 are both blocked on queue.get(); running_0=True, running_1=True.
- pause(consumer_0): running_0 = False, puts sentinel_0 in queue.
- consumer_1 (running_1=True) dequeues sentinel_0: task_done() + continue -> while running_1=True -> blocks on get() again. sentinel_0 is gone.
- pause(consumer_1): running_1 = False, puts sentinel_1 in queue.
- consumer_1 dequeues sentinel_1: task_done() + continue -> while running_1=False -> exits loop cleanly. consumer_1.join() returns.
- consumer_0 is still blocked on get(). running_0=False but no sentinel remains. consumer_0.join() hangs forever.
Impact
The deadlock is reachable through the public API via PromptCache(max_prompt_refresh_workers=N) with N>1. DEFAULT_PROMPT_CACHE_REFRESH_WORKERS=1 prevents this in production defaults, but test_prompt_atexit.py uses max_prompt_refresh_workers=10 and is annotated @pytest.mark.timeout(10). Any process exit that triggers this race will hang.
How to fix it
Change continue to break at line 57 of langfuse/_utils/prompt_cache.py:
```
if task is _SHUTDOWN_SENTINEL:
    self._queue.task_done()
    break  # exit the loop unconditionally regardless of self.running
```
This ensures that any consumer that receives a sentinel exits its loop immediately, regardless of its own running state. Because shutdown() puts exactly one sentinel per consumer, every consumer will exit.
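A minimal reproduction of the fixed protocol (illustrative consumer class with the same sentinel/pause shape as described above, not the SDK's actual code):

```python
import queue
import threading

_SHUTDOWN_SENTINEL = object()

class SentinelConsumer(threading.Thread):
    def __init__(self, shared_queue: queue.Queue) -> None:
        super().__init__(daemon=True)
        self._queue = shared_queue
        self.running = True

    def run(self) -> None:
        while self.running:
            task = self._queue.get()
            if task is _SHUTDOWN_SENTINEL:
                self._queue.task_done()
                break  # exit no matter whose sentinel this was
            try:
                task()
            finally:
                self._queue.task_done()

    def pause(self) -> None:
        self.running = False
        self._queue.put(_SHUTDOWN_SENTINEL)

q: queue.Queue = queue.Queue()
consumers = [SentinelConsumer(q) for _ in range(4)]
for c in consumers:
    c.start()

# One sentinel per consumer; because run() breaks on any sentinel,
# each sentinel stops exactly one worker even when "stolen".
for c in consumers:
    c.pause()
for c in consumers:
    c.join(timeout=2)
print(all(not c.is_alive() for c in consumers))  # True
```

With continue instead of break, the same script can leave a worker blocked on get() forever, which is exactly the hang described above.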
```
def estimate_weight(path: Path) -> int:
    try:
        relative_path = relative_test_path(path)
    except ValueError:
        relative_path = None
```
🟡 count_test_functions (scripts/select_e2e_shard.py:38-44) iterates over module.body instead of ast.walk(module), so test methods defined inside a class (e.g. class TestFoo: def test_bar(self): ...) are never counted. Any future e2e file that uses class-based tests and lacks a HISTORICAL_WEIGHTS entry will receive weight=1 regardless of how many tests it contains, potentially producing imbalanced shards.
Extended reasoning...
What the bug is and how it manifests
count_test_functions builds its count by iterating over module.body — the list of top-level AST nodes. A class definition such as class TestFoo: appears as a single ast.ClassDef node in module.body; the individual def test_bar(self) methods inside that class are children of that ClassDef, not direct children of module. They are therefore never visited by the loop and never counted.
For a file that uses only class-based tests, count_test_functions returns 0. estimate_weight then falls through to max(count_test_functions(path), 1) and returns 1 — the minimum possible weight — regardless of how many tests the file actually contains.
The specific code path that triggers it
scripts/select_e2e_shard.py lines 38-44:
def count_test_functions(path: Path) -> int:
module = ast.parse(path.read_text(encoding="utf-8"))
return sum(
1
for node in module.body # top-level only
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
and node.name.startswith("test_")
)A file containing class TestFoo: / def test_bar(self): ... produces no ast.FunctionDef nodes at the module.body level; only an ast.ClassDef node appears there.
Why existing code does not prevent it
The fallback path (count_test_functions) is guarded by HISTORICAL_WEIGHTS: if the relative path of a discovered e2e file is listed in that dict, count_test_functions is never called. Currently all seven e2e files are present in HISTORICAL_WEIGHTS, so the defect has zero observable impact today. However, as confirmed by all three agreeing verifiers, the code is structurally wrong and will silently miscount any new e2e file that (a) is not added to HISTORICAL_WEIGHTS and (b) organises its tests in a class.
Regarding the refutation that this duplicates a previously-refuted bug: the refutation addresses current impact, which is indeed zero. The code defect itself is real and introduced by this PR, and the AGENTS.md instructions explicitly tell developers to add new test files under tests/e2e/ without adding them to any special registry, making it likely the fallback path will eventually be exercised.
Impact
The consequence is sharding imbalance, not incorrect test coverage: all tests still run, they are just potentially grouped into heavily skewed shards. This can slow CI by lengthening the critical path of the slower shard, but will not cause failures or missed tests.
Step-by-step proof
- A developer creates tests/e2e/test_new_feature.py containing:

```
class TestNewFeature:
    def test_alpha(self): ...
    def test_beta(self): ...
    def test_gamma(self): ...
```

- They do not add it to HISTORICAL_WEIGHTS (per current guidance: CI shards tests/e2e mechanically).
- discover_e2e_files() picks it up.
- estimate_weight calls count_test_functions(path).
- ast.parse produces a module whose module.body contains a single ClassDef node, no FunctionDef nodes.
- The sum comprehension yields 0.
- estimate_weight returns max(0, 1) == 1.
- assign_shards assigns it weight 1 instead of weight 3, skewing the shard balance.
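For context on how a misweighted file skews the balance, greedy weight-based assignment of the kind assign_shards performs can be sketched as follows (a generic longest-processing-time balancer; the function body and file weights are illustrative, not the actual scripts/select_e2e_shard.py code):

```python
def assign_shards(weights: dict[str, int], shard_count: int) -> list[list[str]]:
    """Greedy LPT: place the heaviest remaining file on the lightest shard."""
    shards: list[list[str]] = [[] for _ in range(shard_count)]
    totals = [0] * shard_count
    for path, weight in sorted(weights.items(), key=lambda kv: -kv[1]):
        lightest = totals.index(min(totals))
        shards[lightest].append(path)
        totals[lightest] += weight
    return shards

# Illustrative weights; a file misweighted as 1 instead of 3 shifts totals.
weights = {
    "test_core_sdk.py": 40,
    "test_prompts.py": 25,
    "test_datasets.py": 20,
    "test_media.py": 10,
    "test_scores.py": 5,
}
shards = assign_shards(weights, shard_count=2)
print([sum(weights[p] for p in s) for s in shards])  # [50, 50]: balanced
```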
How to fix it
Replace module.body with ast.walk(module):
```
def count_test_functions(path: Path) -> int:
    module = ast.parse(path.read_text(encoding="utf-8"))
    return sum(
        1
        for node in ast.walk(module)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        and node.name.startswith("test_")
    )
```
Summary
This PR restructures the test suite around execution level and follows through on the CI and reliability work needed to make that split useful in practice.
- New directories: tests/unit, tests/e2e, tests/live_provider, tests/support
- Add AGENTS.md and remove CLAUDE.md

CI and test infrastructure
This also updates CI and test plumbing so the new layout stays fast and reliable.
- tests/unit on the Python 3.10-3.14 matrix
- tests/e2e in 2 mechanical shards selected by scripts/select_e2e_shard.py
- tests/live_provider as one dedicated suite that always runs
- serial_e2e as the scheduling-specific pytest marker, replacing e2e_core/e2e_data marker routing with mechanical file sharding
- cancel-in-progress: true on the concurrency group
- LANGFUSE_INIT_* env vars to bootstrap the Langfuse server
The unit-suite speed work keeps coverage intact rather than weakening assertions.
- --dist worksteal for the unit lane

Why
The repo already had a mix of local-only, real-server, and live-provider tests, but the old flat tests/ layout made the boundary hard to see and easy to erode. This change makes the split explicit in the filesystem, keeps unit coverage fast, and keeps the expensive e2e and provider surface intentional.

Results
Local measurements after the unit refactor:
- tests/unit/test_otel.py + tests/unit/test_propagate_attributes.py: 215.08s -> 2.38s
- tests/unit run: 360 passed, 2 skipped in 13.42s
- -n auto --dist worksteal: 360 passed, 2 skipped in 5.42s

Validation
- uv run --frozen pytest -q tests/unit --maxfail=1
- uv run --frozen pytest -q -n auto --dist worksteal tests/unit --maxfail=1
- uv run --frozen pytest --collect-only -q tests/unit tests/e2e tests/live_provider
- uv run --frozen pytest -q tests/unit/test_e2e_sharding.py
- uv run --frozen pytest --collect-only -q tests/live_provider -m 'live_provider'
- uv run --frozen ruff check scripts/select_e2e_shard.py tests/conftest.py tests/unit/test_e2e_sharding.py tests/e2e/test_core_sdk.py

CI is the authoritative validation for the real Langfuse-server and live-provider suites.
Disclaimer: Experimental PR review
Greptile Summary
This PR is a large, well-engineered refactoring that splits the flat tests/ directory into tests/unit, tests/e2e, tests/live_provider, and tests/support, and rewires CI to match. It also meaningfully speeds up the unit suite (360 tests in ~5 s) by replacing polling-based waits with sentinel-driven shutdown and deterministic queue draining, and adds proper thread-safety locks to PromptCache. The structural changes are clean and the new retry/sharding helpers are solid.

Key changes:
- langfuse/_utils/prompt_cache.py: PromptCache and PromptCacheTaskManager are now fully protected by RLock; the consumer switches from a 1-second polling loop to a blocking queue.get() with sentinel-based shutdown; add_refresh_prompt_task_if_current() avoids redundant background refreshes.
- langfuse/_task_manager/media_upload_consumer.py / score_ingestion_consumer.py / media_manager.py: sentinel-driven pause()/signal_shutdown() for immediate worker wake-up on shutdown.
- scripts/select_e2e_shard.py: greedy weight-based shard assignment that falls back to local AST-based test counting for unknown files.
- tests/support/retry.py: bounded retry helper for e2e eventual-consistency reads.
- .github/workflows/ci.yml: unit-tests job runs across Python 3.10–3.14; e2e-tests job runs two mechanical shards + live-provider suite; Langfuse server bootstrapped via LANGFUSE_INIT_* env vars instead of a manual seeder script.

Confidence Score: 5/5
Safe to merge — all findings are P2 style/improvement items with no production correctness risk under default configuration
The core source changes (sentinel-based shutdown, RLock thread-safety, prompt cache freshness guard) are sound and well-tested. The multi-consumer sentinel issue only affects threads > 1 which is not the default. The retry edge case requires a specific timing race that is unlikely in practice. All remaining findings are style, robustness improvements, or minor logic nuances — none block correctness on the happy path.
tests/support/retry.py (stale-error edge case), langfuse/_utils/prompt_cache.py (multi-consumer shutdown), scripts/select_e2e_shard.py (class-based test counting)
Important Files Changed
Flowchart
```
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    PR[Pull Request / Push] --> C{Concurrency group}
    C -->|cancel-in-progress| LINT[linting job]
    C --> TC[type-checking job]
    C --> UNIT[unit-tests\nPython 3.10–3.14 matrix]
    C --> E2E[e2e-tests matrix]
    E2E --> S1[Shard 1\nselect_e2e_shard --shard-index 0]
    E2E --> S2[Shard 2\nselect_e2e_shard --shard-index 1]
    E2E --> LP[live_provider suite]
    S1 --> PAR1[pytest -n 4 --dist worksteal\nnot serial_e2e]
    S1 --> SER1[pytest serial_e2e]
    S2 --> PAR2[pytest -n 4 --dist worksteal\nnot serial_e2e]
    S2 --> SER2[pytest serial_e2e]
    LP --> LPT[pytest -n 4 --dist worksteal\ntests/live_provider -m live_provider]
    UNIT -->|pytest -n auto --dist worksteal| UT[tests/unit 360 tests]
    LINT --> GATE[all-tests-passed]
    TC --> GATE
    UNIT --> GATE
    S1 --> GATE
    S2 --> GATE
    LP --> GATE
    GATE --> MERGE[Branch protection check]
```

Reviews (1): Last reviewed commit: "fix(prompt-cache): avoid redundant refre..." | Re-trigger Greptile
Context used:
Learnt From
langfuse/langfuse-python#1387