@@ -56,9 +56,7 @@ def traced_method(
5656 prompts = kwargs .get ("messages" , [])
5757 system = kwargs .get ("system" )
5858 if system :
59- prompts = [{"role" : "system" , "content" : system }] + kwargs .get (
60- "messages" , []
61- )
59+ prompts .append ({"role" : "system" , "content" : system })
6260 span_attributes = {
6361 ** get_langtrace_attributes (version , service_provider ),
6462 ** get_llm_request_attributes (kwargs , prompts = prompts ),
@@ -72,7 +70,14 @@ def traced_method(
7270 span = tracer .start_span (
7371 name = get_span_name (APIS ["MESSAGES_CREATE" ]["METHOD" ]), kind = SpanKind .CLIENT
7472 )
73+
7574 set_span_attributes (span , attributes )
75+
76+ tools = []
77+ if kwargs .get ("tools" ) is not None and kwargs .get ("tools" ):
78+ tools .append (json .dumps (kwargs .get ("tools" )))
79+ set_span_attribute (span , SpanAttributes .LLM_TOOLS , json .dumps (tools ))
80+
7681 try :
7782 # Attempt to call the original method
7883 result = wrapped (* args , ** kwargs )
@@ -127,7 +132,149 @@ def set_response_attributes(
127132 span .end ()
128133 return result
129134 else :
130- return StreamWrapper (result , span )
135+ return StreamWrapper (result , span , tool_calls = True )
131136
132137 # return the wrapped method
133138 return traced_method
139+
140+
def messages_stream(version: str, tracer: Tracer) -> Callable[..., Any]:
    """Return a wrapper that traces Anthropic ``client.messages.stream`` calls.

    Args:
        version: Instrumented package version, recorded on the span.
        tracer: OpenTelemetry tracer used to create the CLIENT span.

    Returns:
        A ``traced_method(wrapped, instance, args, kwargs)`` callable suitable
        for use as a wrapt-style function wrapper.
    """

    def traced_method(
        wrapped: Callable[..., Any],
        instance: Any,
        args: List[Any],
        kwargs: MessagesCreateKwargs,
    ) -> Any:
        service_provider = SERVICE_PROVIDERS["ANTHROPIC"]

        # Build the prompt list WITHOUT mutating kwargs["messages"]: that same
        # list object is forwarded to the wrapped SDK call, so appending to it
        # would inject the system prompt into the outgoing request. Prepend the
        # system prompt with role "system" (not "assistant"), matching the
        # messages_create handler.
        prompts = list(kwargs.get("messages", []))
        system = kwargs.get("system")
        if system:
            prompts = [{"role": "system", "content": system}] + prompts

        span_attributes = {
            **get_langtrace_attributes(version, service_provider),
            **get_llm_request_attributes(kwargs, prompts=prompts),
            **get_llm_url(instance),
            SpanAttributes.LLM_PATH: APIS["MESSAGES_STREAM"]["ENDPOINT"],
            **get_extra_attributes(),
        }

        attributes = LLMSpanAttributes(**span_attributes)

        span = tracer.start_span(
            name=get_span_name(APIS["MESSAGES_STREAM"]["METHOD"]),
            kind=SpanKind.CLIENT,
        )

        set_span_attributes(span, attributes)

        # Record tool definitions only when tools were actually supplied,
        # instead of unconditionally emitting an empty "[]" attribute.
        # The double json.dumps (a JSON array of one JSON string) mirrors the
        # encoding used by the messages_create handler.
        request_tools = kwargs.get("tools")
        if request_tools:
            set_span_attribute(
                span,
                SpanAttributes.LLM_TOOLS,
                json.dumps([json.dumps(request_tools)]),
            )

        class _InstrumentedStream:
            """Iterator that forwards chunks while recording telemetry.

            Ends the span exactly once, whether the stream is drained,
            closed early, or raises.
            """

            def __init__(self, original_stream: Any, stream_span: Any) -> None:
                self.original_stream = original_stream
                self.span = stream_span
                self.message_stop_processed = False
                # Guards against double-ending the span across the
                # StopIteration / close() / __exit__ paths.
                self.span_ended = False

            def end_span(self) -> None:
                """Idempotently end the span."""
                if not self.span_ended:
                    self.span_ended = True
                    self.span.end()

            def __iter__(self) -> Any:
                return self

            def __next__(self) -> Any:
                try:
                    chunk = next(self.original_stream)
                except StopIteration:
                    # Normal end of stream: release the span.
                    self.end_span()
                    raise
                except Exception as err:
                    self.span.record_exception(err)
                    self.span.set_status(StatusCode.ERROR, str(err))
                    self.end_span()
                    raise

                # The final accumulated message arrives once, on message_stop.
                if chunk.type == "message_stop" and not self.message_stop_processed:
                    self.message_stop_processed = True
                    self._record_final_message(chunk.message)

                # Forward the chunk unchanged to the consumer.
                return chunk

            def _record_final_message(self, response_message: Any) -> None:
                """Attach completion events and token usage to the span."""
                responses = [
                    {
                        "role": (
                            response_message.role
                            if response_message.role
                            else "assistant"
                        ),
                        "content": part.text,
                    }
                    for part in response_message.content
                    if part.type == "text"
                ]
                set_event_completion(self.span, responses)

                usage = getattr(response_message, "usage", None)
                if usage is not None:
                    set_span_attribute(
                        self.span,
                        SpanAttributes.LLM_USAGE_PROMPT_TOKENS,
                        usage.input_tokens,
                    )
                    set_span_attribute(
                        self.span,
                        SpanAttributes.LLM_USAGE_COMPLETION_TOKENS,
                        usage.output_tokens,
                    )
                    set_span_attribute(
                        self.span,
                        SpanAttributes.LLM_USAGE_TOTAL_TOKENS,
                        usage.input_tokens + usage.output_tokens,
                    )

            def close(self) -> None:
                self.original_stream.close()
                self.end_span()

        class _InstrumentedMessageStreamManager:
            """Context manager that wraps the SDK's stream manager.

            Preserves the SDK's enter/exit semantics while guaranteeing the
            span is ended on every exit path (including early break from
            iteration, which previously leaked the span).
            """

            def __init__(self, original_manager: Any, manager_span: Any) -> None:
                self.original_manager = original_manager
                self.span = manager_span
                self._stream: Any = None

            def __enter__(self) -> Any:
                original_stream = self.original_manager.__enter__()
                self._stream = _InstrumentedStream(original_stream, self.span)
                return self._stream

            def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Any:
                result = self.original_manager.__exit__(exc_type, exc_val, exc_tb)

                if exc_type is not None:
                    self.span.record_exception(exc_val)
                    self.span.set_status(StatusCode.ERROR, str(exc_val))

                # Always release the span on context exit; end_span() is
                # idempotent, so a span already closed by StopIteration or
                # close() is not ended twice.
                if self._stream is not None:
                    self._stream.end_span()
                else:
                    self.span.end()

                return result

        try:
            # Call the original method and hand back an instrumented manager
            # that preserves the streaming behavior.
            original_stream_manager = wrapped(*args, **kwargs)
            return _InstrumentedMessageStreamManager(original_stream_manager, span)
        except Exception as err:
            span.record_exception(err)
            span.set_status(StatusCode.ERROR, str(err))
            span.end()
            raise

    return traced_method
0 commit comments