Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
ae45464
PYTHON-5517 Updates to connection pool backoff
blink1073 Oct 22, 2025
a4dd0f1
wip add tests
blink1073 Oct 23, 2025
25ab418
update tests
blink1073 Oct 23, 2025
58602c7
update sdam tests
blink1073 Oct 24, 2025
d3a4958
wip update tests
blink1073 Oct 24, 2025
1895e00
Merge branch 'backpressure' of github.com:mongodb/mongo-python-driver…
blink1073 Oct 24, 2025
76c4ee6
Revert "Merge branch 'backpressure' of github.com:mongodb/mongo-pytho…
blink1073 Oct 24, 2025
546976d
Revert "wip update tests"
blink1073 Oct 24, 2025
e52ecdf
wip update tests
blink1073 Oct 24, 2025
5ef7656
update to branch
blink1073 Oct 24, 2025
6d8369f
wip
blink1073 Oct 24, 2025
24542c9
fix backoff logic
blink1073 Oct 24, 2025
5e64aa9
fix race condition
blink1073 Oct 24, 2025
f67195e
update to use durationms
blink1073 Oct 24, 2025
873d1f1
add test that transitions from backoff to clear
blink1073 Oct 24, 2025
02aec91
clean up the tests
blink1073 Oct 24, 2025
73ff3d6
update logging test
blink1073 Oct 24, 2025
c70b66c
fix typing
blink1073 Oct 24, 2025
f20cc0a
add final test
blink1073 Oct 25, 2025
e905b9b
fix ready condition
blink1073 Oct 25, 2025
d228f08
wip incorporate design changes
blink1073 Oct 27, 2025
73e78b6
update tests
blink1073 Oct 28, 2025
adc1375
PYTHON-5627 - Update feedback link (#2601)
NoahStapp Oct 24, 2025
d6d43e7
fix tests
blink1073 Oct 28, 2025
36d4490
fix tests
blink1073 Oct 28, 2025
f936b1b
update backoff logic and fix test
blink1073 Oct 28, 2025
714bc31
fix test
blink1073 Oct 28, 2025
2748749
address failure
blink1073 Oct 28, 2025
2c3c9ad
revert changes to lb test
blink1073 Oct 28, 2025
84f3b68
more test cleanup
blink1073 Oct 28, 2025
ca6c981
more test cleanup
blink1073 Oct 28, 2025
94bc9a3
fix load balancer test
blink1073 Oct 28, 2025
85d2a6b
fix load balancer test
blink1073 Oct 28, 2025
c75ea23
clean up tests
blink1073 Oct 29, 2025
b2b4507
try pypy 3.11
blink1073 Oct 29, 2025
7411be6
add logic for multiple pending connections
blink1073 Oct 29, 2025
c2c8d40
fix race condition in tests
blink1073 Oct 29, 2025
d0aa7c7
undo change to flaky condition
blink1073 Oct 29, 2025
65eb2dc
fix test format
blink1073 Oct 30, 2025
063238d
update schema version
blink1073 Oct 30, 2025
0e9c29a
formatting
blink1073 Oct 30, 2025
a2aec86
update tests
blink1073 Oct 30, 2025
fd63597
fix supported schema version
blink1073 Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 60 additions & 60 deletions .evergreen/generated_configs/tasks.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion .evergreen/scripts/generate_config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

ALL_VERSIONS = ["4.2", "4.4", "5.0", "6.0", "7.0", "8.0", "rapid", "latest"]
CPYTHONS = ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
PYPYS = ["pypy3.10"]
PYPYS = ["pypy3.11"]
ALL_PYTHONS = CPYTHONS + PYPYS
MIN_MAX_PYTHON = [CPYTHONS[0], CPYTHONS[-1]]
BATCHTIME_WEEK = 10080
Expand Down
2 changes: 1 addition & 1 deletion doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ project.

Feature Requests / Feedback
---------------------------
Use our `feedback engine <https://feedback.mongodb.com/forums/924286-drivers>`_
Use our `feedback engine <https://feedback.mongodb.com/?category=7548141816650747033>`_
to send us feature requests and general feedback about PyMongo.

Contributing
Expand Down
83 changes: 61 additions & 22 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
DocumentTooLarge,
ExecutionTimeout,
InvalidOperation,
NetworkTimeout,
NotPrimaryError,
OperationFailure,
PyMongoError,
Expand Down Expand Up @@ -723,6 +724,7 @@ class PoolState:
PAUSED = 1
READY = 2
CLOSED = 3
BACKOFF = 4


# Do *not* explicitly inherit from object or Jython won't call __del__
Expand Down Expand Up @@ -791,6 +793,7 @@ def __init__(
self._pending = 0
self._client_id = client_id
self._backoff = 0
self._backoff_connection_time = 0.0
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
Expand All @@ -817,6 +820,8 @@ def __init__(
async def ready(self) -> None:
# Take the lock to avoid the race condition described in PYTHON-2699.
async with self.lock:
if self.state == PoolState.BACKOFF:
return
if self.state != PoolState.READY:
self.state = PoolState.READY
if self.enabled_for_cmap:
Expand Down Expand Up @@ -846,7 +851,7 @@ async def _reset(
async with self.size_cond:
if self.closed:
return
# Clear the backoff state.
# Clear the backoff amount.
self._backoff = 0
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
old_state, self.state = self.state, PoolState.PAUSED
Expand Down Expand Up @@ -948,7 +953,9 @@ async def reset(
self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False
) -> None:
await self._reset(
close=False, service_id=service_id, interrupt_connections=interrupt_connections
close=False,
service_id=service_id,
interrupt_connections=interrupt_connections,
)

async def reset_without_pause(self) -> None:
Expand Down Expand Up @@ -1029,17 +1036,31 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
self.requests -= 1
self.size_cond.notify()

def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None:
async def _handle_connection_error(self, error: BaseException, phase: str) -> None:
# Handle system overload condition for non-sdam pools.
# Look for an AutoReconnect error raised from a ConnectionResetError with
# errno == errno.ECONNRESET or raised from an OSError that we've created due to
# a closed connection.
# Look for an AutoReconnect or NetworkTimeout error.
# If found, set backoff and add error labels.
if self.is_sdam or type(error) != AutoReconnect:
if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout):
return
self._backoff += 1
error._add_error_label("SystemOverloadedError")
error._add_error_label("RetryableError")
error._add_error_label("SystemOverloadedError") # type:ignore[attr-defined]
error._add_error_label("RetryableError") # type:ignore[attr-defined]
await self.backoff()

async def backoff(self) -> None:
"""Set/increase backoff mode."""
async with self.lock:
self._backoff += 1
backoff_duration_sec = _backoff(self._backoff)
backoff_duration_ms = int(backoff_duration_sec * 1000)
if self.state != PoolState.BACKOFF:
self.state = PoolState.BACKOFF
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_backoff(
self.address, self._backoff, backoff_duration_ms
)
self._backoff_connection_time = backoff_duration_sec + time.monotonic()

# Log the pool backoff message.
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
Expand All @@ -1048,7 +1069,8 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in
clientId=self._client_id,
serverHost=self.address[0],
serverPort=self.address[1],
driverConnectionId=conn_id,
attempt=self._backoff,
durationMS=backoff_duration_ms,
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
error=ConnectionClosedReason.POOL_BACKOFF,
)
Expand All @@ -1061,6 +1083,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
Note that the pool does not keep a reference to the socket -- you
must call checkin() when you're done with it.
"""
# Mark whether we were in ready state before starting the process, to
# handle the case of multiple pending connections.
was_ready = self.state == PoolState.READY

async with self.lock:
conn_id = self.next_connection_id
self.next_connection_id += 1
Expand All @@ -1082,10 +1108,6 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
driverConnectionId=conn_id,
)

# Apply backoff if applicable.
if self._backoff:
await asyncio.sleep(_backoff(self._backoff))

# Pass a context to determine if we successfully create a configured socket.
context = dict(has_created_socket=False)

Expand Down Expand Up @@ -1113,8 +1135,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
error=ConnectionClosedReason.ERROR,
)
if context["has_created_socket"]:
self._handle_connection_error(error, "handshake", conn_id)
if context["has_created_socket"] and not (
was_ready and self.state == PoolState.BACKOFF
):
await self._handle_connection_error(error, "handshake")
if isinstance(error, (IOError, OSError, *SSLErrors)):
details = _get_timeout_details(self.opts)
_raise_connection_failure(self.address, error, timeout_details=details)
Expand All @@ -1126,9 +1150,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
has_completed_hello = False
try:
if not self.is_sdam:
await conn.hello()
has_completed_hello = True
self.is_writable = conn.is_writable
if handler:
handler.contribute_socket(conn, completed_handshake=False)
Expand All @@ -1138,15 +1164,19 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
except BaseException as e:
async with self.lock:
self.active_contexts.discard(conn.cancel_context)
self._handle_connection_error(e, "hello", conn_id)
if not has_completed_hello and not (was_ready and self.state == PoolState.BACKOFF):
await self._handle_connection_error(e, "hello")
await conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
await handler.client._topology.receive_cluster_time(conn._cluster_time)

# Clear the backoff state.
self._backoff = 0
if self._backoff:
self._backoff = 0
await self.ready()

return conn

@contextlib.asynccontextmanager
Expand Down Expand Up @@ -1229,7 +1259,7 @@ async def checkout(
await self.checkin(conn)

def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None:
if self.state != PoolState.READY:
if self.state not in (PoolState.READY, PoolState.BACKOFF):
if emit_event:
duration = time.monotonic() - checkout_started_time
if self.enabled_for_cmap:
Expand Down Expand Up @@ -1320,12 +1350,21 @@ async def _get_conn(
incremented = True
while conn is None:
# CMAP: we MUST wait for either maxConnecting OR for a socket
# to be checked back into the pool.
# to be checked back into the pool OR for the backoff period to expire.
async with self._max_connecting_cond:
self._raise_if_not_ready(checkout_started_time, emit_event=False)
while not (self.conns or self._pending < self.max_connecting):
timeout = deadline - time.monotonic() if deadline else None
curr_time = time.monotonic()
timeout = deadline - curr_time if deadline else None
if self._backoff:
if self._backoff_connection_time < curr_time:
break
if deadline is None or deadline > self._backoff_connection_time:
timeout = self._backoff_connection_time - curr_time
if not await _async_cond_wait(self._max_connecting_cond, timeout):
# Check whether a backoff period has expired.
if self._backoff and time.monotonic() > self._backoff_connection_time:
break
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if self.conns or self._pending < self.max_connecting:
Expand Down
3 changes: 2 additions & 1 deletion pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
_SDAMStatusMessage,
_ServerSelectionStatusMessage,
)
from pymongo.pool import PoolState
from pymongo.pool_options import PoolOptions
from pymongo.server_description import ServerDescription
from pymongo.server_selectors import (
Expand Down Expand Up @@ -485,7 +486,7 @@ async def _process_change(
server_description.is_server_type_known and new_td.topology_type == TOPOLOGY_TYPE.Single
):
server = self._servers.get(server_description.address)
if server:
if server and server.pool.state != PoolState.BACKOFF:
await server.pool.ready()

suppress_event = sd_old == server_description
Expand Down
50 changes: 50 additions & 0 deletions pymongo/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def pool_cleared(self, event):
def pool_closed(self, event):
logging.info("[pool {0.address}] pool closed".format(event))

def pool_backoff(self, event):
    # Log which backoff attempt the pool is on.
    # PoolBackoffEvent exposes ``attempt`` and ``duration_ms``; the
    # placeholder has to name one of those, since the event defines no
    # ``event`` attribute and formatting ``{0.event}`` would raise
    # AttributeError.
    logging.info("[pool {0.address}] pool backoff attempt {0.attempt}".format(event))

def connection_created(self, event):
logging.info("[pool {0.address}][connection #{0.connection_id}] "
"connection created".format(event))
Expand Down Expand Up @@ -305,6 +308,15 @@ def pool_closed(self, event: PoolClosedEvent) -> None:
"""
raise NotImplementedError

def pool_backoff(self, event: PoolBackoffEvent) -> None:
    """Abstract hook for handling a :class:`PoolBackoffEvent`.

    Emitted when a connection Pool is in backoff. Listeners override this
    method; the default implementation only raises.

    :param event: The :class:`PoolBackoffEvent` instance, carrying the
        pool address, the backoff attempt and the backoff duration in ms.
    :raises NotImplementedError: Always, unless overridden.
    """
    raise NotImplementedError()

def connection_created(self, event: ConnectionCreatedEvent) -> None:
"""Abstract method to handle a :class:`ConnectionCreatedEvent`.

Expand Down Expand Up @@ -914,6 +926,35 @@ class PoolClosedEvent(_PoolEvent):
__slots__ = ()


class PoolBackoffEvent(_PoolEvent):
    """Published when a Connection Pool is backing off.

    :param address: The address (host, port) pair of the server this Pool is
       attempting to connect to.
    :param attempt: The backoff attempt.
    :param duration_ms: The backoff duration in ms.

    .. versionadded:: 4.16
    """

    # Names with a double leading underscore are mangled per class, so these
    # slots back the private attributes assigned in ``__init__``.
    __slots__ = ("__attempt", "__duration_ms")

    def __init__(self, address: _Address, attempt: int, duration_ms: int) -> None:
        super().__init__(address)
        self.__attempt = attempt
        self.__duration_ms = duration_ms

    @property
    def attempt(self) -> int:
        """The backoff attempt."""
        return self.__attempt

    @property
    def duration_ms(self) -> int:
        """The backoff duration in ms."""
        return self.__duration_ms

    def __repr__(self) -> str:
        # Include the fields this subclass adds; a representation inherited
        # from the base class cannot know about them.
        return (
            f"{type(self).__name__}({self.address!r}, "
            f"{self.__attempt!r}, {self.__duration_ms!r})"
        )


class ConnectionClosedReason:
"""An enum that defines values for `reason` on a
:class:`ConnectionClosedEvent`.
Expand Down Expand Up @@ -1830,6 +1871,15 @@ def publish_pool_closed(self, address: _Address) -> None:
except Exception:
_handle_exception()

def publish_pool_backoff(self, address: _Address, attempt: int, duration_ms: int) -> None:
    """Publish a :class:`PoolBackoffEvent` to all pool listeners.

    A single event object is built from the arguments and passed to the
    ``pool_backoff`` method of every registered CMAP listener.

    :param address: The (host, port) pair of the pool's server.
    :param attempt: The backoff attempt.
    :param duration_ms: The backoff duration in ms.
    """
    backoff_event = PoolBackoffEvent(address, attempt, duration_ms)
    for listener in self.__cmap_listeners:
        try:
            listener.pool_backoff(backoff_event)
        except Exception:
            # An exception from one listener is passed to _handle_exception()
            # and the loop moves on to the remaining listeners.
            _handle_exception()

def publish_connection_created(self, address: _Address, connection_id: int) -> None:
"""Publish a :class:`ConnectionCreatedEvent` to all connection
listeners.
Expand Down
Loading
Loading