11import re
2-
32import openai
43import requests
4+
55from fastapi import APIRouter , BackgroundTasks , Depends
66from openai import OpenAI
77from sqlmodel import Session
8+ from langfuse .decorators import observe , langfuse_context
89
910from app .api .deps import get_current_user_org , get_db
1011from app .core import logging , settings
@@ -55,10 +56,10 @@ def validate_thread(client: OpenAI, thread_id: str) -> tuple[bool, str]:
5556 return False , f"Invalid thread ID provided { thread_id } "
5657
5758
59+ @observe (capture_input = False )
5860def setup_thread (client : OpenAI , request : dict ) -> tuple [bool , str ]:
5961 """Set up thread and add message, either creating new or using existing."""
6062 thread_id = request .get ("thread_id" )
61-
6263 if thread_id :
6364 try :
6465 client .beta .threads .messages .create (
@@ -74,6 +75,9 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]:
7475 thread_id = thread .id , role = "user" , content = request ["question" ]
7576 )
7677 request ["thread_id" ] = thread .id
78+ langfuse_context .update_current_trace (
79+ session_id = thread .id , name = "New Thread ID created" , output = thread .id
80+ )
7781 return True , None
7882 except openai .OpenAIError as e :
7983 return False , handle_openai_error (e )
@@ -109,21 +113,39 @@ def create_success_response(request: dict, message: str) -> APIResponse:
109113 )
110114
111115
116+ @observe (as_type = "generation" )
112117def process_run (request : dict , client : OpenAI ):
113118 """Process a run and send callback with results."""
114119 try :
115120 run = client .beta .threads .runs .create_and_poll (
116121 thread_id = request ["thread_id" ],
117122 assistant_id = request ["assistant_id" ],
118123 )
124+ langfuse_context .update_current_trace (
125+ session_id = request ["thread_id" ],
126+ input = request ["question" ],
127+ name = "Thread Run Started" ,
128+ )
119129
120130 if run .status == "completed" :
131+ langfuse_context .update_current_observation (
132+ model = run .model ,
133+ usage_details = {
134+ "prompt_tokens" : run .usage .prompt_tokens ,
135+ "completion_tokens" : run .usage .completion_tokens ,
136+ "total_tokens" : run .usage .total_tokens ,
137+ },
138+ )
121139 messages = client .beta .threads .messages .list (thread_id = request ["thread_id" ])
122140 latest_message = messages .data [0 ]
123141 message_content = latest_message .content [0 ].text .value
124142 message = process_message_content (
125143 message_content , request .get ("remove_citation" , False )
126144 )
145+ langfuse_context .update_current_trace (
146+ output = message , name = "Thread Run Completed"
147+ )
148+
127149 callback_response = create_success_response (request , message )
128150 else :
129151 callback_response = APIResponse .failure_response (
@@ -146,7 +168,11 @@ async def threads(
146168):
147169 """Asynchronous endpoint that processes requests in background."""
148170 client = OpenAI (api_key = settings .OPENAI_API_KEY )
149-
171+ langfuse_context .configure (
172+ secret_key = settings .LANGFUSE_SECRET_KEY ,
173+ public_key = settings .LANGFUSE_PUBLIC_KEY ,
174+ host = settings .LANGFUSE_HOST ,
175+ )
150176 # Validate thread
151177 is_valid , error_message = validate_thread (client , request .get ("thread_id" ))
152178 if not is_valid :
0 commit comments