Coverage for bzfs_main / util / retry.py: 100%

514 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 10:16 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Customizable generic retry framework; defaults to jittered exponential backoff with cap unless specified otherwise. 

16 

17Purpose: 

18-------- 

19- Provide a reusable retry helper for transient failures using customizable policy and callbacks. 

20- Centralize backoff, jitter, logging and metrics behavior while keeping call sites compact. 

21- Prevent accidental retries: the loop retries only when the developer explicitly raises a ``RetryableError``, which reduces 

22 the risk of retrying non-idempotent operations. 

23- Provide a thread-safe, fast implementation; avoid shared RNG contention. 

24- Provide both sync and async API, both with the same semantics, except async API awaits ``fn`` and uses non-blocking sleep. 

25- Avoid unnecessary complexity and add zero dependencies beyond the Python standard library. Everything you need is in this 

26 single Python file. 

27 

28Usage: 

29------ 

30- Wrap work in a callable ``fn(retry: Retry)`` and therein raise ``RetryableError`` for failures that should be retried. 

31- Construct a policy via ``RetryPolicy(...)`` that specifies how ``RetryableError`` shall be retried. 

32- Invoke ``call_with_retries(fn=fn, policy=policy, log=logger)`` or call_with_retries_async with a standard logging.Logger. 

33- On success, the result of calling ``fn`` is returned. 

34- By default on exhaustion, call_with_retries* either re-raises the last underlying ``RetryableError.__cause__``, or raises 

35 ``RetryError`` (wrapping the last ``RetryableError``), like so: 

36 - if ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last 

37 ``RetryableError.__cause__`` with its original traceback. 

38 - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain). 

39 - The default is ``RetryPolicy.reraise=True``. 

40 

41Advanced Configuration: 

42----------------------- 

43- Tune ``RetryPolicy`` parameters to control maximum retries, sleep bounds, elapsed-time budget, logging, etc. 

44- Use ``RetryPolicy.config: RetryConfig`` to control logging settings. 

45- Set ``log=None`` to disable logging, or customize ``info_loglevel`` / ``warning_loglevel`` for structured logs. 

