Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 171 additions & 95 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from contextlib import asynccontextmanager
from utils.version_utils import OPENRAG_VERSION
import asyncio
import atexit
Expand Down Expand Up @@ -974,7 +975,9 @@ async def refresh_default_openrag_docs(
previous_signature=previous_signature,
new_signature=signature,
)
await _delete_existing_default_docs(session_manager, connector_type="openrag_docs")
await _delete_existing_default_docs(
session_manager, connector_type="openrag_docs"
)
await ingest_openrag_docs_when_ready(
document_service,
task_service,
Expand Down Expand Up @@ -1086,7 +1089,9 @@ async def _update_mcp_servers_with_provider_credentials(services):
from utils.langflow_headers import build_mcp_global_vars_from_config

flows_service = services.get("flows_service")
global_vars = await build_mcp_global_vars_from_config(config, flows_service=flows_service)
global_vars = await build_mcp_global_vars_from_config(
config, flows_service=flows_service
)

# In no-auth mode, add the anonymous JWT token and user details
if is_no_auth_mode() and session_manager:
Expand Down Expand Up @@ -1340,8 +1345,6 @@ async def initialize_services():
Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_INIT_SUCCESS
)



# API Key service for public API authentication
api_key_service = APIKeyService(session_manager)

Expand All @@ -1362,13 +1365,91 @@ async def initialize_services():
}


@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup Tasks
app.state.background_tasks = set()

await TelemetryClient.send_event(
Category.APPLICATION_STARTUP, MessageId.ORB_APP_STARTED
)
# Start index initialization in background to avoid blocking OIDC endpoints
t1 = asyncio.create_task(startup_tasks(app.state.services))
app.state.background_tasks.add(t1)
t1.add_done_callback(app.state.background_tasks.discard)

# Start periodic task cleanup scheduler
app.state.services["task_service"].start_cleanup_scheduler()

# Start periodic flow backup task (every 5 minutes)
async def periodic_backup():
"""Periodic backup task that runs every 15 minutes"""
while True:
try:
await asyncio.sleep(5 * 60) # Wait 5 minutes

# Check if onboarding has been completed
config = get_openrag_config()
if not config.edited:
logger.debug(
"Onboarding not completed yet, skipping periodic backup"
)
continue

flows_service = app.state.services.get("flows_service")
if flows_service:
logger.info("Running periodic flow backup")
backup_results = await flows_service.backup_all_flows(
only_if_changed=True
)
if backup_results["backed_up"]:
logger.info(
"Periodic backup completed",
backed_up=len(backup_results["backed_up"]),
skipped=len(backup_results["skipped"]),
)
else:
logger.debug(
"Periodic backup: no flows changed",
skipped=len(backup_results["skipped"]),
)
except asyncio.CancelledError:
logger.info("Periodic backup task cancelled")
break
except Exception as e:
logger.error(f"Error in periodic backup task: {str(e)}")
# Continue running even if one backup fails

backup_task = asyncio.create_task(periodic_backup())
app.state.background_tasks.add(backup_task)
backup_task.add_done_callback(app.state.background_tasks.discard)

yield

# shutdown tasks

await TelemetryClient.send_event(
Category.APPLICATION_SHUTDOWN, MessageId.ORB_APP_SHUTDOWN
)
await cleanup_subscriptions_proper(app.state.services)
# Cleanup task service (cancels background tasks and process pool)
await app.state.services["task_service"].shutdown()
# Cleanup async clients
await clients.cleanup()
# Cleanup telemetry client
from utils.telemetry.client import cleanup_telemetry_client

await cleanup_telemetry_client()


async def create_app():
"""Create and configure the FastAPI application"""
services = await initialize_services()

app = FastAPI(title="OpenRAG API", version=OPENRAG_VERSION, debug=True)
app = FastAPI(
title="OpenRAG API", version=OPENRAG_VERSION, debug=True, lifespan=lifespan
)
app.state.services = services # Store services for cleanup
app.state.background_tasks = set()

# Register route handlers — auth and service injection done via FastAPI Depends() in each handler

Expand Down Expand Up @@ -1523,23 +1604,92 @@ async def create_app():
)

