Coverage for bzfs_main / bzfs.py: 99%

1052 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-22 08:03 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata 

16# /// script 

17# requires-python = ">=3.9" 

18# dependencies = [] 

19# /// 

20# 

21""" 

22* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`, 

23 data transfer, and snapshot management between two hosts. 

24* Overview of the bzfs.py codebase: 

25* The codebase starts with docs, definition of input data and associated argument parsing into a "Params" class. 

26* All CLI option/parameter values are reachable from the "Params" class. 

27* Control flow starts in main(), which kicks off a "Job". 

28* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree. 

29* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset(). 

30* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots(). 

31* The --create-src-snapshots-* and --delete-* and --compare-* and --monitor-* algorithms also start in run_task(). 

32* The main retry logic is in call_with_retries() and clear_resumable_recv_state_if_necessary(). 

33* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter. 

34* Executing a CLI command on a local or remote host is in run_ssh_command(). 

35* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool. 

36* Cache functionality can be found by searching for this regex: .*cach.* 

37* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant(). 

38* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh. 

39 Simply run that script whenever you change or add ArgumentParser help text. 

40""" 

41 

42from __future__ import ( 

43 annotations, 

44) 

45import argparse 

46import contextlib 

47import fcntl 

48import heapq 

49import itertools 

50import logging 

51import os 

52import re 

53import subprocess 

54import sys 

55import threading 

56import time 

57from collections import ( 

58 Counter, 

59 defaultdict, 

60) 

61from collections.abc import ( 

62 Collection, 

63) 

64from datetime import ( 

65 datetime, 

66 timedelta, 

67) 

68from logging import ( 

69 Logger, 

70) 

71from pathlib import ( 

72 Path, 

73) 

74from subprocess import ( 

75 DEVNULL, 

76 PIPE, 

77 CalledProcessError, 

78) 

79from typing import ( 

80 Any, 

81 Callable, 

82 Final, 

83 cast, 

84 final, 

85) 

86 

87import bzfs_main.loggers 

88from bzfs_main.argparse_actions import ( 

89 has_timerange_filter, 

90) 

91from bzfs_main.argparse_cli import ( 

92 EXCLUDE_DATASET_REGEXES_DEFAULT, 

93) 

94from bzfs_main.compare_snapshot_lists import ( 

95 run_compare_snapshot_lists, 

96) 

97from bzfs_main.configuration import ( 

98 AlertConfig, 

99 CreateSrcSnapshotConfig, 

100 LogParams, 

101 MonitorSnapshotAlert, 

102 Params, 

103 Remote, 

104 SnapshotLabel, 

105) 

106from bzfs_main.detect import ( 

107 DISABLE_PRG, 

108 RemoteConfCacheItem, 

109 are_bookmarks_enabled, 

110 detect_available_programs, 

111 is_caching_snapshots, 

112 is_dummy, 

113 is_zpool_feature_enabled_or_active, 

114) 

115from bzfs_main.filter import ( 

116 SNAPSHOT_REGEX_FILTER_NAME, 

117 dataset_regexes, 

118 filter_datasets, 

119 filter_lines, 

120 filter_lines_except, 

121 filter_snapshots, 

122) 

123from bzfs_main.loggers import ( 

124 get_simple_logger, 

125 reset_logger, 

126 set_logging_runtime_defaults, 

127) 

128from bzfs_main.parallel_batch_cmd import ( 

129 run_ssh_cmd_parallel, 

130 zfs_list_snapshots_in_parallel, 

131) 

132from bzfs_main.progress_reporter import ( 

133 ProgressReporter, 

134 count_num_bytes_transferred_by_zfs_send, 

135) 

136from bzfs_main.replication import ( 

137 delete_bookmarks, 

138 delete_datasets, 

139 delete_snapshots, 

140 replicate_dataset, 

141) 

142from bzfs_main.snapshot_cache import ( 

143 MATURITY_TIME_THRESHOLD_SECS, 

144 MONITOR_CACHE_FILE_PREFIX, 

145 REPLICATION_CACHE_FILE_PREFIX, 

146 SnapshotCache, 

147 set_last_modification_time_safe, 

148) 

149from bzfs_main.util.connection import ( 

150 SHARED, 

151 ConnectionPool, 

152 MiniJob, 

153 MiniRemote, 

154 timeout, 

155) 

156from bzfs_main.util.parallel_iterator import ( 

157 run_in_parallel, 

158) 

159from bzfs_main.util.parallel_tasktree_policy import ( 

160 process_datasets_in_parallel_and_fault_tolerant, 

161) 

162from bzfs_main.util.retry import ( 

163 Retry, 

164 RetryableError, 

165 RetryConfig, 

166 RetryOptions, 

167 call_with_retries, 

168) 

169from bzfs_main.util.utils import ( 

170 DESCENDANTS_RE_SUFFIX, 

171 DIE_STATUS, 

172 DONT_SKIP_DATASET, 

173 FILE_PERMISSIONS, 

174 LOG_DEBUG, 

175 LOG_TRACE, 

176 PROG_NAME, 

177 SHELL_CHARS, 

178 UMASK, 

179 YEAR_WITH_FOUR_DIGITS_REGEX, 

180 HashedInterner, 

181 SortedInterner, 

182 Subprocesses, 

183 SynchronizedBool, 

184 SynchronizedDict, 

185 append_if_absent, 

186 compile_regexes, 

187 cut, 

188 die, 

189 has_duplicates, 

190 human_readable_bytes, 

191 human_readable_duration, 

192 is_descendant, 

193 percent, 

194 pretty_print_formatter, 

195 replace_in_lines, 

196 replace_prefix, 

197 sha256_85_urlsafe_base64, 

198 sha256_128_urlsafe_base64, 

199 stderr_to_str, 

200 termination_signal_handler, 

201 validate_dataset_name, 

202 validate_property_name, 

203 xappend, 

204 xfinally, 

205 xprint, 

206) 

207 

208# constants: 

209__version__: Final[str] = bzfs_main.argparse_cli.__version__ 

210CRITICAL_STATUS: Final[int] = 2 

211WARNING_STATUS: Final[int] = 1 

212STILL_RUNNING_STATUS: Final[int] = 4 

213MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 9) 

214if sys.version_info < MIN_PYTHON_VERSION: 

215 print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!") 

216 sys.exit(DIE_STATUS) 

217 

218 

219############################################################################# 

220def argument_parser() -> argparse.ArgumentParser: 

221 """Returns the CLI parser used by bzfs.""" 

222 return bzfs_main.argparse_cli.argument_parser() 

223 

224 

225def main() -> None: 

226 """API for command line clients.""" 

227 prev_umask: int = os.umask(UMASK) 

228 try: 

229 set_logging_runtime_defaults() 

230 # On CTRL-C and SIGTERM, send signal to all descendant processes to terminate them 

231 termination_event: threading.Event = threading.Event() 

232 with termination_signal_handler(termination_events=[termination_event]): 

233 run_main(argument_parser().parse_args(), sys.argv, termination_event=termination_event) 

234 except subprocess.CalledProcessError as e: 

235 ret: int = e.returncode 

236 ret = DIE_STATUS if isinstance(ret, int) and 1 <= ret <= STILL_RUNNING_STATUS else ret 

237 sys.exit(ret) 

238 finally: 

239 os.umask(prev_umask) # restore prior global state 

240 

241 

242def run_main( 

243 args: argparse.Namespace, 

244 sys_argv: list[str] | None = None, 

245 log: Logger | None = None, 

246 termination_event: threading.Event | None = None, 

247) -> None: 

248 """API for Python clients; visible for testing; may become a public API eventually.""" 

249 Job(termination_event=termination_event).run_main(args, sys_argv, log) 

250 

251 

252############################################################################# 

253@final 

254class Job(MiniJob): 

255 """Executes one bzfs run, coordinating snapshot replication tasks.""" 

256 

257 def __init__(self, termination_event: threading.Event | None = None) -> None: 

258 self.params: Params 

259 self.termination_event: Final[threading.Event] = termination_event or threading.Event() 

260 self.subprocesses: Subprocesses = Subprocesses(termination_event=self.termination_event) 

261 self.retry_options: Final[RetryOptions] = RetryOptions(config=RetryConfig(termination_event=self.termination_event)) 

262 self.all_dst_dataset_exists: Final[dict[str, dict[str, bool]]] = defaultdict(lambda: defaultdict(bool)) 

263 self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({}) 

264 self.src_properties: dict[str, DatasetProperties] = {} 

265 self.dst_properties: dict[str, DatasetProperties] = {} 

266 self.all_exceptions: list[str] = [] 

267 self.all_exceptions_count: int = 0 

268 self.max_exceptions_to_summarize: int = 10000 

269 self.first_exception: BaseException | None = None 

270 self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {} 

271 self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {} 

272 self.max_workers: dict[str, int] = {} 

273 self.progress_reporter: ProgressReporter = cast(ProgressReporter, None) 

274 self.is_first_replication_task: Final[SynchronizedBool] = SynchronizedBool(True) 

275 self.replication_start_time_nanos: int = time.monotonic_ns() 

276 self.timeout_nanos: int | None = None # timestamp aka instant in time 

277 self.timeout_duration_nanos: int | None = None # duration (not a timestamp); for logging only 

278 self.cache: SnapshotCache = SnapshotCache(self) 

279 self.stats_lock: Final[threading.Lock] = threading.Lock() 

280 self.num_cache_hits: int = 0 

281 self.num_cache_misses: int = 0 

282 self.num_snapshots_found: int = 0 

283 self.num_snapshots_replicated: int = 0 

284 

285 self.is_test_mode: bool = False # for testing only 

