@yunzheng commented Jan 21, 2026

Description

This PR adds support for reading and writing Apache Parquet files using pyarrow. This allows flow.record tools like rdump to interact with the Parquet ecosystem, enabling efficient storage and integration with other data analysis tools.

Implementation Details

The implementation introduces a new adapter in flow/record/adapter/parquet.py with the following features:

ParquetWriter

  • Usage: rdump -w parquet://[PATH]?batch_size=[BATCH_SIZE]&compression=[COMPRESSION] or rdump -w output.parquet

  • Compression: Supports snappy, gzip, brotli, zstd (default), lz4, and none.

  • Schema Handling:

    • Converts flow.record RecordDescriptor to PyArrow schemas.
    • Handles multiple schemas by splitting output into multiple files when the descriptor changes (since Parquet files strictly enforce a single schema).
    • Embeds flow.record descriptor metadata (descriptor_name, descriptor_fields) in the Parquet file metadata to ensure lossless round-tripping.
  • Type Mapping: Maps flow.record types to their Arrow equivalents (a sketch of this mapping follows the list):

    • Standard types: boolean, float, string, etc.
    • datetime -> timestamp('us', tz='UTC')
    • varint -> int64
    • digest -> struct<md5: binary, sha1: binary, sha256: binary>
    • path -> struct<path: string, path_type: uint8>
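
To make the schema handling concrete, here is a minimal sketch of the descriptor-to-Arrow conversion and metadata embedding described above. The `FIELD_TYPE_MAP` dict, the `(fieldtype, fieldname)` tuple input, and the metadata encoding are illustrative assumptions, not the adapter's actual code:

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Assumed mapping of flow.record type names to Arrow types,
# following the table above (illustrative, not the adapter source).
FIELD_TYPE_MAP = {
    "boolean": pa.bool_(),
    "float": pa.float64(),
    "string": pa.string(),
    "datetime": pa.timestamp("us", tz="UTC"),
    "varint": pa.int64(),
    "digest": pa.struct(
        [("md5", pa.binary()), ("sha1", pa.binary()), ("sha256", pa.binary())]
    ),
    "path": pa.struct([("path", pa.string()), ("path_type", pa.uint8())]),
}


def descriptor_to_schema(name: str, field_tuples: list[tuple[str, str]]) -> pa.Schema:
    """Build an Arrow schema from (fieldtype, fieldname) tuples and embed
    the descriptor in the schema metadata for lossless round-tripping."""
    fields = [pa.field(fname, FIELD_TYPE_MAP[ftype]) for ftype, fname in field_tuples]
    metadata = {
        b"descriptor_name": name.encode(),
        # Encoding the field tuples as their repr() is an assumption here.
        b"descriptor_fields": repr(field_tuples).encode(),
    }
    return pa.schema(fields, metadata=metadata)


schema = descriptor_to_schema("network/flow", [("datetime", "ts"), ("string", "src_ip")])
table = pa.table({"ts": [None], "src_ip": ["198.51.100.1"]}, schema=schema)
pq.write_table(table, "output.parquet", compression="zstd")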

ParquetReader

  • Usage: rdump parquet://[PATH] or rdump dataset.parquet
  • Filtering: Supports column projection (fields and exclude arguments) to only read specific columns, leveraging Parquet's columnar nature for performance.
  • Schema Inference: Reconstructs the RecordDescriptor from the Arrow schema, using embedded metadata when available and inferring from the Arrow types otherwise (see the sketch below).
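
A reduced sketch of that read path, reusing the assumed metadata keys from the writer sketch above; pyarrow's `columns=` projection is what backs the `fields`/`exclude` arguments:

```python
import pyarrow.parquet as pq

pf = pq.ParquetFile("input.parquet")

# Embedded flow.record descriptor metadata, if present (assumed keys).
meta = pf.schema_arrow.metadata or {}
descriptor_name = meta.get(b"descriptor_name", b"").decode()

# Column projection: only the requested columns are decoded from disk.
for batch in pf.iter_batches(columns=["src_ip", "dst_ip"]):
    for row in batch.to_pylist():
        ...  # map each dict back onto a flow.record Record
```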

rdump

  • Column projection: New CLI arguments have been added to rdump to include or exclude columns when reading Parquet files:
    • --fields-read or -Fr: columns to read, skipping all other columns
    • --exclude-read or -Xr: columns to exclude from reading

Related Improvements

  • Digest Field: Enhanced fieldtypes.digest to support initialization from bytes and fixed a bug where invalid values could be set (a sketch follows this list).
  • An example Parquet file has been added to tests/_data/iris-zstd.parquet (git-lfs).
  • An .arrow adapter has also been added as a bonus to this PR. It is essentially the IPC streaming format for Apache Arrow, which could be handy in the future.
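
For illustration, the digest change means roughly the following should now work; this is a sketch, with placeholder digest values and an assumed exception type:

```python
from flow.record.fieldtypes import digest

d = digest()
d.md5 = "d41d8cd98f00b204e9800998ecf8427e"                  # hex string (existing)
d.md5 = bytes.fromhex("d41d8cd98f00b204e9800998ecf8427e")  # raw bytes (new in this PR)

# Setting an invalid value now raises instead of silently storing it
# (the exact exception type is an assumption here).
try:
    d.md5 = "not-a-valid-md5"
except (TypeError, ValueError):
    pass
```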

Dependencies

pyarrow

```bash
pip install flow.record[parquet]
```

Usage Example

```bash
# Writing to Parquet
rdump input.records -w "parquet://output.parquet?batch_size=10000&compression=zstd"
rdump input.records -w output.parquet

# Reading from Parquet
rdump parquet://input.parquet
rdump input.parquet

# Exclude a single column from reading
rdump input.parquet -Xr data

# Only read the specified columns, skipping all others
rdump input.parquet -Fr src_ip,dst_ip
```

resolves #207

- digest fields can now be initialized from bytes as well.
- fixed a bug where the value would be set even if it was invalid.
- added test_digest.py
codecov bot commented Jan 21, 2026

Codecov Report

❌ Patch coverage is 95.07246% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.26%. Comparing base (c3f8cd8) to head (699ca53).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| flow/record/adapter/arrow.py | 87.90% | 15 Missing ⚠️ |
| flow/record/fieldtypes/__init__.py | 97.43% | 1 Missing ⚠️ |
| flow/record/stream.py | 75.00% | 1 Missing ⚠️ |
Additional details and impacted files
```diff
@@            Coverage Diff             @@
##             main     #208      +/-   ##
==========================================
+ Coverage   83.33%   84.26%   +0.92%
==========================================
  Files          35       37       +2
  Lines        3714     4035     +321
==========================================
+ Hits         3095     3400     +305
- Misses        619      635      +16
```
| Flag | Coverage Δ |
|---|---|
| unittests | 84.26% <95.07%> (+0.92%) ⬆️ |

yunzheng commented Jan 21, 2026

tox -e build-docs warnings now seem to be treated as errors, and there appears to be an incompatibility between ruff's and Sphinx's type-annotation preferences.

  • Sphinx prefers typing.Type, ruff prefers type.
  • One Sphinx warning I cannot seem to get fixed:
    • def __getattr__(self, name: Literal["_desc"]) -> RecordDescriptor: ...

I could not resolve the Literal["_desc"] -> RecordDescriptor warning, so I am suppressing it in Sphinx's conf.py (sketch below).
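
For reference, one way such a suppression could look in conf.py; the exact mechanism and warning target used in this PR may differ:

```python
# conf.py (sketch): silence the unresolved cross-reference warning for the
# Literal["_desc"] annotation; the entry below is an assumption, not the
# actual configuration from this PR.
nitpick_ignore = [
    ("py:class", 'Literal["_desc"]'),
]
```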

Note that once Python 3.10 is deprecated and no longer the base Python version, Sphinx will automatically be upgraded to a newer release (e.g. v9.0.4) that seems to emit even more warnings.

This fixes the ValueError: I/O operation on closed file warnings during tests.

To summarize what was happening:

* Tests calling rdump.main() were setting up logging handlers
* When those tests finished, the handlers' streams were getting closed
* But the handlers remained attached to the root logger
* Python tried to write to the closed stream → ValueError: I/O operation on closed file

The `reset_logging` fixture ensures every test gets a clean logging state (a minimal sketch follows).
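
A minimal sketch of such a fixture, assuming pytest and an autouse placement in conftest.py; the actual fixture in this PR may differ in detail:

```python
import logging

import pytest


@pytest.fixture(autouse=True)
def reset_logging():
    """Detach and close any handlers left on the root logger after a test,
    so later log writes cannot hit an already-closed stream."""
    yield
    root = logging.getLogger()
    for handler in root.handlers[:]:
        root.removeHandler(handler)
        handler.close()
```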