Conversation
|
close: PR #1245 |
There was a problem hiding this comment.
Pull request overview
This PR removes the standalone OpenSearch filter-merge utility (and its unit tests), updates the OpenSearch Langflow component(s) and flow definitions to no longer import that utility, and simplifies the Langflow Docker image by removing the extra src copy + PYTHONPATH override.
Changes:
- Deleted
src/utils/opensearch_filter_merge.pyand its unit tests. - Updated OpenSearch component logic (and embedded flow code) to perform filter/limit merging internally, plus added request timeout/retry configuration.
- Updated
Dockerfile.langflowto stop copyingsrc/and settingPYTHONPATH.
Reviewed changes
Copilot reviewed 4 out of 9 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/test_opensearch_filter_merge.py | Removed unit tests for the deleted OpenSearch filter merge utility. |
| src/utils/opensearch_filter_merge.py | Removed the shared filter merge/coercion implementation. |
| flows/openrag_agent.json | Updated embedded OpenSearch component code to remove the deleted util import and adjust filtering/timeouts. |
| flows/ingestion_flow.json | Same as above for ingestion flow’s embedded OpenSearch component. |
| flows/components/opensearch_multimodal.py | Inlined filter coercion/merge behavior, added timeout/retry inputs, and changed raw_search + aggregation behavior. |
| Dockerfile.langflow | Removed COPY src ... and PYTHONPATH injection previously needed for util imports. |
Comments suppressed due to low confidence (1)
flows/components/opensearch_multimodal.py:314
auth_modenow defaults to"jwt", but the input field visibility defaults still showusername/passwordand hidejwt_token/jwt_header/bearer_prefix. If the UI doesn’t invokeupdate_build_configon initial render, the form will be inconsistent with the selected auth mode. Align the initialshowflags (or keep the default auth mode asbasic) so the component is usable without a manual toggle.
DropdownInput(
name="auth_mode",
display_name="Authentication Mode",
value="jwt",
options=["basic", "jwt"],
info=(
"Authentication method: 'basic' for username/password authentication, "
"or 'jwt' for JSON Web Token (Bearer) authentication."
),
real_time_refresh=True,
advanced=False,
),
StrInput(
name="username",
display_name="Username",
value="admin",
show=True,
),
SecretStrInput(
name="password",
display_name="OpenSearch Password",
value="admin",
show=True,
),
SecretStrInput(
name="jwt_token",
display_name="JWT Token",
value="JWT",
load_from_db=False,
show=False,
info=(
"Valid JSON Web Token for authentication. "
"Will be sent in the Authorization header (with optional 'Bearer ' prefix)."
),
),
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| field_mapping = { | ||
| "data_sources": "filename", | ||
| "document_types": "mimetype", | ||
| "owners": "owner", |
There was a problem hiding this comment.
The context-style field_mapping here omits connector_types -> connector_type. Elsewhere in the codebase (e.g., chat/search services) connector_types is a supported filter key, so this component will silently generate filters on the wrong field (or not map at all), causing connector-type scoping to stop working. Add the missing mapping (and keep it consistent with src/services/chat_service.py).
| "owners": "owner", | |
| "owners": "owner", | |
| "connector_types": "connector_type", |
| @@ -391,13 +423,43 @@ | |||
| msg = f"Invalid filter_expression JSON: {e}" | |||
| raise ValueError(msg) from e | |||
|
|
|||
| body = apply_chat_filter_expression_to_search_body(raw_query, filter_obj) | |||
| filter_clauses = self._coerce_filter_clauses(filter_obj) | |||
|
|
|||
| if filter_clauses: | |||
| if "query" in query_body: | |||
| original_query = query_body["query"] | |||
| query_body["query"] = { | |||
| "bool": { | |||
| "must": [original_query], | |||
| "filter": filter_clauses, | |||
| } | |||
| } | |||
| else: | |||
| query_body["query"] = { | |||
| "bool": { | |||
| "must": [{"match_all": {}}], | |||
| "filter": filter_clauses, | |||
| } | |||
| } | |||
|
|
|||
There was a problem hiding this comment.
raw_search assigns query_body = raw_query when raw_query is a dict, then mutates query_body when applying filters/limits. This will mutate the caller-provided dict (and potentially self.search_query if it was already parsed), which is surprising and can cause hard-to-track side effects. Make query_body a deep copy (or at least a shallow copy) before any modifications.
| filter_clauses = self._coerce_filter_clauses(filter_obj) | ||
|
|
||
| if filter_clauses: | ||
| if "query" in query_body: | ||
| original_query = query_body["query"] | ||
| query_body["query"] = { | ||
| "bool": { | ||
| "must": [original_query], | ||
| "filter": filter_clauses, | ||
| } | ||
| } | ||
| else: | ||
| query_body["query"] = { | ||
| "bool": { | ||
| "must": [{"match_all": {}}], | ||
| "filter": filter_clauses, | ||
| } | ||
| } |
There was a problem hiding this comment.
raw_search previously handled top-level knn queries by injecting filters into each knn field’s filter (because query filters do not constrain top-level knn execution in OpenSearch). The new logic only modifies query, so filter_expression will not scope results for raw bodies that use a top-level knn block. Consider restoring the per-field knn.filter injection behavior for top-level knn payloads.
| if not knn_queries_with_candidates: | ||
| # No valid fields found - this can happen when: | ||
| # 1. Index is empty (no documents yet) | ||
| # 2. KNN mappings are missing/broken for current embedding model | ||
| # 2. Embedding model has changed and field doesn't exist yet | ||
| # Return empty results instead of failing | ||
| logger.warning( | ||
| "No valid knn_vector fields found for embedding models. " | ||
| "Falling back to keyword-only retrieval." | ||
| "This may indicate an empty index or missing field mappings. " | ||
| "Returning empty search results." | ||
| ) | ||
| self.log( | ||
| f"[WARN] No valid KNN queries could be built. " | ||
| f"Query embeddings generated: {list(query_embeddings.keys())}, " | ||
| f"but no matching knn_vector fields found in index." | ||
| ) | ||
| resp = run_keyword_fallback("no valid knn_vector fields available") | ||
| hits = resp.get("hits", {}).get("hits", []) | ||
| logger.info(f"Found {len(hits)} results (keyword fallback)") | ||
| return [ | ||
| { | ||
| "page_content": hit["_source"].get("text", ""), | ||
| "metadata": {k: v for k, v in hit["_source"].items() if k != "text"}, | ||
| "score": hit.get("_score"), | ||
| } | ||
| for hit in hits | ||
| ] | ||
| return [] |
There was a problem hiding this comment.
When no valid knn_vector fields are available, search() now returns [] immediately. Previously this path fell back to keyword-only retrieval, which would still return results for indexes missing KNN mappings or before vectors are indexed. Returning empty results can make the component appear broken in common transitional states; consider restoring the keyword fallback (or making this behavior configurable).
| if "term" in f and isinstance(f["term"], dict) and not self._is_placeholder_term(f["term"]): | ||
| explicit_clauses.append(f) | ||
| elif "terms" in f and isinstance(f["terms"], dict): | ||
| field, vals = next(iter(f["terms"].items())) |
There was a problem hiding this comment.
In _coerce_filter_clauses, the terms branch does next(iter(f["terms"].items())) without guarding against an empty {}. If a client sends { "terms": {} } this will raise StopIteration and break searches. Add an explicit empty-check (or safe extraction) before calling next(iter(...)) and skip invalid/empty terms clauses.
| field, vals = next(iter(f["terms"].items())) | |
| # Guard against empty or invalid "terms" dicts to avoid StopIteration | |
| terms_dict = f["terms"] | |
| if not terms_dict: | |
| # Skip empty terms clauses | |
| continue | |
| field, vals = next(iter(terms_dict.items())) |
| query_body["query"] = { | ||
| "bool": { | ||
| "must": [original_query], | ||
| "filter": filter_clauses, | ||
| } | ||
| } |
There was a problem hiding this comment.
Filter injection in raw_search always wraps the existing query under bool.must. If the incoming query is already a bool query (especially with its own filter/must/should), this changes semantics and can lose/override existing bool.filter behavior. Prefer merging into an existing query.bool.filter (append/extend) when present, and only wrap when query is non-bool.
| query_body["query"] = { | |
| "bool": { | |
| "must": [original_query], | |
| "filter": filter_clauses, | |
| } | |
| } | |
| # If the existing query is already a bool query, merge filters into it | |
| if isinstance(original_query, dict) and "bool" in original_query and isinstance( | |
| original_query["bool"], dict | |
| ): | |
| bool_q = original_query["bool"] | |
| existing_filter = bool_q.get("filter") | |
| if existing_filter is None: | |
| bool_q["filter"] = filter_clauses | |
| elif isinstance(existing_filter, list): | |
| bool_q["filter"] = existing_filter + filter_clauses | |
| else: | |
| bool_q["filter"] = [existing_filter] + filter_clauses | |
| query_body["query"] = original_query | |
| else: | |
| # Wrap non-bool queries in a bool with the additional filter clauses | |
| query_body["query"] = { | |
| "bool": { | |
| "must": [original_query], | |
| "filter": filter_clauses, | |
| } | |
| } |
| "document_types": {"terms": {"field": "mimetype", "size": 10}}, | ||
| "owners": {"terms": {"field": "owner", "size": 10}}, | ||
| "embedding_models": {"terms": {"field": "embedding_model", "size": 10}}, |
There was a problem hiding this comment.
The aggregations for document_types and owners use mimetype / owner directly. In typical OpenSearch dynamic mappings these are text fields with a .keyword subfield; running terms aggs on the text field can raise fielddata errors. Either aggregate on the keyword subfields (as before) or add mapping-aware selection similar to _get_filename_agg_field for these fields too.
| "document_types": {"terms": {"field": "mimetype", "size": 10}}, | |
| "owners": {"terms": {"field": "owner", "size": 10}}, | |
| "embedding_models": {"terms": {"field": "embedding_model", "size": 10}}, | |
| "document_types": {"terms": {"field": "mimetype.keyword", "size": 10}}, | |
| "owners": {"terms": {"field": "owner.keyword", "size": 10}}, | |
| "embedding_models": {"terms": {"field": "embedding_model.keyword", "size": 10}}, |
| def raw_search(self, query: str | dict | None = None) -> Data: | ||
| """Execute a raw OpenSearch query against the target index. | ||
|
|
||
| When ``filter_expression`` is set (e.g. chat knowledge scope), the same filter | ||
| clauses and optional ``limit`` / ``score_threshold`` as ``search_documents`` are | ||
| applied via :func:`apply_chat_filter_expression_to_search_body`. | ||
|
|
||
| Args: | ||
| query (dict[str, Any]): The OpenSearch query DSL dictionary. | ||
|
|
||
| Returns: | ||
| Data: Search results as a Data object. | ||
|
|
||
| Raises: | ||
| ValueError: If the query string is invalid JSON, the parsed body is not a JSON object, | ||
| or ``filter_expression`` is invalid JSON. | ||
| ValueError: If 'query' is not a valid OpenSearch query (must be a non-empty dict). | ||
| """ | ||
| raw_query = query if query is not None else self.search_query |
There was a problem hiding this comment.
PR description says the OpenSearch filter-merge utilities are “no longer needed”, but this file reintroduces equivalent filter coercion/merge behavior inside the component (raw_search filter injection + _coerce_filter_clauses). Either update the PR description to reflect that the logic is being moved (not removed), or remove the duplicated logic if it’s truly unnecessary.
4388a42 to
e5dec88
Compare
Parse and apply the component's filter_expression (same JSON parsing as search()) when handling raw_search queries in the multimodal multi-embedding OpenSearch component. The change: - JSON-decode filter_expression and raise a ValueError on parse errors. - Coerce filter object into ES filter clauses via _coerce_filter_clauses and wrap the existing query into a bool query with those filter clauses (or use match_all if no original query). - If a filter object is present, apply its limit to query_body.size when not already set, and map score_threshold / scoreThreshold to query_body.min_score if not already set and > 0. This ensures consistent filtering, sizing, and min_score behavior for raw_search like the search() path.
Change the default value of the auth_mode DropdownInput from 'basic' to 'jwt' in OpenSearchVectorStoreComponentMultimodalMultiEmbedding (flows/components/opensearch_multimodal.py). This makes JWT the default authentication method while keeping the available options ['basic', 'jwt'].
e5dec88 to
f4206fc
Compare
Screen.Recording.2026-03-24.at.6.02.13.PM.mov |


This pull request removes the OpenSearch filter merge utility and its related test suite from the codebase. These utilities were previously used to merge chat/knowledge filter clauses into OpenSearch search request bodies, but are no longer needed in this repository. The Dockerfile is also updated to stop copying the OpenRAG utils source directory and setting the
PYTHONPATHenvironment variable.The most important changes are:
Removal of OpenSearch filter merge utilities:
src/utils/opensearch_filter_merge.pyfile, which contained all logic for merging chat filter expressions, applying limits, and integrating filters into OpenSearch request bodies.tests/unit/test_opensearch_filter_merge.py, which included comprehensive unit tests for the filter merge utilities.Dockerfile update:
Dockerfile.langflowto stop copying the OpenRAG utils source (src) into the image and removed thePYTHONPATHenvironment variable, since the OpenSearch filter merge code is no longer present or required.