Coverage for bzfs_main / util / parallel_tasktree_policy.py: 100%

48 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-22 08:03 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Policy layer for the generic parallel task tree scheduling algorithm. 

16 

17Purpose: Provide bzfs-specific behavior on top of the policy-free generic ``parallel_tasktree`` scheduling algorithm: 

18retries, skip-on-error modes (fail/dataset/tree), and logging. 

19 

20Assumptions: Callers provide a thread-safe ``process_dataset(dataset, tid, Retry) -> bool``. Dataset list is sorted and 

21unique (enforced by tests). 

22 

23Design rationale: Keep scheduling generic and reusable while concentrating error handling and side-effects here. This module 

24exposes a stable API for callers like ``bzfs`` and ``bzfs_jobrunner``. 

25""" 

26 

27from __future__ import ( 

28 annotations, 

29) 

30import logging 

31import os 

32import subprocess 

33import threading 

34import time 

35from concurrent.futures import ( 

36 Future, 

37) 

38from typing import ( 

39 Callable, 

40) 

41 

42from bzfs_main.util.parallel_tasktree import ( 

43 CompletionCallback, 

44 CompletionCallbackResult, 

45 ParallelTaskTree, 

46) 

47from bzfs_main.util.retry import ( 

48 Retry, 

49 RetryOptions, 

50 RetryPolicy, 

51 call_with_retries, 

52) 

53from bzfs_main.util.utils import ( 

54 dry, 

55 human_readable_duration, 

56) 

57 

58 

def process_datasets_in_parallel_and_fault_tolerant(
    log: logging.Logger,
    datasets: list[str],  # (sorted) list of datasets to process
    process_dataset: Callable[
        [str, str, Retry], bool  # lambda: dataset, tid, Retry; return False to skip subtree; must be thread-safe
    ],
    skip_tree_on_error: Callable[[str], bool],  # lambda: dataset # called on error; return True to skip subtree on error
    skip_on_error: str = "fail",
    max_workers: int = os.cpu_count() or 1,
    interval_nanos: Callable[
        [int, str, int], int
    ] = lambda last_update_nanos, dataset, submit_count: 0,  # optionally spread tasks out over time; e.g. for jitter
    termination_event: threading.Event | None = None,  # optional event to request early async termination
    termination_handler: Callable[[], None] = lambda: None,
    task_name: str = "Task",
    enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
    append_exception: Callable[[BaseException, str, str], None] = lambda ex, task, dataset: None,  # called on nonfatal error
    retry_policy: RetryPolicy | None = None,
    retry_options: RetryOptions = RetryOptions(),  # noqa: B008
    dry_run: bool = False,
    is_test_mode: bool = False,
) -> bool:  # returns True if any dataset processing failed, False if all succeeded
    """Executes ``process_dataset`` for each dataset in parallel, applying retry and skip-on-error policy.

    Purpose: Bridge between bzfs callers and the policy-free generic scheduling engine; each worker invocation is wrapped
    with retries, and a per-dataset completion callback decides whether an error is fatal or merely skips a (sub)tree.

    Assumptions: ``skip_on_error`` is one of {"fail","dataset","tree"}; ``skip_tree_on_error(dataset)`` returns True if the
    subtree rooted at the failing dataset should be skipped.

    Design rationale: Error policy is evaluated inside the completion callback, which the engine invokes on the (single)
    main coordination thread; that makes it safe to cancel still-pending futures for fail-fast semantics while worker
    threads stay free of any policy logic.
    """
    assert callable(process_dataset)
    assert callable(skip_tree_on_error)
    assert callable(append_exception)
    assert "%" not in task_name
    if termination_event is None:
        termination_event = threading.Event()
    if retry_policy is None:
        retry_policy = RetryPolicy.no_retries()
    dataset_count: int = len(datasets)
    debug_enabled: bool = log.isEnabledFor(logging.DEBUG)

    def _run_one_dataset(dataset: str, submit_count: int) -> CompletionCallback:
        """Invokes process_dataset() with retries; returns a callback that applies fail/skip policy for any error."""
        task_id: str = f"{submit_count}/{dataset_count}"
        began_at_nanos: int = time.monotonic_ns()
        caught: BaseException | None = None
        proceed: bool = False
        try:
            proceed = call_with_retries(
                fn=lambda retry: process_dataset(dataset, task_id, retry),
                policy=retry_policy,
                config=retry_options.config,
                giveup=retry_options.giveup,
                after_attempt=retry_options.after_attempt,
                log=log,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e:
            caught = e  # held for the completion callback; may be reraised there
        finally:
            if debug_enabled:  # timing is logged even when the attempt failed
                took: str = human_readable_duration(time.monotonic_ns() - began_at_nanos)
                log.debug(dry(f"{task_id} {task_name} done: %s took %s", dry_run), dataset, took)

        def _on_completion(todo_futures: set[Future[CompletionCallback]]) -> CompletionCallbackResult:
            """Runs on the single main coordination thread; decides whether to fail fast, skip, or continue."""
            if caught is None:  # success path: propagate the worker's skip/no-skip verdict
                return CompletionCallbackResult(no_skip=proceed, fail=False)
            if skip_on_error == "fail" or termination_event.is_set():
                # fail-fast: withdraw all not-yet-started work, notify, then surface the error to the caller
                for pending in todo_futures:
                    pending.cancel()
                termination_handler()
                raise caught
            # nonfatal: record the error and decide whether the subtree below this dataset still runs
            descend: bool = skip_on_error != "tree" and not skip_tree_on_error(dataset)
            log.error("%s", caught)
            append_exception(caught, task_name, dataset)
            return CompletionCallbackResult(no_skip=descend, fail=True)

        return _on_completion

    return ParallelTaskTree(
        log=log,
        datasets=datasets,
        process_dataset=_run_one_dataset,
        max_workers=max_workers,
        interval_nanos=interval_nanos,
        termination_event=termination_event,
        enable_barriers=enable_barriers,
        is_test_mode=is_test_mode,
    ).process_datasets_in_parallel()