From 0ab458f68f80c42db20ff6fec2cef425347f9b8d Mon Sep 17 00:00:00 2001 From: Budleigh Salterton Date: Wed, 15 Mar 2023 17:33:57 +0000 Subject: [PATCH 1/6] Suggestions --- seq-aiohttp/main.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/seq-aiohttp/main.py b/seq-aiohttp/main.py index 7b504ec..7349a19 100644 --- a/seq-aiohttp/main.py +++ b/seq-aiohttp/main.py @@ -37,19 +37,18 @@ async def websocket_handler(request, input): return ws -bound_handler = functools.partial(websocket_handler, input='input') - async def get_from_topic(input): await asyncio.sleep(1) - topic_data = input.map(lambda s: f'consumer got: {s}').each(print) + topic_data = input.map(lambda s: f'consumer got: {s}') # here should be a JSON stringifier... return topic_data async def run(context, input): nest_asyncio.apply() + bound_handler = functools.partial(websocket_handler, input=input) app = web.Application() - app.add_routes([web.get('/', root)]) + app.add_routes([web.get('/', serve)]) app.add_routes([web.get('/ws', bound_handler)]) - app.add_routes([web.static('/files', './', show_index=True)]) + app.add_routes([web.static('/files', './', show_index=True)]) # this seems to be missing? # web.run_app(app) asyncio.gather(web.run_app(app), return_exceptions=True) From c3f8fa8c20005f9dbff375d89ab814a376d19fda Mon Sep 17 00:00:00 2001 From: Budleigh Salterton Date: Wed, 15 Mar 2023 17:58:56 +0000 Subject: [PATCH 2/6] I think this should work like this --- seq-aiohttp/main.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/seq-aiohttp/main.py b/seq-aiohttp/main.py index 7349a19..f808ec5 100644 --- a/seq-aiohttp/main.py +++ b/seq-aiohttp/main.py @@ -2,6 +2,8 @@ import asyncio from scramjet.streams import Stream from random import randint +from pyee.base import EventEmitter + import nest_asyncio # from gpiozero import CPUTemperature, DiskUsage, LoadAverage, PingServer import functools @@ -19,23 +21,27 @@ async def root(request): async def serve(request): return web.FileResponse('index.html') -async def websocket_handler(request, input): +async def websocket_handler(request, ee): ws = web.WebSocketResponse() - connected.add(ws) await ws.prepare(request) + + handler = lambda data: await connection.send_str(f'ok, data from topic:{data}') + + ee.on('data', handler) + async for msg in ws: - for connection in connected: - if msg.type == web.WSMsgType.TEXT: - if msg.data == 'close': - await ws.close() - else: - await connection.send_str(f'ok, data from topic:{await get_from_topic(input)}') - elif msg.type == web.WSMsgType.ERROR: - print(f'ws connection closed with exception {ws.exception()}') + ws.disconnect() # any messages here should kill the connection + print("websocket connection started sending data we dont accept") + + await ee.off('data', handler) print('websocket connection closed') return ws - + +async def get_event_emitter(input): + ee = EventEmitter() + input.each(lambda s: ee.emit("data", ee)) + return ee async def get_from_topic(input): await asyncio.sleep(1) @@ -44,7 +50,9 @@ async def get_from_topic(input): async def run(context, input): nest_asyncio.apply() - bound_handler = functools.partial(websocket_handler, input=input) + ee = get_event_emitter(input) + bound_handler = functools.partial(websocket_handler, ee=ee) + app = web.Application() app.add_routes([web.get('/', serve)]) app.add_routes([web.get('/ws', bound_handler)]) From 349d01997dd3b4039f12be19f12e8d537acf6b22 Mon Sep 17 00:00:00 2001 From: mcdominik Date: Tue, 21 Mar 2023 12:57:07 +0100 Subject: [PATCH 3/6] rpi seq type fix --- seq-rpi-internal/raspberry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seq-rpi-internal/raspberry.py b/seq-rpi-internal/raspberry.py index cf5eb2f..af90e9a 100644 --- a/seq-rpi-internal/raspberry.py +++ b/seq-rpi-internal/raspberry.py @@ -31,7 +31,7 @@ async def set_internals(stream, interval=3, mock=mock): async def run(context, input): stream = Stream() asyncio.gather(set_internals(stream), return_exceptions=True) - return stream.map(lambda x : x + "\n") + return stream.map(lambda x : str(x) + "\n") From c2160de2bee8901cfd1866cc706269eaf0aee392 Mon Sep 17 00:00:00 2001 From: mcdominik Date: Tue, 21 Mar 2023 12:57:36 +0100 Subject: [PATCH 4/6] new seq scripts --- seq-aiohttp/main.py | 64 ++++++++++++++++++------------------- seq-aiohttp/main2_no_seq.py | 51 +++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 32 deletions(-) create mode 100644 seq-aiohttp/main2_no_seq.py diff --git a/seq-aiohttp/main.py b/seq-aiohttp/main.py index f808ec5..edfabd7 100644 --- a/seq-aiohttp/main.py +++ b/seq-aiohttp/main.py @@ -1,13 +1,13 @@ from aiohttp import web import asyncio -from scramjet.streams import Stream +import functools from random import randint from pyee.base import EventEmitter - +from pyee.asyncio import AsyncIOEventEmitter import nest_asyncio -# from gpiozero import CPUTemperature, DiskUsage, LoadAverage, PingServer -import functools +# ee = EventEmitter() +ee = AsyncIOEventEmitter() connected = set() requires = { @@ -15,49 +15,49 @@ 'contentType': 'text/plain' } +@ee.on('event') +async def event_handler(ws, data): + print(f'TRIGGERED, {data}, {ws}') + await ws.send_str(f'ok, data {data}') + +async def push_to_socket(socket, data): + await socket.send_str(f'pushed: {data}') + async def root(request): return web.Response(text="working..") -async def serve(request): - return web.FileResponse('index.html') - -async def websocket_handler(request, ee): +async def websocket_handler(request, input): ws = web.WebSocketResponse() + connected.add(ws) await ws.prepare(request) + # await get_from_topic(ws=ws, input=input) - handler = lambda data: await connection.send_str(f'ok, data from topic:{data}') - - ee.on('data', handler) - + print('connected') async for msg in ws: - ws.disconnect() # any messages here should kill the connection - print("websocket connection started sending data we dont accept") - - await ee.off('data', handler) - + # ws.disconnect() # any messages here should kill the connection + print("websocket connection started sending data we dont accept") print('websocket connection closed') return ws -async def get_event_emitter(input): - ee = EventEmitter() - input.each(lambda s: ee.emit("data", ee)) - return ee +async def mock_from_topic(ws): + while True: + ee.emit('event', randint(0,20), ws) + await asyncio.sleep(1) -async def get_from_topic(input): - await asyncio.sleep(1) - topic_data = input.map(lambda s: f'consumer got: {s}') # here should be a JSON stringifier... - return topic_data +async def get_from_topic(input, ws): + print(ws) + print(input) + # return input.map(lambda s: f'consumer got: {s}').each(print) + return input.each(lambda s: ee.emit("event", s, ws)) async def run(context, input): nest_asyncio.apply() - ee = get_event_emitter(input) - bound_handler = functools.partial(websocket_handler, ee=ee) - app = web.Application() - app.add_routes([web.get('/', serve)]) + bound_handler = functools.partial(websocket_handler, input=input) app.add_routes([web.get('/ws', bound_handler)]) - app.add_routes([web.static('/files', './', show_index=True)]) # this seems to be missing? - # web.run_app(app) - asyncio.gather(web.run_app(app), return_exceptions=True) + app.add_routes([web.static('/files', './', show_index=True)]) + asyncio.gather(get_from_topic(input, ws=bound_handler), web.run_app(app)) + + diff --git a/seq-aiohttp/main2_no_seq.py b/seq-aiohttp/main2_no_seq.py new file mode 100644 index 0000000..c8f9d2f --- /dev/null +++ b/seq-aiohttp/main2_no_seq.py @@ -0,0 +1,51 @@ +from aiohttp import web +import asyncio +import functools +from random import randint +from pyee.asyncio import AsyncIOEventEmitter + + +# ee = EventEmitter() +ee = AsyncIOEventEmitter() + +requires = { + 'requires': 'pi', + 'contentType': 'text/plain' +} + +@ee.on('event') +async def event_handler(ws, data): + print(f'TRIGGERED, {data}, {ws}') + await ws.send_str(f'ok, data {data}') + +async def push_to_socket(socket, data): + await socket.send_str(f'pushed: {data}') + +async def root(request): + return web.Response(text="working..") + +async def websocket_handler(request): + ws = web.WebSocketResponse() + await ws.prepare(request) + print('connected') + await mock_from_topic(ws=ws) + async for msg in ws: + # ws.disconnect() # any messages here should kill the connection + print("websocket connection started sending data we dont accept") + print('websocket connection closed') + return ws + +async def mock_from_topic(ws): + while True: + ee.emit('event', ws, randint(0,20)) + await asyncio.sleep(randint(0,4)) + +app = web.Application() +app.add_routes([web.get('/', root)]) +bound_handler = functools.partial(websocket_handler) +app.add_routes([web.get('/ws', bound_handler)]) +app.add_routes([web.static('/files','./', show_index=True)]) +web.run_app(app) + + + From dfd78b4eff2fae5ce6209d30ec280a1f0c745861 Mon Sep 17 00:00:00 2001 From: mcdominik Date: Tue, 28 Mar 2023 17:41:08 +0200 Subject: [PATCH 5/6] update --- seq-aiohttp/main.py | 49 ++++++++++++++++++++++++++---------- seq-aiohttp/requirements.txt | 3 ++- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/seq-aiohttp/main.py b/seq-aiohttp/main.py index edfabd7..c7aa26f 100644 --- a/seq-aiohttp/main.py +++ b/seq-aiohttp/main.py @@ -2,23 +2,25 @@ import asyncio import functools from random import randint -from pyee.base import EventEmitter from pyee.asyncio import AsyncIOEventEmitter import nest_asyncio +import sys -# ee = EventEmitter() ee = AsyncIOEventEmitter() connected = set() +print(f'PATH SEQ: {sys.path}') + + requires = { 'requires': 'pi', 'contentType': 'text/plain' } -@ee.on('event') -async def event_handler(ws, data): - print(f'TRIGGERED, {data}, {ws}') - await ws.send_str(f'ok, data {data}') +# @ee.on('event') +# async def event_handler(ws, data): +# print(f'TRIGGERED, {data}, {ws}') +# await ws.send_str(f'ok, data {data}') async def push_to_socket(socket, data): await socket.send_str(f'pushed: {data}') @@ -26,11 +28,12 @@ async def push_to_socket(socket, data): async def root(request): return web.Response(text="working..") -async def websocket_handler(request, input): +async def websocket_handler(request): ws = web.WebSocketResponse() connected.add(ws) await ws.prepare(request) # await get_from_topic(ws=ws, input=input) + ee.on('event', lambda s: ws.send_str(f'ok, data {s}')) print('connected') async for msg in ws: @@ -44,20 +47,40 @@ async def mock_from_topic(ws): ee.emit('event', randint(0,20), ws) await asyncio.sleep(1) -async def get_from_topic(input, ws): - print(ws) - print(input) +async def get_from_topic(input): + print('enter func') + # print(input) + async for x in input: + ee.emit("event", x) + print(x) + print('exit func') + + # # return input.each(print) + # # return input.each(lambda s: ee.emit("event", s)) # return input.map(lambda s: f'consumer got: {s}').each(print) - return input.each(lambda s: ee.emit("event", s, ws)) async def run(context, input): nest_asyncio.apply() app = web.Application() - bound_handler = functools.partial(websocket_handler, input=input) + bound_handler = functools.partial(websocket_handler) + print('hi') app.add_routes([web.get('/ws', bound_handler)]) app.add_routes([web.static('/files', './', show_index=True)]) - asyncio.gather(get_from_topic(input, ws=bound_handler), web.run_app(app)) + loop = asyncio.get_event_loop() + print(loop) + loop.set_debug(True) + loop.create_task(web.run_app(app)) + loop.create_task(get_from_topic(input=input)) + loop.run_forever() + + # await get_from_topic(input=input) + # loop = asyncio.get_running_loop() + # print('before topic register') + # await loop.run_in_executor(web.run_app(app)) + # print('after') + + diff --git a/seq-aiohttp/requirements.txt b/seq-aiohttp/requirements.txt index 4973af7..919651b 100644 --- a/seq-aiohttp/requirements.txt +++ b/seq-aiohttp/requirements.txt @@ -1,2 +1,3 @@ scramjet-framework-py==0.10 -pyee==9.0.4 \ No newline at end of file +pyee==9.0.4 +aiohttp==3.8.4 From 2c3b7dec3dde7a406795753d68954eaa8a2f5e1d Mon Sep 17 00:00:00 2001 From: mcdominik Date: Tue, 11 Apr 2023 09:41:50 +0200 Subject: [PATCH 6/6] seq update --- seq-aiohttp/main.py | 87 ++++++++++----------------------------------- 1 file changed, 18 insertions(+), 69 deletions(-) diff --git a/seq-aiohttp/main.py b/seq-aiohttp/main.py index c7aa26f..72655d1 100644 --- a/seq-aiohttp/main.py +++ b/seq-aiohttp/main.py @@ -1,86 +1,35 @@ -from aiohttp import web -import asyncio -import functools -from random import randint -from pyee.asyncio import AsyncIOEventEmitter +import aiohttp import nest_asyncio -import sys - -ee = AsyncIOEventEmitter() -connected = set() - -print(f'PATH SEQ: {sys.path}') - - -requires = { - 'requires': 'pi', - 'contentType': 'text/plain' -} - -# @ee.on('event') -# async def event_handler(ws, data): -# print(f'TRIGGERED, {data}, {ws}') -# await ws.send_str(f'ok, data {data}') +from aiohttp import web +from client.host_client import HostClient -async def push_to_socket(socket, data): - await socket.send_str(f'pushed: {data}') -async def root(request): - return web.Response(text="working..") +api_base ='http://127.0.0.1:8000' +host = HostClient(f'{api_base}/api/v1/') +topic_gen = host.get_named_data('pi') -async def websocket_handler(request): +async def handler(request): ws = web.WebSocketResponse() - connected.add(ws) await ws.prepare(request) - # await get_from_topic(ws=ws, input=input) - ee.on('event', lambda s: ws.send_str(f'ok, data {s}')) - print('connected') + async for chunk in await topic_gen: + await ws.send_str(str(chunk)) async for msg in ws: - # ws.disconnect() # any messages here should kill the connection - print("websocket connection started sending data we dont accept") + if msg.type == aiohttp.WSMsgType.TEXT: + if msg.data == 'close': + await ws.close() + else: + await ws.send_str(f'{msg.data} /answer') + elif msg.type == aiohttp.WSMsgType.ERROR: + print(f'ws connection closed with exception {ws.exception()}') print('websocket connection closed') return ws -async def mock_from_topic(ws): - while True: - ee.emit('event', randint(0,20), ws) - await asyncio.sleep(1) - -async def get_from_topic(input): - print('enter func') - # print(input) - async for x in input: - ee.emit("event", x) - print(x) - print('exit func') - - # # return input.each(print) - # # return input.each(lambda s: ee.emit("event", s)) - # return input.map(lambda s: f'consumer got: {s}').each(print) - async def run(context, input): nest_asyncio.apply() app = web.Application() - bound_handler = functools.partial(websocket_handler) - print('hi') - app.add_routes([web.get('/ws', bound_handler)]) app.add_routes([web.static('/files', './', show_index=True)]) - - - - loop = asyncio.get_event_loop() - print(loop) - loop.set_debug(True) - loop.create_task(web.run_app(app)) - loop.create_task(get_from_topic(input=input)) - loop.run_forever() - - # await get_from_topic(input=input) - # loop = asyncio.get_running_loop() - # print('before topic register') - # await loop.run_in_executor(web.run_app(app)) - # print('after') - + app.add_routes([web.get('/ws', handler)]) + await web.run_app(app, port=8020)