Coverage for bzfs_main/bzfs.py: 99% (1087 statements)
Report generated by coverage.py v7.13.4 at 2026-02-24 10:16 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata 

16# /// script 

17# requires-python = ">=3.9" 

18# dependencies = [] 

19# /// 

20# 

21""" 

22* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`, 

23 data transfer, and snapshot management between two hosts. 

24* Overview of the bzfs.py codebase: 

25* The codebase starts with docs, definition of input data and associated argument parsing into a "Params" class. 

26* All CLI option/parameter values are reachable from the "Params" class. 

27* Control flow starts in main(), which kicks off a "Job". 

28* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree. 

29* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset(). 

30* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots(). 

31* The --create-src-snapshots-* and --delete-* and --compare-* and --monitor-* algorithms also start in run_task(). 

32* The main retry logic is in call_with_retries() and clear_resumable_recv_state_if_necessary(). 

33* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter. 

34* Executing a CLI command on a local or remote host is in run_ssh_command(). 

35* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool. 

36* Cache functionality can be found by searching for this regex: .*cach.* 

37* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant(). 

38* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh. 

39 Simply run that script whenever you change or add ArgumentParser help text. 

40""" 

41 

42from __future__ import ( 

43 annotations, 

44) 

45import argparse 

46import contextlib 

47import fcntl 

48import heapq 

49import itertools 

50import logging 

51import os 

52import re 

53import subprocess 

54import sys 

55import threading 

56import time 

57from collections import ( 

58 Counter, 

59 defaultdict, 

60) 

61from collections.abc import ( 

62 Collection, 

63 Sequence, 

64) 

65from datetime import ( 

66 datetime, 

67 timedelta, 

68) 

69from logging import ( 

70 Logger, 

71) 

72from pathlib import ( 

73 Path, 

74) 

75from subprocess import ( 

76 DEVNULL, 

77 PIPE, 

78 CalledProcessError, 

79 TimeoutExpired, 

80) 

81from typing import ( 

82 Any, 

83 Callable, 

84 Final, 

85 cast, 

86 final, 

87) 

88 

89import bzfs_main.loggers 

90from bzfs_main.argparse_actions import ( 

91 has_timerange_filter, 

92) 

93from bzfs_main.argparse_cli import ( 

94 EXCLUDE_DATASET_REGEXES_DEFAULT, 

95) 

96from bzfs_main.compare_snapshot_lists import ( 

97 run_compare_snapshot_lists, 

98) 

99from bzfs_main.configuration import ( 

100 AlertConfig, 

101 CreateSrcSnapshotConfig, 

102 LogParams, 

103 MonitorSnapshotAlert, 

104 Params, 

105 Remote, 

106 SnapshotLabel, 

107) 

108from bzfs_main.detect import ( 

109 DISABLE_PRG, 

110 RemoteConfCacheItem, 

111 are_bookmarks_enabled, 

112 detect_available_programs, 

113 is_caching_snapshots, 

114 is_dummy, 

115 is_zpool_feature_enabled_or_active, 

116) 

117from bzfs_main.filter import ( 

118 SNAPSHOT_REGEX_FILTER_NAME, 

119 dataset_regexes, 

120 filter_datasets, 

121 filter_lines, 

122 filter_lines_except, 

123 filter_snapshots, 

124) 

125from bzfs_main.loggers import ( 

126 get_simple_logger, 

127 reset_logger, 

128 set_logging_runtime_defaults, 

129) 

130from bzfs_main.parallel_batch_cmd import ( 

131 run_ssh_cmd_parallel, 

132 zfs_list_snapshots_in_parallel, 

133) 

134from bzfs_main.progress_reporter import ( 

135 ProgressReporter, 

136 count_num_bytes_transferred_by_zfs_send, 

137) 

138from bzfs_main.replication import ( 

139 delete_bookmarks, 

140 delete_datasets, 

141 delete_snapshots, 

142 replicate_dataset, 

143) 

144from bzfs_main.snapshot_cache import ( 

145 MATURITY_TIME_THRESHOLD_SECS, 

146 MONITOR_CACHE_FILE_PREFIX, 

147 REPLICATION_CACHE_FILE_PREFIX, 

148 SnapshotCache, 

149 set_last_modification_time_safe, 

150) 

151from bzfs_main.util.connection import ( 

152 SHARED, 

153 ConnectionPool, 

154 MiniJob, 

155 MiniRemote, 

156 timeout, 

157) 

158from bzfs_main.util.parallel_iterator import ( 

159 run_in_parallel, 

160) 

161from bzfs_main.util.parallel_tasktree_policy import ( 

162 process_datasets_in_parallel_and_fault_tolerant, 

163) 

164from bzfs_main.util.retry import ( 

165 Retry, 

166 RetryableError, 

167 RetryTemplate, 

168 RetryTerminationError, 

169 RetryTiming, 

170) 

171from bzfs_main.util.utils import ( 

172 DESCENDANTS_RE_SUFFIX, 

173 DIE_STATUS, 

174 DONT_SKIP_DATASET, 

175 FILE_PERMISSIONS, 

176 LOG_DEBUG, 

177 LOG_TRACE, 

178 PROG_NAME, 

179 SHELL_CHARS_AND_SLASH, 

180 UMASK, 

181 YEAR_WITH_FOUR_DIGITS_REGEX, 

182 HashedInterner, 

183 SortedInterner, 

184 Subprocesses, 

185 SynchronizedBool, 

186 SynchronizedDict, 

187 TaskTiming, 

188 append_if_absent, 

189 compile_regexes, 

190 cut, 

191 die, 

192 has_duplicates, 

193 human_readable_bytes, 

194 human_readable_duration, 

195 is_descendant, 

196 percent, 

197 pretty_print_formatter, 

198 replace_in_lines, 

199 replace_prefix, 

200 sha256_85_urlsafe_base64, 

201 sha256_128_urlsafe_base64, 

202 stderr_to_str, 

203 termination_signal_handler, 

204 validate_dataset_name, 

205 validate_property_name, 

206 xappend, 

207 xfinally, 

208 xprint, 

209) 

210 

# constants:
__version__: Final[str] = bzfs_main.argparse_cli.__version__  # single source of truth for the program version
CRITICAL_STATUS: Final[int] = 2
WARNING_STATUS: Final[int] = 1
STILL_RUNNING_STATUS: Final[int] = 4  # exit status when a previous periodic job still holds the lock file
MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 9)
# Fail fast at import time on unsupported interpreters, with a readable message instead of a SyntaxError later:
if sys.version_info < MIN_PYTHON_VERSION:
    print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!")
    sys.exit(DIE_STATUS)

220 

221 

222############################################################################# 

def argument_parser() -> argparse.ArgumentParser:
    """Returns the CLI parser used by bzfs; delegates to the canonical parser definition in bzfs_main.argparse_cli."""
    parser = bzfs_main.argparse_cli.argument_parser()
    return parser

226 

227 

def main() -> None:
    """API for command line clients."""
    prev_umask: int = os.umask(UMASK)
    try:
        set_logging_runtime_defaults()
        # Propagate CTRL-C and SIGTERM to all descendant processes so they terminate too
        termination_event: threading.Event = threading.Event()
        with termination_signal_handler(termination_events=[termination_event]):
            args = argument_parser().parse_args()
            run_main(args, sys.argv, termination_event=termination_event)
    except subprocess.CalledProcessError as e:
        status = normalize_called_process_error(e)
        sys.exit(status)
    finally:
        os.umask(prev_umask)  # restore prior global state

241 

242 

def run_main(
    args: argparse.Namespace,
    sys_argv: list[str] | None = None,
    log: Logger | None = None,
    termination_event: threading.Event | None = None,
) -> None:
    """API for Python clients; visible for testing; may become a public API eventually."""
    job = Job(termination_event=termination_event)
    job.run_main(args, sys_argv, log)

251 

252 

253############################################################################# 

254@final 

255class Job(MiniJob): 

256 """Executes one bzfs run, coordinating snapshot replication tasks.""" 

257 

    def __init__(self, termination_event: threading.Event | None = None) -> None:
        """Initializes mutable per-run state; a fresh Job coordinates one bzfs run.

        Args:
            termination_event: Optional externally owned event used to request asynchronous termination;
                a private event is created when none is given.
        """
        self.params: Params  # declared here; assigned in run_main() once CLI args are parsed into a Params object
        self.termination_event: Final[threading.Event] = termination_event or threading.Event()
        # retry timing is tied to the termination event; on_before_attempt is replaced with a no-op hook
        self.retry_timing: Final[RetryTiming] = RetryTiming.make_from(self.termination_event).copy(
            on_before_attempt=lambda retry: None
        )
        self.task_timing: Final[TaskTiming] = TaskTiming.make_from(self.termination_event)
        self.subprocesses: Subprocesses = Subprocesses(self.termination_event.is_set)
        # per dst ssh_user_host: which dst datasets are known to exist; cleared at the start of each daemon iteration
        self.all_dst_dataset_exists: Final[dict[str, dict[str, bool]]] = defaultdict(lambda: defaultdict(bool))
        # view of all_dst_dataset_exists for the current dst host; rebound in validate_task()
        self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({})
        self.src_properties: dict[str, DatasetProperties] = {}
        self.dst_properties: dict[str, DatasetProperties] = {}
        self.all_exceptions: list[str] = []  # stringified exceptions tolerated so far; see append_exception()
        self.all_exceptions_count: int = 0
        self.max_exceptions_to_summarize: int = 10000  # caps memory consumed by the error summary
        self.first_exception: BaseException | None = None  # re-raised at the end of run_tasks() on tolerated errors
        self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {}
        self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {}  # keyed by remote location
        self.max_workers: dict[str, int] = {}  # keyed by remote location; computed in validate_task()
        self.progress_reporter: ProgressReporter = cast(ProgressReporter, None)  # created in run_tasks()
        self.is_first_replication_task: Final[SynchronizedBool] = SynchronizedBool(True)
        self.replication_start_time_nanos: int = time.monotonic_ns()
        self.timeout_nanos: int | None = None  # timestamp aka instant in time
        self.timeout_duration_nanos: int | None = None  # duration (not a timestamp); for logging only
        self.cache: SnapshotCache = SnapshotCache(self)
        self.stats_lock: Final[threading.Lock] = threading.Lock()  # presumably guards the num_* counters below — confirm at use sites
        self.num_cache_hits: int = 0
        self.num_cache_misses: int = 0
        self.num_snapshots_found: int = 0
        self.num_snapshots_replicated: int = 0

        self.is_test_mode: bool = False  # for testing only
        self.creation_prefix: str = ""  # for testing only
        self.use_select: bool = False  # for testing only
        self.progress_update_intervals: tuple[float, float] | None = None  # for testing only
        self.error_injection_triggers: dict[str, Counter[str]] = {}  # for testing only
        self.delete_injection_triggers: dict[str, Counter[str]] = {}  # for testing only
        self.param_injection_triggers: dict[str, dict[str, bool]] = {}  # for testing only
        self.inject_params: dict[str, bool] = {}  # for testing only
        self.injection_lock: threading.Lock = threading.Lock()  # for testing only
        self.max_command_line_bytes: int | None = None  # for testing only