46- Supply a ``giveup(AttemptOutcome)`` callback to stop retrying based on domain-specific logic (for example, decisions based 

47 on time budget/quota or the previous N most recent AttemptOutcome objects (via AttemptOutcome.retry.previous_outcomes). 

48- Use the ``any_giveup()`` / ``all_giveup()`` helper to consult more than one callback handler in ``giveup(AttemptOutcome)``. 

49- Supply an ``on_exhaustion(AttemptOutcome)`` callback to customize behavior when giving up; it may raise an error or return 

50 a fallback value. 

51 

52Observability: 

53-------------- 

54- Supply an ``after_attempt(AttemptOutcome)`` callback to collect per-attempt metrics such as success flag, 

55 exhausted/terminated state, attempt number, total elapsed duration (in nanoseconds), sleep duration (in nanoseconds), etc. 

56- ``AttemptOutcome.result`` is either the successful result or the most recent ``RetryableError``, enabling integration with 

57 metrics and tracing systems without coupling the retry loop to any specific backend. 

58- Supply an ``after_attempt(AttemptOutcome)`` callback to customize logging 100%, if necessary. 

59- Use the ``multi_after_attempt()`` helper to invoke more than one callback handler in ``after_attempt(AttemptOutcome)``. 

60 

61Expert Configuration: 

62--------------------- 

63- Supply a ``backoff(BackoffContext)`` callback to plug in a custom backoff algorithm (e.g., decorrelated-jitter or 

64 retry-after HTTP 429). The default is full-jitter exponential backoff with cap (aka industry standard). 

65- Supply a ``before_attempt(Retry)`` callback for optional rate limiting or other forms of internal backpressure. 

66- Supply a ``on_retryable_error(AttemptOutcome)`` callback, e.g. to count failures (RetryableError) caught by the retry loop. 

67- Set ``RetryPolicy.max_previous_outcomes > 0`` to pass the N most recent AttemptOutcome objects to callbacks (default is 0). 

68- If ``RetryPolicy.max_previous_outcomes > 0``, you can use ``RetryableError(..., attachment=...)`` to carry domain-specific 

69 state from a failed attempt to the next attempt via ``retry.previous_outcomes``. This pattern helps if attempt N+1 is a 

70 function of attempt N or all prior attempts (e.g., switching endpoints or resuming from an offset). 

71- Use ``RetryTemplate`` as a 'bag of knobs' configuration template for functions that shall be retried in similar ways. 

72- Or package up all knobs plus a ``fn(retry: Retry)`` function into a self-contained auto-retrying higher level function by 

73 constructing a ``RetryTemplate`` object (which is a ``Callable`` function itself). 

74- To keep calling code retry-transparent, set ``RetryPolicy.reraise=True`` (the default) *and* raise retryable failures as 

75 ``raise RetryableError(...) from exc``. Client code now won't notice whether call_with_retries* is used or not. 

76- To make exhaustion observable to calling code, set ``RetryPolicy.reraise=False``: by default call_with_retries* now always 

77 raises ``RetryError`` (wrapping the last ``RetryableError``) on exhaustion, so callers now catch ``RetryError`` and can 

78 inspect the last underlying exception via ``err.outcome``, ``err.__cause__``, and even ``err.__cause__.__cause__`` when 

79 present. 

80- Set ``RetryPolicy.timing`` to customize reading the current monotonic time, sleeping and optional async termination. 

81- The callback API is powerful enough to easily plug in advanced retry algorithms such as: 

82 - Google SRE Client-Side Adaptive Throttling - https://sre.google/sre-book/handling-overload/ 

83 - gRPC retry throttling - https://grpc.io/docs/guides/retry/ 

84 - AWS SDK adaptive retry mode - https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html 

85 - Circuit breakers - https://martinfowler.com/bliki/CircuitBreaker.html (e.g. via `pybreaker` third-party library) 

86 - Rate limiting with Fixed Window, Moving Window, and Sliding Windows (e.g. via `limits` third-party library) 

87 - Various example backoff strategies such as decorrelated-jitter or retry-after HTTP 429 "Too Many Requests" responses, 

88 etc can be found in test_retry_examples.py. 

89 

90Example Usage: 

91-------------- 

92 import logging 

93 from bzfs_main.util.retry import Retry, RetryPolicy, RetryableError, call_with_retries 

94 

95 def unreliable_operation(retry: Retry) -> str: 

96 try: 

97 if retry.count < 3: 

98 raise ValueError("temporary failure connecting to foo.example.com") 

99 return "ok" 

100 except ValueError as exc: 

101 # Preserve the underlying cause for correct error propagation and logging 

102 raise RetryableError(display_msg="connect") from exc 

103 

104 retry_policy = RetryPolicy( 

105 max_retries=10, 

106 min_sleep_secs=0, 

107 initial_max_sleep_secs=0.125, 

108 max_sleep_secs=10, 

109 max_elapsed_secs=60, 

110 ) 

111 log = logging.getLogger(__name__) 

112 result: str = call_with_retries(fn=unreliable_operation, policy=retry_policy, log=log) 

113 print(result) 

114 

115 # Sample log output: 

116 # INFO:Retrying connect [1/10] in 8.79ms ... 

117 # INFO:Retrying connect [2/10] in 90.1ms ... 

118 # INFO:Retrying connect [3/10] in 372ms ... 

119 # ok 

120 

121Background: 

122----------- 

123For background on exponential backoff and jitter, see for example 

124https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter 

125""" 

126 

127from __future__ import ( 

128 annotations, 

129) 

130import argparse 

131import dataclasses 

132import logging 

133import random 

134import threading 

135import time 

136from collections.abc import ( 

137 Awaitable, 

138 Iterable, 

139 Mapping, 

140 Sequence, 

141) 

142from dataclasses import ( 

143 dataclass, 

144) 

145from typing import ( 

146 TYPE_CHECKING, 

147 Any, 

148 Callable, 

149 Final, 

150 Generic, 

151 NamedTuple, 

152 NoReturn, 

153 TypeVar, 

154 Union, 

155 cast, 

156 final, 

157 overload, 

158) 

159 

160if TYPE_CHECKING: 

161 import asyncio 

162 

163from bzfs_main.util.utils import ( 

164 human_readable_duration, 

165) 

166 

167# constants: 

168INFINITY_MAX_RETRIES: Final[int] = 2**90 - 1 # a number that's essentially infinity for all practical retry purposes 

169 

170 

171############################################################################# 

172def full_jitter_backoff_strategy(context: BackoffContext) -> tuple[int, int]: 

173 """Default implementation of ``backoff`` callback for call_with_retries(); computes delay time before next retry attempt, 

174 after failure. 

175 

176 Full-jitter picks a random sleep_nanos duration from the range [min_sleep_nanos, curr_max_sleep_nanos] and applies 

177 exponential backoff with cap to the next attempt; thread-safe. Typically, min_sleep_nanos is 0 and exponential_base is 2. 

178 Example curr_max_sleep_nanos sequence: 125ms --> 250ms --> 500ms --> 1s --> 2s --> 4s --> 8s --> 10s --> 10s... 

179 Full-jitter provides optimal balance between reducing server load and minimizing retry latency. 

180 """ 

181 policy: RetryPolicy = context.retry.policy 

182 curr_max_sleep_nanos: int = context.curr_max_sleep_nanos 

183 if policy.min_sleep_nanos == curr_max_sleep_nanos: 

184 sleep_nanos = curr_max_sleep_nanos # perf 

185 else: 

186 sleep_nanos = context.rng.randint(policy.min_sleep_nanos, curr_max_sleep_nanos) # nanos to delay until next attempt 

187 curr_max_sleep_nanos = round(curr_max_sleep_nanos * policy.exponential_base) # exponential backoff 

188 curr_max_sleep_nanos = min(curr_max_sleep_nanos, policy.max_sleep_nanos) # ... with cap for next attempt 

189 return sleep_nanos, curr_max_sleep_nanos 

190 

191 

192def no_giveup(outcome: AttemptOutcome) -> object | None: 

193 """Default implementation of ``giveup`` callback for call_with_retries(); never gives up; returning anything other than 

194 ``None`` indicates to give up retrying; thread-safe.""" 

195 return None # don't give up retrying 

196 

197 

198def before_attempt_noop(retry: Retry) -> int: 

199 """Default implementation of ``before_attempt`` callback for call_with_retries(); does nothing; thread-safe.""" 

200 return 0 

201 

202 

203def after_attempt_log_failure(outcome: AttemptOutcome) -> None: 

204 """Default implementation of ``after_attempt`` callback for call_with_retries(); performs simple logging of retry attempt 

205 failures; thread-safe.""" 

206 retry: Retry = outcome.retry 

207 policy: RetryPolicy = retry.policy 

208 config: RetryConfig = policy.config 

209 if outcome.is_success or retry.log is None or not config.enable_logging: 

210 return 

211 log: logging.Logger = retry.log 

212 assert isinstance(outcome.result, RetryableError) 

213 retryable_error: RetryableError = outcome.result 

214 if not outcome.is_exhausted: 

215 if log.isEnabledFor(config.info_loglevel): # Retrying X in Y ms ... 

216 m1: str = config.format_msg(config.display_msg, retryable_error) 

217 m2: str = config.format_pair(retry.count + 1, policy.max_retries) 

218 m3: str = config.format_duration(outcome.sleep_nanos) 

219 log.log(config.info_loglevel, "%s", f"{m1}{m2} in {m3}{config.dots}", extra=config.extra) 

220 else: 

221 if policy.max_retries > 0 and log.isEnabledFor(config.warning_loglevel) and not outcome.is_terminated: 

222 reason: str = "" if outcome.giveup_reason is None else f"{outcome.giveup_reason}; " 

223 format_duration: Callable[[int], str] = config.format_duration # lambda: nanos 

224 log.log( 

225 config.warning_loglevel, 

226 "%s", 

227 f"{config.format_msg(config.display_msg, retryable_error)}" 

228 f"exhausted; giving up because {reason}the last " 

229 f"{config.format_pair(retry.count, policy.max_retries)} retries across " 

230 f"{config.format_pair(format_duration(outcome.elapsed_nanos), format_duration(policy.max_elapsed_nanos))} " 

231 "failed", 

232 exc_info=retryable_error if config.exc_info else None, 

233 stack_info=config.stack_info, 

234 extra=config.extra, 

235 ) 

236 

237 

238def noop(outcome: AttemptOutcome) -> None: 

239 """Default implementation of ``on_retryable_error`` callback for call_with_retries(); does nothing; thread-safe.""" 

240 

241 

242def on_exhaustion_raise(outcome: AttemptOutcome) -> NoReturn: 

243 """Default implementation of ``on_exhaustion`` callback for call_with_retries(); always raises; thread-safe.""" 

244 assert outcome.is_exhausted 

245 assert isinstance(outcome.result, RetryableError) 

246 retryable_error: RetryableError = outcome.result 

247 policy: RetryPolicy = outcome.retry.policy 

248 cause: BaseException | None = retryable_error.__cause__ 

249 if policy.reraise and cause is not None: 

250 raise cause.with_traceback(cause.__traceback__) 

251 raise RetryError(outcome=outcome) from retryable_error 

252 

253 

254############################################################################# 

255_T = TypeVar("_T") 

256 

257 

258def call_with_retries( 

259 fn: Callable[[Retry], _T], # typically a lambda; wraps work and raises RetryableError for failures that shall be retried 

260 policy: RetryPolicy, # specifies how ``RetryableError`` shall be retried 

261 *, 

262 backoff: BackoffStrategy = full_jitter_backoff_strategy, # computes delay time before next retry attempt, after failure 

263 giveup: Callable[[AttemptOutcome], object | None] = no_giveup, # stop retrying based on domain-specific logic, e.g. time 

264 before_attempt: Callable[[Retry], int] = before_attempt_noop, # e.g. wait due to rate limiting or internal backpressure 

265 after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure, # e.g. record metrics and/or custom logging 

266 on_retryable_error: Callable[[AttemptOutcome], None] = noop, # e.g. count failures (RetryableError) caught by retry loop 

267 on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise, # raise error or return fallback value 

268 log: logging.Logger | None = None, # set this to ``None`` to disable logging 

269) -> _T: 

270 """Runs the function ``fn`` and returns its result; retries on failure as indicated by policy; thread-safe. 

271 

272 By default on exhaustion, call_with_retries() either re-raises the last underlying ``RetryableError.__cause__``, or raises 

273 ``RetryError`` (wrapping the last ``RetryableError``), like so: 

274 - if ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last 

275 ``RetryableError.__cause__`` with its original traceback. 

276 - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain). 

277 - The default is ``RetryPolicy.reraise=True``. 

278 

279 On the exhaustion path, ``on_exhaustion`` will be called exactly once (after the final after_attempt). The default 

280 implementation raises as described above; custom ``on_exhaustion`` impls may return a fallback value instead of an error. 

