Coverage for bzfs_main / util / parallel_tasktree.py: 100%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 10:16 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Fault-tolerant, dependency-aware workflow scheduling and execution of parallel operations, ensuring that ancestor datasets 

16finish before descendants start; The design maximizes throughput while preventing inconsistent dataset states during 

17replication or snapshot deletion. 

18 

19This module contains only generic scheduling and coordination (the "algorithm"). Error handling, retries, and skip policies 

20are customizable and implemented by callers via the CompletionCallback API or wrappers such as ``parallel_tasktree_policy``. 

21 

22Has zero dependencies beyond the Python standard library. 

23""" 

24 

25from __future__ import ( 

26 annotations, 

27) 

28import concurrent 

29import heapq 

30import logging 

31import os 

32from concurrent.futures import ( 

33 FIRST_COMPLETED, 

34 Executor, 

35 Future, 

36 ThreadPoolExecutor, 

37) 

38from typing import ( 

39 Callable, 

40 Final, 

41 NamedTuple, 

42 final, 

43) 

44 

45from bzfs_main.util.utils import ( 

46 Comparable, 

47 HashedInterner, 

48 SortedInterner, 

49 SynchronousExecutor, 

50 TaskTiming, 

51 has_duplicates, 

52 has_siblings, 

53) 

54 

# constants:
BARRIER_CHAR: Final[str] = "~"  # default barrier component; '~' is not a valid ZFS name component, so it cannot collide
COMPONENT_SEPARATOR: Final[str] = "/"  # ZFS dataset component separator

59 

60############################################################################# 

@final
class CompletionCallbackResult(NamedTuple):
    """Result of a CompletionCallback invocation; consumed by the single-threaded coordination loop after each task."""

    no_skip: bool
    """True enqueues children, False skips the subtree."""

    fail: bool
    """True marks overall run as failed."""

72############################################################################# 

CompletionCallback = Callable[[set[Future["CompletionCallback"]]], CompletionCallbackResult]  # Type alias
"""Callable that is run by the main coordination thread after a ``process_dataset()`` task finishes.

Purpose:
- Decide follow-up scheduling after a ``process_dataset()`` task finished.

Assumptions:
- Runs in the single coordination thread.
- May inspect and cancel in-flight futures (the ``set`` argument) to implement fail-fast semantics.
- If cancelling in-flight futures for tasks that spawn subprocesses (e.g. via subprocess.run()), callers should also
  consider terminating the corresponding process subtree to avoid child processes lingering longer than desired. Skipping
  termination will not hang the scheduler (workers will complete naturally), but those subprocesses may outlive cancellation
  until they exit or time out.
