@@ -84,16 +84,6 @@ def __init__(
8484 """Initialize service workflow executor."""
8585 self .llm_manager = llm_manager
8686 self .orchestration_service = orchestration_service
87- self ._qdrant_client : Optional [httpx .AsyncClient ] = None
88-
89- async def _get_qdrant_client (self ) -> httpx .AsyncClient :
90- """Get or create Qdrant HTTP client (lazy initialization)."""
91- if self ._qdrant_client is None :
92- qdrant_url = f"http://{ QDRANT_HOST } :{ QDRANT_PORT } "
93- self ._qdrant_client = httpx .AsyncClient (
94- base_url = qdrant_url , timeout = QDRANT_TIMEOUT
95- )
96- return self ._qdrant_client
9787
9888 async def _semantic_search_services (
9989 self ,
@@ -102,7 +92,11 @@ async def _semantic_search_services(
10292 chat_id : str ,
10393 top_k : int = SEMANTIC_SEARCH_TOP_K ,
10494 ) -> Optional [List [Dict [str , Any ]]]:
105- """Search services using semantic search via Qdrant."""
95+ """Search services using semantic search via Qdrant.
96+
97+ Creates a new httpx.AsyncClient per request to ensure proper resource cleanup.
98+ This is safe and efficient since semantic search is infrequent (only when many services exist).
99+ """
106100 if not self .orchestration_service :
107101 logger .error (
108102 f"[{ chat_id } ] Semantic search unavailable: orchestration service not provided"
@@ -125,73 +119,76 @@ async def _semantic_search_services(
125119
126120 query_embedding = embeddings [0 ]
127121
128- # Verify collection exists and has data
129- client = await self ._get_qdrant_client ()
130-
131- try :
132- collection_info = await client .get (f"/collections/{ QDRANT_COLLECTION } " )
133- if collection_info .status_code == 200 :
134- info = collection_info .json ()
135- points_count = info .get ("result" , {}).get ("points_count" , 0 )
136- if points_count == 0 :
137- logger .error (f"[{ chat_id } ] Collection is empty" )
138- return None
139- except Exception as e :
140- logger .warning (f"[{ chat_id } ] Could not verify collection: { e } " )
141-
142- # Search Qdrant collection
143- client = await self ._get_qdrant_client ()
144-
145- search_payload = {
146- "vector" : query_embedding ,
147- "limit" : top_k ,
148- "score_threshold" : SEMANTIC_SEARCH_THRESHOLD ,
149- "with_payload" : True ,
150- }
151-
152- response = await client .post (
153- f"/collections/{ QDRANT_COLLECTION } /points/search" ,
154- json = search_payload ,
155- )
122+ # Create Qdrant client with proper resource cleanup via context manager
123+ qdrant_url = f"http://{ QDRANT_HOST } :{ QDRANT_PORT } "
124+ async with httpx .AsyncClient (
125+ base_url = qdrant_url , timeout = QDRANT_TIMEOUT
126+ ) as client :
127+ # Verify collection exists and has data
128+ try :
129+ collection_info = await client .get (
130+ f"/collections/{ QDRANT_COLLECTION } "
131+ )
132+ if collection_info .status_code == 200 :
133+ info = collection_info .json ()
134+ points_count = info .get ("result" , {}).get ("points_count" , 0 )
135+ if points_count == 0 :
136+ logger .error (f"[{ chat_id } ] Collection is empty" )
137+ return None
138+ except Exception as e :
139+ logger .warning (f"[{ chat_id } ] Could not verify collection: { e } " )
140+
141+ # Search Qdrant collection
142+ search_payload = {
143+ "vector" : query_embedding ,
144+ "limit" : top_k ,
145+ "score_threshold" : SEMANTIC_SEARCH_THRESHOLD ,
146+ "with_payload" : True ,
147+ }
156148
157- if response . status_code != 200 :
158- logger . error (
159- f"[ { chat_id } ] Qdrant search failed: HTTP { response . status_code } "
149+ response = await client . post (
150+ f"/collections/ { QDRANT_COLLECTION } /points/search" ,
151+ json = search_payload ,
160152 )
161- return None
162153
163- search_results = response .json ()
164- points = search_results .get ("result" , [])
165-
166- if len (points ) == 0 :
167- logger .warning (
168- f"[{ chat_id } ] No services matched (threshold={ SEMANTIC_SEARCH_THRESHOLD } )"
169- )
170- return None
154+ if response .status_code != 200 :
155+ logger .error (
156+ f"[{ chat_id } ] Qdrant search failed: HTTP { response .status_code } "
157+ )
158+ return None
171159
172- # Transform Qdrant results to service format
173- services : List [Dict [str , Any ]] = []
174- for point in points :
175- payload = point .get ("payload" , {})
176- score = float (point .get ("score" , 0 ))
177-
178- service = {
179- "serviceId" : payload .get ("service_id" ),
180- "service_id" : payload .get ("service_id" ),
181- "name" : payload .get ("name" ),
182- "description" : payload .get ("description" ),
183- "examples" : payload .get ("examples" , []),
184- "entities" : payload .get ("entities" , []),
185- # Note: endpoint not stored in intent_collections,
186- # will be resolved via database lookup if needed
187- "similarity_score" : score ,
188- }
189- services .append (service )
160+ search_results = response .json ()
161+ points = search_results .get ("result" , [])
190162
191- logger .info (
192- f"[{ chat_id } ] Found { len (services )} services via semantic search"
193- )
194- return services
163+ if len (points ) == 0 :
164+ logger .warning (
165+ f"[{ chat_id } ] No services matched (threshold={ SEMANTIC_SEARCH_THRESHOLD } )"
166+ )
167+ return None
168+
169+ # Transform Qdrant results to service format
170+ services : List [Dict [str , Any ]] = []
171+ for point in points :
172+ payload = point .get ("payload" , {})
173+ score = float (point .get ("score" , 0 ))
174+
175+ service = {
176+ "serviceId" : payload .get ("service_id" ),
177+ "service_id" : payload .get ("service_id" ),
178+ "name" : payload .get ("name" ),
179+ "description" : payload .get ("description" ),
180+ "examples" : payload .get ("examples" , []),
181+ "entities" : payload .get ("entities" , []),
182+ # Note: endpoint not stored in intent_collections,
183+ # will be resolved via database lookup if needed
184+ "similarity_score" : score ,
185+ }
186+ services .append (service )
187+
188+ logger .info (
189+ f"[{ chat_id } ] Found { len (services )} services via semantic search"
190+ )
191+ return services
195192
196193 except Exception as e :
197194 logger .error (f"[{ chat_id } ] Semantic search failed: { e } " , exc_info = True )
@@ -291,6 +288,53 @@ def _validate_detected_service(
291288 )
292289 return None
293290
291+ async def _process_intent_detection (
292+ self ,
293+ services : List [Dict [str , Any ]],
294+ request : OrchestrationRequest ,
295+ chat_id : str ,
296+ context : Dict [str , Any ],
297+ costs_dict : Dict [str , Dict [str , Any ]],
298+ ) -> None :
299+ """Detect intent, validate service, and populate context.
300+
301+ This helper method encapsulates the common logic of:
302+ 1. Calling intent detection (LLM)
303+ 2. Tracking costs
304+ 3. Validating matched service
305+ 4. Populating context with service metadata
306+
307+ Args:
308+ services: List of services to match against
309+ request: Orchestration request
310+ chat_id: Chat ID for logging
311+ context: Context dict to populate with results
312+ costs_dict: Dictionary to track LLM costs
313+ """
314+ intent_result , intent_usage = await self ._detect_service_intent (
315+ user_query = request .message ,
316+ services = services ,
317+ conversation_history = request .conversationHistory ,
318+ chat_id = chat_id ,
319+ )
320+ costs_dict ["intent_detection" ] = intent_usage
321+
322+ if intent_result and intent_result .get ("matched_service_id" ):
323+ service_id = intent_result ["matched_service_id" ]
324+ logger .info (f"[{ chat_id } ] Matched: { service_id } " )
325+
326+ validated_service = self ._validate_detected_service (
327+ matched_service_id = service_id ,
328+ services = services ,
329+ chat_id = chat_id ,
330+ )
331+
332+ if validated_service :
333+ context ["service_id" ] = service_id
334+ context ["confidence" ] = intent_result .get ("confidence" , 0.0 )
335+ context ["entities" ] = intent_result .get ("entities" , {})
336+ context ["service_data" ] = validated_service
337+
294338 def _extract_service_metadata (
295339 self , context : Dict [str , Any ], chat_id : str
296340 ) -> Optional [Dict [str , Any ]]:
@@ -478,56 +522,24 @@ async def _log_request_details(
478522 services = []
479523
480524 if services :
481- intent_result , intent_usage = await self ._detect_service_intent (
482- user_query = request .message ,
525+ await self ._process_intent_detection (
483526 services = services ,
484- conversation_history = request . conversationHistory ,
527+ request = request ,
485528 chat_id = chat_id ,
529+ context = context ,
530+ costs_dict = costs_dict ,
486531 )
487- costs_dict ["intent_detection" ] = intent_usage
488-
489- if intent_result and intent_result .get ("matched_service_id" ):
490- service_id = intent_result ["matched_service_id" ]
491- logger .info (f"[{ chat_id } ] Matched: { service_id } " )
492-
493- validated_service = self ._validate_detected_service (
494- matched_service_id = service_id ,
495- services = services ,
496- chat_id = chat_id ,
497- )
498-
499- if validated_service :
500- context ["service_id" ] = service_id
501- context ["confidence" ] = intent_result .get ("confidence" , 0.0 )
502- context ["entities" ] = intent_result .get ("entities" , {})
503- context ["service_data" ] = validated_service
504532 else :
505533 services = response_data .get ("services" , [])
506534
507535 if services :
508- intent_result , intent_usage = await self ._detect_service_intent (
509- user_query = request .message ,
536+ await self ._process_intent_detection (
510537 services = services ,
511- conversation_history = request . conversationHistory ,
538+ request = request ,
512539 chat_id = chat_id ,
540+ context = context ,
541+ costs_dict = costs_dict ,
513542 )
514- costs_dict ["intent_detection" ] = intent_usage
515-
516- if intent_result and intent_result .get ("matched_service_id" ):
517- service_id = intent_result ["matched_service_id" ]
518- logger .info (f"[{ chat_id } ] Matched: { service_id } " )
519-
520- validated_service = self ._validate_detected_service (
521- matched_service_id = service_id ,
522- services = services ,
523- chat_id = chat_id ,
524- )
525-
526- if validated_service :
527- context ["service_id" ] = service_id
528- context ["confidence" ] = intent_result .get ("confidence" , 0.0 )
529- context ["entities" ] = intent_result .get ("entities" , {})
530- context ["service_data" ] = validated_service
531543 else :
532544 logger .warning (f"[{ chat_id } ] Service discovery failed" )
533545
0 commit comments