Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 15 additions & 33 deletions plugins/filter_log_to_metrics/log_to_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ static void cb_send_metric_chunk(struct flb_config *config, void *data)
if (ctx->cmt == NULL || ctx->input_ins == NULL) {
return;
}

if (ctx->new_data) {
ret = flb_input_metrics_append(ctx->input_ins, ctx->tag,
strlen(ctx->tag), ctx->cmt);
Expand All @@ -486,6 +486,7 @@ static void cb_send_metric_chunk(struct flb_config *config, void *data)
static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
struct flb_config *config, void *data)
{
int i;
int ret;
struct log_to_metrics_ctx *ctx;
flb_sds_t tmp;
Expand All @@ -498,63 +499,56 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
struct flb_sched *sched;


int i;
/* Create context */
ctx = flb_calloc(1, sizeof(struct log_to_metrics_ctx));
if (!ctx) {
flb_errno();
return -1;
}
ctx->ins = f_ins;
mk_list_init(&ctx->rules);

/* Set the context */
flb_filter_set_context(f_ins, ctx);

if (flb_filter_config_map_set(f_ins, ctx) < 0) {
flb_errno();
flb_plg_error(f_ins, "configuration error");
flb_free(ctx);
return -1;
}
mk_list_init(&ctx->rules);

if (ctx->metric_name == NULL) {
flb_plg_error(f_ins, "metric_name is not set");
log_to_metrics_destroy(ctx);
return -1;
}

/* Load rules */
ret = set_rules(ctx, f_ins);
if (ret == -1) {
flb_free(ctx);
return -1;
}

/* Set the context */
flb_filter_set_context(f_ins, ctx);

/* Set buckets for histogram */
ctx->buckets = NULL;
ctx->bucket_counter = 0;
ctx->histogram_buckets = NULL;

if (set_buckets(ctx, f_ins) < 0) {
flb_plg_error(f_ins, "Setting buckets failed");
log_to_metrics_destroy(ctx);
return -1;
}

ctx->label_accessors = NULL;
ctx->label_accessors = (char **) flb_calloc(1, MAX_LABEL_COUNT * sizeof(char *));
if (!ctx->label_accessors) {
flb_errno();
log_to_metrics_destroy(ctx);
return -1;
}

for (i = 0; i < MAX_LABEL_COUNT; i++) {
ctx->label_accessors[i] = flb_calloc(1, MAX_LABEL_LENGTH * sizeof(char));
if (!ctx->label_accessors[i]) {
flb_errno();
log_to_metrics_destroy(ctx);
return -1;
}
}
Expand All @@ -565,22 +559,19 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
ctx->label_keys[i] = flb_calloc(1, MAX_LABEL_LENGTH * sizeof(char));
if (!ctx->label_keys[i]) {
flb_errno();
log_to_metrics_destroy(ctx);
return -1;
}
}

ret = set_labels(ctx, ctx->label_accessors, ctx->label_keys, f_ins);
if (ret < 0){
log_to_metrics_destroy(ctx);
return -1;
}
ctx->label_counter = ret;

/* Check metric tag */
if (ctx->tag == NULL || strlen(ctx->tag) == 0) {
flb_plg_error(f_ins, "Metric tag is not set");
log_to_metrics_destroy(ctx);
return -1;
}

Expand All @@ -604,13 +595,11 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
"invalid 'mode' value. Only "
"'counter', 'gauge' or "
"'histogram' types are allowed");
log_to_metrics_destroy(ctx);
return -1;
}
}
else {
flb_plg_error(f_ins, "configuration property not set");
log_to_metrics_destroy(ctx);
return -1;
}

Expand All @@ -637,7 +626,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
if (ctx->metric_description == NULL ||
strlen(ctx->metric_description) == 0) {
flb_plg_error(f_ins, "metric_description is not set");
log_to_metrics_destroy(ctx);
return -1;
}
snprintf(metric_description, sizeof(metric_description) - 1, "%s",
Expand All @@ -647,7 +635,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
if (ctx->mode > 0) {
if (ctx->value_field == NULL || strlen(ctx->value_field) == 0) {
flb_plg_error(f_ins, "value_field is not set");
log_to_metrics_destroy(ctx);
return -1;
}
snprintf(value_field, sizeof(value_field) - 1, "%s",
Expand All @@ -656,7 +643,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
ctx->value_ra = flb_ra_create(ctx->value_field, FLB_TRUE);
if (ctx->value_ra == NULL) {
flb_plg_error(f_ins, "invalid record accessor key for value_field");
log_to_metrics_destroy(ctx);
return -1;
}
}
Expand All @@ -671,7 +657,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
"0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0");
ctx->histogram_buckets = cmt_histogram_buckets_default_create();
}
else{
else {
ctx->histogram_buckets = cmt_histogram_buckets_create_size(ctx->buckets, ctx->bucket_counter);
}
}
Expand Down Expand Up @@ -701,7 +687,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
break;
default:
flb_plg_error(f_ins, "unsupported mode");
log_to_metrics_destroy(ctx);
return -1;
}

Expand All @@ -721,14 +706,12 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
flb_plg_error(f_ins, "emitter_name '%s' already exists",
ctx->emitter_name);
flb_sds_destroy(ctx->emitter_name);
log_to_metrics_destroy(ctx);
return -1;
}
input_ins = flb_input_new(config, "emitter", NULL, FLB_FALSE);
if (!input_ins) {
flb_plg_error(f_ins, "cannot create metrics emitter instance");
flb_sds_destroy(ctx->emitter_name);
log_to_metrics_destroy(ctx);
return -1;
}
/* Set the alias for emitter */
Expand All @@ -737,7 +720,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
flb_plg_warn(ctx->ins,
"cannot set emitter_name");
flb_sds_destroy(ctx->emitter_name);
log_to_metrics_destroy(ctx);
return -1;
}

