diff --git a/pipelinerl/streams.py b/pipelinerl/streams.py
index 632b760e..75371a3e 100644
--- a/pipelinerl/streams.py
+++ b/pipelinerl/streams.py
@@ -299,8 +299,10 @@ def read_jsonl_stream(f: TextIO, retry_delay: float = _REREAD_DELAY) -> Iterator
 
 
 class FileStreamReader(StreamReader):
-    def __init__(self, stream: SingleStreamSpec):
+    def __init__(self, stream: SingleStreamSpec, skip_corrupted: bool = True, max_retries: int = 10):
         self.stream = stream
+        self.skip_corrupted = skip_corrupted
+        self.max_retries = max_retries
 
     def __enter__(self):
         _file_dir = stream_dir(self.stream.exp_path, self.stream.topic, self.stream.instance, self.stream.partition)
@@ -317,22 +319,37 @@ def __enter__(self):
     def __exit__(self, exc_type, exc_value, traceback):
         self._file.close()
 
+    def _skip_to_next_line(self, position: int) -> int:
+        """
+        Skip to the next newline character after the given position.
+        Returns the position just past that newline, or the end-of-file position if no newline is found.
+        """
+        self._file.seek(position)
+        # Scan forward one character at a time until we hit a newline.
+        while True:
+            char = self._file.read(1)
+            if not char:
+                # EOF reached before any newline.
+                return self._file.tell()
+            if char == '\n':
+                return self._file.tell()
+
     def read(self):
         retry_time = 0.01
         cur_retries = 0
-        max_retries = 10
         while True:
             try:
                 for line in read_jsonl_stream(self._file):
                     yield line
                     cur_retries = 0
+                    retry_time = 0.01
             except json.JSONDecodeError as e:
-                # Sometimes when the stream file is being written to as the as time as we reading it,
+                # Sometimes when the stream file is being written to at the same time as we're reading it,
                 # we get lines like \0x00\0x00\0x00\0x00\0x00\0x00\0x00\0x00 that break the JSON decoder.
                 # We have to reopen the file and seek to the previous position to try again.
-                if cur_retries < max_retries:
+                if cur_retries < self.max_retries:
                     logger.warning(
-                        f"Could not decode JSON from {self.stream}, might have run into end of the file. Will reopen the file and retry ({cur_retries}/{max_retries}), starting from position {e.position})"
+                        f"Could not decode JSON from {self.stream}, might have run into the end of the file. Will reopen the file and retry ({cur_retries}/{self.max_retries}), starting from position {e.position}"
                     )  # type: ignore
                     time.sleep(retry_time)
                     self._file.close()
@@ -342,8 +359,25 @@ def read(self):
                     cur_retries += 1
                     continue
                 else:
-                    logger.error(f"Error reading stream {self.stream}, giving up after {max_retries} retries")
-                    raise e
+                    # After max retries, the data is likely corrupted (not just a partial write).
+                    if self.skip_corrupted:
+                        # Skip past the corrupted bytes to the next line and keep reading.
+                        new_position = self._skip_to_next_line(e.position)
+                        logger.error(
+                            f"Corrupted data in stream {self.stream} at position {e.position}, "
+                            f"skipping to position {new_position} and continuing. "
+                            f"Error was: {e.msg}"
+                        )
+                        self._file.close()
+                        self._file = open(self._file_path, "r")
+                        self._file.seek(new_position)
+                        # Reset the retry counters for the next potential corruption.
+                        cur_retries = 0
+                        retry_time = 0.01
+                        continue
+                    else:
+                        logger.error(f"Error reading stream {self.stream}, giving up after {self.max_retries} retries")
+                        raise e
 
 
 class RoundRobinFileStreamWriter(StreamWriter):
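
Below is a minimal standalone sketch of the recovery strategy this diff adds: when a record still fails to decode after the retries are exhausted, scan forward to the next newline and resume reading from there. It uses only the standard library; the helper name skip_to_next_line and the in-memory stream are illustrative assumptions, not pipelinerl's API.

    import io
    import json

    def skip_to_next_line(f, position):
        # Assumed analogue of FileStreamReader._skip_to_next_line: seek to the
        # failure position, then consume characters until just past a newline
        # (or until EOF), returning the resulting offset.
        f.seek(position)
        while True:
            char = f.read(1)
            if not char or char == "\n":
                return f.tell()

    # A JSONL stream where a concurrent writer left NUL padding between records.
    data = '{"a": 1}\n\x00\x00\x00\x00\x00\x00\n{"b": 2}\n'
    f = io.StringIO(data)

    records = []
    while True:
        position = f.tell()
        line = f.readline()
        if not line:
            break
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            # The corrupted bytes start at `position`; skip past them and
            # resume from the start of the next line.
            f.seek(skip_to_next_line(f, position))

    assert records == [{"a": 1}, {"b": 2}]

In the actual change the reader also closes and reopens the stream file before seeking, since the failed decode may have left the open file object in an inconsistent state.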