From 6af24a9f276c7f05b77d387f6f6311c31e9a5be6 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Wed, 1 Oct 2025 17:34:41 +0200 Subject: [PATCH] Example for journal analysis via JSON export --- extra/journal_analysis.ipynb | 294 +++++++++++++++++++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 extra/journal_analysis.ipynb diff --git a/extra/journal_analysis.ipynb b/extra/journal_analysis.ipynb new file mode 100644 index 000000000..ec4edf315 --- /dev/null +++ b/extra/journal_analysis.ipynb @@ -0,0 +1,294 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "e21c672d-c631-4b27-9b5e-2b248c00a2e1", + "metadata": {}, + "source": [ + "# Journal processing demonstration\n", + "\n", + "This notebook demonstrates how to load HyperQueue JSON journal export and\n", + "do a simple analysis over it.\n", + "\n", + "You can export journal file to JSON via:\n", + "\n", + "```bash\n", + "$ hq journal export > journal.json\n", + "```\n", + "\n", + "or get JSON events from live server via:\n", + "\n", + "```bash\n", + "$ hq journal replay > journal.json\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cb5b0606-8547-43b2-855e-ee3ca7c4523f", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import dateutil.parser\n", + "from datetime import datetime\n", + "import pandas as pd\n", + "import plotly.express as px" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dd3625d2-80ef-4842-8395-dc86d713fdaa", + "metadata": {}, + "outputs": [], + "source": [ + "# Load journal from file\n", + "with open(\"journal.json\") as f:\n", + " journal_events = [json.loads(s) for s in f]\n", + "\n", + "# Show first 10 events\n", + "for event in journal_events[:10]:\n", + " print(event[\"time\"], event[\"event\"][\"type\"]) " + ] + }, + { + "cell_type": "markdown", + "id": "aaf8e4a1-a35a-4756-ae41-4259400d0f5b", + "metadata": {}, + "source": [ + "# Analysis 1: What tasks were assigned to each worker" + ] + }, 
def parse_time(tm):
    """
    Parse the timestamp of a journal event.

    Timestamps come in two shapes: with fractional seconds
    ("2025-10-01T17:34:41.500000Z") or without ("2025-10-01T17:34:41Z").
    """
    with_fraction = "%Y-%m-%dT%H:%M:%S.%fZ"
    without_fraction = "%Y-%m-%dT%H:%M:%SZ"
    try:
        return datetime.strptime(tm, with_fraction)
    except ValueError:
        # Fall back to the variant without a fractional-second part
        return datetime.strptime(tm, without_fraction)
def task_configs(submit_desc):
    """
    Extract resource configurations from a submit description.

    Returns a dict mapping task id -> resource configuration for every
    task id listed in the submit's id ranges.
    """
    task_desc = submit_desc["task_desc"]
    result = {}
    if "n_tasks" in task_desc:
        # Count-only task descriptions (task graphs) carry no per-task
        # id ranges, so we cannot build the mapping
        raise Exception("Task graphs not supported")
    for id_range in task_desc["ids"]["ranges"]:
        start = id_range["start"]
        for offset in range(0, id_range["count"], id_range["step"]):
            result[start + offset] = task_desc["resources"]
    return result


def get_resource_amount(resources, name, all_amount):
    """
    Get the requested amount of resource `name` from a task configuration.

    The parameter `all_amount` is the amount of that resource provided by
    the worker; it resolves the allocation policy "All", which takes all
    resources of the worker, i.e. the result depends on the specific worker.

    Returns 0 when the task does not request the resource at all.
    """
    for r in resources["resources"]:
        if r["resource"] == name:
            policy = r["policy"]
            if policy == "All":
                return all_amount
            # BUG FIX: amount-carrying policies (ForceCompact, Compact,
            # ForceTight, Tight, Scatter) are serialized as single-key dicts
            # such as {"Compact": 20000} — TODO confirm against the JSON
            # export format. The original compared the dict against the
            # policy *names* (never true) and would then have called
            # .values() on a string; both paths were broken.
            if isinstance(policy, dict):
                # Resources are represented in fixed point where 1.0 = 10_000
                return next(iter(policy.values())) / 10_000
    return 0
