diff --git a/Makefile b/Makefile index 84c4473..3e0fdc2 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,7 @@ all: default SOURCES = \ src/backend.c \ src/backends/carbon.c \ + src/backends/opentsdb.c \ src/bloom.c \ src/city.c \ src/histogram.c \ diff --git a/config.default.json.example b/config.default.json.example index 4a6035c..594195f 100644 --- a/config.default.json.example +++ b/config.default.json.example @@ -3,7 +3,7 @@ "server_name" : "brubeck_debug", "dumpfile" : "./brubeck.dump", "capacity" : 15, - "expire" : 5, + "expire" : 20, "http" : ":8080", "backends" : [ diff --git a/src/backend.h b/src/backend.h index e9a7f5b..1d31356 100644 --- a/src/backend.h +++ b/src/backend.h @@ -2,7 +2,8 @@ #define __BRUBECK_BACKEND_H__ enum brubeck_backend_t { - BRUBECK_BACKEND_CARBON + BRUBECK_BACKEND_CARBON, + BRUBECK_BACKEND_OPENTSDB }; struct brubeck_backend { @@ -28,10 +29,12 @@ static inline const char *brubeck_backend_name(struct brubeck_backend *backend) { switch (backend->type) { case BRUBECK_BACKEND_CARBON: return "carbon"; + case BRUBECK_BACKEND_OPENTSDB: return "opentsdb"; default: return NULL; } } #include "backends/carbon.h" +#include "backends/opentsdb.h" #endif diff --git a/src/backends/opentsdb.c b/src/backends/opentsdb.c new file mode 100644 index 0000000..5a954e8 --- /dev/null +++ b/src/backends/opentsdb.c @@ -0,0 +1,117 @@ +#include +#include +#include "brubeck.h" + +static inline int is_connected(struct brubeck_opentsdb *self) +{ + return (self->out_sock >= 0); +} + +static int opentsdb_connect(void *backend) +{ + struct brubeck_opentsdb *self = (struct brubeck_opentsdb *)backend; + + if (is_connected(self)) + return 0; + + self->out_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + + if (self->out_sock >= 0) { + int rc = connect(self->out_sock, + (struct sockaddr *)&self->out_sockaddr, + sizeof(self->out_sockaddr)); + + if (rc == 0) { + log_splunk("backend=opentsdb event=connected"); + sock_enlarge_out(self->out_sock); + return 0; + } + + close(self->out_sock); + self->out_sock = -1; + } + + log_splunk_errno("backend=opentsdb event=failed_to_connect"); + return -1; +} + +static void opentsdb_disconnect(struct brubeck_opentsdb *self) +{ + log_splunk_errno("backend=opentsdb event=disconnected"); + + close(self->out_sock); + self->out_sock = -1; +} + +static void opentsdb_write( + const char *key, + value_t value, + void *backend) +{ + struct brubeck_opentsdb *opentsdb = (struct brubeck_opentsdb *)backend; + char buffer[1024]; + char *ptr = buffer; + size_t key_len = strlen(key); + ssize_t wr; + + if (!is_connected(opentsdb)) + return; + + strcpy(ptr, "put"); + ptr += strlen("put"); + *ptr++ = ' '; + + memcpy(ptr, key, key_len); + ptr += key_len; + *ptr++ = ' '; + + ptr += brubeck_itoa(ptr, opentsdb->backend.tick_time); + *ptr++ = ' '; + + ptr += brubeck_ftoa(ptr, value); + *ptr++ = ' '; + + strcpy(ptr, opentsdb->tags); + ptr += strlen(opentsdb->tags); + *ptr++ = '\n'; + + wr = write_in_full(opentsdb->out_sock, buffer, ptr - buffer); + if (wr < 0) { + opentsdb_disconnect(opentsdb); + return; + } + + opentsdb->sent += wr; +} + +struct brubeck_backend * +brubeck_opentsdb_new(struct brubeck_server *server, json_t *settings, int shard_n) +{ + struct brubeck_opentsdb *opentsdb = xcalloc(1, sizeof(struct brubeck_opentsdb)); + char *address; + int port, frequency = 0; + + json_unpack_or_die(settings, + "{s:s, s:i, s:i, s:s}", + "address", &address, + "port", &port, + "frequency", &frequency, + "tags", &(opentsdb->tags)); + + opentsdb->backend.type = BRUBECK_BACKEND_OPENTSDB; + opentsdb->backend.shard_n = shard_n; + opentsdb->backend.connect = &opentsdb_connect; + + opentsdb->backend.sample = &opentsdb_write; + opentsdb->backend.flush = NULL; + + opentsdb->backend.sample_freq = frequency; + opentsdb->backend.server = server; + opentsdb->out_sock = -1; + url_to_inaddr2(&opentsdb->out_sockaddr, address, port); + + brubeck_backend_run_threaded((struct brubeck_backend *)opentsdb); + log_splunk("backend=opentsdb event=started"); + + return (struct brubeck_backend *)opentsdb; +} diff --git a/src/backends/opentsdb.h b/src/backends/opentsdb.h new file mode 100644 index 0000000..60d3478 --- /dev/null +++ b/src/backends/opentsdb.h @@ -0,0 +1,17 @@ +#ifndef __BRUBECK_OPENTSDB_H__ +#define __BRUBECK_OPENTSDB_H__ + +struct brubeck_opentsdb { + struct brubeck_backend backend; + + int out_sock; + struct sockaddr_in out_sockaddr; + + const char* tags; + size_t sent; +}; + +struct brubeck_backend *brubeck_opentsdb_new( + struct brubeck_server *server, json_t *settings, int shard_n); + +#endif diff --git a/src/http.c b/src/http.c index adf8935..48cd5d6 100644 --- a/src/http.c +++ b/src/http.c @@ -83,7 +83,21 @@ send_stats(struct brubeck_server *brubeck) "port", (int)ntohs(address->sin_port), "sent", (json_int_t)carbon->sent )); - } + } else if (backend->type == BRUBECK_BACKEND_OPENTSDB) { + struct brubeck_opentsdb *opentsdb = (struct brubeck_opentsdb *)backend; + struct sockaddr_in *address = &opentsdb->out_sockaddr; + char addr[INET_ADDRSTRLEN]; + + json_array_append_new(backends, + json_pack("{s:s, s:i, s:b, s:s, s:i, s:I}", + "type", "opentsdb", + "sample_freq", (int)opentsdb->backend.sample_freq, + "connected", (opentsdb->out_sock >= 0), + "address", inet_ntop(AF_INET, &address->sin_addr.s_addr, addr, INET_ADDRSTRLEN), + "port", (int)ntohs(address->sin_port), + "sent", (json_int_t)opentsdb->sent + )); + } } samplers = json_array(); diff --git a/src/server.c b/src/server.c index 0584697..5b7df22 100644 --- a/src/server.c +++ b/src/server.c @@ -46,7 +46,18 @@ update_proctitle(struct brubeck_server *server) (i > 0) ? "," : "", i + 1, sent, size_suffix[j], (carbon->out_sock >= 0) ? "" : " (dc)"); - } + } else if (backend->type == BRUBECK_BACKEND_OPENTSDB) { + struct brubeck_opentsdb *opentsdb = (struct brubeck_opentsdb *)backend; + double sent = opentsdb->sent; + + for (j = 0; j < 7 && sent >= 1024.0; ++j) + sent /= 1024.0; + + PUTS("%s #%d %.1f%s%s", + (i > 0) ? "," : "", + i + 1, sent, size_suffix[j], + (opentsdb->out_sock >= 0) ? "": " (dc)"); + } } PUTS(" ] [ " UTF8_DOWNARROW); @@ -111,7 +122,10 @@ static void load_backends(struct brubeck_server *server, json_t *backends) if (type && !strcmp(type, "carbon")) { backend = brubeck_carbon_new(server, b, server->active_backends); server->backends[server->active_backends++] = backend; - } else { + } else if (type && !strcmp(type, "opentsdb")) { + backend = brubeck_opentsdb_new(server, b, server->active_backends); + server->backends[server->active_backends++] = backend; + } else { log_splunk("backend=%s event=invalid_backend", type); } }