Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,7 @@ class NodesManager:
"_dynamic_startup_nodes",
"_moved_exception",
"_event_dispatcher",
"_background_tasks",
"connection_kwargs",
"default_node",
"nodes_cache",
Expand Down Expand Up @@ -1297,6 +1298,7 @@ def __init__(
self.slots_cache: Dict[int, List["ClusterNode"]] = {}
self.read_load_balancer = LoadBalancer()

self._background_tasks: Set[asyncio.Task] = set()
self._dynamic_startup_nodes: bool = dynamic_startup_nodes
self._moved_exception: MovedError = None
if event_dispatcher is None:
Expand Down Expand Up @@ -1331,13 +1333,17 @@ def set_nodes(
if remove_old:
for name in list(old.keys()):
if name not in new:
task = asyncio.create_task(old.pop(name).disconnect()) # noqa
task = asyncio.create_task(old.pop(name).disconnect())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed an issue where if multiple tasks were created, all but the last one would no longer be strongly referenced. Tasks that are not strongly referenced can be deleted by the GC.


for name, node in new.items():
if name in old:
if old[name] is node:
continue
task = asyncio.create_task(old[name].disconnect()) # noqa
task = asyncio.create_task(old[name].disconnect())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
old[name] = node

def update_moved_exception(self, exception):
Expand Down