Coverage for bzfs_main / util / retry.py: 100%
513 statements
coverage.py v7.13.5, created at 2026-04-29 12:49 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Customizable generic retry framework; defaults to jittered exponential backoff with cap unless specified otherwise.

Purpose:
--------
- Provide a reusable retry helper for transient failures using customizable policy and callbacks.
- Centralize backoff, jitter, logging and metrics behavior while keeping call sites compact.
- Prevent accidental retries: the loop retries only when the developer explicitly raises a ``RetryableError``, which reduces
  the risk of retrying non-idempotent operations.
- Provide a thread-safe, fast implementation; avoid shared RNG contention.
- Provide both sync and async APIs with the same semantics, except the async API awaits ``fn`` and uses non-blocking sleep.
- Avoid unnecessary complexity and add zero dependencies beyond the Python standard library. Everything you need is in this
  single Python file.

Usage:
------
- Wrap work in a callable ``fn(retry: Retry)`` and therein raise ``RetryableError`` for failures that should be retried.
- Construct a policy via ``RetryPolicy(...)`` that specifies how ``RetryableError`` shall be retried.
- Invoke ``call_with_retries(fn=fn, policy=policy, log=logger)`` or ``call_with_retries_async`` with a standard
  ``logging.Logger``.
- On success, the result of calling ``fn`` is returned.
- By default on exhaustion, call_with_retries* either re-raises the last underlying ``RetryableError.__cause__``, or raises
  ``RetryError`` (wrapping the last ``RetryableError``), like so:
  - If ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last
    ``RetryableError.__cause__`` with its original traceback.
  - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain).
  - The default is ``RetryPolicy.reraise=True``.

Advanced Configuration:
-----------------------
- Tune ``RetryPolicy`` parameters to control maximum retries, sleep bounds, elapsed-time budget, logging, etc.
- Use ``RetryPolicy.config: RetryConfig`` to control logging settings.
- Set ``log=None`` to disable logging, or customize ``info_loglevel`` / ``warning_loglevel`` for structured logs.
- Supply a ``giveup(AttemptOutcome)`` callback to stop retrying based on domain-specific logic (for example, decisions based
  on time budget/quota or on the N most recent AttemptOutcome objects, via AttemptOutcome.retry.previous_outcomes).
- Use the ``any_giveup()`` / ``all_giveup()`` helpers to consult more than one callback handler in ``giveup(AttemptOutcome)``.
- Supply an ``on_exhaustion(AttemptOutcome)`` callback to customize behavior when giving up; it may raise an error or return
  a fallback value.

Observability:
--------------
- Supply an ``after_attempt(AttemptOutcome)`` callback to collect per-attempt metrics such as success flag,
  exhausted/terminated state, attempt number, total elapsed duration (in nanoseconds), sleep duration (in nanoseconds), etc.
- ``AttemptOutcome.result`` is either the successful result or the most recent ``RetryableError``, enabling integration with
  metrics and tracing systems without coupling the retry loop to any specific backend.
- Supply an ``after_attempt(AttemptOutcome)`` callback to fully customize logging, if necessary.
- Use the ``multi_after_attempt()`` helper to invoke more than one callback handler in ``after_attempt(AttemptOutcome)``.

Expert Configuration:
---------------------
- Supply a ``backoff(BackoffContext)`` callback to plug in a custom backoff algorithm (e.g., decorrelated jitter or
  retry-after for HTTP 429). The default is full-jitter exponential backoff with cap (the industry standard).
- Supply a ``before_attempt(Retry)`` callback for optional rate limiting or other forms of internal backpressure.
- Supply an ``on_retryable_error(AttemptOutcome)`` callback, e.g. to count failures (RetryableError) caught by the retry loop.
- Set ``RetryPolicy.max_previous_outcomes > 0`` to pass the N most recent AttemptOutcome objects to callbacks (default is 0).
- If ``RetryPolicy.max_previous_outcomes > 0``, you can use ``RetryableError(..., attachment=...)`` to carry domain-specific
  state from a failed attempt to the next attempt via ``retry.previous_outcomes``. This pattern helps if attempt N+1 is a
  function of attempt N or of all prior attempts (e.g., switching endpoints or resuming from an offset).
- Use ``RetryTemplate`` as a 'bag of knobs' configuration template for functions that shall be retried in similar ways.
- Or package up all knobs plus a ``fn(retry: Retry)`` function into a self-contained auto-retrying higher-level function by
  constructing a ``RetryTemplate`` object (which is itself a ``Callable``).
- To keep calling code retry-transparent, set ``RetryPolicy.reraise=True`` (the default) *and* raise retryable failures as
  ``raise RetryableError(...) from exc``. Client code then won't notice whether call_with_retries* is used or not.
- To make exhaustion observable to calling code, set ``RetryPolicy.reraise=False``: by default call_with_retries* then always
  raises ``RetryError`` (wrapping the last ``RetryableError``) on exhaustion, so callers can catch ``RetryError`` and
  inspect the last underlying exception via ``err.outcome``, ``err.__cause__``, and even ``err.__cause__.__cause__`` when
  present.
- Set ``RetryPolicy.timing`` to customize reading the current monotonic time, sleeping, and optional async termination.
- The callback API is powerful enough to easily plug in advanced retry algorithms such as:
  - Google SRE Client-Side Adaptive Throttling - https://sre.google/sre-book/handling-overload/
  - gRPC retry throttling - https://grpc.io/docs/guides/retry/
  - AWS SDK adaptive retry mode - https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
  - Circuit breakers - https://martinfowler.com/bliki/CircuitBreaker.html (e.g. via the `pybreaker` third-party library)
  - Rate limiting with Fixed Window, Moving Window, and Sliding Window algorithms (e.g. via the `limits` third-party library)
  - Various example backoff strategies, such as decorrelated jitter or honoring retry-after on HTTP 429 "Too Many Requests"
    responses, can be found in test_retry_examples.py.

Example Usage:
--------------
    import logging
    from bzfs_main.util.retry import Retry, RetryPolicy, RetryableError, call_with_retries

    def unreliable_operation(retry: Retry) -> str:
        try:
            if retry.count < 3:
                raise ValueError("temporary failure connecting to foo.example.com")
            return "ok"
        except ValueError as exc:
            # Preserve the underlying cause for correct error propagation and logging
            raise RetryableError(display_msg="connect") from exc

    retry_policy = RetryPolicy(
        max_retries=10,
        min_sleep_secs=0,
        initial_max_sleep_secs=0.125,
        max_sleep_secs=10,
        max_elapsed_secs=60,
    )
    log = logging.getLogger(__name__)
    result: str = call_with_retries(fn=unreliable_operation, policy=retry_policy, log=log)
    print(result)

    # Sample log output:
    # INFO:Retrying connect [1/10] in 8.79ms ...
    # INFO:Retrying connect [2/10] in 90.1ms ...
    # INFO:Retrying connect [3/10] in 372ms ...
    # ok

Background:
-----------
For background on exponential backoff and jitter, see for example
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter
"""
from __future__ import (
    annotations,
)
import argparse
import dataclasses
import logging
import random
import threading
import time
from collections.abc import (
    Awaitable,
    Iterable,
    Mapping,
    Sequence,
)
from dataclasses import (
    dataclass,
)
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Final,
    Generic,
    NamedTuple,
    NoReturn,
    TypeVar,
    Union,
    cast,
    final,
    overload,
)

if TYPE_CHECKING:
    import asyncio

from bzfs_main.util.utils import (
    human_readable_duration,
)

# constants:
INFINITY_MAX_RETRIES: Final[int] = 2**90 - 1  # a number that's essentially infinity for all practical retry purposes
#############################################################################
def full_jitter_backoff_strategy(context: BackoffContext) -> tuple[int, int]:
    """Default implementation of ``backoff`` callback for call_with_retries(); computes delay time before the next retry
    attempt, after failure.

    Full-jitter picks a random sleep_nanos duration from the range [min_sleep_nanos, curr_max_sleep_nanos] and applies
    exponential backoff with cap to the next attempt; thread-safe. Typically, min_sleep_nanos is 0 and exponential_base is 2.
    Example curr_max_sleep_nanos sequence: 125ms --> 250ms --> 500ms --> 1s --> 2s --> 4s --> 8s --> 10s --> 10s...
    Full-jitter provides an optimal balance between reducing server load and minimizing retry latency.
    """
    policy: RetryPolicy = context.retry.policy
    curr_max_sleep_nanos: int = context.curr_max_sleep_nanos
    if policy.min_sleep_nanos == curr_max_sleep_nanos:
        sleep_nanos = curr_max_sleep_nanos  # perf
    else:
        sleep_nanos = context.rng.randint(policy.min_sleep_nanos, curr_max_sleep_nanos)  # nanos to delay until next attempt
    curr_max_sleep_nanos = round(curr_max_sleep_nanos * policy.exponential_base)  # exponential backoff
    curr_max_sleep_nanos = min(curr_max_sleep_nanos, policy.max_sleep_nanos)  # ... with cap for next attempt
    return sleep_nanos, curr_max_sleep_nanos
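# The following is an illustrative, standalone sketch (independent of this module; the
# `full_jitter` helper and its parameters are hypothetical stand-ins) that reproduces the
# docstring's cap sequence: 125ms doubling per attempt, capped at 10s, with the actual
# sleep drawn uniformly from [min_ns, curr_max_ns]:

```python
import random


def full_jitter(curr_max_ns: int, min_ns: int = 0, base: float = 2.0, cap_ns: int = 10_000_000_000) -> tuple[int, int]:
    """Pick a random sleep in [min_ns, curr_max_ns]; double the max for the next attempt, bounded by cap_ns."""
    sleep_ns = random.randint(min_ns, curr_max_ns)
    return sleep_ns, min(round(curr_max_ns * base), cap_ns)


curr = 125_000_000  # 125ms initial max sleep
caps = []
for _ in range(9):
    caps.append(curr)
    _sleep_ns, curr = full_jitter(curr)
print([c // 1_000_000 for c in caps])  # caps in milliseconds: 125, 250, ..., then pinned at 10000
```

# Note that only the cap sequence is deterministic; the per-attempt sleep is random by design.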
def no_giveup(outcome: AttemptOutcome) -> object | None:
    """Default implementation of ``giveup`` callback for call_with_retries(); never gives up; returning anything other than
    ``None`` indicates to give up retrying; thread-safe."""
    return None  # don't give up retrying


def before_attempt_noop(retry: Retry) -> int:
    """Default implementation of ``before_attempt`` callback for call_with_retries(); does nothing; thread-safe."""
    return 0


def after_attempt_log_failure(outcome: AttemptOutcome) -> None:
    """Default implementation of ``after_attempt`` callback for call_with_retries(); performs simple logging of retry attempt
    failures; thread-safe."""
    retry: Retry = outcome.retry
    policy: RetryPolicy = retry.policy
    config: RetryConfig = policy.config
    if outcome.is_success or retry.log is None or not config.enable_logging:
        return
    log: logging.Logger = retry.log
    assert isinstance(outcome.result, RetryableError)
    retryable_error: RetryableError = outcome.result
    if not outcome.is_exhausted:
        if log.isEnabledFor(config.info_loglevel):  # Retrying X in Y ms ...
            m1: str = config.format_msg(config.display_msg, retryable_error)
            m2: str = config.format_pair(retry.count + 1, policy.max_retries)
            m3: str = config.format_duration(outcome.sleep_nanos)
            log.log(config.info_loglevel, "%s", f"{m1}{m2} in {m3}{config.dots}", extra=config.extra)
    else:
        if policy.max_retries > 0 and log.isEnabledFor(config.warning_loglevel) and not outcome.is_terminated:
            reason: str = "" if outcome.giveup_reason is None else f"{outcome.giveup_reason}; "
            format_duration: Callable[[int], str] = config.format_duration  # lambda: nanos
            log.log(
                config.warning_loglevel,
                "%s",
                f"{config.format_msg(config.display_msg, retryable_error)}"
                f"exhausted; giving up because {reason}the last "
                f"{config.format_pair(retry.count, policy.max_retries)} retries across "
                f"{config.format_pair(format_duration(outcome.elapsed_nanos), format_duration(policy.max_elapsed_nanos))} "
                "failed",
                exc_info=retryable_error if config.exc_info else None,
                stack_info=config.stack_info,
                extra=config.extra,
            )


def noop(outcome: AttemptOutcome) -> None:
    """Default implementation of ``on_retryable_error`` callback for call_with_retries(); does nothing; thread-safe."""


def on_exhaustion_raise(outcome: AttemptOutcome) -> NoReturn:
    """Default implementation of ``on_exhaustion`` callback for call_with_retries(); always raises; thread-safe."""
    assert outcome.is_exhausted
    assert isinstance(outcome.result, RetryableError)
    retryable_error: RetryableError = outcome.result
    policy: RetryPolicy = outcome.retry.policy
    cause: BaseException | None = retryable_error.__cause__
    if policy.reraise and cause is not None:
        raise cause.with_traceback(cause.__traceback__)
    raise RetryError(outcome=outcome) from retryable_error
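# The reraise path above relies on Python's exception chaining: ``raise ... from exc``
# records the original error in ``__cause__``, so exhaustion can surface it unchanged.
# A standalone sketch with plain exceptions (``WrapperError`` is a hypothetical stand-in
# for RetryableError):

```python
class WrapperError(Exception):  # stand-in for RetryableError
    pass


def attempt() -> None:
    try:
        raise ValueError("temporary failure")
    except ValueError as exc:
        raise WrapperError("connect") from exc  # preserve the underlying cause


try:
    attempt()
except WrapperError as wrapper:
    cause = wrapper.__cause__
    assert isinstance(cause, ValueError)  # the original error remains reachable
```

# This is why ``reraise=True`` can make retry usage invisible to callers: they catch the
# same exception type they would have caught without the retry wrapper.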
def _update_previous_outcomes(
    previous_outcomes: tuple[AttemptOutcome, ...], outcome: AttemptOutcome, policy: RetryPolicy, retry: Retry
) -> tuple[AttemptOutcome, ...]:
    """Computes value of previous_outcomes for next retry iteration."""
    n: int = policy.max_previous_outcomes
    if n > 0:  # outcome will be passed to next attempt via Retry.previous_outcomes
        if previous_outcomes:  # detach to reduce memory footprint
            outcome = outcome.copy(retry=retry.copy(previous_outcomes=()))
        previous_outcomes = previous_outcomes[len(previous_outcomes) - n + 1 :] + (outcome,)  # immutable deque
    return previous_outcomes
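# The slice-plus-concatenate above implements a bounded, immutable deque over plain
# tuples: after appending, at most n items remain. A standalone sketch (ints standing in
# for AttemptOutcome objects; `push_bounded` is an illustrative name):

```python
def push_bounded(prev: tuple[int, ...], item: int, n: int) -> tuple[int, ...]:
    # Drop the oldest entries so that, after appending, len(result) <= n.
    # Mirrors: prev[len(prev) - n + 1:] + (item,)
    return prev[len(prev) - n + 1:] + (item,)


history: tuple[int, ...] = ()
for i in range(5):
    history = push_bounded(history, i, n=3)
print(history)  # keeps only the 3 most recent items: (2, 3, 4)
```

# Returning fresh tuples (rather than mutating a collections.deque) is what makes it safe
# to hand the history to arbitrary callbacks without defensive copying.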
#############################################################################
_T = TypeVar("_T")


def call_with_retries(
    fn: Callable[[Retry], _T],  # typically a lambda; wraps work and raises RetryableError for failures that shall be retried
    policy: RetryPolicy,  # specifies how ``RetryableError`` shall be retried
    *,
    backoff: BackoffStrategy = full_jitter_backoff_strategy,  # computes delay time before next retry attempt, after failure
    giveup: Callable[[AttemptOutcome], object | None] = no_giveup,  # stop retrying based on domain-specific logic, e.g. time
    before_attempt: Callable[[Retry], int] = before_attempt_noop,  # e.g. wait due to rate limiting or internal backpressure
    after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure,  # e.g. record metrics and/or custom logging
    on_retryable_error: Callable[[AttemptOutcome], None] = noop,  # e.g. count failures (RetryableError) caught by retry loop
    on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise,  # raise error or return fallback value
    log: logging.Logger | None = None,  # set this to ``None`` to disable logging
) -> _T:
    """Runs the function ``fn`` and returns its result; retries on failure as indicated by policy; thread-safe.

    By default on exhaustion, call_with_retries() either re-raises the last underlying ``RetryableError.__cause__``, or raises
    ``RetryError`` (wrapping the last ``RetryableError``), like so:
    - If ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last
      ``RetryableError.__cause__`` with its original traceback.
    - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain).
    - The default is ``RetryPolicy.reraise=True``.

    On the exhaustion path, ``on_exhaustion`` will be called exactly once (after the final after_attempt). The default
    implementation raises as described above; custom ``on_exhaustion`` impls may return a fallback value instead of an error.
    """
    rng: random.Random | None = None
    retry_count: int = 0
    idle_nanos: int = 0
    curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos
    previous_outcomes: tuple[AttemptOutcome, ...] = ()  # for safety pass *immutable* deque to callbacks
    timing: RetryTiming = policy.timing
    sleep: Callable[[int, Retry], None] = timing.sleep
    monotonic_ns: Callable[[], int] = timing.monotonic_ns
    call_start_nanos: Final[int] = monotonic_ns()
    while True:
        before_attempt_nanos: int = monotonic_ns() if retry_count != 0 else call_start_nanos
        prev: tuple[AttemptOutcome, ...] = previous_outcomes
        retry: Retry = Retry(
            retry_count, call_start_nanos, before_attempt_nanos, before_attempt_nanos, idle_nanos, policy, log, prev
        )
        try:
            if before_attempt is not before_attempt_noop:
                before_attempt_sleep_nanos: int = before_attempt(retry)
                assert before_attempt_sleep_nanos >= 0, before_attempt_sleep_nanos
                if before_attempt_sleep_nanos > 0:
                    sleep(before_attempt_sleep_nanos, retry)  # e.g. wait due to rate limiting or internal backpressure
                attempt_start_nanos: int = monotonic_ns()
                idle_nanos += attempt_start_nanos - before_attempt_nanos
                retry = Retry(
                    retry_count, call_start_nanos, before_attempt_nanos, attempt_start_nanos, idle_nanos, policy, log, prev
                )
            timing.on_before_attempt(retry)
            result: _T = fn(retry)  # Call the target function and supply retry attempt number and other metadata
            if after_attempt is not after_attempt_log_failure:
                elapsed_nanos: int = monotonic_ns() - call_start_nanos
                outcome: AttemptOutcome = AttemptOutcome(retry, True, False, False, None, elapsed_nanos, 0, result)
                after_attempt(outcome)
            return result
        except RetryableError as retryable_error:
            elapsed_nanos = monotonic_ns() - call_start_nanos
            is_terminated: Callable[[Retry], bool] = timing.is_terminated
            giveup_reason: object | None = None
            sleep_nanos: int = 0
            outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
            on_retryable_error(outcome)  # e.g. count failures (RetryableError) caught by retry loop
            if retry_count < policy.max_retries and elapsed_nanos < policy.max_elapsed_nanos:
                if policy.max_sleep_nanos == 0 and backoff is full_jitter_backoff_strategy:
                    pass  # perf: e.g. spin-before-block
                elif retry_count == 0 and retryable_error.retry_immediately_once:
                    pass  # retry once immediately without backoff
                else:  # jitter: default backoff strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos]
                    rng = _thread_local_rng() if rng is None else rng
                    sleep_nanos, curr_max_sleep_nanos = backoff(  # compute delay before next retry attempt, after failure
                        BackoffContext(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error)
                    )
                    assert sleep_nanos >= 0 and curr_max_sleep_nanos >= 0, sleep_nanos

                if sleep_nanos > 0:
                    outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
                if (not is_terminated(retry)) and (giveup_reason := giveup(outcome)) is None:
                    after_attempt(outcome)
                    sleep(sleep_nanos, retry)
                    idle_nanos += sleep_nanos
                    if not is_terminated(retry):
                        previous_outcomes = _update_previous_outcomes(previous_outcomes, outcome, policy, retry)
                        del outcome  # help gc
                        retry_count += 1
                        continue  # continue retry loop with next attempt
            else:
                sleep_nanos = 0
            outcome = AttemptOutcome(
                retry, False, True, is_terminated(retry), giveup_reason, elapsed_nanos, sleep_nanos, retryable_error
            )
            after_attempt(outcome)
            return on_exhaustion(outcome)  # raise error or return fallback value
async def call_with_retries_async(
    fn: Callable[[Retry], Awaitable[_T]],  # wraps work and raises RetryableError for failures that shall be retried
    policy: RetryPolicy,  # specifies how ``RetryableError`` shall be retried
    *,
    backoff: BackoffStrategy = full_jitter_backoff_strategy,  # computes delay time before next retry attempt, after failure
    giveup: Callable[[AttemptOutcome], object | None] = no_giveup,  # stop retrying based on domain-specific logic, e.g. time
    before_attempt: Callable[[Retry], int] = before_attempt_noop,  # e.g. wait due to rate limiting or internal backpressure
    after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure,  # e.g. record metrics and/or custom logging
    on_retryable_error: Callable[[AttemptOutcome], None] = noop,  # e.g. count failures (RetryableError) caught by retry loop
    on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise,  # raise error or return fallback value
    log: logging.Logger | None = None,  # set this to ``None`` to disable logging
) -> _T:
    """Async version of call_with_retries() with the same semantics except it awaits ``fn`` and uses non-blocking sleep."""
    rng: random.Random | None = None
    retry_count: int = 0
    idle_nanos: int = 0
    curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos
    previous_outcomes: tuple[AttemptOutcome, ...] = ()  # for safety pass *immutable* deque to callbacks
    timing: RetryTiming = policy.timing
    sleep: Callable[[int, Retry], Awaitable[None]] = timing.sleep_async
    monotonic_ns: Callable[[], int] = timing.monotonic_ns
    call_start_nanos: Final[int] = monotonic_ns()
    while True:
        before_attempt_nanos: int = monotonic_ns() if retry_count != 0 else call_start_nanos
        prev: tuple[AttemptOutcome, ...] = previous_outcomes
        retry: Retry = Retry(
            retry_count, call_start_nanos, before_attempt_nanos, before_attempt_nanos, idle_nanos, policy, log, prev
        )
        try:
            if before_attempt is not before_attempt_noop:
                before_attempt_sleep_nanos: int = before_attempt(retry)
                assert before_attempt_sleep_nanos >= 0, before_attempt_sleep_nanos
                if before_attempt_sleep_nanos > 0:
                    await sleep(before_attempt_sleep_nanos, retry)  # e.g. wait due to rate limiting or internal backpressure
                attempt_start_nanos: int = monotonic_ns()
                idle_nanos += attempt_start_nanos - before_attempt_nanos
                retry = Retry(
                    retry_count, call_start_nanos, before_attempt_nanos, attempt_start_nanos, idle_nanos, policy, log, prev
                )
            timing.on_before_attempt(retry)
            result: _T = await fn(retry)  # Call the target function and supply retry attempt number and other metadata
            if after_attempt is not after_attempt_log_failure:
                elapsed_nanos: int = monotonic_ns() - call_start_nanos
                outcome: AttemptOutcome = AttemptOutcome(retry, True, False, False, None, elapsed_nanos, 0, result)
                after_attempt(outcome)
            return result
        except RetryableError as retryable_error:
            elapsed_nanos = monotonic_ns() - call_start_nanos
            is_terminated: Callable[[Retry], bool] = timing.is_terminated
            giveup_reason: object | None = None
            sleep_nanos: int = 0
            outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
            on_retryable_error(outcome)  # e.g. count failures (RetryableError) caught by retry loop
            if retry_count < policy.max_retries and elapsed_nanos < policy.max_elapsed_nanos:
                if policy.max_sleep_nanos == 0 and backoff is full_jitter_backoff_strategy:
                    pass  # perf: e.g. spin-before-block
                elif retry_count == 0 and retryable_error.retry_immediately_once:
                    pass  # retry once immediately without backoff
                else:  # jitter: default backoff strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos]
                    rng = _thread_local_rng() if rng is None else rng
                    sleep_nanos, curr_max_sleep_nanos = backoff(  # compute delay before next retry attempt, after failure
                        BackoffContext(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error)
                    )
                    assert sleep_nanos >= 0 and curr_max_sleep_nanos >= 0, sleep_nanos

                if sleep_nanos > 0:
                    outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
                if (not is_terminated(retry)) and (giveup_reason := giveup(outcome)) is None:
                    after_attempt(outcome)
                    await sleep(sleep_nanos, retry)
                    idle_nanos += sleep_nanos
                    if not is_terminated(retry):
                        previous_outcomes = _update_previous_outcomes(previous_outcomes, outcome, policy, retry)
                        del outcome  # help gc
                        retry_count += 1
                        continue  # continue retry loop with next attempt
            else:
                sleep_nanos = 0
            outcome = AttemptOutcome(
                retry, False, True, is_terminated(retry), giveup_reason, elapsed_nanos, sleep_nanos, retryable_error
            )
            after_attempt(outcome)
            return on_exhaustion(outcome)  # raise error or return fallback value
def multi_after_attempt(handlers: Iterable[Callable[[AttemptOutcome], None]]) -> Callable[[AttemptOutcome], None]:
    """Composes independent ``after_attempt`` handlers into one ``call_with_retries(after_attempt=...)`` callback that
    invokes each handler in order; thread-safe."""
    handlers = tuple(handlers)
    if len(handlers) == 1:
        return handlers[0]  # perf

    def _after_attempt(outcome: AttemptOutcome) -> None:
        for handler in handlers:
            handler(outcome)

    return _after_attempt
def any_giveup(handlers: Iterable[Callable[[AttemptOutcome], object | None]]) -> Callable[[AttemptOutcome], object | None]:
    """Composes independent ``giveup`` handlers into one ``call_with_retries(giveup=...)`` callback that gives up retrying if
    *any* handler gives up; that is, if any handler returns a non-``None`` reason; thread-safe.

    Handlers are evaluated in order and short-circuit: on giving up, returns the first handler's reason for giving up.
    """
    handlers = tuple(handlers)
    if len(handlers) == 1:
        return handlers[0]  # perf

    def _giveup(outcome: AttemptOutcome) -> object | None:
        for handler in handlers:
            giveup_reason: object | None = handler(outcome)
            if giveup_reason is not None:
                return giveup_reason
        return None  # don't give up retrying

    return _giveup
def all_giveup(handlers: Iterable[Callable[[AttemptOutcome], object | None]]) -> Callable[[AttemptOutcome], object | None]:
    """Composes independent ``giveup`` handlers into one ``call_with_retries(giveup=...)`` callback that gives up retrying if
    *all* handlers give up; that is, if all handlers return a non-``None`` reason; thread-safe.

    Handlers are evaluated in order and short-circuit: stops at the first ``None``; else returns the last non-``None`` reason.
    """
    handlers = tuple(handlers)
    if len(handlers) == 1:
        return handlers[0]  # perf

    def _giveup(outcome: AttemptOutcome) -> object | None:
        giveup_reason: object | None = None
        for handler in handlers:
            giveup_reason = handler(outcome)
            if giveup_reason is None:
                return None  # don't give up retrying
        return giveup_reason

    return _giveup
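# A standalone sketch of the two composition semantics above (plain ints standing in for
# AttemptOutcome objects; `any_reason` / `all_reason` and the handler names are
# illustrative stand-ins, not this module's API): any-wins vs. all-must-agree.

```python
def any_reason(handlers):
    handlers = tuple(handlers)

    def check(x):
        for h in handlers:
            r = h(x)
            if r is not None:
                return r  # short-circuit: first non-None reason wins
        return None

    return check


def all_reason(handlers):
    handlers = tuple(handlers)

    def check(x):
        reason = None
        for h in handlers:
            reason = h(x)
            if reason is None:
                return None  # short-circuit: one dissenter vetoes giving up
        return reason  # last handler's reason

    return check


over_budget = lambda x: "budget" if x > 10 else None
too_many = lambda x: "count" if x > 5 else None

assert any_reason([over_budget, too_many])(7) == "count"  # one handler suffices
assert all_reason([over_budget, too_many])(7) is None  # budget handler dissents
assert all_reason([over_budget, too_many])(11) == "count"  # all agree; last reason returned
```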
#############################################################################
class RetryableError(Exception):
    """Indicates that the task that caused the underlying exception can be retried and might eventually succeed;
    ``call_with_retries()`` will pass this exception to callbacks via ``AttemptOutcome.result``; can be subclassed."""

    def __init__(
        self,
        *exc_args: object,  # optional args passed into super().__init__()
        display_msg: object = None,  # for logging
        retry_immediately_once: bool = False,  # retry once immediately without backoff?
        category: object = None,  # optional classification e.g. "CONCURRENCY", "SERVER_ISSUE", "THROTTLING", "TRANSIENT", ...
        attachment: object = None,  # optional domain-specific info passed to next attempt via Retry.previous_outcomes if
        # RetryPolicy.max_previous_outcomes > 0. This helps when retrying is not just 'try again later', but
        # 'try again differently based on what just happened'.
        # Examples: switching network endpoints, adjusting per-attempt timeouts, capping retries by error class, resuming
        # with a token/offset, maintaining failure history for this invocation of call_with_retries().
        # Example: 'cap retries to 3 for ECONNREFUSED but 12 for ETIMEDOUT' via attachment=collections.Counter
    ) -> None:
        super().__init__(*exc_args)
        self.display_msg: object = display_msg
        self.retry_immediately_once: bool = retry_immediately_once
        self.category: object = category
        self.attachment: object = attachment

    def display_msg_str(self) -> str:
        """Returns the display_msg as a str; for logging."""
        return "" if self.display_msg is None else str(self.display_msg)


#############################################################################
@final
class RetryError(Exception):
    """Indicates that retries have been exhausted; the last RetryableError is in RetryError.__cause__."""

    outcome: Final[AttemptOutcome]
    """Metadata that describes why and how call_with_retries() gave up."""

    def __init__(self, outcome: AttemptOutcome) -> None:
        super().__init__(outcome)
        self.outcome = outcome


#############################################################################
@final
class Retry(NamedTuple):
    """Attempt metadata provided to callback functions; includes the current retry attempt number; immutable."""

    count: int  # type: ignore[assignment]
    """Attempt number; count=0 is the first attempt, count=1 is the second attempt aka first retry."""

    call_start_time_nanos: int
    """Value of time.monotonic_ns() at start of call_with_retries() invocation."""

    before_attempt_start_time_nanos: int
    """Value of time.monotonic_ns() at start of before_attempt() invocation."""

    attempt_start_time_nanos: int
    """Value of time.monotonic_ns() at start of fn() invocation."""

    idle_nanos: int
    """Sum of all before_attempt_sleep_nanos() plus AttemptOutcome.sleep_nanos across this call_with_retries() invocation, at
    the start of fn() invocation."""

    policy: RetryPolicy
    """Policy that was passed into call_with_retries()."""

    log: logging.Logger | None
    """Logger that was passed into call_with_retries()."""

    previous_outcomes: Sequence[AttemptOutcome]
    """History/state of the N=max_previous_outcomes most recent outcomes for the current call_with_retries() invocation."""

    def copy(self, **override_kwargs: Any) -> Retry:
        """Creates a new object copying an existing one with the specified fields overridden for customization."""
        return self._replace(**override_kwargs)

    def before_attempt_sleep_nanos(self) -> int:
        """Returns the duration between the start of before_attempt() and the start of the fn() attempt."""
        return self.attempt_start_time_nanos - self.before_attempt_start_time_nanos

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(count={self.count!r}, call_start_time_nanos={self.call_start_time_nanos!r}, "
            f"before_attempt_start_time_nanos={self.before_attempt_start_time_nanos!r}, "
            f"attempt_start_time_nanos={self.attempt_start_time_nanos!r}, idle_nanos={self.idle_nanos!r})"
        )

    def __eq__(self, other: object) -> bool:
        return self is other

    def __hash__(self) -> int:
        return object.__hash__(self)


#############################################################################
@final
class AttemptOutcome(NamedTuple):
    """Captures per-attempt state for ``after_attempt`` callbacks; immutable."""

    retry: Retry
    """Attempt metadata passed into fn(retry)."""

    is_success: bool
    """False if fn(retry) raised a RetryableError; True otherwise."""

    is_exhausted: bool
    """True if the loop is giving up retrying (possibly even due to is_terminated); False otherwise."""

    is_terminated: bool
    """True if the termination predicate has become true; False otherwise."""

    giveup_reason: object | None
    """Reason returned by giveup(); None means giveup() was not called or decided to not give up."""

    elapsed_nanos: int
    """Total duration between the start of the call_with_retries() invocation and the end of this fn() attempt."""

    sleep_nanos: int
    """Duration of the current sleep period."""

    result: RetryableError | object
    """Result of fn(retry); a RetryableError on retryable failure, or some other object on success."""

    def attempt_elapsed_nanos(self) -> int:
        """Returns the duration between the start of this fn() attempt and the end of this fn() attempt."""
        return self.elapsed_nanos + self.retry.call_start_time_nanos - self.retry.attempt_start_time_nanos

    def copy(self, **override_kwargs: Any) -> AttemptOutcome:
        """Creates a new outcome copying an existing one with the specified fields overridden for customization."""
        return self._replace(**override_kwargs)

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}("
            f"retry={self.retry!r}, "
            f"is_success={self.is_success!r}, "
            f"is_exhausted={self.is_exhausted!r}, "
            f"is_terminated={self.is_terminated!r}, "
            f"giveup_reason={self.giveup_reason!r}, "
            f"elapsed_nanos={self.elapsed_nanos!r}, "
            f"sleep_nanos={self.sleep_nanos!r})"
        )

    def __eq__(self, other: object) -> bool:
        return self is other

    def __hash__(self) -> int:
        return object.__hash__(self)


#############################################################################
@final
class BackoffContext(NamedTuple):
    """Captures per-backoff state for ``backoff`` callbacks."""

    retry: Retry
    """Attempt metadata passed into fn(retry)."""

    curr_max_sleep_nanos: int
    """Current maximum duration (in nanoseconds) to sleep before the next retry attempt;
    typically: ``RetryPolicy.initial_max_sleep_nanos <= curr_max_sleep_nanos <= RetryPolicy.max_sleep_nanos``."""

    rng: random.Random
    """Thread-local random number generator instance."""

    elapsed_nanos: int
    """Total duration between the start of the call_with_retries() invocation and the end of this fn() attempt."""

    retryable_error: RetryableError
    """Result of the failed fn(retry) attempt."""

    def copy(self, **override_kwargs: Any) -> BackoffContext:
678 """Creates a new object copying an existing one with the specified fields overridden for customization."""
679 return self._replace(**override_kwargs)
681 def __repr__(self) -> str:
682 return (
683 f"{type(self).__name__}("
684 f"retry={self.retry!r}, "
685 f"curr_max_sleep_nanos={self.curr_max_sleep_nanos!r}, "
686 f"elapsed_nanos={self.elapsed_nanos!r})"
687 )
689 def __eq__(self, other: object) -> bool:
690 return self is other
692 def __hash__(self) -> int:
693 return object.__hash__(self)
696BackoffStrategy = Callable[[BackoffContext], tuple[int, int]] # typealias; returns sleep_nanos:int, curr_max_sleep_nanos:int
697"""Strategy that implements a backoff algorithm that reduces server load while minimizing retry latency; default is full
698jitter; various other example backoff strategies such as decorrelated-jitter or retry-after HTTP 429 "Too Many Requests"
699responses, etc can be found in test_retry_examples.py."""
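As a hedged illustration of the ``BackoffStrategy`` contract (a callable returning ``(sleep_nanos, curr_max_sleep_nanos)``), here is a standalone sketch of a decorrelated-jitter strategy; the constants and function name are illustrative and not part of this module, which ships full jitter by default:

```python
import random

MIN_SLEEP_NANOS = 0
MAX_SLEEP_NANOS = 10_000_000_000  # 10s cap, mirroring RetryPolicy.max_sleep_secs=10


def decorrelated_jitter(curr_max_sleep_nanos: int, rng: random.Random) -> tuple[int, int]:
    """Standalone sketch: draw a sleep from [min, current max], then let the next
    maximum grow to 3x the drawn sleep (capped), per the decorrelated-jitter scheme."""
    sleep_nanos = rng.randint(MIN_SLEEP_NANOS, max(MIN_SLEEP_NANOS, curr_max_sleep_nanos))
    next_max = min(MAX_SLEEP_NANOS, max(125_000_000, 3 * sleep_nanos))
    return sleep_nanos, next_max


rng = random.Random(42)
curr_max = 125_000_000  # start at 125ms, like initial_max_sleep_secs=0.125
for _ in range(8):
    sleep_nanos, curr_max = decorrelated_jitter(curr_max, rng)
    assert 0 <= sleep_nanos <= MAX_SLEEP_NANOS and curr_max <= MAX_SLEEP_NANOS
```

A real strategy would read these values from the ``BackoffContext`` fields (``curr_max_sleep_nanos``, ``rng``, and the policy via ``retry.policy``) instead of module constants.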
702#############################################################################
703def _default_timing_is_terminated(retry: Retry) -> bool:
704 return False
707def _default_timing_sleep(sleep_nanos: int, retry: Retry) -> None:
708 time.sleep(sleep_nanos / 1_000_000_000)
711async def _default_timing_sleep_asyncio(sleep_nanos: int, retry: Retry) -> None:
712 import asyncio
714 await asyncio.sleep(sleep_nanos / 1_000_000_000)
717def _default_timing_on_before_attempt(retry: Retry) -> None:
718 if retry.policy.timing.is_terminated(retry):
719 raise RetryableError(display_msg="terminated before attempt") from RetryTerminationError()
722@final
723class RetryTerminationError(InterruptedError):
724 """Termination signal raised when retry loop exits before starting the next attempt."""
727@dataclass(frozen=True)
728@final
729class RetryTiming:
730 """Customizable callbacks for reading the current monotonic time, sleeping and optional async termination; immutable."""
732 monotonic_ns: Callable[[], int] = time.monotonic_ns
733 """Returns the system's current monotonic time in nanoseconds."""
735 is_terminated: Callable[[Retry], bool] = _default_timing_is_terminated
736 """Returns whether a predicate has become true; if so causes the retry loop to exit early between attempts; can be used
737 to indicate system shutdown or similar cancellation conditions; default is to always return ``False``; this function
738 should complete quickly without any blocking or sleeping."""
740 sleep: Callable[[int, Retry], None] = _default_timing_sleep
741 """Sleeps N nanoseconds between attempts; override to inject custom sleeping or for early wake-ups; thread-safe."""
743 sleep_async: Callable[[int, Retry], Awaitable[None]] = _default_timing_sleep_asyncio
744 """Sleeps N nanoseconds between attempts; override to inject custom sleeping or for early wake-ups; thread-safe."""
746 on_before_attempt: Callable[[Retry], None] = _default_timing_on_before_attempt
747 """Typically (but not necessarily) raises an error if ``is_terminated()`` is True; otherwise fn() will still run; this
748 function should complete quickly without any blocking or sleeping.
750 To disable this behavior: RetryTiming.make_from(...).copy(on_before_attempt=lambda retry: None).
751 """
753 def copy(self, **override_kwargs: Any) -> RetryTiming:
754 """Creates a new object copying an existing one with the specified fields overridden for customization; thread-
755 safe."""
756 return dataclasses.replace(self, **override_kwargs)
758 @staticmethod
759 def make_from(termination_event: threading.Event | None) -> RetryTiming:
760 """Convenience factory that creates a RetryTiming that performs async termination when termination_event is set."""
761 if termination_event is None:
762 return RetryTiming()
764 def _is_terminated(retry: Retry) -> bool:
765 return termination_event.is_set()
767 def _sleep(sleep_nanos: int, retry: Retry) -> None:
768 termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination
770 return RetryTiming(is_terminated=_is_terminated, sleep=_sleep)
772 @staticmethod
773 def make_from_asyncio(termination_event: asyncio.Event | None) -> RetryTiming:
774 """Convenience factory that creates a RetryTiming that performs async termination when termination_event is set;
775 Write an analog version of this function if you wish to replace asyncio with Trio or AnyIO or similar."""
776 if termination_event is None:
777 return RetryTiming()
779 def _is_terminated(retry: Retry) -> bool:
780 return termination_event.is_set()
782 async def _sleep_async(sleep_nanos: int, retry: Retry) -> None:
783 import asyncio
785 if sleep_nanos <= 0:
786 await asyncio.sleep(0) # perf: cooperative yield with less overhead than asyncio.wait_for(..., 0)
787 return
788 try:
789 await asyncio.wait_for(termination_event.wait(), timeout=sleep_nanos / 1_000_000_000)
790 except asyncio.TimeoutError:
791 pass # expected
793 return RetryTiming(is_terminated=_is_terminated, sleep_async=_sleep_async)
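The ``make_from`` factory above relies on ``threading.Event.wait`` doubling as an interruptible sleep. A minimal standalone sketch of that early-wakeup behavior (names are illustrative):

```python
import threading
import time


def interruptible_sleep(sleep_nanos: int, stop: threading.Event) -> bool:
    """Sleeps up to sleep_nanos, waking early if stop is set; returns True if stopped early."""
    return stop.wait(sleep_nanos / 1_000_000_000)


stop = threading.Event()
setter = threading.Timer(0.05, stop.set)  # request termination after ~50ms
setter.start()
start = time.monotonic()
stopped = interruptible_sleep(5_000_000_000, stop)  # nominally a 5s sleep
elapsed_secs = time.monotonic() - start
setter.join()
# stopped is True and elapsed_secs is far below 5: the event cut the sleep short
```

This is why a retry loop built on such a timing object reacts to shutdown within one ``Event.wait`` call rather than sleeping out the full backoff period.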
796#############################################################################
797def _format_msg(display_msg: str, retryable_error: RetryableError) -> str:
798 """Default implementation of ``format_msg`` callback for RetryConfig; creates simple log message; thread-safe."""
799 msg = display_msg + " " if display_msg else ""
800 errmsg: str = retryable_error.display_msg_str()
801 msg = msg + errmsg + " " if errmsg else msg
802 msg = msg if msg else "Retrying "
803 return msg
806def _format_pair(first: object, second: object) -> str:
807 """Default implementation of ``format_pair`` callback for RetryConfig; creates simple log message part; thread-safe."""
808 second = "∞" if INFINITY_MAX_RETRIES == second else second # noqa: SIM300
809 return f"[{first}/{second}]"
812@dataclass(frozen=True)
813@final
814class RetryConfig:
815 """Configures logging for call_with_retries(); all defaults work out of the box; immutable."""
817 display_msg: str = "Retrying" # message prefix for retry log messages
818 dots: str = " ..." # suffix appended to retry log messages
819 format_msg: Callable[[str, RetryableError], str] = _format_msg # lambda: display_msg, retryable_error
820 format_pair: Callable[[object, object], str] = _format_pair # lambda: first, second
821 format_duration: Callable[[int], str] = human_readable_duration # lambda: nanos
822 info_loglevel: int = logging.INFO # loglevel used when not giving up
823 warning_loglevel: int = logging.WARNING # loglevel used when giving up
824 enable_logging: bool = True # set to False to disable logging
825 exc_info: bool = False # passed into Logger.log()
826 stack_info: bool = False # passed into Logger.log()
827 extra: Mapping[str, object] | None = dataclasses.field(default=None, repr=False, compare=False) # passed to Logger.log()
828 context: object = dataclasses.field(default=None, repr=False, compare=False) # optional domain specific info
830 def copy(self, **override_kwargs: Any) -> RetryConfig:
831 """Creates a new config copying an existing one with the specified fields overridden for customization."""
832 return dataclasses.replace(self, **override_kwargs)
835#############################################################################
836@dataclass(frozen=True)
837@final
838class RetryPolicy:
839 """Configuration of maximum retries, sleep bounds, elapsed-time budget, logging, etc for call_with_retries(); immutable.
841 By default uses full jitter which works as follows: The maximum duration to sleep between attempts initially starts with
842 ``initial_max_sleep_secs`` and doubles on each retry, up to the final maximum of ``max_sleep_secs``.
843 Example: 125ms --> 250ms --> 500ms --> 1s --> 2s --> 4s --> 8s --> 10s --> 10s...
844 On each retry a random sleep duration in the range ``[min_sleep_secs, current max]`` is picked.
845 In a nutshell: ``0 <= min_sleep_secs <= initial_max_sleep_secs <= max_sleep_secs``. Typically, min_sleep_secs=0.
846 """
848 max_retries: int = INFINITY_MAX_RETRIES
849 """The maximum number of times ``fn`` will be invoked additionally after the first attempt invocation; must be >= 0."""
851 min_sleep_secs: float = 0
852 """The minimum duration to sleep between any two attempts."""
854 initial_max_sleep_secs: float = 0.125
855 """The initial maximum duration to sleep between any two attempts."""
857 max_sleep_secs: float = 10
858 """The final max duration to sleep between any two attempts; 0 <= min_sleep_secs <= initial_max_sleep_secs <=
859 max_sleep_secs."""
861 max_elapsed_secs: float = 47
862 """``fn`` will not be retried (or not retried anymore) once this much time has elapsed since the initial start of
863 call_with_retries(); set this to 365 * 86400 seconds or similar to effectively disable the time limit."""
865 exponential_base: float = 2
866 """Growth factor (aka multiplier) for backoff algorithm to calculate sleep duration; must be >= 1."""
868 max_elapsed_nanos: int = dataclasses.field(init=False, repr=False) # derived value
869 min_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value
870 initial_max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value
871 max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value
873 reraise: bool = True
874 """On exhaustion, the default (``True``) is to re-raise the underlying exception when present."""
876 max_previous_outcomes: int = 0
877 """Pass the N=max_previous_outcomes most recent AttemptOutcome objects to callbacks via Retry.previous_outcomes."""
879 config: RetryConfig = dataclasses.field(default=RetryConfig(), repr=False, compare=False)
880 """Configures logging behavior."""
882 timing: RetryTiming = dataclasses.field(default=RetryTiming(), repr=False)
883 """Customizable callbacks for reading the current monotonic time, sleeping and optional async termination."""
885 context: object = dataclasses.field(default=None, repr=False, compare=False)
886 """Optional domain specific info."""
888 @classmethod
889 def from_namespace(cls, args: argparse.Namespace) -> RetryPolicy:
890 """Factory that reads the policy from argparse.ArgumentParser via args."""
891 return cls(
892 max_retries=getattr(args, "max_retries", INFINITY_MAX_RETRIES),
893 min_sleep_secs=getattr(args, "retry_min_sleep_secs", 0),
894 initial_max_sleep_secs=getattr(args, "retry_initial_max_sleep_secs", 0.125),
895 max_sleep_secs=getattr(args, "retry_max_sleep_secs", 10),
896 max_elapsed_secs=getattr(args, "retry_max_elapsed_secs", 47),
897 exponential_base=getattr(args, "retry_exponential_base", 2),
898 reraise=getattr(args, "retry_reraise", True),
899 max_previous_outcomes=getattr(args, "retry_max_previous_outcomes", 0),
900 config=getattr(args, "retry_config", RetryConfig()),
901 timing=getattr(args, "retry_timing", RetryTiming()),
902 context=getattr(args, "retry_context", None),
903 )
905 @classmethod
906 def no_retries(cls) -> RetryPolicy:
907 """Returns a policy that never retries."""
908 return cls(
909 max_retries=0,
910 min_sleep_secs=0,
911 initial_max_sleep_secs=0,
912 max_sleep_secs=0,
913 max_elapsed_secs=0,
914 )
916 def __post_init__(self) -> None: # validate and compute derived values
917 self._validate_min("max_retries", self.max_retries, 0)
918 self._validate_min("exponential_base", self.exponential_base, 1)
919 self._validate_min("min_sleep_secs", self.min_sleep_secs, 0)
920 self._validate_min("initial_max_sleep_secs", self.initial_max_sleep_secs, 0)
921 self._validate_min("max_sleep_secs", self.max_sleep_secs, 0)
922 self._validate_min("max_elapsed_secs", self.max_elapsed_secs, 0)
923 object.__setattr__(self, "max_elapsed_nanos", int(self.max_elapsed_secs * 1_000_000_000)) # derived value
924 min_sleep_nanos: int = int(self.min_sleep_secs * 1_000_000_000)
925 initial_max_sleep_nanos: int = int(self.initial_max_sleep_secs * 1_000_000_000)
926 max_sleep_nanos: int = int(self.max_sleep_secs * 1_000_000_000)
927 max_sleep_nanos = max(min_sleep_nanos, max_sleep_nanos)
928 initial_max_sleep_nanos = min(max_sleep_nanos, max(min_sleep_nanos, initial_max_sleep_nanos))
929 object.__setattr__(self, "min_sleep_nanos", min_sleep_nanos) # derived value
930 object.__setattr__(self, "initial_max_sleep_nanos", initial_max_sleep_nanos) # derived value
931 object.__setattr__(self, "max_sleep_nanos", max_sleep_nanos) # derived value
932 self._validate_min("max_previous_outcomes", self.max_previous_outcomes, 0)
933 assert 0 <= self.min_sleep_nanos <= self.initial_max_sleep_nanos <= self.max_sleep_nanos
934 if not isinstance(self.reraise, bool):
935 raise TypeError(f"{type(self).__name__}.reraise must be bool")
937 def _validate_min(self, attr_name: str, value: float, minimum: float) -> None:
938 if value < minimum:
939 raise ValueError(f"Invalid {type(self).__name__}.{attr_name}: must be >= {minimum} but got {value}")
941 def copy(self, **override_kwargs: Any) -> RetryPolicy:
942 """Creates a new policy copying an existing one with the specified fields overridden for customization; thread-safe.
944 Example usage: policy = retry_policy.copy(max_sleep_secs=2, max_elapsed_secs=10)
945 """
946 return dataclasses.replace(self, **override_kwargs)
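The doubling-then-capped schedule described in the RetryPolicy docstring can be reproduced in isolation; this is an illustrative standalone sketch of how the current sleep cap evolves, not code from this module:

```python
def sleep_caps(initial_max_nanos: int, max_nanos: int, exponential_base: float, attempts: int) -> list[int]:
    """Returns the sequence of per-attempt sleep caps: starts at initial_max_nanos,
    multiplies by exponential_base on each retry, and saturates at max_nanos."""
    caps: list[int] = []
    curr = initial_max_nanos
    for _ in range(attempts):
        caps.append(curr)
        curr = min(max_nanos, int(curr * exponential_base))
    return caps


# Defaults: initial_max_sleep_secs=0.125, max_sleep_secs=10, exponential_base=2
caps = sleep_caps(125_000_000, 10_000_000_000, 2, 9)
# caps in ns: 125ms, 250ms, 500ms, 1s, 2s, 4s, 8s, 10s, 10s (matching the docstring example)
```

Full jitter then draws each actual sleep uniformly from ``[min_sleep_nanos, cap]``, so the caps bound the sleeps rather than equal them.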
949#############################################################################
950def _fn_not_implemented(_retry: Retry) -> NoReturn:
951 """Default implementation of ``fn`` callback for RetryTemplate; always raises."""
952 raise NotImplementedError("Provide fn when calling RetryTemplate")
955NO_LOGGER: Final[logging.Logger] = logging.Logger("NULL") # noqa: LOG001 do not register dummy logger with Logger.manager
956NO_LOGGER.addHandler(logging.NullHandler()) # prevents lastResort fallback
957NO_LOGGER.disabled = True
958NO_LOGGER.propagate = False
959_R = TypeVar("_R")
962@dataclass(frozen=True)
963@final
964class RetryTemplate(Generic[_T]):
965 """Convenience class that aggregates all knobs for call_with_retries(); and is itself callable too; immutable."""
967 fn: Callable[[Retry], _T] = _fn_not_implemented # set this to make the RetryTemplate object itself callable
968 policy: RetryPolicy = RetryPolicy() # specifies how ``RetryableError`` shall be retried
969 backoff: BackoffStrategy = full_jitter_backoff_strategy # computes delay time before next retry attempt, after failure
970 giveup: Callable[[AttemptOutcome], object | None] = no_giveup # stop retrying based on domain-specific logic, e.g. time
971 before_attempt: Callable[[Retry], int] = before_attempt_noop # e.g. wait due to rate limiting or internal backpressure
972 after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure # e.g. record metrics and/or custom logging
973 on_retryable_error: Callable[[AttemptOutcome], None] = noop # e.g. count failures (RetryableError) caught by retry loop
974 on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise # raise error or return fallback value
975 log: logging.Logger | None = None # set this to ``None`` to disable logging
977 def copy(self, **override_kwargs: Any) -> RetryTemplate[_T]:
978 """Creates a new object copying an existing one with the specified fields overridden for customization; thread-safe.
980 Example usage: retry_template.copy(policy=policy.copy(max_sleep_secs=2, max_elapsed_secs=10), log=None)
981 """
982 return dataclasses.replace(self, **override_kwargs)
984 def __call__(self) -> _T:
985 """Invokes ``self.fn`` via the call_with_retries() retry loop using the stored parameters; thread-safe.
987 Example Usage: result: str = retry_template.copy(fn=...)()
988 """
989 return call_with_retries(
990 fn=self.fn,
991 policy=self.policy,
992 backoff=self.backoff,
993 giveup=self.giveup,
994 before_attempt=self.before_attempt,
995 after_attempt=self.after_attempt,
996 on_retryable_error=self.on_retryable_error,
997 on_exhaustion=self.on_exhaustion,
998 log=self.log,
999 )
1001 def call_with_retries(
1002 self,
1003 fn: Callable[[Retry], _R],
1004 policy: RetryPolicy | None = None,
1005 *,
1006 backoff: BackoffStrategy | None = None,
1007 giveup: Callable[[AttemptOutcome], object | None] | None = None,
1008 before_attempt: Callable[[Retry], int] | None = None,
1009 after_attempt: Callable[[AttemptOutcome], None] | None = None,
1010 on_retryable_error: Callable[[AttemptOutcome], None] | None = None,
1011 on_exhaustion: Callable[[AttemptOutcome], _R] | None = None,
1012 log: logging.Logger | None = None, # pass NO_LOGGER to override template logger and disable logging for this call
1013 ) -> _R:
1014 """Invokes ``fn`` via the call_with_retries() retry loop using the stored or overridden params; thread-safe.
1016 Example Usage: result: str = retry_template.call_with_retries(fn=...)
1017 """
1018 return call_with_retries(
1019 fn=fn,
1020 policy=self.policy if policy is None else policy,
1021 backoff=self.backoff if backoff is None else backoff,
1022 giveup=self.giveup if giveup is None else giveup,
1023 before_attempt=self.before_attempt if before_attempt is None else before_attempt,
1024 after_attempt=self.after_attempt if after_attempt is None else after_attempt,
1025 on_retryable_error=self.on_retryable_error if on_retryable_error is None else on_retryable_error,
1026 on_exhaustion=(
1027 cast(Callable[[AttemptOutcome], _R], self.on_exhaustion) if on_exhaustion is None else on_exhaustion
1028 ),
1029 log=None if log is NO_LOGGER else self.log if log is None else log,
1030 )
1032 async def call_with_retries_async(
1033 self,
1034 fn: Callable[[Retry], Awaitable[_R]],
1035 policy: RetryPolicy | None = None,
1036 *,
1037 backoff: BackoffStrategy | None = None,
1038 giveup: Callable[[AttemptOutcome], object | None] | None = None,
1039 before_attempt: Callable[[Retry], int] | None = None,
1040 after_attempt: Callable[[AttemptOutcome], None] | None = None,
1041 on_retryable_error: Callable[[AttemptOutcome], None] | None = None,
1042 on_exhaustion: Callable[[AttemptOutcome], _R] | None = None,
1043 log: logging.Logger | None = None, # pass NO_LOGGER to override template logger and disable logging for this call
1044 ) -> _R:
1045 """Invokes ``fn`` via the call_with_retries_async() retry loop using the stored or overridden params; thread-safe.
1047 Example Usage: result: str = await retry_template.call_with_retries_async(fn=...)
1048 """
1049 return await call_with_retries_async(
1050 fn=fn,
1051 policy=self.policy if policy is None else policy,
1052 backoff=self.backoff if backoff is None else backoff,
1053 giveup=self.giveup if giveup is None else giveup,
1054 before_attempt=self.before_attempt if before_attempt is None else before_attempt,
1055 after_attempt=self.after_attempt if after_attempt is None else after_attempt,
1056 on_retryable_error=self.on_retryable_error if on_retryable_error is None else on_retryable_error,
1057 on_exhaustion=(
1058 cast(Callable[[AttemptOutcome], _R], self.on_exhaustion) if on_exhaustion is None else on_exhaustion
1059 ),
1060 log=None if log is NO_LOGGER else self.log if log is None else log,
1061 )
1063 @overload
1064 def wraps(self, fn: Callable[..., Awaitable[_R]]) -> Callable[..., Awaitable[_R]]: ...
1066 @overload
1067 def wraps(self, fn: Callable[..., _R]) -> Callable[..., _R]: ...
1069 def wraps(self, fn: Callable[..., Any]) -> Callable[..., Any]:
1070 """Returns a wrapper function that forwards all arguments to ``fn`` and retries it using this template; thread-safe.
1072 Example Usage:
1073 def fn(x: int) -> int:
1074 return x * 2
1075 func: Callable[[int], int] = retry_template.wraps(fn)
1076 y: int = func(5) # returns 10
1077 """
1078 import functools
1079 import inspect
1081 target_fn: Callable[..., Any] = fn.func if isinstance(fn, functools.partial) else fn
1082 if inspect.iscoroutinefunction(target_fn) or inspect.iscoroutinefunction(type(target_fn).__call__):
1084 @functools.wraps(fn)
1085 async def wrapped_async(*args: Any, **kwargs: Any) -> Any:
1086 return await self.call_with_retries_async(fn=lambda _retry: fn(*args, **kwargs))
1088 return wrapped_async
1089 else:
1091 @functools.wraps(fn)
1092 def wrapped(*args: Any, **kwargs: Any) -> Any:
1093 return self.call_with_retries(fn=lambda _retry: fn(*args, **kwargs))
1095 return wrapped
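The wrapper pattern above (capture ``*args``/``**kwargs`` in a closure and hand a zero-argument callable to the retry loop) can be sketched standalone; this simplified decorator retries on a hypothetical ``TransientError`` rather than this module's ``RetryableError`` and omits backoff entirely:

```python
import functools
from typing import Any, Callable


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure."""


def with_retries(max_retries: int) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)  # preserves fn's __name__/__doc__ on the wrapper
        def wrapped(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except TransientError:
                    if attempt == max_retries:
                        raise
        return wrapped
    return decorator


calls = {"n": 0}


@with_retries(max_retries=3)
def flaky(x: int) -> int:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError()
    return x * 2


y = flaky(5)  # succeeds on the third call, returning 10
```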
1098#############################################################################
1099def raise_retryable_error_from(
1100 exc: BaseException,
1101 *,
1102 display_msg: object = None,
1103 retry_immediately_once: bool = False,
1104 category: object = None,
1105 attachment: object = None,
1106) -> NoReturn:
1107 """Convenience function that raises a generic RetryableError that wraps the given underlying exception."""
1108 raise RetryableError(
1109 display_msg=type(exc).__name__ if display_msg is None else display_msg,
1110 retry_immediately_once=retry_immediately_once,
1111 category=category,
1112 attachment=attachment,
1113 ) from exc
1116ExceptionPredicate = Union[bool, Callable[[BaseException], bool]] # Type alias
1119def call_with_exception_handlers(
1120 fn: Callable[[], _T], # typically a lambda
1121 *,
1122 continue_scanning_if_no_predicate_matches: bool = False,
1123 handlers: Mapping[type[BaseException], Sequence[tuple[ExceptionPredicate, Callable[[BaseException], _T]]]],
1124) -> _T:
1125 """Convenience function that calls ``fn`` and returns its result; on exception runs the first matching handler in a per-
1126 exception handler chain; composes independent handlers via predicates into one function, in Event-Predicate-Action style.
1128 Lookup uses the exception type's Method Resolution Order (most-specific class in the exception class hierarchy wins). For
1129 the first class that exists as a key in ``handlers``, its chain is scanned in order. Each chain element is
1130 ``(predicate, handler)`` where ``predicate`` is either ``True`` (always matches), ``False`` (disabled), or
1131 ``predicate(exc) -> bool``. The first matching handler is called with the exception and its return value is returned. If
1132 no predicate matches then, by default, the original exception is re-raised and no less-specific handler chains are
1133 consulted. Set ``continue_scanning_if_no_predicate_matches=True`` to continue scanning exception base classes instead.
1135 Typically (but not necessarily) the handler raises a ``RetryableError``, via ``raise_retryable_error_from`` or similar.
1136 Or it may raise another exception type (which will not be retried), or even return a fallback value instead of raising.
1138 Example: turn transient ssh/zfs command failures into RetryableError for call_with_retries(), including feature flags:
1140 def run_remote(retry: Retry) -> str:
1141 p = subprocess.run(["ssh", "foo.example.com", "zfs", "list", "-H"], text=True, capture_output=True, check=True)
1142 return p.stdout
1144 def fn(retry: Retry) -> str:
1145 return call_with_exception_handlers(
1146 fn=lambda: run_remote(retry),
1147 handlers={
1148 TimeoutError: [(True, raise_retryable_error_from)],
1149 ConnectionResetError: [(True, lambda exc: raise_retryable_error_from(exc, display_msg="ssh reset"))],
1150 subprocess.CalledProcessError: [
1151 (lambda exc: exc.returncode == 255, lambda exc: raise_retryable_error_from(exc, display_msg="ssh error")),
1152 (lambda exc: "cannot receive" in (exc.stderr or ""), lambda exc: raise_retryable_error_from(exc, display_msg="zfs recv")),
1153 ],
1154 OSError: [
1155 (lambda exc: getattr(exc, "errno", None) in {errno.ETIMEDOUT, errno.EHOSTUNREACH},
1156 lambda exc: raise_retryable_error_from(exc, display_msg=f"network: {exc}")),
1157 (False, lambda exc: raise_retryable_error_from(exc, display_msg="disabled handler example")),
1158 ],
1159 },
1160 )
1162 stdout: str = call_with_retries(fn=fn, policy=RetryPolicy(max_retries=3))
1164 Example: return a fallback value (no retry loop required):
1166 def read_optional_file(path: str) -> str:
1167 return call_with_exception_handlers(
1168 fn=lambda: open(path, encoding="utf-8").read(),
1169 handlers={FileNotFoundError: [(True, lambda _exc: "")]},
1170 )
1171 """
1172 try:
1173 return fn()
1174 except BaseException as exc:
1175 for cls in type(exc).__mro__:
1176 handler_chain = handlers.get(cls)
1177 if handler_chain is not None:
1178 for predicate, handler in handler_chain:
1179 if predicate is True or (predicate is not False and predicate(exc)):
1180 return handler(exc)
1181 if not continue_scanning_if_no_predicate_matches:
1182 raise
1183 raise
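To see the MRO-based dispatch in action without importing this module, here is a self-contained replica of the lookup loop plus a tiny demonstration (function names are illustrative):

```python
from typing import Any, Callable, Mapping, Sequence


def dispatch(fn: Callable[[], Any],
             handlers: Mapping[type, Sequence[tuple[Any, Callable[[BaseException], Any]]]],
             continue_scanning: bool = False) -> Any:
    """Standalone replica of the handler-chain lookup: walk the exception type's MRO,
    run the first matching predicate's handler, otherwise re-raise."""
    try:
        return fn()
    except BaseException as exc:
        for cls in type(exc).__mro__:
            chain = handlers.get(cls)
            if chain is not None:
                for predicate, handler in chain:
                    if predicate is True or (predicate is not False and predicate(exc)):
                        return handler(exc)
                if not continue_scanning:
                    raise
        raise


def boom() -> str:
    raise FileNotFoundError("missing")


# FileNotFoundError has no key below, so the MRO walk reaches OSError, whose chain matches.
result = dispatch(boom, {OSError: [(False, lambda exc: "disabled"),
                                   (True, lambda exc: "fallback")]})
# result == "fallback": the disabled (False) entry is skipped, the True entry runs
```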
1186#############################################################################
1187@final
1188class _ThreadLocalRNG(threading.local):
1189 """Caches a per-thread random number generator."""
1191 def __init__(self) -> None:
1192 self.rng: random.Random | None = None
1195_THREAD_LOCAL_RNG: Final[_ThreadLocalRNG] = _ThreadLocalRNG()
1198def _thread_local_rng() -> random.Random:
1199 """Returns a per-thread RNG for backoff jitter; for perf avoids locking and initializing a new random.Random() at high
1200 frequency."""
1201 threadlocal: _ThreadLocalRNG = _THREAD_LOCAL_RNG
1202 rng: random.Random | None = threadlocal.rng
1203 if rng is None:
1204 rng = random.Random() # noqa: S311 jitter isn't security sensitive, and random.SystemRandom.randint() is slow
1205 threadlocal.rng = rng
1206 return rng
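A quick standalone check of the thread-local pattern above: each thread lazily builds its own ``random.Random``, so the hot path touches no lock or shared RNG state. This simplified variant initializes in ``__init__`` (which ``threading.local`` subclasses run once per thread on first access) instead of the lazy-``None`` check used above; names are illustrative:

```python
import random
import threading


class _LocalRNG(threading.local):
    def __init__(self) -> None:
        # runs once per thread on first attribute access
        self.rng = random.Random()


tl = _LocalRNG()
rngs: list[random.Random] = []
lock = threading.Lock()


def record() -> None:
    with lock:
        rngs.append(tl.rng)  # keep strong references so instances stay distinct


threads = [threading.Thread(target=record) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
distinct = len({id(r) for r in rngs})  # three distinct per-thread RNG instances
```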