1818
1919import grpc
2020
21- from . import bindings
22- from . import constants
23- from . import functions
24- from . import loader
25- from . import protos
21+ from . import bindings , constants , functions , loader , protos
2622from .bindings .shared_memory_data_transfer import SharedMemoryManager
2723from .constants import (PYTHON_THREADPOOL_THREAD_COUNT ,
2824 PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT ,
2925 PYTHON_THREADPOOL_THREAD_COUNT_MAX_37 ,
3026 PYTHON_THREADPOOL_THREAD_COUNT_MIN ,
31- PYTHON_ENABLE_DEBUG_LOGGING )
27+ PYTHON_ENABLE_DEBUG_LOGGING , SCRIPT_FILE_NAME )
3228from .extension import ExtensionManager
3329from .logging import disable_console_logging , enable_console_logging
3430from .logging import enable_debug_logging_recommendation
4036from .utils .wrappers import disable_feature_by
4137from .version import VERSION
4238
43-
4439_TRUE = "true"
4540
4641"""In Python 3.6, the current_task method was in the Task class, but got moved
@@ -260,13 +255,13 @@ async def _dispatch_grpc_request(self, request):
260255 resp = await request_handler (request )
261256 self ._grpc_resp_queue .put_nowait (resp )
262257
263- async def _handle__worker_init_request (self , req ):
258+ async def _handle__worker_init_request (self , request ):
264259 logger .info ('Received WorkerInitRequest, '
265260 'python version %s, worker version %s, request ID %s' ,
266261 sys .version , VERSION , self .request_id )
267262 enable_debug_logging_recommendation ()
268263
269- worker_init_request = req .worker_init_request
264+ worker_init_request = request .worker_init_request
270265 host_capabilities = worker_init_request .capabilities
271266 if constants .FUNCTION_DATA_CACHE in host_capabilities :
272267 val = host_capabilities [constants .FUNCTION_DATA_CACHE ]
@@ -294,42 +289,93 @@ async def _handle__worker_init_request(self, req):
294289 result = protos .StatusResult (
295290 status = protos .StatusResult .Success )))
296291
297- async def _handle__worker_status_request (self , req ):
292+ async def _handle__worker_status_request (self , request ):
298293 # Logging is not necessary in this request since the response is used
299294 # for host to judge scale decisions of out-of-proc languages.
300295 # Having log here will reduce the responsiveness of the worker.
301296 return protos .StreamingMessage (
302- request_id = req .request_id ,
297+ request_id = request .request_id ,
303298 worker_status_response = protos .WorkerStatusResponse ())
304299
305- async def _handle__function_load_request (self , req ):
306- func_request = req .function_load_request
300+ async def _handle__functions_metadata_request (self , request ):
301+ metadata_request = request .functions_metadata_request
302+ directory = metadata_request .function_app_directory
303+ function_path = os .path .join (directory , SCRIPT_FILE_NAME )
304+
305+ if not os .path .exists (function_path ):
306+ # Fallback to legacy model
307+ logger .info (f"{ SCRIPT_FILE_NAME } does not exist. "
308+ "Switching to host indexing." )
309+ return protos .StreamingMessage (
310+ request_id = request .request_id ,
311+ function_metadata_response = protos .FunctionMetadataResponse (
312+ use_default_metadata_indexing = True ,
313+ result = protos .StatusResult (
314+ status = protos .StatusResult .Success )))
315+
316+ try :
317+ fx_metadata_results = []
318+ indexed_functions = loader .index_function_app (function_path )
319+ if indexed_functions :
320+ indexed_function_logs : List [str ] = []
321+ for func in indexed_functions :
322+ function_log = \
323+ f"Function Name: { func .get_function_name ()} " \
324+ "Function Binding: " \
325+ f"{ [binding .name for binding in func .get_bindings ()]} "
326+ indexed_function_logs .append (function_log )
327+
328+ logger .info (
329+ f'Successfully processed FunctionMetadataRequest for '
330+ f'functions: { " " .join (indexed_function_logs )} ' )
331+
332+ fx_metadata_results = loader .process_indexed_function (
333+ self ._functions ,
334+ indexed_functions )
335+ else :
336+ logger .warning ("No functions indexed. Please refer to the "
337+ "documentation." )
338+
339+ return protos .StreamingMessage (
340+ request_id = request .request_id ,
341+ function_metadata_response = protos .FunctionMetadataResponse (
342+ function_metadata_results = fx_metadata_results ,
343+ result = protos .StatusResult (
344+ status = protos .StatusResult .Success )))
345+
346+ except Exception as ex :
347+ return protos .StreamingMessage (
348+ request_id = self .request_id ,
349+ function_metadata_response = protos .FunctionMetadataResponse (
350+ result = protos .StatusResult (
351+ status = protos .StatusResult .Failure ,
352+ exception = self ._serialize_exception (ex ))))
353+
354+ async def _handle__function_load_request (self , request ):
355+ func_request = request .function_load_request
307356 function_id = func_request .function_id
308357 function_name = func_request .metadata .name
309358
310- logger .info (f'Received FunctionLoadRequest, '
311- f'request ID: { self .request_id } , '
312- f'function ID: { function_id } '
313- f'function Name: { function_name } ' )
314359 try :
315- func = loader .load_function (
316- func_request .metadata .name ,
317- func_request .metadata .directory ,
318- func_request .metadata .script_file ,
319- func_request .metadata .entry_point )
320-
321- self ._functions .add_function (
322- function_id , func , func_request .metadata )
323-
324- ExtensionManager .function_load_extension (
325- function_name ,
326- func_request .metadata .directory
327- )
360+ if not self ._functions .get_function (function_id ):
361+ func = loader .load_function (
362+ func_request .metadata .name ,
363+ func_request .metadata .directory ,
364+ func_request .metadata .script_file ,
365+ func_request .metadata .entry_point )
366+
367+ self ._functions .add_function (
368+ function_id , func , func_request .metadata )
369+
370+ ExtensionManager .function_load_extension (
371+ function_name ,
372+ func_request .metadata .directory
373+ )
328374
329- logger .info ('Successfully processed FunctionLoadRequest, '
330- f'request ID: { self .request_id } , '
331- f'function ID: { function_id } ,'
332- f'function Name: { function_name } ' )
375+ logger .info ('Successfully processed FunctionLoadRequest, '
376+ f'request ID: { self .request_id } , '
377+ f'function ID: { function_id } ,'
378+ f'function Name: { function_name } ' )
333379
334380 return protos .StreamingMessage (
335381 request_id = self .request_id ,
@@ -347,8 +393,8 @@ async def _handle__function_load_request(self, req):
347393 status = protos .StatusResult .Failure ,
348394 exception = self ._serialize_exception (ex ))))
349395
350- async def _handle__invocation_request (self , req ):
351- invoc_request = req .invocation_request
396+ async def _handle__invocation_request (self , request ):
397+ invoc_request = request .invocation_request
352398 invocation_id = invoc_request .invocation_id
353399 function_id = invoc_request .function_id
354400
@@ -361,6 +407,7 @@ async def _handle__invocation_request(self, req):
361407 try :
362408 fi : functions .FunctionInfo = self ._functions .get_function (
363409 function_id )
410+ assert fi is not None
364411
365412 function_invocation_logs : List [str ] = [
366413 'Received FunctionInvocationRequest' ,
@@ -456,15 +503,16 @@ async def _handle__invocation_request(self, req):
456503 status = protos .StatusResult .Failure ,
457504 exception = self ._serialize_exception (ex ))))
458505
459- async def _handle__function_environment_reload_request (self , req ):
506+ async def _handle__function_environment_reload_request (self , request ):
460507 """Only runs on Linux Consumption placeholder specialization.
461508 """
462509 try :
463510 logger .info ('Received FunctionEnvironmentReloadRequest, '
464511 'request ID: %s' , self .request_id )
465512 enable_debug_logging_recommendation ()
466513
467- func_env_reload_request = req .function_environment_reload_request
514+ func_env_reload_request = \
515+ request .function_environment_reload_request
468516
469517 # Import before clearing path cache so that the default
470518 # azure.functions modules is available in sys.modules for
@@ -523,7 +571,7 @@ async def _handle__function_environment_reload_request(self, req):
523571 request_id = self .request_id ,
524572 function_environment_reload_response = failure_response )
525573
526- async def _handle__close_shared_memory_resources_request (self , req ):
574+ async def _handle__close_shared_memory_resources_request (self , request ):
527575 """
528576 Frees any memory maps that were produced as output for a given
529577 invocation.
@@ -534,7 +582,7 @@ async def _handle__close_shared_memory_resources_request(self, req):
534582 If the cache is not enabled, the worker should free the resources as at
535583 this point the host has read the memory maps and does not need them.
536584 """
537- close_request = req .close_shared_memory_resources_request
585+ close_request = request .close_shared_memory_resources_request
538586 map_names = close_request .map_names
539587 # Assign default value of False to all result values.
540588 # If we are successfully able to close a memory map, its result will be
0 commit comments