Coverage for bzfs_main/parallel_tasktree_policy.py: 100%
48 statements
coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
15"""Policy layer for the generic parallel task tree scheduling algorithm.
17Purpose: Provide bzfs-specific behavior on top of the policy-free generic ``parallel_tasktree`` scheduling algorithm:
18retries, skip-on-error modes (fail/dataset/tree), and logging.
20Assumptions: Callers provide a thread-safe ``process_dataset(dataset, tid, Retry) -> bool``. Dataset list is sorted and
21unique (enforced by tests).
23Design rationale: Keep scheduling generic and reusable while concentrating error handling and side-effects here. This module
24exposes a stable API for callers like ``bzfs`` and ``bzfs_jobrunner``.
25"""

from __future__ import (
    annotations,
)
import logging
import os
import subprocess
import threading
import time
from concurrent.futures import (
    Future,
)
from logging import (
    Logger,
)
from typing import (
    Callable,
)

from bzfs_main.parallel_tasktree import (
    CompletionCallback,
    CompletionCallbackResult,
    process_datasets_in_parallel,
)
from bzfs_main.retry import (
    Retry,
    RetryPolicy,
    run_with_retries,
)
from bzfs_main.utils import (
    dry,
    human_readable_duration,
)


def process_datasets_in_parallel_and_fault_tolerant(
    log: Logger,
    datasets: list[str],  # (sorted) list of datasets to process
    process_dataset: Callable[
        [str, str, Retry], bool  # lambda: dataset, tid, Retry; return False to skip subtree; must be thread-safe
    ],
    skip_tree_on_error: Callable[[str], bool],  # lambda: dataset; called on error; return True to skip subtree on error
    skip_on_error: str = "fail",
    max_workers: int = os.cpu_count() or 1,
    interval_nanos: Callable[
        [int, str, int], int
    ] = lambda last_update_nanos, dataset, submitted_count: 0,  # optionally spread tasks out over time; e.g. for jitter
    termination_event: threading.Event | None = None,  # optional event to request early async termination
    termination_handler: Callable[[], None] = lambda: None,
    task_name: str = "Task",
    enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
    append_exception: Callable[[BaseException, str, str], None] = lambda ex, task, dataset: None,  # called on nonfatal error
    retry_policy: RetryPolicy | None = None,
    dry_run: bool = False,
    is_test_mode: bool = False,
) -> bool:  # returns True if any dataset processing failed, False if all succeeded
82 """Runs datasets in parallel with retries and skip policy.
84 Purpose: Adapt the generic engine to bzfs needs by wrapping the worker function with retries and determining skip/fail
85 behavior on completion.
87 Assumptions: ``skip_on_error`` is one of {"fail","dataset","tree"}. ``skip_tree_on_error(dataset)`` returns True if
88 subtree should be skipped.
90 Design rationale: The completion callback runs in the main thread, enabling safe cancellation of in-flight futures for
91 fail-fast while keeping worker threads free of policy decisions.
92 """
    assert callable(process_dataset)
    assert callable(skip_tree_on_error)
    termination_event = threading.Event() if termination_event is None else termination_event
    assert "%" not in task_name
    assert callable(append_exception)
    retry_policy = RetryPolicy.no_retries() if retry_policy is None else retry_policy
    len_datasets: int = len(datasets)
    is_debug: bool = log.isEnabledFor(logging.DEBUG)

    def _process_dataset(dataset: str, submitted_count: int) -> CompletionCallback:
        """Wrapper around process_dataset(); adds retries plus a callback determining whether to fail or skip the
        subtree on error."""
        tid: str = f"{submitted_count}/{len_datasets}"
        start_time_nanos: int = time.monotonic_ns()
        exception = None
        no_skip: bool = False
        try:
            no_skip = run_with_retries(log, retry_policy, termination_event, process_dataset, dataset, tid)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e:
            exception = e  # may be reraised later
        finally:
            if is_debug:
                elapsed_duration: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
                log.debug(dry(f"{tid} {task_name} done: %s took %s", dry_run), dataset, elapsed_duration)
        def _completion_callback(todo_futures: set[Future[CompletionCallback]]) -> CompletionCallbackResult:
            """CompletionCallback determining whether to fail or skip the subtree on error; runs in the (single) main
            thread as part of the coordination loop."""
            nonlocal no_skip
            fail: bool = False
            if exception is not None:
                fail = True
                if skip_on_error == "fail" or termination_event.is_set():
                    for todo_future in todo_futures:
                        todo_future.cancel()
                    termination_handler()
                    raise exception
                no_skip = not (skip_on_error == "tree" or skip_tree_on_error(dataset))
                log.error("%s", exception)
                append_exception(exception, task_name, dataset)
            return CompletionCallbackResult(no_skip=no_skip, fail=fail)

        return _completion_callback

    return process_datasets_in_parallel(
        log=log,
        datasets=datasets,
        process_dataset=_process_dataset,
        max_workers=max_workers,
        interval_nanos=interval_nanos,
        termination_event=termination_event,
        enable_barriers=enable_barriers,
        is_test_mode=is_test_mode,
    )
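
Example usage (illustrative, not part of the covered module): a minimal sketch, assuming a hypothetical thread-safe
worker ``backup_one``; real callers such as ``bzfs`` and ``bzfs_jobrunner`` pass their own workers, skip policies, and
RetryPolicy instances.

import logging

from bzfs_main.parallel_tasktree_policy import process_datasets_in_parallel_and_fault_tolerant
from bzfs_main.retry import Retry

log = logging.getLogger("bzfs")

def backup_one(dataset: str, tid: str, retry: Retry) -> bool:  # hypothetical worker; must be thread-safe
    log.info("%s backing up %s", tid, dataset)
    return True  # True keeps the dataset's descendants eligible for processing

failed: bool = process_datasets_in_parallel_and_fault_tolerant(
    log=log,
    datasets=["tank/data", "tank/data/app", "tank/logs"],  # sorted and unique
    process_dataset=backup_one,
    skip_tree_on_error=lambda dataset: True,  # on error, also prune the failing dataset's subtree
    skip_on_error="dataset",  # "fail" would cancel everything and reraise; "tree" always prunes the subtree
    max_workers=4,
    task_name="Backup",
)
# failed is True if any dataset processing failed, False if all succeeded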