"""

87 

88 

89############################################################################# 

@final
class ParallelTaskTree:
    """Main class for dependency-aware workflow scheduling of dataset jobs with optional barriers and priority ordering."""

    def __init__(
        self,
        *,
        log: logging.Logger,
        datasets: list[str],  # (sorted) list of datasets to process
        process_dataset: Callable[[str, int], CompletionCallback],  # lambda: dataset, tid; must be thread-safe
        priority: Callable[[str], Comparable] = lambda dataset: dataset,  # lexicographical order by default
        max_workers: int = os.cpu_count() or 1,
        executors: Callable[[], Executor] | None = None,  # factory producing Executor; None means 'auto-choose'
        interval_nanos: Callable[
            [int, str, int], int
        ] = lambda last_update_nanos, dataset, submit_count: 0,  # optionally spread tasks out over time; e.g. for jitter
        timing: TaskTiming = TaskTiming(),  # noqa: B008
        enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
        barrier_name: str = BARRIER_CHAR,
        is_test_mode: bool = False,
    ) -> None:
        """Prepares to process datasets in parallel with dependency-aware workflow scheduling and fault tolerance.

        This class orchestrates parallel execution of dataset operations while maintaining strict hierarchical dependencies.
        Processing of a dataset only starts after processing of all its ancestor datasets has completed, ensuring data
        consistency during operations like ZFS replication or snapshot deletion.

        Purpose:
        --------
        - Process hierarchical datasets in parallel while respecting parent-child dependencies
        - Provide dependency-aware workflow scheduling; error handling and retries are implemented by callers via
          ``CompletionCallback`` or thin wrappers
        - Maximize throughput by processing independent dataset subtrees in parallel
        - Support complex job scheduling patterns via optional barrier synchronization

        Assumptions:
        -----------------
        - Input `datasets` list is sorted in lexicographical order (enforced in test mode)
        - Input `datasets` list contains no duplicate entries (enforced in test mode)
        - Input `datasets` list contains no empty dataset names and none that start with '/'
        - Dataset hierarchy is determined by slash-separated path components
        - The `process_dataset` callable is thread-safe and can be executed in parallel

        Design Rationale:
        -----------------
        - The implementation uses a priority queue-based scheduler that maintains two key invariants:

          - Dependency Ordering: Children are only made available for start of processing after their parent completes,
            preventing inconsistent dataset states.

          - Priority: Among the datasets available for start of processing, the "smallest" is always processed next,
            according to a customizable priority callback function, which by default sorts by lexicographical order (not
            dataset size), ensuring more deterministic execution order.

        Algorithm Selection:
        --------------------
        - Simple Algorithm (default): Used when no barriers ('~') are detected in dataset names. Provides efficient
          scheduling for standard parent-child dependencies via recursive child enqueueing after parent completion.

        - Barrier Algorithm (advanced): Activated when barriers are detected or explicitly enabled. Supports complex
          synchronization scenarios where jobs must wait for completion of entire subtrees before proceeding. Essential
          for advanced job scheduling patterns like "complete all parallel replications before starting pruning phase."

        - Both algorithms are CPU and memory efficient. They require main memory proportional to the number of datasets
          (~400 bytes per dataset), and easily scale to millions of datasets. Time complexity is O(N log N), where
          N is the number of datasets.

        Concurrency Design:
        -------------------
        By default uses ThreadPoolExecutor with configurable worker limits to balance parallelism against resource
        consumption. Optionally, plug in a custom Executor to submit tasks to scale-out clusters via frameworks like
        Ray Core or Dask, etc.
        The single-threaded coordination loop prevents race conditions while worker threads execute dataset operations in
        parallel.

        Params:
        -------
        - datasets: Sorted list of dataset names to process (must not contain duplicates)
        - process_dataset: Thread-safe Callback function to execute on each dataset; returns a CompletionCallback determining
          if to fail or skip subtree on error; CompletionCallback runs in the (single) main thread as part of the
          coordination loop.
        - priority: Callback function to determine dataset processing order; defaults to lexicographical order.
        - interval_nanos: Callback that returns a non-negative delay (ns) to add to ``next_update_nanos`` for
          jitter/back-pressure control; arguments are ``(last_update_nanos, dataset, submit_count)``
        - max_workers: Maximum number of parallel worker threads
        - executors: Factory returning an Executor to submit tasks to; None means 'auto-choose'
        - enable_barriers: Force enable/disable barrier algorithm (None = auto-detect)
        - barrier_name: Directory name that denotes a barrier within dataset/job strings (default '~'); must be non-empty and
          not contain '/'
        - timing: Optionally request early async termination; stops new submissions and cancels in-flight tasks
        """
        assert log is not None
        assert (not is_test_mode) or datasets == sorted(datasets), "List is not sorted"
        assert (not is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
        if COMPONENT_SEPARATOR in barrier_name or not barrier_name:
            raise ValueError(f"Invalid barrier_name: {barrier_name}")
        for dataset in datasets:
            if dataset.startswith(COMPONENT_SEPARATOR) or not dataset:
                raise ValueError(f"Invalid dataset name: {dataset}")
        assert callable(process_dataset)
        assert callable(priority)
        assert max_workers > 0
        assert callable(interval_nanos)
        # a barrier is present iff some dataset contains barrier_name as a whole path component (not a mere substring)
        has_barrier: Final[bool] = any(barrier_name in dataset.split(COMPONENT_SEPARATOR) for dataset in datasets)
        assert (enable_barriers is not False) or not has_barrier, "Barrier seen in datasets but barriers explicitly disabled"

        self._barriers_enabled: Final[bool] = bool(has_barrier or enable_barriers)
        self._barrier_name: Final[str] = barrier_name
        self._log: Final[logging.Logger] = log
        self._datasets: Final[list[str]] = datasets
        self._process_dataset: Final[Callable[[str, int], CompletionCallback]] = process_dataset
        self._priority: Final[Callable[[str], Comparable]] = priority
        self._max_workers: Final[int] = max_workers
        self._interval_nanos: Final[Callable[[int, str, int], int]] = interval_nanos
        self._timing: Final[TaskTiming] = timing
        self._is_test_mode: Final[bool] = is_test_mode
        self._priority_queue: Final[list[_TreeNode]] = []
        self._tree: Final[_Tree] = _build_dataset_tree(datasets)  # tree consists of nested dictionaries and is immutable
        self._empty_barrier: Final[_TreeNode] = _make_tree_node("empty_barrier", "empty_barrier", {})  # immutable!
        self._datasets_set: Final[SortedInterner[str]] = SortedInterner(datasets)  # reduces memory footprint
        if executors is None:
            is_parallel: bool = max_workers > 1 and len(datasets) > 1 and has_siblings(datasets)  # siblings can run in par

            def _default_executor_factory() -> Executor:
                return ThreadPoolExecutor(max_workers) if is_parallel else SynchronousExecutor()

            executors = _default_executor_factory
        self._executors: Final[Callable[[], Executor]] = executors
        assert callable(executors)

    def process_datasets_in_parallel(self) -> bool:
        """Executes the configured tasks and returns True if any dataset processing failed, False if all succeeded."""
        self._build_priority_queue()
        executor: Executor = self._executors()
        with executor:
            todo_futures: set[Future[CompletionCallback]] = set()
            future_to_node: dict[Future[CompletionCallback], _TreeNode] = {}
            submit_count: int = 0
            timing: TaskTiming = self._timing
            next_update_nanos: int = timing.monotonic_ns()
            wait_timeout: float | None = None
            failed: bool = False

            def submit_datasets() -> bool:
                """Submits available datasets to worker threads and returns False if all tasks have been completed."""
                nonlocal wait_timeout
                wait_timeout = None  # indicates to use blocking flavor of concurrent.futures.wait()
                while len(self._priority_queue) > 0 and len(todo_futures) < self._max_workers:
                    # pick "smallest" dataset (wrt. sort order) available for start of processing; submit to thread pool
                    nonlocal next_update_nanos
                    sleep_nanos: int = next_update_nanos - timing.monotonic_ns()
                    if sleep_nanos > 0:
                        timing.sleep(sleep_nanos)  # allow early wakeup on async termination
                    if timing.is_terminated():
                        break
                    if sleep_nanos > 0 and len(todo_futures) > 0:
                        wait_timeout = 0  # indicates to use non-blocking flavor of concurrent.futures.wait()
                        # It's possible an even "smaller" dataset (wrt. sort order) has become available while we slept.
                        # If so it's preferable to submit to the thread pool the smaller one first.
                        break  # break out of loop to check if that's the case via non-blocking concurrent.futures.wait()
                    node: _TreeNode = heapq.heappop(self._priority_queue)  # pick "smallest" dataset (wrt. sort order)
                    nonlocal submit_count
                    submit_count += 1
                    next_update_nanos += max(0, self._interval_nanos(next_update_nanos, node.dataset, submit_count))
                    future: Future[CompletionCallback] = executor.submit(self._process_dataset, node.dataset, submit_count)
                    future_to_node[future] = node
                    todo_futures.add(future)
                return len(todo_futures) > 0 and not timing.is_terminated()

            def complete_datasets() -> None:
                """Waits for completed futures, processes results and errors, then enqueues follow-up tasks per policy."""
                nonlocal failed
                nonlocal todo_futures
                done_futures: set[Future[CompletionCallback]]
                done_futures, todo_futures = concurrent.futures.wait(todo_futures, wait_timeout, return_when=FIRST_COMPLETED)
                for done_future in done_futures:
                    done_node: _TreeNode = future_to_node.pop(done_future)
                    c_callback: CompletionCallback = done_future.result()  # does not block as processing already completed
                    # callback runs in this coordination thread; it may inspect/cancel the remaining todo_futures
                    c_callback_result: CompletionCallbackResult = c_callback(todo_futures)
                    no_skip: bool = c_callback_result.no_skip
                    fail: bool = c_callback_result.fail
                    failed = failed or fail
                    self._complete_dataset(done_node, no_skip=no_skip)

            # coordination loop; runs in the (single) main thread; submits tasks to worker threads and handles their results
            while submit_datasets():
                complete_datasets()

            if timing.is_terminated():  # early async termination: drop all remaining work and report failure if any remained
                for todo_future in todo_futures:
                    todo_future.cancel()
                failed = failed or len(self._priority_queue) > 0 or len(todo_futures) > 0
                self._priority_queue.clear()
                todo_futures.clear()
                future_to_node.clear()
        assert len(self._priority_queue) == 0
        assert len(todo_futures) == 0
        assert len(future_to_node) == 0
        return failed

    def _build_priority_queue(self) -> None:
        """Builds and fills initial priority queue of available root nodes for this task tree, ensuring the scheduler starts
        from a synthetic root node while honoring barriers; the synthetic root simplifies enqueueing logic."""
        self._priority_queue.clear()
        root_node: _TreeNode = _make_tree_node(priority="", dataset="", children=self._tree)
        self._complete_dataset(root_node, no_skip=True)

    def _complete_dataset(self, node: _TreeNode, no_skip: bool) -> None:
        """Enqueues child nodes for start of processing, using the appropriate algorithm."""
        if self._barriers_enabled:  # This barrier-based algorithm is for more general job scheduling, as in bzfs_jobrunner
            self._complete_dataset_with_barriers(node, no_skip=no_skip)
        elif no_skip:  # This simple algorithm is sufficient for most uses
            self._simple_enqueue_children(node)

    def _simple_enqueue_children(self, node: _TreeNode) -> None:
        """Enqueues child nodes for start of processing (using iteration to avoid potentially hitting recursion limits)."""
        stack: list[_TreeNode] = [node]
        while stack:
            current_node: _TreeNode = stack.pop()
            for child, grandchildren in current_node.children.items():  # as processing of parent has now completed
                child_abs_dataset: str = self._join_dataset(current_node.dataset, child)
                child_node: _TreeNode = _make_tree_node(self._priority(child_abs_dataset), child_abs_dataset, grandchildren)
                if child_abs_dataset in self._datasets_set:
                    heapq.heappush(self._priority_queue, child_node)  # make it available for start of processing
                else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                    stack.append(child_node)  # ... recursively down the tree

    def _complete_dataset_with_barriers(self, node: _TreeNode, no_skip: bool) -> None:
        """After successful completion, enqueues children, opens barriers, and propagates completion upwards.

        The (more complex) algorithm below is for more general job scheduling, as in bzfs_jobrunner. Here, a "dataset" string
        is treated as an identifier for any kind of job rather than a reference to a concrete ZFS object. An example
        "dataset" job string is "src_host1/createsnapshots/replicate_to_hostA". Jobs can depend on another job via a
        parent/child relationship formed by '/' directory separators within the dataset string, and multiple "datasets" form
        a job dependency tree by way of common dataset directory prefixes. Jobs that do not depend on each other can be
        executed in parallel, and jobs can be told to first wait for other jobs to complete successfully. The algorithm is
        based on a barrier primitive and is typically disabled. It is only required for rare jobrunner configs.

        For example, a job scheduler can specify that all parallel replications jobs to multiple destinations must succeed
        before the jobs of the pruning phase can start. More generally, with this algo, a job scheduler can specify that all
        jobs within a given job subtree (containing any nested combination of sequential and/or parallel jobs) must
        successfully complete before a certain other job within the job tree is started. This is specified via the barrier
        directory named by ``barrier_name`` (default '~'). An example is "src_host1/createsnapshots/~/prune".

        Note that the default '~' is unambiguous as it is not a valid ZFS dataset name component per the naming rules
        enforced by the 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs. Custom barrier names should avoid colliding
        with real dataset/job components.
        """

        def enqueue_children(node: _TreeNode) -> int:
            """Returns number of jobs that were added to priority_queue for immediate start of processing."""
            n: int = 0
            children: _Tree = node.children
            for child, grandchildren in children.items():
                abs_dataset: str = self._join_dataset(node.dataset, child)
                child_node: _TreeNode = _make_tree_node(self._priority(abs_dataset), abs_dataset, grandchildren, parent=node)
                k: int
                if child != self._barrier_name:
                    if abs_dataset in self._datasets_set:
                        # it's not a barrier; make job available for immediate start of processing
                        heapq.heappush(self._priority_queue, child_node)
                        k = 1
                    else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                        k = enqueue_children(child_node)  # ... recursively down the tree
                elif len(children) == 1:  # if the only child is a barrier then pass the enqueue operation
                    k = enqueue_children(child_node)  # ... recursively down the tree
                else:  # park the barrier node within the (still closed) barrier for the time being
                    assert node.mut.barrier is None
                    node.mut.barrier = child_node
                    k = 0
                node.mut.pending += min(1, k)
                n += k
            assert n >= 0
            return n

        if no_skip:
            enqueue_children(node)  # make child datasets available for start of processing
        else:  # job completed without success
            # ... thus, opening the barrier shall always do nothing in node and its ancestors.
            # perf: Irrevocably mark (exactly once) barriers of this node and all its ancestors as cleared due to subtree
            # skip, via barriers_cleared=True. This enables to avoid redundant re-walking the entire ancestor chain on
            # subsequent skip.
            tmp: _TreeNode | None = node
            while (tmp is not None) and not tmp.mut.barriers_cleared:
                tmp.mut.barriers_cleared = True
                tmp.mut.barrier = self._empty_barrier
                tmp = tmp.parent
        assert node.mut.pending >= 0
        while node.mut.pending == 0:  # have all jobs in subtree of current node completed?
            if no_skip:  # ... if so open the barrier, if it exists, and enqueue jobs waiting on it
                if not (node.mut.barrier is None or node.mut.barrier is self._empty_barrier):
                    node.mut.pending += min(1, enqueue_children(node.mut.barrier))
                    node.mut.barrier = self._empty_barrier
                if node.mut.pending > 0:  # did opening of barrier cause jobs to be enqueued in subtree?
                    break  # ... if so we have not yet completed the subtree, so don't mark the subtree as completed yet
            if node.parent is None:
                break  # we've reached the root node
            node = node.parent  # recurse up the tree to propagate completion upward
            node.mut.pending -= 1  # mark subtree as completed
            assert node.mut.pending >= 0

    def _join_dataset(self, parent: str, child: str) -> str:
        """Concatenates parent and child dataset names; accommodates synthetic root node; interns for memory footprint."""
        return self._datasets_set.interned(f"{parent}{COMPONENT_SEPARATOR}{child}" if parent else child)

394 

395 

396############################################################################# 

397@final 

398class _TreeNodeMutableAttributes: 

399 """Container for mutable attributes, stored space efficiently.""" 

400 

401 __slots__ = ("barrier", "barriers_cleared", "pending") # uses more compact memory layout than __dict__ 

402 

403 def __init__(self) -> None: 

404 self.barrier: _TreeNode | None = None # zero or one barrier TreeNode waiting for this node to complete 

405 self.pending: int = 0 # number of children added to priority queue that haven't completed their work yet 

406 self.barriers_cleared: bool = False # irrevocably mark barriers of this node and all its ancestors as cleared? 

407 

408 

409############################################################################# 

410@final 

411class _TreeNode(NamedTuple): 

412 """Node in dataset dependency tree used by the scheduler; _TreeNodes are ordered by priority and dataset name within a 