286 self.creation_prefix: str = "" # for testing only 

287 self.use_select: bool = False # for testing only 

288 self.progress_update_intervals: tuple[float, float] | None = None # for testing only 

289 self.error_injection_triggers: dict[str, Counter[str]] = {} # for testing only 

290 self.delete_injection_triggers: dict[str, Counter[str]] = {} # for testing only 

291 self.param_injection_triggers: dict[str, dict[str, bool]] = {} # for testing only 

292 self.inject_params: dict[str, bool] = {} # for testing only 

293 self.injection_lock: threading.Lock = threading.Lock() # for testing only 

294 self.max_command_line_bytes: int | None = None # for testing only 

295 

296 def shutdown(self) -> None: 

297 """Exits any multiplexed ssh sessions that may be leftover.""" 

298 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values() 

299 for i, cache_item in enumerate(cache_items): 

300 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}") 

301 

302 def terminate(self) -> None: 

303 """Shuts down gracefully; also terminates descendant processes, if any.""" 

304 with xfinally(self.subprocesses.terminate_process_subtrees): 

305 self.shutdown() 

306 

307 def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None: 

308 """Parses CLI arguments, sets up logging, and executes main job loop.""" 

309 assert isinstance(self.error_injection_triggers, dict) 

310 assert isinstance(self.delete_injection_triggers, dict) 

311 assert isinstance(self.inject_params, dict) 

312 logger_name_suffix: str = "" 

313 

314 def _reset_logger() -> None: 

315 if logger_name_suffix and log is not None: # reset Logger unless it's a Logger outside of our control 

316 reset_logger(log) 

317 

318 with xfinally(_reset_logger): # runs _reset_logger() on exit, without masking error raised in body of `with` block 

319 try: 

320 log_params: LogParams = LogParams(args) 

321 logger_name_suffix = "" if log is not None else log_params.logger_name_suffix 

322 log = bzfs_main.loggers.get_logger( 

323 log_params=log_params, args=args, log=log, logger_name_suffix=logger_name_suffix 

324 ) 

325 log.info("%s", f"Log file is: {log_params.log_file}") 

326 except BaseException as e: 

327 simple_log: Logger = get_simple_logger(PROG_NAME, logger_name_suffix=logger_name_suffix) 

328 try: 

329 simple_log.error("Log init: %s", e, exc_info=not isinstance(e, SystemExit)) 

330 finally: 

331 reset_logger(simple_log) 

332 raise 

333 

334 aux_args: list[str] = [] 

335 if getattr(args, "include_snapshot_plan", None): 

336 aux_args += args.include_snapshot_plan 

337 if getattr(args, "delete_dst_snapshots_except_plan", None): 

338 aux_args += args.delete_dst_snapshots_except_plan 

339 if len(aux_args) > 0: 

340 log.info("Auxiliary CLI arguments: %s", " ".join(aux_args)) 

341 args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args) 

342 

343 def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None: 

344 log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info) 

345 

346 try: 

347 log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[uid: {os.getuid()}, euid: {os.geteuid()}]") 

348 if self.is_test_mode: 

349 log.log(LOG_TRACE, "Parsed CLI arguments: %s", args) 

350 self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params) 

351 self.timeout_duration_nanos = p.timeout_duration_nanos 

352 lock_file: str = p.lock_file_name() 

353 lock_fd = os.open( 

354 lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, FILE_PERMISSIONS 

355 ) 

356 with xfinally(lambda: os.close(lock_fd)): 

357 try: 

358 # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or 

359 # another process. The (advisory) lock is auto-released when the process terminates or the fd is 

360 # closed. 

361 fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # LOCK_NB ... non-blocking 

362 except BlockingIOError: 

363 msg = "Exiting as same previous periodic job is still running without completion yet per " 

364 die(msg + lock_file, STILL_RUNNING_STATUS) 

365 

366 # xfinally: unlink the lock_file while still holding the flock on its fd - it's a correct and safe 

367 # standard POSIX pattern: 

368 # - Performing unlink() before close(fd) avoids a race where a subsequent bzfs process could recreate and 

369 # lock a fresh inode for the same path between our close() and a later unlink(). In that case, a late 

370 # unlink would delete the newer process's lock_file path. 

371 # - At this point, critical work is complete; the remaining steps are shutdown mechanics that have no 

372 # side effect, so this pattern is correct, safe, and simple. 

373 with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)): # don't accumulate stale files 

374 try: 

375 self.run_tasks() # do the real work 

376 except BaseException: 

377 self.terminate() 

378 raise 

379 self.shutdown() 

380 with contextlib.suppress(BrokenPipeError): 

381 sys.stderr.flush() 

382 sys.stdout.flush() 

383 except subprocess.CalledProcessError as e: 

384 log_error_on_exit(e, e.returncode) 

385 raise 

386 except SystemExit as e: 

387 log_error_on_exit(e, e.code) 

388 raise 

389 except (subprocess.TimeoutExpired, UnicodeDecodeError) as e: 

390 log_error_on_exit(e, DIE_STATUS) 

391 raise SystemExit(DIE_STATUS) from e 

392 except re.error as e: 

393 log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS) 

394 raise SystemExit(DIE_STATUS) from e 

395 except BaseException as e: 

396 log_error_on_exit(e, DIE_STATUS, exc_info=True) 

397 raise SystemExit(DIE_STATUS) from e 

398 finally: 

399 log.info("%s", f"Log file was: {log_params.log_file}") 

400 log.info("Success. Goodbye!") 

401 with contextlib.suppress(BrokenPipeError): 

402 sys.stderr.flush() 

403 sys.stdout.flush() 

404 

405 def run_tasks(self) -> None: 

406 """Executes replication cycles, repeating until daemon lifetime expires.""" 

407 p, log = self.params, self.params.log 

408 self.all_exceptions = [] 

409 self.all_exceptions_count = 0 

410 self.first_exception = None 

411 self.remote_conf_cache = {} 

412 self.validate_once() 

413 self.replication_start_time_nanos = time.monotonic_ns() 

414 self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals) 

415 with xfinally(lambda: self.progress_reporter.stop()): 

416 daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos 

417 while True: # loop for daemon mode 

418 self.timeout_nanos = ( 

419 None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos 

420 ) 

421 self.all_dst_dataset_exists.clear() 

422 self.progress_reporter.reset() 

423 src, dst = p.src, p.dst 

424 for src_root_dataset, dst_root_dataset in p.root_dataset_pairs: 

425 if self.termination_event.is_set(): 

426 self.terminate() 

427 break 

428 src.root_dataset = src.basis_root_dataset = src_root_dataset 

429 dst.root_dataset = dst.basis_root_dataset = dst_root_dataset 

430 p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy() 

431 if p.daemon_lifetime_nanos > 0: 

432 self.timeout_nanos = ( 

433 None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos 

434 ) 

435 recurs_sep = " " if p.recursive_flag else "" 

436 task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}" 

437 if len(p.root_dataset_pairs) > 1: 

438 log.info("Starting task: %s", task_description + " ...") 

439 try: 

440 try: 

441 self.maybe_inject_error(cmd=[], error_trigger="retryable_run_tasks") 

442 timeout(self) 

443 self.validate_task() 

444 self.run_task() # do the real work 

445 except RetryableError as retryable_error: 

446 cause: BaseException | None = retryable_error.__cause__ 

447 assert cause is not None 

448 raise cause.with_traceback(cause.__traceback__) # noqa: B904 re-raise of cause without chaining 

449 except (subprocess.CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e: 

450 if p.skip_on_error == "fail" or ( 

451 isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0 

452 ): 

453 raise 

454 log.error("%s", e) 

455 self.append_exception(e, "task", task_description) 

456 if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos): 

457 break 

458 if not p.skip_replication: 

459 self.print_replication_stats(self.replication_start_time_nanos) 

460 error_count = self.all_exceptions_count 

461 if error_count > 0 and p.daemon_lifetime_nanos == 0: 

462 msgs = "\n".join(f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions)) 

463 log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}") 

464 assert self.first_exception is not None 

465 raise self.first_exception 

466 

467 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None: 

468 """Records and logs an exception that was encountered while running a subtask.""" 

469 self.first_exception = self.first_exception or e 

470 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption 

471 self.all_exceptions.append(str(e)) 

472 self.all_exceptions_count += 1 

473 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description) 

474 

475 def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool: 

476 """Pauses until next scheduled snapshot time or daemon stop; Returns True to continue daemon loop; False to stop.""" 

477 sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns() 

478 if sleep_nanos <= 0: 

479 return False 

480 self.progress_reporter.pause() 

481 p, log = self.params, self.params.log 

482 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config 

483 curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1) 

484 next_snapshotting_event_dt: datetime = min( 

485 ( 

486 config.anchors.round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit) 

487 for duration_amount, duration_unit in config.suffix_durations.values() 

488 ), 

489 default=curr_datetime + timedelta(days=10 * 365), # infinity 

490 ) 

491 offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz) 

492 offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000 

493 sleep_nanos = min(sleep_nanos, max(0, offset_nanos)) 

494 log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]") 

495 self.termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination 

496 config.current_datetime = datetime.now(config.tz) 

497 return time.monotonic_ns() < daemon_stoptime_nanos and not self.termination_event.is_set() 

498 

499 def print_replication_stats(self, start_time_nanos: int) -> None: 

500 """Logs overall replication statistics after a job cycle completes.""" 

501 p, log = self.params, self.params.log 

502 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

503 msg = p.dry(f"zfs sent {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.") 

504 if p.is_program_available("pv", "local"): 

505 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file) 

506 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / (elapsed_nanos or 1)) 

507 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]." 

