- 
                Notifications
    You must be signed in to change notification settings 
- Fork 130
feat: add ws msg ack to runner protocol #3275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add ws msg ack to runner protocol #3275
Conversation
| The latest updates on your projects. Learn more about Vercel for GitHub. 
 3 Skipped Deployments
 | 
| 
 How to use the Graphite Merge QueueAdd the label merge-queue to this PR to add it to the merge queue. You must have a Graphite account in order to use the merge queue. Sign up using this link. An organization admin has enabled the Graphite Merge Queue in this repository. Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue. This stack of pull requests is managed by Graphite. Learn more about stacking. | 
| More templates
 
 @rivetkit/actor
 @rivetkit/cloudflare-workers
 @rivetkit/core
 @rivetkit/db
 @rivetkit/framework-base
 @rivetkit/next-js
 @rivetkit/react
 rivetkit
 @rivetkit/sql-loader
 @rivetkit/engine-runner
 @rivetkit/engine-runner-protocol
 commit:  | 
| PR Review: WebSocket Message Acknowledgment FeatureSummaryOverall Assessment: Approve with Minor Suggestions This PR adds WebSocket message acknowledgment and hibernation support to the runner protocol. Well-implemented feature with solid error handling that follows codebase conventions. Positive Aspects
 Critical Issues1. GC Logic Inverted (shared_state.rs:420-421, 428-429)The retention logic is backwards - keeps timed-out connections and removes valid ones. Current:  Impact: Critical - causes premature cleanup of valid connections. 2. Race Condition in Entry::Occupied (shared_state.rs:103-108)Clearing pending_msgs while they may be in flight could cause message loss when old msg_tx channel is dropped. Suggestion: Drain pending messages before clearing or document expected behavior. 3. Missing Index Validation (tunnel.ts:713)_handleMessage passes msg.index without validation before reaching __ackWebsocketMessage bounds check. Medium Priority Issues
 Low Priority
 Security & PerformanceSecurity: Good practices - size limits prevent exhaustion, timeouts prevent leaks, no unvalidated input. Minor message replay concern. Performance: Batched acks and efficient storage. Consider VecDeque for O(1) retention operations. Test Coverage GapsMissing tests for: 
 RecommendationsMust Fix Before Merge:
 Should Fix:
 Nice to Have:
 ConclusionSolid implementation of a complex distributed systems feature. The acknowledgment protocol is well-designed and the hibernation mechanism is creative. Critical bugs found are fixable with minimal changes. Will significantly improve WebSocket reliability once issues are addressed. Recommendation: Approve with required fixes for GC logic and race condition. Generated with Claude Code | 
3d340c9    to
    7edb49a      
    Compare
  
    | Pull Request Review: WebSocket Message Acknowledgment ProtocolOverviewThis PR adds message acknowledgment support to the runner protocol for WebSocket connections, enabling reliable message delivery with hibernation/reconnection support. This is a substantial enhancement to the WebSocket tunneling infrastructure. Code Quality & Architecture✅ Strengths
 🔍 Potential IssuesCritical
 hs.total_pending_ws_msgs_size += message_serialized.len() as u64;
if hs.total_pending_ws_msgs_size > MAX_PENDING_MSGS_SIZE_PER_REQ
	|| hs.pending_ws_msgs.len() >= u16::MAX as usizeThe size tracking never decreases when messages are acknowledged. This will cause false positives and connection drops after sending 1MB total data, even if most was already acked. Fix: Decrement  pub async fn ack_pending_websocket_messages(...) -> Result<()> {
    // ... existing code ...
    hs.pending_ws_msgs.retain(|msg| {
        let keep = wrapping_gt(msg_index, ack_index);
        if !keep {
            hs.total_pending_ws_msgs_size -= msg.payload.len() as u64;
        }
        keep
    });
}
 let de = protocol::versioned::ToClient::deserialize_with_embedded_version(
    &pending_msg.payload,
);This deserialization result  
 keep = now.duration_since(earliest_pending_msg.send_instant)
    > MESSAGE_ACK_TIMEOUT;This logic is inverted - it will keep requests that have timed out and drop ones that haven't. Should be  High Priority
 // TODO: This ends up skipping 0 as an index when initiated but whateverSkipping index 0 could cause off-by-one issues in client tracking. This should be properly fixed or documented why it's acceptable. 
 Entry::Occupied(mut entry) => {
    entry.receiver_subject = receiver_subject;
    entry.msg_tx = msg_tx;
    entry.opened = false;
    entry.pending_msgs.clear();
}When reusing an existing request entry,  Medium Priority
 // NOTE: This gets set in shared_state.tsThe file is actually  
 // fn wrapping_lt(a: u16, b: u16) -> bool {
