Coverage for bzfs_main/util/parallel_tasktree_policy.py: 100%
44 statements
coverage.py v7.13.4, created at 2026-02-24 10:16 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Policy layer for the generic parallel task tree scheduling algorithm.

Purpose: Provide bzfs-specific behavior on top of the policy-free generic ``parallel_tasktree`` scheduling algorithm:
retries, skip-on-error modes (fail/dataset/tree), and logging.

Assumptions: Callers provide a thread-safe ``process_dataset(dataset, tid, Retry) -> bool`` callback. The dataset list is
sorted and contains no duplicate entries (enforced by tests).

Design rationale: Keep scheduling generic and reusable while concentrating error handling and side effects here. This module
exposes a stable API for callers like ``bzfs`` and ``bzfs_jobrunner``.
"""
from __future__ import (
    annotations,
)
import logging
import os
import subprocess
from concurrent.futures import (
    Future,
)
from typing import (
    Callable,
)

from bzfs_main.util.parallel_tasktree import (
    CompletionCallback,
    CompletionCallbackResult,
    ParallelTaskTree,
)
from bzfs_main.util.retry import (
    Retry,
    RetryPolicy,
    RetryTemplate,
    RetryTerminationError,
)
from bzfs_main.util.utils import (
    TaskTiming,
    dry,
    human_readable_duration,
)

def process_datasets_in_parallel_and_fault_tolerant(
    *,
    log: logging.Logger,
    datasets: list[str],  # (sorted) list of datasets to process
    process_dataset: Callable[
        [str, str, Retry], bool  # lambda: dataset, tid, Retry; return False to skip subtree; must be thread-safe
    ],
    skip_tree_on_error: Callable[[str], bool],  # lambda: dataset; called on error; return True to skip subtree on error
    skip_on_error: str = "fail",
    max_workers: int = os.cpu_count() or 1,
    interval_nanos: Callable[
        [int, str, int], int
    ] = lambda last_update_nanos, dataset, submit_count: 0,  # optionally spread tasks out over time; e.g. for jitter
    timing: TaskTiming = TaskTiming(),  # noqa: B008
    termination_handler: Callable[[], None] = lambda: None,
    task_name: str = "Task",
    enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
    append_exception: Callable[[BaseException, str, str], None] = lambda ex, task, dataset: None,  # called on nonfatal error
    retry_template: RetryTemplate[bool] = RetryTemplate[bool]().copy(policy=RetryPolicy.no_retries()),  # noqa: B008
    dry_run: bool = False,
    is_test_mode: bool = False,
) -> bool:  # returns True if any dataset processing failed, False if all succeeded
    """Runs datasets in parallel with retries and a skip-on-error policy.

    Purpose: Adapt the generic engine to bzfs needs by wrapping the worker function with retries and determining skip/fail
    behavior on completion.

    Assumptions: ``skip_on_error`` is one of {"fail", "dataset", "tree"}. ``skip_tree_on_error(dataset)`` returns True if
    the subtree should be skipped.

    Design rationale: The completion callback runs in the main thread, enabling safe cancellation of in-flight futures for
    fail-fast while keeping worker threads free of policy decisions.
    """
    assert callable(process_dataset)
    assert callable(skip_tree_on_error)
    assert "%" not in task_name
    assert callable(append_exception)
    len_datasets: int = len(datasets)
    is_debug: bool = log.isEnabledFor(logging.DEBUG)

    def _process_dataset(dataset: str, submit_count: int) -> CompletionCallback:
        """Wrapper around process_dataset(); adds retries plus a callback that decides whether to fail or skip the
        subtree on error."""
        tid: str = f"{submit_count}/{len_datasets}"
        start_time_nanos: int = timing.monotonic_ns()
        exception = None
        no_skip: bool = False
        try:
            no_skip = retry_template.call_with_retries(
                fn=lambda retry: process_dataset(dataset, tid, retry),
                log=log,
            )
        except (
            subprocess.CalledProcessError,
            subprocess.TimeoutExpired,
            SystemExit,
            UnicodeDecodeError,
            RetryTerminationError,
        ) as e:
            exception = e  # may be reraised later
        finally:
            if is_debug:
                elapsed_duration: str = human_readable_duration(timing.monotonic_ns() - start_time_nanos)
                log.debug(dry(f"{tid} {task_name} done: %s took %s", dry_run), dataset, elapsed_duration)

        def _completion_callback(todo_futures: set[Future[CompletionCallback]]) -> CompletionCallbackResult:
            """CompletionCallback that decides whether to fail or skip the subtree on error; runs in the (single) main
            thread as part of the coordination loop."""
            nonlocal no_skip
            fail: bool = False
            if exception is not None:
                fail = True
                if skip_on_error == "fail" or timing.is_terminated():
                    for todo_future in todo_futures:
                        todo_future.cancel()
                    termination_handler()
                    raise exception
                no_skip = not (skip_on_error == "tree" or skip_tree_on_error(dataset))
                log.error("%s", exception)
                append_exception(exception, task_name, dataset)
            return CompletionCallbackResult(no_skip=no_skip, fail=fail)

        return _completion_callback

    tasktree: ParallelTaskTree = ParallelTaskTree(
        log=log,
        datasets=datasets,
        process_dataset=_process_dataset,
        max_workers=max_workers,
        interval_nanos=interval_nanos,
        timing=timing,
        enable_barriers=enable_barriers,
        is_test_mode=is_test_mode,
    )
    return tasktree.process_datasets_in_parallel()
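
# Illustrative sketch of fault-tolerant operation (hypothetical caller code, not part of this module): with
# skip_on_error="dataset", each nonfatal error is logged, handed to append_exception, and only the failed dataset
# (or its subtree, if skip_tree_on_error says so) is skipped, while unrelated datasets keep being processed:
#
#     errors: list[tuple[str, BaseException]] = []
#     any_failed = process_datasets_in_parallel_and_fault_tolerant(
#         log=logging.getLogger(__name__),
#         datasets=["tank/a", "tank/a/b", "tank/c"],  # sorted, duplicate-free
#         process_dataset=lambda dataset, tid, retry: replicate_dataset(dataset, tid, retry),  # hypothetical worker
#         skip_tree_on_error=lambda dataset: False,
#         skip_on_error="dataset",
#         append_exception=lambda ex, task, dataset: errors.append((dataset, ex)),
#     )
#     # any_failed is True if at least one dataset failed; errors then holds (dataset, exception) pairs.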