Skip to content
This repository was archived by the owner on Nov 14, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions config.default.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
{
"type" : "carbon",
"address" : "localhost",
"port" : 2003,
"frequency" : 10
"port" : 2004,
"frequency" : 10,
"pickle" : true,
"global_prefix" : "stats",
"prefix_counter" : "counters",
"prefix_timer" : "timers",
"prefix_histo" : "histos",
"prefix_gauge" : "gauges"
}
],

Expand Down
2 changes: 1 addition & 1 deletion src/backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ struct brubeck_backend {

int (*connect)(void *);
bool (*is_connected)(void *);
void (*sample)(const char *, value_t, void *);
void (*sample)(uint8_t, const char *, value_t, void *);
void (*flush)(void *);

uint32_t tick_time;
Expand Down
119 changes: 111 additions & 8 deletions src/backends/carbon.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ static void carbon_disconnect(struct brubeck_carbon *self)
}

static void plaintext_each(
uint8_t type,
const char *key,
value_t value,
void *backend)
Expand All @@ -58,6 +59,30 @@ static void plaintext_each(
if (!carbon_is_connected(carbon))
return;

if (carbon->namespacing.global) {
memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len);
ptr += carbon->namespacing.global_len;
*ptr++ = '.';
}

if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) {
memcpy(ptr, carbon->namespacing.counter, carbon->namespacing.counter_len);
ptr += carbon->namespacing.counter_len;
*ptr++ = '.';
} else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) {
memcpy(ptr, carbon->namespacing.timer, carbon->namespacing.timer_len);
ptr += carbon->namespacing.timer_len;
*ptr++ = '.';
} else if (type == BRUBECK_MT_HISTO && carbon->namespacing.histo) {
memcpy(ptr, carbon->namespacing.histo, carbon->namespacing.histo_len);
ptr += carbon->namespacing.histo_len;
*ptr++ = '.';
} else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) {
memcpy(ptr, carbon->namespacing.gauge, carbon->namespacing.gauge_len);
ptr += carbon->namespacing.gauge_len;
*ptr++ = '.';
}

memcpy(ptr, key, key_len);
ptr += key_len;
*ptr++ = ' ';
Expand Down Expand Up @@ -103,18 +128,61 @@ static inline size_t pickle1_double(char *ptr, void *_src)
}

