Skip to content

Commit a5ab18f

Browse files
authored
Adding support for HYBRID search. (#3813)
* Adding support for hybrid search. * Adding YIELD_SCORE_AS tests and clearing up support for the keyword * Removing commented test code. * Applying review comments - part 1 * Fixing linters * vset test causes crashes of the test servers in pipeline - changing the problematic tests to use less data (sync and async tests) * Update list concatenation to use extend. Fix spelling error in tests. Extend a test to use two reducers. * Investigate server failures during 8.4 tests * Updating docstrings for combine and scorer * Marking hybrid query related classes as experimental * Allowing multiple load and apply statements
1 parent e6fb505 commit a5ab18f

File tree

11 files changed

+8524
-5246
lines changed

11 files changed

+8524
-5246
lines changed

.github/actions/run-tests/action.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ runs:
142142
sudo apt-get install -y redis-tools
143143
echo "Docker Containers:"
144144
docker ps
145+
echo "Cluster nodes:"
145146
redis-cli -p 16379 CLUSTER NODES
146147
shell: bash
147148

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
# image tag 8.0-RC2-pre is the one matching the 8.0 GA release
33
x-client-libs-stack-image: &client-libs-stack-image
4-
image: "redislabs/client-libs-test:${CLIENT_LIBS_TEST_STACK_IMAGE_TAG:-8.2}"
4+
image: "redislabs/client-libs-test:${CLIENT_LIBS_TEST_STACK_IMAGE_TAG:-8.4-RC1-pre.2}"
55

66
x-client-libs-image: &client-libs-image
77
image: "redislabs/client-libs-test:${CLIENT_LIBS_TEST_IMAGE_TAG:-8.4-RC1-pre.2}"

dockers/sentinel.conf

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
sentinel resolve-hostnames yes
22
sentinel monitor redis-py-test redis 6379 2
3-
sentinel down-after-milliseconds redis-py-test 5000
4-
sentinel failover-timeout redis-py-test 60000
3+
# Be much more tolerant to transient stalls (index builds, GC, I/O)
4+
sentinel down-after-milliseconds redis-py-test 60000
5+
# Avoid rapid repeated failover attempts
6+
sentinel failover-timeout redis-py-test 180000
7+
# Keep it conservative: sync one replica at a time
58
sentinel parallel-syncs redis-py-test 1

redis/commands/search/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .commands import (
55
AGGREGATE_CMD,
66
CONFIG_CMD,
7+
HYBRID_CMD,
78
INFO_CMD,
89
PROFILE_CMD,
910
SEARCH_CMD,
@@ -102,6 +103,7 @@ def __init__(self, client, index_name="idx"):
102103
self._RESP2_MODULE_CALLBACKS = {
103104
INFO_CMD: self._parse_info,
104105
SEARCH_CMD: self._parse_search,
106+
HYBRID_CMD: self._parse_hybrid_search,
105107
AGGREGATE_CMD: self._parse_aggregate,
106108
PROFILE_CMD: self._parse_profile,
107109
SPELLCHECK_CMD: self._parse_spellcheck,

redis/commands/search/commands.py

Lines changed: 144 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,25 @@
11
import itertools
22
import time
3-
from typing import Dict, List, Optional, Union
3+
from typing import Any, Dict, List, Optional, Union
44

5+
from redis._parsers.helpers import pairs_to_dict
56
from redis.client import NEVER_DECODE, Pipeline
7+
from redis.commands.search.hybrid_query import (
8+
CombineResultsMethod,
9+
HybridCursorQuery,
10+
HybridPostProcessingConfig,
11+
HybridQuery,
12+
)
13+
from redis.commands.search.hybrid_result import HybridCursorResult, HybridResult
614
from redis.utils import deprecated_function
715

816
from ..helpers import get_protocol_version
917
from ._util import to_string
10-
from .aggregation import AggregateRequest, AggregateResult, Cursor
18+
from .aggregation import (
19+
AggregateRequest,
20+
AggregateResult,
21+
Cursor,
22+
)
1123
from .document import Document
1224
from .field import Field
1325
from .index_definition import IndexDefinition
@@ -47,6 +59,7 @@
4759
SUGGET_COMMAND = "FT.SUGGET"
4860
SYNUPDATE_CMD = "FT.SYNUPDATE"
4961
SYNDUMP_CMD = "FT.SYNDUMP"
62+
HYBRID_CMD = "FT.HYBRID"
5063

5164
NOOFFSETS = "NOOFFSETS"
5265
NOFIELDS = "NOFIELDS"
@@ -84,6 +97,28 @@ def _parse_search(self, res, **kwargs):
8497
field_encodings=kwargs["query"]._return_fields_decode_as,
8598
)
8699

100+
def _parse_hybrid_search(self, res, **kwargs):
101+
res_dict = pairs_to_dict(res, decode_keys=True)
102+
if "cursor" in kwargs:
103+
return HybridCursorResult(
104+
search_cursor_id=int(res_dict["SEARCH"]),
105+
vsim_cursor_id=int(res_dict["VSIM"]),
106+
)
107+
108+
results: List[Dict[str, Any]] = []
109+
# the original results are a list of lists
110+
# we convert them to a list of dicts
111+
for res_item in res_dict["results"]:
112+
item_dict = pairs_to_dict(res_item, decode_keys=True)
113+
results.append(item_dict)
114+
115+
return HybridResult(
116+
total_results=int(res_dict["total_results"]),
117+
results=results,
118+
warnings=res_dict["warnings"],
119+
execution_time=float(res_dict["execution_time"]),
120+
)
121+
87122
def _parse_aggregate(self, res, **kwargs):
88123
return self._get_aggregate_result(res, kwargs["query"], kwargs["has_cursor"])
89124

@@ -470,7 +505,7 @@ def get_params_args(
470505
return []
471506
args = []
472507
if len(query_params) > 0:
473-
args.append("params")
508+
args.append("PARAMS")
474509
args.append(len(query_params) * 2)
475510
for key, value in query_params.items():
476511
args.append(key)
@@ -525,6 +560,59 @@ def search(
525560
SEARCH_CMD, res, query=query, duration=(time.monotonic() - st) * 1000.0
526561
)
527562

563+
def hybrid_search(
564+
self,
565+
query: HybridQuery,
566+
combine_method: Optional[CombineResultsMethod] = None,
567+
post_processing: Optional[HybridPostProcessingConfig] = None,
568+
params_substitution: Optional[Dict[str, Union[str, int, float, bytes]]] = None,
569+
timeout: Optional[int] = None,
570+
cursor: Optional[HybridCursorQuery] = None,
571+
) -> Union[HybridResult, HybridCursorResult, Pipeline]:
572+
"""
573+
Execute a hybrid search using both text and vector queries
574+
575+
Args:
576+
- **query**: HybridQuery object
577+
Contains the text and vector queries
578+
- **combine_method**: CombineResultsMethod object
579+
Contains the combine method and parameters
580+
- **post_processing**: HybridPostProcessingConfig object
581+
Contains the post processing configuration
582+
- **params_substitution**: Dict[str, Union[str, int, float, bytes]]
583+
Contains the parameters substitution
584+
- **timeout**: int - contains the timeout in milliseconds
585+
- **cursor**: HybridCursorQuery object - contains the cursor configuration
586+
587+
588+
For more information see `FT.HYBRID <https://redis.io/commands/ft.hybrid>`.
589+
"""
590+
index = self.index_name
591+
options = {}
592+
pieces = [HYBRID_CMD, index]
593+
pieces.extend(query.get_args())
594+
if combine_method:
595+
pieces.extend(combine_method.get_args())
596+
if post_processing:
597+
pieces.extend(post_processing.build_args())
598+
if params_substitution:
599+
pieces.extend(self.get_params_args(params_substitution))
600+
if timeout:
601+
pieces.extend(("TIMEOUT", timeout))
602+
if cursor:
603+
options["cursor"] = True
604+
pieces.extend(cursor.build_args())
605+
606+
if get_protocol_version(self.client) not in ["3", 3]:
607+
options[NEVER_DECODE] = True
608+
609+
res = self.execute_command(*pieces, **options)
610+
611+
if isinstance(res, Pipeline):
612+
return res
613+
614+
return self._parse_results(HYBRID_CMD, res, **options)
615+
528616
def explain(
529617
self,
530618
query: Union[str, Query],
@@ -965,6 +1053,59 @@ async def search(
9651053
SEARCH_CMD, res, query=query, duration=(time.monotonic() - st) * 1000.0
9661054
)
9671055

1056+
async def hybrid_search(
1057+
self,
1058+
query: HybridQuery,
1059+
combine_method: Optional[CombineResultsMethod] = None,
1060+
post_processing: Optional[HybridPostProcessingConfig] = None,
1061+
params_substitution: Optional[Dict[str, Union[str, int, float, bytes]]] = None,
1062+
timeout: Optional[int] = None,
1063+
cursor: Optional[HybridCursorQuery] = None,
1064+
) -> Union[HybridResult, HybridCursorResult, Pipeline]:
1065+
"""
1066+
Execute a hybrid search using both text and vector queries
1067+
1068+
Args:
1069+
- **query**: HybridQuery object
1070+
Contains the text and vector queries
1071+
- **combine_method**: CombineResultsMethod object
1072+
Contains the combine method and parameters
1073+
- **post_processing**: HybridPostProcessingConfig object
1074+
Contains the post processing configuration
1075+
- **params_substitution**: Dict[str, Union[str, int, float, bytes]]
1076+
Contains the parameters substitution
1077+
- **timeout**: int - contains the timeout in milliseconds
1078+
- **cursor**: HybridCursorQuery object - contains the cursor configuration
1079+
1080+
1081+
For more information see `FT.HYBRID <https://redis.io/commands/ft.hybrid>`.
1082+
"""
1083+
index = self.index_name
1084+
options = {}
1085+
pieces = [HYBRID_CMD, index]
1086+
pieces.extend(query.get_args())
1087+
if combine_method:
1088+
pieces.extend(combine_method.get_args())
1089+
if post_processing:
1090+
pieces.extend(post_processing.build_args())
1091+
if params_substitution:
1092+
pieces.extend(self.get_params_args(params_substitution))
1093+
if timeout:
1094+
pieces.extend(("TIMEOUT", timeout))
1095+
if cursor:
1096+
options["cursor"] = True
1097+
pieces.extend(cursor.build_args())
1098+
1099+
if get_protocol_version(self.client) not in ["3", 3]:
1100+
options[NEVER_DECODE] = True
1101+
1102+
res = await self.execute_command(*pieces, **options)
1103+
1104+
if isinstance(res, Pipeline):
1105+
return res
1106+
1107+
return self._parse_results(HYBRID_CMD, res, **options)
1108+
9681109
async def aggregate(
9691110
self,
9701111
query: Union[AggregateResult, Cursor],

0 commit comments

Comments
 (0)