
feat: add row_num interface to IRecordBatchSupplier#79

Open
liulx20 wants to merge 4 commits into alibaba:main from liulx20:row-num

Conversation

@liulx20
Collaborator

@liulx20 liulx20 commented Mar 18, 2026

Fixes

Greptile Summary

This PR adds a row_num() interface to IRecordBatchSupplier and uses it in insert_vertices_impl to pre-allocate indexer capacity before iterating batches, replacing the previous per-batch growth checks. While the intent is sound (avoid repeated reallocation), the implementation has several correctness and robustness problems that should be resolved before merging.

Key issues found:

  • Compile break (odps_fragment_loader.h): ODPSStreamRecordBatchSupplier and ODPSTableRecordBatchSupplier inherit from IRecordBatchSupplier but do not implement the new pure virtual row_num(). The build will fail until these are updated.
  • Infinite loop / OOM risk (vertex_table.h:280-285) — supplier->row_num() returns int64_t; adding it to size_t with no sign-check means a negative return (e.g. uninitialized row_num_) wraps to SIZE_MAX, and the capacity growth loop can never terminate.
  • No safety net if row_num() is wrong (vertex_table.h) — The per-batch capacity guard was removed entirely. If the upfront count is inaccurate, there is no fallback to prevent inserting past the allocated capacity.
  • row_num_ uninitialized (loader_utils.cc:242) — CSVStreamRecordBatchSupplier does not initialize row_num_ in its constructor initializer list; if the constructor exits early the field holds an indeterminate value.
  • nullptr for arrow::StopToken (loader_utils.cc:257) — CountRowsAsync expects a value-type StopToken, not a pointer; this implicit conversion may not compile on all Arrow versions.
  • Double file I/O (loader_utils.cc) and double scanner scan (reader.cc) — The CSV file is opened and read twice in the constructor; the Arrow scanner is fully iterated twice. For large datasets this doubles construction time.
  • Log contradicts throw (reader.cc:191-194) — The WARNING log says "Proceeding without row count" but the very next line throws an exception, making a pre-allocation hint a hard failure.
  • SupplierWrapperWithFirstBatch::row_num() undercounts (loader_utils.h:115) — first_batch_ rows are not included in the sum when has_first_batch_ is true.
  • GeneratedRecordBatchSupplier::row_num() shrinks (tests/unittest/utils.h:43) — batches_ is drained by pop_back(), so the count decreases as batches are consumed rather than reflecting the original total.
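
The sign-check and fallback-guard issues above can be illustrated with a small standalone sketch. The helper names TargetCapacity, GrowCapacity and EnsureRoomForBatch are hypothetical and do not appear in the PR, and doubling is only an assumed growth policy. The point is that a negative or overflowing hint is rejected instead of wrapping, the growth loop always terminates, and a per-batch check remains as a safety net when the hint is too small.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical helper: capacity to reserve up front, or std::nullopt when the
// row-count hint is unusable (negative) or the sum would overflow size_t.
inline std::optional<std::size_t> TargetCapacity(std::size_t current_size,
                                                 std::int64_t row_hint) {
  if (row_hint < 0) return std::nullopt;  // never let a negative value wrap
  const auto extra = static_cast<std::uint64_t>(row_hint);
  if (extra > std::numeric_limits<std::size_t>::max() - current_size) {
    return std::nullopt;  // current_size + extra would overflow
  }
  return current_size + static_cast<std::size_t>(extra);
}

// Assumed growth policy: double until `needed` fits. If another doubling
// would overflow, return `needed` directly so the loop always terminates.
inline std::size_t GrowCapacity(std::size_t capacity, std::size_t needed) {
  constexpr std::size_t kMax = std::numeric_limits<std::size_t>::max();
  if (capacity == 0) capacity = 1;
  while (capacity < needed) {
    if (capacity > kMax / 2) return needed;
    capacity *= 2;
  }
  return capacity;
}

// Per-batch safety net: returns the capacity required before inserting
// `batch_rows` more rows, growing only if the upfront estimate was too small.
inline std::size_t EnsureRoomForBatch(std::size_t capacity, std::size_t used,
                                      std::size_t batch_rows) {
  assert(batch_rows <= std::numeric_limits<std::size_t>::max() - used);
  const std::size_t needed = used + batch_rows;
  return needed <= capacity ? capacity : GrowCapacity(capacity, needed);
}
```

With this shape, an unusable hint simply skips the upfront reservation, and the per-batch check keeps inserts within the allocated capacity either way.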

Confidence Score: 1/5

  • Not safe to merge — the PR introduces a compile break and multiple correctness issues including an infinite loop risk and removal of the per-batch safety guard.
  • Score of 1 reflects: (1) an outright compile failure due to ODPS subclasses not implementing the new pure virtual method, (2) an infinite-loop / OOM hazard from the signed→unsigned conversion in the capacity loop, (3) removal of the per-batch fallback guard with no substitute, and (4) several additional robustness problems (uninitialized field, double I/O, misleading error handling) across the changed files.
  • include/neug/storages/loader/odps_fragment_loader.h (compile break), include/neug/storages/graph/vertex_table.h (infinite loop risk), src/storages/loader/loader_utils.cc (uninitialized field, double I/O), src/utils/reader/reader.cc (double scan, incorrect error handling)

Important Files Changed

| Filename | Overview |
| --- | --- |
| include/neug/storages/graph/vertex_table.h | Capacity pre-allocation moved before the batch loop and now relies solely on row_num() accuracy. The implicit int64_t→size_t conversion in the addition at line 280 can wrap to SIZE_MAX on a negative return, causing the growth loop (line 283) to spin forever. |
| include/neug/storages/loader/loader_utils.h | Adds row_num() to all in-tree concrete suppliers. SupplierWrapperWithFirstBatch::row_num() silently omits first_batch_ rows. The ArrowRecordBatchStreamSupplier constructor now requires a second row_num argument, which is a breaking change. ODPS subclasses (in odps_fragment_loader.h) are not updated and will fail to compile. |
| src/storages/loader/loader_utils.cc | Opens and fully reads the CSV file a second time to count rows. Passes nullptr where Arrow's CountRowsAsync expects a value-type arrow::StopToken, risking a compile failure. row_num_ is not initialized in the constructor initializer list, leaving it indeterminate if the constructor throws before the assignment. |
| src/utils/reader/reader.cc | Performs a full extra scan via CountRows() before ToRecordBatchReader(). On CountRows() failure the function throws despite the log saying "Proceeding without row count", turning a hint into a hard requirement and breaking reads that could otherwise succeed. |
| tests/unittest/utils.h | Adds row_num() to the test helper, but iterates over batches_, which shrinks as GetNextBatch() pops elements, so the count decreases with each consumed batch rather than staying fixed at the total. |
| include/neug/storages/loader/odps_fragment_loader.h | ODPSStreamRecordBatchSupplier and ODPSTableRecordBatchSupplier both inherit from IRecordBatchSupplier but neither implements the newly added pure virtual row_num(), making both classes abstract and preventing compilation. |
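
One way to keep row_num() stable for the wrapper and the test helper is to compute the total once at construction and cache it. The sketch below uses stand-in types (FakeBatch, CountedBatchQueue, FirstBatchWrapper) that are not from the PR, and it assumes the first batch has already been pulled out of the wrapped suppliers, so its rows must be added separately.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for arrow::RecordBatch; only the row count matters here.
struct FakeBatch {
  std::int64_t num_rows;
};

// Sketch of a draining supplier whose row_num() does not shrink: the total
// is summed once in the constructor and never recomputed from batches_.
class CountedBatchQueue {
 public:
  explicit CountedBatchQueue(std::vector<FakeBatch> batches)
      : batches_(std::move(batches)) {
    for (const auto& b : batches_) {
      assert(b.num_rows >= 0);
      total_rows_ += b.num_rows;
    }
  }

  // Pops from the back, as the test helper does; nullptr when exhausted.
  std::unique_ptr<FakeBatch> GetNextBatch() {
    if (batches_.empty()) return nullptr;
    auto out = std::make_unique<FakeBatch>(batches_.back());
    batches_.pop_back();
    return out;
  }

  std::int64_t row_num() const { return total_rows_; }

 private:
  std::vector<FakeBatch> batches_;
  std::int64_t total_rows_ = 0;
};

// Sketch of a wrapper that holds a separately extracted first batch and
// includes its rows in the reported total (assumption: the inner suppliers
// no longer count that batch).
class FirstBatchWrapper {
 public:
  FirstBatchWrapper(FakeBatch first, std::vector<CountedBatchQueue> rest)
      : first_batch_(first), suppliers_(std::move(rest)) {
    total_rows_ = first_batch_.num_rows;
    for (const auto& s : suppliers_) total_rows_ += s.row_num();
  }

  std::int64_t row_num() const { return total_rows_; }

 private:
  FakeBatch first_batch_;
  std::vector<CountedBatchQueue> suppliers_;
  std::int64_t total_rows_ = 0;
};
```

Caching the total means callers get the same answer before and after batches are consumed, which is what a pre-allocation hint needs.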

Class Diagram

%%{init: {'theme': 'neutral'}}%%
classDiagram
    class IRecordBatchSupplier {
        <<abstract>>
        +GetNextBatch() RecordBatch*
        +row_num() int64_t
    }

    class SupplierWrapperWithFirstBatch {
        -suppliers_ vector
        -first_batch_ RecordBatch
        -has_first_batch_ bool
        +GetNextBatch() RecordBatch*
        +row_num() int64_t
    }

    class CSVStreamRecordBatchSupplier {
        -row_num_ int64_t
        -file_path_ string
        -reader_ StreamingReader
        +GetNextBatch() RecordBatch*
        +row_num() int64_t
    }

    class CSVTableRecordBatchSupplier {
        -table_ Table
        +GetNextBatch() RecordBatch*
        +row_num() int64_t
    }

    class ArrowRecordBatchArraySupplier {
        -arrays_ vector~vector~
        +GetNextBatch() RecordBatch*
        +row_num() int64_t
    }

    class ArrowRecordBatchStreamSupplier {
        -row_num_ int64_t
        -reader_ RecordBatchReader
        +GetNextBatch() RecordBatch*
        +row_num() int64_t
    }

    class ODPSStreamRecordBatchSupplier {
        +GetNextBatch() RecordBatch*
        ❌ row_num() MISSING
    }

    class ODPSTableRecordBatchSupplier {
        -table_ Table
        +GetNextBatch() RecordBatch*
        ❌ row_num() MISSING
    }

    class GeneratedRecordBatchSupplier {
        -batches_ vector
        +GetNextBatch() RecordBatch*
        +row_num() int64_t
    }

    IRecordBatchSupplier <|-- SupplierWrapperWithFirstBatch
    IRecordBatchSupplier <|-- CSVStreamRecordBatchSupplier
    IRecordBatchSupplier <|-- CSVTableRecordBatchSupplier
    IRecordBatchSupplier <|-- ArrowRecordBatchArraySupplier
    IRecordBatchSupplier <|-- ArrowRecordBatchStreamSupplier
    IRecordBatchSupplier <|-- ODPSStreamRecordBatchSupplier
    IRecordBatchSupplier <|-- ODPSTableRecordBatchSupplier
    IRecordBatchSupplier <|-- GeneratedRecordBatchSupplier

Comments Outside Diff (2)

  1. src/storages/loader/loader_utils.cc, line 242-265 (link)

    P0 row_num_ left uninitialized on counting failure

    The member row_num_ is not initialized in the constructor's initializer list, and there are two code paths where it is never assigned a value:

    1. When count_file_result.ok() returns false (the outer else branch).
    2. When count_result.ok() returns false (the inner else branch).

    In both cases, row_num_ retains an indeterminate value (undefined behavior in C++). The LOG(WARNING) message even says "Proceeding with row_num_=0" — but row_num_ is never actually set to 0.

    This means the subsequently computed new_size in vertex_table.h could be wildly large, causing either an out-of-memory EnsureCapacity call or overflowing new_size.

    Fix: initialize row_num_ to 0 in the constructor initializer list:
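
A minimal sketch of that fix follows. CsvSupplierSketch and CountRowsOrFail are hypothetical stand-ins for the real supplier and the Arrow counting calls; neither name is from the PR, and the placeholder count is purely illustrative.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for the counting step; std::nullopt models either of
// the two failure branches described above. The returned value is a
// placeholder, not a real row count.
inline std::optional<std::int64_t> CountRowsOrFail(const std::string& path) {
  if (path.empty()) return std::nullopt;
  return static_cast<std::int64_t>(path.size());
}

class CsvSupplierSketch {
 public:
  explicit CsvSupplierSketch(std::string file_path)
      : file_path_(std::move(file_path)), row_num_(0) {  // defined on every path
    if (auto counted = CountRowsOrFail(file_path_)) {
      assert(*counted >= 0);
      row_num_ = *counted;
    }
    // On failure row_num_ stays 0, which now matches the
    // "Proceeding with row_num_=0" log message.
  }

  std::int64_t row_num() const { return row_num_; }

 private:
  std::string file_path_;
  std::int64_t row_num_;
};
```

Because the member is set in the initializer list, every early exit or failed count leaves a well-defined value for the capacity computation to consume.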

  2. include/neug/storages/loader/odps_fragment_loader.h, line 109-151 (link)

    P0 ODPS suppliers missing row_num() — compile break

    row_num() is now declared pure virtual in IRecordBatchSupplier, but neither ODPSStreamRecordBatchSupplier (line 109) nor ODPSTableRecordBatchSupplier (line 133) implement it. Both classes are therefore abstract and cannot be instantiated — any attempt to do so will fail at compile time.

    ODPSTableRecordBatchSupplier owns a table_ member (std::shared_ptr<arrow::Table>), so a trivial fix is:

    int64_t row_num() const override { return table_->num_rows(); }

    ODPSStreamRecordBatchSupplier does not load the table upfront, so a sensible fallback is:

    int64_t row_num() const override { return 0; }  // row count not available for streaming ODPS supplier

    Both implementations must be added before this PR can compile.
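
The abstract-class mechanics behind this compile break can be checked in isolation. The sketch below uses simplified stand-in types (SupplierBase, MissingRowNum, WithRowNum) rather than the real Arrow-based signatures, so it only demonstrates the language rule, not the PR's actual classes.

```cpp
#include <cassert>
#include <cstdint>
#include <type_traits>

// Simplified stand-in for IRecordBatchSupplier with both pure virtuals.
struct SupplierBase {
  virtual ~SupplierBase() = default;
  virtual bool GetNextBatch() = 0;
  virtual std::int64_t row_num() const = 0;
};

// Mirrors the current ODPS suppliers: row_num() is not overridden, so the
// class stays abstract and cannot be instantiated.
struct MissingRowNum : SupplierBase {
  bool GetNextBatch() override { return false; }
};

// Mirrors the suggested streaming fallback: row count unknown, report 0.
struct WithRowNum : SupplierBase {
  bool GetNextBatch() override { return false; }
  std::int64_t row_num() const override { return 0; }
};

static_assert(std::is_abstract<MissingRowNum>::value,
              "a missing pure-virtual override keeps the class abstract");
static_assert(!std::is_abstract<WithRowNum>::value,
              "overriding every pure virtual makes the class concrete");
```

Writing `MissingRowNum m;` would be rejected by the compiler, which is the failure the review describes for the ODPS suppliers.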

Last reviewed commit: "fix"

@liulx20
Collaborator Author

liulx20 commented Mar 18, 2026

@greptile

@liulx20
Collaborator Author

liulx20 commented Mar 18, 2026

@greptile

@liulx20
Collaborator Author

liulx20 commented Mar 18, 2026

@greptile

@liulx20
Collaborator Author

liulx20 commented Mar 18, 2026

@greptile

@liulx20
Collaborator Author

liulx20 commented Mar 18, 2026

@greptile

Development

Successfully merging this pull request may close these issues.

[Performance] Importing openaire dataset with neug is about 4x slower than with neo4j
