From a5b2d854cc90a56b09e0770072d0ccf62d36c095 Mon Sep 17 00:00:00 2001 From: George Feinberg Date: Wed, 2 Jul 2025 13:26:28 -0400 Subject: [PATCH] Fix use of threads for background rate limiter updates when enabled. Previously a hang might result when enabling rate limiting --- CHANGELOG.md | 7 +++++++ src/borneo/client.py | 12 ++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b605970..570451d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/). ==================== +# [Unreleased] + +## Fixed + +- Cloud only: Fix thread used when rate limiting is enabled. Previously enabling + rate limiting might cause a hang + # 5.4.2 - 2024-05-13 ## Added diff --git a/src/borneo/client.py b/src/borneo/client.py index 4ce5ec1..57e7664 100644 --- a/src/borneo/client.py +++ b/src/borneo/client.py @@ -5,8 +5,8 @@ # https://oss.oracle.com/licenses/upl/ # +from concurrent.futures import ThreadPoolExecutor from logging import DEBUG -from multiprocessing import pool from platform import python_version from sys import version_info from threading import Lock @@ -105,7 +105,7 @@ def __init__(self, config, logger): 'Starting client with rate limiting enabled') self._rate_limiter_map = RateLimiterMap() self._table_limit_update_map = dict() - self._threadpool = pool.ThreadPool(1) + self._threadpool = ThreadPoolExecutor(max_workers=1) else: self._logutils.log_debug('Starting client with no rate limiting') self._rate_limiter_map = None @@ -126,7 +126,7 @@ def background_update_limiters(self, table_name): return self._set_table_needs_refresh(table_name, False) try: - self._threadpool.map(self._update_table_limiters, ['table_name']) + self._threadpool.submit(self._update_table_limiters, ['table_name']) except RuntimeError: self._set_table_needs_refresh(table_name, True) @@ -142,14 +142,14 @@ def enable_rate_limiting(self, enable, use_percent): if enable and self._rate_limiter_map is None: self._rate_limiter_map = RateLimiterMap() self._table_limit_update_map = dict() - self._threadpool = pool.ThreadPool(1) + self._threadpool = ThreadPoolExecutor(max_workers=1) elif not enable and self._rate_limiter_map is not None: self._rate_limiter_map.clear() self._rate_limiter_map = None self._table_limit_update_map.clear() self._table_limit_update_map = None if self._threadpool is not None: - self._threadpool.close() + self._threadpool.shutdown() self._threadpool = None def execute(self, request): @@ -363,7 +363,7 @@ def shut_down(self): if self._sess is not None: self._sess.close() if self._threadpool is not None: - self._threadpool.close() + self._threadpool.shutdown() if self._stats_control is not None: self._stats_control.shutdown()