Coverage for mockslurm/process_db.py: 91%

82 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-04 23:38 +0000

"""Implements the access to a "slurm database", stored in a HDF5 file.

HDF5 files are locked during access, which provides a convenient way to
prevent several processes from modifying the same database at once.
"""

6 

7import copy 

8import datetime 

9import getpass 

10import os 

11import time 

12from enum import IntEnum 

13from pathlib import Path 

14from typing import Any, Dict, List 

15 

16import h5py 

17import numpy as np 

18 

19 

class JobState(IntEnum):
    """State of a mock slurm job, mirroring slurm's job state codes."""

    COMPLETED = 1
    RUNNING = 2
    PENDING = 3
    CANCELLED = 4
    FAILED = 5

26 

27 

class JobReason(IntEnum):
    """Reason associated with a job's current state (mock of slurm's "reason" field)."""

    NOREASON = 1  # sometimes there are no reason: job is running for instance
    WaitingForScheduling = 2
    Dependency = 3
    DependencyNeverSatisfied = 4
    NonZeroExitCode = 5
    JobLaunchFailure = 6

35 

36 

# Row layout of the mock slurm database: one fixed-size record per job.
# Fixed-width byte strings are used so rows fit a resizable HDF5 dataset.
_DB_DTYPE = np.dtype(
    [
        ("PID", np.int64),  # process id backing the job (-1 when unset, see DB_DEFAULTS)
        ("NAME", np.dtype("S128")),
        ("USER", np.dtype("S128")),
        ("ACCOUNT", np.dtype("S128")),
        ("PARTITION", np.dtype("S128")),
        ("RESERVATION", np.dtype("S128")),
        ("NODELIST", np.dtype("S16384")),
        ("TIME", np.int64),
        ("START_TIME", np.float64),  # POSIX timestamp (see DB_DEFAULTS)
        ("CMD", np.dtype("S16384")),
        ("STATE", np.int64),  # holds a JobState value
        ("REASON", np.int64),  # holds a JobReason value
        ("EXIT_CODE", np.int64),
    ]
)

54 

# Default values used to fill missing fields of a database row
# (see update_with_default_value).
# NOTE(review): the values are computed ONCE at import time — in particular
# START_TIME is the module import time, not the time a job row is created,
# and USER/ACCOUNT are the importing user. Confirm this is intended.
DB_DEFAULTS = {
    "PID": -1,
    "NAME": "wrap",
    "USER": getpass.getuser(),
    "ACCOUNT": getpass.getuser(),
    "PARTITION": "",
    "RESERVATION": "",
    "NODELIST": "mocknode1",
    "TIME": 0,
    "START_TIME": datetime.datetime.now().timestamp(),
    "CMD": "",
    "STATE": JobState.PENDING,
    "REASON": JobReason.WaitingForScheduling,
    "EXIT_CODE": np.iinfo(np.int16).max,  # sentinel: job has not exited yet
}

70 

71 

def find_db_file() -> Path:
    """Return a path to the location of the mock database hdf5 file.

    The database file location is searched among various locations typically available.

    Returns
    -------
    Path
        Path to the mock database hdf5 file.

    Raises
    ------
    FileNotFoundError
        If a suitable location for the database file could not be found.
    """
    possible_locations = [
        Path("/tmp"),
        Path("/var/tmp"),
        # Path.home() falls back gracefully when $HOME is unset, unlike
        # os.environ["HOME"] which would raise KeyError.
        Path.home() / ".local",
        Path("."),
    ]
    mock_slurm_db = Path("mock_slurm_db.h5")

    for p in possible_locations:
        # We need to open the directory and write inside it. os.access checks
        # effective permissions (owner, group and ACLs), which is more robust
        # than inspecting only the owner bits of st_mode.
        if p.is_dir() and os.access(p, os.R_OK | os.W_OK | os.X_OK):
            return p / mock_slurm_db

    raise FileNotFoundError("Could not find a location to write mock slurm DB.")

97 

98 

def open_file_retry_on_locked(file: Path, mode: str = "a", nb_retries: int = 40, wait_s: float = 0.01) -> h5py.File:
    """Open `file` as an HDF5 file in `mode`.

    Since the HDF5 files are locked when another process access them, this function tries to
    open the file a certain number of time.

    Parameters
    ----------
    file : Path
        Path to the file to open
    mode : str, optional
        Mode in which to open the HDF5 file, by default "a"
    nb_retries : int, optional
        Number of times to try to open the file, by default 40
    wait_s : float, optional
        Amount of time to wait between 2 attempts to open the file, in seconds, by default 0.01

    Returns
    -------
    h5py.File
        Open file handle to `file`.
    """
    # Failing to open a file on fefs doesn't necessarily mean we won't succeed next time !
    for _ in range(nb_retries - 1):
        try:
            return h5py.File(file, mode)
        except BlockingIOError:
            # File is locked by another process: back off and retry.
            time.sleep(wait_s)
    # Try 1 last time, letting the h5py error propagate if it fails again.
    return h5py.File(file, mode)