281 """ 

282 rng: random.Random | None = None 

283 retry_count: int = 0 

284 idle_nanos: int = 0 

285 curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos 

286 previous_outcomes: tuple[AttemptOutcome, ...] = () # for safety pass *immutable* deque to callbacks 

287 timing: RetryTiming = policy.timing 

288 sleep: Callable[[int, Retry], None] = timing.sleep 

289 monotonic_ns: Callable[[], int] = timing.monotonic_ns 

290 call_start_nanos: Final[int] = monotonic_ns() 

291 while True: 

292 before_attempt_nanos: int = monotonic_ns() if retry_count != 0 else call_start_nanos 

293 prev: tuple[AttemptOutcome, ...] = previous_outcomes 

294 retry: Retry = Retry( 

295 retry_count, call_start_nanos, before_attempt_nanos, before_attempt_nanos, idle_nanos, policy, log, prev 

296 ) 

297 try: 

298 if before_attempt is not before_attempt_noop: 

299 before_attempt_sleep_nanos: int = before_attempt(retry) 

300 assert before_attempt_sleep_nanos >= 0, before_attempt_sleep_nanos 

301 if before_attempt_sleep_nanos > 0: 

302 sleep(before_attempt_sleep_nanos, retry) # e.g. wait due to rate limiting or internal backpressure 

303 attempt_start_nanos: int = monotonic_ns() 

304 idle_nanos += attempt_start_nanos - before_attempt_nanos 

305 retry = Retry( 

306 retry_count, call_start_nanos, before_attempt_nanos, attempt_start_nanos, idle_nanos, policy, log, prev 

307 ) 

308 timing.on_before_attempt(retry) 

309 result: _T = fn(retry) # Call the target function and supply retry attempt number and other metadata 

310 if after_attempt is not after_attempt_log_failure: 

311 elapsed_nanos: int = monotonic_ns() - call_start_nanos 

312 outcome: AttemptOutcome = AttemptOutcome(retry, True, False, False, None, elapsed_nanos, 0, result) 

313 after_attempt(outcome) 

314 return result 

315 except RetryableError as retryable_error: 

316 elapsed_nanos = monotonic_ns() - call_start_nanos 

317 is_terminated: Callable[[Retry], bool] = timing.is_terminated 

318 giveup_reason: object | None = None 

319 sleep_nanos: int = 0 

320 outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error) 

321 on_retryable_error(outcome) # e.g. count failures (RetryableError) caught by retry loop 

322 if retry_count < policy.max_retries and elapsed_nanos < policy.max_elapsed_nanos: 

323 if policy.max_sleep_nanos == 0 and backoff is full_jitter_backoff_strategy: 

324 pass # perf: e.g. spin-before-block 

325 elif retry_count == 0 and retryable_error.retry_immediately_once: 

326 pass # retry once immediately without backoff 

327 else: # jitter: default backoff strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos] 

328 rng = _thread_local_rng() if rng is None else rng 

329 sleep_nanos, curr_max_sleep_nanos = backoff( # compute delay before next retry attempt, after failure 

330 BackoffContext(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error) 

331 ) 

332 assert sleep_nanos >= 0 and curr_max_sleep_nanos >= 0, sleep_nanos 

333 

334 if sleep_nanos > 0: 

335 outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error) 

336 if (not is_terminated(retry)) and (giveup_reason := giveup(outcome)) is None: 

337 after_attempt(outcome) 

338 sleep(sleep_nanos, retry) 

339 idle_nanos += sleep_nanos 

340 if not is_terminated(retry): 

341 n: int = policy.max_previous_outcomes 

342 if n > 0: # outcome will be passed to next attempt via Retry.previous_outcomes 

343 if previous_outcomes: # detach to reduce memory footprint 

344 outcome = outcome.copy(retry=retry.copy(previous_outcomes=())) 

345 previous_outcomes = previous_outcomes[len(previous_outcomes) - n + 1 :] + (outcome,) # imm deque 

346 del outcome # help gc 

347 retry_count += 1 

348 continue # continue retry loop with next attempt 

349 else: 

350 sleep_nanos = 0 

351 outcome = AttemptOutcome( 

352 retry, False, True, is_terminated(retry), giveup_reason, elapsed_nanos, sleep_nanos, retryable_error 

353 ) 

354 after_attempt(outcome) 

355 return on_exhaustion(outcome) # raise error or return fallback value 

356 

357 

358async def call_with_retries_async( 

359 fn: Callable[[Retry], Awaitable[_T]], # wraps work and raises RetryableError for failures that shall be retried 

360 policy: RetryPolicy, # specifies how ``RetryableError`` shall be retried 

361 *, 

362 backoff: BackoffStrategy = full_jitter_backoff_strategy, # computes delay time before next retry attempt, after failure 

363 giveup: Callable[[AttemptOutcome], object | None] = no_giveup, # stop retrying based on domain-specific logic, e.g. time 

364 before_attempt: Callable[[Retry], int] = before_attempt_noop, # e.g. wait due to rate limiting or internal backpressure 

365 after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure, # e.g. record metrics and/or custom logging 

366 on_retryable_error: Callable[[AttemptOutcome], None] = noop, # e.g. count failures (RetryableError) caught by retry loop 

367 on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise, # raise error or return fallback value 

368 log: logging.Logger | None = None, # set this to ``None`` to disable logging 

369) -> _T: 

370 """Async version of call_with_retries() with the same semantics except it awaits ``fn`` and uses non-blocking sleep.""" 

371 rng: random.Random | None = None 

372 retry_count: int = 0 

373 idle_nanos: int = 0 

374 curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos 

375 previous_outcomes: tuple[AttemptOutcome, ...] = () # for safety pass *immutable* deque to callbacks 

376 timing: RetryTiming = policy.timing 

377 sleep: Callable[[int, Retry], Awaitable[None]] = timing.sleep_async 

378 monotonic_ns: Callable[[], int] = timing.monotonic_ns 

379 call_start_nanos: Final[int] = monotonic_ns() 

380 while True: 

381 before_attempt_nanos: int = monotonic_ns() if retry_count != 0 else call_start_nanos 

382 prev: tuple[AttemptOutcome, ...] = previous_outcomes 

383 retry: Retry = Retry( 

384 retry_count, call_start_nanos, before_attempt_nanos, before_attempt_nanos, idle_nanos, policy, log, prev 

385 ) 

386 try: 

387 if before_attempt is not before_attempt_noop: 

388 before_attempt_sleep_nanos: int = before_attempt(retry) 

389 assert before_attempt_sleep_nanos >= 0, before_attempt_sleep_nanos 

390 if before_attempt_sleep_nanos > 0: 

391 await sleep(before_attempt_sleep_nanos, retry) # e.g. wait due to rate limiting or internal backpressure 

392 attempt_start_nanos: int = monotonic_ns() 

393 idle_nanos += attempt_start_nanos - before_attempt_nanos 

394 retry = Retry( 

395 retry_count, call_start_nanos, before_attempt_nanos, attempt_start_nanos, idle_nanos, policy, log, prev 

396 ) 

397 timing.on_before_attempt(retry) 

398 result: _T = await fn(retry) # Call the target function and supply retry attempt number and other metadata 

399 if after_attempt is not after_attempt_log_failure: 

400 elapsed_nanos: int = monotonic_ns() - call_start_nanos 

401 outcome: AttemptOutcome = AttemptOutcome(retry, True, False, False, None, elapsed_nanos, 0, result) 

402 after_attempt(outcome) 

403 return result 

404 except RetryableError as retryable_error: 

405 elapsed_nanos = monotonic_ns() - call_start_nanos 

406 is_terminated: Callable[[Retry], bool] = timing.is_terminated 

407 giveup_reason: object | None = None 

408 sleep_nanos: int = 0 

409 outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error) 

410 on_retryable_error(outcome) # e.g. count failures (RetryableError) caught by retry loop 

411 if retry_count < policy.max_retries and elapsed_nanos < policy.max_elapsed_nanos: 

412 if policy.max_sleep_nanos == 0 and backoff is full_jitter_backoff_strategy: 

413 pass # perf: e.g. spin-before-block 

414 elif retry_count == 0 and retryable_error.retry_immediately_once: 

415 pass # retry once immediately without backoff 

416 else: # jitter: default backoff strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos] 

417 rng = _thread_local_rng() if rng is None else rng 

418 sleep_nanos, curr_max_sleep_nanos = backoff( # compute delay before next retry attempt, after failure 

419 BackoffContext(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error) 

420 ) 

421 assert sleep_nanos >= 0 and curr_max_sleep_nanos >= 0, sleep_nanos 

422 

423 if sleep_nanos > 0: 

424 outcome = AttemptOutcome(retry, False, False, False, None, elapsed_nanos, sleep_nanos, retryable_error) 

425 if (not is_terminated(retry)) and (giveup_reason := giveup(outcome)) is None: 

426 after_attempt(outcome) 

427 await sleep(sleep_nanos, retry) 

428 idle_nanos += sleep_nanos 

429 if not is_terminated(retry): 

430 n: int = policy.max_previous_outcomes 

431 if n > 0: # outcome will be passed to next attempt via Retry.previous_outcomes 

432 if previous_outcomes: # detach to reduce memory footprint 

433 outcome = outcome.copy(retry=retry.copy(previous_outcomes=())) 

434 previous_outcomes = previous_outcomes[len(previous_outcomes) - n + 1 :] + (outcome,) # imm deque 

435 del outcome # help gc 

436 retry_count += 1 

437 continue # continue retry loop with next attempt 

438 else: 

439 sleep_nanos = 0 

440 outcome = AttemptOutcome( 

441 retry, False, True, is_terminated(retry), giveup_reason, elapsed_nanos, sleep_nanos, retryable_error 

442 ) 

443 after_attempt(outcome) 

444 return on_exhaustion(outcome) # raise error or return fallback value 

445 

446 

447def multi_after_attempt(handlers: Iterable[Callable[[AttemptOutcome], None]]) -> Callable[[AttemptOutcome], None]: 

448 """Composes independent ``after_attempt`` handlers into one ``call_with_retries(after_attempt=...)`` callback that 