299 

300 def shutdown(self) -> None: 

301 """Exits any multiplexed ssh sessions that may be leftover.""" 

302 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values() 

303 for i, cache_item in enumerate(cache_items): 

304 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}") 

305 

306 def terminate(self) -> None: 

307 """Shuts down gracefully; also terminates descendant processes, if any.""" 

308 with xfinally(self.subprocesses.terminate_process_subtrees): 

309 self.shutdown() 

310 

311 def _retry_template(self) -> RetryTemplate: 

312 p = self.params 

313 return RetryTemplate(policy=p.retry_policy.copy(timing=self.retry_timing), log=p.log) 

314 

    def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None:
        """Parses CLI arguments, sets up logging, and executes main job loop.

        Args:
            args: Parsed CLI namespace (may be re-parsed below if auxiliary plan args are present).
            sys_argv: Original argv, used for logging only.
            log: Optional caller-provided Logger; if given it is used as-is and never reset here.
        """
        assert isinstance(self.error_injection_triggers, dict)
        assert isinstance(self.delete_injection_triggers, dict)
        assert isinstance(self.inject_params, dict)
        logger_name_suffix: str = ""

        def _reset_logger() -> None:
            if logger_name_suffix and log is not None:  # reset Logger unless it's a Logger outside of our control
                reset_logger(log)

        with xfinally(_reset_logger):  # runs _reset_logger() on exit, without masking error raised in body of `with` block
            try:
                log_params: LogParams = LogParams(args)
                logger_name_suffix = "" if log is not None else log_params.logger_name_suffix
                log = bzfs_main.loggers.get_logger(
                    log_params=log_params, args=args, log=log, logger_name_suffix=logger_name_suffix
                )
                log.info("%s", f"Log file is: {log_params.log_file}")
            except BaseException as e:
                # Logging isn't set up yet, so report the failure via a temporary simple logger, then re-raise
                simple_log: Logger = get_simple_logger(PROG_NAME, logger_name_suffix=logger_name_suffix)
                try:
                    simple_log.error("Log init: %s", e, exc_info=not isinstance(e, SystemExit))
                finally:
                    reset_logger(simple_log)
                raise

            # Plan options expand into additional CLI args; when present, re-parse so they take effect
            aux_args: list[str] = []
            if getattr(args, "include_snapshot_plan", None):
                aux_args += args.include_snapshot_plan
            if getattr(args, "delete_dst_snapshots_except_plan", None):
                aux_args += args.delete_dst_snapshots_except_plan
            if len(aux_args) > 0:
                log.info("Auxiliary CLI arguments: %s", " ".join(aux_args))
                args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args)

            def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None:
                log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info)

            try:
                log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[uid: {os.getuid()}, euid: {os.geteuid()}]")
                if self.is_test_mode:
                    log.log(LOG_TRACE, "Parsed CLI arguments: %s", args)
                self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params)
                self.timeout_duration_nanos = p.timeout_duration_nanos
                lock_file: str = p.lock_file_name()
                # O_NOFOLLOW/O_CLOEXEC harden the lock file open; O_TRUNC|O_CREAT makes it idempotent
                lock_fd = os.open(
                    lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, FILE_PERMISSIONS
                )
                with xfinally(lambda: os.close(lock_fd)):
                    try:
                        # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or
                        # another process. The (advisory) lock is auto-released when the process terminates or the fd is
                        # closed.
                        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # LOCK_NB ... non-blocking
                    except BlockingIOError:
                        msg = "Exiting as same previous periodic job is still running without completion yet per "
                        die(msg + lock_file, STILL_RUNNING_STATUS)

                    # xfinally: unlink the lock_file while still holding the flock on its fd - it's a correct and safe
                    # standard POSIX pattern:
                    # - Performing unlink() before close(fd) avoids a race where a subsequent bzfs process could recreate and
                    #   lock a fresh inode for the same path between our close() and a later unlink(). In that case, a late
                    #   unlink would delete the newer process's lock_file path.
                    # - At this point, critical work is complete; the remaining steps are shutdown mechanics that have no
                    #   side effect, so this pattern is correct, safe, and simple.
                    with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)):  # don't accumulate stale files
                        try:
                            self.run_tasks()  # do the real work
                        except BaseException:
                            self.terminate()
                            raise
                        self.shutdown()
                        with contextlib.suppress(BrokenPipeError):
                            sys.stderr.flush()
                            sys.stdout.flush()
            except subprocess.CalledProcessError as e:
                log_error_on_exit(e, e.returncode)
                raise
            except SystemExit as e:
                log_error_on_exit(e, e.code)
                raise
            except (subprocess.TimeoutExpired, UnicodeDecodeError) as e:
                log_error_on_exit(e, DIE_STATUS)
                raise SystemExit(DIE_STATUS) from e
            except re.error as e:
                log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS)
                raise SystemExit(DIE_STATUS) from e
            except BaseException as e:
                log_error_on_exit(e, DIE_STATUS, exc_info=True)
                raise SystemExit(DIE_STATUS) from e
            finally:
                log.info("%s", f"Log file was: {log_params.log_file}")
            # only reached on success: every except clause above re-raises
            log.info("Success. Goodbye!")
            with contextlib.suppress(BrokenPipeError):
                sys.stderr.flush()
                sys.stdout.flush()

412 

    def run_tasks(self) -> None:
        """Executes replication cycles, repeating until daemon lifetime expires.

        Each cycle iterates over all root dataset pairs; tolerated per-task errors are collected and the first
        one is re-raised at the end (non-daemon mode only).
        """
        p, log = self.params, self.params.log
        # reset accumulated error state from any previous invocation
        self.all_exceptions = []
        self.all_exceptions_count = 0
        self.first_exception = None
        self.remote_conf_cache = {}
        self.validate_once()
        self.replication_start_time_nanos = time.monotonic_ns()
        self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals)
        with xfinally(lambda: self.progress_reporter.stop()):
            daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos
            while True:  # loop for daemon mode
                self.timeout_nanos = (
                    None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
                )
                self.all_dst_dataset_exists.clear()
                self.progress_reporter.reset()
                src, dst = p.src, p.dst
                for src_root_dataset, dst_root_dataset in p.root_dataset_pairs:
                    if self.termination_event.is_set():
                        self.terminate()
                        break
                    src.root_dataset = src.basis_root_dataset = src_root_dataset
                    dst.root_dataset = dst.basis_root_dataset = dst_root_dataset
                    p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy()
                    if p.daemon_lifetime_nanos > 0:
                        # in daemon mode, restart the timeout budget for every task
                        self.timeout_nanos = (
                            None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
                        )
                    recurs_sep = " " if p.recursive_flag else ""
                    task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}"
                    if len(p.root_dataset_pairs) > 1:
                        log.info("Starting task: %s", task_description + " ...")
                    try:
                        try:
                            self.maybe_inject_error(cmd=[], error_trigger="retryable_run_tasks")
                            timeout(self)
                            self.validate_task()
                            self.run_task()  # do the real work
                        except RetryableError as retryable_error:
                            # unwrap: surface the underlying cause instead of the retry wrapper
                            cause: BaseException | None = retryable_error.__cause__
                            assert cause is not None
                            raise cause.with_traceback(cause.__traceback__)  # noqa: B904 re-raise of cause without chaining
                    except (CalledProcessError, TimeoutExpired, SystemExit, UnicodeDecodeError, RetryTerminationError) as e:
                        # a timeout in non-daemon mode is fatal; otherwise honor --skip-on-error policy
                        if p.skip_on_error == "fail" or (
                            isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0
                        ):
                            raise
                        log.error("%s", e)
                        self.append_exception(e, "task", task_description)
                if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos):
                    break
        if not p.skip_replication:
            self.print_replication_stats(self.replication_start_time_nanos)
        error_count = self.all_exceptions_count
        if error_count > 0 and p.daemon_lifetime_nanos == 0:
            msgs = "\n".join(f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions))
            log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}")
            assert self.first_exception is not None
            raise self.first_exception

474 

475 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None: 

476 """Records and logs an exception that was encountered while running a subtask.""" 

477 self.first_exception = self.first_exception or e 

478 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption 

479 self.all_exceptions.append(str(e)) 

480 self.all_exceptions_count += 1 

481 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description) 

482 

    def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool:
        """Pauses until next scheduled snapshot time or daemon stop; Returns True to continue daemon loop; False to stop.

        Args:
            daemon_stoptime_nanos: Monotonic-clock instant (ns) at which the daemon must stop.
        """
        sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns()
        if sleep_nanos <= 0:
            return False
        self.progress_reporter.pause()
        p, log = self.params, self.params.log
        config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
        # +1 microsecond so that rounding up never returns the current instant itself
        curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1)
        # earliest upcoming snapshot boundary across all configured schedule periods
        next_snapshotting_event_dt: datetime = min(
            (
                config.anchors.round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit)
                for duration_amount, duration_unit in config.suffix_durations.values()
            ),
            default=curr_datetime + timedelta(days=10 * 365),  # infinity
        )
        offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz)
        offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000
        # sleep until the earlier of: next snapshot boundary, or daemon stop time; never negative
        sleep_nanos = min(sleep_nanos, max(0, offset_nanos))
        log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]")
        self.termination_event.wait(sleep_nanos / 1_000_000_000)  # allow early wakeup on async termination
        config.current_datetime = datetime.now(config.tz)
        return time.monotonic_ns() < daemon_stoptime_nanos and not self.termination_event.is_set()

506 

507 def print_replication_stats(self, start_time_nanos: int) -> None: 

508 """Logs overall replication statistics after a job cycle completes.""" 

