Skip to content

Commit 629069d

Browse files
committed
MOD: Live client iteration queue
1 parent 474c1fb commit 629069d

File tree

5 files changed

+190
-53
lines changed

5 files changed

+190
-53
lines changed

CHANGELOG.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,23 @@
22

33
## 0.28.0 - TBD
44

5-
#### Breaking change
5+
#### Enhancements
6+
- Substantially increased iteration queue size
7+
- Added methods `DBNQueue.enable` and `DBNQueue.disable` for controlling queue consumption
8+
- Added method `DBNQueue.is_enabled` to signal the queue can accept records
9+
- Added method `DBNQueue.is_full` to signal the queue has reached capacity
10+
- Added enabled checks to `DBNQueue.put` and `DBNQueue.put_nowait`
11+
12+
#### Breaking changes
613
- Iterating a `Live` client after the streaming session has started will now raise a `ValueError`. Calling `Live.start` is not necessary when iterating the `Live` client
14+
- Moved constant `databento.live.client.DEFAULT_QUEUE_SIZE` to `databento.live.session.DBN_QUEUE_CAPACITY`
15+
- Removed `maxsize` parameter from `DBNQueue` constructor; the queue is now unbounded and `DBN_QUEUE_CAPACITY` is used as the threshold for pausing reading instead
16+
- Removed property `DBNQueue.enabled`, use `DBNQueue.is_enabled` instead
17+
- Removed method `DBNQueue.half_full`, use `DBNQueue.is_full` instead
18+
19+
#### Bug fixes
20+
- Fixed an issue where DBN records could be dropped while iterating
21+
- Fixed an issue where async iteration would block the event loop
722

823
## 0.27.0 - 2024-01-23
924

databento/live/client.py

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from collections.abc import Iterable
1010
from concurrent import futures
1111
from os import PathLike
12-
from typing import IO, Final
12+
from typing import IO
1313

1414
import databento_dbn
1515
from databento_dbn import Schema
@@ -33,7 +33,6 @@
3333

3434

3535
logger = logging.getLogger(__name__)
36-
DEFAULT_QUEUE_SIZE: Final = 2048
3736

3837

