#!/usr/bin/env python
"""Implement a mock of the scancel command of slurm.
The jobs to cancel are found by querying the database for job IDs or job names,
filtering out the jobs that are already completed (return code != default value);
the signal to transmit (default SIGKILL) is then sent.
"""
import argparse
import getpass
import os
import signal
import numpy as np
from mockslurm.process_db import (
JobState,
find_db_file,
get_db,
get_db_file_handle,
get_filtered_DB_mask,
update_db_value,
)
from mockslurm.utils import filter_dict_from_args
def _parse_signal(value):
    """Translate a ``--signal`` argument into something ``os.kill`` accepts.

    Signal names are matched case-insensitively, with or without the ``SIG``
    prefix ("SIGKILL", "KILL", "kill"). Anything else is parsed as a signal number.

    Returns the ``signal.Signals`` member for a known name, the ``int`` for a
    number, or ``None`` when *value* is neither.
    """
    upper_name = value.upper()
    for candidate in (upper_name, "SIG" + upper_name):
        if candidate in signal.Signals.__members__:
            return signal.Signals[candidate]
    try:
        return int(value)
    except ValueError:
        return None


def main():
    """Entry point of the ``scancel`` mock.

    Parses the command line, selects rows of the job database matching the
    given filters and/or job IDs, sends the requested signal to every selected
    job whose STATE is RUNNING, and sets the STATE of every selected job to
    CANCELLED.

    Job IDs are used directly as row indices into the database.

    Raises:
        SystemExit: with code 1 when no job identification is given, a job ID
            is negative or not present in the database, or the signal is
            unknown; with code 0 when the database is empty; with code 2
            (raised by argparse) on invalid command line syntax.
    """
    parser = argparse.ArgumentParser(
        description="Slurm scancel mock", formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    # --me and --user both select jobs by owner, so they cannot be combined.
    user_group = parser.add_mutually_exclusive_group()
    parser.add_argument(
        "--account",
        "-A",
        type=str,
        dest="ACCOUNT",
        help="Restrict the scancel operation to jobs under this charge account",
    )
    parser.add_argument(
        "--jobname", "-n", type=str, dest="NAME", help="Restrict the scancel operation to jobs with this job name"
    )
    user_group.add_argument(
        "--me",
        action="store_true",
        dest="me",
        help="Restrict the scancel operation to jobs owned by the current user",
    )
    parser.add_argument(
        "--nodelist",
        "-w",
        type=str,
        dest="NODELIST",
        help="Cancel any jobs using any of the given hosts. "
        "The list may be specified as a comma-separated list of hosts, a range of hosts "
        "(host[1-5,7,...] for example)",
    )
    parser.add_argument(
        "--partition",
        "-P",
        type=str,
        dest="PARTITION",
        help="Restrict the scancel operation to jobs in this partition",
    )
    # A plain flag: it must not consume the next token (e.g. a job ID) as its value.
    parser.add_argument(
        "--quiet",
        "-Q",
        action="store_true",
        dest="quiet",
        help="Do not report an error if the specified job is already completed",
    )
    parser.add_argument(
        "--reservation",
        "-R",
        type=str,
        dest="RESERVATION",
        help="Restrict the scancel operation to jobs with this reservation name",
    )
    parser.add_argument(
        "--signal",
        "-s",
        type=str,
        dest="signal",
        default="SIGKILL",
        help="The name or number of the signal to send. If this option is not used the specified job or step will be terminated",
    )
    user_group.add_argument(
        "--user",
        "-u",
        type=str,
        dest="USER",
        help="Restrict the scancel operation to jobs owned by the given user",
    )
    parser.add_argument("jobids", type=int, nargs="*", help="The Slurm job ID to be signaled")
    args = parser.parse_args()

    no_filter_given = all(
        value is None
        for value in (args.ACCOUNT, args.NAME, args.NODELIST, args.PARTITION, args.RESERVATION, args.USER)
    )
    if not args.jobids and not args.me and no_filter_given:
        print("scancel: error: No job identification provided")
        raise SystemExit(1)

    for job_id in args.jobids:
        if job_id < 0:
            print("scancel: error: Invalid job id {}".format(job_id))
            raise SystemExit(1)

    if args.me:
        # --me is a shortcut for "--user <current user>"
        args.USER = getpass.getuser()

    requested_signal = _parse_signal(args.signal)
    if requested_signal is None:
        print("Unknown job signal: {}".format(args.signal))
        raise SystemExit(1)
    args.signal = requested_signal

    # Transform the arguments values into a map {field: value, field2: [values], etc...} for filtering DB
    field_filter_values = filter_dict_from_args(args)
    with get_db_file_handle(find_db_file()) as db_file:
        db = get_db(db_file)
        if db.shape[0] == 0:  # db is empty: no jobs to cancel
            raise SystemExit(0)
        # Job IDs index DB rows directly: reject unknown IDs instead of crashing with an IndexError
        for job_id in args.jobids:
            if job_id >= db.shape[0]:
                print("scancel: error: Invalid job id {}".format(job_id))
                raise SystemExit(1)
        # Get mask to select DB rows
        mask = get_filtered_DB_mask(db_file, field_filter_values)
        # Add the explicitly requested job IDs to the selection
        # NOTE(review): this is a union with the filter mask, not an intersection - confirm intent
        if args.jobids:
            mask[args.jobids] = True
        job_indices = np.nonzero(mask)[0]
        # Send signal
        for idx, job in zip(job_indices, db[mask]):
            if job["STATE"] == JobState.RUNNING:  # job is started, we can kill it
                try:
                    os.kill(job["PID"], args.signal)
                except ProcessLookupError:
                    # The process exited on its own before we could signal it:
                    # report it (unless --quiet) and keep going with the other jobs.
                    if not args.quiet:
                        print(
                            "scancel: error: Kill job error on job id {}: "
                            "Job/step already completing or completed".format(idx)
                        )
            # set STATE to stopped immediately, actual exit STATE will be updated with exit code
            # in mock of sbatch if job was running
            # TODO: if signal wasn't meant to kill job, STATE is wrong ?
            update_db_value(db_file, idx, key="STATE", value=JobState.CANCELLED)
# Run the scancel mock only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()