Coverage for bzfs_main / util / parallel_tasktree.py: 100%
214 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Fault-tolerant, dependency-aware scheduling and execution of parallel operations, ensuring that ancestor datasets finish
before descendants start; The design maximizes throughput while preventing inconsistent dataset states during replication or
snapshot deletion.

This module contains only generic scheduling and coordination (the "algorithm"). Error handling, retries, and skip policies
are customizable and implemented by callers via the CompletionCallback API or wrappers such as ``parallel_tasktree_policy``.

Has zero dependencies beyond the Python standard library.
"""

25from __future__ import (
26 annotations,
27)
28import concurrent
29import heapq
30import logging
31import os
32import threading
33import time
34from concurrent.futures import (
35 FIRST_COMPLETED,
36 Executor,
37 Future,
38 ThreadPoolExecutor,
39)
40from typing import (
41 Callable,
42 Final,
43 NamedTuple,
44 final,
45)
47from bzfs_main.util.utils import (
48 Comparable,
49 HashedInterner,
50 SortedInterner,
51 SynchronousExecutor,
52 has_duplicates,
53 has_siblings,
54)
# constants:
BARRIER_CHAR: Final[str] = "~"  # default name of the path component that denotes a barrier
COMPONENT_SEPARATOR: Final[str] = "/"  # ZFS dataset component separator
#############################################################################
@final
class CompletionCallbackResult(NamedTuple):
    """Result of a CompletionCallback invocation."""

    no_skip: bool
    """True enqueues children, False skips the subtree."""

    fail: bool
    """True marks overall run as failed."""
#############################################################################
CompletionCallback = Callable[[set[Future["CompletionCallback"]]], CompletionCallbackResult]  # Type alias
"""Callable that is run by the main coordination thread after a ``process_dataset()`` task finishes.

Purpose:
- Decide follow-up scheduling after a ``process_dataset()`` task finished.

Assumptions:
- Runs in the single coordination thread.
- May inspect and cancel in-flight futures to implement fail-fast semantics.
- If cancelling in-flight futures for tasks that spawn subprocesses (e.g. via subprocess.run()), callers should also
  consider terminating the corresponding process subtree to avoid child processes lingering longer than desired. Skipping
  termination will not hang the scheduler (workers will complete naturally), but those subprocesses may outlive cancellation
  until they exit or time out.
