diff --git a/plugins/filter_log_to_metrics/log_to_metrics.c b/plugins/filter_log_to_metrics/log_to_metrics.c index df887bfeb2d..785164d399a 100644 --- a/plugins/filter_log_to_metrics/log_to_metrics.c +++ b/plugins/filter_log_to_metrics/log_to_metrics.c @@ -464,7 +464,7 @@ static void cb_send_metric_chunk(struct flb_config *config, void *data) if (ctx->cmt == NULL || ctx->input_ins == NULL) { return; } - + if (ctx->new_data) { ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt); @@ -486,6 +486,7 @@ static void cb_send_metric_chunk(struct flb_config *config, void *data) static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) { + int i; int ret; struct log_to_metrics_ctx *ctx; flb_sds_t tmp; @@ -498,7 +499,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, struct flb_sched *sched; - int i; /* Create context */ ctx = flb_calloc(1, sizeof(struct log_to_metrics_ctx)); if (!ctx) { @@ -506,31 +506,28 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, return -1; } ctx->ins = f_ins; + mk_list_init(&ctx->rules); + + /* Set the context */ + flb_filter_set_context(f_ins, ctx); if (flb_filter_config_map_set(f_ins, ctx) < 0) { flb_errno(); flb_plg_error(f_ins, "configuration error"); - flb_free(ctx); return -1; } - mk_list_init(&ctx->rules); if (ctx->metric_name == NULL) { flb_plg_error(f_ins, "metric_name is not set"); - log_to_metrics_destroy(ctx); return -1; } /* Load rules */ ret = set_rules(ctx, f_ins); if (ret == -1) { - flb_free(ctx); return -1; } - /* Set the context */ - flb_filter_set_context(f_ins, ctx); - /* Set buckets for histogram */ ctx->buckets = NULL; ctx->bucket_counter = 0; @@ -538,7 +535,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, if (set_buckets(ctx, f_ins) < 0) { flb_plg_error(f_ins, "Setting buckets failed"); - log_to_metrics_destroy(ctx); return -1; } @@ -546,7 +542,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, ctx->label_accessors = (char **) flb_calloc(1, MAX_LABEL_COUNT * sizeof(char *)); if (!ctx->label_accessors) { flb_errno(); - log_to_metrics_destroy(ctx); return -1; } @@ -554,7 +549,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, ctx->label_accessors[i] = flb_calloc(1, MAX_LABEL_LENGTH * sizeof(char)); if (!ctx->label_accessors[i]) { flb_errno(); - log_to_metrics_destroy(ctx); return -1; } } @@ -565,14 +559,12 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, ctx->label_keys[i] = flb_calloc(1, MAX_LABEL_LENGTH * sizeof(char)); if (!ctx->label_keys[i]) { flb_errno(); - log_to_metrics_destroy(ctx); return -1; } } ret = set_labels(ctx, ctx->label_accessors, ctx->label_keys, f_ins); if (ret < 0){ - log_to_metrics_destroy(ctx); return -1; } ctx->label_counter = ret; @@ -580,7 +572,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, /* Check metric tag */ if (ctx->tag == NULL || strlen(ctx->tag) == 0) { flb_plg_error(f_ins, "Metric tag is not set"); - log_to_metrics_destroy(ctx); return -1; } @@ -604,13 +595,11 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, "invalid 'mode' value. Only " "'counter', 'gauge' or " "'histogram' types are allowed"); - log_to_metrics_destroy(ctx); return -1; } } else { flb_plg_error(f_ins, "configuration property not set"); - log_to_metrics_destroy(ctx); return -1; } @@ -637,7 +626,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, if (ctx->metric_description == NULL || strlen(ctx->metric_description) == 0) { flb_plg_error(f_ins, "metric_description is not set"); - log_to_metrics_destroy(ctx); return -1; } snprintf(metric_description, sizeof(metric_description) - 1, "%s", @@ -647,7 +635,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, if (ctx->mode > 0) { if (ctx->value_field == NULL || strlen(ctx->value_field) == 0) { flb_plg_error(f_ins, "value_field is not set"); - log_to_metrics_destroy(ctx); return -1; } snprintf(value_field, sizeof(value_field) - 1, "%s", @@ -656,7 +643,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, ctx->value_ra = flb_ra_create(ctx->value_field, FLB_TRUE); if (ctx->value_ra == NULL) { flb_plg_error(f_ins, "invalid record accessor key for value_field"); - log_to_metrics_destroy(ctx); return -1; } } @@ -671,7 +657,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, "0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0"); ctx->histogram_buckets = cmt_histogram_buckets_default_create(); } - else{ + else { ctx->histogram_buckets = cmt_histogram_buckets_create_size(ctx->buckets, ctx->bucket_counter); } } @@ -701,7 +687,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, break; default: flb_plg_error(f_ins, "unsupported mode"); - log_to_metrics_destroy(ctx); return -1; } @@ -721,14 +706,12 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, flb_plg_error(f_ins, "emitter_name '%s' already exists", ctx->emitter_name); flb_sds_destroy(ctx->emitter_name); - log_to_metrics_destroy(ctx); return -1; } input_ins = flb_input_new(config, "emitter", NULL, FLB_FALSE); if (!input_ins) { flb_plg_error(f_ins, "cannot create metrics emitter instance"); flb_sds_destroy(ctx->emitter_name); - log_to_metrics_destroy(ctx); return -1; } /* Set the alias for emitter */ @@ -737,7 +720,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, flb_plg_warn(ctx->ins, "cannot set emitter_name"); flb_sds_destroy(ctx->emitter_name); - log_to_metrics_destroy(ctx); return -1; } @@ -747,7 +729,6 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, ret = flb_input_set_property(input_ins, "storage.type", "memory"); if (ret == -1) { flb_plg_error(f_ins, "cannot set storage type for emitter instance"); - log_to_metrics_destroy(ctx); return -1; } @@ -761,14 +742,12 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, if (ret == -1) { flb_errno(); flb_plg_error(f_ins, "cannot initialize metrics emitter instance."); - log_to_metrics_destroy(ctx); return -1; } ret = flb_storage_input_create(config->cio, input_ins); if (ret == -1) { flb_plg_error(ctx->ins, "cannot initialize storage for metrics stream"); - log_to_metrics_destroy(ctx); return -1; } ctx->input_ins = input_ins; @@ -786,27 +765,25 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins, ctx->timer_mode = FLB_FALSE; return 0; } - + /* Initialize timer for scheduled metric updates */ sched = flb_sched_ctx_get(); if(sched == 0) { flb_plg_error(f_ins, "could not get scheduler context"); - log_to_metrics_destroy(ctx); return -1; } /* Convert flush_interval_sec and flush_interval_nsec to milliseconds */ - ctx->timer_interval = (ctx->flush_interval_sec * 1000) + + ctx->timer_interval = (ctx->flush_interval_sec * 1000) + (ctx->flush_interval_nsec / 1000000); flb_plg_debug(ctx->ins, "Creating metric timer with frequency %d ms", ctx->timer_interval); - + ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM, ctx->timer_interval, cb_send_metric_chunk, ctx, &ctx->timer); if (ret < 0) { flb_plg_error(f_ins, "could not create timer callback"); - log_to_metrics_destroy(ctx); return -1; } ctx->timer_mode = FLB_TRUE; @@ -1028,6 +1005,11 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes, static int cb_log_to_metrics_exit(void *data, struct flb_config *config) { struct log_to_metrics_ctx *ctx = data; + + if (!ctx) { + return 0; + } + if(ctx->timer != NULL) { flb_plg_debug(ctx->ins, "Destroying callback timer"); flb_sched_timer_destroy(ctx->timer); diff --git a/src/flb_filter.c b/src/flb_filter.c index e9298dcfc82..b4bae5b41f8 100644 --- a/src/flb_filter.c +++ b/src/flb_filter.c @@ -331,6 +331,9 @@ int flb_filter_set_property(struct flb_filter_instance *ins, struct flb_kv *kv; len = strlen(k); + if (!v) { + return -1; + } tmp = flb_env_var_translate(ins->config->env, v); if (!tmp) { return -1; diff --git a/src/flb_processor.c b/src/flb_processor.c index d3522fd9a6d..4cd199140a9 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -579,9 +580,11 @@ static int flb_processor_unit_set_condition(struct flb_processor_unit *pu, struc int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k, struct cfl_variant *v) { - struct cfl_variant *val; int i; int ret; + char buf[64]; + flb_sds_t str_val; + struct cfl_variant *val; /* Handle the "condition" property for processor units */ if (strcasecmp(k, "condition") == 0) { @@ -596,7 +599,33 @@ int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k else if (v->type == CFL_VARIANT_ARRAY) { for (i = 0; i < v->data.as_array->entry_count; i++) { val = v->data.as_array->entries[i]; - ret = flb_filter_set_property(pu->ctx, k, val->data.as_string); + + if (val->type == CFL_VARIANT_STRING) { + ret = flb_filter_set_property(pu->ctx, k, val->data.as_string); + } + else if (val->type == CFL_VARIANT_INT) { + snprintf(buf, sizeof(buf), "%" PRId64, val->data.as_int64); + str_val = flb_sds_create(buf); + ret = (str_val != NULL) ? flb_filter_set_property(pu->ctx, k, str_val) : -1; + flb_sds_destroy(str_val); + } + else if (val->type == CFL_VARIANT_UINT) { + snprintf(buf, sizeof(buf), "%" PRIu64, val->data.as_uint64); + str_val = flb_sds_create(buf); + ret = (str_val != NULL) ? flb_filter_set_property(pu->ctx, k, str_val) : -1; + flb_sds_destroy(str_val); + } + else if (val->type == CFL_VARIANT_DOUBLE) { + snprintf(buf, sizeof(buf), "%g", val->data.as_double); + str_val = flb_sds_create(buf); + ret = (str_val != NULL) ? flb_filter_set_property(pu->ctx, k, str_val) : -1; + flb_sds_destroy(str_val); + } + else { + flb_error("[processor] property '%s': array element type %d not supported for filter", + k, val->type); + return -1; + } if (ret == -1) { return ret;