Skip to content

Commit 9627622

Browse files
jac0626silas.jiang
andauthored
fix(async): include event loop ID in connection alias to prevent reusing closed connections (#3086)
# Description This PR fixes a critical issue where `AsyncMilvusClient` could reuse a connection associated with a closed event loop, causing `RuntimeError: Event loop is closed` when running multiple async tasks or tests sequentially. ## Problem * The `create_connection` utility generates a connection alias based on URI, user, and other parameters but **ignores the running event loop**. * In `AsyncMilvusClient`, connections are cached globally by this alias. * If an event loop is closed (e.g., after `asyncio.run()` finishes) and a new loop is started, the client reuses the **cached connection** which is bound to the **old, closed** loop. * This results in errors like: `RuntimeError: Event loop is closed` when trying to use the client in the new loop. ## Solution * Updated `create_connection` in `pymilvus/milvus_client/_utils.py` to include the **current event loop ID** in the connection alias when `use_async=True`. * **New Alias Format:** `async-{uri}-{hash}-loop{loop_id}` * This ensures that each event loop gets its own dedicated connection instance. * Added a check to raise `RuntimeError` if no running loop is found in an async context (fail-fast instead of silent failure). ## Verification * Added a new unit test `tests/test_async_milvus_client_reuse.py` to reproduce the issue and verify the fix. * The test confirms that different event loops generate different connection aliases, preventing invalid reuse. * Existing tests pass. ## Related Issue #3087 Co-authored-by: silas.jiang <silas.jiang@zilliz.com>
1 parent ffeedbd commit 9627622

File tree

2 files changed

+58
-2
lines changed

2 files changed

+58
-2
lines changed

pymilvus/milvus_client/_utils.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import hashlib
23
import logging
34

@@ -33,8 +34,15 @@ def create_connection(
3334
md5.update(token.encode())
3435
auth_fmt = f"{md5.hexdigest()}"
3536

36-
# different uri, auth, db_name cannot share the same connection
37-
not_empty = [v for v in [use_async_fmt, uri, db_name, auth_fmt] if v]
37+
# For async connections, include event loop ID in alias to prevent
38+
# reusing connections from closed event loops
39+
loop_id_fmt = ""
40+
if use_async:
41+
loop = asyncio.get_running_loop()
42+
loop_id_fmt = f"loop{id(loop)}"
43+
44+
# different uri, auth, db_name, and event loop (for async) cannot share the same connection
45+
not_empty = [v for v in [use_async_fmt, uri, db_name, auth_fmt, loop_id_fmt] if v]
3846
using = "-".join(not_empty)
3947

4048
if connections.has_connection(using):
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import asyncio
2+
import pytest
3+
from unittest.mock import patch, MagicMock
4+
from pymilvus import AsyncMilvusClient
5+
6+
class TestAsyncConnectionReuse:
7+
def test_async_client_alias_different_loops(self):
8+
"""
9+
Test that AsyncMilvusClient generates different connection aliases
10+
for different event loops to avoid reusing closed connections.
11+
"""
12+
uri = "http://localhost:19530"
13+
14+
async def create_client():
15+
# Mock connections.connect to avoid real network connection
16+
# Mock utility.get_server_type to avoid checking server version
17+
with patch("pymilvus.orm.connections.Connections.connect") as mock_connect, \
18+
patch("pymilvus.orm.utility.get_server_type", return_value="milvus"):
19+
20+
client = AsyncMilvusClient(uri=uri)
21+
# Return the alias used and the current loop id
22+
return client._using, id(asyncio.get_running_loop())
23+
24+
# Manually create loops to ensure we have distinct objects
25+
loop1 = asyncio.new_event_loop()
26+
loop2 = asyncio.new_event_loop()
27+
28+
try:
29+
# Run in loop 1
30+
alias1, loop1_id = loop1.run_until_complete(create_client())
31+
32+
# Run in loop 2
33+
alias2, loop2_id = loop2.run_until_complete(create_client())
34+
35+
print(f"Loop 1 ID: {loop1_id}, Alias 1: {alias1}")
36+
print(f"Loop 2 ID: {loop2_id}, Alias 2: {alias2}")
37+
38+
# Since loop1 and loop2 are both alive (referenced), they must have different IDs
39+
assert loop1_id != loop2_id, "Two active loops must have different IDs"
40+
assert alias1 != alias2, "Aliases should be different for different loops"
41+
42+
# Check if loop ID is part of the alias
43+
assert f"loop{loop1_id}" in alias1
44+
assert f"loop{loop2_id}" in alias2
45+
46+
finally:
47+
loop1.close()
48+
loop2.close()

0 commit comments

Comments
 (0)