1919from datetime import datetime
2020
2121import grpc
22-
2322from . import bindings , constants , functions , loader , protos
2423from .bindings .shared_memory_data_transfer import SharedMemoryManager
2524from .constants import (PYTHON_ROLLBACK_CWD_PATH ,
3332 PYTHON_LANGUAGE_RUNTIME , PYTHON_ENABLE_INIT_INDEXING ,
3433 METADATA_PROPERTIES_WORKER_INDEXED )
3534from .extension import ExtensionManager
35+ from .http_v2 import http_coordinator , initialize_http_server , HttpV2Registry , \
36+ sync_http_request , HttpServerInitError
3637from .logging import disable_console_logging , enable_console_logging
3738from .logging import (logger , error_logger , is_system_log_category ,
3839 CONSOLE_LOG_PREFIX , format_exception )
@@ -158,6 +159,7 @@ async def dispatch_forever(self): # sourcery skip: swap-if-expression
158159
159160 log_level = logging .INFO if not is_envvar_true (
160161 PYTHON_ENABLE_DEBUG_LOGGING ) else logging .DEBUG
162+
161163 root_logger .setLevel (log_level )
162164 root_logger .addHandler (logging_handler )
163165 logger .info ('Switched to gRPC logging.' )
@@ -189,7 +191,8 @@ def stop(self) -> None:
189191
190192 self ._stop_sync_call_tp ()
191193
192- def on_logging (self , record : logging .LogRecord , formatted_msg : str ) -> None :
194+ def on_logging (self , record : logging .LogRecord ,
195+ formatted_msg : str ) -> None :
193196 if record .levelno >= logging .CRITICAL :
194197 log_level = protos .RpcLog .Critical
195198 elif record .levelno >= logging .ERROR :
@@ -306,6 +309,13 @@ async def _handle__worker_init_request(self, request):
306309 self .load_function_metadata (
307310 worker_init_request .function_app_directory ,
308311 caller_info = "worker_init_request" )
312+
313+ if HttpV2Registry .http_v2_enabled ():
314+ capabilities [constants .HTTP_URI ] = \
315+ initialize_http_server (self ._host )
316+
317+ except HttpServerInitError :
318+ raise
309319 except Exception as ex :
310320 self ._function_metadata_exception = ex
311321
@@ -508,6 +518,7 @@ async def _handle__invocation_request(self, request):
508518 logger .info (', ' .join (function_invocation_logs ))
509519
510520 args = {}
521+
511522 for pb in invoc_request .input_data :
512523 pb_type_info = fi .input_types [pb .name ]
513524 if bindings .is_trigger_binding (pb_type_info .binding_name ):
@@ -523,7 +534,19 @@ async def _handle__invocation_request(self, request):
523534 shmem_mgr = self ._shmem_mgr ,
524535 is_deferred_binding = pb_type_info .deferred_bindings_enabled )
525536
526- fi_context = self ._get_context (invoc_request , fi .name , fi .directory )
537+ http_v2_enabled = self ._functions .get_function (function_id ) \
538+ .is_http_func and \
539+ HttpV2Registry .http_v2_enabled ()
540+
541+ if http_v2_enabled :
542+ http_request = await http_coordinator .get_http_request_async (
543+ invocation_id )
544+
545+ await sync_http_request (http_request , invoc_request )
546+ args [fi .trigger_metadata .get ('param_name' )] = http_request
547+
548+ fi_context = self ._get_context (invoc_request , fi .name ,
549+ fi .directory )
527550
528551 # Use local thread storage to store the invocation ID
529552 # for a customer's threads
@@ -536,17 +559,21 @@ async def _handle__invocation_request(self, request):
536559 args [name ] = bindings .Out ()
537560
538561 if fi .is_async :
539- call_result = await self ._run_async_func (
540- fi_context , fi .func , args
541- )
562+ call_result = \
563+ await self ._run_async_func (fi_context , fi .func , args )
542564 else :
543565 call_result = await self ._loop .run_in_executor (
544566 self ._sync_call_tp ,
545567 self ._run_sync_func ,
546568 invocation_id , fi_context , fi .func , args )
569+
547570 if call_result is not None and not fi .has_return :
548- raise RuntimeError (f'function { fi .name !r} without a $return '
549- 'binding returned a non-None value' )
571+ raise RuntimeError (
572+ f'function { fi .name !r} without a $return binding'
573+ 'returned a non-None value' )
574+
575+ if http_v2_enabled :
576+ http_coordinator .set_http_response (invocation_id , call_result )
550577
551578 output_data = []
552579 cache_enabled = self ._function_data_cache_enabled
@@ -566,10 +593,12 @@ async def _handle__invocation_request(self, request):
566593 output_data .append (param_binding )
567594
568595 return_value = None
569- if fi .return_type is not None :
596+ if fi .return_type is not None and not http_v2_enabled :
570597 return_value = bindings .to_outgoing_proto (
571- fi .return_type .binding_name , call_result ,
572- pytype = fi .return_type .pytype )
598+ fi .return_type .binding_name ,
599+ call_result ,
600+ pytype = fi .return_type .pytype ,
601+ )
573602
574603 # Actively flush customer print() function to console
575604 sys .stdout .flush ()
@@ -584,6 +613,9 @@ async def _handle__invocation_request(self, request):
584613 output_data = output_data ))
585614
586615 except Exception as ex :
616+ if http_v2_enabled :
617+ http_coordinator .set_http_response (invocation_id , ex )
618+
587619 return protos .StreamingMessage (
588620 request_id = self .request_id ,
589621 invocation_response = protos .InvocationResponse (
@@ -640,11 +672,18 @@ async def _handle__function_environment_reload_request(self, request):
640672 # reload_customer_libraries call clears the registry
641673 bindings .load_binding_registry ()
642674
675+ capabilities = {}
643676 if is_envvar_true (PYTHON_ENABLE_INIT_INDEXING ):
644677 try :
645678 self .load_function_metadata (
646679 directory ,
647680 caller_info = "environment_reload_request" )
681+
682+ if HttpV2Registry .http_v2_enabled ():
683+ capabilities [constants .HTTP_URI ] = \
684+ initialize_http_server (self ._host )
685+ except HttpServerInitError :
686+ raise
648687 except Exception as ex :
649688 self ._function_metadata_exception = ex
650689
@@ -655,7 +694,7 @@ async def _handle__function_environment_reload_request(self, request):
655694 func_env_reload_request .function_app_directory )
656695
657696 success_response = protos .FunctionEnvironmentReloadResponse (
658- capabilities = {} ,
697+ capabilities = capabilities ,
659698 worker_metadata = self .get_worker_metadata (),
660699 result = protos .StatusResult (
661700 status = protos .StatusResult .Success ))
@@ -676,8 +715,10 @@ async def _handle__function_environment_reload_request(self, request):
676715
677716 def index_functions (self , function_path : str ):
678717 indexed_functions = loader .index_function_app (function_path )
679- logger .info ('Indexed function app and found %s functions' ,
680- len (indexed_functions ))
718+ logger .info (
719+ "Indexed function app and found %s functions" ,
720+ len (indexed_functions )
721+ )
681722
682723 if indexed_functions :
683724 fx_metadata_results , fx_bindings_logs = (
@@ -747,7 +788,8 @@ async def _handle__close_shared_memory_resources_request(self, request):
747788 @staticmethod
748789 def _get_context (invoc_request : protos .InvocationRequest , name : str ,
749790 directory : str ) -> bindings .Context :
750- """ For more information refer: https://aka.ms/azfunc-invocation-context
791+ """ For more information refer:
792+ https://aka.ms/azfunc-invocation-context
751793 """
752794 trace_context = bindings .TraceContext (
753795 invoc_request .trace_context .trace_parent ,
@@ -889,7 +931,6 @@ def gen(resp_queue):
889931
890932
891933class AsyncLoggingHandler (logging .Handler ):
892-
893934 def emit (self , record : LogRecord ) -> None :
894935 # Since we disable console log after gRPC channel is initiated,
895936 # we should redirect all the messages into dispatcher.
0 commit comments