Coverage for bzfs_main / util / parallel_tasktree_policy.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 10:16 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Policy layer for the generic parallel task tree scheduling algorithm. 

16 

17Purpose: Provide bzfs-specific behavior on top of the policy-free generic ``parallel_tasktree`` scheduling algorithm: 

18retries, skip-on-error modes (fail/dataset/tree), and logging. 

19 

20Assumptions: Callers provide a thread-safe ``process_dataset(dataset, tid, Retry) -> bool`` callback. Dataset list is sorted 

21and contains no duplicate entries (enforced by tests). 

22 

23Design rationale: Keep scheduling generic and reusable while concentrating error handling and side-effects here. This module 

24exposes a stable API for callers like ``bzfs`` and ``bzfs_jobrunner``. 

25""" 

26 

27from __future__ import ( 

28 annotations, 

29) 

30import logging 

31import os 

32import subprocess 

33from concurrent.futures import ( 

34 Future, 

35) 

36from typing import ( 

37 Callable, 

38) 

39 

40from bzfs_main.util.parallel_tasktree import ( 

41 CompletionCallback, 

42 CompletionCallbackResult, 

43 ParallelTaskTree, 

44) 

45from bzfs_main.util.retry import ( 

46 Retry, 

47 RetryPolicy, 

48 RetryTemplate, 

49 RetryTerminationError, 

50) 

51from bzfs_main.util.utils import ( 

52 TaskTiming, 

53 dry, 

54 human_readable_duration, 

55) 

56 

57 

def process_datasets_in_parallel_and_fault_tolerant(
    *,
    log: logging.Logger,
    datasets: list[str],  # (sorted) list of datasets to process
    process_dataset: Callable[
        [str, str, Retry], bool  # lambda: dataset, tid, Retry; return False to skip subtree; must be thread-safe
    ],
    skip_tree_on_error: Callable[[str], bool],  # lambda: dataset # called on error; return True to skip subtree on error
    skip_on_error: str = "fail",
    max_workers: int = os.cpu_count() or 1,
    interval_nanos: Callable[
        [int, str, int], int
    ] = lambda last_update_nanos, dataset, submit_count: 0,  # optionally spread tasks out over time; e.g. for jitter
    timing: TaskTiming = TaskTiming(),  # noqa: B008
    termination_handler: Callable[[], None] = lambda: None,
    task_name: str = "Task",
    enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
    append_exception: Callable[[BaseException, str, str], None] = lambda ex, task, dataset: None,  # called on nonfatal error
    retry_template: RetryTemplate[bool] = RetryTemplate[bool]().copy(policy=RetryPolicy.no_retries()),  # noqa: B008
    dry_run: bool = False,
    is_test_mode: bool = False,
) -> bool:  # returns True if any dataset processing failed, False if all succeeded
    """Runs datasets in parallel with retries and skip policy.

    Purpose: Adapt the generic engine to bzfs needs by wrapping the worker function with retries and determining skip/fail
    behavior on completion.

    Assumptions: ``skip_on_error`` is one of {"fail","dataset","tree"}. ``skip_tree_on_error(dataset)`` returns True if
    subtree should be skipped.

    Design rationale: The completion callback runs in the main thread, enabling safe cancellation of in-flight futures for
    fail-fast while keeping worker threads free of policy decisions.
    """
    assert callable(process_dataset)
    assert callable(skip_tree_on_error)
    # task_name is embedded verbatim into a %-style log format string below; a literal '%' would corrupt the formatting.
    assert "%" not in task_name
    assert callable(append_exception)
    len_datasets: int = len(datasets)
    # Cache the DEBUG-enabled check once so each worker can skip the duration bookkeeping cheaply.
    is_debug: bool = log.isEnabledFor(logging.DEBUG)

    def _process_dataset(dataset: str, submit_count: int) -> CompletionCallback:
        """Wrapper around process_dataset(); adds retries plus a callback determining if to fail or skip subtree on error."""
        tid: str = f"{submit_count}/{len_datasets}"  # human-readable task id, e.g. "3/17", for log correlation
        start_time_nanos: int = timing.monotonic_ns()
        # Any exception captured here is *not* handled in the worker thread; it is stashed and the policy decision
        # (reraise vs. skip) is deferred to _completion_callback, which runs in the main coordination thread.
        exception: BaseException | None = None
        no_skip: bool = False
        try:
            no_skip = retry_template.call_with_retries(
                fn=lambda retry: process_dataset(dataset, tid, retry),
                log=log,
            )
        except (
            # Only these expected, nonfatal error types are captured for policy handling;
            # any other exception type propagates out of the worker immediately.
            subprocess.CalledProcessError,
            subprocess.TimeoutExpired,
            SystemExit,
            UnicodeDecodeError,
            RetryTerminationError,
        ) as e:
            exception = e  # may be reraised later
        finally:
            # Log elapsed wall-clock time regardless of success or failure.
            if is_debug:
                elapsed_duration: str = human_readable_duration(timing.monotonic_ns() - start_time_nanos)
                log.debug(dry(f"{tid} {task_name} done: %s took %s", dry_run), dataset, elapsed_duration)

        def _completion_callback(todo_futures: set[Future[CompletionCallback]]) -> CompletionCallbackResult:
            """CompletionCallback determining if to fail or skip subtree on error; Runs in the (single) main thread as part
            of the coordination loop."""
            nonlocal no_skip  # on error, the worker's result is overwritten with the skip-policy decision below
            fail: bool = False
            if exception is not None:
                fail = True
                if skip_on_error == "fail" or timing.is_terminated():
                    # Fail-fast: cancel all not-yet-started futures, run the termination hook, then surface the error.
                    for todo_future in todo_futures:
                        todo_future.cancel()
                    termination_handler()
                    raise exception
                # "tree" mode always skips the subtree; "dataset" mode defers the decision to the caller's callback.
                no_skip = not (skip_on_error == "tree" or skip_tree_on_error(dataset))
                log.error("%s", exception)
                append_exception(exception, task_name, dataset)  # record as nonfatal; processing continues
            return CompletionCallbackResult(no_skip=no_skip, fail=fail)

        return _completion_callback

    # Delegate the actual scheduling to the policy-free generic engine, passing our policy-wrapping worker.
    tasktree: ParallelTaskTree = ParallelTaskTree(
        log=log,
        datasets=datasets,
        process_dataset=_process_dataset,
        max_workers=max_workers,
        interval_nanos=interval_nanos,
        timing=timing,
        enable_barriers=enable_barriers,
        is_test_mode=is_test_mode,
    )
    return tasktree.process_datasets_in_parallel()