509 p, log = self.params, self.params.log 

510 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos 

511 msg = p.dry(f"zfs sent {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.") 

512 if p.is_program_available("pv", "local"): 

513 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file) 

514 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / (elapsed_nanos or 1)) 

515 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]." 

516 log.info("%s", msg.ljust(p.log_params.terminal_columns - len("2024-01-01 23:58:45 [I] "))) 

517 

    def validate_once(self) -> None:
        """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times."""
        p = self.params
        p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts)
        # compile snapshot-name regex filter options in place (strings -> compiled patterns)
        for snapshot_filter in p.snapshot_filters:
            for _filter in snapshot_filter:
                if _filter.name == SNAPSHOT_REGEX_FILTER_NAME:
                    exclude_snapshot_regexes_strings, include_snapshot_regexes_strings = cast(
                        tuple[list[str], list[str]], _filter.options
                    )
                    exclude_snapshot_regexes = compile_regexes(exclude_snapshot_regexes_strings)
                    include_snapshot_regexes = compile_regexes(include_snapshot_regexes_strings or [".*"])
                    _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes)

        exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT]
        if len(p.args.exclude_dataset_regex) > 0:  # some patterns don't exclude anything
            exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"]
        include_regexes: list[str] = p.args.include_dataset_regex

        # relative datasets need not be compiled more than once as they don't change between tasks
        def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]:
            abs_datasets: list[str] = []
            rel_datasets: list[str] = []
            for dataset in datasets:
                (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset)
            return abs_datasets, rel_datasets

        p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset)
        p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset)
        suffix = DESCENDANTS_RE_SUFFIX
        p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = (
            compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix),
            compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix),
        )

        # fail fast if the pv options can't produce the metrics that progress reporting needs
        if p.pv_program != DISABLE_PRG:
            pv_program_opts_set = set(p.pv_program_opts)
            if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}):
                die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.")
            if not p.log_params.quiet:
                for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]:
                    if pv_program_opts_set.isdisjoint(opts):
                        die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.")

        # validate ssh connection parameters for both endpoints
        src, dst = p.src, p.dst
        for remote in [src, dst]:
            r, loc = remote, remote.location
            validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user")
            validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host")
            validate_port(r.ssh_port, f"--ssh-{loc}-port ")

568 

    def validate_task(self) -> None:
        """Validates a single replication task before execution.

        Resolves dataset locators for both endpoints, rejects overlapping src/dst trees, compiles the
        per-task dataset regexes, detects available programs, and sizes worker pools and minibatches.
        """
        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        for remote in [src, dst]:
            r = remote
            r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator(
                r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port
            )
            r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user)
            local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1")  # ::1 is IPv6 version of loopback address
            remote.is_nonlocal = r.ssh_host not in local_addrs
        # reuse existence info cached for this dst host from earlier tasks in the same run
        self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host])

        if src.ssh_host == dst.ssh_host:
            msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}"
            if src.root_dataset == dst.root_dataset:
                die(f"Source and destination dataset must not be the same! {msg}")
            if p.recursive and (
                is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset)
                or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset)
            ):
                die(f"Source and destination dataset trees must not overlap! {msg}")

        suffx: str = DESCENDANTS_RE_SUFFIX  # also match descendants of a matching dataset
        # combine precompiled (relative) regexes from validate_once() with per-task absolute dataset regexes
        p.exclude_dataset_regexes, p.include_dataset_regexes = (
            p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx),
            p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx),
        )
        if len(p.include_dataset_regexes) == 0:
            p.include_dataset_regexes = [(re.compile(r".*"), False)]  # default: include everything

        detect_available_programs(self)

        if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"):
            append_if_absent(p.curr_zfs_send_program_opts, "--large-block")

        # size worker pools and 'zfs list -t snapshot' minibatches per endpoint, based on detected CPU count
        self.max_workers = {}
        self.max_datasets_per_minibatch_on_list_snaps = {}
        for r in [src, dst]:
            cpus: int = int(p.available_programs[r.location].get("getconf_cpu_count", 8))
            threads, is_percent = p.threads
            cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads))
            self.max_workers[r.location] = cpus
            bs: int = max(1, p.max_datasets_per_batch_on_list_snaps)  # 1024 by default
            max_datasets_per_minibatch: int = p.max_datasets_per_minibatch_on_list_snaps
            if max_datasets_per_minibatch <= 0:
                max_datasets_per_minibatch = max(1, bs // cpus)
            max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch)
            self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch
            log.log(
                LOG_TRACE,
                "%s",
                f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, "
                f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, "
                f"max_workers: {self.max_workers[r.location]}, "
                f"location: {r.location}",
            )
        if self.is_test_mode:
            log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params))

629 

630 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]: 

631 """Returns sudo command prefix and whether root privileges are required.""" 

632 p: Params = self.params 

633 assert isinstance(ssh_user_host, str) 

634 assert isinstance(ssh_user, str) 

635 assert isinstance(p.sudo_program, str) 

636 assert isinstance(p.enable_privilege_elevation, bool) 

637 

638 is_root: bool = True 

639 if ssh_user_host != "": 

640 if ssh_user == "": 

641 if os.getuid() != 0: 

642 is_root = False 

643 elif ssh_user != "root": 

644 is_root = False 

645 elif os.getuid() != 0: 

646 is_root = False 

647 

648 if is_root: 

649 sudo = "" # using sudo in an attempt to make ZFS operations work even if we are not root user? 

650 use_zfs_delegation = False # or instead using 'zfs allow' delegation? 

651 return sudo, use_zfs_delegation 

652 elif p.enable_privilege_elevation: 

653 if p.sudo_program == DISABLE_PRG: 

654 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}") 

655 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any 

656 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit. 

657 return p.sudo_program + " -n", False 

658 else: 

659 return "", True 

660 

def run_task(self) -> None:
    """Replicates all snapshots for the current root dataset pair.

    Orchestrates, in this order: optional snapshot creation on the source, optional replication src --> dst, then the
    optional post-processing steps --delete-dst-datasets, --delete-dst-snapshots, --delete-empty-dst-datasets,
    --compare-snapshot-lists and --monitor-snapshots. Returns early if replication failed or if no post-processing
    step was requested.
    """

    def filter_src_datasets() -> list[str]:  # apply --{include|exclude}-dataset policy
        # Callers assign the result back to the enclosing `src_datasets` variable, so the (potentially expensive)
        # filtering runs at most once per task; subsequent calls return the memoized list.
        return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets

    p, log = self.params, self.params.log
    src, dst = p.src, p.dst
    # Parallelism is bounded by the more constrained of the two hosts
    max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location])
    recursive_sep: str = " " if p.recursive_flag else ""
    task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..."
    failed: bool = False
    src_datasets: list[str] | None = None  # None means "not yet filtered"; see filter_src_datasets()
    basis_src_datasets: list[str] = []
    self.src_properties = {}
    self.dst_properties = {}
    if not is_dummy(src):  # find src dataset or all datasets in src dataset tree (with --recursive)
        basis_src_datasets = self.list_src_datasets_task()

    if not p.create_src_snapshots_config.skip_create_src_snapshots:
        log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...")
        src_datasets = filter_src_datasets()  # apply include/exclude policy
        self.create_src_snapshots_task(basis_src_datasets, src_datasets)

    # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset
    if not p.skip_replication:
        if len(basis_src_datasets) == 0:
            die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}")
        if is_dummy(dst):
            die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
        src_datasets = filter_src_datasets()  # apply include/exclude policy
        failed = self.replicate_datasets(src_datasets, task_description, max_workers)

    # Bail out early if replication failed, or if none of the post-processing steps below were requested
    if failed or not (
        p.delete_dst_datasets
        or p.delete_dst_snapshots
        or p.delete_empty_dst_datasets
        or p.compare_snapshot_lists
        or p.monitor_snapshots_config.enable_monitor_snapshots
    ):
        return
    log.info("Listing dst datasets: %s", task_description)
    if is_dummy(dst):
        die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
    basis_dst_datasets: list[str] = self.list_dst_datasets_task()
    dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets)  # apply include/exclude policy

    # NOTE: the `not failed` checks below look redundant given the early return above; they are defensive guards
    if p.delete_dst_datasets and not failed:
        log.info(p.dry("--delete-dst-datasets: %s"), task_description)
        basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task(
            basis_src_datasets, basis_dst_datasets, dst_datasets
        )

    if p.delete_dst_snapshots and not failed:
        log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]")
        failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description)

    if p.delete_empty_dst_datasets and p.recursive and not failed:
        log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description)
        basis_dst_datasets, dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets)

    if p.compare_snapshot_lists and not failed:
        log.info("--compare-snapshot-lists: %s", task_description)
        if len(basis_src_datasets) == 0 and not is_dummy(src):
            die(f"Source dataset does not exist: {src.basis_root_dataset}")
        src_datasets = filter_src_datasets()  # apply include/exclude policy
        run_compare_snapshot_lists(self, src_datasets, dst_datasets)

    if p.monitor_snapshots_config.enable_monitor_snapshots and not failed:
        log.info("--monitor-snapshots: %s", task_description)
        src_datasets = filter_src_datasets()  # apply include/exclude policy
        self.monitor_snapshots_task(src_datasets, dst_datasets, task_description)

733 

def list_src_datasets_task(self) -> list[str]:
    """Lists datasets on the source host."""
    p = self.params
    src = p.src
    caching_enabled: bool = is_caching_snapshots(p, src)
    # Fetch the 'snapshots_changed' property only when the snapshot cache is in use; it is the first column if present
    props: str = ("snapshots_changed," if caching_enabled else "") + "volblocksize,recordsize,name"
    cmd: list[str] = p.split_args(
        f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset
    )
    output: str = self.try_ssh_command_with_retries(src, LOG_DEBUG, cmd=cmd) or ""
    datasets: list[str] = []
    for line in output.splitlines():
        fields: list[str] = line.split("\t")
        if not caching_enabled:
            fields = ["-"] + fields  # pad with a dummy 'snapshots_changed' column so unpacking below stays uniform
        snapshots_changed, volblocksize, recordsize, src_dataset = fields
        self.src_properties[src_dataset] = DatasetProperties(
            # zvols report '-' for recordsize; encode their volblocksize as a negative number instead
            recordsize=-int(volblocksize) if recordsize == "-" else int(recordsize),
            snapshots_changed=0 if (not snapshots_changed or snapshots_changed == "-") else int(snapshots_changed),
        )
        datasets.append(src_dataset)
    assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
    return datasets

