-
Notifications
You must be signed in to change notification settings - Fork 433
refactor(servers): bulk insert service #7329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(servers): bulk insert service #7329
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR refactors the bulk insert service to improve code organization and performance monitoring. The key architectural change is moving FlightData decoding earlier in the pipeline (from the operator layer to the servers layer), implementing stream-based processing with per-batch timing responses. This allows for better separation of concerns where the servers layer handles protocol-specific encoding/decoding, while the operator layer focuses on business logic.
Key Changes
- Introduced
handle_put_record_batch_stream()method for stream-based bulk insert processing with per-batch responses containing elapsed time metrics - Moved FlightData decoding from operator layer to servers layer in
PutRecordBatchRequestStream, which now eagerly decodes the schema in its constructor - Enhanced
DoPutResponseto includeelapsed_secsfield for performance monitoring
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
src/common/grpc/src/flight/do_put.rs |
Added elapsed_secs field to DoPutResponse for per-batch timing metrics |
src/servers/src/query_handler/grpc.rs |
Refactored GrpcQueryHandler trait to accept decoded PutRecordBatchRequest and added stream handler method |
src/servers/src/grpc/flight.rs |
Moved FlightData decoding into PutRecordBatchRequestStream with eager schema extraction in constructor |
src/servers/src/grpc/greptime_handler.rs |
Simplified request handling to delegate to handler's stream processor and extract timing from responses |
src/operator/src/bulk_insert.rs |
Updated to accept pre-decoded RecordBatch and re-encode when forwarding to datanodes |
src/frontend/src/instance/grpc.rs |
Implemented stream-based processing with per-batch permission checks and timing |
src/servers/tests/mod.rs |
Updated test dummy implementations to match new trait signatures |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
refactor: decode FlightData early in put_record_batch pipeline - Move FlightDecoder usage from Inserter up to PutRecordBatchRequestStream, passing decoded RecordBatch and schema bytes instead of raw FlightData. - Eliminate redundant per-request decoding/encoding in Inserter; encode once and reuse for all region requests. - Streamline GrpcQueryHandler trait and implementations to accept PutRecordBatchRequest containing pre-decoded data. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
feat: stream-based bulk insert with per-batch responses - Introduce handle_put_record_batch_stream() to process Flight DoPut streams - Resolve table & permissions once, yield (request_id, AffectedRows) per batch - Replace loop-over-request with async-stream in frontend & server - Make PutRecordBatchRequestStream public for cross-crate usage Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
fix: propagate request_id with errors in bulk insert stream Changes the bulk-insert stream item type from Result<(i64, AffectedRows), E> to (i64, Result<AffectedRows, E>) so every emitted tuple carries the request_id even on failure, letting callers correlate errors with the originating request. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
refactor: unify DoPut response stream to return DoPutResponse Replace the tuple (i64, Result<AffectedRows>) with Result<DoPutResponse> throughout the gRPC bulk-insert path so the handler, adapter and server all speak the same type. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
feat: add elapsed_secs to DoPutResponse for bulk-insert timing - DoPutResponse now carries elapsed_secs field - Frontend measures and attaches insert duration - Server observes GRPC_BULK_INSERT_ELAPSED metric from response Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
refactor: unify Bytes import in flight module - Replace `bytes::Bytes` with `Bytes` alias for consistency - Remove redundant `ProstBytes` alias Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
fix: terminate gRPC stream on error and optimize FlightData handling - Stop retrying on stream errors in gRPC handler - Replace Vec1 indexing with into_iter().next() for FlightData - Remove redundant clones in bulk_insert and flight modules Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Improve permission check placement in `grpc.rs` - Moved the permission check for `BulkInsert` to occur before resolving the table reference in `GrpcQueryHandler` implementation. - Ensures permission validation is performed earlier in the process, potentially avoiding unnecessary operations if permission is denied. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
26c6338 to
7b88f2f
Compare
**Refactor Bulk Insert Handling in gRPC** - **`grpc.rs`**: - Switched from `async_stream::stream` to `async_stream::try_stream` for error handling. - Removed `body_size` parameter and added `flight_data` to `handle_bulk_insert`. - Simplified error handling and permission checks in `GrpcQueryHandler`. - **`bulk_insert.rs`**: - Added `raw_flight_data` parameter to `handle_bulk_insert`. - Calculated `body_size` from `raw_flight_data` and removed redundant encoding logic. - **`flight.rs`**: - Replaced `body_size` with `flight_data` in `PutRecordBatchRequest`. - Updated memory usage calculation to include `flight_data` components. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
perf(bulk_insert): encode record batch once per datanode Move FlightData encoding outside the per-region loop so the same encoded bytes are reused when mask.select_all(), eliminating redundant serialisation work. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
d24b156 to
6132fe5
Compare
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
This PR refactors the bulk insert service to improve code organization, performance monitoring, and error handling. The changes move FlightData decoding earlier in the pipeline, implement stream-based processing with per-batch responses.
PR Checklist
Please convert it to a draft if some of the following conditions are not met.