Coverage for bzfs_main/util/parallel_tasktree.py: 100%

214 statements  

coverage.py v7.13.0, created at 2025-12-22 08:03 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Fault-tolerant, dependency-aware scheduling and execution of parallel operations, ensuring that ancestor datasets finish 

16before descendants start; the design maximizes throughput while preventing inconsistent dataset states during replication or

17snapshot deletion. 

18 

19This module contains only generic scheduling and coordination (the "algorithm"). Error handling, retries, and skip policies 

20are customizable and implemented by callers via the CompletionCallback API or wrappers such as ``parallel_tasktree_policy``. 

21 

22Has zero dependencies beyond the Python standard library. 

23""" 

24 

25from __future__ import ( 

26 annotations, 

27) 

28import concurrent 

29import heapq 

30import logging 

31import os 

32import threading 

33import time 

34from concurrent.futures import ( 

35 FIRST_COMPLETED, 

36 Executor, 

37 Future, 

38 ThreadPoolExecutor, 

39) 

40from typing import ( 

41 Callable, 

42 Final, 

43 NamedTuple, 

44 final, 

45) 

46 

47from bzfs_main.util.utils import ( 

48 Comparable, 

49 HashedInterner, 

50 SortedInterner, 

51 SynchronousExecutor, 

52 has_duplicates, 

53 has_siblings, 

54) 

55 

56# constants: 

57BARRIER_CHAR: Final[str] = "~" 

58COMPONENT_SEPARATOR: Final[str] = "/" # ZFS dataset component separator 

59 

60 

61############################################################################# 

62@final 

63class CompletionCallbackResult(NamedTuple): 

64 """Result of a CompletionCallback invocation.""" 

65 

66 no_skip: bool 

67 """True enqueues children, False skips the subtree.""" 

68 

69 fail: bool 

70 """True marks overall run as failed.""" 

71 

72 

73############################################################################# 

74CompletionCallback = Callable[[set[Future["CompletionCallback"]]], CompletionCallbackResult] # Type alias 

75"""Callable that is run by the main coordination thread after a ``process_dataset()`` task finishes. 

76 

77Purpose: 

78- Decide follow-up scheduling after a ``process_dataset()`` task has finished.

79 

80Assumptions: 

81- Runs in the single coordination thread. 

82- May inspect and cancel in-flight futures to implement fail-fast semantics. 

83- When cancelling in-flight futures for tasks that spawn subprocesses (e.g. via subprocess.run()), callers should also

84consider terminating the corresponding process subtree to avoid child processes lingering longer than desired. Skipping 

85termination will not hang the scheduler (workers will complete naturally), but those subprocesses may outlive cancellation 

86until they exit or time out. 

87""" 

88 

89 

90############################################################################# 

91@final 

92class ParallelTaskTree: 

93 """Main class for dependency-aware scheduling of dataset jobs with optional barriers and priority ordering.""" 

94 

95 def __init__( 

96 self, 

97 log: logging.Logger, 

98 datasets: list[str], # (sorted) list of datasets to process 

99 process_dataset: Callable[[str, int], CompletionCallback], # lambda: dataset, tid; must be thread-safe 

100 priority: Callable[[str], Comparable] = lambda dataset: dataset, # lexicographical order by default 

101 max_workers: int = os.cpu_count() or 1, 

102 executors: Callable[[], Executor] | None = None, # factory producing Executor; None means 'auto-choose' 

103 interval_nanos: Callable[ 

104 [int, str, int], int 

105 ] = lambda last_update_nanos, dataset, submit_count: 0, # optionally spread tasks out over time; e.g. for jitter 

106 termination_event: threading.Event | None = None, # optional event to request early async termination 

107 enable_barriers: bool | None = None, # for testing only; None means 'auto-detect' 

108 barrier_name: str = BARRIER_CHAR, 

109 is_test_mode: bool = False, 

110 ) -> None: 

111 """Prepares to process datasets in parallel with dependency-aware scheduling and fault tolerance. 

112 

113 This class orchestrates parallel execution of dataset operations while maintaining strict hierarchical dependencies. 

114 Processing of a dataset only starts after processing of all its ancestor datasets has completed, ensuring data 

115 consistency during operations like ZFS replication or snapshot deletion. 

116 

117 Purpose: 

118 -------- 

119 - Process hierarchical datasets in parallel while respecting parent-child dependencies 

120 - Provide dependency-aware scheduling; error handling and retries are implemented by callers via 

121 ``CompletionCallback`` or thin wrappers 

122 - Maximize throughput by processing independent dataset subtrees in parallel 

123 - Support complex job scheduling patterns via optional barrier synchronization 

124 

125 Assumptions: 

126 ------------

127 - Input `datasets` list is sorted in lexicographical order (enforced in test mode) 

128 - Input `datasets` list contains no duplicate entries (enforced in test mode) 

129 - Input `datasets` list contains no empty dataset names and none that start with '/' 

130 - Dataset hierarchy is determined by slash-separated path components 

131 - The `process_dataset` callable is thread-safe and can be executed in parallel 

132 

133 Design Rationale: 

134 ----------------- 

135 - The implementation uses a priority queue-based scheduler that maintains two key invariants: 

136 

137 - Dependency Ordering: Children are only made available for start of processing after their parent completes, 

138 preventing inconsistent dataset states. 

139 

140 - Priority: Among the datasets available for start of processing, the "smallest" is always processed next, according 

141 to a customizable priority callback function, which by default sorts by lexicographical order (not dataset size), 

142 ensuring more deterministic execution order. 

143 

144 Algorithm Selection: 

145 -------------------- 

146 - Simple Algorithm (default): Used when no barriers ('~') are detected in dataset names. Provides efficient 

147 scheduling for standard parent-child dependencies via recursive child enqueueing after parent completion. 

148 

149 - Barrier Algorithm (advanced): Activated when barriers are detected or explicitly enabled. Supports complex 

150 synchronization scenarios where jobs must wait for completion of entire subtrees before proceeding. Essential 

151 for advanced job scheduling patterns like "complete all parallel replications before starting pruning phase." 

152 

153 - Both algorithms are CPU and memory efficient. They require main memory proportional to the number of datasets 

154 (~400 bytes per dataset), and easily scale to millions of datasets. Time complexity is O(N log N), where 

155 N is the number of datasets. 

156 

157 Concurrency Design: 

158 ------------------- 

159 By default uses ThreadPoolExecutor with configurable worker limits to balance parallelism against resource 

160 consumption. Optionally, plug in a custom Executor to submit tasks to scale-out clusters via frameworks such as

161 Ray Core or Dask.

162 The single-threaded coordination loop prevents race conditions while worker threads execute dataset operations in 

163 parallel. 

164 

165 Params: 

166 ------- 

167 - datasets: Sorted list of dataset names to process (must not contain duplicates) 

168 - process_dataset: Thread-safe callback function to execute on each dataset; returns a CompletionCallback determining

169 whether to fail or to skip the subtree on error; the CompletionCallback runs in the (single) main thread as part of the

170 coordination loop. 

171 - priority: Callback function to determine dataset processing order; defaults to lexicographical order. 

172 - interval_nanos: Callback that returns a non-negative delay (ns) to add to ``next_update_nanos`` for 

173 jitter/back-pressure control; arguments are ``(last_update_nanos, dataset, submit_count)`` 

174 - max_workers: Maximum number of parallel worker threads 

175 - executors: Factory returning an Executor to submit tasks to; None means 'auto-choose' 

176 - enable_barriers: Force enable/disable barrier algorithm (None = auto-detect) 

177 - barrier_name: Directory name that denotes a barrier within dataset/job strings (default '~'); must be non-empty and 

178 not contain '/' 

179 - termination_event: Optional event to request early async termination; stops new submissions and cancels in-flight 

180 tasks 

181 """ 

182 assert log is not None 

183 assert (not is_test_mode) or datasets == sorted(datasets), "List is not sorted" 

184 assert (not is_test_mode) or not has_duplicates(datasets), "List contains duplicates" 

185 if COMPONENT_SEPARATOR in barrier_name or not barrier_name: 

186 raise ValueError(f"Invalid barrier_name: {barrier_name}") 

187 for dataset in datasets: 

188 if dataset.startswith(COMPONENT_SEPARATOR) or not dataset: 

189 raise ValueError(f"Invalid dataset name: {dataset}") 

190 assert callable(process_dataset) 

191 assert callable(priority) 

192 assert max_workers > 0 

193 assert callable(interval_nanos) 

194 has_barrier: Final[bool] = any(barrier_name in dataset.split(COMPONENT_SEPARATOR) for dataset in datasets) 

195 assert (enable_barriers is not False) or not has_barrier, "Barrier seen in datasets but barriers explicitly disabled" 

196 

197 self._barriers_enabled: Final[bool] = bool(has_barrier or enable_barriers) 

198 self._barrier_name: Final[str] = barrier_name 

199 self._log: Final[logging.Logger] = log 

200 self._datasets: Final[list[str]] = datasets 

201 self._process_dataset: Final[Callable[[str, int], CompletionCallback]] = process_dataset 

202 self._priority: Final[Callable[[str], Comparable]] = priority 

203 self._max_workers: Final[int] = max_workers 

204 self._interval_nanos: Final[Callable[[int, str, int], int]] = interval_nanos 

205 self._termination_event: Final[threading.Event] = ( 

206 threading.Event() if termination_event is None else termination_event 

207 ) 

208 self._is_test_mode: Final[bool] = is_test_mode 

209 self._priority_queue: Final[list[_TreeNode]] = [] 

210 self._tree: Final[_Tree] = _build_dataset_tree(datasets) # tree consists of nested dictionaries and is immutable 

211 self._empty_barrier: Final[_TreeNode] = _make_tree_node("empty_barrier", "empty_barrier", {}) # immutable! 

212 self._datasets_set: Final[SortedInterner[str]] = SortedInterner(datasets) # reduces memory footprint 

213 if executors is None: 

214 is_parallel: bool = max_workers > 1 and len(datasets) > 1 and has_siblings(datasets) # siblings can run in parallel

215 

216 def _default_executor_factory() -> Executor: 

217 return ThreadPoolExecutor(max_workers) if is_parallel else SynchronousExecutor() 

218 

219 executors = _default_executor_factory 

220 self._executors: Final[Callable[[], Executor]] = executors 

221 assert callable(executors) 

222 

223 def process_datasets_in_parallel(self) -> bool: 

224 """Executes the configured tasks and returns True if any dataset processing failed, False if all succeeded.""" 

225 self._build_priority_queue() 

226 executor: Executor = self._executors() 

227 with executor: 

228 todo_futures: set[Future[CompletionCallback]] = set() 

229 future_to_node: dict[Future[CompletionCallback], _TreeNode] = {} 

230 submit_count: int = 0 

231 next_update_nanos: int = time.monotonic_ns() 

232 wait_timeout: float | None = None 

233 failed: bool = False 

234 

235 def submit_datasets() -> bool: 

236 """Submits available datasets to worker threads and returns False if all tasks have been completed.""" 

237 nonlocal wait_timeout 

238 wait_timeout = None # indicates to use blocking flavor of concurrent.futures.wait() 

239 while len(self._priority_queue) > 0 and len(todo_futures) < self._max_workers: 

240 # pick "smallest" dataset (wrt. sort order) available for start of processing; submit to thread pool 

241 nonlocal next_update_nanos 

242 sleep_nanos: int = next_update_nanos - time.monotonic_ns() 

243 if sleep_nanos > 0: 

244 self._termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination 

245 if self._termination_event.is_set(): 

246 break 

247 if sleep_nanos > 0 and len(todo_futures) > 0: 

248 wait_timeout = 0 # indicates to use non-blocking flavor of concurrent.futures.wait() 

249 # It's possible an even "smaller" dataset (wrt. sort order) has become available while we slept. 

250 # If so it's preferable to submit to the thread pool the smaller one first. 

251 break # break out of loop to check if that's the case via non-blocking concurrent.futures.wait() 

252 node: _TreeNode = heapq.heappop(self._priority_queue) # pick "smallest" dataset (wrt. sort order) 

253 nonlocal submit_count 

254 submit_count += 1 

255 next_update_nanos += max(0, self._interval_nanos(next_update_nanos, node.dataset, submit_count)) 

256 future: Future[CompletionCallback] = executor.submit(self._process_dataset, node.dataset, submit_count) 

257 future_to_node[future] = node 

258 todo_futures.add(future) 

259 return len(todo_futures) > 0 and not self._termination_event.is_set() 

260 

261 def complete_datasets() -> None: 

262 """Waits for completed futures, processes results and errors, then enqueues follow-up tasks per policy.""" 

263 nonlocal failed 

264 nonlocal todo_futures 

265 done_futures: set[Future[CompletionCallback]] 

266 done_futures, todo_futures = concurrent.futures.wait(todo_futures, wait_timeout, return_when=FIRST_COMPLETED) 

267 for done_future in done_futures: 

268 done_node: _TreeNode = future_to_node.pop(done_future) 

269 c_callback: CompletionCallback = done_future.result() # does not block as processing already completed 

270 c_callback_result: CompletionCallbackResult = c_callback(todo_futures) 

271 no_skip: bool = c_callback_result.no_skip 

272 fail: bool = c_callback_result.fail 

273 failed = failed or fail 

274 self._complete_dataset(done_node, no_skip=no_skip) 

275 

276 # coordination loop; runs in the (single) main thread; submits tasks to worker threads and handles their results 

277 while submit_datasets(): 

278 complete_datasets() 

279 

280 if self._termination_event.is_set(): 

281 for todo_future in todo_futures: 

282 todo_future.cancel() 

283 failed = failed or len(self._priority_queue) > 0 or len(todo_futures) > 0 

284 self._priority_queue.clear() 

285 todo_futures.clear() 

286 future_to_node.clear() 

287 assert len(self._priority_queue) == 0 

288 assert len(todo_futures) == 0 

289 assert len(future_to_node) == 0 

290 return failed 

291 

292 def _build_priority_queue(self) -> None: 

293 """Builds and fills initial priority queue of available root nodes for this task tree, ensuring the scheduler starts 

294 from a synthetic root node while honoring barriers; the synthetic root simplifies enqueueing logic.""" 

295 self._priority_queue.clear() 

296 root_node: _TreeNode = _make_tree_node(priority="", dataset="", children=self._tree) 

297 self._complete_dataset(root_node, no_skip=True) 

298 

299 def _complete_dataset(self, node: _TreeNode, no_skip: bool) -> None: 

300 """Enqueues child nodes for start of processing, using the appropriate algorithm.""" 

301 if self._barriers_enabled: # This barrier-based algorithm is for more general job scheduling, as in bzfs_jobrunner 

302 self._complete_dataset_with_barriers(node, no_skip=no_skip) 

303 elif no_skip: # This simple algorithm is sufficient for most uses 

304 self._simple_enqueue_children(node) 

305 

306 def _simple_enqueue_children(self, node: _TreeNode) -> None: 

307 """Enqueues child nodes for start of processing (using iteration to avoid potentially hitting recursion limits).""" 

308 stack: list[_TreeNode] = [node] 

309 while stack: 

310 current_node: _TreeNode = stack.pop() 

311 for child, grandchildren in current_node.children.items(): # as processing of parent has now completed 

312 child_abs_dataset: str = self._join_dataset(current_node.dataset, child) 

313 child_node: _TreeNode = _make_tree_node(self._priority(child_abs_dataset), child_abs_dataset, grandchildren) 

314 if child_abs_dataset in self._datasets_set: 

315 heapq.heappush(self._priority_queue, child_node) # make it available for start of processing 

316 else: # it's an intermediate node that has no job attached; pass the enqueue operation 

317 stack.append(child_node) # ... recursively down the tree 

318 

319 def _complete_dataset_with_barriers(self, node: _TreeNode, no_skip: bool) -> None: 

320 """After successful completion, enqueues children, opens barriers, and propagates completion upwards. 

321 

322 The (more complex) algorithm below is for more general job scheduling, as in bzfs_jobrunner. Here, a "dataset" string 

323 is treated as an identifier for any kind of job rather than a reference to a concrete ZFS object. An example 

324 "dataset" job string is "src_host1/createsnapshots/replicate_to_hostA". Jobs can depend on another job via a 

325 parent/child relationship formed by '/' directory separators within the dataset string, and multiple "datasets" form 

326 a job dependency tree by way of common dataset directory prefixes. Jobs that do not depend on each other can be 

327 executed in parallel, and jobs can be told to first wait for other jobs to complete successfully. The algorithm is 

328 based on a barrier primitive and is typically disabled. It is only required for rare jobrunner configs. 

329 

330 For example, a job scheduler can specify that all parallel replication jobs to multiple destinations must succeed

331 before the jobs of the pruning phase can start. More generally, with this algorithm, a job scheduler can specify that all

332 jobs within a given job subtree (containing any nested combination of sequential and/or parallel jobs) must 

333 successfully complete before a certain other job within the job tree is started. This is specified via the barrier 

334 directory named by ``barrier_name`` (default '~'). An example is "src_host1/createsnapshots/~/prune". 

335 

336 Note that the default '~' is unambiguous as it is not a valid ZFS dataset name component per the naming rules 

337 enforced by the 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs. Custom barrier names should avoid colliding 

338 with real dataset/job components. 

339 """ 

340 

341 def enqueue_children(node: _TreeNode) -> int: 

342 """Returns number of jobs that were added to priority_queue for immediate start of processing.""" 

343 n: int = 0 

344 children: _Tree = node.children 

345 for child, grandchildren in children.items(): 

346 abs_dataset: str = self._join_dataset(node.dataset, child) 

347 child_node: _TreeNode = _make_tree_node(self._priority(abs_dataset), abs_dataset, grandchildren, parent=node) 

348 k: int 

349 if child != self._barrier_name: 

350 if abs_dataset in self._datasets_set: 

351 # it's not a barrier; make job available for immediate start of processing 

352 heapq.heappush(self._priority_queue, child_node) 

353 k = 1 

354 else: # it's an intermediate node that has no job attached; pass the enqueue operation 

355 k = enqueue_children(child_node) # ... recursively down the tree 

356 elif len(children) == 1: # if the only child is a barrier then pass the enqueue operation 

357 k = enqueue_children(child_node) # ... recursively down the tree 

358 else: # park the barrier node within the (still closed) barrier for the time being 

359 assert node.mut.barrier is None 

360 node.mut.barrier = child_node 

361 k = 0 

362 node.mut.pending += min(1, k) 

363 n += k 

364 assert n >= 0 

365 return n 

366 

367 if no_skip: 

368 enqueue_children(node) # make child datasets available for start of processing 

369 else: # job completed without success 

370 # ... thus, opening the barrier shall always do nothing in node and its ancestors. 

371 # perf: Irrevocably mark (exactly once) barriers of this node and all its ancestors as cleared due to subtree 

372 # skip, via barriers_cleared=True. This avoids redundantly re-walking the entire ancestor chain on

373 # subsequent skips.

374 tmp: _TreeNode | None = node 

375 while (tmp is not None) and not tmp.mut.barriers_cleared: 

376 tmp.mut.barriers_cleared = True 

377 tmp.mut.barrier = self._empty_barrier 

378 tmp = tmp.parent 

379 assert node.mut.pending >= 0 

380 while node.mut.pending == 0: # have all jobs in subtree of current node completed? 

381 if no_skip: # ... if so open the barrier, if it exists, and enqueue jobs waiting on it 

382 if not (node.mut.barrier is None or node.mut.barrier is self._empty_barrier): 

383 node.mut.pending += min(1, enqueue_children(node.mut.barrier)) 

384 node.mut.barrier = self._empty_barrier 

385 if node.mut.pending > 0: # did opening of barrier cause jobs to be enqueued in subtree? 

386 break # ... if so we have not yet completed the subtree, so don't mark the subtree as completed yet 

387 if node.parent is None: 

388 break # we've reached the root node 

389 node = node.parent # recurse up the tree to propagate completion upward 

390 node.mut.pending -= 1 # mark subtree as completed 

391 assert node.mut.pending >= 0 

392 

393 def _join_dataset(self, parent: str, child: str) -> str: 

394 """Concatenates parent and child dataset names; accommodates synthetic root node; interns for memory footprint.""" 

395 return self._datasets_set.interned(f"{parent}{COMPONENT_SEPARATOR}{child}" if parent else child) 

396 
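# A minimal end-to-end usage sketch (illustrative only; the dataset names, worker count, and logging setup are
# hypothetical, and the log call inside process_dataset stands in for real per-dataset work such as replication):
def _example_run() -> bool:
    log = logging.getLogger(__name__)
    datasets = ["tank/app", "tank/app/db", "tank/app/web", "tank/logs"]  # must be sorted and free of duplicates

    def process_dataset(dataset: str, tid: int) -> CompletionCallback:
        try:
            log.info("task %d processing %s", tid, dataset)  # the actual per-dataset work goes here
            ok = True
        except Exception:
            ok = False

        def on_completion(todo_futures: set[Future[CompletionCallback]]) -> CompletionCallbackResult:
            # On error, skip the subtree below this dataset and mark the overall run as failed.
            return CompletionCallbackResult(no_skip=ok, fail=not ok)

        return on_completion

    tree = ParallelTaskTree(log=log, datasets=datasets, process_dataset=process_dataset, max_workers=4)
    return tree.process_datasets_in_parallel()  # True if any dataset failed, False if all succeeded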

397 

398############################################################################# 

399@final 

400class _TreeNodeMutableAttributes: 

401 """Container for mutable attributes, stored space efficiently.""" 

402 

403 __slots__ = ("barrier", "barriers_cleared", "pending") # uses more compact memory layout than __dict__ 

404 

405 def __init__(self) -> None: 

406 self.barrier: _TreeNode | None = None # zero or one barrier TreeNode waiting for this node to complete 

407 self.pending: int = 0 # number of children added to priority queue that haven't completed their work yet 

408 self.barriers_cleared: bool = False # True once barriers of this node and all its ancestors have been irrevocably marked as cleared

409 

410 

411############################################################################# 

412@final 

413class _TreeNode(NamedTuple): 

414 """Node in dataset dependency tree used by the scheduler; _TreeNodes are ordered by priority and dataset name within a 

415 priority queue, via __lt__ comparisons.""" 

416 

417 priority: Comparable # determines the processing order once this dataset has become available for start of processing 

418 dataset: str # each dataset name is unique; attribs other than `priority` and `dataset` are never used for comparisons 

419 children: _Tree # dataset "directory" tree consists of nested dicts; aka dict[str, dict] 

420 parent: _TreeNode | None 

421 mut: _TreeNodeMutableAttributes 

422 

423 def __repr__(self) -> str: 

424 priority, dataset, pending, barrier = self.priority, self.dataset, self.mut.pending, self.mut.barrier 

425 return str({"priority": priority, "dataset": dataset, "pending": pending, "barrier": barrier is not None}) 

426 

427 

428def _make_tree_node(priority: Comparable, dataset: str, children: _Tree, parent: _TreeNode | None = None) -> _TreeNode: 

429 """Creates a TreeNode with mutable state container.""" 

430 return _TreeNode(priority, dataset, children, parent, _TreeNodeMutableAttributes()) 

431 

432 

433############################################################################# 

434_Tree = dict[str, "_Tree"] # Type alias 

435 

436 

437def _build_dataset_tree(sorted_datasets: list[str]) -> _Tree: 

438 """Takes as input a sorted list of datasets and returns a sorted directory tree containing the same dataset names, in the 

439 form of nested dicts; this converts the dataset list into a dependency tree."""
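    # For example (hypothetical names), the sorted input ["a", "a/b", "a/b/c", "d"] is converted to the nested dict
    # {"a": {"b": {"c": {}}}, "d": {}} (before the empty leaf dicts are deduplicated into one shared instance below).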

440 tree: _Tree = {} 

441 interner: HashedInterner[str] = HashedInterner() # reduces memory footprint 

442 for dataset in sorted_datasets: 

443 current: _Tree = tree 

444 for component in dataset.split(COMPONENT_SEPARATOR): 

445 child: _Tree | None = current.get(component) 

446 if child is None: 

447 child = {} 

448 component = interner.intern(component) 

449 current[component] = child 

450 current = child 

451 shared_empty_leaf: _Tree = {} 

452 

453 def compact(node: _Tree) -> None: 

454 """Tree with shared empty leaf nodes has some 30% lower memory footprint than the non-compacted version.""" 

455 stack: list[_Tree] = [node] 

456 while stack: # algo implemented using iteration to avoid hitting recursion limits with pathologically deep trees 

457 current_node: _Tree = stack.pop() 

458 for key, child_node in current_node.items(): 

459 if len(child_node) == 0: 

460 current_node[key] = shared_empty_leaf # sharing is safe as the tree is treated as immutable henceforth 

461 else: 

462 stack.append(child_node) # recurse down the tree 

463 

464 compact(tree) 

465 return tree