# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
15"""Customizable generic retry support using jittered exponential backoff with cap. 

16 

17Purpose: 

18-------- 

19- Provide a reusable retry helper for transient failures using customizable policy, config and callbacks. 

20- Centralize backoff, jitter, logging and metrics behavior while keeping call sites compact. 

21- Prevent accidental retries: the loop retries only when the developer explicitly raises a ``RetryableError``, which reduces 

22 the risk of retrying non-idempotent operations. 

23- Provide a thread-safe, fast implementation; avoid shared RNG contention. 

24- Avoid unnecessary complexity and add zero dependencies beyond the Python standard library. Everything you need is in this 

25 single Python file. 

26 

27Usage: 

28------ 

29- Wrap work in a callable ``fn(retry: Retry)`` and therein raise ``RetryableError`` for failures that should be retried. 

30- Construct a policy via ``RetryPolicy(...)`` that specifies how ``RetryableError`` shall be retried. 

31- Invoke ``call_with_retries(fn=fn, policy=policy, log=logger)`` with a standard logging.Logger 

32- On success, the result of calling ``fn`` is returned. 

33- On exhaustion, call_with_retries() either re-raises the last underlying ``RetryableError.__cause__``, or raises 

34 ``RetryError`` (wrapping the last ``RetryableError``), like so: 

35 - if ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last 

36 ``RetryableError.__cause__`` with its original traceback. 

37 - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain). 

38 - The default is ``RetryPolicy.reraise=True``. 

39 

40Advanced Configuration: 

41----------------------- 

42- Tune ``RetryPolicy`` parameters to control maximum retries, sleep bounds, and elapsed-time budget. 

43- Use ``RetryConfig`` to control termination events, and logging settings. 

44- Set ``log=None`` to disable logging, or customize ``info_loglevel`` / ``warning_loglevel`` for structured logs. 

45- Pass ``termination_event`` via ``RetryConfig`` to support async cancellation between attempts. 

46- Supply a ``giveup(AttemptOutcome)`` callback to stop retrying based on domain-specific logic (for example, error/status 

47 codes or parsing stderr), including time-aware decisions or decisions based on the previous N most recent AttemptOutcome 

48 objects (via AttemptOutcome.retry.previous_outcomes) 
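
For example, a minimal sketch (reusing the names from the Example Usage section below; the "permission denied" check is
a hypothetical stand-in for domain-specific logic):

    import threading

    event = threading.Event()  # another thread may set() this to cancel between attempts
    config = RetryConfig(termination_event=event)

    def giveup(outcome: AttemptOutcome) -> str:  # return a non-empty reason to stop retrying
        cause = outcome.result.__cause__ if isinstance(outcome.result, RetryableError) else None
        return "permanent error" if "permission denied" in str(cause) else ""

    call_with_retries(fn=unreliable_operation, policy=retry_policy, config=config, giveup=giveup, log=log)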

Observability:
--------------
- Supply an ``after_attempt(AttemptOutcome)`` callback to collect per-attempt metrics such as success flag,
  exhausted/terminated state, attempt number, total elapsed duration (in nanoseconds), sleep duration (in nanoseconds),
  etc.; a sketch follows this list.
- ``AttemptOutcome.result`` is either the successful result or the most recent ``RetryableError``, enabling integration with
  metrics and tracing systems without coupling the retry loop to any specific backend.
- Supply ``after_attempt`` to fully customize logging if necessary, in which case also set
  ``RetryConfig.enable_logging=False``.
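
For example, a minimal metrics sketch (``METRICS`` is a hypothetical in-process collector; reuses names from the
Example Usage section below):

    METRICS: list[dict[str, object]] = []

    def after_attempt(outcome: AttemptOutcome) -> None:
        METRICS.append({
            "attempt": outcome.retry.count,
            "success": outcome.is_success,
            "exhausted": outcome.is_exhausted,
            "elapsed_nanos": outcome.elapsed_nanos,
            "sleep_nanos": outcome.sleep_nanos,
        })

    call_with_retries(fn=unreliable_operation, policy=retry_policy, after_attempt=after_attempt, log=log)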

Expert Configuration:
---------------------
- Set ``RetryPolicy.max_previous_outcomes > 0`` to pass the N most recent AttemptOutcome objects to callbacks (default is 0).
- If ``RetryPolicy.max_previous_outcomes > 0``, you can use ``RetryableError(..., attachment=...)`` to carry domain-specific
  state from a failed attempt to the next attempt via ``retry.previous_outcomes``. This pattern helps if attempt N+1 is a
  function of attempt N or of all prior attempts (e.g., switching endpoints or resuming from an offset).
- Set ``backoff_strategy(retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error)`` to plug in a custom backoff
  algorithm (e.g., decorrelated jitter; a sketch follows this list). The default is full-jitter exponential backoff with
  cap, the industry standard.
- Use ``RetryOptions`` as a 'bag of knobs' configuration template for functions that shall be retried in similar ways.
- Or package up all knobs plus a ``fn(retry: Retry)`` function into a self-contained auto-retrying higher-level function by
  constructing a ``RetryOptions`` object (which is itself a ``Callable``).
- To keep calling code retry-transparent, set ``RetryPolicy.reraise=True`` (the default) *and* raise retryable failures as
  ``raise RetryableError(...) from exc``. Calling code then cannot tell whether call_with_retries() is used at all.
- To make exhaustion observable to calling code, set ``RetryPolicy.reraise=False``: call_with_retries() then always raises
  ``RetryError`` (wrapping the last ``RetryableError``) on exhaustion, so callers catch ``RetryError`` and can inspect the
  last underlying exception via ``err.outcome``, ``err.__cause__``, and even ``err.__cause__.__cause__`` when present.
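
For example, a sketch of a custom decorrelated-jitter strategy (the multiplier 3 is illustrative, not part of this API):

    import random

    def decorrelated_jitter(
        retry: Retry, curr_max_sleep_nanos: int, rng: random.Random, elapsed_nanos: int, err: RetryableError
    ) -> tuple[int, int]:
        policy = retry.policy
        hi = max(policy.min_sleep_nanos + 1, 3 * curr_max_sleep_nanos)  # keep randint() bounds ordered
        sleep_nanos = min(policy.max_sleep_nanos, rng.randint(policy.min_sleep_nanos, hi))
        return sleep_nanos, sleep_nanos  # the next range derives from the previous sleep, not a fixed schedule

    policy = RetryPolicy(backoff_strategy=decorrelated_jitter)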

Example Usage:
--------------
    import logging
    from bzfs_main.util.retry import Retry, RetryPolicy, RetryableError, call_with_retries

    def unreliable_operation(retry: Retry) -> str:
        try:
            if retry.count < 3:
                raise ValueError("temporary failure connecting to foo.example.com")
            return "ok"
        except ValueError as exc:
            # Preserve the underlying cause for correct error propagation and logging
            raise RetryableError("temporary failure", display_msg="connect") from exc

    retry_policy = RetryPolicy(
        max_retries=10,
        min_sleep_secs=0,
        initial_max_sleep_secs=0.125,
        max_sleep_secs=10,
        max_elapsed_secs=60,
    )
    log = logging.getLogger(__name__)
    result: str = call_with_retries(fn=unreliable_operation, policy=retry_policy, log=log)
    print(result)

    # Sample log output:
    # INFO:Retrying connect [1/10] in 8.79ms ...
    # INFO:Retrying connect [2/10] in 60.1ms ...
    # INFO:Retrying connect [3/10] in 192ms ...
    # ok

Background:
-----------
For background on exponential backoff and jitter, see for example
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter
"""

from __future__ import (
    annotations,
)
import argparse
import dataclasses
import logging
import random
import threading
import time
from collections.abc import (
    Mapping,
    Sequence,
)
from dataclasses import (
    dataclass,
)
from logging import (
    Logger,
)
from typing import (
    Any,
    Callable,
    Final,
    Generic,
    NoReturn,
    TypeVar,
    final,
)

from bzfs_main.util.utils import (
    human_readable_duration,
)

#############################################################################
_T = TypeVar("_T")


def _after_attempt(_outcome: AttemptOutcome) -> None:
    """Default implementation does nothing."""


def _giveup(_outcome: AttemptOutcome) -> str:
    """Default implementation never gives up."""
    return ""


def call_with_retries(
    fn: Callable[[Retry], _T],  # typically a lambda
    policy: RetryPolicy,
    config: RetryConfig | None = None,
    giveup: Callable[[AttemptOutcome], str] = _giveup,  # stop retrying based on domain-specific logic
    after_attempt: Callable[[AttemptOutcome], None] = _after_attempt,  # e.g. record metrics
    log: Logger | None = None,
) -> _T:
    """Runs the function ``fn`` and returns its result; retries on failure as indicated by policy and config; thread-safe.

    On exhaustion, call_with_retries() either re-raises the last underlying ``RetryableError.__cause__``, or raises
    ``RetryError`` (wrapping the last ``RetryableError``), like so:
    - if ``RetryPolicy.reraise`` is True and the last ``RetryableError.__cause__`` is not None, re-raise the last
      ``RetryableError.__cause__`` with its original traceback.
    - Otherwise, raise ``RetryError`` (wrapping the last ``RetryableError``, preserving its ``__cause__`` chain).
    - The default is ``RetryPolicy.reraise=True``.
    """
    config = _DEFAULT_RETRY_CONFIG if config is None else config
    curr_max_sleep_nanos: int = policy.initial_max_sleep_nanos
    retry_count: int = 0
    rng: random.Random | None = None
    previous_outcomes: tuple[AttemptOutcome, ...] = ()  # for safety pass *immutable* deque to callbacks
    start_time_nanos: Final[int] = time.monotonic_ns()
    while True:
        giveup_reason: str = ""
        retry: Retry = Retry(retry_count, start_time_nanos, policy, config, previous_outcomes)
        try:
            result: _T = fn(retry)  # Call the target function and supply retry attempt number and other metadata
            if after_attempt is not _after_attempt:
                outcome: AttemptOutcome = AttemptOutcome(
                    retry, True, False, False, giveup_reason, time.monotonic_ns() - start_time_nanos, 0, result, log
                )
                after_attempt(outcome)
            return result
        except RetryableError as retryable_error:
            elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
            if retry_count == 0 and retryable_error.retry_immediately_once:
                sleep_nanos: int = 0  # retry once immediately without backoff
            else:  # jitter: default backoff_strategy picks random sleep_nanos in [min_sleep_nanos, curr_max_sleep_nanos]
                rng = _thread_local_rng() if rng is None else rng
                sleep_nanos, curr_max_sleep_nanos = policy.backoff_strategy(
                    retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error
                )
                assert sleep_nanos >= 0
                assert curr_max_sleep_nanos >= 0

            format_duration: Callable[[int], str] = config.format_duration  # lambda: nanos
            termination_event: threading.Event | None = config.termination_event
            outcome = AttemptOutcome(
                retry, False, False, False, giveup_reason, elapsed_nanos, sleep_nanos, retryable_error, log
            )
            will_retry: bool = False
            if (
                retry_count < policy.max_retries
                and elapsed_nanos < policy.max_elapsed_nanos
                and (termination_event is None or not termination_event.is_set())
                and not (giveup_reason := giveup(outcome))
            ):
                will_retry = True
                retry_count += 1
                loglevel: int = config.info_loglevel
                if log is not None and config.enable_logging and log.isEnabledFor(loglevel):
                    m1: str = config.format_msg(config.display_msg, retryable_error)
                    m2: str = config.format_pair(retry_count, policy.max_retries)
                    log.log(loglevel, "%s", f"{m1}{m2} in {format_duration(sleep_nanos)}{config.dots}", extra=config.extra)
                after_attempt(outcome)
                _sleep(sleep_nanos, termination_event)
            else:
                sleep_nanos = 0

            is_terminated: bool = termination_event is not None and termination_event.is_set()
            if is_terminated or not will_retry:
                if (
                    policy.max_retries > 0
                    and log is not None
                    and config.enable_logging
                    and log.isEnabledFor(config.warning_loglevel)
                    and not is_terminated
                ):
                    reason: str = f"{giveup_reason}; " if giveup_reason else ""
                    log.log(
                        config.warning_loglevel,
                        "%s",
                        f"{config.format_msg(config.display_msg, retryable_error)}"
                        f"exhausted; giving up because {reason}the last "
                        f"{config.format_pair(retry_count, policy.max_retries)} retries across "
                        f"{config.format_pair(format_duration(elapsed_nanos), format_duration(policy.max_elapsed_nanos))} "
                        "failed",
                        exc_info=retryable_error if config.exc_info else None,
                        stack_info=config.stack_info,
                        extra=config.extra,
                    )
                outcome = AttemptOutcome(
                    retry, False, True, is_terminated, giveup_reason, elapsed_nanos, sleep_nanos, retryable_error, log
                )
                after_attempt(outcome)
                cause: BaseException | None = retryable_error.__cause__
                if policy.reraise and cause is not None:
                    raise cause.with_traceback(cause.__traceback__)  # noqa: B904 intentional re-raise without chaining
                else:
                    raise RetryError(outcome) from retryable_error

            n = policy.max_previous_outcomes
            if n > 0:  # outcome will be passed to next attempt via Retry.previous_outcomes
                if len(outcome.retry.previous_outcomes) > 0:
                    outcome = outcome.copy(retry=retry.copy(previous_outcomes=()))  # detach to reduce memory footprint
                previous_outcomes = previous_outcomes[len(previous_outcomes) - n + 1 :] + (outcome,)  # *immutable* deque
            del outcome  # help gc


def _sleep(sleep_nanos: int, termination_event: threading.Event | None) -> None:
    if sleep_nanos > 0:
        if termination_event is None:
            time.sleep(sleep_nanos / 1_000_000_000)
        else:
            termination_event.wait(sleep_nanos / 1_000_000_000)  # allow early wakeup on async termination


#############################################################################
class RetryableError(Exception):
    """Indicates that the task that caused the underlying exception can be retried and might eventually succeed;
    ``call_with_retries()`` will pass this exception to callbacks via ``AttemptOutcome.result``."""

    def __init__(
        self, message: str, display_msg: object = None, retry_immediately_once: bool = False, attachment: object = None
    ) -> None:
        super().__init__(message)
        self.display_msg: object = display_msg  # for logging
        self.retry_immediately_once: bool = retry_immediately_once  # retry once immediately without backoff?

        self.attachment: object = attachment  # domain-specific info passed to the next attempt via Retry.previous_outcomes
        # if RetryPolicy.max_previous_outcomes > 0. This helps when retrying is not just 'try again later', but
        # 'try again differently based on what just happened'.
        # Examples: switching network endpoints, adjusting per-attempt timeouts, capping retries by error class, resuming
        # with a token/offset, maintaining failure history for this invocation of call_with_retries().
        # Example: 'cap retries to 3 for ECONNREFUSED but 12 for ETIMEDOUT' via attachment=collections.Counter

    def display_msg_str(self) -> str:
        """Returns the display_msg as a str; for logging."""
        return "" if self.display_msg is None else str(self.display_msg)


#############################################################################
@dataclass
@final
class RetryError(Exception):
    """Indicates that retries have been exhausted; the last RetryableError is in RetryError.__cause__."""

    outcome: AttemptOutcome
    """Metadata that describes why and how call_with_retries() gave up."""


#############################################################################
@dataclass(frozen=True)
@final
class Retry:
    """Attempt metadata provided to callback functions; includes the current retry attempt number; immutable."""

    count: int
    """Attempt number; count=0 is the first attempt, count=1 is the second attempt aka the first retry."""

    start_time_nanos: int
    """Value of time.monotonic_ns() at the start of the call_with_retries() invocation."""

    policy: RetryPolicy = dataclasses.field(repr=False, compare=False)
    """Policy that was passed into call_with_retries()."""

    config: RetryConfig = dataclasses.field(repr=False, compare=False)
    """Config that is used by call_with_retries()."""

    previous_outcomes: Sequence[AttemptOutcome] = dataclasses.field(repr=False, compare=False)
    """History/state of the N=max_previous_outcomes most recent outcomes for the current call_with_retries() invocation."""

    def copy(self, **override_kwargs: Any) -> Retry:
        """Creates a new object copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)


#############################################################################
@dataclass(frozen=True)
@final
class AttemptOutcome:
    """Captures per-attempt state for ``after_attempt`` callbacks; immutable."""

    retry: Retry
    """Attempt metadata passed into fn(retry)."""

    is_success: bool
    """False if fn(retry) raised a RetryableError; True otherwise."""

    is_exhausted: bool
    """True if the loop is giving up retrying (possibly due to is_terminated); False otherwise."""

    is_terminated: bool
    """True if termination_event has become set; False otherwise."""

    giveup_reason: str
    """Reason returned by giveup(); an empty string means giveup() was not called or decided not to give up."""

    elapsed_nanos: int
    """Total duration between the start of the call_with_retries() invocation and the end of this fn() attempt."""

    sleep_nanos: int
    """Duration of the current sleep period."""

    result: RetryableError | object = dataclasses.field(repr=False, compare=False)
    """Result of fn(retry); a RetryableError on retryable failure or some other object on success."""

    log: Logger | None = dataclasses.field(repr=False, compare=False)
    """Logger that was passed into call_with_retries()."""

    def copy(self, **override_kwargs: Any) -> AttemptOutcome:
        """Creates a new outcome copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)


#############################################################################
def _full_jitter_backoff_strategy(
    retry: Retry, curr_max_sleep_nanos: int, rand: random.Random, elapsed_nanos: int, retryable_error: RetryableError
) -> tuple[int, int]:
    """Full-jitter picks a random sleep_nanos duration from the range [min_sleep_nanos, curr_max_sleep_nanos] and applies
    exponential backoff with cap to the next attempt."""
    policy: RetryPolicy = retry.policy
    if policy.min_sleep_nanos == curr_max_sleep_nanos:
        sleep_nanos = curr_max_sleep_nanos  # perf
    else:
        sleep_nanos = rand.randint(policy.min_sleep_nanos, curr_max_sleep_nanos)  # nanos to delay until next attempt
    curr_max_sleep_nanos = round(curr_max_sleep_nanos * policy.exponential_base)  # exponential backoff
    curr_max_sleep_nanos = min(curr_max_sleep_nanos, policy.max_sleep_nanos)  # ... with cap for next attempt
    return sleep_nanos, curr_max_sleep_nanos


@dataclass(frozen=True)
@final
class RetryPolicy:
    """Configuration controlling max retry counts and backoff delays for call_with_retries(); immutable.

    By default this works as follows: the maximum duration to sleep between retries initially starts at
    ``initial_max_sleep_secs`` and doubles on each retry, up to the final maximum of ``max_sleep_secs``.
    On each retry a random sleep duration in the range ``[min_sleep_secs, current max]`` is picked.
    In a nutshell: ``0 <= min_sleep_secs <= initial_max_sleep_secs <= max_sleep_secs``
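
    For example, with the defaults the per-retry sleep cap grows as 0.125s, 0.25s, 0.5s, 1s, 2s, 4s, 8s, and then stays
    capped at max_sleep_secs=10, while each actual sleep duration is drawn uniformly at random from [0, current cap].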

400 """ 

401 

402 max_retries: int = 10 

403 """The maximum number of times ``fn`` will be invoked additionally after the first attempt invocation; must be >= 0.""" 

404 

405 min_sleep_secs: float = 0 

406 """The minimum duration to sleep between retries .""" 

407 

408 initial_max_sleep_secs: float = 0.125 

409 """The initial maximum duration to sleep between retries.""" 

410 

411 max_sleep_secs: float = 10 

412 """The final max duration to sleep between retries; 0 <= min_sleep_secs <= initial_max_sleep_secs <= max_sleep_secs.""" 

413 

414 max_elapsed_secs: float = 60 

415 """``fn`` will not be retried (or not retried anymore) once this much time has elapsed since the initial start of 

416 call_with_retries().""" 

417 

418 exponential_base: float = 2 

419 """Growth factor for backoff algorithm to calculate sleep duration; must be >= 1.""" 

420 

421 max_elapsed_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

422 min_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

423 initial_max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

424 max_sleep_nanos: int = dataclasses.field(init=False, repr=False) # derived value 

425 

426 backoff_strategy: Callable[[Retry, int, random.Random, int, RetryableError], tuple[int, int]] = dataclasses.field( 

427 default=_full_jitter_backoff_strategy, repr=False # retry, curr_max_sleep_nanos, rng, elapsed_nanos, retryable_error 

428 ) 

429 """Strategy that implements a backoff algorithm to reduce resource contention.""" 

430 

431 reraise: bool = True 

432 """On exhaustion, the default (``True``) is to re-raise the underlying exception when present.""" 

433 

434 max_previous_outcomes: int = 0 

435 """Pass the N=max_previous_outcomes most recent AttemptOutcome objects to callbacks.""" 

436 

437 context: object = dataclasses.field(default=None, repr=False, compare=False) 

438 """Optional domain specific info.""" 

439 

440 @classmethod 

441 def from_namespace(cls, args: argparse.Namespace) -> RetryPolicy: 

442 """Factory that reads the policy from ArgumentParser via args.""" 

443 return RetryPolicy( 

444 max_retries=getattr(args, "max_retries", 10), 

445 min_sleep_secs=getattr(args, "retry_min_sleep_secs", 0), 

446 initial_max_sleep_secs=getattr(args, "retry_initial_max_sleep_secs", 0.125), 

447 max_sleep_secs=getattr(args, "retry_max_sleep_secs", 10), 

448 max_elapsed_secs=getattr(args, "retry_max_elapsed_secs", 60), 

449 exponential_base=getattr(args, "retry_exponential_base", 2), 

450 backoff_strategy=getattr(args, "retry_backoff_strategy", _full_jitter_backoff_strategy), 

451 reraise=getattr(args, "retry_reraise", True), 

452 max_previous_outcomes=getattr(args, "retry_max_previous_outcomes", 0), 

453 context=getattr(args, "retry_context", None), 

454 ) 

455 

456 @classmethod 

457 def no_retries(cls) -> RetryPolicy: 

458 """Returns a policy that never retries.""" 

459 return RetryPolicy( 

460 max_retries=0, 

461 min_sleep_secs=0, 

462 initial_max_sleep_secs=0, 

463 max_sleep_secs=0, 

464 max_elapsed_secs=0, 

465 ) 

    def __post_init__(self) -> None:  # compute derived values
        object.__setattr__(self, "max_retries", max(0, self.max_retries))
        object.__setattr__(self, "exponential_base", max(1, self.exponential_base))
        object.__setattr__(self, "min_sleep_secs", max(0, self.min_sleep_secs))
        object.__setattr__(self, "initial_max_sleep_secs", max(0, self.initial_max_sleep_secs))
        object.__setattr__(self, "max_sleep_secs", max(0, self.max_sleep_secs))
        object.__setattr__(self, "max_elapsed_secs", max(0, self.max_elapsed_secs))
        object.__setattr__(self, "max_elapsed_nanos", int(self.max_elapsed_secs * 1_000_000_000))
        min_sleep_nanos: int = int(self.min_sleep_secs * 1_000_000_000)
        initial_max_sleep_nanos: int = int(self.initial_max_sleep_secs * 1_000_000_000)
        max_sleep_nanos: int = int(self.max_sleep_secs * 1_000_000_000)
        max_sleep_nanos = max(min_sleep_nanos, max_sleep_nanos)
        initial_max_sleep_nanos = min(max_sleep_nanos, max(min_sleep_nanos, initial_max_sleep_nanos))
        object.__setattr__(self, "min_sleep_nanos", min_sleep_nanos)
        object.__setattr__(self, "initial_max_sleep_nanos", initial_max_sleep_nanos)
        object.__setattr__(self, "max_sleep_nanos", max_sleep_nanos)
        object.__setattr__(self, "max_previous_outcomes", max(0, self.max_previous_outcomes))
        assert 0 <= self.min_sleep_nanos <= self.initial_max_sleep_nanos <= self.max_sleep_nanos
        if not callable(self.backoff_strategy):
            raise TypeError("RetryPolicy.backoff_strategy must be callable")
        if not isinstance(self.reraise, bool):
            raise TypeError("RetryPolicy.reraise must be bool")

    def copy(self, **override_kwargs: Any) -> RetryPolicy:
        """Creates a new policy copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)


#############################################################################
def _format_msg(display_msg: str, retryable_error: RetryableError) -> str:  # thread-safe
    msg = display_msg + " " if display_msg else ""
    errmsg: str = retryable_error.display_msg_str()
    msg = msg + errmsg + " " if errmsg else msg
    msg = msg if msg else "Retrying "
    return msg


def _format_pair(first: object, second: object) -> str:  # thread-safe
    return f"[{first}/{second}]"


@dataclass(frozen=True)
@final
class RetryConfig:
    """Configures termination behavior and logging for call_with_retries(); all defaults work out of the box; immutable."""

    termination_event: threading.Event | None = None  # optionally allows for async cancellation
    display_msg: str = "Retrying"
    dots: str = " ..."
    format_msg: Callable[[str, RetryableError], str] = _format_msg  # lambda: display_msg, retryable_error
    format_pair: Callable[[object, object], str] = _format_pair  # lambda: first, second
    format_duration: Callable[[int], str] = human_readable_duration  # lambda: nanos
    info_loglevel: int = logging.INFO  # loglevel used when not giving up
    warning_loglevel: int = logging.WARNING  # loglevel used when giving up
    enable_logging: bool = True  # set to False to move the logging aspect into after_attempt()
    exc_info: bool = False  # passed into Logger.log()
    stack_info: bool = False  # passed into Logger.log()
    extra: Mapping[str, object] | None = dataclasses.field(default=None, repr=False, compare=False)  # passed to Logger.log()
    context: object = dataclasses.field(default=None, repr=False, compare=False)  # optional domain-specific info

    def copy(self, **override_kwargs: Any) -> RetryConfig:
        """Creates a new config copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)


_DEFAULT_RETRY_CONFIG: Final[RetryConfig] = RetryConfig()  # constant


#############################################################################
def _fn(_retry: Retry) -> NoReturn:
    """Default implementation always raises."""
    raise NotImplementedError("Provide fn when calling RetryOptions")


@dataclass(frozen=True)
@final
class RetryOptions(Generic[_T]):
    """Convenience class that aggregates all knobs for call_with_retries(); it is itself callable too; immutable."""

    fn: Callable[[Retry], _T] = _fn
    policy: RetryPolicy = RetryPolicy()
    config: RetryConfig = RetryConfig()
    giveup: Callable[[AttemptOutcome], str] = _giveup  # stop retrying based on domain-specific logic
    after_attempt: Callable[[AttemptOutcome], None] = _after_attempt  # e.g. record metrics
    log: Logger | None = None

    def copy(self, **override_kwargs: Any) -> RetryOptions[_T]:
        """Creates a new object copying an existing one with the specified fields overridden for customization."""
        return dataclasses.replace(self, **override_kwargs)

    def __call__(self) -> _T:
        """Executes ``self.fn`` via the call_with_retries() retry loop using the stored parameters; thread-safe."""
        return call_with_retries(
            fn=self.fn,
            policy=self.policy,
            config=self.config,
            giveup=self.giveup,
            after_attempt=self.after_attempt,
            log=self.log,
        )
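
# Example (illustrative sketch, reusing ``unreliable_operation`` from the module docstring): package fn plus all knobs
# into a self-contained auto-retrying callable:
#
#     options: RetryOptions[str] = RetryOptions(
#         fn=unreliable_operation,
#         policy=RetryPolicy(max_retries=5),
#         log=logging.getLogger(__name__),
#     )
#     result: str = options()  # runs unreliable_operation via call_with_retries()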


#############################################################################
@final
class _ThreadLocalRNG(threading.local):
    """Caches a per-thread random number generator."""

    def __init__(self) -> None:
        self.rng: random.Random | None = None


_THREAD_LOCAL_RNG: Final[_ThreadLocalRNG] = _ThreadLocalRNG()


def _thread_local_rng() -> random.Random:
    """Returns a per-thread RNG for backoff jitter; for perf avoids locking and initializing a new random.Random() at high
    frequency."""
    threadlocal: _ThreadLocalRNG = _THREAD_LOCAL_RNG
    rng: random.Random | None = threadlocal.rng
    if rng is None:
        rng = random.Random()  # noqa: S311 jitter isn't security sensitive, and random.SystemRandom.randint() is slow
        threadlocal.rng = rng
    return rng