# Connector endpoints
app.add_api_route("/connectors", connectors.list_connectors, methods=["GET"], tags=["internal"])
app.add_api_route(
"/connectors", connectors.list_connectors, methods=["GET"], tags=["internal"]
)
# IBM COS-specific routes (registered before generic /{connector_type}/... to avoid shadowing)
app.add_api_route("/connectors/ibm_cos/defaults", ibm_cos_defaults, methods=["GET"], tags=["internal"])
app.add_api_route("/connectors/ibm_cos/configure", ibm_cos_configure, methods=["POST"], tags=["internal"])
app.add_api_route("/connectors/ibm_cos/{connection_id}/buckets", ibm_cos_list_buckets, methods=["GET"], tags=["internal"])
app.add_api_route("/connectors/ibm_cos/{connection_id}/bucket-status", ibm_cos_bucket_status, methods=["GET"], tags=["internal"])
app.add_api_route(
"/connectors/ibm_cos/defaults",
ibm_cos_defaults,
methods=["GET"],
tags=["internal"],
)
app.add_api_route(
"/connectors/ibm_cos/configure",
ibm_cos_configure,
methods=["POST"],
tags=["internal"],
)
app.add_api_route(
"/connectors/ibm_cos/{connection_id}/buckets",
ibm_cos_list_buckets,
methods=["GET"],
tags=["internal"],
)
app.add_api_route(
"/connectors/ibm_cos/{connection_id}/bucket-status",
ibm_cos_bucket_status,
methods=["GET"],
tags=["internal"],
)
# AWS S3-specific routes (registered before generic /{connector_type}/... to avoid shadowing)
app.add_api_route("/connectors/aws_s3/defaults", s3_defaults, methods=["GET"], tags=["internal"])
app.add_api_route("/connectors/aws_s3/configure", s3_configure, methods=["POST"], tags=["internal"])
app.add_api_route("/connectors/aws_s3/{connection_id}/buckets", s3_list_buckets, methods=["GET"], tags=["internal"])
app.add_api_route("/connectors/aws_s3/{connection_id}/bucket-status", s3_bucket_status, methods=["GET"], tags=["internal"])
app.add_api_route("/connectors/{connector_type}/sync", connectors.connector_sync, methods=["POST"], tags=["internal"])
app.add_api_route("/connectors/sync-all", connectors.sync_all_connectors, methods=["POST"], tags=["internal"])
app.add_api_route("/connectors/{connector_type}/status", connectors.connector_status, methods=["GET"], tags=["internal"])
app.add_api_route("/connectors/{connector_type}/token", connectors.connector_token, methods=["GET"], tags=["internal"])
app.add_api_route("/connectors/{connector_type}/disconnect", connectors.connector_disconnect, methods=["DELETE"], tags=["internal"])
app.add_api_route("/connectors/{connector_type}/webhook", connectors.connector_webhook, methods=["POST", "GET"], tags=["internal"])
app.add_api_route(
"/connectors/aws_s3/defaults", s3_defaults, methods=["GET"], tags=["internal"]
)
app.add_api_route(
"/connectors/aws_s3/configure",
s3_configure,
methods=["POST"],
tags=["internal"],
)
app.add_api_route(
"/connectors/aws_s3/{connection_id}/buckets",
s3_list_buckets,
methods=["GET"],
tags=["internal"],
)
app.add_api_route(
"/connectors/aws_s3/{connection_id}/bucket-status",
s3_bucket_status,
methods=["GET"],
tags=["internal"],
)
app.add_api_route(
"/connectors/{connector_type}/sync",
connectors.connector_sync,
methods=["POST"],
tags=["internal"],
)
app.add_api_route(
"/connectors/sync-all",
connectors.sync_all_connectors,
methods=["POST"],
tags=["internal"],
)
app.add_api_route(
"/connectors/{connector_type}/status",
connectors.connector_status,
methods=["GET"],
tags=["internal"],
)
app.add_api_route(
"/connectors/{connector_type}/token",
connectors.connector_token,
methods=["GET"],
tags=["internal"],
)
app.add_api_route(
"/connectors/{connector_type}/disconnect",
connectors.connector_disconnect,
methods=["DELETE"],
tags=["internal"],
)
app.add_api_route(
"/connectors/{connector_type}/webhook",
connectors.connector_webhook,
methods=["POST", "GET"],
tags=["internal"],
)

# Document endpoints
app.add_api_route(
Expand Down Expand Up @@ -1787,80 +1937,6 @@ async def create_app():
methods=["DELETE"],
tags=["public"],
)

# Add startup event handler
@app.on_event("startup")
async def startup_event():
await TelemetryClient.send_event(
Category.APPLICATION_STARTUP, MessageId.ORB_APP_STARTED
)
# Start index initialization in background to avoid blocking OIDC endpoints
t1 = asyncio.create_task(startup_tasks(services))
app.state.background_tasks.add(t1)
t1.add_done_callback(app.state.background_tasks.discard)

# Start periodic task cleanup scheduler
services["task_service"].start_cleanup_scheduler()

# Start periodic flow backup task (every 5 minutes)
async def periodic_backup():
"""Periodic backup task that runs every 15 minutes"""
while True:
try:
await asyncio.sleep(5 * 60) # Wait 5 minutes

# Check if onboarding has been completed
config = get_openrag_config()
if not config.edited:
logger.debug(
"Onboarding not completed yet, skipping periodic backup"
)
continue

flows_service = services.get("flows_service")
if flows_service:
logger.info("Running periodic flow backup")
backup_results = await flows_service.backup_all_flows(
only_if_changed=True
)
if backup_results["backed_up"]:
logger.info(
"Periodic backup completed",
backed_up=len(backup_results["backed_up"]),
skipped=len(backup_results["skipped"]),
)
else:
logger.debug(
"Periodic backup: no flows changed",
skipped=len(backup_results["skipped"]),
)
except asyncio.CancelledError:
logger.info("Periodic backup task cancelled")
break
except Exception as e:
logger.error(f"Error in periodic backup task: {str(e)}")
# Continue running even if one backup fails

backup_task = asyncio.create_task(periodic_backup())
app.state.background_tasks.add(backup_task)
backup_task.add_done_callback(app.state.background_tasks.discard)

# Add shutdown event handler
@app.on_event("shutdown")
async def shutdown_event():
await TelemetryClient.send_event(
Category.APPLICATION_SHUTDOWN, MessageId.ORB_APP_SHUTDOWN
)
await cleanup_subscriptions_proper(services)
# Cleanup task service (cancels background tasks and process pool)
await services["task_service"].shutdown()
# Cleanup async clients
await clients.cleanup()
# Cleanup telemetry client
from utils.telemetry.client import cleanup_telemetry_client

await cleanup_telemetry_client()

return app


Expand Down
Loading