755 

def list_dst_datasets_task(self) -> list[str]:
    """Lists datasets on the destination host."""
    p, log = self.params, self.params.log
    dst = p.dst
    caching_enabled: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots
    # Fetch the 'snapshots_changed' property only when the monitor cache is in use; it is the first column if present
    props: str = ("snapshots_changed," if caching_enabled else "") + "name"
    cmd: list[str] = p.split_args(
        f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset
    )
    datasets: list[str] = []
    output: str | None = self.try_ssh_command_with_retries(dst, LOG_TRACE, cmd=cmd)
    if output is None:
        log.warning("Destination dataset does not exist: %s", dst.root_dataset)
    else:
        for line in output.splitlines():
            fields: list[str] = line.split("\t")
            if not caching_enabled:
                fields = ["-"] + fields  # pad with a dummy 'snapshots_changed' column for uniform unpacking
            snapshots_changed, dst_dataset = fields
            self.dst_properties[dst_dataset] = DatasetProperties(
                recordsize=0,
                snapshots_changed=0 if (not snapshots_changed or snapshots_changed == "-") else int(snapshots_changed),
            )
            datasets.append(dst_dataset)
    assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
    return datasets

781 

def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None:
    """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements
    --create-src-snapshots.

    The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line,
    using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the
    --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs
    snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical
    'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such
    guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple
    command line invocations, respecting the limits that the operating system places on the maximum length of a single
    command line, per `getconf ARG_MAX`.

    Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
    per dataset. Space complexity is O(max(N, M)).
    """
    p, log = self.params, self.params.log
    src = p.src
    if len(basis_src_datasets) == 0:
        die(f"Source dataset does not exist: {src.basis_root_dataset}")
    datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets)
    # Drop labels for which no dataset currently needs a snapshot
    datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0}
    # Retain the full per-label dataset lists for accounting + cache updates, even if collapsed to subtree roots below
    basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy()  # shallow copy
    commands: dict[SnapshotLabel, list[str]] = {}
    for label, datasets in datasets_to_snapshot.items():
        cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot")
        if p.recursive:
            # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor
            root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets)
            if root_datasets is not None:
                cmd.append("-r")  # recursive; takes a snapshot of all datasets in the subtree(s)
                # Replacing the value for an existing key while iterating items() is safe (keys are unchanged)
                datasets_to_snapshot[label] = root_datasets
        commands[label] = cmd
    creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots"
    log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...")
    # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle
    run_ssh_cmd_parallel(
        self,
        src,
        # one (base command, lazy snapshot-name generator) pair per label
        ((commands[lbl], (f"{ds}@{lbl}" for ds in datasets)) for lbl, datasets in datasets_to_snapshot.items()),
        fn=lambda cmd, batch: self.run_ssh_command_with_retries(
            src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch, retry_on_generic_ssh_error=False
        ),  # retry_on_generic_ssh_error=False means only retry on SSH connect errors b/c `zfs snapshot` isn't idempotent
        max_batch_items=2**29,
    )
    if is_caching_snapshots(p, src):
        # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls
        self.cache.update_last_modified_cache(basis_datasets_to_snapshot)

830 

def delete_destination_snapshots_task(
    self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str
) -> bool:
    """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the
    --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset*
    policy; implements --delete-dst-snapshots. Does not attempt to delete snapshots that carry a `zfs hold`.

    Returns True if processing of at least one dataset failed, else False.
    """
    p, log = self.params, self.params.log
    src, dst = p.src, p.dst
    kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot"
    filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters)
    props: str = "guid,name,userrefs"
    # Prepend a 'creation' column only when a time-range snapshot filter actually needs it
    props = self.creation_prefix + "creation," + props if filter_needs_creation_time else props
    basis_src_datasets_set: set[str] = set(basis_src_datasets)  # O(1) membership tests below
    num_snapshots_found, num_snapshots_deleted = 0, 0  # shared counters, guarded by self.stats_lock

    def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool:  # thread-safe
        """Processes a single dst dataset; returns False if a third party deleted it concurrently, else True."""
        src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
        # Only crosscheck against the source if the corresponding src dataset exists (and bookmarks are usable if needed)
        if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks):
            src_kind: str = kind
            if not p.delete_dst_snapshots_no_crosscheck:
                src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot"
            src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset)
        else:
            src_cmd = None  # no source listing; src_snaps_with_guids will be the empty set
        dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset)
        self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots")
        src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel(  # list src+dst snapshots in parallel
            lambda: set(self.run_ssh_command(src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []),
            lambda: self.try_ssh_command(dst, LOG_TRACE, cmd=dst_cmd),
        )
        if dst_snaps_with_guids_str is None:
            log.warning("Third party deleted destination: %s", dst_dataset)
            return False
        held_dst_snapshots: set[str] = set()
        dst_snaps_with_guids: list[str] = []
        no_userrefs: tuple[str, ...] = ("", "-", "0")  # ZFS snapshot property userrefs > 0 indicates a zfs hold
        for line in dst_snaps_with_guids_str.splitlines():
            dst_snaps_with_guids.append(line[: line.rindex("\t")])  # strip off trailing userrefs column
            _, name, userrefs = line.rsplit("\t", 2)
            if userrefs not in no_userrefs:
                tag: str = name[name.index("@") + 1 :]
                held_dst_snapshots.add(tag)  # don't attempt to delete snapshots that carry a `zfs hold`
        num_dst_snaps_with_guids = len(dst_snaps_with_guids)
        basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy()  # unfiltered baseline, used in "except" mode
        if p.delete_dst_bookmarks:
            replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1)  # treat bookmarks as snapshots
        # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots().
        # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep*
        all_except: bool = p.delete_dst_snapshots_except
        if p.delete_dst_snapshots_except and not is_dummy(src):
            # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what
            # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep"
            # list, we later further refine by checking what's on the source dataset.
            all_except = False
        dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except)
        if p.delete_dst_bookmarks:
            replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1)  # restore pre-filtering bookmark state
        if filter_needs_creation_time:
            # Drop the leading 'creation' column now that time filtering is done
            dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids)
            basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids)
        if p.delete_dst_snapshots_except and not is_dummy(src):  # Non-dummy Source + "Except" (Keep) Mode
            # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka
            # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset.
            # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP.
            # We only actually keep them if they are ALSO on the SRC.
            # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`)
            # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`.
            except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids)
            dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids)
        else:  # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode
            # In standard delete mode:
            #   `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST.
            #   We delete those that are NOT on SRC.
            #   `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`.
            # In dummy source + "except" (keep) mode:
            #   `all_except` was True.
            #   `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete.
            #   `src_snaps_with_guids` is empty.
            #   `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`.
            dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids)
            dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete)
        separator: str = "#" if p.delete_dst_bookmarks else "@"
        dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete)  # keep only the snapshot tag
        if p.delete_dst_bookmarks:
            delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
        else:
            dst_tags_to_delete = [tag for tag in dst_tags_to_delete if tag not in held_dst_snapshots]
            delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
        with self.stats_lock:
            nonlocal num_snapshots_found
            num_snapshots_found += num_dst_snaps_with_guids
            nonlocal num_snapshots_deleted
            num_snapshots_deleted += len(dst_tags_to_delete)
            if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks:
                self.dst_properties[dst_dataset].snapshots_changed = 0  # invalidate cache
        return True

    # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec
    failed: bool = False
    # Skip entirely when bookmarks are requested but not supported on the destination
    if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks:
        start_time_nanos = time.monotonic_ns()
        failed = process_datasets_in_parallel_and_fault_tolerant(
            log=log,
            datasets=dst_datasets,
            process_dataset=delete_destination_snapshots,  # lambda
            skip_tree_on_error=lambda dataset: False,
            skip_on_error=p.skip_on_error,
            max_workers=max_workers,
            timing=self.task_timing,
            termination_handler=self.terminate,
            enable_barriers=False,
            task_name="--delete-dst-snapshots",
            append_exception=self.append_exception,
            retry_template=self._retry_template(),
            dry_run=p.dry_run,
            is_test_mode=self.is_test_mode,
        )
        elapsed_nanos = time.monotonic_ns() - start_time_nanos
        log.info(
            p.dry("--delete-dst-snapshots: %s"),
            task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s "
            f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]",
        )
    return failed

955 

def delete_dst_datasets_task(
    self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
) -> tuple[list[str], list[str]]:
    """Deletes existing destination datasets that do not exist within the source dataset if they are included via
    --{include|exclude}-dataset* policy; implements --delete-dst-datasets.

    Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors.
    """
    p = self.params
    src, dst = p.src, p.dst
    # Map each dataset to its direct children, computed over the NON-FILTERED dst dataset list
    children: dict[str, set[str]] = defaultdict(set)
    for ds in basis_dst_datasets:
        children[os.path.dirname(ds)].add(ds)
    # Bottom-up sweep in reverse sorted order (O(N)): a dataset becomes deletable once all its children are deletable
    to_delete: set[str] = set()
    for ds in reversed(sorted_dst_datasets):
        if children[ds].issubset(to_delete):
            to_delete.add(ds)
    # Never delete a dst dataset whose counterpart exists on the source
    mirrored_on_src = {replace_prefix(s, src.root_dataset, dst.root_dataset) for s in basis_src_datasets}
    to_delete = to_delete.difference(mirrored_on_src)
    delete_datasets(self, dst, to_delete)
    remaining_sorted = sorted(set(sorted_dst_datasets).difference(to_delete))
    remaining_basis = sorted(set(basis_dst_datasets).difference(to_delete))
    return remaining_basis, remaining_sorted

981 

