Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions DSL/CronManager/script/service_enrichment.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if [ -z "$service_id" ] || [ -z "$name" ] || [ -z "$description" ]; then
exit 1
fi

PYTHON_SCRIPT="/app/src/data_enrichment/main_enrichment.py"
PYTHON_SCRIPT="/app/src/intent_data_enrichment/main_enrichment.py"

echo "[INFO] Service ID: $service_id"
echo "[INFO] Service Name: $name"
Expand Down Expand Up @@ -42,7 +42,7 @@ echo "[PACKAGES] Installing required packages..."
echo "[PACKAGES] All packages installed successfully"

# Set Python path
export PYTHONPATH="/app:/app/src:/app/src/data_enrichment:$PYTHONPATH"
export PYTHONPATH="/app:/app/src:/app/src/intent_data_enrichment:$PYTHONPATH"

# Verify Python script exists
[ ! -f "$PYTHON_SCRIPT" ] && { echo "[ERROR] Python script not found at $PYTHON_SCRIPT"; exit 1; }
Expand Down
8 changes: 7 additions & 1 deletion DSL/Ruuter.public/rag-search/POST/services/enrich.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,14 @@ execute_enrichment:
current_state: ${service_current_state}
is_common: ${service_is_common}
result: enrichment_result
next: assign_success
on_error: handle_cron_error
next: check_enrichment_status

check_enrichment_status:
switch:
- condition: ${200 <= enrichment_result.response.statusCodeValue && enrichment_result.response.statusCodeValue < 300}
next: assign_success
next: assign_cron_failure

handle_cron_error:
log: "ERROR: Failed to queue enrichment job - ${enrichment_result.error || 'CronManager unreachable'}"
Expand Down
4 changes: 2 additions & 2 deletions docker-compose-ec2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ services:
- ./DSL/CronManager/DSL:/DSL
- ./DSL/CronManager/script:/app/scripts
- ./src/vector_indexer:/app/src/vector_indexer
- ./src/data_enrichment:/app/src/data_enrichment
- ./src/intent_data_enrichment:/app/src/intent_data_enrichment
- ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only)
- cron_data:/app/data
- shared-volume:/app/shared # Access to shared resources for cross-container coordination
Expand All @@ -188,7 +188,7 @@ services:
- ./.env:/app/.env:ro
environment:
- server.port=9010
- PYTHONPATH=/app:/app/src/vector_indexer:/app/src/data_enrichment
- PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment
- VAULT_AGENT_URL=http://vault-agent-cron:8203
ports:
- 9010:8080
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ services:
- ./DSL/CronManager/DSL:/DSL
- ./DSL/CronManager/script:/app/scripts
- ./src/vector_indexer:/app/src/vector_indexer
- ./src/data_enrichment:/app/src/data_enrichment
- ./src/intent_data_enrichment:/app/src/intent_data_enrichment
- ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only)
- cron_data:/app/data
- shared-volume:/app/shared # Access to shared resources for cross-container coordination
Expand All @@ -187,7 +187,7 @@ services:
- ./.env:/app/.env:ro
environment:
- server.port=9010
- PYTHONPATH=/app:/app/src/vector_indexer:/app/src/data_enrichment
- PYTHONPATH=/app:/app/src/vector_indexer:/app/src/intent_data_enrichment
- VAULT_AGENT_URL=http://vault-agent-cron:8203
ports:
- 9010:8080
Expand Down
8 changes: 4 additions & 4 deletions src/intent_data_enrichment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

__version__ = "1.0.0"

from data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult
from data_enrichment.api_client import LLMAPIClient
from data_enrichment.qdrant_manager import QdrantManager
from data_enrichment.constants import EnrichmentConstants
from intent_data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult
from intent_data_enrichment.api_client import LLMAPIClient
from intent_data_enrichment.qdrant_manager import QdrantManager
from intent_data_enrichment.constants import EnrichmentConstants

