Coverage for bzfs_main/util/retry.py: 100%
247 statements
coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Customizable generic retry support using jittered exponential backoff with cap.

Purpose:
--------
- Provide a reusable retry helper for transient failures using customizable policy, config and callbacks.
- Centralize backoff, jitter, logging and metrics behavior while keeping call sites compact.
- Prevent accidental retries: the loop retries only when the developer explicitly raises a ``RetryableError``, which reduces
  the risk of retrying non-idempotent operations.
- Provide a thread-safe, fast implementation; avoid shared RNG contention.
- Avoid unnecessary complexity and add zero dependencies beyond the Python standard library. Everything you need is in this
  single Python file.

Usage:
------
- Wrap work in a callable ``fn(retry: Retry)`` and therein raise ``RetryableError`` for failures that should be retried.
- Construct a policy via ``RetryPolicy(...)`` that specifies how ``RetryableError`` shall be retried.
- Invoke ``call_with_retries(fn=fn, policy=policy, log=logger)`` with a standard ``logging.Logger``.
- On success, the result of calling ``fn`` is returned.
- On exhaustion, call_with_retries() either re-raises the last underlying ``RetryableError.__cause__``, or raises
  ``RetryError`` (wrapping the last ``RetryableError``), like so:
  - if ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last
    ``RetryableError.__cause__`` with its original traceback.
  - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain).
  - The default is ``RetryPolicy.reraise=True``.

Advanced Configuration:
-----------------------
- Tune ``RetryPolicy`` parameters to control maximum retries, sleep bounds, and elapsed-time budget.
- Use ``RetryConfig`` to control termination events and logging settings.
- Set ``log=None`` to disable logging, or customize ``info_loglevel`` / ``warning_loglevel`` for structured logs.
- Pass ``termination_event`` via ``RetryConfig`` to support async cancellation between attempts.
- Supply a ``giveup(AttemptOutcome)`` callback to stop retrying based on domain-specific logic (for example, error/status
  codes or parsing stderr), including time-aware decisions or decisions based on the previous N most recent AttemptOutcome
  objects (via ``AttemptOutcome.retry.previous_outcomes``).
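
For example, a ``giveup`` callback might classify errors by message. The following is an illustrative sketch only;
the helper name and the 'permission denied' pattern are made-up assumptions, not part of this module:

```python
# Hypothetical giveup callback: a non-empty reason string stops retrying.
# AttemptOutcome.result holds the most recent RetryableError; its __cause__
# carries the underlying exception raised by fn().
def giveup_on_fatal(outcome) -> str:
    cause = getattr(outcome.result, "__cause__", None)
    text = "" if cause is None else str(cause)
    if "permission denied" in text:
        return "fatal: permission denied"  # give up immediately
    return ""  # empty string means: keep retrying
```

It would then be passed via ``call_with_retries(fn=fn, policy=policy, giveup=giveup_on_fatal, log=log)``.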

Observability:
--------------
- Supply an ``after_attempt(AttemptOutcome)`` callback to collect per-attempt metrics such as success flag,
  exhausted/terminated state, attempt number, total elapsed duration (in nanoseconds), sleep duration (in nanoseconds), etc.
- ``AttemptOutcome.result`` is either the successful result or the most recent ``RetryableError``, enabling integration with
  metrics and tracing systems without coupling the retry loop to any specific backend.
- Supply ``after_attempt`` to fully customize logging, if necessary, in which case also set
  ``RetryConfig.enable_logging=False``.
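
As an illustration, an ``after_attempt`` callback can aggregate counters; this is a sketch (the counter names are
arbitrary assumptions, not part of this module):

```python
import collections

metrics = collections.Counter()

# Hypothetical after_attempt callback: count attempts and outcomes, and sum
# the total planned sleep time in nanoseconds.
def record_metrics(outcome) -> None:
    metrics["attempts"] += 1
    if outcome.is_success:
        metrics["successes"] += 1
    elif outcome.is_exhausted:
        metrics["exhausted"] += 1
    metrics["sleep_nanos_total"] += outcome.sleep_nanos
```

It would then be passed via ``call_with_retries(..., after_attempt=record_metrics)``.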

Expert Configuration:
---------------------
- Set ``RetryPolicy.max_previous_outcomes > 0`` to pass the N most recent AttemptOutcome objects to callbacks (default is 0).
- If ``RetryPolicy.max_previous_outcomes > 0``, you can use ``RetryableError(..., attachment=...)`` to carry domain-specific
  state from a failed attempt to the next attempt via ``retry.previous_outcomes``. This pattern helps if attempt N+1 is a
  function of attempt N or all prior attempts (e.g., switching endpoints or resuming from an offset).
- Set ``backoff_strategy(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error)`` to plug in a custom backoff
  algorithm (e.g., decorrelated-jitter). The default is full-jitter exponential backoff with cap (the industry standard).
- Use ``RetryOptions`` as a 'bag of knobs' configuration template for functions that shall be retried in similar ways.
- Or package up all knobs plus a ``fn(retry: Retry)`` function into a self-contained auto-retrying higher-level function by
  constructing a ``RetryOptions`` object (which is itself a ``Callable``).
- To keep calling code retry-transparent, set ``RetryPolicy.reraise=True`` (the default) *and* raise retryable failures as
  ``raise RetryableError(...) from exc``. Client code then won't notice whether call_with_retries is used or not.
- To make exhaustion observable to calling code, set ``RetryPolicy.reraise=False``: call_with_retries() then always raises
  ``RetryError`` (wrapping the last ``RetryableError``) on exhaustion, so callers catch ``RetryError`` and can inspect
  the last underlying exception via ``err.outcome``, ``err.__cause__``, and even ``err.__cause__.__cause__`` when present.

Example Usage:
--------------
    import logging
    from bzfs_main.util.retry import Retry, RetryPolicy, RetryableError, call_with_retries

    def unreliable_operation(retry: Retry) -> str:
        try:
            if retry.count < 3:
                raise ValueError("temporary failure connecting to foo.example.com")
            return "ok"
        except ValueError as exc:
            # Preserve the underlying cause for correct error propagation and logging
            raise RetryableError("temporary failure", display_msg="connect") from exc

    retry_policy = RetryPolicy(
        max_retries=10,
        min_sleep_secs=0,
        initial_max_sleep_secs=0.125,
        max_sleep_secs=10,
        max_elapsed_secs=60,
    )
    log = logging.getLogger(__name__)
    result: str = call_with_retries(fn=unreliable_operation, policy=retry_policy, log=log)
    print(result)

    # Sample log output:
    # INFO:Retrying connect [1/10] in 8.79ms ...
    # INFO:Retrying connect [2/10] in 60.1ms ...
    # INFO:Retrying connect [3/10] in 192ms ...
    # ok

Background:
-----------
For background on exponential backoff and jitter, see for example
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter
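
The full-jitter rule described there can be sketched in a few lines of stdlib-only Python. This is an illustrative
sketch; the function name ``next_sleep`` is made up, and the module's real implementation lives in
``_full_jitter_backoff_strategy``:

```python
import random

def next_sleep(rng: random.Random, min_ns: int, curr_max_ns: int, max_ns: int, base: float = 2) -> tuple[int, int]:
    # Pick a uniformly random sleep in [min_ns, curr_max_ns] (full jitter),
    # then grow the cap exponentially, clamped to max_ns, for the next attempt.
    sleep_ns = rng.randint(min_ns, curr_max_ns)
    return sleep_ns, min(round(curr_max_ns * base), max_ns)
```

Successive calls yield sleeps whose upper bound doubles per retry until it hits ``max_ns``.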
"""

from __future__ import (
    annotations,
)
import argparse
import dataclasses
import logging
import random
import threading
import time
from collections.abc import (
    Mapping,
    Sequence,
)
from dataclasses import (
    dataclass,
)
from logging import (
    Logger,
)
from typing import (
    Any,
    Callable,
    Final,
    Generic,
    NoReturn,
    TypeVar,
    final,
)

from bzfs_main.util.utils import (
    human_readable_duration,
)

#############################################################################
_T = TypeVar("_T")


def _after_attempt(_outcome: AttemptOutcome) -> None:
    """Default implementation does nothing."""


def _giveup(_outcome: AttemptOutcome) -> str:
    """Default implementation never gives up."""
    return ""


def call_with_retries(
    fn: Callable[[Retry], _T],  # typically a lambda
    policy: RetryPolicy,
    config: RetryConfig | None = None,
    giveup: Callable[[AttemptOutcome], str] = _giveup,  # stop retrying based on domain-specific logic
    after_attempt: Callable[[AttemptOutcome], None] = _after_attempt,  # e.g. record metrics
    log: Logger | None = None,
) -> _T:
    """Runs the function ``fn`` and returns its result; retries on failure as indicated by policy and config; thread-safe.

    On exhaustion, call_with_retries() either re-raises the last underlying ``RetryableError.__cause__``, or raises
    ``RetryError`` (wrapping the last ``RetryableError``), like so:
    - if ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last
      ``RetryableError.__cause__`` with its original traceback.
    - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain).
    - The default is ``RetryPolicy.reraise=True``.
    """
    config = _DEFAULT_RETRY_CONFIG if config is None else config
    curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos
    retry_count: int = 0
    rng: random.Random | None = None
    previous_outcomes: tuple[AttemptOutcome, ...] = ()  # for safety pass *immutable* deque to callbacks
    start_time_nanos: Final[int] = time.monotonic_ns()
    while True:
        giveup_reason: str = ""
        retry: Retry = Retry(retry_count, start_time_nanos, policy, config, previous_outcomes)
        try:
            result: _T = fn(retry)  # Call the target function and supply retry attempt number and other metadata
            if after_attempt is not _after_attempt:
                outcome: AttemptOutcome = AttemptOutcome(
                    retry, True, False, False, giveup_reason, time.monotonic_ns() - start_time_nanos, 0, result, log
                )
                after_attempt(outcome)
            return result
        except RetryableError as retryable_error:
            elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
            if retry_count == 0 and retryable_error.retry_immediately_once:
                sleep_nanos: int = 0  # retry once immediately without backoff
            else:  # jitter: default backoff_strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos]
                rng = _thread_local_rng() if rng is None else rng
                sleep_nanos, curr_max_sleep_nanos = policy.backoff_strategy(
                    retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error
                )
                assert sleep_nanos >= 0
                assert curr_max_sleep_nanos >= 0

            format_duration: Callable[[int], str] = config.format_duration  # lambda: nanos
            termination_event: threading.Event | None = config.termination_event
            outcome = AttemptOutcome(
                retry, False, False, False, giveup_reason, elapsed_nanos, sleep_nanos, retryable_error, log
            )
            will_retry: bool = False
            if (
                retry_count < policy.max_retries
                and elapsed_nanos < policy.max_elapsed_nanos
                and (termination_event is None or not termination_event.is_set())
                and not (giveup_reason := giveup(outcome))
            ):
                will_retry = True
                retry_count += 1
                loglevel: int = config.info_loglevel
                if log is not None and config.enable_logging and log.isEnabledFor(loglevel):
                    m1: str = config.format_msg(config.display_msg, retryable_error)
                    m2: str = config.format_pair(retry_count, policy.max_retries)
                    log.log(loglevel, "%s", f"{m1}{m2} in {format_duration(sleep_nanos)}{config.dots}", extra=config.extra)
                after_attempt(outcome)
                _sleep(sleep_nanos, termination_event)
            else:
                sleep_nanos = 0

            is_terminated: bool = termination_event is not None and termination_event.is_set()
            if is_terminated or not will_retry:
                if (
                    policy.max_retries > 0
                    and log is not None
                    and config.enable_logging
                    and log.isEnabledFor(config.warning_loglevel)
                    and not is_terminated
                ):
                    reason: str = f"{giveup_reason}; " if giveup_reason else ""
                    log.log(
                        config.warning_loglevel,
                        "%s",
                        f"{config.format_msg(config.display_msg, retryable_error)}"
                        f"exhausted; giving up because {reason}the last "
                        f"{config.format_pair(retry_count, policy.max_retries)} retries across "
                        f"{config.format_pair(format_duration(elapsed_nanos), format_duration(policy.max_elapsed_nanos))} "
                        "failed",
                        exc_info=retryable_error if config.exc_info else None,
                        stack_info=config.stack_info,
                        extra=config.extra,
                    )
                outcome = AttemptOutcome(
                    retry, False, True, is_terminated, giveup_reason, elapsed_nanos, sleep_nanos, retryable_error, log
                )
                after_attempt(outcome)
                cause: BaseException | None = retryable_error.__cause__
                if policy.reraise and cause is not None:
                    raise cause.with_traceback(cause.__traceback__)  # noqa: B904 intentional re-raise without chaining
                else:
                    raise RetryError(outcome) from retryable_error

            n = policy.max_previous_outcomes
            if n > 0:  # outcome will be passed to next attempt via Retry.previous_outcomes
                if len(outcome.retry.previous_outcomes) > 0:
                    outcome = outcome.copy(retry=retry.copy(previous_outcomes=()))  # detach to reduce memory footprint
                previous_outcomes = previous_outcomes[len(previous_outcomes) - n + 1 :] + (outcome,)  # *immutable* deque
            del outcome  # help gc


def _sleep(sleep_nanos: int, termination_event: threading.Event | None) -> None:
    if sleep_nanos > 0:
        if termination_event is None:
            time.sleep(sleep_nanos / 1_000_000_000)
        else:
            termination_event.wait(sleep_nanos / 1_000_000_000)  # allow early wakeup on async termination


#############################################################################
class RetryableError(Exception):
    """Indicates that the task that caused the underlying exception can be retried and might eventually succeed;
    ``call_with_retries()`` will pass this exception to callbacks via ``AttemptOutcome.result``."""

    def __init__(
        self, message: str, display_msg: object = None, retry_immediately_once: bool = False, attachment: object = None
    ) -> None:
        super().__init__(message)
        self.display_msg: object = display_msg  # for logging
        self.retry_immediately_once: bool = retry_immediately_once  # retry once immediately without backoff?

        self.attachment: object = attachment  # domain specific info passed to next attempt via Retry.previous_outcomes if
        # RetryPolicy.max_previous_outcomes > 0. This helps when retrying is not just 'try again later', but
        # 'try again differently based on what just happened'.
        # Examples: switching network endpoints, adjusting per-attempt timeouts, capping retries by error-class, resuming
        # with a token/offset, maintaining failure history for this invocation of call_with_retries().
        # Example: 'cap retries to 3 for ECONNREFUSED but 12 for ETIMEDOUT' via attachment=collections.Counter

    def display_msg_str(self) -> str:
        """Returns the display_msg as a str; for logging."""
        return "" if self.display_msg is None else str(self.display_msg)


#############################################################################
@dataclass
@final
class RetryError(Exception):
    """Indicates that retries have been exhausted; the last RetryableError is in RetryError.__cause__."""

    outcome: AttemptOutcome
    """Metadata that describes why and how call_with_retries() gave up."""


#############################################################################
@dataclass(frozen=True)
@final
class Retry:
    """Attempt metadata provided to callback functions; includes the current retry attempt number; immutable."""

    count: int
    """Attempt number, count=0 is the first attempt, count=1 is the second attempt aka first retry."""

    start_time_nanos: int
    """Value of time.monotonic_ns() at start of call_with_retries() invocation."""

    policy: RetryPolicy = dataclasses.field(repr=False, compare=False)
    """Policy that was passed into call_with_retries()."""

    config: RetryConfig = dataclasses.field(repr=False, compare=False)
    """Config that is used by call_with_retries()."""

    previous_outcomes: Sequence[AttemptOutcome] = dataclasses.field(repr=False, compare=False)
    """History/state of the N=max_previous_outcomes most recent outcomes for the current call_with_retries() invocation."""

    def copy(self, **override_kwargs: Any) -> Retry:
        """Creates a new object copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)


#############################################################################
@dataclass(frozen=True)
@final
class AttemptOutcome:
    """Captures per-attempt state for ``after_attempt`` callbacks; immutable."""

    retry: Retry
    """Attempt metadata passed into fn(retry)."""

    is_success: bool
    """False if fn(retry) raised a RetryableError; True otherwise."""

    is_exhausted: bool
    """True if the loop is giving up retrying (possibly even due to is_terminated); False otherwise."""

    is_terminated: bool
    """True if termination_event has become set; False otherwise."""

    giveup_reason: str
    """Reason returned by giveup(); empty string means giveup() was not called or giveup() decided to not give up."""

    elapsed_nanos: int
    """Total duration between the start of the call_with_retries() invocation and the end of this fn() attempt."""

    sleep_nanos: int
    """Duration of the current sleep period."""

    result: RetryableError | object = dataclasses.field(repr=False, compare=False)
    """Result of fn(retry); a RetryableError on retryable failure or some other object on success."""

    log: Logger | None = dataclasses.field(repr=False, compare=False)
    """Logger that was passed into call_with_retries()."""

    def copy(self, **override_kwargs: Any) -> AttemptOutcome:
        """Creates a new outcome copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)


#############################################################################
def _full_jitter_backoff_strategy(
    retry: Retry, curr_max_sleep_nanos: int, rand: random.Random, elapsed_nanos: int, retryable_error: RetryableError
) -> tuple[int, int]:
    """Full-jitter picks a random sleep_nanos duration from the range [min_sleep_nanos, curr_max_sleep_nanos] and applies
    exponential backoff with cap to the next attempt."""
    policy: RetryPolicy = retry.policy
    if policy.min_sleep_nanos == curr_max_sleep_nanos:
        sleep_nanos = curr_max_sleep_nanos  # perf
    else:
        sleep_nanos = rand.randint(policy.min_sleep_nanos, curr_max_sleep_nanos)  # nanos to delay until next attempt
    curr_max_sleep_nanos = round(curr_max_sleep_nanos * policy.exponential_base)  # exponential backoff
    curr_max_sleep_nanos = min(curr_max_sleep_nanos, policy.max_sleep_nanos)  # ... with cap for next attempt
    return sleep_nanos, curr_max_sleep_nanos


@dataclass(frozen=True)
@final
class RetryPolicy:
    """Configuration controlling max retry counts and backoff delays for call_with_retries(); immutable.

    By default works as follows: The maximum duration to sleep between retries initially starts with
    ``initial_max_sleep_secs`` and doubles on each retry, up to the final maximum of ``max_sleep_secs``.
    On each retry a random sleep duration in the range ``[min_sleep_secs, current max]`` is picked.
    In a nutshell: ``0 <= min_sleep_secs <= initial_max_sleep_secs <= max_sleep_secs``
    """

    max_retries: int = 10
    """The maximum number of times ``fn`` will be invoked additionally after the first attempt invocation; must be >= 0."""

    min_sleep_secs: float = 0
    """The minimum duration to sleep between retries."""

    initial_max_sleep_secs: float = 0.125
    """The initial maximum duration to sleep between retries."""

    max_sleep_secs: float = 10
    """The final max duration to sleep between retries; 0 <= min_sleep_secs <= initial_max_sleep_secs <= max_sleep_secs."""

    max_elapsed_secs: float = 60
    """``fn`` will not be retried (or not retried anymore) once this much time has elapsed since the initial start of
    call_with_retries()."""

    exponential_base: float = 2
    """Growth factor for backoff algorithm to calculate sleep duration; must be >= 1."""

    max_elapsed_nanos: int = dataclasses.field(init=False, repr=False)  # derived value
    min_sleep_nanos: int = dataclasses.field(init=False, repr=False)  # derived value
    initial_max_sleep_nanos: int = dataclasses.field(init=False, repr=False)  # derived value
    max_sleep_nanos: int = dataclasses.field(init=False, repr=False)  # derived value

    backoff_strategy: Callable[[Retry, int, random.Random, int, RetryableError], tuple[int, int]] = dataclasses.field(
        default=_full_jitter_backoff_strategy, repr=False  # retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error
    )
    """Strategy that implements a backoff algorithm to reduce resource contention."""

    reraise: bool = True
    """On exhaustion, the default (``True``) is to re-raise the underlying exception when present."""

    max_previous_outcomes: int = 0
    """Pass the N=max_previous_outcomes most recent AttemptOutcome objects to callbacks."""

    context: object = dataclasses.field(default=None, repr=False, compare=False)
    """Optional domain specific info."""

    @classmethod
    def from_namespace(cls, args: argparse.Namespace) -> RetryPolicy:
        """Factory that reads the policy from ArgumentParser via args."""
        return RetryPolicy(
            max_retries=getattr(args, "max_retries", 10),
            min_sleep_secs=getattr(args, "retry_min_sleep_secs", 0),
            initial_max_sleep_secs=getattr(args, "retry_initial_max_sleep_secs", 0.125),
            max_sleep_secs=getattr(args, "retry_max_sleep_secs", 10),
            max_elapsed_secs=getattr(args, "retry_max_elapsed_secs", 60),
            exponential_base=getattr(args, "retry_exponential_base", 2),
            backoff_strategy=getattr(args, "retry_backoff_strategy", _full_jitter_backoff_strategy),
            reraise=getattr(args, "retry_reraise", True),
            max_previous_outcomes=getattr(args, "retry_max_previous_outcomes", 0),
            context=getattr(args, "retry_context", None),
        )

    @classmethod
    def no_retries(cls) -> RetryPolicy:
        """Returns a policy that never retries."""
        return RetryPolicy(
            max_retries=0,
            min_sleep_secs=0,
            initial_max_sleep_secs=0,
            max_sleep_secs=0,
            max_elapsed_secs=0,
        )

    def __post_init__(self) -> None:  # compute derived values
        object.__setattr__(self, "max_retries", max(0, self.max_retries))
        object.__setattr__(self, "exponential_base", max(1, self.exponential_base))
        object.__setattr__(self, "min_sleep_secs", max(0, self.min_sleep_secs))
        object.__setattr__(self, "initial_max_sleep_secs", max(0, self.initial_max_sleep_secs))
        object.__setattr__(self, "max_sleep_secs", max(0, self.max_sleep_secs))
        object.__setattr__(self, "max_elapsed_secs", max(0, self.max_elapsed_secs))
        object.__setattr__(self, "max_elapsed_nanos", int(self.max_elapsed_secs * 1_000_000_000))
        min_sleep_nanos: int = int(self.min_sleep_secs * 1_000_000_000)
        initial_max_sleep_nanos: int = int(self.initial_max_sleep_secs * 1_000_000_000)
        max_sleep_nanos: int = int(self.max_sleep_secs * 1_000_000_000)
        max_sleep_nanos = max(min_sleep_nanos, max_sleep_nanos)
        initial_max_sleep_nanos = min(max_sleep_nanos, max(min_sleep_nanos, initial_max_sleep_nanos))
        object.__setattr__(self, "min_sleep_nanos", min_sleep_nanos)
        object.__setattr__(self, "initial_max_sleep_nanos", initial_max_sleep_nanos)
        object.__setattr__(self, "max_sleep_nanos", max_sleep_nanos)
        object.__setattr__(self, "max_previous_outcomes", max(0, self.max_previous_outcomes))
        assert 0 <= self.min_sleep_nanos <= self.initial_max_sleep_nanos <= self.max_sleep_nanos
        if not callable(self.backoff_strategy):
            raise TypeError("RetryPolicy.backoff_strategy must be callable")
        if not isinstance(self.reraise, bool):
            raise TypeError("RetryPolicy.reraise must be bool")

    def copy(self, **override_kwargs: Any) -> RetryPolicy:
        """Creates a new policy copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)


#############################################################################
def _format_msg(display_msg: str, retryable_error: RetryableError) -> str:  # thread-safe
    msg = display_msg + " " if display_msg else ""
    errmsg: str = retryable_error.display_msg_str()
    msg = msg + errmsg + " " if errmsg else msg
    msg = msg if msg else "Retrying "
    return msg


def _format_pair(first: object, second: object) -> str:  # thread-safe
    return f"[{first}/{second}]"


@dataclass(frozen=True)
@final
class RetryConfig:
    """Configures termination behavior and logging for call_with_retries(); all defaults work out of the box; immutable."""

    termination_event: threading.Event | None = None  # optionally allows for async cancellation
    display_msg: str = "Retrying"
    dots: str = " ..."
    format_msg: Callable[[str, RetryableError], str] = _format_msg  # lambda: display_msg, retryable_error
    format_pair: Callable[[object, object], str] = _format_pair  # lambda: first, second
    format_duration: Callable[[int], str] = human_readable_duration  # lambda: nanos
    info_loglevel: int = logging.INFO  # loglevel used when not giving up
    warning_loglevel: int = logging.WARNING  # loglevel used when giving up
    enable_logging: bool = True  # set to False to move logging aspect into after_attempt()
    exc_info: bool = False  # passed into Logger.log()
    stack_info: bool = False  # passed into Logger.log()
    extra: Mapping[str, object] | None = dataclasses.field(default=None, repr=False, compare=False)  # passed to Logger.log()
    context: object = dataclasses.field(default=None, repr=False, compare=False)  # optional domain specific info

    def copy(self, **override_kwargs: Any) -> RetryConfig:
        """Creates a new config copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)


_DEFAULT_RETRY_CONFIG: Final[RetryConfig] = RetryConfig()  # constant


#############################################################################
def _fn(_retry: Retry) -> NoReturn:
    """Default implementation always raises."""
    raise NotImplementedError("Provide fn when calling RetryOptions")


@dataclass(frozen=True)
@final
class RetryOptions(Generic[_T]):
    """Convenience class that aggregates all knobs for call_with_retries(); and is itself callable too; immutable."""

    fn: Callable[[Retry], _T] = _fn
    policy: RetryPolicy = RetryPolicy()
    config: RetryConfig = RetryConfig()
    giveup: Callable[[AttemptOutcome], str] = _giveup  # stop retrying based on domain-specific logic
    after_attempt: Callable[[AttemptOutcome], None] = _after_attempt  # e.g. record metrics
    log: Logger | None = None

    def copy(self, **override_kwargs: Any) -> RetryOptions[_T]:
        """Creates a new object copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)

    def __call__(self) -> _T:
        """Executes ``self.fn`` via the call_with_retries() retry loop using the stored parameters; thread-safe."""
        return call_with_retries(
            fn=self.fn,
            policy=self.policy,
            config=self.config,
            giveup=self.giveup,
            after_attempt=self.after_attempt,
            log=self.log,
        )


#############################################################################
@final
class _ThreadLocalRNG(threading.local):
    """Caches a per-thread random number generator."""

    def __init__(self) -> None:
        self.rng: random.Random | None = None


_THREAD_LOCAL_RNG: Final[_ThreadLocalRNG] = _ThreadLocalRNG()


def _thread_local_rng() -> random.Random:
    """Returns a per-thread RNG for backoff jitter; for perf avoids locking and initializing a new random.Random() at high
    frequency."""
    threadlocal: _ThreadLocalRNG = _THREAD_LOCAL_RNG
    rng: random.Random | None = threadlocal.rng
    if rng is None:
        rng = random.Random()  # noqa: S311 jitter isn't security sensitive, and random.SystemRandom.randint() is slow
        threadlocal.rng = rng
    return rng