Coverage for bzfs_main/retry.py: 100%

67 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

"""Generic retry support using jittered exponential backoff with cap.

This module retries failing operations according to a configurable policy (``RetryPolicy``). An operation signals a
transient, retryable failure by raising ``RetryableError``, chaining the underlying exception as its ``__cause__``.
``run_with_retries`` then calls the operation again until one of the following happens:

- it succeeds;
- the retry count or the elapsed-time budget is exhausted;
- a termination event is set.

If the operation never succeeds, ``run_with_retries`` re-raises the underlying cause. Centralizing this logic keeps retry
behavior consistent across callers.
"""

20 

21from __future__ import ( 

22 annotations, 

23) 

24import argparse 

25import random 

26import threading 

27import time 

28from dataclasses import ( 

29 dataclass, 

30) 

31from logging import ( 

32 Logger, 

33) 

34from typing import ( 

35 Any, 

36 Callable, 

37 Final, 

38 TypeVar, 

39) 

40 

41from bzfs_main.utils import ( 

42 human_readable_duration, 

43) 

44 

45 

46############################################################################# 

class RetryPolicy:
    """Immutable settings governing how many times, and for how long, a failing operation gets retried."""

    def __init__(self, args: argparse.Namespace) -> None:
        """Reads the retry option values from the parsed ArgumentParser namespace ``args``."""

        def to_nanos(secs: float) -> int:
            """Converts a duration in (possibly fractional) seconds to whole nanoseconds, truncating."""
            return int(secs * 1_000_000_000)

        # immutable variables, copied verbatim from the command line options:
        self.retries: Final[int] = args.retries
        self.min_sleep_secs: Final[float] = args.retry_min_sleep_secs
        self.initial_max_sleep_secs: Final[float] = args.retry_initial_max_sleep_secs
        self.max_sleep_secs: Final[float] = args.retry_max_sleep_secs
        self.max_elapsed_secs: Final[float] = args.retry_max_elapsed_secs
        self.max_elapsed_nanos: Final[int] = to_nanos(self.max_elapsed_secs)

        # Normalize the sleep bounds so that 1 <= lower <= initial upper <= upper always holds, whatever the user passed:
        lower: int = max(1, to_nanos(self.min_sleep_secs))
        upper: int = max(lower, to_nanos(self.max_sleep_secs))
        initial_upper: int = min(upper, max(lower, to_nanos(self.initial_max_sleep_secs)))
        self.min_sleep_nanos: Final[int] = lower
        self.initial_max_sleep_nanos: Final[int] = initial_upper
        self.max_sleep_nanos: Final[int] = upper
        assert 1 <= self.min_sleep_nanos <= self.initial_max_sleep_nanos <= self.max_sleep_nanos

    def __repr__(self) -> str:
        """Returns a human-readable summary of the configured retry parameters, for debugging."""
        pairs = (
            ("retries", self.retries),
            ("min_sleep_secs", self.min_sleep_secs),
            ("initial_max_sleep_secs", self.initial_max_sleep_secs),
            ("max_sleep_secs", self.max_sleep_secs),
            ("max_elapsed_secs", self.max_elapsed_secs),
        )
        return ", ".join(f"{label}: {value}" for label, value in pairs)

    @classmethod
    def no_retries(cls) -> RetryPolicy:
        """Returns a policy that never retries (zero retries, zero sleep and zero elapsed-time budget)."""
        zeroed_durations = dict.fromkeys(
            ("retry_min_sleep_secs", "retry_initial_max_sleep_secs", "retry_max_sleep_secs", "retry_max_elapsed_secs"), 0
        )
        return cls(argparse.Namespace(retries=0, **zeroed_durations))

90 

91 

92############################################################################# 

# Generic result type of the callable passed to run_with_retries; run_with_retries returns that result as-is.
T = TypeVar("T")

94 

95 