413 priority queue, via __lt__ comparisons.""" 

414 

415 priority: Comparable # determines the processing order once this dataset has become available for start of processing 

416 dataset: str # each dataset name is unique; attribs other than `priority` and `dataset` are never used for comparisons 

417 children: _Tree # dataset "directory" tree consists of nested dicts; aka dict[str, dict] 

418 parent: _TreeNode | None 

419 mut: _TreeNodeMutableAttributes 

420 

421 def __repr__(self) -> str: 

422 priority, dataset, pending, barrier = self.priority, self.dataset, self.mut.pending, self.mut.barrier 

423 return str({"priority": priority, "dataset": dataset, "pending": pending, "barrier": barrier is not None}) 

424 

425 

def _make_tree_node(priority: Comparable, dataset: str, children: _Tree, parent: _TreeNode | None = None) -> _TreeNode:
    """Builds a _TreeNode and attaches a fresh mutable-state container to it."""
    mut = _TreeNodeMutableAttributes()
    return _TreeNode(priority=priority, dataset=dataset, children=children, parent=parent, mut=mut)

429 

430 

431############################################################################# 

_Tree = dict[str, "_Tree"]  # Type alias: recursively nested dict mapping each dataset path component to its child subtree

433 

434 

def _build_dataset_tree(sorted_datasets: list[str]) -> _Tree:
    """Converts a sorted list of dataset names into a dependency tree of nested dicts keyed by path component.

    After construction, every empty leaf dict is replaced by one shared empty dict, which lowers the memory footprint by
    some 30%; the sharing is safe because callers treat the returned tree as immutable.
    """
    root: _Tree = {}
    interner: HashedInterner[str] = HashedInterner()  # dedupes equal component strings to reduce memory footprint
    for dataset in sorted_datasets:
        node: _Tree = root
        for component in dataset.split(COMPONENT_SEPARATOR):
            subtree: _Tree | None = node.get(component)
            if subtree is None:
                subtree = {}
                node[interner.intern(component)] = subtree
            node = subtree

    # Compaction pass, implemented iteratively so pathologically deep trees cannot hit Python's recursion limit.
    shared_empty_leaf: _Tree = {}
    stack: list[_Tree] = [root]
    while stack:
        branch: _Tree = stack.pop()
        for key, child in branch.items():
            if child:
                stack.append(child)  # descend into non-empty subtree
            else:
                branch[key] = shared_empty_leaf  # safe to share: tree is treated as immutable henceforth
    return root