From f9ad0524b62d23fe3fc0fd59cad9da9301c34e1c Mon Sep 17 00:00:00 2001 From: Yorick van Pelt Date: Thu, 2 Apr 2026 13:45:44 +0200 Subject: [PATCH 1/3] fix: add close and aclose methods to GeneratorWrapper Currently, close and aclose methods are missing on GeneratorWrapper, making it impossible to close generators from client code without accessing the .generator directly, and missing span uploads in the process. This commit adds the missing methods for this usecase. --- langfuse/_client/observe.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/langfuse/_client/observe.py b/langfuse/_client/observe.py index c648a0a62..75d79af21 100644 --- a/langfuse/_client/observe.py +++ b/langfuse/_client/observe.py @@ -593,6 +593,17 @@ def __next__(self) -> Any: raise + def close(self) -> None: + output: Any = self.items + + if self.transform_fn is not None: + output = self.transform_fn(self.items) + elif all(isinstance(item, str) for item in self.items): + output = "".join(self.items) + + self.span.update(output=output).end() + self.generator.close() + class _ContextPreservedAsyncGeneratorWrapper: """Async generator wrapper that ensures each iteration runs in preserved context.""" @@ -659,3 +670,14 @@ async def __anext__(self) -> Any: ).end() raise + + async def aclose(self) -> None: + output: Any = self.items + + if self.transform_fn is not None: + output = self.transform_fn(self.items) + elif all(isinstance(item, str) for item in self.items): + output = "".join(self.items) + + self.span.update(output=output).end() + await self.generator.aclose() From 3530d3bee838810fae01308fa2cc024a976b0b0c Mon Sep 17 00:00:00 2001 From: Yorick van Pelt Date: Thu, 2 Apr 2026 13:48:48 +0200 Subject: [PATCH 2/3] fix: add _end_span method to prevent double-ending generators --- langfuse/_client/observe.py | 88 ++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 40 deletions(-) diff --git a/langfuse/_client/observe.py b/langfuse/_client/observe.py index 75d79af21..dc329ae4a 100644 --- a/langfuse/_client/observe.py +++ b/langfuse/_client/observe.py @@ -560,6 +560,26 @@ def __init__( self.items: List[Any] = [] self.span = span self.transform_fn = transform_fn + self._ended = False + + def _end_span( + self, *, level: Optional[str] = None, status_message: Optional[str] = None + ) -> None: + if self._ended: + return + self._ended = True + + output: Any = self.items + + if self.transform_fn is not None: + output = self.transform_fn(self.items) + elif all(isinstance(item, str) for item in self.items): + output = "".join(self.items) + + if level is not None: + self.span.update(output=output, level=level, status_message=status_message).end() + else: + self.span.update(output=output).end() def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper": return self @@ -573,35 +593,19 @@ def __next__(self) -> Any: return item except StopIteration: - # Handle output and span cleanup when generator is exhausted - output: Any = self.items - - if self.transform_fn is not None: - output = self.transform_fn(self.items) - - elif all(isinstance(item, str) for item in self.items): - output = "".join(self.items) - - self.span.update(output=output).end() + self._end_span() raise # Re-raise StopIteration except (Exception, asyncio.CancelledError) as e: - self.span.update( + self._end_span( level="ERROR", status_message=str(e) or type(e).__name__ - ).end() + ) raise def close(self) -> None: - output: Any = self.items - - if self.transform_fn is not None: - output = self.transform_fn(self.items) - elif all(isinstance(item, str) for item in self.items): - output = "".join(self.items) - - self.span.update(output=output).end() + self._end_span() self.generator.close() @@ -630,6 +634,26 @@ def __init__( self.items: List[Any] = [] self.span = span self.transform_fn = transform_fn + self._ended = False + + def _end_span( + self, *, level: Optional[str] = None, status_message: Optional[str] = None + ) -> None: + if self._ended: + return + self._ended = True + + output: Any = self.items + + if self.transform_fn is not None: + output = self.transform_fn(self.items) + elif all(isinstance(item, str) for item in self.items): + output = "".join(self.items) + + if level is not None: + self.span.update(output=output, level=level, status_message=status_message).end() + else: + self.span.update(output=output).end() def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper": return self @@ -652,32 +676,16 @@ async def __anext__(self) -> Any: return item except StopAsyncIteration: - # Handle output and span cleanup when generator is exhausted - output: Any = self.items - - if self.transform_fn is not None: - output = self.transform_fn(self.items) - - elif all(isinstance(item, str) for item in self.items): - output = "".join(self.items) - - self.span.update(output=output).end() + self._end_span() raise # Re-raise StopAsyncIteration except (Exception, asyncio.CancelledError) as e: - self.span.update( + self._end_span( level="ERROR", status_message=str(e) or type(e).__name__ - ).end() + ) raise async def aclose(self) -> None: - output: Any = self.items - - if self.transform_fn is not None: - output = self.transform_fn(self.items) - elif all(isinstance(item, str) for item in self.items): - output = "".join(self.items) - - self.span.update(output=output).end() + self._end_span() await self.generator.aclose() From 69a592bcb8f6c1cb6022813ea2134abb09587673 Mon Sep 17 00:00:00 2001 From: Yorick van Pelt Date: Thu, 2 Apr 2026 13:57:15 +0200 Subject: [PATCH 3/3] Preserve the context for close methods --- langfuse/_client/observe.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/langfuse/_client/observe.py b/langfuse/_client/observe.py index dc329ae4a..250a15a56 100644 --- a/langfuse/_client/observe.py +++ b/langfuse/_client/observe.py @@ -605,8 +605,16 @@ def __next__(self) -> Any: raise def close(self) -> None: - self._end_span() - self.generator.close() + tokens = [] + try: + if self.context: + for var, value in self.context.items(): + tokens.append((var, var.set(value))) + self._end_span() + self.generator.close() + finally: + for var, token in tokens: + var.reset(token) class _ContextPreservedAsyncGeneratorWrapper: @@ -687,5 +695,13 @@ async def __anext__(self) -> Any: raise async def aclose(self) -> None: - self._end_span() - await self.generator.aclose() + tokens = [] + try: + if self.context: + for var, value in self.context.items(): + tokens.append((var, var.set(value))) + self._end_span() + await self.generator.aclose() + finally: + for var, token in tokens: + var.reset(token)