Skip to content

Commit ca32f02

Browse files
elpransmaiqbal11
authored andcommitted
Formalize worker logging (#231)
* Formalize worker logging The new `--log-to` and `--log-level` command line arguments allow configuring the logging behavior of the logger. The default is to log to stdout with the `INFO` level. Internal logging has been switched to use this logger, and a few debug-level logs added around various RPC messages. Fixes: #181. Fixes: #226. * Switch to split stderr/stdout logging by default * Fix split logging
1 parent 273c3dc commit ca32f02

File tree

7 files changed

+131
-62
lines changed

7 files changed

+131
-62
lines changed

azure/functions_worker/__init__.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +0,0 @@
1-
from . import aio_compat
2-
from . import dispatcher
3-
4-
5-
async def start_async(host, port, worker_id, request_id, grpc_max_msg_len):
6-
disp = await dispatcher.Dispatcher.connect(
7-
host, port, worker_id, request_id,
8-
connect_timeout=5.0, max_msg_len=grpc_max_msg_len)
9-
10-
await disp.dispatch_forever()
11-
12-
13-
def start(host: str, port: int, worker_id: str, request_id: str,
14-
grpc_max_msg_len: int):
15-
return aio_compat.run(start_async(
16-
host, port, worker_id, request_id, grpc_max_msg_len))

azure/functions_worker/__main__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from azure.functions_worker import main
2+
3+
if __name__ == '__main__':
4+
main.main()

azure/functions_worker/dispatcher.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from . import loader
1818
from . import protos
1919

20+
from .logging import error_logger, logger
21+
2022

2123
class DispatcherMeta(type):
2224

@@ -62,8 +64,6 @@ def __init__(self, loop, host, port, worker_id, request_id,
6264
self._grpc_thread = threading.Thread(
6365
name='grpc-thread', target=self.__poll_grpc)
6466

65-
self._logger = logging.getLogger('azure.functions_worker')
66-
6767
@classmethod
6868
async def connect(cls, host, port, worker_id, request_id,
6969
connect_timeout, max_msg_len=None):
@@ -72,6 +72,7 @@ async def connect(cls, host, port, worker_id, request_id,
7272
connect_timeout, max_msg_len)
7373
disp._grpc_thread.start()
7474
await disp._grpc_connected_fut
75+
logger.info('Successfully opened gRPC channel to %s:%s', host, port)
7576
return disp
7677

7778
async def dispatch_forever(self):
@@ -186,14 +187,16 @@ async def _dispatch_grpc_request(self, request):
186187
# Don't crash on unknown messages. Some of them can be ignored;
187188
# and if something goes really wrong the host can always just
188189
# kill the worker's process.
189-
self._logger.error(
190+
logger.error(
190191
f'unknown StreamingMessage content type {content_type}')
191192
return
192193

193194
resp = await request_handler(request)
194195
self._grpc_resp_queue.put_nowait(resp)
195196

196197
async def _handle__worker_init_request(self, req):
198+
logger.info('Received WorkerInitRequest, request ID %s',
199+
self.request_id)
197200
return protos.StreamingMessage(
198201
request_id=self.request_id,
199202
worker_init_response=protos.WorkerInitResponse(
@@ -204,6 +207,9 @@ async def _handle__function_load_request(self, req):
204207
func_request = req.function_load_request
205208
function_id = func_request.function_id
206209

210+
logger.info('Received FunctionLoadRequest, request ID: %s, '
211+
'function ID: %s', self.request_id, function_id)
212+
207213
try:
208214
func = loader.load_function(
209215
func_request.metadata.name,
@@ -214,6 +220,10 @@ async def _handle__function_load_request(self, req):
214220
self._functions.add_function(
215221
function_id, func, func_request.metadata)
216222

223+
logger.info('Successfully processed FunctionLoadRequest, '
224+
'request ID: %s, function ID: %s',
225+
self.request_id, function_id)
226+
217227
return protos.StreamingMessage(
218228
request_id=self.request_id,
219229
function_load_response=protos.FunctionLoadResponse(
@@ -242,6 +252,10 @@ async def _handle__invocation_request(self, req):
242252
assert isinstance(current_task, ContextEnabledTask)
243253
current_task.set_azure_invocation_id(invocation_id)
244254

255+
logger.info('Received FunctionInvocationRequest, request ID: %s, '
256+
'function ID: %s, invocation ID: %s',
257+
self.request_id, function_id, invocation_id)
258+
245259
try:
246260
fi: functions.FunctionInfo = self._functions.get_function(
247261
function_id)
@@ -303,6 +317,10 @@ async def _handle__invocation_request(self, req):
303317
fi.return_type.binding_name, call_result,
304318
pytype=fi.return_type.pytype)
305319

320+
logger.info('Successfully processed FunctionInvocationRequest, '
321+
'request ID: %s, function ID: %s, invocation ID: %s',
322+
self.request_id, function_id, invocation_id)
323+
306324
return protos.StreamingMessage(
307325
request_id=self.request_id,
308326
invocation_response=protos.InvocationResponse(
@@ -371,14 +389,17 @@ def gen(resp_queue):
371389
if ex is grpc_req_stream:
372390
# Yes, this is how grpc_req_stream iterator exits.
373391
return
392+
error_logger.exception('unhandled error in gRPC thread')
374393
raise
375394

376395

377396
class AsyncLoggingHandler(logging.Handler):
378397

379398
def emit(self, record):
380-
msg = self.format(record)
381-
Dispatcher.current._on_logging(record, msg)
399+
if not record.name.startswith('azure.functions_worker'):
400+
# Skip worker system logs
401+
msg = self.format(record)
402+
Dispatcher.current._on_logging(record, msg)
382403

383404

384405
class ContextEnabledTask(asyncio.Task):

azure/functions_worker/logging.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import logging
2+
import logging.handlers
3+
import sys
4+
5+
6+
logger = logging.getLogger('azure.functions_worker')
7+
error_logger = logging.getLogger('azure.functions_worker_errors')
8+
9+
10+
def setup(log_level, log_destination):
11+
if log_level == 'TRACE':
12+
log_level = 'DEBUG'
13+
14+
formatter = logging.Formatter(
15+
'LanguageWorkerConsoleLog %(levelname)s: %(message)s')
16+
17+
error_handler = None
18+
handler = None
19+
20+
if log_destination is None:
21+
# With no explicit log destination we do split logging,
22+
# errors go into stderr, everything else -- to stdout.
23+
error_handler = logging.StreamHandler(sys.stderr)
24+
error_handler.setFormatter(formatter)
25+
error_handler.setLevel(getattr(logging, log_level))
26+
27+
handler = logging.StreamHandler(sys.stdout)
28+
29+
elif log_destination in ('stdout', 'stderr'):
30+
handler = logging.StreamHandler(getattr(sys, log_destination))
31+
32+
elif log_destination == 'syslog':
33+
handler = logging.handlers.SysLogHandler()
34+
35+
else:
36+
handler = logging.FileHandler(log_destination)
37+
38+
if error_handler is None:
39+
error_handler = handler
40+
41+
handler.setFormatter(formatter)
42+
handler.setLevel(getattr(logging, log_level))
43+
44+
logger.addHandler(handler)
45+
logger.setLevel(getattr(logging, log_level))
46+
47+
error_logger.addHandler(error_handler)
48+
error_logger.setLevel(getattr(logging, log_level))

azure/functions_worker/main.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Main entrypoint."""
2+
3+
4+
import argparse
5+
6+
from . import aio_compat
7+
from . import dispatcher
8+
from . import logging
9+
from .logging import error_logger, logger
10+
11+
12+
def parse_args():
13+
parser = argparse.ArgumentParser(
14+
description='Python Azure Functions Worker')
15+
parser.add_argument('--host')
16+
parser.add_argument('--port', type=int)
17+
parser.add_argument('--workerId', dest='worker_id')
18+
parser.add_argument('--requestId', dest='request_id')
19+
parser.add_argument('--log-level', type=str, default='INFO',
20+
choices=['TRACE', 'INFO', 'WARNING', 'ERROR'],)
21+
parser.add_argument('--log-to', type=str, default=None,
22+
help='log destination: stdout, stderr, '
23+
'syslog, or a file path')
24+
parser.add_argument('--grpcMaxMessageLength', type=int,
25+
dest='grpc_max_msg_len')
26+
return parser.parse_args()
27+
28+
29+
def main():
30+
args = parse_args()
31+
logging.setup(log_level=args.log_level, log_destination=args.log_to)
32+
33+
logger.info('Starting Azure Functions Python Worker.')
34+
logger.info('Worker ID: %s, Request ID: %s, Host Address: %s:%s',
35+
args.worker_id, args.request_id, args.host, args.port)
36+
37+
try:
38+
return aio_compat.run(start_async(
39+
args.host, args.port, args.worker_id, args.request_id,
40+
args.grpc_max_msg_len))
41+
except Exception:
42+
error_logger.exception('unhandled error in functions worker')
43+
raise
44+
45+
46+
async def start_async(host, port, worker_id, request_id, grpc_max_msg_len):
47+
disp = await dispatcher.Dispatcher.connect(
48+
host, port, worker_id, request_id,
49+
connect_timeout=5.0, max_msg_len=grpc_max_msg_len)
50+
51+
await disp.dispatch_forever()

python/worker.py

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,4 @@
1-
"""Main entrypoint."""
2-
3-
4-
import argparse
5-
import traceback
6-
7-
8-
def parse_args():
9-
parser = argparse.ArgumentParser(
10-
description='Python Azure Functions Worker')
11-
parser.add_argument('--host')
12-
parser.add_argument('--port', type=int)
13-
parser.add_argument('--workerId', dest='worker_id')
14-
parser.add_argument('--requestId', dest='request_id')
15-
parser.add_argument('--grpcMaxMessageLength', type=int,
16-
dest='grpc_max_msg_len')
17-
return parser.parse_args()
18-
19-
20-
def main():
21-
args = parse_args()
22-
23-
import azure.functions # NoQA
24-
import azure.functions_worker
25-
26-
try:
27-
azure.functions_worker.start(
28-
args.host, args.port, args.worker_id, args.request_id,
29-
args.grpc_max_msg_len)
30-
except Exception:
31-
print(traceback.format_exc(), flush=True)
32-
raise
33-
1+
from azure.functions_worker import main
342

353
if __name__ == '__main__':
36-
main()
4+
main.main()

tests/test_mock_http_functions.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,3 @@ async def test_handles_unsupported_messages_gracefully(self):
7171
_, r = await host.load_function('return_out')
7272
self.assertEqual(r.response.result.status,
7373
protos.StatusResult.Success)
74-
75-
for log in r.logs:
76-
if 'unknown StreamingMessage' in log.message:
77-
break
78-
else:
79-
raise AssertionError('the worker did not log about an '
80-
'"unknown StreamingMessage"')

0 commit comments

Comments
 (0)