Coverage for mockslurm/process_db.py: 88%
82 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-17 10:19 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-17 10:19 +0000
1"""Implements the access to a "slurm database", stored in a HDF5 file.
3HDF5 files are locked during access, which provides a convenient method to
4not have several process modifying the same database easily
5"""
7import copy
8import datetime
9import getpass
10import os
11import time
12from enum import IntEnum
13from pathlib import Path
14from typing import Any, Dict, List
16import h5py
17import numpy as np
20class JobState(IntEnum):
21 COMPLETED = 1
22 RUNNING = 2
23 PENDING = 3
24 CANCELLED = 4
25 FAILED = 5
28class JobReason(IntEnum):
29 NOREASON = 1 # sometimes there are no reason: job is running for instance
30 WaitingForScheduling = 2
31 Dependency = 3
32 DependencyNeverSatisfied = 4
33 NonZeroExitCode = 5
34 JobLaunchFailure = 6
37_DB_DTYPE = np.dtype(
38 [
39 ("PID", np.int64),
40 ("NAME", np.dtype("S128")),
41 ("USER", np.dtype("S128")),
42 ("ACCOUNT", np.dtype("S128")),
43 ("PARTITION", np.dtype("S128")),
44 ("RESERVATION", np.dtype("S128")),
45 ("NODELIST", np.dtype("S16384")),
46 ("TIME", np.int64),
47 ("START_TIME", np.float64),
48 ("CMD", np.dtype("S16384")),
49 ("STATE", np.int64),
50 ("REASON", np.int64),
51 ("EXIT_CODE", np.int64),
52 ]
53)
55DB_DEFAULTS = {
56 "PID": -1,
57 "NAME": "wrap",
58 "USER": getpass.getuser(),
59 "ACCOUNT": getpass.getuser(),
60 "PARTITION": "",
61 "RESERVATION": "",
62 "NODELIST": "mocknode1",
63 "TIME": 0,
64 "START_TIME": datetime.datetime.now().timestamp(),
65 "CMD": "",
66 "STATE": JobState.PENDING,
67 "REASON": JobReason.WaitingForScheduling,
68 "EXIT_CODE": np.iinfo(np.int16).max,
69}
72def find_db_file() -> Path:
73 """Return a path to the location of the mock database hdf5 file.
75 The database file location is searched among various locations typically available.
77 Returns
78 -------
79 Path
80 Path to the mock database hdf5 file.
82 Raises
83 ------
84 FileNotFoundError
85 If a suitable location for the database file could not be found.
86 """
87 possible_locations = [Path("/tmp"), Path("/var/tmp"), Path(os.environ["HOME"]) / ".local", Path(".")]
88 mock_slurm_db = Path("mock_slurm_db.h5")
90 for p in possible_locations: 90 ↛ 96line 90 didn't jump to line 96 because the loop on line 90 didn't complete
91 # retrieve the user permission for the directory
92 # if it is 7, we can read, write and execute it so we can open the folder and write inside
93 if p.exists() and int(oct(p.stat().st_mode)[-3]) == 7: 93 ↛ 90line 93 didn't jump to line 90 because the condition on line 93 was always true
94 return p / mock_slurm_db
96 raise FileNotFoundError("Could not find a location to write mock slurm DB.")
99def open_file_retry_on_locked(file: Path, mode: str = "a", nb_retries: int = 40, wait_s: float = 0.01) -> h5py.File:
100 """Open `file` as an HDF5 file in `mode`.
102 Since the HDF5 files are locked when another process access them, this function tries to
103 open the file a certain number of time.
105 Parameters
106 ----------
107 file : Path
108 Path to the file to open
109 nb_retries : int, optional
110 Number of times to try to open the file, by default 40
111 wait_s : float, optional
112 Amount of time to wait between 2 attempts to open the file, in seconds, by default 0.1
114 Returns
115 -------
116 h5py.File
117 Open file handle to `file`.
118 """
119 # Failing to open a file on fefs doesn't necessarily mean we won't succeed next time !
120 for _ in range(nb_retries - 1): 120 ↛ 128line 120 didn't jump to line 128 because the loop on line 120 didn't complete
121 try:
122 f = h5py.File(file, mode)
123 return f
124 except BlockingIOError:
125 time.sleep(wait_s)
126 else:
127 # try 1 last time, let tables error raise if failing again
128 return h5py.File(file, mode)
131def clear_db():
132 """Deletes the database file."""
133 db_file = find_db_file()
134 if db_file.exists():
135 print("Deleting db at {}".format(db_file))
136 db_file.unlink()
137 else:
138 print("No file to delete.")
141def get_db_file_handle(db_file: Path) -> h5py.File:
142 """Open the database HDF5 file in append mode.
144 On success, the returned h5py.File contains a dataset with the expected dtypes of the database.
146 Parameters
147 ----------
148 db_file : Path
149 Path to the database file to open
151 Returns
152 -------
153 h5py.File
154 File handle to the database
155 """
156 if not db_file.exists():
157 db = np.empty(
158 dtype=_DB_DTYPE,
159 shape=(0,),
160 )
161 f = open_file_retry_on_locked(db_file)
162 f.create_dataset("SLURM_DB", data=db, maxshape=(None,))
163 return f
165 else:
166 return open_file_retry_on_locked(db_file)
169def get_db(db_file: h5py.File) -> h5py.Dataset:
170 """Get the Database as a dataset in the HDF5 file
172 Parameters
173 ----------
174 db_file : h5py.File
175 Opened database file handle
177 Returns
178 -------
179 h5py.Dataset
180 Dataset storing the database
181 """
182 return db_file["SLURM_DB"]
185def update_with_default_value(db_dict: Dict) -> Dict:
186 """Update `db_dict` missing fields with the default value in the database
188 Parameters
189 ----------
190 db_dict : Dict
191 dict containing a database row data, possibly missing some columns to be filled with default values
193 Returns
194 -------
195 Dict
196 dict with all fields expected by the database present, with `dict` values if present, otherwise default values
197 """
198 default_dict = copy.deepcopy(DB_DEFAULTS)
199 default_dict.update(db_dict)
200 return default_dict
203def append_job(db_file: h5py.File, **kwargs) -> int:
204 """Append a job to the database
206 Parameters
207 ----------
208 db_file : h5py.File
209 Opened file handle to the database
211 Returns
212 -------
213 int
214 Index of the job that was appended
215 """
216 dataset = get_db(db_file)
217 dataset.resize(dataset.shape[0] + 1, axis=0)
218 job_data = np.empty(dtype=dataset.dtype, shape=(1,))
219 for k, v in update_with_default_value(kwargs).items():
220 job_data[k] = v
221 dataset[-1] = job_data
222 return dataset.shape[0] - 1
225def update_db_value(db_file: h5py.File, index: int, key: str, value: Any):
226 """Update the `value` of `key` in the database, at `index`
228 Parameters
229 ----------
230 db_file : h5py.File
231 Opened file handle to the database HDF5 file
232 index : int
233 Index of the row to update
234 key : str
235 Field to update
236 value : Any
237 New value for the field
238 """
239 dataset = get_db(db_file)
240 update_value = dataset[index]
241 update_value[key] = value
242 dataset[index] = update_value
245def get_filtered_DB_mask(db_file: h5py.File, fields_values: Dict[str, str | List[str]]) -> np.ndarray:
246 """Get a mask selecting the DB rows where the field values are equal to `fields_values` values.
248 Parameters
249 ----------
250 db_file : h5py.File
251 Opened file handle to the database HDF5 file
252 fields_values : Dict[str, str | List[str]]
253 Map from fields to allowed fields values. Rows where the fields value is not equal to
254 one of the field values are not selected.
255 Key: field name, eg "NAME", "USER"
256 values: field value, eg "Robert", ["Robert", "Roberta"]
258 Returns
259 -------
260 np.ndarray
261 Index mask array, True where the row's fields are equal to the `field_values`.
262 """
263 db = get_db(db_file)
264 total_mask = np.ones(shape=(db.shape[0],), dtype=bool)
265 for field, values in fields_values.items():
266 mask = np.zeros(shape=(db.shape[0],), dtype=bool)
267 value_list = values if isinstance(values, list) else [values]
268 for v in value_list:
269 mask |= db[field] == v
270 total_mask &= mask
271 return total_mask