Coverage for bzfs_main/retry.py: 100%
67 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Generic retry support using jittered exponential backoff with cap.
17This module retries failing operations according to a configurable policy. It assumes transient errors may eventually succeed
18and centralizes retry logic for consistency across callers.
19"""
21from __future__ import (
22 annotations,
23)
24import argparse
25import random
26import threading
27import time
28from dataclasses import (
29 dataclass,
30)
31from logging import (
32 Logger,
33)
34from typing import (
35 Any,
36 Callable,
37 Final,
38 TypeVar,
39)
41from bzfs_main.utils import (
42 human_readable_duration,
43)
46#############################################################################
class RetryPolicy:
    """Settings that govern how many times an operation is retried and how long to back off in between; Immutable."""

    def __init__(self, args: argparse.Namespace) -> None:
        """Captures the retry option values found on args, and derives clamped nanosecond variants of the sleep bounds."""
        nanos_per_sec: int = 1_000_000_000

        # immutable variables:
        self.retries: Final[int] = args.retries
        self.min_sleep_secs: Final[float] = args.retry_min_sleep_secs
        self.initial_max_sleep_secs: Final[float] = args.retry_initial_max_sleep_secs
        self.max_sleep_secs: Final[float] = args.retry_max_sleep_secs
        self.max_elapsed_secs: Final[float] = args.retry_max_elapsed_secs
        self.max_elapsed_nanos: Final[int] = int(self.max_elapsed_secs * nanos_per_sec)

        # Convert the sleep bounds to integer nanoseconds and clamp them such that 1 <= min <= initial_max <= max holds,
        # no matter how the user-supplied values relate to each other.
        floor_nanos: int = max(1, int(self.min_sleep_secs * nanos_per_sec))
        first_cap_nanos: int = int(self.initial_max_sleep_secs * nanos_per_sec)
        ceiling_nanos: int = max(floor_nanos, int(self.max_sleep_secs * nanos_per_sec))
        first_cap_nanos = min(ceiling_nanos, max(floor_nanos, first_cap_nanos))

        self.min_sleep_nanos: Final[int] = floor_nanos
        self.initial_max_sleep_nanos: Final[int] = first_cap_nanos
        self.max_sleep_nanos: Final[int] = ceiling_nanos
        assert 1 <= floor_nanos <= first_cap_nanos <= ceiling_nanos

    def __repr__(self) -> str:
        """Returns the user-facing (seconds based) retry parameters as a comma separated 'name: value' list."""
        shown = ("retries", "min_sleep_secs", "initial_max_sleep_secs", "max_sleep_secs", "max_elapsed_secs")
        return ", ".join(f"{name}: {getattr(self, name)}" for name in shown)

    @classmethod
    def no_retries(cls) -> RetryPolicy:
        """Builds a policy whose retry count and time limits are all zero, i.e. one that never retries."""
        option_names = (
            "retries",
            "retry_min_sleep_secs",
            "retry_initial_max_sleep_secs",
            "retry_max_sleep_secs",
            "retry_max_elapsed_secs",
        )
        return cls(argparse.Namespace(**dict.fromkeys(option_names, 0)))
92#############################################################################
# Type variable for the return type of the callable passed to run_with_retries(), which returns that same type.
T = TypeVar("T")
def run_with_retries(
    log: Logger, policy: RetryPolicy, termination_event: threading.Event, fn: Callable[..., T], *args: Any, **kwargs: Any
) -> T:  # thread-safe
    """Runs the given function with the given arguments, and retries on failure as indicated by policy; The termination_event
    allows for early async cancellation of the retry loop.

    The function is invoked as ``fn(*args, **kwargs, retry=Retry(retry_count))`` where retry_count is 0 on the initial call
    and is incremented by one before each retry. Only RetryableError triggers a retry; any other exception raised by fn
    propagates to the caller immediately.

    A retry takes place while fewer than ``policy.retries`` retries have been made, less than ``policy.max_elapsed_nanos``
    have elapsed since the start of this call, and termination_event is not set. The first retry is immediate if the error
    has ``no_sleep`` set; all other retries are preceded by a wait of random duration within
    [policy.min_sleep_nanos, current cap], where the cap starts at ``policy.initial_max_sleep_nanos`` and doubles up to
    ``policy.max_sleep_nanos``. The wait is done on termination_event, so setting the event cuts the wait short.

    Returns:
        Whatever fn returns on its first successful invocation.

    Raises:
        The ``__cause__`` of the last RetryableError once the loop gives up, or that RetryableError itself if it was
        raised without a cause.
    """
    c_max_sleep_nanos: int = policy.initial_max_sleep_nanos
    retry_count: int = 0
    sysrandom: random.SystemRandom | None = None  # created lazily, only if a jittered sleep is actually needed
    start_time_nanos: int = time.monotonic_ns()
    while True:
        try:
            return fn(*args, **kwargs, retry=Retry(retry_count))  # Call the target function with provided args
        except RetryableError as retryable_error:
            elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
            will_retry: bool = False
            if retry_count < policy.retries and elapsed_nanos < policy.max_elapsed_nanos and not termination_event.is_set():
                will_retry = True
                retry_count += 1
                if retryable_error.no_sleep and retry_count <= 1:
                    log.info(f"Retrying [{retry_count}/{policy.retries}] immediately ...")
                else:  # jitter: pick a random sleep duration within the range [min_sleep_nanos, c_max_sleep_nanos] as delay
                    sysrandom = random.SystemRandom() if sysrandom is None else sysrandom
                    sleep_nanos: int = sysrandom.randint(policy.min_sleep_nanos, c_max_sleep_nanos)
                    log.info(f"Retrying [{retry_count}/{policy.retries}] in {human_readable_duration(sleep_nanos)} ...")
                    termination_event.wait(sleep_nanos / 1_000_000_000)  # returns early if the event gets set meanwhile
                    # NOTE(review): the cap is doubled only after an actual sleep (not after an immediate retry) - confirm
                    c_max_sleep_nanos = min(policy.max_sleep_nanos, 2 * c_max_sleep_nanos)  # exponential backoff with cap
            # Re-check the event because it may have been set while waiting above
            if termination_event.is_set() or not will_retry:
                if policy.retries > 0:
                    log.warning(
                        f"Giving up because the last [{retry_count}/{policy.retries}] retries across "
                        f"[{human_readable_duration(elapsed_nanos)}/{human_readable_duration(policy.max_elapsed_nanos)}] "
                        "for the current request failed!"
                    )
                cause: BaseException | None = retryable_error.__cause__
                if cause is None:
                    # fn raised RetryableError without `raise ... from <cause>`. Surface that error as-is instead of
                    # masking it with an AssertionError (or an AttributeError on None when running with `python -O`).
                    raise
                raise cause.with_traceback(cause.__traceback__)  # noqa: B904 intentional re-raise of cause without chaining
134#############################################################################
class RetryableError(Exception):
    """Signals a failure that is worth retrying because the task behind the underlying exception may succeed later."""

    def __init__(self, message: str, no_sleep: bool = False) -> None:
        """Uses message as the exception text; no_sleep=True asks run_with_retries() to make the first retry immediately,
        without a backoff delay."""
        self.no_sleep: bool = no_sleep
        super().__init__(message)
143#############################################################################
@dataclass(frozen=True)
class Retry:
    """The current retry attempt number provided to the callable; run_with_retries() passes an instance as the `retry`
    keyword argument on every invocation; Immutable."""

    # Number of retries made so far: 0 on the initial call, incremented by one before each subsequent retry
    count: int