Skip to content

Commit 25b5090

Browse files
author
tatoao pro
committed
debug async processor genererator ordered output pipe jamed bug
1 parent 72d5d67 commit 25b5090

2 files changed

Lines changed: 14 additions & 4 deletions

File tree

processor_pipeline/new_core/processor.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ async def execute_output_ordered_async_generator(self) -> Any:
281281
self.logger.info(f"Starting execute_output_ordered_async_generator for {self.processor_id}")
282282
task_ids = []
283283
task_queue_dict = defaultdict(asyncio.Queue)
284+
output_task_message_id_queue = asyncio.Queue()
284285

285286
async def process_task(data: Any, message_id: str) -> Any:
286287
"""
@@ -298,7 +299,10 @@ async def process_task(data: Any, message_id: str) -> Any:
298299

299300
async def output_from_task_queue_with_order():
300301
result_list_of_list= []
301-
for message_id in task_ids:
302+
while True:
303+
message_id = await output_task_message_id_queue.get()
304+
if message_id is None:
305+
break
302306
result = []
303307
item_count = 0
304308
while True:
@@ -317,13 +321,14 @@ async def output_from_task_queue_with_order():
317321
try:
318322
task_allocated = []
319323
input_count = 0
324+
output_task = asyncio.create_task(output_from_task_queue_with_order())
320325
async for message_id, data in self.input_pipe:
321326
input_count += 1
322-
task_ids.append(message_id)
323327
task = asyncio.create_task(process_task(data, message_id=message_id))
328+
output_task_message_id_queue.put_nowait(message_id)
324329
task_allocated.append(task)
330+
output_task_message_id_queue.put_nowait(None)
325331

326-
output_task = asyncio.create_task(output_from_task_queue_with_order())
327332
self.logger.info(f"Starting ordered output for {len(task_ids)} tasks")
328333
results, *_ = await asyncio.gather(output_task, *task_allocated)
329334

processor_pipeline/new_core/tests/test_graph.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@ async def process(self, input_data:str, *args, **kwargs) -> str:
3838
# logger.info(f"input_data: {input_data}, hashed: {hashed}")
3939
logger.info(f"HasherProcessor input_data: {input_data}")
4040
hashed = input_data[0]
41-
return hashed
41+
42+
for i in range(5):
43+
await asyncio.sleep(0.1)
44+
logger.info(f"HasherProcessor sleeping for 0.1 seconds")
45+
yield i
46+
4247

4348

4449
# python -m processor_pipeline.new_core.tests.test_graph

0 commit comments

Comments
 (0)