diff --git a/quasardb/stats.py b/quasardb/stats.py
index a9e58c26..c07dac53 100644
--- a/quasardb/stats.py
+++ b/quasardb/stats.py
@@ -1,14 +1,20 @@
 import re
+
 import quasardb
 import logging
+from collections import defaultdict
 from datetime import datetime
+from enum import Enum
 
 logger = logging.getLogger("quasardb.stats")
-
+MAX_KEYS = 4 * 1024 * 1024  # 4 million max keys
 stats_prefix = "$qdb.statistics."
 
+
+# Compile these regexes once for speed
 user_pattern = re.compile(r"\$qdb.statistics.(.*).uid_([0-9]+)$")
 total_pattern = re.compile(r"\$qdb.statistics.(.*)$")
+user_clean_pattern = re.compile(r"\.uid_\d+")
 
 
 def is_user_stat(s):
@@ -50,49 +56,29 @@ def of_node(dconn):
     """
 
     start = datetime.now()
-    raw = {k: _get_stat(dconn, k) for k in dconn.prefix_get(stats_prefix, 10000)}
-    ret = {"by_uid": _by_uid(raw), "cumulative": _cumulative(raw)}
+    ks = _get_all_keys(dconn)
+    idx = _index_keys(dconn, ks)
+    raw = {k: _get_stat_value(dconn, k) for k in ks}
+
+    ret = {"by_uid": _by_uid(raw, idx), "cumulative": _cumulative(raw, idx)}
 
     check_duration = datetime.now() - start
-    ret["cumulative"]["check.online"] = 1
-    ret["cumulative"]["check.duration_ms"] = int(check_duration.total_seconds() * 1000)
+    ret["cumulative"]["check.online"] = {
+        "value": 1,
+        "type": Type.ACCUMULATOR,
+        "unit": Unit.NONE,
+    }
+    ret["cumulative"]["check.duration_ms"] = {
+        "value": int(check_duration.total_seconds() * 1000),
+        "type": Type.ACCUMULATOR,
+        "unit": Unit.MILLISECONDS,
+    }
 
     return ret
 
 
-_stat_types = {
-    "node_id": ("constant", None),
-    "operating_system": ("constant", None),
-    "partitions_count": ("constant", "count"),
-    "cpu.system": ("counter", "ns"),
-    "cpu.user": ("counter", "ns"),
-    "cpu.idle": ("counter", "ns"),
-    "startup": ("constant", None),
-    "startup_time": ("constant", None),
-    "shutdown_time": ("constant", None),
-    "network.current_users_count": ("gauge", "count"),
-    "hardware_concurrency": ("gauge", "count"),
-    "check.online": ("gauge", "count"),
-    "check.duration_ms": ("constant", "ms"),
-    "requests.bytes_in": ("counter", "bytes"),
-    "requests.bytes_out": ("counter", "bytes"),
-    "requests.errors_count": ("counter", "count"),
-    "requests.successes_count": ("counter", "count"),
-    "requests.total_count": ("counter", "count"),
-    "async_pipelines.merge.bucket_count": ("counter", "count"),
-    "async_pipelines.merge.duration_us": ("counter", "us"),
-    "async_pipelines.write.successes_count": ("counter", "count"),
-    "async_pipelines.write.failures_count": ("counter", "count"),
-    "async_pipelines.write.time_us": ("counter", "us"),
-    "async_pipelines.merge.max_bucket_count": ("gauge", "count"),
-    "async_pipelines.merge.max_depth_count": ("gauge", "count"),
-    "async_pipelines.merge.requests_count": ("counter", "count"),
-    "evicted.count": ("counter", "count"),
-    "pageins.count": ("counter", "count"),
-}
-
 async_pipeline_bytes_pattern = re.compile(
     r"async_pipelines.pipe_[0-9]+.merge_map.bytes"
 )
@@ -101,39 +87,6 @@ def of_node(dconn):
 )
 
 
-def _stat_type(stat_id):
-    if stat_id in _stat_types:
-        return _stat_types[stat_id]
-    elif stat_id.endswith("total_ns"):
-        return ("counter", "ns")
-    elif stat_id.endswith("total_bytes"):
-        return ("counter", "bytes")
-    elif stat_id.endswith("read_bytes"):
-        return ("counter", "bytes")
-    elif stat_id.endswith("written_bytes"):
-        return ("counter", "bytes")
-    elif stat_id.endswith("total_count"):
-        return ("counter", "count")
-    elif stat_id.startswith("network.sessions."):
-        return ("gauge", "count")
-    elif stat_id.startswith("memory."):
-        # memory statistics are all gauges i think, describes how much memory currently allocated where
-        return ("gauge", "bytes")
-    elif stat_id.startswith("persistence.") or stat_id.startswith("disk"):
-        # persistence are also all gauges, describes mostly how much is currently available/used on storage
-        return ("gauge", "bytes")
-    elif stat_id.startswith("license."):
-        return ("gauge", None)
-    elif stat_id.startswith("engine_"):
-        return ("constant", None)
-    elif async_pipeline_bytes_pattern.match(stat_id):
-        return ("gauge", "bytes")
-    elif async_pipeline_count_pattern.match(stat_id):
-        return ("gauge", "count")
-    else:
-        return None
-
-
 def stat_type(stat_id):
     """
     Returns the statistic type by a stat id. Returns one of:
@@ -144,66 +97,193 @@ def stat_type(stat_id):
 
     This is useful for determining which value should be reported in a dashboard.
     """
-    return _stat_type(stat_id)
-
+    import warnings
 
-def _calculate_delta_stat(stat_id, prev, cur):
-    logger.info(
-        "calculating delta for stat_id = {}, prev = {}. cur = {}".format(
-            stat_id, prev, cur
-        )
+    warnings.warn(
+        "The 'stat_type' method is deprecated and will be removed in a future release. "
+        "The stat type and unit are now part of the return value of invocations to the 'of_node' and 'by_node' methods.",
+        DeprecationWarning,
+        stacklevel=2,
     )
-    stat_type = _stat_type(stat_id)
-    if stat_type == "counter":
-        return cur - prev
-    elif stat_type == "gauge":
-        return cur
-    else:
-        return None
+    return None
 
 
-def _calculate_delta_stats(prev_stats, cur_stats):
-    ret = {}
-    for stat_id in cur_stats.keys():
-        try:
-            prev_stat = cur_stats[stat_id]
-            cur_stat = cur_stats[stat_id]
+def _get_all_keys(dconn, n=1024):
+    """
+    Returns all statistics keys from a single node.
 
-            value = _calculate_delta_stat(stat_id, prev_stat, cur_stat)
-            if value is not None:
-                ret[stat_id] = value
+    Parameters:
+    dconn: quasardb.Node
+      Direct node connection to the node we wish to connect to.
 
-        except KeyError:
-            # Stat likely was not present yet in prev_stats
-            pass
+    n: int
+      Initial number of keys to request; the request size grows until all keys fit.
+    """
+    xs = None
+    increase_rate = 8
+    # keep fetching keys as long as the number of results still hits the limit "n"
+    while xs is None or len(xs) >= n:
+        if xs is not None:
+            n = n * increase_rate
+            if n >= MAX_KEYS:
+                raise Exception(f"ERROR: Cannot fetch more than {MAX_KEYS} keys.")
+        xs = dconn.prefix_get(stats_prefix, n)
 
-    return ret
+    return xs
+
+
+class Type(Enum):
+    ACCUMULATOR = 1
+    GAUGE = 2
+    LABEL = 3
+
+
+class Unit(Enum):
+    NONE = 0
+    COUNT = 1
+
+    # Size units
+    BYTES = 32
+
+    # Time/duration units
+    EPOCH = 64
+    NANOSECONDS = 65
+    MICROSECONDS = 66
+    MILLISECONDS = 67
+    SECONDS = 68
+
+
+_type_string_to_enum = {
+    "accumulator": Type.ACCUMULATOR,
+    "gauge": Type.GAUGE,
+    "label": Type.LABEL,
+}
+
+
+_unit_string_to_enum = {
+    "none": Unit.NONE,
+    "count": Unit.COUNT,
+    "bytes": Unit.BYTES,
+    "epoch": Unit.EPOCH,
+    "nanoseconds": Unit.NANOSECONDS,
+    "microseconds": Unit.MICROSECONDS,
+    "milliseconds": Unit.MILLISECONDS,
+    "seconds": Unit.SECONDS,
+}
+
+
+def _lookup_enum(dconn, k, m):
+    """
+    Utility function to avoid code duplication: retrieves a key's value
+    from QuasarDB and maps it through the provided dict.
+    """
+
+    x = dconn.blob(k).get()
+    x = _clean_blob(x)
+
+    if x not in m:
+        raise Exception(f"Unrecognized unit/type {x} from key {k}")
+
+    return m[x]
 
 
-def calculate_delta(prev, cur):
+def _lookup_type(dconn, k):
     """
-    Calculates the 'delta' between two successive statistic measurements.
+    Looks up and parses/validates the metric type.
""" - ret = {} - for node_id in cur.keys(): - ret[node_id] = _calculate_delta_stats( - prev[node_id]["cumulative"], cur[node_id]["cumulative"] - ) + assert k.endswith(".type") + + return _lookup_enum(dconn, k, _type_string_to_enum) + + +def _lookup_unit(dconn, k): + """ + Looks up and parses/validates the metric type. + """ + assert k.endswith(".unit") + + return _lookup_enum(dconn, k, _unit_string_to_enum) + + +def _index_keys(dconn, ks): + """ + Takes all statistics keys that are retrieved, and "indexes" them in such a way + that we end up with a dict of all statistic keys, their type and their unit. + """ + + ### + # The keys generally look like this, for example: + # + # $qdb.statistics.requests.out_bytes + # $qdb.statistics.requests.out_bytes.type + # $qdb.statistics.requests.out_bytes.uid_1 + # $qdb.statistics.requests.out_bytes.uid_1.type + # $qdb.statistics.requests.out_bytes.uid_1.unit + # $qdb.statistics.requests.out_bytes.unit + # + # For this purpose, we simply get rid of the "uid" part, as the per-uid metrics are guaranteed + # to be of the exact same type as all the others. So after trimming those, the keys will look + # like this: + # + # $qdb.statistics.requests.out_bytes + # $qdb.statistics.requests.out_bytes.type + # $qdb.statistics.requests.out_bytes + # $qdb.statistics.requests.out_bytes.type + # $qdb.statistics.requests.out_bytes.unit + # $qdb.statistics.requests.out_bytes.unit + # + # And after deduplication like this: + # + # $qdb.statistics.requests.out_bytes + # $qdb.statistics.requests.out_bytes.type + # $qdb.statistics.requests.out_bytes.unit + # + # In which case we'll store `requests.out_bytes` as the statistic type, and look up the type + # and unit for those metrics and add a placeholder value. + + ret = defaultdict(lambda: {"value": None, "type": None, "unit": None}) + + for k in ks: + # Remove any 'uid_[0-9]+' part from the string + k_ = user_clean_pattern.sub("", k) + + matches = total_pattern.match(k_) + + parts = matches.groups()[0].rsplit(".", 1) + metric_id = parts[0] + + if len(parts) > 1 and parts[1] == "type": + if ret[metric_id]["type"] == None: + # We haven't seen this particular statistic yet + ret[metric_id]["type"] = _lookup_type(dconn, k) + elif len(parts) > 1 and parts[1] == "unit": + if ret[metric_id]["unit"] == None: + # We haven't seen this particular statistic yet + ret[metric_id]["unit"] = _lookup_unit(dconn, k) + else: + # It's a value, we look those up later + pass return ret def _clean_blob(x): + """ + Utility function that decodes a blob as an UTF-8 string, as the direct node C API + does not yet support 'string' types and as such all statistics are stored as blobs. + """ x_ = x.decode("utf-8", "replace") # remove trailing zero-terminator return "".join(c for c in x_ if ord(c) != 0) -def _get_stat(dconn, k): +def _get_stat_value(dconn, k): # Ugly, but works: try to retrieve as integer, if not an int, retrieve as # blob + # + # XXX(leon): we could use the index we built to get a much stronger hint + # on what the type is. 
     try:
         return dconn.integer(k).get()
@@ -216,30 +296,63 @@ def _get_stat_value(dconn, k):
         return _clean_blob(dconn.blob(k).get())
 
 
-def _by_uid(stats):
+def _by_uid(stats, idx):
     xs = {}
     for k, v in stats.items():
         matches = user_pattern.match(k)
         if is_user_stat(k) and matches:
             (metric, uid_str) = matches.groups()
+
+            if metric.split(".")[-1] in ["type", "unit"]:
+                # We already indexed the type and unit in our idx, this is not interesting
+                continue
+
+            if metric.startswith("serialized"):
+                # Internal stuff we don't care about nor can do anything with
+                continue
+
+            if metric not in idx:
+                raise Exception(f"Metric not in internal index: {metric}")
+
+            # Parse user id
             uid = int(uid_str)
+
+            # Prepare our metric dict
+            x = idx[metric].copy()
+            x["value"] = v
+
             if uid not in xs:
                 xs[uid] = {}
 
-            if not metric.startswith("serialized"):
-                xs[uid][metric] = v
+            xs[uid][metric] = x
 
     return xs
 
 
-def _cumulative(stats):
+def _cumulative(stats, idx):
     xs = {}
     for k, v in stats.items():
         matches = total_pattern.match(k)
         if is_cumulative_stat(k) and matches:
             metric = matches.groups()[0]
-            if not metric.startswith("serialized"):
-                xs[metric] = v
+
+            if metric.split(".")[-1] in ["type", "unit"]:
+                # We already indexed the type and unit in our idx, this is not interesting
+                continue
+
+            if metric.startswith("serialized"):
+                # Internal stuff we don't care about nor can do anything with
+                continue
+
+            if metric not in idx:
+                raise Exception(f"Metric not in internal index: {metric}")
+
+            x = idx[metric].copy()
+            x["value"] = v
+            xs[metric] = x
 
     return xs
+
+
+# async_pipelines.buffer.total_bytes
diff --git a/scripts/tests/setup b/scripts/tests/setup
index 8fc3c537..685fd72b 160000
--- a/scripts/tests/setup
+++ b/scripts/tests/setup
@@ -1 +1 @@
-Subproject commit 8fc3c5377b627778b552bbc4a4bea8b867e24038
+Subproject commit 685fd72bcc2d59ca933cb161eceba1829cfcacad
diff --git a/tests/test_stats.py b/tests/test_stats.py
index ab940584..6657147c 100644
--- a/tests/test_stats.py
+++ b/tests/test_stats.py
@@ -20,16 +20,38 @@ def _write_data(conn, table):
     )
 
 
-def _ensure_stats(conn, table):
-    global _has_stats
+def _has_stats(conn):
+    xs = qdbst.by_node(conn)
+
+    # As we always use a secure connection, we will wait for by-uid statistics to appear.
+    #
+    # We also don't expect a multi-node cluster to be running, so just ensure we have exactly 1 node.
+    ks = list(xs.keys())
+    assert len(ks) == 1
+
+    node_id = ks[0]
+    node_stats = xs[node_id]
+
+    assert "by_uid" in node_stats
+    uid_stats = node_stats["by_uid"]
+
+    # The actual check happens here: we expect at least 1 per-uid statistic
+    return len(uid_stats.keys()) > 0
+
+def _ensure_stats(conn, table):
     # This function is merely here to generate some activity on the cluster,
     # so that we're guaranteed to have some statistics
-    if _has_stats is False:
+
+    max_polls = 10
+    n = 0
+
+    while _has_stats(conn) is False:
         _write_data(conn, table)
-        # Statistics refresh interval is 5 seconds.
-        sleep(6)
-        _has_stats = True
+        sleep(1)
+
+        n = n + 1
+        assert n <= max_polls
 
 
 # A fairly conservative set of user-specific stats we always expect after doing our
@@ -57,6 +79,25 @@
 ]
 
 
+def _validate_stats_dict(xs):
+    """
+    Validates a dict that maps stat keys to stat dicts (value/type/unit).
+    """
+    for k, x in xs.items():
+        print("k: ", k, ", x: ", x)
+
+        # Each statistic is a dict of type/unit/value
+        assert isinstance(x, dict)
+        assert "value" in x
+        assert "type" in x
+        assert "unit" in x
+
+        # Everything that's not a NONE unit (i.e. not a label) should be an int
+        if x["unit"] != qdbst.Unit.NONE:
+
+            assert isinstance(x["value"], int)
+
+
 def _validate_node_stats(stats):
     assert "by_uid" in stats
     assert "cumulative" in stats
@@ -68,19 +109,19 @@ def _validate_node_stats(stats):
     for expected in _expected_user_stats:
         assert expected in xs
 
-    for _, v in xs.items():
-        # As far as I know, per-user statistics should *always* be
-        # integers
-        assert isinstance(v, int)
+    _validate_stats_dict(xs)
 
     # Test cumulative stats
     xs = stats["cumulative"]
     for expected in _expected_cumulative_stats:
         assert expected in xs
 
+    _validate_stats_dict(xs)
+
 
 def test_stats_by_node(qdbd_secure_connection, secure_table):
     _ensure_stats(qdbd_secure_connection, secure_table)
+
     xs = qdbst.by_node(qdbd_secure_connection)
 
     assert len(xs) == 1
@@ -91,7 +132,7 @@
 
 def test_stats_of_node(qdbd_settings, qdbd_secure_connection, secure_table):
     # First seed the table
-    # _ensure_stats(qdbd_secure_connection, secure_table)
+    _ensure_stats(qdbd_secure_connection, secure_table)
 
     # Now establish direct connection
     conn = quasardb.Node(
@@ -108,6 +149,7 @@
 def test_stats_regex():
     # This is mostly to test a regression, where user-stats for users with multi-digit ids
     # were not picked up.
+
     user_stats = [
        "$qdb.statistics.foo.uid_1",
        "$qdb.statistics.foo.uid_21",
@@ -127,3 +169,8 @@
    for s in total_stats:
        assert qdbst.is_user_stat(s) is False
        assert qdbst.is_cumulative_stat(s) is True
+
+
+def test_stat_type_is_deprecated():
+    with pytest.deprecated_call():
+        qdbst.stat_type("foobar")
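
With this change, every statistic returned by `of_node` / `by_node` is a dict with `value`, `type` (a `quasardb.stats.Type`) and `unit` (a `quasardb.stats.Unit`) rather than a bare scalar, and the old `calculate_delta` helper is removed. A minimal sketch of how a caller could still derive deltas from two successive samples under the new shape (the `delta_cumulative` helper and the `prev`/`cur` names are hypothetical, not part of this change):

    import quasardb.stats as qdbst

    def delta_cumulative(prev, cur):
        # prev, cur: two successive copies of by_node(conn)[node_id]["cumulative"]
        out = {}
        for metric, stat in cur.items():
            if stat["type"] == qdbst.Type.ACCUMULATOR and metric in prev:
                # accumulators only make sense as a difference between samples
                out[metric] = stat["value"] - prev[metric]["value"]
            elif stat["type"] == qdbst.Type.GAUGE:
                # gauges are point-in-time readings; report the latest value
                out[metric] = stat["value"]
            # Type.LABEL entries are skipped: they carry no numeric value
        return out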