diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..acbb1ac --- /dev/null +++ b/.env.example @@ -0,0 +1,37 @@ +# job_history database configuration +# Copy this file to .env and fill in the values appropriate for your environment. +# .env is gitignored — never commit credentials. + +# ------------------------------------------------------------------- +# Backend selection: "sqlite" (default) or "postgres" +# ------------------------------------------------------------------- +JH_DB_BACKEND=sqlite + +# ------------------------------------------------------------------- +# SQLite settings (used when JH_DB_BACKEND=sqlite) +# ------------------------------------------------------------------- + +# Directory containing per-machine database files ({machine}.db). +# Default: ./data relative to the project root. +#JOB_HISTORY_DATA_DIR=./data + +# Per-machine path overrides (take precedence over JOB_HISTORY_DATA_DIR): +#QHIST_DERECHO_DB=/path/to/derecho.db +#QHIST_CASPER_DB=/path/to/casper.db + +# ------------------------------------------------------------------- +# PostgreSQL settings (used when JH_DB_BACKEND=postgres) +# ------------------------------------------------------------------- + +#JH_PG_HOST=localhost +#JH_PG_PORT=5432 +#JH_PG_USER=postgres +#JH_PG_PASSWORD=example + +# Require SSL/TLS for the PostgreSQL connection (true/false): +#JH_PG_REQUIRE_SSL=false + +# Per-machine database name overrides. +# Default pattern: {machine}_jobs (e.g. derecho_jobs, casper_jobs) +#JH_PG_DERECHO_DB=derecho_jobs +#JH_PG_CASPER_DB=casper_jobs diff --git a/.gitignore b/.gitignore index ca578ec..64d756c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ +# never commit credentials +.env + +# ...or build cruft *.pyc __pycache__ dist/ @@ -7,3 +11,6 @@ build/ *.sh actions-runner* pbs-parser-* + +# Test make installs +test-install diff --git a/Makefile b/Makefile index d8ad6d8..5c586d2 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,9 @@ PREFIX ?= /usr/local -VERSION := 1.1 +VERSION := 1.2 -install: lib/pbsparse/Makefile +.PHONY: update-pbsparse + +install: update-pbsparse mkdir -p $(PREFIX)/bin $(PREFIX)/lib/qhist sed 's|/src|/lib/qhist|' bin/qhist > $(PREFIX)/bin/qhist cp -r src/qhist $(PREFIX)/lib/qhist @@ -9,17 +11,26 @@ install: lib/pbsparse/Makefile cp -r share $(PREFIX)/share chmod +x $(PREFIX)/bin/qhist +test-install: + @echo "Installing package into test-install directory..." + PREFIX=$(CURDIR)/test-install $(MAKE) install + PREFIX=$(CURDIR)/test-install $(MAKE) ncar-extensions + $(PREFIX)/bin/qhist: @echo "You must run 'make install' before you can install any extensions" @exit 1 -ncar-extensions: $(PREFIX)/bin/qhist - git clone https://github.com/NCAR/pbs-parser-ncar.git +ncar-extensions: $(PREFIX)/bin/qhist pbs-parser-ncar cp pbs-parser-ncar/ncar.py $(PREFIX)/lib/qhist/qhist/extensions/ +pbs-parser-ncar: + git clone https://github.com/NCAR/pbs-parser-ncar.git + +update-pbsparse: lib/pbsparse/Makefile + git submodule update + lib/pbsparse/Makefile: git submodule init - git submodule update build: python3 -m build @@ -44,4 +55,4 @@ man: --output share/man/man1/qhist.1 clean: - rm -rf dist build + rm -rf dist build test-install diff --git a/lib/pbsparse b/lib/pbsparse index 2d734eb..690d1da 160000 --- a/lib/pbsparse +++ b/lib/pbsparse @@ -1 +1 @@ -Subproject commit 2d734ebd57b55adb4c8f8e4f8c435a6cb90db46c +Subproject commit 690d1da58029bd75138f7e39018b1352abf2545a diff --git a/pyproject.toml b/pyproject.toml index 747b2dd..b9bf32d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,4 +36,10 @@ issues = "https://github.com/NCAR/qhist/issues" [project.scripts] qhist = "qhist.qhist:main" +[project.optional-dependencies] +jobhist-db = [ + # hpc-usage-queries: job history database integration + "hpc-usage-queries[postgres] @ git+https://github.com/benkirk/hpc-usage-queries.git", +] + [tool.setuptools_scm] diff --git a/src/qhist/cfg/default.json b/src/qhist/cfg/default.json index 645ddfb..8ba3ca1 100644 --- a/src/qhist/cfg/default.json +++ b/src/qhist/cfg/default.json @@ -113,5 +113,6 @@ "default" : "{short_id:12.12} {user:10.10} {queue:8.8} {numnodes:>5d} {numcpus:>6d} {numgpus:>5d} {end:%d-%H%M} {memory:>8.2f} {avgcpu:>6.2f} {elapsed:>6.2f}", "wide_status" : "{short_id:12.12} {user:15.15} {queue:10.10} {numnodes:>5d} {numcpus:>6d} {numgpus:>5d} {end:%m-%dT%H:%M} {reqmem:>10.2f} {memory:>10.2f} {avgcpu:>7.2f} {status:>4.4} {elapsed:>7.2f} {name}", "default_status": "{short_id:12.12} {user:10.10} {queue:8.8} {numnodes:>5d} {numcpus:>6d} {numgpus:>5d} {end:%d-%H%M} {memory:>8.2f} {avgcpu:>6.2f} {status:>4.4} {elapsed:>6.2f}" - } + }, + "pbs_log_error" : "log directory not found ({})" } diff --git a/src/qhist/qhist.py b/src/qhist/qhist.py index 549c2ac..ee50cf7 100644 --- a/src/qhist/qhist.py +++ b/src/qhist/qhist.py @@ -8,13 +8,22 @@ * Memory-friendly sorting """ -import sys, os, argparse, datetime, signal, string, _string, json, operator, re, importlib, textwrap +import sys, os, argparse, datetime, signal, string, _string, json, operator +import re, importlib, textwrap, difflib from collections import OrderedDict from json.decoder import JSONDecodeError from pbsparse import get_pbs_records from glob import glob +# import job_history database & plugin API, if available +try: + from job_history.database import db_available + from job_history.qhist_plugin import db_get_records +except ImportError: + db_available = lambda x: False + db_get_records = None + # Use default signal behavior on system rather than throwing IOError signal.signal(signal.SIGPIPE, signal.SIG_DFL) @@ -170,7 +179,7 @@ def load_config(self, file_path): try: self.pbs_log_start = sorted(f for f in os.listdir(self.pbs_log_path) if os.path.isfile(os.path.join(self.pbs_log_path, f)))[0] except FileNotFoundError: - exit("Error: log directory nof found ({})".format(self.pbs_log_path)) + exit("Error: " + self.pbs_log_error.format(self.pbs_log_path)) except AttributeError: pass @@ -528,11 +537,18 @@ def main(): if not CustomRecord: exit("Error: given custom record class not found in code extensions ({})".format(config.record_class)) + # These fields are computed by pbsparse and do not come directly from the PBS records + derived_fields = ["waittime"] + + # Ensure 'averages' and 'num_jobs' exist for nonlocal binding (must exist even if not used) + averages = None + num_jobs = 0 + # Long-form help if args.format == "help": print(format_help) - for key in ["id", "short_id"] + sorted(config.format_map): + for key in ["id", "short_id"] + sorted(derived_fields + list(config.format_map)): print(" {}".format(key)) print() @@ -540,7 +556,7 @@ def main(): elif args.filter == "help": print(filter_help) - for key in sorted(k for k in config.format_map if k not in ("end", "start", "nodelist")): + for key in sorted(k for k in (derived_fields + list(config.format_map)) if k not in ("end", "start", "nodelist")): print(" {}".format(key)) print() @@ -614,6 +630,8 @@ def main(): data_filters.append((False, operator.gt, "waittime", float(args.wait) / 60)) if args.filter: + available_filters = [k for k in (derived_fields + list(config.format_map)) if k not in ("end", "start", "nodelist")] + for fexpr in args.filter.split(";"): for op in ops: if op in fexpr: @@ -624,6 +642,15 @@ def main(): negation = False field, match = [e.strip() for e in fexpr.split(op)] + if field not in available_filters: + print(f"Error: {field} is not a valid filter (see 'qhist -F help' for all)", file = sys.stderr) + possible_filters = difflib.get_close_matches(field, available_filters, 3, 0.6) + + if possible_filters: + print("\nDid you mean: " + ", ".join(possible_filters) + "?", file = sys.stderr) + + sys.exit(1) + data_filters.append((negation, ops[op], config.translate_field(field), match)) break @@ -720,61 +747,70 @@ def main(): print(' "timestamp":{},'.format(int(datetime.datetime.today().timestamp()))) print(' "Jobs":{') + is_first_json_job = True - while keep_going(bounds, log_date, args.reverse): - data_date = datetime.datetime.strftime(log_date, config.pbs_date_format) - data_file = os.path.join(config.pbs_log_path, data_date) - jobs = get_pbs_records(data_file, CustomRecord, True, args.events, - id_filter, host_filter, data_filters, time_filters, - args.reverse, time_divisor) + def emit_formatted_jobs(jobs_iter): + nonlocal num_jobs, is_first_json_job - if args.list: - for job in jobs: + for job in jobs_iter: + if args.list: list_output(job, fields, labels, list_format, nodes = args.nodes) - elif args.csv: - for job in jobs: + elif args.csv: csv_output(job, fields) - elif args.json: - first_job = True - - for job in jobs: - if not first_job: + elif args.json: + if not is_first_json_job: print(",") - print(textwrap.indent(json_output(job)[2:-2], " "), end = "") - first_job = False - elif args.nodes: - if args.average: - for job in jobs: - if '[]' not in job.id: - for category in averages: - for field in averages[category]: - averages[category][field] += getattr(job, category)[field] - - num_jobs += 1 - - print("{}\n {}".format(tabular_output(vars(job), table_format), ",".join(job.get_nodes()))) + is_first_json_job = False + elif args.nodes: + if averages and '[]' not in job.id: + for category in averages: + for field in averages[category]: + averages[category][field] += getattr(job, category)[field] + num_jobs += 1 + print("{}\n {}".format(tabular_output(vars(job), table_format), ",".join(job.get_nodes()))) else: - for job in jobs: - print("{}\n {}".format(tabular_output(vars(job), table_format), ",".join(job.get_nodes()))) - else: - if args.average: - for job in jobs: - if '[]' not in job.id: - for category in averages: - for field in averages[category]: - averages[category][field] += getattr(job, category)[field] - - num_jobs += 1 - print(tabular_output(vars(job), table_format)) + if averages and '[]' not in job.id: + for category in averages: + for field in averages[category]: + averages[category][field] += getattr(job, category)[field] + num_jobs += 1 + print(tabular_output(vars(job), table_format)) + + # what machine to query. + # optional QHIST_MACHINE with fallback to "machine" from config file + machine = os.environ.get("QHIST_MACHINE", getattr(config, "machine", None)) + + if machine and db_available(machine): + emit_formatted_jobs( + db_get_records( + machine, + bounds[0], + bounds[1], + time_divisor=time_divisor, + id_filter=id_filter, + host_filter=host_filter, + data_filters=data_filters, + time_filter=time_filters, + reverse=args.reverse, + ) + ) + else: + if machine: + print(f"Warning: DB not available for {machine!r}; falling back to log scanning", file=sys.stderr) + + while keep_going(bounds, log_date, args.reverse): + data_date = datetime.datetime.strftime(log_date, config.pbs_date_format) + data_file = os.path.join(config.pbs_log_path, data_date) + jobs = get_pbs_records(data_file, CustomRecord, True, args.events, + id_filter, host_filter, data_filters, time_filters, + args.reverse, time_divisor) + emit_formatted_jobs(jobs) + + if args.reverse: + log_date -= ONE_DAY else: - for job in jobs: - print(tabular_output(vars(job), table_format)) - - if args.reverse: - log_date -= ONE_DAY - else: - log_date += ONE_DAY + log_date += ONE_DAY if args.json: print("\n }\n}") @@ -794,5 +830,5 @@ def main(): print(config.generate_header(format_type, units = units)) print(tabular_output(averages, averages_format)) - except UnboundLocalError: + except (NameError, UnboundLocalError): print("Note: statistics output is only currently supported for tabular mode", file = sys.stderr)