Coverage for mockslurm/mock_squeue.py: 85%
83 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-17 10:19 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-17 10:19 +0000
1#!/usr/bin/env python
2"""Implement a mock of the squeue command of slurm.
4The squeue -o, --format argument is supported except for the %all directive
5"""
7import argparse
8import datetime
9from typing import Callable, List, Tuple
11import numpy as np
13from mockslurm.process_db import (
14 JobReason,
15 JobState,
16 find_db_file,
17 get_db,
18 get_db_file_handle,
19 get_filtered_DB_mask,
20)
21from mockslurm.utils import filter_dict_from_args
23_SQUEUE_DEFAULT_SHORT_FORMAT = "%.18i %.9P %.8j %.8u %.2t %.10M %.6D %R"
24_SQUEUE_DEFAULT_LONG_FORMAT = "%.18i %.9P %.8j %.8u %.8T %.10M %.9l %.6D %R"
26_SQUEUE_STATE_CODE_LONG_TO_SHORT = {
27 JobState.PENDING: "PD",
28 JobState.RUNNING: "R",
29 JobState.FAILED: "F",
30 JobState.COMPLETED: "CD",
31 JobState.CANCELLED: "CA",
32}
35def count_nodes(nodelist: str) -> int:
36 """Count the number of nodes requested by users in the nodelist
38 Parameters
39 ----------
40 nodelist : str
41 Nodelist string in slurm format: comma separated list of nodes or node ranges
42 example: "cp12,cp18", "cp[10-14],cp28"
44 Returns
45 -------
46 int
47 Number of nodes concerned by the nodelist
48 """
49 if not nodelist: # mock considers no nodelist jobs get allocated 1 node 49 ↛ 50line 49 didn't jump to line 50 because the condition on line 49 was never true
50 return 1
52 nodecount = 0
53 nlist = nodelist.split(",")
54 for node_specs in nlist:
55 if "[" in node_specs:
56 _, node_specs = node_specs.split("[") # ignore node name
57 if "-" in node_specs: # count nodes in range
58 first, last = node_specs.split("-")
59 last = last.split("]")[0] # ignore closing "]" if any
60 nodecount += int(last) - int(first) + 1
61 else: # not a range: single node
62 nodecount += 1
64 return nodecount
67SQUEUE_FORMAT_LETTER_TO_DB_FIELD = {
68 # "": lambda job_info, _: job_info["PID"],
69 "i": ("JOBID", lambda _, job_idx: str(job_idx)),
70 "A": ("JOBID", lambda _, job_idx: str(job_idx)),
71 "j": ("NAME", lambda job_info, _: job_info["NAME"].decode()),
72 "u": ("USER", lambda job_info, _: job_info["USER"].decode()),
73 "a": ("ACCOUNT", lambda job_info, _: job_info["ACCOUNT"].decode()),
74 "P": ("PARTITION", lambda job_info, _: job_info["PARTITION"].decode()),
75 "v": ("RESERVATION", lambda job_info, _: job_info["RESERVATION"].decode()),
76 "M": (
77 "TIME",
78 lambda job_info,
79 _: ( # split is to remove the floating point part, that contains microseconds that should not be displayed
80 str(datetime.datetime.now() - datetime.datetime.fromtimestamp(job_info["START_TIME"])).split(".")[0]
81 if job_info["STATE"] == JobState.RUNNING
82 else "0:00:00"
83 ),
84 ),
85 "l": ("TIME_LIMIT", lambda job_info, _: "UNLIMITED"), # no time limit in mock
86 "n": ("REQ_NODES", lambda job_info, _: job_info["NODELIST"].decode()),
87 "N": ("NODELIST", lambda job_info, _: job_info["NODELIST"].decode()),
88 "D": ("NODES", lambda job_info, _: str(count_nodes(job_info["NODELIST"].decode()))),
89 "S": ("START_TIME", lambda job_info, _: job_info["START_TIME"]),
90 "V": (
91 "SUBMIT_TIME",
92 lambda job_info, _: job_info["START_TIME"].decode(),
93 ), # equal to start time for mock)
94 "o": ("COMMAND", lambda job_info, _: job_info["CMD"].decode()),
95 "r": ("REASON", lambda job_info, _: JobReason(job_info["REASON"]).name),
96 "t": ("ST", lambda job_info, _: _SQUEUE_STATE_CODE_LONG_TO_SHORT[job_info["STATE"]]),
97 "T": ("STATE", lambda job_info, _: JobState(job_info["STATE"]).name),
98 "R": (
99 "NODELIST(REASON)",
100 lambda job_info, _: (
101 "(" + JobReason(job_info["REASON"]).name + ")"
102 if JobReason(job_info["STATE"]) == JobState.PENDING
103 or JobReason(job_info["REASON"]) == JobReason.NonZeroExitCode
104 else job_info["NODELIST"].decode()
105 ),
106 ),
107 # "": lambda job_info, _: job_info["EXIT_CODE"],
108}
111def parse_squeue_format(squeue_format_str: str) -> Tuple[str, List[Callable]]:
112 """Convert squeue format argument (-o format) to a python format string and
113 list of function filling the values of the format string for each job DB row.
115 Parameters
116 ----------
117 squeue_format_str : str
118 Squeue format string, eg "%.18i %.9P %.8j %.8u %.2t %.10M %.6D %R".
120 Warning
121 -------
122 Does not support squeue's "%all" formatting string.
124 Returns
125 -------
126 Tuple[str, List[Callable]]
127 python format string, and a list of callable that should be used to fill the values
128 in the formatted string with a job DB row and job_index.
130 Examples
131 --------
132 >>> format_str, fields_filler_fcts = parse_squeue_format("%.18i %.9P %.8j %.8u %.2t %.10M %.6D %R")
133 >>> squeue_output = format_str.format(*[fct(job_DB_row, row_idx) for fct in fields_filler_fcts])
134 """
135 # Iterate on the format string and for each found %
136 # add a "{:ndigit}" to the python string, with optional ">" if "." follows "%"
137 # ndigit is found by converting the characters following %
138 # the function filling the value is taken from the map of squeue format letters to DB function
139 python_format_string = ""
140 fields_header = []
141 fields_filler_functions = []
142 # split 1st value is empty if string starts with delimiter
143 for field_format_str in squeue_format_str.split("%")[1:]:
144 python_format_string += "{:"
145 if field_format_str[0] == ".":
146 python_format_string += ">"
147 field_format_str = field_format_str[1:]
148 idx = 0
149 while field_format_str[idx].isdigit():
150 python_format_string += field_format_str[idx]
151 idx += 1
152 header, fct = SQUEUE_FORMAT_LETTER_TO_DB_FIELD[field_format_str[idx]]
153 fields_header.append(header)
154 fields_filler_functions.append(fct)
155 python_format_string += "}" + field_format_str[idx + 1 :]
157 return python_format_string, fields_header, fields_filler_functions
160def main():
161 parser = argparse.ArgumentParser(
162 description="Slurm scancel mock", formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False
163 )
164 user_group = parser.add_mutually_exclusive_group()
165 parser.add_argument(
166 "--account",
167 "-A",
168 type=str,
169 dest="ACCOUNT",
170 help="Specify the accounts of the jobs to view. Accepts a comma separated list of account names",
171 )
172 parser.add_argument(
173 "--name",
174 "-n",
175 type=str,
176 dest="NAME",
177 help="Request jobs having one of the specified names. The list consists of a comma separated list of job names.",
178 )
179 user_group.add_argument(
180 "--me",
181 action="store_true",
182 dest="me",
183 help="Equivalent to --user=<my username>",
184 )
185 parser.add_argument(
186 "--nodelist",
187 "-w",
188 type=str,
189 dest="NODELIST",
190 help="Report only on jobs allocated to the specified node or list of nodes",
191 )
192 parser.add_argument(
193 "--format",
194 "-o",
195 type=str,
196 default=_SQUEUE_DEFAULT_SHORT_FORMAT,
197 dest="format_str",
198 help="Specify the information to be displayed, its size and position",
199 )
200 parser.add_argument(
201 "--noheader", "-h", action="store_true", dest="no_header", help="Do not print a header on the output"
202 )
203 parser.add_argument(
204 "--long",
205 "-l",
206 action="store_true",
207 dest="long",
208 help="Report more of the available information for the selected jobs or job steps, subject to any constraints specified",
209 )
210 parser.add_argument(
211 "--partition",
212 "-p",
213 type=str,
214 dest="PARTITION",
215 help="Specify the partitions of the jobs or steps to view. Accepts a comma separated list of partition names",
216 )
217 parser.add_argument(
218 "--reservation",
219 "-R",
220 type=str,
221 dest="RESERVATION",
222 help="Specify the reservation of the jobs to view",
223 )
224 parser.add_argument(
225 "--usage",
226 action="store_true",
227 dest="print_help",
228 help="Print a brief help message listing the squeue options",
229 )
230 parser.add_argument(
231 "--jobs",
232 "-j",
233 type=str,
234 dest="jobs",
235 help="Specify a comma separated list of job IDs to display, Defaults to all jobs",
236 )
237 args = parser.parse_args()
239 if args.print_help: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true
240 parser.print_help()
242 if args.long and not args.format_str: 242 ↛ 243line 242 didn't jump to line 243 because the condition on line 242 was never true
243 args.format_str = _SQUEUE_DEFAULT_LONG_FORMAT
245 # Split args that take list in comma separated string to list!
246 args.ACCOUNT = args.ACCOUNT.split(",") if args.ACCOUNT is not None else None
247 args.NAME = args.NAME.split(",") if args.NAME is not None else None
248 args.PARTITION = args.PARTITION.split(",") if args.PARTITION is not None else None
249 args.jobs = [int(job_id) for job_id in args.jobs.split(",")] if args.jobs is not None else None
251 field_filter_values = filter_dict_from_args(args)
252 # filter out finished jobs
253 field_filter_values["STATE"] = [JobState.RUNNING, JobState.PENDING]
255 with get_db_file_handle(find_db_file()) as db_file:
256 # Get mask to select DB rows
257 mask = get_filtered_DB_mask(db_file, field_filter_values)
258 # filter job IDs if some were specified
259 if args.jobs and len(mask) > 0: # if mask is empty (no jobs in DB), skip 259 ↛ 262line 259 didn't jump to line 262 because the condition on line 259 was never true
260 # job IDs are just the index of the jobs in the DB!
261 # remove (silently like real squeue...) job IDs that do not exist
262 args.jobs = [job_ID for job_ID in args.jobs if 0 <= job_ID < len(mask)]
263 if not args.jobs: # no jobs remaining: exit with error like squeue
264 print("slurm_load_jobs error: Invalid job id specified")
265 exit(1)
266 mask[args.jobs] = True
268 # Get format string and function formating field value based on squeue --format
269 format_str, fields_header, fields_filler_fct = parse_squeue_format(args.format_str)
271 # Print header
272 if not args.no_header: 272 ↛ 276line 272 didn't jump to line 276 because the condition on line 272 was always true
273 print(format_str.format(*fields_header))
275 # Print the jobs found in filtered DB
276 job_indices = np.nonzero(mask)[0]
277 for idx, job in zip(job_indices, get_db(db_file)[mask]):
278 print(format_str.format(*[fct(job, idx) for fct in fields_filler_fct]))
281if __name__ == "__main__": 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true
282 main()