Coverage for bzfs_main/parallel_engine.py: 100%

183 statements  

coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Fault-tolerant, dependency-aware scheduling and execution of parallel operations, ensuring that ancestor datasets finish
before descendants start. The design maximizes throughput while preventing inconsistent dataset states during replication or
snapshot deletion."""

from __future__ import annotations
import argparse
import concurrent
import heapq
import logging
import os
import subprocess
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor
from logging import Logger
from typing import (
    Any,
    Callable,
    Dict,
    NamedTuple,
)

from bzfs_main.retry import (
    Retry,
    RetryPolicy,
    run_with_retries,
)
from bzfs_main.utils import (
    DONT_SKIP_DATASET,
    Interner,
    SortedInterner,
    dry,
    has_duplicates,
    human_readable_duration,
    is_descendant,
    terminate_process_subtree,
)

# constants:
BARRIER_CHAR: str = "~"


#############################################################################
Tree = Dict[str, "Tree"]  # Type alias


def _build_dataset_tree(sorted_datasets: list[str]) -> Tree:
    """Takes as input a sorted list of datasets and returns a sorted directory tree containing the same dataset names, in the
    form of nested dicts."""
    tree: Tree = {}
    interner: Interner[str] = Interner()  # reduces memory footprint
    for dataset in sorted_datasets:
        current: Tree = tree
        for component in dataset.split("/"):
            child: Tree | None = current.get(component)
            if child is None:
                child = {}
                component = interner.intern(component)
                current[component] = child
            current = child
    shared_empty_leaf: Tree = {}

    def compact(node: Tree) -> None:
        """Tree with shared empty leaf nodes has some 30% lower memory footprint than the non-compacted version."""
        for key, child_node in node.items():
            if len(child_node) == 0:
                node[key] = shared_empty_leaf  # sharing is safe because the tree is treated as immutable henceforth
            else:
                compact(child_node)

    compact(tree)
    return tree


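# Illustrative sketch (not part of the original API): shows the nested-dict shape that _build_dataset_tree produces for
# a small sorted input; after compaction, all empty leaves are the same shared dict, which still compares equal to {}.
def _example_build_dataset_tree() -> Tree:
    """Hypothetical helper; the dataset names below are illustrative."""
    tree: Tree = _build_dataset_tree(["a", "a/b", "a/b/c", "d"])
    assert tree == {"a": {"b": {"c": {}}}, "d": {}}
    return tree

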

def _build_dataset_tree_and_find_roots(sorted_datasets: list[str]) -> list[TreeNode]:
    """For consistency, processing of a dataset only starts after processing of its ancestors has completed."""
    tree: Tree = _build_dataset_tree(sorted_datasets)  # tree consists of nested dictionaries
    skip_dataset: str = DONT_SKIP_DATASET
    roots: list[TreeNode] = []
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=skip_dataset):
            continue
        skip_dataset = dataset
        children: Tree = tree
        for component in dataset.split("/"):
            children = children[component]
        roots.append(_make_tree_node(dataset, children))
    return roots


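# Illustrative sketch (not part of the original API): root detection in a nutshell. Descendants of an already chosen
# root are skipped, so each subtree contributes exactly one scheduling root; the hypothetical input below is expected
# to yield the roots "pool1" and "pool2/x".
def _example_find_roots() -> list[str]:
    """Hypothetical helper; the dataset names below are illustrative."""
    roots: list[TreeNode] = _build_dataset_tree_and_find_roots(["pool1", "pool1/a", "pool1/a/b", "pool2/x"])
    return [node.dataset for node in roots]  # expected: ["pool1", "pool2/x"]

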

class TreeNodeMutableAttributes:
    """Container for mutable attributes, stored space efficiently."""

    __slots__ = ("barrier", "pending")  # uses more compact memory layout than __dict__

    def __init__(self) -> None:
        self.barrier: TreeNode | None = None  # zero or one barrier TreeNode waiting for this node to complete
        self.pending: int = 0  # number of children added to priority queue that haven't completed their work yet


class TreeNode(NamedTuple):
    """Node in dataset dependency tree used by the scheduler; TreeNodes are ordered by dataset name within a priority queue
    via __lt__ comparisons."""

    dataset: str  # Each dataset name is unique, thus attributes other than `dataset` are never used for comparisons
    children: Tree  # dataset "directory" tree consists of nested dicts; aka Dict[str, Dict]
    parent: Any  # aka TreeNode
    mut: TreeNodeMutableAttributes

    def __repr__(self) -> str:
        dataset, pending, barrier, nchildren = self.dataset, self.mut.pending, self.mut.barrier, len(self.children)
        return str({"dataset": dataset, "pending": pending, "barrier": barrier is not None, "nchildren": nchildren})


def _make_tree_node(dataset: str, children: Tree, parent: TreeNode | None = None) -> TreeNode:
    """Creates a TreeNode with mutable state container."""
    return TreeNode(dataset, children, parent, TreeNodeMutableAttributes())


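# Illustrative sketch (not part of the original API): TreeNode is a NamedTuple whose first field is the dataset name,
# so tuple comparison (__lt__) orders nodes lexicographically by dataset, and heapq always pops the "smallest" dataset
# that is currently available for processing.
def _example_heap_order() -> list[str]:
    """Hypothetical helper; the dataset names below are illustrative."""
    queue: list[TreeNode] = [_make_tree_node(name, {}) for name in ["tank/b", "tank/a", "tank/a/child"]]
    heapq.heapify(queue)
    return [heapq.heappop(queue).dataset for _ in range(len(queue))]  # expected: ["tank/a", "tank/a/child", "tank/b"]

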

def process_datasets_in_parallel_and_fault_tolerant(
    log: Logger,
    datasets: list[str],
    process_dataset: Callable[[str, str, Retry], bool],  # lambda[dataset, tid, Retry]; must be thread-safe
    skip_tree_on_error: Callable[[str], bool],  # lambda[dataset]
    skip_on_error: str = "fail",
    max_workers: int = os.cpu_count() or 1,
    interval_nanos: Callable[[str], int] = lambda dataset: 0,  # optionally, spread tasks out over time; e.g. for jitter
    task_name: str = "Task",
    enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
    append_exception: Callable[[BaseException, str, str], None] = lambda ex, task_name, task_description: None,
    retry_policy: RetryPolicy | None = None,
    dry_run: bool = False,
    is_test_mode: bool = False,
) -> bool:
    """Executes dataset processing operations in parallel with dependency-aware scheduling and fault tolerance.

    This function orchestrates parallel execution of dataset operations while maintaining strict hierarchical
    dependencies. Processing of a dataset only starts after processing of all its ancestor datasets has completed,
    ensuring data consistency during operations like ZFS replication or snapshot deletion.

    Purpose:
    --------
    - Process hierarchical datasets in parallel while respecting parent-child dependencies
    - Provide fault tolerance with configurable error handling and retry mechanisms
    - Maximize throughput by processing independent dataset subtrees in parallel
    - Support complex job scheduling patterns via optional barrier synchronization

    Assumptions:
    ------------
    - Input `datasets` list is sorted in lexicographical order (enforced in test mode)
    - Input `datasets` list contains no duplicate entries (enforced in test mode)
    - Dataset hierarchy is determined by slash-separated path components
    - The `process_dataset` callable is thread-safe and can be executed in parallel

    Design Rationale:
    -----------------
    - The implementation uses a priority queue-based scheduler that maintains two key invariants:

      - Dependency Ordering: Children are only made available for start of processing after their parent completes,
        preventing inconsistent dataset states.

      - Lexicographical Priority: Among the datasets available for start of processing, the lexicographically smallest
        is always processed next, ensuring a more deterministic execution order.

    Algorithm Selection:
    --------------------
    - Simple Algorithm (default): Used when no barriers ('~') are detected in dataset names. Provides efficient
      scheduling for standard parent-child dependencies via recursive child enqueueing after parent completion.

    - Barrier Algorithm (advanced): Activated when barriers are detected or explicitly enabled. Supports complex
      synchronization scenarios where jobs must wait for completion of entire subtrees before proceeding. Essential
      for advanced job scheduling patterns like "complete all parallel replications before starting the pruning phase."

    - Both algorithms are CPU and memory efficient. They require main memory proportional to the number of dataset
      names (~400 bytes per dataset name), and easily scale to millions of datasets. Time complexity is O(N log N),
      where N is the number of datasets.

    Error Handling Strategy:
    ------------------------
    - "fail": Immediately terminate all processing on first error (fail-fast)
    - "dataset": Skip failed dataset but continue processing others
    - "tree": Skip entire subtree rooted at failed dataset; `skip_tree_on_error` can likewise trigger a subtree skip
      in "dataset" mode

    Concurrency Design:
    -------------------
    Uses ThreadPoolExecutor with configurable worker limits to balance parallelism against resource consumption. The
    single-threaded coordination loop prevents race conditions while worker threads execute dataset operations in parallel.

    Params:
    -------
    - datasets: Sorted list of dataset names to process (must not contain duplicates)
    - process_dataset: Thread-safe function to execute on each dataset
    - skip_tree_on_error: Function determining whether to skip the subtree on error
    - max_workers: Maximum number of parallel worker threads
    - enable_barriers: Force enable/disable barrier algorithm (None = auto-detect)

    Returns:
    --------
    bool: True if any dataset processing failed, False if all succeeded

    Raises:
    -------
    AssertionError: If input validation fails (sorted order, no duplicates, etc.)
    Various exceptions: Propagated from process_dataset when skip_on_error="fail"
    """
    assert (not is_test_mode) or datasets == sorted(datasets), "List is not sorted"
    assert (not is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
    assert callable(process_dataset)
    assert callable(skip_tree_on_error)
    assert max_workers > 0
    assert callable(interval_nanos)
    assert "%" not in task_name
    has_barrier: bool = any(BARRIER_CHAR in dataset.split("/") for dataset in datasets)
    assert (enable_barriers is not False) or not has_barrier
    barriers_enabled: bool = bool(has_barrier or enable_barriers)
    assert callable(append_exception)
    retry_policy = (
        retry_policy
        if retry_policy is not None
        else RetryPolicy(  # no retries
            argparse.Namespace(retries=0, retry_min_sleep_secs=0, retry_max_sleep_secs=0, retry_max_elapsed_secs=0)
        )
    )
    is_debug: bool = log.isEnabledFor(logging.DEBUG)

    def _process_dataset(dataset: str, tid: str) -> bool:
        """Runs ``process_dataset`` with retries and logs duration."""
        start_time_nanos: int = time.monotonic_ns()
        try:
            return run_with_retries(log, retry_policy, process_dataset, dataset, tid)
        finally:
            if is_debug:
                elapsed_duration: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
                log.debug(dry(f"{tid} {task_name} done: %s took %s", dry_run), dataset, elapsed_duration)

    assert (not is_test_mode) or str(_make_tree_node("foo", {}))
    immutable_empty_barrier: TreeNode = _make_tree_node("immutable_empty_barrier", {})
    priority_queue: list[TreeNode] = _build_dataset_tree_and_find_roots(datasets)
    heapq.heapify(priority_queue)  # same order as sorted()
    len_datasets: int = len(datasets)
    datasets_set: SortedInterner[str] = SortedInterner(datasets)  # reduces memory footprint
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        todo_futures: set[Future[Any]] = set()
        future_to_node: dict[Future[Any], TreeNode] = {}
        submitted: int = 0
        next_update_nanos: int = time.monotonic_ns()
        fw_timeout: float | None = None

        def submit_datasets() -> bool:
            """Submits available datasets to worker threads and returns False if all tasks have been completed."""
            nonlocal fw_timeout
            fw_timeout = None  # indicates to use blocking flavor of concurrent.futures.wait()
            while len(priority_queue) > 0 and len(todo_futures) < max_workers:
                # pick "smallest" dataset (wrt. sort order) available for start of processing; submit to thread pool
                nonlocal next_update_nanos
                sleep_nanos: int = next_update_nanos - time.monotonic_ns()
                if sleep_nanos > 0:
                    time.sleep(sleep_nanos / 1_000_000_000)  # seconds
                if sleep_nanos > 0 and len(todo_futures) > 0:
                    fw_timeout = 0  # indicates to use non-blocking flavor of concurrent.futures.wait()
                    # It's possible an even "smaller" dataset (wrt. sort order) has become available while we slept.
                    # If so it's preferable to submit to the thread pool the smaller one first.
                    break  # break out of loop to check if that's the case via non-blocking concurrent.futures.wait()
                node: TreeNode = heapq.heappop(priority_queue)  # pick "smallest" dataset (wrt. sort order)
                next_update_nanos += max(0, interval_nanos(node.dataset))
                nonlocal submitted
                submitted += 1
                future: Future[Any] = executor.submit(_process_dataset, node.dataset, tid=f"{submitted}/{len_datasets}")
                future_to_node[future] = node
                todo_futures.add(future)
            return len(todo_futures) > 0

        # coordination loop; runs in the (single) main thread; submits tasks to worker threads and handles their results
        failed: bool = False
        while submit_datasets():
            done_futures: set[Future[Any]]
            done_futures, todo_futures = concurrent.futures.wait(todo_futures, fw_timeout, return_when=FIRST_COMPLETED)
            for done_future in done_futures:
                done_future_node: TreeNode = future_to_node.pop(done_future)
                dataset: str = done_future_node.dataset
                try:
                    no_skip: bool = done_future.result()  # does not block as processing has already completed
                except (subprocess.CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e:
                    failed = True
                    if skip_on_error == "fail":
                        [todo_future.cancel() for todo_future in todo_futures]
                        terminate_process_subtree(except_current_process=True)
                        raise
                    no_skip = not (skip_on_error == "tree" or skip_tree_on_error(dataset))
                    log.error("%s", e)
                    append_exception(e, task_name, dataset)

                if not barriers_enabled:
                    # This simple algorithm is sufficient for almost all use cases:
                    def simple_enqueue_children(node: TreeNode) -> None:
                        """Recursively enqueues child nodes for start of processing."""
                        for child, grandchildren in node.children.items():  # as processing of parent has now completed
                            child_node: TreeNode = _make_tree_node(
                                datasets_set.interned(f"{node.dataset}/{child}"), grandchildren
                            )
                            if child_node.dataset in datasets_set:
                                heapq.heappush(priority_queue, child_node)  # make it available for start of processing
                            else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                                simple_enqueue_children(child_node)  # ... recursively down the tree

                    if no_skip:
                        simple_enqueue_children(done_future_node)
                else:
                    # The (more complex) algorithm below is for more general job scheduling, as in bzfs_jobrunner.
                    # Here, a "dataset" string is treated as an identifier for any kind of job rather than a reference
                    # to a concrete ZFS object. Example "dataset" job string: "src_host1/createsnapshot/push/prune".
                    # Jobs can depend on another job via a parent/child relationship formed by '/' directory separators
                    # within the dataset string, and multiple "datasets" form a job dependency tree by way of common
                    # dataset directory prefixes. Jobs that do not depend on each other can be executed in parallel, and
                    # jobs can be told to first wait for other jobs to complete successfully. The algorithm is based on
                    # a barrier primitive and is typically disabled; it is only required for rare jobrunner configs. For
                    # example, a job scheduler can specify that all parallel push replication jobs to multiple
                    # destinations must succeed before the jobs of the pruning phase can start. More generally, with
                    # this algo, a job scheduler can specify that all jobs within a given job subtree (containing any
                    # nested combination of sequential and/or parallel jobs) must successfully complete before a certain
                    # other job within the job tree is started. This is specified via the barrier directory named "~".
                    # Example: "src_host1/createsnapshot/~/prune".
                    # Note that "~" is unambiguous as it is not a valid ZFS dataset name component per the naming rules
                    # enforced by the 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs.
                    def enqueue_children(node: TreeNode) -> int:
                        """Returns number of jobs that were added to priority_queue for immediate start of processing."""
                        n: int = 0
                        children: Tree = node.children
                        for child, grandchildren in children.items():
                            child_node: TreeNode = _make_tree_node(
                                datasets_set.interned(f"{node.dataset}/{child}"), grandchildren, parent=node
                            )
                            k: int
                            if child != BARRIER_CHAR:
                                if child_node.dataset in datasets_set:
                                    # it's not a barrier; make job available for immediate start of processing
                                    heapq.heappush(priority_queue, child_node)
                                    k = 1
                                else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                                    k = enqueue_children(child_node)  # ... recursively down the tree
                            elif len(children) == 1:  # if the only child is a barrier then pass the enqueue operation
                                k = enqueue_children(child_node)  # ... recursively down the tree
                            else:  # park the barrier node within the (still closed) barrier for the time being
                                assert node.mut.barrier is None
                                node.mut.barrier = child_node
                                k = 0
                            node.mut.pending += min(1, k)
                            n += k
                        assert n >= 0
                        return n

                    def on_job_completion_with_barriers(node: TreeNode, no_skip: bool) -> None:
                        """After successful completion, enqueues children, opens barriers + propagates completion upwards."""
                        if no_skip:
                            enqueue_children(node)  # make child datasets available for start of processing
                        else:  # job completed without success
                            tmp = node  # ... thus, opening the barrier shall always do nothing in node and its ancestors
                            while tmp is not None:
                                tmp.mut.barrier = immutable_empty_barrier
                                tmp = tmp.parent
                        assert node.mut.pending >= 0
                        while node.mut.pending == 0:  # have all jobs in subtree of current node completed?
                            if no_skip:  # ... if so open the barrier, if it exists, and enqueue jobs waiting on it
                                if not (node.mut.barrier is None or node.mut.barrier is immutable_empty_barrier):
                                    node.mut.pending += min(1, enqueue_children(node.mut.barrier))
                                    node.mut.barrier = immutable_empty_barrier
                                if node.mut.pending > 0:  # did opening of barrier cause jobs to be enqueued in subtree?
                                    break  # ... if so we aren't quite done yet with this subtree
                            if node.parent is None:
                                break  # we've reached the root node
                            node = node.parent  # recurse up the tree to propagate completion upward
                            node.mut.pending -= 1  # mark subtree as completed
                            assert node.mut.pending >= 0

                    assert barriers_enabled
                    on_job_completion_with_barriers(done_future_node, no_skip)
        # endwhile submit_datasets()
        assert len(priority_queue) == 0
        assert len(todo_futures) == 0
        assert len(future_to_node) == 0
    return failed
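

# Illustrative sketch (not part of the original API): one way a caller might drive the engine. The callback below only
# logs each dataset; a real caller would run ZFS operations (replication, pruning, etc.) instead. Job-style names with
# the barrier component "~" (e.g. "src_host1/createsnapshot/~/prune") can additionally be used to make a job wait for
# an entire subtree, as described in the barrier algorithm above.
def _example_usage() -> bool:
    """Hypothetical driver; dataset names, the callback, and all parameter choices below are illustrative."""
    log = logging.getLogger(__name__)

    def process_dataset(dataset: str, tid: str, retry: Retry) -> bool:  # must be thread-safe
        log.info("%s processing %s", tid, dataset)
        return True  # True means: continue with (i.e. do not skip) the children of this dataset

    return process_datasets_in_parallel_and_fault_tolerant(
        log=log,
        datasets=sorted(["tank/data", "tank/data/a", "tank/data/b"]),  # must be sorted and free of duplicates
        process_dataset=process_dataset,
        skip_tree_on_error=lambda dataset: True,  # on error, skip the entire subtree below the failed dataset
        skip_on_error="dataset",
        max_workers=4,
        task_name="Replication",
    )  # returns True if any dataset failed, False if all succeeded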