508 log.info("%s", msg.ljust(p.log_params.terminal_columns - len("2024-01-01 23:58:45 [I] "))) 

509 

510 def validate_once(self) -> None: 

511 """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times.""" 

512 p = self.params 

513 p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts) 

514 for snapshot_filter in p.snapshot_filters: 

515 for _filter in snapshot_filter: 

516 if _filter.name == SNAPSHOT_REGEX_FILTER_NAME: 

517 exclude_snapshot_regexes = compile_regexes(_filter.options[0]) 

518 include_snapshot_regexes = compile_regexes(_filter.options[1] or [".*"]) 

519 _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes) 

520 

521 exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT] 

522 if len(p.args.exclude_dataset_regex) > 0: # some patterns don't exclude anything 

523 exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"] 

524 include_regexes: list[str] = p.args.include_dataset_regex 

525 

526 # relative datasets need not be compiled more than once as they don't change between tasks 

527 def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]: 

528 abs_datasets: list[str] = [] 

529 rel_datasets: list[str] = [] 

530 for dataset in datasets: 

531 (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset) 

532 return abs_datasets, rel_datasets 

533 

534 p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset) 

535 p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset) 

536 suffix = DESCENDANTS_RE_SUFFIX 

537 p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = ( 

538 compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix), 

539 compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix), 

540 ) 

541 

542 if p.pv_program != DISABLE_PRG: 

543 pv_program_opts_set = set(p.pv_program_opts) 

544 if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}): 

545 die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.") 

546 if not p.log_params.quiet: 

547 for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]: 

548 if pv_program_opts_set.isdisjoint(opts): 

549 die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.") 

550 

551 src, dst = p.src, p.dst 

552 for remote in [src, dst]: 

553 r, loc = remote, remote.location 

554 validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user") 

555 validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host") 

556 validate_port(r.ssh_port, f"--ssh-{loc}-port ") 

557 

558 def validate_task(self) -> None: 

559 """Validates a single replication task before execution.""" 

560 p, log = self.params, self.params.log 

561 src, dst = p.src, p.dst 

562 for remote in [src, dst]: 

563 r = remote 

564 r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator( 

565 r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port 

566 ) 

567 r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user) 

568 local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1") # ::1 is IPv6 version of loopback address 

569 remote.is_nonlocal = r.ssh_host not in local_addrs 

570 self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host]) 

571 

572 if src.ssh_host == dst.ssh_host: 

573 msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}" 

574 if src.root_dataset == dst.root_dataset: 

575 die(f"Source and destination dataset must not be the same! {msg}") 

576 if p.recursive and ( 

577 is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset) 

578 or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset) 

579 ): 

580 die(f"Source and destination dataset trees must not overlap! {msg}") 

581 

582 suffx = DESCENDANTS_RE_SUFFIX # also match descendants of a matching dataset 

583 p.exclude_dataset_regexes, p.include_dataset_regexes = ( 

584 p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx), 

585 p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx), 

586 ) 

587 if len(p.include_dataset_regexes) == 0: 

588 p.include_dataset_regexes = [(re.compile(r".*"), False)] 

589 

590 detect_available_programs(self) 

591 

592 if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"): 

593 append_if_absent(p.curr_zfs_send_program_opts, "--large-block") 

594 

595 self.max_workers = {} 

596 self.max_datasets_per_minibatch_on_list_snaps = {} 

597 for r in [src, dst]: 

598 cpus = int(p.available_programs[r.location].get("getconf_cpu_count", 8)) 

599 threads, is_percent = p.threads 

600 cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads)) 

601 self.max_workers[r.location] = cpus 

602 bs = max(1, p.max_datasets_per_batch_on_list_snaps) # 1024 by default 

603 max_datasets_per_minibatch = p.max_datasets_per_minibatch_on_list_snaps 

604 if max_datasets_per_minibatch <= 0: 

605 max_datasets_per_minibatch = max(1, bs // cpus) 

606 max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch) 

607 self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch 

608 log.log( 

609 LOG_TRACE, 

610 "%s", 

611 f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, " 

612 f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, " 

613 f"max_workers: {self.max_workers[r.location]}, " 

614 f"location: {r.location}", 

615 ) 

616 if self.is_test_mode: 

617 log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params)) 

618 

619 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]: 

620 """Returns sudo command prefix and whether root privileges are required.""" 

621 p: Params = self.params 

622 assert isinstance(ssh_user_host, str) 

623 assert isinstance(ssh_user, str) 

624 assert isinstance(p.sudo_program, str) 

625 assert isinstance(p.enable_privilege_elevation, bool) 

626 

627 is_root: bool = True 

628 if ssh_user_host != "": 

629 if ssh_user == "": 

630 if os.getuid() != 0: 

631 is_root = False 

632 elif ssh_user != "root": 

633 is_root = False 

634 elif os.getuid() != 0: 

635 is_root = False 

636 

637 if is_root: 

638 sudo = "" # using sudo in an attempt to make ZFS operations work even if we are not root user? 

639 use_zfs_delegation = False # or instead using 'zfs allow' delegation? 

640 return sudo, use_zfs_delegation 

641 elif p.enable_privilege_elevation: 

642 if p.sudo_program == DISABLE_PRG: 

643 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}") 

644 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any 

645 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit. 

646 return p.sudo_program + " -n", False 

647 else: 

648 return "", True 

649 

650 def run_task(self) -> None: 

651 """Replicates all snapshots for the current root dataset pair.""" 

652 

653 def filter_src_datasets() -> list[str]: # apply --{include|exclude}-dataset policy 

654 return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets 

655 

656 p, log = self.params, self.params.log 

657 src, dst = p.src, p.dst 

658 max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location]) 

659 recursive_sep: str = " " if p.recursive_flag else "" 

660 task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..." 

661 failed: bool = False 

662 src_datasets: list[str] | None = None 

663 basis_src_datasets: list[str] = [] 

664 self.src_properties = {} 

665 self.dst_properties = {} 

666 if not is_dummy(src): # find src dataset or all datasets in src dataset tree (with --recursive) 

667 basis_src_datasets = self.list_src_datasets_task() 

668 

669 if not p.create_src_snapshots_config.skip_create_src_snapshots: 

670 log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...") 

671 src_datasets = filter_src_datasets() # apply include/exclude policy 

672 self.create_src_snapshots_task(basis_src_datasets, src_datasets) 

673 

674 # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset 

675 if not p.skip_replication: 

676 if len(basis_src_datasets) == 0: 

677 die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}") 

678 if is_dummy(dst): 

679 die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!") 

680 src_datasets = filter_src_datasets() # apply include/exclude policy 

681 failed = self.replicate_datasets(src_datasets, task_description, max_workers) 

682 

683 if failed or not ( 

684 p.delete_dst_datasets 

685 or p.delete_dst_snapshots 

686 or p.delete_empty_dst_datasets 

687 or p.compare_snapshot_lists 

688 or p.monitor_snapshots_config.enable_monitor_snapshots 

689 ): 

690 return 

691 log.info("Listing dst datasets: %s", task_description) 

692 if is_dummy(dst): 

693 die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!") 

694 basis_dst_datasets: list[str] = self.list_dst_datasets_task() 

695 dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets) # apply include/exclude policy 

696 

697 if p.delete_dst_datasets and not failed: 

698 log.info(p.dry("--delete-dst-datasets: %s"), task_description) 

699 basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task( 

700 basis_src_datasets, basis_dst_datasets, dst_datasets 

701 ) 

702 

703 if p.delete_dst_snapshots and not failed: 

704 log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]") 

705 failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description) 

706 

707 if p.delete_empty_dst_datasets and p.recursive and not failed: 

708 log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description) 

709 basis_dst_datasets, dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets) 

710 

711 if p.compare_snapshot_lists and not failed: 

712 log.info("--compare-snapshot-lists: %s", task_description) 

713 if len(basis_src_datasets) == 0 and not is_dummy(src): 

714 die(f"Source dataset does not exist: {src.basis_root_dataset}") 

715 src_datasets = filter_src_datasets() # apply include/exclude policy 

716 run_compare_snapshot_lists(self, src_datasets, dst_datasets) 

717 

718 if p.monitor_snapshots_config.enable_monitor_snapshots and not failed: 

719 log.info("--monitor-snapshots: %s", task_description) 

720 src_datasets = filter_src_datasets() # apply include/exclude policy 

721 self.monitor_snapshots_task(src_datasets, dst_datasets, task_description) 

722 

723 def list_src_datasets_task(self) -> list[str]: 

724 """Lists datasets on the source host.""" 

725 p = self.params 

726 src = p.src 

727 basis_src_datasets: list[str] = [] 

728 is_caching: bool = is_caching_snapshots(p, src) 

729 props: str = "volblocksize,recordsize,name" 

730 props = "snapshots_changed," + props if is_caching else props 

731 cmd: list[str] = p.split_args( 

732 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset 

733 ) 

734 for line in (self.try_ssh_command_with_retries(src, LOG_DEBUG, cmd=cmd) or "").splitlines(): 

735 cols: list[str] = line.split("\t") 

736 snapshots_changed, volblocksize, recordsize, src_dataset = cols if is_caching else ["-"] + cols 

737 self.src_properties[src_dataset] = DatasetProperties( 

738 recordsize=int(recordsize) if recordsize != "-" else -int(volblocksize), 

739 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0, 

740 ) 

741 basis_src_datasets.append(src_dataset) 

742 assert (not self.is_test_mode) or basis_src_datasets == sorted(basis_src_datasets), "List is not sorted" 

743 return basis_src_datasets 

744 

745 def list_dst_datasets_task(self) -> list[str]: 

746 """Lists datasets on the destination host.""" 