3938
class Live:
@@ -85,7 +84,7 @@ def __init__(
8584
self._dataset: Dataset | str = ""
8685
self._ts_out = ts_out
8786

88-
self._dbn_queue: DBNQueue = DBNQueue(maxsize=DEFAULT_QUEUE_SIZE)
87+
self._dbn_queue: DBNQueue = DBNQueue()
8988
self._metadata: SessionMetadata = SessionMetadata()
9089
self._symbology_map: dict[int, str | int] = {}
9190
self._user_callbacks: dict[RecordCallback, ExceptionCallback | None] = {
@@ -120,10 +119,37 @@ def __aiter__(self) -> Live:
120119
return iter(self)
121120

122121
async def __anext__(self) -> DBNRecord:
122+
if not self._dbn_queue.is_enabled():
123+
raise ValueError("iteration has not started")
124+
125+
loop = asyncio.get_running_loop()
126+
123127
try:
124-
return next(self)
125-
except StopIteration:
126-
raise StopAsyncIteration
128+
return self._dbn_queue.get_nowait()
129+
except queue.Empty:
130+
while True:
131+
try:
132+
return await loop.run_in_executor(
133+
None,
134+
self._dbn_queue.get,
135+
True,
136+
0.1,
137+
)
138+
except queue.Empty:
139+
if self._session.is_disconnected():
140+
break
141+
finally:
142+
if not self._dbn_queue.is_full() and not self._session.is_reading():
143+
logger.debug(
144+
"resuming reading with %d pending records",
145+
self._dbn_queue.qsize(),
146+
)
147+
self._session.resume_reading()
148+
149+
self._dbn_queue.disable()
150+
await self.wait_for_close()
151+
logger.debug("completed async iteration")
152+
raise StopAsyncIteration
127153

128154
def __iter__(self) -> Live:
129155
logger.debug("starting iteration")
@@ -138,30 +164,28 @@ def __iter__(self) -> Live:
138164
return self
139165

140166
def __next__(self) -> DBNRecord:
141-
if self._dbn_queue is None:
167+
if not self._dbn_queue.is_enabled():
142168
raise ValueError("iteration has not started")
143169

144-
while not self._session.is_disconnected() or self._dbn_queue.qsize() > 0:
170+
while True:
145171
try:
146-
record = self._dbn_queue.get(block=False)
172+
record = self._dbn_queue.get(timeout=0.1)
147173
except queue.Empty:
148-
continue
174+
if self._session.is_disconnected():
175+
break
149176
else:
150-
logger.debug(
151-
"yielding %s record from next",
152-
type(record).__name__,
153-
)
154177
return record
155178
finally:
156-
if not self._dbn_queue.half_full() and not self._session.is_reading():
179+
if not self._dbn_queue.is_full() and not self._session.is_reading():
157180
logger.debug(
158181
"resuming reading with %d pending records",
159-
self._dbn_queue._qsize(),
182+
self._dbn_queue.qsize(),
160183
)
161184
self._session.resume_reading()
162185

163-
self._dbn_queue._enabled.clear()
186+
self._dbn_queue.disable()
164187
self.block_for_close()
188+
logger.debug("completed iteration")
165189
raise StopIteration
166190

167191
def __repr__(self) -> str:

databento/live/session.py

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,41 +25,98 @@
2525

2626
AUTH_TIMEOUT_SECONDS: Final = 2.0
2727
CONNECT_TIMEOUT_SECONDS: Final = 5.0
28+
DBN_QUEUE_CAPACITY: Final = 2**20
2829
DEFAULT_REMOTE_PORT: Final = 13000
2930

3031

31-
class DBNQueue(queue.Queue): # type: ignore [type-arg]
32+
class DBNQueue(queue.SimpleQueue): # type: ignore [type-arg]
3233
"""
3334
Queue for DBNRecords that can only be pushed to when enabled.
34-
35-
Parameters
36-
----------
37-
maxsize : int
38-
The `maxsize` for the Queue.
39-
4035
"""
4136

42-
def __init__(self, maxsize: int) -> None:
43-
super().__init__(maxsize)
37+
def __init__(self) -> None:
38+
super().__init__()
4439
self._enabled = threading.Event()
4540

46-
@property
47-
def enabled(self) -> bool:
41+
def is_enabled(self) -> bool:
4842
"""
49-
True if the Queue will allow pushing.
43+
Return True if the Queue will allow pushing; False otherwise.
5044
5145
A queue should only be enabled when it has a consumer.
5246
5347
"""
5448
return self._enabled.is_set()
5549

56-
def half_full(self) -> bool:
50+
def is_full(self) -> bool:
51+
"""
52+
Return True when the queue size exceeds `DBN_QUEUE_CAPACITY`; False otherwise.
53+
"""
54+
return self.qsize() > DBN_QUEUE_CAPACITY
55+
56+
def enable(self) -> None:
57+
"""
58+
Enable the DBN queue for pushing.
59+
"""
60+
self._enabled.set()
61+
62+
def disable(self) -> None:
63+
"""
64+
Disable the DBN queue for pushing.
65+
"""
66+
self._enabled.clear()
67+
68+
def put(self, item: DBNRecord, block: bool = True, timeout: float | None = None) -> None:
69+
"""
70+
Put an item on the queue if the queue is enabled.
71+
72+
Parameters
73+
----------
74+
item: DBNRecord
75+
The DBNRecord to put into the queue
76+
block: bool, default True
77+
Accepted for compatibility with `queue.Queue.put`; the underlying `queue.SimpleQueue` is unbounded, so a put never waits for a free slot
78+
timeout: float | None, default None
79+
The maximum amount of time to wait for the queue to become enabled. If None, wait until the queue is enabled.
80+
81+
Raises
82+
------
83+
BentoError
84+
If the queue is not enabled.
85+
If the queue is not enabled within `timeout` seconds.
86+
87+
See Also
88+
--------
89+
queue.SimpleQueue.put
90+
5791
"""
58-
Return True when the queue has reached half capacity.
92+
if self._enabled.wait(timeout):
93+
return super().put(item, block, timeout)
94+
if timeout is not None:
95+
raise BentoError(f"queue is not enabled after {timeout} second(s)")
96+
raise BentoError("queue is not enabled")
97+
98+
def put_nowait(self, item: DBNRecord) -> None:
5999
"""
60-
with self.mutex:
61-
return self._qsize() > self.maxsize // 2
100+
Put an item on the queue, if the queue is enabled, without blocking.
62101
102+
Parameters
103+
----------
104+
item: DBNRecord
105+
The DBNRecord to put into the queue
106+
107+
Raises
108+
------
109+
BentoError
110+
If the queue is not enabled.
111+
112+
See Also
113+
--------
114+
queue.SimpleQueue.put_nowait
115+
116+
"""
117+
if self.is_enabled():
118+
return super().put_nowait(item)
119+
raise BentoError("queue is not enabled")
63120

64121
@dataclasses.dataclass
65122
class SessionMetadata:
@@ -173,22 +230,16 @@ def received_record(self, record: DBNRecord) -> None:
173230
if exc_callback is not None:
174231
exc_callback(exc)
175232

176-
if self._dbn_queue.enabled:
177-
try:
178-
self._dbn_queue.put_nowait(record)
179-
except queue.Full:
180-
logger.error(
181-
"record queue is full; dropped %s record ts_event=%s",
182-
type(record).__name__,
183-
record.ts_event,
233+
if self._dbn_queue.is_enabled():
234+
self._dbn_queue.put(record)
235+
236+
# DBNQueue has no max size; so check if it's above capacity, and if so, pause reading
237+
if self._dbn_queue.is_full():
238+
logger.warning(
239+
"record queue is full; %d record(s) to be processed",
240+
self._dbn_queue.qsize(),
184241
)
185-
else:
186-
if self._dbn_queue.half_full():
187-
logger.warning(
188-
"record queue is full; %d record(s) to be processed",
189-
self._dbn_queue._qsize(),
190-
)
191-
self.transport.pause_reading()
242+
self.transport.pause_reading()
192243

193244
return super().received_record(record)
194245

tests/test_live_client.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,7 @@ async def test_live_async_iteration_backpressure(
837837
when the queue is depleted when iterating asynchronously.
838838
"""
839839
# Arrange
840-
monkeypatch.setattr(client, "DEFAULT_QUEUE_SIZE", 4)
840+
monkeypatch.setattr(session, "DBN_QUEUE_CAPACITY", 2)
841841

842842
live_client = client.Live(
843843
key=test_api_key,
@@ -878,10 +878,11 @@ async def test_live_async_iteration_dropped(
878878
test_api_key: str,
879879
) -> None:
880880
"""
881-
Test that an artificially small queue size will drop messages when full.
881+
Test that an artificially small queue size will not drop messages when
882+
full.
882883
"""
883884
# Arrange
884-
monkeypatch.setattr(client, "DEFAULT_QUEUE_SIZE", 1)
885+
monkeypatch.setattr(session, "DBN_QUEUE_CAPACITY", 1)
885886

886887
live_client = client.Live(
887888
key=test_api_key,
@@ -911,7 +912,7 @@ async def test_live_async_iteration_dropped(
911912
records = list(live_it)
912913

913914
# Assert
914-
assert len(records) == 1
915+
assert len(records) == 4
915916
assert live_client._dbn_queue.empty()
916917

917918

tests/test_live_session.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import pytest
2+
from databento.common.error import BentoError
3+
from databento.live.session import DBNQueue
4+
5+
6+
def test_dbn_queue_put(
7+
timeout: float = 0.01,
8+
) -> None:
9+
"""
10+
Test that DBNQueue.put raises a BentoError if disabled.
11+
12+
The `timeout` is required, otherwise we will block forever.
13+
14+
"""
15+
# Arrange
16+
queue = DBNQueue()
17+
18+
# Act, Assert
19+
with pytest.raises(BentoError):
20+
queue.put(None, timeout=timeout)
21+
22+
queue.enable()
23+
queue.put(None, timeout=timeout)
24+
25+
queue.disable()
26+
with pytest.raises(BentoError):
27+
queue.put(None, timeout=timeout)
28+
29+
30+
def test_dbn_queue_put_nowait() -> None:
31+
"""
32+
Test that DBNQueue.put_nowait raises a BentoError if disabled.
33+
"""
34+
# Arrange
35+
queue = DBNQueue()
36+
37+
# Act, Assert
38+
with pytest.raises(BentoError):
39+
queue.put_nowait(None)
40+
41+
queue.enable()
42+
queue.put_nowait(None)
43+
44+
queue.disable()
45+
with pytest.raises(BentoError):
46+
queue.put_nowait(None)

0 commit comments

Comments
 (0)