-
Notifications
You must be signed in to change notification settings - Fork 0
feat(sse): add progress events for enrichment nodes #264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |
|
|
||
| from app.core.config import settings | ||
| from app.graph.state import EvaluationState | ||
| from app.services.event_channel import create_sommelier_event, get_event_channel | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -101,14 +102,50 @@ async def rag_enrich( | |
| state: EvaluationState, config: Optional[RunnableConfig] = None | ||
| ) -> Dict[str, Any]: | ||
| started_at = datetime.now(timezone.utc).isoformat() | ||
| evaluation_id = state.get("evaluation_id") | ||
| event_channel = get_event_channel() | ||
|
|
||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="rag", | ||
| event_type="enrichment_start", | ||
| progress_percent=0, | ||
| message="RAG context enrichment starting...", | ||
| ), | ||
| ) | ||
|
Comment on lines
+108
to
+118
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Similar to other enrichment nodes in this PR, there's significant code duplication in the event-emitting logic. The `if evaluation_id:` check followed by a call to `event_channel.emit_sync(...)` is repeated several times throughout `rag_enrich`. To improve maintainability, this could be refactored into a local helper function that encapsulates creating and emitting the event. This would make the main function body cleaner and easier to read.

Example helper (defined inside `rag_enrich`, so it can use the local `evaluation_id` and `event_channel`):

```python
def _emit_event(event_type: str, progress: int, message: str):
    if evaluation_id:
        event = create_sommelier_event(
            evaluation_id=evaluation_id,
            sommelier="rag",
            event_type=event_type,
            progress_percent=progress,
            message=message,
        )
        event_channel.emit_sync(evaluation_id, event)
```
|
||
|
|
||
| if existing := state.get("rag_context"): | ||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="rag", | ||
| event_type="enrichment_complete", | ||
| progress_percent=100, | ||
| message="RAG context enrichment complete (cached)", | ||
| ), | ||
| ) | ||
| return {"rag_context": existing} | ||
|
|
||
| repo_context = state.get("repo_context", {}) | ||
| query = _create_query(state) | ||
|
|
||
| if not settings.VERTEX_API_KEY: | ||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="rag", | ||
| event_type="enrichment_complete", | ||
| progress_percent=100, | ||
| message="RAG enrichment skipped (no API key)", | ||
| ), | ||
| ) | ||
| return { | ||
| "rag_context": { | ||
| "query": query, | ||
|
|
@@ -127,6 +164,17 @@ async def rag_enrich( | |
| try: | ||
| docs = _build_documents_from_context(repo_context) | ||
| if not docs: | ||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="rag", | ||
| event_type="enrichment_complete", | ||
| progress_percent=100, | ||
| message="RAG enrichment complete (no documents)", | ||
| ), | ||
| ) | ||
| return { | ||
| "rag_context": {"query": query, "chunks": [], "error": None}, | ||
| } | ||
|
|
@@ -145,6 +193,18 @@ async def rag_enrich( | |
| min(settings.RAG_TOP_K, len(docs)), | ||
| ) | ||
|
|
||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="rag", | ||
| event_type="enrichment_complete", | ||
| progress_percent=100, | ||
| message=f"RAG enrichment complete ({len(chunks)} chunks)", | ||
| ), | ||
| ) | ||
|
|
||
| return { | ||
| "rag_context": {"query": query, "chunks": chunks, "error": None}, | ||
| "trace_metadata": { | ||
|
|
@@ -157,6 +217,17 @@ async def rag_enrich( | |
|
|
||
| except Exception as e: | ||
| logger.warning(f"RAG embedding failed: {e}") | ||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="rag", | ||
| event_type="enrichment_error", | ||
| progress_percent=100, | ||
| message="RAG enrichment failed due to an internal error.", | ||
| ), | ||
| ) | ||
| return { | ||
| "rag_context": {"query": query, "chunks": [], "error": str(e)}, | ||
| "errors": [f"rag_enrich failed: {e!s}"], | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
|
|
||
| from app.core.config import settings | ||
| from app.graph.state import EvaluationState | ||
| from app.services.event_channel import create_sommelier_event, get_event_channel | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -23,11 +24,47 @@ async def web_search_enrich( | |
| state: EvaluationState, config: Optional[RunnableConfig] = None | ||
| ) -> Dict[str, Any]: | ||
| started_at = datetime.now(timezone.utc).isoformat() | ||
| evaluation_id = state.get("evaluation_id") | ||
| event_channel = get_event_channel() | ||
|
|
||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="web_search", | ||
| event_type="enrichment_start", | ||
| progress_percent=0, | ||
| message="Web search enrichment starting...", | ||
| ), | ||
| ) | ||
|
Comment on lines
+30
to
+40
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

