37 changes: 37 additions & 0 deletions .env.example
@@ -0,0 +1,37 @@
# job_history database configuration
# Copy this file to .env and fill in the values appropriate for your environment.
# .env is gitignored — never commit credentials.

# -------------------------------------------------------------------
# Backend selection: "sqlite" (default) or "postgres"
# -------------------------------------------------------------------
JH_DB_BACKEND=sqlite

# -------------------------------------------------------------------
# SQLite settings (used when JH_DB_BACKEND=sqlite)
# -------------------------------------------------------------------

# Directory containing per-machine database files ({machine}.db).
# Default: ./data relative to the project root.
#JOB_HISTORY_DATA_DIR=./data

# Per-machine path overrides (take precedence over JOB_HISTORY_DATA_DIR):
#QHIST_DERECHO_DB=/path/to/derecho.db
#QHIST_CASPER_DB=/path/to/casper.db

# -------------------------------------------------------------------
# PostgreSQL settings (used when JH_DB_BACKEND=postgres)
# -------------------------------------------------------------------

#JH_PG_HOST=localhost
#JH_PG_PORT=5432
#JH_PG_USER=postgres
#JH_PG_PASSWORD=example

# Require SSL/TLS for the PostgreSQL connection (true/false):
#JH_PG_REQUIRE_SSL=false

# Per-machine database name overrides.
# Default pattern: {machine}_jobs (e.g. derecho_jobs, casper_jobs)
#JH_PG_DERECHO_DB=derecho_jobs
#JH_PG_CASPER_DB=casper_jobs
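For orientation, here is a minimal sketch of how a backend selector might consume these variables. This is a hypothetical helper assuming plain `os.environ` lookups; the actual resolution logic in job_history may differ:

```python
import os

def resolve_backend_config(machine: str) -> dict:
    """Hypothetical helper showing how the variables above combine."""
    backend = os.environ.get("JH_DB_BACKEND", "sqlite")
    if backend == "sqlite":
        # A per-machine override (e.g. QHIST_DERECHO_DB) takes precedence
        # over the JOB_HISTORY_DATA_DIR default of ./data/{machine}.db.
        override = os.environ.get(f"QHIST_{machine.upper()}_DB")
        data_dir = os.environ.get("JOB_HISTORY_DATA_DIR", "./data")
        return {"backend": "sqlite",
                "path": override or os.path.join(data_dir, f"{machine}.db")}
    # postgres: the database name defaults to the "{machine}_jobs" pattern
    dbname = os.environ.get(f"JH_PG_{machine.upper()}_DB", f"{machine}_jobs")
    return {"backend": "postgres",
            "host": os.environ.get("JH_PG_HOST", "localhost"),
            "port": int(os.environ.get("JH_PG_PORT", "5432")),
            "dbname": dbname,
            "require_ssl": os.environ.get("JH_PG_REQUIRE_SSL", "false").lower() == "true"}
```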
7 changes: 7 additions & 0 deletions .gitignore
@@ -1,3 +1,7 @@
# never commit credentials
.env

# ...or build cruft
*.pyc
__pycache__
dist/
@@ -7,3 +11,6 @@ build/
*.sh
actions-runner*
pbs-parser-*

# Test make installs
test-install
23 changes: 17 additions & 6 deletions Makefile
@@ -1,25 +1,36 @@
PREFIX ?= /usr/local
VERSION := 1.1
VERSION := 1.2

install: lib/pbsparse/Makefile
.PHONY: update-pbsparse

install: update-pbsparse
mkdir -p $(PREFIX)/bin $(PREFIX)/lib/qhist
sed 's|/src|/lib/qhist|' bin/qhist > $(PREFIX)/bin/qhist
cp -r src/qhist $(PREFIX)/lib/qhist
cp -r lib/pbsparse/src/pbsparse $(PREFIX)/lib/qhist
cp -r share $(PREFIX)/share
chmod +x $(PREFIX)/bin/qhist

test-install:
@echo "Installing package into test-install directory..."
PREFIX=$(CURDIR)/test-install $(MAKE) install
PREFIX=$(CURDIR)/test-install $(MAKE) ncar-extensions

$(PREFIX)/bin/qhist:
@echo "You must run 'make install' before you can install any extensions"
@exit 1

ncar-extensions: $(PREFIX)/bin/qhist
git clone https://github.com/NCAR/pbs-parser-ncar.git
ncar-extensions: $(PREFIX)/bin/qhist pbs-parser-ncar
cp pbs-parser-ncar/ncar.py $(PREFIX)/lib/qhist/qhist/extensions/

pbs-parser-ncar:
git clone https://github.com/NCAR/pbs-parser-ncar.git

update-pbsparse: lib/pbsparse/Makefile
git submodule update

lib/pbsparse/Makefile:
git submodule init
git submodule update

build:
python3 -m build
@@ -44,4 +55,4 @@ man:
--output share/man/man1/qhist.1

clean:
rm -rf dist build
rm -rf dist build test-install
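The new `test-install` target exercises the full install path without touching the system: it re-invokes `make install` and `make ncar-extensions` with `PREFIX` pointed at `./test-install`, a directory that `clean` now removes and `.gitignore` now ignores. Splitting the `git clone` into its own `pbs-parser-ncar` target also means `ncar-extensions` no longer attempts to re-clone the repository on every invocation.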
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -36,4 +36,10 @@ issues = "https://github.com/NCAR/qhist/issues"
[project.scripts]
qhist = "qhist.qhist:main"

[project.optional-dependencies]
jobhist-db = [
# hpc-usage-queries: job history database integration
"hpc-usage-queries[postgres] @ git+https://github.com/benkirk/hpc-usage-queries.git",
]

[tool.setuptools_scm]
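With this extra defined, the optional database integration can be installed via `pip install 'qhist[jobhist-db]'`, which pulls hpc-usage-queries (with its own `postgres` extra) directly from the git URL above.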
3 changes: 2 additions & 1 deletion src/qhist/cfg/default.json
@@ -113,5 +113,6 @@
"default" : "{short_id:12.12} {user:10.10} {queue:8.8} {numnodes:>5d} {numcpus:>6d} {numgpus:>5d} {end:%d-%H%M} {memory:>8.2f} {avgcpu:>6.2f} {elapsed:>6.2f}",
"wide_status" : "{short_id:12.12} {user:15.15} {queue:10.10} {numnodes:>5d} {numcpus:>6d} {numgpus:>5d} {end:%m-%dT%H:%M} {reqmem:>10.2f} {memory:>10.2f} {avgcpu:>7.2f} {status:>4.4} {elapsed:>7.2f} {name}",
"default_status": "{short_id:12.12} {user:10.10} {queue:8.8} {numnodes:>5d} {numcpus:>6d} {numgpus:>5d} {end:%d-%H%M} {memory:>8.2f} {avgcpu:>6.2f} {status:>4.4} {elapsed:>6.2f}"
}
},
"pbs_log_error" : "log directory not found ({})"
}
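The new `pbs_log_error` template is what the qhist.py change below substitutes into via `config.pbs_log_error.format(...)`, replacing a hardcoded (and typo'd) message in `load_config`.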
140 changes: 88 additions & 52 deletions src/qhist/qhist.py
@@ -8,13 +8,22 @@
* Memory-friendly sorting
"""

import sys, os, argparse, datetime, signal, string, _string, json, operator, re, importlib, textwrap
import sys, os, argparse, datetime, signal, string, _string, json, operator
import re, importlib, textwrap, difflib

from collections import OrderedDict
from json.decoder import JSONDecodeError
from pbsparse import get_pbs_records
from glob import glob

# import job_history database & plugin API, if available
try:
from job_history.database import db_available
from job_history.qhist_plugin import db_get_records
except ImportError:
db_available = lambda x: False
db_get_records = None


# Use default signal behavior on system rather than throwing IOError
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
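A note on the fallback above: when job_history is not installed, `db_available` is replaced by a stub that always returns False, so the dispatch code later in `main()` needs no separate feature check. `db_get_records` is set to None, but it is only reached after `db_available(...)` has returned True, so the None is never called in the fallback case.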
@@ -170,7 +179,7 @@ def load_config(self, file_path):
try:
self.pbs_log_start = sorted(f for f in os.listdir(self.pbs_log_path) if os.path.isfile(os.path.join(self.pbs_log_path, f)))[0]
except FileNotFoundError:
exit("Error: log directory nof found ({})".format(self.pbs_log_path))
exit("Error: " + self.pbs_log_error.format(self.pbs_log_path))
except AttributeError:
pass

@@ -528,19 +537,26 @@ def main():
if not CustomRecord:
exit("Error: given custom record class not found in code extensions ({})".format(config.record_class))

# These fields are computed by pbsparse and do not come directly from the PBS records
derived_fields = ["waittime"]

# Define 'averages' and 'num_jobs' up front so the nonlocal binding below always resolves, even when unused
averages = None
num_jobs = 0

# Long-form help
if args.format == "help":
print(format_help)

for key in ["id", "short_id"] + sorted(config.format_map):
for key in ["id", "short_id"] + sorted(derived_fields + list(config.format_map)):
print(" {}".format(key))

print()
sys.exit()
elif args.filter == "help":
print(filter_help)

for key in sorted(k for k in config.format_map if k not in ("end", "start", "nodelist")):
for key in sorted(k for k in (derived_fields + list(config.format_map)) if k not in ("end", "start", "nodelist")):
print(" {}".format(key))

print()
@@ -614,6 +630,8 @@ def main():
data_filters.append((False, operator.gt, "waittime", float(args.wait) / 60))

if args.filter:
available_filters = [k for k in (derived_fields + list(config.format_map)) if k not in ("end", "start", "nodelist")]

for fexpr in args.filter.split(";"):
for op in ops:
if op in fexpr:
@@ -624,6 +642,15 @@ def main():
negation = False
field, match = [e.strip() for e in fexpr.split(op)]

if field not in available_filters:
print(f"Error: {field} is not a valid filter (see 'qhist -F help' for all)", file = sys.stderr)
possible_filters = difflib.get_close_matches(field, available_filters, 3, 0.6)

if possible_filters:
print("\nDid you mean: " + ", ".join(possible_filters) + "?", file = sys.stderr)

sys.exit(1)

data_filters.append((negation, ops[op], config.translate_field(field), match))
break
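The suggestion logic relies on `difflib.get_close_matches(word, possibilities, n, cutoff)` from the standard library, with the same arguments as above. A quick standalone illustration (the field names here are invented for the example):

```python
import difflib

available = ["user", "queue", "numcpus", "numgpus", "memory", "elapsed"]

# Up to 3 suggestions at similarity cutoff 0.6, as in the filter check above:
print(difflib.get_close_matches("uesr", available, 3, 0.6))    # ['user']
print(difflib.get_close_matches("memroy", available, 3, 0.6))  # ['memory']
print(difflib.get_close_matches("zzz", available, 3, 0.6))     # []
```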

@@ -720,61 +747,70 @@ def main():
print(' "timestamp":{},'.format(int(datetime.datetime.today().timestamp())))
print(' "Jobs":{')

is_first_json_job = True

while keep_going(bounds, log_date, args.reverse):
data_date = datetime.datetime.strftime(log_date, config.pbs_date_format)
data_file = os.path.join(config.pbs_log_path, data_date)
jobs = get_pbs_records(data_file, CustomRecord, True, args.events,
id_filter, host_filter, data_filters, time_filters,
args.reverse, time_divisor)
def emit_formatted_jobs(jobs_iter):
nonlocal num_jobs, is_first_json_job

if args.list:
for job in jobs:
for job in jobs_iter:
if args.list:
list_output(job, fields, labels, list_format, nodes = args.nodes)
elif args.csv:
for job in jobs:
elif args.csv:
csv_output(job, fields)
elif args.json:
first_job = True

for job in jobs:
if not first_job:
elif args.json:
if not is_first_json_job:
print(",")

print(textwrap.indent(json_output(job)[2:-2], " "), end = "")
first_job = False
elif args.nodes:
if args.average:
for job in jobs:
if '[]' not in job.id:
for category in averages:
for field in averages[category]:
averages[category][field] += getattr(job, category)[field]

num_jobs += 1

print("{}\n {}".format(tabular_output(vars(job), table_format), ",".join(job.get_nodes())))
is_first_json_job = False
elif args.nodes:
if averages and '[]' not in job.id:
for category in averages:
for field in averages[category]:
averages[category][field] += getattr(job, category)[field]
num_jobs += 1
print("{}\n {}".format(tabular_output(vars(job), table_format), ",".join(job.get_nodes())))
else:
for job in jobs:
print("{}\n {}".format(tabular_output(vars(job), table_format), ",".join(job.get_nodes())))
else:
if args.average:
for job in jobs:
if '[]' not in job.id:
for category in averages:
for field in averages[category]:
averages[category][field] += getattr(job, category)[field]

num_jobs += 1
print(tabular_output(vars(job), table_format))
if averages and '[]' not in job.id:
for category in averages:
for field in averages[category]:
averages[category][field] += getattr(job, category)[field]
num_jobs += 1
print(tabular_output(vars(job), table_format))

# Decide which machine to query: the optional QHIST_MACHINE environment
# variable, with fallback to "machine" from the config file
machine = os.environ.get("QHIST_MACHINE", getattr(config, "machine", None))

if machine and db_available(machine):
emit_formatted_jobs(
db_get_records(
machine,
bounds[0],
bounds[1],
time_divisor=time_divisor,
id_filter=id_filter,
host_filter=host_filter,
data_filters=data_filters,
time_filter=time_filters,
reverse=args.reverse,
)
)
else:
if machine:
print(f"Warning: DB not available for {machine!r}; falling back to log scanning", file=sys.stderr)

while keep_going(bounds, log_date, args.reverse):
data_date = datetime.datetime.strftime(log_date, config.pbs_date_format)
data_file = os.path.join(config.pbs_log_path, data_date)
jobs = get_pbs_records(data_file, CustomRecord, True, args.events,
id_filter, host_filter, data_filters, time_filters,
args.reverse, time_divisor)
emit_formatted_jobs(jobs)

if args.reverse:
log_date -= ONE_DAY
else:
for job in jobs:
print(tabular_output(vars(job), table_format))

if args.reverse:
log_date -= ONE_DAY
else:
log_date += ONE_DAY
log_date += ONE_DAY

if args.json:
print("\n }\n}")
Expand All @@ -794,5 +830,5 @@ def main():
print(config.generate_header(format_type, units = units))

print(tabular_output(averages, averages_format))
except UnboundLocalError:
except (NameError, UnboundLocalError):
print("Note: statistics output is only currently supported for tabular mode", file = sys.stderr)