-
Notifications
You must be signed in to change notification settings - Fork 45
Open
Description
🛡️ Technical Report: Fix Blocking Event Loop in NexusRAG
1. Vấn đề (The Issue)
Hệ thống NexusRAG gặp tình trạng treo toàn bộ Backend (Stop responding) khi người dùng thực hiện các tác vụ nặng như Upload file lớn hoặc Process Document vào Knowledge Base.
Triệu chứng (Symptoms)
- Giao diện Frontend bị "đơ", các request Chat hoặc Dashboard bị trạng thái
pending. - F5 lại trang web không tải được nội dung API cho đến khi tiến trình xử lý tài liệu hoàn tất.
- Mất khả năng xử lý đồng thời (Concurrency): Một người dùng xử lý file, tất cả người dùng khác bị chặn.
2. Nguyên nhân gốc rễ (Root Cause Analysis)
Vấn đề nằm ở việc các tác vụ Đồng bộ (Synchronous) nặng nề đang chạy trực tiếp trên Main Event Loop của FastAPI, gây ra hiện tượng Blocking I/O và CPU Starvation.
Các điểm nghẽn chính:
- File I/O Blocking: Sử dụng hàm
open(..., "wb")đồng bộ tại endpointupload_documentkhiến luồng chính phải chờ đĩa ghi dữ liệu. - CPU-Bound Tasks (Nặng nhất): Tiến trình
process_document_backgrounddù được gọi quaasyncio.create_task, nhưng bên trong lại chứa các hàm thực thi đồng bộ chiếm dụng CPU cao:- Docling Parsing: Phân tích cấu trúc PDF tốn nhiều tài nguyên.
- Sentence-Transformers: Chạy model nhúng (Embeddings) tiêu tốn năng lượng của CPU/GPU.
- ChromaDB Indexing: Ghi dữ liệu vào Vector DB thông qua SDK đồng bộ.
3. Giải pháp đề xuất (Proposed Solution)
A. Xử lý Upload không chặn (Non-blocking File I/O)
Sử dụng thư viện aiofiles để giải phóng Event Loop trong quá trình ghi file tài liệu xuống ổ cứng.
File: backend/app/api/documents.py
import aiofiles
async def save_upload_file(file_path, content):
async with aiofiles.open(file_path, "wb") as f:
await f.write(content)B. Đẩy tác vụ CPU-bound và I/O-bound vào Background Threads
Sử dụng asyncio.to_thread() (Python 3.9+) để offload các hàm đồng bộ ra khỏi Event Loop chính, cho phép FastAPI tiếp tục xử lý các HTTP request khác.
1. Xử lý Parsing (Docling):
File: nexus_rag_service.py
import asyncio
# Offload việc parse tài liệu sang một thread riêng
parsed = await asyncio.to_thread(
self.parser.parse,
file_path=file_path,
document_id=document_id,
original_filename=document.original_filename,
)2. Xử lý Indexing (Chunking, Embedding & Vector Store):
Đóng gói các logic đồng bộ vào một hàm helper và thực thi nó thông qua thread pool.
def _sync_indexing_process(chunks, metadata):
# Các bước chạy đồng bộ:
# 1. Chạy model Embedding (Sentence-Transformers)
# 2. Lưu vào ChromaDB
vector_store.add_documents(chunks, metadatas=metadata)
# Thực thi hàm đồng bộ trong môi trường async mà không gây block
await asyncio.to_thread(_sync_indexing_process, chunks, metadata)4. Kết quả mong đợi (Expected Results)
- ✅ High Concurrency: Hệ thống có thể phục vụ nhiều người dùng cùng lúc. User A xử lý tài liệu không ảnh hưởng đến User B đang chat.
- ✅ Responsive UI: Các request HTTP (Chat, Workspace, Dashboard) luôn được phản hồi ngay lập tức.
- ✅ Stability: Tránh tình trạng timeout hoặc sập dịch vụ khi gặp file có kích thước lớn.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels