From 827c54379e588a51129e233933f6e160188f3e57 Mon Sep 17 00:00:00 2001 From: "jinyong.choi" Date: Mon, 5 Jan 2026 13:22:55 +0900 Subject: [PATCH 1/5] ring_buffer: add used size getter function Add flb_ring_buffer_get_used() to retrieve the current used size of a ring buffer. This is needed to check pending data during shutdown sequence. Fixes #11338 Signed-off-by: jinyong.choi --- include/fluent-bit/flb_ring_buffer.h | 1 + src/flb_ring_buffer.c | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/include/fluent-bit/flb_ring_buffer.h b/include/fluent-bit/flb_ring_buffer.h index 9ee3a133ff9..f3c04f3ed99 100644 --- a/include/fluent-bit/flb_ring_buffer.h +++ b/include/fluent-bit/flb_ring_buffer.h @@ -40,5 +40,6 @@ int flb_ring_buffer_add_event_loop(struct flb_ring_buffer *rb, void *evl, uint8_ int flb_ring_buffer_write(struct flb_ring_buffer *rb, void *ptr, size_t size); int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size); +size_t flb_ring_buffer_get_used(struct flb_ring_buffer *rb); #endif diff --git a/src/flb_ring_buffer.c b/src/flb_ring_buffer.c index 00195603f2c..3e0e3f57647 100644 --- a/src/flb_ring_buffer.c +++ b/src/flb_ring_buffer.c @@ -202,4 +202,8 @@ int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size) return 0; } +size_t flb_ring_buffer_get_used(struct flb_ring_buffer *rb) +{ + return lwrb_get_full(rb->ctx); +} From e88c6259756c4f86cffe4b40f107f78679561554 Mon Sep 17 00:00:00 2001 From: "jinyong.choi" Date: Mon, 19 Jan 2026 10:25:43 +0900 Subject: [PATCH 2/5] config: add ensure_threaded_flush_on_shutdown option Add new configuration option 'thread.flush_on_shutdown' to control whether the engine waits for threaded input ring buffers to flush during graceful shutdown. - Add ensure_threaded_flush_on_shutdown field to flb_config struct - Add FLB_CONF_STR_THREAD_FLUSH_ON_SHUTDOWN configuration key - Initialize to FLB_FALSE by default (disabled) When enabled, the engine will wait for ring buffer data to be flushed before completing shutdown, ensuring no data loss from threaded inputs. Fixes #11338 Signed-off-by: jinyong.choi --- include/fluent-bit/flb_config.h | 2 ++ src/flb_config.c | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index d171ef08167..047382d867f 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -72,6 +72,7 @@ struct flb_config { int grace; int grace_count; /* Count of grace shutdown tries */ int grace_input; /* Shutdown grace to keep inputs ingesting */ + int ensure_threaded_flush_on_shutdown; /* Ensure threaded inputs flush */ flb_pipefd_t flush_fd; /* Timer FD associated to flush */ int convert_nan_to_null; /* Convert null to nan ? */ @@ -369,6 +370,7 @@ enum conf_type { #define FLB_CONF_STR_FLUSH "Flush" #define FLB_CONF_STR_GRACE "Grace" +#define FLB_CONF_STR_THREAD_FLUSH_ON_SHUTDOWN "thread.flush_on_shutdown" #define FLB_CONF_STR_DAEMON "Daemon" #define FLB_CONF_STR_LOGFILE "Log_File" #define FLB_CONF_STR_LOGLEVEL "Log_Level" diff --git a/src/flb_config.c b/src/flb_config.c index b2c33312b2b..8a07e8e5382 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -59,6 +59,10 @@ struct flb_service_config service_configs[] = { FLB_CONF_TYPE_INT, offsetof(struct flb_config, grace)}, + {FLB_CONF_STR_THREAD_FLUSH_ON_SHUTDOWN, + FLB_CONF_TYPE_BOOL, + offsetof(struct flb_config, ensure_threaded_flush_on_shutdown)}, + {FLB_CONF_STR_CONV_NAN, FLB_CONF_TYPE_BOOL, offsetof(struct flb_config, convert_nan_to_null)}, @@ -277,6 +281,7 @@ struct flb_config *flb_config_init() config->grace = 5; config->grace_count = 0; config->grace_input = config->grace / 2; + config->ensure_threaded_flush_on_shutdown = FLB_FALSE; config->exit_status_code = 0; /* json */ From 60e43d09cd6eedd447b68f0ab2bc269d4cafc4d7 Mon Sep 17 00:00:00 2001 From: "jinyong.choi" Date: Mon, 19 Jan 2026 11:03:37 +0900 Subject: [PATCH 3/5] input_thread: add pause acknowledgement flag Add volatile is_paused flag to track when a threaded input has completed its pause operation. This allows the engine to verify all threaded inputs are fully paused before proceeding with shutdown. Also initialize is_paused flag to FLB_FALSE in input_thread_instance_create() to ensure defined initial state. Fixes #11338 Signed-off-by: jinyong.choi --- include/fluent-bit/flb_input_thread.h | 8 ++++++++ src/flb_input_thread.c | 15 ++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/include/fluent-bit/flb_input_thread.h b/include/fluent-bit/flb_input_thread.h index 4abfc1511ea..d85baf363a8 100644 --- a/include/fluent-bit/flb_input_thread.h +++ b/include/fluent-bit/flb_input_thread.h @@ -26,6 +26,7 @@ #include #include #include +#include #define BUFFER_SIZE 65535 @@ -89,6 +90,13 @@ struct flb_input_thread_instance { int input_coro_id; struct mk_list input_coro_list; struct mk_list input_coro_list_destroy; + + /* + * Pause state flag for shutdown synchronization. + * Set to 1 when thread completes pause processing. + * Checked by main thread to ensure safe shutdown. + */ + volatile sig_atomic_t is_paused; }; int flb_input_thread_instance_init(struct flb_config *config, diff --git a/src/flb_input_thread.c b/src/flb_input_thread.c index 8604a407081..1e99c44e6dc 100644 --- a/src/flb_input_thread.c +++ b/src/flb_input_thread.c @@ -77,11 +77,21 @@ static inline int handle_input_event(flb_pipefd_t fd, struct flb_input_instance if (operation == FLB_INPUT_THREAD_PAUSE) { if (ins->p->cb_pause && ins->context) { ins->p->cb_pause(ins->context, ins->config); + + /* Mark thread as paused for shutdown synchronization */ + if (ins->is_threaded && ins->thi) { + ins->thi->is_paused = FLB_TRUE; + } } } else if (operation == FLB_INPUT_THREAD_RESUME) { - if (ins->p->cb_resume) { + if (ins->p->cb_resume && ins->context) { ins->p->cb_resume(ins->context, ins->config); + + /* Clear paused flag on resume */ + if (ins->is_threaded && ins->thi) { + ins->thi->is_paused = FLB_FALSE; + } } } else if (operation == FLB_INPUT_THREAD_EXIT) { @@ -240,6 +250,9 @@ static struct flb_input_thread_instance *input_thread_instance_create(struct flb thi->init_status = 0; pthread_mutex_init(&thi->init_mutex, NULL); + /* Initialize pause state flag (not paused initially) */ + thi->is_paused = FLB_FALSE; + /* init condition */ pthread_cond_init(&thi->init_condition, NULL); From a32ced89d9900fed3fd85b6e4aeb848da6b2f3bf Mon Sep 17 00:00:00 2001 From: "jinyong.choi" Date: Mon, 5 Jan 2026 13:23:32 +0900 Subject: [PATCH 4/5] input_chunk: bypass pause check during shutdown During shutdown (is_shutting_down=TRUE), bypass the pause check to allow flushing remaining ring buffer data. Also add helper function flb_input_chunk_total_ring_buffers_size() to calculate total pending data across all threaded inputs, and improve cleanup logging. Fixes #11338 Signed-off-by: jinyong.choi --- include/fluent-bit/flb_input_chunk.h | 1 + src/flb_input_chunk.c | 59 ++++++++++++++++++++++++---- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index 08caaf8539b..bbd0e087eed 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -162,5 +162,6 @@ int flb_input_chunk_down(struct flb_input_chunk *ic); int flb_input_chunk_is_up(struct flb_input_chunk *ic); void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, size_t chunk_size); +size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config); #endif diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 58d98779782..c3cc1f393a9 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -2413,6 +2413,7 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE && in->config->is_running == FLB_TRUE && in->config->is_ingestion_active == FLB_TRUE && + in->config->is_shutting_down == FLB_FALSE && in->mem_buf_status == FLB_INPUT_PAUSED) { in->mem_buf_status = FLB_INPUT_RUNNING; if (in->p->cb_resume) { @@ -2426,6 +2427,7 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE && in->config->is_running == FLB_TRUE && in->config->is_ingestion_active == FLB_TRUE && + in->config->is_shutting_down == FLB_FALSE && in->storage_buf_status == FLB_INPUT_PAUSED) { in->storage_buf_status = FLB_INPUT_RUNNING; if (in->p->cb_resume) { @@ -2667,11 +2669,20 @@ static int input_chunk_append_raw(struct flb_input_instance *in, } } - /* Check if the input plugin has been paused */ - if (flb_input_buf_paused(in) == FLB_TRUE) { - flb_debug("[input chunk] %s is paused, cannot append records", - flb_input_name(in)); - return -1; + /* + * Check if the input plugin has been paused. + * During shutdown with ensure_threaded_flush_on_shutdown enabled, only + * threaded inputs can bypass the pause check to drain their ring buffers. + * Non-threaded inputs should still honor pause during shutdown. + */ + if (in->config->is_shutting_down == FLB_FALSE || + in->config->ensure_threaded_flush_on_shutdown == FLB_FALSE || + flb_input_is_threaded(in) == FLB_FALSE) { + if (flb_input_buf_paused(in) == FLB_TRUE) { + flb_debug("[input chunk] %s is paused, cannot append records", + flb_input_name(in)); + return -1; + } } if (buf_size == 0) { @@ -3063,8 +3074,22 @@ void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data) cr = NULL; while (1) { - if (flb_input_buf_paused(ins) == FLB_TRUE) { - break; + /* + * During normal operation we respect the pause state to maintain + * backpressure: if the input is paused we stop consuming from + * the ring buffer. + * + * During shutdown with ensure_threaded_flush_on_shutdown enabled, + * we skip this pause check so the ring buffer can be fully drained, + * even when backpressure would normally prevent further reads. + * This is critical to flush all enqueued records and avoid data + * loss during graceful shutdown. + */ + if (ctx->is_shutting_down == FLB_FALSE || + ctx->ensure_threaded_flush_on_shutdown == FLB_FALSE) { + if (flb_input_buf_paused(ins) == FLB_TRUE) { + break; + } } ret = flb_ring_buffer_read(ins->rb, @@ -3286,3 +3311,23 @@ void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, } } } + +/* + * Calculate total size of all ring buffers across all threaded input instances. + * Returns 0 if no data is pending in ring buffers. + */ +size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config) +{ + size_t total_size = 0; + struct mk_list *head; + struct flb_input_instance *ins; + + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + if (flb_input_is_threaded(ins) && ins->rb) { + total_size += flb_ring_buffer_get_used(ins->rb); + } + } + + return total_size; +} From 4f09373cbe3076f3178649c207293438cdda1b86 Mon Sep 17 00:00:00 2001 From: "jinyong.choi" Date: Mon, 19 Jan 2026 11:03:56 +0900 Subject: [PATCH 5/5] engine: wait for threaded inputs during shutdown During grace period, ensure all threaded inputs have acknowledged pause before final shutdown. Also drain pending ring buffer data to prevent data loss. This fixes a race condition where the engine could exit while threaded inputs still had buffered data. Ring buffer flush and pause waiting are only performed when thread_shutdown_flush configuration option is enabled. Fixes #11338 Signed-off-by: jinyong.choi --- src/flb_engine.c | 52 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/src/flb_engine.c b/src/flb_engine.c index 9d998b1b702..c10c7db0caa 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -33,9 +33,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -800,12 +802,42 @@ int sb_segregate_chunks(struct flb_config *config) } #endif +/* Check if all threaded inputs have completed pause */ +static int all_threaded_inputs_paused(struct flb_config *config) +{ + struct mk_list *head; + struct flb_input_instance *in; + + mk_list_foreach(head, &config->inputs) { + in = mk_list_entry(head, struct flb_input_instance, _head); + + if (in->is_threaded && in->thi) { + /* + * Skip inputs that cannot acknowledge pause: + * - No pause/resume callbacks defined + * - No context (plugin initialization failed) + */ + if (in->p->cb_pause == NULL || in->p->cb_resume == NULL || + in->context == NULL) { + continue; + } + + if (in->thi->is_paused == FLB_FALSE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + int flb_engine_start(struct flb_config *config) { int ret; int tasks = 0; int fs_chunks = 0; int mem_chunks = 0; + size_t rb_size = 0; uint64_t ts; char tmp[16]; int rb_flush_flag; @@ -1175,6 +1207,26 @@ int flb_engine_start(struct flb_config *config) } ret = tasks + mem_chunks + fs_chunks; + + /* + * Ring buffer flush and pause waiting are only performed + * when ensure_threaded_flush_on_shutdown option is enabled. + */ + if (config->ensure_threaded_flush_on_shutdown == FLB_TRUE) { + rb_size = flb_input_chunk_get_total_ring_buffer_size(config); + if (rb_size > 0) { + ret++; + flb_debug("[engine] ring buffer pending: %zu bytes", rb_size); + flb_input_chunk_ring_buffer_collector(config, NULL); + } + + /* Check thread pause only when all other work is done */ + if (ret == 0 && !all_threaded_inputs_paused(config)) { + ret++; + flb_debug("[engine] waiting for threaded inputs to complete pause"); + } + } + if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) { if (config->grace_count == 1) { /*