449 invokes each handler in order; thread-safe.""" 

450 handlers = tuple(handlers) 

451 if len(handlers) == 1: 

452 return handlers[0] # perf 

453 

454 def _after_attempt(outcome: AttemptOutcome) -> None: 

455 for handler in handlers: 

456 handler(outcome) 

457 

458 return _after_attempt 

459 

460 

461def any_giveup(handlers: Iterable[Callable[[AttemptOutcome], object | None]]) -> Callable[[AttemptOutcome], object | None]: 

462 """Composes independent ``giveup`` handlers into one ``call_with_retries(giveup=...)`` callback that gives up retrying if 

463 *any* handler gives up; that is if any handler returns a non-``None`` reason; thread-safe. 

464 

465 Handlers are evaluated in order and short-circuit: On giving up returns the first handler's reason for giving up. 

466 """ 

467 handlers = tuple(handlers) 

468 if len(handlers) == 1: 

469 return handlers[0] # perf 

470 

471 def _giveup(outcome: AttemptOutcome) -> object | None: 

472 for handler in handlers: 

473 giveup_reason: object | None = handler(outcome) 

474 if giveup_reason is not None: 

475 return giveup_reason 

476 return None # don't give up retrying 

477 

478 return _giveup 

479 

480 

481def all_giveup(handlers: Iterable[Callable[[AttemptOutcome], object | None]]) -> Callable[[AttemptOutcome], object | None]: 

482 """Composes independent ``giveup`` handlers into one ``call_with_retries(giveup=...)`` callback that gives up retrying if 

483 *all* handlers give up; that is if all handlers return a non-``None`` reason; thread-safe. 

484 

485 Handlers are evaluated in order and short-circuit: stops at first ``None``; else returns the last non-``None`` reason. 

486 """ 

487 handlers = tuple(handlers) 

488 if len(handlers) == 1: 

489 return handlers[0] # perf 

490 

491 def _giveup(outcome: AttemptOutcome) -> object | None: 

492 giveup_reason: object | None = None 

493 for handler in handlers: 

494 giveup_reason = handler(outcome) 

495 if giveup_reason is None: 

496 return None # don't give up retrying 

497 return giveup_reason 

498 

499 return _giveup 

500 

501 

502############################################################################# 

503class RetryableError(Exception): 

504 """Indicates that the task that caused the underlying exception can be retried and might eventually succeed; 

505 ``call_with_retries()`` will pass this exception to callbacks via ``AttemptOutcome.result``; can be subclassed.""" 

506 

507 def __init__( 

508 self, 

509 *exc_args: object, # optional args passed into super().__init__() 

510 display_msg: object = None, # for logging 

511 retry_immediately_once: bool = False, # retry once immediately without backoff? 

512 category: object = None, # optional classification e.g. "CONCURRENCY, "SERVER_ISSUE", "THROTTLING", "TRANSIENT", ... 

513 attachment: object = None, # optional domain specific info passed to next attempt via Retry.previous_outcomes if 

514 # RetryPolicy.max_previous_outcomes > 0. This helps when retrying is not just 'try again later', but 

515 # 'try again differently based on what just happened'. 

516 # Examples: switching network endpoints, adjusting per-attempt timeouts, capping retries by error-class, resuming 

517 # with a token/offset, maintaining failure history for this invocation of call_with_retries(). 

518 # Example: 'cap retries to 3 for ECONNREFUSED but 12 for ETIMEDOUT' via attachment=collections.Counter 

519 ) -> None: 

520 super().__init__(*exc_args) 

521 self.display_msg: object = display_msg 

522 self.retry_immediately_once: bool = retry_immediately_once 

523 self.category: object = category 

524 self.attachment: object = attachment 

525 

526 def display_msg_str(self) -> str: 

527 """Returns the display_msg as a str; for logging.""" 

528 return "" if self.display_msg is None else str(self.display_msg) 

529 

530 

531############################################################################# 

532@final 

533class RetryError(Exception): 

534 """Indicates that retries have been exhausted; the last RetryableError is in RetryError.__cause__.""" 

535 

536 outcome: Final[AttemptOutcome] 

537 """Metadata that describes why and how call_with_retries() gave up.""" 

538 

539 def __init__(self, outcome: AttemptOutcome) -> None: 

540 super().__init__(outcome) 

541 self.outcome = outcome 

542 

543 

544############################################################################# 

545@final 

546class Retry(NamedTuple): 

547 """Attempt metadata provided to callback functions; includes the current retry attempt number; immutable.""" 

548 

549 count: int # type: ignore[assignment] 

550 """Attempt number; count=0 is the first attempt, count=1 is the second attempt aka first retry.""" 

551 

552 call_start_time_nanos: int 

553 """Value of time.monotonic_ns() at start of call_with_retries() invocation.""" 

554 

555 before_attempt_start_time_nanos: int 

556 """Value of time.monotonic_ns() at start of before_attempt() invocation.""" 

557 

558 attempt_start_time_nanos: int 

559 """Value of time.monotonic_ns() at start of fn() invocation.""" 

560 

561 idle_nanos: int 

562 """Sum of all before_attempt_sleep_nanos() plus AttemptOutcome.sleep_nanos across this call_with_retries() invocation, at 

