Coverage for mockslurm/mock_squeue.py: 85%

83 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-17 10:19 +0000

1#!/usr/bin/env python 

2"""Implement a mock of the squeue command of slurm. 

3 

4The squeue -o, --format argument is supported except for the %all directive 

5""" 

6 

7import argparse 

8import datetime 

9from typing import Callable, List, Tuple 

10 

11import numpy as np 

12 

13from mockslurm.process_db import ( 

14 JobReason, 

15 JobState, 

16 find_db_file, 

17 get_db, 

18 get_db_file_handle, 

19 get_filtered_DB_mask, 

20) 

21from mockslurm.utils import filter_dict_from_args 

22 

23_SQUEUE_DEFAULT_SHORT_FORMAT = "%.18i %.9P %.8j %.8u %.2t %.10M %.6D %R" 

24_SQUEUE_DEFAULT_LONG_FORMAT = "%.18i %.9P %.8j %.8u %.8T %.10M %.9l %.6D %R" 

25 

26_SQUEUE_STATE_CODE_LONG_TO_SHORT = { 

27 JobState.PENDING: "PD", 

28 JobState.RUNNING: "R", 

29 JobState.FAILED: "F", 

30 JobState.COMPLETED: "CD", 

31 JobState.CANCELLED: "CA", 

32} 

33 

34 

35def count_nodes(nodelist: str) -> int: 

36 """Count the number of nodes requested by users in the nodelist 

37 

38 Parameters 

39 ---------- 

40 nodelist : str 

41 Nodelist string in slurm format: comma separated list of nodes or node ranges 

42 example: "cp12,cp18", "cp[10-14],cp28" 

43 

44 Returns 

45 ------- 

46 int 

47 Number of nodes concerned by the nodelist 

48 """ 

49 if not nodelist: # mock considers no nodelist jobs get allocated 1 node 49 ↛ 50line 49 didn't jump to line 50 because the condition on line 49 was never true

50 return 1 

51 

52 nodecount = 0 

53 nlist = nodelist.split(",") 

54 for node_specs in nlist: 

55 if "[" in node_specs: 

56 _, node_specs = node_specs.split("[") # ignore node name 

57 if "-" in node_specs: # count nodes in range 

58 first, last = node_specs.split("-") 

59 last = last.split("]")[0] # ignore closing "]" if any 

60 nodecount += int(last) - int(first) + 1 

61 else: # not a range: single node 

62 nodecount += 1 

63 

64 return nodecount 

65 

66 

67SQUEUE_FORMAT_LETTER_TO_DB_FIELD = { 

68 # "": lambda job_info, _: job_info["PID"], 

69 "i": ("JOBID", lambda _, job_idx: str(job_idx)), 

70 "A": ("JOBID", lambda _, job_idx: str(job_idx)), 

71 "j": ("NAME", lambda job_info, _: job_info["NAME"].decode()), 

72 "u": ("USER", lambda job_info, _: job_info["USER"].decode()), 

73 "a": ("ACCOUNT", lambda job_info, _: job_info["ACCOUNT"].decode()), 

74 "P": ("PARTITION", lambda job_info, _: job_info["PARTITION"].decode()), 

75 "v": ("RESERVATION", lambda job_info, _: job_info["RESERVATION"].decode()), 

76 "M": ( 

77 "TIME", 

78 lambda job_info, 

79 _: ( # split is to remove the floating point part, that contains microseconds that should not be displayed 

80 str(datetime.datetime.now() - datetime.datetime.fromtimestamp(job_info["START_TIME"])).split(".")[0] 

81 if job_info["STATE"] == JobState.RUNNING 

82 else "0:00:00" 

83 ), 

84 ), 

85 "l": ("TIME_LIMIT", lambda job_info, _: "UNLIMITED"), # no time limit in mock 

86 "n": ("REQ_NODES", lambda job_info, _: job_info["NODELIST"].decode()), 

87 "N": ("NODELIST", lambda job_info, _: job_info["NODELIST"].decode()), 

88 "D": ("NODES", lambda job_info, _: str(count_nodes(job_info["NODELIST"].decode()))), 

89 "S": ("START_TIME", lambda job_info, _: job_info["START_TIME"]), 

90 "V": ( 

91 "SUBMIT_TIME", 

92 lambda job_info, _: job_info["START_TIME"].decode(), 

93 ), # equal to start time for mock) 

94 "o": ("COMMAND", lambda job_info, _: job_info["CMD"].decode()), 

95 "r": ("REASON", lambda job_info, _: JobReason(job_info["REASON"]).name), 

96 "t": ("ST", lambda job_info, _: _SQUEUE_STATE_CODE_LONG_TO_SHORT[job_info["STATE"]]), 

97 "T": ("STATE", lambda job_info, _: JobState(job_info["STATE"]).name), 

98 "R": ( 

99 "NODELIST(REASON)", 

100 lambda job_info, _: ( 

101 "(" + JobReason(job_info["REASON"]).name + ")" 

102 if JobReason(job_info["STATE"]) == JobState.PENDING 

103 or JobReason(job_info["REASON"]) == JobReason.NonZeroExitCode 

104 else job_info["NODELIST"].decode() 

105 ), 

106 ), 

107 # "": lambda job_info, _: job_info["EXIT_CODE"], 

108} 

