Skip to content

Commit 3fac83b

Browse files
Kasper JungeRalphify
authored andcommitted
refactor: unify inherit and capture branches in _run_agent_blocking
The blocking execution path had two nearly-identical branches that duplicated the Popen β†’ writer thread β†’ wait β†’ timeout β†’ cleanup sequence. Merge them into a single flow that conditionally sets stdout/stderr to PIPE based on whether any subscriber needs the output, eliminating ~30 lines of duplicated lifecycle logic. Co-authored-by: Ralphify <noreply@ralphify.co>
1 parent c519266 commit 3fac83b

2 files changed

Lines changed: 37 additions & 70 deletions

File tree

β€Žsrc/ralphify/_agent.pyβ€Ž

Lines changed: 35 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -518,9 +518,7 @@ def _start_writer_thread(
518518
``Thread(…, daemon=True) / .start()`` boilerplate across the streaming and
519519
blocking execution paths.
520520
"""
521-
thread = threading.Thread(
522-
target=_deliver_prompt, args=(proc, prompt), daemon=True
523-
)
521+
thread = threading.Thread(target=_deliver_prompt, args=(proc, prompt), daemon=True)
524522
thread.start()
525523
return thread
526524

@@ -602,17 +600,18 @@ def _run_agent_blocking(
602600
) -> AgentResult:
603601
"""Run the agent subprocess and return the result.
604602
605-
Uses a three-way capture strategy:
603+
Conditionally pipes stdout/stderr based on whether any subscriber
604+
needs the output:
606605
607-
1. **Inherit** (``on_output_line is None and log_path_dir is None``) β€”
608-
stdout/stderr are not piped; the child writes directly to the
609-
parent's file descriptors. No reader threads, no buffering.
610-
2. **Callback only** (``on_output_line`` set, no log dir) β€” reader
611-
threads forward lines to the callback without accumulating them,
612-
avoiding unbounded memory growth.
613-
3. **Log capture** (``log_path_dir`` set) β€” reader threads accumulate
614-
lines into lists for log writing; lines are also forwarded to the
615-
callback if provided.
606+
- **Inherit** (``on_output_line is None and log_path_dir is None``) β€”
607+
stdout/stderr are not piped; the child writes directly to the
608+
parent's file descriptors. No reader threads, no buffering.
609+
- **Callback only** (``on_output_line`` set, no log dir) β€” reader
610+
threads forward lines to the callback without accumulating them,
611+
avoiding unbounded memory growth.
612+
- **Log capture** (``log_path_dir`` set) β€” reader threads accumulate
613+
lines into lists for log writing; lines are also forwarded to the
614+
callback if provided.
616615
617616
The subprocess is started in its own process group so that on
618617
``KeyboardInterrupt`` or timeout the entire child tree can be killed
@@ -624,72 +623,41 @@ def _run_agent_blocking(
624623
start = time.monotonic()
625624
capture = log_path_dir is not None or on_output_line is not None
626625

627-
if not capture:
628-
# ── Inherit path ─────────────────────────────────────────
629-
# No subscriber needs the bytes β€” let the child write directly
630-
# to the terminal. Avoids silent output loss when the user
631-
# pipes ralph's output (e.g. ``ralph run | cat``).
632-
returncode: int | None = None
633-
timed_out = False
634-
writer_thread: threading.Thread | None = None
635-
636-
proc = subprocess.Popen(
637-
cmd,
638-
stdin=subprocess.PIPE,
639-
**SUBPROCESS_TEXT_KWARGS,
640-
**SESSION_KWARGS,
641-
)
642-
try:
643-
if proc.stdin is None:
644-
raise RuntimeError("subprocess.Popen failed to create PIPE stdin")
645-
646-
writer_thread = _start_writer_thread(proc, prompt)
647-
648-
try:
649-
returncode = proc.wait(timeout=timeout)
650-
except subprocess.TimeoutExpired:
651-
_ensure_process_dead(proc)
652-
timed_out = True
653-
finally:
654-
_cleanup_agent(proc, writer_thread)
655-
656-
return AgentResult(
657-
returncode=None if timed_out else returncode,
658-
elapsed=time.monotonic() - start,
659-
log_file=None,
660-
timed_out=timed_out,
661-
)
662-
663-
# ── Capture path ─────────────────────────────────────────────
664-
# Reader threads drain stdout/stderr concurrently. Lines are only
665-
# accumulated into buffers when a log file will be written; otherwise
666-
# the callback alone observes them, avoiding unbounded memory growth.
667-
returncode = None
626+
# When no subscriber needs the bytes, stdout/stderr are left
627+
# un-piped so the child writes directly to the terminal. When
628+
# capture is needed, reader threads drain stdout/stderr
629+
# concurrently. Lines are only accumulated into buffers when a
630+
# log file will be written; otherwise the callback alone observes
631+
# them, avoiding unbounded memory growth.
632+
returncode: int | None = None
668633
timed_out = False
669634
writer_thread: threading.Thread | None = None
670-
stdout_lines: list[str] | None = [] if log_path_dir is not None else None
671-
stderr_lines: list[str] | None = [] if log_path_dir is not None else None
672635
stdout_thread: threading.Thread | None = None
673636
stderr_thread: threading.Thread | None = None
637+
stdout_lines: list[str] | None = [] if log_path_dir is not None else None
638+
stderr_lines: list[str] | None = [] if log_path_dir is not None else None
674639

640+
pipe = subprocess.PIPE if capture else None
675641
proc = subprocess.Popen(
676642
cmd,
677643
stdin=subprocess.PIPE,
678-
stdout=subprocess.PIPE,
679-
stderr=subprocess.PIPE,
644+
stdout=pipe,
645+
stderr=pipe,
680646
**SUBPROCESS_TEXT_KWARGS,
681647
**SESSION_KWARGS,
682648
)
683649
try:
684-
if proc.stdin is None or proc.stdout is None or proc.stderr is None:
685-
raise RuntimeError("subprocess.Popen failed to create PIPE streams")
686-
687-
stdout_thread = _start_pump_thread(
688-
proc.stdout, stdout_lines, _STDOUT, on_output_line
689-
)
690-
stderr_thread = _start_pump_thread(
691-
proc.stderr, stderr_lines, _STDERR, on_output_line
692-
)
650+
if proc.stdin is None:
651+
raise RuntimeError("subprocess.Popen failed to create PIPE stdin")
652+
if capture:
653+
if proc.stdout is None or proc.stderr is None:
654+
raise RuntimeError("subprocess.Popen failed to create PIPE streams")
655+
stdout_thread = _start_pump_thread(
656+
proc.stdout, stdout_lines, _STDOUT, on_output_line
657+
)
658+
stderr_thread = _start_pump_thread(
659+
proc.stderr, stderr_lines, _STDERR, on_output_line
660+
)
693661

694662
writer_thread = _start_writer_thread(proc, prompt)
695663

@@ -703,7 +671,6 @@ def _run_agent_blocking(
703671

704672
stdout = "".join(stdout_lines) if stdout_lines is not None else None
705673
stderr = "".join(stderr_lines) if stderr_lines is not None else None
706-
707674
log_file = _write_log(log_path_dir, iteration, stdout, stderr)
708675

709676
return AgentResult(

β€Žtests/test_agent.pyβ€Ž

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,8 +1098,8 @@ def test_no_pipe_when_no_log_no_callback(self, mock_popen):
10981098
)
10991099

11001100
call_kwargs = mock_popen.call_args[1]
1101-
assert "stdout" not in call_kwargs
1102-
assert "stderr" not in call_kwargs
1101+
assert call_kwargs.get("stdout") is None
1102+
assert call_kwargs.get("stderr") is None
11031103
assert result.returncode == 0
11041104
assert result.log_file is None
11051105

0 commit comments

Comments
Β (0)