Skip to content

Commit c2567c0

Browse files
committed
Include iothub metadata in EventHub event trigger metadata
Closes: #288
1 parent 521d4c5 commit c2567c0

File tree

9 files changed

+100
-8
lines changed

9 files changed

+100
-8
lines changed

azure/functions_worker/bindings/eventhub.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ def from_proto(cls, data: protos.TypedData, *,
8484
raise NotImplementedError(
8585
f'unsupported event data payload type: {data_type}')
8686

87+
iothub_metadata = {}
88+
for f in trigger_metadata:
89+
if f.startswith('iothub-'):
90+
v = cls._decode_trigger_metadata_field(
91+
trigger_metadata, f, python_type=str)
92+
iothub_metadata[f[len('iothub-'):]] = v
93+
8794
return _eventhub.EventHubEvent(
8895
body=body,
8996
enqueued_time=cls._parse_datetime_metadata(
@@ -93,5 +100,6 @@ def from_proto(cls, data: protos.TypedData, *,
93100
sequence_number=cls._decode_trigger_metadata_field(
94101
trigger_metadata, 'SequenceNumber', python_type=int),
95102
offset=cls._decode_trigger_metadata_field(
96-
trigger_metadata, 'Offset', python_type=str)
103+
trigger_metadata, 'Offset', python_type=str),
104+
iothub_metadata=iothub_metadata
97105
)

azure/functions_worker/testutils.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,14 @@ async def load_function(self, name):
326326
return func.id, r
327327

328328
async def invoke_function(
329-
self, name, input_data: typing.List[protos.ParameterBinding]):
329+
self,
330+
name,
331+
input_data: typing.List[protos.ParameterBinding],
332+
metadata: typing.Optional[
333+
typing.Mapping[str, protos.TypedData]]=None):
334+
335+
if metadata is None:
336+
metadata = {}
330337

331338
if name not in self._available_functions:
332339
raise RuntimeError(f'cannot load function {name}')
@@ -339,7 +346,10 @@ async def invoke_function(
339346
invocation_request=protos.InvocationRequest(
340347
invocation_id=invocation_id,
341348
function_id=func.id,
342-
input_data=input_data)),
349+
input_data=input_data,
350+
trigger_metadata=metadata,
351+
)
352+
),
343353
wait_for='invocation_response')
344354

345355
return invocation_id, r

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def run(self):
225225
install_requires=[
226226
'grpcio~=1.19.0',
227227
'grpcio-tools~=1.19.0',
228-
'azure-functions==1.0.0b3',
228+
'azure-functions==1.0.0b4',
229229
],
230230
extras_require={
231231
'dev': [

tests/eventhub_functions/eventhub_output/function.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"name": "event",
1414
"direction": "out",
1515
"eventHubName": "python-worker-ci",
16-
"connection": "AzureWebJobsEventHubConnectionString",
16+
"connection": "AzureWebJobsEventHubConnectionString"
1717
},
1818
{
1919
"direction": "out",

tests/eventhub_functions/eventhub_trigger/function.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
"name": "event",
99
"direction": "in",
1010
"eventHubName": "python-worker-ci",
11-
"connection": "AzureWebJobsEventHubConnectionString",
11+
"connection": "AzureWebJobsEventHubConnectionString"
1212
},
1313
{
1414
"type": "blob",
1515
"direction": "out",
1616
"name": "$return",
1717
"connection": "AzureWebJobsStorage",
1818
"path": "python-worker-tests/test-eventhub-triggered.txt"
19-
},
19+
}
2020
]
2121
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import json
2+
3+
import azure.functions as func
4+
5+
6+
def main(event: func.EventHubEvent) -> str:
7+
return json.dumps(event.iothub_metadata)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"scriptFile": "__init__.py",
3+
"disabled": false,
4+
5+
"bindings": [
6+
{
7+
"type": "eventHubTrigger",
8+
"name": "event",
9+
"direction": "in",
10+
"eventHubName": "python-worker-ci",
11+
"connection": "AzureWebJobsEventHubConnectionString"
12+
},
13+
{
14+
"type": "blob",
15+
"direction": "out",
16+
"name": "$return",
17+
"connection": "AzureWebJobsStorage",
18+
"path": "python-worker-tests/test-eventhub-iot-triggered.txt"
19+
}
20+
]
21+
}

tests/eventhub_functions/get_eventhub_triggered/function.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
{
1818
"type": "http",
1919
"direction": "out",
20-
"name": "$return",
20+
"name": "$return"
2121
}
2222
]
2323
}

tests/test_eventhub_functions.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import time
33

4+
from azure.functions_worker import protos
45
from azure.functions_worker import testutils
56

67

@@ -39,3 +40,48 @@ def test_eventhub_trigger(self):
3940
raise
4041
else:
4142
break
43+
44+
45+
class TestEventHubMockFunctions(testutils.AsyncTestCase):
46+
47+
async def test_mock_eventhub_trigger_iot(self):
48+
async with testutils.start_mockhost(
49+
script_root='eventhub_functions') as host:
50+
51+
func_id, r = await host.load_function('eventhub_trigger_iot')
52+
53+
self.assertEqual(r.response.function_id, func_id)
54+
self.assertEqual(r.response.result.status,
55+
protos.StatusResult.Success)
56+
57+
async def call_and_check():
58+
_, r = await host.invoke_function(
59+
'eventhub_trigger_iot',
60+
[
61+
protos.ParameterBinding(
62+
name='event',
63+
data=protos.TypedData(
64+
json=json.dumps({
65+
'id': 'foo'
66+
})
67+
),
68+
),
69+
],
70+
metadata={
71+
'iothub-device-id': protos.TypedData(
72+
string='mock-iothub-device-id'
73+
),
74+
'iothub-auth-data': protos.TypedData(
75+
string='mock-iothub-auth-data'
76+
)
77+
}
78+
)
79+
80+
self.assertEqual(r.response.result.status,
81+
protos.StatusResult.Success)
82+
self.assertIn(
83+
'mock-iothub-device-id',
84+
r.response.return_value.string
85+
)
86+
87+
await call_and_check()

0 commit comments

Comments
 (0)