Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions alibabacloud_credentials/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def __init__(
#### ECS RAM Role Type
- `role_name` (str): Role name.
- `disable_imds_v1` (bool, optional): Whether to disable IMDS v1. Default is `False`.
- `enable_imds_v2` (bool, optional): Whether to enable IMDS v2. Default is `None`.
- `metadata_token_duration` (int, optional): Metadata token expiration time in seconds. Default is `None`.

#### Credentials URI Type
- `credentials_uri` (str): Credentials URI.
Expand Down
37 changes: 9 additions & 28 deletions alibabacloud_credentials/provider/refreshable.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import random
import asyncio
import threading
import weakref
import logging
import time
import atexit
from datetime import datetime
from enum import Enum
from typing import Callable, Generic, TypeVar, Coroutine, Any
from threading import Semaphore
from concurrent.futures.thread import ThreadPoolExecutor, _worker, _base, _threads_queues
from concurrent.futures.thread import ThreadPoolExecutor

from alibabacloud_credentials.exceptions import CredentialException
from alibabacloud_credentials_api import ICredentials
Expand All @@ -20,36 +20,15 @@
INT64_MAX = 2 ** 63 - 1
MAX_CONCURRENT_REFRESHES = 100
CONCURRENT_REFRESH_LEASES = Semaphore(MAX_CONCURRENT_REFRESHES)
EXECUTOR = ThreadPoolExecutor(max_workers=INT64_MAX, thread_name_prefix='non-blocking-refresh')


class _DaemonThreadPoolExecutor(ThreadPoolExecutor):
def _adjust_thread_count(self):
# if idle threads are available, don't spin new threads
if self._idle_semaphore.acquire(timeout=0):
return

# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)

num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(target=_worker,
name=thread_name,
args=(weakref.ref(self, weakref_cb),
self._work_queue,
self._initializer,
self._initargs),
daemon=True) # Set thread as daemon
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
def _shutdown_handler():
log.debug("Shutting down executor...")
EXECUTOR.shutdown(wait=False)


EXECUTOR = _DaemonThreadPoolExecutor(max_workers=INT64_MAX, thread_name_prefix='non-blocking-refresh')
atexit.register(_shutdown_handler)


def _jitter_time(now: int, jitter_start: int, jitter_end: int) -> int:
Expand Down Expand Up @@ -141,6 +120,8 @@ def prefetch(self, action: Callable):

try:
EXECUTOR.submit(action)
except KeyboardInterrupt:
_shutdown_handler()
except Exception as t:
log.warning(f'Exception occurred when submitting background task.', exc_info=True)
finally:
Expand Down