//     b.wrapping_sub(a) < u16::MAX / 2
// }Either remove commented code or add a comment explaining why it's kept for future use. 
 return Err(WebsocketPendingLimitReached {}.build());The struct definition was changed to have no fields, but this could be more informative by including actual limit values for debugging. 
 // Denotes that the connection did not fail, but needs to be retried to
// resole a new target"resole" should be "resolve". 🔒 Security Concerns
 ⚡ Performance Considerations
 payload: message_serialized.clone(),Every WebSocket message is cloned for hibernation storage. For high-throughput connections, this adds significant overhead. Consider using  
 🧪 Test CoverageMissing Tests: 
 Recommendation: Add tests for: #[cfg(test)]
mod tests {
    #[test]
    fn test_wrapping_gt_wraparound() {
        assert!(wrapping_gt(1, u16::MAX));
        assert!(wrapping_gt(u16::MAX/2, 0));
        assert!(!wrapping_gt(u16::MAX, 1));
    }
    
    // Add tests for ack logic, hibernation, etc.
}📋 Additional Notes
 SummaryThis is a well-architected feature that significantly improves WebSocket reliability. However, there are several critical bugs that need to be fixed before merging: Must Fix: 
 Should Fix: 
 Consider: 
 Overall: Approve with required changes ✅ (pending fixes for critical issues) Great work on this complex feature! The acknowledgment protocol design is solid, just needs the implementation bugs ironed out. | 
7edb49a    to
    e662b92      
    Compare
  
    049fe1f    to
    dafcccf      
    Compare
  
    | Code Review: WebSocket Message Acknowledgment SystemI've completed a thorough review of PR #3275 which adds WebSocket message acknowledgment to the runner protocol. Here's my comprehensive feedback: OverviewThis PR implements a sophisticated message acknowledgment system for WebSocket communications between the gateway and runners, enabling reliable message delivery with hibernation support. The implementation appears solid overall with good error handling patterns. 🟢 Strengths1. Protocol Design
 2. Hibernation State Management
 3. Error Handling
 4. Timeout Management
 🟡 Issues & ConcernsHigh Priority1. Logic Bug in GC Retention (engine/packages/pegboard-gateway/src/shared_state.rs:420-422)let mut keep = true;
if let Some(earliest_pending_msg) = req.pending_msgs.first() {
    keep = now.duration_since(earliest_pending_msg.send_instant)
        > MESSAGE_ACK_TIMEOUT;  // ❌ Should be: < MESSAGE_ACK_TIMEOUT or use <
}Problem: The condition is inverted. Currently it keeps requests that have exceeded the timeout and removes those that haven't, which is backwards. Expected: Keep requests where the oldest pending message is within the timeout window. Fix: keep = now.duration_since(earliest_pending_msg.send_instant) < MESSAGE_ACK_TIMEOUT;The same issue exists at line 429 for hibernation state. 2. Potential Index Skip Issue (engine/packages/pegboard-gateway/src/shared_state.rs:137)// TODO: This ends up skipping 0 as an index when initiated but whatever
msg.index = hs.last_ws_msg_index.wrapping_add(1);The TODO suggests index 0 is skipped on init. This could cause issues: 
 Recommendation: Either properly initialize to start at 0, or document why skipping 0 is acceptable. Medium Priority3. Commented Debug Code (multiple files)// fn wrapping_lt(a: u16, b: u16) -> bool {
