Skip to content

Commit 736ad0c

Browse files
committed
FIX: Live client metadata reconnect handling
1 parent 364365b commit 736ad0c

File tree

2 files changed

+22
-19
lines changed

2 files changed

+22
-19
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bug fixes
1010
- Fixed an issue where `batch.download` and `batch.download_async` would fail if requested files already existed in the output directory
1111
- Fixed an issue where `batch.download`, `batch.download_async`, and `timeseries.get_range` could use a lot of memory while streaming data
12+
- Fixed an issue where reusing a `Live` client with an open output stream would drop DBN records when received at the same time as the `Metadata` header
1213

1314
## 0.33.0 - 2024-04-16
1415

databento/live/session.py

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from typing import Final
1212

1313
import databento_dbn
14+
from databento_dbn import Metadata
1415
from databento_dbn import Schema
1516
from databento_dbn import SType
1617

@@ -195,28 +196,29 @@ def __init__(
195196
self._user_streams = user_streams
196197

197198
def _process_dbn(self, data: bytes) -> None:
198-
# Do no re-write the metadata to the stream to avoid corruption
199-
if not self._metadata or not data.startswith(b"DBN"):
200-
for stream, exc_callback in self._user_streams.items():
201-
try:
202-
stream.write(data)
203-
except Exception as exc:
204-
stream_name = getattr(stream, "name", str(stream))
205-
logger.error(
206-
"error writing %d bytes to `%s` stream",
207-
len(data),
208-
stream_name,
209-
exc_info=exc,
210-
)
211-
if exc_callback is not None:
212-
exc_callback(exc)
199+
start_index = 0
200+
if data.startswith(b"DBN") and self._metadata:
201+
# We have already received metata for the stream
202+
# Set start index to metadata length
203+
start_index = int.from_bytes(data[4:8], byteorder="little") + 8
204+
self._metadata.check(Metadata.decode(bytes(data[:start_index])))
205+
for stream, exc_callback in self._user_streams.items():
206+
try:
207+
stream.write(data[start_index:])
208+
except Exception as exc:
209+
stream_name = getattr(stream, "name", str(stream))
210+
logger.error(
211+
"error writing %d bytes to `%s` stream",
212+
len(data[start_index:]),
213+
stream_name,
214+
exc_info=exc,
215+
)
216+
if exc_callback is not None:
217+
exc_callback(exc)
213218
return super()._process_dbn(data)
214219

215220
def received_metadata(self, metadata: databento_dbn.Metadata) -> None:
216-
if not self._metadata:
217-
self._metadata.data = metadata
218-
else:
219-
self._metadata.check(metadata)
221+
self._metadata.data = metadata
220222
return super().received_metadata(metadata)
221223

222224
def received_record(self, record: DBNRecord) -> None:

0 commit comments

Comments
 (0)