__all__ = [
"ServiceData",
Expand Down
4 changes: 2 additions & 2 deletions src/intent_data_enrichment/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from types import TracebackType
from loguru import logger

from data_enrichment.constants import EnrichmentConstants
from data_enrichment.models import ServiceData
from intent_data_enrichment.constants import EnrichmentConstants
from intent_data_enrichment.models import ServiceData


class LLMAPIClient:
Expand Down
17 changes: 9 additions & 8 deletions src/intent_data_enrichment/main_enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import asyncio
from loguru import logger

from data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult
from data_enrichment.api_client import LLMAPIClient
from data_enrichment.qdrant_manager import QdrantManager
from intent_data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult
from intent_data_enrichment.api_client import LLMAPIClient
from intent_data_enrichment.qdrant_manager import QdrantManager


def parse_arguments() -> ServiceData:
Expand Down Expand Up @@ -110,11 +110,12 @@ async def enrich_service(service_data: ServiceData) -> EnrichmentResult:
# Step 4: Store in Qdrant
logger.info("Step 3: Storing in Qdrant")
qdrant = QdrantManager()
qdrant.connect()
qdrant.ensure_collection()

success = qdrant.upsert_service(enriched_service)
qdrant.close()
try:
qdrant.connect()
qdrant.ensure_collection()
success = qdrant.upsert_service(enriched_service)
finally:
qdrant.close()

if success:
return EnrichmentResult(
Expand Down
33 changes: 20 additions & 13 deletions src/intent_data_enrichment/qdrant_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

from data_enrichment.constants import EnrichmentConstants
from data_enrichment.models import EnrichedService
from intent_data_enrichment.constants import EnrichmentConstants
from intent_data_enrichment.models import EnrichedService

# Error messages
_CLIENT_NOT_INITIALIZED = "Qdrant client not initialized"
Expand Down Expand Up @@ -70,21 +70,27 @@ def ensure_collection(self) -> None:
existing_vector_size = vectors_config.size

if existing_vector_size is None:
logger.warning(
f"Could not determine vector size for '{self.collection_name}', recreating"
logger.error(
f"Collection '{self.collection_name}' exists but vector size cannot be determined"
)
raise RuntimeError(
f"Collection '{self.collection_name}' exists but vector size cannot be determined. "
"This may indicate a Qdrant API issue or unexpected collection configuration. "
"Manual intervention required: verify Qdrant health, inspect collection config, "
"or manually delete the collection if recreating is intended."
)
self.client.delete_collection(self.collection_name)
self._create_collection()
elif existing_vector_size != EnrichmentConstants.VECTOR_SIZE:
logger.warning(
f"Collection '{self.collection_name}' exists with wrong vector size: "
logger.error(
f"Collection '{self.collection_name}' has incompatible vector size: "
f"{existing_vector_size} (expected {EnrichmentConstants.VECTOR_SIZE})"
)
logger.info(
f"Deleting and recreating collection '{self.collection_name}'"
raise RuntimeError(
f"Collection '{self.collection_name}' has incompatible vector size "
f"({existing_vector_size} vs expected {EnrichmentConstants.VECTOR_SIZE}). "
"This prevents automatic deletion to avoid accidental data loss. "
"To recreate the collection, manually delete it first using: "
f"qdrant.client.delete_collection('{self.collection_name}') or via Qdrant UI/API."
)
self.client.delete_collection(self.collection_name)
self._create_collection()
else:
logger.info(
f"Collection '{self.collection_name}' already exists "
Expand Down Expand Up @@ -120,7 +126,8 @@ def upsert_service(self, enriched_service: EnrichedService) -> bool:
Upsert enriched service to Qdrant (update if exists, insert if new).

Args:
enriched_service: Enric_CLIENT_NOT_INITIALIZED
enriched_service: EnrichedService instance containing the embedding and
associated metadata to upsert into Qdrant.

Returns:
True if successful, False otherwise
Expand Down
Loading