563 the start of fn() invocation.""" 

564 

565 policy: RetryPolicy 

566 """Policy that was passed into call_with_retries().""" 

567 

568 log: logging.Logger | None 

569 """Logger that was passed into call_with_retries().""" 

570 

571 previous_outcomes: Sequence[AttemptOutcome] 

572 """History/state of the N=max_previous_outcomes most recent outcomes for the current call_with_retries() invocation.""" 

573 

574 def copy(self, **override_kwargs: Any) -> Retry: 

575 """Creates a new object copying an existing one with the specified fields overridden for customization.""" 

576 return self._replace(**override_kwargs) 

577 

578 def before_attempt_sleep_nanos(self) -> int: 

579 """Returns duration between the start of before_attempt() and the start of fn() attempt.""" 

580 return self.attempt_start_time_nanos - self.before_attempt_start_time_nanos 

581 

582 def __repr__(self) -> str: 

583 return ( 

584 f"{type(self).__name__}(count={self.count!r}, call_start_time_nanos={self.call_start_time_nanos!r}, " 

585 f"before_attempt_start_time_nanos={self.before_attempt_start_time_nanos!r}, " 

586 f"attempt_start_time_nanos={self.attempt_start_time_nanos!r}, idle_nanos={self.idle_nanos!r})" 

587 ) 

588 

589 def __eq__(self, other: object) -> bool: 

590 return self is other 

591 

592 def __hash__(self) -> int: 

593 return object.__hash__(self) 

594 

595 

596############################################################################# 

597@final 

598class AttemptOutcome(NamedTuple): 

599 """Captures per-attempt state for ``after_attempt`` callbacks; immutable.""" 

600 

601 retry: Retry 

602 """Attempt metadata passed into fn(retry).""" 

603 

604 is_success: bool 

605 """False if fn(retry) raised a RetryableError; True otherwise.""" 

606 

607 is_exhausted: bool 

608 """True if the loop is giving up retrying (possibly even due to is_terminated); False otherwise.""" 

609 

610 is_terminated: bool 

611 """True if the termination predicate has become true; False otherwise.""" 

612 

613 giveup_reason: object | None 

614 """Reason returned by giveup(); None means giveup() was not called or decided to not give up.""" 

615 

616 elapsed_nanos: int 

617 """Total duration between the start of call_with_retries() invocation and the end of this fn() attempt.""" 

618 

619 sleep_nanos: int 

620 """Duration of current sleep period.""" 

621 

622 result: RetryableError | object 

623 """Result of fn(retry); a RetryableError on retryable failure, or some other object on success.""" 

624 

625 def attempt_elapsed_nanos(self) -> int: 

626 """Returns duration between the start of this fn() attempt and the end of this fn() attempt.""" 

627 return self.elapsed_nanos + self.retry.call_start_time_nanos - self.retry.attempt_start_time_nanos 

628 

629 def copy(self, **override_kwargs: Any) -> AttemptOutcome: 

630 """Creates a new outcome copying an existing one with the specified fields overridden for customization.""" 

631 return self._replace(**override_kwargs) 

632 

633 def __repr__(self) -> str: 

634 return ( 

635 f"{type(self).__name__}(" 

636 f"retry={self.retry!r}, " 

637 f"is_success={self.is_success!r}, " 

638 f"is_exhausted={self.is_exhausted!r}, " 

639 f"is_terminated={self.is_terminated!r}, " 

640 f"giveup_reason={self.giveup_reason!r}, " 

641 f"elapsed_nanos={self.elapsed_nanos!r}, " 

642 f"sleep_nanos={self.sleep_nanos!r})" 

643 ) 

644 

645 def __eq__(self, other: object) -> bool: 

646 return self is other 

647 

648 def __hash__(self) -> int: 

649 return object.__hash__(self) 

650 

651 

652############################################################################# 

653@final 

654class BackoffContext(NamedTuple): 

655 """Captures per-backoff state for ``backoff`` callbacks.""" 

656 

657 retry: Retry 

658 """Attempt metadata passed into fn(retry).""" 

659 

660 curr_max_sleep_nanos: int 

661 """Current maximum duration (in nanoseconds) to sleep before the next retry attempt; 

662 Typically: ``RetryPolicy.initial_max_sleep_nanos <= curr_max_sleep_nanos <= RetryPolicy.max_sleep_nanos``.""" 

663 

664 rng: random.Random 

665 """Thread-local random number generator instance.""" 

666 

667 elapsed_nanos: int 

668 """Total duration between the start of call_with_retries() invocation and the end of this fn() attempt.""" 

669 

670 retryable_error: RetryableError 

671 """Result of failed fn(retry) attempt.""" 

672 

673 def copy(self, **override_kwargs: Any) -> BackoffContext: 

674 """Creates a new object copying an existing one with the specified fields overridden for customization.""" 

675 return self._replace(**override_kwargs) 

676 

677 def __repr__(self) -> str: 

678 return ( 

679 f"{type(self).__name__}(" 

680 f"retry={self.retry!r}, " 

681 f"curr_max_sleep_nanos={self.curr_max_sleep_nanos!r}, " 

682 f"elapsed_nanos={self.elapsed_nanos!r})" 

683 ) 

684 

685 def __eq__(self, other: object) -> bool: 

686 return self is other 

687 

688 def __hash__(self) -> int: 

689 return object.__hash__(self) 

690 

691 

692BackoffStrategy = Callable[[BackoffContext], tuple[int, int]] # typealias; returns sleep_nanos:int, curr_max_sleep_nanos:int 

693"""Strategy that implements a backoff algorithm that reduces server load while minimizing retry latency; default is full 

694jitter; various other example backoff strategies such as decorrelated-jitter or retry-after HTTP 429 "Too Many Requests" 

695responses, etc can be found in test_retry_examples.py.""" 

696 

697 

698############################################################################# 

699def _default_timing_is_terminated(retry: Retry) -> bool: 

700 return False 

701 

702 

703def _default_timing_sleep(sleep_nanos: int, retry: Retry) -> None: 

704 time.sleep(sleep_nanos / 1_000_000_000) 

705 

706 

707async def _default_timing_sleep_asyncio(sleep_nanos: int, retry: Retry) -> None: 

708 import asyncio 

709 

710 await asyncio.sleep(sleep_nanos / 1_000_000_000) 

711 

712 

713def _default_timing_on_before_attempt(retry: Retry) -> None: 

714 if retry.policy.timing.is_terminated(retry): 

715 raise RetryableError(display_msg="terminated before attempt") from RetryTerminationError() 

716 

717 

718@final 

719class RetryTerminationError(InterruptedError): 

720 """Termination signal raised when retry loop exits before starting the next attempt.""" 

721 

722 

723@dataclass(frozen=True) 

724@final 

725class RetryTiming: 

726 """Customizable callbacks for reading the current monotonic time, sleeping and optional async termination; immutable.""" 

727 

728 monotonic_ns: Callable[[], int] = time.monotonic_ns 

729 """Returns the system's current monotonic time in nanoseconds.""" 

730 

731 is_terminated: Callable[[Retry], bool] = _default_timing_is_terminated 

732 """Returns whether a predicate has become true; if so causes the retry loop to exit early between attempts; can be used 

733 to indicate system shutdown or similar cancellation conditions; default is to always return ``False``; this function 

734 should complete quickly without any blocking or sleeping.""" 

735 

736 sleep: Callable[[int, Retry], None] = _default_timing_sleep 

737 """Sleeps N nanoseconds between attempts; override to inject custom sleeping or for early wake-ups; thread-safe.""" 

738 

739 sleep_async: Callable[[int, Retry], Awaitable[None]] = _default_timing_sleep_asyncio 

740 """Sleeps N nanoseconds between attempts; override to inject custom sleeping or for early wake-ups; thread-safe.""" 

741 

742 on_before_attempt: Callable[[Retry], None] = _default_timing_on_before_attempt 

743 """Typically (but not necessarily) raises an error if ``is_terminated()`` is True; otherwise fn() will still run; this 

744 function should complete quickly without any blocking or sleeping. 

745 

746 To disable this behavior: RetryTiming.make_from(...).copy(on_before_attempt=lambda retry: None). 

747 """ 

748 

749 def copy(self, **override_kwargs: Any) -> RetryTiming: 

750 """Creates a new object copying an existing one with the specified fields overridden for customization; thread- 

751 safe.""" 

752 return dataclasses.replace(self, **override_kwargs) 

753 

754 @staticmethod 

755 def make_from(termination_event: threading.Event | None) -> RetryTiming: 

756 """Convenience factory that creates a RetryTiming that performs async termination when termination_event is set.""" 

757 if termination_event is None: 

758 return RetryTiming() 

759 

760 def _is_terminated(retry: Retry) -> bool: 

761 return termination_event.is_set() 

762 

763 def _sleep(sleep_nanos: int, retry: Retry) -> None: 

764 termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination 

765 

766 return RetryTiming(is_terminated=_is_terminated, sleep=_sleep) 

