Source code for mockslurm.mock_squeue

#!/usr/bin/env python
"""Implement a mock of the squeue command of slurm.

The squeue -o, --format argument is supported except for the %all directive
"""

import argparse
import datetime
from typing import Callable, List, Tuple

import numpy as np

from mockslurm.process_db import (
    JobReason,
    JobState,
    find_db_file,
    get_db,
    get_db_file_handle,
    get_filtered_DB_mask,
)
from mockslurm.utils import filter_dict_from_args

_SQUEUE_DEFAULT_SHORT_FORMAT = "%.18i %.9P %.8j %.8u %.2t %.10M %.6D %R"
_SQUEUE_DEFAULT_LONG_FORMAT = "%.18i %.9P %.8j %.8u %.8T %.10M %.9l %.6D %R"

_SQUEUE_STATE_CODE_LONG_TO_SHORT = {
    JobState.PENDING: "PD",
    JobState.RUNNING: "R",
    JobState.FAILED: "F",
    JobState.COMPLETED: "CD",
    JobState.CANCELLED: "CA",
}


[docs] def count_nodes(nodelist: str) -> int: """Count the number of nodes requested by users in the nodelist Parameters ---------- nodelist : str Nodelist string in slurm format: comma separated list of nodes or node ranges example: "cp12,cp18", "cp[10-14],cp28" Returns ------- int Number of nodes concerned by the nodelist """ if not nodelist: # mock considers no nodelist jobs get allocated 1 node return 1 nodecount = 0 nlist = nodelist.split(",") for node_specs in nlist: if "[" in node_specs: _, node_specs = node_specs.split("[") # ignore node name if "-" in node_specs: # count nodes in range first, last = node_specs.split("-") last = last.split("]")[0] # ignore closing "]" if any nodecount += int(last) - int(first) + 1 else: # not a range: single node nodecount += 1 return nodecount
SQUEUE_FORMAT_LETTER_TO_DB_FIELD = { # "": lambda job_info, _: job_info["PID"], "i": ("JOBID", lambda _, job_idx: str(job_idx)), "A": ("JOBID", lambda _, job_idx: str(job_idx)), "j": ("NAME", lambda job_info, _: job_info["NAME"].decode()), "u": ("USER", lambda job_info, _: job_info["USER"].decode()), "a": ("ACCOUNT", lambda job_info, _: job_info["ACCOUNT"].decode()), "P": ("PARTITION", lambda job_info, _: job_info["PARTITION"].decode()), "v": ("RESERVATION", lambda job_info, _: job_info["RESERVATION"].decode()), "M": ( "TIME", lambda job_info, _: ( # split is to remove the floating point part, that contains microseconds that should not be displayed str(datetime.datetime.now() - datetime.datetime.fromtimestamp(job_info["START_TIME"])).split(".")[0] if job_info["STATE"] == JobState.RUNNING else "0:00:00" ), ), "l": ("TIME_LIMIT", lambda job_info, _: "UNLIMITED"), # no time limit in mock "n": ("REQ_NODES", lambda job_info, _: job_info["NODELIST"].decode()), "N": ("NODELIST", lambda job_info, _: job_info["NODELIST"].decode()), "D": ("NODES", lambda job_info, _: str(count_nodes(job_info["NODELIST"].decode()))), "S": ("START_TIME", lambda job_info, _: job_info["START_TIME"]), "V": ( "SUBMIT_TIME", lambda job_info, _: job_info["START_TIME"].decode(), ), # equal to start time for mock) "o": ("COMMAND", lambda job_info, _: job_info["CMD"].decode()), "r": ("REASON", lambda job_info, _: JobReason(job_info["REASON"]).name), "t": ("ST", lambda job_info, _: _SQUEUE_STATE_CODE_LONG_TO_SHORT[job_info["STATE"]]), "T": ("STATE", lambda job_info, _: JobState(job_info["STATE"]).name), "R": ( "NODELIST(REASON)", lambda job_info, _: ( "(" + JobReason(job_info["REASON"]).name + ")" if JobReason(job_info["STATE"]) == JobState.PENDING or JobReason(job_info["REASON"]) == JobReason.NonZeroExitCode else job_info["NODELIST"].decode() ), ), # "": lambda job_info, _: job_info["EXIT_CODE"], }
[docs] def parse_squeue_format(squeue_format_str: str) -> Tuple[str, List[Callable]]: """Convert squeue format argument (-o format) to a python format string and list of function filling the values of the format string for each job DB row. Parameters ---------- squeue_format_str : str Squeue format string, eg "%.18i %.9P %.8j %.8u %.2t %.10M %.6D %R". Warning ------- Does not support squeue's "%all" formatting string. Returns ------- Tuple[str, List[Callable]] python format string, and a list of callable that should be used to fill the values in the formatted string with a job DB row and job_index. Examples -------- >>> format_str, fields_filler_fcts = parse_squeue_format("%.18i %.9P %.8j %.8u %.2t %.10M %.6D %R") >>> squeue_output = format_str.format(*[fct(job_DB_row, row_idx) for fct in fields_filler_fcts]) """ # Iterate on the format string and for each found % # add a "{:ndigit}" to the python string, with optional ">" if "." follows "%" # ndigit is found by converting the characters following % # the function filling the value is taken from the map of squeue format letters to DB function python_format_string = "" fields_header = [] fields_filler_functions = [] # split 1st value is empty if string starts with delimiter for field_format_str in squeue_format_str.split("%")[1:]: python_format_string += "{:" if field_format_str[0] == ".": python_format_string += ">" field_format_str = field_format_str[1:] idx = 0 while field_format_str[idx].isdigit(): python_format_string += field_format_str[idx] idx += 1 header, fct = SQUEUE_FORMAT_LETTER_TO_DB_FIELD[field_format_str[idx]] fields_header.append(header) fields_filler_functions.append(fct) python_format_string += "}" + field_format_str[idx + 1 :] return python_format_string, fields_header, fields_filler_functions
[docs] def main(): parser = argparse.ArgumentParser( description="Slurm scancel mock", formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False ) user_group = parser.add_mutually_exclusive_group() parser.add_argument( "--account", "-A", type=str, dest="ACCOUNT", help="Specify the accounts of the jobs to view. Accepts a comma separated list of account names", ) parser.add_argument( "--name", "-n", type=str, dest="NAME", help="Request jobs having one of the specified names. The list consists of a comma separated list of job names.", ) user_group.add_argument( "--me", action="store_true", dest="me", help="Equivalent to --user=<my username>", ) parser.add_argument( "--nodelist", "-w", type=str, dest="NODELIST", help="Report only on jobs allocated to the specified node or list of nodes", ) parser.add_argument( "--format", "-o", type=str, default=_SQUEUE_DEFAULT_SHORT_FORMAT, dest="format_str", help="Specify the information to be displayed, its size and position", ) parser.add_argument( "--noheader", "-h", action="store_true", dest="no_header", help="Do not print a header on the output" ) parser.add_argument( "--long", "-l", action="store_true", dest="long", help="Report more of the available information for the selected jobs or job steps, subject to any constraints specified", ) parser.add_argument( "--partition", "-p", type=str, dest="PARTITION", help="Specify the partitions of the jobs or steps to view. Accepts a comma separated list of partition names", ) parser.add_argument( "--reservation", "-R", type=str, dest="RESERVATION", help="Specify the reservation of the jobs to view", ) parser.add_argument( "--usage", action="store_true", dest="print_help", help="Print a brief help message listing the squeue options", ) parser.add_argument( "--jobs", "-j", type=str, dest="jobs", help="Specify a comma separated list of job IDs to display, Defaults to all jobs", ) args = parser.parse_args() if args.print_help: parser.print_help() if args.long and not args.format_str: args.format_str = _SQUEUE_DEFAULT_LONG_FORMAT # Split args that take list in comma separated string to list! args.ACCOUNT = args.ACCOUNT.split(",") if args.ACCOUNT is not None else None args.NAME = args.NAME.split(",") if args.NAME is not None else None args.PARTITION = args.PARTITION.split(",") if args.PARTITION is not None else None args.jobs = [int(job_id) for job_id in args.jobs.split(",")] if args.jobs is not None else None field_filter_values = filter_dict_from_args(args) # filter out finished jobs field_filter_values["STATE"] = [JobState.RUNNING, JobState.PENDING] with get_db_file_handle(find_db_file()) as db_file: # Get mask to select DB rows mask = get_filtered_DB_mask(db_file, field_filter_values) # filter job IDs if some were specified if args.jobs and len(mask) > 0: # if mask is empty (no jobs in DB), skip # job IDs are just the index of the jobs in the DB! # remove (silently like real squeue...) job IDs that do not exist args.jobs = [job_ID for job_ID in args.jobs if 0 <= job_ID < len(mask)] if not args.jobs: # no jobs remaining: exit with error like squeue print("slurm_load_jobs error: Invalid job id specified") exit(1) mask[args.jobs] = True # Get format string and function formating field value based on squeue --format format_str, fields_header, fields_filler_fct = parse_squeue_format(args.format_str) # Print header if not args.no_header: print(format_str.format(*fields_header)) # Print the jobs found in filtered DB job_indices = np.nonzero(mask)[0] for idx, job in zip(job_indices, get_db(db_file)[mask]): print(format_str.format(*[fct(job, idx) for fct in fields_filler_fct]))
if __name__ == "__main__": main()