From 5b6360e0d18fc93692cbc895546f547529d27822 Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Thu, 14 Sep 2023 13:36:28 +0200 Subject: [PATCH 01/10] Support file-like objects in record writer --- flow/record/adapter/jsonfile.py | 4 +- flow/record/adapter/line.py | 4 +- flow/record/adapter/stream.py | 4 +- flow/record/adapter/text.py | 4 +- flow/record/adapter/xlsx.py | 4 +- flow/record/base.py | 77 ++++++++++++++++++--------------- tests/test_record_adapter.py | 41 ++++++++++++++---- 7 files changed, 84 insertions(+), 54 deletions(-) diff --git a/flow/record/adapter/jsonfile.py b/flow/record/adapter/jsonfile.py index 1ad67c06..042f0d36 100644 --- a/flow/record/adapter/jsonfile.py +++ b/flow/record/adapter/jsonfile.py @@ -23,7 +23,7 @@ class JsonfileWriter(AbstractWriter): def __init__(self, path, indent=None, descriptors=True, **kwargs): self.descriptors = str(descriptors).lower() in ("true", "1") - self.fp = record.open_path(path, "w") + self.fp = record.open_file(path, "w") if isinstance(indent, str): indent = int(indent) self.packer = JsonRecordPacker(indent=indent, pack_descriptors=self.descriptors) @@ -55,7 +55,7 @@ class JsonfileReader(AbstractReader): def __init__(self, path, selector=None, **kwargs): self.selector = make_selector(selector) - self.fp = record.open_path(path, "r") + self.fp = record.open_file(path, "r") self.packer = JsonRecordPacker() def close(self): diff --git a/flow/record/adapter/line.py b/flow/record/adapter/line.py index 31139a55..4fadf2ad 100644 --- a/flow/record/adapter/line.py +++ b/flow/record/adapter/line.py @@ -1,4 +1,4 @@ -from flow.record import open_path +from flow.record import open_file from flow.record.adapter import AbstractWriter from flow.record.utils import is_stdout @@ -16,7 +16,7 @@ class LineWriter(AbstractWriter): fp = None def __init__(self, path, fields=None, exclude=None, **kwargs): - self.fp = open_path(path, "wb") + self.fp = open_file(path, "wb") self.count = 0 self.fields = fields self.exclude = exclude diff --git a/flow/record/adapter/stream.py b/flow/record/adapter/stream.py index b682d982..8bcd130d 100644 --- a/flow/record/adapter/stream.py +++ b/flow/record/adapter/stream.py @@ -1,6 +1,6 @@ from typing import Iterator, Union -from flow.record import Record, RecordOutput, RecordStreamReader, open_file, open_path +from flow.record import Record, RecordOutput, RecordStreamReader, open_file from flow.record.adapter import AbstractReader, AbstractWriter from flow.record.selector import Selector from flow.record.utils import is_stdout @@ -19,7 +19,7 @@ class StreamWriter(AbstractWriter): stream = None def __init__(self, path: str, clobber=True, **kwargs): - self.fp = open_path(path, "wb", clobber=clobber) + self.fp = open_file(path, "wb", clobber=clobber) self.stream = RecordOutput(self.fp) def write(self, record: Record) -> None: diff --git a/flow/record/adapter/text.py b/flow/record/adapter/text.py index dd29bf1d..3a297d66 100644 --- a/flow/record/adapter/text.py +++ b/flow/record/adapter/text.py @@ -1,4 +1,4 @@ -from flow.record import open_path +from flow.record import open_file from flow.record.adapter import AbstractWriter from flow.record.utils import is_stdout @@ -27,7 +27,7 @@ class TextWriter(AbstractWriter): fp = None def __init__(self, path, flush=True, format_spec=None, **kwargs): - self.fp = open_path(path, "wb") + self.fp = open_file(path, "wb") self.auto_flush = flush self.format_spec = format_spec diff --git a/flow/record/adapter/xlsx.py b/flow/record/adapter/xlsx.py index 
f83bff4f..7849b9ae 100644 --- a/flow/record/adapter/xlsx.py +++ b/flow/record/adapter/xlsx.py @@ -19,7 +19,7 @@ class XlsxWriter(AbstractWriter): wb = None def __init__(self, path, **kwargs): - self.fp = record.open_path(path, "wb") + self.fp = record.open_file(path, "wb") self.wb = openpyxl.Workbook() self.ws = self.wb.active self.desc = None @@ -51,7 +51,7 @@ class XlsxReader(AbstractReader): def __init__(self, path, selector=None, **kwargs): self.selector = make_selector(selector) - self.fp = record.open_path(path, "rb") + self.fp = record.open_file(path, "rb") self.desc = None self.wb = openpyxl.load_workbook(self.fp) self.ws = self.wb.active diff --git a/flow/record/base.py b/flow/record/base.py index 1e4d63f7..84ea2e15 100644 --- a/flow/record/base.py +++ b/flow/record/base.py @@ -645,6 +645,8 @@ def DynamicDescriptor(name, fields): def open_stream(fp: BinaryIO, mode: str) -> BinaryIO: + if "w" in mode: + return fp if not hasattr(fp, "peek"): fp = io.BufferedReader(fp) @@ -688,7 +690,7 @@ def open_file(path: Union[str, Path, BinaryIO], mode: str, clobber: bool = True) if isinstance(path, str): return open_path(path, mode, clobber) elif isinstance(path, io.IOBase): - return open_stream(path, "rb") + return open_stream(path, mode) else: raise ValueError(f"Unsupported path type {path}") @@ -798,41 +800,43 @@ def RecordAdapter( cls_url = p.netloc + p.path if sub_adapter: cls_url = sub_adapter + "://" + cls_url - elif url in ("-", ""): - # For reading stdin, we cannot rely on an extension to know what sort of stream is incoming. Thus, we will treat - # it as a 'fileobj', where we can peek into the stream and try to select the appropriate adapter. - fileobj = getattr(sys.stdin, "buffer", sys.stdin) - if fileobj is not None: - # This record adapter has received a file-like object for record reading - # We just need to find the right adapter by peeking into the first few bytes. - - # First, we open the stream. If the stream is compressed, open_stream will wrap it for us into a decompressor. - cls_stream = open_stream(fileobj, "rb") - - # Now, we have a stream that will be transparently decompressed but we still do not know what adapter to use. - # This requires a new peek into the transparent stream. This peek will cause the stream pointer to be moved. - # Therefore, find_adapter_for_stream returns both a BinaryIO-supportive object that can correctly read the - # adjusted stream, and a string indicating the type of adapter to be used on said stream. - arg_dict = kwargs.copy() - - # If a user did not provide a url, we have to peek into the stream to be able to determine the right adapter - # based on magic bytes encountered in the first few bytes of the stream. - if adapter is None: - cls_stream, adapter = find_adapter_for_stream(cls_stream) + if out is False: + if url in ("-", ""): + # For reading stdin, we cannot rely on an extension to know what sort of stream is incoming. Thus, we will + # treat it as a 'fileobj', where we can peek into the stream and try to select the appropriate adapter. + fileobj = getattr(sys.stdin, "buffer", sys.stdin) + if fileobj is not None: + # This record adapter has received a file-like object for record reading + # We just need to find the right adapter by peeking into the first few bytes. + + # First, we open the stream. If the stream is compressed, open_stream will wrap it for us into a + # decompressor. 
+ cls_stream = open_stream(fileobj, "rb") + + # Now, we have a stream that will be transparently decompressed but we still do not know what adapter to + # use. This requires a new peek into the transparent stream. This peek will cause the stream pointer to be + # moved. Therefore, find_adapter_for_stream returns both a BinaryIO-supportive object that can correctly + # read the adjusted stream, and a string indicating the type of adapter to be used on said stream. + arg_dict = kwargs.copy() + + # If a user did not provide a url, we have to peek into the stream to be able to determine the right adapter + # based on magic bytes encountered in the first few bytes of the stream. if adapter is None: - peek_data = cls_stream.peek(RECORDSTREAM_MAGIC_DEPTH) - if peek_data and peek_data.startswith(b"<"): - # As peek() can result in a larger buffer than requested, we make sure the peek_data variable isn't - # unnecessarily long in the error message. - peek_data = peek_data[:RECORDSTREAM_MAGIC_DEPTH] - raise RecordAdapterNotFound( - ( - f"Could not find a reader for input {peek_data!r}. Are you perhaps " - "entering record text, rather than a record stream? This can be fixed by using " - "'rdump -w -' to write a record stream to stdout." + cls_stream, adapter = find_adapter_for_stream(cls_stream) + if adapter is None: + peek_data = cls_stream.peek(RECORDSTREAM_MAGIC_DEPTH) + if peek_data and peek_data.startswith(b"<"): + # As peek() can result in a larger buffer than requested, we make sure the peek_data variable + # isn't unnecessarily long in the error message. + peek_data = peek_data[:RECORDSTREAM_MAGIC_DEPTH] + raise RecordAdapterNotFound( + ( + f"Could not find a reader for input {peek_data!r}. Are you perhaps " + "entering record text, rather than a record stream? This can be fixed by using " + "'rdump -w -' to write a record stream to stdout." + ) ) - ) - raise RecordAdapterNotFound("Could not find adapter for file-like object") + raise RecordAdapterNotFound("Could not find adapter for file-like object") # Now that we know which adapter is needed, we import it. 
mod = importlib.import_module("flow.record.adapter.{}".format(adapter)) @@ -845,9 +849,10 @@ def RecordAdapter( if out: arg_dict["clobber"] = clobber log.debug("Creating {!r} for {!r} with args {!r}".format(cls, url, arg_dict)) - - if fileobj is not None: + if cls_stream is not None: return cls(cls_stream, **arg_dict) + if fileobj is not None: + return cls(fileobj, **arg_dict) return cls(cls_url, **arg_dict) diff --git a/tests/test_record_adapter.py b/tests/test_record_adapter.py index 143c7b3f..c141bc56 100644 --- a/tests/test_record_adapter.py +++ b/tests/test_record_adapter.py @@ -1,16 +1,13 @@ import datetime import platform import sys +from io import BytesIO import pytest -try: - from StringIO import StringIO -except ImportError: - from io import BytesIO as StringIO - from flow.record import ( PathTemplateWriter, + RecordAdapter, RecordArchiver, RecordDescriptor, RecordOutput, @@ -18,7 +15,7 @@ RecordStreamReader, RecordWriter, ) -from flow.record.adapter.stream import StreamReader +from flow.record.adapter.stream import StreamReader, StreamWriter from flow.record.base import ( BZ2_MAGIC, GZIP_MAGIC, @@ -33,7 +30,7 @@ def test_stream_writer_reader(): - fp = StringIO() + fp = BytesIO() out = RecordOutput(fp) for rec in generate_records(): out.write(rec) @@ -48,7 +45,7 @@ def test_stream_writer_reader(): def test_recordstream_filelike_object(): - fp = StringIO() + fp = BytesIO() out = RecordOutput(fp) for rec in generate_records(): out.write(rec) @@ -477,3 +474,31 @@ def test_csvfilereader(tmp_path): with RecordReader(f"csvfile://{path}", selector="r.count == '2'") as reader: for i, rec in enumerate(reader): assert rec.count == "2" + + +def test_gcs_writer() -> None: + test_buf = BytesIO() + + adapter = RecordAdapter(fileobj=test_buf, out=True) + + assert isinstance(adapter, StreamWriter) + + # Add mock records + test_records = list(generate_records(10)) + for record in test_records: + adapter.write(record) + + adapter.flush() + + # Grab the bytes before closing the BytesIO object. 
+ read_buf = BytesIO(test_buf.getvalue()) + + # Close the writer and assure the object has been closed + adapter.close() + + # Verify if the written record stream is something we can read + reader = RecordAdapter(fileobj=read_buf) + read_records = list(reader) + assert len(read_records) == 10 + for idx, record in enumerate(read_records): + assert record == test_records[idx] From f078d4fe083f5e0d3f9f76baff7f5e40ef5a435c Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Thu, 14 Sep 2023 13:37:16 +0200 Subject: [PATCH 02/10] Add Google Cloud Storage Adapter --- flow/record/adapter/gcs.py | 97 +++++++++++++++++++++ tests/test_gcs_adapter.py | 172 +++++++++++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) create mode 100644 flow/record/adapter/gcs.py create mode 100644 tests/test_gcs_adapter.py diff --git a/flow/record/adapter/gcs.py b/flow/record/adapter/gcs.py new file mode 100644 index 00000000..39b1f0b7 --- /dev/null +++ b/flow/record/adapter/gcs.py @@ -0,0 +1,97 @@ +import logging +from fnmatch import fnmatch +from typing import Iterator, Union + +from google.cloud.storage.client import Client +from google.cloud.storage.fileio import BlobReader, BlobWriter + +from flow.record.adapter import AbstractReader, AbstractWriter +from flow.record.base import Record, RecordAdapter +from flow.record.selector import CompiledSelector, Selector + +__usage__ = """ +Google Cloud Storage adapter +--- +Read usage: rdump gcs://[PROJECT-ID]:[BUCKET-ID]?path=[PATH] +[PROJECT-ID]: Google Cloud Project ID +[BUCKET-ID]: Bucket ID +[path]: Path to look for files, with support for glob-pattern matching + +Write usage: rdump gcs://[PROJECT-ID]:[BUCKET-ID]?path=[PATH] +[PROJECT-ID]: Google Cloud Project ID +[BUCKET-ID]: Bucket ID +[path]: Path to write records to +""" + +log = logging.getLogger(__name__) + +GLOB_CHARACTERS = "*?[]" + + +class GcsReader(AbstractReader): + def __init__(self, uri: str, path: str, selector: Union[None, Selector, CompiledSelector] = None, **kwargs) -> None: + self.selector = selector + project_name = uri[: uri.find(":")] + bucket_name = uri[uri.find(":") + 1 :] + + self.gcs = Client(project=project_name) + self.bucket = self.gcs.bucket(bucket_name) + + # GCS Doesn't support iterating blobs using a glob pattern, so we have to do that ourselves. + # To split the path prefix from the glob-specific stuff, we have to find the first place where + # the glob starts. We'll then go through all files that match the path prefix before the glob, + # and do fnmatch ourselves to check whether any given blob matches with our glob. 
+ lowest_pos = min([path.find(char) if path.find(char) >= 0 else float("inf") for char in GLOB_CHARACTERS]) + if lowest_pos == float("inf"): + # No glob character was found + self.glob = None + self.prefix = path + else: + # Split the glob and the prefix + self.prefix = path[:lowest_pos] + self.glob = path + + def __iter__(self) -> Iterator[Record]: + blobs = self.gcs.list_blobs(bucket_or_name=self.bucket, prefix=self.prefix) + for blob in blobs: + if blob.size == 0: # Skip empty files + continue + if self.glob and not fnmatch(blob.name, self.glob): + continue + blobreader = BlobReader(blob) + + # Give the file-like object to RecordAdapter so it will select the right adapter by peeking into the stream + reader = RecordAdapter(fileobj=blobreader, out=False, selector=self.selector) + for record in reader: + yield record + + def close(self) -> None: + self.gcs.close() + + +class GcsWriter(AbstractWriter): + def __init__(self, uri: str, path: str, **kwargs): + project_name = uri[: uri.find(":")] + bucket_name = uri[uri.find(":") + 1 :] + self.writer = None + + self.gcs = Client(project=project_name) + self.bucket = self.gcs.bucket(bucket_name) + + blob = self.bucket.blob(path) + self.writer = BlobWriter(blob, ignore_flush=True) + self.adapter = RecordAdapter(url=path, fileobj=self.writer, out=True, **kwargs) + + def write(self, record: Record) -> None: + self.adapter.write(record) + + def flush(self) -> None: + # https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.fileio.BlobWriter) + # Flushing without closing is not supported by the remote service and therefore calling it on this class + # normally results in io.UnsupportedOperation. However, that behavior is incompatible with some consumers and + # wrappers of fileobjects in Python. 
+ pass + + def close(self) -> None: + if self.writer: + self.writer.close() diff --git a/tests/test_gcs_adapter.py b/tests/test_gcs_adapter.py new file mode 100644 index 00000000..96cfcad7 --- /dev/null +++ b/tests/test_gcs_adapter.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import sys +from io import BytesIO +from typing import Any, Generator, Iterator +from unittest.mock import MagicMock, patch + +import pytest + +from flow.record import Record, RecordAdapter, RecordDescriptor, RecordStreamWriter + + +def generate_records(amount) -> Generator[Record, Any, None]: + TestRecordWithFooBar = RecordDescriptor( + "test/record", + [ + ("string", "name"), + ("string", "foo"), + ("varint", "idx"), + ], + ) + for i in range(amount): + yield TestRecordWithFooBar(name=f"record{i}", foo="bar", idx=i) + + +def clean_up_adapter_import(test_function): + def wrapper(mock_google_sdk): + try: + result = test_function(mock_google_sdk) + finally: + if "flow.record.adapter.gcs" in sys.modules: + del sys.modules["flow.record.adapter.gcs"] + return result + + return wrapper + + +@pytest.fixture +def mock_google_sdk(monkeypatch: pytest.MonkeyPatch) -> Iterator[MagicMock]: + with monkeypatch.context() as m: + mock_google_sdk = MagicMock() + m.setitem(sys.modules, "google", mock_google_sdk) + m.setitem(sys.modules, "google.cloud", mock_google_sdk.cloud) + m.setitem(sys.modules, "google.cloud.storage", mock_google_sdk.cloud.storage) + m.setitem(sys.modules, "google.cloud.storage.client", mock_google_sdk.cloud.storage.client) + m.setitem(sys.modules, "google.cloud.storage.fileio", mock_google_sdk.cloud.storage.fileio) + + yield mock_google_sdk + + +@clean_up_adapter_import +def test_gcs_uri_and_path(mock_google_sdk: MagicMock) -> None: + from flow.record.adapter.gcs import GcsReader + + mock_client = MagicMock() + mock_google_sdk.cloud.storage.client.Client.return_value = mock_client + adapter_with_glob = RecordAdapter("gcs://test-project:test-bucket?path=/path/to/records/*/*.avro") + + assert isinstance(adapter_with_glob, GcsReader) + + mock_google_sdk.cloud.storage.client.Client.assert_called_with(project="test-project") + mock_client.bucket.assert_called_with("test-bucket") + + assert adapter_with_glob.prefix == "/path/to/records/" + assert adapter_with_glob.glob == "/path/to/records/*/*.avro" + + adapter_without_glob = RecordAdapter("gcs://test-project:test-bucket?path=/path/to/records/test-records.rec") + assert isinstance(adapter_without_glob, GcsReader) + + assert adapter_without_glob.prefix == "/path/to/records/test-records.rec" + assert adapter_without_glob.glob is None + + +@clean_up_adapter_import +def test_gcs_reader_glob(mock_google_sdk) -> None: + # Create a mocked record stream + test_records = list(generate_records(10)) + mock_blob = BytesIO() + writer = RecordStreamWriter(fp=mock_blob) + for record in test_records: + writer.write(record) + writer.flush() + mock_recordstream = mock_blob.getvalue() + writer.close() + + # Create a mocked client that will return the test-bucket + mock_client = MagicMock() + mock_client.bucket.return_value = "test-bucket-returned-from-client" + mock_google_sdk.cloud.storage.client.Client.return_value = mock_client + + # Create a mocked instance of the 'Blob' class of google.cloud.storage.fileio + recordsfile_blob_mock = MagicMock() + recordsfile_blob_mock.name = "/path/to/records/subfolder/results/tests.records" + recordsfile_blob_mock.data = mock_recordstream + recordsfile_blob_mock.size = len(mock_recordstream) + + # As this blob is located in the '🍩 
select' folder, it should not match with the glob that will be used later + # (which requires /results/ to be present in the path string) + wrong_location_blob = MagicMock() + wrong_location_blob.name = "/path/to/records/subfolder/donutselect/tests.records" + wrong_location_blob.size = 0x69 + wrong_location_blob.data = b"" + + # Return one empty file, one file that should match the glob, and one file that shouldn't match the glob + mock_client.list_blobs.return_value = [MagicMock(size=0), recordsfile_blob_mock, wrong_location_blob] + + test_read_buf = BytesIO(mock_recordstream) + mock_reader = MagicMock(wraps=test_read_buf, spec=BytesIO) + mock_reader.closed = False + mock_google_sdk.cloud.storage.fileio.BlobReader.return_value = mock_reader + with patch("io.open", MagicMock(return_value=mock_reader)): + adapter = RecordAdapter( + url="gcs://test-project:test-bucket?path=/path/to/records/*/results/*.records", + selector="r.idx >= 5", + ) + + found_records = list(adapter) + mock_client.bucket.assert_called_with("test-bucket") + mock_client.list_blobs.assert_called_with( + bucket_or_name="test-bucket-returned-from-client", + prefix="/path/to/records/", + ) + + # We expect the GCS Reader to skip over blobs of size 0, as those will inherently not contain records. + # Thus, a BlobReader should only have been initialized once, for the mocked records blob. + mock_google_sdk.cloud.storage.fileio.BlobReader.assert_called_once() + + # We expect 5 records rather than 10 because of the selector that we used + assert len(found_records) == 5 + for record in found_records: + assert record.foo == "bar" + assert record == test_records[record.idx] + + adapter.close() + mock_client.close.assert_called() + + +@clean_up_adapter_import +def test_gcs_writer(mock_google_sdk) -> None: + from flow.record.adapter.gcs import GcsWriter + + test_buf = BytesIO() + mock_writer = MagicMock(wraps=test_buf, spec=BytesIO) + mock_google_sdk.cloud.storage.fileio.BlobWriter.return_value = mock_writer + + adapter = RecordAdapter("gcs://test-project:test-bucket?path=/test/test.records", out=True) + + assert isinstance(adapter, GcsWriter) + + # Add mock records + test_records = list(generate_records(10)) + for record in test_records: + adapter.write(record) + + # For GCS, the flush() function should do nothing, as GCS does not support it. 
+ adapter.flush() + mock_writer.flush.assert_not_called() + + # Grab the bytes before it's too late + read_buf = BytesIO(test_buf.getvalue()) + + # Close the writer and assure the object has been closed + adapter.close() + mock_writer.close.assert_called() + assert test_buf.closed + + # Verify if the written record stream is something we can read + reader = RecordAdapter(fileobj=read_buf) + read_records = list(reader) + assert len(read_records) == 10 + for idx, record in enumerate(read_records): + assert record == test_records[idx] From 79acd5ed022b7f537df7c81fbeb94c56b9843aa0 Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Wed, 8 Nov 2023 13:06:22 +0100 Subject: [PATCH 03/10] Implement review suggestions --- flow/record/adapter/gcs.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/flow/record/adapter/gcs.py b/flow/record/adapter/gcs.py index 39b1f0b7..be40af93 100644 --- a/flow/record/adapter/gcs.py +++ b/flow/record/adapter/gcs.py @@ -25,14 +25,13 @@ log = logging.getLogger(__name__) -GLOB_CHARACTERS = "*?[]" +GLOB_CHARACTERS = ["*", "?", "[", "]"] class GcsReader(AbstractReader): def __init__(self, uri: str, path: str, selector: Union[None, Selector, CompiledSelector] = None, **kwargs) -> None: self.selector = selector - project_name = uri[: uri.find(":")] - bucket_name = uri[uri.find(":") + 1 :] + project_name, _, bucket_name = uri.partition(":") self.gcs = Client(project=project_name) self.bucket = self.gcs.bucket(bucket_name) @@ -41,14 +40,14 @@ def __init__(self, uri: str, path: str, selector: Union[None, Selector, Compiled # To split the path prefix from the glob-specific stuff, we have to find the first place where # the glob starts. We'll then go through all files that match the path prefix before the glob, # and do fnmatch ourselves to check whether any given blob matches with our glob. - lowest_pos = min([path.find(char) if path.find(char) >= 0 else float("inf") for char in GLOB_CHARACTERS]) - if lowest_pos == float("inf"): + first_glob_character = min((path.find(char) for char in GLOB_CHARACTERS if char in path), default=-1) + if first_glob_character == -1: # No glob character was found self.glob = None self.prefix = path else: # Split the glob and the prefix - self.prefix = path[:lowest_pos] + self.prefix = path[:first_glob_character] self.glob = path def __iter__(self) -> Iterator[Record]: @@ -71,8 +70,7 @@ def close(self) -> None: class GcsWriter(AbstractWriter): def __init__(self, uri: str, path: str, **kwargs): - project_name = uri[: uri.find(":")] - bucket_name = uri[uri.find(":") + 1 :] + project_name, _, bucket_name = uri.partition(":") self.writer = None self.gcs = Client(project=project_name) @@ -95,3 +93,4 @@ def flush(self) -> None: def close(self) -> None: if self.writer: self.writer.close() + self.writer = None From 3394b6a5e090a82dc2708d470b67edf4f1a256b9 Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Fri, 10 Nov 2023 10:11:13 +0100 Subject: [PATCH 04/10] Use `re.split` to determine prefix and pattern Also renamed self.glob to self.pattern. 
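A minimal sketch of the splitting behaviour this patch implements (the example path and the assertions are illustrative only, not part of the diff):

    import re
    from fnmatch import fnmatch

    GLOB_CHARACTERS_RE = r"[\[\]\*\?]"

    path = "path/to/records/*/*.avro"                      # hypothetical input path
    prefix_and_glob = re.split(GLOB_CHARACTERS_RE, path, maxsplit=1)
    prefix = prefix_and_glob[0]                            # "path/to/records/" -> used as the list_blobs() prefix
    pattern = path if len(prefix_and_glob) == 2 else None  # full path kept as the fnmatch pattern

    assert prefix == "path/to/records/"
    assert fnmatch("path/to/records/2023/dump.avro", pattern)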
--- flow/record/adapter/gcs.py | 27 +++++++++++++-------------- tests/test_gcs_adapter.py | 4 ++-- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/flow/record/adapter/gcs.py b/flow/record/adapter/gcs.py index be40af93..3c6a077a 100644 --- a/flow/record/adapter/gcs.py +++ b/flow/record/adapter/gcs.py @@ -1,4 +1,5 @@ import logging +import re from fnmatch import fnmatch from typing import Iterator, Union @@ -25,7 +26,7 @@ log = logging.getLogger(__name__) -GLOB_CHARACTERS = ["*", "?", "[", "]"] +GLOB_CHARACTERS_RE = r"[\[\]\*\?]" class GcsReader(AbstractReader): @@ -36,26 +37,24 @@ def __init__(self, uri: str, path: str, selector: Union[None, Selector, Compiled self.gcs = Client(project=project_name) self.bucket = self.gcs.bucket(bucket_name) - # GCS Doesn't support iterating blobs using a glob pattern, so we have to do that ourselves. - # To split the path prefix from the glob-specific stuff, we have to find the first place where - # the glob starts. We'll then go through all files that match the path prefix before the glob, - # and do fnmatch ourselves to check whether any given blob matches with our glob. - first_glob_character = min((path.find(char) for char in GLOB_CHARACTERS if char in path), default=-1) - if first_glob_character == -1: - # No glob character was found - self.glob = None - self.prefix = path + # GCS Doesn't support iterating blobs using a glob pattern, so we have to do that ourselves. To extract the path + # prefix from the glob-pattern we have to find the first place where the glob starts. We'll then go through all + # files that match the path prefix, and do fnmatch ourselves to check whether any given blob path matches with + # the full pattern. + prefix_and_glob = re.split(GLOB_CHARACTERS_RE, path, maxsplit=1) + if len(prefix_and_glob) == 2: + self.prefix = prefix_and_glob[0] + self.pattern = path else: - # Split the glob and the prefix - self.prefix = path[:first_glob_character] - self.glob = path + self.prefix = path + self.pattern = None def __iter__(self) -> Iterator[Record]: blobs = self.gcs.list_blobs(bucket_or_name=self.bucket, prefix=self.prefix) for blob in blobs: if blob.size == 0: # Skip empty files continue - if self.glob and not fnmatch(blob.name, self.glob): + if self.pattern and not fnmatch(blob.name, self.pattern): continue blobreader = BlobReader(blob) diff --git a/tests/test_gcs_adapter.py b/tests/test_gcs_adapter.py index 96cfcad7..b2b2886b 100644 --- a/tests/test_gcs_adapter.py +++ b/tests/test_gcs_adapter.py @@ -62,13 +62,13 @@ def test_gcs_uri_and_path(mock_google_sdk: MagicMock) -> None: mock_client.bucket.assert_called_with("test-bucket") assert adapter_with_glob.prefix == "/path/to/records/" - assert adapter_with_glob.glob == "/path/to/records/*/*.avro" + assert adapter_with_glob.pattern == "/path/to/records/*/*.avro" adapter_without_glob = RecordAdapter("gcs://test-project:test-bucket?path=/path/to/records/test-records.rec") assert isinstance(adapter_without_glob, GcsReader) assert adapter_without_glob.prefix == "/path/to/records/test-records.rec" - assert adapter_without_glob.glob is None + assert adapter_without_glob.pattern is None @clean_up_adapter_import From 901c2ecdf26c227bf71729a3ca6ee5515f58a583 Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Fri, 10 Nov 2023 11:49:02 +0100 Subject: [PATCH 05/10] Further simplify prefix and pattern logic --- flow/record/adapter/gcs.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git 
a/flow/record/adapter/gcs.py b/flow/record/adapter/gcs.py index 3c6a077a..1a190afc 100644 --- a/flow/record/adapter/gcs.py +++ b/flow/record/adapter/gcs.py @@ -38,16 +38,9 @@ def __init__(self, uri: str, path: str, selector: Union[None, Selector, Compiled self.bucket = self.gcs.bucket(bucket_name) # GCS Doesn't support iterating blobs using a glob pattern, so we have to do that ourselves. To extract the path - # prefix from the glob-pattern we have to find the first place where the glob starts. We'll then go through all - # files that match the path prefix, and do fnmatch ourselves to check whether any given blob path matches with - # the full pattern. - prefix_and_glob = re.split(GLOB_CHARACTERS_RE, path, maxsplit=1) - if len(prefix_and_glob) == 2: - self.prefix = prefix_and_glob[0] - self.pattern = path - else: - self.prefix = path - self.pattern = None + # prefix from the glob-pattern we have to find the first place where the glob starts. + self.prefix, *glob_pattern = re.split(GLOB_CHARACTERS_RE, path) + self.pattern = path if glob_pattern else None def __iter__(self) -> Iterator[Record]: blobs = self.gcs.list_blobs(bucket_or_name=self.bucket, prefix=self.prefix) From a7d7860a621fef0a908f4f4cc2c726680d132049 Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Mon, 20 Nov 2023 17:25:29 +0100 Subject: [PATCH 06/10] Implement code review suggestions --- flow/record/adapter/gcs.py | 25 ++++++++++++------------- pyproject.toml | 3 +++ tests/test_gcs_adapter.py | 21 +++++++++++---------- 3 files changed, 26 insertions(+), 23 deletions(-) diff --git a/flow/record/adapter/gcs.py b/flow/record/adapter/gcs.py index 1a190afc..66d710cf 100644 --- a/flow/record/adapter/gcs.py +++ b/flow/record/adapter/gcs.py @@ -13,14 +13,14 @@ __usage__ = """ Google Cloud Storage adapter --- -Read usage: rdump gcs://[PROJECT-ID]:[BUCKET-ID]?path=[PATH] -[PROJECT-ID]: Google Cloud Project ID -[BUCKET-ID]: Bucket ID +Read usage: rdump gcs://[BUCKET_ID]/path?project_id=[PROJECT_ID] +[PROJECT_ID]: Google Cloud Project ID +[BUCKET_ID]: Bucket ID [path]: Path to look for files, with support for glob-pattern matching -Write usage: rdump gcs://[PROJECT-ID]:[BUCKET-ID]?path=[PATH] -[PROJECT-ID]: Google Cloud Project ID -[BUCKET-ID]: Bucket ID +Write usage: rdump gcs://[BUCKET_ID]/path?project_id=[PROJECT_ID] +[PROJECT_ID]: Google Cloud Project ID +[BUCKET_ID]: Bucket ID [path]: Path to write records to """ @@ -30,11 +30,10 @@ class GcsReader(AbstractReader): - def __init__(self, uri: str, path: str, selector: Union[None, Selector, CompiledSelector] = None, **kwargs) -> None: + def __init__(self, uri: str, project: str, selector: Union[None, Selector, CompiledSelector] = None, **kwargs): self.selector = selector - project_name, _, bucket_name = uri.partition(":") - - self.gcs = Client(project=project_name) + bucket_name, _, path = uri.partition("/") + self.gcs = Client(project=project) self.bucket = self.gcs.bucket(bucket_name) # GCS Doesn't support iterating blobs using a glob pattern, so we have to do that ourselves. 
To extract the path @@ -61,11 +60,11 @@ def close(self) -> None: class GcsWriter(AbstractWriter): - def __init__(self, uri: str, path: str, **kwargs): - project_name, _, bucket_name = uri.partition(":") + def __init__(self, uri: str, project: str, **kwargs): + bucket_name, _, path = uri.partition("/") self.writer = None - self.gcs = Client(project=project_name) + self.gcs = Client(project=project) self.bucket = self.gcs.bucket(bucket_name) blob = self.bucket.blob(path) diff --git a/pyproject.toml b/pyproject.toml index c519170b..c7fd64ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,9 @@ geoip = [ avro = [ "fastavro[snappy]", ] +gcs = [ + "google-cloud-storage", +] test = [ "lz4", "zstandard", diff --git a/tests/test_gcs_adapter.py b/tests/test_gcs_adapter.py index b2b2886b..b20ea1c6 100644 --- a/tests/test_gcs_adapter.py +++ b/tests/test_gcs_adapter.py @@ -54,20 +54,20 @@ def test_gcs_uri_and_path(mock_google_sdk: MagicMock) -> None: mock_client = MagicMock() mock_google_sdk.cloud.storage.client.Client.return_value = mock_client - adapter_with_glob = RecordAdapter("gcs://test-project:test-bucket?path=/path/to/records/*/*.avro") + adapter_with_glob = RecordAdapter("gcs://test-bucket/path/to/records/*/*.avro", project="test-project") assert isinstance(adapter_with_glob, GcsReader) mock_google_sdk.cloud.storage.client.Client.assert_called_with(project="test-project") mock_client.bucket.assert_called_with("test-bucket") - assert adapter_with_glob.prefix == "/path/to/records/" - assert adapter_with_glob.pattern == "/path/to/records/*/*.avro" + assert adapter_with_glob.prefix == "path/to/records/" + assert adapter_with_glob.pattern == "path/to/records/*/*.avro" - adapter_without_glob = RecordAdapter("gcs://test-project:test-bucket?path=/path/to/records/test-records.rec") + adapter_without_glob = RecordAdapter("gcs://test-bucket/path/to/records/test-records.rec", project="test-project") assert isinstance(adapter_without_glob, GcsReader) - assert adapter_without_glob.prefix == "/path/to/records/test-records.rec" + assert adapter_without_glob.prefix == "path/to/records/test-records.rec" assert adapter_without_glob.pattern is None @@ -90,14 +90,14 @@ def test_gcs_reader_glob(mock_google_sdk) -> None: # Create a mocked instance of the 'Blob' class of google.cloud.storage.fileio recordsfile_blob_mock = MagicMock() - recordsfile_blob_mock.name = "/path/to/records/subfolder/results/tests.records" + recordsfile_blob_mock.name = "path/to/records/subfolder/results/tests.records" recordsfile_blob_mock.data = mock_recordstream recordsfile_blob_mock.size = len(mock_recordstream) # As this blob is located in the '🍩 select' folder, it should not match with the glob that will be used later # (which requires /results/ to be present in the path string) wrong_location_blob = MagicMock() - wrong_location_blob.name = "/path/to/records/subfolder/donutselect/tests.records" + wrong_location_blob.name = "path/to/records/subfolder/donutselect/tests.records" wrong_location_blob.size = 0x69 wrong_location_blob.data = b"" @@ -110,7 +110,8 @@ def test_gcs_reader_glob(mock_google_sdk) -> None: mock_google_sdk.cloud.storage.fileio.BlobReader.return_value = mock_reader with patch("io.open", MagicMock(return_value=mock_reader)): adapter = RecordAdapter( - url="gcs://test-project:test-bucket?path=/path/to/records/*/results/*.records", + url="gcs://test-bucket/path/to/records/*/results/*.records", + project="test-project", selector="r.idx >= 5", ) @@ -118,7 +119,7 @@ def test_gcs_reader_glob(mock_google_sdk) -> 
None: mock_client.bucket.assert_called_with("test-bucket") mock_client.list_blobs.assert_called_with( bucket_or_name="test-bucket-returned-from-client", - prefix="/path/to/records/", + prefix="path/to/records/", ) # We expect the GCS Reader to skip over blobs of size 0, as those will inherently not contain records. @@ -143,7 +144,7 @@ def test_gcs_writer(mock_google_sdk) -> None: mock_writer = MagicMock(wraps=test_buf, spec=BytesIO) mock_google_sdk.cloud.storage.fileio.BlobWriter.return_value = mock_writer - adapter = RecordAdapter("gcs://test-project:test-bucket?path=/test/test.records", out=True) + adapter = RecordAdapter("gcs://test-bucket/test/test.records", project="test-project", out=True) assert isinstance(adapter, GcsWriter) From 4f0324975e8350af38930699d0040762ee707661 Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Mon, 27 Nov 2023 14:08:43 +0100 Subject: [PATCH 07/10] Implement code review suggestions --- flow/record/adapter/gcs.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/flow/record/adapter/gcs.py b/flow/record/adapter/gcs.py index 66d710cf..ac6f7159 100644 --- a/flow/record/adapter/gcs.py +++ b/flow/record/adapter/gcs.py @@ -1,27 +1,28 @@ +from __future__ import annotations + import logging import re from fnmatch import fnmatch -from typing import Iterator, Union +from typing import Iterator from google.cloud.storage.client import Client from google.cloud.storage.fileio import BlobReader, BlobWriter from flow.record.adapter import AbstractReader, AbstractWriter from flow.record.base import Record, RecordAdapter -from flow.record.selector import CompiledSelector, Selector +from flow.record.selector import Selector __usage__ = """ Google Cloud Storage adapter --- -Read usage: rdump gcs://[BUCKET_ID]/path?project_id=[PROJECT_ID] -[PROJECT_ID]: Google Cloud Project ID -[BUCKET_ID]: Bucket ID -[path]: Path to look for files, with support for glob-pattern matching +Read usage: rdump gcs://[BUCKET_ID]/path?project=[PROJECT] +Write usage: rdump -w gcs://[BUCKET_ID]/path?project=[PROJECT] -Write usage: rdump gcs://[BUCKET_ID]/path?project_id=[PROJECT_ID] -[PROJECT_ID]: Google Cloud Project ID [BUCKET_ID]: Bucket ID -[path]: Path to write records to +[path]: Path to read from or write to, supports glob-pattern matching when reading + +Optional arguments: + [PROJECT]: Google Cloud Project ID, If not passed, falls back to the default inferred from the environment. 
""" log = logging.getLogger(__name__) @@ -30,7 +31,7 @@ class GcsReader(AbstractReader): - def __init__(self, uri: str, project: str, selector: Union[None, Selector, CompiledSelector] = None, **kwargs): + def __init__(self, uri: str, *, project: str | None = None, selector: Selector | None = None, **kwargs): self.selector = selector bucket_name, _, path = uri.partition("/") self.gcs = Client(project=project) From 6624d73bcaec338576126731b152fc703c0f1bff Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Mon, 1 Apr 2024 11:29:37 +0200 Subject: [PATCH 08/10] Support transparent compression when writing to fileobj --- flow/record/base.py | 70 +++++++++++++++++++----------------- tests/test_record_adapter.py | 20 +++++++++-- 2 files changed, 55 insertions(+), 35 deletions(-) diff --git a/flow/record/base.py b/flow/record/base.py index 80874931..ed9138e6 100644 --- a/flow/record/base.py +++ b/flow/record/base.py @@ -726,6 +726,29 @@ def open_path_or_stream(path: Union[str, Path, BinaryIO], mode: str, clobber: bo raise ValueError(f"Unsupported path type {path}") +def wrap_in_compression(fp: BinaryIO, mode: str, path: str) -> BinaryIO: + if path.endswith(".gz"): + return gzip.GzipFile(fileobj=fp, mode=mode) + elif path.endswith(".bz2"): + if not HAS_BZ2: + raise RuntimeError("bz2 python module not available") + return bz2.BZ2File(fp, mode) + elif path.endswith(".lz4"): + if not HAS_LZ4: + raise RuntimeError("lz4 python module not available") + return lz4.open(fp, mode) + elif path.endswith((".zstd", ".zst")): + if not HAS_ZSTD: + raise RuntimeError("zstandard python module not available") + if "w" not in mode: + dctx = zstd.ZstdDecompressor() + return dctx.stream_reader(fp) + else: + cctx = zstd.ZstdCompressor() + return cctx.stream_writer(fp) + return fp + + def open_path(path: str, mode: str, clobber: bool = True) -> IO: """ Open ``path`` using ``mode`` and returns a file object. 
@@ -755,40 +778,18 @@ def open_path(path: str, mode: str, clobber: bool = True) -> IO: if not is_stdio and not clobber and os.path.exists(path) and out: raise IOError("Output file {!r} already exists, and clobber=False".format(path)) - # check path extension for compression - if path: - if path.endswith(".gz"): - fp = gzip.GzipFile(path, mode) - elif path.endswith(".bz2"): - if not HAS_BZ2: - raise RuntimeError("bz2 python module not available") - fp = bz2.BZ2File(path, mode) - elif path.endswith(".lz4"): - if not HAS_LZ4: - raise RuntimeError("lz4 python module not available") - fp = lz4.open(path, mode) - elif path.endswith((".zstd", ".zst")): - if not HAS_ZSTD: - raise RuntimeError("zstandard python module not available") - if not out: - dctx = zstd.ZstdDecompressor() - fp = dctx.stream_reader(open(path, "rb")) - else: - cctx = zstd.ZstdCompressor() - fp = cctx.stream_writer(open(path, "wb")) - # normal file or stdio for reading or writing - if not fp: - if is_stdio: - if binary: - fp = getattr(sys.stdout, "buffer", sys.stdout) if out else getattr(sys.stdin, "buffer", sys.stdin) - else: - fp = sys.stdout if out else sys.stdin + if is_stdio: + if binary: + fp = getattr(sys.stdout, "buffer", sys.stdout) if out else getattr(sys.stdin, "buffer", sys.stdin) else: - fp = io.open(path, mode) - # check if we are reading a compressed stream - if not out and binary: - fp = open_stream(fp, mode) + fp = sys.stdout if out else sys.stdin + else: + fp = wrap_in_compression(io.open(path, mode), mode, path) + + # check if we are reading a compressed stream + if not out and binary: + fp = open_stream(fp, mode) return fp @@ -831,6 +832,11 @@ def RecordAdapter( cls_url = p.netloc + p.path if sub_adapter: cls_url = sub_adapter + "://" + cls_url + + # If the destination path ends with a compression extension, we wrap the fileobj in a transparent compressor. + if out and fileobj is not None: + fileobj = wrap_in_compression(fileobj, "wb", p.path) + if out is False: if url in ("-", "", None) and fileobj is None: # For reading stdin, we cannot rely on an extension to know what sort of stream is incoming. Thus, we will diff --git a/tests/test_record_adapter.py b/tests/test_record_adapter.py index 310e3cb2..eabef0ee 100644 --- a/tests/test_record_adapter.py +++ b/tests/test_record_adapter.py @@ -1,6 +1,7 @@ import datetime import platform import sys +from gzip import GzipFile from io import BytesIO import pytest @@ -22,6 +23,8 @@ HAS_LZ4, HAS_ZSTD, LZ4_MAGIC, + RECORDSTREAM_MAGIC, + RECORDSTREAM_MAGIC_DEPTH, ZSTD_MAGIC, ) from flow.record.selector import CompiledSelector, Selector @@ -476,12 +479,16 @@ def test_csvfilereader(tmp_path): assert rec.count == "2" -def test_file_like_writer_reader() -> None: +@pytest.mark.parametrize("use_gzip", [False, True]) +def test_file_like_writer_reader(use_gzip: bool) -> None: test_buf = BytesIO() - adapter = RecordAdapter(fileobj=test_buf, out=True) + url = "nonexistent/path/my_records.gz" if use_gzip else None + adapter = RecordAdapter(url, fileobj=test_buf, out=True) assert isinstance(adapter, StreamWriter) + if use_gzip: + assert isinstance(adapter.fp, GzipFile) # Add mock records test_records = list(generate_records(10)) @@ -491,7 +498,14 @@ def test_file_like_writer_reader() -> None: adapter.flush() # Grab the bytes before closing the BytesIO object. 
- read_buf = BytesIO(test_buf.getvalue()) + written_bytes = test_buf.getvalue() + + if use_gzip: + assert written_bytes.startswith(GZIP_MAGIC) + else: + assert written_bytes[:RECORDSTREAM_MAGIC_DEPTH].endswith(RECORDSTREAM_MAGIC) + + read_buf = BytesIO(written_bytes) # Close the writer and assure the object has been closed adapter.close() From 92390bfde4817baf1e3ba9c0c8645d50dc416a6a Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Mon, 1 Apr 2024 13:20:59 +0200 Subject: [PATCH 09/10] Fix flushing & closing, make write test use gzip --- flow/record/adapter/gcs.py | 10 +++++----- tests/test_gcs_adapter.py | 9 ++++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/flow/record/adapter/gcs.py b/flow/record/adapter/gcs.py index ac6f7159..2eeaf971 100644 --- a/flow/record/adapter/gcs.py +++ b/flow/record/adapter/gcs.py @@ -76,13 +76,13 @@ def write(self, record: Record) -> None: self.adapter.write(record) def flush(self) -> None: - # https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.fileio.BlobWriter) - # Flushing without closing is not supported by the remote service and therefore calling it on this class - # normally results in io.UnsupportedOperation. However, that behavior is incompatible with some consumers and - # wrappers of fileobjects in Python. - pass + # The underlying adapter may require flushing + self.adapter.flush() def close(self) -> None: + self.flush() + self.adapter.close() + if self.writer: self.writer.close() self.writer = None diff --git a/tests/test_gcs_adapter.py b/tests/test_gcs_adapter.py index b20ea1c6..ede57ed5 100644 --- a/tests/test_gcs_adapter.py +++ b/tests/test_gcs_adapter.py @@ -8,6 +8,7 @@ import pytest from flow.record import Record, RecordAdapter, RecordDescriptor, RecordStreamWriter +from flow.record.base import GZIP_MAGIC def generate_records(amount) -> Generator[Record, Any, None]: @@ -144,7 +145,7 @@ def test_gcs_writer(mock_google_sdk) -> None: mock_writer = MagicMock(wraps=test_buf, spec=BytesIO) mock_google_sdk.cloud.storage.fileio.BlobWriter.return_value = mock_writer - adapter = RecordAdapter("gcs://test-bucket/test/test.records", project="test-project", out=True) + adapter = RecordAdapter("gcs://test-bucket/test/test.records.gz", project="test-project", out=True) assert isinstance(adapter, GcsWriter) @@ -153,11 +154,13 @@ def test_gcs_writer(mock_google_sdk) -> None: for record in test_records: adapter.write(record) - # For GCS, the flush() function should do nothing, as GCS does not support it. 
adapter.flush() - mock_writer.flush.assert_not_called() + mock_writer.flush.assert_called() # Grab the bytes before it's too late + written_bytes = test_buf.getvalue() + assert written_bytes.startswith(GZIP_MAGIC) + read_buf = BytesIO(test_buf.getvalue()) # Close the writer and assure the object has been closed From 028b4e47cec1fe620ffd36f4073bea1470c16fed Mon Sep 17 00:00:00 2001 From: Max Groot <19346100+MaxGroot@users.noreply.github.com> Date: Mon, 1 Apr 2024 13:23:57 +0200 Subject: [PATCH 10/10] Implement review suggestion --- flow/record/adapter/gcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/record/adapter/gcs.py b/flow/record/adapter/gcs.py index 2eeaf971..376415e9 100644 --- a/flow/record/adapter/gcs.py +++ b/flow/record/adapter/gcs.py @@ -61,7 +61,7 @@ def close(self) -> None: class GcsWriter(AbstractWriter): - def __init__(self, uri: str, project: str, **kwargs): + def __init__(self, uri: str, *, project: str | None = None, **kwargs): bucket_name, _, path = uri.partition("/") self.writer = None