Skip to content

Commit 8287c53

Browse files
authored
VER: Release 0.28.0
2 parents 63ce2a3 + 4adeee9 commit 8287c53

File tree

11 files changed

+260
-68
lines changed

11 files changed

+260
-68
lines changed

CHANGELOG.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,25 @@
11
# Changelog
22

3+
## 0.28.0 - 2024-02-01
4+
5+
#### Enhancements
6+
- Substantially increased iteration queue size
7+
- Added methods `DBNQueue.enable` and `DBNQueue.disable` for controlling queue consumption
8+
- Added method `DBNQueue.is_enabled` to signal the queue can accept records
9+
- Added method `DBNQueue.is_full` to signal the queue has reached capacity
10+
- Added enabled checks to `DBNQueue.put` and `DBNQueue.put_nowait`
11+
12+
#### Breaking changes
13+
- Iterating a `Live` client after the streaming session has started will now raise a `ValueError`. Calling `Live.start` is not necessary when iterating the `Live` client
14+
- Moved constant `databento.live.client.DEFAULT_QUEUE_SIZE` to `databento.live.session.DBN_QUEUE_CAPACITY`
15+
- Removed `maxsize` parameter from `DBNQueue` constructor. `DBNQueue` now subclasses `SimpleQueue` instead
16+
- Removed property `DBNQueue.enabled`, use `DBNQueue.is_enabled` instead
17+
- Removed method `DBNQueue.is_half_full`, use `DBNQueue.is_full` instead
18+
19+
#### Bug fixes
20+
- Fixed an issue where DBN records could be dropped while iterating
21+
- Fixed an issue where async iteration would block the event loop
22+
323
## 0.27.0 - 2024-01-23
424

525
#### Enhancements

databento/common/enums.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def coercible(enum_type: type[M]) -> type[M]:
4040
4141
Notes
4242
-----
43-
This decorator makes some assuptions about your Enum class.
43+
This decorator makes some assumptions about your Enum class.
4444
1. Your attribute names are all UPPERCASE
4545
2. Your attribute values are all lowercase
4646
@@ -100,6 +100,7 @@ def __str__(self) -> str:
100100
return getattr(self, "name").lower()
101101
return getattr(self, "value")
102102

103+
103104
@unique
104105
@coercible
105106
class HistoricalGateway(StringyMixin, str, Enum):

databento/common/validation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def validate_semantic_string(value: str, param: str) -> str:
215215
if not value:
216216
raise ValueError(f"The `{param}` cannot be an empty string.")
217217
if str.isspace(value):
218-
raise ValueError(f"The `{param}` cannot contain only whitepsace.")
218+
raise ValueError(f"The `{param}` cannot contain only whitespace.")
219219
if not str.isprintable(value):
220220
raise ValueError(f"The `{param}` cannot contain unprintable characters.")
221221
return value

databento/live/client.py

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from collections.abc import Iterable
1010
from concurrent import futures
1111
from os import PathLike
12-
from typing import IO, Final
12+
from typing import IO
1313

1414
import databento_dbn
1515
from databento_dbn import Schema
@@ -33,7 +33,6 @@
3333

3434

3535
logger = logging.getLogger(__name__)
36-
DEFAULT_QUEUE_SIZE: Final = 2048
3736

3837

