Coverage for bzfs_main/bzfs.py: 99%

983 statements  

coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata 

16# /// script 

17# requires-python = ">=3.8" 

18# dependencies = [] 

19# /// 

20# 

21""" 

22* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`, 

23 data transfer, and snapshot management between two hosts. 

24* Overview of the bzfs.py codebase: 

25* The codebase starts with docs, definition of input data and associated argument parsing into a "Params" class. 

26* All CLI option/parameter values are reachable from the "Params" class. 

27* Control flow starts in main(), which kicks off a "Job". 

28* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree. 

29* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset(). 

30* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots(). 

31* The --create-src-snapshots-* and --delete-* and --compare-* and --monitor-* algorithms also start in run_task(). 

32* The main retry logic is in run_with_retries() and clear_resumable_recv_state_if_necessary(). 

33* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter. 

34* Executing a CLI command on a local or remote host is in run_ssh_command(). 

35* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool. 

36* Cache functionality can be found by searching for this regex: .*cach.* 

37* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant(). 

38* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh. 

39 Simply run that script whenever you change or add ArgumentParser help text. 

40""" 

41 

42from __future__ import annotations 

43import argparse 

44import contextlib 

45import fcntl 

46import hashlib 

47import heapq 

48import itertools 

49import os 

50import re 

51import signal 

52import subprocess 

53import sys 

54import threading 

55import time 

56from collections import defaultdict 

57from datetime import datetime, timedelta 

58from logging import Logger 

59from os.path import join as os_path_join 

60from pathlib import Path 

61from subprocess import CalledProcessError 

62from typing import ( 

63 IO, 

64 Any, 

65 Callable, 

66 Collection, 

67 Counter, 

68 cast, 

69) 

70 

71import bzfs_main.loggers 

72from bzfs_main.argparse_actions import ( 

73 has_timerange_filter, 

74) 

75from bzfs_main.argparse_cli import ( 

76 EXCLUDE_DATASET_REGEXES_DEFAULT, 

77) 

78from bzfs_main.compare_snapshot_lists import ( 

79 run_compare_snapshot_lists, 

80) 

81from bzfs_main.configuration import ( 

82 AlertConfig, 

83 CreateSrcSnapshotConfig, 

84 LogParams, 

85 MonitorSnapshotAlert, 

86 Params, 

87 Remote, 

88 SnapshotLabel, 

89) 

90from bzfs_main.connection import ( 

91 decrement_injection_counter, 

92 maybe_inject_error, 

93 run_ssh_command, 

94 timeout, 

95 try_ssh_command, 

96) 

97from bzfs_main.detect import ( 

98 DISABLE_PRG, 

99 RemoteConfCacheItem, 

100 are_bookmarks_enabled, 

101 detect_available_programs, 

102 is_caching_snapshots, 

103 is_dummy, 

104 is_solaris_zfs, 

105 is_zpool_feature_enabled_or_active, 

106) 

107from bzfs_main.filter import ( 

108 SNAPSHOT_REGEX_FILTER_NAME, 

109 dataset_regexes, 

110 filter_datasets, 

111 filter_lines, 

112 filter_lines_except, 

113 filter_snapshots, 

114) 

115from bzfs_main.loggers import ( 

116 Tee, 

117 get_simple_logger, 

118 reset_logger, 

119) 

120from bzfs_main.parallel_batch_cmd import ( 

121 run_ssh_cmd_parallel, 

122 zfs_list_snapshots_in_parallel, 

123) 

124from bzfs_main.parallel_engine import ( 

125 process_datasets_in_parallel_and_fault_tolerant, 

126) 

127from bzfs_main.parallel_iterator import ( 

128 run_in_parallel, 

129) 

130from bzfs_main.period_anchors import ( 

131 round_datetime_up_to_duration_multiple, 

132) 

133from bzfs_main.progress_reporter import ( 

134 ProgressReporter, 

135 count_num_bytes_transferred_by_zfs_send, 

136) 

137from bzfs_main.replication import ( 

138 delete_bookmarks, 

139 delete_datasets, 

140 delete_snapshots, 

141 replicate_dataset, 

142) 

143from bzfs_main.retry import ( 

144 Retry, 

145 RetryableError, 

146) 

147from bzfs_main.snapshot_cache import ( 

148 SnapshotCache, 

149 set_last_modification_time_safe, 

150) 

151from bzfs_main.utils import ( 

152 DESCENDANTS_RE_SUFFIX, 

153 DIE_STATUS, 

154 DONT_SKIP_DATASET, 

155 FILE_PERMISSIONS, 

156 LOG_DEBUG, 

157 LOG_TRACE, 

158 PROG_NAME, 

159 SHELL_CHARS, 

160 YEAR_WITH_FOUR_DIGITS_REGEX, 

161 SortedInterner, 

162 SynchronizedBool, 

163 SynchronizedDict, 

164 append_if_absent, 

165 compile_regexes, 

166 cut, 

167 die, 

168 has_duplicates, 

169 human_readable_bytes, 

170 human_readable_duration, 

171 is_descendant, 

172 open_nofollow, 

173 percent, 

174 pretty_print_formatter, 

175 replace_in_lines, 

176 replace_prefix, 

177 terminate_process_subtree, 

178 validate_dataset_name, 

179 validate_property_name, 

180 xappend, 

181 xfinally, 

182) 

183 

184# constants: 

185__version__: str = bzfs_main.argparse_cli.__version__ 

186CRITICAL_STATUS: int = 2 

187WARNING_STATUS: int = 1 

188STILL_RUNNING_STATUS: int = 4 

189MIN_PYTHON_VERSION: tuple[int, int] = (3, 8) 

190if sys.version_info < MIN_PYTHON_VERSION: 

191 print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!") 

192 sys.exit(DIE_STATUS) 

193CREATE_SRC_SNAPSHOTS_PREFIX_DFLT: str = PROG_NAME + "_" 

194CREATE_SRC_SNAPSHOTS_SUFFIX_DFLT: str = "_adhoc" 

195TIME_THRESHOLD_SECS: float = 1.1 # 1 second ZFS creation time resolution + NTP clock skew is typically < 10ms 

196 

197 

198############################################################################# 

199def argument_parser() -> argparse.ArgumentParser: 

200 """Returns the CLI parser used by bzfs.""" 

201 return bzfs_main.argparse_cli.argument_parser() 

202 

203 

204def main() -> None: 

205 """API for command line clients.""" 

206 try: 

207 run_main(argument_parser().parse_args(), sys.argv) 

208 except subprocess.CalledProcessError as e: 

209 sys.exit(e.returncode) 

210 

211 

212def run_main(args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None: 

213 """API for Python clients; visible for testing; may become a public API eventually.""" 

214 Job().run_main(args, sys_argv, log) 

215 
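# A minimal sketch of driving run_main() from Python (hypothetical dataset names; assumes the
# package is importable):
#
#   from bzfs_main import bzfs
#   cli = ["tank1/src", "tank2/dst", "--recursive"]
#   args = bzfs.argument_parser().parse_args(cli)
#   bzfs.run_main(args, sys_argv=["bzfs"] + cli)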

216 

217############################################################################# 

218class Job: 

219 """Executes one bzfs run, coordinating snapshot replication tasks.""" 

220 

221 def __init__(self) -> None: 

222 self.params: Params 

223 self.all_dst_dataset_exists: dict[str, dict[str, bool]] = defaultdict(lambda: defaultdict(bool)) 

224 self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({}) 

225 self.src_properties: dict[str, DatasetProperties] = {} 

226 self.dst_properties: dict[str, DatasetProperties] = {} 

227 self.all_exceptions: list[str] = [] 

228 self.all_exceptions_count: int = 0 

229 self.max_exceptions_to_summarize: int = 10000 

230 self.first_exception: BaseException | None = None 

231 self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {} 

232 self.dedicated_tcp_connection_per_zfs_send: bool = True 

233 self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {} 

234 self.max_workers: dict[str, int] = {} 

235 self.stats_lock: threading.Lock = threading.Lock() 

236 self.num_cache_hits: int = 0 

237 self.num_cache_misses: int = 0 

238 self.num_snapshots_found: int = 0 

239 self.num_snapshots_replicated: int = 0 

240 self.control_persist_secs: int = 90 

241 self.control_persist_margin_secs: int = 2 

242 self.progress_reporter: ProgressReporter = cast(ProgressReporter, None) 

243 self.is_first_replication_task: SynchronizedBool = SynchronizedBool(True) 

244 self.replication_start_time_nanos: int = time.monotonic_ns() 

245 self.timeout_nanos: int | None = None 

246 self.cache: SnapshotCache = SnapshotCache(self) 

247 

248 self.is_test_mode: bool = False # for testing only 

249 self.creation_prefix: str = "" # for testing only 

250 self.isatty: bool | None = None # for testing only 

251 self.use_select: bool = False # for testing only 

252 self.progress_update_intervals: tuple[float, float] | None = None # for testing only 

253 self.error_injection_triggers: dict[str, Counter[str]] = {} # for testing only 

254 self.delete_injection_triggers: dict[str, Counter[str]] = {} # for testing only 

255 self.param_injection_triggers: dict[str, dict[str, bool]] = {} # for testing only 

256 self.inject_params: dict[str, bool] = {} # for testing only 

257 self.injection_lock: threading.Lock = threading.Lock() # for testing only 

258 self.max_command_line_bytes: int | None = None # for testing only 

259 

260 def shutdown(self) -> None: 

261 """Exits any multiplexed ssh sessions that may be leftover.""" 

262 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values() 

263 for i, cache_item in enumerate(cache_items): 

264 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}") 

265 

266 def terminate(self, old_term_handler: Any, except_current_process: bool = False) -> None: 

267 """Shuts down gracefully on SIGTERM, optionally killing descendants.""" 

268 

269 def post_shutdown() -> None: 

270 signal.signal(signal.SIGTERM, old_term_handler) # restore original signal handler 

271 terminate_process_subtree(except_current_process=except_current_process) 

272 

273 with xfinally(post_shutdown): 

274 self.shutdown() 

275 

276 def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None: 

277 """Parses CLI arguments, sets up logging, and executes main job loop.""" 

278 assert isinstance(self.error_injection_triggers, dict) 

279 assert isinstance(self.delete_injection_triggers, dict) 

280 assert isinstance(self.inject_params, dict) 

281 with xfinally(reset_logger): # runs reset_logger() on exit, without masking exception raised in body of `with` block 

282 try: 

283 log_params: LogParams = LogParams(args) 

284 log = bzfs_main.loggers.get_logger(log_params=log_params, args=args, log=log) 

285 log.info("%s", f"Log file is: {log_params.log_file}") 

286 except BaseException as e: 

287 get_simple_logger(PROG_NAME).error("Log init: %s", e, exc_info=False if isinstance(e, SystemExit) else True) 

288 raise 

289 

290 aux_args: list[str] = [] 

291 if getattr(args, "include_snapshot_plan", None): 

292 aux_args += args.include_snapshot_plan 

293 if getattr(args, "delete_dst_snapshots_except_plan", None): 

294 aux_args += args.delete_dst_snapshots_except_plan 

295 if len(aux_args) > 0: 

296 log.info("Auxiliary CLI arguments: %s", " ".join(aux_args)) 

297 args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args) 

298 

299 def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None: 

300 log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info) 

301 

302 try: 

303 log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[euid: {os.geteuid()}]") 

304 if self.is_test_mode: 

305 log.log(LOG_TRACE, "Parsed CLI arguments: %s", args) 

306 self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params) 

307 log_params.params = p 

308 with open_nofollow(log_params.log_file, "a", encoding="utf-8", perm=FILE_PERMISSIONS) as log_file_fd: 

309 with contextlib.redirect_stderr(cast(IO[Any], Tee(log_file_fd, sys.stderr))): # stderr to logfile+stderr 

310 lock_file: str = p.lock_file_name() 

311 lock_fd = os.open(lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW, FILE_PERMISSIONS) 

312 with xfinally(lambda: os.close(lock_fd)): 

313 try: 

314 # Acquire an exclusive lock; will raise an error if lock is already held by another process. 

315 # The (advisory) lock is auto-released when the process terminates or the fd is closed. 

316 fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # LOCK_NB ... non-blocking 

317 except BlockingIOError: 

318 msg = "Exiting as same previous periodic job is still running without completion yet per " 

319 die(msg + lock_file, STILL_RUNNING_STATUS) 

320 with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)): # don't accumulate stale files 

321 # On CTRL-C and SIGTERM, send signal to descendant processes to also terminate descendants 

322 old_term_handler = signal.getsignal(signal.SIGTERM) 

323 signal.signal(signal.SIGTERM, lambda sig, f: self.terminate(old_term_handler)) 

324 old_int_handler = signal.signal(signal.SIGINT, lambda s, f: self.terminate(old_term_handler)) 

325 try: 

326 self.run_tasks() 

327 except BaseException: 

328 self.terminate(old_term_handler, except_current_process=True) 

329 raise 

330 finally: 

331 signal.signal(signal.SIGTERM, old_term_handler) # restore original signal handler 

332 signal.signal(signal.SIGINT, old_int_handler) # restore original signal handler 

333 for _ in range(2 if self.max_command_line_bytes else 1): 

334 self.shutdown() 

335 print("", end="", file=sys.stderr) 

336 sys.stderr.flush() 

337 except subprocess.CalledProcessError as e: 

338 log_error_on_exit(e, e.returncode) 

339 raise 

340 except SystemExit as e: 

341 log_error_on_exit(e, e.code) 

342 raise 

343 except (subprocess.TimeoutExpired, UnicodeDecodeError) as e: 

344 log_error_on_exit(e, DIE_STATUS) 

345 raise SystemExit(DIE_STATUS) from e 

346 except re.error as e: 

347 log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS) 

348 raise SystemExit(DIE_STATUS) from e 

349 except BaseException as e: 

350 log_error_on_exit(e, DIE_STATUS, exc_info=True) 

351 raise SystemExit(DIE_STATUS) from e 

352 finally: 

353 log.info("%s", f"Log file was: {log_params.log_file}") 

354 log.info("Success. Goodbye!") 

355 sys.stderr.flush() 

356 

357 def run_tasks(self) -> None: 

358 """Executes replication cycles, repeating until daemon lifetime expires.""" 

359 p, log = self.params, self.params.log 

360 self.all_exceptions = [] 

361 self.all_exceptions_count = 0 

362 self.first_exception = None 

363 self.remote_conf_cache = {} 

364 self.isatty = self.isatty if self.isatty is not None else p.isatty 

365 self.validate_once() 

366 self.replication_start_time_nanos = time.monotonic_ns() 

367 self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals) 

368 with xfinally(lambda: self.progress_reporter.stop()): 

369 daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos 

370 while True: # loop for daemon mode 

371 self.timeout_nanos = None if p.timeout_nanos is None else time.monotonic_ns() + p.timeout_nanos 

372 self.all_dst_dataset_exists.clear() 

373 self.progress_reporter.reset() 

374 src, dst = p.src, p.dst 

375 for src_root_dataset, dst_root_dataset in p.root_dataset_pairs: 

376 src.root_dataset = src.basis_root_dataset = src_root_dataset 

377 dst.root_dataset = dst.basis_root_dataset = dst_root_dataset 

378 p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy() 

379 if p.daemon_lifetime_nanos > 0: 

380 self.timeout_nanos = None if p.timeout_nanos is None else time.monotonic_ns() + p.timeout_nanos 

381 recurs_sep = " " if p.recursive_flag else "" 

382 task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}" 

383 if len(p.root_dataset_pairs) > 1: 

384 log.info("Starting task: %s", task_description + " ...") 

385 try: 

386 try: 

387 maybe_inject_error(self, cmd=[], error_trigger="retryable_run_tasks") 

388 timeout(self) 

389 self.validate_task() 

390 self.run_task() 

391 except RetryableError as retryable_error: 

392 assert retryable_error.__cause__ is not None 

393 raise retryable_error.__cause__ from None 

394 except (CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e: 

395 if p.skip_on_error == "fail" or ( 

396 isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0 

397 ): 

398 raise 

399 log.error("%s", e) 

400 self.append_exception(e, "task", task_description) 

401 if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos): 

402 break 

403 if not p.skip_replication: 

404 self.print_replication_stats(self.replication_start_time_nanos) 

405 error_count = self.all_exceptions_count 

406 if error_count > 0 and p.daemon_lifetime_nanos == 0: 

407 msgs = "\n".join([f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions)]) 

408 log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}") 

409 assert self.first_exception is not None 

410 raise self.first_exception 

411 

412 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None: 

413 """Records and logs an exception that was encountered while running a subtask.""" 

414 self.first_exception = self.first_exception or e 

415 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption 

416 self.all_exceptions.append(str(e)) 

417 self.all_exceptions_count += 1 

418 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description) 

419 

420 def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool: 

421 """Pauses until the next scheduled snapshot time or daemon stop.""" 

422 sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns() 

423 if sleep_nanos <= 0: 

424 return False 

425 self.progress_reporter.pause() 

426 p, log = self.params, self.params.log 

427 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config 

428 curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1) 

429 next_snapshotting_event_dt: datetime = min( 

430 ( 

431 round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit, config.anchors) 

432 for duration_amount, duration_unit in config.suffix_durations.values() 

433 ), 

434 default=curr_datetime + timedelta(days=10 * 365), # infinity 

435 ) 

436 offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz) 

437 offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000 

438 sleep_nanos = min(sleep_nanos, max(0, offset_nanos)) 

439 log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]") 

440 time.sleep(sleep_nanos / 1_000_000_000) 

441 config.current_datetime = datetime.now(config.tz) 

442 return daemon_stoptime_nanos - time.monotonic_ns() > 0 

443 

444 def print_replication_stats(self, start_time_nanos: int) -> None: 

445 """Logs overall replication statistics after a job cycle completes.""" 

446 p, log = self.params, self.params.log 

447 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

448 msg = p.dry(f"Replicated {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.") 

449 if p.is_program_available("pv", "local"): 

450 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file) 

451 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / elapsed_nanos) 

452 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]." 

453 log.info("%s", msg.ljust(p.terminal_columns - len("2024-01-01 23:58:45 [I] "))) 

454 

455 def validate_once(self) -> None: 

456 """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times.""" 

457 p = self.params 

458 p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts) 

459 for snapshot_filter in p.snapshot_filters: 

460 for _filter in snapshot_filter: 

461 if _filter.name == SNAPSHOT_REGEX_FILTER_NAME: 

462 exclude_snapshot_regexes = compile_regexes(_filter.options[0]) 

463 include_snapshot_regexes = compile_regexes(_filter.options[1] or [".*"]) 

464 _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes) 

465 

466 exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT] 

467 if len(p.args.exclude_dataset_regex) > 0: # some patterns don't exclude anything 

468 exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"] 

469 include_regexes: list[str] = p.args.include_dataset_regex 

470 

471 # relative datasets need not be compiled more than once as they don't change between tasks 

472 def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]: 

473 abs_datasets: list[str] = [] 

474 rel_datasets: list[str] = [] 

475 for dataset in datasets: 

476 (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset) 

477 return abs_datasets, rel_datasets 

478 
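# For example (hypothetical inputs): separate_abs_vs_rel_datasets(["/tank/a", "b/c"])
# returns (["/tank/a"], ["b/c"]) -- a leading "/" marks a dataset as absolute.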

479 p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset) 

480 p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset) 

481 suffix = DESCENDANTS_RE_SUFFIX 

482 p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = ( 

483 compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix), 

484 compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix), 

485 ) 

486 

487 if p.pv_program != DISABLE_PRG: 

488 pv_program_opts_set = set(p.pv_program_opts) 

489 if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}): 

490 die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.") 

491 if self.isatty and not p.quiet: 

492 for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]: 

493 if pv_program_opts_set.isdisjoint(opts): 

494 die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.") 

495 

496 src, dst = p.src, p.dst 

497 for remote in [src, dst]: 

498 r, loc = remote, remote.location 

499 validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user") 

500 validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host") 

501 validate_port(r.ssh_port, f"--ssh-{loc}-port ") 

502 

503 def validate_task(self) -> None: 

504 """Validates a single replication task before execution.""" 

505 p, log = self.params, self.params.log 

506 src, dst = p.src, p.dst 

507 for remote in [src, dst]: 

508 r = remote 

509 r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator( 

510 r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port 

511 ) 

512 r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user) 

513 local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1") # ::1 is IPv6 version of loopback address 

514 remote.is_nonlocal = r.ssh_host not in local_addrs 

515 self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host]) 

516 

517 if src.ssh_host == dst.ssh_host: 

518 msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}" 

519 if src.root_dataset == dst.root_dataset: 

520 die(f"Source and destination dataset must not be the same! {msg}") 

521 if p.recursive and ( 

522 is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset) 

523 or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset) 

524 ): 

525 die(f"Source and destination dataset trees must not overlap! {msg}") 

526 

527 suffx = DESCENDANTS_RE_SUFFIX # also match descendants of a matching dataset 

528 p.exclude_dataset_regexes, p.include_dataset_regexes = ( 

529 p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx), 

530 p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx), 

531 ) 

532 if len(p.include_dataset_regexes) == 0: 

533 p.include_dataset_regexes = [(re.compile(r".*"), False)] 

534 

535 detect_available_programs(self) 

536 

537 zfs_send_program_opts: list[str] = p.curr_zfs_send_program_opts 

538 if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"): 

539 append_if_absent(zfs_send_program_opts, "--large-block") # solaris-11.4 does not have this feature 

540 if is_solaris_zfs(p, dst): 

541 p.dry_run_destroy = "" # solaris-11.4 knows no 'zfs destroy -n' flag 

542 p.verbose_destroy = "" # solaris-11.4 knows no 'zfs destroy -v' flag 

543 if is_solaris_zfs(p, src): # solaris-11.4 only knows -w compress 

544 zfs_send_program_opts = ["-p" if opt == "--props" else opt for opt in zfs_send_program_opts] 

545 zfs_send_program_opts = fix_solaris_raw_mode(zfs_send_program_opts) 

546 p.curr_zfs_send_program_opts = zfs_send_program_opts 

547 

548 self.max_workers = {} 

549 self.max_datasets_per_minibatch_on_list_snaps = {} 

550 for r in [src, dst]: 

551 cpus = int(p.available_programs[r.location].get("getconf_cpu_count", 8)) 

552 threads, is_percent = p.threads 

553 cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads)) 

554 self.max_workers[r.location] = cpus 

555 bs = max(1, p.max_datasets_per_batch_on_list_snaps) # 1024 by default 

556 max_datasets_per_minibatch = p.max_datasets_per_minibatch_on_list_snaps 

557 if max_datasets_per_minibatch <= 0: 

558 max_datasets_per_minibatch = max(1, bs // cpus) 

559 max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch) 

560 self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch 
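# Illustrative arithmetic (assuming the 1024 default batch size and 8 effective workers, and that
# max_datasets_per_minibatch_on_list_snaps was not set explicitly): 1024 // 8 == 128 datasets are
# listed per 'zfs list -t snapshot' minibatch.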

561 log.log( 

562 LOG_TRACE, 

563 "%s", 

564 f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, " 

565 f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, " 

566 f"max_workers: {self.max_workers[r.location]}, " 

567 f"location: {r.location}", 

568 ) 

569 if self.is_test_mode: 

570 log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params)) 

571 

572 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]: 

573 """Returns sudo command prefix and whether root privileges are required.""" 

574 p: Params = self.params 

575 assert isinstance(ssh_user_host, str) 

576 assert isinstance(ssh_user, str) 

577 assert isinstance(p.sudo_program, str) 

578 assert isinstance(p.enable_privilege_elevation, bool) 

579 

580 is_root: bool = True 

581 if ssh_user_host != "": 

582 if ssh_user == "": 

583 if os.geteuid() != 0: 

584 is_root = False 

585 elif ssh_user != "root": 

586 is_root = False 

587 elif os.geteuid() != 0: 

588 is_root = False 

589 

590 if is_root: 

591 sudo = "" # using sudo in an attempt to make ZFS operations work even if we are not root user? 

592 use_zfs_delegation = False # or instead using 'zfs allow' delegation? 

593 return sudo, use_zfs_delegation 

594 elif p.enable_privilege_elevation: 

595 if p.sudo_program == DISABLE_PRG: 

596 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}") 

597 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any 

598 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit. 

599 return p.sudo_program + " -n", False 

600 else: 

601 return "", True 

602 
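# Summary of the three possible sudo_cmd() outcomes above, assuming p.sudo_program == "sudo":
#   effective root on the target host      -> ("", False)         # neither sudo nor delegation needed
#   non-root with privilege elevation      -> ("sudo -n", False)  # prepend non-interactive sudo
#   non-root, privilege elevation disabled -> ("", True)          # rely on 'zfs allow' delegation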

603 def run_task(self) -> None: 

604 """Replicates all snapshots for the current root dataset pair.""" 

605 

606 def filter_src_datasets() -> list[str]: # apply --{include|exclude}-dataset policy 

607 return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets 

608 

609 p, log = self.params, self.params.log 

610 src, dst = p.src, p.dst 

611 max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location]) 

612 recursive_sep: str = " " if p.recursive_flag else "" 

613 task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..." 

614 failed: bool = False 

615 src_datasets: list[str] | None = None 

616 basis_src_datasets: list[str] = [] 

617 self.src_properties = {} 

618 self.dst_properties = {} 

619 if not is_dummy(src): # find src dataset or all datasets in src dataset tree (with --recursive) 

620 basis_src_datasets = self.list_src_datasets_task() 

621 

622 if not p.create_src_snapshots_config.skip_create_src_snapshots: 

623 log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...") 

624 src_datasets = filter_src_datasets() # apply include/exclude policy 

625 self.create_src_snapshots_task(basis_src_datasets, src_datasets) 

626 

627 # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset 

628 if not p.skip_replication: 

629 if len(basis_src_datasets) == 0: 

630 die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}") 

631 if is_dummy(dst): 

632 die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!") 

633 src_datasets = filter_src_datasets() # apply include/exclude policy 

634 failed = self.replicate_datasets(src_datasets, task_description, max_workers) 

635 

636 if failed or not ( 

637 p.delete_dst_datasets 

638 or p.delete_dst_snapshots 

639 or p.delete_empty_dst_datasets 

640 or p.compare_snapshot_lists 

641 or p.monitor_snapshots_config.enable_monitor_snapshots 

642 ): 

643 return 

644 log.info("Listing dst datasets: %s", task_description) 

645 if is_dummy(dst): 

646 die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!") 

647 basis_dst_datasets: list[str] = self.list_dst_datasets_task() 

648 dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets) # apply include/exclude policy 

649 

650 if p.delete_dst_datasets and not failed: 

651 log.info(p.dry("--delete-dst-datasets: %s"), task_description) 

652 basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task( 

653 basis_src_datasets, basis_dst_datasets, dst_datasets 

654 ) 

655 

656 if p.delete_dst_snapshots and not failed: 

657 log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]") 

658 failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description) 

659 

660 if p.delete_empty_dst_datasets and p.recursive and not failed: 

661 log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description) 

662 dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets) 

663 

664 if p.compare_snapshot_lists and not failed: 

665 log.info("--compare-snapshot-lists: %s", task_description) 

666 if len(basis_src_datasets) == 0 and not is_dummy(src): 

667 die(f"Source dataset does not exist: {src.basis_root_dataset}") 

668 src_datasets = filter_src_datasets() # apply include/exclude policy 

669 run_compare_snapshot_lists(self, src_datasets, dst_datasets) 

670 

671 if p.monitor_snapshots_config.enable_monitor_snapshots and not failed: 

672 log.info("--monitor-snapshots: %s", task_description) 

673 src_datasets = filter_src_datasets() # apply include/exclude policy 

674 self.monitor_snapshots_task(src_datasets, dst_datasets, task_description) 

675 

676 def list_src_datasets_task(self) -> list[str]: 

677 """Lists datasets on the source host.""" 

678 p = self.params 

679 src = p.src 

680 basis_src_datasets: list[str] = [] 

681 is_caching: bool = is_caching_snapshots(p, src) 

682 props: str = "volblocksize,recordsize,name" 

683 props = "snapshots_changed," + props if is_caching else props 

684 cmd: list[str] = p.split_args( 

685 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset 

686 ) 

687 for line in (try_ssh_command(self, src, LOG_DEBUG, cmd=cmd) or "").splitlines(): 

688 cols: list[str] = line.split("\t") 

689 snapshots_changed, volblocksize, recordsize, src_dataset = cols if is_caching else ["-"] + cols 

690 self.src_properties[src_dataset] = DatasetProperties( 

691 recordsize=int(recordsize) if recordsize != "-" else -int(volblocksize), 

692 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0, 

693 ) 

694 basis_src_datasets.append(src_dataset) 

695 assert (not self.is_test_mode) or basis_src_datasets == sorted(basis_src_datasets), "List is not sorted" 

696 return basis_src_datasets 

697 

698 def list_dst_datasets_task(self) -> list[str]: 

699 """Lists datasets on the destination host.""" 

700 p, log = self.params, self.params.log 

701 dst = p.dst 

702 is_caching: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots 

703 props: str = "name" 

704 props = "snapshots_changed," + props if is_caching else props 

705 cmd: list[str] = p.split_args( 

706 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset 

707 ) 

708 basis_dst_datasets: list[str] = [] 

709 basis_dst_datasets_str: str | None = try_ssh_command(self, dst, LOG_TRACE, cmd=cmd) 

710 if basis_dst_datasets_str is None: 

711 log.warning("Destination dataset does not exist: %s", dst.root_dataset) 

712 else: 

713 for line in basis_dst_datasets_str.splitlines(): 

714 cols: list[str] = line.split("\t") 

715 snapshots_changed, dst_dataset = cols if is_caching else ["-"] + cols 

716 self.dst_properties[dst_dataset] = DatasetProperties( 

717 recordsize=0, 

718 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0, 

719 ) 

720 basis_dst_datasets.append(dst_dataset) 

721 assert (not self.is_test_mode) or basis_dst_datasets == sorted(basis_dst_datasets), "List is not sorted" 

722 return basis_dst_datasets 

723 

724 def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None: 

725 """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements 

726 --create-src-snapshots. 

727 

728 The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line, 

729 using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the 

730 --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs 

731 snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical 

732 'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such 

733 guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple 

734 command line invocations, respecting the limits that the operating system places on the maximum length of a single 

735 command line, per `getconf ARG_MAX`. 

736 """ 

737 p, log = self.params, self.params.log 

738 src = p.src 

739 if len(basis_src_datasets) == 0: 

740 die(f"Source dataset does not exist: {src.basis_root_dataset}") 

741 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets) 

742 datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0} 

743 basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy() # shallow copy 

744 commands: dict[SnapshotLabel, list[str]] = {} 

745 for label, datasets in datasets_to_snapshot.items(): 

746 cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot") 

747 if p.recursive: 

748 # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor 

749 root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets) 

750 if root_datasets is not None: 

751 cmd.append("-r") # recursive; takes a snapshot of all datasets in the subtree(s) 

752 datasets_to_snapshot[label] = root_datasets 

753 commands[label] = cmd 

754 creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots" 

755 log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...") 

756 # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle 

757 run_ssh_cmd_parallel( 

758 self, 

759 src, 

760 [(commands[lbl], [f"{ds}@{lbl}" for ds in datasets]) for lbl, datasets in datasets_to_snapshot.items()], 

761 fn=lambda cmd, batch: run_ssh_command(self, src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch), 

762 max_batch_items=1 if is_solaris_zfs(p, src) else 2**29, # solaris CLI doesn't accept multiple datasets 

763 ) 

764 # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls 

765 self.cache.update_last_modified_cache(basis_datasets_to_snapshot) 

766 

767 def delete_destination_snapshots_task( 

768 self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str 

769 ) -> bool: 

770 """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the 

771 --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset* 

772 policy; implements --delete-dst-snapshots.""" 

773 p, log = self.params, self.params.log 

774 src, dst = p.src, p.dst 

775 kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot" 

776 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters) 

777 props: str = self.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name" 

778 basis_src_datasets_set: set[str] = set(basis_src_datasets) 

779 num_snapshots_found, num_snapshots_deleted = 0, 0 

780 

781 def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool: # thread-safe 

782 src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset) 

783 if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks): 

784 src_kind: str = kind 

785 if not p.delete_dst_snapshots_no_crosscheck: 

786 src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot" 

787 src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset) 

788 else: 

789 src_cmd = None 

790 dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset) 

791 self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots") 

792 src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel 

793 lambda: set(run_ssh_command(self, src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []), 

794 lambda: try_ssh_command(self, dst, LOG_TRACE, cmd=dst_cmd), 

795 ) 

796 if dst_snaps_with_guids_str is None: 

797 log.warning("Third party deleted destination: %s", dst_dataset) 

798 return False 

799 dst_snaps_with_guids: list[str] = dst_snaps_with_guids_str.splitlines() 

800 num_dst_snaps_with_guids = len(dst_snaps_with_guids) 

801 basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy() 

802 if p.delete_dst_bookmarks: 

803 replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1) # treat bookmarks as snapshots 

804 # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots(). 

805 # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep* 

806 all_except: bool = p.delete_dst_snapshots_except 

807 if p.delete_dst_snapshots_except and not is_dummy(src): 

808 # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what 

809 # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep" 

810 # list, we later further refine by checking what's on the source dataset. 

811 all_except = False 

812 dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except) 

813 if p.delete_dst_bookmarks: 

814 replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1) # restore pre-filtering bookmark state 

815 if filter_needs_creation_time: 

816 dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids) 

817 basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids) 

818 if p.delete_dst_snapshots_except and not is_dummy(src): # Non-dummy Source + "Except" (Keep) Mode 

819 # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka 

820 # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset. 

821 # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP. 

822 # We only actually keep them if they are ALSO on the SRC. 

823 # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`) 

824 # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`. 

825 except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids) 

826 dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids) 

827 else: # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode 

828 # In standard delete mode: 

829 # `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST. 

830 # We delete those that are NOT on SRC. 

831 # `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`. 

832 # In dummy source + "except" (keep) mode: 

833 # `all_except` was True. 

834 # `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete. 

835 # `src_snaps_with_guids` is empty. 

836 # `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`. 

837 dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids) 

838 dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete) 

839 separator: str = "#" if p.delete_dst_bookmarks else "@" 

840 dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete) 

841 if p.delete_dst_bookmarks: 

842 delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete) 

843 else: 

844 delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete) 

845 with self.stats_lock: 

846 nonlocal num_snapshots_found 

847 num_snapshots_found += num_dst_snaps_with_guids 

848 nonlocal num_snapshots_deleted 

849 num_snapshots_deleted += len(dst_tags_to_delete) 

850 if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks: 

851 self.dst_properties[dst_dataset].snapshots_changed = 0 # invalidate cache 

852 return True 

853 

854 # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec 

855 failed: bool = False 

856 if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks: 

857 start_time_nanos = time.monotonic_ns() 

858 failed = process_datasets_in_parallel_and_fault_tolerant( 

859 log=log, 

860 datasets=dst_datasets, 

861 process_dataset=delete_destination_snapshots, # lambda 

862 skip_tree_on_error=lambda dataset: False, 

863 skip_on_error=p.skip_on_error, 

864 max_workers=max_workers, 

865 enable_barriers=False, 

866 task_name="--delete-dst-snapshots", 

867 append_exception=self.append_exception, 

868 retry_policy=p.retry_policy, 

869 dry_run=p.dry_run, 

870 is_test_mode=self.is_test_mode, 

871 ) 

872 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

873 log.info( 

874 p.dry("--delete-dst-snapshots: %s"), 

875 task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s " 

876 f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]", 

877 ) 

878 return failed 

879 

880 def delete_dst_datasets_task( 

881 self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str] 

882 ) -> tuple[list[str], list[str]]: 

883 """Deletes existing destination datasets that do not exist within the source dataset if they are included via 

884 --{include|exclude}-dataset* policy; implements --delete-dst-datasets. 

885 

886 Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors. 

887 """ 

888 p = self.params 

889 src, dst = p.src, p.dst 

890 children: dict[str, set[str]] = defaultdict(set) 

891 for dst_dataset in basis_dst_datasets: # Compute the direct children of each NON-FILTERED dataset 

892 parent: str = os.path.dirname(dst_dataset) 

893 children[parent].add(dst_dataset) 

894 to_delete: set[str] = set() 

895 for dst_dataset in reversed(sorted_dst_datasets): 

896 if children[dst_dataset].issubset(to_delete): 

897 to_delete.add(dst_dataset) # all children are deletable, thus the dataset itself is deletable too 

898 to_delete = to_delete.difference( 

899 {replace_prefix(src_dataset, src.root_dataset, dst.root_dataset) for src_dataset in basis_src_datasets} 

900 ) 

901 delete_datasets(self, dst, to_delete) 

902 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(to_delete)) 

903 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(to_delete)) 

904 return basis_dst_datasets, sorted_dst_datasets 

905 

906 def delete_empty_dst_datasets_task(self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str]) -> list[str]: 

907 """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset 

908 do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets. 

909 

910 To do so, we walk the dataset list (conceptually, a tree) depth-first (i.e. sorted descending). If a dst dataset has 

911 zero snapshots and zero bookmarks and all its children are already marked as orphans, then it is itself an orphan, 

912 and we mark it as such. Walking in a reverse sorted way means that we efficiently check for zero snapshots/bookmarks 

913 not just over the direct children but the entire tree. Finally, delete all orphan datasets in an efficient batched 

914 way. 

915 """ 

916 p = self.params 

917 dst = p.dst 

918 delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: bool = ( 

919 p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst) 

920 ) 

921 

922 # Compute the direct children of each NON-FILTERED dataset. Thus, no non-selected dataset and no ancestor of a 

923 # non-selected dataset will ever be added to the "orphan" set. In other words, this treats non-selected dataset 

924 # subtrees as if they all had snapshots, so non-selected dataset subtrees and their ancestors are guaranteed 

925 # to not get deleted. 

926 children: dict[str, set[str]] = defaultdict(set) 

927 for dst_dataset in basis_dst_datasets: 

928 parent: str = os.path.dirname(dst_dataset) 

929 children[parent].add(dst_dataset) 

930 

931 # Find and mark orphan datasets, finally delete them in an efficient way. Using two filter runs instead of one 

932 # filter run is an optimization. The first run only computes candidate orphans, without incurring I/O, to reduce 

933 # the list of datasets for which we list snapshots via 'zfs list -t snapshot ...' from dst_datasets to a subset 

934 # of dst_datasets, which in turn reduces I/O and improves perf. Essentially, this eliminates the I/O to list 

935 # snapshots for ancestors of excluded datasets. The second run computes the real orphans. 

936 btype: str = "bookmark,snapshot" if delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots else "snapshot" 

937 dst_datasets_having_snapshots: set[str] = set() 

938 for run in range(2): 

939 orphans: set[str] = set() 

940 for dst_dataset in reversed(sorted_dst_datasets): 

941 if children[dst_dataset].issubset(orphans): 

942 # all children turned out to be orphans, thus the dataset itself could be an orphan 

943 if dst_dataset not in dst_datasets_having_snapshots: # always True during first filter run 

944 orphans.add(dst_dataset) 

945 if run == 0: 

946 # find datasets with >= 1 snapshot; update dst_datasets_having_snapshots for real use in the 2nd run 

947 cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name") 

948 for datasets_having_snapshots_lst in zfs_list_snapshots_in_parallel( 

949 self, dst, cmd, sorted(orphans), ordered=False 

950 ): 

951 if delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: 

952 replace_in_lines(datasets_having_snapshots_lst, old="#", new="@", count=1) # treat bookmarks as snapshots

953 datasets_having_snapshots = set(cut(field=1, separator="@", lines=datasets_having_snapshots_lst)) 

954 dst_datasets_having_snapshots.update(datasets_having_snapshots) # union 

955 else: 

956 delete_datasets(self, dst, orphans) 

957 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(orphans)) 

958 return sorted_dst_datasets 

959 

960 def monitor_snapshots_task( 

961 self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str 

962 ) -> None: 

963 """Monitors src and dst snapshots; implements --monitor-snapshots.""" 

964 p, log = self.params, self.params.log 

965 src, dst = p.src, p.dst 

966 num_cache_hits: int = self.num_cache_hits 

967 num_cache_misses: int = self.num_cache_misses 

968 start_time_nanos: int = time.monotonic_ns() 

969 run_in_parallel( 

970 lambda: self.monitor_snapshots(dst, sorted_dst_datasets), 

971 lambda: self.monitor_snapshots(src, sorted_src_datasets), 

972 ) 

973 elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos) 

974 if num_cache_hits != self.num_cache_hits or num_cache_misses != self.num_cache_misses: 

975 total: int = self.num_cache_hits + self.num_cache_misses 

976 msg = f", cache hits: {percent(self.num_cache_hits, total)}, misses: {percent(self.num_cache_misses, total)}" 

977 else: 

978 msg = "" 

979 log.info( 

980 "--monitor-snapshots done: %s", 

981 f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]", 

982 ) 

983 

984 def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> None: 

985 """Checks snapshot freshness and warns or errors out when limits are exceeded. 

986 

987 Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name 

988 pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to check if snapshots 

989 are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process 

990 exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively. 

991 """ 

992 p, log = self.params, self.params.log 

993 alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts 

994 labels: list[SnapshotLabel] = [alert.label for alert in alerts] 

995 current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000 

996 is_debug: bool = log.isEnabledFor(LOG_DEBUG) 

997 if is_caching_snapshots(p, remote): 

998 props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties 

999 snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()} 

1000 hash_code: str = hashlib.sha256(str(tuple(alerts)).encode("utf-8")).hexdigest() 

1001 is_caching: bool = False 

1002 

1003 def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str: 

1004 cache_label = SnapshotLabel(os_path_join("===", alert_cfg.kind, str(label), hash_code), "", "", "") 

1005 return self.cache.last_modified_cache_file(r, dataset, cache_label) 

1006 

1007 def alert_msg( 

1008 kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int 

1009 ) -> str: 

1010 assert kind == "Latest" or kind == "Oldest" 

1011 lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}" 

1012 if snapshot_age_millis >= current_unixtime_millis: 

1013 return f"No snapshot exists for {dataset}@{lbl}" 

1014 msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old" 

1015 s = f" ({snapshot})" if snapshot else "" 

1016 if delta_millis == -1: 

1017 return f"{msg}{s}" 

1018 return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}" 

1019 

1020 def check_alert( 

1021 label: SnapshotLabel, 

1022 alert_cfg: AlertConfig | None, 

1023 creation_unixtime_secs: int, 

1024 dataset: str, 

1025 snapshot: str, 

1026 ) -> None: 

1027 if alert_cfg is None: 

1028 return 

1029 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot' 

1030 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0) 

1031 cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg) 

1032 set_last_modification_time_safe(cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed)) 

1033 warning_millis: int = alert_cfg.warning_millis 

1034 critical_millis: int = alert_cfg.critical_millis 

1035 alert_kind = alert_cfg.kind 

1036 snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000 

1037 m = "--monitor_snapshots: " 

1038 if snapshot_age_millis > critical_millis: 

1039 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis) 

1040 log.critical("%s", msg) 

1041 if not p.monitor_snapshots_config.dont_crit: 

1042 die(msg, exit_code=CRITICAL_STATUS) 

1043 elif snapshot_age_millis > warning_millis: 

1044 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis) 

1045 log.warning("%s", msg) 

1046 if not p.monitor_snapshots_config.dont_warn: 

1047 die(msg, exit_code=WARNING_STATUS) 

1048 elif is_debug: 

1049 msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1) 

1050 log.debug("%s", msg) 

1051 

1052 def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None: 

1053 alert: MonitorSnapshotAlert = alerts[i] 

1054 check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot) 

1055 

1056 def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None: 

1057 alert: MonitorSnapshotAlert = alerts[i] 

1058 check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot) 

1059 

1060 def find_stale_datasets_and_check_alerts() -> list[str]: 

1061 """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply, 

1062 that is, without incurring 'zfs list -t snapshots'. 

1063 

1064 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See 

1065 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed 

1066 """ 

1067 stale_datasets: list[str] = [] 

1068 time_threshold: float = time.time() - TIME_THRESHOLD_SECS 

1069 for dataset in sorted_datasets: 

1070 is_stale_dataset: bool = False 

1071 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0) 

1072 for alert in alerts: 

1073 for cfg in (alert.latest, alert.oldest): 

1074 if cfg is None: 

1075 continue 

1076 if ( 

1077 snapshots_changed != 0 

1078 and snapshots_changed < time_threshold 

1079 and ( 

1080 cached_unix_times := self.cache.get_snapshots_changed2( 

1081 monitor_last_modified_cache_file(remote, dataset, alert.label, cfg) 

1082 ) 

1083 ) 

1084 and snapshots_changed == cached_unix_times[1] # cached snapshots_changed aka last modified time 

1085 and snapshots_changed >= cached_unix_times[0] # creation time of minmax snapshot aka access time 

1086 ): # cached state is still valid; emit an alert if the latest/oldest snapshot is too old 

1087 lbl = alert.label 

1088 check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="") 

1089 else: # cached state is no longer valid; fall back to 'zfs list -t snapshot' 

1090 is_stale_dataset = True 

1091 if is_stale_dataset: 

1092 stale_datasets.append(dataset) 

1093 return stale_datasets 

1094 
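# Hedged sketch, not part of bzfs.py: the cache-validity rule used in the loop above, restated as a standalone
# predicate. A cached (creation_time, snapshots_changed) pair is trusted only if the dataset's current
# "snapshots_changed" value is nonzero, older than the freshness threshold, equal to the cached
# snapshots_changed (mtime), and not older than the cached creation time (atime).
def _cached_monitor_state_is_valid(snapshots_changed: int, time_threshold: float, cached: tuple[int, int] | None) -> bool:
    return bool(
        snapshots_changed != 0
        and snapshots_changed < time_threshold
        and cached
        and snapshots_changed == cached[1]  # cached snapshots_changed aka last modified time
        and snapshots_changed >= cached[0]  # creation time of minmax snapshot aka access time
    )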

1095 # satisfy request from local cache as much as possible 

1096 if is_caching_snapshots(p, remote): 

1097 stale_datasets: list[str] = find_stale_datasets_and_check_alerts() 

1098 with self.stats_lock: 

1099 self.num_cache_misses += len(stale_datasets) 

1100 self.num_cache_hits += len(sorted_datasets) - len(stale_datasets) 

1101 else: 

1102 stale_datasets = sorted_datasets 

1103 

1104 # fall back to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache 

1105 is_caching = is_caching_snapshots(p, remote) 

1106 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots( 

1107 remote, stale_datasets, labels, fn_latest=alert_latest_snapshot, fn_oldest=alert_oldest_snapshot 

1108 ) 

1109 for dataset in datasets_without_snapshots: 

1110 for i in range(len(alerts)): 

1111 alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="") 

1112 alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="") 

1113 

1114 def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool: 

1115 """Replicates a list of datasets.""" 

1116 assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted" 

1117 p, log = self.params, self.params.log 

1118 src, dst = p.src, p.dst 

1119 self.num_snapshots_found = 0 

1120 self.num_snapshots_replicated = 0 

1121 # perf/latency: no need to set up a dedicated TCP connection if no parallel replication is possible 

1122 self.dedicated_tcp_connection_per_zfs_send = ( 

1123 p.dedicated_tcp_connection_per_zfs_send 

1124 and max_workers > 1 

1125 and has_siblings(src_datasets) # siblings can be replicated in parallel 

1126 ) 

1127 log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]") 

1128 start_time_nanos: int = time.monotonic_ns() 

1129 

1130 def src2dst(src_dataset: str) -> str: 

1131 return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset) 

1132 

1133 def dst2src(dst_dataset: str) -> str: 

1134 return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset) 

1135 

1136 def find_stale_datasets() -> tuple[list[str], dict[str, str]]: 

1137 """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine 

1138 which datasets can be skipped cheaply, i.e. without incurring 'zfs list -t snapshot'. 

1139 

1140 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See 

1141 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed 

1142 """ 

1143 # First, check which src datasets have changed since the last replication to that destination 

1144 cache_files: dict[str, str] = {} 

1145 stale_src_datasets1: list[str] = [] 

1146 maybe_stale_dst_datasets: list[str] = [] 

1147 userhost_dir: str = p.dst.ssh_user_host # cache is only valid for identical destination username+host 

1148 userhost_dir = userhost_dir if userhost_dir else "-" 

1149 hash_key = tuple(tuple(f) for f in p.snapshot_filters) # cache is only valid for same --include/excl-snapshot* 

1150 hash_code: str = hashlib.sha256(str(hash_key).encode("utf-8")).hexdigest() 

1151 for src_dataset in src_datasets: 

1152 dst_dataset: str = src2dst(src_dataset) # cache is only valid for identical destination dataset 

1153 cache_label = SnapshotLabel(os.path.join("==", userhost_dir, dst_dataset, hash_code), "", "", "") 

1154 cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label) 

1155 cache_files[src_dataset] = cache_file 

1156 snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed # get prop "for free" 

1157 if ( 

1158 snapshots_changed != 0 

1159 and time.time() > snapshots_changed + TIME_THRESHOLD_SECS 

1160 and snapshots_changed == self.cache.get_snapshots_changed(cache_file) 

1161 ): 

1162 maybe_stale_dst_datasets.append(dst_dataset) 

1163 else: 

1164 stale_src_datasets1.append(src_dataset) 

1165 

1166 # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed 

1167 stale_src_datasets2: list[str] = [] 

1168 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets) 

1169 for dst_dataset in maybe_stale_dst_datasets: 

1170 snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0) 

1171 cache_file = self.cache.last_modified_cache_file(dst, dst_dataset) 

1172 if ( 

1173 snapshots_changed != 0 

1174 and time.time() > snapshots_changed + TIME_THRESHOLD_SECS 

1175 and snapshots_changed == self.cache.get_snapshots_changed(cache_file) 

1176 ): 

1177 log.info("Already up-to-date [cached]: %s", dst_dataset) 

1178 else: 

1179 stale_src_datasets2.append(dst2src(dst_dataset)) 

1180 assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted" 

1181 assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted" 

1182 stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2)) # merge two sorted lists 

1183 assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates" 

1184 return stale_src_datasets, cache_files 

1185 
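# Hedged sketch, not part of bzfs.py: how the replication cache key is derived in find_stale_datasets() above.
# The key must change whenever the destination user@host, the destination dataset, or the
# --include/--exclude-snapshot* filters change, so all three are folded into the cache label; the host,
# dataset and filter values below are made up, and the real filters are objects rather than plain tuples.
import hashlib
import os

_filters = [("include_snapshot_regex", ".*_hourly"), ("exclude_snapshot_regex", ".*_tmp")]  # hypothetical shape
_hash_code = hashlib.sha256(str(tuple(tuple(f) for f in _filters)).encode("utf-8")).hexdigest()
_cache_key = os.path.join("==", "alice@backuphost", "backuppool/dst/data", _hash_code)
# _cache_key is then wrapped into a SnapshotLabel and mapped to a per-source-dataset cache file whose
# mtime stores the last observed "snapshots_changed" value of that source dataset.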

1186 if is_caching_snapshots(p, src): 

1187 stale_src_datasets, cache_files = find_stale_datasets() 

1188 num_cache_misses = len(stale_src_datasets) 

1189 num_cache_hits = len(src_datasets) - len(stale_src_datasets) 

1190 self.num_cache_misses += num_cache_misses 

1191 self.num_cache_hits += num_cache_hits 

1192 total: int = self.num_cache_hits + self.num_cache_misses 

1193 cmsg = f", cache hits: {percent(self.num_cache_hits, total)}, misses: {percent(self.num_cache_misses, total)}" 

1194 else: 

1195 stale_src_datasets = src_datasets 

1196 cache_files = {} 

1197 cmsg = "" 

1198 

1199 # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution 

1200 failed: bool = process_datasets_in_parallel_and_fault_tolerant( 

1201 log=log, 

1202 datasets=stale_src_datasets, 

1203 process_dataset=lambda src_dataset, tid, retry: replicate_dataset(self, src_dataset, tid, retry), 

1204 skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)], 

1205 skip_on_error=p.skip_on_error, 

1206 max_workers=max_workers, 

1207 enable_barriers=False, 

1208 task_name="Replication", 

1209 append_exception=self.append_exception, 

1210 retry_policy=p.retry_policy, 

1211 dry_run=p.dry_run, 

1212 is_test_mode=self.is_test_mode, 

1213 ) 

1214 

1215 if is_caching_snapshots(p, src) and not failed: 

1216 # refresh "snapshots_changed" ZFS dataset property from dst 

1217 stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in stale_src_datasets] 

1218 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets) 

1219 for dst_dataset in stale_dst_datasets: # update local cache 

1220 dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0) 

1221 dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset) 

1222 src_dataset: str = dst2src(dst_dataset) 

1223 src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed 

1224 if not p.dry_run: 

1225 set_last_modification_time_safe(cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed) 

1226 set_last_modification_time_safe(dst_cache_file, unixtime_in_secs=dst_snapshots_changed) 

1227 

1228 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos 

1229 log.info( 

1230 p.dry("Replication done: %s"), 

1231 f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots" 

1232 f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]", 

1233 ) 

1234 return failed 

1235 

1236 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None: 

1237 """For testing only; for unit tests to delete datasets during replication and test correct handling of that.""" 

1238 assert delete_trigger 

1239 counter = self.delete_injection_triggers.get("before") 

1240 if counter and decrement_injection_counter(self, counter, delete_trigger): 

1241 p = self.params 

1242 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "") 

1243 run_ssh_command(self, remote, LOG_DEBUG, print_stdout=True, cmd=cmd) 

1244 

1245 def maybe_inject_params(self, error_trigger: str) -> None: 

1246 """For testing only; for unit tests to simulate errors during replication and test correct handling of them.""" 

1247 assert error_trigger 

1248 counter = self.error_injection_triggers.get("before") 

1249 if counter and decrement_injection_counter(self, counter, error_trigger): 

1250 self.inject_params = self.param_injection_triggers[error_trigger] 

1251 elif error_trigger in self.param_injection_triggers: 

1252 self.inject_params = {} 

1253 

1254 @staticmethod 

1255 def recv_option_property_names(recv_opts: list[str]) -> set[str]: 

1256 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for 

1257 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name.""" 

1258 propnames = set() 

1259 i = 0 

1260 n = len(recv_opts) 

1261 while i < n: 

1262 stripped: str = recv_opts[i].strip() 

1263 if stripped in ("-o", "-x"): 

1264 i += 1 

1265 if i == n or recv_opts[i].strip() in ("-o", "-x"): 

1266 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1267 if stripped == "-o" and "=" not in recv_opts[i]: 

1268 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1269 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0] 

1270 validate_property_name(propname, "--zfs-recv-program-opt(s)") 

1271 propnames.add(propname) 

1272 i += 1 

1273 return propnames 

1274 
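# Usage examples for the staticmethod above (behavior per the code as written; option lists are made up):
#   recv_option_property_names(["-u", "-o", "compression=lz4", "-x", "mountpoint"])
#     returns {"compression", "mountpoint"}
#   recv_option_property_names(["-o"])          # dies: missing value for the -o option
#   recv_option_property_names(["-o", "foo"])   # dies: missing name=value pair for the -o option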

1275 def root_datasets_if_recursive_zfs_snapshot_is_possible( 

1276 self, datasets: list[str], basis_datasets: list[str] 

1277 ) -> list[str] | None: 

1278 """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset 

1279 within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in 

1280 `datasets`. Returns `None` if any (unfiltered) dataset in `basis_datasets` that is a descendant of at least one of the 

1281 root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have 

1282 pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the 

1283 non-recursive 'zfs snapshot snapshot1 .. snapshot N' CLI flavor. 

1284 

1285 Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both 

1286 sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time 

1287 complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case. 

1288 See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative 

1289 impl that's easier to grok. 

1290 """ 

1291 datasets_set: set[str] = set(datasets) 

1292 root_datasets: list[str] = self.find_root_datasets(datasets) 

1293 len_root_datasets = len(root_datasets) 

1294 len_basis_datasets = len(basis_datasets) 

1295 i, j = 0, 0 

1296 while i < len_root_datasets and j < len_basis_datasets: # walk and "merge" both sorted lists, in sync 

1297 if basis_datasets[j] < root_datasets[i]: # irrelevant subtree? 

1298 j += 1 # move to the next basis_src_dataset 

1299 elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]): # relevant subtree? 

1300 if basis_datasets[j] not in datasets_set: # was dataset chopped off by schedule or --incl/exclude-dataset*? 

1301 return None # detected filter pruning that is incompatible with 'zfs snapshot -r' 

1302 j += 1 # move to the next basis_src_dataset 

1303 else: 

1304 i += 1 # move to next root dataset; no need to check root_datasets that are no longer (or not yet) reachable 

1305 return root_datasets 

1306 
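# Worked example for the method above (dataset names are made up):
#   basis_datasets = ["tank/a", "tank/a/b", "tank/a/c", "tank/z"]
#   datasets       = ["tank/a", "tank/a/b", "tank/a/c", "tank/z"]  ->  ["tank/a", "tank/z"]
#     every descendant of each root survived filtering, so a recursive 'zfs snapshot -r' on the two roots
#     covers exactly the requested datasets
#   datasets       = ["tank/a", "tank/a/b", "tank/z"]              ->  None
#     "tank/a/c" was pruned from the subtree rooted at "tank/a", so 'zfs snapshot -r' would snapshot too much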

1307 @staticmethod 

1308 def find_root_datasets(sorted_datasets: list[str]) -> list[str]: 

1309 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A 

1310 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets.""" 

1311 root_datasets: list[str] = [] 

1312 skip_dataset: str = DONT_SKIP_DATASET 

1313 for dataset in sorted_datasets: 

1314 if is_descendant(dataset, of_root_dataset=skip_dataset): 

1315 continue 

1316 skip_dataset = dataset 

1317 root_datasets.append(dataset) 

1318 return root_datasets 

1319 
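# Example for find_root_datasets() above:
#   find_root_datasets(["tank/a", "tank/a/b", "tank/a/b/c", "tank/d"])  ->  ["tank/a", "tank/d"]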

1320 def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]: 

1321 """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g. 

1322 bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be 

1323 created with that name because these datasets are due per the schedule: either the 'creation' time of their 

1324 most recent snapshot with that name pattern is now too old, or no such snapshot exists yet. 

1325 

1326 The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple 

1327 but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An 

1328 alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property 

1329 (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of 

1330 the most recent snapshot, for each SnapshotLabel and each dataset. 

1331 """ 

1332 p, log = self.params, self.params.log 

1333 src = p.src 

1334 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config 

1335 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list) 

1336 is_caching: bool = False 

1337 msgs: list[tuple[datetime, str, SnapshotLabel, str]] = [] 

1338 

1339 def create_snapshot_if_latest_is_too_old( 

1340 datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int 

1341 ) -> None: 

1342 """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old.""" 

1343 creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz) 

1344 log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label) 

1345 duration_amount, duration_unit = config.suffix_durations[label.suffix] 

1346 next_event_dt: datetime = round_datetime_up_to_duration_multiple( 

1347 creation_dt + timedelta(microseconds=1), duration_amount, duration_unit, config.anchors 

1348 ) 

1349 msg: str = "" 

1350 if config.current_datetime >= next_event_dt: 

1351 datasets_to_snapshot[label].append(dataset) # mark it as scheduled for snapshot creation 

1352 msg = " has passed" 

1353 msgs.append((next_event_dt, dataset, label, msg)) 

1354 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot' 

1355 cache_file: str = self.cache.last_modified_cache_file(src, dataset, label) 

1356 set_last_modification_time_safe(cache_file, unixtime_in_secs=creation_unixtime, if_more_recent=True) 

1357 

1358 labels: list[SnapshotLabel] = [] 

1359 config_labels: list[SnapshotLabel] = config.snapshot_labels() 

1360 for label in config_labels: 

1361 duration_amount_, duration_unit_ = config.suffix_durations[label.suffix] 

1362 if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due: 

1363 datasets_to_snapshot[label] = sorted_datasets # take snapshot regardless of creation time of existing snaps 

1364 else: 

1365 labels.append(label) 

1366 if len(labels) == 0: 

1367 return datasets_to_snapshot # nothing more TBD 

1368 

1369 # satisfy request from local cache as much as possible 

1370 cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list) 

1371 if is_caching_snapshots(p, src): 

1372 sorted_datasets_todo: list[str] = [] 

1373 for dataset in sorted_datasets: 

1374 cache: SnapshotCache = self.cache 

1375 cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset)) 

1376 if cached_snapshots_changed == 0: 

1377 sorted_datasets_todo.append(dataset) # request cannot be answered from cache 

1378 continue 

1379 if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed: # get that prop "for free" 

1380 cache.invalidate_last_modified_cache_dataset(dataset) 

1381 sorted_datasets_todo.append(dataset) # request cannot be answered from cache 

1382 continue 

1383 creation_unixtimes: list[int] = [] 

1384 for label in labels: 

1385 creation_unixtime: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset, label)) 

1386 if creation_unixtime == 0: 

1387 sorted_datasets_todo.append(dataset) # request cannot be answered from cache 

1388 break 

1389 creation_unixtimes.append(creation_unixtime) 

1390 if len(creation_unixtimes) == len(labels): 

1391 for j, label in enumerate(labels): 

1392 create_snapshot_if_latest_is_too_old( 

1393 cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j] 

1394 ) 

1395 sorted_datasets = sorted_datasets_todo 

1396 

1397 def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None: 

1398 create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs) 

1399 

1400 def on_finish_dataset(dataset: str) -> None: 

1401 if is_caching and not p.dry_run: 

1402 set_last_modification_time_safe( 

1403 self.cache.last_modified_cache_file(src, dataset), 

1404 unixtime_in_secs=self.src_properties[dataset].snapshots_changed, 

1405 if_more_recent=True, 

1406 ) 

1407 

1408 # fall back to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache 

1409 is_caching = is_caching_snapshots(p, src) 

1410 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots( 

1411 src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset 

1412 ) 

1413 for lbl in labels: # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result 

1414 datasets_to_snapshot[lbl].sort() 

1415 if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot): # +take snaps for snapshot-less datasets 

1416 datasets_to_snapshot[lbl] = list( # inputs to merge() are sorted, and outputs are sorted too 

1417 heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots) 

1418 ) 

1419 msgs.sort() 

1420 prefx = "Next scheduled snapshot time: " 

1421 text = "\n".join(f"{prefx}{next_event_dt} for {dataset}@{label}{msg}" for next_event_dt, dataset, label, msg in msgs) 

1422 if len(text) > 0: 

1423 log.info("Next scheduled snapshot times ...\n%s", text) 

1424 # sort to ensure that we take snapshots for dailies before hourlies, and so on 

1425 label_indexes: dict[SnapshotLabel, int] = {label: k for k, label in enumerate(config_labels)} 

1426 datasets_to_snapshot = dict(sorted(datasets_to_snapshot.items(), key=lambda kv: label_indexes[kv[0]])) 

1427 return datasets_to_snapshot 

1428 
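# Hedged sketch, not part of bzfs.py: the "is a snapshot due?" decision made by
# create_snapshot_if_latest_is_too_old() above, specialized to a plain hourly period with a single fixed
# anchor; the real code delegates to round_datetime_up_to_duration_multiple() and honors the configured
# anchors and per-suffix durations.
from datetime import datetime, timedelta, timezone

def _hourly_snapshot_is_due(latest_creation: datetime, now: datetime) -> bool:
    period, anchor = timedelta(hours=1), datetime(2024, 1, 1, tzinfo=timezone.utc)  # made-up anchor
    elapsed = (latest_creation + timedelta(microseconds=1)) - anchor
    next_due = anchor + period * -(elapsed // -period)  # round the elapsed time up to the next period multiple
    return now >= next_due

# _hourly_snapshot_is_due(datetime(2024, 6, 1, 8, 30, tzinfo=timezone.utc),
#                         datetime(2024, 6, 1, 9, 5, tzinfo=timezone.utc))  ->  True (the 9:00 boundary passed)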

1429 def handle_minmax_snapshots( 

1430 self, 

1431 remote: Remote, 

1432 sorted_datasets: list[str], 

1433 labels: list[SnapshotLabel], 

1434 fn_latest: Callable[[int, int, str, str], None], # callback function for latest snapshot 

1435 fn_oldest: Callable[[int, int, str, str], None] | None = None, # callback function for oldest snapshot 

1436 fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None, 

1437 ) -> list[str]: # thread-safe 

1438 """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs 

1439 the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names.""" 

1440 assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted" 

1441 p = self.params 

1442 cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o createtxg,creation,name") # sort dataset,createtxg 

1443 datasets_with_snapshots: set[str] = set() 

1444 interner: SortedInterner[str] = SortedInterner(sorted_datasets) # reduces memory footprint 

1445 for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False): 

1446 # streaming group by dataset name (consumes constant memory only) 

1447 for dataset, group in itertools.groupby(lines, key=lambda line: line[line.rindex("\t") + 1 : line.index("@")]): 

1448 dataset = interner.interned(dataset) 

1449 snapshots = sorted( # fetch all snapshots of current dataset and sort by createtxg,creation,name 

1450 (int(createtxg), int(creation_unixtime_secs), name[name.index("@") + 1 :]) 

1451 for createtxg, creation_unixtime_secs, name in (line.split("\t", 2) for line in group) 

1452 ) 

1453 assert len(snapshots) > 0 

1454 datasets_with_snapshots.add(dataset) 

1455 snapshot_names: list[str] = [snapshot[-1] for snapshot in snapshots] 

1456 year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX 

1457 fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False)) 

1458 for i, label in enumerate(labels): 

1459 infix: str = label.infix 

1460 start: str = label.prefix + infix 

1461 end: str = label.suffix 

1462 startlen: int = len(start) 

1463 endlen: int = len(end) 

1464 minlen: int = startlen + endlen if infix else 4 + startlen + endlen # year_with_four_digits_regex 

1465 year_slice: slice = slice(startlen, startlen + 4) # [startlen:startlen+4] # year_with_four_digits_regex 

1466 for fn, is_reverse in fns: 

1467 creation_unixtime_secs: int = 0 # find creation time of latest or oldest snapshot matching the label 

1468 minmax_snapshot: str = "" 

1469 for j, s in enumerate(reversed(snapshot_names) if is_reverse else snapshot_names): 

1470 if ( 

1471 s.endswith(end) 

1472 and s.startswith(start) 

1473 and len(s) >= minlen 

1474 and (infix or year_with_4_digits_regex.fullmatch(s[year_slice])) 

1475 ): 

1476 k: int = len(snapshots) - j - 1 if is_reverse else j 

1477 creation_unixtime_secs = snapshots[k][1] 

1478 minmax_snapshot = s 

1479 break 

1480 fn(i, creation_unixtime_secs, dataset, minmax_snapshot) 

1481 fn_on_finish_dataset(dataset) 

1482 datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots] 

1483 return datasets_without_snapshots 

1484 
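# Parsing sketch, not part of bzfs.py: how one output line of
# `zfs list -t snapshot -d 1 -Hp -o createtxg,creation,name` is consumed above; the dataset used as the
# streaming groupby key is the text between the last tab and the '@'. The sample line is made up.
line = "123456\t1730882405\ttank/data@bzfs_2024-11-06_08:30:05_hourly"
dataset = line[line.rindex("\t") + 1 : line.index("@")]  # "tank/data"
createtxg, creation, name = line.split("\t", 2)          # ("123456", "1730882405", "tank/data@bzfs_...")
snapshot_name = name[name.index("@") + 1 :]              # "bzfs_2024-11-06_08:30:05_hourly"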

1485 

1486############################################################################# 

1487class DatasetProperties: 

1488 """Properties of a ZFS dataset.""" 

1489 

1490 __slots__ = ("recordsize", "snapshots_changed") # uses more compact memory layout than __dict__ 

1491 

1492 def __init__(self, recordsize: int, snapshots_changed: int) -> None: 

1493 # immutable variables: 

1494 self.recordsize: int = recordsize 

1495 

1496 # mutable variables: 

1497 self.snapshots_changed: int = snapshots_changed 

1498 

1499 

1500############################################################################# 

1501def fix_solaris_raw_mode(lst: list[str]) -> list[str]: 

1502 """Adjusts zfs send options for Solaris compatibility.""" 

1503 lst = ["-w" if opt == "--raw" else opt for opt in lst] 

1504 lst = ["compress" if opt == "--compressed" else opt for opt in lst] 

1505 i = lst.index("-w") if "-w" in lst else -1 

1506 if i >= 0: 

1507 i += 1 

1508 if i == len(lst) or (lst[i] != "none" and lst[i] != "compress" and lst[i] != "crypto"): 

1509 lst.insert(i, "none") 

1510 return lst 

1511 
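# Examples for fix_solaris_raw_mode() above:
#   fix_solaris_raw_mode(["--raw"])                  ->  ["-w", "none"]
#   fix_solaris_raw_mode(["--raw", "--compressed"])  ->  ["-w", "compress"]
#   fix_solaris_raw_mode(["--compressed"])           ->  ["compress"]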

1512 

1513def has_siblings(sorted_datasets: list[str]) -> bool: 

1514 """Returns whether the (sorted) list of input datasets contains any siblings.""" 

1515 skip_dataset: str = DONT_SKIP_DATASET 

1516 parents: set[str] = set() 

1517 for dataset in sorted_datasets: 

1518 assert dataset 

1519 parent = os.path.dirname(dataset) 

1520 if parent in parents: 

1521 return True # I have a sibling if my parent already has another child 

1522 parents.add(parent) 

1523 if is_descendant(dataset, of_root_dataset=skip_dataset): 

1524 continue 

1525 if skip_dataset != DONT_SKIP_DATASET: 

1526 return True # I have a sibling if I am a root dataset and another root dataset already exists 

1527 skip_dataset = dataset 

1528 return False 

1529 
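# Examples for has_siblings() above (the input list must be sorted; dataset names are made up):
#   has_siblings(["tank", "tank/a", "tank/a/b"])  ->  False  (a single chain, no siblings)
#   has_siblings(["tank/a", "tank/b"])            ->  True   (two children of the same parent)
#   has_siblings(["tank1", "tank2/x"])            ->  True   (two separate root datasets within the list)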

1530 

1531def parse_dataset_locator( 

1532 input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None 

1533) -> tuple[str, str, str, str, str]: 

1534 """Splits user@host:pool/dataset into its components with optional checks.""" 

1535 

1536 def convert_ipv6(hostname: str) -> str: # support IPv6 without getting confused by host:dataset colon separator ... 

1537 return hostname.replace("|", ":") # ... and any colons that may be part of a (valid) ZFS dataset name 

1538 

1539 user_undefined: bool = user is None 

1540 if user is None: 

1541 user = "" 

1542 host_undefined: bool = host is None 

1543 if host is None: 

1544 host = "" 

1545 host = convert_ipv6(host) 

1546 user_host, dataset, pool = "", "", "" 

1547 

1548 # Input format is [[user@]host:]dataset 

1549 # 1234 5 6 

1550 if match := re.fullmatch(r"(((([^@]*)@)?([^:]+)):)?(.*)", input_text, re.DOTALL): 1550 ↛ 1567: line 1550 didn't jump to line 1567 because the condition on line 1550 was always true

1551 if user_undefined: 

1552 user = match.group(4) or "" 

1553 if host_undefined: 

1554 host = match.group(5) or "" 

1555 host = convert_ipv6(host) 

1556 if host == "-": 

1557 host = "" 

1558 dataset = match.group(6) or "" 

1559 i = dataset.find("/") 

1560 pool = dataset[0:i] if i >= 0 else dataset 

1561 

1562 if user and host: 

1563 user_host = f"{user}@{host}" 

1564 elif host: 

1565 user_host = host 

1566 

1567 if validate: 

1568 validate_user_name(user, input_text) 

1569 validate_host_name(host, input_text) 

1570 if port is not None: 

1571 validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ") 

1572 validate_dataset_name(dataset, input_text) 

1573 

1574 return user, host, user_host, pool, dataset 

1575 
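# Examples for parse_dataset_locator() above (user, host and dataset names are made up):
#   parse_dataset_locator("alice@host1:tank/data")  ->  ("alice", "host1", "alice@host1", "tank", "tank/data")
#   parse_dataset_locator("tank/data")              ->  ("", "", "", "tank", "tank/data")   # local dataset
#   parse_dataset_locator("-:tank/data")            ->  ("", "", "", "tank", "tank/data")   # '-' means no host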

1576 

1577def validate_user_name(user: str, input_text: str) -> None: 

1578 """Checks that the username is safe for ssh or local usage.""" 

1579 invalid_chars: str = SHELL_CHARS + "/" 

1580 if user and (".." in user or any(c.isspace() or c in invalid_chars for c in user)): 

1581 die(f"Invalid user name: '{user}' for: '{input_text}'") 

1582 

1583 

1584def validate_host_name(host: str, input_text: str, extra_invalid_chars: str = "") -> None: 

1585 """Checks hostname for forbidden characters or patterns.""" 

1586 invalid_chars: str = SHELL_CHARS + "/" + extra_invalid_chars 

1587 if host and (host.startswith("-") or ".." in host or any(c.isspace() or c in invalid_chars for c in host)): 

1588 die(f"Invalid host name: '{host}' for: '{input_text}'") 

1589 

1590 

1591def validate_port(port: str | int, message: str) -> None: 

1592 """Checks that port specification is a valid integer.""" 

1593 if isinstance(port, int): 

1594 port = str(port) 

1595 if port and not port.isdigit(): 

1596 die(message + f"must be empty or a positive integer: '{port}'") 

1597 

1598 

1599############################################################################# 

1600 if __name__ == "__main__": 1600 ↛ 1601: line 1600 didn't jump to line 1601 because the condition on line 1600 was never true

1601 main()