Coverage for bzfs_main/util/parallel_tasktree_policy.py: 100%
48 statements
coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
15"""Policy layer for the generic parallel task tree scheduling algorithm.
17Purpose: Provide bzfs-specific behavior on top of the policy-free generic ``parallel_tasktree`` scheduling algorithm:
18retries, skip-on-error modes (fail/dataset/tree), and logging.
20Assumptions: Callers provide a thread-safe ``process_dataset(dataset, tid, Retry) -> bool``. Dataset list is sorted and
21unique (enforced by tests).
23Design rationale: Keep scheduling generic and reusable while concentrating error handling and side-effects here. This module
24exposes a stable API for callers like ``bzfs`` and ``bzfs_jobrunner``.
25"""
from __future__ import (
    annotations,
)
import logging
import os
import subprocess
import threading
import time
from concurrent.futures import (
    Future,
)
from typing import (
    Callable,
)

from bzfs_main.util.parallel_tasktree import (
    CompletionCallback,
    CompletionCallbackResult,
    ParallelTaskTree,
)
from bzfs_main.util.retry import (
    Retry,
    RetryOptions,
    RetryPolicy,
    call_with_retries,
)
from bzfs_main.util.utils import (
    dry,
    human_readable_duration,
)


def process_datasets_in_parallel_and_fault_tolerant(
    log: logging.Logger,
    datasets: list[str],  # (sorted) list of datasets to process
    process_dataset: Callable[
        [str, str, Retry], bool  # lambda: dataset, tid, Retry; return False to skip subtree; must be thread-safe
    ],
    skip_tree_on_error: Callable[[str], bool],  # lambda: dataset; called on error; return True to skip subtree on error
    skip_on_error: str = "fail",
    max_workers: int = os.cpu_count() or 1,
    interval_nanos: Callable[
        [int, str, int], int
    ] = lambda last_update_nanos, dataset, submit_count: 0,  # optionally spread tasks out over time; e.g. for jitter
    termination_event: threading.Event | None = None,  # optional event to request early async termination
    termination_handler: Callable[[], None] = lambda: None,
    task_name: str = "Task",
    enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
    append_exception: Callable[[BaseException, str, str], None] = lambda ex, task, dataset: None,  # called on nonfatal error
    retry_policy: RetryPolicy | None = None,
    retry_options: RetryOptions = RetryOptions(),  # noqa: B008
    dry_run: bool = False,
    is_test_mode: bool = False,
) -> bool:  # returns True if any dataset processing failed, False if all succeeded
81 """Runs datasets in parallel with retries and skip policy.
83 Purpose: Adapt the generic engine to bzfs needs by wrapping the worker function with retries and determining skip/fail
84 behavior on completion.
86 Assumptions: ``skip_on_error`` is one of {"fail","dataset","tree"}. ``skip_tree_on_error(dataset)`` returns True if
87 subtree should be skipped.
89 Design rationale: The completion callback runs in the main thread, enabling safe cancellation of in-flight futures for
90 fail-fast while keeping worker threads free of policy decisions.
91 """
    assert callable(process_dataset)
    assert callable(skip_tree_on_error)
    termination_event = threading.Event() if termination_event is None else termination_event
    assert "%" not in task_name
    assert callable(append_exception)
    retry_policy = RetryPolicy.no_retries() if retry_policy is None else retry_policy
    len_datasets: int = len(datasets)
    is_debug: bool = log.isEnabledFor(logging.DEBUG)

    def _process_dataset(dataset: str, submit_count: int) -> CompletionCallback:
        """Wrapper around process_dataset(); adds retries plus a callback that decides whether to fail or skip the
        subtree on error."""
        tid: str = f"{submit_count}/{len_datasets}"
        start_time_nanos: int = time.monotonic_ns()
        exception = None
        no_skip: bool = False
        try:
            no_skip = call_with_retries(
                fn=lambda retry: process_dataset(dataset, tid, retry),
                policy=retry_policy,
                config=retry_options.config,
                giveup=retry_options.giveup,
                after_attempt=retry_options.after_attempt,
                log=log,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e:
            exception = e  # may be reraised later
        finally:
            if is_debug:
                elapsed_duration: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
                log.debug(dry(f"{tid} {task_name} done: %s took %s", dry_run), dataset, elapsed_duration)
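
        # The nested callback below closes over `exception` and `no_skip` from this attempt; the
        # generic engine runs it in the single coordination (main) thread once this dataset's
        # future completes, so policy decisions such as fail-fast cancellation never execute in
        # worker threads.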
        def _completion_callback(todo_futures: set[Future[CompletionCallback]]) -> CompletionCallbackResult:
            """CompletionCallback that decides whether to fail or skip the subtree on error; runs in the (single) main
            thread as part of the coordination loop."""
            nonlocal no_skip
            fail: bool = False
            if exception is not None:
                fail = True
                if skip_on_error == "fail" or termination_event.is_set():
                    for todo_future in todo_futures:
                        todo_future.cancel()
                    termination_handler()
                    raise exception
                no_skip = not (skip_on_error == "tree" or skip_tree_on_error(dataset))
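                # skip_on_error is "dataset" or "tree" here ("fail" raised above): "tree" always
                # skips the failed dataset's subtree, whereas "dataset" continues into the subtree
                # unless skip_tree_on_error() opts out for this dataset.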
                log.error("%s", exception)
                append_exception(exception, task_name, dataset)
            return CompletionCallbackResult(no_skip=no_skip, fail=fail)

        return _completion_callback

    tasktree: ParallelTaskTree = ParallelTaskTree(
        log=log,
        datasets=datasets,
        process_dataset=_process_dataset,
        max_workers=max_workers,
        interval_nanos=interval_nanos,
        termination_event=termination_event,
        enable_barriers=enable_barriers,
        is_test_mode=is_test_mode,
    )
    return tasktree.process_datasets_in_parallel()
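
# Minimal usage sketch (hypothetical caller; the dataset names and the worker body are
# illustrative only and not part of this module):
#
#     import logging
#     from bzfs_main.util.retry import Retry
#     from bzfs_main.util.parallel_tasktree_policy import process_datasets_in_parallel_and_fault_tolerant
#
#     def my_worker(dataset: str, tid: str, retry: Retry) -> bool:
#         # thread-safe worker; returning False skips the dataset's subtree
#         print(f"[{tid}] processing {dataset}")
#         return True
#
#     failed: bool = process_datasets_in_parallel_and_fault_tolerant(
#         log=logging.getLogger(__name__),
#         datasets=["tank/a", "tank/a/b", "tank/c"],  # sorted and unique
#         process_dataset=my_worker,
#         skip_tree_on_error=lambda dataset: False,  # keep processing descendants on error
#         skip_on_error="dataset",  # log nonfatal errors and continue with other datasets
#     )
#     # failed is True if any dataset failed, False if all succeeded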