diff --git a/DSL/CronManager/script/service_enrichment.sh b/DSL/CronManager/script/service_enrichment.sh index 4828833..c50a490 100644 --- a/DSL/CronManager/script/service_enrichment.sh +++ b/DSL/CronManager/script/service_enrichment.sh @@ -8,7 +8,7 @@ if [ -z "$service_id" ] || [ -z "$name" ] || [ -z "$description" ]; then exit 1 fi -PYTHON_SCRIPT="/app/src/data_enrichment/main_enrichment.py" +PYTHON_SCRIPT="/app/src/intent_data_enrichment/main_enrichment.py" echo "[INFO] Service ID: $service_id" echo "[INFO] Service Name: $name" @@ -42,7 +42,7 @@ echo "[PACKAGES] Installing required packages..." echo "[PACKAGES] All packages installed successfully" # Set Python path -export PYTHONPATH="/app:/app/src:/app/src/data_enrichment:$PYTHONPATH" +export PYTHONPATH="/app:/app/src:/app/src/intent_data_enrichment:$PYTHONPATH" # Verify Python script exists [ ! -f "$PYTHON_SCRIPT" ] && { echo "[ERROR] Python script not found at $PYTHON_SCRIPT"; exit 1; } diff --git a/DSL/Ruuter.public/rag-search/POST/services/enrich.yml b/DSL/Ruuter.public/rag-search/POST/services/enrich.yml index 8e42737..5748ad5 100644 --- a/DSL/Ruuter.public/rag-search/POST/services/enrich.yml +++ b/DSL/Ruuter.public/rag-search/POST/services/enrich.yml @@ -74,8 +74,14 @@ execute_enrichment: current_state: ${service_current_state} is_common: ${service_is_common} result: enrichment_result - next: assign_success on_error: handle_cron_error + next: check_enrichment_status + +check_enrichment_status: + switch: + - condition: ${200 <= enrichment_result.response.statusCodeValue && enrichment_result.response.statusCodeValue < 300} + next: assign_success + next: assign_cron_failure handle_cron_error: log: "ERROR: Failed to queue enrichment job - ${enrichment_result.error || 'CronManager unreachable'}" diff --git a/docker-compose-ec2.yml b/docker-compose-ec2.yml index c6b8819..cc48c1c 100644 --- a/docker-compose-ec2.yml +++ b/docker-compose-ec2.yml @@ -179,7 +179,7 @@ services: - ./DSL/CronManager/DSL:/DSL - ./DSL/CronManager/script:/app/scripts - ./src/vector_indexer:/app/src/vector_indexer - - ./src/data_enrichment:/app/src/data_enrichment + - ./src/intent_data_enrichment:/app/src/intent_data_enrichment - ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only) - cron_data:/app/data - shared-volume:/app/shared # Access to shared resources for cross-container coordination @@ -188,7 +188,7 @@ services: - ./.env:/app/.env:ro environment: - server.port=9010 - - PYTHONPATH=/app:/app/src/vector_indexer:/app/src/data_enrichment + - PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment - VAULT_AGENT_URL=http://vault-agent-cron:8203 ports: - 9010:8080 diff --git a/docker-compose.yml b/docker-compose.yml index 5ac933e..1fec54b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -178,7 +178,7 @@ services: - ./DSL/CronManager/DSL:/DSL - ./DSL/CronManager/script:/app/scripts - ./src/vector_indexer:/app/src/vector_indexer - - ./src/data_enrichment:/app/src/data_enrichment + - ./src/intent_data_enrichment:/app/src/intent_data_enrichment - ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only) - cron_data:/app/data - shared-volume:/app/shared # Access to shared resources for cross-container coordination @@ -187,7 +187,7 @@ services: - ./.env:/app/.env:ro environment: - server.port=9010 - - PYTHONPATH=/app:/app/src/vector_indexer:/app/src/data_enrichment + - PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment - VAULT_AGENT_URL=http://vault-agent-cron:8203 ports: - 9010:8080 diff --git a/enrich.yml.backup b/enrich.yml.backup deleted file mode 100644 index 28cd5b3..0000000 --- a/enrich.yml.backup +++ /dev/null @@ -1,157 +0,0 @@ -declaration: - call: declare - version: 0.1 - description: "Enrich service data and index in Qdrant" - method: post - accepts: json - returns: json - namespace: rag-search - allowlist: - body: - - field: service_id - type: string - description: "Unique service identifier" - - field: name - type: string - description: "Service name" - - field: description - type: string - description: "Service description" - - field: examples - type: array - description: "Example queries" - - field: entities - type: array - description: "Expected entity names" - - field: ruuter_type - type: string - description: "HTTP method (GET/POST)" - - field: current_state - type: string - description: "Service state (active/inactive/draft)" - - field: is_common - type: boolean - description: "Is common service" - -validate_request: - assign: - service_id: ${incoming.body.service_id} - service_name: ${incoming.body.name} - service_description: ${incoming.body.description} - next: check_required_fields - -check_required_fields: - switch: - - condition: ${!service_id} - next: assign_missing_service_id_error - - condition: ${!service_name} - next: assign_missing_name_error - - condition: ${!service_description} - next: assign_missing_description_error - next: prepare_service_data - -assign_missing_service_id_error: - assign: - error_response: { - success: false, - error: "MISSING_SERVICE_ID", - message: "service_id is required" - } - next: return_missing_service_id - -return_missing_service_id: - status: 400 - return: ${error_response} - next: end - -assign_missing_name_error: - assign: - error_response: { - success: false, - error: "MISSING_NAME", - message: "name is required" - } - next: return_missing_name - -return_missing_name: - status: 400 - return: ${error_response} - next: end - -assign_missing_description_error: - assign: - error_response: { - success: false, - error: "MISSING_DESCRIPTION", - message: "description is required" - } - next: return_missing_description - -return_missing_description: - status: 400 - return: ${error_response} - next: end - -prepare_service_data: - assign: - service_data: { - service_id: ${service_id}, - name: ${service_name}, - description: ${service_description}, - examples: ${incoming.body.examples || []}, - entities: ${incoming.body.entities || []}, - ruuter_type: ${incoming.body.ruuter_type || 'GET'}, - current_state: ${incoming.body.current_state || 'draft'}, - is_common: ${incoming.body.is_common || false} - } - next: stringify_service_data - -stringify_service_data: - assign: - service_json: ${JSON.stringify(service_data)} - next: execute_enrichment - -execute_enrichment: - call: http.post - args: - url: "[#RAG_SEARCH_CRON_MANAGER]/execute/service_enrichment/enrich_and_index" - query: - service_id: ${service_id} - service_data: ${service_json} - result: enrichment_result - next: assign_success_response - on_error: handle_enrichment_error - -handle_enrichment_error: - log: "ERROR: Service enrichment failed - ${enrichment_result.error || 'Unknown error'}" - next: assign_error_response - -assign_success_response: - assign: - success_response: { - success: true, - service_id: ${service_id}, - message: "Service enriched and indexed successfully", - enrichment_details: ${enrichment_result.response.body} - } - next: return_success - -assign_error_response: - assign: - error_response: { - success: false, - error: "ENRICHMENT_FAILED", - message: "Failed to enrich and index service", - details: ${enrichment_result.response.body || enrichment_result.error} - } - next: return_enrichment_error - -return_success: - status: 200 - return: ${success_response} - next: end - -return_enrichment_error: - status: 500 - return: ${error_response} - next: end diff --git a/src/intent_data_enrichment/__init__.py b/src/intent_data_enrichment/__init__.py index 8b538d6..eb197d3 100644 --- a/src/intent_data_enrichment/__init__.py +++ b/src/intent_data_enrichment/__init__.py @@ -7,10 +7,10 @@ __version__ = "1.0.0" -from data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult -from data_enrichment.api_client import LLMAPIClient -from data_enrichment.qdrant_manager import QdrantManager -from data_enrichment.constants import EnrichmentConstants +from intent_data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult +from intent_data_enrichment.api_client import LLMAPIClient +from intent_data_enrichment.qdrant_manager import QdrantManager +from intent_data_enrichment.constants import EnrichmentConstants __all__ = [ "ServiceData", diff --git a/src/intent_data_enrichment/api_client.py b/src/intent_data_enrichment/api_client.py index 903e642..31ed96e 100644 --- a/src/intent_data_enrichment/api_client.py +++ b/src/intent_data_enrichment/api_client.py @@ -6,8 +6,8 @@ from types import TracebackType from loguru import logger -from data_enrichment.constants import EnrichmentConstants -from data_enrichment.models import ServiceData +from intent_data_enrichment.constants import EnrichmentConstants +from intent_data_enrichment.models import ServiceData class LLMAPIClient: diff --git a/src/intent_data_enrichment/main_enrichment.py b/src/intent_data_enrichment/main_enrichment.py index 2ed294f..2aedb26 100644 --- a/src/intent_data_enrichment/main_enrichment.py +++ b/src/intent_data_enrichment/main_enrichment.py @@ -12,9 +12,9 @@ import asyncio from loguru import logger -from data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult -from data_enrichment.api_client import LLMAPIClient -from data_enrichment.qdrant_manager import QdrantManager +from intent_data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult +from intent_data_enrichment.api_client import LLMAPIClient +from intent_data_enrichment.qdrant_manager import QdrantManager def parse_arguments() -> ServiceData: @@ -110,11 +110,12 @@ async def enrich_service(service_data: ServiceData) -> EnrichmentResult: # Step 4: Store in Qdrant logger.info("Step 3: Storing in Qdrant") qdrant = QdrantManager() - qdrant.connect() - qdrant.ensure_collection() - - success = qdrant.upsert_service(enriched_service) - qdrant.close() + try: + qdrant.connect() + qdrant.ensure_collection() + success = qdrant.upsert_service(enriched_service) + finally: + qdrant.close() if success: return EnrichmentResult( diff --git a/src/intent_data_enrichment/qdrant_manager.py b/src/intent_data_enrichment/qdrant_manager.py index 3aaad61..5024e23 100644 --- a/src/intent_data_enrichment/qdrant_manager.py +++ b/src/intent_data_enrichment/qdrant_manager.py @@ -6,8 +6,8 @@ from qdrant_client import QdrantClient from qdrant_client.models import Distance, VectorParams, PointStruct -from data_enrichment.constants import EnrichmentConstants -from data_enrichment.models import EnrichedService +from intent_data_enrichment.constants import EnrichmentConstants +from intent_data_enrichment.models import EnrichedService # Error messages _CLIENT_NOT_INITIALIZED = "Qdrant client not initialized" @@ -70,21 +70,27 @@ def ensure_collection(self) -> None: existing_vector_size = vectors_config.size if existing_vector_size is None: - logger.warning( - f"Could not determine vector size for '{self.collection_name}', recreating" + logger.error( + f"Collection '{self.collection_name}' exists but vector size cannot be determined" + ) + raise RuntimeError( + f"Collection '{self.collection_name}' exists but vector size cannot be determined. " + "This may indicate a Qdrant API issue or unexpected collection configuration. " + "Manual intervention required: verify Qdrant health, inspect collection config, " + "or manually delete the collection if recreating is intended." ) - self.client.delete_collection(self.collection_name) - self._create_collection() elif existing_vector_size != EnrichmentConstants.VECTOR_SIZE: - logger.warning( - f"Collection '{self.collection_name}' exists with wrong vector size: " + logger.error( + f"Collection '{self.collection_name}' has incompatible vector size: " f"{existing_vector_size} (expected {EnrichmentConstants.VECTOR_SIZE})" ) - logger.info( - f"Deleting and recreating collection '{self.collection_name}'" + raise RuntimeError( + f"Collection '{self.collection_name}' has incompatible vector size " + f"({existing_vector_size} vs expected {EnrichmentConstants.VECTOR_SIZE}). " + "This prevents automatic deletion to avoid accidental data loss. " + "To recreate the collection, manually delete it first using: " + f"qdrant.client.delete_collection('{self.collection_name}') or via Qdrant UI/API." ) - self.client.delete_collection(self.collection_name) - self._create_collection() else: logger.info( f"Collection '{self.collection_name}' already exists " @@ -120,7 +126,8 @@ def upsert_service(self, enriched_service: EnrichedService) -> bool: Upsert enriched service to Qdrant (update if exists, insert if new). Args: - enriched_service: Enric_CLIENT_NOT_INITIALIZED + enriched_service: EnrichedService instance containing the embedding and + associated metadata to upsert into Qdrant. Returns: True if successful, False otherwise