109 

110 

111def parse_squeue_format(squeue_format_str: str) -> Tuple[str, List[Callable]]: 

112 """Convert squeue format argument (-o format) to a python format string and 

113 list of function filling the values of the format string for each job DB row. 

114 

115 Parameters 

116 ---------- 

117 squeue_format_str : str 

118 Squeue format string, eg "%.18i %.9P %.8j %.8u %.2t %.10M %.6D %R". 

119 

120 Warning 

121 ------- 

122 Does not support squeue's "%all" formatting string. 

123 

124 Returns 

125 ------- 

126 Tuple[str, List[Callable]] 

127 python format string, and a list of callable that should be used to fill the values 

128 in the formatted string with a job DB row and job_index. 

129 

130 Examples 

131 -------- 

132 >>> format_str, fields_filler_fcts = parse_squeue_format("%.18i %.9P %.8j %.8u %.2t %.10M %.6D %R") 

133 >>> squeue_output = format_str.format(*[fct(job_DB_row, row_idx) for fct in fields_filler_fcts]) 

134 """ 

135 # Iterate on the format string and for each found % 

136 # add a "{:ndigit}" to the python string, with optional ">" if "." follows "%" 

137 # ndigit is found by converting the characters following % 

138 # the function filling the value is taken from the map of squeue format letters to DB function 

139 python_format_string = "" 

140 fields_header = [] 

141 fields_filler_functions = [] 

142 # split 1st value is empty if string starts with delimiter 

143 for field_format_str in squeue_format_str.split("%")[1:]: 

144 python_format_string += "{:" 

145 if field_format_str[0] == ".": 

146 python_format_string += ">" 

147 field_format_str = field_format_str[1:] 

148 idx = 0 

149 while field_format_str[idx].isdigit(): 

150 python_format_string += field_format_str[idx] 

151 idx += 1 

152 header, fct = SQUEUE_FORMAT_LETTER_TO_DB_FIELD[field_format_str[idx]] 

153 fields_header.append(header) 

154 fields_filler_functions.append(fct) 

155 python_format_string += "}" + field_format_str[idx + 1 :] 

156 

157 return python_format_string, fields_header, fields_filler_functions 

158 

159 

160def main(): 

161 parser = argparse.ArgumentParser( 

162 description="Slurm scancel mock", formatter_class=argparse.ArgumentDefaultsHelpFormatter, add_help=False 

163 ) 

164 user_group = parser.add_mutually_exclusive_group() 

165 parser.add_argument( 

166 "--account", 

167 "-A", 

168 type=str, 

169 dest="ACCOUNT", 

170 help="Specify the accounts of the jobs to view. Accepts a comma separated list of account names", 

171 ) 

172 parser.add_argument( 

173 "--name", 

174 "-n", 

175 type=str, 

176 dest="NAME", 

177 help="Request jobs having one of the specified names. The list consists of a comma separated list of job names.", 

178 ) 

179 user_group.add_argument( 

180 "--me", 

181 action="store_true", 

182 dest="me", 

183 help="Equivalent to --user=<my username>", 

184 ) 