3938
class Live:
@@ -85,7 +84,7 @@ def __init__(
8584
self._dataset: Dataset | str = ""
8685
self._ts_out = ts_out
8786

88-
self._dbn_queue: DBNQueue = DBNQueue(maxsize=DEFAULT_QUEUE_SIZE)
87+
self._dbn_queue: DBNQueue = DBNQueue()
8988
self._metadata: SessionMetadata = SessionMetadata()
9089
self._symbology_map: dict[int, str | int] = {}
9190
self._user_callbacks: dict[RecordCallback, ExceptionCallback | None] = {
@@ -120,43 +119,73 @@ def __aiter__(self) -> Live:
120119
return iter(self)
121120

122121
async def __anext__(self) -> DBNRecord:
122+
if not self._dbn_queue.is_enabled():
123+
raise ValueError("iteration has not started")
124+
125+
loop = asyncio.get_running_loop()
126+
123127
try:
124-
return next(self)
125-
except StopIteration:
126-
raise StopAsyncIteration
128+
return self._dbn_queue.get_nowait()
129+
except queue.Empty:
130+
while True:
131+
try:
132+
return await loop.run_in_executor(
133+
None,
134+
self._dbn_queue.get,
135+
True,
136+
0.1,
137+
)
138+
except queue.Empty:
139+
if self._session.is_disconnected():
140+
break
141+
finally:
142+
if not self._dbn_queue.is_full() and not self._session.is_reading():
143+
logger.debug(
144+
"resuming reading with %d pending records",
145+
self._dbn_queue.qsize(),
146+
)
147+
self._session.resume_reading()
148+
149+
self._dbn_queue.disable()
150+
await self.wait_for_close()
151+
logger.debug("completed async iteration")
152+
raise StopAsyncIteration
127153

128154
def __iter__(self) -> Live:
129155
logger.debug("starting iteration")
130-
self._dbn_queue._enabled.set()
131-
if not self._session.is_started() and self.is_connected():
156+
if self._session.is_started():
157+
logger.error("iteration started after session has started")
158+
raise ValueError(
159+
"Cannot start iteration after streaming has started, records may be missed. Don't call `Live.start` before iterating.",
160+
)
161+
elif self.is_connected():
132162
self.start()
163+
self._dbn_queue._enabled.set()
133164
return self
134165

135166
def __next__(self) -> DBNRecord:
136-
if self._dbn_queue is None:
167+
if not self._dbn_queue.is_enabled():
137168
raise ValueError("iteration has not started")
138169

139-
while not self._session.is_disconnected() or self._dbn_queue.qsize() > 0:
170+
while True:
140171
try:
141-
record = self._dbn_queue.get(block=False)
172+
record = self._dbn_queue.get(timeout=0.1)
142173
except queue.Empty:
143-
continue
174+
if self._session.is_disconnected():
175+
break
144176
else:
145-
logger.debug(
146-
"yielding %s record from next",
147-
type(record).__name__,
148-
)
149177
return record
150178
finally:
151-
if not self._dbn_queue.half_full() and not self._session.is_reading():
179+
if not self._dbn_queue.is_full() and not self._session.is_reading():
152180
logger.debug(
153181
"resuming reading with %d pending records",
154-
self._dbn_queue._qsize(),
182+
self._dbn_queue.qsize(),
155183
)
156184
self._session.resume_reading()
157185

158-
self._dbn_queue._enabled.clear()
186+
self._dbn_queue.disable()
159187
self.block_for_close()
188+
logger.debug("completed iteration")
160189
raise StopIteration
161190

162191
def __repr__(self) -> str:
@@ -358,12 +387,14 @@ def start(
358387
"""
359388
Start the live client session.
360389
390+
It is not necessary to call `Live.start` before iterating a `Live` client and doing so will result in an error.
391+
361392
Raises
362393
------
363394
ValueError
364-
If `start()` is called before a subscription has been made.
365-
If `start()` is called after streaming has already started.
366-
If `start()` is called after the live session has closed.
395+
If `Live.start` is called before a subscription has been made.
396+
If `Live.start` is called after streaming has already started.
397+
If `Live.start` is called after the live session has closed.
367398
368399
See Also
369400
--------
@@ -388,7 +419,7 @@ def stop(self) -> None:
388419
Raises
389420
------
390421
ValueError
391-
If `stop()` is called before a connection has been made.
422+
If `Live.stop` is called before a connection has been made.
392423
393424
See Also
394425
--------
@@ -505,7 +536,7 @@ def block_for_close(
505536
) -> None:
506537
"""
507538
Block until the session closes or a timeout is reached. A session will
508-
close after `stop()` is called or the remote gateway disconnects.
539+
close after `Live.stop` is called or the remote gateway disconnects.
509540
510541
Parameters
511542
----------
@@ -548,7 +579,7 @@ async def wait_for_close(
548579
) -> None:
549580
"""
550581
Coroutine to wait until the session closes or a timeout is reached. A
551-
session will close after `stop()` is called or the remote gateway
582+
session will close after `Live.stop` is called or the remote gateway
552583
disconnects.
553584
554585
Parameters

databento/live/session.py

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,41 +25,98 @@
2525

2626
AUTH_TIMEOUT_SECONDS: Final = 2.0
2727
CONNECT_TIMEOUT_SECONDS: Final = 5.0
28+
DBN_QUEUE_CAPACITY: Final = 2**20
2829
DEFAULT_REMOTE_PORT: Final = 13000
2930

3031

31-
class DBNQueue(queue.Queue): # type: ignore [type-arg]
32+
class DBNQueue(queue.SimpleQueue): # type: ignore [type-arg]
3233
"""
3334
Queue for DBNRecords that can only be pushed to when enabled.
34-
35-
Parameters
36-
----------
37-
maxsize : int
38-
The `maxsize` for the Queue.
39-
4035
"""
4136

42-
def __init__(self, maxsize: int) -> None:
43-
super().__init__(maxsize)
37+
def __init__(self) -> None:
38+
super().__init__()
4439
self._enabled = threading.Event()
4540

46-
@property
47-
def enabled(self) -> bool:
41+
def is_enabled(self) -> bool:
4842
"""
49-
True if the Queue will allow pushing.
43+
Return True if the Queue will allow pushing; False otherwise.
5044
5145
A queue should only be enabled when it has a consumer.
5246
5347
"""
5448
return self._enabled.is_set()
5549

56-
def half_full(self) -> bool:
50+
def is_full(self) -> bool:
51+
"""
52+
Return True when the queue has reached capacity; False otherwise.
53+
"""
54+
return self.qsize() > DBN_QUEUE_CAPACITY
55+
56+
def enable(self) -> None:
57+
"""
58+
Enable the DBN queue for pushing.
59+
"""
60+
self._enabled.set()
61+
62+
def disable(self) -> None:
63+
"""
64+
Disable the DBN queue for pushing.
65+
"""
66+
self._enabled.clear()
67+
68+
def put(self, item: DBNRecord, block: bool = True, timeout: float | None = None) -> None:
69+
"""
70+
Put an item on the queue if the queue is enabled.
71+
72+
Parameters
73+
----------
74+
item: DBNRecord
75+
The DBNRecord to put into the queue
76+
block: bool, default True
77+
Block if necessary until a free slot is available or the `timeout` is reached
78+
timeout: float | None, default None
79+
The maximum amount of time to block, when `block` is True, for the queue to become enabled.
80+
81+
Raises
82+
------
83+
BentoError
84+
If the queue is not enabled.
85+
If the queue is not enabled within `timeout` seconds.
86+
87+
See Also
88+
--------
89+
queue.SimpleQueue.put
90+
5791
"""
58-
Return True when the queue has reached half capacity.
92+
if self._enabled.wait(timeout):
93+
return super().put(item, block, timeout)
94+
if timeout is not None:
95+
raise BentoError(f"queue is not enabled after {timeout} second(s)")
96+
raise BentoError("queue is not enabled")
97+
98+
def put_nowait(self, item: DBNRecord) -> None:
5999
"""
60-
with self.mutex:
61-
return self._qsize() > self.maxsize // 2
100+
Put an item on the queue, if the queue is enabled, without blocking.
62101
102+
Parameters
103+
----------
104+
item: DBNRecord
105+
The DBNRecord to put into the queue
106+
107+
Raises
108+
------
109+
BentoError
110+
If the queue is not enabled.
111+
112+
See Also
113+
--------
114+
queue.SimpleQueue.put_nowait
115+
116+
"""
117+
if self.is_enabled():
118+
return super().put_nowait(item)
119+
raise BentoError("queue is not enabled")
63120

64121
@dataclasses.dataclass
65122
class SessionMetadata:
@@ -173,22 +230,16 @@ def received_record(self, record: DBNRecord) -> None:
173230
if exc_callback is not None:
174231
exc_callback(exc)
175232

176-
if self._dbn_queue.enabled:
177-
try:
178-
self._dbn_queue.put_nowait(record)
179-
except queue.Full:
180-
logger.error(
181-
"record queue is full; dropped %s record ts_event=%s",
182-
type(record).__name__,
183-
record.ts_event,
233+
if self._dbn_queue.is_enabled():
234+
self._dbn_queue.put(record)
235+
236+
# DBNQueue has no max size; so check if it's above capacity, and if so, pause reading
237+
if self._dbn_queue.is_full():
238+
logger.warning(
239+
"record queue is full; %d record(s) to be processed",
240+
self._dbn_queue.qsize(),
184241
)
185-
else:
186-
if self._dbn_queue.half_full():
187-
logger.warning(
188-
"record queue is full; %d record(s) to be processed",
189-
self._dbn_queue._qsize(),
190-
)
191-
self.transport.pause_reading()
242+
self.transport.pause_reading()
192243

193244
return super().received_record(record)
194245

databento/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.27.0"
1+
__version__ = "0.28.0"

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "databento"
3-
version = "0.27.0"
3+
version = "0.28.0"
44
description = "Official Python client library for Databento"
55
authors = [
66
"Databento <support@databento.com>",

tests/mock_live_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def peer(self) -> str:
181181
def user_api_keys(self) -> dict[str, str]:
182182
"""
183183
Return a dictionary of user api keys for testing. The keys to this
184-
dictionary are the bucket_ids. The value shoud be a single user API
184+
dictionary are the bucket_ids. The value should be a single user API
185185
key.
186186
187187
Returns
@@ -475,7 +475,7 @@ class MockLiveServer:
475475
-------
476476
create(host="localhost", port=0)
477477
Factory method to create a new MockLiveServer instance.
478-
This is the prefered way to create an instance of
478+
This is the preferred way to create an instance of
479479
this class.
480480
481481
See Also

0 commit comments

Comments
 (0)