Coverage for bzfs_main/parallel_tasktree.py: 100%
187 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Fault-tolerant, dependency-aware scheduling and execution of parallel operations, ensuring that ancestor datasets finish
16before descendants start; The design maximizes throughput while preventing inconsistent dataset states during replication or
17snapshot deletion.
19This module contains only generic scheduling and coordination (the "algorithm"). Error handling, retries, and skip policies
20are customizable and implemented by callers via the CompletionCallback API or wrappers such as ``parallel_tasktree_policy``.
21"""
23from __future__ import (
24 annotations,
25)
26import concurrent
27import heapq
28import os
29import threading
30import time
31from concurrent.futures import (
32 FIRST_COMPLETED,
33 Executor,
34 Future,
35 ThreadPoolExecutor,
36)
37from logging import (
38 Logger,
39)
40from typing import (
41 Callable,
42 Final,
43 NamedTuple,
44)
46from bzfs_main.utils import (
47 DONT_SKIP_DATASET,
48 Comparable,
49 Interner,
50 SortedInterner,
51 SynchronousExecutor,
52 has_duplicates,
53 has_siblings,
54 is_descendant,
55)
57# constants:
58BARRIER_CHAR: Final[str] = "~"
61#############################################################################
62_Tree = dict[str, "_Tree"] # Type alias
65def _build_dataset_tree(sorted_datasets: list[str]) -> _Tree:
66 """Takes as input a sorted list of datasets and returns a sorted directory tree containing the same dataset names, in the
67 form of nested dicts."""
68 tree: _Tree = {}
69 interner: Interner[str] = Interner() # reduces memory footprint
70 for dataset in sorted_datasets:
71 current: _Tree = tree
72 for component in dataset.split("/"):
73 child: _Tree | None = current.get(component)
74 if child is None:
75 child = {}
76 component = interner.intern(component)
77 current[component] = child
78 current = child
79 shared_empty_leaf: _Tree = {}
81 def compact(node: _Tree) -> None:
82 """Tree with shared empty leaf nodes has some 30% lower memory footprint than the non-compacted version."""
83 for key, child_node in node.items():
84 if len(child_node) == 0:
85 node[key] = shared_empty_leaf # sharing is safe because the tree is treated as immutable henceforth
86 else:
87 compact(child_node)
89 compact(tree)
90 return tree
93def _build_dataset_tree_and_find_roots(sorted_datasets: list[str], priority: Callable[[str], Comparable]) -> list[_TreeNode]:
94 """For consistency, processing of a dataset only starts after processing of its ancestors has completed."""
95 tree: _Tree = _build_dataset_tree(sorted_datasets) # tree consists of nested dictionaries
96 skip_dataset: str = DONT_SKIP_DATASET
97 roots: list[_TreeNode] = []
98 for dataset in sorted_datasets:
99 if is_descendant(dataset, of_root_dataset=skip_dataset):
100 continue
101 skip_dataset = dataset
102 children: _Tree = tree
103 for component in dataset.split("/"):
104 children = children[component]
105 roots.append(_make_tree_node(priority(dataset), dataset, children))
106 return roots
109class _TreeNodeMutableAttributes:
110 """Container for mutable attributes, stored space efficiently."""
112 __slots__ = ("barrier", "barriers_cleared", "pending") # uses more compact memory layout than __dict__
114 def __init__(self) -> None:
115 self.barrier: _TreeNode | None = None # zero or one barrier TreeNode waiting for this node to complete
116 self.pending: int = 0 # number of children added to priority queue that haven't completed their work yet
117 self.barriers_cleared: bool = False # irrevocably mark barriers of this node and all its ancestors as cleared?
120class _TreeNode(NamedTuple):
121 """Node in dataset dependency tree used by the scheduler; _TreeNodes are ordered by priority and dataset name within a
122 priority queue, via __lt__ comparisons."""
124 priority: Comparable # determines the processing order once this dataset has become available for start of processing
125 dataset: str # each dataset name is unique; attribs other than `priority` and `dataset` are never used for comparisons
126 children: _Tree # dataset "directory" tree consists of nested dicts; aka Dict[str, Dict]
127 parent: _TreeNode | None
128 mut: _TreeNodeMutableAttributes
130 def __repr__(self) -> str:
131 priority, dataset, pending, barrier = self.priority, self.dataset, self.mut.pending, self.mut.barrier
132 return str({"priority": priority, "dataset": dataset, "pending": pending, "barrier": barrier is not None})
135def _make_tree_node(priority: Comparable, dataset: str, children: _Tree, parent: _TreeNode | None = None) -> _TreeNode:
136 """Creates a TreeNode with mutable state container."""
137 return _TreeNode(priority, dataset, children, parent, _TreeNodeMutableAttributes())
140#############################################################################
141class CompletionCallbackResult(NamedTuple):
142 """Result of a CompletionCallback invocation."""
144 no_skip: bool
145 """True enqueues descendants, False skips the subtree."""
147 fail: bool
148 """True marks overall run as failed; exceptions may be raised here."""
151CompletionCallback = Callable[[set[Future["CompletionCallback"]]], CompletionCallbackResult] # Type alias
152"""Callable that is run by the main coordination thread after a ``process_dataset()`` task finishes.
154Purpose:
155- Decide follow-up scheduling after a ``process_dataset()`` task finished.
157Assumptions:
158- Runs in the single coordination thread.
159- May inspect and cancel in-flight futures to implement fail-fast semantics.
160- If cancelling in-flight futures for tasks that spawn subprocesses (e.g. via subprocess.run()), callers should also
161consider terminating the corresponding process subtree to avoid child processes lingering longer than desired. Skipping
162termination will not hang the scheduler (workers will complete naturally), but those subprocesses may outlive cancellation
163until they exit or time out.
164"""
167#############################################################################
168def process_datasets_in_parallel(
169 log: Logger,
170 datasets: list[str], # (sorted) list of datasets to process
171 process_dataset: Callable[[str, int], CompletionCallback], # lambda: dataset, tid; must be thread-safe
172 priority: Callable[[str], Comparable] = lambda dataset: dataset, # lexicographical order by default
173 max_workers: int = os.cpu_count() or 1,
174 executor: Executor | None = None, # the Executor to submit tasks to; None means 'auto-choose'
175 interval_nanos: Callable[
176 [int, str, int], int
177 ] = lambda last_update_nanos, dataset, submitted_count: 0, # optionally spread tasks out over time; e.g. for jitter
178 termination_event: threading.Event | None = None, # optional event to request early async termination
179 enable_barriers: bool | None = None, # for testing only; None means 'auto-detect'
180 is_test_mode: bool = False,
181) -> bool: # returns True if any dataset processing failed, False if all succeeded; thread-safe
182 """Executes dataset processing operations in parallel with dependency-aware scheduling and fault tolerance.
184 This function orchestrates parallel execution of dataset operations while maintaining strict hierarchical dependencies.
185 Processing of a dataset only starts after processing of all its ancestor datasets has completed, ensuring data
186 consistency during operations like ZFS replication or snapshot deletion.
188 Purpose:
189 --------
190 - Process hierarchical datasets in parallel while respecting parent-child dependencies
191 - Provide dependency-aware scheduling; error handling and retries are implemented by callers via ``CompletionCallback``
192 or thin wrappers
193 - Maximize throughput by processing independent dataset subtrees in parallel
194 - Support complex job scheduling patterns via optional barrier synchronization
196 Assumptions:
197 -----------------
198 - Input `datasets` list is sorted in lexicographical order (enforced in test mode)
199 - Input `datasets` list contains no duplicate entries (enforced in test mode)
200 - Dataset hierarchy is determined by slash-separated path components
201 - The `process_dataset` callable is thread-safe and can be executed in parallel
203 Design Rationale:
204 -----------------
205 - The implementation uses a priority queue-based scheduler that maintains two key invariants:
207 - Dependency Ordering: Children are only made available for start of processing after their parent completes,
208 preventing inconsistent dataset states.
210 - Priority: Among the datasets available for start of processing, the "smallest" is always processed next, according to a
211 customizable priority callback function, which by default sorts by lexicographical order (not dataset size), ensuring
212 more deterministic execution order.
214 Algorithm Selection:
215 --------------------
216 - Simple Algorithm (default): Used when no barriers ('~') are detected in dataset names. Provides efficient
217 scheduling for standard parent-child dependencies via recursive child enqueueing after parent completion.
219 - Barrier Algorithm (advanced): Activated when barriers are detected or explicitly enabled. Supports complex
220 synchronization scenarios where jobs must wait for completion of entire subtrees before proceeding. Essential
221 for advanced job scheduling patterns like "complete all parallel replications before starting pruning phase."
223 - Both algorithms are CPU and memory efficient. They require main memory proportional to the number of datasets
224 (~400 bytes per dataset), and easily scale to millions of datasets. Time complexity is O(N log N), where
225 N is the number of datasets.
227 Concurrency Design:
228 -------------------
229 By default uses ThreadPoolExecutor with configurable worker limits to balance parallelism against resource consumption.
230 Optionally, plug in a custom Executor to submit tasks to scale-out clusters via frameworks like Ray Core or Dask, etc.
231 The single-threaded coordination loop prevents race conditions while worker threads execute dataset operations in parallel.
233 Params:
234 -------
235 - datasets: Sorted list of dataset names to process (must not contain duplicates)
236 - process_dataset: Thread-safe Callback function to execute on each dataset; returns a CompletionCallback determining if
237 to fail or skip subtree on error; CompletionCallback runs in the (single) main thread as part of the coordination loop.
238 - priority: Callback function to determine dataset processing order; defaults to lexicographical order.
239 - interval_nanos: Callback that returns a non-negative delay (ns) to add to ``next_update_nanos`` for
240 jitter/back-pressure control; arguments are ``(last_update_nanos, dataset, submitted_count)``
241 - max_workers: Maximum number of parallel worker threads
242 - executor: the Executor to submit tasks to; None means 'auto-choose'
243 - enable_barriers: Force enable/disable barrier algorithm (None = auto-detect)
244 - termination_event: Optional event to request early async termination; stops new submissions and cancels in-flight tasks
246 Returns:
247 --------
248 bool: True if any dataset processing failed, False if all succeeded
250 Raises:
251 -------
252 AssertionError: If input validation fails (sorted order, no duplicates, etc.)
253 Various exceptions: Propagated from process_dataset and its CompletionCallback
254 """
255 assert log is not None
256 assert (not is_test_mode) or datasets == sorted(datasets), "List is not sorted"
257 assert (not is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
258 assert callable(process_dataset)
259 assert callable(priority)
260 assert max_workers > 0
261 assert callable(interval_nanos)
262 termination_event = threading.Event() if termination_event is None else termination_event
263 has_barrier: Final[bool] = any(BARRIER_CHAR in dataset.split("/") for dataset in datasets)
264 assert (enable_barriers is not False) or not has_barrier, "Barriers seen in datasets but barriers explicitly disabled"
265 barriers_enabled: Final[bool] = bool(has_barrier or enable_barriers)
267 empty_barrier: Final[_TreeNode] = _make_tree_node("empty_barrier", "empty_barrier", {}) # immutable!
268 datasets_set: Final[SortedInterner[str]] = SortedInterner(datasets) # reduces memory footprint
269 priority_queue: Final[list[_TreeNode]] = _build_dataset_tree_and_find_roots(datasets, priority)
270 heapq.heapify(priority_queue) # same order as sorted()
271 if executor is None:
272 is_parallel: bool = max_workers > 1 and len(datasets) > 1 and has_siblings(datasets) # siblings can run in parallel
273 executor = ThreadPoolExecutor(max_workers) if is_parallel else SynchronousExecutor()
274 with executor:
275 todo_futures: set[Future[CompletionCallback]] = set()
276 future_to_node: dict[Future[CompletionCallback], _TreeNode] = {}
277 submitted_count: int = 0
278 next_update_nanos: int = time.monotonic_ns()
279 wait_timeout: float | None = None
280 failed: bool = False
282 def submit_datasets() -> bool:
283 """Submits available datasets to worker threads and returns False if all tasks have been completed."""
284 nonlocal wait_timeout
285 wait_timeout = None # indicates to use blocking flavor of concurrent.futures.wait()
286 while len(priority_queue) > 0 and len(todo_futures) < max_workers:
287 # pick "smallest" dataset (wrt. sort order) available for start of processing; submit to thread pool
288 nonlocal next_update_nanos
289 sleep_nanos: int = next_update_nanos - time.monotonic_ns()
290 if sleep_nanos > 0:
291 termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination
292 if termination_event.is_set():
293 break
294 if sleep_nanos > 0 and len(todo_futures) > 0:
295 wait_timeout = 0 # indicates to use non-blocking flavor of concurrent.futures.wait()
296 # It's possible an even "smaller" dataset (wrt. sort order) has become available while we slept.
297 # If so it's preferable to submit to the thread pool the smaller one first.
298 break # break out of loop to check if that's the case via non-blocking concurrent.futures.wait()
299 node: _TreeNode = heapq.heappop(priority_queue) # pick "smallest" dataset (wrt. sort order)
300 nonlocal submitted_count
301 submitted_count += 1
302 next_update_nanos += max(0, interval_nanos(next_update_nanos, node.dataset, submitted_count))
303 future: Future[CompletionCallback] = executor.submit(process_dataset, node.dataset, submitted_count)
304 future_to_node[future] = node
305 todo_futures.add(future)
306 return len(todo_futures) > 0 and not termination_event.is_set()
308 def complete_datasets() -> None:
309 """Waits for completed futures, processes results and errors, then enqueues follow-up tasks per policy."""
310 nonlocal failed
311 nonlocal todo_futures
312 done_futures: set[Future[CompletionCallback]]
313 done_futures, todo_futures = concurrent.futures.wait(todo_futures, wait_timeout, return_when=FIRST_COMPLETED)
314 for done_future in done_futures:
315 done_node: _TreeNode = future_to_node.pop(done_future)
316 c_callback: CompletionCallback = done_future.result() # does not block as processing has already completed
317 c_callback_result: CompletionCallbackResult = c_callback(todo_futures)
318 no_skip: bool = c_callback_result.no_skip
319 fail: bool = c_callback_result.fail
320 failed = failed or fail
321 if barriers_enabled: # This barrier-based algorithm is for more general job scheduling, as in bzfs_jobrunner
322 _complete_job_with_barriers(done_node, no_skip, priority_queue, priority, datasets_set, empty_barrier)
323 elif no_skip: # This simple algorithm is sufficient for almost all use cases
324 _simple_enqueue_children(done_node, priority_queue, priority, datasets_set)
326 # coordination loop; runs in the (single) main thread; submits tasks to worker threads and handles their results
327 while submit_datasets():
328 complete_datasets()
330 if termination_event.is_set():
331 for todo_future in todo_futures:
332 todo_future.cancel()
333 failed = failed or len(priority_queue) > 0 or len(todo_futures) > 0
334 priority_queue.clear()
335 todo_futures.clear()
336 future_to_node.clear()
337 assert len(priority_queue) == 0
338 assert len(todo_futures) == 0
339 assert len(future_to_node) == 0
340 return failed
343def _simple_enqueue_children(
344 node: _TreeNode,
345 priority_queue: list[_TreeNode],
346 priority: Callable[[str], Comparable],
347 datasets_set: SortedInterner[str],
348) -> None:
349 """Enqueues child nodes for start of processing."""
350 for child, grandchildren in node.children.items(): # as processing of parent has now completed
351 child_abs_dataset: str = datasets_set.interned(f"{node.dataset}/{child}")
352 child_node: _TreeNode = _make_tree_node(priority(child_abs_dataset), child_abs_dataset, grandchildren)
353 if child_abs_dataset in datasets_set:
354 heapq.heappush(priority_queue, child_node) # make it available for start of processing
355 else: # it's an intermediate node that has no job attached; pass the enqueue operation
356 _simple_enqueue_children(child_node, priority_queue, priority, datasets_set) # ... recursively down the tree
359def _complete_job_with_barriers(
360 node: _TreeNode,
361 no_skip: bool,
362 priority_queue: list[_TreeNode],
363 priority: Callable[[str], Comparable],
364 datasets_set: SortedInterner[str],
365 empty_barrier: _TreeNode,
366) -> None:
367 """After successful completion, enqueues children, opens barriers, and propagates completion upwards.
369 The (more complex) algorithm below is for more general job scheduling, as in bzfs_jobrunner. Here, a "dataset" string is
370 treated as an identifier for any kind of job rather than a reference to a concrete ZFS object. An example "dataset" job
371 string is "src_host1/createsnapshots/replicate_to_hostA". Jobs can depend on another job via a parent/child relationship
372 formed by '/' directory separators within the dataset string, and multiple "datasets" form a job dependency tree by way
373 of common dataset directory prefixes. Jobs that do not depend on each other can be executed in parallel, and jobs can be
374 told to first wait for other jobs to complete successfully. The algorithm is based on a barrier primitive and is
375 typically disabled. It is only required for rare jobrunner configs.
377 For example, a job scheduler can specify that all parallel replications jobs to multiple destinations must succeed before
378 the jobs of the pruning phase can start. More generally, with this algo, a job scheduler can specify that all jobs within
379 a given job subtree (containing any nested combination of sequential and/or parallel jobs) must successfully complete
380 before a certain other job within the job tree is started. This is specified via the barrier directory named '~'. An
381 example is "src_host1/createsnapshots/~/prune".
383 Note that '~' is unambiguous as it is not a valid ZFS dataset name component per the naming rules enforced by the 'zfs
384 create', 'zfs snapshot' and 'zfs bookmark' CLIs.
385 """
387 def enqueue_children(node: _TreeNode) -> int:
388 """Returns number of jobs that were added to priority_queue for immediate start of processing."""
389 n: int = 0
390 children: _Tree = node.children
391 for child, grandchildren in children.items():
392 abs_dataset: str = datasets_set.interned(f"{node.dataset}/{child}")
393 child_node: _TreeNode = _make_tree_node(priority(abs_dataset), abs_dataset, grandchildren, parent=node)
394 k: int
395 if child != BARRIER_CHAR:
396 if abs_dataset in datasets_set:
397 # it's not a barrier; make job available for immediate start of processing
398 heapq.heappush(priority_queue, child_node)
399 k = 1
400 else: # it's an intermediate node that has no job attached; pass the enqueue operation
401 k = enqueue_children(child_node) # ... recursively down the tree
402 elif len(children) == 1: # if the only child is a barrier then pass the enqueue operation
403 k = enqueue_children(child_node) # ... recursively down the tree
404 else: # park the barrier node within the (still closed) barrier for the time being
405 assert node.mut.barrier is None
406 node.mut.barrier = child_node
407 k = 0
408 node.mut.pending += min(1, k)
409 n += k
410 assert n >= 0
411 return n
413 if no_skip:
414 enqueue_children(node) # make child datasets available for start of processing
415 else: # job completed without success
416 # ... thus, opening the barrier shall always do nothing in node and its ancestors.
417 # perf: Irrevocably mark (exactly once) barriers of this node and all its ancestors as cleared due to subtree skip,
418 # via barriers_cleared=True. This enables to avoid redundant re-walking the entire ancestor chain on subsequent skip.
419 tmp: _TreeNode | None = node
420 while tmp is not None and not tmp.mut.barriers_cleared:
421 tmp.mut.barriers_cleared = True
422 tmp.mut.barrier = empty_barrier
423 tmp = tmp.parent
424 assert node.mut.pending >= 0
425 while node.mut.pending == 0: # have all jobs in subtree of current node completed?
426 if no_skip: # ... if so open the barrier, if it exists, and enqueue jobs waiting on it
427 if not (node.mut.barrier is None or node.mut.barrier is empty_barrier):
428 node.mut.pending += min(1, enqueue_children(node.mut.barrier))
429 node.mut.barrier = empty_barrier
430 if node.mut.pending > 0: # did opening of barrier cause jobs to be enqueued in subtree?
431 break # ... if so we have not yet completed the subtree, so don't mark the subtree as completed yet
432 if node.parent is None:
433 break # we've reached the root node
434 node = node.parent # recurse up the tree to propagate completion upward
435 node.mut.pending -= 1 # mark subtree as completed
436 assert node.mut.pending >= 0