def get_worker_res_count(resources, name):
    """
    Get the amount of a resource called `name` provided by a worker.

    `resources` is the worker's resource descriptor list taken from the
    "worker-connected" event configuration. Returns 0 when the worker does
    not provide the resource.
    """
    for r in resources:
        if r["name"] == name:
            kind = r["kind"]
            if "List" in kind:
                # Explicitly enumerated resource indices
                return len(kind["List"]["values"])
            elif "Groups" in kind:
                # Indices partitioned into groups; total is the sum of sizes
                return sum(len(g) for g in kind["Groups"]["groups"])
            elif "Range" in kind:
                # Inclusive index range
                return kind["Range"]["end"] - kind["Range"]["start"] + 1
            elif "Sum" in kind:
                # Resources are represented in fixed point where 1.0 = 10_000
                return kind["Sum"]["size"] / 10_000
            else:
                raise Exception("Unknown resource kind")
    return 0


# Read the starting time from the first event; all sample times below are
# expressed in seconds relative to it
BASE_TIME = parse_time(journal_events[0]["time"])

# Job definitions: job id -> {task id -> resource configuration};
# needed to look up the resource requests of started tasks
job_defs = {}

# Worker definitions: worker id -> resource descriptors;
# needed to resolve the allocation policy "All"
worker_resources = {}

# CPU amounts of currently running tasks, keyed by (job id, task id)
running_tasks = {}

# Output variable: (time, cpus-in-use) samples.
# Initialized with (0, 0), i.e. at time 0 there are 0 CPUs in use.
running_cpus = [(0, 0)]

for event in journal_events:
    ev = event["event"]
    if ev["type"] == "job-created":
        # When a job is created, remember the resource requests of its tasks.
        # Note that there may be multiple submits into one job.
        job_defs.setdefault(ev["job"], {})
        job_defs[ev["job"]].update(task_configs(ev["submit_desc"]))
    elif ev["type"] == "worker-connected":
        # When a worker connects, remember its resources
        worker_resources[ev["id"]] = ev["configuration"]["resources"]["resources"]
    elif ev["type"] == "task-started":
        # When a task starts, compute its allocated CPUs and record a sample
        time = (parse_time(event["time"]) - BASE_TIME).total_seconds()

        # There may be several resource request variants, so pick the one
        # that was actually started on the worker
        task_def = job_defs[ev["job"]][ev["task"]]
        task_resources = task_def["variants"][ev.get("variant", 0)]

        if "worker" in ev:
            # Total CPUs of the worker where the task was started
            # (resolves the "All" policy)
            worker_res = worker_resources[ev["worker"]]
            all_amount = get_worker_res_count(worker_res, "cpus")

            # Amount of CPUs the task asked for
            amount = get_resource_amount(task_resources, "cpus", all_amount)

            # Remember the amount so it can be released when the task ends
            running_tasks[(ev["job"], ev["task"])] = amount

            # Record the new CPU usage at this point in time
            running_cpus.append((time, running_cpus[-1][1] + amount))
        else:
            raise Exception("This analysis supports only single node tasks")
    elif ev["type"] in ("task-finished", "task-failed", "task-canceled"):
        # When a task ends, release its CPUs
        time = (parse_time(event["time"]) - BASE_TIME).total_seconds()
        # BUG FIX: subtract the task's own `amount` — the original subtracted
        # `all_amount`, a stale leftover from the last "task-started"
        # iteration (and a NameError if no task had started yet).
        # `pop` also drops the finished task from the running set.
        amount = running_tasks.pop((ev["job"], ev["task"]), 0)
        if amount > 0:
            running_cpus.append((time, running_cpus[-1][1] - amount))

# Visualize the result: step plot of CPUs in use over time
df = pd.DataFrame(running_cpus, columns=["time", "cpus"])
px.line(df, x="time", y="cpus", line_shape="vh")