def delete_empty_dst_datasets_task(
    self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
) -> tuple[list[str], list[str]]:
    """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset
    do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets.

    Conceptually walks the dataset tree depth-first (reverse sorted order): a dst dataset with zero snapshots and zero
    bookmarks whose children are all already known to be orphans is itself an orphan. Walking in reverse sorted order
    thus efficiently checks the entire subtree, not just direct children. Finally, all orphan datasets are deleted in
    an efficient batched way.
    """
    p = self.params
    dst = p.dst

    # Direct children of each NON-FILTERED dataset. Non-selected dataset subtrees are thereby treated as if they all
    # had snapshots, so neither they nor their ancestors can ever join the orphan set, guaranteeing they survive.
    children: dict[str, set[str]] = defaultdict(set)
    for ds in basis_dst_datasets:
        children[os.path.dirname(ds)].add(ds)

    def compute_orphans(datasets_having_snapshots: set[str]) -> set[str]:
        """Returns destination datasets having zero snapshots whose children are all orphans."""
        orphans: set[str] = set()
        for ds in reversed(sorted_dst_datasets):  # reverse order enables the O(N) bottom-up sweep
            if ds not in datasets_having_snapshots and children[ds].issubset(orphans):
                orphans.add(ds)
        return orphans

    # A first pass with an empty "has snapshots" set yields candidate orphans; only these need their snapshots listed
    # via 'zfs list -t snapshot ...', which reduces I/O and improves perf. Essentially, this eliminates the I/O to
    # list snapshots for ancestors of excluded datasets.
    candidate_orphans: set[str] = compute_orphans(set())

    # Determine which candidates actually have more than zero snapshots (or bookmarks, where applicable)
    with_bookmarks: bool = p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst)
    btype: str = "bookmark,snapshot" if with_bookmarks else "snapshot"
    cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name")
    datasets_with_snapshots: set[str] = set()
    for snapshots in zfs_list_snapshots_in_parallel(self, dst, cmd, sorted(candidate_orphans), ordered=False):
        if with_bookmarks:
            replace_in_lines(snapshots, old="#", new="@", count=1)  # treat bookmarks as snapshots
        datasets_with_snapshots.update(snap[: snap.index("@")] for snap in snapshots)

    orphans: set[str] = compute_orphans(datasets_with_snapshots)  # second pass computes the real orphans
    delete_datasets(self, dst, orphans)  # finally, delete the orphan datasets in an efficient batched way
    remaining_sorted = sorted(set(sorted_dst_datasets).difference(orphans))
    remaining_basis = sorted(set(basis_dst_datasets).difference(orphans))
    return remaining_basis, remaining_sorted

1034 

def monitor_snapshots_task(
    self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str
) -> None:
    """Monitors src and dst snapshots; implements --monitor-snapshots."""
    p, log = self.params, self.params.log
    src, dst = p.src, p.dst
    hits_before: int = self.num_cache_hits
    misses_before: int = self.num_cache_misses
    start_time_nanos: int = time.monotonic_ns()
    # Check both sides concurrently; each call returns a (-exit_code, kind, msg) alert tuple
    dst_alert, src_alert = run_in_parallel(
        lambda: self.monitor_snapshots(dst, sorted_dst_datasets),
        lambda: self.monitor_snapshots(src, sorted_src_datasets),
    )
    exit_code, _exit_kind, exit_msg = min(dst_alert, src_alert)  # min() selects the worst alert of the two
    if exit_code != 0:
        die(exit_msg, -exit_code)
    elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
    hits: int = self.num_cache_hits - hits_before
    misses: int = self.num_cache_misses - misses_before
    msg = self._cache_hits_msg(hits=hits, misses=misses) if hits > 0 or misses > 0 else ""
    log.info(
        "--monitor-snapshots done: %s",
        f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]",
    )

1062 

def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> tuple[int, str, str]:
    """Checks snapshot freshness and warns or errors out when limits are exceeded.

    Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name
    pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to check if snapshots
    are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process
    exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.

    Returns the worst alert as a (-exit_code, exit_kind, exit_msg) tuple (exit code negated so min() picks the worst).

    Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
    per dataset. Space complexity is O(max(N, M)).
    """
    p, log = self.params, self.params.log
    alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts
    labels: list[SnapshotLabel] = [alert.label for alert in alerts]
    oldest_skip_holds: list[bool] = [alert.oldest_skip_holds for alert in alerts]
    current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    if is_caching_snapshots(p, remote):
        # These cache-related names are only referenced later on code paths that are themselves guarded by caching
        props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties
        snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()}
        alerts_hash: str = sha256_128_urlsafe_base64(str(tuple(alerts)))
        label_hashes: dict[SnapshotLabel, str] = {
            label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
        }
    is_caching: bool = False  # flipped on just before the 'zfs list' fallback below
    worst_alert: tuple[int, str, str] = (0, "", "")  # -exit_code, exit_kind, exit_msg

    def record_alert(exit_code: int, exit_kind: str, exit_msg: str) -> None:
        """Folds an alert into worst_alert; exit codes are negated so min() ranks CRITICAL before WARNING."""
        nonlocal worst_alert
        worst_alert = min(worst_alert, (-exit_code, exit_kind, exit_msg))  # min() sorts "Latest" before "Oldest" on tie

    def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str:
        """Returns the path of the cache file tracking this (dataset, label, Latest/Oldest alert config) combination."""
        cache_label: str = os.path.join(MONITOR_CACHE_FILE_PREFIX, alert_cfg.kind[0], label_hashes[label], alerts_hash)
        return self.cache.last_modified_cache_file(r, dataset, cache_label)

    def alert_msg(
        kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int
    ) -> str:
        """Formats a human-readable alert message; delta_millis == -1 omits the 'should be at most' clause."""
        assert kind == "Latest" or kind == "Oldest"
        lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}"
        # creation_unixtime_secs == 0 yields an age >= now, which encodes "no snapshot exists at all"
        if snapshot_age_millis >= current_unixtime_millis:
            return f"No snapshot exists for {dataset}@{lbl}"
        msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old"
        s = f": @{snapshot}" if snapshot else ""
        if delta_millis == -1:
            return f"{msg}{s}"
        return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}"

    def check_alert(
        label: SnapshotLabel, alert_cfg: AlertConfig | None, creation_unixtime_secs: int, dataset: str, snapshot: str
    ) -> None:  # thread-safe
        """Compares a snapshot's age against the warning/critical thresholds and records/logs the outcome."""
        if alert_cfg is None:
            return
        if is_caching and not p.dry_run:  # update cache with latest state from 'zfs list -t snapshot'
            snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
            cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg)
            set_last_modification_time_safe(
                cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed), if_more_recent=True
            )
        warning_millis: int = alert_cfg.warning_millis
        critical_millis: int = alert_cfg.critical_millis
        alert_kind = alert_cfg.kind
        snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000
        # NOTE(review): this message prefix uses "--monitor_snapshots" (underscore) while log lines elsewhere use
        # "--monitor-snapshots" (hyphen) -- possibly intentional; verify before changing
        m = "--monitor_snapshots: "
        if snapshot_age_millis > critical_millis:
            msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis)
            log.critical("%s", msg)
            if not p.monitor_snapshots_config.dont_crit:
                record_alert(CRITICAL_STATUS, alert_kind, msg)
        elif snapshot_age_millis > warning_millis:
            msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis)
            log.warning("%s", msg)
            if not p.monitor_snapshots_config.dont_warn:
                record_alert(WARNING_STATUS, alert_kind, msg)
        elif is_debug:
            msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1)
            log.debug("%s", msg)

    def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
        """Checks the 'Latest' alert config of the i-th alert."""
        alert: MonitorSnapshotAlert = alerts[i]
        check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot)

    def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
        """Checks the 'Oldest' alert config of the i-th alert."""
        alert: MonitorSnapshotAlert = alerts[i]
        check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot)

    def find_stale_datasets_and_check_alerts() -> list[str]:
        """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply,
        that is, without incurring 'zfs list -t snapshots'.

        This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
        https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
        """
        stale_datasets: list[str] = []
        time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
        for dataset in sorted_datasets:
            is_stale_dataset: bool = False
            snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
            for alert in alerts:
                for cfg in (alert.latest, alert.oldest):
                    if cfg is None:
                        continue
                    if (
                        snapshots_changed != 0  # property is known
                        and snapshots_changed < time_threshold  # old enough that clock skew can't fool us
                        and (  # always True
                            cached_unix_times := self.cache.get_snapshots_changed2(
                                monitor_last_modified_cache_file(remote, dataset, alert.label, cfg)
                            )
                        )
                        and snapshots_changed == cached_unix_times[1]  # cached snapshots_changed aka last modified time
                        and snapshots_changed >= cached_unix_times[0]  # creation time of minmax snapshot aka access time
                    ):  # cached state is still valid; emit an alert if the latest/oldest snapshot is too old
                        lbl = alert.label
                        check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="")
                    else:  # cached state is no longer valid; fallback to 'zfs list -t snapshot'
                        is_stale_dataset = True
            if is_stale_dataset:
                stale_datasets.append(dataset)
        return stale_datasets

    # satisfy request from local cache as much as possible
    if is_caching_snapshots(p, remote):
        stale_datasets: list[str] = find_stale_datasets_and_check_alerts()
        with self.stats_lock:
            self.num_cache_misses += len(stale_datasets)
            self.num_cache_hits += len(sorted_datasets) - len(stale_datasets)
    else:
        stale_datasets = sorted_datasets

    # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
    is_caching = is_caching_snapshots(p, remote)  # enables cache updates inside check_alert() from here on
    datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
        remote,
        stale_datasets,
        labels,
        fn_latest=alert_latest_snapshot,
        fn_oldest=alert_oldest_snapshot,
        fn_oldest_skip_holds=oldest_skip_holds,
    )
    for dataset in datasets_without_snapshots:
        # creation_unixtime_secs=0 makes alert_msg() report "No snapshot exists for ..."
        for i in range(len(alerts)):
            alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
            alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
    return worst_alert

