From 1ecad39e8662c2c2b28e905ca0f795bfce707ba8 Mon Sep 17 00:00:00 2001 From: Charith Nuwan Bimsara <59943919+nuwangeek@users.noreply.github.com> Date: Sat, 21 Feb 2026 17:53:28 +0530 Subject: [PATCH] Intent enrichment pipeline (#319) * prompt coniguration backend to be testing * custom prompt configuration update and fixed Pyright issues * fixed copilot reviews * pre validation step added when user query is inserted * added more validation cases * fixed review comments * implement tool classification orchestration agent skeleton * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fixed copilot suggested changes * fixed issue * data enrichment pipeline for service module partially completed * complete error handling * added intent enrichment pipeline * remove unwanted file * updated changes * fixed requested changes * fixed issue --------- Co-authored-by: Thiru Dinesh <56014038+Thirunayan22@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Thiru Dinesh Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: erangi-ar <111747955+erangi-ar@users.noreply.github.com> --- DSL/CronManager/DSL/service_enrichment.yml | 5 + DSL/CronManager/script/service_enrichment.sh | 103 ++++++++++ .../rag-search/POST/services/enrich.yml | 120 +++++++++++ docker-compose-ec2.yml | 3 +- docker-compose.yml | 3 +- src/intent_data_enrichment/__init__.py | 22 ++ src/intent_data_enrichment/api_client.py | 191 ++++++++++++++++++ src/intent_data_enrichment/constants.py | 46 +++++ src/intent_data_enrichment/main_enrichment.py | 187 +++++++++++++++++ src/intent_data_enrichment/models.py | 44 ++++ src/intent_data_enrichment/qdrant_manager.py | 182 +++++++++++++++++ 11 files changed, 904 insertions(+), 2 deletions(-) create mode 100644 DSL/CronManager/DSL/service_enrichment.yml create mode 100644 DSL/CronManager/script/service_enrichment.sh create mode 100644 
#!/bin/bash
#
# Service enrichment job, executed by CronManager.
#
# Inputs arrive as environment variables:
#   required: service_id, name, description
#   optional: examples, entities (URL-encoded JSON arrays),
#             ruuter_type, current_state, is_common
#
# The script makes sure `uv` and the Python dependencies are available,
# decodes the JSON arrays into temporary files and runs main_enrichment.py.
# Its exit status is the exit status of the Python script.

# Make a pipeline fail when any stage fails, so that a failed download in
# `curl ... | sh` is not masked by the exit status of `sh`.
set -o pipefail

echo "Starting service data enrichment pipeline..."

# Validate required environment variables
if [ -z "$service_id" ] || [ -z "$name" ] || [ -z "$description" ]; then
    echo "[ERROR] Missing required environment variables: service_id, name, or description"
    exit 1
fi

PYTHON_SCRIPT="/app/src/intent_data_enrichment/main_enrichment.py"

echo "[INFO] Service ID: $service_id"
echo "[INFO] Service Name: $name"

# Install uv if not found
UV_BIN="/root/.local/bin/uv"
if [ ! -f "$UV_BIN" ]; then
    echo "[UV] Installing uv..."
    curl -LsSf https://astral.sh/uv/install.sh | sh || {
        echo "[ERROR] Failed to install uv"
        exit 1
    }
fi

# The installer may succeed without placing the binary where we expect it
# (for example when HOME is not /root); fail with a clear message instead of
# a bare "No such file or directory" further down.
if [ ! -x "$UV_BIN" ]; then
    echo "[ERROR] uv not found at $UV_BIN after installation"
    exit 1
fi

# Activate Python virtual environment
VENV_PATH="/app/python_virtual_env"
echo "[VENV] Activating virtual environment at: $VENV_PATH"
source "$VENV_PATH/bin/activate" || {
    echo "[ERROR] Failed to activate virtual environment"
    exit 1
}

# Install required packages (minimal for Phase 1).
# A single invocation lets uv resolve all requirements together.
echo "[PACKAGES] Installing required packages..."

REQUIRED_PACKAGES=(
    "httpx>=0.27.0"
    "pydantic>=2.11.7"
    "qdrant-client>=1.15.1"
    "loguru>=0.7.3"
)
"$UV_BIN" pip install --python "$VENV_PATH/bin/python3" "${REQUIRED_PACKAGES[@]}" || {
    echo "[ERROR] Failed to install required packages"
    exit 1
}

