Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 19 additions & 40 deletions seq-aiohttp/main.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,35 @@
from aiohttp import web
import asyncio
from scramjet.streams import Stream
from random import randint
import aiohttp
import nest_asyncio
# from gpiozero import CPUTemperature, DiskUsage, LoadAverage, PingServer
import functools

connected = set()

requires = {
'requires': 'pi',
'contentType': 'text/plain'
}
from aiohttp import web
from client.host_client import HostClient

async def root(request):
return web.Response(text="working..")

async def serve(request):
return web.FileResponse('index.html')
api_base ='http://127.0.0.1:8000'
host = HostClient(f'{api_base}/api/v1/')
topic_gen = host.get_named_data('pi')

async def websocket_handler(request, input):
async def handler(request):
ws = web.WebSocketResponse()
connected.add(ws)
await ws.prepare(request)
async for msg in ws:
for connection in connected:
if msg.type == web.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
await connection.send_str(f'ok, data from topic:{await get_from_topic(input)}')
elif msg.type == web.WSMsgType.ERROR:
print(f'ws connection closed with exception {ws.exception()}')

async for chunk in await topic_gen:
await ws.send_str(str(chunk))
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
await ws.send_str(f'{msg.data} /answer')
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f'ws connection closed with exception {ws.exception()}')
print('websocket connection closed')
return ws


bound_handler = functools.partial(websocket_handler, input='input')

async def get_from_topic(input):
await asyncio.sleep(1)
topic_data = input.map(lambda s: f'consumer got: {s}').each(print)
return topic_data

async def run(context, input):
nest_asyncio.apply()
app = web.Application()
app.add_routes([web.get('/', root)])
app.add_routes([web.get('/ws', bound_handler)])
app.add_routes([web.static('/files', './', show_index=True)])
# web.run_app(app)
asyncio.gather(web.run_app(app), return_exceptions=True)
app.add_routes([web.get('/ws', handler)])
await web.run_app(app, port=8020)


51 changes: 51 additions & 0 deletions seq-aiohttp/main2_no_seq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from aiohttp import web
import asyncio
import functools
from random import randint
from pyee.asyncio import AsyncIOEventEmitter


# ee = EventEmitter()
ee = AsyncIOEventEmitter()

requires = {
'requires': 'pi',
'contentType': 'text/plain'
}

@ee.on('event')
async def event_handler(ws, data):
print(f'TRIGGERED, {data}, {ws}')
await ws.send_str(f'ok, data {data}')

async def push_to_socket(socket, data):
await socket.send_str(f'pushed: {data}')

async def root(request):
return web.Response(text="working..")

async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
print('connected')
await mock_from_topic(ws=ws)
async for msg in ws:
# ws.disconnect() # any messages here should kill the connection
print("websocket connection started sending data we dont accept")
print('websocket connection closed')
return ws

async def mock_from_topic(ws):
while True:
ee.emit('event', ws, randint(0,20))
await asyncio.sleep(randint(0,4))

app = web.Application()
app.add_routes([web.get('/', root)])
bound_handler = functools.partial(websocket_handler)
app.add_routes([web.get('/ws', bound_handler)])
app.add_routes([web.static('/files','./', show_index=True)])
web.run_app(app)



3 changes: 2 additions & 1 deletion seq-aiohttp/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
scramjet-framework-py==0.10
pyee==9.0.4
pyee==9.0.4
aiohttp==3.8.4
2 changes: 1 addition & 1 deletion seq-rpi-internal/raspberry.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def set_internals(stream, interval=3, mock=mock):
async def run(context, input):
stream = Stream()
asyncio.gather(set_internals(stream), return_exceptions=True)
return stream.map(lambda x : x + "\n")
return stream.map(lambda x : str(x) + "\n")