diff --git a/.evergreen/generated_configs/tasks.yml b/.evergreen/generated_configs/tasks.yml index 65813db1cf..657c4a30ca 100644 --- a/.evergreen/generated_configs/tasks.yml +++ b/.evergreen/generated_configs/tasks.yml @@ -297,13 +297,13 @@ tasks: vars: PYTHON_VERSION: "3.14" tags: [test-no-orchestration, python-3.14] - - name: test-no-orchestration-pypy3.10 + - name: test-no-orchestration-pypy3.11 commands: - func: assume ec2 role - func: run tests vars: - PYTHON_VERSION: pypy3.10 - tags: [test-no-orchestration, python-pypy3.10] + PYTHON_VERSION: pypy3.11 + tags: [test-no-orchestration, python-pypy3.11] # No toolchain tests - name: test-no-toolchain-sync-noauth-nossl-standalone @@ -2492,7 +2492,7 @@ tasks: - python-3.14 - standalone-noauth-ssl - async - - name: test-server-version-pypy3.10-sync-noauth-nossl-standalone + - name: test-server-version-pypy3.11-sync-noauth-nossl-standalone commands: - func: run server vars: @@ -2504,11 +2504,11 @@ tasks: AUTH: noauth SSL: nossl TOPOLOGY: standalone - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_sync tags: - server-version - - python-pypy3.10 + - python-pypy3.11 - standalone-noauth-nossl - sync - pr @@ -2639,7 +2639,7 @@ tasks: - python-3.14 - replica_set-noauth-ssl - sync - - name: test-server-version-pypy3.10-async-noauth-ssl-replica-set + - name: test-server-version-pypy3.11-async-noauth-ssl-replica-set commands: - func: run server vars: @@ -2651,11 +2651,11 @@ tasks: AUTH: noauth SSL: ssl TOPOLOGY: replica_set - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_async tags: - server-version - - python-pypy3.10 + - python-pypy3.11 - replica_set-noauth-ssl - async - name: test-server-version-python3.9-sync-noauth-nossl-replica-set-cov @@ -2788,7 +2788,7 @@ tasks: - python-3.14 - sharded_cluster-auth-nossl - async - - name: test-server-version-pypy3.10-sync-noauth-ssl-sharded-cluster + - name: test-server-version-pypy3.11-sync-noauth-ssl-sharded-cluster commands: - func: run server vars: @@ -2800,11 +2800,11 @@ tasks: AUTH: noauth SSL: ssl TOPOLOGY: sharded_cluster - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_sync tags: - server-version - - python-pypy3.10 + - python-pypy3.11 - sharded_cluster-noauth-ssl - sync - name: test-server-version-python3.9-async-noauth-ssl-sharded-cluster-cov @@ -3080,7 +3080,7 @@ tasks: - python-3.14 - sharded_cluster-auth-ssl - async - - name: test-server-version-pypy3.10-sync-auth-ssl-sharded-cluster + - name: test-server-version-pypy3.11-sync-auth-ssl-sharded-cluster commands: - func: run server vars: @@ -3092,14 +3092,14 @@ tasks: AUTH: auth SSL: ssl TOPOLOGY: sharded_cluster - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_sync tags: - server-version - - python-pypy3.10 + - python-pypy3.11 - sharded_cluster-auth-ssl - sync - - name: test-server-version-pypy3.10-async-auth-ssl-sharded-cluster + - name: test-server-version-pypy3.11-async-auth-ssl-sharded-cluster commands: - func: run server vars: @@ -3111,11 +3111,11 @@ tasks: AUTH: auth SSL: ssl TOPOLOGY: sharded_cluster - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_async tags: - server-version - - python-pypy3.10 + - python-pypy3.11 - sharded_cluster-auth-ssl - async @@ -3186,7 +3186,7 @@ tasks: - python-3.11 - sharded_cluster-auth-ssl - sync - - name: test-standard-v4.2-pypy3.10-sync-auth-ssl-sharded-cluster + - name: test-standard-v4.2-pypy3.11-sync-auth-ssl-sharded-cluster commands: - func: run server vars: @@ -3200,12 +3200,12 @@ 
tasks: SSL: ssl TOPOLOGY: sharded_cluster VERSION: "4.2" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_sync tags: - test-standard - server-4.2 - - python-pypy3.10 + - python-pypy3.11 - sharded_cluster-auth-ssl - sync - pypy @@ -3319,7 +3319,7 @@ tasks: - python-3.11 - sharded_cluster-auth-ssl - async - - name: test-standard-v4.4-pypy3.10-async-auth-ssl-sharded-cluster + - name: test-standard-v4.4-pypy3.11-async-auth-ssl-sharded-cluster commands: - func: run server vars: @@ -3333,12 +3333,12 @@ tasks: SSL: ssl TOPOLOGY: sharded_cluster VERSION: "4.4" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_async tags: - test-standard - server-4.4 - - python-pypy3.10 + - python-pypy3.11 - sharded_cluster-auth-ssl - async - pypy @@ -3694,7 +3694,7 @@ tasks: - python-3.11 - standalone-noauth-nossl - sync - - name: test-standard-v7.0-pypy3.10-sync-noauth-nossl-standalone + - name: test-standard-v7.0-pypy3.11-sync-noauth-nossl-standalone commands: - func: run server vars: @@ -3708,12 +3708,12 @@ tasks: SSL: nossl TOPOLOGY: standalone VERSION: "7.0" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_sync tags: - test-standard - server-7.0 - - python-pypy3.10 + - python-pypy3.11 - standalone-noauth-nossl - sync - pypy @@ -3805,7 +3805,7 @@ tasks: - python-3.11 - standalone-noauth-nossl - async - - name: test-standard-v8.0-pypy3.10-async-noauth-nossl-standalone + - name: test-standard-v8.0-pypy3.11-async-noauth-nossl-standalone commands: - func: run server vars: @@ -3819,12 +3819,12 @@ tasks: SSL: nossl TOPOLOGY: standalone VERSION: "8.0" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_async tags: - test-standard - server-8.0 - - python-pypy3.10 + - python-pypy3.11 - standalone-noauth-nossl - async - pypy @@ -3851,7 +3851,7 @@ tasks: - replica_set-noauth-ssl - async - pr - - name: test-standard-latest-pypy3.10-async-noauth-ssl-replica-set + - name: test-standard-latest-pypy3.11-async-noauth-ssl-replica-set commands: - func: run server vars: @@ -3865,12 +3865,12 @@ tasks: SSL: ssl TOPOLOGY: replica_set VERSION: latest - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_async tags: - test-standard - server-latest - - python-pypy3.10 + - python-pypy3.11 - replica_set-noauth-ssl - async - pypy @@ -3965,7 +3965,7 @@ tasks: - python-3.11 - replica_set-noauth-ssl - sync - - name: test-standard-rapid-pypy3.10-sync-noauth-ssl-replica-set + - name: test-standard-rapid-pypy3.11-sync-noauth-ssl-replica-set commands: - func: run server vars: @@ -3979,12 +3979,12 @@ tasks: SSL: ssl TOPOLOGY: replica_set VERSION: rapid - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 TEST_NAME: default_sync tags: - test-standard - server-rapid - - python-pypy3.10 + - python-pypy3.11 - replica_set-noauth-ssl - sync - pypy @@ -4563,7 +4563,7 @@ tasks: - sharded_cluster-auth-ssl - auth - pr - - name: test-non-standard-v4.2-pypy3.10-noauth-nossl-standalone + - name: test-non-standard-v4.2-pypy3.11-noauth-nossl-standalone commands: - func: run server vars: @@ -4577,15 +4577,15 @@ tasks: SSL: nossl TOPOLOGY: standalone VERSION: "4.2" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 tags: - test-non-standard - server-4.2 - - python-pypy3.10 + - python-pypy3.11 - standalone-noauth-nossl - noauth - pypy - - name: test-non-standard-v4.4-pypy3.10-noauth-ssl-replica-set + - name: test-non-standard-v4.4-pypy3.11-noauth-ssl-replica-set commands: - func: run server vars: @@ -4599,15 +4599,15 @@ tasks: SSL: ssl TOPOLOGY: 
replica_set VERSION: "4.4" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 tags: - test-non-standard - server-4.4 - - python-pypy3.10 + - python-pypy3.11 - replica_set-noauth-ssl - noauth - pypy - - name: test-non-standard-v5.0-pypy3.10-auth-ssl-sharded-cluster + - name: test-non-standard-v5.0-pypy3.11-auth-ssl-sharded-cluster commands: - func: run server vars: @@ -4621,15 +4621,15 @@ tasks: SSL: ssl TOPOLOGY: sharded_cluster VERSION: "5.0" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 tags: - test-non-standard - server-5.0 - - python-pypy3.10 + - python-pypy3.11 - sharded_cluster-auth-ssl - auth - pypy - - name: test-non-standard-v6.0-pypy3.10-noauth-nossl-standalone + - name: test-non-standard-v6.0-pypy3.11-noauth-nossl-standalone commands: - func: run server vars: @@ -4643,15 +4643,15 @@ tasks: SSL: nossl TOPOLOGY: standalone VERSION: "6.0" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 tags: - test-non-standard - server-6.0 - - python-pypy3.10 + - python-pypy3.11 - standalone-noauth-nossl - noauth - pypy - - name: test-non-standard-v7.0-pypy3.10-noauth-ssl-replica-set + - name: test-non-standard-v7.0-pypy3.11-noauth-ssl-replica-set commands: - func: run server vars: @@ -4665,15 +4665,15 @@ tasks: SSL: ssl TOPOLOGY: replica_set VERSION: "7.0" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 tags: - test-non-standard - server-7.0 - - python-pypy3.10 + - python-pypy3.11 - replica_set-noauth-ssl - noauth - pypy - - name: test-non-standard-v8.0-pypy3.10-auth-ssl-sharded-cluster + - name: test-non-standard-v8.0-pypy3.11-auth-ssl-sharded-cluster commands: - func: run server vars: @@ -4687,15 +4687,15 @@ tasks: SSL: ssl TOPOLOGY: sharded_cluster VERSION: "8.0" - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 tags: - test-non-standard - server-8.0 - - python-pypy3.10 + - python-pypy3.11 - sharded_cluster-auth-ssl - auth - pypy - - name: test-non-standard-rapid-pypy3.10-noauth-nossl-standalone + - name: test-non-standard-rapid-pypy3.11-noauth-nossl-standalone commands: - func: run server vars: @@ -4709,15 +4709,15 @@ tasks: SSL: nossl TOPOLOGY: standalone VERSION: rapid - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 tags: - test-non-standard - server-rapid - - python-pypy3.10 + - python-pypy3.11 - standalone-noauth-nossl - noauth - pypy - - name: test-non-standard-latest-pypy3.10-noauth-ssl-replica-set + - name: test-non-standard-latest-pypy3.11-noauth-ssl-replica-set commands: - func: run server vars: @@ -4731,11 +4731,11 @@ tasks: SSL: ssl TOPOLOGY: replica_set VERSION: latest - PYTHON_VERSION: pypy3.10 + PYTHON_VERSION: pypy3.11 tags: - test-non-standard - server-latest - - python-pypy3.10 + - python-pypy3.11 - replica_set-noauth-ssl - noauth - pypy diff --git a/.evergreen/scripts/generate_config_utils.py b/.evergreen/scripts/generate_config_utils.py index 632d34ea6f..c5054b6f48 100644 --- a/.evergreen/scripts/generate_config_utils.py +++ b/.evergreen/scripts/generate_config_utils.py @@ -23,7 +23,7 @@ ALL_VERSIONS = ["4.2", "4.4", "5.0", "6.0", "7.0", "8.0", "rapid", "latest"] CPYTHONS = ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] -PYPYS = ["pypy3.10"] +PYPYS = ["pypy3.11"] ALL_PYTHONS = CPYTHONS + PYPYS MIN_MAX_PYTHON = [CPYTHONS[0], CPYTHONS[-1]] BATCHTIME_WEEK = 10080 diff --git a/doc/index.rst b/doc/index.rst index 85812d1b14..9a2c3eb6b2 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -37,7 +37,7 @@ project. 
Feature Requests / Feedback --------------------------- -Use our `feedback engine `_ +Use our `feedback engine `_ to send us feature requests and general feedback about PyMongo. Contributing diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 065686f43a..74aa903596 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -52,6 +52,7 @@ DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, @@ -723,6 +724,7 @@ class PoolState: PAUSED = 1 READY = 2 CLOSED = 3 + BACKOFF = 4 # Do *not* explicitly inherit from object or Jython won't call __del__ @@ -791,6 +793,7 @@ def __init__( self._pending = 0 self._client_id = client_id self._backoff = 0 + self._backoff_connection_time = 0.0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -817,6 +820,8 @@ def __init__( async def ready(self) -> None: # Take the lock to avoid the race condition described in PYTHON-2699. async with self.lock: + if self.state == PoolState.BACKOFF: + return if self.state != PoolState.READY: self.state = PoolState.READY if self.enabled_for_cmap: @@ -846,7 +851,7 @@ async def _reset( async with self.size_cond: if self.closed: return - # Clear the backoff state. + # Clear the backoff amount. self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED @@ -948,7 +953,9 @@ async def reset( self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False ) -> None: await self._reset( - close=False, service_id=service_id, interrupt_connections=interrupt_connections + close=False, + service_id=service_id, + interrupt_connections=interrupt_connections, ) async def reset_without_pause(self) -> None: @@ -1029,17 +1036,31 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() - def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + async def _handle_connection_error(self, error: BaseException, phase: str) -> None: # Handle system overload condition for non-sdam pools. - # Look for an AutoReconnect error raised from a ConnectionResetError with - # errno == errno.ECONNRESET or raised from an OSError that we've created due to - # a closed connection. + # Look for an AutoReconnect or NetworkTimeout error. # If found, set backoff and add error labels. - if self.is_sdam or type(error) != AutoReconnect: + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return - self._backoff += 1 - error._add_error_label("SystemOverloadedError") - error._add_error_label("RetryableError") + error._add_error_label("SystemOverloadedError") # type:ignore[attr-defined] + error._add_error_label("RetryableError") # type:ignore[attr-defined] + await self.backoff() + + async def backoff(self) -> None: + """Set/increase backoff mode.""" + async with self.lock: + self._backoff += 1 + backoff_duration_sec = _backoff(self._backoff) + backoff_duration_ms = int(backoff_duration_sec * 1000) + if self.state != PoolState.BACKOFF: + self.state = PoolState.BACKOFF + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_pool_backoff( + self.address, self._backoff, backoff_duration_ms + ) + self._backoff_connection_time = backoff_duration_sec + time.monotonic() + # Log the pool backoff message. 
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( @@ -1048,7 +1069,8 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in clientId=self._client_id, serverHost=self.address[0], serverPort=self.address[1], - driverConnectionId=conn_id, + attempt=self._backoff, + durationMS=backoff_duration_ms, reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), error=ConnectionClosedReason.POOL_BACKOFF, ) @@ -1061,6 +1083,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A Note that the pool does not keep a reference to the socket -- you must call checkin() when you're done with it. """ + # Mark whether we were in ready state before starting the process, to + # handle the case of multiple pending connections. + was_ready = self.state == PoolState.READY + async with self.lock: conn_id = self.next_connection_id self.next_connection_id += 1 @@ -1082,10 +1108,6 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A driverConnectionId=conn_id, ) - # Apply backoff if applicable. - if self._backoff: - await asyncio.sleep(_backoff(self._backoff)) - # Pass a context to determine if we successfully create a configured socket. context = dict(has_created_socket=False) @@ -1113,8 +1135,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) - if context["has_created_socket"]: - self._handle_connection_error(error, "handshake", conn_id) + if context["has_created_socket"] and not ( + was_ready and self.state == PoolState.BACKOFF + ): + await self._handle_connection_error(error, "handshake") if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1126,9 +1150,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + has_completed_hello = False try: if not self.is_sdam: await conn.hello() + has_completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) @@ -1138,7 +1164,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A except BaseException as e: async with self.lock: self.active_contexts.discard(conn.cancel_context) - self._handle_connection_error(e, "hello", conn_id) + if not has_completed_hello and not (was_ready and self.state == PoolState.BACKOFF): + await self._handle_connection_error(e, "hello") await conn.close_conn(ConnectionClosedReason.ERROR) raise @@ -1146,7 +1173,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A await handler.client._topology.receive_cluster_time(conn._cluster_time) # Clear the backoff state. 
- self._backoff = 0 + if self._backoff: + self._backoff = 0 + await self.ready() + return conn @contextlib.asynccontextmanager @@ -1229,7 +1259,7 @@ async def checkout( await self.checkin(conn) def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: - if self.state != PoolState.READY: + if self.state not in (PoolState.READY, PoolState.BACKOFF): if emit_event: duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: @@ -1320,12 +1350,21 @@ async def _get_conn( incremented = True while conn is None: # CMAP: we MUST wait for either maxConnecting OR for a socket - # to be checked back into the pool. + # to be checked back into the pool OR for the backoff period to expire. async with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) while not (self.conns or self._pending < self.max_connecting): - timeout = deadline - time.monotonic() if deadline else None + curr_time = time.monotonic() + timeout = deadline - curr_time if deadline else None + if self._backoff: + if self._backoff_connection_time < curr_time: + break + if deadline is None or deadline > self._backoff_connection_time: + timeout = self._backoff_connection_time - curr_time if not await _async_cond_wait(self._max_connecting_cond, timeout): + # Check whether a backoff period has expired. + if self._backoff and time.monotonic() > self._backoff_connection_time: + break # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. if self.conns or self._pending < self.max_connecting: diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 1e91bbe79b..76cd2f4cb0 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -57,6 +57,7 @@ _SDAMStatusMessage, _ServerSelectionStatusMessage, ) +from pymongo.pool import PoolState from pymongo.pool_options import PoolOptions from pymongo.server_description import ServerDescription from pymongo.server_selectors import ( @@ -485,7 +486,7 @@ async def _process_change( server_description.is_server_type_known and new_td.topology_type == TOPOLOGY_TYPE.Single ): server = self._servers.get(server_description.address) - if server: + if server and server.pool.state != PoolState.BACKOFF: await server.pool.ready() suppress_event = sd_old == server_description diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 0dfbbb915a..3f2dc9d06a 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -137,6 +137,9 @@ def pool_cleared(self, event): def pool_closed(self, event): logging.info("[pool {0.address}] pool closed".format(event)) + def pool_backoff(self, event): + logging.info("[pool {0.address}] pool backoff attempt {0.event}".format(event)) + def connection_created(self, event): logging.info("[pool {0.address}][connection #{0.connection_id}] " "connection created".format(event)) @@ -305,6 +308,15 @@ def pool_closed(self, event: PoolClosedEvent) -> None: """ raise NotImplementedError + def pool_backoff(self, event: PoolBackoffEvent) -> None: + """Abstract method to handle a `PoolBackoffEvent`. + + Emitted when a connection Pool is in backoff. + + :param event: An instance of :class:`PoolBackoffEvent`. + """ + raise NotImplementedError + def connection_created(self, event: ConnectionCreatedEvent) -> None: """Abstract method to handle a :class:`ConnectionCreatedEvent`. 
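
The monitoring.py hunks above add a pool_backoff hook to the example logging listener and to ConnectionPoolListener, and the next hunk defines the PoolBackoffEvent it receives. A minimal sketch of a user-defined listener consuming the new event follows; the hook name and the attempt/duration_ms properties come from this patch, while passing listeners through MongoClient(event_listeners=[...]) is existing PyMongo behavior, and the BackoffLogger class name is illustrative only.

import logging

from pymongo import MongoClient, monitoring


class BackoffLogger(monitoring.ConnectionPoolListener):
    """Sketch: handle only pool backoff; real code would implement the other hooks too.

    Hooks left unimplemented fall back to the base class, which raises
    NotImplementedError; the publish_* helpers in monitoring.py catch those
    exceptions via _handle_exception().
    """

    def pool_backoff(self, event):
        # event is a PoolBackoffEvent carrying address, attempt, and duration_ms.
        logging.warning(
            "[pool %s] backoff attempt %d, next connect delayed %d ms",
            event.address,
            event.attempt,
            event.duration_ms,
        )


# Listeners are registered at client construction time.
client = MongoClient(event_listeners=[BackoffLogger()])
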
@@ -914,6 +926,35 @@ class PoolClosedEvent(_PoolEvent): __slots__ = () +class PoolBackoffEvent(_PoolEvent): + """Published when a Connection Pool is backing off. + + :param address: The address (host, port) pair of the server this Pool is + attempting to connect to. + :param attempt: The backoff attempt. + :param duration_ms: The backoff duration in ms. + + .. versionadded:: 4.16 + """ + + __slots__ = ("__attempt", "__duration_ms") + + def __init__(self, address: _Address, attempt: int, duration_ms: int) -> None: + super().__init__(address) + self.__attempt = attempt + self.__duration_ms = duration_ms + + @property + def attempt(self) -> int: + """The backoff attempt.""" + return self.__attempt + + @property + def duration_ms(self) -> int: + """The backoff duration in ms.""" + return self.__duration_ms + + class ConnectionClosedReason: """An enum that defines values for `reason` on a :class:`ConnectionClosedEvent`. @@ -1830,6 +1871,15 @@ def publish_pool_closed(self, address: _Address) -> None: except Exception: _handle_exception() + def publish_pool_backoff(self, address: _Address, attempt: int, duration_ms: int) -> None: + """Publish a :class:`PoolBackoffEvent` to all pool listeners.""" + event = PoolBackoffEvent(address, attempt, duration_ms) + for subscriber in self.__cmap_listeners: + try: + subscriber.pool_backoff(event) + except Exception: + _handle_exception() + def publish_connection_created(self, address: _Address, connection_id: int) -> None: """Publish a :class:`ConnectionCreatedEvent` to all connection listeners. diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index d0c517f186..dff4fb7ac7 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -49,6 +49,7 @@ DocumentTooLarge, ExecutionTimeout, InvalidOperation, + NetworkTimeout, NotPrimaryError, OperationFailure, PyMongoError, @@ -721,6 +722,7 @@ class PoolState: PAUSED = 1 READY = 2 CLOSED = 3 + BACKOFF = 4 # Do *not* explicitly inherit from object or Jython won't call __del__ @@ -789,6 +791,7 @@ def __init__( self._pending = 0 self._client_id = client_id self._backoff = 0 + self._backoff_connection_time = 0.0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -815,6 +818,8 @@ def __init__( def ready(self) -> None: # Take the lock to avoid the race condition described in PYTHON-2699. with self.lock: + if self.state == PoolState.BACKOFF: + return if self.state != PoolState.READY: self.state = PoolState.READY if self.enabled_for_cmap: @@ -844,7 +849,7 @@ def _reset( with self.size_cond: if self.closed: return - # Clear the backoff state. + # Clear the backoff amount. 
self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED @@ -945,7 +950,11 @@ def update_is_writable(self, is_writable: Optional[bool]) -> None: def reset( self, service_id: Optional[ObjectId] = None, interrupt_connections: bool = False ) -> None: - self._reset(close=False, service_id=service_id, interrupt_connections=interrupt_connections) + self._reset( + close=False, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) def reset_without_pause(self) -> None: self._reset(close=False, pause=False) @@ -1025,17 +1034,31 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() - def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + def _handle_connection_error(self, error: BaseException, phase: str) -> None: # Handle system overload condition for non-sdam pools. - # Look for an AutoReconnect error raised from a ConnectionResetError with - # errno == errno.ECONNRESET or raised from an OSError that we've created due to - # a closed connection. + # Look for an AutoReconnect or NetworkTimeout error. # If found, set backoff and add error labels. - if self.is_sdam or type(error) != AutoReconnect: + if self.is_sdam or type(error) not in (AutoReconnect, NetworkTimeout): return - self._backoff += 1 - error._add_error_label("SystemOverloadedError") - error._add_error_label("RetryableError") + error._add_error_label("SystemOverloadedError") # type:ignore[attr-defined] + error._add_error_label("RetryableError") # type:ignore[attr-defined] + self.backoff() + + def backoff(self) -> None: + """Set/increase backoff mode.""" + with self.lock: + self._backoff += 1 + backoff_duration_sec = _backoff(self._backoff) + backoff_duration_ms = int(backoff_duration_sec * 1000) + if self.state != PoolState.BACKOFF: + self.state = PoolState.BACKOFF + if self.enabled_for_cmap: + assert self.opts._event_listeners is not None + self.opts._event_listeners.publish_pool_backoff( + self.address, self._backoff, backoff_duration_ms + ) + self._backoff_connection_time = backoff_duration_sec + time.monotonic() + # Log the pool backoff message. if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( @@ -1044,7 +1067,8 @@ def _handle_connection_error(self, error: BaseException, phase: str, conn_id: in clientId=self._client_id, serverHost=self.address[0], serverPort=self.address[1], - driverConnectionId=conn_id, + attempt=self._backoff, + durationMS=backoff_duration_ms, reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), error=ConnectionClosedReason.POOL_BACKOFF, ) @@ -1057,6 +1081,10 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect Note that the pool does not keep a reference to the socket -- you must call checkin() when you're done with it. """ + # Mark whether we were in ready state before starting the process, to + # handle the case of multiple pending connections. + was_ready = self.state == PoolState.READY + with self.lock: conn_id = self.next_connection_id self.next_connection_id += 1 @@ -1078,10 +1106,6 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect driverConnectionId=conn_id, ) - # Apply backoff if applicable. - if self._backoff: - time.sleep(_backoff(self._backoff)) - # Pass a context to determine if we successfully create a configured socket. 
context = dict(has_created_socket=False) @@ -1109,8 +1133,10 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) - if context["has_created_socket"]: - self._handle_connection_error(error, "handshake", conn_id) + if context["has_created_socket"] and not ( + was_ready and self.state == PoolState.BACKOFF + ): + self._handle_connection_error(error, "handshake") if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) @@ -1122,9 +1148,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() + has_completed_hello = False try: if not self.is_sdam: conn.hello() + has_completed_hello = True self.is_writable = conn.is_writable if handler: handler.contribute_socket(conn, completed_handshake=False) @@ -1134,7 +1162,8 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect except BaseException as e: with self.lock: self.active_contexts.discard(conn.cancel_context) - self._handle_connection_error(e, "hello", conn_id) + if not has_completed_hello and not (was_ready and self.state == PoolState.BACKOFF): + self._handle_connection_error(e, "hello") conn.close_conn(ConnectionClosedReason.ERROR) raise @@ -1142,7 +1171,10 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect handler.client._topology.receive_cluster_time(conn._cluster_time) # Clear the backoff state. - self._backoff = 0 + if self._backoff: + self._backoff = 0 + self.ready() + return conn @contextlib.contextmanager @@ -1225,7 +1257,7 @@ def checkout( self.checkin(conn) def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: - if self.state != PoolState.READY: + if self.state not in (PoolState.READY, PoolState.BACKOFF): if emit_event: duration = time.monotonic() - checkout_started_time if self.enabled_for_cmap: @@ -1316,12 +1348,21 @@ def _get_conn( incremented = True while conn is None: # CMAP: we MUST wait for either maxConnecting OR for a socket - # to be checked back into the pool. + # to be checked back into the pool OR for the backoff period to expire. with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) while not (self.conns or self._pending < self.max_connecting): - timeout = deadline - time.monotonic() if deadline else None + curr_time = time.monotonic() + timeout = deadline - curr_time if deadline else None + if self._backoff: + if self._backoff_connection_time < curr_time: + break + if deadline is None or deadline > self._backoff_connection_time: + timeout = self._backoff_connection_time - curr_time if not _cond_wait(self._max_connecting_cond, timeout): + # Check whether a backoff period has expired. + if self._backoff and time.monotonic() > self._backoff_connection_time: + break # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. 
if self.conns or self._pending < self.max_connecting: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 0f6592dfc0..62c9485f4a 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -53,6 +53,7 @@ _SDAMStatusMessage, _ServerSelectionStatusMessage, ) +from pymongo.pool import PoolState from pymongo.pool_options import PoolOptions from pymongo.server_description import ServerDescription from pymongo.server_selectors import ( @@ -485,7 +486,7 @@ def _process_change( server_description.is_server_type_known and new_td.topology_type == TOPOLOGY_TYPE.Single ): server = self._servers.get(server_description.address) - if server: + if server and server.pool.state != PoolState.BACKOFF: server.pool.ready() suppress_event = sd_old == server_description diff --git a/test/asynchronous/test_connection_monitoring.py b/test/asynchronous/test_connection_monitoring.py index c6dc6f0a69..f2502a7d54 100644 --- a/test/asynchronous/test_connection_monitoring.py +++ b/test/asynchronous/test_connection_monitoring.py @@ -52,6 +52,7 @@ ConnectionClosedReason, ConnectionCreatedEvent, ConnectionReadyEvent, + PoolBackoffEvent, PoolClearedEvent, PoolClosedEvent, PoolCreatedEvent, @@ -75,6 +76,7 @@ "ConnectionPoolReady": PoolReadyEvent, "ConnectionPoolCleared": PoolClearedEvent, "ConnectionPoolClosed": PoolClosedEvent, + "ConnectionPoolBackoff": PoolBackoffEvent, # Error types. "PoolClosedError": _PoolClosedError, "WaitQueueTimeoutError": WaitQueueTimeoutError, diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 6cbdf7a65c..0d48746243 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -517,7 +517,7 @@ async def test_connection_timeout_message(self): async def test_pool_check_backoff(self): # Test that Pool recovers from two connection failures in a row. # This exercises code at the end of Pool._check(). - cx_pool = await self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) + cx_pool = await self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=10) self.addAsyncCleanup(cx_pool.close) async with cx_pool.checkout() as conn: @@ -526,7 +526,7 @@ async def test_pool_check_backoff(self): await conn.conn.close() # Enable backoff. - cx_pool._backoff = 1 + await cx_pool.backoff() # Swap pool's address with a bad one. address, cx_pool.address = cx_pool.address, ("foo.com", 1234) diff --git a/test/asynchronous/unified_format.py b/test/asynchronous/unified_format.py index 09bf7e83ea..94ae84953a 100644 --- a/test/asynchronous/unified_format.py +++ b/test/asynchronous/unified_format.py @@ -430,7 +430,7 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest): a class attribute ``TEST_SPEC``. 
""" - SCHEMA_VERSION = Version.from_string("1.22") + SCHEMA_VERSION = Version.from_string("1.28") RUN_ON_LOAD_BALANCER = True TEST_SPEC: Any TEST_PATH = "" # This gets filled in by generate_test_classes diff --git a/test/asynchronous/utils.py b/test/asynchronous/utils.py index 02ba46c71a..29ef4a3a2a 100644 --- a/test/asynchronous/utils.py +++ b/test/asynchronous/utils.py @@ -242,6 +242,7 @@ def __init__(self, address, options, is_sdam=False, client_id=None): self.opts = options self.operation_count = 0 self.conns = [] + self.state = 0 def stale_generation(self, gen, service_id): return self.gen.stale(gen, service_id) diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json index 60190c7dc0..f9f34ae95b 100644 --- a/test/connection_logging/connection-logging.json +++ b/test/connection_logging/connection-logging.json @@ -331,9 +331,7 @@ "uriOptions": { "retryReads": false, "appname": "clientAppName", - "heartbeatFrequencyMS": 10000, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500 + "heartbeatFrequencyMS": 10000 }, "observeLogMessages": { "connection": "debug" @@ -357,9 +355,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "errorCode": 18, "appName": "clientAppName" } } @@ -372,7 +368,7 @@ "filter": {} }, "expectError": { - "isClientError": true + "isError": true } } ], @@ -522,6 +518,363 @@ ] } ] + }, + { + "description": "Connection enters backoff on closed connection", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryReads": true, + "appname": "clientAppName", + "heartbeatFrequencyMS": 5000 + }, + "observeEvents": [ + "serverHeartbeatSucceededEvent" + ], + "observeLogMessages": { + "connection": "debug" + } + } + }, + { + "database": { + "id": "database0", + "client": "client", + "databaseName": "ci-tests" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "closeConnection": true, + "appName": "clientAppName" + } + } + } + }, + { + "object": "database0", + "name": "runCommand", + "arguments": { + "command": { + "find": "test" + }, + "commandName": "find" + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection created", + "driverConnectionId": { 
+ "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection closed", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "An error occurred while using the connection", + "error": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool backoff", + "serverHost": { + "$$type": "string" + }, + "durationMS": { + "$$type": "int" + }, + "attempt": { + "$$type": "int" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "Connection pool is in backoff", + "error": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout failed", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "An error occurred while trying to establish a new connection", + "error": { + "$$exists": true + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection created", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection ready", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked out", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked in", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] } ] } diff --git a/test/connection_monitoring/pool-backoff-connection-close.json b/test/connection_monitoring/pool-backoff-connection-close.json new file mode 100644 index 0000000000..571cd9f769 --- /dev/null +++ b/test/connection_monitoring/pool-backoff-connection-close.json @@ -0,0 +1,72 @@ +{ + "version": 1, + "style": "integration", + "description": "pool enters backoff on connection close", + "runOn": [ + { + "minServerVersion": "4.9.0" + } + ], + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "closeConnection": true + } + }, + "poolOptions": { + "minPoolSize": 0 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "start", + 
"target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCreated", + "count": 1 + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutFailed", + "count": 1 + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated" + }, + { + "type": "ConnectionClosed" + }, + { + "type": "ConnectionPoolBackoff" + }, + { + "type": "ConnectionCheckOutFailed" + } + ], + "ignore": [ + "ConnectionCheckedIn", + "ConnectionCheckedOut", + "ConnectionPoolCreated", + "ConnectionPoolReady" + ] +} diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index 8ec958780d..da9357b963 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -15,17 +15,13 @@ "isMaster", "hello" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "errorCode": 18, "appName": "poolCreateMinSizeErrorTest" } }, "poolOptions": { "minPoolSize": 1, "backgroundThreadIntervalMS": 50, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appName": "poolCreateMinSizeErrorTest" }, "operations": [ diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json index 656b291366..c278665d68 100644 --- a/test/discovery_and_monitoring/unified/auth-network-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-error.json @@ -53,9 +53,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "authNetworkErrorTest" } } @@ -77,8 +75,6 @@ ], "uriOptions": { "retryWrites": false, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appname": "authNetworkErrorTest" } } @@ -231,4 +227,4 @@ ] } ] -} +} \ No newline at end of file diff --git a/test/discovery_and_monitoring/unified/auth-network-timeout-error.json b/test/discovery_and_monitoring/unified/auth-network-timeout-error.json index 3cf9576eba..c278665d68 100644 --- a/test/discovery_and_monitoring/unified/auth-network-timeout-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-timeout-error.json @@ -1,5 +1,5 @@ { - "description": "auth-network-timeout-error", + "description": "auth-network-error", "schemaVersion": "1.4", "runOnRequirements": [ { @@ -23,7 +23,7 @@ ], "initialData": [ { - "collectionName": "auth-network-timeout-error", + "collectionName": "auth-network-error", "databaseName": "sdam-tests", "documents": [ { @@ -37,7 +37,7 @@ ], "tests": [ { - "description": "Reset server and pool after network timeout error during authentication", + "description": "Reset server and pool after network error during authentication", "operations": [ { "name": "failPoint", @@ -53,9 +53,8 @@ "failCommands": [ "saslContinue" ], - "blockConnection": true, - "blockTimeMS": 500, - "appName": "authNetworkTimeoutErrorTest" + "closeConnection": true, + "appName": "authNetworkErrorTest" } } } @@ -76,9 +75,7 @@ ], "uriOptions": { "retryWrites": false, - "appname": "authNetworkTimeoutErrorTest", - "connectTimeoutMS": 250, - "socketTimeoutMS": 250 + "appname": "authNetworkErrorTest" } } }, @@ -93,7 +90,7 @@ "collection": { "id": "collection", "database": "database", - "collectionName": "auth-network-timeout-error" + "collectionName": "auth-network-error" } } ] @@ -191,7 +188,7 @@ { "commandStartedEvent": { "command": 
{ - "insert": "auth-network-timeout-error", + "insert": "auth-network-error", "documents": [ { "_id": 5 @@ -210,7 +207,7 @@ ], "outcome": [ { - "collectionName": "auth-network-timeout-error", + "collectionName": "auth-network-error", "databaseName": "sdam-tests", "documents": [ { @@ -230,4 +227,4 @@ ] } ] -} +} \ No newline at end of file diff --git a/test/discovery_and_monitoring/unified/backoff-heartbeat-failure.json b/test/discovery_and_monitoring/unified/backoff-heartbeat-failure.json new file mode 100644 index 0000000000..2d4e0471f9 --- /dev/null +++ b/test/discovery_and_monitoring/unified/backoff-heartbeat-failure.json @@ -0,0 +1,172 @@ +{ + "description": "heartbeat-failure-clears-backoff-pool", + "schemaVersion": "1.28", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "heartbeat-backoff-error", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "A heartbeat failure during backoff should clear the pool", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "commandStartedEvent", + "poolBackoffEvent", + "poolClearedEvent", + "serverHeartbeatFailedEvent", + "serverHeartbeatSucceededEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "serverMonitoringMode": "poll", + "appname": "heartbeatBackoffFailTest" + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "heartbeat-backoff-error" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "appName": "heartbeatBackoffFailTest", + "closeConnection": true + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatFailedEvent": {} + }, + "count": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + }, + { + "poolClearedEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/backoff-heartbeat-success.json b/test/discovery_and_monitoring/unified/backoff-heartbeat-success.json new file mode 100644 index 0000000000..4f790007b5 --- /dev/null +++ b/test/discovery_and_monitoring/unified/backoff-heartbeat-success.json @@ -0,0 +1,180 @@ +{ + "description": 
"heartbeat-success-backoff", + "schemaVersion": "1.28", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "heartbeat-backoff-success", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "A heartbeat success during backoff not mark the pool as ready", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "commandStartedEvent", + "poolBackoffEvent", + "poolClearedEvent", + "serverHeartbeatFailedEvent", + "serverHeartbeatSucceededEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 5000, + "serverMonitoringMode": "poll", + "appname": "heartbeatBackoffSuccessTest" + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "heartbeat-backoff-success" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "appName": "heartbeatBackoffSuccessTest", + "closeConnection": true + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "off" + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + }, + { + "poolBackoffEvent": {} + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/backoff-network-error-fail.json b/test/discovery_and_monitoring/unified/backoff-network-error-fail.json new file mode 100644 index 0000000000..c128198415 --- /dev/null +++ b/test/discovery_and_monitoring/unified/backoff-network-error-fail.json @@ -0,0 +1,302 @@ +{ + "description": "backoff-network-error-fail", + "schemaVersion": "1.28", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backoff-network-error-fail", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "Backoff and 
fail after network connection errors during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "commandStartedEvent", + "poolBackoffEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "serverMonitoringMode": "poll", + "appname": "backoffNetworkErrorFailTest" + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backoff-network-error-fail" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "appName": "backoffNetworkErrorFailTest", + "closeConnection": true + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolBackoffEvent": {} + }, + "count": 5 + } + } + ] + }, + { + "description": "Backoff and clear the pool after network failures followed by server error", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "commandStartedEvent", + "poolBackoffEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "serverMonitoringMode": "poll", + "appname": "backoffNetworkErrorFailClearTest" + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backoff-network-error-fail-clear" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "appName": "backoffNetworkErrorFailClearTest", + "closeConnection": true + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "expectError": { + "isError": true + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolBackoffEvent": {} + }, + "count": 5 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "off" + } + } + }, + { + "name": "failPoint", + 
"object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "hello", + "isMaster" + ], + "appName": "backoffNetworkErrorFailClearTest", + "errorCode": 1 + } + } + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 3 + } + }, + "expectError": { + "isError": true + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolClearedEvent": {} + }, + "count": 1 + } + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/backoff-network-error.json b/test/discovery_and_monitoring/unified/backoff-network-error.json new file mode 100644 index 0000000000..b3b230ca6d --- /dev/null +++ b/test/discovery_and_monitoring/unified/backoff-network-error.json @@ -0,0 +1,204 @@ +{ + "description": "backoff-network-error", + "schemaVersion": "1.28", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backoff-network-error", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "Backoff and retry after network connection error during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "commandStartedEvent", + "poolBackoffEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "appname": "backoffNetworkErrorTest", + "heartbeatFrequencyMS": 10000, + "serverMonitoringMode": "poll", + "connectTimeoutMS": 250, + "socketTimeoutMS": 250 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backoff-network-error" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "closeConnection": true, + "appName": "backoffNetworkErrorTest" + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolBackoffEvent": { + "attempt": 1 + } + }, + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolBackoffEvent": { + "attempt": 2 + } + }, + "count": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "command": { + "insert": "backoff-network-error", + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, 
+ "commandName": "insert", + "databaseName": "sdam-tests" + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "backoff-network-error", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + ] + } + ] +} diff --git a/test/discovery_and_monitoring/unified/backoff-network-timeout-error.json b/test/discovery_and_monitoring/unified/backoff-network-timeout-error.json new file mode 100644 index 0000000000..87c50bc2f2 --- /dev/null +++ b/test/discovery_and_monitoring/unified/backoff-network-timeout-error.json @@ -0,0 +1,200 @@ +{ + "description": "backoff-network-timeout-error", + "schemaVersion": "1.28", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "serverless": "forbid", + "topologies": [ + "single", + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "setupClient", + "useMultipleMongoses": false + } + } + ], + "initialData": [ + { + "collectionName": "backoff-network-timeout-error", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "tests": [ + { + "description": "Backoff and retry after network timeout error during connection establishment", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatSucceededEvent", + "commandStartedEvent", + "poolBackoffEvent", + "poolReadyEvent", + "poolClearedEvent" + ], + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 10000, + "appname": "backoffNetworkTimeoutErrorTest", + "serverMonitoringMode": "poll", + "connectTimeoutMS": 250, + "socketTimeoutMS": 250 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "sdam-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "backoff-network-timeout-error" + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatSucceededEvent": {} + }, + "count": 1 + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "setupClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "closeConnection": true, + "appName": "backoffNetworkTimeoutErrorTest" + } + } + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolBackoffEvent": {} + }, + "count": 2 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandStartedEvent": { + "command": { + "insert": "backoff-network-timeout-error", + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "commandName": "insert", + "databaseName": "sdam-tests" + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "backoff-network-timeout-error", + "databaseName": "sdam-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + ] + } + ] +} 
diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json
index b9842b8017..7654eff6fb 100644
--- a/test/load_balancer/sdam-error-handling.json
+++ b/test/load_balancer/sdam-error-handling.json
@@ -1,6 +1,6 @@
 {
   "description": "state change errors are correctly handled",
-  "schemaVersion": "1.4",
+  "schemaVersion": "1.28",
   "runOnRequirements": [
     {
       "topologies": [
@@ -32,8 +32,6 @@
         "useMultipleMongoses": false,
         "uriOptions": {
           "appname": "lbSDAMErrorTestClient",
-          "socketTimeoutMS": 500,
-          "connectTimeoutMS": 500,
           "retryWrites": false
         },
         "observeEvents": [
@@ -43,7 +41,8 @@
           "connectionCheckOutFailedEvent",
           "connectionCheckedInEvent",
           "connectionClosedEvent",
-          "poolClearedEvent"
+          "poolClearedEvent",
+          "poolBackoffEvent"
         ]
       }
     },
@@ -66,9 +65,7 @@
         "id": "multiClient",
         "useMultipleMongoses": true,
         "uriOptions": {
-          "retryWrites": false,
-          "socketTimeoutMS": 500,
-          "connectTimeoutMS": 500
+          "retryWrites": false
         },
         "observeEvents": [
           "connectionCreatedEvent",
@@ -264,7 +261,7 @@
       ]
     },
     {
-      "description": "errors during the initial connection hello are ignored",
+      "description": "errors during the initial connection hello trigger backoff",
       "runOnRequirements": [
         {
           "minServerVersion": "4.4.7"
@@ -286,8 +283,7 @@
                   "isMaster",
                   "hello"
                 ],
-                "blockConnection": true,
-                "blockTimeMS": 1000,
+                "closeConnection": true,
                "appName": "lbSDAMErrorTestClient"
              }
            }
@@ -300,9 +296,17 @@
             "document": {
               "x": 1
             }
-          },
-          "expectError": {
-            "isClientError": true
+          }
+        },
+        {
+          "name": "waitForEvent",
+          "object": "testRunner",
+          "arguments": {
+            "client": "singleClient",
+            "event": {
+              "poolBackoffEvent": {}
+            },
+            "count": 1
           }
         }
       ],
@@ -319,10 +323,27 @@
                 "reason": "error"
               }
             },
+            {
+              "poolBackoffEvent": {
+                "attempt": 1
+              }
+            },
             {
               "connectionCheckOutFailedEvent": {
                 "reason": "connectionError"
               }
+            },
+            {
+              "connectionCreatedEvent": {}
+            },
+            {
+              "connectionReadyEvent": {}
+            },
+            {
+              "connectionCheckedOutEvent": {}
+            },
+            {
+              "connectionCheckedInEvent": {}
             }
           ]
         }
@@ -350,8 +371,7 @@
                 "failCommands": [
                   "saslContinue"
                 ],
-                "blockConnection": true,
-                "blockTimeMS": 1000,
+                "closeConnection": true,
                "appName": "lbSDAMErrorTestClient"
              }
            }
@@ -412,8 +432,7 @@
                 "failCommands": [
                   "getMore"
                 ],
-                "closeConnection": true,
-                "appName": "lbSDAMErrorTestClient"
+                "closeConnection": true
              }
            }
          }
diff --git a/test/test_connection_monitoring.py b/test/test_connection_monitoring.py
index 1405824453..580d214541 100644
--- a/test/test_connection_monitoring.py
+++ b/test/test_connection_monitoring.py
@@ -51,6 +51,7 @@
     ConnectionClosedReason,
     ConnectionCreatedEvent,
     ConnectionReadyEvent,
+    PoolBackoffEvent,
     PoolClearedEvent,
     PoolClosedEvent,
     PoolCreatedEvent,
@@ -75,6 +76,7 @@
     "ConnectionPoolReady": PoolReadyEvent,
     "ConnectionPoolCleared": PoolClearedEvent,
     "ConnectionPoolClosed": PoolClosedEvent,
+    "ConnectionPoolBackoff": PoolBackoffEvent,
     # Error types.
     "PoolClosedError": _PoolClosedError,
     "WaitQueueTimeoutError": WaitQueueTimeoutError,
diff --git a/test/test_pooling.py b/test/test_pooling.py
index f3bfcf4ba2..4d17139953 100644
--- a/test/test_pooling.py
+++ b/test/test_pooling.py
@@ -515,7 +515,7 @@ def test_connection_timeout_message(self):
     def test_pool_check_backoff(self):
         # Test that Pool recovers from two connection failures in a row.
         # This exercises code at the end of Pool._check().
-        cx_pool = self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1)
+        cx_pool = self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=10)
         self.addCleanup(cx_pool.close)
 
         with cx_pool.checkout() as conn:
@@ -524,7 +524,7 @@ def test_pool_check_backoff(self):
             conn.conn.close()
 
         # Enable backoff.
-        cx_pool._backoff = 1
+        cx_pool.backoff()
 
         # Swap pool's address with a bad one.
         address, cx_pool.address = cx_pool.address, ("foo.com", 1234)
diff --git a/test/unified_format.py b/test/unified_format.py
index 3496b2ad44..630a525c14 100644
--- a/test/unified_format.py
+++ b/test/unified_format.py
@@ -429,7 +429,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
     a class attribute ``TEST_SPEC``.
     """
 
-    SCHEMA_VERSION = Version.from_string("1.22")
+    SCHEMA_VERSION = Version.from_string("1.28")
     RUN_ON_LOAD_BALANCER = True
     TEST_SPEC: Any
     TEST_PATH = ""  # This gets filled in by generate_test_classes
diff --git a/test/unified_format_shared.py b/test/unified_format_shared.py
index 17dd73ec8c..1bc60a34ec 100644
--- a/test/unified_format_shared.py
+++ b/test/unified_format_shared.py
@@ -64,6 +64,7 @@
     ConnectionClosedEvent,
     ConnectionCreatedEvent,
     ConnectionReadyEvent,
+    PoolBackoffEvent,
     PoolClearedEvent,
     PoolClosedEvent,
     PoolCreatedEvent,
@@ -618,6 +619,10 @@ def match_event(self, expectation, actual):
             self.test.assertIsInstance(actual.interrupt_connections, bool)
         elif name == "poolClosedEvent":
             self.test.assertIsInstance(actual, PoolClosedEvent)
+        elif name == "poolBackoffEvent":
+            self.test.assertIsInstance(actual, PoolBackoffEvent)
+            self.test.assertIsInstance(actual.duration_ms, int)
+            self.test.assertIsInstance(actual.attempt, int)
         elif name == "connectionCreatedEvent":
             self.test.assertIsInstance(actual, ConnectionCreatedEvent)
         elif name == "connectionReadyEvent":
diff --git a/test/utils.py b/test/utils.py
index bfc606fe83..80491f4d26 100644
--- a/test/utils.py
+++ b/test/utils.py
@@ -240,6 +240,7 @@ def __init__(self, address, options, is_sdam=False, client_id=None):
         self.opts = options
         self.operation_count = 0
         self.conns = []
+        self.state = 0
 
     def stale_generation(self, gen, service_id):
         return self.gen.stale(gen, service_id)
diff --git a/test/utils_shared.py b/test/utils_shared.py
index f2e8852f0c..e64d600a46 100644
--- a/test/utils_shared.py
+++ b/test/utils_shared.py
@@ -48,6 +48,7 @@
     ConnectionClosedEvent,
     ConnectionCreatedEvent,
     ConnectionReadyEvent,
+    PoolBackoffEvent,
     PoolClearedEvent,
     PoolClosedEvent,
     PoolCreatedEvent,
@@ -142,6 +143,10 @@ def pool_closed(self, event):
         assert isinstance(event, PoolClosedEvent)
         self.add_event(event)
 
+    def pool_backoff(self, event):
+        assert isinstance(event, PoolBackoffEvent)
+        self.add_event(event)
+
 
 class EventListener(BaseListener, monitoring.CommandListener):
     def __init__(self):