1208 

    def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool:
        """Replicates a list of datasets from src to dst, in parallel, with retries and cache-based skipping.

        Params:
            src_datasets: sorted list of source dataset names to replicate.
            task_description: human-readable description used in log messages.
            max_workers: maximum number of datasets to replicate concurrently.

        Returns:
            True if replication of at least one dataset failed, False on full success.

        If the local "snapshots_changed" cache is enabled, datasets that are provably already up-to-date are
        skipped cheaply (without running 'zfs list -t snapshot'); afterwards the cache is refreshed for the
        datasets that were actually processed.
        """
        assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted"
        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        self.num_snapshots_found = 0
        self.num_snapshots_replicated = 0
        log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]")
        start_time_nanos: int = time.monotonic_ns()

        def src2dst(src_dataset: str) -> str:
            """Maps a src dataset name to its corresponding dst dataset name."""
            return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset)

        def dst2src(dst_dataset: str) -> str:
            """Maps a dst dataset name back to its corresponding src dataset name."""
            return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)

        def find_stale_datasets() -> tuple[list[str], dict[str, str]]:
            """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine
            which datasets can be skipped cheaply, i.e. without incurring 'zfs list -t snapshots'.

            This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
            https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed

            Returns:
                A tuple of (sorted list of src datasets that must be replicated, mapping of src dataset to its
                replication cache file path).
            """
            # First, check which src datasets have changed since the last replication to that destination
            cache_files: dict[str, str] = {}
            stale_src_datasets1: list[str] = []
            maybe_stale_dst_datasets: list[str] = []
            userhost_dir: str = sha256_85_urlsafe_base64(p.dst.cache_namespace())
            filter_key = tuple(tuple(f) for f in p.snapshot_filters)  # cache is only valid for same --include/excl-snapshot*
            filter_hash_code: str = sha256_85_urlsafe_base64(str(filter_key))
            for src_dataset in src_datasets:
                dst_dataset: str = src2dst(src_dataset)  # cache is only valid for identical destination dataset
                dst_dataset_dir: str = sha256_85_urlsafe_base64(dst_dataset)
                cache_label: str = os.path.join(
                    REPLICATION_CACHE_FILE_PREFIX, userhost_dir, dst_dataset_dir, filter_hash_code
                )
                cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label)
                cache_files[src_dataset] = cache_file
                snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed  # get prop "for free"
                # Only trust the cache when the property is known, matured (avoids equal-second races), and matches
                if (
                    snapshots_changed != 0
                    and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
                    and snapshots_changed == self.cache.get_snapshots_changed(cache_file)
                ):
                    maybe_stale_dst_datasets.append(dst_dataset)
                else:
                    stale_src_datasets1.append(src_dataset)

            # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed
            stale_src_datasets2: list[str] = []
            dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets)
            for dst_dataset in maybe_stale_dst_datasets:
                snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0)
                if (
                    snapshots_changed != 0
                    and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
                    and snapshots_changed
                    == self.cache.get_snapshots_changed(self.cache.last_modified_cache_file(dst, dst_dataset))
                ):
                    log.info("Already up-to-date [cached]: %s", dst_dataset)
                else:
                    stale_src_datasets2.append(dst2src(dst_dataset))
            assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted"
            assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted"
            stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2))  # merge two sorted lists
            assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates"
            return stale_src_datasets, cache_files

        if is_caching_snapshots(p, src):
            stale_src_datasets, cache_files = find_stale_datasets()
            num_cache_misses = len(stale_src_datasets)
            num_cache_hits = len(src_datasets) - len(stale_src_datasets)
            self.num_cache_misses += num_cache_misses
            self.num_cache_hits += num_cache_hits
            cmsg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
        else:
            stale_src_datasets = src_datasets  # without the cache, every dataset must be examined
            cache_files = {}
            cmsg = ""

        # Tracks datasets actually processed, so the post-run cache refresh only touches those; guarded by a lock
        # because _process_dataset_fn runs concurrently on multiple worker threads.
        done_src_datasets: list[str] = []
        done_src_datasets_lock: threading.Lock = threading.Lock()

        def _process_dataset_fn(src_dataset: str, tid: str, retry: Retry) -> bool:
            """Worker callback: replicates a single dataset and records it as done (thread-safe)."""
            result: bool = replicate_dataset(job=self, src_dataset=src_dataset, tid=tid, retry=retry)
            with done_src_datasets_lock:
                done_src_datasets.append(src_dataset)  # record datasets that were actually replicated (not skipped)
            return result

        # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution
        failed: bool = process_datasets_in_parallel_and_fault_tolerant(
            log=log,
            datasets=stale_src_datasets,
            process_dataset=_process_dataset_fn,
            skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)],
            skip_on_error=p.skip_on_error,
            max_workers=max_workers,
            timing=self.task_timing,
            termination_handler=self.terminate,
            enable_barriers=False,
            task_name="Replication",
            append_exception=self.append_exception,
            retry_template=self._retry_template(),
            dry_run=p.dry_run,
            is_test_mode=self.is_test_mode,
        )

        if is_caching_snapshots(p, src) and len(done_src_datasets) > 0:
            # refresh "snapshots_changed" ZFS dataset property from dst
            stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in sorted(done_src_datasets)]
            dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets)
            for dst_dataset in stale_dst_datasets:  # update local cache
                dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0)
                dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset)
                src_dataset: str = dst2src(dst_dataset)
                src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed
                if not p.dry_run:
                    set_last_modification_time_safe(
                        cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed, if_more_recent=True
                    )
                    set_last_modification_time_safe(
                        dst_cache_file, unixtime_in_secs=dst_snapshots_changed, if_more_recent=True
                    )

        elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
        log.info(
            p.dry("Replication done: %s"),
            f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots"
            f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]",
        )
        return failed

1340 

1341 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None: 

1342 """For testing only; for unit tests to delete datasets during replication and test correct handling of that.""" 

1343 assert delete_trigger 

1344 counter = self.delete_injection_triggers.get("before") 

1345 if counter and self.decrement_injection_counter(counter, delete_trigger): 

1346 p = self.params 

1347 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "") 

1348 self.run_ssh_command(remote, LOG_DEBUG, print_stdout=True, cmd=cmd) 

1349 

1350 def maybe_inject_params(self, error_trigger: str) -> None: 

1351 """For testing only; for unit tests to simulate errors during replication and test correct handling of them.""" 

1352 assert error_trigger 

1353 counter = self.error_injection_triggers.get("before") 

1354 if counter and self.decrement_injection_counter(counter, error_trigger): 

1355 self.inject_params = self.param_injection_triggers[error_trigger] 

1356 elif error_trigger in self.param_injection_triggers: 

1357 self.inject_params = {} 

1358 

1359 @staticmethod 

1360 def recv_option_property_names(recv_opts: list[str]) -> set[str]: 

1361 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for 

1362 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name.""" 

1363 propnames: set[str] = set() 

1364 i = 0 

1365 n = len(recv_opts) 

1366 while i < n: 

1367 stripped: str = recv_opts[i].strip() 

1368 if stripped in ("-o", "-x"): 

1369 i += 1 

1370 if i == n or recv_opts[i].strip() in ("-o", "-x"): 

1371 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1372 if stripped == "-o" and "=" not in recv_opts[i]: 

1373 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1374 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0] 

1375 validate_property_name(propname, "--zfs-recv-program-opt(s)") 

1376 propnames.add(propname) 

1377 i += 1 

1378 return propnames 

1379 

    def root_datasets_if_recursive_zfs_snapshot_is_possible(
        self, datasets: list[str], basis_datasets: list[str]
    ) -> list[str] | None:
        """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset
        within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in
        `datasets`. Returns `None` if any (unfiltered) dataset in `basis_dataset` that is a descendant of at least one of the
        root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have
        pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the
        non-recursive 'zfs snapshot snapshot1 .. snapshot N' CLI flavor.

        Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both
        sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time
        complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case.
        See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative
        impl that's easier to grok.
        """
        assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
        assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
        assert (not self.is_test_mode) or basis_datasets == sorted(basis_datasets), "List is not sorted"
        assert (not self.is_test_mode) or not has_duplicates(basis_datasets), "List contains duplicates"
        assert (not self.is_test_mode) or set(datasets).issubset(set(basis_datasets)), "Not a subset"
        root_datasets: list[str] = self.find_root_datasets(datasets)
        # Three cursors, advanced monotonically and in sync over the three sorted lists:
        i = 0  # index into root_datasets
        j = 0  # index into basis_datasets
        k = 0  # index into datasets
        len_root_datasets = len(root_datasets)
        len_basis_datasets = len(basis_datasets)
        len_datasets = len(datasets)
        while i < len_root_datasets and j < len_basis_datasets:  # walk and "merge" the sorted lists, in sync
            if basis_datasets[j] < root_datasets[i]:  # irrelevant subtree?
                j += 1  # move to next basis_datasets[j]
            elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]):  # relevant subtree?
                # basis_datasets[j] lies under a root; it must also be present in (filtered) datasets, else pruning
                # is incompatible with 'zfs snapshot -r'. Advance k until it catches up with basis_datasets[j].
                while k < len_datasets and datasets[k] < basis_datasets[j]:
                    k += 1  # move to next datasets[k]
                if k == len_datasets or datasets[k] != basis_datasets[j]:  # dataset chopped off by schedule or --incl/excl*?
                    return None  # detected filter pruning that is incompatible with 'zfs snapshot -r'
                j += 1  # move to next basis_datasets[j]
            else:
                i += 1  # move to next root_dataset[i]; no need to check root_datasets that are no longer (or not yet) reachable
        return root_datasets

1420 

1421 @staticmethod 

1422 def find_root_datasets(sorted_datasets: list[str]) -> list[str]: 

1423 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A 

1424 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets.""" 

1425 root_datasets: list[str] = [] 

1426 skip_dataset: str = DONT_SKIP_DATASET 

1427 for dataset in sorted_datasets: 

1428 if is_descendant(dataset, of_root_dataset=skip_dataset): 

1429 continue 

1430 skip_dataset = dataset 

1431 root_datasets.append(dataset) 

1432 return root_datasets 