747 p, log = self.params, self.params.log 

748 dst = p.dst 

749 is_caching: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots 

750 props: str = "name" 

751 props = "snapshots_changed," + props if is_caching else props 

752 cmd: list[str] = p.split_args( 

753 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset 

754 ) 

755 basis_dst_datasets: list[str] = [] 

756 basis_dst_datasets_str: str | None = self.try_ssh_command_with_retries(dst, LOG_TRACE, cmd=cmd) 

757 if basis_dst_datasets_str is None: 

758 log.warning("Destination dataset does not exist: %s", dst.root_dataset) 

759 else: 

760 for line in basis_dst_datasets_str.splitlines(): 

761 cols: list[str] = line.split("\t") 

762 snapshots_changed, dst_dataset = cols if is_caching else ["-"] + cols 

763 self.dst_properties[dst_dataset] = DatasetProperties( 

764 recordsize=0, 

765 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0, 

766 ) 

767 basis_dst_datasets.append(dst_dataset) 

768 assert (not self.is_test_mode) or basis_dst_datasets == sorted(basis_dst_datasets), "List is not sorted" 

769 return basis_dst_datasets 

770 

771 def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None: 

772 """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements 

773 --create-src-snapshots. 

774 

775 The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line, 

776 using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the 

777 --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs 

778 snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical 

779 'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such 

780 guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple 

781 command line invocations, respecting the limits that the operating system places on the maximum length of a single 

782 command line, per `getconf ARG_MAX`. 

783 

784 Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots 

785 per dataset. Space complexity is O(max(N, M)). 

786 """ 

787 p, log = self.params, self.params.log 

788 src = p.src 

789 if len(basis_src_datasets) == 0: 

790 die(f"Source dataset does not exist: {src.basis_root_dataset}") 

791 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets) 

792 datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0} 

793 basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy() # shallow copy 

794 commands: dict[SnapshotLabel, list[str]] = {} 

795 for label, datasets in datasets_to_snapshot.items(): 

796 cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot") 

797 if p.recursive: 

798 # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor 

799 root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets) 

800 if root_datasets is not None: 

801 cmd.append("-r") # recursive; takes a snapshot of all datasets in the subtree(s) 

802 datasets_to_snapshot[label] = root_datasets 

803 commands[label] = cmd 

804 creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots" 

805 log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...") 

806 # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle 

807 run_ssh_cmd_parallel( 

808 self, 

809 src, 

810 ((commands[lbl], (f"{ds}@{lbl}" for ds in datasets)) for lbl, datasets in datasets_to_snapshot.items()), 

811 fn=lambda cmd, batch: self.run_ssh_command_with_retries( 

812 src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch, retry_on_generic_ssh_error=False 

813 ), # retry_on_generic_ssh_error=False means only retry on SSH connect errors b/c `zfs snapshot` isn't idempotent 

814 max_batch_items=2**29, 

815 ) 

816 if is_caching_snapshots(p, src): 

817 # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls 

818 self.cache.update_last_modified_cache(basis_datasets_to_snapshot) 

819 

820 def delete_destination_snapshots_task( 

821 self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str 

822 ) -> bool: 

823 """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the 

824 --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset* 

825 policy; implements --delete-dst-snapshots.""" 

826 p, log = self.params, self.params.log 

827 src, dst = p.src, p.dst 

828 kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot" 

829 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters) 

830 props: str = self.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name" 

831 basis_src_datasets_set: set[str] = set(basis_src_datasets) 

832 num_snapshots_found, num_snapshots_deleted = 0, 0 

833 

834 def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool: # thread-safe 

835 src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset) 

836 if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks): 

837 src_kind: str = kind 

838 if not p.delete_dst_snapshots_no_crosscheck: 

839 src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot" 

840 src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset) 

841 else: 

842 src_cmd = None 

843 dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset) 

844 self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots") 

845 src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel 

846 lambda: set(self.run_ssh_command(src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []), 

847 lambda: self.try_ssh_command(dst, LOG_TRACE, cmd=dst_cmd), 

848 ) 

849 if dst_snaps_with_guids_str is None: 

850 log.warning("Third party deleted destination: %s", dst_dataset) 

851 return False 

852 dst_snaps_with_guids: list[str] = dst_snaps_with_guids_str.splitlines() 

853 num_dst_snaps_with_guids = len(dst_snaps_with_guids) 

854 basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy() 

855 if p.delete_dst_bookmarks: 

856 replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1) # treat bookmarks as snapshots 

857 # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots(). 

858 # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep* 

859 all_except: bool = p.delete_dst_snapshots_except 

860 if p.delete_dst_snapshots_except and not is_dummy(src): 

861 # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what 

862 # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep" 

863 # list, we later further refine by checking what's on the source dataset. 

864 all_except = False 

865 dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except) 

866 if p.delete_dst_bookmarks: 

867 replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1) # restore pre-filtering bookmark state 

868 if filter_needs_creation_time: 

869 dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids) 

870 basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids) 

871 if p.delete_dst_snapshots_except and not is_dummy(src): # Non-dummy Source + "Except" (Keep) Mode 

872 # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka 

873 # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset. 

874 # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP. 

875 # We only actually keep them if they are ALSO on the SRC. 

876 # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`) 

877 # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`. 

878 except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids) 

879 dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids) 

880 else: # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode 

881 # In standard delete mode: 

882 # `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST. 

883 # We delete those that are NOT on SRC. 

884 # `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`. 

885 # In dummy source + "except" (keep) mode: 

886 # `all_except` was True. 

887 # `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete. 

888 # `src_snaps_with_guids` is empty. 

889 # `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`. 

890 dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids) 

891 dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete) 

892 separator: str = "#" if p.delete_dst_bookmarks else "@" 

893 dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete) 

894 if p.delete_dst_bookmarks: 

895 delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete) 

896 else: 

897 delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete) 

898 with self.stats_lock: 

899 nonlocal num_snapshots_found 

900 num_snapshots_found += num_dst_snaps_with_guids 

901 nonlocal num_snapshots_deleted 

902 num_snapshots_deleted += len(dst_tags_to_delete) 

903 if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks: 

904 self.dst_properties[dst_dataset].snapshots_changed = 0 # invalidate cache 

905 return True 

906 

907 # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec 

908 failed: bool = False 

909 if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks: 

910 start_time_nanos = time.monotonic_ns() 

911 failed = process_datasets_in_parallel_and_fault_tolerant( 

912 log=log, 

913 datasets=dst_datasets, 

914 process_dataset=delete_destination_snapshots, # lambda 

915 skip_tree_on_error=lambda dataset: False, 

916 skip_on_error=p.skip_on_error, 

917 max_workers=max_workers, 

918 termination_event=self.termination_event, 

919 termination_handler=self.terminate, 

920 enable_barriers=False, 

921 task_name="--delete-dst-snapshots", 

922 append_exception=self.append_exception, 

923 retry_policy=p.retry_policy, 

924 retry_options=self.retry_options, 

925 dry_run=p.dry_run, 

926 is_test_mode=self.is_test_mode, 

927 ) 

928 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

929 log.info( 

930 p.dry("--delete-dst-snapshots: %s"), 

931 task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s " 

932 f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]", 

933 ) 

934 return failed 

935 

936 def delete_dst_datasets_task( 

937 self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str] 

938 ) -> tuple[list[str], list[str]]: 

939 """Deletes existing destination datasets that do not exist within the source dataset if they are included via 

940 --{include|exclude}-dataset* policy; implements --delete-dst-datasets. 

941 

942 Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors. 

943 """ 

944 p = self.params 

945 src, dst = p.src, p.dst 

946 children: dict[str, set[str]] = defaultdict(set) 

947 for dst_dataset in basis_dst_datasets: # Compute the direct children of each NON-FILTERED dataset 

948 parent: str = os.path.dirname(dst_dataset) 

949 children[parent].add(dst_dataset) 

950 to_delete: set[str] = set() 

951 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm 

952 if children[dst_dataset].issubset(to_delete): 

953 to_delete.add(dst_dataset) # all children are deletable, thus the dataset itself is deletable too 

954 to_delete = to_delete.difference( 

955 replace_prefix(src_dataset, src.root_dataset, dst.root_dataset) for src_dataset in basis_src_datasets 

956 ) 

957 delete_datasets(self, dst, to_delete) 

958 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(to_delete)) 

959 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(to_delete)) 

960 return basis_dst_datasets, sorted_dst_datasets 

961 

962 def delete_empty_dst_datasets_task( 

963 self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str] 

964 ) -> tuple[list[str], list[str]]: 

965 """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset 

966 do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets. 

967 

968 To do so, we walk the dataset list (conceptually, a tree) depth-first (i.e. sorted descending). If a dst dataset has 

969 zero snapshots and zero bookmarks and all its children are already marked as orphans, then it is itself an orphan, 

970 and we mark it as such. Walking in a reverse sorted way means that we efficiently check for zero snapshots/bookmarks 

971 not just over the direct children but the entire tree. Finally, delete all orphan datasets in an efficient batched 

972 way. 

973 """ 

974 p = self.params 

975 dst = p.dst 

976 

977 # Compute the direct children of each NON-FILTERED dataset. Thus, no non-selected dataset and no ancestor of a 

978 # non-selected dataset will ever be added to the "orphan" set. In other words, this treats non-selected dataset 

979 # subtrees as if they all had snapshots, so non-selected dataset subtrees and their ancestors are guaranteed 

980 # to not get deleted. 

981 children: dict[str, set[str]] = defaultdict(set) 

982 for dst_dataset in basis_dst_datasets: 

