2 changes: 1 addition & 1 deletion dev_requirements.txt
@@ -4,7 +4,7 @@ invoke==2.2.0
mock
packaging>=20.4
pytest
pytest-asyncio>=0.23.0
pytest-asyncio>=0.24.0
pytest-cov
pytest-profiling==1.8.1
pytest-timeout
2 changes: 1 addition & 1 deletion redis/__init__.py
@@ -46,7 +46,7 @@ def int_or_str(value):
return value


__version__ = "6.2.0"
__version__ = "6.4.0"
VERSION = tuple(map(int_or_str, __version__.split(".")))


263 changes: 133 additions & 130 deletions redis/asyncio/cluster.py
@@ -912,7 +912,7 @@ async def _execute_command(
# Reset the counter
self.reinitialize_counter = 0
else:
self.nodes_manager._moved_exception = e
self.nodes_manager.move_slot(e)
moved = True
except AskError as e:
redirect_addr = get_node_name(host=e.host, port=e.port)
@@ -1266,12 +1266,13 @@ async def _mock(self, error: RedisError):
class NodesManager:
__slots__ = (
"_dynamic_startup_nodes",
"_moved_exception",
"_event_dispatcher",
"connection_kwargs",
"default_node",
"nodes_cache",
"_epoch",
"read_load_balancer",
"_initialize_lock",
"require_full_coverage",
"slots_cache",
"startup_nodes",
@@ -1295,10 +1296,11 @@ def __init__(
self.default_node: "ClusterNode" = None
self.nodes_cache: Dict[str, "ClusterNode"] = {}
self.slots_cache: Dict[int, List["ClusterNode"]] = {}
self._epoch: int = 0
self.read_load_balancer = LoadBalancer()
self._initialize_lock: asyncio.Lock = asyncio.Lock()

self._dynamic_startup_nodes: bool = dynamic_startup_nodes
self._moved_exception: MovedError = None
if event_dispatcher is None:
self._event_dispatcher = EventDispatcher()
else:
@@ -1340,11 +1342,7 @@ def set_nodes(
task = asyncio.create_task(old[name].disconnect()) # noqa
old[name] = node

def update_moved_exception(self, exception):
self._moved_exception = exception

def _update_moved_slots(self) -> None:
e = self._moved_exception
def move_slot(self, e: AskError | MovedError):
redirected_node = self.get_node(host=e.host, port=e.port)
if redirected_node:
# The node already exists
@@ -1378,18 +1376,13 @@ def _update_moved_slots(self) -> None:
# shard. We need to remove all current nodes from the slot's list
# (including replications) and add just the new node.
self.slots_cache[e.slot_id] = [redirected_node]
# Reset moved_exception
self._moved_exception = None

def get_node_from_slot(
self,
slot: int,
read_from_replicas: bool = False,
load_balancing_strategy=None,
) -> "ClusterNode":
if self._moved_exception:
self._update_moved_slots()

if read_from_replicas is True and load_balancing_strategy is None:
load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
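
Note on the two hunks above: instead of stashing a MovedError on the manager (`_moved_exception`) and applying it lazily on the next `get_node_from_slot()` call, the redirect is now applied to the slot cache immediately via `move_slot()`. A minimal sketch of that eager-update pattern, using hypothetical stand-in classes rather than redis-py's real `NodesManager`/`ClusterNode`:

# Minimal sketch of the eager slot-map update; Node, MovedError and SlotMap are
# hypothetical stand-ins, not redis-py's own classes.
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Node:
    host: str
    port: int


@dataclass
class MovedError(Exception):
    slot_id: int
    host: str
    port: int


class SlotMap:
    def __init__(self) -> None:
        self.nodes: Dict[str, Node] = {}
        self.slots: Dict[int, List[Node]] = {}

    def move_slot(self, e: MovedError) -> None:
        # Register the redirect target if it is new, then repoint the slot at it
        # right away instead of deferring the update to the next slot lookup.
        name = f"{e.host}:{e.port}"
        node = self.nodes.setdefault(name, Node(e.host, e.port))
        self.slots[e.slot_id] = [node]


slot_map = SlotMap()
slot_map.move_slot(MovedError(slot_id=42, host="10.0.0.5", port=6379))
print(slot_map.slots[42])  # [Node(host='10.0.0.5', port=6379)]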

@@ -1423,135 +1416,147 @@ async def initialize(self) -> None:
startup_nodes_reachable = False
fully_covered = False
exception = None
# Convert to tuple to prevent RuntimeError if self.startup_nodes
# is modified during iteration
for startup_node in tuple(self.startup_nodes.values()):
try:
# Make sure cluster mode is enabled on this node
epoch = self._epoch

async with self._initialize_lock:
if self._epoch != epoch:
# another initialize call has already reinitialized the
# nodes since we started waiting for the lock;
# we don't need to do it again.
return

# Convert to tuple to prevent RuntimeError if self.startup_nodes
# is modified during iteration
for startup_node in tuple(self.startup_nodes.values()):
try:
self._event_dispatcher.dispatch(
AfterAsyncClusterInstantiationEvent(
self.nodes_cache,
self.connection_kwargs.get("credential_provider", None),
# Make sure cluster mode is enabled on this node
try:
self._event_dispatcher.dispatch(
AfterAsyncClusterInstantiationEvent(
self.nodes_cache,
self.connection_kwargs.get("credential_provider", None),
)
)
)
cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
except ResponseError:
raise RedisClusterException(
"Cluster mode is not enabled on this node"
)
startup_nodes_reachable = True
except Exception as e:
# Try the next startup node.
# The exception is saved and raised only if we have no more nodes.
exception = e
continue

# CLUSTER SLOTS command results in the following output:
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
# where each node contains the following list: [IP, port, node_id]
# Therefore, cluster_slots[0][2][0] will be the IP address of the
# primary node of the first slot section.
# If there's only one server in the cluster, its ``host`` is ''
# Fix it to the host in startup_nodes
if (
len(cluster_slots) == 1
and not cluster_slots[0][2][0]
and len(self.startup_nodes) == 1
):
cluster_slots[0][2][0] = startup_node.host

for slot in cluster_slots:
for i in range(2, len(slot)):
slot[i] = [str_if_bytes(val) for val in slot[i]]
primary_node = slot[2]
host = primary_node[0]
if host == "":
host = startup_node.host
port = int(primary_node[1])
host, port = self.remap_host_port(host, port)

nodes_for_slot = []

target_node = tmp_nodes_cache.get(get_node_name(host, port))
if not target_node:
target_node = ClusterNode(
host, port, PRIMARY, **self.connection_kwargs
)
# add this node to the nodes cache
tmp_nodes_cache[target_node.name] = target_node
nodes_for_slot.append(target_node)

replica_nodes = slot[3:]
for replica_node in replica_nodes:
host = replica_node[0]
port = replica_node[1]
cluster_slots = await startup_node.execute_command(
"CLUSTER SLOTS"
)
except ResponseError:
raise RedisClusterException(
"Cluster mode is not enabled on this node"
)
startup_nodes_reachable = True
except Exception as e:
# Try the next startup node.
# The exception is saved and raised only if we have no more nodes.
exception = e
continue

# CLUSTER SLOTS command results in the following output:
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
# where each node contains the following list: [IP, port, node_id]
# Therefore, cluster_slots[0][2][0] will be the IP address of the
# primary node of the first slot section.
# If there's only one server in the cluster, its ``host`` is ''
# Fix it to the host in startup_nodes
if (
len(cluster_slots) == 1
and not cluster_slots[0][2][0]
and len(self.startup_nodes) == 1
):
cluster_slots[0][2][0] = startup_node.host

for slot in cluster_slots:
for i in range(2, len(slot)):
slot[i] = [str_if_bytes(val) for val in slot[i]]
primary_node = slot[2]
host = primary_node[0]
if host == "":
host = startup_node.host
port = int(primary_node[1])
host, port = self.remap_host_port(host, port)

target_replica_node = tmp_nodes_cache.get(get_node_name(host, port))
if not target_replica_node:
target_replica_node = ClusterNode(
host, port, REPLICA, **self.connection_kwargs
nodes_for_slot = []

target_node = tmp_nodes_cache.get(get_node_name(host, port))
if not target_node:
target_node = ClusterNode(
host, port, PRIMARY, **self.connection_kwargs
)
# add this node to the nodes cache
tmp_nodes_cache[target_replica_node.name] = target_replica_node
nodes_for_slot.append(target_replica_node)
tmp_nodes_cache[target_node.name] = target_node
nodes_for_slot.append(target_node)

for i in range(int(slot[0]), int(slot[1]) + 1):
if i not in tmp_slots:
tmp_slots[i] = nodes_for_slot
else:
# Validate that 2 nodes want to use the same slot cache
# setup
tmp_slot = tmp_slots[i][0]
if tmp_slot.name != target_node.name:
disagreements.append(
f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
)
replica_nodes = slot[3:]
for replica_node in replica_nodes:
host = replica_node[0]
port = replica_node[1]
host, port = self.remap_host_port(host, port)

if len(disagreements) > 5:
raise RedisClusterException(
f"startup_nodes could not agree on a valid "
f"slots cache: {', '.join(disagreements)}"
target_replica_node = tmp_nodes_cache.get(
get_node_name(host, port)
)
if not target_replica_node:
target_replica_node = ClusterNode(
host, port, REPLICA, **self.connection_kwargs
)
# add this node to the nodes cache
tmp_nodes_cache[target_replica_node.name] = target_replica_node
nodes_for_slot.append(target_replica_node)

for i in range(int(slot[0]), int(slot[1]) + 1):
if i not in tmp_slots:
tmp_slots[i] = nodes_for_slot
else:
# Validate that 2 nodes want to use the same slot cache
# setup
tmp_slot = tmp_slots[i][0]
if tmp_slot.name != target_node.name:
disagreements.append(
f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
)

# Validate if all slots are covered or if we should try next startup node
fully_covered = True
for i in range(REDIS_CLUSTER_HASH_SLOTS):
if i not in tmp_slots:
fully_covered = False
if len(disagreements) > 5:
raise RedisClusterException(
f"startup_nodes could not agree on a valid "
f"slots cache: {', '.join(disagreements)}"
)

# Validate if all slots are covered or if we should try next startup node
fully_covered = True
for i in range(REDIS_CLUSTER_HASH_SLOTS):
if i not in tmp_slots:
fully_covered = False
break
if fully_covered:
break
if fully_covered:
break

if not startup_nodes_reachable:
raise RedisClusterException(
f"Redis Cluster cannot be connected. Please provide at least "
f"one reachable node: {str(exception)}"
) from exception

# Check if the slots are not fully covered
if not fully_covered and self.require_full_coverage:
# Despite the requirement that the slots be covered, there
# isn't a full coverage
raise RedisClusterException(
f"All slots are not covered after query all startup_nodes. "
f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
f"covered..."
)
if not startup_nodes_reachable:
raise RedisClusterException(
f"Redis Cluster cannot be connected. Please provide at least "
f"one reachable node: {str(exception)}"
) from exception

# Check if the slots are not fully covered
if not fully_covered and self.require_full_coverage:
# Despite the requirement that the slots be covered, there
# isn't a full coverage
raise RedisClusterException(
f"All slots are not covered after query all startup_nodes. "
f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
f"covered..."
)

# Set the tmp variables to the real variables
self.slots_cache = tmp_slots
self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
# Set the tmp variables to the real variables
self.slots_cache = tmp_slots
self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)

if self._dynamic_startup_nodes:
# Populate the startup nodes with all discovered nodes
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
if self._dynamic_startup_nodes:
# Populate the startup nodes with all discovered nodes
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)

# Set the default node
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
# If initialize was called after a MovedError, clear it
self._moved_exception = None
# Set the default node
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
self._epoch += 1

async def aclose(self, attr: str = "nodes_cache") -> None:
self.default_node = None
@@ -2255,9 +2260,7 @@ async def _reinitialize_on_error(self, error):
self.reinitialize_counter = 0
else:
if isinstance(error, AskError):
self._pipe.cluster_client.nodes_manager.update_moved_exception(
error
)
self._pipe.cluster_client.nodes_manager.move_slot(error)

self._executing = False
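
Note on the `initialize()` and `_reinitialize_on_error()` hunks: most of the `initialize()` diff is re-indentation, since the existing discovery loop now runs under `self._initialize_lock`, while the new `_epoch` counter lets callers that queued up on the lock detect that another task already refreshed the topology and return early. A runnable sketch of that lock-plus-epoch guard in isolation (the class and `_refresh` stand-in are hypothetical, not redis-py code):

# Sketch of the "re-check the epoch after acquiring the lock" pattern, assuming
# names similar to the diff (_epoch, an asyncio.Lock); not redis-py's actual class.
import asyncio


class Reinitializer:
    def __init__(self) -> None:
        self._epoch = 0
        self._lock = asyncio.Lock()

    async def _refresh(self) -> None:
        # Stand-in for the expensive work (querying CLUSTER SLOTS, rebuilding caches).
        await asyncio.sleep(0.05)

    async def initialize(self) -> None:
        epoch = self._epoch
        async with self._lock:
            if self._epoch != epoch:
                # Another caller finished a refresh while we waited for the lock,
                # so the topology is already up to date.
                return
            await self._refresh()
            self._epoch += 1


async def main() -> None:
    r = Reinitializer()
    # Ten concurrent initialize() calls collapse into a single refresh.
    await asyncio.gather(*(r.initialize() for _ in range(10)))
    print(r._epoch)  # 1


asyncio.run(main())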