echo "[PACKAGES] All packages installed successfully"

# Set Python path
export PYTHONPATH="/app:/app/src:/app/src/intent_data_enrichment:$PYTHONPATH"

# Verify Python script exists
[ ! -f "$PYTHON_SCRIPT" ] && { echo "[ERROR] Python script not found at $PYTHON_SCRIPT"; exit 1; }

echo "[FOUND] Python script at: $PYTHON_SCRIPT"

# Run enrichment script with arguments
echo "[STARTING] Service enrichment processing..."

# URL decode function using Python
url_decode() {
    python3 -c "import sys; from urllib.parse import unquote; print(unquote(sys.argv[1]))" "$1"
}

# Write JSON arrays to temporary files to avoid bash parsing issues.
# Arrays are URL-encoded from Ruuter, need to decode them.
TEMP_DIR=$(mktemp -d) || {
    echo "[ERROR] Failed to create temporary directory"
    exit 1
}

# Remove the temporary directory on every exit path. INT/TERM are turned into
# a normal exit so the EXIT trap also runs when the job is interrupted.
cleanup() {
    rm -rf "$TEMP_DIR"
}
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 143' TERM

EXAMPLES_FILE="$TEMP_DIR/examples.json"
ENTITIES_FILE="$TEMP_DIR/entities.json"

if [ -n "$examples" ]; then
    url_decode "$examples" > "$EXAMPLES_FILE"
fi

if [ -n "$entities" ]; then
    url_decode "$entities" > "$ENTITIES_FILE"
fi

# Build Python command arguments array
PYTHON_ARGS=(
    "$PYTHON_SCRIPT"
    --service-id "$service_id"
    --name "$name"
    --description "$description"
)

# Add optional fields
[ -n "$ruuter_type" ] && PYTHON_ARGS+=(--ruuter-type "$ruuter_type")
[ -n "$current_state" ] && PYTHON_ARGS+=(--current-state "$current_state")
[ -n "$is_common" ] && PYTHON_ARGS+=(--is-common "$is_common")
[ -n "$examples" ] && PYTHON_ARGS+=(--examples-file "$EXAMPLES_FILE")
[ -n "$entities" ] && PYTHON_ARGS+=(--entities-file "$ENTITIES_FILE")

# Execute Python script directly (no eval to avoid parsing issues)
python3 -u "${PYTHON_ARGS[@]}" 2>&1
PYTHON_EXIT_CODE=$?

# Handle exit codes (temporary files are removed by the EXIT trap)
if [ "$PYTHON_EXIT_CODE" -eq 0 ]; then
    echo "[SUCCESS] Service enrichment completed successfully"
    exit 0
else
    echo "[ERROR] Service enrichment failed with exit code: $PYTHON_EXIT_CODE"
    exit "$PYTHON_EXIT_CODE"
