Coverage for bzfs_main/util/retry.py: 100%

513 statements  

coverage.py v7.13.5, created at 2026-04-29 12:49 +0000

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Customizable generic retry framework; defaults to jittered exponential backoff with cap unless specified otherwise.

Purpose:
--------
- Provide a reusable retry helper for transient failures using customizable policy and callbacks.
- Centralize backoff, jitter, logging and metrics behavior while keeping call sites compact.
- Prevent accidental retries: the loop retries only when the developer explicitly raises a ``RetryableError``, which reduces
  the risk of retrying non-idempotent operations.
- Provide a thread-safe, fast implementation; avoid shared RNG contention.
- Provide both sync and async APIs with the same semantics, except that the async API awaits ``fn`` and uses non-blocking
  sleep.
- Avoid unnecessary complexity and add zero dependencies beyond the Python standard library. Everything you need is in this
  single Python file.

Usage:
------
- Wrap work in a callable ``fn(retry: Retry)`` and therein raise ``RetryableError`` for failures that should be retried.
- Construct a policy via ``RetryPolicy(...)`` that specifies how ``RetryableError`` shall be retried.
- Invoke ``call_with_retries(fn=fn, policy=policy, log=logger)`` or ``call_with_retries_async`` with a standard
  logging.Logger.
- On success, the result of calling ``fn`` is returned.
- By default on exhaustion, call_with_retries* either re-raises the last underlying ``RetryableError.__cause__``, or raises
  ``RetryError`` (wrapping the last ``RetryableError``), like so:
  - If ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last
    ``RetryableError.__cause__`` with its original traceback.
  - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain).
  - The default is ``RetryPolicy.reraise=True``.
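The exhaustion decision above can be sketched stand-alone. This is an illustrative sketch, not the library's code path:
``RetryableError`` / ``RetryError`` here are local stand-ins, and ``on_exhaustion`` condenses the decision made by the
``on_exhaustion_raise`` default defined later in this file:

```python
class RetryableError(Exception):  # local stand-in for illustration
    pass


class RetryError(Exception):  # local stand-in for illustration
    pass


def on_exhaustion(retryable_error: RetryableError, reraise: bool) -> None:
    # With reraise=True and a chained cause, the original exception resurfaces
    # with its traceback (retry-transparent); otherwise RetryError wraps it.
    cause = retryable_error.__cause__
    if reraise and cause is not None:
        raise cause.with_traceback(cause.__traceback__)
    raise RetryError() from retryable_error
```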


Advanced Configuration:
-----------------------
- Tune ``RetryPolicy`` parameters to control maximum retries, sleep bounds, elapsed-time budget, logging, etc.
- Use ``RetryPolicy.config: RetryConfig`` to control logging settings.
- Set ``log=None`` to disable logging, or customize ``info_loglevel`` / ``warning_loglevel`` for structured logs.
- Supply a ``giveup(AttemptOutcome)`` callback to stop retrying based on domain-specific logic (for example, decisions based
  on a time budget/quota or on the N most recent AttemptOutcome objects, via ``AttemptOutcome.retry.previous_outcomes``).
- Use the ``any_giveup()`` / ``all_giveup()`` helpers to consult more than one callback handler in ``giveup(AttemptOutcome)``.
- Supply an ``on_exhaustion(AttemptOutcome)`` callback to customize behavior when giving up; it may raise an error or return
  a fallback value.
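For illustration, the following self-contained sketch composes two ``giveup`` handlers in the same short-circuit fashion as
the ``any_giveup`` helper defined later in this file; the handler names and the 30-second budget are hypothetical, and a
duck-typed stand-in takes the place of ``AttemptOutcome``:

```python
from typing import Callable, Iterable, Optional


def any_giveup(handlers: Iterable[Callable[[object], Optional[object]]]) -> Callable[[object], Optional[object]]:
    handlers = tuple(handlers)

    def _giveup(outcome: object) -> Optional[object]:
        for handler in handlers:
            reason = handler(outcome)
            if reason is not None:
                return reason  # short-circuit: first non-None reason wins
        return None  # don't give up retrying

    return _giveup


def budget_exceeded(outcome) -> Optional[str]:
    # reads the elapsed_nanos attribute, as AttemptOutcome provides; 30s budget is illustrative
    return "time budget spent" if outcome.elapsed_nanos > 30_000_000_000 else None


def too_many_server_errors(outcome) -> Optional[str]:
    return None  # placeholder for domain-specific logic


giveup = any_giveup([budget_exceeded, too_many_server_errors])
```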


Observability:
--------------
- Supply an ``after_attempt(AttemptOutcome)`` callback to collect per-attempt metrics such as success flag,
  exhausted/terminated state, attempt number, total elapsed duration (in nanoseconds), sleep duration (in nanoseconds), etc.
- ``AttemptOutcome.result`` is either the successful result or the most recent ``RetryableError``, enabling integration with
  metrics and tracing systems without coupling the retry loop to any specific backend.
- Supply an ``after_attempt(AttemptOutcome)`` callback to fully customize logging, if necessary.
- Use the ``multi_after_attempt()`` helper to invoke more than one callback handler in ``after_attempt(AttemptOutcome)``.
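As a minimal sketch of such metrics collection: ``MetricsRecorder`` is a hypothetical name, the composition mirrors the
``multi_after_attempt`` helper defined later in this file, and the attributes read (``is_success``, ``elapsed_nanos``) do
exist on ``AttemptOutcome``:

```python
from typing import Callable, Iterable


def multi_after_attempt(handlers: Iterable[Callable[[object], None]]) -> Callable[[object], None]:
    handlers = tuple(handlers)

    def _after_attempt(outcome: object) -> None:
        for handler in handlers:  # invoke each handler in order
            handler(outcome)

    return _after_attempt


class MetricsRecorder:
    """Hypothetical per-attempt metrics sink; append-only for simplicity."""

    def __init__(self) -> None:
        self.attempts: list[dict] = []

    def __call__(self, outcome) -> None:
        self.attempts.append({"success": outcome.is_success, "elapsed_nanos": outcome.elapsed_nanos})


recorder = MetricsRecorder()
# e.g. after_attempt=multi_after_attempt([recorder, after_attempt_log_failure])
```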


Expert Configuration:
---------------------
- Supply a ``backoff(BackoffContext)`` callback to plug in a custom backoff algorithm (e.g., decorrelated jitter or
  retry-after for HTTP 429). The default is full-jitter exponential backoff with cap (the industry standard).
- Supply a ``before_attempt(Retry)`` callback for optional rate limiting or other forms of internal backpressure.
- Supply an ``on_retryable_error(AttemptOutcome)`` callback, e.g. to count failures (RetryableError) caught by the retry loop.
- Set ``RetryPolicy.max_previous_outcomes > 0`` to pass the N most recent AttemptOutcome objects to callbacks (default is 0).
- If ``RetryPolicy.max_previous_outcomes > 0``, you can use ``RetryableError(..., attachment=...)`` to carry domain-specific
  state from a failed attempt to the next attempt via ``retry.previous_outcomes``. This pattern helps if attempt N+1 is a
  function of attempt N or of all prior attempts (e.g., switching endpoints or resuming from an offset).
- Use ``RetryTemplate`` as a 'bag of knobs' configuration template for functions that shall be retried in similar ways.
- Or package up all knobs plus a ``fn(retry: Retry)`` function into a self-contained auto-retrying higher-level function by
  constructing a ``RetryTemplate`` object (which is itself a ``Callable``).
- To keep calling code retry-transparent, set ``RetryPolicy.reraise=True`` (the default) *and* raise retryable failures as
  ``raise RetryableError(...) from exc``. Client code then won't notice whether call_with_retries* is used or not.
- To make exhaustion observable to calling code, set ``RetryPolicy.reraise=False``: by default call_with_retries* then
  always raises ``RetryError`` (wrapping the last ``RetryableError``) on exhaustion, so callers can catch ``RetryError``
  and inspect the last underlying exception via ``err.outcome``, ``err.__cause__``, and even ``err.__cause__.__cause__``
  when present.
- Set ``RetryPolicy.timing`` to customize reading the current monotonic time, sleeping, and optional async termination.
- The callback API is powerful enough to easily plug in advanced retry algorithms such as:
  - Google SRE Client-Side Adaptive Throttling - https://sre.google/sre-book/handling-overload/
  - gRPC retry throttling - https://grpc.io/docs/guides/retry/
  - AWS SDK adaptive retry mode - https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
  - Circuit breakers - https://martinfowler.com/bliki/CircuitBreaker.html (e.g. via the `pybreaker` third-party library)
  - Rate limiting with Fixed Window, Moving Window, and Sliding Windows (e.g. via the `limits` third-party library)
  - Various example backoff strategies, such as decorrelated jitter or honoring retry-after on HTTP 429 "Too Many
    Requests" responses, can be found in test_retry_examples.py.
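As a sketch of one such custom backoff, decorrelated jitter draws each delay from ``[min_sleep, prev_sleep * 3]`` with a
cap. The two-tuple return contract (sleep duration, carried-over state) matches what a ``backoff(BackoffContext)`` callback
must produce, while the function below is a standalone illustration with hypothetical constants:

```python
import random


def decorrelated_jitter(prev_sleep_nanos: int, min_sleep_nanos: int, max_sleep_nanos: int,
                        rng: random.Random) -> tuple[int, int]:
    # Draw the next delay from [min_sleep, prev_sleep * 3], then cap it.
    lo = min_sleep_nanos
    hi = max(lo, prev_sleep_nanos * 3)
    sleep_nanos = min(max_sleep_nanos, rng.randint(lo, hi))
    return sleep_nanos, sleep_nanos  # next attempt's state is this attempt's sleep
```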


Example Usage:
--------------
    import logging
    from bzfs_main.util.retry import Retry, RetryPolicy, RetryableError, call_with_retries

    def unreliable_operation(retry: Retry) -> str:
        try:
            if retry.count < 3:
                raise ValueError("temporary failure connecting to foo.example.com")
            return "ok"
        except ValueError as exc:
            # Preserve the underlying cause for correct error propagation and logging
            raise RetryableError(display_msg="connect") from exc

    retry_policy = RetryPolicy(
        max_retries=10,
        min_sleep_secs=0,
        initial_max_sleep_secs=0.125,
        max_sleep_secs=10,
        max_elapsed_secs=60,
    )
    log = logging.getLogger(__name__)
    result: str = call_with_retries(fn=unreliable_operation, policy=retry_policy, log=log)
    print(result)

    # Sample log output:
    # INFO:Retrying connect [1/10] in 8.79ms ...
    # INFO:Retrying connect [2/10] in 90.1ms ...
    # INFO:Retrying connect [3/10] in 372ms ...
    # ok


Background:
-----------
For background on exponential backoff and jitter, see for example
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter
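The capped doubling of per-attempt sleep ceilings that full jitter draws from can be reproduced in a few lines; the
constants mirror the Example Usage policy above:

```python
# Reproduce the sequence of sleep ceilings: 125ms -> 250ms -> ... -> 8s -> 10s -> 10s
cap_nanos = 10_000_000_000  # max_sleep_secs=10
curr_max = 125_000_000      # initial_max_sleep_secs=0.125
ceilings = []
for _ in range(9):
    ceilings.append(curr_max)
    curr_max = min(cap_nanos, curr_max * 2)  # exponential backoff with cap
```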

"""


from __future__ import (
    annotations,
)
import argparse
import dataclasses
import logging
import random
import threading
import time
from collections.abc import (
    Awaitable,
    Iterable,
    Mapping,
    Sequence,
)
from dataclasses import (
    dataclass,
)
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Final,
    Generic,
    NamedTuple,
    NoReturn,
    TypeVar,
    Union,
    cast,
    final,
    overload,
)

if TYPE_CHECKING:
    import asyncio

from bzfs_main.util.utils import (
    human_readable_duration,
)

# constants:
INFINITY_MAX_RETRIES: Final[int] = 2**90 - 1  # a number that's essentially infinity for all practical retry purposes


#############################################################################
def full_jitter_backoff_strategy(context: BackoffContext) -> tuple[int, int]:
    """Default implementation of ``backoff`` callback for call_with_retries(); computes delay time before next retry attempt,
    after failure.

    Full-jitter picks a random sleep_nanos duration from the range [min_sleep_nanos, curr_max_sleep_nanos] and applies
    exponential backoff with cap to the next attempt; thread-safe. Typically, min_sleep_nanos is 0 and exponential_base is 2.
    Example curr_max_sleep_nanos sequence: 125ms --> 250ms --> 500ms --> 1s --> 2s --> 4s --> 8s --> 10s --> 10s...
    Full-jitter provides an optimal balance between reducing server load and minimizing retry latency.
    """
    policy: RetryPolicy = context.retry.policy
    curr_max_sleep_nanos: int = context.curr_max_sleep_nanos
    if policy.min_sleep_nanos == curr_max_sleep_nanos:
        sleep_nanos = curr_max_sleep_nanos  # perf
    else:
        sleep_nanos = context.rng.randint(policy.min_sleep_nanos, curr_max_sleep_nanos)  # nanos to delay until next attempt
    curr_max_sleep_nanos = round(curr_max_sleep_nanos * policy.exponential_base)  # exponential backoff
    curr_max_sleep_nanos = min(curr_max_sleep_nanos, policy.max_sleep_nanos)  # ... with cap for next attempt
    return sleep_nanos, curr_max_sleep_nanos


def no_giveup(outcome: AttemptOutcome) -> object | None:
    """Default implementation of ``giveup`` callback for call_with_retries(); never gives up; returning anything other than
    ``None`` indicates to give up retrying; thread-safe."""
    return None  # don't give up retrying


def before_attempt_noop(retry: Retry) -> int:
    """Default implementation of ``before_attempt`` callback for call_with_retries(); does nothing; thread-safe."""
    return 0


def after_attempt_log_failure(outcome: AttemptOutcome) -> None:
    """Default implementation of ``after_attempt`` callback for call_with_retries(); performs simple logging of retry attempt
    failures; thread-safe."""
    retry: Retry = outcome.retry
    policy: RetryPolicy = retry.policy
    config: RetryConfig = policy.config
    if outcome.is_success or retry.log is None or not config.enable_logging:
        return
    log: logging.Logger = retry.log
    assert isinstance(outcome.result, RetryableError)
    retryable_error: RetryableError = outcome.result
    if not outcome.is_exhausted:
        if log.isEnabledFor(config.info_loglevel):  # Retrying X in Y ms ...
            m1: str = config.format_msg(config.display_msg, retryable_error)
            m2: str = config.format_pair(retry.count + 1, policy.max_retries)
            m3: str = config.format_duration(outcome.sleep_nanos)
            log.log(config.info_loglevel, "%s", f"{m1}{m2} in {m3}{config.dots}", extra=config.extra)
    else:
        if policy.max_retries > 0 and log.isEnabledFor(config.warning_loglevel) and not outcome.is_terminated:
            reason: str = "" if outcome.giveup_reason is None else f"{outcome.giveup_reason}; "
            format_duration: Callable[[int], str] = config.format_duration  # lambda: nanos
            log.log(
                config.warning_loglevel,
                "%s",
                f"{config.format_msg(config.display_msg, retryable_error)}"
                f"exhausted; giving up because {reason}the last "
                f"{config.format_pair(retry.count, policy.max_retries)} retries across "
                f"{config.format_pair(format_duration(outcome.elapsed_nanos), format_duration(policy.max_elapsed_nanos))} "
                "failed",
                exc_info=retryable_error if config.exc_info else None,
                stack_info=config.stack_info,
                extra=config.extra,
            )


def noop(outcome: AttemptOutcome) -> None:
    """Default implementation of ``on_retryable_error`` callback for call_with_retries(); does nothing; thread-safe."""


def on_exhaustion_raise(outcome: AttemptOutcome) -> NoReturn:
    """Default implementation of ``on_exhaustion`` callback for call_with_retries(); always raises; thread-safe."""
    assert outcome.is_exhausted
    assert isinstance(outcome.result, RetryableError)
    retryable_error: RetryableError = outcome.result
    policy: RetryPolicy = outcome.retry.policy
    cause: BaseException | None = retryable_error.__cause__
    if policy.reraise and cause is not None:
        raise cause.with_traceback(cause.__traceback__)
    raise RetryError(outcome=outcome) from retryable_error


def _update_previous_outcomes(
    previous_outcomes: tuple[AttemptOutcome, ...], outcome: AttemptOutcome, policy: RetryPolicy, retry: Retry
) -> tuple[AttemptOutcome, ...]:
    """Computes value of previous_outcomes for next retry iteration."""
    n: int = policy.max_previous_outcomes
    if n > 0:  # outcome will be passed to next attempt via Retry.previous_outcomes
        if previous_outcomes:  # detach to reduce memory footprint
            outcome = outcome.copy(retry=retry.copy(previous_outcomes=()))
        previous_outcomes = previous_outcomes[len(previous_outcomes) - n + 1 :] + (outcome,)  # immutable deque
    return previous_outcomes


#############################################################################
_T = TypeVar("_T")


def call_with_retries(
    fn: Callable[[Retry], _T],  # typically a lambda; wraps work and raises RetryableError for failures that shall be retried
    policy: RetryPolicy,  # specifies how ``RetryableError`` shall be retried
    *,
    backoff: BackoffStrategy = full_jitter_backoff_strategy,  # computes delay time before next retry attempt, after failure
    giveup: Callable[[AttemptOutcome], object | None] = no_giveup,  # stop retrying based on domain-specific logic, e.g. time
    before_attempt: Callable[[Retry], int] = before_attempt_noop,  # e.g. wait due to rate limiting or internal backpressure
    after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure,  # e.g. record metrics and/or custom logging
    on_retryable_error: Callable[[AttemptOutcome], None] = noop,  # e.g. count failures (RetryableError) caught by retry loop
    on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise,  # raise error or return fallback value
    log: logging.Logger | None = None,  # set this to ``None`` to disable logging
) -> _T:
    """Runs the function ``fn`` and returns its result; retries on failure as indicated by policy; thread-safe.

    By default on exhaustion, call_with_retries() either re-raises the last underlying ``RetryableError.__cause__``, or
    raises ``RetryError`` (wrapping the last ``RetryableError``), like so:
    - If ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last
      ``RetryableError.__cause__`` with its original traceback.
    - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain).
    - The default is ``RetryPolicy.reraise=True``.

    On the exhaustion path, ``on_exhaustion`` will be called exactly once (after the final after_attempt). The default
    implementation raises as described above; custom ``on_exhaustion`` implementations may return a fallback value instead
    of raising an error.
    """
    rng: random.Random | None = None
    retry_count: int = 0
    idle_nanos: int = 0
    curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos
    previous_outcomes: tuple[AttemptOutcome, ...] = ()  # for safety pass *immutable* deque to callbacks
    timing: RetryTiming = policy.timing
    sleep: Callable[[int, Retry], None] = timing.sleep
    monotonic_ns: Callable[[], int] = timing.monotonic_ns
    call_start_nanos: Final[int] = monotonic_ns()
    while True:
        before_attempt_nanos: int = monotonic_ns() if retry_count != 0 else call_start_nanos
        prev: tuple[AttemptOutcome, ...] = previous_outcomes
        retry: Retry = Retry(
            retry_count, call_start_nanos, before_attempt_nanos, before_attempt_nanos, idle_nanos, policy, log, prev
        )
        try:
            if before_attempt is not before_attempt_noop:
                before_attempt_sleep_nanos: int = before_attempt(retry)
                assert before_attempt_sleep_nanos >= 0, before_attempt_sleep_nanos
                if before_attempt_sleep_nanos > 0:
                    sleep(before_attempt_sleep_nanos, retry)  # e.g. wait due to rate limiting or internal backpressure
                attempt_start_nanos: int = monotonic_ns()
                idle_nanos += attempt_start_nanos - before_attempt_nanos
                retry = Retry(
                    retry_count, call_start_nanos, before_attempt_nanos, attempt_start_nanos, idle_nanos, policy, log, prev
                )
            timing.on_before_attempt(retry)
            result: _T = fn(retry)  # Call the target function and supply retry attempt number and other metadata
            if after_attempt is not after_attempt_log_failure:
                elapsed_nanos: int = monotonic_ns() - call_start_nanos
                outcome: AttemptOutcome = AttemptOutcome(retry, True, False, False, None, elapsed_nanos, 0, result)
                after_attempt(outcome)
            return result
        except RetryableError as retryable_error:
            elapsed_nanos = monotonic_ns() - call_start_nanos
            is_terminated: Callable[[Retry], bool] = timing.is_terminated
            giveup_reason: object | None = None
            sleep_nanos: int = 0
            outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
            on_retryable_error(outcome)  # e.g. count failures (RetryableError) caught by retry loop
            if retry_count < policy.max_retries and elapsed_nanos < policy.max_elapsed_nanos:
                if policy.max_sleep_nanos == 0 and backoff is full_jitter_backoff_strategy:
                    pass  # perf: e.g. spin-before-block
                elif retry_count == 0 and retryable_error.retry_immediately_once:
                    pass  # retry once immediately without backoff
                else:  # jitter: default backoff strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos]
                    rng = _thread_local_rng() if rng is None else rng
                    sleep_nanos, curr_max_sleep_nanos = backoff(  # compute delay before next retry attempt, after failure
                        BackoffContext(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error)
                    )
                    assert sleep_nanos >= 0 and curr_max_sleep_nanos >= 0, sleep_nanos

                if sleep_nanos > 0:
                    outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
                if (not is_terminated(retry)) and (giveup_reason := giveup(outcome)) is None:
                    after_attempt(outcome)
                    sleep(sleep_nanos, retry)
                    idle_nanos += sleep_nanos
                    if not is_terminated(retry):
                        previous_outcomes = _update_previous_outcomes(previous_outcomes, outcome, policy, retry)
                        del outcome  # help gc
                        retry_count += 1
                        continue  # continue retry loop with next attempt
            else:
                sleep_nanos = 0
            outcome = AttemptOutcome(
                retry, False, True, is_terminated(retry), giveup_reason, elapsed_nanos, sleep_nanos, retryable_error
            )
            after_attempt(outcome)
            return on_exhaustion(outcome)  # raise error or return fallback value


async def call_with_retries_async(
    fn: Callable[[Retry], Awaitable[_T]],  # wraps work and raises RetryableError for failures that shall be retried
    policy: RetryPolicy,  # specifies how ``RetryableError`` shall be retried
    *,
    backoff: BackoffStrategy = full_jitter_backoff_strategy,  # computes delay time before next retry attempt, after failure
    giveup: Callable[[AttemptOutcome], object | None] = no_giveup,  # stop retrying based on domain-specific logic, e.g. time
    before_attempt: Callable[[Retry], int] = before_attempt_noop,  # e.g. wait due to rate limiting or internal backpressure
    after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure,  # e.g. record metrics and/or custom logging
    on_retryable_error: Callable[[AttemptOutcome], None] = noop,  # e.g. count failures (RetryableError) caught by retry loop
    on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise,  # raise error or return fallback value
    log: logging.Logger | None = None,  # set this to ``None`` to disable logging
) -> _T:
    """Async version of call_with_retries() with the same semantics except it awaits ``fn`` and uses non-blocking sleep."""
    rng: random.Random | None = None
    retry_count: int = 0
    idle_nanos: int = 0
    curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos
    previous_outcomes: tuple[AttemptOutcome, ...] = ()  # for safety pass *immutable* deque to callbacks
    timing: RetryTiming = policy.timing
    sleep: Callable[[int, Retry], Awaitable[None]] = timing.sleep_async
    monotonic_ns: Callable[[], int] = timing.monotonic_ns
    call_start_nanos: Final[int] = monotonic_ns()
    while True:
        before_attempt_nanos: int = monotonic_ns() if retry_count != 0 else call_start_nanos
        prev: tuple[AttemptOutcome, ...] = previous_outcomes
        retry: Retry = Retry(
            retry_count, call_start_nanos, before_attempt_nanos, before_attempt_nanos, idle_nanos, policy, log, prev
        )
        try:
            if before_attempt is not before_attempt_noop:
                before_attempt_sleep_nanos: int = before_attempt(retry)
                assert before_attempt_sleep_nanos >= 0, before_attempt_sleep_nanos
                if before_attempt_sleep_nanos > 0:
                    await sleep(before_attempt_sleep_nanos, retry)  # e.g. wait due to rate limiting or internal backpressure
                attempt_start_nanos: int = monotonic_ns()
                idle_nanos += attempt_start_nanos - before_attempt_nanos
                retry = Retry(
                    retry_count, call_start_nanos, before_attempt_nanos, attempt_start_nanos, idle_nanos, policy, log, prev
                )
            timing.on_before_attempt(retry)
            result: _T = await fn(retry)  # Call the target function and supply retry attempt number and other metadata
            if after_attempt is not after_attempt_log_failure:
                elapsed_nanos: int = monotonic_ns() - call_start_nanos
                outcome: AttemptOutcome = AttemptOutcome(retry, True, False, False, None, elapsed_nanos, 0, result)
                after_attempt(outcome)
            return result
        except RetryableError as retryable_error:
            elapsed_nanos = monotonic_ns() - call_start_nanos
            is_terminated: Callable[[Retry], bool] = timing.is_terminated
            giveup_reason: object | None = None
            sleep_nanos: int = 0
            outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
            on_retryable_error(outcome)  # e.g. count failures (RetryableError) caught by retry loop
            if retry_count < policy.max_retries and elapsed_nanos < policy.max_elapsed_nanos:
                if policy.max_sleep_nanos == 0 and backoff is full_jitter_backoff_strategy:
                    pass  # perf: e.g. spin-before-block
                elif retry_count == 0 and retryable_error.retry_immediately_once:
                    pass  # retry once immediately without backoff
                else:  # jitter: default backoff strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos]
                    rng = _thread_local_rng() if rng is None else rng
                    sleep_nanos, curr_max_sleep_nanos = backoff(  # compute delay before next retry attempt, after failure
                        BackoffContext(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error)
                    )
                    assert sleep_nanos >= 0 and curr_max_sleep_nanos >= 0, sleep_nanos

                if sleep_nanos > 0:
                    outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error)
                if (not is_terminated(retry)) and (giveup_reason := giveup(outcome)) is None:
                    after_attempt(outcome)
                    await sleep(sleep_nanos, retry)
                    idle_nanos += sleep_nanos
                    if not is_terminated(retry):
                        previous_outcomes = _update_previous_outcomes(previous_outcomes, outcome, policy, retry)
                        del outcome  # help gc
                        retry_count += 1
                        continue  # continue retry loop with next attempt
            else:
                sleep_nanos = 0
            outcome = AttemptOutcome(
                retry, False, True, is_terminated(retry), giveup_reason, elapsed_nanos, sleep_nanos, retryable_error
            )
            after_attempt(outcome)
            return on_exhaustion(outcome)  # raise error or return fallback value


def multi_after_attempt(handlers: Iterable[Callable[[AttemptOutcome], None]]) -> Callable[[AttemptOutcome], None]:
    """Composes independent ``after_attempt`` handlers into one ``call_with_retries(after_attempt=...)`` callback that
    invokes each handler in order; thread-safe."""
    handlers = tuple(handlers)
    if len(handlers) == 1:
        return handlers[0]  # perf

    def _after_attempt(outcome: AttemptOutcome) -> None:
        for handler in handlers:
            handler(outcome)

    return _after_attempt


def any_giveup(handlers: Iterable[Callable[[AttemptOutcome], object | None]]) -> Callable[[AttemptOutcome], object | None]:
    """Composes independent ``giveup`` handlers into one ``call_with_retries(giveup=...)`` callback that gives up retrying if
    *any* handler gives up, that is, if any handler returns a non-``None`` reason; thread-safe.

    Handlers are evaluated in order and short-circuit: on giving up, the first non-``None`` reason is returned.
    """
    handlers = tuple(handlers)
    if len(handlers) == 1:
        return handlers[0]  # perf

    def _giveup(outcome: AttemptOutcome) -> object | None:
        for handler in handlers:
            giveup_reason: object | None = handler(outcome)
            if giveup_reason is not None:
                return giveup_reason
        return None  # don't give up retrying

    return _giveup


def all_giveup(handlers: Iterable[Callable[[AttemptOutcome], object | None]]) -> Callable[[AttemptOutcome], object | None]:
    """Composes independent ``giveup`` handlers into one ``call_with_retries(giveup=...)`` callback that gives up retrying if
    *all* handlers give up, that is, if all handlers return a non-``None`` reason; thread-safe.

    Handlers are evaluated in order and short-circuit: evaluation stops at the first ``None``; otherwise the last
    non-``None`` reason is returned.
    """
    handlers = tuple(handlers)
    if len(handlers) == 1:
        return handlers[0]  # perf

    def _giveup(outcome: AttemptOutcome) -> object | None:
        giveup_reason: object | None = None
        for handler in handlers:
            giveup_reason = handler(outcome)
            if giveup_reason is None:
                return None  # don't give up retrying
        return giveup_reason

    return _giveup


#############################################################################
class RetryableError(Exception):
    """Indicates that the task that caused the underlying exception can be retried and might eventually succeed;
    ``call_with_retries()`` will pass this exception to callbacks via ``AttemptOutcome.result``; can be subclassed."""

    def __init__(
        self,
        *exc_args: object,  # optional args passed into super().__init__()
        display_msg: object = None,  # for logging
        retry_immediately_once: bool = False,  # retry once immediately without backoff?
        category: object = None,  # optional classification e.g. "CONCURRENCY", "SERVER_ISSUE", "THROTTLING", "TRANSIENT", ...
        attachment: object = None,  # optional domain-specific info passed to next attempt via Retry.previous_outcomes if
        # RetryPolicy.max_previous_outcomes > 0. This helps when retrying is not just 'try again later', but
        # 'try again differently based on what just happened'.
        # Examples: switching network endpoints, adjusting per-attempt timeouts, capping retries by error-class, resuming
        # with a token/offset, maintaining failure history for this invocation of call_with_retries().
        # Example: 'cap retries to 3 for ECONNREFUSED but 12 for ETIMEDOUT' via attachment=collections.Counter
    ) -> None:
        super().__init__(*exc_args)
        self.display_msg: object = display_msg
        self.retry_immediately_once: bool = retry_immediately_once
        self.category: object = category
        self.attachment: object = attachment

    def display_msg_str(self) -> str:
        """Returns the display_msg as a str; for logging."""
        return "" if self.display_msg is None else str(self.display_msg)


#############################################################################
@final
class RetryError(Exception):
    """Indicates that retries have been exhausted; the last RetryableError is in RetryError.__cause__."""

    outcome: Final[AttemptOutcome]
    """Metadata that describes why and how call_with_retries() gave up."""

    def __init__(self, outcome: AttemptOutcome) -> None:
        super().__init__(outcome)
        self.outcome = outcome


#############################################################################
@final
class Retry(NamedTuple):
    """Attempt metadata provided to callback functions; includes the current retry attempt number; immutable."""

    count: int  # type: ignore[assignment]
    """Attempt number; count=0 is the first attempt, count=1 is the second attempt aka first retry."""

    call_start_time_nanos: int
    """Value of time.monotonic_ns() at start of call_with_retries() invocation."""

    before_attempt_start_time_nanos: int
    """Value of time.monotonic_ns() at start of before_attempt() invocation."""

    attempt_start_time_nanos: int
    """Value of time.monotonic_ns() at start of fn() invocation."""

    idle_nanos: int
    """Sum of all before_attempt_sleep_nanos() plus AttemptOutcome.sleep_nanos across this call_with_retries() invocation,
    at the start of fn() invocation."""

    policy: RetryPolicy
    """Policy that was passed into call_with_retries()."""

    log: logging.Logger | None
    """Logger that was passed into call_with_retries()."""

    previous_outcomes: Sequence[AttemptOutcome]
    """History/state of the N=max_previous_outcomes most recent outcomes for the current call_with_retries() invocation."""

    def copy(self, **override_kwargs: Any) -> Retry:
        """Creates a new object copying an existing one with the specified fields overridden for customization."""
        return self._replace(**override_kwargs)

    def before_attempt_sleep_nanos(self) -> int:
        """Returns duration between the start of before_attempt() and the start of the fn() attempt."""
        return self.attempt_start_time_nanos - self.before_attempt_start_time_nanos

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(count={self.count!r}, call_start_time_nanos={self.call_start_time_nanos!r}, "
            f"before_attempt_start_time_nanos={self.before_attempt_start_time_nanos!r}, "
            f"attempt_start_time_nanos={self.attempt_start_time_nanos!r}, idle_nanos={self.idle_nanos!r})"
        )

    def __eq__(self, other: object) -> bool:
        return self is other

    def __hash__(self) -> int:
        return object.__hash__(self)


599 

600############################################################################# 

601@final 

602class AttemptOutcome(NamedTuple): 

603 """Captures per-attempt state for ``after_attempt`` callbacks; immutable.""" 

604 

605 retry: Retry 

606 """Attempt metadata passed into fn(retry).""" 

607 

608 is_success: bool 

609 """False if fn(retry) raised a RetryableError; True otherwise.""" 

610 

611 is_exhausted: bool 

612 """True if the loop is giving up retrying (possibly even due to is_terminated); False otherwise.""" 

613 

614 is_terminated: bool 

615 """True if the termination predicate has become true; False otherwise.""" 

616 

617 giveup_reason: object | None 

618 """Reason returned by giveup(); None means giveup() was not called or decided not to give up.""" 

619 

620 elapsed_nanos: int 

621 """Total duration between the start of call_with_retries() invocation and the end of this fn() attempt.""" 

622 

623 sleep_nanos: int 

624 """Duration of current sleep period.""" 

625 

626 result: RetryableError | object 

627 """Result of fn(retry); a RetryableError on retryable failure, or some other object on success.""" 

628 

629 def attempt_elapsed_nanos(self) -> int: 

630 """Returns duration between the start of this fn() attempt and the end of this fn() attempt.""" 

631 return self.elapsed_nanos + self.retry.call_start_time_nanos - self.retry.attempt_start_time_nanos 

632 

633 def copy(self, **override_kwargs: Any) -> AttemptOutcome: 

634 """Creates a new outcome copying an existing one with the specified fields overridden for customization.""" 

635 return self._replace(**override_kwargs) 

636 

637 def __repr__(self) -> str: 

638 return ( 

639 f"{type(self).__name__}(" 

640 f"retry={self.retry!r}, " 

641 f"is_success={self.is_success!r}, " 

642 f"is_exhausted={self.is_exhausted!r}, " 

643 f"is_terminated={self.is_terminated!r}, " 

644 f"giveup_reason={self.giveup_reason!r}, " 

645 f"elapsed_nanos={self.elapsed_nanos!r}, " 

646 f"sleep_nanos={self.sleep_nanos!r})" 

647 ) 

648 

649 def __eq__(self, other: object) -> bool: 

650 return self is other 

651 

652 def __hash__(self) -> int: 

653 return object.__hash__(self) 

654 

655 

656############################################################################# 

657@final 

658class BackoffContext(NamedTuple): 

659 """Captures per-backoff state for ``backoff`` callbacks.""" 

660 

661 retry: Retry 

662 """Attempt metadata passed into fn(retry).""" 

663 

664 curr_max_sleep_nanos: int 

665 """Current maximum duration (in nanoseconds) to sleep before the next retry attempt; 

666 Typically: ``RetryPolicy.initial_max_sleep_nanos <= curr_max_sleep_nanos <= RetryPolicy.max_sleep_nanos``.""" 

667 

668 rng: random.Random 

669 """Thread-local random number generator instance.""" 

670 

671 elapsed_nanos: int 

672 """Total duration between the start of call_with_retries() invocation and the end of this fn() attempt.""" 

673 

674 retryable_error: RetryableError 

675 """Result of failed fn(retry) attempt.""" 

676 

677 def copy(self, **override_kwargs: Any) -> BackoffContext: 

678 """Creates a new object copying an existing one with the specified fields overridden for customization.""" 

679 return self._replace(**override_kwargs) 

680 

681 def __repr__(self) -> str: 

682 return ( 

683 f"{type(self).__name__}(" 

684 f"retry={self.retry!r}, " 

685 f"curr_max_sleep_nanos={self.curr_max_sleep_nanos!r}, " 

686 f"elapsed_nanos={self.elapsed_nanos!r})" 

687 ) 

688 

689 def __eq__(self, other: object) -> bool: 

690 return self is other 

691 

692 def __hash__(self) -> int: 

693 return object.__hash__(self) 

694 

695 

696BackoffStrategy = Callable[[BackoffContext], tuple[int, int]] # typealias; returns sleep_nanos:int, curr_max_sleep_nanos:int 

697"""Strategy that implements a backoff algorithm that reduces server load while minimizing retry latency; default is full 

698jitter; other example backoff strategies, such as decorrelated jitter or honoring the Retry-After header of HTTP 429 "Too 

699Many Requests" responses, can be found in test_retry_examples.py.""" 

700 

701 
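As a standalone illustration (not importing this module), a full-jitter strategy of the shape described above can be sketched as follows; the parameter names mirror the ``BackoffContext`` and ``RetryPolicy`` fields documented in this file, but the function itself is a hypothetical stand-in, not the module's actual implementation:

```python
import random


def full_jitter_sketch(
    min_sleep_nanos: int,
    curr_max_sleep_nanos: int,
    max_sleep_nanos: int,
    exponential_base: float,
    rng: random.Random,
) -> tuple[int, int]:
    """Picks a uniform random sleep in [min, current max], then grows the current
    max by exponential_base, capped at max_sleep_nanos (full jitter)."""
    sleep_nanos = rng.randint(min_sleep_nanos, curr_max_sleep_nanos)
    next_max = min(max_sleep_nanos, int(curr_max_sleep_nanos * exponential_base))
    return sleep_nanos, next_max


rng = random.Random(0)  # seeded only to make the sketch reproducible
sleep_nanos, next_max = full_jitter_sketch(0, 125_000_000, 10_000_000_000, 2.0, rng)
```

The returned pair matches the ``BackoffStrategy`` type alias: the duration to sleep now, and the updated cap for the next retry.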

702############################################################################# 

703def _default_timing_is_terminated(retry: Retry) -> bool: 

704 return False 

705 

706 

707def _default_timing_sleep(sleep_nanos: int, retry: Retry) -> None: 

708 time.sleep(sleep_nanos / 1_000_000_000) 

709 

710 

711async def _default_timing_sleep_asyncio(sleep_nanos: int, retry: Retry) -> None: 

712 import asyncio 

713 

714 await asyncio.sleep(sleep_nanos / 1_000_000_000) 

715 

716 

717def _default_timing_on_before_attempt(retry: Retry) -> None: 

718 if retry.policy.timing.is_terminated(retry): 

719 raise RetryableError(display_msg="terminated before attempt") from RetryTerminationError() 

720 

721 

722@final 

723class RetryTerminationError(InterruptedError): 

724 """Termination signal raised when the retry loop exits before starting the next attempt.""" 

725 

726 

727@dataclass(frozen=True) 

728@final 

729class RetryTiming: 

730 """Customizable callbacks for reading the current monotonic time, sleeping and optional async termination; immutable.""" 

731 

732 monotonic_ns: Callable[[], int] = time.monotonic_ns 

733 """Returns the system's current monotonic time in nanoseconds.""" 

734 

735 is_terminated: Callable[[Retry], bool] = _default_timing_is_terminated 

736 """Returns whether a predicate has become true; if so causes the retry loop to exit early between attempts; can be used 

737 to indicate system shutdown or similar cancellation conditions; default is to always return ``False``; this function 

738 should complete quickly without any blocking or sleeping.""" 

739 

740 sleep: Callable[[int, Retry], None] = _default_timing_sleep 

741 """Sleeps N nanoseconds between attempts; override to inject custom sleeping or for early wake-ups; thread-safe.""" 

742 

743 sleep_async: Callable[[int, Retry], Awaitable[None]] = _default_timing_sleep_asyncio 

744 """Asynchronously sleeps N nanoseconds between attempts; override to inject custom sleeping or for early wake-ups; thread-safe.""" 

745 

746 on_before_attempt: Callable[[Retry], None] = _default_timing_on_before_attempt 

747 """Typically (but not necessarily) raises an error if ``is_terminated()`` is True; otherwise fn() will still run; this 

748 function should complete quickly without any blocking or sleeping. 

749 

750 To disable this behavior: RetryTiming.make_from(...).copy(on_before_attempt=lambda retry: None). 

751 """ 

752 

753 def copy(self, **override_kwargs: Any) -> RetryTiming: 

754 """Creates a new object copying an existing one with the specified fields overridden for customization; thread- 

755 safe.""" 

756 return dataclasses.replace(self, **override_kwargs) 

757 

758 @staticmethod 

759 def make_from(termination_event: threading.Event | None) -> RetryTiming: 

760 """Convenience factory that creates a RetryTiming that performs async termination when termination_event is set.""" 

761 if termination_event is None: 

762 return RetryTiming() 

763 

764 def _is_terminated(retry: Retry) -> bool: 

765 return termination_event.is_set() 

766 

767 def _sleep(sleep_nanos: int, retry: Retry) -> None: 

768 termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination 

769 

770 return RetryTiming(is_terminated=_is_terminated, sleep=_sleep) 

771 

772 @staticmethod 

773 def make_from_asyncio(termination_event: asyncio.Event | None) -> RetryTiming: 

774 """Convenience factory that creates a RetryTiming that performs async termination when termination_event is set; 

775 write an analogous version of this function if you wish to replace asyncio with Trio or AnyIO or similar.""" 

776 if termination_event is None: 

777 return RetryTiming() 

778 

779 def _is_terminated(retry: Retry) -> bool: 

780 return termination_event.is_set() 

781 

782 async def _sleep_async(sleep_nanos: int, retry: Retry) -> None: 

783 import asyncio 

784 

785 if sleep_nanos <= 0: 

786 await asyncio.sleep(0) # perf: cooperative yield with less overhead than asyncio.wait_for(..., 0) 

787 return 

788 try: 

789 await asyncio.wait_for(termination_event.wait(), timeout=sleep_nanos / 1_000_000_000) 

790 except asyncio.TimeoutError: 

791 pass # expected 

792 

793 return RetryTiming(is_terminated=_is_terminated, sleep_async=_sleep_async) 

794 

795 
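The early-wakeup behavior that ``make_from`` relies on comes from ``threading.Event.wait``, which returns ``False`` when the timeout elapses and ``True`` as soon as the event is set; a minimal standalone demonstration:

```python
import threading

ev = threading.Event()
# Not set: wait() blocks for up to the timeout, then reports False.
timed_out_result = ev.wait(0.01)
ev.set()
# Set: wait() returns True immediately, which is what lets a sleeping
# retry loop wake up early on async termination.
set_result = ev.wait(10.0)
```

This is why substituting ``termination_event.wait(...)`` for ``time.sleep(...)`` turns a fixed sleep into an interruptible one.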

796############################################################################# 

797def _format_msg(display_msg: str, retryable_error: RetryableError) -> str: 

798 """Default implementation of ``format_msg`` callback for RetryConfig; creates simple log message; thread-safe.""" 

799 msg = display_msg + " " if display_msg else "" 

800 errmsg: str = retryable_error.display_msg_str() 

801 msg = msg + errmsg + " " if errmsg else msg 

802 msg = msg if msg else "Retrying " 

803 return msg 

804 

805 

806def _format_pair(first: object, second: object) -> str: 

807 """Default implementation of ``format_pair`` callback for RetryConfig; creates simple log message part; thread-safe.""" 

808 second = "∞" if INFINITY_MAX_RETRIES == second else second # noqa: SIM300 

809 return f"[{first}/{second}]" 

810 

811 

812@dataclass(frozen=True) 

813@final 

814class RetryConfig: 

815 """Configures logging for call_with_retries(); all defaults work out of the box; immutable.""" 

816 

817 display_msg: str = "Retrying" # message prefix for retry log messages 

818 dots: str = " ..." # suffix appended to retry log messages 

819 format_msg: Callable[[str, RetryableError], str] = _format_msg # lambda: display_msg, retryable_error 

820 format_pair: Callable[[object, object], str] = _format_pair # lambda: first, second 

821 format_duration: Callable[[int], str] = human_readable_duration # lambda: nanos 

822 info_loglevel: int = logging.INFO # loglevel used when not giving up 

823 warning_loglevel: int = logging.WARNING # loglevel used when giving up 

824 enable_logging: bool = True # set to False to disable logging 

825 exc_info: bool = False # passed into Logger.log() 

826 stack_info: bool = False # passed into Logger.log() 

827 extra: Mapping[str, object] | None = dataclasses.field(default=None, repr=False, compare=False) # passed to Logger.log() 

828 context: object = dataclasses.field(default=None, repr=False, compare=False) # optional domain specific info 

829 

830 def copy(self, **override_kwargs: Any) -> RetryConfig: 

831 """Creates a new config copying an existing one with the specified fields overridden for customization.""" 

832 return dataclasses.replace(self, **override_kwargs) 

833 

834 
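The ``copy``/``dataclasses.replace`` idiom used by ``RetryConfig`` (and the other frozen dataclasses in this module) can be sketched in isolation; ``Config`` here is a hypothetical stand-in, not the real class:

```python
import dataclasses
from typing import Any


@dataclasses.dataclass(frozen=True)
class Config:
    display_msg: str = "Retrying"
    dots: str = " ..."

    def copy(self, **override_kwargs: Any) -> "Config":
        # dataclasses.replace builds a new frozen instance with the given
        # fields overridden; the original instance is left untouched.
        return dataclasses.replace(self, **override_kwargs)


base = Config()
custom = base.copy(dots="!")
```

Because instances are frozen, customization always produces a new object, which keeps shared defaults safe across threads.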

835############################################################################# 

836@dataclass(frozen=True) 

837@final 

838class RetryPolicy: 

839 """Configuration of maximum retries, sleep bounds, elapsed-time budget, logging, etc for call_with_retries(); immutable. 

840 

841 By default uses full jitter which works as follows: The maximum duration to sleep between attempts initially starts with 

842 ``initial_max_sleep_secs`` and doubles on each retry, up to the final maximum of ``max_sleep_secs``. 

843 Example: 125ms --> 250ms --> 500ms --> 1s --> 2s --> 4s --> 8s --> 10s --> 10s... 

844 On each retry a random sleep duration in the range ``[min_sleep_secs, current max]`` is picked. 

845 In a nutshell: ``0 <= min_sleep_secs <= initial_max_sleep_secs <= max_sleep_secs``. Typically, min_sleep_secs=0. 

846 """ 

847 

848 max_retries: int = INFINITY_MAX_RETRIES 

849 """The maximum number of times ``fn`` will be invoked additionally after the first attempt invocation; must be >= 0.""" 

850 

851 min_sleep_secs: float = 0 

852 """The minimum duration to sleep between any two attempts.""" 

853 

854 initial_max_sleep_secs: float = 0.125 

855 """The initial maximum duration to sleep between any two attempts.""" 

856 

857 max_sleep_secs: float = 10 

858 """The final max duration to sleep between any two attempts; 0 <= min_sleep_secs <= initial_max_sleep_secs <= 

859 max_sleep_secs.""" 

860 

861 max_elapsed_secs: float = 47 

862 """``fn`` will not be retried (or not retried anymore) once this much time has elapsed since the initial start of 

863 call_with_retries(); set this to 365 * 86400 seconds or similar to effectively disable the time limit.""" 

864 

865 exponential_base: float = 2 

866 """Growth factor (aka multiplier) for backoff algorithm to calculate sleep duration; must be >= 1.""" 

867 

868 max_elapsed_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

869 min_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

870 initial_max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

871 max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

872 

873 reraise: bool = True 

874 """On exhaustion, the default (``True``) is to re-raise the underlying exception when present.""" 

875 

876 max_previous_outcomes: int = 0 

877 """Pass the N=max_previous_outcomes most recent AttemptOutcome objects to callbacks via Retry.previous_outcomes.""" 

878 

879 config: RetryConfig = dataclasses.field(default=RetryConfig(), repr=False, compare=False) 

880 """Configures logging behavior.""" 

881 

882 timing: RetryTiming = dataclasses.field(default=RetryTiming(), repr=False) 

883 """Customizable callbacks for reading the current monotonic time, sleeping and optional async termination.""" 

884 

885 context: object = dataclasses.field(default=None, repr=False, compare=False) 

886 """Optional domain specific info.""" 

887 

888 @classmethod 

889 def from_namespace(cls, args: argparse.Namespace) -> RetryPolicy: 

890 """Factory that builds a policy from the fields of an argparse.Namespace, falling back to defaults for missing fields.""" 

891 return cls( 

892 max_retries=getattr(args, "max_retries", INFINITY_MAX_RETRIES), 

893 min_sleep_secs=getattr(args, "retry_min_sleep_secs", 0), 

894 initial_max_sleep_secs=getattr(args, "retry_initial_max_sleep_secs", 0.125), 

895 max_sleep_secs=getattr(args, "retry_max_sleep_secs", 10), 

896 max_elapsed_secs=getattr(args, "retry_max_elapsed_secs", 47), 

897 exponential_base=getattr(args, "retry_exponential_base", 2), 

898 reraise=getattr(args, "retry_reraise", True), 

899 max_previous_outcomes=getattr(args, "retry_max_previous_outcomes", 0), 

900 config=getattr(args, "retry_config", RetryConfig()), 

901 timing=getattr(args, "retry_timing", RetryTiming()), 

902 context=getattr(args, "retry_context", None), 

903 ) 

904 

905 @classmethod 

906 def no_retries(cls) -> RetryPolicy: 

907 """Returns a policy that never retries.""" 

908 return cls( 

909 max_retries=0, 

910 min_sleep_secs=0, 

911 initial_max_sleep_secs=0, 

912 max_sleep_secs=0, 

913 max_elapsed_secs=0, 

914 ) 

915 

916 def __post_init__(self) -> None: # validate and compute derived values 

917 self._validate_min("max_retries", self.max_retries, 0) 

918 self._validate_min("exponential_base", self.exponential_base, 1) 

919 self._validate_min("min_sleep_secs", self.min_sleep_secs, 0) 

920 self._validate_min("initial_max_sleep_secs", self.initial_max_sleep_secs, 0) 

921 self._validate_min("max_sleep_secs", self.max_sleep_secs, 0) 

922 self._validate_min("max_elapsed_secs", self.max_elapsed_secs, 0) 

923 object.__setattr__(self, "max_elapsed_nanos", int(self.max_elapsed_secs * 1_000_000_000)) # derived value 

924 min_sleep_nanos: int = int(self.min_sleep_secs * 1_000_000_000) 

925 initial_max_sleep_nanos: int = int(self.initial_max_sleep_secs * 1_000_000_000) 

926 max_sleep_nanos: int = int(self.max_sleep_secs * 1_000_000_000) 

927 max_sleep_nanos = max(min_sleep_nanos, max_sleep_nanos) 

928 initial_max_sleep_nanos = min(max_sleep_nanos, max(min_sleep_nanos, initial_max_sleep_nanos)) 

929 object.__setattr__(self, "min_sleep_nanos", min_sleep_nanos) # derived value 

930 object.__setattr__(self, "initial_max_sleep_nanos", initial_max_sleep_nanos) # derived value 

931 object.__setattr__(self, "max_sleep_nanos", max_sleep_nanos) # derived value 

932 self._validate_min("max_previous_outcomes", self.max_previous_outcomes, 0) 

933 assert 0 <= self.min_sleep_nanos <= self.initial_max_sleep_nanos <= self.max_sleep_nanos 

934 if not isinstance(self.reraise, bool): 

935 raise TypeError(f"{type(self).__name__}.reraise must be bool") 

936 

937 def _validate_min(self, attr_name: str, value: float, minimum: float) -> None: 

938 if value < minimum: 

939 raise ValueError(f"Invalid {type(self).__name__}.{attr_name}: must be >= {minimum} but got {value}") 

940 

941 def copy(self, **override_kwargs: Any) -> RetryPolicy: 

942 """Creates a new policy copying an existing one with the specified fields overridden for customization; thread-safe. 

943 

944 Example usage: policy = retry_policy.copy(max_sleep_secs=2, max_elapsed_secs=10) 

945 """ 

946 return dataclasses.replace(self, **override_kwargs) 

947 

948 
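The doubling-with-cap schedule from the class docstring (125ms --> 250ms --> ... --> 10s --> 10s) can be reproduced in a few lines; this is a standalone sketch of the arithmetic only, not the module's actual backoff code:

```python
def max_sleep_schedule(
    initial_max_sleep_secs: float, max_sleep_secs: float, exponential_base: float, attempts: int
) -> list[float]:
    """Returns the per-retry maximum sleep cap: grows by exponential_base on
    each retry, bounded above by max_sleep_secs."""
    schedule: list[float] = []
    curr = initial_max_sleep_secs
    for _ in range(attempts):
        schedule.append(min(curr, max_sleep_secs))
        curr *= exponential_base
    return schedule


print(max_sleep_schedule(0.125, 10, 2, 9))
# [0.125, 0.25, 0.5, 1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

With full jitter, each actual sleep is then drawn uniformly from ``[min_sleep_secs, cap]`` for the current cap in this schedule.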

949############################################################################# 

950def _fn_not_implemented(_retry: Retry) -> NoReturn: 

951 """Default implementation of ``fn`` callback for RetryTemplate; always raises.""" 

952 raise NotImplementedError("Provide fn when calling RetryTemplate") 

953 

954 

955NO_LOGGER: Final[logging.Logger] = logging.Logger("NULL") # noqa: LOG001 do not register dummy logger with Logger.manager 

956NO_LOGGER.addHandler(logging.NullHandler()) # prevents lastResort fallback 

957NO_LOGGER.disabled = True 

958NO_LOGGER.propagate = False 

959_R = TypeVar("_R") 

960 

961 

962@dataclass(frozen=True) 

963@final 

964class RetryTemplate(Generic[_T]): 

965 """Convenience class that aggregates all knobs for call_with_retries(); and is itself callable too; immutable.""" 

966 

967 fn: Callable[[Retry], _T] = _fn_not_implemented # set this to make the RetryTemplate object itself callable 

968 policy: RetryPolicy = RetryPolicy() # specifies how ``RetryableError`` shall be retried 

969 backoff: BackoffStrategy = full_jitter_backoff_strategy # computes delay time before next retry attempt, after failure 

970 giveup: Callable[[AttemptOutcome], object | None] = no_giveup # stop retrying based on domain-specific logic, e.g. time 

971 before_attempt: Callable[[Retry], int] = before_attempt_noop # e.g. wait due to rate limiting or internal backpressure 

972 after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure # e.g. record metrics and/or custom logging 

973 on_retryable_error: Callable[[AttemptOutcome], None] = noop # e.g. count failures (RetryableError) caught by retry loop 

974 on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise # raise error or return fallback value 

975 log: logging.Logger | None = None # set this to ``None`` to disable logging 

976 

977 def copy(self, **override_kwargs: Any) -> RetryTemplate[_T]: 

978 """Creates a new object copying an existing one with the specified fields overridden for customization; thread-safe. 

979 

980 Example usage: retry_template.copy(policy=policy.copy(max_sleep_secs=2, max_elapsed_secs=10), log=None) 

981 """ 

982 return dataclasses.replace(self, **override_kwargs) 

983 

984 def __call__(self) -> _T: 

985 """Invokes ``self.fn`` via the call_with_retries() retry loop using the stored parameters; thread-safe. 

986 

987 Example Usage: result: str = retry_template.copy(fn=...)() 

988 """ 

989 return call_with_retries( 

990 fn=self.fn, 

991 policy=self.policy, 

992 backoff=self.backoff, 

993 giveup=self.giveup, 

994 before_attempt=self.before_attempt, 

995 after_attempt=self.after_attempt, 

996 on_retryable_error=self.on_retryable_error, 

997 on_exhaustion=self.on_exhaustion, 

998 log=self.log, 

999 ) 

1000 

1001 def call_with_retries( 

1002 self, 

1003 fn: Callable[[Retry], _R], 

1004 policy: RetryPolicy | None = None, 

1005 *, 

1006 backoff: BackoffStrategy | None = None, 

1007 giveup: Callable[[AttemptOutcome], object | None] | None = None, 

1008 before_attempt: Callable[[Retry], int] | None = None, 

1009 after_attempt: Callable[[AttemptOutcome], None] | None = None, 

1010 on_retryable_error: Callable[[AttemptOutcome], None] | None = None, 

1011 on_exhaustion: Callable[[AttemptOutcome], _R] | None = None, 

1012 log: logging.Logger | None = None, # pass NO_LOGGER to override template logger and disable logging for this call 

1013 ) -> _R: 

1014 """Invokes ``fn`` via the call_with_retries() retry loop using the stored or overridden params; thread-safe. 

1015 

1016 Example Usage: result: str = retry_template.call_with_retries(fn=...) 

1017 """ 

1018 return call_with_retries( 

1019 fn=fn, 

1020 policy=self.policy if policy is None else policy, 

1021 backoff=self.backoff if backoff is None else backoff, 

1022 giveup=self.giveup if giveup is None else giveup, 

1023 before_attempt=self.before_attempt if before_attempt is None else before_attempt, 

1024 after_attempt=self.after_attempt if after_attempt is None else after_attempt, 

1025 on_retryable_error=self.on_retryable_error if on_retryable_error is None else on_retryable_error, 

1026 on_exhaustion=( 

1027 cast(Callable[[AttemptOutcome], _R], self.on_exhaustion) if on_exhaustion is None else on_exhaustion 

1028 ), 

1029 log=None if log is NO_LOGGER else self.log if log is None else log, 

1030 ) 

1031 

1032 async def call_with_retries_async( 

1033 self, 

1034 fn: Callable[[Retry], Awaitable[_R]], 

1035 policy: RetryPolicy | None = None, 

1036 *, 

1037 backoff: BackoffStrategy | None = None, 

1038 giveup: Callable[[AttemptOutcome], object | None] | None = None, 

1039 before_attempt: Callable[[Retry], int] | None = None, 

1040 after_attempt: Callable[[AttemptOutcome], None] | None = None, 

1041 on_retryable_error: Callable[[AttemptOutcome], None] | None = None, 

1042 on_exhaustion: Callable[[AttemptOutcome], _R] | None = None, 

1043 log: logging.Logger | None = None, # pass NO_LOGGER to override template logger and disable logging for this call 

1044 ) -> _R: 

1045 """Invokes ``fn`` via the call_with_retries_async() retry loop using the stored or overridden params; thread-safe. 

1046 

1047 Example Usage: result: str = await retry_template.call_with_retries_async(fn=...) 

1048 """ 

1049 return await call_with_retries_async( 

1050 fn=fn, 

1051 policy=self.policy if policy is None else policy, 

1052 backoff=self.backoff if backoff is None else backoff, 

1053 giveup=self.giveup if giveup is None else giveup, 

1054 before_attempt=self.before_attempt if before_attempt is None else before_attempt, 

1055 after_attempt=self.after_attempt if after_attempt is None else after_attempt, 

1056 on_retryable_error=self.on_retryable_error if on_retryable_error is None else on_retryable_error, 

1057 on_exhaustion=( 

1058 cast(Callable[[AttemptOutcome], _R], self.on_exhaustion) if on_exhaustion is None else on_exhaustion 

1059 ), 

1060 log=None if log is NO_LOGGER else self.log if log is None else log, 

1061 ) 

1062 

1063 @overload 

1064 def wraps(self, fn: Callable[..., Awaitable[_R]]) -> Callable[..., Awaitable[_R]]: ... 

1065 

1066 @overload 

1067 def wraps(self, fn: Callable[..., _R]) -> Callable[..., _R]: ... 

1068 

1069 def wraps(self, fn: Callable[..., Any]) -> Callable[..., Any]: 

1070 """Returns a wrapper function that forwards all arguments to ``fn`` and retries it using this template; thread-safe. 

1071 

1072 Example Usage: 

1073 def fn(x: int) -> int: 

1074 return x * 2 

1075 func: Callable[[int], int] = retry_template.wraps(fn) 

1076 y: int = func(5) # returns 10 

1077 """ 

1078 import functools 

1079 import inspect 

1080 

1081 target_fn: Callable[..., Any] = fn.func if isinstance(fn, functools.partial) else fn 

1082 if inspect.iscoroutinefunction(target_fn) or inspect.iscoroutinefunction(type(target_fn).__call__): 

1083 

1084 @functools.wraps(fn) 

1085 async def wrapped_async(*args: Any, **kwargs: Any) -> Any: 

1086 return await self.call_with_retries_async(fn=lambda _retry: fn(*args, **kwargs)) 

1087 

1088 return wrapped_async 

1089 else: 

1090 

1091 @functools.wraps(fn) 

1092 def wrapped(*args: Any, **kwargs: Any) -> Any: 

1093 return self.call_with_retries(fn=lambda _retry: fn(*args, **kwargs)) 

1094 

1095 return wrapped 

1096 

1097 
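The ``wraps`` pattern above — forward all arguments and preserve the callable's metadata via ``functools.wraps`` — can be sketched standalone; the fixed-count retry loop below is a hypothetical stand-in for call_with_retries():

```python
import functools
from typing import Any, Callable


def wraps_with_retries(fn: Callable[..., Any], max_retries: int = 2) -> Callable[..., Any]:
    @functools.wraps(fn)  # preserves fn.__name__, __doc__, etc. on the wrapper
    def wrapped(*args: Any, **kwargs: Any) -> Any:
        for count in range(max_retries + 1):
            try:
                return fn(*args, **kwargs)
            except ValueError:
                if count == max_retries:
                    raise  # exhausted: re-raise the underlying exception
    return wrapped


calls = {"n": 0}


def flaky(x: int) -> int:
    """Fails on the first two attempts, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient")
    return x * 2


func = wraps_with_retries(flaky)
result = func(5)
```

As in the real ``wraps``, the wrapper is transparent to callers: same signature shape, same name, with retries applied underneath.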

1098############################################################################# 

1099def raise_retryable_error_from( 

1100 exc: BaseException, 

1101 *, 

1102 display_msg: object = None, 

1103 retry_immediately_once: bool = False, 

1104 category: object = None, 

1105 attachment: object = None, 

1106) -> NoReturn: 

1107 """Convenience function that raises a generic RetryableError that wraps the given underlying exception.""" 

1108 raise RetryableError( 

1109 display_msg=type(exc).__name__ if display_msg is None else display_msg, 

1110 retry_immediately_once=retry_immediately_once, 

1111 category=category, 

1112 attachment=attachment, 

1113 ) from exc 

1114 

1115 

1116ExceptionPredicate = Union[bool, Callable[[BaseException], bool]] # Type alias 

1117 

1118 

1119def call_with_exception_handlers( 

1120 fn: Callable[[], _T], # typically a lambda 

1121 *, 

1122 continue_scanning_if_no_predicate_matches: bool = False, 

1123 handlers: Mapping[type[BaseException], Sequence[tuple[ExceptionPredicate, Callable[[BaseException], _T]]]], 

1124) -> _T: 

1125 """Convenience function that calls ``fn`` and returns its result; on exception runs the first matching handler in a per- 

1126 exception handler chain; composes independent handlers via predicates into one function, in Event-Predicate-Action style. 

1127 

1128 Lookup uses the exception type's Method Resolution Order (most-specific class in the exception class hierarchy wins). For 

1129 the first class that exists as a key in ``handlers``, its chain is scanned in order. Each chain element is 

1130 ``(predicate, handler)`` where ``predicate`` is either ``True`` (always matches), ``False`` (disabled), or 

1131 ``predicate(exc) -> bool``. The first matching handler is called with the exception and its return value is returned. If 

1132 no predicate matches then, by default, the original exception is re-raised and no less-specific handler chains are 

1133 consulted. Set ``continue_scanning_if_no_predicate_matches=True`` to continue scanning exception base classes instead. 

1134 

1135 Typically (but not necessarily) the handler raises a ``RetryableError``, via ``raise_retryable_error_from`` or similar. 

1136 Or it may raise another exception type (which will not be retried), or even return a fallback value instead of raising. 

1137 

1138 Example: turn transient ssh/zfs command failures into RetryableError for call_with_retries(), including feature flags: 

1139 

1140 def run_remote(retry: Retry) -> str: 

1141 p = subprocess.run(["ssh", "foo.example.com", "zfs", "list", "-H"], text=True, capture_output=True, check=True) 

1142 return p.stdout 

1143 

1144 def fn(retry: Retry) -> str: 

1145 return call_with_exception_handlers( 

1146 fn=lambda: run_remote(retry), 

1147 handlers={ 

1148 TimeoutError: [(True, raise_retryable_error_from)], 

1149 ConnectionResetError: [(True, lambda exc: raise_retryable_error_from(exc, display_msg="ssh reset"))], 

1150 subprocess.CalledProcessError: [ 

1151 (lambda exc: exc.returncode == 255, lambda exc: raise_retryable_error_from(exc, display_msg="ssh error")), 

1152 (lambda exc: "cannot receive" in (exc.stderr or ""), lambda exc: raise_retryable_error_from(exc, display_msg="zfs recv")), 

1153 ], 

1154 OSError: [ 

1155 (lambda exc: getattr(exc, "errno", None) in {errno.ETIMEDOUT, errno.EHOSTUNREACH}, 

1156 lambda exc: raise_retryable_error_from(exc, display_msg=f"network: {exc}")), 

1157 (False, lambda exc: raise_retryable_error_from(exc, display_msg="disabled handler example")), 

1158 ], 

1159 }, 

1160 ) 

1161 

1162 stdout: str = call_with_retries(fn=fn, policy=RetryPolicy(max_retries=3)) 

1163 

1164 Example: return a fallback value (no retry loop required): 

1165 

1166 def read_optional_file(path: str) -> str: 

1167 return call_with_exception_handlers( 

1168 fn=lambda: open(path, encoding="utf-8").read(), 

1169 handlers={FileNotFoundError: [(True, lambda _exc: "")]}, 

1170 ) 

1171 """ 

1172 try: 

1173 return fn() 

1174 except BaseException as exc: 

1175 for cls in type(exc).__mro__: 

1176 handler_chain = handlers.get(cls) 

1177 if handler_chain is not None: 

1178 for predicate, handler in handler_chain: 

1179 if predicate is True or (predicate is not False and predicate(exc)): 

1180 return handler(exc) 

1181 if not continue_scanning_if_no_predicate_matches: 

1182 raise 

1183 raise 

1184 

1185 
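The MRO-based dispatch described in the docstring can be shown in isolation; this hypothetical helper mirrors the lookup loop above (most-specific registered class wins, and its predicate chain is scanned in order):

```python
from typing import Any, Callable, Mapping, Sequence


def first_matching_handler(
    exc: BaseException,
    handlers: Mapping[type[BaseException], Sequence[tuple[Any, Callable[[BaseException], Any]]]],
) -> Any:
    for cls in type(exc).__mro__:  # e.g. FileNotFoundError -> OSError -> Exception -> ...
        chain = handlers.get(cls)
        if chain is not None:
            for predicate, handler in chain:
                if predicate is True or (predicate is not False and predicate(exc)):
                    return handler(exc)
            raise exc  # no predicate matched in the most-specific chain
    raise exc  # no chain registered for any class in the MRO


# FileNotFoundError has no chain of its own here, so its OSError base class handles it:
fallback = first_matching_handler(
    FileNotFoundError("missing"),
    {OSError: [(True, lambda _exc: "fallback")]},
)
```

This matches the default behavior of ``call_with_exception_handlers`` (stop at the first registered class); the ``continue_scanning_if_no_predicate_matches=True`` variant would keep walking the MRO instead of re-raising.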

1186############################################################################# 

1187@final 

1188class _ThreadLocalRNG(threading.local): 

1189 """Caches a per-thread random number generator.""" 

1190 

1191 def __init__(self) -> None: 

1192 self.rng: random.Random | None = None 

1193 

1194 

1195_THREAD_LOCAL_RNG: Final[_ThreadLocalRNG] = _ThreadLocalRNG() 

1196 

1197 

1198def _thread_local_rng() -> random.Random: 

1199 """Returns a per-thread RNG for backoff jitter; for perf avoids locking and initializing a new random.Random() at high 

1200 frequency.""" 

1201 threadlocal: _ThreadLocalRNG = _THREAD_LOCAL_RNG 

1202 rng: random.Random | None = threadlocal.rng 

1203 if rng is None: 

1204 rng = random.Random() # noqa: S311 jitter isn't security sensitive, and random.SystemRandom.randint() is slow 

1205 threadlocal.rng = rng 

1206 return rng
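The thread-local caching above avoids both lock contention on shared RNG state and repeated ``random.Random()`` construction in hot retry loops; a standalone sketch of the same pattern:

```python
import random
import threading


class _ThreadLocalRNGSketch(threading.local):
    """Each thread observes its own independent `rng` attribute."""

    def __init__(self) -> None:
        self.rng: random.Random | None = None


_TLS = _ThreadLocalRNGSketch()


def thread_local_rng_sketch() -> random.Random:
    rng = _TLS.rng
    if rng is None:
        rng = random.Random()  # constructed at most once per thread
        _TLS.rng = rng
    return rng


# Same instance within a thread; a distinct instance in another thread.
main_rng = thread_local_rng_sketch()
other: list[random.Random] = []
t = threading.Thread(target=lambda: other.append(thread_local_rng_sketch()))
t.start()
t.join()
```

Subclassing ``threading.local`` means ``__init__`` runs once per thread on first access, so the cache needs no explicit locking.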