Coverage for bzfs_main/parallel_tasktree_policy.py: 100%

48 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Policy layer for the generic parallel task tree scheduling algorithm. 

16 

17Purpose: Provide bzfs-specific behavior on top of the policy-free generic ``parallel_tasktree`` scheduling algorithm: 

18retries, skip-on-error modes (fail/dataset/tree), and logging. 

19 

20Assumptions: Callers provide a thread-safe ``process_dataset(dataset, tid, Retry) -> bool``. Dataset list is sorted and 

21unique (enforced by tests). 

22 

23Design rationale: Keep scheduling generic and reusable while concentrating error handling and side-effects here. This module 

24exposes a stable API for callers like ``bzfs`` and ``bzfs_jobrunner``. 

25""" 

26 

27from __future__ import ( 

28 annotations, 

29) 

30import logging 

31import os 

32import subprocess 

33import threading 

34import time 

35from concurrent.futures import ( 

36 Future, 

37) 

38from logging import ( 

39 Logger, 

40) 

41from typing import ( 

42 Callable, 

43) 

44 

45from bzfs_main.parallel_tasktree import ( 

46 CompletionCallback, 

47 CompletionCallbackResult, 

48 process_datasets_in_parallel, 

49) 

50from bzfs_main.retry import ( 

51 Retry, 

52 RetryPolicy, 

53 run_with_retries, 

54) 

55from bzfs_main.utils import ( 

56 dry, 

57 human_readable_duration, 

58) 

59 

60 

def process_datasets_in_parallel_and_fault_tolerant(
    log: Logger,
    datasets: list[str],  # (sorted) list of datasets to process
    process_dataset: Callable[
        [str, str, Retry], bool  # lambda: dataset, tid, Retry; return False to skip subtree; must be thread-safe
    ],
    skip_tree_on_error: Callable[[str], bool],  # lambda: dataset # called on error; return True to skip subtree on error
    skip_on_error: str = "fail",
    max_workers: int = os.cpu_count() or 1,
    interval_nanos: Callable[
        [int, str, int], int
    ] = lambda last_update_nanos, dataset, submitted_count: 0,  # optionally spread tasks out over time; e.g. for jitter
    termination_event: threading.Event | None = None,  # optional event to request early async termination
    termination_handler: Callable[[], None] = lambda: None,
    task_name: str = "Task",
    enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
    append_exception: Callable[[BaseException, str, str], None] = lambda ex, task, dataset: None,  # called on nonfatal error
    retry_policy: RetryPolicy | None = None,
    dry_run: bool = False,
    is_test_mode: bool = False,
) -> bool:  # returns True if any dataset processing failed, False if all succeeded
    """Processes ``datasets`` via the generic parallel engine, layering retries and an error policy on top.

    Purpose: Each dataset is handed to ``process_dataset`` through ``run_with_retries``; the outcome is then turned into
    a completion callback that decides whether to abort everything, skip just this dataset, or skip its whole subtree.

    Assumptions: ``skip_on_error`` is one of {"fail","dataset","tree"}. ``skip_tree_on_error(dataset)`` is only consulted
    when an error occurred and ``skip_on_error`` is not "tree"; it returns True if the subtree should be skipped.

    Design rationale: Worker threads merely record what happened. The policy decision (cancel outstanding futures, invoke
    ``termination_handler``, reraise, or log and continue) is taken inside the completion callback, which the generic
    engine runs in the main thread as part of its coordination loop, so cancelling in-flight futures there is safe.
    """
    assert callable(process_dataset)
    assert callable(skip_tree_on_error)
    assert "%" not in task_name  # task_name is embedded in a logging format string below
    assert callable(append_exception)
    if termination_event is None:
        termination_event = threading.Event()
    if retry_policy is None:
        retry_policy = RetryPolicy.no_retries()
    total_count: int = len(datasets)
    debug_enabled: bool = log.isEnabledFor(logging.DEBUG)

    def _process_dataset(dataset: str, submitted_count: int) -> CompletionCallback:
        """Runs process_dataset() with retries for one dataset and returns the callback that applies the error policy."""
        tid: str = f"{submitted_count}/{total_count}"
        started_at_nanos: int = time.monotonic_ns()
        error = None
        worker_no_skip: bool = False
        try:
            worker_no_skip = run_with_retries(log, retry_policy, termination_event, process_dataset, dataset, tid)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e:
            error = e  # kept for the completion callback, which may reraise it
        finally:
            # Log the elapsed time regardless of how the attempt ended
            if debug_enabled:
                elapsed_nanos: int = time.monotonic_ns() - started_at_nanos
                took: str = human_readable_duration(elapsed_nanos)
                log.debug(dry(f"{tid} {task_name} done: %s took %s", dry_run), dataset, took)

        def _completion_callback(todo_futures: set[Future[CompletionCallback]]) -> CompletionCallbackResult:
            """Applies the fail/dataset/tree policy to the recorded outcome; invoked by the engine's (single) main
            thread coordination loop."""
            if error is None:
                # Success path: pass through whatever the worker function decided about its subtree
                return CompletionCallbackResult(no_skip=worker_no_skip, fail=False)

            if skip_on_error == "fail" or termination_event.is_set():
                # Fail-fast: cancel everything still queued, notify the caller, then propagate the original error
                for pending_future in todo_futures:
                    pending_future.cancel()
                termination_handler()
                raise error

            # Nonfatal error: decide whether descendants may still run, then report and record the error
            descend: bool = skip_on_error != "tree" and not skip_tree_on_error(dataset)
            log.error("%s", error)
            append_exception(error, task_name, dataset)
            return CompletionCallbackResult(no_skip=descend, fail=True)

        return _completion_callback

    any_failed: bool = process_datasets_in_parallel(
        log=log,
        datasets=datasets,
        process_dataset=_process_dataset,
        max_workers=max_workers,
        interval_nanos=interval_nanos,
        termination_event=termination_event,
        enable_barriers=enable_barriers,
        is_test_mode=is_test_mode,
    )
    return any_failed

143 enable_barriers=enable_barriers, 

144 is_test_mode=is_test_mode, 

145 )