
Commit b1a523d

Initial import of feature branch code.
1 parent 75f4702 commit b1a523d

4 files changed: 285 additions, 0 deletions

README.md

Lines changed: 28 additions & 0 deletions
@@ -2,3 +2,31 @@ statsd-elasticsearch-backend
============================

Elasticsearch backend for statsd

## Overview

This backend allows [Statsd][statsd] to save metrics to [Elasticsearch][elasticsearch]. It supports dynamic index creation per day and follows the logstash naming convention of statsd-YYYY.MM.DD for index creation (TODO).

## History

Originally written by GitHub user rameshpy, this library was created as a feature branch of etsy/statsd. The statsd project recommended converting it into its own repository, as all other backends currently are. This repository started as a restructuring of that existing feature branch into a standalone backend repository.

## Installation

    $ cd /path/to/statsd
    $ npm install statsd-elasticsearch-backend

## Configuration

Merge the following configuration into your existing top-level configuration.

```js
{
  elasticPort: 9200
, elasticHost: "localhost"
, elasticFlushInterval: 10000
, elasticIndex: "statsd"
, elasticIndexType: "stats"
, backends: ['statsd-elasticsearch-backend']
}
```
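For reference, lib/elasticsearch.js (below) joins these settings into the URL it POSTs each metric document to. A minimal sketch of that assembly, assuming the example values above:

```js
// Sketch only: mirrors how post_stats in lib/elasticsearch.js builds its target
// URL from the configuration keys shown above (example values assumed).
var config = {
  elasticHost: "localhost",
  elasticPort: 9200,
  elasticIndex: "statsd",
  elasticIndexType: "stats"
};

var elasticUrl = 'http://' + config.elasticHost + ':' + config.elasticPort +
                 '/' + config.elasticIndex + '/' + config.elasticIndexType;

console.log(elasticUrl); // => http://localhost:9200/statsd/stats
```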

lib/elasticsearch.js

Lines changed: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
/*
 * Flush stats to ElasticSearch (http://www.elasticsearch.org/)
 *
 * To enable this backend, include 'elastic' in the backends
 * configuration array:
 *
 *   backends: ['./backends/elastic']
 *   (if the config file is in the statsd folder)
 *
 * A sample configuration can be found in exampleElasticConfig.js
 *
 * This backend supports the following config options:
 *
 *   elasticHost:          hostname or IP of the ElasticSearch server
 *   elasticPort:          port of the ElasticSearch server
 *   elasticIndex:         index in which the metrics are saved (_index)
 *   elasticIndexType:     type the documents are saved with (_type)
 *   elasticFlushInterval: currently unused and effectively the same as
 *                         flushInterval; intended to eventually allow logging
 *                         to ES on a longer interval than to graphite
 */

var net  = require('net'),
    util = require('util'),
    http = require('http');

var debug;
var flushInterval;
var elasticHost;
var elasticPort;
var elasticIndex;
var elasticIndexType;

var elasticStats = {};

// POST a single serialized metric document to Elasticsearch.
var post_stats = function elastic_post_stats(statString) {
  if (elasticHost) {
    try {
      var elastic = require('../utils/httpReq');
      var elasticUrl = 'http://' + elasticHost + ':' + elasticPort + '/' + elasticIndex + '/' + elasticIndexType;
      elastic.urlReq(elasticUrl, {
        method: 'POST',
        params: statString.toString()
      }, function (body, res) {
        // response intentionally ignored
      });
    } catch (e) {
      if (debug) {
        util.log(e);
      }
      elasticStats.last_exception = Math.round(new Date().getTime() / 1000);
    }
  }
};

// Turn the flushed counters, timers and gauges into JSON documents and post each one.
var flush_stats = function elastic_flush(ts, metrics) {
  var numStats = 0;
  var key;
  var message_array = [];

  ts = new Date();
  var counters = metrics.counters;
  var gauges = metrics.gauges;
  var timers = metrics.timers;
  var pctThreshold = metrics.pctThreshold;

  for (key in counters) {
    var value = counters[key];
    var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate

    message_array.push(create_json(key + '.persecond', valuePerSecond, ts));
    message_array.push(create_json(key, value, ts));

    numStats += 1;
  }

  for (key in timers) {
    if (timers[key].length > 0) {
      var values = timers[key].sort(function (a, b) { return a - b; });
      var count = values.length;
      var min = values[0];
      var max = values[count - 1];

      var mean = min;
      var maxAtThreshold = max;

      var key2;

      for (key2 in pctThreshold) {
        var pct = pctThreshold[key2];
        if (count > 1) {
          var thresholdIndex = Math.round(((100 - pct) / 100) * count);
          var numInThreshold = count - thresholdIndex;
          var pctValues = values.slice(0, numInThreshold);
          maxAtThreshold = pctValues[numInThreshold - 1];

          // average the remaining timings
          var sum = 0;
          for (var i = 0; i < numInThreshold; i++) {
            sum += pctValues[i];
          }

          mean = sum / numInThreshold;
        }

        // mean_<pct> and upper_<pct> follow the graphite backend's percentile naming
        var clean_pct = ('' + pct).replace('.', '_');
        message_array.push(create_json(key + '.timers.mean_'  + clean_pct, mean, ts));
        message_array.push(create_json(key + '.timers.upper_' + clean_pct, maxAtThreshold, ts));
      }

      message_array.push(create_json(key + '.timers.upper', max, ts));
      message_array.push(create_json(key + '.timers.lower', min, ts));
      message_array.push(create_json(key + '.timers.count', count, ts));

      numStats += 1;
    }
  }

  for (key in gauges) {
    message_array.push(create_json(key + '.gauges', gauges[key], ts));
    numStats += 1;
  }

  for (var i = 0; i < message_array.length; i++) {
    post_stats(message_array[i].toString());
  }
};

// Serialize one metric as the JSON document that gets indexed.
var create_json = function create_elastic_json(entity, value, timestamp) {
  var result = { "entity": entity, "value": value, "@timestamp": timestamp };
  return JSON.stringify(result);
};


var elastic_backend_status = function elastic_status(writeCb) {
  for (var stat in elasticStats) {
    writeCb(null, 'elastic', stat, elasticStats[stat]);
  }
};

exports.init = function elastic_init(startup_time, config, events) {
  debug = config.debug;
  elasticHost = config.elasticHost;
  elasticPort = config.elasticPort;
  elasticIndex = config.elasticIndex;
  elasticIndexType = config.elasticIndexType;

  elasticStats.last_flush = startup_time;
  elasticStats.last_exception = startup_time;

  flushInterval = config.flushInterval;

  events.on('flush', flush_stats);
  events.on('status', elastic_backend_status);

  return true;
};
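For illustration, a rough sketch of what one flushed counter turns into, based on flush_stats and create_json above (the metric name, counter value, and flush interval are hypothetical):

```js
// Sketch only: the two documents flush_stats would post for a counter
// "api.requests" flushed at 30 with a 10000 ms flushInterval (made-up numbers).
var ts = new Date();
var flushInterval = 10000;
var value = 30;
var valuePerSecond = value / (flushInterval / 1000); // 3

function create_json(entity, value, timestamp) {
  return JSON.stringify({ "entity": entity, "value": value, "@timestamp": timestamp });
}

console.log(create_json('api.requests.persecond', valuePerSecond, ts));
// {"entity":"api.requests.persecond","value":3,"@timestamp":"..."}
console.log(create_json('api.requests', value, ts));
// {"entity":"api.requests","value":30,"@timestamp":"..."}
```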

package.json

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
{
  "author": "Ramesh Perumalsamy",
  "contributors": [
    {
      "name": "Mark Kimsal"
    }
  ],
  "name": "statsd-elasticsearch-backend",
  "description": "A StatsD backend for Elasticsearch",
  "version": "0.0.1",
  "homepage": "https://github.com/markkimsal/statsd-elasticsearch-backend",
  "repository": {
    "type": "git",
    "url": "git://github.com/markkimsal/statsd-elasticsearch-backend.git"
  },
  "keywords": [
    "elasticsearch",
    "metrics",
    "statsd"
  ],
  "engines": {
    "node": ">=0.8"
  },
  "dependencies": {},
  "devDependencies": {},
  "main": "lib/elasticsearch.js"
}
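Because of the "main" entry, require('statsd-elasticsearch-backend') resolves to lib/elasticsearch.js, and statsd drives it through exports.init and flush events. A hedged sketch of that handshake (the config values and the EventEmitter are stand-ins for what a running statsd passes, and it assumes an Elasticsearch node is reachable):

```js
// Sketch only: approximates how a statsd process would load and drive this backend.
// All values below are placeholders, not statsd internals.
var EventEmitter = require('events').EventEmitter;
var events = new EventEmitter();

var config = {
  debug: true,
  flushInterval: 10000,
  elasticHost: 'localhost',
  elasticPort: 9200,
  elasticIndex: 'statsd',
  elasticIndexType: 'stats',
  backends: ['statsd-elasticsearch-backend']
};

// "main": "lib/elasticsearch.js" is what this require() loads.
var backend = require('statsd-elasticsearch-backend');
backend.init(Math.round(Date.now() / 1000), config, events);

// On every flush interval, statsd emits 'flush' with the aggregated metrics.
events.emit('flush', Math.round(Date.now() / 1000), {
  counters: { 'api.requests': 30 },
  gauges: {},
  timers: {},
  pctThreshold: [90]
});
```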

utils/httpReq.js

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
// module dependencies
var http = require('http'),
    url = require('url');


/**
 * urlReq - Wraps the http.request function, making it nice for unit testing APIs.
 *
 * @param {string} reqUrl The requested url in any form
 * @param {object} options An options object (optional)
 * @param {Function} cb Called with the response body and the 'res' object of the request
 *
 * Credits: https://gist.github.com/1943352
 * Picked it up from the above URL
 */
exports.urlReq = function (reqUrl, options, cb) {
  if (typeof options === "function") { cb = options; options = {}; } // in case no options were passed in

  // parse url into chunks
  reqUrl = url.parse(reqUrl);

  // http.request settings
  var settings = {
    host: reqUrl.hostname,
    port: reqUrl.port || 80,
    path: reqUrl.pathname,
    headers: options.headers || {},
    method: options.method || 'GET'
  };

  // if there are params, send them as JSON
  if (options.params) {
    settings.headers['Content-Type'] = 'application/json';
  }

  // make the request
  var req = http.request(settings);

  // if there are params: write them to the request
  if (options.params) { req.write(options.params); }

  // when the response comes back
  req.on('response', function (res) {
    res.body = '';
    res.setEncoding('utf-8');

    // concat chunks
    res.on('data', function (chunk) {
      res.body += chunk;
    });

    // when the response has finished
    res.on('end', function () {
      // fire callback
      cb(res.body, res);
    });
  });

  // end the request
  req.end();
};
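A brief usage sketch of this helper, mirroring how lib/elasticsearch.js calls it (the URL and payload below are placeholder values):

```js
// Sketch only: POST one JSON document with the urlReq helper above.
// Host, port, index/type path and payload are hypothetical examples.
var elastic = require('./utils/httpReq');

var doc = JSON.stringify({ entity: 'api.requests', value: 30, '@timestamp': new Date() });

elastic.urlReq('http://localhost:9200/statsd/stats', {
  method: 'POST',
  params: doc
}, function (body, res) {
  console.log(res.statusCode, body); // Elasticsearch's indexing response
});
```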
