Coverage for bzfs_main / util / parallel_tasktree.py: 100%
208 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:49 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Fault-tolerant, dependency-aware workflow scheduling and execution of parallel operations, ensuring that ancestor datasets
16finish before descendants start; The design maximizes throughput while preventing inconsistent dataset states during
17replication or snapshot deletion.
19This module contains only generic scheduling and coordination (the "algorithm"). Error handling, retries, and skip policies
20are customizable and implemented by callers via the CompletionCallback API or wrappers such as ``parallel_tasktree_policy``.
22Has zero dependencies beyond the Python standard library.
24Example Usage:
25--------------
26 import logging
27 from bzfs_main.util.parallel_tasktree import CompletionCallback, CompletionCallbackResult, ParallelTaskTree
29 datasets = ["a", "a/b", "a/b/c", "a/d", "e"]
31 def process_dataset(dataset: str, _submit_count: int) -> CompletionCallback:
32 print(dataset)
34 def completion_callback(_todo_futures) -> CompletionCallbackResult:
35 return CompletionCallbackResult(no_skip=True, fail=False)
37 return completion_callback
39 log = logging.getLogger(__name__)
40 tasktree = ParallelTaskTree(
41 log=log,
42 datasets=datasets,
43 process_dataset=process_dataset,
44 max_workers=2,
45 )
46 tasktree.process_datasets_in_parallel()
48 # Sample output:
49 # a
50 # e
51 # a/b
52 # a/d
53 # a/b/c
54"""
56from __future__ import (
57 annotations,
58)
59import concurrent
60import heapq
61import logging
62import os
63from concurrent.futures import (
64 FIRST_COMPLETED,
65 Executor,
66 Future,
67)
68from concurrent.futures.thread import (
69 ThreadPoolExecutor,
70)
71from typing import (
72 Callable,
73 Final,
74 NamedTuple,
75 final,
76)
78from bzfs_main.util.utils import (
79 Comparable,
80 HashedInterner,
81 SortedInterner,
82 SynchronousExecutor,
83 TaskTiming,
84 has_duplicates,
85 has_siblings,
86)
88# constants:
89BARRIER_CHAR: Final[str] = "~"
90COMPONENT_SEPARATOR: Final[str] = "/" # ZFS dataset component separator
93#############################################################################
94@final
95class CompletionCallbackResult(NamedTuple):
96 """Result of a CompletionCallback invocation."""
98 no_skip: bool
99 """True enqueues children, False skips the subtree."""
101 fail: bool
102 """True marks overall run as failed."""
105#############################################################################
106CompletionCallback = Callable[[set[Future["CompletionCallback"]]], CompletionCallbackResult] # Type alias
107"""Callable that is run by the main coordination thread after a ``process_dataset()`` task finishes.
109Purpose:
110- Decide follow-up scheduling after a ``process_dataset()`` task finished.
112Assumptions:
113- Runs in the single coordination thread.
114- May inspect and cancel in-flight futures to implement fail-fast semantics.
115- If cancelling in-flight futures for tasks that spawn subprocesses (e.g. via subprocess.run()), callers should also
116consider terminating the corresponding process subtree to avoid child processes lingering longer than desired. Skipping
117termination will not hang the scheduler (workers will complete naturally), but those subprocesses may outlive cancellation
118until they exit or time out.
119"""
122#############################################################################
123@final
124class ParallelTaskTree:
125 """Main class for dependency-aware workflow scheduling of dataset jobs with optional barriers and priority ordering."""
127 def __init__(
128 self,
129 *,
130 log: logging.Logger,
131 datasets: list[str], # (sorted) list of datasets to process
132 process_dataset: Callable[[str, int], CompletionCallback], # lambda: dataset, tid; must be thread-safe
133 priority: Callable[[str], Comparable] = lambda dataset: dataset, # lexicographical order by default
134 max_workers: int = os.cpu_count() or 1,
135 executors: Callable[[], Executor] | None = None, # factory producing Executor; None means 'auto-choose'
136 interval_nanos: Callable[
137 [int, str, int], int
138 ] = lambda last_update_nanos, dataset, submit_count: 0, # optionally spread tasks out over time; e.g. for jitter
139 timing: TaskTiming = TaskTiming(), # noqa: B008
140 enable_barriers: bool | None = None, # for testing only; None means 'auto-detect'
141 barrier_name: str = BARRIER_CHAR,
142 is_test_mode: bool = False,
143 ) -> None:
144 """Prepares to process datasets in parallel with dependency-aware workflow scheduling and fault tolerance.
146 This class orchestrates parallel execution of dataset operations while maintaining strict hierarchical dependencies.
147 Processing of a dataset only starts after processing of all its ancestor datasets has completed, ensuring data
148 consistency during operations like ZFS replication or snapshot deletion.
150 Purpose:
151 --------
152 - Process hierarchical datasets in parallel while respecting parent-child dependencies
153 - Provide dependency-aware workflow scheduling; error handling and retries are implemented by callers via
154 ``CompletionCallback`` or thin wrappers
155 - Maximize throughput by processing independent dataset subtrees in parallel
156 - Support complex job scheduling patterns via optional barrier synchronization
158 Assumptions:
159 -----------------
160 - Input `datasets` list is sorted in lexicographical order (enforced in test mode)
161 - Input `datasets` list contains no duplicate entries (enforced in test mode)
162 - Input `datasets` list contains no empty dataset names and none that start with '/'
163 - Dataset hierarchy is determined by slash-separated path components
164 - The `process_dataset` callable is thread-safe and can be executed in parallel
166 Design Rationale:
167 -----------------
168 - The implementation uses a priority queue-based scheduler that maintains two key invariants:
170 - Dependency Ordering: Children are only made available for start of processing after their parent completes,
171 preventing inconsistent dataset states.
173 - Priority: Among the datasets available for start of processing, the "smallest" is always processed next, according
174 to a customizable priority callback function, which by default sorts by lexicographical order (not dataset size),
175 ensuring more deterministic execution order.
177 Algorithm Selection:
178 --------------------
179 - Simple Algorithm (default): Used when no barriers ('~') are detected in dataset names. Provides efficient
180 scheduling for standard parent-child dependencies via recursive child enqueueing after parent completion.
182 - Barrier Algorithm (advanced): Activated when barriers are detected or explicitly enabled. Supports complex
183 synchronization scenarios where jobs must wait for completion of entire subtrees before proceeding. Essential
184 for advanced job scheduling patterns like "complete all parallel replications before starting pruning phase."
186 - Both algorithms are CPU and memory efficient. They require main memory proportional to the number of datasets
187 (~400 bytes per dataset), and easily scale to millions of datasets. Time complexity is O(N log N), where
188 N is the number of datasets.
190 Concurrency Design:
191 -------------------
192 By default uses ThreadPoolExecutor with configurable worker limits to balance parallelism against resource
193 consumption. Optionally, plug in a custom Executor to submit tasks to scale-out clusters via frameworks like
194 Ray Core or Dask, etc.
195 The single-threaded coordination loop prevents race conditions while worker threads execute dataset operations in
196 parallel.
198 Params:
199 -------
200 - datasets: Sorted list of dataset names to process (must not contain duplicates)
201 - process_dataset: Thread-safe Callback function to execute on each dataset; returns a CompletionCallback determining
202 if to fail or skip subtree on error; CompletionCallback runs in the (single) main thread as part of the
203 coordination loop.
204 - priority: Callback function to determine dataset processing order; defaults to lexicographical order.
205 - interval_nanos: Callback that returns a non-negative delay (ns) to add to ``next_update_nanos`` for
206 jitter/back-pressure control; arguments are ``(last_update_nanos, dataset, submit_count)``
207 - max_workers: Maximum number of parallel worker threads
208 - executors: Factory returning an Executor to submit tasks to; None means 'auto-choose'
209 - enable_barriers: Force enable/disable barrier algorithm (None = auto-detect)
210 - barrier_name: Directory name that denotes a barrier within dataset/job strings (default '~'); must be non-empty and
211 not contain '/'
212 - timing: Optionally request early async termination; stops new submissions and cancels in-flight tasks
213 """
214 assert log is not None
215 assert (not is_test_mode) or datasets == sorted(datasets), "List is not sorted"
216 assert (not is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
217 if COMPONENT_SEPARATOR in barrier_name or not barrier_name:
218 raise ValueError(f"Invalid barrier_name: {barrier_name}")
219 for dataset in datasets:
220 if dataset.startswith(COMPONENT_SEPARATOR) or not dataset:
221 raise ValueError(f"Invalid dataset name: {dataset}")
222 assert callable(process_dataset)
223 assert callable(priority)
224 assert max_workers > 0
225 assert callable(interval_nanos)
226 has_barrier: Final[bool] = any(barrier_name in dataset.split(COMPONENT_SEPARATOR) for dataset in datasets)
227 assert (enable_barriers is not False) or not has_barrier, "Barrier seen in datasets but barriers explicitly disabled"
229 self._barriers_enabled: Final[bool] = bool(has_barrier or enable_barriers)
230 self._barrier_name: Final[str] = barrier_name
231 self._log: Final[logging.Logger] = log
232 self._datasets: Final[list[str]] = datasets
233 self._process_dataset: Final[Callable[[str, int], CompletionCallback]] = process_dataset
234 self._priority: Final[Callable[[str], Comparable]] = priority
235 self._max_workers: Final[int] = max_workers
236 self._interval_nanos: Final[Callable[[int, str, int], int]] = interval_nanos
237 self._timing: Final[TaskTiming] = timing
238 self._is_test_mode: Final[bool] = is_test_mode
239 self._priority_queue: Final[list[_TreeNode]] = []
240 self._tree: Final[_Tree] = _build_dataset_tree(datasets) # tree consists of nested dictionaries and is immutable
241 self._empty_barrier: Final[_TreeNode] = _make_tree_node("empty_barrier", "empty_barrier", {}) # immutable!
242 self._datasets_set: Final[SortedInterner[str]] = SortedInterner(datasets) # reduces memory footprint
243 if executors is None:
244 is_parallel: bool = max_workers > 1 and len(datasets) > 1 and has_siblings(datasets) # siblings can run in par
246 def _default_executor_factory() -> Executor:
247 return ThreadPoolExecutor(max_workers) if is_parallel else SynchronousExecutor()
249 executors = _default_executor_factory
250 self._executors: Final[Callable[[], Executor]] = executors
251 assert callable(executors)
253 def process_datasets_in_parallel(self) -> bool:
254 """Executes the configured tasks and returns True if any dataset processing failed, False if all succeeded."""
255 self._build_priority_queue()
256 executor: Executor = self._executors()
257 with executor:
258 todo_futures: set[Future[CompletionCallback]] = set()
259 future_to_node: dict[Future[CompletionCallback], _TreeNode] = {}
260 submit_count: int = 0
261 timing: TaskTiming = self._timing
262 next_update_nanos: int = timing.monotonic_ns()
263 wait_timeout: float | None = None
264 failed: bool = False
266 def submit_datasets() -> bool:
267 """Submits available datasets to worker threads and returns False if all tasks have been completed."""
268 nonlocal wait_timeout
269 wait_timeout = None # indicates to use blocking flavor of concurrent.futures.wait()
270 while len(self._priority_queue) > 0 and len(todo_futures) < self._max_workers:
271 # pick "smallest" dataset (wrt. sort order) available for start of processing; submit to thread pool
272 nonlocal next_update_nanos
273 sleep_nanos: int = next_update_nanos - timing.monotonic_ns()
274 if sleep_nanos > 0:
275 timing.sleep(sleep_nanos) # allow early wakeup on async termination
276 if timing.is_terminated():
277 break
278 if sleep_nanos > 0 and len(todo_futures) > 0:
279 wait_timeout = 0 # indicates to use non-blocking flavor of concurrent.futures.wait()
280 # It's possible an even "smaller" dataset (wrt. sort order) has become available while we slept.
281 # If so it's preferable to submit to the thread pool the smaller one first.
282 break # break out of loop to check if that's the case via non-blocking concurrent.futures.wait()
283 node: _TreeNode = heapq.heappop(self._priority_queue) # pick "smallest" dataset (wrt. sort order)
284 nonlocal submit_count
285 submit_count += 1
286 next_update_nanos += max(0, self._interval_nanos(next_update_nanos, node.dataset, submit_count))
287 future: Future[CompletionCallback] = executor.submit(self._process_dataset, node.dataset, submit_count)
288 future_to_node[future] = node
289 todo_futures.add(future)
290 return len(todo_futures) > 0 and not timing.is_terminated()
292 def complete_datasets() -> None:
293 """Waits for completed futures, processes results and errors, then enqueues follow-up tasks per policy."""
294 nonlocal failed
295 nonlocal todo_futures
296 done_futures: set[Future[CompletionCallback]]
297 done_futures, todo_futures = concurrent.futures.wait(todo_futures, wait_timeout, return_when=FIRST_COMPLETED)
298 for done_future in done_futures:
299 done_node: _TreeNode = future_to_node.pop(done_future)
300 c_callback: CompletionCallback = done_future.result() # does not block as processing already completed
301 c_callback_result: CompletionCallbackResult = c_callback(todo_futures)
302 no_skip: bool = c_callback_result.no_skip
303 fail: bool = c_callback_result.fail
304 failed = failed or fail
305 self._complete_dataset(done_node, no_skip=no_skip)
307 # coordination loop; runs in the (single) main thread; submits tasks to worker threads and handles their results
308 while submit_datasets():
309 complete_datasets()
311 if timing.is_terminated():
312 for todo_future in todo_futures:
313 todo_future.cancel()
314 failed = failed or len(self._priority_queue) > 0 or len(todo_futures) > 0
315 self._priority_queue.clear()
316 todo_futures.clear()
317 future_to_node.clear()
318 assert len(self._priority_queue) == 0
319 assert len(todo_futures) == 0
320 assert len(future_to_node) == 0
321 return failed
323 def _build_priority_queue(self) -> None:
324 """Builds and fills initial priority queue of available root nodes for this task tree, ensuring the scheduler starts
325 from a synthetic root node while honoring barriers; the synthetic root simplifies enqueueing logic."""
326 self._priority_queue.clear()
327 root_node: _TreeNode = _make_tree_node(priority="", dataset="", children=self._tree)
328 self._complete_dataset(root_node, no_skip=True)
330 def _complete_dataset(self, node: _TreeNode, no_skip: bool) -> None:
331 """Enqueues child nodes for start of processing, using the appropriate algorithm."""
332 if self._barriers_enabled: # This barrier-based algorithm is for more general job scheduling, as in bzfs_jobrunner
333 self._complete_dataset_with_barriers(node, no_skip=no_skip)
334 elif no_skip: # This simple algorithm is sufficient for most uses
335 self._simple_enqueue_children(node)
337 def _simple_enqueue_children(self, node: _TreeNode) -> None:
338 """Enqueues child nodes for start of processing (using iteration to avoid potentially hitting recursion limits)."""
339 stack: list[_TreeNode] = [node]
340 while stack:
341 current_node: _TreeNode = stack.pop()
342 for child, grandchildren in current_node.children.items(): # as processing of parent has now completed
343 child_abs_dataset: str = self._join_dataset(current_node.dataset, child)
344 child_node: _TreeNode = _make_tree_node(self._priority(child_abs_dataset), child_abs_dataset, grandchildren)
345 if child_abs_dataset in self._datasets_set:
346 heapq.heappush(self._priority_queue, child_node) # make it available for start of processing
347 else: # it's an intermediate node that has no job attached; pass the enqueue operation
348 stack.append(child_node) # ... recursively down the tree
350 def _complete_dataset_with_barriers(self, node: _TreeNode, no_skip: bool) -> None:
351 """After successful completion, enqueues children, opens barriers, and propagates completion upwards.
353 The (more complex) algorithm below is for more general job scheduling, as in bzfs_jobrunner. Here, a "dataset" string
354 is treated as an identifier for any kind of job rather than a reference to a concrete ZFS object. An example
355 "dataset" job string is "src_host1/createsnapshots/replicate_to_hostA". Jobs can depend on another job via a
356 parent/child relationship formed by '/' directory separators within the dataset string, and multiple "datasets" form
357 a job dependency tree by way of common dataset directory prefixes. Jobs that do not depend on each other can be
358 executed in parallel, and jobs can be told to first wait for other jobs to complete successfully. The algorithm is
359 based on a barrier primitive and is typically disabled. It is only required for rare jobrunner configs.
361 For example, a job scheduler can specify that all parallel replications jobs to multiple destinations must succeed
362 before the jobs of the pruning phase can start. More generally, with this algo, a job scheduler can specify that all
363 jobs within a given job subtree (containing any nested combination of sequential and/or parallel jobs) must
364 successfully complete before a certain other job within the job tree is started. This is specified via the barrier
365 directory named by ``barrier_name`` (default '~'). An example is "src_host1/createsnapshots/~/prune".
367 Note that the default '~' is unambiguous as it is not a valid ZFS dataset name component per the naming rules
368 enforced by the 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs. Custom barrier names should avoid colliding
369 with real dataset/job components.
370 """
372 def enqueue_children(node: _TreeNode) -> int:
373 """Returns number of jobs that were added to priority_queue for immediate start of processing."""
374 n: int = 0
375 children: _Tree = node.children
376 for child, grandchildren in children.items():
377 abs_dataset: str = self._join_dataset(node.dataset, child)
378 child_node: _TreeNode = _make_tree_node(self._priority(abs_dataset), abs_dataset, grandchildren, parent=node)
379 k: int
380 if child != self._barrier_name:
381 if abs_dataset in self._datasets_set:
382 # it's not a barrier; make job available for immediate start of processing
383 heapq.heappush(self._priority_queue, child_node)
384 k = 1
385 else: # it's an intermediate node that has no job attached; pass the enqueue operation
386 k = enqueue_children(child_node) # ... recursively down the tree
387 elif len(children) == 1: # if the only child is a barrier then pass the enqueue operation
388 k = enqueue_children(child_node) # ... recursively down the tree
389 else: # park the barrier node within the (still closed) barrier for the time being
390 assert node.mut.barrier is None
391 node.mut.barrier = child_node
392 k = 0
393 node.mut.pending += min(1, k)
394 n += k
395 assert n >= 0
396 return n
398 if no_skip:
399 enqueue_children(node) # make child datasets available for start of processing
400 else: # job completed without success
401 # ... thus, opening the barrier shall always do nothing in node and its ancestors.
402 # perf: Irrevocably mark (exactly once) barriers of this node and all its ancestors as cleared due to subtree
403 # skip, via barriers_cleared=True. This enables to avoid redundant re-walking the ancestor chain on subsequent
404 # skip.
405 tmp: _TreeNode | None = node
406 while (tmp is not None) and not tmp.mut.barriers_cleared:
407 tmp.mut.barriers_cleared = True
408 tmp.mut.barrier = self._empty_barrier
409 tmp = tmp.parent
410 assert node.mut.pending >= 0
411 while node.mut.pending == 0: # have all jobs in subtree of current node completed?
412 if no_skip: # ... if so open the barrier, if it exists, and enqueue jobs waiting on it
413 if not (node.mut.barrier is None or node.mut.barrier is self._empty_barrier):
414 node.mut.pending += min(1, enqueue_children(node.mut.barrier))
415 node.mut.barrier = self._empty_barrier
416 if node.mut.pending > 0: # did opening of barrier cause jobs to be enqueued in subtree?
417 break # ... if so we have not yet completed the subtree, so don't mark the subtree as completed yet
418 if node.parent is None:
419 break # we've reached the root node
420 node = node.parent # recurse up the tree to propagate completion upward
421 node.mut.pending -= 1 # mark subtree as completed
422 assert node.mut.pending >= 0
424 def _join_dataset(self, parent: str, child: str) -> str:
425 """Concatenates parent and child dataset names; accommodates synthetic root node; interns for memory footprint."""
426 return self._datasets_set.interned(f"{parent}{COMPONENT_SEPARATOR}{child}" if parent else child)
429#############################################################################
430@final
431class _TreeNodeMutableAttributes:
432 """Container for mutable attributes, stored space efficiently."""
434 __slots__ = ("barrier", "barriers_cleared", "pending") # uses more compact memory layout than __dict__
436 def __init__(self) -> None:
437 self.barrier: _TreeNode | None = None # zero or one barrier TreeNode waiting for this node to complete
438 self.pending: int = 0 # number of children added to priority queue that haven't completed their work yet
439 self.barriers_cleared: bool = False # irrevocably mark barriers of this node and all its ancestors as cleared?
442#############################################################################
443@final
444class _TreeNode(NamedTuple):
445 """Node in dataset dependency tree used by the scheduler; _TreeNodes are ordered by priority and dataset name within a
446 priority queue, via __lt__ comparisons."""
448 priority: Comparable # determines the processing order once this dataset has become available for start of processing
449 dataset: str # each dataset name is unique; attribs other than `priority` and `dataset` are never used for comparisons
450 children: _Tree # dataset "directory" tree consists of nested dicts; aka dict[str, dict]
451 parent: _TreeNode | None
452 mut: _TreeNodeMutableAttributes
454 def __repr__(self) -> str:
455 priority, dataset, pending, barrier = self.priority, self.dataset, self.mut.pending, self.mut.barrier
456 return str({"priority": priority, "dataset": dataset, "pending": pending, "barrier": barrier is not None})
459def _make_tree_node(priority: Comparable, dataset: str, children: _Tree, parent: _TreeNode | None = None) -> _TreeNode:
460 """Creates a TreeNode with mutable state container."""
461 return _TreeNode(priority, dataset, children, parent, _TreeNodeMutableAttributes())
464#############################################################################
465_Tree = dict[str, "_Tree"] # Type alias
468def _build_dataset_tree(sorted_datasets: list[str]) -> _Tree:
469 """Takes as input a sorted list of datasets and returns a (reverse) sorted directory tree containing the same dataset
470 names, in the form of nested dicts; This converts the dataset list into a dependency tree."""
471 tree: _Tree = {}
472 interner: HashedInterner[str] = HashedInterner() # reduces memory footprint
473 shared_empty_leaf: _Tree = {} # tree with shared empty leafs has ~30% lower memory footprint than non-compacted version
475 for dataset in reversed(sorted_datasets):
476 current: _Tree = tree
477 components: list[str] = dataset.split(COMPONENT_SEPARATOR)
478 k: int = len(components) - 1
479 for i, component in enumerate(components):
480 child: _Tree | None = current.get(component)
481 if child is None:
482 child = {} if i < k else shared_empty_leaf # sharing is safe as the tree is treated as immutable henceforth
483 assert current is not shared_empty_leaf
484 component = interner.intern(component)
485 current[component] = child
486 current = child
487 return tree