Coverage for bzfs_main / util / retry.py: 100%
514 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 10:16 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Customizable generic retry framework; defaults to jittered exponential backoff with cap unless specified otherwise.
17Purpose:
18--------
19- Provide a reusable retry helper for transient failures using customizable policy and callbacks.
20- Centralize backoff, jitter, logging and metrics behavior while keeping call sites compact.
21- Prevent accidental retries: the loop retries only when the developer explicitly raises a ``RetryableError``, which reduces
22 the risk of retrying non-idempotent operations.
23- Provide a thread-safe, fast implementation; avoid shared RNG contention.
24- Provide both sync and async API, both with the same semantics, except async API awaits ``fn`` and uses non-blocking sleep.
25- Avoid unnecessary complexity and add zero dependencies beyond the Python standard library. Everything you need is in this
26 single Python file.
28Usage:
29------
30- Wrap work in a callable ``fn(retry: Retry)`` and therein raise ``RetryableError`` for failures that should be retried.
31- Construct a policy via ``RetryPolicy(...)`` that specifies how ``RetryableError`` shall be retried.
32- Invoke ``call_with_retries(fn=fn, policy=policy, log=logger)`` or call_with_retries_async with a standard logging.Logger.
33- On success, the result of calling ``fn`` is returned.
34- By default on exhaustion, call_with_retries* either re-raises the last underlying ``RetryableError.__cause__``, or raises
35 ``RetryError`` (wrapping the last ``RetryableError``), like so:
36 - if ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last
37 ``RetryableError.__cause__`` with its original traceback.
38 - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain).
39 - The default is ``RetryPolicy.reraise=True``.
41Advanced Configuration:
42-----------------------
43- Tune ``RetryPolicy`` parameters to control maximum retries, sleep bounds, elapsed-time budget, logging, etc.
44- Use ``RetryPolicy.config: RetryConfig`` to control logging settings.
45- Set ``log=None`` to disable logging, or customize ``info_loglevel`` / ``warning_loglevel`` for structured logs.
46- Supply a ``giveup(AttemptOutcome)`` callback to stop retrying based on domain-specific logic (for example, decisions based
47 on time budget/quota or the previous N most recent AttemptOutcome objects (via AttemptOutcome.retry.previous_outcomes).
48- Use the ``any_giveup()`` / ``all_giveup()`` helper to consult more than one callback handler in ``giveup(AttemptOutcome)``.
49- Supply an ``on_exhaustion(AttemptOutcome)`` callback to customize behavior when giving up; it may raise an error or return
50 a fallback value.
52Observability:
53--------------
54- Supply an ``after_attempt(AttemptOutcome)`` callback to collect per-attempt metrics such as success flag,
55 exhausted/terminated state, attempt number, total elapsed duration (in nanoseconds), sleep duration (in nanoseconds), etc.
56- ``AttemptOutcome.result`` is either the successful result or the most recent ``RetryableError``, enabling integration with
57 metrics and tracing systems without coupling the retry loop to any specific backend.
58- Supply an ``after_attempt(AttemptOutcome)`` callback to customize logging 100%, if necessary.
59- Use the ``multi_after_attempt()`` helper to invoke more than one callback handler in ``after_attempt(AttemptOutcome)``.
61Expert Configuration:
62---------------------
63- Supply a ``backoff(BackoffContext)`` callback to plug in a custom backoff algorithm (e.g., decorrelated-jitter or
64 retry-after HTTP 429). The default is full-jitter exponential backoff with cap (aka industry standard).
65- Supply a ``before_attempt(Retry)`` callback for optional rate limiting or other forms of internal backpressure.
66- Supply a ``on_retryable_error(AttemptOutcome)`` callback, e.g. to count failures (RetryableError) caught by the retry loop.
67- Set ``RetryPolicy.max_previous_outcomes > 0`` to pass the N most recent AttemptOutcome objects to callbacks (default is 0).
68- If ``RetryPolicy.max_previous_outcomes > 0``, you can use ``RetryableError(..., attachment=...)`` to carry domain-specific
69 state from a failed attempt to the next attempt via ``retry.previous_outcomes``. This pattern helps if attempt N+1 is a
70 function of attempt N or all prior attempts (e.g., switching endpoints or resuming from an offset).
71- Use ``RetryTemplate`` as a 'bag of knobs' configuration template for functions that shall be retried in similar ways.
72- Or package up all knobs plus a ``fn(retry: Retry)`` function into a self-contained auto-retrying higher level function by
73 constructing a ``RetryTemplate`` object (which is a ``Callable`` function itself).
74- To keep calling code retry-transparent, set ``RetryPolicy.reraise=True`` (the default) *and* raise retryable failures as
75 ``raise RetryableError(...) from exc``. Client code now won't notice whether call_with_retries* is used or not.
76- To make exhaustion observable to calling code, set ``RetryPolicy.reraise=False``: by default call_with_retries* now always
77 raises ``RetryError`` (wrapping the last ``RetryableError``) on exhaustion, so callers now catch ``RetryError`` and can
78 inspect the last underlying exception via ``err.outcome``, ``err.__cause__``, and even ``err.__cause__.__cause__`` when
79 present.
80- Set ``RetryPolicy.timing`` to customize reading the current monotonic time, sleeping and optional async termination.
81- The callback API is powerful enough to easily plug in advanced retry algorithms such as:
82 - Google SRE Client-Side Adaptive Throttling - https://sre.google/sre-book/handling-overload/
83 - gRPC retry throttling - https://grpc.io/docs/guides/retry/
84 - AWS SDK adaptive retry mode - https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
85 - Circuit breakers - https://martinfowler.com/bliki/CircuitBreaker.html (e.g. via `pybreaker` third-party library)
86 - Rate limiting with Fixed Window, Moving Window, and Sliding Windows (e.g. via `limits` third-party library)
87 - Various example backoff strategies such as decorrelated-jitter or retry-after HTTP 429 "Too Many Requests" responses,
88 etc can be found in test_retry_examples.py.
90Example Usage:
91--------------
92 import logging
93 from bzfs_main.util.retry import Retry, RetryPolicy, RetryableError, call_with_retries
95 def unreliable_operation(retry: Retry) -> str:
96 try:
97 if retry.count < 3:
98 raise ValueError("temporary failure connecting to foo.example.com")
99 return "ok"
100 except ValueError as exc:
101 # Preserve the underlying cause for correct error propagation and logging
102 raise RetryableError(display_msg="connect") from exc
104 retry_policy = RetryPolicy(
105 max_retries=10,
106 min_sleep_secs=0,
107 initial_max_sleep_secs=0.125,
108 max_sleep_secs=10,
109 max_elapsed_secs=60,
110 )
111 log = logging.getLogger(__name__)
112 result: str = call_with_retries(fn=unreliable_operation, policy=retry_policy, log=log)
113 print(result)
115 # Sample log output:
116 # INFO:Retrying connect [1/10] in 8.79ms ...
117 # INFO:Retrying connect [2/10] in 90.1ms ...
118 # INFO:Retrying connect [3/10] in 372ms ...
119 # ok
121Background:
122-----------
123For background on exponential backoff and jitter, see for example
124https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter
125"""
127from __future__ import (
128 annotations,
129)
130import argparse
131import dataclasses
132import logging
133import random
134import threading
135import time
136from collections.abc import (
137 Awaitable,
138 Iterable,
139 Mapping,
140 Sequence,
141)
142from dataclasses import (
143 dataclass,
144)
145from typing import (
146 TYPE_CHECKING,
147 Any,
148 Callable,
149 Final,
150 Generic,
151 NamedTuple,
152 NoReturn,
153 TypeVar,
154 Union,
155 cast,
156 final,
157 overload,
158)
160if TYPE_CHECKING:
161 import asyncio
163from bzfs_main.util.utils import (
164 human_readable_duration,
165)
167# constants:
168INFINITY_MAX_RETRIES: Final[int] = 2**90 - 1 # a number that's essentially infinity for all practical retry purposes
171#############################################################################
172def full_jitter_backoff_strategy(context: BackoffContext) -> tuple[int, int]:
173 """Default implementation of ``backoff`` callback for call_with_retries(); computes delay time before next retry attempt,
174 after failure.
176 Full-jitter picks a random sleep_nanos duration from the range [min_sleep_nanos, curr_max_sleep_nanos] and applies
177 exponential backoff with cap to the next attempt; thread-safe. Typically, min_sleep_nanos is 0 and exponential_base is 2.
178 Example curr_max_sleep_nanos sequence: 125ms --> 250ms --> 500ms --> 1s --> 2s --> 4s --> 8s --> 10s --> 10s...
179 Full-jitter provides optimal balance between reducing server load and minimizing retry latency.
180 """
181 policy: RetryPolicy = context.retry.policy
182 curr_max_sleep_nanos: int = context.curr_max_sleep_nanos
183 if policy.min_sleep_nanos == curr_max_sleep_nanos:
184 sleep_nanos = curr_max_sleep_nanos # perf
185 else:
186 sleep_nanos = context.rng.randint(policy.min_sleep_nanos, curr_max_sleep_nanos) # nanos to delay until next attempt
187 curr_max_sleep_nanos = round(curr_max_sleep_nanos * policy.exponential_base) # exponential backoff
188 curr_max_sleep_nanos = min(curr_max_sleep_nanos, policy.max_sleep_nanos) # ... with cap for next attempt
189 return sleep_nanos, curr_max_sleep_nanos
192def no_giveup(outcome: AttemptOutcome) -> object | None:
193 """Default implementation of ``giveup`` callback for call_with_retries(); never gives up; returning anything other than
194 ``None`` indicates to give up retrying; thread-safe."""
195 return None # don't give up retrying
198def before_attempt_noop(retry: Retry) -> int:
199 """Default implementation of ``before_attempt`` callback for call_with_retries(); does nothing; thread-safe."""
200 return 0
203def after_attempt_log_failure(outcome: AttemptOutcome) -> None:
204 """Default implementation of ``after_attempt`` callback for call_with_retries(); performs simple logging of retry attempt
205 failures; thread-safe."""
206 retry: Retry = outcome.retry
207 policy: RetryPolicy = retry.policy
208 config: RetryConfig = policy.config
209 if outcome.is_success or retry.log is None or not config.enable_logging:
210 return
211 log: logging.Logger = retry.log
212 assert isinstance(outcome.result, RetryableError)
213 retryable_error: RetryableError = outcome.result
214 if not outcome.is_exhausted:
215 if log.isEnabledFor(config.info_loglevel): # Retrying X in Y ms ...
216 m1: str = config.format_msg(config.display_msg, retryable_error)
217 m2: str = config.format_pair(retry.count + 1, policy.max_retries)
218 m3: str = config.format_duration(outcome.sleep_nanos)
219 log.log(config.info_loglevel, "%s", f"{m1}{m2} in {m3}{config.dots}", extra=config.extra)
220 else:
221 if policy.max_retries > 0 and log.isEnabledFor(config.warning_loglevel) and not outcome.is_terminated:
222 reason: str = "" if outcome.giveup_reason is None else f"{outcome.giveup_reason}; "
223 format_duration: Callable[[int], str] = config.format_duration # lambda: nanos
224 log.log(
225 config.warning_loglevel,
226 "%s",
227 f"{config.format_msg(config.display_msg, retryable_error)}"
228 f"exhausted; giving up because {reason}the last "
229 f"{config.format_pair(retry.count, policy.max_retries)} retries across "
230 f"{config.format_pair(format_duration(outcome.elapsed_nanos), format_duration(policy.max_elapsed_nanos))} "
231 "failed",
232 exc_info=retryable_error if config.exc_info else None,
233 stack_info=config.stack_info,
234 extra=config.extra,
235 )
238def noop(outcome: AttemptOutcome) -> None:
239 """Default implementation of ``on_retryable_error`` callback for call_with_retries(); does nothing; thread-safe."""
242def on_exhaustion_raise(outcome: AttemptOutcome) -> NoReturn:
243 """Default implementation of ``on_exhaustion`` callback for call_with_retries(); always raises; thread-safe."""
244 assert outcome.is_exhausted
245 assert isinstance(outcome.result, RetryableError)
246 retryable_error: RetryableError = outcome.result
247 policy: RetryPolicy = outcome.retry.policy
248 cause: BaseException | None = retryable_error.__cause__
249 if policy.reraise and cause is not None:
250 raise cause.with_traceback(cause.__traceback__)
251 raise RetryError(outcome=outcome) from retryable_error
254#############################################################################
255_T = TypeVar("_T")
258def call_with_retries(
259 fn: Callable[[Retry], _T], # typically a lambda; wraps work and raises RetryableError for failures that shall be retried
260 policy: RetryPolicy, # specifies how ``RetryableError`` shall be retried
261 *,
262 backoff: BackoffStrategy = full_jitter_backoff_strategy, # computes delay time before next retry attempt, after failure
263 giveup: Callable[[AttemptOutcome], object | None] = no_giveup, # stop retrying based on domain-specific logic, e.g. time
264 before_attempt: Callable[[Retry], int] = before_attempt_noop, # e.g. wait due to rate limiting or internal backpressure
265 after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure, # e.g. record metrics and/or custom logging
266 on_retryable_error: Callable[[AttemptOutcome], None] = noop, # e.g. count failures (RetryableError) caught by retry loop
267 on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise, # raise error or return fallback value
268 log: logging.Logger | None = None, # set this to ``None`` to disable logging
269) -> _T:
270 """Runs the function ``fn`` and returns its result; retries on failure as indicated by policy; thread-safe.
272 By default on exhaustion, call_with_retries() either re-raises the last underlying ``RetryableError.__cause__``, or raises
273 ``RetryError`` (wrapping the last ``RetryableError``), like so:
274 - if ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last
275 ``RetryableError.__cause__`` with its original traceback.
276 - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain).
277 - The default is ``RetryPolicy.reraise=True``.
279 On the exhaustion path, ``on_exhaustion`` will be called exactly once (after the final after_attempt). The default
280 implementation raises as described above; custom ``on_exhaustion`` impls may return a fallback value instead of an error.
281 """
282 rng: random.Random | None = None
283 retry_count: int = 0
284 idle_nanos: int = 0
285 curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos
286 previous_outcomes: tuple[AttemptOutcome, ...] = () # for safety pass *immutable* deque to callbacks
287 timing: RetryTiming = policy.timing
288 sleep: Callable[[int, Retry], None] = timing.sleep
289 monotonic_ns: Callable[[], int] = timing.monotonic_ns
290 call_start_nanos: Final[int] = monotonic_ns()
291 while True:
292 before_attempt_nanos: int = monotonic_ns() if retry_count != 0 else call_start_nanos
293 prev: tuple[AttemptOutcome, ...] = previous_outcomes
294 retry: Retry = Retry(
295 retry_count, call_start_nanos, before_attempt_nanos, before_attempt_nanos, idle_nanos, policy, log, prev
296 )
297 try:
298 if before_attempt is not before_attempt_noop:
299 before_attempt_sleep_nanos: int = before_attempt(retry)
300 assert before_attempt_sleep_nanos >= 0, before_attempt_sleep_nanos
301 if before_attempt_sleep_nanos > 0:
302 sleep(before_attempt_sleep_nanos, retry) # e.g. wait due to rate limiting or internal backpressure
303 attempt_start_nanos: int = monotonic_ns()
304 idle_nanos += attempt_start_nanos - before_attempt_nanos
305 retry = Retry(
306 retry_count, call_start_nanos, before_attempt_nanos, attempt_start_nanos, idle_nanos, policy, log, prev
307 )
308 timing.on_before_attempt(retry)
309 result: _T = fn(retry) # Call the target function and supply retry attempt number and other metadata
310 if after_attempt is not after_attempt_log_failure:
311 elapsed_nanos: int = monotonic_ns() - call_start_nanos
312 outcome: AttemptOutcome = AttemptOutcome(retry, True, False, False, None, elapsed_nanos, 0, result)
313 after_attempt(outcome)
314 return result
315 except RetryableError as retryable_error:
316 elapsed_nanos = monotonic_ns() - call_start_nanos
317 is_terminated: Callable[[Retry], bool] = timing.is_terminated
318 giveup_reason: object | None = None
319 sleep_nanos: int = 0
320 outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
321 on_retryable_error(outcome) # e.g. count failures (RetryableError) caught by retry loop
322 if retry_count < policy.max_retries and elapsed_nanos < policy.max_elapsed_nanos:
323 if policy.max_sleep_nanos == 0 and backoff is full_jitter_backoff_strategy:
324 pass # perf: e.g. spin-before-block
325 elif retry_count == 0 and retryable_error.retry_immediately_once:
326 pass # retry once immediately without backoff
327 else: # jitter: default backoff strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos]
328 rng = _thread_local_rng() if rng is None else rng
329 sleep_nanos, curr_max_sleep_nanos = backoff( # compute delay before next retry attempt, after failure
330 BackoffContext(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error)
331 )
332 assert sleep_nanos >= 0 and curr_max_sleep_nanos >= 0, sleep_nanos
334 if sleep_nanos > 0:
335 outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
336 if (not is_terminated(retry)) and (giveup_reason := giveup(outcome)) is None:
337 after_attempt(outcome)
338 sleep(sleep_nanos, retry)
339 idle_nanos += sleep_nanos
340 if not is_terminated(retry):
341 n: int = policy.max_previous_outcomes
342 if n > 0: # outcome will be passed to next attempt via Retry.previous_outcomes
343 if previous_outcomes: # detach to reduce memory footprint
344 outcome = outcome.copy(retry=retry.copy(previous_outcomes=()))
345 previous_outcomes = previous_outcomes[len(previous_outcomes) - n + 1 :] + (outcome,) # imm deque
346 del outcome # help gc
347 retry_count += 1
348 continue # continue retry loop with next attempt
349 else:
350 sleep_nanos = 0
351 outcome = AttemptOutcome(
352 retry, False, True, is_terminated(retry), giveup_reason, elapsed_nanos, sleep_nanos, retryable_error
353 )
354 after_attempt(outcome)
355 return on_exhaustion(outcome) # raise error or return fallback value
358async def call_with_retries_async(
359 fn: Callable[[Retry], Awaitable[_T]], # wraps work and raises RetryableError for failures that shall be retried
360 policy: RetryPolicy, # specifies how ``RetryableError`` shall be retried
361 *,
362 backoff: BackoffStrategy = full_jitter_backoff_strategy, # computes delay time before next retry attempt, after failure
363 giveup: Callable[[AttemptOutcome], object | None] = no_giveup, # stop retrying based on domain-specific logic, e.g. time
364 before_attempt: Callable[[Retry], int] = before_attempt_noop, # e.g. wait due to rate limiting or internal backpressure
365 after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure, # e.g. record metrics and/or custom logging
366 on_retryable_error: Callable[[AttemptOutcome], None] = noop, # e.g. count failures (RetryableError) caught by retry loop
367 on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise, # raise error or return fallback value
368 log: logging.Logger | None = None, # set this to ``None`` to disable logging
369) -> _T:
370 """Async version of call_with_retries() with the same semantics except it awaits ``fn`` and uses non-blocking sleep."""
371 rng: random.Random | None = None
372 retry_count: int = 0
373 idle_nanos: int = 0
374 curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos
375 previous_outcomes: tuple[AttemptOutcome, ...] = () # for safety pass *immutable* deque to callbacks
376 timing: RetryTiming = policy.timing
377 sleep: Callable[[int, Retry], Awaitable[None]] = timing.sleep_async
378 monotonic_ns: Callable[[], int] = timing.monotonic_ns
379 call_start_nanos: Final[int] = monotonic_ns()
380 while True:
381 before_attempt_nanos: int = monotonic_ns() if retry_count != 0 else call_start_nanos
382 prev: tuple[AttemptOutcome, ...] = previous_outcomes
383 retry: Retry = Retry(
384 retry_count, call_start_nanos, before_attempt_nanos, before_attempt_nanos, idle_nanos, policy, log, prev
385 )
386 try:
387 if before_attempt is not before_attempt_noop:
388 before_attempt_sleep_nanos: int = before_attempt(retry)
389 assert before_attempt_sleep_nanos >= 0, before_attempt_sleep_nanos
390 if before_attempt_sleep_nanos > 0:
391 await sleep(before_attempt_sleep_nanos, retry) # e.g. wait due to rate limiting or internal backpressure
392 attempt_start_nanos: int = monotonic_ns()
393 idle_nanos += attempt_start_nanos - before_attempt_nanos
394 retry = Retry(
395 retry_count, call_start_nanos, before_attempt_nanos, attempt_start_nanos, idle_nanos, policy, log, prev
396 )
397 timing.on_before_attempt(retry)
398 result: _T = await fn(retry) # Call the target function and supply retry attempt number and other metadata
399 if after_attempt is not after_attempt_log_failure:
400 elapsed_nanos: int = monotonic_ns() - call_start_nanos
401 outcome: AttemptOutcome = AttemptOutcome(retry, True, False, False, None, elapsed_nanos, 0, result)
402 after_attempt(outcome)
403 return result
404 except RetryableError as retryable_error:
405 elapsed_nanos = monotonic_ns() - call_start_nanos
406 is_terminated: Callable[[Retry], bool] = timing.is_terminated
407 giveup_reason: object | None = None
408 sleep_nanos: int = 0
409 outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
410 on_retryable_error(outcome) # e.g. count failures (RetryableError) caught by retry loop
411 if retry_count < policy.max_retries and elapsed_nanos < policy.max_elapsed_nanos:
412 if policy.max_sleep_nanos == 0 and backoff is full_jitter_backoff_strategy:
413 pass # perf: e.g. spin-before-block
414 elif retry_count == 0 and retryable_error.retry_immediately_once:
415 pass # retry once immediately without backoff
416 else: # jitter: default backoff strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos]
417 rng = _thread_local_rng() if rng is None else rng
418 sleep_nanos, curr_max_sleep_nanos = backoff( # compute delay before next retry attempt, after failure
419 BackoffContext(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error)
420 )
421 assert sleep_nanos >= 0 and curr_max_sleep_nanos >= 0, sleep_nanos
423 if sleep_nanos > 0:
424 outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
425 if (not is_terminated(retry)) and (giveup_reason := giveup(outcome)) is None:
426 after_attempt(outcome)
427 await sleep(sleep_nanos, retry)
428 idle_nanos += sleep_nanos
429 if not is_terminated(retry):
430 n: int = policy.max_previous_outcomes
431 if n > 0: # outcome will be passed to next attempt via Retry.previous_outcomes
432 if previous_outcomes: # detach to reduce memory footprint
433 outcome = outcome.copy(retry=retry.copy(previous_outcomes=()))
434 previous_outcomes = previous_outcomes[len(previous_outcomes) - n + 1 :] + (outcome,) # imm deque
435 del outcome # help gc
436 retry_count += 1
437 continue # continue retry loop with next attempt
438 else:
439 sleep_nanos = 0
440 outcome = AttemptOutcome(
441 retry, False, True, is_terminated(retry), giveup_reason, elapsed_nanos, sleep_nanos, retryable_error
442 )
443 after_attempt(outcome)
444 return on_exhaustion(outcome) # raise error or return fallback value
447def multi_after_attempt(handlers: Iterable[Callable[[AttemptOutcome], None]]) -> Callable[[AttemptOutcome], None]:
448 """Composes independent ``after_attempt`` handlers into one ``call_with_retries(after_attempt=...)`` callback that
449 invokes each handler in order; thread-safe."""
450 handlers = tuple(handlers)
451 if len(handlers) == 1:
452 return handlers[0] # perf
454 def _after_attempt(outcome: AttemptOutcome) -> None:
455 for handler in handlers:
456 handler(outcome)
458 return _after_attempt
461def any_giveup(handlers: Iterable[Callable[[AttemptOutcome], object | None]]) -> Callable[[AttemptOutcome], object | None]:
462 """Composes independent ``giveup`` handlers into one ``call_with_retries(giveup=...)`` callback that gives up retrying if
463 *any* handler gives up; that is if any handler returns a non-``None`` reason; thread-safe.
465 Handlers are evaluated in order and short-circuit: On giving up returns the first handler's reason for giving up.
466 """
467 handlers = tuple(handlers)
468 if len(handlers) == 1:
469 return handlers[0] # perf
471 def _giveup(outcome: AttemptOutcome) -> object | None:
472 for handler in handlers:
473 giveup_reason: object | None = handler(outcome)
474 if giveup_reason is not None:
475 return giveup_reason
476 return None # don't give up retrying
478 return _giveup
481def all_giveup(handlers: Iterable[Callable[[AttemptOutcome], object | None]]) -> Callable[[AttemptOutcome], object | None]:
482 """Composes independent ``giveup`` handlers into one ``call_with_retries(giveup=...)`` callback that gives up retrying if
483 *all* handlers give up; that is if all handlers return a non-``None`` reason; thread-safe.
485 Handlers are evaluated in order and short-circuit: stops at first ``None``; else returns the last non-``None`` reason.
486 """
487 handlers = tuple(handlers)
488 if len(handlers) == 1:
489 return handlers[0] # perf
491 def _giveup(outcome: AttemptOutcome) -> object | None:
492 giveup_reason: object | None = None
493 for handler in handlers:
494 giveup_reason = handler(outcome)
495 if giveup_reason is None:
496 return None # don't give up retrying
497 return giveup_reason
499 return _giveup
502#############################################################################
503class RetryableError(Exception):
504 """Indicates that the task that caused the underlying exception can be retried and might eventually succeed;
505 ``call_with_retries()`` will pass this exception to callbacks via ``AttemptOutcome.result``; can be subclassed."""
507 def __init__(
508 self,
509 *exc_args: object, # optional args passed into super().__init__()
510 display_msg: object = None, # for logging
511 retry_immediately_once: bool = False, # retry once immediately without backoff?
512 category: object = None, # optional classification e.g. "CONCURRENCY, "SERVER_ISSUE", "THROTTLING", "TRANSIENT", ...
513 attachment: object = None, # optional domain specific info passed to next attempt via Retry.previous_outcomes if
514 # RetryPolicy.max_previous_outcomes > 0. This helps when retrying is not just 'try again later', but
515 # 'try again differently based on what just happened'.
516 # Examples: switching network endpoints, adjusting per-attempt timeouts, capping retries by error-class, resuming
517 # with a token/offset, maintaining failure history for this invocation of call_with_retries().
518 # Example: 'cap retries to 3 for ECONNREFUSED but 12 for ETIMEDOUT' via attachment=collections.Counter
519 ) -> None:
520 super().__init__(*exc_args)
521 self.display_msg: object = display_msg
522 self.retry_immediately_once: bool = retry_immediately_once
523 self.category: object = category
524 self.attachment: object = attachment
526 def display_msg_str(self) -> str:
527 """Returns the display_msg as a str; for logging."""
528 return "" if self.display_msg is None else str(self.display_msg)
531#############################################################################
532@final
533class RetryError(Exception):
534 """Indicates that retries have been exhausted; the last RetryableError is in RetryError.__cause__."""
536 outcome: Final[AttemptOutcome]
537 """Metadata that describes why and how call_with_retries() gave up."""
539 def __init__(self, outcome: AttemptOutcome) -> None:
540 super().__init__(outcome)
541 self.outcome = outcome
544#############################################################################
545@final
546class Retry(NamedTuple):
547 """Attempt metadata provided to callback functions; includes the current retry attempt number; immutable."""
549 count: int # type: ignore[assignment]
550 """Attempt number; count=0 is the first attempt, count=1 is the second attempt aka first retry."""
552 call_start_time_nanos: int
553 """Value of time.monotonic_ns() at start of call_with_retries() invocation."""
555 before_attempt_start_time_nanos: int
556 """Value of time.monotonic_ns() at start of before_attempt() invocation."""
558 attempt_start_time_nanos: int
559 """Value of time.monotonic_ns() at start of fn() invocation."""
561 idle_nanos: int
562 """Sum of all before_attempt_sleep_nanos() plus AttemptOutcome.sleep_nanos across this call_with_retries() invocation, at
563 the start of fn() invocation."""
565 policy: RetryPolicy
566 """Policy that was passed into call_with_retries()."""
568 log: logging.Logger | None
569 """Logger that was passed into call_with_retries()."""
571 previous_outcomes: Sequence[AttemptOutcome]
572 """History/state of the N=max_previous_outcomes most recent outcomes for the current call_with_retries() invocation."""
574 def copy(self, **override_kwargs: Any) -> Retry:
575 """Creates a new object copying an existing one with the specified fields overridden for customization."""
576 return self._replace(**override_kwargs)
578 def before_attempt_sleep_nanos(self) -> int:
579 """Returns duration between the start of before_attempt() and the start of fn() attempt."""
580 return self.attempt_start_time_nanos - self.before_attempt_start_time_nanos
582 def __repr__(self) -> str:
583 return (
584 f"{type(self).__name__}(count={self.count!r}, call_start_time_nanos={self.call_start_time_nanos!r}, "
585 f"before_attempt_start_time_nanos={self.before_attempt_start_time_nanos!r}, "
586 f"attempt_start_time_nanos={self.attempt_start_time_nanos!r}, idle_nanos={self.idle_nanos!r})"
587 )
589 def __eq__(self, other: object) -> bool:
590 return self is other
592 def __hash__(self) -> int:
593 return object.__hash__(self)
596#############################################################################
597@final
598class AttemptOutcome(NamedTuple):
599 """Captures per-attempt state for ``after_attempt`` callbacks; immutable."""
601 retry: Retry
602 """Attempt metadata passed into fn(retry)."""
604 is_success: bool
605 """False if fn(retry) raised a RetryableError; True otherwise."""
607 is_exhausted: bool
608 """True if the loop is giving up retrying (possibly even due to is_terminated); False otherwise."""
610 is_terminated: bool
611 """True if the termination predicate has become true; False otherwise."""
613 giveup_reason: object | None
614 """Reason returned by giveup(); None means giveup() was not called or decided to not give up."""
616 elapsed_nanos: int
617 """Total duration between the start of call_with_retries() invocation and the end of this fn() attempt."""
619 sleep_nanos: int
620 """Duration of current sleep period."""
622 result: RetryableError | object
623 """Result of fn(retry); a RetryableError on retryable failure, or some other object on success."""
625 def attempt_elapsed_nanos(self) -> int:
626 """Returns duration between the start of this fn() attempt and the end of this fn() attempt."""
627 return self.elapsed_nanos + self.retry.call_start_time_nanos - self.retry.attempt_start_time_nanos
629 def copy(self, **override_kwargs: Any) -> AttemptOutcome:
630 """Creates a new outcome copying an existing one with the specified fields overridden for customization."""
631 return self._replace(**override_kwargs)
633 def __repr__(self) -> str:
634 return (
635 f"{type(self).__name__}("
636 f"retry={self.retry!r}, "
637 f"is_success={self.is_success!r}, "
638 f"is_exhausted={self.is_exhausted!r}, "
639 f"is_terminated={self.is_terminated!r}, "
640 f"giveup_reason={self.giveup_reason!r}, "
641 f"elapsed_nanos={self.elapsed_nanos!r}, "
642 f"sleep_nanos={self.sleep_nanos!r})"
643 )
645 def __eq__(self, other: object) -> bool:
646 return self is other
648 def __hash__(self) -> int:
649 return object.__hash__(self)
652#############################################################################
653@final
654class BackoffContext(NamedTuple):
655 """Captures per-backoff state for ``backoff`` callbacks."""
657 retry: Retry
658 """Attempt metadata passed into fn(retry)."""
660 curr_max_sleep_nanos: int
661 """Current maximum duration (in nanoseconds) to sleep before the next retry attempt;
662 Typically: ``RetryPolicy.initial_max_sleep_nanos <= curr_max_sleep_nanos <= RetryPolicy.max_sleep_nanos``."""
664 rng: random.Random
665 """Thread-local random number generator instance."""
667 elapsed_nanos: int
668 """Total duration between the start of call_with_retries() invocation and the end of this fn() attempt."""
670 retryable_error: RetryableError
671 """Result of failed fn(retry) attempt."""
673 def copy(self, **override_kwargs: Any) -> BackoffContext:
674 """Creates a new object copying an existing one with the specified fields overridden for customization."""
675 return self._replace(**override_kwargs)
677 def __repr__(self) -> str:
678 return (
679 f"{type(self).__name__}("
680 f"retry={self.retry!r}, "
681 f"curr_max_sleep_nanos={self.curr_max_sleep_nanos!r}, "
682 f"elapsed_nanos={self.elapsed_nanos!r})"
683 )
685 def __eq__(self, other: object) -> bool:
686 return self is other
688 def __hash__(self) -> int:
689 return object.__hash__(self)
692BackoffStrategy = Callable[[BackoffContext], tuple[int, int]] # typealias; returns sleep_nanos:int, curr_max_sleep_nanos:int
693"""Strategy that implements a backoff algorithm that reduces server load while minimizing retry latency; default is full
694jitter; various other example backoff strategies such as decorrelated-jitter or retry-after HTTP 429 "Too Many Requests"
695responses, etc can be found in test_retry_examples.py."""
698#############################################################################
699def _default_timing_is_terminated(retry: Retry) -> bool:
700 return False
703def _default_timing_sleep(sleep_nanos: int, retry: Retry) -> None:
704 time.sleep(sleep_nanos / 1_000_000_000)
707async def _default_timing_sleep_asyncio(sleep_nanos: int, retry: Retry) -> None:
708 import asyncio
710 await asyncio.sleep(sleep_nanos / 1_000_000_000)
713def _default_timing_on_before_attempt(retry: Retry) -> None:
714 if retry.policy.timing.is_terminated(retry):
715 raise RetryableError(display_msg="terminated before attempt") from RetryTerminationError()
718@final
719class RetryTerminationError(InterruptedError):
720 """Termination signal raised when retry loop exits before starting the next attempt."""
723@dataclass(frozen=True)
724@final
725class RetryTiming:
726 """Customizable callbacks for reading the current monotonic time, sleeping and optional async termination; immutable."""
728 monotonic_ns: Callable[[], int] = time.monotonic_ns
729 """Returns the system's current monotonic time in nanoseconds."""
731 is_terminated: Callable[[Retry], bool] = _default_timing_is_terminated
732 """Returns whether a predicate has become true; if so causes the retry loop to exit early between attempts; can be used
733 to indicate system shutdown or similar cancellation conditions; default is to always return ``False``; this function
734 should complete quickly without any blocking or sleeping."""
736 sleep: Callable[[int, Retry], None] = _default_timing_sleep
737 """Sleeps N nanoseconds between attempts; override to inject custom sleeping or for early wake-ups; thread-safe."""
739 sleep_async: Callable[[int, Retry], Awaitable[None]] = _default_timing_sleep_asyncio
740 """Sleeps N nanoseconds between attempts; override to inject custom sleeping or for early wake-ups; thread-safe."""
742 on_before_attempt: Callable[[Retry], None] = _default_timing_on_before_attempt
743 """Typically (but not necessarily) raises an error if ``is_terminated()`` is True; otherwise fn() will still run; this
744 function should complete quickly without any blocking or sleeping.
746 To disable this behavior: RetryTiming.make_from(...).copy(on_before_attempt=lambda retry: None).
747 """
749 def copy(self, **override_kwargs: Any) -> RetryTiming:
750 """Creates a new object copying an existing one with the specified fields overridden for customization; thread-
751 safe."""
752 return dataclasses.replace(self, **override_kwargs)
754 @staticmethod
755 def make_from(termination_event: threading.Event | None) -> RetryTiming:
756 """Convenience factory that creates a RetryTiming that performs async termination when termination_event is set."""
757 if termination_event is None:
758 return RetryTiming()
760 def _is_terminated(retry: Retry) -> bool:
761 return termination_event.is_set()
763 def _sleep(sleep_nanos: int, retry: Retry) -> None:
764 termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination
766 return RetryTiming(is_terminated=_is_terminated, sleep=_sleep)
768 @staticmethod
769 def make_from_asyncio(termination_event: asyncio.Event | None) -> RetryTiming:
770 """Convenience factory that creates a RetryTiming that performs async termination when termination_event is set;
771 Write an analog version of this function if you wish to replace asyncio with Trio or AnyIO or similar."""
772 if termination_event is None:
773 return RetryTiming()
775 def _is_terminated(retry: Retry) -> bool:
776 return termination_event.is_set()
778 async def _sleep_async(sleep_nanos: int, retry: Retry) -> None:
779 import asyncio
781 if sleep_nanos <= 0:
782 await asyncio.sleep(0) # perf: cooperative yield with less overhead than asyncio.wait_for(..., 0)
783 return
784 try:
785 await asyncio.wait_for(termination_event.wait(), timeout=sleep_nanos / 1_000_000_000)
786 except asyncio.TimeoutError:
787 pass # expected
789 return RetryTiming(is_terminated=_is_terminated, sleep_async=_sleep_async)
792#############################################################################
793def _format_msg(display_msg: str, retryable_error: RetryableError) -> str:
794 """Default implementation of ``format_msg`` callback for RetryConfig; creates simple log message; thread-safe."""
795 msg = display_msg + " " if display_msg else ""
796 errmsg: str = retryable_error.display_msg_str()
797 msg = msg + errmsg + " " if errmsg else msg
798 msg = msg if msg else "Retrying "
799 return msg
802def _format_pair(first: object, second: object) -> str:
803 """Default implementation of ``format_pair`` callback for RetryConfig; creates simple log message part; thread-safe."""
804 second = "∞" if INFINITY_MAX_RETRIES == second else second # noqa: SIM300
805 return f"[{first}/{second}]"
808@dataclass(frozen=True)
809@final
810class RetryConfig:
811 """Configures logging for call_with_retries(); all defaults work out of the box; immutable."""
813 display_msg: str = "Retrying" # message prefix for retry log messages
814 dots: str = " ..." # suffix appended to retry log messages
815 format_msg: Callable[[str, RetryableError], str] = _format_msg # lambda: display_msg, retryable_error
816 format_pair: Callable[[object, object], str] = _format_pair # lambda: first, second
817 format_duration: Callable[[int], str] = human_readable_duration # lambda: nanos
818 info_loglevel: int = logging.INFO # loglevel used when not giving up
819 warning_loglevel: int = logging.WARNING # loglevel used when giving up
820 enable_logging: bool = True # set to False to disable logging
821 exc_info: bool = False # passed into Logger.log()
822 stack_info: bool = False # passed into Logger.log()
823 extra: Mapping[str, object] | None = dataclasses.field(default=None, repr=False, compare=False) # passed to Logger.log()
824 context: object = dataclasses.field(default=None, repr=False, compare=False) # optional domain specific info
826 def copy(self, **override_kwargs: Any) -> RetryConfig:
827 """Creates a new config copying an existing one with the specified fields overridden for customization."""
828 return dataclasses.replace(self, **override_kwargs)
831#############################################################################
832@dataclass(frozen=True)
833@final
834class RetryPolicy:
835 """Configuration of maximum retries, sleep bounds, elapsed-time budget, logging, etc for call_with_retries(); immutable.
837 By default uses full jitter which works as follows: The maximum duration to sleep between attempts initially starts with
838 ``initial_max_sleep_secs`` and doubles on each retry, up to the final maximum of ``max_sleep_secs``.
839 Example: 125ms --> 250ms --> 500ms --> 1s --> 2s --> 4s --> 8s --> 10s --> 10s...
840 On each retry a random sleep duration in the range ``[min_sleep_secs, current max]`` is picked.
841 In a nutshell: ``0 <= min_sleep_secs <= initial_max_sleep_secs <= max_sleep_secs``. Typically, min_sleep_secs=0.
842 """
844 max_retries: int = INFINITY_MAX_RETRIES
845 """The maximum number of times ``fn`` will be invoked additionally after the first attempt invocation; must be >= 0."""
847 min_sleep_secs: float = 0
848 """The minimum duration to sleep between any two attempts."""
850 initial_max_sleep_secs: float = 0.125
851 """The initial maximum duration to sleep between any two attempts."""
853 max_sleep_secs: float = 10
854 """The final max duration to sleep between any two attempts; 0 <= min_sleep_secs <= initial_max_sleep_secs <=
855 max_sleep_secs."""
857 max_elapsed_secs: float = 47
858 """``fn`` will not be retried (or not retried anymore) once this much time has elapsed since the initial start of
859 call_with_retries(); set this to 365 * 86400 seconds or similar to effectively disable the time limit."""
861 exponential_base: float = 2
862 """Growth factor (aka multiplier) for backoff algorithm to calculate sleep duration; must be >= 1."""
864 max_elapsed_nanos: int = dataclasses.field(init=False, repr=False) # derived value
865 min_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value
866 initial_max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value
867 max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value
869 reraise: bool = True
870 """On exhaustion, the default (``True``) is to re-raise the underlying exception when present."""
872 max_previous_outcomes: int = 0
873 """Pass the N=max_previous_outcomes most recent AttemptOutcome objects to callbacks via Retry.previous_outcomes."""
875 config: RetryConfig = dataclasses.field(default=RetryConfig(), repr=False, compare=False)
876 """Configures logging behavior."""
878 timing: RetryTiming = dataclasses.field(default=RetryTiming(), repr=False)
879 """Customizable callbacks for reading the current monotonic time, sleeping and optional async termination."""
881 context: object = dataclasses.field(default=None, repr=False, compare=False)
882 """Optional domain specific info."""
884 @classmethod
885 def from_namespace(cls, args: argparse.Namespace) -> RetryPolicy:
886 """Factory that reads the policy from argparse.ArgumentParser via args."""
887 return cls(
888 max_retries=getattr(args, "max_retries", INFINITY_MAX_RETRIES),
889 min_sleep_secs=getattr(args, "retry_min_sleep_secs", 0),
890 initial_max_sleep_secs=getattr(args, "retry_initial_max_sleep_secs", 0.125),
891 max_sleep_secs=getattr(args, "retry_max_sleep_secs", 10),
892 max_elapsed_secs=getattr(args, "retry_max_elapsed_secs", 47),
893 exponential_base=getattr(args, "retry_exponential_base", 2),
894 reraise=getattr(args, "retry_reraise", True),
895 max_previous_outcomes=getattr(args, "retry_max_previous_outcomes", 0),
896 config=getattr(args, "retry_config", RetryConfig()),
897 timing=getattr(args, "retry_timing", RetryTiming()),
898 context=getattr(args, "retry_context", None),
899 )
901 @classmethod
902 def no_retries(cls) -> RetryPolicy:
903 """Returns a policy that never retries."""
904 return cls(
905 max_retries=0,
906 min_sleep_secs=0,
907 initial_max_sleep_secs=0,
908 max_sleep_secs=0,
909 max_elapsed_secs=0,
910 )
912 def __post_init__(self) -> None: # validate and compute derived values
913 self._validate_min("max_retries", self.max_retries, 0)
914 self._validate_min("exponential_base", self.exponential_base, 1)
915 self._validate_min("min_sleep_secs", self.min_sleep_secs, 0)
916 self._validate_min("initial_max_sleep_secs", self.initial_max_sleep_secs, 0)
917 self._validate_min("max_sleep_secs", self.max_sleep_secs, 0)
918 self._validate_min("max_elapsed_secs", self.max_elapsed_secs, 0)
919 object.__setattr__(self, "max_elapsed_nanos", int(self.max_elapsed_secs * 1_000_000_000)) # derived value
920 min_sleep_nanos: int = int(self.min_sleep_secs * 1_000_000_000)
921 initial_max_sleep_nanos: int = int(self.initial_max_sleep_secs * 1_000_000_000)
922 max_sleep_nanos: int = int(self.max_sleep_secs * 1_000_000_000)
923 max_sleep_nanos = max(min_sleep_nanos, max_sleep_nanos)
924 initial_max_sleep_nanos = min(max_sleep_nanos, max(min_sleep_nanos, initial_max_sleep_nanos))
925 object.__setattr__(self, "min_sleep_nanos", min_sleep_nanos) # derived value
926 object.__setattr__(self, "initial_max_sleep_nanos", initial_max_sleep_nanos) # derived value
927 object.__setattr__(self, "max_sleep_nanos", max_sleep_nanos) # derived value
928 self._validate_min("max_previous_outcomes", self.max_previous_outcomes, 0)
929 assert 0 <= self.min_sleep_nanos <= self.initial_max_sleep_nanos <= self.max_sleep_nanos
930 if not isinstance(self.reraise, bool):
931 raise TypeError(f"{type(self).__name__}.reraise must be bool")
933 def _validate_min(self, attr_name: str, value: float, minimum: float) -> None:
934 if value < minimum:
935 raise ValueError(f"Invalid {type(self).__name__}.{attr_name}: must be >= {minimum} but got {value}")
937 def copy(self, **override_kwargs: Any) -> RetryPolicy:
938 """Creates a new policy copying an existing one with the specified fields overridden for customization; thread-safe.
940 Example usage: policy = retry_policy.copy(max_sleep_secs=2, max_elapsed_secs=10)
941 """
942 return dataclasses.replace(self, **override_kwargs)
945#############################################################################
946def _fn_not_implemented(_retry: Retry) -> NoReturn:
947 """Default implementation of ``fn`` callback for RetryTemplate; always raises."""
948 raise NotImplementedError("Provide fn when calling RetryTemplate")
951NO_LOGGER: Final[logging.Logger] = logging.Logger("NULL") # noqa: LOG001 do not register dummy logger with Logger.manager
952NO_LOGGER.addHandler(logging.NullHandler()) # prevents lastResort fallback
953NO_LOGGER.disabled = True
954NO_LOGGER.propagate = False
955_R = TypeVar("_R")
958@dataclass(frozen=True)
959@final
960class RetryTemplate(Generic[_T]):
961 """Convenience class that aggregates all knobs for call_with_retries(); and is itself callable too; immutable."""
963 fn: Callable[[Retry], _T] = _fn_not_implemented # set this to make the RetryTemplate object itself callable
964 policy: RetryPolicy = RetryPolicy() # specifies how ``RetryableError`` shall be retried
965 backoff: BackoffStrategy = full_jitter_backoff_strategy # computes delay time before next retry attempt, after failure
966 giveup: Callable[[AttemptOutcome], object | None] = no_giveup # stop retrying based on domain-specific logic, e.g. time
967 before_attempt: Callable[[Retry], int] = before_attempt_noop # e.g. wait due to rate limiting or internal backpressure
968 after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure # e.g. record metrics and/or custom logging
969 on_retryable_error: Callable[[AttemptOutcome], None] = noop # e.g. count failures (RetryableError) caught by retry loop
970 on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise # raise error or return fallback value
971 log: logging.Logger | None = None # set this to ``None`` to disable logging
973 def copy(self, **override_kwargs: Any) -> RetryTemplate[_T]:
974 """Creates a new object copying an existing one with the specified fields overridden for customization; thread-safe.
976 Example usage: retry_template.copy(policy=policy.copy(max_sleep_secs=2, max_elapsed_secs=10), log=None)
977 """
978 return dataclasses.replace(self, **override_kwargs)
980 def __call__(self) -> _T:
981 """Invokes ``self.fn`` via the call_with_retries() retry loop using the stored parameters; thread-safe.
983 Example Usage: result: str = retry_template.copy(fn=...)()
984 """
985 return call_with_retries(
986 fn=self.fn,
987 policy=self.policy,
988 backoff=self.backoff,
989 giveup=self.giveup,
990 before_attempt=self.before_attempt,
991 after_attempt=self.after_attempt,
992 on_retryable_error=self.on_retryable_error,
993 on_exhaustion=self.on_exhaustion,
994 log=self.log,
995 )
997 def call_with_retries(
998 self,
999 fn: Callable[[Retry], _R],
1000 policy: RetryPolicy | None = None,
1001 *,
1002 backoff: BackoffStrategy | None = None,
1003 giveup: Callable[[AttemptOutcome], object | None] | None = None,
1004 before_attempt: Callable[[Retry], int] | None = None,
1005 after_attempt: Callable[[AttemptOutcome], None] | None = None,
1006 on_retryable_error: Callable[[AttemptOutcome], None] | None = None,
1007 on_exhaustion: Callable[[AttemptOutcome], _R] | None = None,
1008 log: logging.Logger | None = None, # pass NO_LOGGER to override template logger and disable logging for this call
1009 ) -> _R:
1010 """Invokes ``fn`` via the call_with_retries() retry loop using the stored or overridden params; thread-safe.
1012 Example Usage: result: str = retry_template.call_with_retries(fn=...)
1013 """
1014 return call_with_retries(
1015 fn=fn,
1016 policy=self.policy if policy is None else policy,
1017 backoff=self.backoff if backoff is None else backoff,
1018 giveup=self.giveup if giveup is None else giveup,
1019 before_attempt=self.before_attempt if before_attempt is None else before_attempt,
1020 after_attempt=self.after_attempt if after_attempt is None else after_attempt,
1021 on_retryable_error=self.on_retryable_error if on_retryable_error is None else on_retryable_error,
1022 on_exhaustion=(
1023 cast(Callable[[AttemptOutcome], _R], self.on_exhaustion) if on_exhaustion is None else on_exhaustion
1024 ),
1025 log=None if log is NO_LOGGER else self.log if log is None else log,
1026 )
1028 async def call_with_retries_async(
1029 self,
1030 fn: Callable[[Retry], Awaitable[_R]],
1031 policy: RetryPolicy | None = None,
1032 *,
1033 backoff: BackoffStrategy | None = None,
1034 giveup: Callable[[AttemptOutcome], object | None] | None = None,
1035 before_attempt: Callable[[Retry], int] | None = None,
1036 after_attempt: Callable[[AttemptOutcome], None] | None = None,
1037 on_retryable_error: Callable[[AttemptOutcome], None] | None = None,
1038 on_exhaustion: Callable[[AttemptOutcome], _R] | None = None,
1039 log: logging.Logger | None = None, # pass NO_LOGGER to override template logger and disable logging for this call
1040 ) -> _R:
1041 """Invokes ``fn`` via the call_with_retries_async() retry loop using the stored or overridden params; thread-safe.
1043 Example Usage: result: str = await retry_template.call_with_retries_async(fn=...)
1044 """
1045 return await call_with_retries_async(
1046 fn=fn,
1047 policy=self.policy if policy is None else policy,
1048 backoff=self.backoff if backoff is None else backoff,
1049 giveup=self.giveup if giveup is None else giveup,
1050 before_attempt=self.before_attempt if before_attempt is None else before_attempt,
1051 after_attempt=self.after_attempt if after_attempt is None else after_attempt,
1052 on_retryable_error=self.on_retryable_error if on_retryable_error is None else on_retryable_error,
1053 on_exhaustion=(
1054 cast(Callable[[AttemptOutcome], _R], self.on_exhaustion) if on_exhaustion is None else on_exhaustion
1055 ),
1056 log=None if log is NO_LOGGER else self.log if log is None else log,
1057 )
1059 @overload
1060 def wraps(self, fn: Callable[..., Awaitable[_R]]) -> Callable[..., Awaitable[_R]]: ...
1062 @overload
1063 def wraps(self, fn: Callable[..., _R]) -> Callable[..., _R]: ...
1065 def wraps(self, fn: Callable[..., Any]) -> Callable[..., Any]:
1066 """Returns a wrapper function that forwards all arguments to ``fn`` and retries it using this template; thread-safe.
1068 Example Usage:
1069 def fn(x: int) -> int:
1070 return x * 2
1071 func: Callable[[int], int] = retry_template.wraps(fn)
1072 y: int = func(5) # returns 10
1073 """
1074 import functools
1075 import inspect
1077 target_fn: Callable[..., Any] = fn.func if isinstance(fn, functools.partial) else fn
1078 if inspect.iscoroutinefunction(target_fn) or inspect.iscoroutinefunction(type(target_fn).__call__):
1080 @functools.wraps(fn)
1081 async def wrapped_async(*args: Any, **kwargs: Any) -> Any:
1082 return await self.call_with_retries_async(fn=lambda _retry: fn(*args, **kwargs))
1084 return wrapped_async
1085 else:
1087 @functools.wraps(fn)
1088 def wrapped(*args: Any, **kwargs: Any) -> Any:
1089 return self.call_with_retries(fn=lambda _retry: fn(*args, **kwargs))
1091 return wrapped
1094#############################################################################
1095def raise_retryable_error_from(
1096 exc: BaseException,
1097 *,
1098 display_msg: object = None,
1099 retry_immediately_once: bool = False,
1100 category: object = None,
1101 attachment: object = None,
1102) -> NoReturn:
1103 """Convenience function that raises a generic RetryableError that wraps the given underlying exception."""
1104 raise RetryableError(
1105 display_msg=type(exc).__name__ if display_msg is None else display_msg,
1106 retry_immediately_once=retry_immediately_once,
1107 category=category,
1108 attachment=attachment,
1109 ) from exc
1112ExceptionPredicate = Union[bool, Callable[[BaseException], bool]] # Type alias
1115def call_with_exception_handlers(
1116 fn: Callable[[], _T], # typically a lambda
1117 *,
1118 continue_scanning_if_no_predicate_matches: bool = False,
1119 handlers: Mapping[type[BaseException], Sequence[tuple[ExceptionPredicate, Callable[[BaseException], _T]]]],
1120) -> _T:
1121 """Convenience function that calls ``fn`` and returns its result; on exception runs the first matching handler in a per-
1122 exception handler chain; composes independent handlers via predicates into one function, in Event-Predicate-Action style.
1124 Lookup uses the exception type's Method Resolution Order (most-specific class in the exception class hierarchy wins). For
1125 the first class that exists as a key in ``handlers``, its chain is scanned in order. Each chain element is
1126 ``(predicate, handler)`` where ``predicate`` is either ``True`` (always matches), ``False`` (disabled), or
1127 ``predicate(exc) -> bool``. The first matching handler is called with the exception and its return value is returned. If
1128 no predicate matches then, by default, the original exception is re-raised and no less-specific handler chains are
1129 consulted. Set ``continue_scanning_if_no_predicate_matches=True`` to continue scanning exception base classes instead.
1131 Typically (but not necessarily) the handler raises a ``RetryableError``, via ``raise_retryable_error_from`` or similar.
1132 Or it may raise another exception type (which will not be retried), or even return a fallback value instead of raising.
1134 Example: turn transient ssh/zfs command failures into RetryableError for call_with_retries(), including feature flags:
1136 def run_remote(retry: Retry) -> str:
1137 p = subprocess.run(["ssh", "foo.example.com", "zfs", "list", "-H"], text=True, capture_output=True, check=True)
1138 return p.stdout
1140 def fn(retry: Retry) -> str:
1141 return call_with_exception_handlers(
1142 fn=lambda: run_remote(retry),
1143 handlers={
1144 TimeoutError: [(True, raise_retryable_error_from)],
1145 ConnectionResetError: [(True, lambda exc: raise_retryable_error_from(exc, display_msg="ssh reset"))],
1146 subprocess.CalledProcessError: [
1147 (lambda exc: exc.returncode == 255, lambda exc: raise_retryable_error_from(exc, display_msg="ssh error")),
1148 (lambda exc: "cannot receive" in (exc.stderr or ""), lambda exc: raise_retryable_error_from(exc, display_msg="zfs recv")),
1149 ],
1150 OSError: [
1151 (lambda exc: getattr(exc, "errno", None) in {errno.ETIMEDOUT, errno.EHOSTUNREACH},
1152 lambda exc: raise_retryable_error_from(exc, display_msg=f"network: {exc}")),
1153 (False, lambda exc: raise_retryable_error_from(exc, display_msg="disabled handler example")),
1154 ],
1155 },
1156 )
1158 stdout: str = call_with_retries(fn=fn, policy=RetryPolicy(max_retries=3))
1160 Example: return a fallback value (no retry loop required):
1162 def read_optional_file(path: str) -> str:
1163 return call_with_exception_handlers(
1164 fn=lambda: open(path, encoding="utf-8").read(),
1165 handlers={FileNotFoundError: [(True, lambda _exc: "")]},
1166 )
1167 """
1168 try:
1169 return fn()
1170 except BaseException as exc:
1171 for cls in type(exc).__mro__:
1172 handler_chain = handlers.get(cls)
1173 if handler_chain is not None:
1174 for predicate, handler in handler_chain:
1175 if predicate is True or (predicate is not False and predicate(exc)):
1176 return handler(exc)
1177 if not continue_scanning_if_no_predicate_matches:
1178 raise
1179 raise
1182#############################################################################
1183@final
1184class _ThreadLocalRNG(threading.local):
1185 """Caches a per-thread random number generator."""
1187 def __init__(self) -> None:
1188 self.rng: random.Random | None = None
1191_THREAD_LOCAL_RNG: Final[_ThreadLocalRNG] = _ThreadLocalRNG()
1194def _thread_local_rng() -> random.Random:
1195 """Returns a per-thread RNG for backoff jitter; for perf avoids locking and initializing a new random.Random() at high
1196 frequency."""
1197 threadlocal: _ThreadLocalRNG = _THREAD_LOCAL_RNG
1198 rng: random.Random | None = threadlocal.rng
1199 if rng is None:
1200 rng = random.Random() # noqa: S311 jitter isn't security sensitive, and random.SystemRandom.randint() is slow
1201 threadlocal.rng = rng
1202 return rng