767 

768 @staticmethod 

769 def make_from_asyncio(termination_event: asyncio.Event | None) -> RetryTiming: 

770 """Convenience factory that creates a RetryTiming that performs async termination when termination_event is set; 

771 Write an analog version of this function if you wish to replace asyncio with Trio or AnyIO or similar.""" 

772 if termination_event is None: 

773 return RetryTiming() 

774 

775 def _is_terminated(retry: Retry) -> bool: 

776 return termination_event.is_set() 

777 

778 async def _sleep_async(sleep_nanos: int, retry: Retry) -> None: 

779 import asyncio 

780 

781 if sleep_nanos <= 0: 

782 await asyncio.sleep(0) # perf: cooperative yield with less overhead than asyncio.wait_for(..., 0) 

783 return 

784 try: 

785 await asyncio.wait_for(termination_event.wait(), timeout=sleep_nanos / 1_000_000_000) 

786 except asyncio.TimeoutError: 

787 pass # expected 

788 

789 return RetryTiming(is_terminated=_is_terminated, sleep_async=_sleep_async) 

790 

791 

792############################################################################# 

793def _format_msg(display_msg: str, retryable_error: RetryableError) -> str: 

794 """Default implementation of ``format_msg`` callback for RetryConfig; creates simple log message; thread-safe.""" 

795 msg = display_msg + " " if display_msg else "" 

796 errmsg: str = retryable_error.display_msg_str() 

797 msg = msg + errmsg + " " if errmsg else msg 

798 msg = msg if msg else "Retrying " 

799 return msg 

800 

801 

802def _format_pair(first: object, second: object) -> str: 

803 """Default implementation of ``format_pair`` callback for RetryConfig; creates simple log message part; thread-safe.""" 

804 second = "∞" if INFINITY_MAX_RETRIES == second else second # noqa: SIM300 

805 return f"[{first}/{second}]" 

806 

807 

808@dataclass(frozen=True) 

809@final 

810class RetryConfig: 

811 """Configures logging for call_with_retries(); all defaults work out of the box; immutable.""" 

812 

813 display_msg: str = "Retrying" # message prefix for retry log messages 

814 dots: str = " ..." # suffix appended to retry log messages 

815 format_msg: Callable[[str, RetryableError], str] = _format_msg # lambda: display_msg, retryable_error 

816 format_pair: Callable[[object, object], str] = _format_pair # lambda: first, second 

817 format_duration: Callable[[int], str] = human_readable_duration # lambda: nanos 

818 info_loglevel: int = logging.INFO # loglevel used when not giving up 

819 warning_loglevel: int = logging.WARNING # loglevel used when giving up 

820 enable_logging: bool = True # set to False to disable logging 

821 exc_info: bool = False # passed into Logger.log() 

822 stack_info: bool = False # passed into Logger.log() 

823 extra: Mapping[str, object] | None = dataclasses.field(default=None, repr=False, compare=False) # passed to Logger.log() 

824 context: object = dataclasses.field(default=None, repr=False, compare=False) # optional domain specific info 

825 

826 def copy(self, **override_kwargs: Any) -> RetryConfig: 

827 """Creates a new config copying an existing one with the specified fields overridden for customization.""" 

828 return dataclasses.replace(self, **override_kwargs) 

829 

830 

831############################################################################# 

832@dataclass(frozen=True) 

833@final 

834class RetryPolicy: 

835 """Configuration of maximum retries, sleep bounds, elapsed-time budget, logging, etc for call_with_retries(); immutable. 

836 

837 By default uses full jitter which works as follows: The maximum duration to sleep between attempts initially starts with 

838 ``initial_max_sleep_secs`` and doubles on each retry, up to the final maximum of ``max_sleep_secs``. 

839 Example: 125ms --> 250ms --> 500ms --> 1s --> 2s --> 4s --> 8s --> 10s --> 10s... 

840 On each retry a random sleep duration in the range ``[min_sleep_secs, current max]`` is picked. 

841 In a nutshell: ``0 <= min_sleep_secs <= initial_max_sleep_secs <= max_sleep_secs``. Typically, min_sleep_secs=0. 

842 """ 

843 

844 max_retries: int = INFINITY_MAX_RETRIES 

845 """The maximum number of times ``fn`` will be invoked additionally after the first attempt invocation; must be >= 0.""" 

846 

847 min_sleep_secs: float = 0 

848 """The minimum duration to sleep between any two attempts.""" 

849 

850 initial_max_sleep_secs: float = 0.125 

851 """The initial maximum duration to sleep between any two attempts.""" 

852 

853 max_sleep_secs: float = 10 

854 """The final max duration to sleep between any two attempts; 0 <= min_sleep_secs <= initial_max_sleep_secs <= 

855 max_sleep_secs.""" 

856 

857 max_elapsed_secs: float = 47 

858 """``fn`` will not be retried (or not retried anymore) once this much time has elapsed since the initial start of 

859 call_with_retries(); set this to 365 * 86400 seconds or similar to effectively disable the time limit.""" 

860 

861 exponential_base: float = 2 

862 """Growth factor (aka multiplier) for backoff algorithm to calculate sleep duration; must be >= 1.""" 

863 

864 max_elapsed_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

865 min_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

866 initial_max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

867 max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

868 

869 reraise: bool = True 

870 """On exhaustion, the default (``True``) is to re-raise the underlying exception when present.""" 

871 

872 max_previous_outcomes: int = 0 

873 """Pass the N=max_previous_outcomes most recent AttemptOutcome objects to callbacks via Retry.previous_outcomes.""" 

874 

875 config: RetryConfig = dataclasses.field(default=RetryConfig(), repr=False, compare=False) 

876 """Configures logging behavior.""" 

877 

878 timing: RetryTiming = dataclasses.field(default=RetryTiming(), repr=False) 

879 """Customizable callbacks for reading the current monotonic time, sleeping and optional async termination.""" 

880 

881 context: object = dataclasses.field(default=None, repr=False, compare=False) 

882 """Optional domain specific info.""" 

883 

884 @classmethod 

885 def from_namespace(cls, args: argparse.Namespace) -> RetryPolicy: 

886 """Factory that reads the policy from argparse.ArgumentParser via args.""" 

887 return cls( 

888 max_retries=getattr(args, "max_retries", INFINITY_MAX_RETRIES), 

889 min_sleep_secs=getattr(args, "retry_min_sleep_secs", 0), 

890 initial_max_sleep_secs=getattr(args, "retry_initial_max_sleep_secs", 0.125), 

891 max_sleep_secs=getattr(args, "retry_max_sleep_secs", 10), 

892 max_elapsed_secs=getattr(args, "retry_max_elapsed_secs", 47), 

893 exponential_base=getattr(args, "retry_exponential_base", 2), 

894 reraise=getattr(args, "retry_reraise", True), 

895 max_previous_outcomes=getattr(args, "retry_max_previous_outcomes", 0), 

896 config=getattr(args, "retry_config", RetryConfig()), 

897 timing=getattr(args, "retry_timing", RetryTiming()), 

898 context=getattr(args, "retry_context", None), 

899 ) 

900 

901 @classmethod 

902 def no_retries(cls) -> RetryPolicy: 

903 """Returns a policy that never retries.""" 

904 return cls( 

905 max_retries=0, 

906 min_sleep_secs=0, 

907 initial_max_sleep_secs=0, 

908 max_sleep_secs=0, 

909 max_elapsed_secs=0, 

910 ) 

911 

912 def __post_init__(self) -> None: # validate and compute derived values 

913 self._validate_min("max_retries", self.max_retries, 0) 

914 self._validate_min("exponential_base", self.exponential_base, 1) 

915 self._validate_min("min_sleep_secs", self.min_sleep_secs, 0) 

916 self._validate_min("initial_max_sleep_secs", self.initial_max_sleep_secs, 0) 

