From 285c30052319adc4014dc588465e5d6253db1380 Mon Sep 17 00:00:00 2001 From: Castor Sky Date: Mon, 19 Jan 2026 20:20:06 +0300 Subject: [PATCH 1/2] out_es: fix error treatment on response parsing Previously, any response containing 'errors=true' was considered successful if there were at least one message with 200/201 status, regarding any error statuses. In addition, all status codes other than 409 (including range [200;299]) caused the FLB_ES_STATUS_ERROR bit to be set in the 'check' flag. This behavior caused some batches to skip retrying when batch contained errors but had one successful message status. Now the logic of error treatment considers only statuses from range [200;299] as successful and only statuses from range [400;409)&(409;599] as faulty. Afterward, the message batch is considered successful if there were only 2xx statuses or 409 (version conflict), and scheduled for retry if there were any errors ([400;409)&(409;599], or failed response parsing). Signed-off-by: Castor Sky --- plugins/out_es/es.c | 16 +++++++++------- plugins/out_es/es.h | 4 ++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 459da80cbe5..b8eea22d2a6 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -679,7 +679,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (ret == -1) { /* Is this an incomplete HTTP Request ? */ if (c->resp.payload_size <= 0) { - check |= FLB_ES_STATUS_IMCOMPLETE; + check |= FLB_ES_STATUS_INCOMPLETE; return check; } @@ -718,7 +718,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (key.type != MSGPACK_OBJECT_STR) { flb_plg_error(ctx->ins, "unexpected key type=%i", key.type); - check |= FLB_ES_STATUS_INVAILD_ARGUMENT; + check |= FLB_ES_STATUS_INVALID_ARGUMENT; goto done; } @@ -759,7 +759,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, if (item.via.map.size != 1) { flb_plg_error(ctx->ins, "unexpected 'item' size=%i", item.via.map.size); - check |= FLB_ES_STATUS_INVAILD_ARGUMENT; + check |= FLB_ES_STATUS_INVALID_ARGUMENT; goto done; } @@ -790,11 +790,11 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, goto done; } /* Check for success responses */ - if (item_val.via.i64 == 200 || item_val.via.i64 == 201) { + if ((item_val.via.i64 >= 200 && item_val.via.i64 < 300) || item_val.via.i64 == 409) { check |= FLB_ES_STATUS_SUCCESS; } /* Check for errors other than version conflict (document already exists) */ - if (item_val.via.i64 != 409) { + if (item_val.via.i64 >= 400 && item_val.via.i64 != 409) { check |= FLB_ES_STATUS_ERROR; } } @@ -955,12 +955,14 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk, * and lookup the 'error' field. */ ret = elasticsearch_error_check(ctx, c); - if (ret & FLB_ES_STATUS_SUCCESS) { + if (ret == FLB_ES_STATUS_SUCCESS) { + /* Only the SUCCESS flag was set => the batch was completely accepted by ElasticSearch. */ flb_plg_debug(ctx->ins, "Elasticsearch response\n%s", c->resp.payload); } else { - /* we got an error */ + /* Some errors were discovered while parsing the response. + * Any error that may coexist with the SUCCESS flag should cause a retry. */ if (ctx->trace_error) { /* * If trace_error is set, trace the actual diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 60575379d48..979d3779fe2 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -37,10 +37,10 @@ #define FLB_ES_WRITE_OP_UPSERT "upsert" #define FLB_ES_STATUS_SUCCESS (1 << 0) -#define FLB_ES_STATUS_IMCOMPLETE (1 << 1) +#define FLB_ES_STATUS_INCOMPLETE (1 << 1) #define FLB_ES_STATUS_ERROR_UNPACK (1 << 2) #define FLB_ES_STATUS_BAD_TYPE (1 << 3) -#define FLB_ES_STATUS_INVAILD_ARGUMENT (1 << 4) +#define FLB_ES_STATUS_INVALID_ARGUMENT (1 << 4) #define FLB_ES_STATUS_BAD_RESPONSE (1 << 5) #define FLB_ES_STATUS_DUPLICATES (1 << 6) #define FLB_ES_STATUS_ERROR (1 << 7) From 60d61802cafc42cfef01b99b6eac5b1c0e3e99df Mon Sep 17 00:00:00 2001 From: Castor Sky Date: Tue, 20 Jan 2026 13:09:51 +0300 Subject: [PATCH 2/2] out_es: fix possible memory leak Used cleanup procedure for 'out_buf' and 'result' for the case when errors of JSON parsing occurred. Signed-off-by: Castor Sky --- plugins/out_es/es.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index b8eea22d2a6..f1b50d42ad3 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -702,7 +702,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx, flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s", c->resp.payload); check |= FLB_ES_STATUS_ERROR_UNPACK; - return check; + goto done; } root = result.data;