Expand All @@ -747,7 +729,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
ret = flb_input_set_property(input_ins, "storage.type", "memory");
if (ret == -1) {
flb_plg_error(f_ins, "cannot set storage type for emitter instance");
log_to_metrics_destroy(ctx);
return -1;
}

Expand All @@ -761,14 +742,12 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
if (ret == -1) {
flb_errno();
flb_plg_error(f_ins, "cannot initialize metrics emitter instance.");
log_to_metrics_destroy(ctx);
return -1;
Comment on lines 742 to 745

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Clean up emitter/context on init failure

When flb_input_instance_init (or later storage init) fails, cb_log_to_metrics_init now returns -1 without tearing down the partially created emitter instance or freeing the filter context. flb_filter_init_all only calls flb_filter_instance_destroy on init failure and never runs cb_exit, so those allocations (including the emitter input instance) leak for configs that hit this error path (e.g., invalid emitter setup or storage init failure). Consider restoring log_to_metrics_destroy(ctx) (or equivalent cleanup) on these failure returns.

Useful? React with 👍 / 👎.

}

ret = flb_storage_input_create(config->cio, input_ins);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot initialize storage for metrics stream");
log_to_metrics_destroy(ctx);
return -1;
}
ctx->input_ins = input_ins;
Expand All @@ -786,27 +765,25 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
ctx->timer_mode = FLB_FALSE;
return 0;
}

/* Initialize timer for scheduled metric updates */
sched = flb_sched_ctx_get();
if(sched == 0) {
flb_plg_error(f_ins, "could not get scheduler context");
log_to_metrics_destroy(ctx);
return -1;
}
/* Convert flush_interval_sec and flush_interval_nsec to milliseconds */
ctx->timer_interval = (ctx->flush_interval_sec * 1000) +
ctx->timer_interval = (ctx->flush_interval_sec * 1000) +
(ctx->flush_interval_nsec / 1000000);
flb_plg_debug(ctx->ins,
"Creating metric timer with frequency %d ms",
ctx->timer_interval);

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_interval, cb_send_metric_chunk,
ctx, &ctx->timer);
if (ret < 0) {
flb_plg_error(f_ins, "could not create timer callback");
log_to_metrics_destroy(ctx);
return -1;
}
ctx->timer_mode = FLB_TRUE;
Expand Down Expand Up @@ -1028,6 +1005,11 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
static int cb_log_to_metrics_exit(void *data, struct flb_config *config)
{
struct log_to_metrics_ctx *ctx = data;

if (!ctx) {
return 0;
}

if(ctx->timer != NULL) {
flb_plg_debug(ctx->ins, "Destroying callback timer");
flb_sched_timer_destroy(ctx->timer);
Expand Down
3 changes: 3 additions & 0 deletions src/flb_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ int flb_filter_set_property(struct flb_filter_instance *ins,
struct flb_kv *kv;

len = strlen(k);
if (!v) {
return -1;
}
tmp = flb_env_var_translate(ins->config->env, v);
if (!tmp) {
return -1;
Expand Down
33 changes: 31 additions & 2 deletions src/flb_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_processor_plugin.h>
#include <fluent-bit/flb_filter.h>
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_mp_chunk.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>
Expand Down Expand Up @@ -579,9 +580,11 @@ static int flb_processor_unit_set_condition(struct flb_processor_unit *pu, struc

int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, struct cfl_variant *v)
{
struct cfl_variant *val;
int i;
int ret;
char buf[64];
flb_sds_t str_val;
struct cfl_variant *val;

/* Handle the "condition" property for processor units */
if (strcasecmp(k, "condition") == 0) {
Expand All @@ -596,7 +599,33 @@ int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k
else if (v->type == CFL_VARIANT_ARRAY) {
for (i = 0; i < v->data.as_array->entry_count; i++) {
val = v->data.as_array->entries[i];
ret = flb_filter_set_property(pu->ctx, k, val->data.as_string);

if (val->type == CFL_VARIANT_STRING) {
ret = flb_filter_set_property(pu->ctx, k, val->data.as_string);
}
else if (val->type == CFL_VARIANT_INT) {
snprintf(buf, sizeof(buf), "%" PRId64, val->data.as_int64);
str_val = flb_sds_create(buf);
ret = (str_val != NULL) ? flb_filter_set_property(pu->ctx, k, str_val) : -1;
flb_sds_destroy(str_val);
}
else if (val->type == CFL_VARIANT_UINT) {
snprintf(buf, sizeof(buf), "%" PRIu64, val->data.as_uint64);
str_val = flb_sds_create(buf);
ret = (str_val != NULL) ? flb_filter_set_property(pu->ctx, k, str_val) : -1;
flb_sds_destroy(str_val);
}
else if (val->type == CFL_VARIANT_DOUBLE) {
snprintf(buf, sizeof(buf), "%g", val->data.as_double);
str_val = flb_sds_create(buf);
ret = (str_val != NULL) ? flb_filter_set_property(pu->ctx, k, str_val) : -1;
flb_sds_destroy(str_val);
}
else {
flb_error("[processor] property '%s': array element type %d not supported for filter",
k, val->type);
return -1;
}

if (ret == -1) {
return ret;
Expand Down