917 self._validate_min("max_sleep_secs", self.max_sleep_secs, 0) 

918 self._validate_min("max_elapsed_secs", self.max_elapsed_secs, 0) 

919 object.__setattr__(self, "max_elapsed_nanos", int(self.max_elapsed_secs * 1_000_000_000)) # derived value 

920 min_sleep_nanos: int = int(self.min_sleep_secs * 1_000_000_000) 

921 initial_max_sleep_nanos: int = int(self.initial_max_sleep_secs * 1_000_000_000) 

922 max_sleep_nanos: int = int(self.max_sleep_secs * 1_000_000_000) 

923 max_sleep_nanos = max(min_sleep_nanos, max_sleep_nanos) 

924 initial_max_sleep_nanos = min(max_sleep_nanos, max(min_sleep_nanos, initial_max_sleep_nanos)) 

925 object.__setattr__(self, "min_sleep_nanos", min_sleep_nanos) # derived value 

926 object.__setattr__(self, "initial_max_sleep_nanos", initial_max_sleep_nanos) # derived value 

927 object.__setattr__(self, "max_sleep_nanos", max_sleep_nanos) # derived value 

928 self._validate_min("max_previous_outcomes", self.max_previous_outcomes, 0) 

929 assert 0 <= self.min_sleep_nanos <= self.initial_max_sleep_nanos <= self.max_sleep_nanos 

930 if not isinstance(self.reraise, bool): 

931 raise TypeError(f"{type(self).__name__}.reraise must be bool") 

932 

933 def _validate_min(self, attr_name: str, value: float, minimum: float) -> None: 

934 if value < minimum: 

935 raise ValueError(f"Invalid {type(self).__name__}.{attr_name}: must be >= {minimum} but got {value}") 

936 

937 def copy(self, **override_kwargs: Any) -> RetryPolicy: 

938 """Creates a new policy copying an existing one with the specified fields overridden for customization; thread-safe. 

939 

940 Example usage: policy = retry_policy.copy(max_sleep_secs=2, max_elapsed_secs=10) 

941 """ 

942 return dataclasses.replace(self, **override_kwargs) 

943 

944 

945############################################################################# 

946def _fn_not_implemented(_retry: Retry) -> NoReturn: 

947 """Default implementation of ``fn`` callback for RetryTemplate; always raises.""" 

948 raise NotImplementedError("Provide fn when calling RetryTemplate") 

949 

950 

951NO_LOGGER: Final[logging.Logger] = logging.Logger("NULL") # noqa: LOG001 do not register dummy logger with Logger.manager 

952NO_LOGGER.addHandler(logging.NullHandler()) # prevents lastResort fallback 

953NO_LOGGER.disabled = True 

954NO_LOGGER.propagate = False 

955_R = TypeVar("_R") 

956 

957 

958@dataclass(frozen=True) 

959@final 

960class RetryTemplate(Generic[_T]): 

961 """Convenience class that aggregates all knobs for call_with_retries(); and is itself callable too; immutable.""" 

962 

963 fn: Callable[[Retry], _T] = _fn_not_implemented # set this to make the RetryTemplate object itself callable 

964 policy: RetryPolicy = RetryPolicy() # specifies how ``RetryableError`` shall be retried 

965 backoff: BackoffStrategy = full_jitter_backoff_strategy # computes delay time before next retry attempt, after failure 

966 giveup: Callable[[AttemptOutcome], object | None] = no_giveup # stop retrying based on domain-specific logic, e.g. time 

967 before_attempt: Callable[[Retry], int] = before_attempt_noop # e.g. wait due to rate limiting or internal backpressure 

968 after_attempt: Callable[[AttemptOutcome], None] = after_attempt_log_failure # e.g. record metrics and/or custom logging 

969 on_retryable_error: Callable[[AttemptOutcome], None] = noop # e.g. count failures (RetryableError) caught by retry loop 

970 on_exhaustion: Callable[[AttemptOutcome], _T] = on_exhaustion_raise # raise error or return fallback value 

971 log: logging.Logger | None = None # set this to ``None`` to disable logging 

972 

973 def copy(self, **override_kwargs: Any) -> RetryTemplate[_T]: 

974 """Creates a new object copying an existing one with the specified fields overridden for customization; thread-safe. 

975 

976 Example usage: retry_template.copy(policy=policy.copy(max_sleep_secs=2, max_elapsed_secs=10), log=None) 

977 """ 

978 return dataclasses.replace(self, **override_kwargs) 

979 

980 def __call__(self) -> _T: 

981 """Invokes ``self.fn`` via the call_with_retries() retry loop using the stored parameters; thread-safe. 

982 

983 Example Usage: result: str = retry_template.copy(fn=...)() 

984 """ 

985 return call_with_retries( 

986 fn=self.fn, 

987 policy=self.policy, 

988 backoff=self.backoff, 

989 giveup=self.giveup, 

990 before_attempt=self.before_attempt, 

991 after_attempt=self.after_attempt, 

992 on_retryable_error=self.on_retryable_error, 

993 on_exhaustion=self.on_exhaustion, 

994 log=self.log, 

995 ) 

996 

997 def call_with_retries( 

998 self, 

999 fn: Callable[[Retry], _R], 

1000 policy: RetryPolicy | None = None, 

1001 *, 

1002 backoff: BackoffStrategy | None = None, 

1003 giveup: Callable[[AttemptOutcome], object | None] | None = None, 

1004 before_attempt: Callable[[Retry], int] | None = None, 

1005 after_attempt: Callable[[AttemptOutcome], None] | None = None, 

1006 on_retryable_error: Callable[[AttemptOutcome], None] | None = None, 

1007 on_exhaustion: Callable[[AttemptOutcome], _R] | None = None, 

1008 log: logging.Logger | None = None, # pass NO_LOGGER to override template logger and disable logging for this call 

1009 ) -> _R: 

1010 """Invokes ``fn`` via the call_with_retries() retry loop using the stored or overridden params; thread-safe. 

1011 

1012 Example Usage: result: str = retry_template.call_with_retries(fn=...) 

1013 """ 

1014 return call_with_retries( 

1015 fn=fn, 

1016 policy=self.policy if policy is None else policy, 

1017 backoff=self.backoff if backoff is None else backoff, 

1018 giveup=self.giveup if giveup is None else giveup, 

1019 before_attempt=self.before_attempt if before_attempt is None else before_attempt, 

1020 after_attempt=self.after_attempt if after_attempt is None else after_attempt, 

1021 on_retryable_error=self.on_retryable_error if on_retryable_error is None else on_retryable_error, 

1022 on_exhaustion=( 

1023 cast(Callable[[AttemptOutcome], _R], self.on_exhaustion) if on_exhaustion is None else on_exhaustion 

1024 ), 

1025 log=None if log is NO_LOGGER else self.log if log is None else log, 

1026 ) 

1027 

1028 async def call_with_retries_async( 

1029 self, 

1030 fn: Callable[[Retry], Awaitable[_R]], 

1031 policy: RetryPolicy | None = None, 

1032 *, 

1033 backoff: BackoffStrategy | None = None, 

1034 giveup: Callable[[AttemptOutcome], object | None] | None = None, 

1035 before_attempt: Callable[[Retry], int] | None = None, 

1036 after_attempt: Callable[[AttemptOutcome], None] | None = None, 

1037 on_retryable_error: Callable[[AttemptOutcome], None] | None = None, 

1038 on_exhaustion: Callable[[AttemptOutcome], _R] | None = None, 

1039 log: logging.Logger | None = None, # pass NO_LOGGER to override template logger and disable logging for this call 

1040 ) -> _R: 

1041 """Invokes ``fn`` via the call_with_retries_async() retry loop using the stored or overridden params; thread-safe. 

1042 

1043 Example Usage: result: str = await retry_template.call_with_retries_async(fn=...) 

1044 """ 