983 parent: str = os.path.dirname(dst_dataset) 

984 children[parent].add(dst_dataset) 

985 

986 def compute_orphans(datasets_having_snapshots: set[str]) -> set[str]: 

987 """Returns destination datasets having zero snapshots whose children are all orphans.""" 

988 orphans: set[str] = set() 

989 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm 

990 if (dst_dataset not in datasets_having_snapshots) and children[dst_dataset].issubset(orphans): 

991 orphans.add(dst_dataset) 

992 return orphans 

993 

994 # Compute candidate orphan datasets, which reduces the list of datasets for which we list snapshots via 

995 # 'zfs list -t snapshot ...' from dst_datasets to a subset of dst_datasets, which in turn reduces I/O and improves 

996 # perf. Essentially, this eliminates the I/O to list snapshots for ancestors of excluded datasets. 

997 candidate_orphans: set[str] = compute_orphans(set()) 

998 

999 # Compute destination datasets having more than zero snapshots 

1000 dst_datasets_having_snapshots: set[str] = set() 

1001 with_bookmarks: bool = p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst) 

1002 btype: str = "bookmark,snapshot" if with_bookmarks else "snapshot" 

1003 cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name") 

1004 for snapshots in zfs_list_snapshots_in_parallel(self, dst, cmd, sorted(candidate_orphans), ordered=False): 

1005 if with_bookmarks: 

1006 replace_in_lines(snapshots, old="#", new="@", count=1) # treat bookmarks as snapshots 

1007 dst_datasets_having_snapshots.update(snap[0 : snap.index("@")] for snap in snapshots) # union 

1008 

1009 orphans = compute_orphans(dst_datasets_having_snapshots) # compute the real orphans 

1010 delete_datasets(self, dst, orphans) # finally, delete the orphan datasets in an efficient way 

1011 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(orphans)) 

1012 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(orphans)) 

1013 return basis_dst_datasets, sorted_dst_datasets 

1014 

1015 def monitor_snapshots_task( 

1016 self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str 

1017 ) -> None: 

1018 """Monitors src and dst snapshots; implements --monitor-snapshots.""" 

1019 p, log = self.params, self.params.log 

1020 src, dst = p.src, p.dst 

1021 num_cache_hits: int = self.num_cache_hits 

1022 num_cache_misses: int = self.num_cache_misses 

1023 start_time_nanos: int = time.monotonic_ns() 

1024 run_in_parallel( 

1025 lambda: self.monitor_snapshots(dst, sorted_dst_datasets), 

1026 lambda: self.monitor_snapshots(src, sorted_src_datasets), 

1027 ) 

1028 elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos) 

1029 num_cache_hits = self.num_cache_hits - num_cache_hits 

1030 num_cache_misses = self.num_cache_misses - num_cache_misses 

1031 if num_cache_hits > 0 or num_cache_misses > 0: 

1032 msg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses) 

1033 else: 

1034 msg = "" 

1035 log.info( 

1036 "--monitor-snapshots done: %s", 

1037 f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]", 

1038 ) 

1039 

1040 def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> None: 

1041 """Checks snapshot freshness and warns or errors out when limits are exceeded. 

1042 

1043 Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name 

1044 pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to check if snapshots 

1045 are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process 

1046 exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively. 

1047 

1048 Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots 

1049 per dataset. Space complexity is O(max(N, M)). 

1050 """ 

1051 p, log = self.params, self.params.log 

1052 alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts 

1053 labels: list[SnapshotLabel] = [alert.label for alert in alerts] 

1054 current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000 

1055 is_debug: bool = log.isEnabledFor(LOG_DEBUG) 

1056 if is_caching_snapshots(p, remote): 

1057 props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties 

1058 snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()} 

1059 alerts_hash: str = sha256_128_urlsafe_base64(str(tuple(alerts))) 

1060 label_hashes: dict[SnapshotLabel, str] = { 

1061 label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels 

1062 } 

1063 is_caching: bool = False 

1064 

1065 def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str: 

1066 cache_label: str = os.path.join(MONITOR_CACHE_FILE_PREFIX, alert_cfg.kind[0], label_hashes[label], alerts_hash) 

1067 return self.cache.last_modified_cache_file(r, dataset, cache_label) 

1068 

1069 def alert_msg( 

1070 kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int 

1071 ) -> str: 

1072 assert kind == "Latest" or kind == "Oldest" 

1073 lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}" 

1074 if snapshot_age_millis >= current_unixtime_millis: 

1075 return f"No snapshot exists for {dataset}@{lbl}" 

1076 msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old" 

1077 s = f": @{snapshot}" if snapshot else "" 

1078 if delta_millis == -1: 

1079 return f"{msg}{s}" 

1080 return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}" 

1081 

1082 def check_alert( 

1083 label: SnapshotLabel, alert_cfg: AlertConfig | None, creation_unixtime_secs: int, dataset: str, snapshot: str 

1084 ) -> None: # thread-safe 

1085 if alert_cfg is None: 

1086 return 

1087 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot' 

1088 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0) 

1089 cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg) 

1090 set_last_modification_time_safe( 

1091 cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed), if_more_recent=True 

1092 ) 

1093 warning_millis: int = alert_cfg.warning_millis 

1094 critical_millis: int = alert_cfg.critical_millis 

1095 alert_kind = alert_cfg.kind 

1096 snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000 

1097 m = "--monitor_snapshots: " 

1098 if snapshot_age_millis > critical_millis: 

1099 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis) 

1100 log.critical("%s", msg) 

1101 if not p.monitor_snapshots_config.dont_crit: 

1102 die(msg, exit_code=CRITICAL_STATUS) 

1103 elif snapshot_age_millis > warning_millis: 

1104 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis) 

1105 log.warning("%s", msg) 

1106 if not p.monitor_snapshots_config.dont_warn: 

1107 die(msg, exit_code=WARNING_STATUS) 

1108 elif is_debug: 

1109 msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1) 

1110 log.debug("%s", msg) 

1111 

1112 def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None: 

1113 alert: MonitorSnapshotAlert = alerts[i] 

1114 check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot) 

1115 

1116 def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None: 

1117 alert: MonitorSnapshotAlert = alerts[i] 

1118 check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot) 

1119 

1120 def find_stale_datasets_and_check_alerts() -> list[str]: 

1121 """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply, 

1122 that is, without incurring 'zfs list -t snapshots'. 

1123 

1124 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See 

1125 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed 

1126 """ 

1127 stale_datasets: list[str] = [] 

1128 time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS 

1129 for dataset in sorted_datasets: 

1130 is_stale_dataset: bool = False 

1131 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0) 

1132 for alert in alerts: 

1133 for cfg in (alert.latest, alert.oldest): 

1134 if cfg is None: 

1135 continue 

1136 if ( 

1137 snapshots_changed != 0 

1138 and snapshots_changed < time_threshold 

1139 and ( # always True 

1140 cached_unix_times := self.cache.get_snapshots_changed2( 

1141 monitor_last_modified_cache_file(remote, dataset, alert.label, cfg) 

1142 ) 

1143 ) 

1144 and snapshots_changed == cached_unix_times[1] # cached snapshots_changed aka last modified time 

1145 and snapshots_changed >= cached_unix_times[0] # creation time of minmax snapshot aka access time 

1146 ): # cached state is still valid; emit an alert if the latest/oldest snapshot is too old 

1147 lbl = alert.label 

1148 check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="") 

1149 else: # cached state is no longer valid; fallback to 'zfs list -t snapshot' 

1150 is_stale_dataset = True 

1151 if is_stale_dataset: 

1152 stale_datasets.append(dataset) 

1153 return stale_datasets 

1154 

1155 # satisfy request from local cache as much as possible 

1156 if is_caching_snapshots(p, remote): 

1157 stale_datasets: list[str] = find_stale_datasets_and_check_alerts() 

1158 with self.stats_lock: 

1159 self.num_cache_misses += len(stale_datasets) 

1160 self.num_cache_hits += len(sorted_datasets) - len(stale_datasets) 

1161 else: 

1162 stale_datasets = sorted_datasets 

1163 

1164 # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache 

1165 is_caching = is_caching_snapshots(p, remote) 

1166 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots( 

1167 remote, stale_datasets, labels, fn_latest=alert_latest_snapshot, fn_oldest=alert_oldest_snapshot 

1168 ) 

1169 for dataset in datasets_without_snapshots: 

1170 for i in range(len(alerts)): 

1171 alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="") 

1172 alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="") 

1173 

1174 def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool: 

1175 """Replicates a list of datasets.""" 

1176 assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted" 

1177 p, log = self.params, self.params.log 

1178 src, dst = p.src, p.dst 

1179 self.num_snapshots_found = 0 

1180 self.num_snapshots_replicated = 0 

1181 log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]") 

1182 start_time_nanos: int = time.monotonic_ns() 

1183 

1184 def src2dst(src_dataset: str) -> str: 

1185 return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset) 

1186 

1187 def dst2src(dst_dataset: str) -> str: 

1188 return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset) 

1189 

1190 def find_stale_datasets() -> tuple[list[str], dict[str, str]]: 

1191 """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine 

1192 which datasets can be skipped cheaply, i.e. without incurring 'zfs list -t snapshots'. 

1193 

1194 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See 

1195 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed 

1196 """ 

1197 # First, check which src datasets have changed since the last replication to that destination 

1198 cache_files: dict[str, str] = {} 

1199 stale_src_datasets1: list[str] = [] 

1200 maybe_stale_dst_datasets: list[str] = [] 

1201 userhost_dir: str = sha256_85_urlsafe_base64(p.dst.cache_namespace()) 

