Coverage for mockslurm/mock_scancel.py: 70%

53 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-04 23:38 +0000

1#!/usr/bin/env python 

2"""Implement a mock of the scancel command of slurm. 

3 

4The jobs to cancel are found by querying the database for job IDs or job names,  

5filtering the jobs that are already completed (return code != default value),  

6then the signal to transmit (default SIGKILL) is sent. 

7""" 

8 

9import argparse 

10import getpass 

11import os 

12import signal 

13 

14import numpy as np 

15from mockslurm.process_db import ( 

16 JobState, 

17 find_db_file, 

18 get_db, 

19 get_db_file_handle, 

20 get_filtered_DB_mask, 

21 update_db_value, 

22) 

23from mockslurm.utils import filter_dict_from_args 

24 

25 

def _build_parser():
    """Return the argument parser describing the scancel options supported by the mock.

    Options whose ``dest`` is upper case (ACCOUNT, NAME, NODELIST, PARTITION,
    RESERVATION, USER) are the job-selection filters; they are later handed to
    ``filter_dict_from_args`` together with the rest of the namespace.
    """
    parser = argparse.ArgumentParser(
        description="Slurm scancel mock", formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    user_group = parser.add_mutually_exclusive_group()
    parser.add_argument(
        "--account",
        "-A",
        type=str,
        dest="ACCOUNT",
        help="Restrict the scancel operation to jobs under this charge account",
    )
    parser.add_argument(
        "--jobname", "-n", type=str, dest="NAME", help="Restrict the scancel operation to jobs with this job name"
    )
    user_group.add_argument(
        "--me",
        action="store_true",
        dest="me",
        help="Restrict the scancel operation to jobs owned by the current user",
    )
    parser.add_argument(
        "--nodelist",
        "-w",
        type=str,
        dest="NODELIST",
        help="Cancel any jobs using any of the given hosts. "
        "The list may be specified as a comma-separated list of hosts, a range of hosts "
        "(host[1-5,7,...] for example)",
    )
    parser.add_argument(
        "--partition",
        "-P",
        type=str,
        dest="PARTITION",
        help="Restrict the scancel operation to jobs in this partition",
    )
    # --quiet is a flag in scancel: it must not consume the next argument
    # (which would typically be a job ID).
    parser.add_argument(
        "--quiet",
        "-Q",
        action="store_true",
        dest="quiet",
        help="Do not report an error if the specified job is already completed",
    )
    parser.add_argument(
        "--reservation",
        "-R",
        type=str,
        dest="RESERVATION",
        help="Restrict the scancel operation to jobs with this reservation name",
    )
    parser.add_argument(
        "--signal",
        "-s",
        type=str,
        dest="signal",
        default="SIGKILL",
        help="The name or number of the signal to send. "
        "If this option is not used the specified job or step will be terminated",
    )
    parser.add_argument(
        "--user",
        "-u",
        type=str,
        dest="USER",
        help="Restrict the scancel operation to jobs owned by the given user",
    )
    parser.add_argument("jobids", type=int, nargs="*", help="The Slurm job ID to be signaled")
    return parser


def _parse_signal(value):
    """Convert a signal given as a name or a number into a value usable by ``os.kill``.

    Names are matched case-insensitively, with or without the ``SIG`` prefix
    ("SIGKILL", "kill" and "KILL" all resolve to ``signal.SIGKILL``).
    Anything that is not a known name is interpreted as a signal number.

    Raises:
        ValueError: if ``value`` is neither a known signal name nor an integer.
    """
    name = value.upper()
    for candidate in (name, "SIG" + name):
        try:
            return signal.Signals[candidate]
        except KeyError:
            continue
    return int(value)


def main():
    """Entry point of the scancel mock.

    Parses the command line, selects the jobs of the mock database matching the
    given filters and/or job IDs, sends the requested signal to the selected
    jobs that are running and marks the selected jobs as cancelled.

    Raises:
        SystemExit: with code 1 when no job identification is provided, a job ID
            is invalid, or the signal is unknown; with code 0 when the database
            is empty.
    """
    args = _build_parser().parse_args()

    filter_values = (args.ACCOUNT, args.NAME, args.NODELIST, args.PARTITION, args.RESERVATION, args.USER)
    has_filter = args.me or any(value is not None for value in filter_values)
    if not args.jobids and not has_filter:
        print("scancel: error: No job identification provided")
        raise SystemExit(1)

    for job_id in args.jobids:
        if job_id < 1:
            print("scancel: error: Invalid job id {}".format(job_id))
            raise SystemExit(1)

    if args.me:
        # --me selects the jobs of the current *user*; it must not overwrite
        # the charge-account filter given with --account.
        args.USER = getpass.getuser()

    try:
        args.signal = _parse_signal(args.signal)
    except ValueError:
        print("Unknown job signal: {}".format(args.signal))
        raise SystemExit(1) from None

    # Transform the arguments values into a map {field: value, field2: [values], etc...} for filtering DB
    field_filter_values = filter_dict_from_args(args)

    with get_db_file_handle(find_db_file()) as db_file:
        db = get_db(db_file)
        if db.shape[0] == 0:  # db is empty: no jobs to cancel
            raise SystemExit(0)

        # Get mask to select DB rows
        mask = get_filtered_DB_mask(db_file, field_filter_values)
        if args.jobids:
            # Job IDs are used directly as row indices of the mask: reject the ones
            # that do not exist instead of failing with an IndexError traceback.
            unknown_ids = [job_id for job_id in args.jobids if job_id >= len(mask)]
            if unknown_ids:
                for job_id in unknown_ids:
                    print(
                        "scancel: error: Kill job error on job id {}: Invalid job id specified".format(job_id)
                    )
                raise SystemExit(1)
            # NOTE(review): the requested job IDs are *added* to the rows selected by
            # the filters (union), they do not restrict them — confirm this is intended.
            mask[args.jobids] = True

        job_indices = np.nonzero(mask)[0]
        # Send signal
        for idx, job in zip(job_indices, db[mask]):
            if job["STATE"] == JobState.RUNNING:  # job is started, we can kill it
                os.kill(job["PID"], args.signal)
            # set STATE to stopped immediately, actual exit STATE will be updated with exit code
            # in mock of sbatch if job was running
            # TODO: if signal wasn't meant to kill job, STATE is wrong ?
            update_db_value(db_file, idx, key="STATE", value=JobState.CANCELLED)

148 

149 

# Run the scancel mock only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()