Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ struct flb_input_instance {
struct flb_ring_buffer *rb;
size_t ring_buffer_size; /* ring buffer size */
uint8_t ring_buffer_window; /* ring buffer window percentage */
int ring_buffer_retry_limit; /* ring buffer write retry limit */

/* List of upstreams */
struct mk_list upstreams;
Expand Down
24 changes: 23 additions & 1 deletion src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,17 @@ pthread_key_t libco_in_param_key;
* ring buffer will emit a flush request whenever the window threshold is reached.
* The window percentage can be tuned per input instance using the
* 'thread.ring_buffer.window' property.
*
* Ring buffer retry limit: when the ring buffer is full, the input thread will
* retry writing to the buffer up to 'retry_limit' times (with 100ms sleep between
* retries) before dropping the data. The default is 10 retries (1 second total).
* This can be tuned per input instance using 'thread.ring_buffer.retry_limit'.
*/

#define FLB_INPUT_RING_BUFFER_CAPACITY 1024
#define FLB_INPUT_RING_BUFFER_SIZE (sizeof(void *) * FLB_INPUT_RING_BUFFER_CAPACITY)
#define FLB_INPUT_RING_BUFFER_WINDOW (5)
#define FLB_INPUT_RING_BUFFER_RETRY_LIMIT (10)

/* config map to register options available for all input plugins */
struct flb_config_map input_global_properties[] = {
Expand Down Expand Up @@ -138,6 +144,12 @@ struct flb_config_map input_global_properties[] = {
0, FLB_FALSE, 0,
"Set custom ring buffer window percentage for threaded inputs"
},
{
FLB_CONFIG_MAP_INT, "thread.ring_buffer.retry_limit",
STR(FLB_INPUT_RING_BUFFER_RETRY_LIMIT),
0, FLB_FALSE, 0,
"Set maximum retry attempts when ring buffer is full before dropping data"
},

{0}
};
Expand Down Expand Up @@ -420,9 +432,10 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,

}

/* set default ring buffer size and window */
/* set default ring buffer size, window, and retry limit */
instance->ring_buffer_size = FLB_INPUT_RING_BUFFER_SIZE;
instance->ring_buffer_window = FLB_INPUT_RING_BUFFER_WINDOW;
instance->ring_buffer_retry_limit = FLB_INPUT_RING_BUFFER_RETRY_LIMIT;

/* allocate a ring buffer */
instance->rb = flb_ring_buffer_create(instance->ring_buffer_size);
Expand Down Expand Up @@ -759,6 +772,15 @@ int flb_input_set_property(struct flb_input_instance *ins,
}
ins->ring_buffer_window = (uint8_t) ret;
}
else if (prop_key_check("thread.ring_buffer.retry_limit", k, len) == 0 && tmp) {
ret = atoi(tmp);
flb_sds_destroy(tmp);
if (ret <= 0) {
flb_error("[input] thread.ring_buffer.retry_limit must be greater than 0");
return -1;
}
ins->ring_buffer_retry_limit = ret;
}
else if (prop_key_check("storage.pause_on_chunks_overlimit", k, len) == 0 && tmp) {
ret = flb_utils_bool(tmp);
flb_sds_destroy(tmp);
Expand Down
5 changes: 2 additions & 3 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -2945,7 +2945,6 @@ static int append_to_ring_buffer(struct flb_input_instance *ins,
{
int ret;
int retries = 0;
int retry_limit = 10;
struct input_chunk_raw *cr;

if (buf_size == 0) {
Expand Down Expand Up @@ -2995,9 +2994,9 @@ static int append_to_ring_buffer(struct flb_input_instance *ins,
/*
* There is a little chance that the ring buffer is full or due to saturation
* from the main thread the data is not being consumed. On this scenario we
* retry up to 'retry_limit' times with a little wait time.
* retry up to 'ring_buffer_retry_limit' times with a little wait time.
*/
if (retries >= retry_limit) {
if (retries >= ins->ring_buffer_retry_limit) {
flb_plg_error(ins, "could not enqueue records into the ring buffer");
destroy_chunk_raw(cr);

Expand Down
Loading