1202 filter_key = tuple(tuple(f) for f in p.snapshot_filters) # cache is only valid for same --include/excl-snapshot* 

1203 filter_hash_code: str = sha256_85_urlsafe_base64(str(filter_key)) 

1204 for src_dataset in src_datasets: 

1205 dst_dataset: str = src2dst(src_dataset) # cache is only valid for identical destination dataset 

1206 dst_dataset_dir: str = sha256_85_urlsafe_base64(dst_dataset) 

1207 cache_label: str = os.path.join( 

1208 REPLICATION_CACHE_FILE_PREFIX, userhost_dir, dst_dataset_dir, filter_hash_code 

1209 ) 

1210 cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label) 

1211 cache_files[src_dataset] = cache_file 

1212 snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed # get prop "for free" 

1213 if ( 

1214 snapshots_changed != 0 

1215 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS 

1216 and snapshots_changed == self.cache.get_snapshots_changed(cache_file) 

1217 ): 

1218 maybe_stale_dst_datasets.append(dst_dataset) 

1219 else: 

1220 stale_src_datasets1.append(src_dataset) 

1221 

1222 # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed 

1223 stale_src_datasets2: list[str] = [] 

1224 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets) 

1225 for dst_dataset in maybe_stale_dst_datasets: 

1226 snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0) 

1227 if ( 

1228 snapshots_changed != 0 

1229 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS 

1230 and snapshots_changed 

1231 == self.cache.get_snapshots_changed(self.cache.last_modified_cache_file(dst, dst_dataset)) 

1232 ): 

1233 log.info("Already up-to-date [cached]: %s", dst_dataset) 

1234 else: 

1235 stale_src_datasets2.append(dst2src(dst_dataset)) 

1236 assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted" 

1237 assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted" 

1238 stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2)) # merge two sorted lists 

1239 assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates" 

1240 return stale_src_datasets, cache_files 

1241 

1242 if is_caching_snapshots(p, src): 

1243 stale_src_datasets, cache_files = find_stale_datasets() 

1244 num_cache_misses = len(stale_src_datasets) 

1245 num_cache_hits = len(src_datasets) - len(stale_src_datasets) 

1246 self.num_cache_misses += num_cache_misses 

1247 self.num_cache_hits += num_cache_hits 

1248 cmsg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses) 

1249 else: 

1250 stale_src_datasets = src_datasets 

1251 cache_files = {} 

1252 cmsg = "" 

1253 

1254 done_src_datasets: list[str] = [] 

1255 done_src_datasets_lock: threading.Lock = threading.Lock() 

1256 

1257 def _process_dataset_fn(src_dataset: str, tid: str, retry: Retry) -> bool: 

1258 result: bool = replicate_dataset(job=self, src_dataset=src_dataset, tid=tid, retry=retry) 

1259 with done_src_datasets_lock: 

1260 done_src_datasets.append(src_dataset) # record datasets that were actually replicated (not skipped) 

1261 return result 

1262 

1263 # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution 

1264 failed: bool = process_datasets_in_parallel_and_fault_tolerant( 

1265 log=log, 

1266 datasets=stale_src_datasets, 

1267 process_dataset=_process_dataset_fn, 

1268 skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)], 

1269 skip_on_error=p.skip_on_error, 

1270 max_workers=max_workers, 

1271 termination_event=self.termination_event, 

1272 termination_handler=self.terminate, 

1273 enable_barriers=False, 

1274 task_name="Replication", 

1275 append_exception=self.append_exception, 

1276 retry_policy=p.retry_policy, 

1277 retry_options=self.retry_options, 

1278 dry_run=p.dry_run, 

1279 is_test_mode=self.is_test_mode, 

1280 ) 

1281 

1282 if is_caching_snapshots(p, src) and len(done_src_datasets) > 0: 

1283 # refresh "snapshots_changed" ZFS dataset property from dst 

1284 stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in sorted(done_src_datasets)] 

1285 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets) 

1286 for dst_dataset in stale_dst_datasets: # update local cache 

1287 dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0) 

1288 dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset) 

1289 src_dataset: str = dst2src(dst_dataset) 

1290 src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed 

1291 if not p.dry_run: 

1292 set_last_modification_time_safe( 

1293 cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed, if_more_recent=True 

1294 ) 

1295 set_last_modification_time_safe( 

1296 dst_cache_file, unixtime_in_secs=dst_snapshots_changed, if_more_recent=True 

1297 ) 

1298 

1299 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos 

1300 log.info( 

1301 p.dry("Replication done: %s"), 

1302 f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots" 

1303 f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]", 

1304 ) 

1305 return failed 

1306 

1307 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None: 

1308 """For testing only; for unit tests to delete datasets during replication and test correct handling of that.""" 

1309 assert delete_trigger 

1310 counter = self.delete_injection_triggers.get("before") 

1311 if counter and self.decrement_injection_counter(counter, delete_trigger): 

1312 p = self.params 

1313 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "") 

1314 self.run_ssh_command(remote, LOG_DEBUG, print_stdout=True, cmd=cmd) 

1315 

1316 def maybe_inject_params(self, error_trigger: str) -> None: 

1317 """For testing only; for unit tests to simulate errors during replication and test correct handling of them.""" 

1318 assert error_trigger 

1319 counter = self.error_injection_triggers.get("before") 

1320 if counter and self.decrement_injection_counter(counter, error_trigger): 

1321 self.inject_params = self.param_injection_triggers[error_trigger] 

1322 elif error_trigger in self.param_injection_triggers: 

1323 self.inject_params = {} 

1324 

1325 @staticmethod 

1326 def recv_option_property_names(recv_opts: list[str]) -> set[str]: 

1327 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for 

1328 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name.""" 

1329 propnames: set[str] = set() 

1330 i = 0 

1331 n = len(recv_opts) 

1332 while i < n: 

1333 stripped: str = recv_opts[i].strip() 

1334 if stripped in ("-o", "-x"): 

1335 i += 1 

1336 if i == n or recv_opts[i].strip() in ("-o", "-x"): 

1337 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1338 if stripped == "-o" and "=" not in recv_opts[i]: 

1339 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1340 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0] 

1341 validate_property_name(propname, "--zfs-recv-program-opt(s)") 

1342 propnames.add(propname) 

1343 i += 1 

1344 return propnames 

1345 

1346 def root_datasets_if_recursive_zfs_snapshot_is_possible( 

1347 self, datasets: list[str], basis_datasets: list[str] 

1348 ) -> list[str] | None: 

1349 """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset 

1350 within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in 

1351 `datasets`. Returns `None` if any (unfiltered) dataset in `basis_dataset` that is a descendant of at least one of the 

1352 root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have 

1353 pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the 

1354 non-recursive 'zfs snapshot snapshot1 .. snapshot N' CLI flavor. 

1355 

1356 Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both 

1357 sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time 

1358 complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case. 

1359 See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative 

1360 impl that's easier to grok. 

1361 """ 

1362 assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted" 

1363 assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates" 

1364 assert (not self.is_test_mode) or basis_datasets == sorted(basis_datasets), "List is not sorted" 

1365 assert (not self.is_test_mode) or not has_duplicates(basis_datasets), "List contains duplicates" 

1366 assert (not self.is_test_mode) or set(datasets).issubset(set(basis_datasets)), "Not a subset" 

1367 root_datasets: list[str] = self.find_root_datasets(datasets) 

1368 i = 0 

1369 j = 0 

1370 k = 0 

1371 len_root_datasets = len(root_datasets) 

1372 len_basis_datasets = len(basis_datasets) 

1373 len_datasets = len(datasets) 

1374 while i < len_root_datasets and j < len_basis_datasets: # walk and "merge" the sorted lists, in sync 

1375 if basis_datasets[j] < root_datasets[i]: # irrelevant subtree? 

1376 j += 1 # move to next basis_datasets[j] 

1377 elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]): # relevant subtree? 

1378 while k < len_datasets and datasets[k] < basis_datasets[j]: 

1379 k += 1 # move to next datasets[k] 

1380 if k == len_datasets or datasets[k] != basis_datasets[j]: # dataset chopped off by schedule or --incl/excl*? 

1381 return None # detected filter pruning that is incompatible with 'zfs snapshot -r' 

1382 j += 1 # move to next basis_datasets[j] 

1383 else: 

1384 i += 1 # move to next root_dataset[i]; no need to check root_datasets that are no longer (or not yet) reachable 

1385 return root_datasets 

1386 

1387 @staticmethod 

1388 def find_root_datasets(sorted_datasets: list[str]) -> list[str]: 

1389 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A 

1390 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets.""" 

1391 root_datasets: list[str] = [] 

1392 skip_dataset: str = DONT_SKIP_DATASET 

1393 for dataset in sorted_datasets: 

1394 if is_descendant(dataset, of_root_dataset=skip_dataset): 

1395 continue 

1396 skip_dataset = dataset 

1397 root_datasets.append(dataset) 

1398 return root_datasets 

1399 

1400 def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]: 

1401 """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g. 

1402 bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be 

1403 created with that name, because these datasets are due per the schedule, either because the 'creation' time of their 

1404 most recent snapshot with that name pattern is now too old, or such a snapshot does not even exist. 

1405 

1406 The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple 

1407 but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An 

1408 alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property 

1409 (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of 

1410 the most recent snapshot, for each SnapshotLabel and each dataset. 

1411 """ 

1412 p, log = self.params, self.params.log 

1413 src = p.src 

1414 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config 

1415 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list) 

1416 is_caching: bool = False 

1417 interner: HashedInterner[datetime] = HashedInterner() # reduces memory footprint 

1418 msgs: list[tuple[datetime, str, SnapshotLabel, str]] = [] 

