pipelinerl/streams.py (48 changes: 41 additions & 7 deletions)
@@ -299,8 +299,10 @@ def read_jsonl_stream(f: TextIO, retry_delay: float = _REREAD_DELAY) -> Iterator
 
 
 class FileStreamReader(StreamReader):
-    def __init__(self, stream: SingleStreamSpec):
+    def __init__(self, stream: SingleStreamSpec, skip_corrupted: bool = True, max_retries: int = 10):
         self.stream = stream
+        self.skip_corrupted = skip_corrupted
+        self.max_retries = max_retries
 
     def __enter__(self):
         _file_dir = stream_dir(self.stream.exp_path, self.stream.topic, self.stream.instance, self.stream.partition)
@@ -317,22 +319,37 @@ def __enter__(self):
     def __exit__(self, exc_type, exc_value, traceback):
         self._file.close()
 
+    def _skip_to_next_line(self, position: int) -> int:
+        """
+        Skip to the next newline character after the given position.
+        Returns the new position after the newline, or the current position if at EOF.
+        """
+        self._file.seek(position)
+        # Read until we find a newline
+        while True:
+            char = self._file.read(1)
+            if not char:
+                # EOF reached
+                return self._file.tell()
+            if char == '\n':
+                return self._file.tell()
+
     def read(self):
         retry_time = 0.01
         cur_retries = 0
-        max_retries = 10
         while True:
             try:
                 for line in read_jsonl_stream(self._file):
                     yield line
                     cur_retries = 0
+                    retry_time = 0.01
             except json.JSONDecodeError as e:
-                # Sometimes when the stream file is being written to as the as time as we reading it,
+                # Sometimes when the stream file is being written to at the same time as we're reading it,
                 # we get lines like \0x00\0x00\0x00\0x00\0x00\0x00\0x00\0x00 that break the JSON decoder.
                 # We have to reopen the file and seek to the previous position to try again.
-                if cur_retries < max_retries:
+                if cur_retries < self.max_retries:
                     logger.warning(
-                        f"Could not decode JSON from {self.stream}, might have run into end of the file. Will reopen the file and retry ({cur_retries}/{max_retries}), starting from position {e.position})"
+                        f"Could not decode JSON from {self.stream}, might have run into end of the file. Will reopen the file and retry ({cur_retries}/{self.max_retries}), starting from position {e.position})"
                     )  # type: ignore
                     time.sleep(retry_time)
                     self._file.close()
@@ -342,8 +359,25 @@ def read(self):
                     cur_retries += 1
                     continue
                 else:
-                    logger.error(f"Error reading stream {self.stream}, giving up after {max_retries} retries")
-                    raise e
+                    # After max retries, the data is likely corrupted (not just a partial write).
+                    if self.skip_corrupted:
+                        # Skip to the next line and try to continue reading
+                        new_position = self._skip_to_next_line(e.position)
+                        logger.error(
+                            f"Corrupted data in stream {self.stream} at position {e.position}, "
+                            f"skipping to position {new_position} and continuing. "
+                            f"Error was: {e.msg}"
+                        )
+                        self._file.close()
+                        self._file = open(self._file_path, "r")
+                        self._file.seek(new_position)
+                        # Reset retry counters for the next potential corruption
+                        cur_retries = 0
+                        retry_time = 0.01
+                        continue
+                    else:
+                        logger.error(f"Error reading stream {self.stream}, giving up after {self.max_retries} retries")
+                        raise e
 
 
 class RoundRobinFileStreamWriter(StreamWriter):
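
A minimal standalone sketch of the recovery behavior the new skip_corrupted flag enables (not part of the PR; the file name and the free-standing helper below are made up for illustration, mirroring FileStreamReader._skip_to_next_line): when a JSONL stream contains a run of NUL bytes from a torn write, the reader jumps past the bad line and keeps going instead of raising.

import json

def skip_to_next_line(f, position: int) -> int:
    # Same scan as FileStreamReader._skip_to_next_line: seek to where the
    # bad record starts, then read one character at a time until just past
    # the next newline (or EOF).
    f.seek(position)
    while True:
        char = f.read(1)
        if not char:  # EOF: nothing left to skip
            return f.tell()
        if char == "\n":
            return f.tell()

# A stream file with a corrupted record between two valid ones.
with open("demo_stream.jsonl", "w") as f:
    f.write('{"step": 1}\n')
    f.write("\x00\x00\x00\x00\x00\x00\x00\x00\n")  # simulated torn write
    f.write('{"step": 2}\n')

records = []
with open("demo_stream.jsonl", "r") as f:
    position = 0
    while True:
        f.seek(position)
        line = f.readline()
        if not line:
            break  # real end of file
        try:
            records.append(json.loads(line))
            position = f.tell()
        except json.JSONDecodeError:
            # What read() does with skip_corrupted=True once retries are
            # exhausted: log-and-skip rather than raise.
            position = skip_to_next_line(f, position)

print(records)  # [{'step': 1}, {'step': 2}]

The trade-off is that whole records can be dropped with nothing but a logger.error trace, which is usually acceptable for training-data streams where losing one sample beats crashing the reader; callers that need strict integrity can pass skip_corrupted=False to keep the old raise-after-retries behavior.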