2323from . import protos
2424from . import constants
2525
26- from .constants import CONSOLE_LOG_PREFIX
26+ from .constants import (
27+ CONSOLE_LOG_PREFIX ,
28+ PYTHON_THREADPOOL_THREAD_COUNT ,
29+ PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT ,
30+ PYTHON_THREADPOOL_THREAD_COUNT_MIN ,
31+ PYTHON_THREADPOOL_THREAD_COUNT_MAX
32+ )
2733from .logging import error_logger , logger , is_system_log_category
2834from .logging import enable_console_logging , disable_console_logging
35+ from .utils .common import get_app_setting
2936from .utils .tracing import marshall_exception_trace
3037from .utils .wrappers import disable_feature_by
3138from asyncio import BaseEventLoop
@@ -62,24 +69,19 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
6269
6370 self ._old_task_factory = None
6471
65- # A thread-pool for synchronous function calls. We limit
66- # the number of threads to 1 so that one Python worker can
67- # only run one synchronous function in parallel. This is
68- # because synchronous code in Python is rarely designed with
69- # concurrency in mind, so we don't want to allow users to
70- # have races in their synchronous functions. Moreover,
71- # because of the GIL in CPython, it rarely makes sense to
72- # use threads (unless the code is IO bound, but we have
73- # async support for that.)
74- self ._sync_call_tp = concurrent .futures .ThreadPoolExecutor (
75- max_workers = 1 )
76-
77- self ._grpc_connect_timeout = grpc_connect_timeout
72+ # We allow the customer to change synchronous thread pool count by
73+ # PYTHON_THREADPOOL_THREAD_COUNT app setting. The default value is 1.
74+ self ._sync_tp_max_workers : int = self ._get_sync_tp_max_workers ()
75+ self ._sync_call_tp : concurrent .futures .Executor = (
76+ concurrent .futures .ThreadPoolExecutor (
77+ max_workers = self ._sync_tp_max_workers ))
78+
79+ self ._grpc_connect_timeout : float = grpc_connect_timeout
7880 # This is set to -1 by default to remove the limitation on msg size
79- self ._grpc_max_msg_len = grpc_max_msg_len
81+ self ._grpc_max_msg_len : int = grpc_max_msg_len
8082 self ._grpc_resp_queue : queue .Queue = queue .Queue ()
8183 self ._grpc_connected_fut = loop .create_future ()
82- self ._grpc_thread = threading .Thread (
84+ self ._grpc_thread : threading . Thread = threading .Thread (
8385 name = 'grpc-thread' , target = self .__poll_grpc )
8486
8587 @classmethod
@@ -89,7 +91,9 @@ async def connect(cls, host: str, port: int, worker_id: str,
8991 disp = cls (loop , host , port , worker_id , request_id , connect_timeout )
9092 disp ._grpc_thread .start ()
9193 await disp ._grpc_connected_fut
92- logger .info ('Successfully opened gRPC channel to %s:%s' , host , port )
94+ logger .info ('Successfully opened gRPC channel to %s:%s '
95+ 'with sync threadpool max workers set to %s' ,
96+ host , port , disp ._sync_tp_max_workers )
9397 return disp
9498
9599 async def dispatch_forever (self ):
@@ -122,21 +126,21 @@ async def dispatch_forever(self):
122126 # established, should use it for system and user logs
123127 logging_handler = AsyncLoggingHandler ()
124128 root_logger = logging .getLogger ()
125- root_logger .setLevel (logging .INFO )
129+ root_logger .setLevel (logging .DEBUG )
126130 root_logger .addHandler (logging_handler )
127131 logger .info ('Switched to gRPC logging.' )
128132 logging_handler .flush ()
129133
130134 try :
131135 await forever
132136 finally :
133- logger .warn ('Detaching gRPC logging due to exception.' )
137+ logger .warning ('Detaching gRPC logging due to exception.' )
134138 logging_handler .flush ()
135139 root_logger .removeHandler (logging_handler )
136140
137141 # Reenable console logging when there's an exception
138142 enable_console_logging ()
139- logger .warn ('Switched to console logging due to exception.' )
143+ logger .warning ('Switched to console logging due to exception.' )
140144 finally :
141145 DispatcherMeta .__current_dispatcher__ = None
142146
@@ -210,8 +214,8 @@ def _serialize_exception(exc: Exception):
210214 try :
211215 message = f'{ type (exc ).__name__ } : { exc } '
212216 except Exception :
213- message = (f 'Unhandled exception in function. '
214- f 'Could not serialize original exception message.' )
217+ message = ('Unhandled exception in function. '
218+ 'Could not serialize original exception message.' )
215219
216220 try :
217221 stack_trace = marshall_exception_trace (exc )
@@ -475,7 +479,29 @@ def _change_cwd(self, new_cwd: str):
475479 os .chdir (new_cwd )
476480 logger .info ('Changing current working directory to %s' , new_cwd )
477481 else :
478- logger .warn ('Directory %s is not found when reloading' , new_cwd )
482+ logger .warning ('Directory %s is not found when reloading' , new_cwd )
483+
484+ def _get_sync_tp_max_workers (self ) -> int :
485+ def tp_max_workers_validator (value : str ) -> bool :
486+ try :
487+ int_value = int (value )
488+ except ValueError :
489+ logger .warning (f'{ PYTHON_THREADPOOL_THREAD_COUNT } must be an '
490+ 'integer' )
491+ return False
492+
493+ if int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN or (
494+ int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX ):
495+ logger .warning (f'{ PYTHON_THREADPOOL_THREAD_COUNT } must be set '
496+ 'to a value between 1 and 32' )
497+ return False
498+
499+ return True
500+
501+ return int (get_app_setting (
502+ setting = PYTHON_THREADPOOL_THREAD_COUNT ,
503+ default_value = f'{ PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT } ' ,
504+ validator = tp_max_workers_validator ))
479505
480506 def __run_sync_func (self , invocation_id , func , params ):
481507 # This helper exists because we need to access the current
0 commit comments