1419 

1420 def create_snapshot_if_latest_is_too_old( 

1421 datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int 

1422 ) -> None: # thread-safe 

1423 """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old.""" 

1424 creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz) 

1425 log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label) 

1426 duration_amount, duration_unit = config.suffix_durations[label.suffix] 

1427 next_event_dt: datetime = config.anchors.round_datetime_up_to_duration_multiple( 

1428 creation_dt + timedelta(microseconds=1), duration_amount, duration_unit 

1429 ) 

1430 msg: str = "" 

1431 if config.current_datetime >= next_event_dt: 

1432 datasets_to_snapshot[label].append(dataset) # mark it as scheduled for snapshot creation 

1433 msg = " has passed" 

1434 next_event_dt = interner.intern(next_event_dt) 

1435 msgs.append((next_event_dt, dataset, label, msg)) 

1436 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot' 

1437 # Per-label cache stores (atime=creation, mtime=snapshots_changed) so later runs can safely trust creation 

1438 # only when the label's mtime matches the current dataset-level '=' cache value. Excludes timestamp of label. 

1439 cache_file: str = self.cache.last_modified_cache_file(src, dataset, label_hashes[label]) 

1440 unixtimes: tuple[int, int] = (creation_unixtime, self.src_properties[dataset].snapshots_changed) 

1441 set_last_modification_time_safe(cache_file, unixtime_in_secs=unixtimes, if_more_recent=True) 

1442 

1443 labels: list[SnapshotLabel] = [] 

1444 config_labels: list[SnapshotLabel] = config.snapshot_labels() 

1445 for label in config_labels: 

1446 duration_amount_, _duration_unit = config.suffix_durations[label.suffix] 

1447 if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due: 

1448 datasets_to_snapshot[label] = sorted_datasets # take snapshot regardless of creation time of existing snaps 

1449 else: 

1450 labels.append(label) 

1451 if len(labels) == 0: 

1452 return datasets_to_snapshot # nothing more TBD 

1453 label_hashes: dict[SnapshotLabel, str] = { 

1454 label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels 

1455 } 

1456 

1457 # satisfy request from local cache as much as possible 

1458 cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list) 

1459 if is_caching_snapshots(p, src): 

1460 sorted_datasets_todo: list[str] = [] 

1461 time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS 

1462 for dataset in sorted_datasets: 

1463 cache: SnapshotCache = self.cache 

1464 cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset)) 

1465 if cached_snapshots_changed == 0: 

1466 sorted_datasets_todo.append(dataset) # request cannot be answered from cache 

1467 continue 

1468 if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed: # get that prop "for free" 

1469 cache.invalidate_last_modified_cache_dataset(dataset) 

1470 sorted_datasets_todo.append(dataset) # request cannot be answered from cache 

1471 continue 

1472 if cached_snapshots_changed >= time_threshold: # Avoid equal-second races: only trust matured cache entries 

1473 sorted_datasets_todo.append(dataset) # cache entry isn't mature enough to be trusted; skip cache 

1474 continue 

1475 creation_unixtimes: list[int] = [] 

1476 for label_hash in label_hashes.values(): 

1477 # For per-label files, atime stores the latest matching snapshot's creation time, while mtime stores 

1478 # the dataset-level snapshots_changed observed when this label file was written. 

1479 atime, mtime = cache.get_snapshots_changed2(cache.last_modified_cache_file(src, dataset, label_hash)) 

1480 # Sanity check: trust per-label cache only when: 

1481 # - mtime equals the dataset-level '=' cache (same snapshots_changed), and 

1482 # - atime is plausible and not later than mtime (creation <= snapshots_changed), and 

1483 # - neither atime nor mtime is zero (unknown provenance). 

1484 # Otherwise fall back to 'zfs list -t snapshot' to avoid stale creation times after newer changes. 

1485 if atime == 0 or mtime == 0 or mtime != cached_snapshots_changed or atime > mtime: 

1486 sorted_datasets_todo.append(dataset) # request cannot be answered from cache 

1487 break 

1488 creation_unixtimes.append(atime) 

1489 if len(creation_unixtimes) == len(labels): 

1490 for j, label in enumerate(labels): 

1491 create_snapshot_if_latest_is_too_old( 

1492 cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j] 

1493 ) 

1494 sorted_datasets = sorted_datasets_todo 

1495 

1496 def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None: 

1497 create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs) 

1498 

1499 def on_finish_dataset(dataset: str) -> None: 

1500 if is_caching_snapshots(p, src) and not p.dry_run: 

1501 set_last_modification_time_safe( 

1502 self.cache.last_modified_cache_file(src, dataset), 

1503 unixtime_in_secs=self.src_properties[dataset].snapshots_changed, 

1504 if_more_recent=True, 

1505 ) 

1506 

1507 # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache 

1508 is_caching = is_caching_snapshots(p, src) 

1509 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots( 

1510 src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset 

1511 ) 

1512 for lbl in labels: # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result 

1513 datasets_to_snapshot[lbl].sort() 

1514 if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot): # +take snaps for snapshot-less datasets 

1515 datasets_to_snapshot[lbl] = list( # inputs to merge() are sorted, and outputs are sorted too 

1516 heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots) 

1517 ) 

1518 for label, datasets in datasets_to_snapshot.items(): 

1519 assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted" 

1520 assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates" 

1521 assert label 

1522 

1523 msgs.sort() # sort by time, dataset, label 

1524 for i in range(0, len(msgs), 10_000): # reduce logging overhead via mini-batching 

1525 text = "".join( 

1526 f"\nNext scheduled snapshot time: {next_event_dt} for {dataset}@{label}{msg}" 

1527 for next_event_dt, dataset, label, msg in msgs[i : i + 10_000] 

1528 ) 

1529 log.info("Next scheduled snapshot times ...%s", text) 

1530 

1531 # sort keys to ensure that we take snapshots for dailies before hourlies, and so on 

1532 label_indexes: dict[SnapshotLabel, int] = {label: k for k, label in enumerate(config_labels)} 

1533 datasets_to_snapshot = dict(sorted(datasets_to_snapshot.items(), key=lambda kv: label_indexes[kv[0]])) 

1534 return datasets_to_snapshot 

1535 

1536 def handle_minmax_snapshots( 

1537 self, 

1538 remote: Remote, 

1539 sorted_datasets: list[str], 

1540 labels: list[SnapshotLabel], 

1541 fn_latest: Callable[[int, int, str, str], None], # callback function for latest snapshot 

1542 fn_oldest: Callable[[int, int, str, str], None] | None = None, # callback function for oldest snapshot 

1543 fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None, 

1544 ) -> list[str]: # thread-safe 

1545 """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs 

1546 the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names.""" 

1547 assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted" 

1548 p = self.params 

1549 cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o createtxg,creation,name") # sort dataset,createtxg 

1550 datasets_with_snapshots: set[str] = set() 

1551 interner: SortedInterner[str] = SortedInterner(sorted_datasets) # reduces memory footprint 

1552 for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False): 

1553 # streaming group by dataset name (consumes constant memory only) 

1554 for dataset, group in itertools.groupby(lines, key=lambda line: line.rsplit("\t", 1)[1].split("@", 1)[0]): 

1555 dataset = interner.interned(dataset) 

1556 snapshots = sorted( # fetch all snapshots of current dataset and sort by createtxg,creation,name 

1557 (int(createtxg), int(creation_unixtime_secs), name.split("@", 1)[1]) 

1558 for createtxg, creation_unixtime_secs, name in (line.split("\t", 2) for line in group) 

1559 ) # perf: sorted() is fast because Timsort is close to O(N) for nearly sorted input, which is our case 

1560 assert len(snapshots) > 0 

1561 datasets_with_snapshots.add(dataset) 

1562 snapshot_names: tuple[str, ...] = tuple(snapshot[-1] for snapshot in snapshots) 

1563 reversed_snapshot_names: tuple[str, ...] = snapshot_names[::-1] 

1564 year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX 

1565 year_with_4_digits_regex_fullmatch = year_with_4_digits_regex.fullmatch 

1566 startswith = str.startswith 

1567 endswith = str.endswith 

1568 fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False)) 

1569 for i, label in enumerate(labels): 

1570 infix: str = label.infix 

1571 start: str = label.prefix + infix 

1572 end: str = label.suffix 

1573 startlen: int = len(start) 

1574 endlen: int = len(end) 

1575 minlen: int = startlen + endlen if infix else 4 + startlen + endlen # year_with_four_digits_regex 

1576 startlen_4: int = startlen + 4 # [startlen:startlen+4] # year_with_four_digits_regex 

1577 has_infix: bool = bool(infix) 

1578 for fn, is_reverse in fns: 

1579 creation_unixtime_secs: int = 0 # find creation time of latest or oldest snapshot matching the label 

1580 minmax_snapshot: str = "" 

1581 for j, snapshot_name in enumerate(reversed_snapshot_names if is_reverse else snapshot_names): 

1582 if ( 

1583 endswith(snapshot_name, end) # aka snapshot_name.endswith(end) 

1584 and startswith(snapshot_name, start) # aka snapshot_name.startswith(start) 

1585 and len(snapshot_name) >= minlen 

1586 and (has_infix or year_with_4_digits_regex_fullmatch(snapshot_name, startlen, startlen_4)) 

1587 ): 

1588 k: int = len(snapshots) - j - 1 if is_reverse else j 

1589 creation_unixtime_secs = snapshots[k][1] 

1590 minmax_snapshot = snapshot_name 

1591 break 

1592 fn(i, creation_unixtime_secs, dataset, minmax_snapshot) 

1593 fn_on_finish_dataset(dataset) 

