fix(topology): Fix stalling on shutdown for sinks configured with disk buffers (#24949)
Conversation
- This fixes a bug where Vector waited until `batch.timeout_secs` to complete a configuration reload, which can cause errors if this value is large, as buffers will start to overflow at the source.
- Fixes: vectordotdev#24948
- This fixes an issue observed after a shutdown signal (ctrl-c) is sent and Vector must ungracefully shut down after its 60s grace period.
- It was discovered that the issue was the S3 sink waiting until `batch.timeout_secs` to flush its buffer; only once the flush completes can shutdown proceed. Naturally, if this value is larger than the 60s graceful shutdown timeout, the timeout expires first.
- The solution is to send the shutdown trigger to the sinks first so that they flush their partial batches.
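The idea can be sketched with plain std channels as stand-ins (these are not Vector's actual types; the trigger and timeout names here are illustrative): a sink batches events and, without a shutdown trigger, would only flush when the batch timeout expires or the sender is dropped (EOF). The trigger forces an immediate flush instead.

```rust
// Minimal sketch: a sink that batches events from a channel and flushes
// its partial batch as soon as a shutdown trigger fires, instead of
// stalling until `batch.timeout_secs` or sender-drop EOF.
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

fn flush_on_shutdown() -> usize {
    let (event_tx, event_rx) = channel::<&'static str>();
    let (shutdown_tx, shutdown_rx) = channel::<()>();

    let sink = thread::spawn(move || {
        let mut batch = Vec::new();
        loop {
            // Shutdown trigger: flush the partial batch right away rather
            // than waiting for the batch timeout to expire.
            if shutdown_rx.try_recv().is_ok() {
                return batch.len();
            }
            match event_rx.recv_timeout(Duration::from_millis(10)) {
                Ok(event) => batch.push(event),
                Err(RecvTimeoutError::Timeout) => continue, // batch not full yet
                Err(RecvTimeoutError::Disconnected) => return batch.len(), // EOF
            }
        }
    });

    event_tx.send("a").unwrap();
    event_tx.send("b").unwrap();
    thread::sleep(Duration::from_millis(100));
    shutdown_tx.send(()).unwrap(); // fire the trigger instead of waiting
    sink.join().unwrap()
}

fn main() {
    println!("flushed {} events", flush_on_shutdown());
}
```

The sketch flushes both queued events within milliseconds of the trigger, where the pre-fix behavior would block shutdown for the full batch timeout.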
@codex review
```rust
// Signal all sinks to flush their partial batches and stop accepting
// input. Without this, sinks rely on the natural stream EOF which only
```
The bit about "stop accepting input" makes me more than a little nervous. Could this cause problems if upstream sources send more events?
In my manual testing before this change, Vector would stall on shutdown. With this change it takes a few seconds, but I believe that is the time it takes to complete a round-trip request of the payload to S3 (it was a little less than 100 MB).
I believe data will still be in the channel, orphaned. This data will be lost on shutdown; however, that already occurs if the 60s timeout expires. Thoughts?
In theory the best option would be to stop accepting input, drain all channels, wait until disk flushes have completed, and then proceed with the rest of shutdown.
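That ordering can be sketched with std channels as stand-ins (hypothetical, not Vector's shutdown code): stop accepting input, drain every queued event, then continue shutting down.

```rust
// Sketch of the ideal shutdown ordering: stop input, drain, then proceed.
use std::sync::mpsc::channel;

fn drain_then_shutdown() -> usize {
    let (tx, rx) = channel::<u32>();
    for i in 0..5 {
        tx.send(i).unwrap(); // events already queued at shutdown time
    }
    // 1. Stop accepting input: drop the sender so the stream reaches EOF.
    drop(tx);
    // 2. Drain all channels: nothing is left orphaned in the channel.
    let drained: Vec<u32> = rx.iter().collect();
    // 3. (Await disk flushes here, then continue with the rest of shutdown.)
    drained.len()
}

fn main() {
    println!("drained {} events", drain_then_shutdown());
}
```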
I'm considering dropping this commit as it's not really related to the originally reported issue anyway.
> I'm considering dropping this commit
I agree with this. We should keep this fix as short as possible.
Hmm, unfortunately this integration failed. This fix raises some concerns, which I believe are also captured by the Codex session below.

Codex session:

For issue #24948, I'd aim for a narrower fix than PR #24949. The issue context is: on reload, a changed sink with a disk buffer gets its fanout input paused, then Vector waits for that sink to shut down. Because the buffer sender is still alive, the sink never sees EOF, so it only flushes its partial batch when `batch.timeout_secs` expires. That's why the pipeline stalls for one batch timeout.

The PR fixes the stall by canceling the sink input stream, but that changes behavior to potentially drop queued events. A better fix is: during reload of changed sinks, stop using Pause for sinks we are about to wait on. So my concrete recommendation would be:

- Short-term safe fix: don't reuse disk-buffer channels across reload for changed sinks.
- Longer-term better design: add an explicit sink lifecycle signal like "flush_partial_batch_now", separate from "stop accepting input".

Why I prefer that over PR #24949: it addresses the actual stall described in #24948.
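The "don't reuse disk-buffer channels across reload" idea can be sketched with std channels as stand-ins (not Vector's buffer types; names here are illustrative): the reload builds a fresh channel for the new sink and drops the old sender, so the old sink sees EOF immediately and flushes without waiting on any timeout.

```rust
// Sketch: reload creates a new channel for the new sink rather than
// reusing the old one; dropping the old sender gives the old sink EOF.
use std::sync::mpsc::channel;
use std::thread;

fn old_sink_flush_count() -> usize {
    let (old_tx, old_rx) = channel::<&'static str>();
    old_tx.send("queued").unwrap(); // partial batch sitting in the old buffer

    let old_sink = thread::spawn(move || {
        // Drains until EOF: this ends as soon as every sender is dropped.
        old_rx.iter().count()
    });

    // Reload: the new sink gets a brand-new channel instead of the old one.
    let (new_tx, new_rx) = channel::<&'static str>();
    drop(old_tx); // old sink now sees EOF and flushes right away

    new_tx.send("routed to the new sink").unwrap();
    assert_eq!(new_rx.iter().next(), Some("routed to the new sink"));

    old_sink.join().unwrap()
}

fn main() {
    println!("old sink flushed {} event(s)", old_sink_flush_count());
}
```

Here the old sink drains its queued event naturally instead of being canceled, which is the draining behavior the review wants to preserve.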
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: eb5630a0fd
```rust
for (_, trigger) in self.detach_triggers {
    trigger.into_inner().cancel();
```
Preserve graceful draining during global shutdown
Canceling every sink `detach_trigger` at the top of `RunningTopology::stop` forces all sink input streams (`take_until_if(tripwire)`) to end before `shutdown_coordinator.shutdown_all(...)` starts upstream shutdown, so sinks can terminate while events are still buffered or in flight. That changes graceful shutdown semantics into a lossy path for any pipeline with pending events, because those events are never drained by the sink.
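The concern can be demonstrated with std stand-ins (not Vector's actual stream code): a cancel flag that ends the sink's input loop early, in the spirit of `take_until_if(tripwire)`, abandons whatever is still queued.

```rust
// Sketch: a tripwire that fires before draining causes the input loop to
// end immediately, so events still buffered in the channel are lost.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;

fn events_lost_on_cancel() -> usize {
    let (tx, rx) = channel::<u32>();
    for i in 0..5 {
        tx.send(i).unwrap(); // events still buffered at shutdown time
    }
    let tripwire = AtomicBool::new(false);
    tripwire.store(true, Ordering::SeqCst); // trigger fired before draining

    let mut processed = 0;
    for _event in rx.try_iter() {
        if tripwire.load(Ordering::SeqCst) {
            break; // input stream ends here; the rest is never drained
        }
        processed += 1;
    }
    5 - processed // queued events that are now lost
}

fn main() {
    println!("{} events lost", events_lost_on_cancel());
}
```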
```rust
if let Some(trigger) = self.detach_triggers.remove(key) {
    trigger.into_inner().cancel();
```
Let changed sinks drain instead of canceling their input
In `RunningTopology::shutdown_diff`, this new branch cancels the old sink trigger for `wait_for_sinks` before waiting for task completion, which makes the sink stop via `take_until_if(tripwire)` rather than EOF from drained inputs. For changed sinks whose buffers are not being reused, queued events in the old buffer are abandoned when the old component is torn down, so reloads under resource conflict can now drop data instead of draining naturally.
Summary
This PR includes two fixes for when sinks configured with buffers are shut down.

The first issue is observed when reloading a config that contains a sink with a disk buffer. Vector will wait until `batch.timeout_secs` has completed, which causes buffers to flush and unblocks the reload process. The fix for this is to send the `cancel()` signal to the sink, so it doesn't block on its buffer not being flushed downstream.

The second fix is for an issue with the same root cause. I noticed the same hang on ctrl-c with the `aws_s3` sink. I employed the same solution there in the `stop()` method: call the `cancel()` signal early in `stop()`.

Vector configuration
How did you test this PR?
By using the config above and a simple HTTP traffic generator. Modifying the sink buffer and re-saving the file, looking for errors from the source.
Change Type
Is this a breaking change?
Does this PR include user facing changes?
If not, add the `no-changelog` label to this PR.

References
- Datadog and Prometheus and S3 sink hangs forever #17666
- Mention `@vectordotdev/vector` to reach out to us regarding this PR.
- `pre-push` hook, please see this template.
- `make fmt`
- `make check-clippy` (if there are failures, it's possible some of them can be fixed with `make clippy-fix`)
- `make test`
- `git merge origin master` and `git push`.
- If this PR changes dependencies (`Cargo.lock`), please run `make build-licenses` to regenerate the license inventory and commit the changes (if any). More details here.