def run_with_retries(
    log: Logger, policy: RetryPolicy, termination_event: threading.Event, fn: Callable[..., T], *args: Any, **kwargs: Any
) -> T:  # thread-safe
    """Runs ``fn(*args, **kwargs, retry=Retry(n))`` and retries it on failure as indicated by ``policy``.

    ``n`` is 0 on the initial attempt and is incremented for every retry. ``fn`` signals a retryable failure by raising
    ``RetryableError``, normally chained (``raise RetryableError(...) from exc``) to the underlying exception.

    Retrying stops when any of the following holds:

    - ``policy.retries`` retries have been used;
    - ``policy.max_elapsed_nanos`` have elapsed since the first call;
    - ``termination_event`` is set.

    ``termination_event`` also cuts short any pending backoff sleep, which allows for early async cancellation of the
    retry loop.

    Between attempts, the loop sleeps a random (jittered) duration in ``[policy.min_sleep_nanos, c_max]``.
    ``c_max`` starts at ``policy.initial_max_sleep_nanos`` and doubles after every retry, capped at
    ``policy.max_sleep_nanos``. The first retry happens immediately, without sleeping, if the RetryableError was raised
    with ``no_sleep=True``.

    Returns:
        Whatever ``fn`` returns on its first successful attempt.

    Raises:
        BaseException: the ``__cause__`` of the last RetryableError once retrying stops, or that RetryableError itself
            if it has no chained cause. Exceptions other than RetryableError propagate immediately without retry.
    """
    c_max_sleep_nanos: int = policy.initial_max_sleep_nanos
    retry_count: int = 0
    sysrandom: random.SystemRandom | None = None  # created lazily, only once a jittered sleep is actually needed
    start_time_nanos: int = time.monotonic_ns()
    while True:
        try:
            return fn(*args, **kwargs, retry=Retry(retry_count))  # Call the target function with provided args
        except RetryableError as retryable_error:
            elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
            will_retry: bool = False
            if retry_count < policy.retries and elapsed_nanos < policy.max_elapsed_nanos and not termination_event.is_set():
                will_retry = True
                retry_count += 1
                if retryable_error.no_sleep and retry_count <= 1:
                    log.info(f"Retrying [{retry_count}/{policy.retries}] immediately ...")
                else:  # jitter: pick a random sleep duration within the range [min_sleep_nanos, c_max_sleep_nanos] as delay
                    sysrandom = random.SystemRandom() if sysrandom is None else sysrandom
                    sleep_nanos: int = sysrandom.randint(policy.min_sleep_nanos, c_max_sleep_nanos)
                    log.info(f"Retrying [{retry_count}/{policy.retries}] in {human_readable_duration(sleep_nanos)} ...")
                    termination_event.wait(sleep_nanos / 1_000_000_000)  # returns early once termination is requested
                c_max_sleep_nanos = min(policy.max_sleep_nanos, 2 * c_max_sleep_nanos)  # exponential backoff with cap
            if termination_event.is_set() or not will_retry:
                if policy.retries > 0:
                    # Recompute so the reported duration includes any backoff sleep that termination_event cut short.
                    elapsed_nanos = time.monotonic_ns() - start_time_nanos
                    log.warning(
                        f"Giving up because the last [{retry_count}/{policy.retries}] retries across "
                        f"[{human_readable_duration(elapsed_nanos)}/{human_readable_duration(policy.max_elapsed_nanos)}] "
                        "for the current request failed!"
                    )
                cause: BaseException | None = retryable_error.__cause__
                if cause is None:
                    # No underlying exception was chained; propagate the RetryableError itself rather than crashing on
                    # an assertion (or on None.with_traceback under python -O).
                    raise
                raise cause.with_traceback(cause.__traceback__)  # noqa: B904 intentional re-raise of cause without chaining

132 

133 

134############################################################################# 

class RetryableError(Exception):
    """Signals that the task which triggered the underlying exception may be retried and could eventually succeed."""

    def __init__(self, message: str, no_sleep: bool = False) -> None:
        """Creates the error with the given ``message``.

        ``no_sleep=True`` asks for an immediate retry instead of a backoff sleep. run_with_retries honors this only for
        the first retry.
        """
        self.no_sleep: bool = no_sleep
        super().__init__(message)

141 

142 

143############################################################################# 

@dataclass(init=True, repr=True, eq=True, frozen=True)
class Retry:
    """Immutable value handed to the retried callable as its ``retry`` keyword argument, carrying the attempt number."""

    count: int  # 0 on the initial attempt, then 1, 2, ... for each subsequent retry