static void pickle1_push(
struct pickler *buf,
struct brubeck_carbon *carbon,
uint8_t type,
const char *key,
uint8_t key_len,
uint32_t timestamp,
value_t value)
{
uint8_t namespaced_key_len = 0;
char *type_namespace = NULL;
size_t type_namespace_len = 0;
struct pickler *buf = &carbon->pickler;
char *ptr = buf->ptr + buf->pos;

if (carbon->namespacing.global) {
// the global namespace plus the "." character
namespaced_key_len += carbon->namespacing.global_len + 1;
}

if ((type == BRUBECK_MT_COUNTER || type == BRUBECK_MT_METER) && carbon->namespacing.counter) {
type_namespace = carbon->namespacing.counter;
type_namespace_len = carbon->namespacing.counter_len;
// the counter namespace plus the "." character
namespaced_key_len += carbon->namespacing.counter_len + 1;
} else if (type == BRUBECK_MT_TIMER && carbon->namespacing.timer) {
type_namespace = carbon->namespacing.timer;
type_namespace_len = carbon->namespacing.timer_len;
// the counter namespace plus the "." character
namespaced_key_len += carbon->namespacing.timer_len + 1;
} else if (type == BRUBECK_MT_HISTO && carbon->namespacing.histo) {
type_namespace = carbon->namespacing.histo;
type_namespace_len = carbon->namespacing.histo_len;
// the counter namespace plus the "." character
namespaced_key_len += carbon->namespacing.histo_len + 1;
} else if (type == BRUBECK_MT_GAUGE && carbon->namespacing.gauge) {
type_namespace = carbon->namespacing.gauge;
type_namespace_len = carbon->namespacing.gauge_len;
// the counter namespace plus the "." character
namespaced_key_len += carbon->namespacing.gauge_len + 1;
}

namespaced_key_len += key_len;

*ptr++ = '(';

*ptr++ = 'U';
*ptr++ = key_len;
*ptr++ = namespaced_key_len;
if (carbon->namespacing.global) {
memcpy(ptr, carbon->namespacing.global, carbon->namespacing.global_len);
ptr += carbon->namespacing.global_len;
*ptr++ = '.';
}
if (type_namespace) {
memcpy(ptr, type_namespace, type_namespace_len);
ptr += type_namespace_len;
*ptr++ = '.';
}
memcpy(ptr, key, key_len);
ptr += key_len;

Expand All @@ -123,7 +191,7 @@ static void pickle1_push(

*ptr++ = '(';

ptr += pickle1_int32(ptr, &timestamp);
ptr += pickle1_int32(ptr, &carbon->backend.tick_time);
ptr += pickle1_double(ptr, &value);

*ptr++ = 't';
Expand Down Expand Up @@ -177,6 +245,7 @@ static void pickle1_flush(void *backend)
}

static void pickle1_each(
uint8_t type,
const char *key,
value_t value,
void *backend)
Expand All @@ -192,23 +261,32 @@ static void pickle1_each(
if (!carbon_is_connected(carbon))
return;

pickle1_push(&carbon->pickler, key, key_len,
carbon->backend.tick_time, value);
pickle1_push(carbon, type, key, key_len, value);
}

struct brubeck_backend *
brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n)
{
struct brubeck_carbon *carbon = xcalloc(1, sizeof(struct brubeck_carbon));
char *address;
char *global_prefix = NULL,
*prefix_counter = NULL,
*prefix_timer = NULL,
*prefix_histo = NULL,
*prefix_gauge = NULL;
int port, frequency, pickle = 0;

json_unpack_or_die(settings,
"{s:s, s:i, s?:b, s:i}",
"{s:s, s:i, s?:b, s:i, s?:s, s?:s, s?:s, s?:s, s?:s}",
"address", &address,
"port", &port,
"pickle", &pickle,
"frequency", &frequency);
"frequency", &frequency,
"global_prefix", &global_prefix,
"prefix_counter", &prefix_counter,
"prefix_timer", &prefix_timer,
"prefix_histo", &prefix_histo,
"prefix_gauge", &prefix_gauge);

carbon->backend.type = BRUBECK_BACKEND_CARBON;
carbon->backend.shard_n = shard_n;
Expand All @@ -225,6 +303,31 @@ brubeck_carbon_new(struct brubeck_server *server, json_t *settings, int shard_n)
carbon->backend.flush = NULL;
}

if (global_prefix) {
carbon->namespacing.global = global_prefix;
carbon->namespacing.global_len = strlen(global_prefix);
}

if (prefix_counter) {
carbon->namespacing.counter = prefix_counter;
carbon->namespacing.counter_len = strlen(prefix_counter);
}

if (prefix_timer) {
carbon->namespacing.timer = prefix_timer;
carbon->namespacing.timer_len = strlen(prefix_timer);
}

if (prefix_histo) {
carbon->namespacing.histo = prefix_histo;
carbon->namespacing.histo_len = strlen(prefix_histo);
}

if (prefix_gauge) {
carbon->namespacing.gauge = prefix_gauge;
carbon->namespacing.gauge_len = strlen(prefix_gauge);
}

carbon->backend.sample_freq = frequency;
carbon->backend.server = server;
carbon->out_sock = -1;
Expand Down
16 changes: 16 additions & 0 deletions src/backends/carbon.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ struct brubeck_carbon {
uint16_t pos;
uint16_t pt;
} pickler;
struct namespacing {
char *global;
size_t global_len;

char *counter;
size_t counter_len;

char *timer;
size_t timer_len;

char *histo;
size_t histo_len;

char *gauge;
size_t gauge_len;
} namespacing;
size_t sent;
};

