Skip to content
This repository was archived by the owner on Nov 14, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ all: default
SOURCES = \
src/backend.c \
src/backends/carbon.c \
src/backends/opentsdb.c \
src/bloom.c \
src/city.c \
src/histogram.c \
Expand Down
2 changes: 1 addition & 1 deletion config.default.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"server_name" : "brubeck_debug",
"dumpfile" : "./brubeck.dump",
"capacity" : 15,
"expire" : 5,
"expire" : 20,
"http" : ":8080",

"backends" : [
Expand Down
5 changes: 4 additions & 1 deletion src/backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
#define __BRUBECK_BACKEND_H__

enum brubeck_backend_t {
BRUBECK_BACKEND_CARBON
BRUBECK_BACKEND_CARBON,
BRUBECK_BACKEND_OPENTSDB
};

struct brubeck_backend {
Expand All @@ -28,10 +29,12 @@ static inline const char *brubeck_backend_name(struct brubeck_backend *backend)
{
switch (backend->type) {
case BRUBECK_BACKEND_CARBON: return "carbon";
case BRUBECK_BACKEND_OPENTSDB: return "opentsdb";
default: return NULL;
}
}

#include "backends/carbon.h"
#include "backends/opentsdb.h"

#endif
117 changes: 117 additions & 0 deletions src/backends/opentsdb.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#include <stddef.h>
#include <string.h>
#include "brubeck.h"

static inline int is_connected(struct brubeck_opentsdb *self)
{
return (self->out_sock >= 0);
}

static int opentsdb_connect(void *backend)
{
struct brubeck_opentsdb *self = (struct brubeck_opentsdb *)backend;

if (is_connected(self))
return 0;

self->out_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);

if (self->out_sock >= 0) {
int rc = connect(self->out_sock,
(struct sockaddr *)&self->out_sockaddr,
sizeof(self->out_sockaddr));

if (rc == 0) {
log_splunk("backend=opentsdb event=connected");
sock_enlarge_out(self->out_sock);
return 0;
}

close(self->out_sock);
self->out_sock = -1;
}

log_splunk_errno("backend=opentsdb event=failed_to_connect");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strictly speaking the close() system call may overwrite errno, so you should probably log this just after connect() failed before calling close() and as an "else" block when socket() fails. That has the added benefit of indicating exactly which system call failed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't really care about errno here, the correct logic has been returned.

return -1;
}

static void opentsdb_disconnect(struct brubeck_opentsdb *self)
{
log_splunk_errno("backend=opentsdb event=disconnected");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks a bit odd to me. Why would you log errno before doing any system calls?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection is closed by remote here.


close(self->out_sock);
self->out_sock = -1;
}

static void opentsdb_write(
const char *key,
value_t value,
void *backend)
{
struct brubeck_opentsdb *opentsdb = (struct brubeck_opentsdb *)backend;
char buffer[1024];
char *ptr = buffer;
size_t key_len = strlen(key);
ssize_t wr;

if (!is_connected(opentsdb))
return;

strcpy(ptr, "put");
ptr += strlen("put");
*ptr++ = ' ';

memcpy(ptr, key, key_len);
ptr += key_len;
*ptr++ = ' ';

ptr += brubeck_itoa(ptr, opentsdb->backend.tick_time);
*ptr++ = ' ';

ptr += brubeck_ftoa(ptr, value);
*ptr++ = ' ';

strcpy(ptr, opentsdb->tags);
ptr += strlen(opentsdb->tags);
*ptr++ = '\n';

wr = write_in_full(opentsdb->out_sock, buffer, ptr - buffer);
if (wr < 0) {
opentsdb_disconnect(opentsdb);
return;
}

opentsdb->sent += wr;
}

struct brubeck_backend *
brubeck_opentsdb_new(struct brubeck_server *server, json_t *settings, int shard_n)
{
struct brubeck_opentsdb *opentsdb = xcalloc(1, sizeof(struct brubeck_opentsdb));
char *address;
int port, frequency = 0;

json_unpack_or_die(settings,
"{s:s, s:i, s:i, s:s}",
"address", &address,
"port", &port,
"frequency", &frequency,
"tags", &(opentsdb->tags));

opentsdb->backend.type = BRUBECK_BACKEND_OPENTSDB;
opentsdb->backend.shard_n = shard_n;
opentsdb->backend.connect = &opentsdb_connect;

opentsdb->backend.sample = &opentsdb_write;
opentsdb->backend.flush = NULL;

opentsdb->backend.sample_freq = frequency;
opentsdb->backend.server = server;
opentsdb->out_sock = -1;
url_to_inaddr2(&opentsdb->out_sockaddr, address, port);

brubeck_backend_run_threaded((struct brubeck_backend *)opentsdb);
log_splunk("backend=opentsdb event=started");

return (struct brubeck_backend *)opentsdb;
}
17 changes: 17 additions & 0 deletions src/backends/opentsdb.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef __BRUBECK_OPENTSDB_H__
#define __BRUBECK_OPENTSDB_H__

struct brubeck_opentsdb {
struct brubeck_backend backend;

int out_sock;
struct sockaddr_in out_sockaddr;

const char* tags;
size_t sent;
};

struct brubeck_backend *brubeck_opentsdb_new(
struct brubeck_server *server, json_t *settings, int shard_n);

#endif
16 changes: 15 additions & 1 deletion src/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,21 @@ send_stats(struct brubeck_server *brubeck)
"port", (int)ntohs(address->sin_port),
"sent", (json_int_t)carbon->sent
));
}
} else if (backend->type == BRUBECK_BACKEND_OPENTSDB) {
struct brubeck_opentsdb *opentsdb = (struct brubeck_opentsdb *)backend;
struct sockaddr_in *address = &opentsdb->out_sockaddr;
char addr[INET_ADDRSTRLEN];

json_array_append_new(backends,
json_pack("{s:s, s:i, s:b, s:s, s:i, s:I}",
"type", "opentsdb",
"sample_freq", (int)opentsdb->backend.sample_freq,
"connected", (opentsdb->out_sock >= 0),
"address", inet_ntop(AF_INET, &address->sin_addr.s_addr, addr, INET_ADDRSTRLEN),
"port", (int)ntohs(address->sin_port),
"sent", (json_int_t)opentsdb->sent
));
}
}