fi
"${!service_id || !service_name || !service_description}" + next: return_missing_fields + next: execute_enrichment + +return_missing_fields: + assign: + error_data: { + success: false, + error: "MISSING_REQUIRED_FIELDS", + message: "service_id, name, and description are required" + } + next: return_bad_request + +execute_enrichment: + call: http.post + args: + url: "[#RAG_SEARCH_CRON_MANAGER]/execute/service_enrichment/enrich_and_index" + query: + service_id: ${service_id} + name: ${service_name} + description: ${service_description} + examples: ${service_examples} + entities: ${service_entities} + ruuter_type: ${service_ruuter_type} + current_state: ${service_current_state} + is_common: ${service_is_common} + result: enrichment_result + on_error: handle_cron_error + next: check_enrichment_status + +check_enrichment_status: + switch: + - condition: ${200 <= enrichment_result.response.statusCodeValue && enrichment_result.response.statusCodeValue < 300} + next: assign_success + next: assign_cron_failure + +handle_cron_error: + log: "ERROR: Failed to queue enrichment job - ${enrichment_result.error || 'CronManager unreachable'}" + next: assign_cron_failure + +assign_cron_failure: + assign: + response_data: + success: false + error: "ENRICHMENT_QUEUE_FAILED" + message: "Failed to queue enrichment job. CronManager may be unavailable." + details: ${enrichment_result.error} + next: return_server_error + +assign_success: + assign: + response_data: + success: true + service_id: ${service_id} + message: "Service enrichment job queued successfully. Processing asynchronously." 
+ next: return_ok + +return_ok: + status: 200 + return: ${response_data} + next: end + +return_bad_request: + status: 400 + return: ${error_data} + next: end + +return_server_error: + status: 500 + return: ${response_data} + next: end diff --git a/docker-compose-ec2.yml b/docker-compose-ec2.yml index 26c19068..cc48c1c9 100644 --- a/docker-compose-ec2.yml +++ b/docker-compose-ec2.yml @@ -179,6 +179,7 @@ services: - ./DSL/CronManager/DSL:/DSL - ./DSL/CronManager/script:/app/scripts - ./src/vector_indexer:/app/src/vector_indexer + - ./src/intent_data_enrichment:/app/src/intent_data_enrichment - ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only) - cron_data:/app/data - shared-volume:/app/shared # Access to shared resources for cross-container coordination @@ -187,7 +188,7 @@ services: - ./.env:/app/.env:ro environment: - server.port=9010 - - PYTHONPATH=/app:/app/src/vector_indexer + - PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment - VAULT_AGENT_URL=http://vault-agent-cron:8203 ports: - 9010:8080 diff --git a/docker-compose.yml b/docker-compose.yml index 8a9d119e..1fec54b5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -178,6 +178,7 @@ services: - ./DSL/CronManager/DSL:/DSL - ./DSL/CronManager/script:/app/scripts - ./src/vector_indexer:/app/src/vector_indexer + - ./src/intent_data_enrichment:/app/src/intent_data_enrichment - ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only) - cron_data:/app/data - shared-volume:/app/shared # Access to shared resources for cross-container coordination @@ -186,7 +187,7 @@ services: - ./.env:/app/.env:ro environment: - server.port=9010 - - PYTHONPATH=/app:/app/src/vector_indexer + - PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment - VAULT_AGENT_URL=http://vault-agent-cron:8203 ports: - 9010:8080 diff --git a/src/intent_data_enrichment/__init__.py 
class LLMAPIClient:
    """Async HTTP client for the LLM Orchestration Service.

    Use it as an async context manager so the underlying
    ``httpx.AsyncClient`` is opened and closed deterministically::

        async with LLMAPIClient() as client:
            context = await client.generate_context(service_data)

    Both request methods retry any failure raised while talking to the
    service (transport errors, non-2xx responses, malformed or empty
    payloads) up to ``max_retries`` times, sleeping
    ``retry_delay_base ** attempt`` seconds between attempts.
    """

    def __init__(
        self,
        api_base_url: str = EnrichmentConstants.DEFAULT_API_BASE_URL,
        environment: str = EnrichmentConstants.DEFAULT_ENVIRONMENT,
        connection_id: str = EnrichmentConstants.DEFAULT_CONNECTION_ID,
        max_retries: int = EnrichmentConstants.MAX_RETRIES,
        retry_delay_base: int = EnrichmentConstants.RETRY_DELAY_BASE,
        timeout: int = EnrichmentConstants.REQUEST_TIMEOUT,
    ) -> None:
        """Store the configuration; no network resources are created here.

        Args:
            api_base_url: Base URL of the orchestration service.
            environment: Value sent as ``environment`` in every request.
            connection_id: Value sent as ``connection_id`` in every request.
            max_retries: Maximum number of attempts per request.
            retry_delay_base: Base of the exponential backoff, in seconds.
            timeout: Timeout passed to ``httpx.AsyncClient``.
        """
        self.api_base_url = api_base_url
        self.environment = environment
        self.connection_id = connection_id
        self.max_retries = max_retries
        self.retry_delay_base = retry_delay_base
        self.timeout = timeout
        self.session: Optional[httpx.AsyncClient] = None

    async def __aenter__(self) -> "LLMAPIClient":
        """Async context manager entry: open the HTTP session."""
        self.session = httpx.AsyncClient(timeout=self.timeout)
        return self

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Async context manager exit: close and forget the HTTP session."""
        if self.session:
            try:
                await self.session.aclose()
            finally:
                # Drop the reference so later calls fail fast with
                # "HTTP session not initialized" instead of retrying
                # against a closed client.
                self.session = None

    def _require_session(self) -> httpx.AsyncClient:
        """Return the open session or raise ``RuntimeError`` if there is none."""
        if self.session is None:
            raise RuntimeError("HTTP session not initialized")
        return self.session

    async def _sleep_before_retry(self, attempt: int) -> None:
        """Sleep ``retry_delay_base ** attempt`` seconds unless this was the last attempt."""
        if attempt < self.max_retries - 1:
            delay = self.retry_delay_base**attempt
            logger.info(f"Retrying in {delay} seconds...")
            await asyncio.sleep(delay)

    async def generate_context(self, service_data: ServiceData) -> str:
        """
        Generate rich context for service using LLM.

        Args:
            service_data: Service data to enrich

        Returns:
            Generated context string

        Raises:
            RuntimeError: If the client is used outside ``async with`` (raised
                immediately, without retries) or if context generation fails
                after all retries.
        """
        # A missing session is a usage error, not a transient failure:
        # check it once up front rather than retrying it with backoff.
        session = self._require_session()

        # Build full service information
        full_service_info = f"""Service: {service_data.name}
