Source code for mockslurm.mock_scancel

#!/usr/bin/env python
"""Implement a mock of the scancel command of slurm.

The jobs to cancel are found by querying the database for job IDs or job names,
filtering the jobs that are already completed (return code != default value),
then the signal to transmit (default SIGKILL) is sent.
"""

import argparse
import getpass
import os
import signal

import numpy as np
from mockslurm.process_db import (
    JobState,
    find_db_file,
    get_db,
    get_db_file_handle,
    get_filtered_DB_mask,
    update_db_value,
)
from mockslurm.utils import filter_dict_from_args


def _parse_signal(value):
    """Convert a signal given on the command line into a value ``os.kill`` accepts.

    Args:
        value: Signal name (``"SIGTERM"`` or the short form ``"TERM"``, case-insensitive)
            or signal number written as a string (``"15"``).

    Returns:
        The matching ``signal.Signals`` member for a known name, otherwise the integer value.

    Raises:
        ValueError: if ``value`` is neither a known signal name nor an integer.
    """
    name = value.strip().upper()
    for candidate in (name, "SIG" + name):
        # __members__ also holds aliases (e.g. SIGCLD), which iterating over the enum skips
        if candidate in signal.Signals.__members__:
            return signal.Signals[candidate]
    return int(value)


def main():
    """Run the scancel mock: select jobs in the mock database and signal them.

    Jobs are selected with the command line filters (account, job name, user, node list,
    partition, reservation) and/or with explicit job IDs, which are used as row indices of
    the job database. Every selected job whose STATE is RUNNING receives the requested
    signal (SIGKILL by default) and every selected job is marked as CANCELLED in the
    database.

    Exits with status 1 when no job identification is given, when a job ID is invalid or
    when the signal is unknown; exits with status 0 when the database is empty.
    """
    # Local import so that this function does not rely on the `exit` helper injected by `site`
    import sys

    parser = argparse.ArgumentParser(
        description="Slurm scancel mock", formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    # --me and --user both filter on the job owner, so they cannot be combined
    user_group = parser.add_mutually_exclusive_group()
    parser.add_argument(
        "--account",
        "-A",
        type=str,
        dest="ACCOUNT",
        help="Restrict the scancel operation to jobs under this charge account",
    )
    parser.add_argument(
        "--jobname", "-n", type=str, dest="NAME", help="Restrict the scancel operation to jobs with this job name"
    )
    user_group.add_argument(
        "--me",
        action="store_true",
        dest="me",
        help="Restrict the scancel operation to jobs owned by the current user",
    )
    parser.add_argument(
        "--nodelist",
        "-w",
        type=str,
        dest="NODELIST",
        help="Cancel any jobs using any of the given hosts. "
        "The list may be specified as a comma-separated list of hosts, a range of hosts "
        "(host[1-5,7,...] for example)",
    )
    parser.add_argument(
        "--partition",
        "-P",
        type=str,
        dest="PARTITION",
        help="Restrict the scancel operation to jobs in this partition",
    )
    # Plain flag: declaring it with type=str made "-Q 12" swallow the job ID as its value
    parser.add_argument(
        "--quiet",
        "-Q",
        action="store_true",
        dest="quiet",
        help="Do not report an error if the specified job is already completed",
    )
    parser.add_argument(
        "--reservation",
        "-R",
        type=str,
        dest="RESERVATION",
        help="Restrict the scancel operation to jobs with this reservation name",
    )
    parser.add_argument(
        "--signal",
        "-s",
        type=str,
        dest="signal",
        default="SIGKILL",
        help="The name or number of the signal to send. \n"
        "If this option is not used the specified job or step will be terminated",
    )
    user_group.add_argument(
        "--user",
        "-u",
        type=str,
        dest="USER",
        help="Restrict the scancel operation to jobs owned by the given user",
    )
    parser.add_argument("jobids", type=int, nargs="*", help="The Slurm job ID to be signaled")
    args = parser.parse_args()

    has_filter = any(
        value is not None
        for value in (args.ACCOUNT, args.NAME, args.NODELIST, args.PARTITION, args.RESERVATION, args.USER)
    )
    if not (args.jobids or args.me or has_filter):
        print("scancel: error: No job identification provided")
        sys.exit(1)

    for job_id in args.jobids:
        if job_id < 0:
            print("scancel: error: Invalid job id {}".format(job_id))
            sys.exit(1)

    if args.me:
        # --me selects on the job owner, i.e. the USER field (not the charge account)
        args.USER = getpass.getuser()

    try:
        args.signal = _parse_signal(args.signal)
    except ValueError:
        print("Unknown job signal: {}".format(args.signal))
        sys.exit(1)

    # Transform the arguments values into a map {field: value, field2: [values], etc...} for filtering DB
    field_filter_values = filter_dict_from_args(args)

    with get_db_file_handle(find_db_file()) as db_file:
        db = get_db(db_file)
        n_jobs = db.shape[0]
        if n_jobs == 0:
            # db is empty: no jobs to cancel
            sys.exit(0)

        # Job IDs index the DB rows: reject unknown IDs before any signal is sent instead of
        # failing with an IndexError while building the mask
        for job_id in args.jobids:
            if job_id >= n_jobs:
                print("scancel: error: Invalid job id {}".format(job_id))
                sys.exit(1)

        # Get mask to select DB rows
        mask = get_filtered_DB_mask(db_file, field_filter_values)
        # Select the job IDs that were specified
        # NOTE(review): the IDs are added to the rows matched by the filters, they do not
        # restrict them - confirm this is the intended combination
        if args.jobids:
            mask[args.jobids] = True
        job_indices = np.nonzero(mask)[0]

        # Send signal
        for idx, job in zip(job_indices, db[mask]):
            if job["STATE"] == JobState.RUNNING:
                # job is started, we can kill it
                os.kill(job["PID"], args.signal)
            # set STATE to stopped immediately, actual exit STATE will be updated with exit code
            # in mock of sbatch if job was running
            # TODO: if signal wasn't meant to kill job, STATE is wrong ?
            update_db_value(db_file, idx, key="STATE", value=JobState.CANCELLED)
# Run the scancel mock only when this module is executed as a script, not on import.
if __name__ == "__main__":
    main()