samplers = json_array();
Expand Down
18 changes: 16 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,18 @@ update_proctitle(struct brubeck_server *server)
(i > 0) ? "," : "",
i + 1, sent, size_suffix[j],
(carbon->out_sock >= 0) ? "" : " (dc)");
}
} else if (backend->type == BRUBECK_BACKEND_OPENTSDB) {
struct brubeck_opentsdb *opentsdb = (struct brubeck_opentsdb *)backend;
double sent = opentsdb->sent;

for (j = 0; j < 7 && sent >= 1024.0; ++j)
sent /= 1024.0;

PUTS("%s #%d %.1f%s%s",
(i > 0) ? "," : "",
i + 1, sent, size_suffix[j],
(opentsdb->out_sock >= 0) ? "": " (dc)");
}
}

PUTS(" ] [ " UTF8_DOWNARROW);
Expand Down Expand Up @@ -111,7 +122,10 @@ static void load_backends(struct brubeck_server *server, json_t *backends)
if (type && !strcmp(type, "carbon")) {
backend = brubeck_carbon_new(server, b, server->active_backends);
server->backends[server->active_backends++] = backend;
} else {
} else if (type && !strcmp(type, "opentsdb")) {
backend = brubeck_opentsdb_new(server, b, server->active_backends);
server->backends[server->active_backends++] = backend;
} else {
log_splunk("backend=%s event=invalid_backend", type);
}
}
Expand Down