Skip to content

Commit b5e55c3

Browse files
committed
Provide an example of using an in-process queue
1 parent b9b4a72 commit b5e55c3

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

docs/howto/patterns.rst

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,32 @@ terminates, after canceling the other task::
8787
for task in pending:
8888
task.cancel()
8989

90+
In-process queue
91+
----------------
92+
93+
For simple applications that need to be able to consume and act on multiple
94+
messages at one, an in-process queue can be leveraged::
95+
96+
MAX_WORKERS = os.cpu_count()
97+
98+
queue = asyncio.Queue(MAX_WORKERS)
99+
100+
async def consumer_handler(websocket):
101+
async for message in websocket:
102+
await queue.put(message)
103+
104+
async def worker(websocket):
105+
while True:
106+
message = await queue.get()
107+
result = await long_running_task(message)
108+
await websocket.send(result)
109+
110+
async def handler(websocket):
111+
async with asyncio.TaskGroup() as tg:
112+
tg.create_task(consumer_handler(websocket))
113+
for _ in range(MAX_WORKERS):
114+
tg.create_task(worker(websocket))
115+
90116
Registration
91117
------------
92118

0 commit comments

Comments
 (0)