"""
#############################################################################
@final
class ParallelTaskTree:
    """Main class for dependency-aware scheduling of dataset jobs with optional barriers and priority ordering."""

    def __init__(
        self,
        log: logging.Logger,
        datasets: list[str],  # (sorted) list of datasets to process
        process_dataset: Callable[[str, int], CompletionCallback],  # lambda: dataset, tid; must be thread-safe
        priority: Callable[[str], Comparable] = lambda dataset: dataset,  # lexicographical order by default
        max_workers: int = os.cpu_count() or 1,
        executors: Callable[[], Executor] | None = None,  # factory producing Executor; None means 'auto-choose'
        interval_nanos: Callable[
            [int, str, int], int
        ] = lambda last_update_nanos, dataset, submit_count: 0,  # optionally spread tasks out over time; e.g. for jitter
        termination_event: threading.Event | None = None,  # optional event to request early async termination
        enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
        barrier_name: str = BARRIER_CHAR,
        is_test_mode: bool = False,
    ) -> None:
        """Prepares to process datasets in parallel with dependency-aware scheduling and fault tolerance.

        This class orchestrates parallel execution of dataset operations while maintaining strict hierarchical dependencies.
        Processing of a dataset only starts after processing of all its ancestor datasets has completed, ensuring data
        consistency during operations like ZFS replication or snapshot deletion.

        Purpose:
        --------
        - Process hierarchical datasets in parallel while respecting parent-child dependencies
        - Provide dependency-aware scheduling; error handling and retries are implemented by callers via
          ``CompletionCallback`` or thin wrappers
        - Maximize throughput by processing independent dataset subtrees in parallel
        - Support complex job scheduling patterns via optional barrier synchronization

        Assumptions:
        -----------------
        - Input `datasets` list is sorted in lexicographical order (enforced in test mode)
        - Input `datasets` list contains no duplicate entries (enforced in test mode)
        - Input `datasets` list contains no empty dataset names and none that start with '/'
        - Dataset hierarchy is determined by slash-separated path components
        - The `process_dataset` callable is thread-safe and can be executed in parallel

        Design Rationale:
        -----------------
        - The implementation uses a priority queue-based scheduler that maintains two key invariants:

        - Dependency Ordering: Children are only made available for start of processing after their parent completes,
          preventing inconsistent dataset states.

        - Priority: Among the datasets available for start of processing, the "smallest" is always processed next, according
          to a customizable priority callback function, which by default sorts by lexicographical order (not dataset size),
          ensuring more deterministic execution order.

        Algorithm Selection:
        --------------------
        - Simple Algorithm (default): Used when no barriers ('~') are detected in dataset names. Provides efficient
          scheduling for standard parent-child dependencies via recursive child enqueueing after parent completion.

        - Barrier Algorithm (advanced): Activated when barriers are detected or explicitly enabled. Supports complex
          synchronization scenarios where jobs must wait for completion of entire subtrees before proceeding. Essential
          for advanced job scheduling patterns like "complete all parallel replications before starting pruning phase."

        - Both algorithms are CPU and memory efficient. They require main memory proportional to the number of datasets
          (~400 bytes per dataset), and easily scale to millions of datasets. Time complexity is O(N log N), where
          N is the number of datasets.

        Concurrency Design:
        -------------------
        By default uses ThreadPoolExecutor with configurable worker limits to balance parallelism against resource
        consumption. Optionally, plug in a custom Executor to submit tasks to scale-out clusters via frameworks like
        Ray Core or Dask, etc.
        The single-threaded coordination loop prevents race conditions while worker threads execute dataset operations in
        parallel.

        Params:
        -------
        - datasets: Sorted list of dataset names to process (must not contain duplicates)
        - process_dataset: Thread-safe Callback function to execute on each dataset; returns a CompletionCallback determining
          if to fail or skip subtree on error; CompletionCallback runs in the (single) main thread as part of the
          coordination loop.
        - priority: Callback function to determine dataset processing order; defaults to lexicographical order.
        - interval_nanos: Callback that returns a non-negative delay (ns) to add to ``next_update_nanos`` for
          jitter/back-pressure control; arguments are ``(last_update_nanos, dataset, submit_count)``
        - max_workers: Maximum number of parallel worker threads
        - executors: Factory returning an Executor to submit tasks to; None means 'auto-choose'
        - enable_barriers: Force enable/disable barrier algorithm (None = auto-detect)
        - barrier_name: Directory name that denotes a barrier within dataset/job strings (default '~'); must be non-empty and
          not contain '/'
        - termination_event: Optional event to request early async termination; stops new submissions and cancels in-flight
          tasks

        Raises:
        -------
        - ValueError: if ``barrier_name`` is empty or contains '/', or if a dataset name is empty or starts with '/'
        """
        assert log is not None
        assert (not is_test_mode) or datasets == sorted(datasets), "List is not sorted"
        assert (not is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
        if COMPONENT_SEPARATOR in barrier_name or not barrier_name:
            raise ValueError(f"Invalid barrier_name: {barrier_name}")
        for dataset in datasets:
            if dataset.startswith(COMPONENT_SEPARATOR) or not dataset:
                raise ValueError(f"Invalid dataset name: {dataset}")
        assert callable(process_dataset)
        assert callable(priority)
        assert max_workers > 0
        assert callable(interval_nanos)
        # a barrier is present if any path component of any dataset equals the barrier name
        has_barrier: Final[bool] = any(barrier_name in dataset.split(COMPONENT_SEPARATOR) for dataset in datasets)
        assert (enable_barriers is not False) or not has_barrier, "Barrier seen in datasets but barriers explicitly disabled"

        self._barriers_enabled: Final[bool] = bool(has_barrier or enable_barriers)
        self._barrier_name: Final[str] = barrier_name
        self._log: Final[logging.Logger] = log
        self._datasets: Final[list[str]] = datasets
        self._process_dataset: Final[Callable[[str, int], CompletionCallback]] = process_dataset
        self._priority: Final[Callable[[str], Comparable]] = priority
        self._max_workers: Final[int] = max_workers
        self._interval_nanos: Final[Callable[[int, str, int], int]] = interval_nanos
        self._termination_event: Final[threading.Event] = (
            threading.Event() if termination_event is None else termination_event
        )
        self._is_test_mode: Final[bool] = is_test_mode
        self._priority_queue: Final[list[_TreeNode]] = []
        self._tree: Final[_Tree] = _build_dataset_tree(datasets)  # tree consists of nested dictionaries and is immutable
        self._empty_barrier: Final[_TreeNode] = _make_tree_node("empty_barrier", "empty_barrier", {})  # immutable!
        self._datasets_set: Final[SortedInterner[str]] = SortedInterner(datasets)  # reduces memory footprint
        if executors is None:
            is_parallel: bool = max_workers > 1 and len(datasets) > 1 and has_siblings(datasets)  # siblings can run in parallel

            def _default_executor_factory() -> Executor:
                """Returns a thread pool if tasks can actually run in parallel, else a synchronous executor."""
                return ThreadPoolExecutor(max_workers) if is_parallel else SynchronousExecutor()

            executors = _default_executor_factory
        self._executors: Final[Callable[[], Executor]] = executors
        assert callable(executors)

    def process_datasets_in_parallel(self) -> bool:
        """Executes the configured tasks and returns True if any dataset processing failed, False if all succeeded.

        Also returns True if the termination event was set while work was still queued or in flight.
        """
        self._build_priority_queue()
        executor: Executor = self._executors()
        with executor:
            todo_futures: set[Future[CompletionCallback]] = set()
            future_to_node: dict[Future[CompletionCallback], _TreeNode] = {}
            submit_count: int = 0
            next_update_nanos: int = time.monotonic_ns()
            wait_timeout: float | None = None
            failed: bool = False

            def submit_datasets() -> bool:
                """Submits available datasets to worker threads and returns False if all tasks have been completed."""
                nonlocal wait_timeout
                nonlocal next_update_nanos
                nonlocal submit_count
                wait_timeout = None  # indicates to use blocking flavor of concurrent.futures.wait()
                while len(self._priority_queue) > 0 and len(todo_futures) < self._max_workers:
                    # pick "smallest" dataset (wrt. sort order) available for start of processing; submit to thread pool
                    sleep_nanos: int = next_update_nanos - time.monotonic_ns()
                    if sleep_nanos > 0:
                        self._termination_event.wait(sleep_nanos / 1_000_000_000)  # allow early wakeup on async termination
                    if self._termination_event.is_set():
                        break
                    if sleep_nanos > 0 and len(todo_futures) > 0:
                        wait_timeout = 0  # indicates to use non-blocking flavor of concurrent.futures.wait()
                        # It's possible an even "smaller" dataset (wrt. sort order) has become available while we slept.
                        # If so it's preferable to submit to the thread pool the smaller one first.
                        break  # break out of loop to check if that's the case via non-blocking concurrent.futures.wait()
                    node: _TreeNode = heapq.heappop(self._priority_queue)  # pick "smallest" dataset (wrt. sort order)
                    submit_count += 1
                    next_update_nanos += max(0, self._interval_nanos(next_update_nanos, node.dataset, submit_count))
                    future: Future[CompletionCallback] = executor.submit(self._process_dataset, node.dataset, submit_count)
                    future_to_node[future] = node
                    todo_futures.add(future)
                return len(todo_futures) > 0 and not self._termination_event.is_set()

            def complete_datasets() -> None:
                """Waits for completed futures, processes results and errors, then enqueues follow-up tasks per policy."""
                nonlocal failed
                nonlocal todo_futures
                done_futures: set[Future[CompletionCallback]]
                done_futures, todo_futures = concurrent.futures.wait(todo_futures, wait_timeout, return_when=FIRST_COMPLETED)
                for done_future in done_futures:
                    done_node: _TreeNode = future_to_node.pop(done_future)
                    c_callback: CompletionCallback = done_future.result()  # does not block as processing already completed
                    c_callback_result: CompletionCallbackResult = c_callback(todo_futures)
                    no_skip: bool = c_callback_result.no_skip
                    fail: bool = c_callback_result.fail
                    failed = failed or fail
                    self._complete_dataset(done_node, no_skip=no_skip)

            # coordination loop; runs in the (single) main thread; submits tasks to worker threads and handles their results
            while submit_datasets():
                complete_datasets()

            # Still inside the 'with' block, so cancellation is requested before the executor's exit handler runs.
            if self._termination_event.is_set():
                for todo_future in todo_futures:
                    todo_future.cancel()
                # any work that was still queued or in flight at termination time counts as a failure
                failed = failed or len(self._priority_queue) > 0 or len(todo_futures) > 0
                self._priority_queue.clear()
                todo_futures.clear()
                future_to_node.clear()
            assert len(self._priority_queue) == 0
            assert len(todo_futures) == 0
            assert len(future_to_node) == 0
            return failed

    def _build_priority_queue(self) -> None:
        """Builds and fills initial priority queue of available root nodes for this task tree, ensuring the scheduler starts
        from a synthetic root node while honoring barriers; the synthetic root simplifies enqueueing logic."""
        self._priority_queue.clear()
        root_node: _TreeNode = _make_tree_node(priority="", dataset="", children=self._tree)
        self._complete_dataset(root_node, no_skip=True)

    def _complete_dataset(self, node: _TreeNode, no_skip: bool) -> None:
        """Enqueues child nodes for start of processing, using the appropriate algorithm."""
        if self._barriers_enabled:  # This barrier-based algorithm is for more general job scheduling, as in bzfs_jobrunner
            self._complete_dataset_with_barriers(node, no_skip=no_skip)
        elif no_skip:  # This simple algorithm is sufficient for most uses
            self._simple_enqueue_children(node)

    def _simple_enqueue_children(self, node: _TreeNode) -> None:
        """Enqueues child nodes for start of processing (using iteration to avoid potentially hitting recursion limits)."""
        stack: list[_TreeNode] = [node]
        while stack:
            current_node: _TreeNode = stack.pop()
            for child, grandchildren in current_node.children.items():  # as processing of parent has now completed
                child_abs_dataset: str = self._join_dataset(current_node.dataset, child)
                child_node: _TreeNode = _make_tree_node(self._priority(child_abs_dataset), child_abs_dataset, grandchildren)
                if child_abs_dataset in self._datasets_set:
                    heapq.heappush(self._priority_queue, child_node)  # make it available for start of processing
                else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                    stack.append(child_node)  # ... recursively down the tree

    def _complete_dataset_with_barriers(self, node: _TreeNode, no_skip: bool) -> None:
        """After successful completion, enqueues children, opens barriers, and propagates completion upwards.

        The (more complex) algorithm below is for more general job scheduling, as in bzfs_jobrunner. Here, a "dataset" string
        is treated as an identifier for any kind of job rather than a reference to a concrete ZFS object. An example
        "dataset" job string is "src_host1/createsnapshots/replicate_to_hostA". Jobs can depend on another job via a
        parent/child relationship formed by '/' directory separators within the dataset string, and multiple "datasets" form
        a job dependency tree by way of common dataset directory prefixes. Jobs that do not depend on each other can be
        executed in parallel, and jobs can be told to first wait for other jobs to complete successfully. The algorithm is
        based on a barrier primitive and is typically disabled. It is only required for rare jobrunner configs.

        For example, a job scheduler can specify that all parallel replications jobs to multiple destinations must succeed
        before the jobs of the pruning phase can start. More generally, with this algo, a job scheduler can specify that all
        jobs within a given job subtree (containing any nested combination of sequential and/or parallel jobs) must
        successfully complete before a certain other job within the job tree is started. This is specified via the barrier
        directory named by ``barrier_name`` (default '~'). An example is "src_host1/createsnapshots/~/prune".

        Note that the default '~' is unambiguous as it is not a valid ZFS dataset name component per the naming rules
        enforced by the 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs. Custom barrier names should avoid colliding
        with real dataset/job components.
        """

        def enqueue_children(node: _TreeNode) -> int:
            """Returns number of jobs that were added to priority_queue for immediate start of processing."""
            n: int = 0
            children: _Tree = node.children
            for child, grandchildren in children.items():
                abs_dataset: str = self._join_dataset(node.dataset, child)
                child_node: _TreeNode = _make_tree_node(self._priority(abs_dataset), abs_dataset, grandchildren, parent=node)
                k: int
                if child != self._barrier_name:
                    if abs_dataset in self._datasets_set:
                        # it's not a barrier; make job available for immediate start of processing
                        heapq.heappush(self._priority_queue, child_node)
                        k = 1
                    else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                        k = enqueue_children(child_node)  # ... recursively down the tree
                elif len(children) == 1:  # if the only child is a barrier then pass the enqueue operation
                    k = enqueue_children(child_node)  # ... recursively down the tree
                else:  # park the barrier node within the (still closed) barrier for the time being
                    assert node.mut.barrier is None
                    node.mut.barrier = child_node
                    k = 0
                node.mut.pending += min(1, k)  # each direct child subtree counts at most once towards 'pending'
                n += k
            assert n >= 0
            return n

        if no_skip:
            enqueue_children(node)  # make child datasets available for start of processing
        else:  # job completed without success
            # ... thus, opening the barrier shall always do nothing in node and its ancestors.
            # perf: Irrevocably mark (exactly once) barriers of this node and all its ancestors as cleared due to subtree
            # skip, via barriers_cleared=True. This enables to avoid redundant re-walking the entire ancestor chain on
            # subsequent skip.
            tmp: _TreeNode | None = node
            while (tmp is not None) and not tmp.mut.barriers_cleared:
                tmp.mut.barriers_cleared = True
                tmp.mut.barrier = self._empty_barrier
                tmp = tmp.parent
        assert node.mut.pending >= 0
        while node.mut.pending == 0:  # have all jobs in subtree of current node completed?
            if no_skip:  # ... if so open the barrier, if it exists, and enqueue jobs waiting on it
                if not (node.mut.barrier is None or node.mut.barrier is self._empty_barrier):
                    node.mut.pending += min(1, enqueue_children(node.mut.barrier))
                node.mut.barrier = self._empty_barrier
            if node.mut.pending > 0:  # did opening of barrier cause jobs to be enqueued in subtree?
                break  # ... if so we have not yet completed the subtree, so don't mark the subtree as completed yet
            if node.parent is None:
                break  # we've reached the root node
            node = node.parent  # recurse up the tree to propagate completion upward
            node.mut.pending -= 1  # mark subtree as completed
            assert node.mut.pending >= 0

    def _join_dataset(self, parent: str, child: str) -> str:
        """Concatenates parent and child dataset names; accommodates synthetic root node; interns for memory footprint."""
        return self._datasets_set.interned(f"{parent}{COMPONENT_SEPARATOR}{child}" if parent else child)
#############################################################################
@final
class _TreeNodeMutableAttributes:
    """Container for mutable attributes, stored space efficiently."""

    __slots__ = ("barrier", "barriers_cleared", "pending")  # uses more compact memory layout than __dict__

    def __init__(self) -> None:
        """Initializes the state to: no barrier parked, nothing pending, barriers not cleared."""
        self.barrier: _TreeNode | None = None  # zero or one barrier TreeNode waiting for this node to complete
        self.pending: int = 0  # number of children added to priority queue that haven't completed their work yet
        self.barriers_cleared: bool = False  # irrevocably mark barriers of this node and all its ancestors as cleared?
#############################################################################
@final
class _TreeNode(NamedTuple):
    """Node in dataset dependency tree used by the scheduler; _TreeNodes are ordered by priority and dataset name within a
    priority queue, via __lt__ comparisons."""

    priority: Comparable  # determines the processing order once this dataset has become available for start of processing
    dataset: str  # each dataset name is unique; attribs other than `priority` and `dataset` are never used for comparisons
    children: _Tree  # dataset "directory" tree consists of nested dicts; aka dict[str, dict]
    parent: _TreeNode | None
    mut: _TreeNodeMutableAttributes

    def __repr__(self) -> str:
        """Returns a compact debug string; the barrier is reported only as present/absent to avoid nested output."""
        priority, dataset, pending, barrier = self.priority, self.dataset, self.mut.pending, self.mut.barrier
        return str({"priority": priority, "dataset": dataset, "pending": pending, "barrier": barrier is not None})
def _make_tree_node(priority: Comparable, dataset: str, children: _Tree, parent: _TreeNode | None = None) -> _TreeNode:
    """Creates a TreeNode with mutable state container; each call gets its own fresh ``_TreeNodeMutableAttributes``."""
    return _TreeNode(priority, dataset, children, parent, _TreeNodeMutableAttributes())
#############################################################################
_Tree = dict[str, "_Tree"]  # Type alias; maps a path component to the subtree of its children
def _build_dataset_tree(sorted_datasets: list[str]) -> _Tree:
    """Takes as input a sorted list of datasets and returns a sorted directory tree containing the same dataset names, in the
    form of nested dicts; This converts the dataset list into a dependency tree."""
    tree: _Tree = {}
    interner: HashedInterner[str] = HashedInterner()  # reduces memory footprint
    for dataset in sorted_datasets:
        current: _Tree = tree
        for component in dataset.split(COMPONENT_SEPARATOR):
            child: _Tree | None = current.get(component)
            if child is None:  # first time this path component is seen underneath the current node
                child = {}
                component = interner.intern(component)
                current[component] = child
            current = child  # descend one level
    shared_empty_leaf: _Tree = {}

    def compact(node: _Tree) -> None:
        """Tree with shared empty leaf nodes has some 30% lower memory footprint than the non-compacted version."""
        stack: list[_Tree] = [node]
        while stack:  # algo implemented using iteration to avoid hitting recursion limits with pathologically deep trees
            current_node: _Tree = stack.pop()
            for key, child_node in current_node.items():
                if len(child_node) == 0:
                    # rebinding the value of an existing key does not resize the dict, so it is safe while iterating
                    current_node[key] = shared_empty_leaf  # sharing is safe as the tree is treated as immutable henceforth
                else:
                    stack.append(child_node)  # recurse down the tree

    compact(tree)
    return tree