Coverage for bzfs_main/retry.py: 100%

52 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Generic retry support using jittered exponential backoff with cap. 

16 

17This module retries failing operations according to a configurable policy. It assumes transient errors may eventually succeed 

18and centralizes retry logic for consistency across callers. 

19""" 

20 

21from __future__ import annotations 

22import argparse 

23import random 

24import time 

25from dataclasses import dataclass 

26from logging import Logger 

27from typing import ( 

28 Any, 

29 Callable, 

30 TypeVar, 

31) 

32 

33from bzfs_main.utils import human_readable_duration 

34 

35 

36############################################################################# 

class RetryPolicy:
    """Retry settings: how many retries to attempt, how long to sleep between them, and the overall time budget."""

    def __init__(self, args: argparse.Namespace) -> None:
        """Reads the retry settings from parsed command line args and precomputes integer-nanosecond equivalents.

        ``args`` must expose ``retries``, ``retry_min_sleep_secs``, ``retry_max_sleep_secs`` and
        ``retry_max_elapsed_secs``. The minimum sleep is clamped to at least 1 nanosecond, and the maximum sleep is
        raised to the minimum sleep whenever it would otherwise be smaller.
        """
        nanos_per_sec = 1_000_000_000
        # These fields are meant to be treated as read-only after construction.
        self.retries: int = args.retries
        self.min_sleep_secs: float = args.retry_min_sleep_secs
        self.max_sleep_secs: float = args.retry_max_sleep_secs
        self.max_elapsed_secs: float = args.retry_max_elapsed_secs
        lower_nanos = max(1, int(self.min_sleep_secs * nanos_per_sec))  # never allow a zero/negative sleep floor
        self.min_sleep_nanos: int = lower_nanos
        self.max_sleep_nanos: int = max(lower_nanos, int(self.max_sleep_secs * nanos_per_sec))  # keep range non-empty
        self.max_elapsed_nanos: int = int(self.max_elapsed_secs * nanos_per_sec)

    def __repr__(self) -> str:
        """Summarizes the configured retry parameters, expressed in seconds, for debugging output."""
        text = (
            f"retries: {self.retries}, min_sleep_secs: {self.min_sleep_secs}, "
            f"max_sleep_secs: {self.max_sleep_secs}, max_elapsed_secs: {self.max_elapsed_secs}"
        )
        return text

59 

60 

61############################################################################# 

T = TypeVar("T")


def run_with_retries(log: Logger, policy: RetryPolicy, fn: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Runs the given function with the given arguments, and retries on failure as indicated by policy.

    ``fn`` is called as ``fn(*args, **kwargs, retry=Retry(n))``, where ``n`` is 0 for the first attempt and grows by one
    with every retry. Only ``RetryableError`` triggers a retry; any other exception propagates immediately.

    Between retries the function sleeps for a random duration drawn from ``[policy.min_sleep_nanos, max_sleep_mark]``.
    ``max_sleep_mark`` starts at ``policy.min_sleep_nanos`` and doubles after every sleep, capped at
    ``policy.max_sleep_nanos`` (jittered exponential backoff with cap). If the error that triggers the first retry has
    ``no_sleep`` set, that first retry happens immediately without sleeping.

    Retrying stops once ``policy.retries`` retries have been made or ``policy.max_elapsed_nanos`` have elapsed since the
    first attempt, whichever comes first. The budget is checked after each failure, so a final sleep may run past it.

    Returns:
        The return value of the first successful call to ``fn``.

    Raises:
        The ``__cause__`` of the last ``RetryableError`` (raised ``from None``), or that ``RetryableError`` itself if it
        was raised without a cause. Any non-retryable exception raised by ``fn`` propagates unchanged.
    """
    max_sleep_mark: int = policy.min_sleep_nanos  # upper bound of the current jitter window; grows exponentially
    retry_count: int = 0
    sysrandom: random.SystemRandom | None = None  # created lazily, only once a sleep is actually needed
    start_time_nanos: int = time.monotonic_ns()
    while True:
        try:
            return fn(*args, **kwargs, retry=Retry(retry_count))  # Call the target function with provided args
        except RetryableError as retryable_error:
            elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
            if retry_count < policy.retries and elapsed_nanos < policy.max_elapsed_nanos:
                retry_count += 1
                if retryable_error.no_sleep and retry_count <= 1:
                    log.info(f"Retrying [{retry_count}/{policy.retries}] immediately ...")
                else:  # jitter: pick a random sleep duration within the range [min_sleep_nanos, max_sleep_mark] as delay
                    sysrandom = random.SystemRandom() if sysrandom is None else sysrandom
                    sleep_nanos: int = sysrandom.randint(policy.min_sleep_nanos, max_sleep_mark)
                    log.info(f"Retrying [{retry_count}/{policy.retries}] in {human_readable_duration(sleep_nanos)} ...")
                    time.sleep(sleep_nanos / 1_000_000_000)
                    max_sleep_mark = min(policy.max_sleep_nanos, 2 * max_sleep_mark)  # exponential backoff with cap
            else:
                if policy.retries > 0:
                    log.warning(
                        f"Giving up because the last [{retry_count}/{policy.retries}] retries across "
                        f"[{elapsed_nanos // 1_000_000_000}/{policy.max_elapsed_nanos // 1_000_000_000}] "
                        "seconds for the current request failed!"
                    )
                cause: BaseException | None = retryable_error.__cause__
                if cause is None:
                    # The RetryableError was raised without ``from``. Relying on ``assert`` here is unsafe: it is
                    # stripped under ``python -O``, after which ``raise None`` fails with an unrelated TypeError.
                    # Surface the RetryableError itself so the caller still sees the real failure.
                    raise
                raise cause from None  # hide the RetryableError wrapper; callers see the underlying exception

95 

96 

97############################################################################# 

class RetryableError(Exception):
    """Signals that the operation which raised the underlying exception may succeed if it is attempted again.

    Attributes:
        no_sleep: when True, requests that the retry be attempted without a backoff delay; ``run_with_retries``
            honors this only for the first retry.
    """

    def __init__(self, message: str, no_sleep: bool = False) -> None:
        """Records the no-sleep hint and passes ``message`` on to the base ``Exception``."""
        self.no_sleep: bool = no_sleep
        super().__init__(message)

104 

105 

106############################################################################# 

@dataclass(frozen=True)
class Retry:
    """Immutable attempt counter that ``run_with_retries`` hands to the wrapped callable via the ``retry`` keyword.

    ``count`` is 0 for the initial attempt and increases by one with every subsequent retry.
    """

    count: int  # zero-based attempt number, i.e. how many retries preceded this call