From 40e5bd93528adb74052fe1ddeee152695ac1e9b7 Mon Sep 17 00:00:00 2001 From: Anton Leonov Date: Wed, 17 Apr 2019 20:15:31 +0300 Subject: [PATCH 1/9] Removed unused helpers. Added subprocess helper. --- Hermes/appscale/hermes/constants.py | 15 +---- Hermes/appscale/hermes/helper.py | 58 +++++++++---------- .../hermes/producers/cassandra_stats.py | 36 +----------- 3 files changed, 35 insertions(+), 74 deletions(-) diff --git a/Hermes/appscale/hermes/constants.py b/Hermes/appscale/hermes/constants.py index c8e314c381..8472e3b186 100644 --- a/Hermes/appscale/hermes/constants.py +++ b/Hermes/appscale/hermes/constants.py @@ -14,11 +14,6 @@ # Stats which were produce less than X seconds ago is considered as current ACCEPTABLE_STATS_AGE = 10 -# The ZooKeeper location for storing Hermes configurations -NODES_STATS_CONFIGS_NODE = '/appscale/stats/profiling/nodes' -PROCESSES_STATS_CONFIGS_NODE = '/appscale/stats/profiling/processes' -PROXIES_STATS_CONFIGS_NODE = '/appscale/stats/profiling/proxies' - class _MissedValue(object): """ @@ -43,10 +38,6 @@ def __repr__(self): SECRET_HEADER = 'Appscale-Secret' -class HTTP_Codes(object): - """ A class with HTTP status codes. """ - HTTP_OK = 200 - HTTP_BAD_REQUEST = 400 - HTTP_DENIED = 403 - HTTP_INTERNAL_ERROR = 500 - HTTP_NOT_IMPLEMENTED = 501 +class SubprocessError(Exception): + """ Indicates that subcommand failed. """ + pass diff --git a/Hermes/appscale/hermes/helper.py b/Hermes/appscale/hermes/helper.py index 1ff7bcf29c..36c5573700 100644 --- a/Hermes/appscale/hermes/helper.py +++ b/Hermes/appscale/hermes/helper.py @@ -1,34 +1,34 @@ """ Helper functions for Hermes operations. """ -import errno -import os +import asyncio +import logging -class JSONTags(object): - """ A class containing all JSON tags used for Hermes functionality. 
""" - ALL_STATS = 'all_stats' - BUCKET_NAME = 'bucket_name' - BODY = 'body' - DEPLOYMENT_ID = 'deployment_id' - ERROR = 'error' - OBJECT_NAME = 'object_name' - REASON = 'reason' - STATUS = 'status' - STORAGE = 'storage' - SUCCESS = 'success' - TASK_ID = 'task_id' - TIMESTAMP = 'timestamp' - TYPE = 'type' - UNREACHABLE = 'unreachable' +from appscale.hermes.constants import SubprocessError -def ensure_directory(dir_path): - """ Ensures that the directory exists. +logger = logging.getLogger(__name__) + + +async def subprocess(command, timeout): + process = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + logger.debug('Started subprocess `{}` (pid: {})' + .format(command, process.pid)) - Args: - dir_path: A str representing the directory path. - """ try: - os.makedirs(dir_path) - except OSError as os_error: - if os_error.errno == errno.EEXIST and os.path.isdir(dir_path): - pass - else: - raise + # Wait for the subprocess to finish + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout) + except asyncio.TimeoutError: + raise SubprocessError('Timed out waiting for subprocess `{}` (pid: {})' + .format(command, process.pid)) + + output = stdout.decode() + error = stderr.decode() + if error: + logger.warning(error) + if process.returncode != 0: + raise SubprocessError('Subprocess failed with return code {} ({})' + .format(process.returncode, error)) + + return output, error diff --git a/Hermes/appscale/hermes/producers/cassandra_stats.py b/Hermes/appscale/hermes/producers/cassandra_stats.py index 207e060725..1292bac66b 100644 --- a/Hermes/appscale/hermes/producers/cassandra_stats.py +++ b/Hermes/appscale/hermes/producers/cassandra_stats.py @@ -7,6 +7,7 @@ import attr from appscale.common import appscale_info +from appscale.hermes import helper from appscale.hermes.converter import Meta, include_list_name # The endpoint used for retrieving queue stats. 
@@ -16,11 +17,6 @@ logger = logging.getLogger(__name__) -class NodetoolStatusError(Exception): - """ Indicates that `nodetool status` command failed. """ - pass - - @include_list_name('cassandra.node') @attr.s(cmp=False, hash=False, slots=True, frozen=True) class CassandraNodeStats(object): @@ -99,34 +95,8 @@ async def get_current(cls): An instance of CassandraStatsSnapshot. """ start = time.time() - - process = await asyncio.create_subprocess_shell( - NODETOOL_STATUS_COMMAND, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - logger.info('Started subprocess `{}` (pid: {})' - .format(NODETOOL_STATUS_COMMAND, process.pid)) - - try: - # Wait for the subprocess to finish - stdout, stderr = await asyncio.wait_for( - process.communicate(), NODETOOL_STATUS_TIMEOUT - ) - except asyncio.TimeoutError: - raise NodetoolStatusError( - 'Timed out waiting for subprocess `{}` (pid: {})' - .format(NODETOOL_STATUS_COMMAND, process.pid) - ) - - output = stdout.decode() - error = stderr.decode() - if error: - logger.warning(error) - if process.returncode != 0: - raise NodetoolStatusError('Subprocess failed with return code {} ({})' - .format(process.returncode, error)) - + output, error = helper.subprocess(NODETOOL_STATUS_COMMAND, + NODETOOL_STATUS_TIMEOUT) known_db_nodes = set(appscale_info.get_db_ips()) nodes = [] shown_nodes = set() From be66198966216cb74df91599e195e666670b73e8 Mon Sep 17 00:00:00 2001 From: Anton Leonov Date: Wed, 17 Apr 2019 20:18:35 +0300 Subject: [PATCH 2/9] New monitoring resource concept and first process resource --- Hermes/appscale/hermes/resources/__init__.py | 0 Hermes/appscale/hermes/resources/process.py | 217 +++++++++++++++ .../hermes/resources/resource_handlers.py | 257 ++++++++++++++++++ Hermes/setup.py | 13 +- 4 files changed, 482 insertions(+), 5 deletions(-) create mode 100644 Hermes/appscale/hermes/resources/__init__.py create mode 100644 Hermes/appscale/hermes/resources/process.py create mode 100644 
Hermes/appscale/hermes/resources/resource_handlers.py diff --git a/Hermes/appscale/hermes/resources/__init__.py b/Hermes/appscale/hermes/resources/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/Hermes/appscale/hermes/resources/process.py b/Hermes/appscale/hermes/resources/process.py new file mode 100644 index 0000000000..3ea495f96c --- /dev/null +++ b/Hermes/appscale/hermes/resources/process.py @@ -0,0 +1,217 @@ +import logging +import re +import time + +import attr +import psutil + +from appscale.admin.service_manager import ServiceManager +from appscale.common import appscale_info +from appscale.hermes import helper + +from appscale.hermes.unified_service_names import ( + find_service_by_monit_name +) + +logger = logging.getLogger(__name__) + +APPSCALE_PROCESS_TAG = 'appscale' + + +@attr.s(cmp=False, hash=False, slots=True) +class Process(object): + """ + A container for all parameters representing process state at + a specific moment of time. + """ + utc_timestamp = attr.ib(default=None) + host = attr.ib(default=None) + + pid = attr.ib(default=None) + ppid = attr.ib(default=None) + create_time = attr.ib(default=None) + status = attr.ib(default=None) + username = attr.ib(default=None) + cwd = attr.ib(default=None) + name = attr.ib(default=None) + exe = attr.ib(default=None) + cmdline = attr.ib(default=None) + + own_tags = attr.ib(default=None) # Tags related to the process. + all_tags = attr.ib(default=None) # Own tags + ancestors' tags. 
+ + cpu_user = attr.ib(default=None) + cpu_system = attr.ib(default=None) + cpu_percent = attr.ib(default=None) + + memory_resident = attr.ib(default=None) + memory_virtual = attr.ib(default=None) + memory_shared = attr.ib(default=None) + + disk_io_read_count = attr.ib(default=None) + disk_io_write_count = attr.ib(default=None) + disk_io_read_bytes = attr.ib(default=None) + disk_io_write_bytes = attr.ib(default=None) + + threads_num = attr.ib(default=None) + file_descriptors_num = attr.ib(default=None) + ctx_switches_voluntary = attr.ib(default=None) + ctx_switches_involuntary = attr.ib(default=None) + + +MONIT_PROCESS_PATTERN = re.compile( + r"^Process \'(?P[^']+)\' *\n" + r"(^ .*\n)*?" + r"^ pid +(?P\d+)\n", + re.MULTILINE +) +PROCESS_ATTRS = ( + 'pid', 'ppid', 'name', 'cwd', 'exe', 'cmdline', 'status', 'username', + 'cpu_times', 'cpu_percent', 'memory_info', 'io_counters', 'num_threads', + 'num_fds', 'num_ctx_switches', 'create_time' +) + + +async def list_resource(): + """ A coroutine which prepares a list of Process, + converts it to dictionaries. + + Returns: + A tuple (list of dict representation of Process, empty list of failures). + """ + processes = [attr.asdict(process) for process in (await list_processes())] + failed = [] + return processes, failed + + +async def list_processes(): + """ Function for building a list of Process. + + Returns: + A list of Processes. + """ + start_time = time.time() + + # Get dict with known processes (: ) + known_processes = await get_known_processes() + # Iterate through all processes and init majority of its info. + pid_to_process = { + process.pid: init_process_info(process, known_processes) + for process in psutil.process_iter(attrs=PROCESS_ATTRS, ad_value=None) + } + + def list_ancestors_tags(ppid): + """ A recursive function for collecting ancestors' tags. + + Args: + ppid: An int - parent PID. + Returns: + A list of ancestors' tags. 
+ """ + parent_process = pid_to_process.get(ppid) + if not parent_process: + return [] + if parent_process.ppid == 0: + return parent_process.own_tags + return parent_process.own_tags + list_ancestors_tags(parent_process.ppid) + + # Set the rest of information about processes state + for process in pid_to_process.values(): + process.utc_timestamp = start_time + process.host = appscale_info.get_private_ip() + process.all_tags += list_ancestors_tags(process.ppid) + + processes = pid_to_process.values() + logger.info( + "Prepared info about {} processes in {:.3f}s." + .format(len(processes), time.time() - start_time) + ) + return processes + + +async def get_known_processes(): + """ Gets tags (e.g.: appscale, taskqueue, datastore, ...) + for appscale-related processes using all AppScale supervisors + (monit, ServiceManager and systemd). + + Returns: + A dict containing tags for known processes (: ). + """ + known_processes = {} + + # Detect processes supervised by monit + output, error = await helper.subprocess('monit status', timeout=5) + for match in MONIT_PROCESS_PATTERN.finditer(output): + monit_name = match.group('name') + pid = int(match.group('pid')) + service = find_service_by_monit_name(monit_name) + application_id = service.get_application_id_by_monit_name(monit_name) + tags = [APPSCALE_PROCESS_TAG, service.name, monit_name] + if application_id: + tags.append(application_id) + known_processes[pid] = tags + + # Detect processes supervised by AppScale ServiceManager + for server in ServiceManager.get_state(): + known_processes[server.process.pid] = [ + APPSCALE_PROCESS_TAG, server.type, server.service.slice + ] + + # Detect processes supervised by systemd + output, error = await helper.subprocess( + 'systemctl status solr.service | grep \'Main PID\' | awk \'{ print $3 }\'', + timeout=5 + ) + if output.isdigit() and output != '0': + solr_pid = int(output) + known_processes[solr_pid] = [ + APPSCALE_PROCESS_TAG, 'solr' + ] + return known_processes + + +def 
init_process_info(psutil_process, known_processes): + """ Initializes Process entity accoring to information in psutil process + and known appscale processes. + + Args: + psutil_process: An instance of psutil.Process. + known_processes: A dict - tags for known processes (: ). + Returns: + An instance of Process. + """ + process = Process() + + process_info = psutil_process.info + cpu_times = process_info['cpu_times'] + memory_info = process_info['memory_info'] + io_counters = process_info['io_counters'] + ctx_switches = process_info['num_ctx_switches'] + + # Fill process attributes: + process.pid = process_info['pid'] + process.ppid = process_info['ppid'] + process.create_time = process_info['create_time'] + process.status = process_info['status'] + process.username = process_info['username'] + process.cwd = process_info['cwd'] + process.name = process_info['name'] + process.exe = process_info['exe'] + process.cmdline = process_info['cmdline'] + process.own_tags = known_processes.get(psutil_process.pid, [process.name]) + process.all_tags = process.own_tags[::] + process.cpu_user = cpu_times.user + process.cpu_system = cpu_times.system + process.cpu_percent = process_info['cpu_percent'] + process.memory_resident = memory_info.rss + process.memory_virtual = memory_info.vms + process.memory_shared = memory_info.shared + process.disk_io_read_count = io_counters.read_count + process.disk_io_write_count = io_counters.write_count + process.disk_io_read_bytes = io_counters.read_bytes + process.disk_io_write_bytes = io_counters.write_bytes + process.threads_num = process_info['num_threads'] + process.file_descriptors_num = process_info['num_fds'] + process.ctx_switches_voluntary = ctx_switches.voluntary + process.ctx_switches_involuntary = ctx_switches.involuntary + return process diff --git a/Hermes/appscale/hermes/resources/resource_handlers.py b/Hermes/appscale/hermes/resources/resource_handlers.py new file mode 100644 index 0000000000..0db87a223c --- /dev/null +++ 
b/Hermes/appscale/hermes/resources/resource_handlers.py @@ -0,0 +1,257 @@ +""" Implementation of stats sources for cluster stats. """ +import asyncio +import inspect +import json +import logging + +import aiohttp +from aiohttp import web + +from appscale.common import appscale_info +from appscale.hermes import constants +from appscale.hermes.resources import process + +logger = logging.getLogger(__name__) + +# Do not run more than 100 concurrent requests to remote hermes servers. +max_concurrency = asyncio.Semaphore(100) + +# To avoid unnecessary JSON decoding and encoding, +# when listing a resource from a remote hermes, +# entities and failures are sent as two separate JSON lists connected by: +BODY_CONNECTOR = b'\n\n\xff\xff\xff\xff\n\n' + + +class HermesError(aiohttp.ClientError): + """ Represents an error while listing resource from local/remote Hermes. """ + def __init__(self, host, message): + self.host = host + self.message = message + super().__init__(message) + + +class ResourceHandler(object): + """ + A class implementing HTTP handlers for listing a monitored resource. + + It provides two public handler methods: + - list_local(request) # For listing local resource + - list_cluster(request) # For listing resource on many nodes + """ + def __init__(self, default_ips_getter, resource_name, local_source): + """ Initialised instance of ResourceHandler. + + Args: + default_ips_getter: A callable - should return a list of cluster nodes + to query resource from. + resource_name: A str - name of resource (should match name in route). + local_source: A callable (optionally async) - should return a tuple + of two lists: (entity_dicts, failure_strings). + """ + self.default_ips_getter = default_ips_getter + self.resource_name = resource_name + self.local_source = local_source + self.private_ip = appscale_info.get_private_ip() + + async def list_local(self, request): + """ A handler method to be assigned to route + 'GET /v2/'. 
+ It accepts one optional query argument: 'return-as-2-json-objects=yes', + if it is passed, entities and failures are returned as two JSON objects + connected by BODY_CONNECTOR. + + Args: + request: An instance of aiohttp.web.Request. + Returns: + An instance of aiohttp.web.Response. + """ + entities, failures = await self._call_local_source() + + if request.query.get('return-as-2-json-objects', 'no') == 'yes': + # Return body used for joining entities without decoding JSON. + body = b'%(entities)b %(connector)b %(failures)b' % { + b'entities': json.dumps(entities).encode(), + b'connector': BODY_CONNECTOR, + b'failures': json.dumps(failures).encode() + } + return web.Response(body=body) + else: + # Return a regular JSON body. + body = b'{"entities": %(entities)b, "failures": %(failures)b}' % { + b'entities': json.dumps(entities).encode(), + b'failures': json.dumps(failures).encode() + } + return web.Response(body=body, content_type='application/json') + + async def list_cluster(self, request): + """ A handler method to be assigned to route + 'GET /v2//_cluster'. + It accepts optional JSON body containing list of locations to collect + resource entities from. e.g.: + {"locations": ["10.0.2.15", "10.0.2.16", "10.0.2.17"]} + + If body is missing, + locations returned by self.default_ips_getter() will be used. + + Args: + request: An instance of aiohttp.web.Request. + Returns: + An instance of aiohttp.web.Response. + """ + if request.has_body: + try: + locations = (await request.json())['locations'] + except (ValueError, TypeError, KeyError) as err: + reason = 'JSON body should contain "locations" attr ({}).'.format(err) + return web.Response(status=400, reason=reason) + else: + locations = self.default_ips_getter() + + joined_entities_json, failures = await self._list_resource(locations) + # As joined_entities_json is already valid JSON array, + # we're rendering final JSON body manually. 
+ body = b'{"entities": %(entities)b, "failures": %(failures)b}' % { + b'entities': joined_entities_json, + b'failures': json.dumps(failures).encode() + } + return web.Response(body=body, content_type='application/json') + + async def _list_resource(self, hermes_locations): + """ Asynchronously collects full list of resource entities + from remote and local nodes. + + Args: + hermes_locations: a list of strings - hermes locations as [:]. + Returns: + A Future object which wraps a dict with node IP as key and + an instance of stats snapshot as value. + """ + entities_json_list = [] + failures = [] + + async def process_node_result(hermes_location): + """ Retrieves entities and failures from a particular hermes server + and appends results to local lists. + + Args: + hermes_location: A string - hermes locations as [:]. + """ + try: + entities_json, node_failures = await self._get_from_node(hermes_location) + if entities_json: + entities_json_list.append(entities_json) + if node_failures: + failures.extend(node_failures) + except HermesError as err: + failures.append({'host': err.host, 'message': err.message}) + + # Do multiple requests asynchronously and wait for all results + async with max_concurrency: + await asyncio.gather(*[ + process_node_result(location) for location in hermes_locations + ]) + + logger.info('Fetched {name} from {nodes} nodes.' + .format(name=self.resource_name, nodes=len(entities_json_list))) + + # Join raw JSON lists of entities to a single big list. + # This way we avoid extra JSON decoding and encoding. + joined_entities_json = b',\n\n'.join([ + raw_bytes.strip(b'[] ') for raw_bytes in entities_json_list + ]) + return b'[%b]' % joined_entities_json, failures + + async def _get_from_node(self, hermes_location): + """ Retrieves entities and failures from a particular hermes server + (local or remote). + + Args: + hermes_location: A string - hermes locations as [:]. 
+ Returns: + A tuple (JSON encoded list of resource entities, list of failures). + """ + if hermes_location.split(':')[0] == self.private_ip: + # List local resource entities (and failures). + try: + entities, failures = await self._call_local_source() + entities_json = json.dumps(entities).encode() + return entities_json, failures + except Exception as err: + logger.error('Failed to prepare local stats: {err}'.format(err=err)) + raise HermesError(host=hermes_location, message=str(err)) + else: + # List remote resource entities (and failures). + entities_json, failures = await self._fetch_remote(hermes_location) + return entities_json, failures + + async def _fetch_remote(self, hermes_location): + """ Fetches resource entities from a single remote node. + + Args: + hermes_location: a string - remote hermes location as [:]. + Returns: + A tuple (JSON encoded list of resource entities, list of failures). + """ + # Security header + headers = {constants.SECRET_HEADER: appscale_info.get_secret()} + + # Determine host and port of remote hermes + if ':' in hermes_location: + host, port = hermes_location.split(':') + else: + host = hermes_location + port = constants.HERMES_PORT + + url = 'http://{host}:{port}/v2/{resource}'.format( + host=host, port=port, resource=self.resource_name + ) + try: + # Do HTTP call to remote hermes requesting body in two parts. + async with aiohttp.ClientSession() as session: + awaitable_get = session.get( + url, headers=headers, params={'return-as-2-json-objects': 'yes'}, + timeout=constants.REMOTE_REQUEST_TIMEOUT + ) + async with awaitable_get as resp: + if resp.status >= 400: + # Handler client error + err_message = 'HTTP {}: {}'.format(resp.status, resp.reason) + resp_text = await resp.text() + if resp_text: + err_message += '. 
{}'.format(resp_text) + logger.error("Failed to get {} ({})".format(url, err_message)) + raise HermesError(host=host, message=err_message) + + # Read body without decoding JSON + body = await resp.read() + entities_json, failures_json = body.split(BODY_CONNECTOR) + failures = json.loads(failures_json.decode()) + return entities_json, failures + + except aiohttp.ClientError as err: + # Handle server error + logger.error("Failed to get {} ({})".format(url, err)) + raise HermesError(host=host, message=str(err)) + + async def _call_local_source(self): + """ A wrapper method for retrieving local resource entities and failures. + It awaits awaitable if needed and add host information to failures. + + Returns: + A tuple (list of resource entities, list of failures). + """ + local = self.local_source() + if inspect.isawaitable(local): + entities, failures = await local + else: + entities, failures = local + failures = [{'host': self.private_ip, 'message': message} + for message in failures] + return entities, failures + + +processes = ResourceHandler( + default_ips_getter=appscale_info.get_all_ips, + resource_name='processes', + local_source=process.list_resource +) diff --git a/Hermes/setup.py b/Hermes/setup.py index 6ea60cccb8..3fae13b2bf 100644 --- a/Hermes/setup.py +++ b/Hermes/setup.py @@ -2,7 +2,7 @@ setup( name='appscale-hermes', - version='0.4.0', + version='0.5.0', description='AppScale module which provides statistics API.', author='AppScale Systems, Inc.', url='https://github.com/AppScale/appscale', @@ -12,7 +12,7 @@ install_requires=[ 'appscale-common', 'appscale-admin', - 'psutil==5.6.3', + 'psutil==5.6', 'attrs==19.1.0', 'mock==2.0.0', 'aiohttp==2.3.9' @@ -24,9 +24,12 @@ 'Programming Language :: Python :: 3.5' ], namespace_packages=['appscale'], - packages=['appscale', - 'appscale.hermes', - 'appscale.hermes.producers'], + packages=[ + 'appscale', + 'appscale.hermes', + 'appscale.hermes.producers', + 'appscale.hermes.resources' + ], 
entry_points={'console_scripts': [ 'appscale-hermes=appscale.hermes.hermes_server:main' ]} From eec65278d13faec2731be79a13e49cfbd1c33b05 Mon Sep 17 00:00:00 2001 From: Anton Leonov Date: Wed, 17 Apr 2019 20:26:08 +0300 Subject: [PATCH 3/9] Added v2 API routes to process resource --- Hermes/appscale/hermes/handlers.py | 14 +++++++++----- Hermes/appscale/hermes/hermes_server.py | 6 ++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/Hermes/appscale/hermes/handlers.py b/Hermes/appscale/hermes/handlers.py index 510cff2d25..a310e90803 100644 --- a/Hermes/appscale/hermes/handlers.py +++ b/Hermes/appscale/hermes/handlers.py @@ -138,7 +138,8 @@ async def __call__(self, request): snapshot = await snapshot self.cached_snapshot = snapshot - return web.json_response(stats_to_dict(snapshot, include_lists)) + return web.json_response(stats_to_dict(snapshot, include_lists), + content_type='application/json') class ClusterStatsHandler: @@ -215,10 +216,13 @@ async def __call__(self, request): for node_ip, snapshot in new_snapshots_dict.items() } - return web.json_response({ - "stats": rendered_snapshots, - "failures": failures - }) + return web.json_response( + { + "stats": rendered_snapshots, + "failures": failures + }, + content_type='application/json' + ) def not_found(reason): diff --git a/Hermes/appscale/hermes/hermes_server.py b/Hermes/appscale/hermes/hermes_server.py index 25add17214..b1e85e3751 100644 --- a/Hermes/appscale/hermes/hermes_server.py +++ b/Hermes/appscale/hermes/hermes_server.py @@ -25,6 +25,7 @@ from appscale.hermes.producers.rabbitmq_stats import PushQueueStatsSource from appscale.hermes.producers.rabbitmq_stats import RabbitMQStatsSource from appscale.hermes.producers.taskqueue_stats import TaskqueueStatsSource +from appscale.hermes.resources.resource_handlers import processes logger = logging.getLogger(__name__) @@ -143,12 +144,17 @@ def main(): app = web.Application(middlewares=[verify_secret_middleware]) + # Add routes for old style 
structured statistics. route_items = [] route_items += get_local_stats_api_routes(is_lb, is_tq, is_db) route_items += get_cluster_stats_api_routes(is_master) for route, handler in route_items: app.router.add_get(route, handler) + # Add routes for new resources API. + app.router.add_get('/v2/processes', processes.list_local) + app.router.add_get('/v2/processes/_cluster', processes.list_cluster) + logger.info("Starting Hermes on port: {}.".format(args.port)) web.run_app(app, port=args.port, access_log=logger, access_log_format='%a "%r" %s %bB %Tfs "%{User-Agent}i"') From b76e9e1ba056443111e67d43c78af44d46c235f1 Mon Sep 17 00:00:00 2001 From: Anton Leonov Date: Fri, 19 Apr 2019 18:51:02 +0300 Subject: [PATCH 4/9] Fix Subprocess failure handling --- .../hermes/producers/cassandra_stats.py | 8 ++--- Hermes/appscale/hermes/resources/process.py | 33 ++++++++++++------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/Hermes/appscale/hermes/producers/cassandra_stats.py b/Hermes/appscale/hermes/producers/cassandra_stats.py index 1292bac66b..d2411770ac 100644 --- a/Hermes/appscale/hermes/producers/cassandra_stats.py +++ b/Hermes/appscale/hermes/producers/cassandra_stats.py @@ -1,5 +1,4 @@ """ Fetches `nodetool status` info. """ -import asyncio import logging import re import time @@ -8,6 +7,7 @@ from appscale.common import appscale_info from appscale.hermes import helper +from appscale.hermes.constants import SubprocessError from appscale.hermes.converter import Meta, include_list_name # The endpoint used for retrieving queue stats. @@ -95,8 +95,8 @@ async def get_current(cls): An instance of CassandraStatsSnapshot. 
""" start = time.time() - output, error = helper.subprocess(NODETOOL_STATUS_COMMAND, - NODETOOL_STATUS_TIMEOUT) + output, error = await helper.subprocess(NODETOOL_STATUS_COMMAND, + NODETOOL_STATUS_TIMEOUT) known_db_nodes = set(appscale_info.get_db_ips()) nodes = [] shown_nodes = set() @@ -150,7 +150,7 @@ async def get_current(cls): shown_nodes.add(address) else: - raise NodetoolStatusError( + raise SubprocessError( '`{}` output does not contain expected header. Actual output:\n{}' .format(NODETOOL_STATUS_COMMAND, output) ) diff --git a/Hermes/appscale/hermes/resources/process.py b/Hermes/appscale/hermes/resources/process.py index 3ea495f96c..18ee1f8243 100644 --- a/Hermes/appscale/hermes/resources/process.py +++ b/Hermes/appscale/hermes/resources/process.py @@ -8,6 +8,7 @@ from appscale.admin.service_manager import ServiceManager from appscale.common import appscale_info from appscale.hermes import helper +from appscale.hermes.constants import SubprocessError from appscale.hermes.unified_service_names import ( find_service_by_monit_name @@ -139,8 +140,12 @@ async def get_known_processes(): """ known_processes = {} - # Detect processes supervised by monit - output, error = await helper.subprocess('monit status', timeout=5) + try: + # Detect processes supervised by monit + output, error = await helper.subprocess('monit status', timeout=5) + except SubprocessError as err: + logger.warning('Failed to run `monit status` ({})'.format(err)) + output = '' for match in MONIT_PROCESS_PATTERN.finditer(output): monit_name = match.group('name') pid = int(match.group('pid')) @@ -148,20 +153,24 @@ async def get_known_processes(): application_id = service.get_application_id_by_monit_name(monit_name) tags = [APPSCALE_PROCESS_TAG, service.name, monit_name] if application_id: - tags.append(application_id) + tags.append('app___{}'.format(application_id)) known_processes[pid] = tags # Detect processes supervised by AppScale ServiceManager for server in ServiceManager.get_state(): - 
known_processes[server.process.pid] = [ - APPSCALE_PROCESS_TAG, server.type, server.service.slice - ] - - # Detect processes supervised by systemd - output, error = await helper.subprocess( - 'systemctl status solr.service | grep \'Main PID\' | awk \'{ print $3 }\'', - timeout=5 - ) + known_processes[server.process.pid] = [APPSCALE_PROCESS_TAG, server.type] + + try: + # Detect processes supervised by systemd + output, error = await helper.subprocess( + 'systemctl status solr.service ' + '| grep \'Main PID\' | awk \'{ print $3 }\'', + timeout=5 + ) + except SubprocessError as err: + logger.warning('Failed to run `systemctl status solr.service` ({})' + .format(err)) + output = '' if output.isdigit() and output != '0': solr_pid = int(output) known_processes[solr_pid] = [ From e895b4021e885d9ff04a2f387d272c9467a558bc Mon Sep 17 00:00:00 2001 From: Anton Leonov Date: Thu, 25 Apr 2019 14:04:49 +0300 Subject: [PATCH 5/9] Estimate 1h diff for cumulative parameters --- Hermes/appscale/hermes/resources/process.py | 66 +++++++++++++++++++-- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/Hermes/appscale/hermes/resources/process.py b/Hermes/appscale/hermes/resources/process.py index 18ee1f8243..0ab902a39c 100644 --- a/Hermes/appscale/hermes/resources/process.py +++ b/Hermes/appscale/hermes/resources/process.py @@ -25,9 +25,14 @@ class Process(object): A container for all parameters representing process state at a specific moment of time. """ + # A global dict containing previous processes state. + # It is used for computing *_1h_diff attributes. 
+ PREVIOUS_STATE = {} + utc_timestamp = attr.ib(default=None) host = attr.ib(default=None) + long_pid = attr.ib(default=None) pid = attr.ib(default=None) ppid = attr.ib(default=None) create_time = attr.ib(default=None) @@ -44,6 +49,8 @@ class Process(object): cpu_user = attr.ib(default=None) cpu_system = attr.ib(default=None) cpu_percent = attr.ib(default=None) + cpu_user_1h_diff = attr.ib(default=None) + cpu_system_1h_diff = attr.ib(default=None) memory_resident = attr.ib(default=None) memory_virtual = attr.ib(default=None) @@ -53,11 +60,20 @@ class Process(object): disk_io_write_count = attr.ib(default=None) disk_io_read_bytes = attr.ib(default=None) disk_io_write_bytes = attr.ib(default=None) + disk_io_read_count_1h_diff = attr.ib(default=None) + disk_io_write_count_1h_diff = attr.ib(default=None) + disk_io_read_bytes_1h_diff = attr.ib(default=None) + disk_io_write_bytes_1h_diff = attr.ib(default=None) threads_num = attr.ib(default=None) file_descriptors_num = attr.ib(default=None) + ctx_switches_voluntary = attr.ib(default=None) ctx_switches_involuntary = attr.ib(default=None) + ctx_switches_voluntary_1h_diff = attr.ib(default=None) + ctx_switches_involuntary_1h_diff = attr.ib(default=None) + + sample_time_diff = attr.ib(default=None) MONIT_PROCESS_PATTERN = re.compile( @@ -112,21 +128,59 @@ def list_ancestors_tags(ppid): parent_process = pid_to_process.get(ppid) if not parent_process: return [] - if parent_process.ppid == 0: + if parent_process.ppid in [0, 1, 2]: # Skip common root processes return parent_process.own_tags return parent_process.own_tags + list_ancestors_tags(parent_process.ppid) + host = appscale_info.get_private_ip() + # Set the rest of information about processes state - for process in pid_to_process.values(): - process.utc_timestamp = start_time - process.host = appscale_info.get_private_ip() - process.all_tags += list_ancestors_tags(process.ppid) + for p in pid_to_process.values(): + # Set unique process identifier + p.long_pid = 
'{}_{}_{}'.format( + host, p.pid, int(p.create_time*1000) + ) + # and *_1h_diff attributes + prev = Process.PREVIOUS_STATE.get(p.pid) + if prev: + # Compute one hour difference coefficient + diff_coef = 60 * 60 / (start_time - prev.utc_timestamp) + # Set diff attributes + p.cpu_user_1h_diff = ( + (p.cpu_user - prev.cpu_user) * diff_coef + ) + p.cpu_system_1h_diff = ( + (p.cpu_system - prev.cpu_system) * diff_coef + ) + p.disk_io_read_count_1h_diff = ( + (p.disk_io_read_count - prev.disk_io_read_count) * diff_coef + ) + p.disk_io_write_count_1h_diff = ( + (p.disk_io_write_count - prev.disk_io_write_count) * diff_coef + ) + p.disk_io_read_bytes_1h_diff = ( + (p.disk_io_read_bytes - prev.disk_io_read_bytes) * diff_coef + ) + p.disk_io_write_bytes_1h_diff = ( + (p.disk_io_write_bytes - prev.disk_io_write_bytes) * diff_coef + ) + p.ctx_switches_voluntary_1h_diff = ( + (p.ctx_switches_voluntary - prev.ctx_switches_voluntary) * diff_coef + ) + p.ctx_switches_involuntary_1h_diff = ( + (p.ctx_switches_involuntary - prev.ctx_switches_involuntary) * diff_coef + ) + + p.utc_timestamp = start_time + p.host = host + p.all_tags += list_ancestors_tags(p.ppid) processes = pid_to_process.values() logger.info( "Prepared info about {} processes in {:.3f}s." 
.format(len(processes), time.time() - start_time) ) + Process.PREVIOUS_STATE = pid_to_process return processes @@ -197,7 +251,7 @@ def init_process_info(psutil_process, known_processes): io_counters = process_info['io_counters'] ctx_switches = process_info['num_ctx_switches'] - # Fill process attributes: + # Fill psutil process attributes: process.pid = process_info['pid'] process.ppid = process_info['ppid'] process.create_time = process_info['create_time'] From 9e2d888bea666a27a02dee9721084a03def604b4 Mon Sep 17 00:00:00 2001 From: Anton Leonov Date: Thu, 25 Apr 2019 14:05:11 +0300 Subject: [PATCH 6/9] Update unit tests --- Hermes/tests/producers/__init__.py | 0 .../test-data/haproxy-stats-v1.4.csv | 0 .../test-data/haproxy-stats-v1.5.csv | 0 .../{ => producers}/test-data/node-stats.json | 0 .../test-data/processes-stats.json | 0 .../test-data/proxies-stats.json | 0 .../test-data/taskqueue-stats.json | 0 .../tests/{ => producers}/test_cassandra.py | 0 .../{ => producers}/test_cluster_stats.py | 0 Hermes/tests/{ => producers}/test_node.py | 0 Hermes/tests/{ => producers}/test_process.py | 0 Hermes/tests/{ => producers}/test_proxy.py | 0 .../tests/{ => producers}/test_taskqueue.py | 0 Hermes/tests/resources/__init__.py | 0 Hermes/tests/resources/test_process.py | 198 ++++++++++++++++++ .../tests/resources/test_resource_handler.py | 0 16 files changed, 198 insertions(+) create mode 100644 Hermes/tests/producers/__init__.py rename Hermes/tests/{ => producers}/test-data/haproxy-stats-v1.4.csv (100%) rename Hermes/tests/{ => producers}/test-data/haproxy-stats-v1.5.csv (100%) rename Hermes/tests/{ => producers}/test-data/node-stats.json (100%) rename Hermes/tests/{ => producers}/test-data/processes-stats.json (100%) rename Hermes/tests/{ => producers}/test-data/proxies-stats.json (100%) rename Hermes/tests/{ => producers}/test-data/taskqueue-stats.json (100%) rename Hermes/tests/{ => producers}/test_cassandra.py (100%) rename Hermes/tests/{ => 
producers}/test_cluster_stats.py (100%) rename Hermes/tests/{ => producers}/test_node.py (100%) rename Hermes/tests/{ => producers}/test_process.py (100%) rename Hermes/tests/{ => producers}/test_proxy.py (100%) rename Hermes/tests/{ => producers}/test_taskqueue.py (100%) create mode 100644 Hermes/tests/resources/__init__.py create mode 100644 Hermes/tests/resources/test_process.py create mode 100644 Hermes/tests/resources/test_resource_handler.py diff --git a/Hermes/tests/producers/__init__.py b/Hermes/tests/producers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/Hermes/tests/test-data/haproxy-stats-v1.4.csv b/Hermes/tests/producers/test-data/haproxy-stats-v1.4.csv similarity index 100% rename from Hermes/tests/test-data/haproxy-stats-v1.4.csv rename to Hermes/tests/producers/test-data/haproxy-stats-v1.4.csv diff --git a/Hermes/tests/test-data/haproxy-stats-v1.5.csv b/Hermes/tests/producers/test-data/haproxy-stats-v1.5.csv similarity index 100% rename from Hermes/tests/test-data/haproxy-stats-v1.5.csv rename to Hermes/tests/producers/test-data/haproxy-stats-v1.5.csv diff --git a/Hermes/tests/test-data/node-stats.json b/Hermes/tests/producers/test-data/node-stats.json similarity index 100% rename from Hermes/tests/test-data/node-stats.json rename to Hermes/tests/producers/test-data/node-stats.json diff --git a/Hermes/tests/test-data/processes-stats.json b/Hermes/tests/producers/test-data/processes-stats.json similarity index 100% rename from Hermes/tests/test-data/processes-stats.json rename to Hermes/tests/producers/test-data/processes-stats.json diff --git a/Hermes/tests/test-data/proxies-stats.json b/Hermes/tests/producers/test-data/proxies-stats.json similarity index 100% rename from Hermes/tests/test-data/proxies-stats.json rename to Hermes/tests/producers/test-data/proxies-stats.json diff --git a/Hermes/tests/test-data/taskqueue-stats.json b/Hermes/tests/producers/test-data/taskqueue-stats.json similarity index 100% rename from 
Hermes/tests/test-data/taskqueue-stats.json rename to Hermes/tests/producers/test-data/taskqueue-stats.json diff --git a/Hermes/tests/test_cassandra.py b/Hermes/tests/producers/test_cassandra.py similarity index 100% rename from Hermes/tests/test_cassandra.py rename to Hermes/tests/producers/test_cassandra.py diff --git a/Hermes/tests/test_cluster_stats.py b/Hermes/tests/producers/test_cluster_stats.py similarity index 100% rename from Hermes/tests/test_cluster_stats.py rename to Hermes/tests/producers/test_cluster_stats.py diff --git a/Hermes/tests/test_node.py b/Hermes/tests/producers/test_node.py similarity index 100% rename from Hermes/tests/test_node.py rename to Hermes/tests/producers/test_node.py diff --git a/Hermes/tests/test_process.py b/Hermes/tests/producers/test_process.py similarity index 100% rename from Hermes/tests/test_process.py rename to Hermes/tests/producers/test_process.py diff --git a/Hermes/tests/test_proxy.py b/Hermes/tests/producers/test_proxy.py similarity index 100% rename from Hermes/tests/test_proxy.py rename to Hermes/tests/producers/test_proxy.py diff --git a/Hermes/tests/test_taskqueue.py b/Hermes/tests/producers/test_taskqueue.py similarity index 100% rename from Hermes/tests/test_taskqueue.py rename to Hermes/tests/producers/test_taskqueue.py diff --git a/Hermes/tests/resources/__init__.py b/Hermes/tests/resources/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/Hermes/tests/resources/test_process.py b/Hermes/tests/resources/test_process.py new file mode 100644 index 0000000000..d1e797e04f --- /dev/null +++ b/Hermes/tests/resources/test_process.py @@ -0,0 +1,198 @@ +import asyncio + +import psutil +import pytest +from mock import patch, MagicMock + +from appscale.hermes.resources import process + + +def future(value=None): + future_obj = asyncio.Future() + future_obj.set_result(value) + return future_obj + +MONIT_STATUS = b""" +The Monit daemon 5.6 uptime: 20h 22m + +Process 'haproxy' + status Running + 
monitoring status Monitored + pid 8466 + parent pid 1 + uptime 20h 21m + children 0 + memory kilobytes 8140 + memory kilobytes total 8140 + memory percent 0.2% + memory percent total 0.2% + cpu percent 0.0% + cpu percent total 0.0% + data collected Wed, 19 Apr 2017 14:15:29 + +File 'groomer_file_check' + status Accessible + monitoring status Monitored + permission 644 + +Process 'appmanagerserver' + status Not monitored + monitoring status Not monitored + data collected Wed, 19 Apr 2017 13:49:44 + +Process 'app___my-25app-20003' + status Running + monitoring status Monitored + pid 5045 + parent pid 5044 + uptime 21h 41m + children 1 + memory kilobytes 65508 + memory kilobytes total 132940 + memory percent 1.7% + memory percent total 3.5% + cpu percent 0.0% + cpu percent total 0.0% + port response time 0.000s to 10.10.9.111:20000 [DEFAULT via TCP] + data collected Wed, 19 Apr 2017 14:18:33 + +System 'appscale-image0' + status Running + monitoring status Monitored + load average [0.23] [0.40] [0.46] + cpu 2.8%us 2.4%sy 1.3%wa + memory usage 2653952 kB [70.7%] + swap usage 0 kB [0.0%] + data collected Wed, 19 Apr 2017 14:15:29 +""" + +# `systemctl status solr.service | grep 'Main PID' | awk '{ print $3 }'` +SYSTEMCTL_STATUS = b'28783' + + +@pytest.mark.asyncio +async def test_get_known_processes(): + # Mock `monit status` output + monit_mock = MagicMock(returncode=0) + stdout = MONIT_STATUS + stderr = b'' + monit_mock.communicate.return_value = future((stdout, stderr)) + # Mock `systemctl status solr.service` output + systemctl_mock = MagicMock(returncode=0) + stdout = SYSTEMCTL_STATUS + stderr = b'' + systemctl_mock.communicate.return_value = future((stdout, stderr)) + # Mock ServiceManager.get_state result + state_mock = [ + MagicMock(process=MagicMock(pid=9850), type='datastore'), + MagicMock(process=MagicMock(pid=9851), type='datastore'), + MagicMock(process=MagicMock(pid=9852), type='datastore'), + MagicMock(process=MagicMock(pid=3589), type='search'), + 
MagicMock(process=MagicMock(pid=4589), type='search'), + ] + + def fake_subprocess_shell(command, **kwargs): + if command.startswith('monit'): + return future(monit_mock) + if command.startswith('systemctl'): + return future(systemctl_mock) + assert False, 'Unexpected command "{}"'.format(command) + + subprocess_patcher = patch( + 'asyncio.create_subprocess_shell', + side_effect=fake_subprocess_shell + ) + service_manager_patcher = patch( + 'appscale.admin.service_manager.ServiceManager.get_state', + return_value=state_mock + ) + + # ^^^ ALL INPUTS ARE SPECIFIED (or mocked) ^^^ + with subprocess_patcher: + with service_manager_patcher: + # Calling method under test + known_processes = await process.get_known_processes() + + # ASSERTING EXPECTATIONS + assert known_processes == { + 8466: ['appscale', 'haproxy', 'haproxy'], + 5045: ['appscale', 'application', 'app___my-25app-20003', 'app___my-25app'], + 9850: ['appscale', 'datastore'], + 9851: ['appscale', 'datastore'], + 9852: ['appscale', 'datastore'], + 3589: ['appscale', 'search'], + 4589: ['appscale', 'search'], + 28783: ['appscale', 'solr'], + } + + +@pytest.mark.asyncio +async def test_init_process_info(): + # Get info about current process + psutil_process = psutil.Process() + proc_info = psutil_process.as_dict(process.PROCESS_ATTRS) + psutil_process.info = proc_info + + my_pid = psutil_process.pid + # Call function under test + process_ = process.init_process_info(psutil_process, {my_pid: ['test-tag']}) + + # Check if attributes were assigned properly + assert process_.pid == proc_info['pid'] + assert process_.ppid == proc_info['ppid'] + assert process_.create_time == proc_info['create_time'] + assert process_.status == proc_info['status'] + assert process_.username == proc_info['username'] + assert process_.cwd == proc_info['cwd'] + assert process_.name == proc_info['name'] + assert process_.exe == proc_info['exe'] + assert process_.cmdline == proc_info['cmdline'] + assert process_.own_tags == ['test-tag'] + 
assert process_.all_tags == ['test-tag'] + assert process_.cpu_user == proc_info['cpu_times'].user + assert process_.cpu_system == proc_info['cpu_times'].system + assert process_.cpu_user_1h_diff is None + assert process_.cpu_system_1h_diff is None + assert process_.cpu_percent == proc_info['cpu_percent'] + assert process_.memory_resident == proc_info['memory_info'].rss + assert process_.memory_virtual == proc_info['memory_info'].vms + assert process_.memory_shared == proc_info['memory_info'].shared + assert process_.disk_io_read_count == proc_info['io_counters'].read_count + assert process_.disk_io_write_count == proc_info['io_counters'].write_count + assert process_.disk_io_read_bytes == proc_info['io_counters'].read_bytes + assert process_.disk_io_write_bytes == proc_info['io_counters'].write_bytes + assert process_.disk_io_read_count_1h_diff is None + assert process_.disk_io_write_count_1h_diff is None + assert process_.disk_io_read_bytes_1h_diff is None + assert process_.disk_io_write_bytes_1h_diff is None + assert process_.threads_num == proc_info['num_threads'] + assert process_.file_descriptors_num == proc_info['num_fds'] + assert ( + process_.ctx_switches_voluntary + == proc_info['num_ctx_switches'].voluntary + ) + assert ( + process_.ctx_switches_involuntary + == proc_info['num_ctx_switches'].involuntary + ) + assert process_.ctx_switches_voluntary_1h_diff is None + assert process_.ctx_switches_involuntary_1h_diff is None + + +@pytest.mark.asyncio +async def test_list_processes(): + # Mock `monit status` with empty output + monit_mock = MagicMock(returncode=0) + monit_mock.communicate.return_value = future((b'', b'')) + # Mock `systemctl status solr.service` with empty output + systemctl_mock = MagicMock(returncode=0) + systemctl_mock.communicate.return_value = future((b'', b'')) + # Mock ServiceManager.get_state result + state_mock = [ + MagicMock(process=MagicMock(pid=9850), type='datastore'), + MagicMock(process=MagicMock(pid=9851), type='datastore'), 
+ ] + # Mock psutil.process_iter + fake_processes = [ + MagicMock() + ] diff --git a/Hermes/tests/resources/test_resource_handler.py b/Hermes/tests/resources/test_resource_handler.py new file mode 100644 index 0000000000..e69de29bb2 From cb500044650ef999b16ccbc3c35863b2e8eb2447 Mon Sep 17 00:00:00 2001 From: Anton Leonov Date: Thu, 23 May 2019 18:10:46 +0300 Subject: [PATCH 7/9] Address merge issue related to moved file --- Hermes/tests/{ => producers}/test-data/haproxy-stats-mapping.csv | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename Hermes/tests/{ => producers}/test-data/haproxy-stats-mapping.csv (100%) diff --git a/Hermes/tests/test-data/haproxy-stats-mapping.csv b/Hermes/tests/producers/test-data/haproxy-stats-mapping.csv similarity index 100% rename from Hermes/tests/test-data/haproxy-stats-mapping.csv rename to Hermes/tests/producers/test-data/haproxy-stats-mapping.csv From 53d8b9292235da38fb9054e03f8d33487410069f Mon Sep 17 00:00:00 2001 From: Anton Leonov Date: Wed, 2 Oct 2019 16:00:09 +0300 Subject: [PATCH 8/9] Track previous processes state using long_pid --- Hermes/appscale/hermes/resources/process.py | 48 ++++++++++----------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/Hermes/appscale/hermes/resources/process.py b/Hermes/appscale/hermes/resources/process.py index 0ab902a39c..485851ea8d 100644 --- a/Hermes/appscale/hermes/resources/process.py +++ b/Hermes/appscale/hermes/resources/process.py @@ -108,12 +108,14 @@ async def list_processes(): A list of Processes. """ start_time = time.time() + host = appscale_info.get_private_ip() # Get dict with known processes (: ) known_processes = await get_known_processes() # Iterate through all processes and init majority of its info. 
pid_to_process = { - process.pid: init_process_info(process, known_processes) + '{}_{}_{}'.format(host, process.pid, int(process.create_time() * 1000)): + init_process_info(process, known_processes) for process in psutil.process_iter(attrs=PROCESS_ATTRS, ad_value=None) } @@ -132,16 +134,12 @@ def list_ancestors_tags(ppid): return parent_process.own_tags return parent_process.own_tags + list_ancestors_tags(parent_process.ppid) - host = appscale_info.get_private_ip() - # Set the rest of information about processes state - for p in pid_to_process.values(): + for long_pid, p in pid_to_process.items(): # Set unique process identifier - p.long_pid = '{}_{}_{}'.format( - host, p.pid, int(p.create_time*1000) - ) + p.long_pid = long_pid # and *_1h_diff attributes - prev = Process.PREVIOUS_STATE.get(p.pid) + prev = Process.PREVIOUS_STATE.get(p.long_pid) if prev: # Compute one hour difference coefficient diff_coef = 60 * 60 / (start_time - prev.utc_timestamp) @@ -152,18 +150,19 @@ def list_ancestors_tags(ppid): p.cpu_system_1h_diff = ( (p.cpu_system - prev.cpu_system) * diff_coef ) - p.disk_io_read_count_1h_diff = ( - (p.disk_io_read_count - prev.disk_io_read_count) * diff_coef - ) - p.disk_io_write_count_1h_diff = ( - (p.disk_io_write_count - prev.disk_io_write_count) * diff_coef - ) - p.disk_io_read_bytes_1h_diff = ( - (p.disk_io_read_bytes - prev.disk_io_read_bytes) * diff_coef - ) - p.disk_io_write_bytes_1h_diff = ( - (p.disk_io_write_bytes - prev.disk_io_write_bytes) * diff_coef - ) + if p.disk_io_read_count is not None: + p.disk_io_read_count_1h_diff = ( + (p.disk_io_read_count - prev.disk_io_read_count) * diff_coef + ) + p.disk_io_write_count_1h_diff = ( + (p.disk_io_write_count - prev.disk_io_write_count) * diff_coef + ) + p.disk_io_read_bytes_1h_diff = ( + (p.disk_io_read_bytes - prev.disk_io_read_bytes) * diff_coef + ) + p.disk_io_write_bytes_1h_diff = ( + (p.disk_io_write_bytes - prev.disk_io_write_bytes) * diff_coef + ) p.ctx_switches_voluntary_1h_diff = ( 
(p.ctx_switches_voluntary - prev.ctx_switches_voluntary) * diff_coef ) @@ -269,10 +268,11 @@ def init_process_info(psutil_process, known_processes): process.memory_resident = memory_info.rss process.memory_virtual = memory_info.vms process.memory_shared = memory_info.shared - process.disk_io_read_count = io_counters.read_count - process.disk_io_write_count = io_counters.write_count - process.disk_io_read_bytes = io_counters.read_bytes - process.disk_io_write_bytes = io_counters.write_bytes + if io_counters: + process.disk_io_read_count = io_counters.read_count + process.disk_io_write_count = io_counters.write_count + process.disk_io_read_bytes = io_counters.read_bytes + process.disk_io_write_bytes = io_counters.write_bytes process.threads_num = process_info['num_threads'] process.file_descriptors_num = process_info['num_fds'] process.ctx_switches_voluntary = ctx_switches.voluntary From 377a7a3d2221f586af83047e87922d45e68ca206 Mon Sep 17 00:00:00 2001 From: Anton Leonov Date: Thu, 3 Oct 2019 14:37:11 +0300 Subject: [PATCH 9/9] Improve systemd services recognition --- AppController/djinn.rb | 4 +- .../appscale/hermes/producers/proxy_stats.py | 10 +- Hermes/appscale/hermes/resources/process.py | 178 ++++++++++------ Hermes/tests/resources/test_process.py | 196 ++++++++++-------- .../{solr.service => appscale-solr.service} | 2 +- .../solr-management/ensure_solr_running.sh | 18 +- 6 files changed, 242 insertions(+), 166 deletions(-) rename SearchService2/solr-management/{solr.service => appscale-solr.service} (94%) diff --git a/AppController/djinn.rb b/AppController/djinn.rb index d47099023c..bb191413bb 100644 --- a/AppController/djinn.rb +++ b/AppController/djinn.rb @@ -3431,8 +3431,8 @@ def start_search2_role def stop_search2_role # Stop Solr Djinn.log_debug('Stopping SOLR on this node.') - Djinn.log_run('systemctl stop solr') - Djinn.log_run('systemctl disable solr') + Djinn.log_run('systemctl stop appscale-solr') + Djinn.log_run('systemctl disable appscale-solr') 
Djinn.log_debug('Done stopping SOLR.') end diff --git a/Hermes/appscale/hermes/producers/proxy_stats.py b/Hermes/appscale/hermes/producers/proxy_stats.py index 7a6764e22d..eb4b9a63b0 100644 --- a/Hermes/appscale/hermes/producers/proxy_stats.py +++ b/Hermes/appscale/hermes/producers/proxy_stats.py @@ -752,9 +752,13 @@ async def get_current(): proxy_stats_list = [] for haproxy_process_name, info in HAPROXY_PROCESSES.items(): logger.debug("Processing {} haproxy stats".format(haproxy_process_name)) - proxy_stats_list += await get_stats_from_one_haproxy( - info['socket'], info['configs'], net_connections - ) + try: + proxy_stats_list += await get_stats_from_one_haproxy( + info['socket'], info['configs'], net_connections + ) + except IOError as error: + logger.warning("Failed to read {} haproxy stats ({})" + .format(haproxy_process_name, error)) stats = ProxiesStatsSnapshot( utc_timestamp=time.mktime(datetime.now().timetuple()), diff --git a/Hermes/appscale/hermes/resources/process.py b/Hermes/appscale/hermes/resources/process.py index 485851ea8d..553c7bd1fc 100644 --- a/Hermes/appscale/hermes/resources/process.py +++ b/Hermes/appscale/hermes/resources/process.py @@ -1,22 +1,25 @@ import logging -import re import time +import re import attr import psutil -from appscale.admin.service_manager import ServiceManager from appscale.common import appscale_info from appscale.hermes import helper from appscale.hermes.constants import SubprocessError -from appscale.hermes.unified_service_names import ( - find_service_by_monit_name -) logger = logging.getLogger(__name__) APPSCALE_PROCESS_TAG = 'appscale' +SERVICE_NAME_PATTERN = re.compile( + r'(appscale-)?(?P[^@]+)(@(?P[^.]+))?.service' +) +PID_SLICE_LINE_PATTERN = re.compile( + r'(?P\d+) /sys/fs/cgroup/systemd/appscale\.slice/appscale-' + r'(?P[^\.]+)\.slice/' +) @attr.s(cmp=False, hash=False, slots=True) @@ -76,12 +79,6 @@ class Process(object): sample_time_diff = attr.ib(default=None) -MONIT_PROCESS_PATTERN = re.compile( - 
r"^Process \'(?P[^']+)\' *\n" - r"(^ .*\n)*?" - r"^ pid +(?P\d+)\n", - re.MULTILINE -) PROCESS_ATTRS = ( 'pid', 'ppid', 'name', 'cwd', 'exe', 'cmdline', 'status', 'username', 'cpu_times', 'cpu_percent', 'memory_info', 'io_counters', 'num_threads', @@ -183,55 +180,6 @@ def list_ancestors_tags(ppid): return processes -async def get_known_processes(): - """ Gets tags (e.g.: appscale, taskqueue, datastore, ...) - for appscale-related processes using all AppScale supervisors - (monit, ServiceManager and systemd). - - Returns: - A dict containing tags for known processes (: ). - """ - known_processes = {} - - try: - # Detect processes supervised by monit - output, error = await helper.subprocess('monit status', timeout=5) - except SubprocessError as err: - logger.warning('Failed to run `monit status` ({})'.format(err)) - output = '' - for match in MONIT_PROCESS_PATTERN.finditer(output): - monit_name = match.group('name') - pid = int(match.group('pid')) - service = find_service_by_monit_name(monit_name) - application_id = service.get_application_id_by_monit_name(monit_name) - tags = [APPSCALE_PROCESS_TAG, service.name, monit_name] - if application_id: - tags.append('app___{}'.format(application_id)) - known_processes[pid] = tags - - # Detect processes supervised by AppScale ServiceManager - for server in ServiceManager.get_state(): - known_processes[server.process.pid] = [APPSCALE_PROCESS_TAG, server.type] - - try: - # Detect processes supervised by systemd - output, error = await helper.subprocess( - 'systemctl status solr.service ' - '| grep \'Main PID\' | awk \'{ print $3 }\'', - timeout=5 - ) - except SubprocessError as err: - logger.warning('Failed to run `systemctl status solr.service` ({})' - .format(err)) - output = '' - if output.isdigit() and output != '0': - solr_pid = int(output) - known_processes[solr_pid] = [ - APPSCALE_PROCESS_TAG, 'solr' - ] - return known_processes - - def init_process_info(psutil_process, known_processes): """ Initializes Process entity 
accoring to information in psutil process and known appscale processes. @@ -278,3 +226,113 @@ def init_process_info(psutil_process, known_processes): process.ctx_switches_voluntary = ctx_switches.voluntary process.ctx_switches_involuntary = ctx_switches.involuntary return process + + +async def get_known_processes(): + """ Gets tags (e.g.: appscale, taskqueue, datastore, ...) + for appscale-related processes using systemd-provided information. + + Returns: + A dict containing tags for known processes (: ). + """ + service_processes = await identify_appscale_service_processes() + slice_processes = await identify_appscale_slice_processes() + known_processes = service_processes + known_processes.update(slice_processes) + return known_processes + + +async def identify_appscale_service_processes(): + """ Gets tags (e.g.: appscale, taskqueue, datastore, ...) + for appscale-related processes which are run as service. + + Returns: + A dict containing tags for known processes (: ). + """ + known_processes = {} + for service in await identify_appscale_services(): + try: + # Get Main PID for each service + show_cmd = 'systemctl show --property MainPID --value {}'.format(service) + output, error = await helper.subprocess(show_cmd, timeout=5) + except SubprocessError as err: + logger.warning('Failed to get Main PID for {} ({})'.format(service, err)) + continue + output = output.strip(' \t\n') + if output.isdigit() and output != '0': + pid = int(output) + process_tags = [APPSCALE_PROCESS_TAG] + # Sample service names are: + # appscale-instance-run@testapp_default_v1_1570022208920-20000.service + # appscale-memcached.service + match = SERVICE_NAME_PATTERN.match(service) + if not match: + logger.warning('Could not parse service name "{}"'.format(service)) + continue + before_at = match.group('before_at') + after_at = match.group('after_at') + process_tags.append(before_at) + if after_at: + for part in after_at.split('_'): + process_tags.append('_{}'.format(part)) + 
known_processes[pid] = process_tags + return known_processes + + +async def identify_appscale_services(): + """ Lists all appscale-related services. + + Returns: + A list of service names. + """ + dependencies_cmd = ('cat /lib/systemd/system/appscale-*.target ' + '| grep -E "^After=.*\.service$" | cut -d "=" -f 2') + try: + # Detect appscale dependency services + output, error = await helper.subprocess(dependencies_cmd, timeout=5) + services = output.strip().split('\n') + except SubprocessError as err: + logger.warning('Failed to detect appscale dependency services ' + 'by running `{}` ({})'.format(dependencies_cmd, err)) + services = [] + + services_cmd = ('systemctl --no-legend list-units "appscale-*.service" ' + '| cut -d " " -f 1') + try: + # Detect appscale own services + output, error = await helper.subprocess(services_cmd, timeout=5) + services += output.strip().split('\n') + except SubprocessError as err: + logger.warning('Failed to detect appscale own services ' + 'by running `{}` ({})'.format(services_cmd, err)) + return services + + +async def identify_appscale_slice_processes(): + """ Gets tags (e.g.: appscale, taskqueue, datastore, ...) + for processes running in appscale-slice. + + Returns: + A dict containing tags for known processes (: ). 
+ """ + slice_processes = ( + 'for slice in /sys/fs/cgroup/systemd/appscale.slice/appscale-*.slice/;' + ' do sed -e "s|\$| ${slice}|" ${slice}/cgroup.procs ; done' + ) + try: + # Detect appscale own services + output, error = await helper.subprocess(slice_processes, timeout=5) + except SubprocessError as err: + logger.warning('Failed to detect appscale-slice processes ' + 'by running {} ({})'.format(slice_processes, err)) + return {} + detected_pids = {} + lines = output.strip(' \t\n').split('\n') + for line in lines: + match = PID_SLICE_LINE_PATTERN.match(line) + if not match: + logger.warning('Could not parse PID-slice line "{}"'.format(line)) + continue + pid = int(match.group('pid')) + detected_pids[pid] = [APPSCALE_PROCESS_TAG, match.group('name')] + return detected_pids diff --git a/Hermes/tests/resources/test_process.py b/Hermes/tests/resources/test_process.py index d1e797e04f..2737fa9fbd 100644 --- a/Hermes/tests/resources/test_process.py +++ b/Hermes/tests/resources/test_process.py @@ -12,117 +12,131 @@ def future(value=None): future_obj.set_result(value) return future_obj -MONIT_STATUS = b""" -The Monit daemon 5.6 uptime: 20h 22m - -Process 'haproxy' - status Running - monitoring status Monitored - pid 8466 - parent pid 1 - uptime 20h 21m - children 0 - memory kilobytes 8140 - memory kilobytes total 8140 - memory percent 0.2% - memory percent total 0.2% - cpu percent 0.0% - cpu percent total 0.0% - data collected Wed, 19 Apr 2017 14:15:29 - -File 'groomer_file_check' - status Accessible - monitoring status Monitored - permission 644 - -Process 'appmanagerserver' - status Not monitored - monitoring status Not monitored - data collected Wed, 19 Apr 2017 13:49:44 - -Process 'app___my-25app-20003' - status Running - monitoring status Monitored - pid 5045 - parent pid 5044 - uptime 21h 41m - children 1 - memory kilobytes 65508 - memory kilobytes total 132940 - memory percent 1.7% - memory percent total 3.5% - cpu percent 0.0% - cpu percent total 0.0% - port 
response time 0.000s to 10.10.9.111:20000 [DEFAULT via TCP] - data collected Wed, 19 Apr 2017 14:18:33 - -System 'appscale-image0' - status Running - monitoring status Monitored - load average [0.23] [0.40] [0.46] - cpu 2.8%us 2.4%sy 1.3%wa - memory usage 2653952 kB [70.7%] - swap usage 0 kB [0.0%] - data collected Wed, 19 Apr 2017 14:15:29 + +# cat /lib/systemd/system/appscale-*.target +# | grep -E "^After=.*\.service$" | cut -d "=" -f 2 +APPSCALE_TARGETS = b""" +ejabberd.service +nginx.service +rabbitmq-server.service +zookeeper.service +""" + +# systemctl --no-legend list-units "appscale-*.service" | cut -d " " -f 1 +APPSCALE_SERVICES = b""" +appscale-blobstore.service +appscale-cassandra.service +appscale-controller.service +appscale-groomer.service +appscale-haproxy@app.service +appscale-haproxy@service.service +appscale-hermes.service +appscale-infrastructure@basic.service +appscale-instance-manager.service +appscale-instance-run@testapp_mod1_v1_1570022208920-20000.service +appscale-memcached.service +appscale-transaction-groomer.service +appscale-uaserver.service """ -# `systemctl status solr.service | grep 'Main PID' | awk '{ print $3 }'` -SYSTEMCTL_STATUS = b'28783' +# systemctl show --property MainPID --value +SERVICE_PID_MAP = { + 'ejabberd.service': b'9021', + 'nginx.service': b'9022', + 'rabbitmq-server.service': b'9023', + 'zookeeper.service': b'9024', + 'appscale-blobstore.service': b'10025', + 'appscale-cassandra.service': b'10026', + 'appscale-controller.service': b'10027', + 'appscale-groomer.service': b'10028', + 'appscale-haproxy@app.service': b'10029', + 'appscale-haproxy@service.service': b'10030', + 'appscale-hermes.service': b'10031', + 'appscale-infrastructure@basic.service': b'10032', + 'appscale-instance-manager.service': b'10033', + 'appscale-instance-run@testapp_mod1_v1_1570022208920-20000.service': b'10034', + 'appscale-memcached.service': b'10035', + 'appscale-transaction-groomer.service': b'10036', + 'appscale-uaserver.service': 
b'10037', +} + +# for slice in /sys/fs/cgroup/systemd/appscale.slice/appscale-*.slice/; do +# sed -e "s|\$| ${slice}|" ${slice}/cgroup.procs +# done +APPSCALE_SLICE_PIDS = b""" +11038 /sys/fs/cgroup/systemd/appscale.slice/appscale-datastore.slice/ +11039 /sys/fs/cgroup/systemd/appscale.slice/appscale-datastore.slice/ +11040 /sys/fs/cgroup/systemd/appscale.slice/appscale-search.slice/ +""" @pytest.mark.asyncio async def test_get_known_processes(): - # Mock `monit status` output - monit_mock = MagicMock(returncode=0) - stdout = MONIT_STATUS - stderr = b'' - monit_mock.communicate.return_value = future((stdout, stderr)) - # Mock `systemctl status solr.service` output - systemctl_mock = MagicMock(returncode=0) - stdout = SYSTEMCTL_STATUS - stderr = b'' - systemctl_mock.communicate.return_value = future((stdout, stderr)) - # Mock ServiceManager.get_state result - state_mock = [ - MagicMock(process=MagicMock(pid=9850), type='datastore'), - MagicMock(process=MagicMock(pid=9851), type='datastore'), - MagicMock(process=MagicMock(pid=9852), type='datastore'), - MagicMock(process=MagicMock(pid=3589), type='search'), - MagicMock(process=MagicMock(pid=4589), type='search'), - ] + subprocess_mocks = [] + + # Mock `cat /lib/systemd/system/appscale-*.target | ...` output + targets_mock = MagicMock(returncode=0) + stdout, stderr = APPSCALE_TARGETS, b'' + targets_mock.communicate.return_value = future((stdout, stderr)) + subprocess_mocks.append(('cat /lib/systemd/', targets_mock)) + + # Mock `systemctl --no-legend list-units "appscale-*.service" | ...` output + list_units_mock = MagicMock(returncode=0) + stdout, stderr = APPSCALE_SERVICES, b'' + list_units_mock.communicate.return_value = future((stdout, stderr)) + subprocess_mocks.append(('systemctl --no-legend list-units', list_units_mock)) + + # Mock `systemctl show --property MainPID --value ` output + for service, pid in SERVICE_PID_MAP.items(): + show_mainpid_mock = MagicMock(returncode=0) + stdout, stderr = pid, b'' + 
show_mainpid_mock.communicate.return_value = future((stdout, stderr)) + subprocess_mocks.append(('--value {}'.format(service), show_mainpid_mock)) + + # Mock `for slice in /sys/fs/cgroup/systemd/appscale.slice/... ; do...` output + appscale_slice_pids_mock = MagicMock(returncode=0) + stdout, stderr = APPSCALE_SLICE_PIDS, b'' + appscale_slice_pids_mock.communicate.return_value = future((stdout, stderr)) + subprocess_mocks.append(('for slice in /sys/fs/', appscale_slice_pids_mock)) def fake_subprocess_shell(command, **kwargs): - if command.startswith('monit'): - return future(monit_mock) - if command.startswith('systemctl'): - return future(systemctl_mock) + for matcher, command_mock in subprocess_mocks: + if matcher in command: + return future(command_mock) assert False, 'Unexpected command "{}"'.format(command) subprocess_patcher = patch( 'asyncio.create_subprocess_shell', side_effect=fake_subprocess_shell ) - service_manager_patcher = patch( - 'appscale.admin.service_manager.ServiceManager.get_state', - return_value=state_mock - ) # ^^^ ALL INPUTS ARE SPECIFIED (or mocked) ^^^ with subprocess_patcher: - with service_manager_patcher: - # Calling method under test - known_processes = await process.get_known_processes() + # Calling method under test + known_processes = await process.get_known_processes() # ASSERTING EXPECTATIONS assert known_processes == { - 8466: ['appscale', 'haproxy', 'haproxy'], - 5045: ['appscale', 'application', 'app___my-25app-20003', 'app___my-25app'], - 9850: ['appscale', 'datastore'], - 9851: ['appscale', 'datastore'], - 9852: ['appscale', 'datastore'], - 3589: ['appscale', 'search'], - 4589: ['appscale', 'search'], - 28783: ['appscale', 'solr'], + 9021: ['appscale', 'ejabberd'], + 9022: ['appscale', 'nginx'], + 9023: ['appscale', 'rabbitmq-server'], + 9024: ['appscale', 'zookeeper'], + 10025: ['appscale', 'blobstore'], + 10026: ['appscale', 'cassandra'], + 10027: ['appscale', 'controller'], + 10028: ['appscale', 'groomer'], + 10029: 
['appscale', 'haproxy', '_app'], + 10030: ['appscale', 'haproxy', '_service'], + 10031: ['appscale', 'hermes'], + 10032: ['appscale', 'infrastructure', '_basic'], + 10033: ['appscale', 'instance-manager'], + 10034: ['appscale', 'instance-run', '_testapp', '_mod1', '_v1', '_1570022208920-20000'], + 10035: ['appscale', 'memcached'], + 10036: ['appscale', 'transaction-groomer'], + 10037: ['appscale', 'uaserver'], + 11038: ['appscale', 'datastore'], + 11039: ['appscale', 'datastore'], + 11040: ['appscale', 'search'], } diff --git a/SearchService2/solr-management/solr.service b/SearchService2/solr-management/appscale-solr.service similarity index 94% rename from SearchService2/solr-management/solr.service rename to SearchService2/solr-management/appscale-solr.service index c2da9de983..3217724654 100644 --- a/SearchService2/solr-management/solr.service +++ b/SearchService2/solr-management/appscale-solr.service @@ -1,4 +1,4 @@ -# /etc/systemd/system/solr.service +# /etc/systemd/system/appscale-solr.service [Unit] Description=Apache SOLR After=syslog.target network.target remote-fs.target nss-lookup.target diff --git a/SearchService2/solr-management/ensure_solr_running.sh b/SearchService2/solr-management/ensure_solr_running.sh index 6554e0d886..086bbfb460 100755 --- a/SearchService2/solr-management/ensure_solr_running.sh +++ b/SearchService2/solr-management/ensure_solr_running.sh @@ -63,24 +63,24 @@ export PRIVATE_IP envsubst '$SOLR_HEAP $ZK_HOST $PRIVATE_IP' \ < "${SOLR_MANAGEMENT_DIR}/solr.in.sh" > "/tmp/solr.in.sh" envsubst '$MEMORY_LOW $MEMORY_HIGH $MEMORY_MAX'\ - < "${SOLR_MANAGEMENT_DIR}/solr.service" > "/tmp/solr.service" + < "${SOLR_MANAGEMENT_DIR}/appscale-solr.service" > "/tmp/appscale-solr.service" if cmp -s "/tmp/solr.in.sh" "/etc/default/solr.in.sh" \ -&& cmp -s "/tmp/solr.service" "/etc/systemd/system/solr.service" +&& cmp -s "/tmp/appscale-solr.service" "/etc/systemd/system/appscale-solr.service" then echo "/etc/default/solr.in.sh has no changes." 
- echo "/etc/systemd/system/solr.service has no changes." + echo "/etc/systemd/system/appscale-solr.service has no changes." echo "Making sure Solr is running." - sudo systemctl enable solr - sudo systemctl start solr + sudo systemctl enable appscale-solr + sudo systemctl start appscale-solr else echo "Copying new solr.in.sh to /etc/default/solr.in.sh" sudo cp "/tmp/solr.in.sh" "/etc/default/solr.in.sh" - echo "Copying new solr.service to /etc/systemd/system/solr.service" - sudo cp "/tmp/solr.service" "/etc/systemd/system/solr.service" + echo "Copying new appscale-solr.service to /etc/systemd/system/appscale-solr.service" + sudo cp "/tmp/appscale-solr.service" "/etc/systemd/system/appscale-solr.service" echo "Making sure Solr is restarted." sudo systemctl daemon-reload - sudo systemctl enable solr - sudo systemctl restart solr + sudo systemctl enable appscale-solr + sudo systemctl restart appscale-solr fi echo "Making sure appscale-specific config set is uploaded to zookeeper."