//     b.wrapping_sub(a) < u16::MAX / 2
// }Dead code should be removed rather than commented out. 4. Inconsistent Error Variable Naming (engine/packages/pegboard-gateway/src/shared_state.rs:230-235)The ack handling has this warning: if prev_len == in_flight.pending_msgs.len() {
    tracing::warn!("pending message does not exist or ack received after message body")
}This suggests a potential race condition. Consider: 
 5. Missing Index ValidationWhen acking messages via  Low Priority6. Hardcoded Limits (engine/packages/pegboard-gateway/src/shared_state.rs:18)const MAX_PENDING_MSGS_SIZE_PER_REQ: u64 = util::size::mebibytes(1);Consider making this configurable per-actor or per-runner for different use cases (e.g., high-throughput actors might need more buffer). 7. Tracing ImprovementsSeveral trace points could benefit from structured fields: // Current
tracing::debug!("gateway waiting for response from tunnel");
// Better
tracing::debug!(request_id=?Uuid::from_bytes(request_id), "gateway waiting for response from tunnel");This would help with debugging production issues. 📋 Code QualityStyle & Conventions ✅
 Logging Patterns ✅
 Error Patterns ✅
 🔒 Security Considerations✅ Good
 
 | 
| Pull Request Review: WebSocket Message Acknowledgment ProtocolThis PR adds a critical reliability feature to the runner protocol - WebSocket message acknowledgments for guaranteed message delivery during actor hibernation. The implementation is generally solid with good attention to reliability patterns. Here are my findings: ✅ StrengthsProtocol Design
 Implementation Quality
 Reliability Features
 
 | 
| Pull Request Review: WebSocket Message Acknowledgment ProtocolOverviewThis PR introduces a WebSocket message acknowledgment system to the runner protocol (v2), enabling reliable message delivery with hibernation support. The implementation adds message indexing, acknowledgment tracking, and automatic retry/resend capabilities. Positive Aspects✅ Well-Designed Protocol
 ✅ Proper Error Handling
 ✅ Comprehensive Implementation
 Issues & Concerns🔴 Critical: Timeout MismatchLocation:  Gateway timeout: 30 seconds const MESSAGE_ACK_TIMEOUT: Duration = Duration::from_secs(30);Runner timeout: 5 seconds const MESSAGE_ACK_TIMEOUT = 5000; // 5 secondsImpact: The runner may close connections or retry 6x earlier than the gateway expects. This could lead to: 
 Recommendation: Synchronize these timeouts. Consider making the runner timeout slightly shorter (e.g., 28s) to give it time to initiate cleanup before gateway timeout. 
 | 
d7f7fba    to
    a7d0289      
    Compare
  
    | Pull Request Review: WebSocket Message AcknowledgmentOverviewThis PR adds WebSocket message acknowledgment to the runner protocol (v2), enabling reliable message delivery with hibernation support for actors. The changes are substantial but well-structured. 📋 Summary of Changes
 ✅ Strengths1. Well-Designed ProtocolThe new v2 protocol schema is clean and comprehensive: 
 2. Robust Message TrackingThe  
 3. Error HandlingGood addition of specific error types: 
 4. Graceful DegradationThe retry logic with hibernation support allows actors to: 
 
 | 
a7d0289    to
    a3e2e41      
    Compare
  
    | PR Review: WebSocket Message Acknowledgment ImplementationOverviewThis PR adds a message acknowledgment mechanism to the runner protocol for WebSocket communications. The changes enable reliable message delivery tracking between the gateway and runners, which is critical for production reliability. Positive Aspects ✅1. Well-Structured Protocol Changes
 2. Comprehensive Error Handling
 3. Robust Acknowledgment Tracking
 4. TypeScript SDK Improvements
 Issues & Concerns  | 

No description provided.