1594 datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots] 

1595 return datasets_without_snapshots 

1596 

1597 @staticmethod 

1598 def _cache_hits_msg(hits: int, misses: int) -> str: 

1599 total = hits + misses 

1600 return f", cache hits: {percent(hits, total, print_total=True)}, misses: {percent(misses, total, print_total=True)}" 

1601 

1602 def run_ssh_command( 

1603 self, 

1604 remote: MiniRemote, 

1605 loglevel: int = logging.INFO, 

1606 is_dry: bool = False, 

1607 check: bool = True, 

1608 print_stdout: bool = False, 

1609 print_stderr: bool = True, 

1610 cmd: list[str] | None = None, 

1611 retry_on_generic_ssh_error: bool = True, 

1612 ) -> str: 

1613 """Runs the given CLI cmd via ssh on the given remote, and returns stdout.""" 

1614 assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0 

1615 conn_pool: ConnectionPool = self.params.connection_pools[remote.location].pool(SHARED) 

1616 with conn_pool.connection() as conn: 

1617 log: logging.Logger = self.params.log 

1618 try: 

1619 process: subprocess.CompletedProcess[str] = conn.run_ssh_command( 

1620 cmd=cmd, 

1621 job=self, 

1622 loglevel=loglevel, 

1623 is_dry=is_dry, 

1624 check=check, 

1625 stdin=DEVNULL, 

1626 stdout=PIPE, 

1627 stderr=PIPE, 

1628 text=True, 

1629 ) 

1630 except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: 

1631 xprint(log, stderr_to_str(e.stdout) if print_stdout else e.stdout, run=print_stdout, file=sys.stdout, end="") 

1632 xprint(log, stderr_to_str(e.stderr) if print_stderr else e.stderr, run=print_stderr, file=sys.stderr, end="") 

1633 if retry_on_generic_ssh_error and isinstance(e, subprocess.CalledProcessError): 

1634 stderr: str = stderr_to_str(e.stderr) 

1635 if stderr.startswith("ssh: "): 

1636 assert e.returncode == 255, e.returncode # error within SSH itself (not during the remote command) 

1637 raise RetryableError("Subprocess failed", display_msg="ssh") from e 

1638 raise 

1639 else: 

1640 if is_dry: 

1641 return "" 

1642 xprint(log, process.stdout, run=print_stdout, file=sys.stdout, end="") 

1643 xprint(log, process.stderr, run=print_stderr, file=sys.stderr, end="") 

1644 return process.stdout 

1645 

1646 def try_ssh_command( 

1647 self, 

1648 remote: MiniRemote, 

1649 loglevel: int, 

1650 is_dry: bool = False, 

1651 print_stdout: bool = False, 

1652 cmd: list[str] | None = None, 

1653 exists: bool = True, 

1654 error_trigger: str | None = None, 

1655 ) -> str | None: 

1656 """Convenience method that helps retry/react to a dataset or pool that potentially doesn't exist anymore.""" 

1657 assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0 

1658 log = self.params.log 

1659 try: 

1660 self.maybe_inject_error(cmd=cmd, error_trigger=error_trigger) 

1661 return self.run_ssh_command(remote=remote, loglevel=loglevel, is_dry=is_dry, print_stdout=print_stdout, cmd=cmd) 

1662 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

1663 if not isinstance(e, UnicodeDecodeError): 

1664 stderr: str = stderr_to_str(e.stderr) 

1665 if exists and ( 

1666 ": dataset does not exist" in stderr 

1667 or ": filesystem does not exist" in stderr # solaris 11.4.0 

1668 or ": no such pool" in stderr 

1669 or "does not have any resumable receive state to abort" in stderr # harmless `zfs receive -A` race 

1670 ): 

1671 return None 

1672 log.warning("%s", stderr.rstrip()) 

1673 raise RetryableError("Subprocess failed") from e 

1674 

1675 def try_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str | None: 

1676 """Convenience method that auto-retries try_ssh_command() on failure.""" 

1677 p = self.params 

1678 return call_with_retries( 

1679 fn=lambda retry: self.try_ssh_command(*args, **kwargs), 

1680 policy=p.retry_policy, 

1681 config=self.retry_options.config, 

1682 giveup=self.retry_options.giveup, 

1683 after_attempt=self.retry_options.after_attempt, 

1684 log=p.log, 

1685 ) 

1686 

1687 def run_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str: 

1688 """Convenience method that auto-retries run_ssh_command() on transport failure (not on remote command failure).""" 

1689 p = self.params 

1690 return call_with_retries( 

1691 fn=lambda retry: self.run_ssh_command(*args, **kwargs), 

1692 policy=p.retry_policy, 

1693 config=self.retry_options.config, 

1694 giveup=self.retry_options.giveup, 

1695 after_attempt=self.retry_options.after_attempt, 

1696 log=p.log, 

1697 ) 

1698 

1699 def maybe_inject_error(self, cmd: list[str], error_trigger: str | None = None) -> None: 

1700 """For testing only; for unit tests to simulate errors during replication and test correct handling of them.""" 

1701 if error_trigger: 

1702 counter = self.error_injection_triggers.get("before") 

1703 if counter and self.decrement_injection_counter(counter, error_trigger): 

1704 try: 

1705 raise CalledProcessError(returncode=1, cmd=" ".join(cmd), stderr=error_trigger + ":dataset is busy") 

1706 except subprocess.CalledProcessError as e: 

1707 if error_trigger.startswith("retryable_"): 

1708 raise RetryableError("Subprocess failed") from e 

1709 else: 

1710 raise 

1711 

1712 def decrement_injection_counter(self, counter: Counter[str], trigger: str) -> bool: 

1713 """For testing only.""" 

1714 with self.injection_lock: 

1715 if counter[trigger] <= 0: 

1716 return False 

1717 counter[trigger] -= 1 

1718 return True 

1719 

1720 

1721############################################################################# 

1722@final 

1723class DatasetProperties: 

1724 """Properties of a ZFS dataset.""" 

1725 

1726 __slots__ = ("recordsize", "snapshots_changed") # uses more compact memory layout than __dict__ 

1727 

1728 def __init__(self, recordsize: int, snapshots_changed: int) -> None: 

1729 # immutable variables: 

1730 self.recordsize: Final[int] = recordsize 

1731 

1732 # mutable variables: 

1733 self.snapshots_changed: int = snapshots_changed 

1734 

1735 

1736############################################################################# 

1737# Input format is [[user@]host:]dataset 

1738# 1234 5 6 

1739_DATASET_LOCATOR_REGEX: Final[re.Pattern[str]] = re.compile(r"(((([^@]*)@)?([^:]+)):)?(.*)", flags=re.DOTALL) 

1740 

1741 

1742def parse_dataset_locator( 

1743 input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None 

1744) -> tuple[str, str, str, str, str]: 

1745 """Splits user@host:dataset into its components with optional checks.""" 

1746 

1747 def convert_ipv6(hostname: str) -> str: # support IPv6 without getting confused by host:dataset colon separator ... 

1748 return hostname.replace("|", ":") # ... and any colons that may be part of a (valid) ZFS dataset name 

1749 

1750 user_undefined: bool = user is None 

1751 if user is None: 

1752 user = "" 

1753 host_undefined: bool = host is None 

1754 if host is None: 

1755 host = "" 

1756 host = convert_ipv6(host) 

1757 user_host, dataset, pool = "", "", "" 

1758 

1759 if match := _DATASET_LOCATOR_REGEX.fullmatch(input_text): 1759 ↛ 1776line 1759 didn't jump to line 1776 because the condition on line 1759 was always true

1760 if user_undefined: 

1761 user = match.group(4) or "" 

1762 if host_undefined: 

1763 host = match.group(5) or "" 

1764 host = convert_ipv6(host) 

1765 if host == "-": 

1766 host = "" 

1767 dataset = match.group(6) or "" 

1768 i = dataset.find("/") 

1769 pool = dataset[0:i] if i >= 0 else dataset 

1770 

1771 if user and host: 

1772 user_host = f"{user}@{host}" 

1773 elif host: 

1774 user_host = host 

1775 

1776 if validate: 

1777 validate_user_name(user, input_text) 

1778 validate_host_name(host, input_text) 

1779 if port is not None: 

1780 validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ") 

1781 validate_dataset_name(dataset, input_text) 

1782 

1783 return user, host, user_host, pool, dataset 

1784 

1785 

1786def validate_user_name(user: str, input_text: str) -> None: 

1787 """Checks that the username is safe for ssh or local usage.""" 

1788 invalid_chars: str = SHELL_CHARS + "/" 

1789 if user and (".." in user or any(c.isspace() or c in invalid_chars for c in user)): 

1790 die(f"Invalid user name: '{user}' for: '{input_text}'") 

1791 

1792 

1793def validate_host_name(host: str, input_text: str) -> None: 

1794 """Checks hostname for forbidden characters or patterns.""" 

1795 invalid_chars: str = SHELL_CHARS + "/" 

1796 if host and (host.startswith("-") or ".." in host or any(c.isspace() or c in invalid_chars for c in host)): 

1797 die(f"Invalid host name: '{host}' for: '{input_text}'") 

1798 

1799 

1800def validate_port(port: str | int | None, message: str) -> None: 

1801 """Checks that port specification is a valid integer.""" 

1802 if isinstance(port, int): 

1803 port = str(port) 

1804 if port and not port.isdigit(): 

1805 die(message + f"must be empty or a positive integer: '{port}'") 

1806 

1807 

1808############################################################################# 

1809if __name__ == "__main__": 1809 ↛ 1810line 1809 didn't jump to line 1810 because the condition on line 1809 was never true

1810 main()