1433 

    def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]:
        """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g.
        bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be
        created with that name, because these datasets are due per the schedule, either because the 'creation' time of their
        most recent snapshot with that name pattern is now too old, or such a snapshot does not even exist.

        The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple
        but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An
        alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property
        (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of
        the most recent snapshot, for each SnapshotLabel and each dataset.
        """
        p, log = self.params, self.params.log
        src = p.src
        config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
        datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
        # is_caching stays False during the cache-read phase below and is only enabled for the 'zfs list' fallback
        # phase, so cache files are written from fresh 'zfs list' results rather than re-written from cached ones.
        is_caching: bool = False
        interner: HashedInterner[datetime] = HashedInterner()  # reduces memory footprint
        msgs: list[tuple[datetime, str, SnapshotLabel, str]] = []

        def create_snapshot_if_latest_is_too_old(
            datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int
        ) -> None:  # thread-safe
            """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old."""
            creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz)
            log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label)
            duration_amount, duration_unit = config.suffix_durations[label.suffix]
            next_event_dt: datetime = config.anchors.round_datetime_up_to_duration_multiple(
                creation_dt + timedelta(microseconds=1), duration_amount, duration_unit
            )
            msg: str = ""
            if config.current_datetime >= next_event_dt:
                datasets_to_snapshot[label].append(dataset)  # mark it as scheduled for snapshot creation
                msg = " has passed"
            next_event_dt = interner.intern(next_event_dt)
            msgs.append((next_event_dt, dataset, label, msg))
            if is_caching and not p.dry_run:  # update cache with latest state from 'zfs list -t snapshot'
                # Per-label cache stores (atime=creation, mtime=snapshots_changed) so later runs can safely trust creation
                # only when the label's mtime matches the current dataset-level '=' cache value. Excludes timestamp of label.
                cache_file: str = self.cache.last_modified_cache_file(src, dataset, label_hashes[label])
                unixtimes: tuple[int, int] = (creation_unixtime, self.src_properties[dataset].snapshots_changed)
                set_last_modification_time_safe(cache_file, unixtime_in_secs=unixtimes, if_more_recent=True)

        # Split the configured labels: labels that are always due (zero duration, or forced) are scheduled for all
        # datasets immediately; the remaining labels require a creation-time check (via cache or 'zfs list').
        labels: list[SnapshotLabel] = []
        config_labels: list[SnapshotLabel] = config.snapshot_labels()
        for label in config_labels:
            duration_amount_, _duration_unit = config.suffix_durations[label.suffix]
            if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due:
                datasets_to_snapshot[label] = sorted_datasets  # take snapshot regardless of creation time of existing snaps
            else:
                labels.append(label)
        if len(labels) == 0:
            return datasets_to_snapshot  # nothing more TBD
        label_hashes: dict[SnapshotLabel, str] = {
            label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
        }

        # satisfy request from local cache as much as possible
        cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
        if is_caching_snapshots(p, src):
            sorted_datasets_todo: list[str] = []
            time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
            for dataset in sorted_datasets:
                cache: SnapshotCache = self.cache
                cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset))
                if cached_snapshots_changed == 0:
                    sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                    continue
                if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed:  # get that prop "for free"
                    cache.invalidate_last_modified_cache_dataset(dataset)
                    sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                    continue
                if cached_snapshots_changed >= time_threshold:  # Avoid equal-second races: only trust matured cache entries
                    sorted_datasets_todo.append(dataset)  # cache entry isn't mature enough to be trusted; skip cache
                    continue
                creation_unixtimes: list[int] = []
                for label_hash in label_hashes.values():
                    # For per-label files, atime stores the latest matching snapshot's creation time, while mtime stores
                    # the dataset-level snapshots_changed observed when this label file was written.
                    atime, mtime = cache.get_snapshots_changed2(cache.last_modified_cache_file(src, dataset, label_hash))
                    # Sanity check: trust per-label cache only when:
                    # - mtime equals the dataset-level '=' cache (same snapshots_changed), and
                    # - atime is plausible and not later than mtime (creation <= snapshots_changed), and
                    # - neither atime nor mtime is zero (unknown provenance).
                    # Otherwise fall back to 'zfs list -t snapshot' to avoid stale creation times after newer changes.
                    if atime == 0 or mtime == 0 or mtime != cached_snapshots_changed or atime > mtime:
                        sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                        break
                    creation_unixtimes.append(atime)
                if len(creation_unixtimes) == len(labels):
                    for j, label in enumerate(labels):
                        create_snapshot_if_latest_is_too_old(
                            cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j]
                        )
            sorted_datasets = sorted_datasets_todo

        def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            """Callback for handle_minmax_snapshots(): schedules a snapshot for labels[i] if its latest snap is too old."""
            create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs)

        def on_finish_dataset(dataset: str) -> None:
            """Callback run after each dataset's snapshots were listed; refreshes the dataset-level '=' cache file."""
            if is_caching_snapshots(p, src) and not p.dry_run:
                set_last_modification_time_safe(
                    self.cache.last_modified_cache_file(src, dataset),
                    unixtime_in_secs=self.src_properties[dataset].snapshots_changed,
                    if_more_recent=True,
                )

        # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
        is_caching = is_caching_snapshots(p, src)
        datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
            src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset
        )
        for lbl in labels:  # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result
            datasets_to_snapshot[lbl].sort()
            if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot):  # +take snaps for snapshot-less datasets
                datasets_to_snapshot[lbl] = list(  # inputs to merge() are sorted, and outputs are sorted too
                    heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots)
                )
        for label, datasets in datasets_to_snapshot.items():
            assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
            assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
            assert label

        msgs.sort()  # sort by time, dataset, label
        for i in range(0, len(msgs), 10_000):  # reduce logging overhead via mini-batching
            text = "".join(
                f"\nNext scheduled snapshot time: {next_event_dt} for {dataset}@{label}{msg}"
                for next_event_dt, dataset, label, msg in msgs[i : i + 10_000]
            )
            log.info("Next scheduled snapshot times ...%s", text)

        # sort keys to ensure that we take snapshots for dailies before hourlies, and so on
        label_indexes: dict[SnapshotLabel, int] = {label: k for k, label in enumerate(config_labels)}
        datasets_to_snapshot = dict(sorted(datasets_to_snapshot.items(), key=lambda kv: label_indexes[kv[0]]))
        return datasets_to_snapshot

1569 

    def handle_minmax_snapshots(
        self,
        remote: Remote,
        sorted_datasets: list[str],
        labels: list[SnapshotLabel],
        *,
        fn_latest: Callable[[int, int, str, str], None],  # callback function for latest snapshot
        fn_oldest: Callable[[int, int, str, str], None] | None = None,  # callback function for oldest snapshot
        fn_oldest_skip_holds: Sequence[bool] = (),
        fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None,
    ) -> list[str]:  # thread-safe
        """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs
        the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names.

        If the (optional) fn_oldest_skip_holds=True for a given label, then snapshots for that label that carry a 'zfs hold'
        are skipped (ignored) when finding the oldest snapshot for fn_oldest. This can be useful for monitor_snapshots(),
        given that users often intentionally retain holds for longer than the "normal" snapshot retention period, and this
        shouldn't necessarily cause monitoring to emit alerts.

        Callbacks receive (label_index, creation_unixtime_secs, dataset, snapshot_name); creation time 0 and an empty
        snapshot name indicate that no snapshot matched the label. Returns the (sorted) sublist of input datasets that
        have no snapshots at all.
        """
        if fn_oldest is not None:
            assert len(labels) == len(fn_oldest_skip_holds)
        assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        no_userrefs: tuple[str, ...] = ("", "-", "0")  # ZFS snapshot property userrefs > 0 indicates a zfs hold

        def extract_fields(line: str) -> tuple[int, int, str, bool]:
            """Parses one tab-separated 'zfs list' output line into (createtxg, creation, snapname, has_no_hold)."""
            fields: list[str] = line.split("\t")
            if len(fields) == 3:  # the 'userrefs' column is only requested when hold-skipping is in effect
                name, createtxg, creation_unixtime_secs = fields
                userrefs = ""
            else:
                name, createtxg, creation_unixtime_secs, userrefs = fields
            return (
                int(createtxg),
                int(creation_unixtime_secs),
                name.split("@", 1)[1],
                userrefs in no_userrefs,
            )

        p = self.params
        props: str = "name,createtxg,creation"
        props = props if fn_oldest is None or not any(fn_oldest_skip_holds) else props + ",userrefs"
        cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o {props}")  # sorts by dataset,creation
        datasets_with_snapshots: set[str] = set()
        interner: SortedInterner[str] = SortedInterner(sorted_datasets)  # reduces memory footprint
        for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False):
            # streaming group by dataset name (consumes constant memory only)
            for dataset, group in itertools.groupby(lines, key=lambda line: line.split("\t", 1)[0].split("@", 1)[0]):
                dataset = interner.interned(dataset)
                snapshots = sorted(  # fetch all snapshots of current dataset and sort by createtxg,creation,name
                    extract_fields(line) for line in group
                )  # perf: sorted() is fast because Timsort is close to O(N) for nearly sorted input, which is our case
                assert len(snapshots) > 0
                datasets_with_snapshots.add(dataset)
                snapshot_names: tuple[str, ...] = tuple(snapshot[2] for snapshot in snapshots)
                # perf: hoist attribute lookups and bind methods to locals for the hot matching loop below
                year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX
                year_with_4_digits_regex_fullmatch = year_with_4_digits_regex.fullmatch
                startswith = str.startswith
                endswith = str.endswith
                fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False))
                for i, label in enumerate(labels):
                    infix: str = label.infix
                    start: str = label.prefix + infix
                    end: str = label.suffix
                    startlen: int = len(start)
                    endlen: int = len(end)
                    minlen: int = startlen + endlen if infix else 4 + startlen + endlen  # year_with_four_digits_regex
                    startlen_4: int = startlen + 4  # [startlen:startlen+4] # year_with_four_digits_regex
                    has_infix: bool = bool(infix)
                    for fn, is_reverse in fns:
                        creation_unixtime_secs: int = 0  # find creation time of latest or oldest snapshot matching the label
                        minmax_snapshot: str = ""
                        no_skip_holds: bool = is_reverse or not fn_oldest_skip_holds[i]
                        # scan newest-to-oldest for fn_latest (is_reverse), oldest-to-newest for fn_oldest
                        for j in range(len(snapshot_names) - 1, -1, -1) if is_reverse else range(len(snapshot_names)):
                            snapshot_name: str = snapshot_names[j]
                            if (
                                endswith(snapshot_name, end)  # aka snapshot_name.endswith(end)
                                and startswith(snapshot_name, start)  # aka snapshot_name.startswith(start)
                                and len(snapshot_name) >= minlen
                                and (has_infix or year_with_4_digits_regex_fullmatch(snapshot_name, startlen, startlen_4))
                                and (no_skip_holds or snapshots[j][3])
                            ):
                                creation_unixtime_secs = snapshots[j][1]
                                minmax_snapshot = snapshot_name
                                break
                        fn(i, creation_unixtime_secs, dataset, minmax_snapshot)
                fn_on_finish_dataset(dataset)
        datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots]
        return datasets_without_snapshots

1658 

1659 @staticmethod 

1660 def _cache_hits_msg(hits: int, misses: int) -> str: 

1661 total = hits + misses 

1662 return f", cache hits: {percent(hits, total, print_total=True)}, misses: {percent(misses, total, print_total=True)}" 

