Coverage for bzfs_main/parallel_engine.py: 100%
183 statements

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Fault-tolerant, dependency-aware scheduling and execution of parallel operations, ensuring that ancestor datasets finish
before descendants start; the design maximizes throughput while preventing inconsistent dataset states during replication or
snapshot deletion."""

from __future__ import annotations
import argparse
import concurrent
import heapq
import logging
import os
import subprocess
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor
from logging import Logger
from typing import (
    Any,
    Callable,
    Dict,
    NamedTuple,
)

from bzfs_main.retry import (
    Retry,
    RetryPolicy,
    run_with_retries,
)
from bzfs_main.utils import (
    DONT_SKIP_DATASET,
    Interner,
    SortedInterner,
    dry,
    has_duplicates,
    human_readable_duration,
    is_descendant,
    terminate_process_subtree,
)

# constants:
BARRIER_CHAR: str = "~"


#############################################################################
Tree = Dict[str, "Tree"]  # Type alias


def _build_dataset_tree(sorted_datasets: list[str]) -> Tree:
    """Takes as input a sorted list of datasets and returns a sorted directory tree containing the same dataset names, in the
    form of nested dicts."""
    tree: Tree = {}
    interner: Interner[str] = Interner()  # reduces memory footprint
    for dataset in sorted_datasets:
        current: Tree = tree
        for component in dataset.split("/"):
            child: Tree | None = current.get(component)
            if child is None:
                child = {}
                component = interner.intern(component)
                current[component] = child
            current = child
    shared_empty_leaf: Tree = {}

    def compact(node: Tree) -> None:
        """Tree with shared empty leaf nodes has some 30% lower memory footprint than the non-compacted version."""
        for key, child_node in node.items():
            if len(child_node) == 0:
                node[key] = shared_empty_leaf  # sharing is safe because the tree is treated as immutable henceforth
            else:
                compact(child_node)

    compact(tree)
    return tree
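

# Illustrative sketch (not part of the engine): the hypothetical helper below merely demonstrates the nested-dict shape
# returned by _build_dataset_tree(); the dataset names are made-up examples.
def _example_build_dataset_tree() -> None:
    """Shows that each slash-separated path component becomes one level of nesting, with empty dicts as leaves."""
    tree = _build_dataset_tree(["tank", "tank/a", "tank/a/b", "tank/c"])
    assert tree == {"tank": {"a": {"b": {}}, "c": {}}}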


def _build_dataset_tree_and_find_roots(sorted_datasets: list[str]) -> list[TreeNode]:
    """For consistency, processing of a dataset only starts after processing of its ancestors has completed."""
    tree: Tree = _build_dataset_tree(sorted_datasets)  # tree consists of nested dictionaries
    skip_dataset: str = DONT_SKIP_DATASET
    roots: list[TreeNode] = []
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=skip_dataset):
            continue
        skip_dataset = dataset
        children: Tree = tree
        for component in dataset.split("/"):
            children = children[component]
        roots.append(_make_tree_node(dataset, children))
    return roots
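

# Illustrative sketch (not part of the engine): the hypothetical helper below demonstrates root detection, assuming the
# usual semantics of is_descendant() (a dataset is a descendant of a root iff its name extends the root by one or more
# '/'-separated components) and that DONT_SKIP_DATASET acts as a "match nothing" sentinel; the names are made-up examples.
def _example_find_roots() -> None:
    """Only the topmost dataset of each independent subtree becomes a root; descendants are reached via the tree."""
    roots = _build_dataset_tree_and_find_roots(["tank/a", "tank/a/b", "tank/c"])
    assert [node.dataset for node in roots] == ["tank/a", "tank/c"]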


class TreeNodeMutableAttributes:
    """Container for mutable attributes, stored space efficiently."""

    __slots__ = ("barrier", "pending")  # uses more compact memory layout than __dict__

    def __init__(self) -> None:
        self.barrier: TreeNode | None = None  # zero or one barrier TreeNode waiting for this node to complete
        self.pending: int = 0  # number of children added to priority queue that haven't completed their work yet


class TreeNode(NamedTuple):
    """Node in dataset dependency tree used by the scheduler; TreeNodes are ordered by dataset name within a priority queue
    via __lt__ comparisons."""

    dataset: str  # Each dataset name is unique, thus attributes other than `dataset` are never used for comparisons
    children: Tree  # dataset "directory" tree consists of nested dicts; aka Dict[str, Dict]
    parent: Any  # aka TreeNode
    mut: TreeNodeMutableAttributes

    def __repr__(self) -> str:
        dataset, pending, barrier, nchildren = self.dataset, self.mut.pending, self.mut.barrier, len(self.children)
        return str({"dataset": dataset, "pending": pending, "barrier": barrier is not None, "nchildren": nchildren})


def _make_tree_node(dataset: str, children: Tree, parent: TreeNode | None = None) -> TreeNode:
    """Creates a TreeNode with mutable state container."""
    return TreeNode(dataset, children, parent, TreeNodeMutableAttributes())
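

# Illustrative sketch (not part of the engine): because TreeNode is a NamedTuple whose first field is the unique dataset
# name, tuple comparison orders nodes lexicographically by dataset, which is exactly the order in which a heapq-based
# priority queue pops them. The dataset names below are made-up examples.
def _example_tree_node_ordering() -> None:
    """Demonstrates that heapq always yields the lexicographically smallest available TreeNode first."""
    queue = [_make_tree_node("tank/c", {}), _make_tree_node("tank/a", {}), _make_tree_node("tank/b", {})]
    heapq.heapify(queue)
    assert [heapq.heappop(queue).dataset for _ in range(3)] == ["tank/a", "tank/b", "tank/c"]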


def process_datasets_in_parallel_and_fault_tolerant(
    log: Logger,
    datasets: list[str],
    process_dataset: Callable[[str, str, Retry], bool],  # lambda[dataset, tid, Retry]; must be thread-safe
    skip_tree_on_error: Callable[[str], bool],  # lambda[dataset]
    skip_on_error: str = "fail",
    max_workers: int = os.cpu_count() or 1,
    interval_nanos: Callable[[str], int] = lambda dataset: 0,  # optionally, spread tasks out over time; e.g. for jitter
    task_name: str = "Task",
    enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
    append_exception: Callable[[BaseException, str, str], None] = lambda ex, task_name, task_description: None,
    retry_policy: RetryPolicy | None = None,
    dry_run: bool = False,
    is_test_mode: bool = False,
) -> bool:
    """Executes dataset processing operations in parallel with dependency-aware scheduling and fault tolerance.

    This function orchestrates parallel execution of dataset operations while maintaining strict hierarchical
    dependencies. Processing of a dataset only starts after processing of all its ancestor datasets has completed,
    ensuring data consistency during operations like ZFS replication or snapshot deletion.

    Purpose:
    --------
    - Process hierarchical datasets in parallel while respecting parent-child dependencies
    - Provide fault tolerance with configurable error handling and retry mechanisms
    - Maximize throughput by processing independent dataset subtrees in parallel
    - Support complex job scheduling patterns via optional barrier synchronization

    Assumptions:
    ------------
    - Input `datasets` list is sorted in lexicographical order (enforced in test mode)
    - Input `datasets` list contains no duplicate entries (enforced in test mode)
    - Dataset hierarchy is determined by slash-separated path components
    - The `process_dataset` callable is thread-safe and can be executed in parallel

    Design Rationale:
    -----------------
    - The implementation uses a priority queue-based scheduler that maintains two key invariants:

      - Dependency Ordering: Children are only made available for start of processing after their parent completes,
        preventing inconsistent dataset states.

      - Lexicographical Priority: Among the datasets available for start of processing, the lexicographically smallest
        is always processed next, ensuring a more deterministic execution order.

    Algorithm Selection:
    --------------------
    - Simple Algorithm (default): Used when no barriers ('~') are detected in dataset names. Provides efficient
      scheduling for standard parent-child dependencies via recursive child enqueueing after parent completion.

    - Barrier Algorithm (advanced): Activated when barriers are detected or explicitly enabled. Supports complex
      synchronization scenarios where jobs must wait for completion of entire subtrees before proceeding. Essential
      for advanced job scheduling patterns like "complete all parallel replications before starting the pruning phase."

    - Both algorithms are CPU and memory efficient. They require main memory proportional to the number of dataset
      names (~400 bytes per dataset name), and easily scale to millions of datasets. Time complexity is O(N log N),
      where N is the number of datasets.

    Error Handling Strategy:
    ------------------------
    - "fail": Immediately terminate all processing on the first error (fail-fast)
    - "dataset": Skip the failed dataset but continue processing others
    - "tree": Skip the entire subtree rooted at the failed dataset, as determined by `skip_tree_on_error`

    Concurrency Design:
    -------------------
    Uses ThreadPoolExecutor with configurable worker limits to balance parallelism against resource consumption. The
    single-threaded coordination loop prevents race conditions while worker threads execute dataset operations in
    parallel.

    Params:
    -------
    - datasets: Sorted list of dataset names to process (must not contain duplicates)
    - process_dataset: Thread-safe function to execute on each dataset
    - skip_tree_on_error: Function determining whether to skip the entire subtree on error
    - max_workers: Maximum number of parallel worker threads
    - enable_barriers: Force enable/disable the barrier algorithm (None = auto-detect)

    Returns:
    --------
    bool: True if any dataset processing failed, False if all succeeded

    Raises:
    -------
    AssertionError: If input validation fails (sorted order, no duplicates, etc.)
    Various exceptions: Propagated from `process_dataset` when skip_on_error="fail"
    """
    assert (not is_test_mode) or datasets == sorted(datasets), "List is not sorted"
    assert (not is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
    assert callable(process_dataset)
    assert callable(skip_tree_on_error)
    assert max_workers > 0
    assert callable(interval_nanos)
    assert "%" not in task_name
    has_barrier: bool = any(BARRIER_CHAR in dataset.split("/") for dataset in datasets)
    assert (enable_barriers is not False) or not has_barrier
    barriers_enabled: bool = bool(has_barrier or enable_barriers)
    assert callable(append_exception)
    retry_policy = (
        retry_policy
        if retry_policy is not None
        else RetryPolicy(  # no retries
            argparse.Namespace(retries=0, retry_min_sleep_secs=0, retry_max_sleep_secs=0, retry_max_elapsed_secs=0)
        )
    )
    is_debug: bool = log.isEnabledFor(logging.DEBUG)

    def _process_dataset(dataset: str, tid: str) -> bool:
        """Runs ``process_dataset`` with retries and logs the duration."""
        start_time_nanos: int = time.monotonic_ns()
        try:
            return run_with_retries(log, retry_policy, process_dataset, dataset, tid)
        finally:
            if is_debug:
                elapsed_duration: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
                log.debug(dry(f"{tid} {task_name} done: %s took %s", dry_run), dataset, elapsed_duration)

    assert (not is_test_mode) or str(_make_tree_node("foo", {}))
    immutable_empty_barrier: TreeNode = _make_tree_node("immutable_empty_barrier", {})
    priority_queue: list[TreeNode] = _build_dataset_tree_and_find_roots(datasets)
    heapq.heapify(priority_queue)  # same order as sorted()
    len_datasets: int = len(datasets)
    datasets_set: SortedInterner[str] = SortedInterner(datasets)  # reduces memory footprint
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        todo_futures: set[Future[Any]] = set()
        future_to_node: dict[Future[Any], TreeNode] = {}
        submitted: int = 0
        next_update_nanos: int = time.monotonic_ns()
        fw_timeout: float | None = None

        def submit_datasets() -> bool:
            """Submits available datasets to worker threads and returns False if all tasks have been completed."""
            nonlocal fw_timeout
            fw_timeout = None  # indicates to use the blocking flavor of concurrent.futures.wait()
            while len(priority_queue) > 0 and len(todo_futures) < max_workers:
                # pick the "smallest" dataset (wrt. sort order) available for start of processing; submit to thread pool
                nonlocal next_update_nanos
                sleep_nanos: int = next_update_nanos - time.monotonic_ns()
                if sleep_nanos > 0:
                    time.sleep(sleep_nanos / 1_000_000_000)  # seconds
                if sleep_nanos > 0 and len(todo_futures) > 0:
                    fw_timeout = 0  # indicates to use the non-blocking flavor of concurrent.futures.wait()
                    # It's possible an even "smaller" dataset (wrt. sort order) has become available while we slept.
                    # If so, it's preferable to submit the smaller one to the thread pool first.
                    break  # break out of the loop to check if that's the case via non-blocking concurrent.futures.wait()
                node: TreeNode = heapq.heappop(priority_queue)  # pick the "smallest" dataset (wrt. sort order)
                next_update_nanos += max(0, interval_nanos(node.dataset))
                nonlocal submitted
                submitted += 1
                future: Future[Any] = executor.submit(_process_dataset, node.dataset, tid=f"{submitted}/{len_datasets}")
                future_to_node[future] = node
                todo_futures.add(future)
            return len(todo_futures) > 0

        # coordination loop; runs in the (single) main thread; submits tasks to worker threads and handles their results
        failed: bool = False
        while submit_datasets():
            done_futures: set[Future[Any]]
            done_futures, todo_futures = concurrent.futures.wait(todo_futures, fw_timeout, return_when=FIRST_COMPLETED)
            for done_future in done_futures:
                done_future_node: TreeNode = future_to_node.pop(done_future)
                dataset: str = done_future_node.dataset
                try:
                    no_skip: bool = done_future.result()  # does not block as processing has already completed
                except (subprocess.CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e:
                    failed = True
                    if skip_on_error == "fail":
                        for todo_future in todo_futures:
                            todo_future.cancel()
                        terminate_process_subtree(except_current_process=True)
                        raise
                    no_skip = not (skip_on_error == "tree" or skip_tree_on_error(dataset))
                    log.error("%s", e)
                    append_exception(e, task_name, dataset)

                if not barriers_enabled:
                    # This simple algorithm is sufficient for almost all use cases:
                    def simple_enqueue_children(node: TreeNode) -> None:
                        """Recursively enqueues child nodes for start of processing."""
                        for child, grandchildren in node.children.items():  # as processing of parent has now completed
                            child_node: TreeNode = _make_tree_node(
                                datasets_set.interned(f"{node.dataset}/{child}"), grandchildren
                            )
                            if child_node.dataset in datasets_set:
                                heapq.heappush(priority_queue, child_node)  # make it available for start of processing
                            else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                                simple_enqueue_children(child_node)  # ... recursively down the tree

                    if no_skip:
                        simple_enqueue_children(done_future_node)
                else:
                    # The (more complex) algorithm below is for more general job scheduling, as in bzfs_jobrunner.
                    # Here, a "dataset" string is treated as an identifier for any kind of job rather than a reference
                    # to a concrete ZFS object. Example "dataset" job string: "src_host1/createsnapshot/push/prune".
                    # Jobs can depend on another job via a parent/child relationship formed by '/' directory separators
                    # within the dataset string, and multiple "datasets" form a job dependency tree by way of common
                    # dataset directory prefixes. Jobs that do not depend on each other can be executed in parallel,
                    # and jobs can be told to first wait for other jobs to complete successfully. The algorithm is
                    # based on a barrier primitive and is typically disabled; it is only required for rare jobrunner
                    # configs. For example, a job scheduler can specify that all parallel push replication jobs to
                    # multiple destinations must succeed before the jobs of the pruning phase can start. More
                    # generally, with this algorithm, a job scheduler can specify that all jobs within a given job
                    # subtree (containing any nested combination of sequential and/or parallel jobs) must successfully
                    # complete before a certain other job within the job tree is started. This is specified via the
                    # barrier directory named "~". Example: "src_host1/createsnapshot/~/prune".
                    # Note that "~" is unambiguous as it is not a valid ZFS dataset name component per the naming rules
                    # enforced by the 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs.
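                    # Illustrative (hypothetical) worked example: given the job list
                    #   ["src_host1/createsnapshot",
                    #    "src_host1/createsnapshot/replicate1",
                    #    "src_host1/createsnapshot/replicate2",
                    #    "src_host1/createsnapshot/~/prune"]
                    # both replicate jobs become runnable (in parallel) once createsnapshot completes, whereas the
                    # prune job behind the "~" barrier only becomes runnable once every job in the createsnapshot
                    # subtree has completed successfully.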
                    def enqueue_children(node: TreeNode) -> int:
                        """Returns number of jobs that were added to priority_queue for immediate start of processing."""
                        n: int = 0
                        children: Tree = node.children
                        for child, grandchildren in children.items():
                            child_node: TreeNode = _make_tree_node(
                                datasets_set.interned(f"{node.dataset}/{child}"), grandchildren, parent=node
                            )
                            k: int
                            if child != BARRIER_CHAR:
                                if child_node.dataset in datasets_set:
                                    # it's not a barrier; make job available for immediate start of processing
                                    heapq.heappush(priority_queue, child_node)
                                    k = 1
                                else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                                    k = enqueue_children(child_node)  # ... recursively down the tree
                            elif len(children) == 1:  # if the only child is a barrier then pass the enqueue operation
                                k = enqueue_children(child_node)  # ... recursively down the tree
                            else:  # park the barrier node within the (still closed) barrier for the time being
                                assert node.mut.barrier is None
                                node.mut.barrier = child_node
                                k = 0
                            node.mut.pending += min(1, k)
                            n += k
                        assert n >= 0
                        return n

                    def on_job_completion_with_barriers(node: TreeNode, no_skip: bool) -> None:
                        """After successful completion, enqueues children, opens barriers + propagates completion upwards."""
                        if no_skip:
                            enqueue_children(node)  # make child datasets available for start of processing
                        else:  # job completed without success
                            tmp = node  # ... thus, opening the barrier shall always do nothing in node and its ancestors
                            while tmp is not None:
                                tmp.mut.barrier = immutable_empty_barrier
                                tmp = tmp.parent
                        assert node.mut.pending >= 0
                        while node.mut.pending == 0:  # have all jobs in subtree of current node completed?
                            if no_skip:  # ... if so open the barrier, if it exists, and enqueue jobs waiting on it
                                if not (node.mut.barrier is None or node.mut.barrier is immutable_empty_barrier):
                                    node.mut.pending += min(1, enqueue_children(node.mut.barrier))
                                    node.mut.barrier = immutable_empty_barrier
                                    if node.mut.pending > 0:  # did opening of barrier cause jobs to be enqueued in subtree?
                                        break  # ... if so we aren't quite done yet with this subtree
                            if node.parent is None:
                                break  # we've reached the root node
                            node = node.parent  # recurse up the tree to propagate completion upward
                            node.mut.pending -= 1  # mark subtree as completed
                            assert node.mut.pending >= 0

                    assert barriers_enabled
                    on_job_completion_with_barriers(done_future_node, no_skip)
        # endwhile submit_datasets()
        assert len(priority_queue) == 0
        assert len(todo_futures) == 0
        assert len(future_to_node) == 0
        return failed
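

# Illustrative usage sketch (an assumption, not part of the engine): shows how a caller might drive the scheduler with a
# thread-safe per-dataset callback. The logger, the dataset names, and the _replicate_one() helper are hypothetical.
def _example_usage() -> bool:
    """Processes a small, made-up dataset hierarchy; children only start after their parent has completed successfully."""
    example_log = logging.getLogger(__name__)

    def _replicate_one(dataset: str, tid: str, retry: Retry) -> bool:  # hypothetical thread-safe worker callback
        example_log.info("%s processing %s", tid, dataset)  # the Retry object is supplied by the retry machinery
        return True  # True means: do not skip the descendants of this dataset

    return process_datasets_in_parallel_and_fault_tolerant(
        log=example_log,
        datasets=["tank/data", "tank/data/projects", "tank/logs"],  # must be sorted and free of duplicates
        process_dataset=_replicate_one,
        skip_tree_on_error=lambda dataset: True,  # on error, skip the whole subtree rooted at the failed dataset
        skip_on_error="dataset",
        max_workers=4,
    )  # True if any dataset failed, False if all succeeded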