185 parser.add_argument( 

186 "--nodelist", 

187 "-w", 

188 type=str, 

189 dest="NODELIST", 

190 help="Report only on jobs allocated to the specified node or list of nodes", 

191 ) 

192 parser.add_argument( 

193 "--format", 

194 "-o", 

195 type=str, 

196 default=_SQUEUE_DEFAULT_SHORT_FORMAT, 

197 dest="format_str", 

198 help="Specify the information to be displayed, its size and position", 

199 ) 

200 parser.add_argument( 

201 "--noheader", "-h", action="store_true", dest="no_header", help="Do not print a header on the output" 

202 ) 

203 parser.add_argument( 

204 "--long", 

205 "-l", 

206 action="store_true", 

207 dest="long", 

208 help="Report more of the available information for the selected jobs or job steps, subject to any constraints specified", 

209 ) 

210 parser.add_argument( 

211 "--partition", 

212 "-p", 

213 type=str, 

214 dest="PARTITION", 

215 help="Specify the partitions of the jobs or steps to view. Accepts a comma separated list of partition names", 

216 ) 

217 parser.add_argument( 

218 "--reservation", 

219 "-R", 

220 type=str, 

221 dest="RESERVATION", 

222 help="Specify the reservation of the jobs to view", 

223 ) 

224 parser.add_argument( 

225 "--usage", 

226 action="store_true", 

227 dest="print_help", 

228 help="Print a brief help message listing the squeue options", 

229 ) 

230 parser.add_argument( 

231 "--jobs", 

232 "-j", 

233 type=str, 

234 dest="jobs", 

235 help="Specify a comma separated list of job IDs to display, Defaults to all jobs", 

236 ) 

237 args = parser.parse_args() 

238 

239 if args.print_help: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true

240 parser.print_help() 

241 

242 if args.long and not args.format_str: 242 ↛ 243line 242 didn't jump to line 243 because the condition on line 242 was never true

243 args.format_str = _SQUEUE_DEFAULT_LONG_FORMAT 

244 

245 # Split args that take list in comma separated string to list! 

246 args.ACCOUNT = args.ACCOUNT.split(",") if args.ACCOUNT is not None else None 

247 args.NAME = args.NAME.split(",") if args.NAME is not None else None 

248 args.PARTITION = args.PARTITION.split(",") if args.PARTITION is not None else None 

249 args.jobs = [int(job_id) for job_id in args.jobs.split(",")] if args.jobs is not None else None 

250 

251 field_filter_values = filter_dict_from_args(args) 

252 # filter out finished jobs 

253 field_filter_values["STATE"] = [JobState.RUNNING, JobState.PENDING] 

254 

255 with get_db_file_handle(find_db_file()) as db_file: 

256 # Get mask to select DB rows 

257 mask = get_filtered_DB_mask(db_file, field_filter_values) 

258 # filter job IDs if some were specified 

259 if args.jobs and len(mask) > 0: # if mask is empty (no jobs in DB), skip 259 ↛ 262line 259 didn't jump to line 262 because the condition on line 259 was never true

260 # job IDs are just the index of the jobs in the DB! 

261 # remove (silently like real squeue...) job IDs that do not exist 

262 args.jobs = [job_ID for job_ID in args.jobs if 0 <= job_ID < len(mask)] 

263 if not args.jobs: # no jobs remaining: exit with error like squeue 

264 print("slurm_load_jobs error: Invalid job id specified") 

265 exit(1) 

266 mask[args.jobs] = True 

267 

268 # Get format string and function formating field value based on squeue --format 

269 format_str, fields_header, fields_filler_fct = parse_squeue_format(args.format_str) 

270 

271 # Print header 

272 if not args.no_header: 272 ↛ 276line 272 didn't jump to line 276 because the condition on line 272 was always true

273 print(format_str.format(*fields_header)) 

274 

275 # Print the jobs found in filtered DB 

276 job_indices = np.nonzero(mask)[0] 

277 for idx, job in zip(job_indices, get_db(db_file)[mask]): 

278 print(format_str.format(*[fct(job, idx) for fct in fields_filler_fct])) 

279 

280 

281if __name__ == "__main__": 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true

282 main()