1663 

    def run_ssh_command(
        self,
        remote: MiniRemote,
        loglevel: int = logging.INFO,
        is_dry: bool = False,
        check: bool = True,
        print_stdout: bool = False,
        print_stderr: bool = True,
        cmd: list[str] | None = None,
        retry_on_generic_ssh_error: bool = True,
    ) -> str:
        """Runs the given CLI cmd via ssh on the given remote, and returns stdout.

        Params:
            remote: target host descriptor; a SHARED connection is borrowed from this host's connection pool.
            loglevel: log level at which the command invocation is logged.
            is_dry: if True, the command is not actually executed, and "" is returned.
            check: if True, a non-zero exit status raises subprocess.CalledProcessError.
            print_stdout: if True, forwards the subprocess stdout to our log/stdout.
            print_stderr: if True, forwards the subprocess stderr to our log/stderr.
            cmd: the non-empty argv to run; required despite the None default.
            retry_on_generic_ssh_error: if True, failures within SSH transport itself (stderr starting with
                "ssh: ", exit code 255) are wrapped in RetryableError; failures of the remote command are not.

        Raises:
            subprocess.CalledProcessError, subprocess.TimeoutExpired, RetryableError.
        """
        assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0
        conn_pool: ConnectionPool = self.params.connection_pools[remote.location].pool(SHARED)
        with conn_pool.connection() as conn:
            log: logging.Logger = self.params.log
            try:
                process: subprocess.CompletedProcess[str] = conn.run_ssh_command(
                    cmd=cmd,
                    job=self,
                    loglevel=loglevel,
                    is_dry=is_dry,
                    check=check,
                    stdin=DEVNULL,
                    stdout=PIPE,
                    stderr=PIPE,
                    text=True,
                )
            except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
                # still surface the captured output streams before propagating the failure
                xprint(log, stderr_to_str(e.stdout) if print_stdout else e.stdout, run=print_stdout, file=sys.stdout, end="")
                xprint(log, stderr_to_str(e.stderr) if print_stderr else e.stderr, run=print_stderr, file=sys.stderr, end="")
                if retry_on_generic_ssh_error and isinstance(e, subprocess.CalledProcessError):
                    stderr: str = stderr_to_str(e.stderr)
                    if stderr.startswith("ssh: "):
                        assert e.returncode == 255, e.returncode  # error within SSH itself (not during the remote command)
                        raise RetryableError(display_msg="ssh") from e
                raise
            else:
                if is_dry:
                    return ""
                xprint(log, process.stdout, run=print_stdout, file=sys.stdout, end="")
                xprint(log, process.stderr, run=print_stderr, file=sys.stderr, end="")
                return process.stdout

1707 

    def try_ssh_command(
        self,
        remote: MiniRemote,
        loglevel: int,
        is_dry: bool = False,
        print_stdout: bool = False,
        cmd: list[str] | None = None,
        exists: bool = True,
        error_trigger: str | None = None,
    ) -> str | None:
        """Convenience method that helps retry/react to a dataset or pool that potentially doesn't exist anymore.

        Returns the command's stdout, or None when `exists` is True and stderr indicates the dataset/pool is gone
        (treated as a benign race rather than an error). Any other subprocess failure - including output that could
        not be decoded as text - is converted into RetryableError so callers can retry the operation.
        """
        assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0
        log = self.params.log
        try:
            self.maybe_inject_error(cmd=cmd, error_trigger=error_trigger)  # no-op outside of unit tests
            return self.run_ssh_command(remote=remote, loglevel=loglevel, is_dry=is_dry, print_stdout=print_stdout, cmd=cmd)
        except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
            if not isinstance(e, UnicodeDecodeError):  # UnicodeDecodeError carries no usable stderr text
                stderr: str = stderr_to_str(e.stderr)
                if exists and (
                    ": dataset does not exist" in stderr
                    or ": filesystem does not exist" in stderr  # solaris 11.4.0
                    or ": no such pool" in stderr
                    or "does not have any resumable receive state to abort" in stderr  # harmless `zfs receive -A` race
                ):
                    return None
                log.warning("%s", stderr.rstrip())
            raise RetryableError("Subprocess failed") from e

1736 

1737 def try_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str | None: 

1738 """Convenience method that auto-retries try_ssh_command() on failure.""" 

1739 return self._retry_template().call_with_retries(fn=lambda retry: self.try_ssh_command(*args, **kwargs)) 

1740 

1741 def run_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str: 

1742 """Convenience method that auto-retries run_ssh_command() on transport failure (not on remote command failure).""" 

1743 return self._retry_template().call_with_retries(fn=lambda retry: self.run_ssh_command(*args, **kwargs)) 

1744 

1745 def maybe_inject_error(self, cmd: list[str], error_trigger: str | None = None) -> None: 

1746 """For testing only; for unit tests to simulate errors during replication and test correct handling of them.""" 

1747 if error_trigger: 

1748 counter = self.error_injection_triggers.get("before") 

1749 if counter and self.decrement_injection_counter(counter, error_trigger): 

1750 try: 

1751 raise CalledProcessError(returncode=1, cmd=" ".join(cmd), stderr=error_trigger + ":dataset is busy") 

1752 except subprocess.CalledProcessError as e: 

1753 if error_trigger.startswith("retryable_"): 

1754 raise RetryableError("Subprocess failed") from e 

1755 else: 

1756 raise 

1757 

1758 def decrement_injection_counter(self, counter: Counter[str], trigger: str) -> bool: 

1759 """For testing only.""" 

1760 with self.injection_lock: 

1761 if counter[trigger] <= 0: 

1762 return False 

1763 counter[trigger] -= 1 

1764 return True 

1765 

1766 

1767############################################################################# 

1768@final 

1769class DatasetProperties: 

1770 """Properties of a ZFS dataset.""" 

1771 

1772 __slots__ = ("recordsize", "snapshots_changed") # uses more compact memory layout than __dict__ 

1773 

1774 def __init__(self, recordsize: int, snapshots_changed: int) -> None: 

1775 # immutable variables: 

1776 self.recordsize: Final[int] = recordsize 

1777 

1778 # mutable variables: 

1779 self.snapshots_changed: int = snapshots_changed 

1780 

1781 

1782############################################################################# 

1783# Input format is [[user@]host:]dataset 

1784# 1234 5 6 

1785_DATASET_LOCATOR_REGEX: Final[re.Pattern[str]] = re.compile(r"(((([^@]*)@)?([^:]+)):)?(.*)", flags=re.DOTALL) 

1786 

1787 

def parse_dataset_locator(
    input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None
) -> tuple[str, str, str, str, str]:
    """Splits user@host:dataset into its components with optional checks.

    Explicitly passed `user`/`host` arguments take precedence over any values embedded in `input_text`.
    Returns the tuple (user, host, user_host, pool, dataset), where `user_host` is "user@host", just "host",
    or "" for the local case, and `pool` is the first path component of `dataset`.
    """

    def convert_ipv6(hostname: str) -> str:  # support IPv6 without getting confused by host:dataset colon separator ...
        return hostname.replace("|", ":")  # ... and any colons that may be part of a (valid) ZFS dataset name

    user_undefined: bool = user is None
    if user is None:
        user = ""
    host_undefined: bool = host is None
    if host is None:
        host = ""
    host = convert_ipv6(host)  # an explicitly passed host may also use '|' as the IPv6 colon escape
    user_host, dataset, pool = "", "", ""

    # the regex can match any string, so this `if` only guards the (in practice unreachable) no-match case
    if match := _DATASET_LOCATOR_REGEX.fullmatch(input_text):
        if user_undefined:
            user = match.group(4) or ""
        if host_undefined:
            host = match.group(5) or ""
            host = convert_ipv6(host)
        if host == "-":  # '-' is the placeholder spelling for "no remote host", i.e. local mode
            host = ""
        dataset = match.group(6) or ""
        i = dataset.find("/")
        pool = dataset[0:i] if i >= 0 else dataset  # pool is the dataset name up to the first '/'

        if user and host:
            user_host = f"{user}@{host}"
        elif host:
            user_host = host

    if validate:
        validate_user_name(user, input_text)
        validate_host_name(host, input_text)
        if port is not None:
            validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ")
        validate_dataset_name(dataset, input_text)

    return user, host, user_host, pool, dataset

1830 

1831 

def validate_user_name(user: str, input_text: str) -> None:
    """Checks that the username is safe for ssh or local usage."""
    bad_chars: str = SHELL_CHARS_AND_SLASH
    # empty user is fine (means "current user"); otherwise forbid leading '-', '..', whitespace, and shell metachars
    is_unsafe: bool = bool(user) and (
        user.startswith("-") or ".." in user or any(ch.isspace() or ch in bad_chars for ch in user)
    )
    if is_unsafe:
        die(f"Invalid user name: '{user}' for: '{input_text}'")

1837 

1838 

def validate_host_name(host: str, input_text: str) -> None:
    """Checks hostname for forbidden characters or patterns."""
    bad_chars: str = SHELL_CHARS_AND_SLASH
    # empty host is fine (means "local"); otherwise forbid leading '-', '..', whitespace, and shell metachars
    is_unsafe: bool = bool(host) and (
        host.startswith("-") or ".." in host or any(ch.isspace() or ch in bad_chars for ch in host)
    )
    if is_unsafe:
        die(f"Invalid host name: '{host}' for: '{input_text}'")

1844 

1845 

1846def validate_port(port: str | int | None, message: str) -> None: 

1847 """Checks that port specification is a valid integer.""" 

1848 if isinstance(port, int): 

1849 port = str(port) 

1850 if port and not port.isdigit(): 

1851 die(message + f"must be empty or a positive integer: '{port}'") 

1852 

1853 

def normalize_called_process_error(error: subprocess.CalledProcessError) -> int:
    """Normalizes `CalledProcessError.returncode` to avoid reserved exit codes so callers don't misclassify them."""
    code = error.returncode
    # collapse exit codes in the reserved band [1, STILL_RUNNING_STATUS] onto DIE_STATUS; pass everything else
    # (including 0, negative signal codes, and non-int values) through unchanged
    if isinstance(code, int) and 1 <= code <= STILL_RUNNING_STATUS:
        return DIE_STATUS
    return code

1859 

1860 

1861############################################################################# 

if __name__ == "__main__":  # run the CLI only when executed as a script, never on import
    main()