diff --git a/src/backend.h b/src/backend.h index e9a7f5b..2f9a181 100644 --- a/src/backend.h +++ b/src/backend.h @@ -12,7 +12,7 @@ struct brubeck_backend { int shard_n; int (*connect)(void *); - void (*sample)(const char *, value_t, void *); + void (*sample)(const struct brubeck_metric*, const char *, value_t, void *); void (*flush)(void *); uint32_t tick_time; diff --git a/src/backends/carbon.c b/src/backends/carbon.c index a01111d..a449303 100644 --- a/src/backends/carbon.c +++ b/src/backends/carbon.c @@ -2,6 +2,13 @@ #include #include "brubeck.h" +static const char carbon_empty_str[] = ""; +static const char carbon_global_prefix[] = "stats."; +static const char carbon_global_count_prefix[] = "stats_counts."; +static const char carbon_prefix_counter[] = "counters."; +static const char carbon_prefix_timer[] = "timers."; +static const char carbon_prefix_gauge[] = "gauges."; + static inline int is_connected(struct brubeck_carbon *self) { return (self->out_sock >= 0); @@ -43,15 +50,77 @@ static void carbon_disconnect(struct brubeck_carbon *self) self->out_sock = -1; } -static void plaintext_each( +static int carbon_namespace( + char *out_key, + const struct brubeck_metric *metric, const char *key, + const uint8_t key_len, + const struct brubeck_carbon *carbon, + uint8_t counter_abs) +{ + char *ptr = out_key; + + uint8_t prefix_len; + if (!carbon->legacy_namespace || + !(IS_COUNTER(metric->type) && counter_abs)) { + + prefix_len = strlen(carbon->global_prefix); + memcpy(ptr, carbon->global_prefix, prefix_len); + ptr += prefix_len; + } + else { + prefix_len = strlen(carbon->global_count_prefix); + memcpy(ptr, carbon->global_count_prefix, prefix_len); + ptr += prefix_len; + } + + uint8_t metric_prefix_len = 0; + switch (metric->type) { + case BRUBECK_MT_COUNTER: + case BRUBECK_MT_METER: + metric_prefix_len = strlen(carbon->prefix_counter); + memcpy(ptr, carbon->prefix_counter, metric_prefix_len); + break; + case BRUBECK_MT_TIMER: + metric_prefix_len = strlen(carbon->prefix_timer); + memcpy(ptr, carbon->prefix_timer, metric_prefix_len); + break; + case BRUBECK_MT_GAUGE: + metric_prefix_len = strlen(carbon->prefix_gauge); + memcpy(ptr, carbon->prefix_gauge, metric_prefix_len); + break; + default: + break; + } + + ptr += metric_prefix_len; + + memcpy(ptr, key, key_len); + ptr += key_len; + + if (IS_COUNTER(metric->type) && !carbon->legacy_namespace) { + if (counter_abs) { + memcpy(ptr, ".count", strlen(".count")); + ptr += strlen(".count"); + } + else { + memcpy(ptr, ".rate", strlen(".rate")); + ptr += strlen(".rate"); + } + } + + return ptr - out_key; +} + +static void plaintext_send( + const char *key, + uint8_t key_len, value_t value, void *backend) { struct brubeck_carbon *carbon = (struct brubeck_carbon *)backend; char buffer[1024]; char *ptr = buffer; - size_t key_len = strlen(key); ssize_t wr; if (!is_connected(carbon)) @@ -76,6 +145,35 @@ static void plaintext_each( carbon->sent += wr; } +static void plaintext_each( + const struct brubeck_metric *metric, + const char *key, + value_t value, + void *backend) +{ + struct brubeck_carbon *carbon = (struct brubeck_carbon *)backend; + size_t key_len = strlen(key); + + if (!carbon->namespacing || metric->type == BRUBECK_MT_INTERNAL_STATS) { + plaintext_send(key, key_len, value, backend); + return; + } + + char prefix_key[1024]; + uint8_t prefix_key_len = 0; + + prefix_key_len = carbon_namespace(prefix_key, metric, key, key_len, carbon, true); + plaintext_send(prefix_key, prefix_key_len, value, backend); + + if (IS_COUNTER(metric->type) && + carbon->backend.sample_freq != 0) { + prefix_key_len = carbon_namespace(prefix_key, metric, + key, key_len, carbon, false); + value_t normalized_val = value / carbon->backend.sample_freq; + plaintext_send(prefix_key, prefix_key_len, normalized_val, backend); + } +} + static inline size_t pickle1_int32(char *ptr, void *_src) { *ptr = 'J'; @@ -176,6 +274,7 @@ static void pickle1_flush(void *backend) } static void pickle1_each( + const struct brubeck_metric *metric, const char *key, value_t value, void *backend) @@ -191,8 +290,27 @@ static void pickle1_each( if (!is_connected(carbon)) return; - pickle1_push(&carbon->pickler, key, key_len, + if (!carbon->namespacing || metric->type == BRUBECK_MT_INTERNAL_STATS) { + pickle1_push(&carbon->pickler, key, key_len, + carbon->backend.tick_time, value); + return; + } + + char prefix_key[1024]; + uint8_t prefix_key_len = 0; + + prefix_key_len = carbon_namespace(prefix_key, metric, key, key_len, carbon, true); + pickle1_push(&carbon->pickler, prefix_key, prefix_key_len, carbon->backend.tick_time, value); + + if (IS_COUNTER(metric->type) && + carbon->backend.sample_freq != 0) { + prefix_key_len = carbon_namespace(prefix_key, metric, + key, key_len, carbon, false); + value_t normalized_val = value / carbon->backend.sample_freq; + pickle1_push(&carbon->pickler, prefix_key, prefix_key_len, + carbon->backend.tick_time, normalized_val); + } } struct brubeck_backend * @@ -200,14 +318,28 @@ brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n) { struct brubeck_carbon *carbon = xcalloc(1, sizeof(struct brubeck_carbon)); char *address; - int port, frequency, pickle = 0; + const char *global_prefix = carbon_global_prefix; + const char *global_count_prefix = carbon_global_count_prefix; + const char *prefix_counter = carbon_prefix_counter; + const char *prefix_timer = carbon_prefix_timer; + const char *prefix_gauge = carbon_prefix_gauge; + int port, frequency, pickle, namespacing = 0; + int legacy_namespace = 1; json_unpack_or_die(settings, - "{s:s, s:i, s?:b, s:i}", + "{s:s, s:i, s?:b, s:i, s?:b, s?:b, s?:s, s?:s, s?:s, s?:s, s?:s}", "address", &address, "port", &port, "pickle", &pickle, - "frequency", &frequency); + "frequency", &frequency, + + "namespacing", &namespacing, + "legacy_namespace", &legacy_namespace, + "global_prefix", &global_prefix, + "global_count_prefix", &global_count_prefix, + "prefix_counter", &prefix_counter, + "prefix_timer", &prefix_timer, + "prefix_gauge", &prefix_gauge); carbon->backend.type = BRUBECK_BACKEND_CARBON; carbon->backend.shard_n = shard_n; @@ -224,6 +356,14 @@ brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n) } carbon->backend.sample_freq = frequency; + carbon->namespacing = namespacing; + carbon->legacy_namespace = legacy_namespace; + carbon->global_prefix = global_prefix; + carbon->global_count_prefix = global_count_prefix; + carbon->prefix_counter = prefix_counter; + carbon->prefix_timer = prefix_timer; + carbon->prefix_gauge = prefix_gauge; + carbon->backend.server = server; carbon->out_sock = -1; url_to_inaddr2(&carbon->out_sockaddr, address, port); diff --git a/src/backends/carbon.h b/src/backends/carbon.h index a8f9546..39aaca6 100644 --- a/src/backends/carbon.h +++ b/src/backends/carbon.h @@ -16,9 +16,19 @@ struct brubeck_carbon { uint16_t pt; } pickler; size_t sent; + + int namespacing; + int legacy_namespace; + const char *global_prefix; + const char *global_count_prefix; + const char *prefix_counter; + const char *prefix_timer; + const char *prefix_gauge; }; struct brubeck_backend *brubeck_carbon_new( struct brubeck_server *server, json_t *settings, int shard_n); +#define IS_COUNTER(t) (t == BRUBECK_MT_COUNTER || t == BRUBECK_MT_METER) + #endif diff --git a/src/internal_sampler.c b/src/internal_sampler.c index 87c0945..1f797ed 100644 --- a/src/internal_sampler.c +++ b/src/internal_sampler.c @@ -13,43 +13,43 @@ brubeck_internal__sample(struct brubeck_metric *metric, brubeck_sample_cb sample WITH_SUFFIX(".metrics") { value = brubeck_atomic_swap(&stats->metrics, 0); - sample(key, value, opaque); + sample(metric, key, value, opaque); } WITH_SUFFIX(".errors") { value = brubeck_atomic_swap(&stats->errors, 0); - sample(key, value, opaque); + sample(metric, key, value, opaque); } WITH_SUFFIX(".unique_keys") { value = brubeck_atomic_fetch(&stats->unique_keys); - sample(key, value, opaque); + sample(metric, key, value, opaque); } WITH_SUFFIX(".memory") { value = brubeck_atomic_fetch(&stats->memory); - sample(key, value, opaque); + sample(metric, key, value, opaque); } /* Secure statsd endpoint */ WITH_SUFFIX(".secure.failed") { value = brubeck_atomic_swap(&stats->secure.failed, 0); - sample(key, value, opaque); + sample(metric, key, value, opaque); } WITH_SUFFIX(".secure.from_future") { value = brubeck_atomic_swap(&stats->secure.from_future, 0); - sample(key, value, opaque); + sample(metric, key, value, opaque); } WITH_SUFFIX(".secure.delayed") { value = brubeck_atomic_swap(&stats->secure.delayed, 0); - sample(key, value, opaque); + sample(metric, key, value, opaque); } WITH_SUFFIX(".secure.replayed") { value = brubeck_atomic_swap(&stats->secure.replayed, 0); - sample(key, value, opaque); + sample(metric, key, value, opaque); } /* diff --git a/src/metric.c b/src/metric.c index f34a239..feb5343 100644 --- a/src/metric.c +++ b/src/metric.c @@ -52,7 +52,7 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa } pthread_spin_unlock(&metric->lock); - sample(metric->key, value, opaque); + sample(metric, metric->key, value, opaque); } @@ -83,7 +83,7 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa } pthread_spin_unlock(&metric->lock); - sample(metric->key, value, opaque); + sample(metric, metric->key, value, opaque); } @@ -122,7 +122,7 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o } pthread_spin_unlock(&metric->lock); - sample(metric->key, value, opaque); + sample(metric, metric->key, value, opaque); } @@ -160,47 +160,47 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void memcpy(key, metric->key, metric->key_len); WITH_SUFFIX(".min") { - sample(key, hsample.min, opaque); + sample(metric, key, hsample.min, opaque); } WITH_SUFFIX(".max") { - sample(key, hsample.max, opaque); + sample(metric, key, hsample.max, opaque); } WITH_SUFFIX(".sum") { - sample(key, hsample.sum, opaque); + sample(metric, key, hsample.sum, opaque); } WITH_SUFFIX(".mean") { - sample(key, hsample.mean, opaque); + sample(metric, key, hsample.mean, opaque); } WITH_SUFFIX(".count") { - sample(key, hsample.count, opaque); + sample(metric, key, hsample.count, opaque); } WITH_SUFFIX(".median") { - sample(key, hsample.median, opaque); + sample(metric, key, hsample.median, opaque); } WITH_SUFFIX(".percentile.75") { - sample(key, hsample.percentile[PC_75], opaque); + sample(metric, key, hsample.percentile[PC_75], opaque); } WITH_SUFFIX(".percentile.95") { - sample(key, hsample.percentile[PC_95], opaque); + sample(metric, key, hsample.percentile[PC_95], opaque); } WITH_SUFFIX(".percentile.98") { - sample(key, hsample.percentile[PC_98], opaque); + sample(metric, key, hsample.percentile[PC_98], opaque); } WITH_SUFFIX(".percentile.99") { - sample(key, hsample.percentile[PC_99], opaque); + sample(metric, key, hsample.percentile[PC_99], opaque); } WITH_SUFFIX(".percentile.999") { - sample(key, hsample.percentile[PC_999], opaque); + sample(metric, key, hsample.percentile[PC_999], opaque); } } diff --git a/src/metric.h b/src/metric.h index 1be7fed..aaf5fd3 100644 --- a/src/metric.h +++ b/src/metric.h @@ -47,6 +47,7 @@ struct brubeck_metric { }; typedef void (*brubeck_sample_cb)( + const struct brubeck_metric *metric, const char *key, value_t value, void *backend);