ID: {service_data.service_id}
Description: {service_data.description}
Examples: {", ".join(service_data.examples)}
Entities: {", ".join(service_data.entities)}"""

        # Build context generation prompt
        context_prompt = EnrichmentConstants.CONTEXT_TEMPLATE.format(
            full_service_info=full_service_info,
            name=service_data.name,
            description=service_data.description,
            examples=", ".join(service_data.examples),
        )

        request_data = {
            "document_prompt": "",  # Empty for service enrichment
            "chunk_prompt": context_prompt,
            "environment": self.environment,
            "use_cache": True,
            "connection_id": self.connection_id,
        }

        last_error: Optional[Exception] = None
        for attempt in range(self.max_retries):
            try:
                logger.info(
                    f"Generating context for service '{service_data.service_id}' "
                    f"(attempt {attempt + 1}/{self.max_retries})"
                )

                response = await session.post(
                    f"{self.api_base_url}/generate-context", json=request_data
                )
                response.raise_for_status()
                result = response.json()

                context = result.get("context", "").strip()
                if not context:
                    raise ValueError("Empty context returned from API")

                logger.success(
                    f"Successfully generated context for '{service_data.service_id}': "
                    f"{len(context)} characters"
                )
                return context

            except Exception as e:
                last_error = e
                logger.warning(
                    f"Context generation attempt {attempt + 1} failed for "
                    f"'{service_data.service_id}': {e}"
                )
                await self._sleep_before_retry(attempt)

        # All retries failed
        error_msg = (
            f"Context generation failed for '{service_data.service_id}' "
            f"after {self.max_retries} attempts: {last_error}"
        )
        logger.error(error_msg)
        # Chain the last failure so its traceback is not lost.
        raise RuntimeError(error_msg) from last_error

    async def create_embedding(self, text: str) -> List[float]:
        """
        Create embedding vector for text.

        Args:
            text: Text to embed

        Returns:
            Embedding vector

        Raises:
            RuntimeError: If the client is used outside ``async with`` (raised
                immediately, without retries) or if embedding creation fails
                after all retries.
        """
        session = self._require_session()

        request_data = {
            "texts": [text],
            "environment": self.environment,
            "connection_id": self.connection_id,
            "batch_size": 1,
        }

        last_error: Optional[Exception] = None
        for attempt in range(self.max_retries):
            try:
                logger.info(
                    f"Creating embedding (attempt {attempt + 1}/{self.max_retries})"
                )

                response = await session.post(
                    f"{self.api_base_url}/embeddings", json=request_data
                )
                response.raise_for_status()
                result = response.json()

                embeddings = result.get("embeddings", [])
                if not embeddings or not embeddings[0]:
                    raise ValueError("Empty embedding returned from API")

                embedding = embeddings[0]
                logger.success(
                    f"Successfully created embedding: dimension {len(embedding)}"
                )
                return embedding

            except Exception as e:
                last_error = e
                logger.warning(f"Embedding creation attempt {attempt + 1} failed: {e}")
                await self._sleep_before_retry(attempt)

        # All retries failed
        error_msg = (
            f"Embedding creation failed after {self.max_retries} attempts: {last_error}"
        )
        logger.error(error_msg)
        # Chain the last failure so its traceback is not lost.
        raise RuntimeError(error_msg) from last_error
def _load_string_list(path: str) -> list[str]:
    """Read a JSON array of strings from the file at *path*.

    An empty or whitespace-only file yields an empty list.

    Raises:
        OSError: If the file cannot be opened or read.
        ValueError: If the content is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError``), is not a JSON array, or contains an item
            that is not a string.
    """
    with open(path, "r", encoding="utf-8") as f:
        content = f.read().strip()
    if not content:
        return []

    data = json.loads(content)
    if not isinstance(data, list):
        raise ValueError(f"expected a JSON array, got {type(data).__name__}")
    for item in data:
        if not isinstance(item, str):
            raise ValueError(
                f"expected an array of strings, found item of type {type(item).__name__}"
            )
    return data


def parse_arguments() -> ServiceData:
    """Parse command line arguments into ServiceData model.

    The examples and entities lists are read from the JSON files named by
    ``--examples-file`` / ``--entities-file``. Loading them is best-effort:
    an unreadable, malformed or wrongly shaped file is logged as a warning
    and treated as an empty list, so the run continues.
    """
    parser = argparse.ArgumentParser(description="Service Data Enrichment")
    parser.add_argument("--service-id", type=str, required=True, help="Service ID")
    parser.add_argument("--name", type=str, required=True, help="Service name")
    parser.add_argument(
        "--description", type=str, required=True, help="Service description"
    )
    parser.add_argument("--examples-file", type=str, help="Path to examples JSON file")
    parser.add_argument("--entities-file", type=str, help="Path to entities JSON file")
    parser.add_argument("--ruuter-type", type=str, default="GET", help="Ruuter type")
    parser.add_argument(
        "--current-state", type=str, default="draft", help="Current state"
    )
    parser.add_argument(
        "--is-common",
        type=str,
        choices=["true", "false"],
        default="false",
        help="Is common service",
    )

    args = parser.parse_args()

    # Read and parse JSON arrays from files
    loaded: dict[str, list[str]] = {}
    for label, path in (
        ("examples", args.examples_file),
        ("entities", args.entities_file),
    ):
        values: list[str] = []
        if path:
            try:
                values = _load_string_list(path)
            except (OSError, ValueError) as e:
                # OSError covers missing/unreadable files; ValueError covers
                # invalid JSON, undecodable bytes and unexpected JSON shapes.
                logger.warning(f"Failed to read/parse {label} file: {e}")
        loaded[label] = values

    return ServiceData(
        service_id=args.service_id,
        name=args.name,
        description=args.description,
        examples=loaded["examples"],
        entities=loaded["entities"],
        ruuter_type=args.ruuter_type,
        current_state=args.current_state,
        is_common=args.is_common.lower() == "true",
    )


def _store_in_qdrant(enriched_service: EnrichedService) -> bool:
    """Connect to Qdrant, ensure the collection exists and upsert the service.

    The connection is closed on every path. Returns the result of
    ``QdrantManager.upsert_service``; connection and collection errors
    propagate to the caller.
    """
    qdrant = QdrantManager()
    try:
        qdrant.connect()
        qdrant.ensure_collection()
        return qdrant.upsert_service(enriched_service)
    finally:
        qdrant.close()


async def enrich_service(service_data: ServiceData) -> EnrichmentResult:
    """
    Main enrichment pipeline: generate context, create embedding, store in Qdrant.

    Args:
        service_data: Service data to enrich

    Returns:
        EnrichmentResult with success/failure information. Failures are
        reported through the result, not raised.
    """
    try:
        # Step 1: Generate rich context using LLM
        logger.info("Step 1: Generating rich context with LLM")
        # The HTTP session is only needed for steps 1 and 2, so it is closed
        # before the Qdrant work starts.
        async with LLMAPIClient() as api_client:
            context = await api_client.generate_context(service_data)
            logger.success(f"Context generated: {len(context)} characters")

            # Step 2: Create embedding for the context
            logger.info("Step 2: Creating embedding vector")
            embedding = await api_client.create_embedding(context)
            logger.success(f"Embedding created: {len(embedding)}-dimensional vector")

        # Assemble the record to store
        enriched_service = EnrichedService(
            id=service_data.service_id,
            name=service_data.name,
            description=service_data.description,
            examples=service_data.examples,
            entities=service_data.entities,
            context=context,
            embedding=embedding,
        )

        # Step 3: Store in Qdrant
        logger.info("Step 3: Storing in Qdrant")
        if not _store_in_qdrant(enriched_service):
            return EnrichmentResult(
                success=False,
                service_id=service_data.service_id,
                message="Failed to store in Qdrant",
                context_length=None,
                embedding_dimension=None,
                error="Qdrant upsert operation failed",
            )

        return EnrichmentResult(
            success=True,
            service_id=service_data.service_id,
            message=f"Service '{service_data.name}' enriched and indexed successfully",
            context_length=len(context),
            embedding_dimension=len(embedding),
            error=None,
        )

    except Exception as e:
        # Top-level boundary of the pipeline: report instead of raising.
        logger.error(f"Enrichment pipeline failed: {e}")
        return EnrichmentResult(
            success=False,
            service_id=service_data.service_id,
            message="Enrichment pipeline failed",
            context_length=None,
            embedding_dimension=None,
            error=str(e),
        )


def main() -> int:
    """Main entry point for service enrichment.

    Returns:
        Process exit code: 0 when enrichment succeeded, 1 otherwise.
    """
    logger.info("Starting service data enrichment pipeline")

    try:
        # Parse arguments
        service_data = parse_arguments()
        logger.info(f"Service ID: {service_data.service_id}")
        logger.info(f"Service Name: {service_data.name}")
        logger.info(f"Examples: {len(service_data.examples)} provided")
        logger.info(f"Entities: {len(service_data.entities)} provided")

        # Run enrichment pipeline
        result = asyncio.run(enrich_service(service_data))

        # Log results
        if result.success:
            logger.success("Enrichment completed successfully")
            logger.info(f"Service: {result.service_id}")
            logger.info(f"Message: {result.message}")
            logger.info(f"Context Length: {result.context_length} characters")
            logger.info(f"Embedding Dimension: {result.embedding_dimension}")
            return 0

        logger.error("Enrichment failed")
        logger.error(f"Service: {result.service_id}")
        logger.error(f"Message: {result.message}")
        logger.error(f"Error: {result.error}")
        return 1

    except Exception as e:
        logger.error(f"Fatal error: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
# Error messages
_CLIENT_NOT_INITIALIZED = "Qdrant client not initialized"


class QdrantManager:
    """Manages Qdrant operations for intent collections.

    Typical use: ``connect()``, ``ensure_collection()``, one or more
    ``upsert_service()`` calls, then ``close()``.
    """

    def __init__(
        self,
        host: str = EnrichmentConstants.DEFAULT_QDRANT_HOST,
        port: int = EnrichmentConstants.DEFAULT_QDRANT_PORT,
        collection_name: str = EnrichmentConstants.COLLECTION_NAME,
    ) -> None:
        """Store connection settings; the client is created by ``connect()``."""
        self.host = host
        self.port = port
        self.collection_name = collection_name
        self.client: Optional[QdrantClient] = None

    def connect(self) -> None:
        """Connect to Qdrant.

        Raises:
            Exception: Whatever ``QdrantClient`` raises; it is logged and
                re-raised unchanged.
        """
        try:
            logger.info(f"Connecting to Qdrant at {self.host}:{self.port}")
            self.client = QdrantClient(
                host=self.host,
                port=self.port,
                timeout=30,
                prefer_grpc=False,
                api_key=None,
            )
            logger.success("Successfully connected to Qdrant")
        except Exception as e:
            logger.error(f"Failed to connect to Qdrant: {e}")
            raise

    def _require_client(self) -> QdrantClient:
        """Return the connected client or raise ``RuntimeError`` if there is none."""
        if self.client is None:
            raise RuntimeError(_CLIENT_NOT_INITIALIZED)
        return self.client

    def _existing_vector_size(self, client: QdrantClient) -> Optional[int]:
        """Return the vector size of the existing collection, or None if unknown.

        The collection's ``vectors`` config is handled in two shapes: a
        single params object exposing ``.size``, or a dict of such objects
        (presumably keyed by vector name). For a dict the first entry's size
        is reported.
        """
        vectors_config = client.get_collection(
            self.collection_name
        ).config.params.vectors

        if isinstance(vectors_config, dict):
            first_params = next(iter(vectors_config.values()), None)
            return first_params.size if first_params is not None else None
        if vectors_config is not None:
            return vectors_config.size
        return None

    def ensure_collection(self) -> None:
        """Ensure the intent_collections collection exists with correct vector size.

        Creates the collection when it is missing. An existing collection is
        never modified or deleted: if its vector size is unknown or differs
        from ``EnrichmentConstants.VECTOR_SIZE`` a ``RuntimeError`` is raised
        so an operator can decide what to do with the stored data.

        Raises:
            RuntimeError: If the client is not connected, or the existing
                collection has an unknown or incompatible vector size.
        """
        try:
            client = self._require_client()

            existing_names = {
                col.name for col in client.get_collections().collections
            }
            if self.collection_name not in existing_names:
                self._create_collection()
                return

            existing_vector_size = self._existing_vector_size(client)

            if existing_vector_size is None:
                logger.error(
                    f"Collection '{self.collection_name}' exists but vector size cannot be determined"
                )
                raise RuntimeError(
                    f"Collection '{self.collection_name}' exists but vector size cannot be determined. "
                    "This may indicate a Qdrant API issue or unexpected collection configuration. "
                    "Manual intervention required: verify Qdrant health, inspect collection config, "
                    "or manually delete the collection if recreating is intended."
                )

            if existing_vector_size != EnrichmentConstants.VECTOR_SIZE:
                logger.error(
                    f"Collection '{self.collection_name}' has incompatible vector size: "
                    f"{existing_vector_size} (expected {EnrichmentConstants.VECTOR_SIZE})"
                )
                raise RuntimeError(
                    f"Collection '{self.collection_name}' has incompatible vector size "
                    f"({existing_vector_size} vs expected {EnrichmentConstants.VECTOR_SIZE}). "
                    "This prevents automatic deletion to avoid accidental data loss. "
                    "To recreate the collection, manually delete it first using: "
                    f"qdrant.client.delete_collection('{self.collection_name}') or via Qdrant UI/API."
                )

            logger.info(
                f"Collection '{self.collection_name}' already exists "
                f"with correct vector size ({existing_vector_size})"
            )

        except Exception as e:
            logger.error(f"Failed to ensure collection exists: {e}")
            raise

    def _create_collection(self) -> None:
        """Create the collection with correct vector configuration."""
        client = self._require_client()

        logger.info(
            f"Creating collection '{self.collection_name}' "
            f"with vector size {EnrichmentConstants.VECTOR_SIZE}"
        )
        client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=EnrichmentConstants.VECTOR_SIZE,
                distance=Distance.COSINE,
            ),
        )
        logger.success(f"Collection '{self.collection_name}' created successfully")

    def upsert_service(self, enriched_service: EnrichedService) -> bool:
        """
        Upsert enriched service to Qdrant (update if exists, insert if new).

        Args:
            enriched_service: EnrichedService instance containing the embedding and
                associated metadata to upsert into Qdrant.

        Returns:
            True if successful, False otherwise. Failures (including a missing
            connection) are logged and reported as False, never raised.
        """
        try:
            client = self._require_client()

            logger.info(f"Upserting service '{enriched_service.id}' to Qdrant")

            # Qdrant requires point IDs to be either integers or UUIDs.
            # uuid5 is deterministic, so the same service_id always maps to
            # the same point and a repeated run updates it instead of
            # inserting a duplicate.
            point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, enriched_service.id))

            # Prepare payload (all metadata except embedding)
            payload = {
                "service_id": enriched_service.id,  # Store original ID in payload
                "name": enriched_service.name,
                "description": enriched_service.description,
                "examples": enriched_service.examples,
                "entities": enriched_service.entities,
                "context": enriched_service.context,
            }

            point = PointStruct(
                id=point_id,
                vector=enriched_service.embedding,
                payload=payload,
            )

            # Upsert to Qdrant
            client.upsert(
                collection_name=self.collection_name,
                points=[point],
            )

            logger.success(
                f"Successfully upserted service '{enriched_service.id}' "
                f"({len(enriched_service.embedding)}-dim vector)"
            )
            return True

        except Exception as e:
            logger.error(f"Failed to upsert service '{enriched_service.id}': {e}")
            return False

    def close(self) -> None:
        """Close Qdrant connection and forget the client; safe to call repeatedly."""
        if self.client:
            logger.info("Closing Qdrant connection")
            try:
                self.client.close()
            finally:
                # Drop the reference so later calls report
                # "Qdrant client not initialized" instead of using a closed client.
                self.client = None