Expand Down
14 changes: 7 additions & 7 deletions src/internal_sampler.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,44 @@ brubeck_internal__sample(struct brubeck_metric *metric, brubeck_sample_cb sample
WITH_SUFFIX(".metrics") {
value = brubeck_atomic_swap(&stats->live.metrics, 0);
stats->sample.metrics = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".errors") {
value = brubeck_atomic_swap(&stats->live.errors, 0);
stats->sample.errors = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".unique_keys") {
value = brubeck_atomic_fetch(&stats->live.unique_keys);
stats->sample.unique_keys = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

/* Secure statsd endpoint */
WITH_SUFFIX(".secure.failed") {
value = brubeck_atomic_swap(&stats->live.secure.failed, 0);
stats->sample.secure.failed = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".secure.from_future") {
value = brubeck_atomic_swap(&stats->live.secure.from_future, 0);
stats->sample.secure.from_future = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".secure.delayed") {
value = brubeck_atomic_swap(&stats->live.secure.delayed, 0);
stats->sample.secure.delayed = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

WITH_SUFFIX(".secure.replayed") {
value = brubeck_atomic_swap(&stats->live.secure.replayed, 0);
stats->sample.secure.replayed = value;
sample(key, (value_t)value, opaque);
sample(metric->type, key, (value_t)value, opaque);
}

/*
Expand Down
28 changes: 14 additions & 14 deletions src/metric.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ gauge__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
}
pthread_spin_unlock(&metric->lock);

sample(metric->key, value, opaque);
sample(metric->type, metric->key, value, opaque);
}


Expand Down Expand Up @@ -98,7 +98,7 @@ meter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *opa
}
pthread_spin_unlock(&metric->lock);

sample(metric->key, value, opaque);
sample(metric->type, metric->key, value, opaque);
}


Expand Down Expand Up @@ -140,7 +140,7 @@ counter__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void *o
}
pthread_spin_unlock(&metric->lock);

sample(metric->key, value, opaque);
sample(metric->type, metric->key, value, opaque);
}


Expand Down Expand Up @@ -179,7 +179,7 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void


WITH_SUFFIX(".count") {
sample(key, hsample.count, opaque);
sample(metric->type, key, hsample.count, opaque);
}

/* if there have been no metrics during this sampling period,
Expand All @@ -188,43 +188,43 @@ histogram__sample(struct brubeck_metric *metric, brubeck_sample_cb sample, void
return;

WITH_SUFFIX(".min") {
sample(key, hsample.min, opaque);
sample(metric->type, key, hsample.min, opaque);
}

WITH_SUFFIX(".max") {
sample(key, hsample.max, opaque);
sample(metric->type, key, hsample.max, opaque);
}

WITH_SUFFIX(".sum") {
sample(key, hsample.sum, opaque);
sample(metric->type, key, hsample.sum, opaque);
}

WITH_SUFFIX(".mean") {
sample(key, hsample.mean, opaque);
sample(metric->type, key, hsample.mean, opaque);
}

WITH_SUFFIX(".median") {
sample(key, hsample.median, opaque);
sample(metric->type, key, hsample.median, opaque);
}

WITH_SUFFIX(".percentile.75") {
sample(key, hsample.percentile[PC_75], opaque);
sample(metric->type, key, hsample.percentile[PC_75], opaque);
}

WITH_SUFFIX(".percentile.95") {
sample(key, hsample.percentile[PC_95], opaque);
sample(metric->type, key, hsample.percentile[PC_95], opaque);
}

WITH_SUFFIX(".percentile.98") {
sample(key, hsample.percentile[PC_98], opaque);
sample(metric->type, key, hsample.percentile[PC_98], opaque);
}

WITH_SUFFIX(".percentile.99") {
sample(key, hsample.percentile[PC_99], opaque);
sample(metric->type, key, hsample.percentile[PC_99], opaque);
}

WITH_SUFFIX(".percentile.999") {
sample(key, hsample.percentile[PC_999], opaque);
sample(metric->type, key, hsample.percentile[PC_999], opaque);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct brubeck_metric {
};

typedef void (*brubeck_sample_cb)(
uint8_t type,
const char *key,
value_t value,
void *backend);
Expand Down