1045 return await call_with_retries_async( 

1046 fn=fn, 

1047 policy=self.policy if policy is None else policy, 

1048 backoff=self.backoff if backoff is None else backoff, 

1049 giveup=self.giveup if giveup is None else giveup, 

1050 before_attempt=self.before_attempt if before_attempt is None else before_attempt, 

1051 after_attempt=self.after_attempt if after_attempt is None else after_attempt, 

1052 on_retryable_error=self.on_retryable_error if on_retryable_error is None else on_retryable_error, 

1053 on_exhaustion=( 

1054 cast(Callable[[AttemptOutcome], _R], self.on_exhaustion) if on_exhaustion is None else on_exhaustion 

1055 ), 

1056 log=None if log is NO_LOGGER else self.log if log is None else log, 

1057 ) 

1058 

1059 @overload 

1060 def wraps(self, fn: Callable[..., Awaitable[_R]]) -> Callable[..., Awaitable[_R]]: ... 

1061 

1062 @overload 

1063 def wraps(self, fn: Callable[..., _R]) -> Callable[..., _R]: ... 

1064 

1065 def wraps(self, fn: Callable[..., Any]) -> Callable[..., Any]: 

1066 """Returns a wrapper function that forwards all arguments to ``fn`` and retries it using this template; thread-safe. 

1067 

1068 Example Usage: 

1069 def fn(x: int) -> int: 

1070 return x * 2 

1071 func: Callable[[int], int] = retry_template.wraps(fn) 

1072 y: int = func(5) # returns 10 

1073 """ 

1074 import functools 

1075 import inspect 

1076 

1077 target_fn: Callable[..., Any] = fn.func if isinstance(fn, functools.partial) else fn 

1078 if inspect.iscoroutinefunction(target_fn) or inspect.iscoroutinefunction(type(target_fn).__call__): 

1079 

1080 @functools.wraps(fn) 

1081 async def wrapped_async(*args: Any, **kwargs: Any) -> Any: 

1082 return await self.call_with_retries_async(fn=lambda _retry: fn(*args, **kwargs)) 

1083 

1084 return wrapped_async 

1085 else: 

1086 

1087 @functools.wraps(fn) 

1088 def wrapped(*args: Any, **kwargs: Any) -> Any: 

1089 return self.call_with_retries(fn=lambda _retry: fn(*args, **kwargs)) 

1090 

1091 return wrapped 

1092 

1093 

1094############################################################################# 

1095def raise_retryable_error_from( 

1096 exc: BaseException, 

1097 *, 

1098 display_msg: object = None, 

1099 retry_immediately_once: bool = False, 

1100 category: object = None, 

1101 attachment: object = None, 

1102) -> NoReturn: 

1103 """Convenience function that raises a generic RetryableError that wraps the given underlying exception.""" 

1104 raise RetryableError( 

1105 display_msg=type(exc).__name__ if display_msg is None else display_msg, 

1106 retry_immediately_once=retry_immediately_once, 

1107 category=category, 

1108 attachment=attachment, 

1109 ) from exc 

1110 

1111 

1112ExceptionPredicate = Union[bool, Callable[[BaseException], bool]] # Type alias 

1113 

1114 

1115def call_with_exception_handlers( 

1116 fn: Callable[[], _T], # typically a lambda 

1117 *, 

1118 continue_scanning_if_no_predicate_matches: bool = False, 

1119 handlers: Mapping[type[BaseException], Sequence[tuple[ExceptionPredicate, Callable[[BaseException], _T]]]], 

1120) -> _T: 

1121 """Convenience function that calls ``fn`` and returns its result; on exception runs the first matching handler in a per- 

1122 exception handler chain; composes independent handlers via predicates into one function, in Event-Predicate-Action style. 

1123 

1124 Lookup uses the exception type's Method Resolution Order (most-specific class in the exception class hierarchy wins). For 

1125 the first class that exists as a key in ``handlers``, its chain is scanned in order. Each chain element is 

1126 ``(predicate, handler)`` where ``predicate`` is either ``True`` (always matches), ``False`` (disabled), or 

1127 ``predicate(exc) -> bool``. The first matching handler is called with the exception and its return value is returned. If 

1128 no predicate matches then, by default, the original exception is re-raised and no less-specific handler chains are 

1129 consulted. Set ``continue_scanning_if_no_predicate_matches=True`` to continue scanning exception base classes instead. 

1130 

1131 Typically (but not necessarily) the handler raises a ``RetryableError``, via ``raise_retryable_error_from`` or similar. 

1132 Or it may raise another exception type (which will not be retried), or even return a fallback value instead of raising. 

1133 

1134 Example: turn transient ssh/zfs command failures into RetryableError for call_with_retries(), including feature flags: 

1135 

1136 def run_remote(retry: Retry) -> str: 

1137 p = subprocess.run(["ssh", "foo.example.com", "zfs", "list", "-H"], text=True, capture_output=True, check=True) 

1138 return p.stdout 

1139 

1140 def fn(retry: Retry) -> str: 

1141 return call_with_exception_handlers( 

1142 fn=lambda: run_remote(retry), 

1143 handlers={ 

1144 TimeoutError: [(True, raise_retryable_error_from)], 

1145 ConnectionResetError: [(True, lambda exc: raise_retryable_error_from(exc, display_msg="ssh reset"))], 

1146 subprocess.CalledProcessError: [ 

1147 (lambda exc: exc.returncode == 255, lambda exc: raise_retryable_error_from(exc, display_msg="ssh error")), 

1148 (lambda exc: "cannot receive" in (exc.stderr or ""), lambda exc: raise_retryable_error_from(exc, display_msg="zfs recv")), 

1149 ], 

1150 OSError: [ 

1151 (lambda exc: getattr(exc, "errno", None) in {errno.ETIMEDOUT, errno.EHOSTUNREACH}, 

1152 lambda exc: raise_retryable_error_from(exc, display_msg=f"network: {exc}")), 

1153 (False, lambda exc: raise_retryable_error_from(exc, display_msg="disabled handler example")), 

1154 ], 

1155 }, 

1156 ) 

1157 

1158 stdout: str = call_with_retries(fn=fn, policy=RetryPolicy(max_retries=3)) 

1159 

1160 Example: return a fallback value (no retry loop required): 

1161 

1162 def read_optional_file(path: str) -> str: 

1163 return call_with_exception_handlers( 

1164 fn=lambda: open(path, encoding="utf-8").read(), 

1165 handlers={FileNotFoundError: [(True, lambda _exc: "")]}, 

1166 ) 

1167 """ 

1168 try: 

1169 return fn() 

1170 except BaseException as exc: 

1171 for cls in type(exc).__mro__: 

1172 handler_chain = handlers.get(cls) 

1173 if handler_chain is not None: 

1174 for predicate, handler in handler_chain: 

1175 if predicate is True or (predicate is not False and predicate(exc)): 

1176 return handler(exc) 

1177 if not continue_scanning_if_no_predicate_matches: 

1178 raise 

1179 raise 

1180 

1181 

1182############################################################################# 

1183@final 

1184class _ThreadLocalRNG(threading.local): 

1185 """Caches a per-thread random number generator.""" 

1186 

1187 def __init__(self) -> None: 

1188 self.rng: random.Random | None = None 

1189 

1190 

1191_THREAD_LOCAL_RNG: Final[_ThreadLocalRNG] = _ThreadLocalRNG() 

1192 

1193 

1194def _thread_local_rng() -> random.Random: 

1195 """Returns a per-thread RNG for backoff jitter; for perf avoids locking and initializing a new random.Random() at high 

1196 frequency.""" 

1197 threadlocal: _ThreadLocalRNG = _THREAD_LOCAL_RNG 

1198 rng: random.Random | None = threadlocal.rng 

1199 if rng is None: 

1200 rng = random.Random() # noqa: S311 jitter isn't security sensitive, and random.SystemRandom.randint() is slow 

1201 threadlocal.rng = rng 

1202 return rng