Skip to content

Commit 62beb40

Browse files
authored
Add EventHub metadata E2E testcases (#719)
* Fix syntax issue in azure-pipelinesyml Fix an error in the elif statement Fix an issue in nuspec_path variable reference * Establish python_value and python_type contract with azure.functions.meta.Datum * Add e2e test for eventhub generating by azure-eventhub library * Remove unused datetime library * Add batch trigger tests * Replace eventhub using async interfaces * Python 3.6 and 3.7 not support datetime.fromisostring * EventHub timestamp is timezone aware * Fix setup.py * Add comments in EventHub trigger E2E tests * Use docstring instead
1 parent 80b5f4e commit 62beb40

File tree

23 files changed

+456
-29
lines changed

23 files changed

+456
-29
lines changed

azure_functions_worker/bindings/datumdef.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
3+
4+
from typing import Any
5+
import json
36
from .. import protos
47

58

@@ -8,6 +11,29 @@ def __init__(self, value, type):
811
self.value = value
912
self.type = type
1013

14+
@property
15+
def python_value(self) -> Any:
16+
if self.value is None or self.type is None:
17+
return None
18+
elif self.type in ('bytes', 'string', 'int', 'double'):
19+
return self.value
20+
elif self.type == 'json':
21+
return json.loads(self.value)
22+
elif self.type == 'collection_string':
23+
return [v for v in self.value.string]
24+
elif self.type == 'collection_bytes':
25+
return [v for v in self.value.bytes]
26+
elif self.type == 'collection_double':
27+
return [v for v in self.value.double]
28+
elif self.type == 'collection_sint64':
29+
return [v for v in self.value.sint64]
30+
else:
31+
return self.value
32+
33+
@property
34+
def python_type(self) -> type:
35+
return type(self.python_value)
36+
1137
def __eq__(self, other):
1238
if not isinstance(other, type(self)):
1339
return False

setup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@ def run(self):
286286
extras_require={
287287
'dev': [
288288
'azure-functions==1.3.0',
289+
'azure-eventhub~=5.1.0',
290+
'python-dateutil~=2.8.1',
289291
'flake8~=3.7.9',
290292
'mypy',
291293
'pytest',

tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import json
44

55

6+
# This is an actual EventHub trigger which handles Eventhub events in batches.
7+
# It serializes multiple event data into a json and store it into a blob.
68
def main(events):
79
table_entries = []
810
for event in events:

tests/endtoend/eventhub_batch_functions/eventhub_output_batch/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import azure.functions as func
44

55

6+
# An HttpTrigger to generating EventHub event from EventHub Output Binding
67
def main(req: func.HttpRequest) -> str:
78
events = req.get_body().decode('utf-8')
89
return events

tests/endtoend/eventhub_batch_functions/get_eventhub_batch_triggered/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33
import azure.functions as func
44

55

6+
# Retrieve the event data from storage blob and return it as Http response
67
def main(req: func.HttpRequest, testEntities):
78
return func.HttpResponse(status_code=200, body=testEntities)
Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
1-
{
2-
"scriptFile": "__init__.py",
3-
"bindings": [
4-
{
5-
"type": "httpTrigger",
6-
"direction": "in",
7-
"authLevel": "anonymous",
8-
"methods": [
9-
"get"
10-
],
11-
"name": "req"
12-
},
13-
{
14-
"direction": "in",
15-
"type": "table",
16-
"name": "testEntities",
17-
"partitionKey": "WillBePopulated",
18-
"tableName": "EventHubBatchTest",
19-
"connection": "AzureWebJobsStorage"
20-
},
21-
{
22-
"type": "http",
23-
"direction": "out",
24-
"name": "$return"
25-
}
26-
]
1+
{
2+
"scriptFile": "__init__.py",
3+
"bindings": [
4+
{
5+
"type": "httpTrigger",
6+
"direction": "in",
7+
"authLevel": "anonymous",
8+
"methods": [
9+
"get"
10+
],
11+
"name": "req"
12+
},
13+
{
14+
"direction": "in",
15+
"type": "table",
16+
"name": "testEntities",
17+
"partitionKey": "WillBePopulated",
18+
"tableName": "EventHubBatchTest",
19+
"connection": "AzureWebJobsStorage"
20+
},
21+
{
22+
"type": "http",
23+
"direction": "out",
24+
"name": "$return"
25+
}
26+
]
2727
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import azure.functions as func
4+
5+
6+
# Retrieve the event data from storage blob and return it as Http response
7+
def main(req: func.HttpRequest, file: func.InputStream) -> str:
8+
return func.HttpResponse(body=file.read().decode('utf-8'),
9+
status_code=200,
10+
mimetype='application/json')
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"bindings": [
4+
{
5+
"type": "httpTrigger",
6+
"direction": "in",
7+
"name": "req"
8+
},
9+
{
10+
"type": "blob",
11+
"direction": "in",
12+
"name": "file",
13+
"connection": "AzureWebJobsStorage",
14+
"path": "python-worker-tests/test-metadata-batch-triggered.txt"
15+
},
16+
{
17+
"type": "http",
18+
"direction": "out",
19+
"name": "$return"
20+
}
21+
]
22+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
import typing
5+
import json
6+
import azure.functions as func
7+
8+
9+
# This is an actual EventHub trigger which handles Eventhub events in batches.
10+
# It serializes multiple event data into a json and store it into a blob.
11+
def main(events: typing.List[func.EventHubEvent]) -> bytes:
12+
event_list = []
13+
for event in events:
14+
event_dict: typing.Mapping[str, typing.Any] = {
15+
'body': event.get_body().decode('utf-8'),
16+
'enqueued_time': event.enqueued_time.isoformat(),
17+
'partition_key': event.partition_key,
18+
'sequence_number': event.sequence_number,
19+
'offset': event.offset,
20+
'metadata': event.metadata
21+
}
22+
event_list.append(event_dict)
23+
24+
return json.dumps(event_list)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"bindings": [
4+
{
5+
"type": "eventHubTrigger",
6+
"name": "events",
7+
"direction": "in",
8+
"cardinality": "many",
9+
"dataType": "binary",
10+
"eventHubName": "python-worker-ci-eventhub-batch-metadata",
11+
"connection": "AzureWebJobsEventHubConnectionString"
12+
},
13+
{
14+
"type": "blob",
15+
"direction": "out",
16+
"name": "$return",
17+
"connection": "AzureWebJobsStorage",
18+
"path": "python-worker-tests/test-metadata-batch-triggered.txt"
19+
}
20+
]
21+
}

0 commit comments

Comments
 (0)