129 

130 

def clear_db():
    """Deletes the database file."""
    db_file = find_db_file()
    if not db_file.exists():
        print("No file to delete.")
        return
    print("Deleting db at {}".format(db_file))
    db_file.unlink()

139 

140 

def get_db_file_handle(db_file: Path) -> h5py.File:
    """Open the database HDF5 file in append mode.

    On success, the returned h5py.File contains a dataset with the expected dtypes of the database.

    Parameters
    ----------
    db_file : Path
        Path to the database file to open

    Returns
    -------
    h5py.File
        File handle to the database
    """
    if db_file.exists():
        return open_file_retry_on_locked(db_file)

    # First use: create the file with an empty, unlimited-size dataset
    # matching the database row layout.
    handle = open_file_retry_on_locked(db_file)
    empty_rows = np.empty(shape=(0,), dtype=_DB_DTYPE)
    handle.create_dataset("SLURM_DB", data=empty_rows, maxshape=(None,))
    return handle

167 

168 

169def get_db(db_file: h5py.File) -> h5py.Dataset: 

170 """Get the Database as a dataset in the HDF5 file 

171 

172 Parameters 

173 ---------- 

174 db_file : h5py.File 

175 Opened database file handle 

176 

177 Returns 

178 ------- 

179 h5py.Dataset 

180 Dataset storing the database 

181 """ 

182 return db_file["SLURM_DB"] 

183 

184 

def update_with_default_value(db_dict: Dict) -> Dict:
    """Update `db_dict` missing fields with the default value in the database

    Parameters
    ----------
    db_dict : Dict
        dict containing a database row data, possibly missing some columns to be filled with default values

    Returns
    -------
    Dict
        dict with all fields expected by the database present, with `dict` values if present, otherwise default values
    """
    # Deep-copy the defaults so callers can never mutate DB_DEFAULTS,
    # then let the caller-provided entries take precedence.
    return {**copy.deepcopy(DB_DEFAULTS), **db_dict}

201 

202 

def append_job(db_file: h5py.File, **kwargs) -> int:
    """Append a job to the database

    Parameters
    ----------
    db_file : h5py.File
        Opened file handle to the database

    Returns
    -------
    int
        Index of the job that was appended
    """
    dataset = get_db(db_file)
    # Build the new row first: fields not passed in kwargs get their defaults.
    row = np.empty(shape=(1,), dtype=dataset.dtype)
    for field, value in update_with_default_value(kwargs).items():
        row[field] = value
    # Grow the dataset by one and store the row in the new last slot.
    new_size = dataset.shape[0] + 1
    dataset.resize(new_size, axis=0)
    dataset[-1] = row
    return new_size - 1

223 

224 

def update_db_value(db_file: h5py.File, index: int, key: str, value: Any):
    """Update the `value` of `key` in the database, at `index`

    Parameters
    ----------
    db_file : h5py.File
        Opened file handle to the database HDF5 file
    index : int
        Index of the row to update
    key : str
        Field to update
    value : Any
        New value for the field
    """
    dataset = get_db(db_file)
    # Read-modify-write: h5py rows are copies, so write the row back.
    row = dataset[index]
    row[key] = value
    dataset[index] = row

243 

244 

def get_filtered_DB_mask(db_file: h5py.File, fields_values: Dict[str, str | List[str]]) -> np.ndarray:
    """Get a mask selecting the DB rows where the field values are equal to `fields_values` values.

    Parameters
    ----------
    db_file : h5py.File
        Opened file handle to the database HDF5 file
    fields_values : Dict[str, str | List[str]]
        Map from fields to allowed fields values. Rows where the fields value is not equal to
        one of the field values are not selected.
        Key: field name, eg "NAME", "USER"
        values: field value, eg "Robert", ["Robert", "Roberta"]

    Returns
    -------
    np.ndarray
        Index mask array, True where the row's fields are equal to the `field_values`.
    """
    db = get_db(db_file)
    n_rows = db.shape[0]
    # A row is selected when, for EVERY filtered field, its value matches
    # ANY of the allowed values for that field.
    selected = np.ones(shape=(n_rows,), dtype=bool)
    for field, allowed in fields_values.items():
        allowed_list = allowed if isinstance(allowed, list) else [allowed]
        field_mask = np.zeros(shape=(n_rows,), dtype=bool)
        for candidate in allowed_list:
            field_mask = field_mask | (db[field] == candidate)
        selected = selected & field_mask
    return selected