From d2b9aab95daec2b6c52440cd71c602670db69951 Mon Sep 17 00:00:00 2001 From: Andy Lindeman Date: Fri, 18 Mar 2016 10:56:03 -0400 Subject: [PATCH] Implements Carbon namespacing --- config.default.json.example | 10 ++- src/backend.h | 2 +- src/backends/carbon.c | 119 +++++++++++++++++++++++++++++++++--- src/backends/carbon.h | 16 +++++ src/internal_sampler.c | 14 ++--- src/metric.c | 28 ++++----- src/metric.h | 1 + 7 files changed, 158 insertions(+), 32 deletions(-) diff --git a/config.default.json.example b/config.default.json.example index 594195f..45f889e 100644 --- a/config.default.json.example +++ b/config.default.json.example @@ -10,8 +10,14 @@ { "type" : "carbon", "address" : "localhost", - "port" : 2003, - "frequency" : 10 + "port" : 2004, + "frequency" : 10, + "pickle" : true, + "global_prefix" : "stats", + "prefix_counter" : "counters", + "prefix_timer" : "timers", + "prefix_histo" : "histos", + "prefix_gauge" : "gauges" } ], diff --git a/src/backend.h b/src/backend.h index d18d804..b13a01f 100644 --- a/src/backend.h +++ b/src/backend.h @@ -13,7 +13,7 @@ struct brubeck_backend { int (*connect)(void *); bool (*is_connected)(void *); - void (*sample)(const char *, value_t, void *); + void (*sample)(uint8_t, const char *, value_t, void *); void (*flush)(void *); uint32_t tick_time; diff --git a/src/backends/carbon.c b/src/backends/carbon.c index e1071fb..035f858 100644 --- a/src/backends/carbon.c +++ b/src/backends/carbon.c @@ -45,6 +45,7 @@ static void carbon_disconnect(struct brubeck_carbon *self) } static void plaintext_each( + uint8_t type, const char *key, value_t value, void *backend) @@ -58,6 +59,30 @@ static void plaintext_each( if (!carbon_is_connected(carbon)) return; + if (carbon->namespacing.global) { + memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len); + ptr += carbon->namespacing.global_len; + *ptr++ = '.'; + } + + if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) { + memcpy(ptr, carbon->namespacing.counter, carbon->namespacing.counter_len); + ptr += carbon->namespacing.counter_len; + *ptr++ = '.'; + } else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) { + memcpy(ptr, carbon->namespacing.timer, carbon->namespacing.timer_len); + ptr += carbon->namespacing.timer_len; + *ptr++ = '.'; + } else if (type == BRUBECK_MT_HISTO && carbon->namespacing.histo) { + memcpy(ptr, carbon->namespacing.histo, carbon->namespacing.histo_len); + ptr += carbon->namespacing.histo_len; + *ptr++ = '.'; + } else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) { + memcpy(ptr, carbon->namespacing.gauge, carbon->namespacing.gauge_len); + ptr += carbon->namespacing.gauge_len; + *ptr++ = '.'; + } + memcpy(ptr, key, key_len); ptr += key_len; *ptr++ = ' '; @@ -103,18 +128,61 @@ static inline size_t pickle1_double(char *ptr, void *_src) } static void pickle1_push( - struct pickler *buf, + struct brubeck_carbon *carbon, + uint8_t type, const char *key, uint8_t key_len, - uint32_t timestamp, value_t value) { + uint8_t namespaced_key_len = 0; + char *type_namespace = NULL; + size_t type_namespace_len = 0; + struct pickler *buf = &carbon->pickler; char *ptr = buf->ptr + buf->pos; + if (carbon->namespacing.global) { + // the global namespace plus the "." character + namespaced_key_len += carbon->namespacing.global_len + 1; + } + + if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) { + type_namespace = carbon->namespacing.counter; + type_namespace_len = carbon->namespacing.counter_len; + // the counter namespace plus the "." character + namespaced_key_len += carbon->namespacing.counter_len + 1; + } else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) { + type_namespace = carbon->namespacing.timer; + type_namespace_len = carbon->namespacing.timer_len; + // the counter namespace plus the "." character + namespaced_key_len += carbon->namespacing.timer_len + 1; + } else if (type == BRUBECK_MT_HISTO && carbon->namespacing.histo) { + type_namespace = carbon->namespacing.histo; + type_namespace_len = carbon->namespacing.histo_len; + // the counter namespace plus the "." character + namespaced_key_len += carbon->namespacing.histo_len + 1; + } else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) { + type_namespace = carbon->namespacing.gauge; + type_namespace_len = carbon->namespacing.gauge_len; + // the counter namespace plus the "." character + namespaced_key_len += carbon->namespacing.gauge_len + 1; + } + + namespaced_key_len += key_len; + *ptr++ = '('; *ptr++ = 'U'; - *ptr++ = key_len; + *ptr++ = namespaced_key_len; + if (carbon->namespacing.global) { + memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len); + ptr += carbon->namespacing.global_len; + *ptr++ = '.'; + } + if (type_namespace) { + memcpy(ptr, type_namespace, type_namespace_len); + ptr += type_namespace_len; + *ptr++ = '.'; + } memcpy(ptr, key, key_len); ptr += key_len; @@ -123,7 +191,7 @@ static void pickle1_push( *ptr++ = '('; - ptr += pickle1_int32(ptr, ×tamp); + ptr += pickle1_int32(ptr, &carbon->backend.tick_time); ptr += pickle1_double(ptr, &value); *ptr++ = 't'; @@ -177,6 +245,7 @@ static void pickle1_flush(void *backend) } static void pickle1_each( + uint8_t type, const char *key, value_t value, void *backend) @@ -192,8 +261,7 @@ static void pickle1_each( if (!carbon_is_connected(carbon)) return; - pickle1_push(&carbon->pickler, key, key_len, - carbon->backend.tick_time, value); + pickle1_push(carbon, type, key, key_len, value); } struct brubeck_backend * @@ -201,14 +269,24 @@ brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n) { struct brubeck_carbon *carbon = xcalloc(1, sizeof(struct brubeck_carbon)); char *address; + char *global_prefix = NULL, + *prefix_counter = NULL, + *prefix_timer = NULL, + *prefix_histo = NULL, + *prefix_gauge = NULL; int port, frequency, pickle = 0; json_unpack_or_die(settings, - "{s:s, s:i, s?:b, s:i}", + "{s:s, s:i, s?:b, s:i, s?:s, s?:s, s?:s, s?:s, s?:s}", "address", &address, "port", &port, "pickle", &pickle, - "frequency", &frequency); + "frequency", &frequency, + "global_prefix", &global_prefix, + "prefix_counter", &prefix_counter, + "prefix_timer", &prefix_timer, + "prefix_histo", &prefix_histo, + "prefix_gauge", &prefix_gauge); carbon->backend.type = BRUBECK_BACKEND_CARBON; carbon->backend.shard_n = shard_n; @@ -225,6 +303,31 @@ brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n) carbon->backend.flush = NULL; } + if (global_prefix) { + carbon->namespacing.global = global_prefix; + carbon->namespacing.global_len = strlen(global_prefix); + } + + if (prefix_counter) { + carbon->namespacing.counter = prefix_counter; + carbon->namespacing.counter_len = strlen(prefix_counter); + } + + if (prefix_timer) { + carbon->namespacing.timer = prefix_timer; + carbon->namespacing.timer_len = strlen(prefix_timer); + } + + if (prefix_histo) { + carbon->namespacing.histo = prefix_histo; + carbon->namespacing.histo_len = strlen(prefix_histo); + } + + if (prefix_gauge) { + carbon->namespacing.gauge = prefix_gauge; + carbon->namespacing.gauge_len = strlen(prefix_gauge); + } + carbon->backend.sample_freq = frequency; carbon->backend.server = server; carbon->out_sock = -1; diff --git a/src/backends/carbon.h b/src/backends/carbon.h index a8f9546..08e54a0 100644 --- a/src/backends/carbon.h +++ b/src/backends/carbon.h @@ -15,6 +15,22 @@ struct brubeck_carbon { uint16_t pos; uint16_t pt; } pickler; + struct namespacing { + char *global; + size_t global_len; + + char *counter; + size_t counter_len; + + char *timer; + size_t timer_len; + + char *histo; + size_t histo_len; + + char *gauge; + size_t gauge_len; + } namespacing; size_t sent; }; diff --git a/src/internal_sampler.c b/src/internal_sampler.c index 50a7651..ac7b6d4 100644 --- a/src/internal_sampler.c +++ b/src/internal_sampler.c @@ -14,44 +14,44 @@ brubeck_internal__sample(struct brubeck_metric *metric, brubeck_sample_cb sample WITH_SUFFIX(".metrics") { value = brubeck_atomic_swap(&stats->live.metrics, 0); stats->sample.metrics = value; - sample(key, (value_t)value, opaque); + sample(metric->type, key, (value_t)value, opaque); } WITH_SUFFIX(".errors") { value = brubeck_atomic_swap(&stats->live.errors, 0); stats->sample.errors = value; - sample(key, (value_t)value, opaque); + sample(metric->type, key, (value_t)value, opaque); } WITH_SUFFIX(".unique_keys") { value = brubeck_atomic_fetch(&stats->live.unique_keys); stats->sample.unique_keys = value; - sample(key, (value_t)value, opaque); + sample(metric->type, key, (value_t)value, opaque); } /* Secure statsd endpoint */ WITH_SUFFIX(".secure.failed") { value = brubeck_atomic_swap(&stats->live.secure.failed, 0); stats->sample.secure.failed = value; - sample(key, (value_t)value, opaque); + sample(metric->type, key, (value_t)value, opaque); } WITH_SUFFIX(".secure.from_future") { value = brubeck_atomic_swap(&stats->live.secure.from_future, 0); stats->sample.secure.from_future = value; - sample(key, (value_t)value, opaque); + sample(metric->type, key, (value_t)value, opaque); } WITH_SUFFIX(".secure.delayed") { value = brubeck_atomic_swap(&stats->live.secure.delayed, 0); stats->sample.secure.delayed = value; - sample(key, (value_t)value, opaque); + sample(metric->type, key, (value_t)value, opaque); } WITH_SUFFIX(".secure.replayed") { value = brubeck_atomic_swap(&stats->live.secure.replayed, 0); stats->sample.secure.replayed = value; - sample(key, (value_t)value, opaque); + sample(metric->type, key, (value_t)value, opaque); } /* diff --git a/src/metric.c b/src/metric.c index 7943f29..f0eb5c1 100644 --- a/src/metric.c +++ b/src/metric.c @@ -64,7 +64,7 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa } pthread_spin_unlock(&metric->lock); - sample(metric->key, value, opaque); + sample(metric->type, metric->key, value, opaque); } @@ -98,7 +98,7 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa } pthread_spin_unlock(&metric->lock); - sample(metric->key, value, opaque); + sample(metric->type, metric->key, value, opaque); } @@ -140,7 +140,7 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o } pthread_spin_unlock(&metric->lock); - sample(metric->key, value, opaque); + sample(metric->type, metric->key, value, opaque); } @@ -179,7 +179,7 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void WITH_SUFFIX(".count") { - sample(key, hsample.count, opaque); + sample(metric->type, key, hsample.count, opaque); } /* if there have been no metrics during this sampling period, @@ -188,43 +188,43 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void return; WITH_SUFFIX(".min") { - sample(key, hsample.min, opaque); + sample(metric->type, key, hsample.min, opaque); } WITH_SUFFIX(".max") { - sample(key, hsample.max, opaque); + sample(metric->type, key, hsample.max, opaque); } WITH_SUFFIX(".sum") { - sample(key, hsample.sum, opaque); + sample(metric->type, key, hsample.sum, opaque); } WITH_SUFFIX(".mean") { - sample(key, hsample.mean, opaque); + sample(metric->type, key, hsample.mean, opaque); } WITH_SUFFIX(".median") { - sample(key, hsample.median, opaque); + sample(metric->type, key, hsample.median, opaque); } WITH_SUFFIX(".percentile.75") { - sample(key, hsample.percentile[PC_75], opaque); + sample(metric->type, key, hsample.percentile[PC_75], opaque); } WITH_SUFFIX(".percentile.95") { - sample(key, hsample.percentile[PC_95], opaque); + sample(metric->type, key, hsample.percentile[PC_95], opaque); } WITH_SUFFIX(".percentile.98") { - sample(key, hsample.percentile[PC_98], opaque); + sample(metric->type, key, hsample.percentile[PC_98], opaque); } WITH_SUFFIX(".percentile.99") { - sample(key, hsample.percentile[PC_99], opaque); + sample(metric->type, key, hsample.percentile[PC_99], opaque); } WITH_SUFFIX(".percentile.999") { - sample(key, hsample.percentile[PC_999], opaque); + sample(metric->type, key, hsample.percentile[PC_999], opaque); } } diff --git a/src/metric.h b/src/metric.h index ad96727..32dff73 100644 --- a/src/metric.h +++ b/src/metric.h @@ -55,6 +55,7 @@ struct brubeck_metric { }; typedef void (*brubeck_sample_cb)( + uint8_t type, const char *key, value_t value, void *backend);