From 9e7db990bf2d763d02ef98e7b199f03da1173f19 Mon Sep 17 00:00:00 2001 From: Andrew Leech Date: Wed, 7 Aug 2024 16:38:14 +1000 Subject: [PATCH 1/2] micropython/streampair: Package to create bi-directional linked stream objects. Signed-off-by: Andrew Leech --- micropython/streampair/manifest.py | 5 ++ micropython/streampair/streampair.py | 83 +++++++++++++++++++++++ micropython/streampair/test_streampair.py | 54 +++++++++++++++ 3 files changed, 142 insertions(+) create mode 100644 micropython/streampair/manifest.py create mode 100644 micropython/streampair/streampair.py create mode 100644 micropython/streampair/test_streampair.py diff --git a/micropython/streampair/manifest.py b/micropython/streampair/manifest.py new file mode 100644 index 000000000..454291696 --- /dev/null +++ b/micropython/streampair/manifest.py @@ -0,0 +1,5 @@ +metadata( + description="Create a bi-directional linked pair of stream objects", version="0.0.1" +) + +module("streampair.py") diff --git a/micropython/streampair/streampair.py b/micropython/streampair/streampair.py new file mode 100644 index 000000000..5e41f9ba6 --- /dev/null +++ b/micropython/streampair/streampair.py @@ -0,0 +1,83 @@ +import io + +from collections import deque +from micropython import ringbuffer, const + +try: + from typing import Union, Tuple +except: + pass + +# From micropython/py/stream.h +_MP_STREAM_ERROR = const(-1) +_MP_STREAM_FLUSH = const(1) +_MP_STREAM_SEEK = const(2) +_MP_STREAM_POLL = const(3) +_MP_STREAM_CLOSE = const(4) +_MP_STREAM_POLL_RD = const(0x0001) + + +def streampair(buffer_size: Union[int, Tuple[int, int]]=256): + """ + Returns two bi-directional linked stream objects where writes to one can be read from the other and vice/versa. + This can be used somewhat similarly to a socket.socketpair in python, like a pipe + of data that can be used to connect stream consumers (eg. asyncio.StreamWriter, mock Uart) + """ + try: + size_a, size_b = buffer_size + except TypeError: + size_a = size_b = buffer_size + + a = ringbuffer(size_a) + b = ringbuffer(size_b) + return StreamPair(a, b), StreamPair(b, a) + + +class StreamPair(io.IOBase): + + def __init__(self, own: ringbuffer, other: ringbuffer): + self.own = own + self.other = other + super().__init__() + + def read(self, nbytes=-1): + return self.own.read(nbytes) + + def readline(self): + return self.own.readline() + + def readinto(self, buf, limit=-1): + return self.own.readinto(buf, limit) + + def write(self, data): + return self.other.write(data) + + def seek(self, offset, whence): + return self.own.seek(offset, whence) + + def flush(self): + self.own.flush() + self.other.flush() + + def close(self): + self.own.close() + self.other.close() + + def any(self): + return self.own.any() + + def ioctl(self, op, arg): + if op == _MP_STREAM_POLL: + if self.any(): + return _MP_STREAM_POLL_RD + return 0 + + elif op ==_MP_STREAM_FLUSH: + return self.flush() + elif op ==_MP_STREAM_SEEK: + return self.seek(arg[0], arg[1]) + elif op ==_MP_STREAM_CLOSE: + return self.close() + + else: + return _MP_STREAM_ERROR diff --git a/micropython/streampair/test_streampair.py b/micropython/streampair/test_streampair.py new file mode 100644 index 000000000..6f974e7fd --- /dev/null +++ b/micropython/streampair/test_streampair.py @@ -0,0 +1,54 @@ +import asyncio +import unittest +from streampair import streampair + +def async_test(f): + """ + Decorator to run an async test function + """ + def wrapper(*args, **kwargs): + loop = asyncio.new_event_loop() + # loop.set_exception_handler(_exception_handler) + t = loop.create_task(f(*args, **kwargs)) + loop.run_until_complete(t) + + return wrapper + +class StreamPairTestCase(unittest.TestCase): + + def test_streampair(self): + a, b = streampair() + assert a.write(b"foo") == 3 + assert b.write(b"bar") == 3 + + assert (r := a.read()) == b"bar", r + assert (r := b.read()) == b"foo", r + + @async_test + async def test_async_streampair(self): + a, b = streampair() + ar = asyncio.StreamReader(a) + bw = asyncio.StreamWriter(b) + + br = asyncio.StreamReader(b) + aw = asyncio.StreamWriter(a) + + aw.write(b"foo\n") + await aw.drain() + assert not a.any() + assert b.any() + assert (r := await br.readline()) == b"foo\n", r + assert not b.any() + assert not a.any() + + bw.write(b"bar\n") + await bw.drain() + assert not b.any() + assert a.any() + assert (r := await ar.readline()) == b"bar\n", r + assert not b.any() + assert not a.any() + + +if __name__ == "__main__": + unittest.main() From 35c671f54661ec341128969fa96aae6efba7e879 Mon Sep 17 00:00:00 2001 From: Andrew Leech Date: Thu, 30 Oct 2025 16:36:27 +1100 Subject: [PATCH 2/2] micropython/streampair: Use RingIO and add async tests. - Replace deprecated ringbuffer with RingIO name. - Remove unused collections.deque import. - Add test_select_poll_compatibility() to verify select.poll(). - Add test_streamreader_direct_usage() for asyncio.StreamReader. The existing ioctl implementation correctly returns -1 for unsupported operations, enabling select.poll() to work with pure Python stream objects. Since asyncio.StreamReader is built on select.poll(), StreamPair works seamlessly with async I/O. These tests document that Python objects with proper ioctl() implementation work with micropython polling and async infrastructure without requiring C-level integration. Signed-off-by: Andrew Leech Signed-off-by: Andrew Leech --- micropython/streampair/streampair.py | 9 ++-- micropython/streampair/test_streampair.py | 56 ++++++++++++++++++++++- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/micropython/streampair/streampair.py b/micropython/streampair/streampair.py index 5e41f9ba6..03c6a90d6 100644 --- a/micropython/streampair/streampair.py +++ b/micropython/streampair/streampair.py @@ -1,7 +1,6 @@ import io -from collections import deque -from micropython import ringbuffer, const +from micropython import RingIO, const try: from typing import Union, Tuple @@ -28,14 +27,14 @@ def streampair(buffer_size: Union[int, Tuple[int, int]]=256): except TypeError: size_a = size_b = buffer_size - a = ringbuffer(size_a) - b = ringbuffer(size_b) + a = RingIO(size_a) + b = RingIO(size_b) return StreamPair(a, b), StreamPair(b, a) class StreamPair(io.IOBase): - def __init__(self, own: ringbuffer, other: ringbuffer): + def __init__(self, own: RingIO, other: RingIO): self.own = own self.other = other super().__init__() diff --git a/micropython/streampair/test_streampair.py b/micropython/streampair/test_streampair.py index 6f974e7fd..5d42456a5 100644 --- a/micropython/streampair/test_streampair.py +++ b/micropython/streampair/test_streampair.py @@ -1,5 +1,6 @@ import asyncio import unittest +import select from streampair import streampair def async_test(f): @@ -14,8 +15,8 @@ def wrapper(*args, **kwargs): return wrapper -class StreamPairTestCase(unittest.TestCase): +class StreamPairTestCase(unittest.TestCase): def test_streampair(self): a, b = streampair() assert a.write(b"foo") == 3 @@ -49,6 +50,59 @@ async def test_async_streampair(self): assert not b.any() assert not a.any() + def test_select_poll_compatibility(self): + """Test that streampair works with select.poll()""" + a, b = streampair() + + # Register stream with poll + poller = select.poll() + poller.register(a, select.POLLIN) + + # No data available initially + events = poller.poll(0) + assert len(events) == 0, f"Expected no events, got {events}" + + # Write data to b, should be readable from a + b.write(b"test data") + + # Should now poll as readable + events = poller.poll(0) + assert len(events) == 1, f"Expected 1 event, got {events}" + assert events[0][0] == a, "Event should be for stream a" + assert events[0][1] & select.POLLIN, "Should be readable" + + # Read the data + data = a.read() + assert data == b"test data", f"Expected b'test data', got {data}" + + # Should no longer poll as readable + events = poller.poll(0) + assert len(events) == 0, f"Expected no events after read, got {events}" + + poller.unregister(a) + + @async_test + async def test_streamreader_direct_usage(self): + """Test that streampair can be used directly with asyncio.StreamReader""" + a, b = streampair() + + # Create StreamReader directly on the streampair object + reader = asyncio.StreamReader(a) + + # Write data in background task + async def write_delayed(): + await asyncio.sleep_ms(10) + b.write(b"async test\n") + + task = asyncio.create_task(write_delayed()) + + # Should be able to read via StreamReader + data = await asyncio.wait_for(reader.readline(), 1.0) + assert data == b"async test\n", f"Expected b'async test\\n', got {data}" + + # Wait for background task to complete + await task + if __name__ == "__main__": unittest.main()