Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cystatsd/collector/collector.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ cdef class MetricCollector:
def __init__(self, int mtu=512):
self.collector = CPPMetricCollector(mtu)

cpdef _push_timer(self, bytes name, int value, float rate):
cpdef _push_timer(self, bytes name, float value, float rate):
self.collector.pushTimer(name, value, rate)

def push_timer(self, name, int value, float rate=1.0):
def push_timer(self, name, float value, float rate=1.0):
if isinstance(name, UNICODE_TYPE):
name = name.encode('utf-8')
self._push_timer(name, value, rate)
Expand All @@ -37,13 +37,13 @@ cdef class MetricCollector:
name = name.encode('utf-8')
self._push_counter(name, value, rate)

cpdef _push_gauge(self, bytes name, int value, float rate=1.0):
cpdef _push_gauge(self, bytes name, float value, float rate=1.0):
self.collector.pushGauge(name, value, rate)

cpdef _push_gauge_delta(self, bytes name, int value, float rate=1.0):
cpdef _push_gauge_delta(self, bytes name, float value, float rate=1.0):
self.collector.pushGaugeDelta(name, value, rate)

def push_gauge(self, name, int value, float rate=1.0, delta = False):
def push_gauge(self, name, float value, float rate=1.0, delta = False):
if isinstance(name, UNICODE_TYPE):
name = name.encode('utf-8')
if delta:
Expand Down
12 changes: 6 additions & 6 deletions cystatsd/collector/cstatsd_proto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ cdef extern from "./statsd_proto.hpp" namespace "statsd_proto":
cdef cppclass MetricCollector:
MetricCollector()
MetricCollector(size_t)
void pushGauge(const string &name, int value)
void pushGaugeDelta(const string &name, int value)
void pushTimer(const string &name, int value)
void pushGauge(const string &name, float value)
void pushGaugeDelta(const string &name, float value)
void pushTimer(const string &name, float value)
void pushCounter(const string &name, int value)
void pushSet(const string &name, int value)
void pushGauge(const string &name, int value, float rate)
void pushGaugeDelta(const string &name, int value, float rate)
void pushTimer(const string &name, int value, float rate)
void pushGauge(const string &name, float value, float rate)
void pushGaugeDelta(const string &name, float value, float rate)
void pushTimer(const string &name, float value, float rate)
void pushCounter(const string &name, int value, float rate)
void pushSet(const string &name, int value, float rate)
vector[string] flush()
Expand Down
94 changes: 74 additions & 20 deletions cystatsd/collector/statsd_proto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,28 @@ const char* BufferHandle::data() const {


Metric::Metric(){}

Metric::Metric(const Metric & met)
: metricType_(met.metricType_), name_(met.name_), value_(met.value_) {}

Metric::Metric(MetricType mtype, std::string name, int64_t val)
: metricType_(mtype), name_(name), value_(val) {}

void Metric::formatValue(char * buffer) const {
if (metricType_ == MetricType::GAUGE_DELTA) {
sprintf(buffer, ":%+" PRId64 "|", value_);
} else {
sprintf(buffer, ":%" PRId64 "|", value_);
}
}

void Metric::encodeTo(BufferHandle *buff) {
if (metricType_ == MetricType::NONE) {
return;
}
buff->write(name_);
char intBuff[35];
if (metricType_ == MetricType::GAUGE_DELTA) {
sprintf(intBuff, ":%+" PRId64 "|", value_);
} else {
sprintf(intBuff, ":%" PRId64 "|", value_);
}
this->formatValue(intBuff);
buff->write(intBuff, strlen(intBuff));
switch (metricType_) {
case MetricType::COUNTER:
Expand All @@ -78,28 +86,74 @@ void Metric::encodeTo(BufferHandle *buff) {
}
}

MetricType Metric::getMetricType() const {
return metricType_;
}


FloatMetric::FloatMetric(const FloatMetric & met) : Metric(met), floatValue_(met.floatValue_) {}

FloatMetric::FloatMetric(MetricType mtype, std::string name, float val)
: Metric(mtype, name, 0), floatValue_(val) {}

void FloatMetric::formatValue(char * buffer) const {
if (metricType_ == MetricType::GAUGE_DELTA) {
sprintf(buffer, ":%+.6f|", floatValue_);
} else {
sprintf(buffer, ":%.6f|", floatValue_);
}
}


SampledMetric::~SampledMetric() {
if (metric_ != NULL) {
delete metric_;
metric_ = NULL;
}
}

SampledMetric::SampledMetric(const SampledMetric & met) : rate_(met.rate_) {
if (met.metric_ != NULL) {
switch (met.metric_->getMetricType()) {
case MetricType::COUNTER:
case MetricType::SET:
metric_ = new Metric(*met.metric_);
break;

case MetricType::GAUGE:
case MetricType::GAUGE_DELTA:
case MetricType::TIMER:
metric_ = new FloatMetric(*((FloatMetric*)met.metric_));
break;

case MetricType::NONE:
metric_ = NULL;
}
}
}

SampledMetric::SampledMetric(Metric met): metric_(met) {}
SampledMetric::SampledMetric(Metric met, float rate)
: metric_(met), rate_(rate) {}
SampledMetric::SampledMetric(MetricType mtype, const string& name, int64_t value, float rate) : metric_(NULL), rate_(rate) {
metric_ = new Metric(mtype, name, value);
}

SampledMetric::SampledMetric(MetricType mtype,
const string& name, int64_t value, float rate)
: metric_(mtype, name, value), rate_(rate) {}
SampledMetric::SampledMetric(MetricType mtype, const string& name, float value, float rate) : metric_(NULL), rate_(rate) {
metric_ = new FloatMetric(mtype, name, value);
}

SampledMetric::SampledMetric() {}
SampledMetric::SampledMetric() : metric_(NULL) {}

bool SampledMetric::isSampled() const {
return rate_ < 1.0;
}

void SampledMetric::encodeTo(BufferHandle *buff) {
metric_.encodeTo(buff);
if (isSampled()) {
char rateBuff[8];
sprintf(rateBuff, "|@%.2f", rate_);
buff->write(rateBuff, strlen(rateBuff));
if (metric_ != NULL) {
metric_->encodeTo(buff);
if (isSampled()) {
char rateBuff[8];
sprintf(rateBuff, "|@%.2f", rate_);
buff->write(rateBuff, strlen(rateBuff));
}
}
}

Expand Down Expand Up @@ -166,19 +220,19 @@ std::vector<std::string> MetricCollector::flush() {
return result;
}

void MetricCollector::pushTimer(const string& name, int64_t val, float rate) {
void MetricCollector::pushTimer(const string& name, float val, float rate) {
metrics_.emplace_back(MetricType::TIMER, name, val, rate);
}

void MetricCollector::pushCounter(const string& name, int64_t val, float rate) {
metrics_.emplace_back(MetricType::COUNTER, name, val, rate);
}

void MetricCollector::pushGauge(const string& name, int64_t val, float rate) {
void MetricCollector::pushGauge(const string& name, float val, float rate) {
metrics_.emplace_back(MetricType::GAUGE, name, val, rate);
}

void MetricCollector::pushGaugeDelta(const string& name, int64_t val, float rate) {
void MetricCollector::pushGaugeDelta(const string& name, float val, float rate) {
metrics_.emplace_back(MetricType::GAUGE_DELTA, name, val, rate);
}

Expand Down
31 changes: 24 additions & 7 deletions cystatsd/collector/statsd_proto.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,39 @@ class Metric {
MetricType metricType_ {MetricType::NONE};
std::string name_ {""};
int64_t value_ {0};

virtual void formatValue(char * buffer) const;
public:
Metric();
Metric(const Metric & met);
virtual ~Metric() {};
Metric(MetricType mtype, std::string name, int64_t val);
void encodeTo(BufferHandle*);
MetricType getMetricType() const;
};

class FloatMetric : public Metric {
protected:
float floatValue_ {0};

void formatValue(char * buffer) const;
public:
FloatMetric(const FloatMetric & met);
FloatMetric(MetricType mtype, std::string name, float val);
virtual ~FloatMetric() {};
};

class SampledMetric {
protected:
Metric metric_;
Metric * metric_;
float rate_ {1.0};
public:
SampledMetric();
explicit SampledMetric();
virtual ~SampledMetric();
bool isSampled() const;
SampledMetric(Metric met, float rate);
SampledMetric(const SampledMetric & met);
SampledMetric(MetricType met, const std::string &name, int64_t val, float rate);
SampledMetric(Metric met);
SampledMetric(MetricType met, const std::string &name, float val, float rate);
void encodeTo(BufferHandle*);
};

Expand All @@ -70,10 +87,10 @@ class MetricCollector {
size_t count() const;
bool empty() const;
std::vector<std::string> flush();
void pushTimer(const std::string& name, int64_t value, float rate = 1.0);
void pushTimer(const std::string& name, float value, float rate = 1.0);
void pushCounter(const std::string& name, int64_t value, float rate = 1.0);
void pushGauge(const std::string& name, int64_t value, float rate = 1.0);
void pushGaugeDelta(const std::string& name, int64_t value, float rate = 1.0);
void pushGauge(const std::string& name, float value, float rate = 1.0);
void pushGaugeDelta(const std::string& name, float value, float rate = 1.0);
void pushSet(const std::string& name, int64_t value, float rate = 1.0);
};

Expand Down
45 changes: 29 additions & 16 deletions test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@

from cystatsd import MetricCollector
import unittest
import re


def get_gauge_number(name, value):
regex = r"^{}:([0-9]+\.[0-9]+)\|g".format(name)
m = re.match(regex, value.decode())
if m:
return float(m.group(1))


def encode_timer(name, val, rate=1.0):
collector = MetricCollector()
Expand Down Expand Up @@ -36,21 +45,25 @@ def encode_set(name, val, rate=1.0):
class BasicCollectorTests(unittest.TestCase):

def test_timer(self):
self.assertEqual(b"foo:200|ms", encode_timer("foo", 200))
self.assertEqual(b"bar:215|ms|@0.50", encode_timer("bar", 215, 0.5))
self.assertEqual(b"bar:215|ms", encode_timer("bar", 215, 1.5))
self.assertEqual(b"foo:200.000000|ms", encode_timer("foo", 200))
self.assertEqual(b"bar:215.000000|ms|@0.50", encode_timer("bar", 215, 0.5))
self.assertEqual(b"bar:215.000000|ms", encode_timer("bar", 215, 1.5))

def test_counter(self):
self.assertEqual(b"foo:200|c", encode_counter("foo", 200))
self.assertEqual(b"bar:215|c|@0.50", encode_counter("bar", 215, 0.5))
self.assertEqual(b"bar:215|c", encode_counter("bar", 215, 1.5))

def test_gauge(self):
self.assertEqual(b"foo:200|g", encode_gauge("foo", 200))
self.assertEqual(b"foo:+200|g", encode_gauge("foo", 200, delta=True))
self.assertEqual(b"foo:-200|g", encode_gauge("foo", -200, delta=True))
self.assertEqual(b"bar:215|g|@0.50", encode_gauge("bar", 215, 0.5))
self.assertEqual(b"bar:215|g", encode_gauge("bar", 215, 1.5))
self.assertEqual(b"foo:200.000000|g", encode_gauge("foo", 200))
self.assertEqual(b"foo:+200.000000|g", encode_gauge("foo", 200, delta=True))
self.assertEqual(b"foo:-200.000000|g", encode_gauge("foo", -200, delta=True))
self.assertEqual(b"bar:215.000000|g|@0.50", encode_gauge("bar", 215, 0.5))
self.assertEqual(b"bar:215.000000|g", encode_gauge("bar", 215, 1.5))

def test_gauge_float(self):
result = get_gauge_number("foo", encode_gauge("foo", 200.123))
self.assertAlmostEqual(200.123, result, places = 3)

def test_set(self):
self.assertEqual(b"foo:200|s", encode_set("foo", 200))
Expand All @@ -66,10 +79,10 @@ def test_batching_1(self):
collector.push_gauge(met_name, 100 + i)
result = list(collector.flush())
self.assertEqual([
b"m0:100|g\nm1:101|g",
b"m2:102|g\nm3:103|g",
b"m4:104|g\nm5:105|g",
b"m6:106|g\nm7:107|g"
b"m0:100.000000|g\nm1:101.000000|g",
b"m2:102.000000|g\nm3:103.000000|g",
b"m4:104.000000|g\nm5:105.000000|g",
b"m6:106.000000|g\nm7:107.000000|g"
], result)

def test_batching_1(self):
Expand All @@ -81,8 +94,8 @@ def test_batching_1(self):
result = list(collector.flush())
print('RESULT', result)
self.assertEqual([
b"m0:100|g\nm1:101|g\n"
b"m2:102|g\nm3:103|g\n"
b"m4:104|g\nm5:105|g\n"
b"m6:106|g\nm7:107|g"
b"m0:100.000000|g\nm1:101.000000|g\n"
b"m2:102.000000|g\nm3:103.000000|g\n"
b"m4:104.000000|g\nm5:105.000000|g\n"
b"m6:106.000000|g\nm7:107.000000|g"
], result)