This block for emitting an SSE event is repeated multiple times throughout the function, with slight variations. This pattern of checking `if evaluation_id:` and then calling `event_channel.emit_sync(...)` recurs at each point where `web_search_enrich` reports progress. Consider creating a small helper function to handle event creation and emission. This will reduce code duplication and make the logic more maintainable.

Example helper (defined inside `web_search_enrich`, so it can use the local `evaluation_id` and `event_channel`):

```python
def _emit_event(event_type: str, progress: int, message: str):
    if evaluation_id:
        event = create_sommelier_event(
            evaluation_id=evaluation_id,
            sommelier="web_search",
            event_type=event_type,
            progress_percent=progress,
            message=message,
        )
        event_channel.emit_sync(evaluation_id, event)
```
|
||
|
|
||
| if existing := state.get("web_search_context"): | ||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="web_search", | ||
| event_type="enrichment_complete", | ||
| progress_percent=100, | ||
| message="Web search enrichment complete (cached)", | ||
| ), | ||
| ) | ||
| return {"web_search_context": existing} | ||
|
|
||
| if not settings.VERTEX_API_KEY: | ||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="web_search", | ||
| event_type="enrichment_complete", | ||
| progress_percent=100, | ||
| message="Web search skipped (no API key)", | ||
| ), | ||
| ) | ||
| return { | ||
| "web_search_context": { | ||
| "query": "", | ||
|
|
@@ -85,6 +122,18 @@ async def web_search_enrich( | |
| } | ||
| ) | ||
|
|
||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="web_search", | ||
| event_type="enrichment_complete", | ||
| progress_percent=100, | ||
| message=f"Web search complete ({len(sources)} sources)", | ||
| ), | ||
| ) | ||
|
|
||
| return { | ||
| "web_search_context": { | ||
| "query": query, | ||
|
|
@@ -103,6 +152,17 @@ async def web_search_enrich( | |
|
|
||
| except Exception as e: | ||
| logger.warning(f"Web search grounding failed: {e}") | ||
| if evaluation_id: | ||
| event_channel.emit_sync( | ||
| evaluation_id, | ||
| create_sommelier_event( | ||
| evaluation_id=evaluation_id, | ||
| sommelier="web_search", | ||
| event_type="enrichment_error", | ||
| progress_percent=100, | ||
| message="Web search failed due to an internal error.", | ||
| ), | ||
| ) | ||
|
Comment on lines
+155
to
+165
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 에러 메시지에 민감한 정보가 포함될 수 있습니다.
에러 이벤트의 메시지에는 일반화된 문자열을 사용하고, 상세 예외 정보는 서버 측 로그에만 남기는 것이 안전합니다. 🤖 Prompt for AI Agents |
||
| return { | ||
| "web_search_context": { | ||
| "query": query, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is significant code duplication in how SSE events are emitted. This block, and four other similar blocks in this file, repeat the `if evaluation_id:` check and the call to `event_channel.emit_sync`. This pattern is also present in `rag_enrich.py` and `web_search_enrich.py`.
To improve maintainability and reduce redundancy, consider refactoring this logic into a local helper function within `code_analysis_enrich`. For example: