Coverage for bzfs_main/bzfs.py: 99%

990 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata 

16# /// script 

17# requires-python = ">=3.9" 

18# dependencies = [] 

19# /// 

20# 

21""" 

22* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`, 

23 data transfer, and snapshot management between two hosts. 

24* Overview of the bzfs.py codebase: 

25* The codebase starts with docs, definition of input data and associated argument parsing into a "Params" class. 

26* All CLI option/parameter values are reachable from the "Params" class. 

27* Control flow starts in main(), which kicks off a "Job". 

28* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree. 

29* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset(). 

30* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots(). 

31* The --create-src-snapshots-* and --delete-* and --compare-* and --monitor-* algorithms also start in run_task(). 

32* The main retry logic is in run_with_retries() and clear_resumable_recv_state_if_necessary(). 

33* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter. 

34* Executing a CLI command on a local or remote host is in run_ssh_command(). 

35* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool. 

36* Cache functionality can be found by searching for this regex: .*cach.* 

37* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant(). 

38* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh. 

39 Simply run that script whenever you change or add ArgumentParser help text. 

40""" 

41 

42from __future__ import ( 

43 annotations, 

44) 

45import argparse 

46import contextlib 

47import fcntl 

48import heapq 

49import itertools 

50import os 

51import re 

52import subprocess 

53import sys 

54import threading 

55import time 

56from collections import ( 

57 Counter, 

58 defaultdict, 

59) 

60from collections.abc import ( 

61 Collection, 

62) 

63from datetime import ( 

64 datetime, 

65 timedelta, 

66) 

67from logging import ( 

68 Logger, 

69) 

70from pathlib import ( 

71 Path, 

72) 

73from subprocess import ( 

74 CalledProcessError, 

75) 

76from typing import ( 

77 Any, 

78 Callable, 

79 Final, 

80 cast, 

81) 

82 

83import bzfs_main.loggers 

84from bzfs_main.argparse_actions import ( 

85 has_timerange_filter, 

86) 

87from bzfs_main.argparse_cli import ( 

88 EXCLUDE_DATASET_REGEXES_DEFAULT, 

89) 

90from bzfs_main.compare_snapshot_lists import ( 

91 run_compare_snapshot_lists, 

92) 

93from bzfs_main.configuration import ( 

94 AlertConfig, 

95 CreateSrcSnapshotConfig, 

96 LogParams, 

97 MonitorSnapshotAlert, 

98 Params, 

99 Remote, 

100 SnapshotLabel, 

101) 

102from bzfs_main.connection import ( 

103 decrement_injection_counter, 

104 maybe_inject_error, 

105 run_ssh_command, 

106 timeout, 

107 try_ssh_command, 

108) 

109from bzfs_main.detect import ( 

110 DISABLE_PRG, 

111 RemoteConfCacheItem, 

112 are_bookmarks_enabled, 

113 detect_available_programs, 

114 is_caching_snapshots, 

115 is_dummy, 

116 is_zpool_feature_enabled_or_active, 

117) 

118from bzfs_main.filter import ( 

119 SNAPSHOT_REGEX_FILTER_NAME, 

120 dataset_regexes, 

121 filter_datasets, 

122 filter_lines, 

123 filter_lines_except, 

124 filter_snapshots, 

125) 

126from bzfs_main.loggers import ( 

127 get_simple_logger, 

128 reset_logger, 

129 set_logging_runtime_defaults, 

130) 

131from bzfs_main.parallel_batch_cmd import ( 

132 run_ssh_cmd_parallel, 

133 zfs_list_snapshots_in_parallel, 

134) 

135from bzfs_main.parallel_iterator import ( 

136 run_in_parallel, 

137) 

138from bzfs_main.parallel_tasktree_policy import ( 

139 process_datasets_in_parallel_and_fault_tolerant, 

140) 

141from bzfs_main.progress_reporter import ( 

142 ProgressReporter, 

143 count_num_bytes_transferred_by_zfs_send, 

144) 

145from bzfs_main.replication import ( 

146 delete_bookmarks, 

147 delete_datasets, 

148 delete_snapshots, 

149 replicate_dataset, 

150) 

151from bzfs_main.retry import ( 

152 Retry, 

153 RetryableError, 

154) 

155from bzfs_main.snapshot_cache import ( 

156 MONITOR_CACHE_FILE_PREFIX, 

157 REPLICATION_CACHE_FILE_PREFIX, 

158 SnapshotCache, 

159 set_last_modification_time_safe, 

160) 

161from bzfs_main.utils import ( 

162 DESCENDANTS_RE_SUFFIX, 

163 DIE_STATUS, 

164 DONT_SKIP_DATASET, 

165 FILE_PERMISSIONS, 

166 LOG_DEBUG, 

167 LOG_TRACE, 

168 PROG_NAME, 

169 SHELL_CHARS, 

170 UMASK, 

171 YEAR_WITH_FOUR_DIGITS_REGEX, 

172 Interner, 

173 SortedInterner, 

174 Subprocesses, 

175 SynchronizedBool, 

176 SynchronizedDict, 

177 append_if_absent, 

178 compile_regexes, 

179 cut, 

180 die, 

181 has_duplicates, 

182 human_readable_bytes, 

183 human_readable_duration, 

184 is_descendant, 

185 percent, 

186 pretty_print_formatter, 

187 replace_in_lines, 

188 replace_prefix, 

189 sha256_85_urlsafe_base64, 

190 sha256_128_urlsafe_base64, 

191 termination_signal_handler, 

192 validate_dataset_name, 

193 validate_property_name, 

194 xappend, 

195 xfinally, 

196) 

197 

198# constants: 

199__version__: Final[str] = bzfs_main.argparse_cli.__version__ 

200CRITICAL_STATUS: Final[int] = 2 

201WARNING_STATUS: Final[int] = 1 

202STILL_RUNNING_STATUS: Final[int] = 4 

203MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 9) 

204if sys.version_info < MIN_PYTHON_VERSION: 

205 print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!") 

206 sys.exit(DIE_STATUS) 

207CREATE_SRC_SNAPSHOTS_PREFIX_DFLT: Final[str] = PROG_NAME + "_" 

208CREATE_SRC_SNAPSHOTS_SUFFIX_DFLT: Final[str] = "_adhoc" 

209MATURITY_TIME_THRESHOLD_SECS: Final[float] = 1.1 # 1 sec ZFS creation time resolution + NTP clock skew is typically < 10ms 

210 

211 

212############################################################################# 

213def argument_parser() -> argparse.ArgumentParser: 

214 """Returns the CLI parser used by bzfs.""" 

215 return bzfs_main.argparse_cli.argument_parser() 

216 

217 

218def main() -> None: 

219 """API for command line clients.""" 

220 prev_umask: int = os.umask(UMASK) 

221 try: 

222 set_logging_runtime_defaults() 

223 # On CTRL-C and SIGTERM, send signal to all descendant processes to terminate them 

224 termination_event: threading.Event = threading.Event() 

225 with termination_signal_handler(termination_event=termination_event): 

226 run_main(argument_parser().parse_args(), sys.argv, termination_event=termination_event) 

227 except subprocess.CalledProcessError as e: 

228 ret: int = e.returncode 

229 ret = DIE_STATUS if isinstance(ret, int) and 1 <= ret <= STILL_RUNNING_STATUS else ret 

230 sys.exit(ret) 

231 finally: 

232 os.umask(prev_umask) # restore prior global state 

233 

234 

235def run_main( 

236 args: argparse.Namespace, 

237 sys_argv: list[str] | None = None, 

238 log: Logger | None = None, 

239 termination_event: threading.Event | None = None, 

240) -> None: 

241 """API for Python clients; visible for testing; may become a public API eventually.""" 

242 Job(termination_event=termination_event).run_main(args, sys_argv, log) 

243 

244 

245############################################################################# 

246class Job: 

247 """Executes one bzfs run, coordinating snapshot replication tasks.""" 

248 

249 def __init__(self, termination_event: threading.Event | None = None) -> None: 

250 self.params: Params 

251 self.termination_event: Final[threading.Event] = termination_event or threading.Event() 

252 self.subprocesses: Final[Subprocesses] = Subprocesses() 

253 self.all_dst_dataset_exists: Final[dict[str, dict[str, bool]]] = defaultdict(lambda: defaultdict(bool)) 

254 self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({}) 

255 self.src_properties: dict[str, DatasetProperties] = {} 

256 self.dst_properties: dict[str, DatasetProperties] = {} 

257 self.all_exceptions: list[str] = [] 

258 self.all_exceptions_count: int = 0 

259 self.max_exceptions_to_summarize: int = 10000 

260 self.first_exception: BaseException | None = None 

261 self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {} 

262 self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {} 

263 self.max_workers: dict[str, int] = {} 

264 self.control_persist_margin_secs: int = 2 

265 self.progress_reporter: ProgressReporter = cast(ProgressReporter, None) 

266 self.is_first_replication_task: Final[SynchronizedBool] = SynchronizedBool(True) 

267 self.replication_start_time_nanos: int = time.monotonic_ns() 

268 self.timeout_nanos: int | None = None 

269 self.cache: SnapshotCache = SnapshotCache(self) 

270 self.stats_lock: Final[threading.Lock] = threading.Lock() 

271 self.num_cache_hits: int = 0 

272 self.num_cache_misses: int = 0 

273 self.num_snapshots_found: int = 0 

274 self.num_snapshots_replicated: int = 0 

275 

276 self.is_test_mode: bool = False # for testing only 

277 self.creation_prefix: str = "" # for testing only 

278 self.isatty: bool | None = None # for testing only 

279 self.use_select: bool = False # for testing only 

280 self.progress_update_intervals: tuple[float, float] | None = None # for testing only 

281 self.error_injection_triggers: dict[str, Counter[str]] = {} # for testing only 

282 self.delete_injection_triggers: dict[str, Counter[str]] = {} # for testing only 

283 self.param_injection_triggers: dict[str, dict[str, bool]] = {} # for testing only 

284 self.inject_params: dict[str, bool] = {} # for testing only 

285 self.injection_lock: threading.Lock = threading.Lock() # for testing only 

286 self.max_command_line_bytes: int | None = None # for testing only 

287 

288 def shutdown(self) -> None: 

289 """Exits any multiplexed ssh sessions that may be leftover.""" 

290 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values() 

291 for i, cache_item in enumerate(cache_items): 

292 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}") 

293 

294 def terminate(self) -> None: 

295 """Shuts down gracefully; also terminates descendant processes, if any.""" 

296 with xfinally(self.subprocesses.terminate_process_subtrees): 

297 self.shutdown() 

298 

299 def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None: 

300 """Parses CLI arguments, sets up logging, and executes main job loop.""" 

301 assert isinstance(self.error_injection_triggers, dict) 

302 assert isinstance(self.delete_injection_triggers, dict) 

303 assert isinstance(self.inject_params, dict) 

304 logger_name_suffix: str = "" 

305 

306 def _reset_logger() -> None: 

307 if logger_name_suffix and log is not None: # reset Logger unless it's a Logger outside of our control 307 ↛ exitline 307 didn't return from function '_reset_logger' because the condition on line 307 was always true

308 reset_logger(log) 

309 

310 with xfinally(_reset_logger): # runs _reset_logger() on exit, without masking error raised in body of `with` block 

311 try: 

312 log_params: LogParams = LogParams(args) 

313 logger_name_suffix = "" if log is not None else log_params.logger_name_suffix 

314 log = bzfs_main.loggers.get_logger( 

315 log_params=log_params, args=args, log=log, logger_name_suffix=logger_name_suffix 

316 ) 

317 log.info("%s", f"Log file is: {log_params.log_file}") 

318 except BaseException as e: 

319 simple_log: Logger = get_simple_logger(PROG_NAME, logger_name_suffix=logger_name_suffix) 

320 try: 

321 simple_log.error("Log init: %s", e, exc_info=not isinstance(e, SystemExit)) 

322 finally: 

323 reset_logger(simple_log) 

324 raise 

325 

326 aux_args: list[str] = [] 

327 if getattr(args, "include_snapshot_plan", None): 

328 aux_args += args.include_snapshot_plan 

329 if getattr(args, "delete_dst_snapshots_except_plan", None): 

330 aux_args += args.delete_dst_snapshots_except_plan 

331 if len(aux_args) > 0: 

332 log.info("Auxiliary CLI arguments: %s", " ".join(aux_args)) 

333 args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args) 

334 

335 def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None: 

336 log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info) 

337 

338 try: 

339 log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[uid: {os.getuid()}, euid: {os.geteuid()}]") 

340 if self.is_test_mode: 

341 log.log(LOG_TRACE, "Parsed CLI arguments: %s", args) 

342 self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params) 

343 log_params.params = p 

344 lock_file: str = p.lock_file_name() 

345 lock_fd = os.open( 

346 lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, FILE_PERMISSIONS 

347 ) 

348 with xfinally(lambda: os.close(lock_fd)): 

349 try: 

350 # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or 

351 # another process. The (advisory) lock is auto-released when the process terminates or the fd is 

352 # closed. 

353 fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # LOCK_NB ... non-blocking 

354 except BlockingIOError: 

355 msg = "Exiting as same previous periodic job is still running without completion yet per " 

356 die(msg + lock_file, STILL_RUNNING_STATUS) 

357 

358 # xfinally: unlink the lock_file while still holding the flock on its fd - it's a correct and safe 

359 # standard POSIX pattern: 

360 # - Performing unlink() before close(fd) avoids a race where a subsequent bzfs process could recreate and 

361 # lock a fresh inode for the same path between our close() and a later unlink(). In that case, a late 

362 # unlink would delete the newer process's lock_file path. 

363 # - At this point, critical work is complete; the remaining steps are shutdown mechanics that have no 

364 # side effect, so this pattern is correct, safe, and simple. 

365 with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)): # don't accumulate stale files 

366 try: 

367 self.run_tasks() # do the real work 

368 except BaseException: 

369 self.terminate() 

370 raise 

371 self.shutdown() 

372 with contextlib.suppress(BrokenPipeError): 

373 sys.stderr.flush() 

374 except subprocess.CalledProcessError as e: 

375 log_error_on_exit(e, e.returncode) 

376 raise 

377 except SystemExit as e: 

378 log_error_on_exit(e, e.code) 

379 raise 

380 except (subprocess.TimeoutExpired, UnicodeDecodeError) as e: 

381 log_error_on_exit(e, DIE_STATUS) 

382 raise SystemExit(DIE_STATUS) from e 

383 except re.error as e: 

384 log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS) 

385 raise SystemExit(DIE_STATUS) from e 

386 except BaseException as e: 

387 log_error_on_exit(e, DIE_STATUS, exc_info=True) 

388 raise SystemExit(DIE_STATUS) from e 

389 finally: 

390 log.info("%s", f"Log file was: {log_params.log_file}") 

391 log.info("Success. Goodbye!") 

392 with contextlib.suppress(BrokenPipeError): 

393 sys.stderr.flush() 

394 

395 def run_tasks(self) -> None: 

396 """Executes replication cycles, repeating until daemon lifetime expires.""" 

397 p, log = self.params, self.params.log 

398 self.all_exceptions = [] 

399 self.all_exceptions_count = 0 

400 self.first_exception = None 

401 self.remote_conf_cache = {} 

402 self.isatty = self.isatty if self.isatty is not None else p.isatty 

403 self.validate_once() 

404 self.replication_start_time_nanos = time.monotonic_ns() 

405 self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals) 

406 with xfinally(lambda: self.progress_reporter.stop()): 

407 daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos 

408 while True: # loop for daemon mode 

409 self.timeout_nanos = None if p.timeout_nanos is None else time.monotonic_ns() + p.timeout_nanos 

410 self.all_dst_dataset_exists.clear() 

411 self.progress_reporter.reset() 

412 src, dst = p.src, p.dst 

413 for src_root_dataset, dst_root_dataset in p.root_dataset_pairs: 

414 if self.termination_event.is_set(): 

415 self.terminate() 

416 break 

417 src.root_dataset = src.basis_root_dataset = src_root_dataset 

418 dst.root_dataset = dst.basis_root_dataset = dst_root_dataset 

419 p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy() 

420 if p.daemon_lifetime_nanos > 0: 

421 self.timeout_nanos = None if p.timeout_nanos is None else time.monotonic_ns() + p.timeout_nanos 

422 recurs_sep = " " if p.recursive_flag else "" 

423 task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}" 

424 if len(p.root_dataset_pairs) > 1: 

425 log.info("Starting task: %s", task_description + " ...") 

426 try: 

427 try: 

428 maybe_inject_error(self, cmd=[], error_trigger="retryable_run_tasks") 

429 timeout(self) 

430 self.validate_task() 

431 self.run_task() # do the real work 

432 except RetryableError as retryable_error: 

433 cause: BaseException | None = retryable_error.__cause__ 

434 assert cause is not None 

435 raise cause.with_traceback(cause.__traceback__) # noqa: B904 re-raise of cause without chaining 

436 except (CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e: 

437 if p.skip_on_error == "fail" or ( 

438 isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0 

439 ): 

440 raise 

441 log.error("%s", e) 

442 self.append_exception(e, "task", task_description) 

443 if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos): 

444 break 

445 if not p.skip_replication: 

446 self.print_replication_stats(self.replication_start_time_nanos) 

447 error_count = self.all_exceptions_count 

448 if error_count > 0 and p.daemon_lifetime_nanos == 0: 

449 msgs = "\n".join([f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions)]) 

450 log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}") 

451 assert self.first_exception is not None 

452 raise self.first_exception 

453 

454 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None: 

455 """Records and logs an exception that was encountered while running a subtask.""" 

456 self.first_exception = self.first_exception or e 

457 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption 

458 self.all_exceptions.append(str(e)) 

459 self.all_exceptions_count += 1 

460 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description) 

461 

462 def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool: 

463 """Pauses until next scheduled snapshot time or daemon stop; Returns True to continue daemon loop; False to stop.""" 

464 sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns() 

465 if sleep_nanos <= 0: 

466 return False 

467 self.progress_reporter.pause() 

468 p, log = self.params, self.params.log 

469 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config 

470 curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1) 

471 next_snapshotting_event_dt: datetime = min( 

472 ( 

473 config.anchors.round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit) 

474 for duration_amount, duration_unit in config.suffix_durations.values() 

475 ), 

476 default=curr_datetime + timedelta(days=10 * 365), # infinity 

477 ) 

478 offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz) 

479 offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000 

480 sleep_nanos = min(sleep_nanos, max(0, offset_nanos)) 

481 log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]") 

482 self.termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination 

483 config.current_datetime = datetime.now(config.tz) 

484 return time.monotonic_ns() < daemon_stoptime_nanos and not self.termination_event.is_set() 

485 

486 def print_replication_stats(self, start_time_nanos: int) -> None: 

487 """Logs overall replication statistics after a job cycle completes.""" 

488 p, log = self.params, self.params.log 

489 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

490 msg = p.dry(f"zfs sent {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.") 

491 if p.is_program_available("pv", "local"): 

492 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file) 

493 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / (elapsed_nanos or 1)) 

494 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]." 

495 log.info("%s", msg.ljust(p.terminal_columns - len("2024-01-01 23:58:45 [I] "))) 

496 

497 def validate_once(self) -> None: 

498 """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times.""" 

499 p = self.params 

500 p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts) 

501 for snapshot_filter in p.snapshot_filters: 

502 for _filter in snapshot_filter: 

503 if _filter.name == SNAPSHOT_REGEX_FILTER_NAME: 

504 exclude_snapshot_regexes = compile_regexes(_filter.options[0]) 

505 include_snapshot_regexes = compile_regexes(_filter.options[1] or [".*"]) 

506 _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes) 

507 

508 exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT] 

509 if len(p.args.exclude_dataset_regex) > 0: # some patterns don't exclude anything 

510 exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"] 

511 include_regexes: list[str] = p.args.include_dataset_regex 

512 

513 # relative datasets need not be compiled more than once as they don't change between tasks 

514 def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]: 

515 abs_datasets: list[str] = [] 

516 rel_datasets: list[str] = [] 

517 for dataset in datasets: 

518 (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset) 

519 return abs_datasets, rel_datasets 

520 

521 p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset) 

522 p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset) 

523 suffix = DESCENDANTS_RE_SUFFIX 

524 p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = ( 

525 compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix), 

526 compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix), 

527 ) 

528 

529 if p.pv_program != DISABLE_PRG: 

530 pv_program_opts_set = set(p.pv_program_opts) 

531 if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}): 

532 die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.") 

533 if self.isatty and not p.quiet: 

534 for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]: 

535 if pv_program_opts_set.isdisjoint(opts): 

536 die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.") 

537 

538 src, dst = p.src, p.dst 

539 for remote in [src, dst]: 

540 r, loc = remote, remote.location 

541 validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user") 

542 validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host") 

543 validate_port(r.ssh_port, f"--ssh-{loc}-port ") 

544 

545 def validate_task(self) -> None: 

546 """Validates a single replication task before execution.""" 

547 p, log = self.params, self.params.log 

548 src, dst = p.src, p.dst 

549 for remote in [src, dst]: 

550 r = remote 

551 r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator( 

552 r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port 

553 ) 

554 r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user) 

555 local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1") # ::1 is IPv6 version of loopback address 

556 remote.is_nonlocal = r.ssh_host not in local_addrs 

557 self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host]) 

558 

559 if src.ssh_host == dst.ssh_host: 

560 msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}" 

561 if src.root_dataset == dst.root_dataset: 

562 die(f"Source and destination dataset must not be the same! {msg}") 

563 if p.recursive and ( 

564 is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset) 

565 or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset) 

566 ): 

567 die(f"Source and destination dataset trees must not overlap! {msg}") 

568 

569 suffx = DESCENDANTS_RE_SUFFIX # also match descendants of a matching dataset 

570 p.exclude_dataset_regexes, p.include_dataset_regexes = ( 

571 p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx), 

572 p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx), 

573 ) 

574 if len(p.include_dataset_regexes) == 0: 

575 p.include_dataset_regexes = [(re.compile(r".*"), False)] 

576 

577 detect_available_programs(self) 

578 

579 zfs_send_program_opts: list[str] = p.curr_zfs_send_program_opts 

580 if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"): 

581 append_if_absent(zfs_send_program_opts, "--large-block") 

582 p.curr_zfs_send_program_opts = zfs_send_program_opts 

583 

584 self.max_workers = {} 

585 self.max_datasets_per_minibatch_on_list_snaps = {} 

586 for r in [src, dst]: 

587 cpus = int(p.available_programs[r.location].get("getconf_cpu_count", 8)) 

588 threads, is_percent = p.threads 

589 cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads)) 

590 self.max_workers[r.location] = cpus 

591 bs = max(1, p.max_datasets_per_batch_on_list_snaps) # 1024 by default 

592 max_datasets_per_minibatch = p.max_datasets_per_minibatch_on_list_snaps 

593 if max_datasets_per_minibatch <= 0: 

594 max_datasets_per_minibatch = max(1, bs // cpus) 

595 max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch) 

596 self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch 

597 log.log( 

598 LOG_TRACE, 

599 "%s", 

600 f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, " 

601 f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, " 

602 f"max_workers: {self.max_workers[r.location]}, " 

603 f"location: {r.location}", 

604 ) 

605 if self.is_test_mode: 

606 log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params)) 

607 

608 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]: 

609 """Returns sudo command prefix and whether root privileges are required.""" 

610 p: Params = self.params 

611 assert isinstance(ssh_user_host, str) 

612 assert isinstance(ssh_user, str) 

613 assert isinstance(p.sudo_program, str) 

614 assert isinstance(p.enable_privilege_elevation, bool) 

615 

616 is_root: bool = True 

617 if ssh_user_host != "": 

618 if ssh_user == "": 

619 if os.getuid() != 0: 

620 is_root = False 

621 elif ssh_user != "root": 

622 is_root = False 

623 elif os.getuid() != 0: 

624 is_root = False 

625 

626 if is_root: 

627 sudo = "" # using sudo in an attempt to make ZFS operations work even if we are not root user? 

628 use_zfs_delegation = False # or instead using 'zfs allow' delegation? 

629 return sudo, use_zfs_delegation 

630 elif p.enable_privilege_elevation: 

631 if p.sudo_program == DISABLE_PRG: 

632 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}") 

633 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any 

634 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit. 

635 return p.sudo_program + " -n", False 

636 else: 

637 return "", True 

638 

639 def run_task(self) -> None: 

640 """Replicates all snapshots for the current root dataset pair.""" 

641 

642 def filter_src_datasets() -> list[str]: # apply --{include|exclude}-dataset policy 

643 return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets 

644 

645 p, log = self.params, self.params.log 

646 src, dst = p.src, p.dst 

647 max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location]) 

648 recursive_sep: str = " " if p.recursive_flag else "" 

649 task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..." 

650 failed: bool = False 

651 src_datasets: list[str] | None = None 

652 basis_src_datasets: list[str] = [] 

653 self.src_properties = {} 

654 self.dst_properties = {} 

655 if not is_dummy(src): # find src dataset or all datasets in src dataset tree (with --recursive) 

656 basis_src_datasets = self.list_src_datasets_task() 

657 

658 if not p.create_src_snapshots_config.skip_create_src_snapshots: 

659 log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...") 

660 src_datasets = filter_src_datasets() # apply include/exclude policy 

661 self.create_src_snapshots_task(basis_src_datasets, src_datasets) 

662 

663 # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset 

664 if not p.skip_replication: 

665 if len(basis_src_datasets) == 0: 

666 die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}") 

667 if is_dummy(dst): 

668 die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!") 

669 src_datasets = filter_src_datasets() # apply include/exclude policy 

670 failed = self.replicate_datasets(src_datasets, task_description, max_workers) 

671 

672 if failed or not ( 

673 p.delete_dst_datasets 

674 or p.delete_dst_snapshots 

675 or p.delete_empty_dst_datasets 

676 or p.compare_snapshot_lists 

677 or p.monitor_snapshots_config.enable_monitor_snapshots 

678 ): 

679 return 

680 log.info("Listing dst datasets: %s", task_description) 

681 if is_dummy(dst): 

682 die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!") 

683 basis_dst_datasets: list[str] = self.list_dst_datasets_task() 

684 dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets) # apply include/exclude policy 

685 

686 if p.delete_dst_datasets and not failed: 

687 log.info(p.dry("--delete-dst-datasets: %s"), task_description) 

688 basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task( 

689 basis_src_datasets, basis_dst_datasets, dst_datasets 

690 ) 

691 

692 if p.delete_dst_snapshots and not failed: 

693 log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]") 

694 failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description) 

695 

696 if p.delete_empty_dst_datasets and p.recursive and not failed: 

697 log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description) 

698 basis_dst_datasets, dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets) 

699 

700 if p.compare_snapshot_lists and not failed: 

701 log.info("--compare-snapshot-lists: %s", task_description) 

702 if len(basis_src_datasets) == 0 and not is_dummy(src): 

703 die(f"Source dataset does not exist: {src.basis_root_dataset}") 

704 src_datasets = filter_src_datasets() # apply include/exclude policy 

705 run_compare_snapshot_lists(self, src_datasets, dst_datasets) 

706 

707 if p.monitor_snapshots_config.enable_monitor_snapshots and not failed: 

708 log.info("--monitor-snapshots: %s", task_description) 

709 src_datasets = filter_src_datasets() # apply include/exclude policy 

710 self.monitor_snapshots_task(src_datasets, dst_datasets, task_description) 

711 

712 def list_src_datasets_task(self) -> list[str]: 

713 """Lists datasets on the source host.""" 

714 p = self.params 

715 src = p.src 

716 basis_src_datasets: list[str] = [] 

717 is_caching: bool = is_caching_snapshots(p, src) 

718 props: str = "volblocksize,recordsize,name" 

719 props = "snapshots_changed," + props if is_caching else props 

720 cmd: list[str] = p.split_args( 

721 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset 

722 ) 

723 for line in (try_ssh_command(self, src, LOG_DEBUG, cmd=cmd) or "").splitlines(): 

724 cols: list[str] = line.split("\t") 

725 snapshots_changed, volblocksize, recordsize, src_dataset = cols if is_caching else ["-"] + cols 

726 self.src_properties[src_dataset] = DatasetProperties( 

727 recordsize=int(recordsize) if recordsize != "-" else -int(volblocksize), 

728 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0, 

729 ) 

730 basis_src_datasets.append(src_dataset) 

731 assert (not self.is_test_mode) or basis_src_datasets == sorted(basis_src_datasets), "List is not sorted" 

732 return basis_src_datasets 

733 

734 def list_dst_datasets_task(self) -> list[str]: 

735 """Lists datasets on the destination host.""" 

736 p, log = self.params, self.params.log 

737 dst = p.dst 

738 is_caching: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots 

739 props: str = "name" 

740 props = "snapshots_changed," + props if is_caching else props 

741 cmd: list[str] = p.split_args( 

742 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset 

743 ) 

744 basis_dst_datasets: list[str] = [] 

745 basis_dst_datasets_str: str | None = try_ssh_command(self, dst, LOG_TRACE, cmd=cmd) 

746 if basis_dst_datasets_str is None: 

747 log.warning("Destination dataset does not exist: %s", dst.root_dataset) 

748 else: 

749 for line in basis_dst_datasets_str.splitlines(): 

750 cols: list[str] = line.split("\t") 

751 snapshots_changed, dst_dataset = cols if is_caching else ["-"] + cols 

752 self.dst_properties[dst_dataset] = DatasetProperties( 

753 recordsize=0, 

754 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0, 

755 ) 

756 basis_dst_datasets.append(dst_dataset) 

757 assert (not self.is_test_mode) or basis_dst_datasets == sorted(basis_dst_datasets), "List is not sorted" 

758 return basis_dst_datasets 

759 

760 def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None: 

761 """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements 

762 --create-src-snapshots. 

763 

764 The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line, 

765 using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the 

766 --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs 

767 snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical 

768 'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such 

769 guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple 

770 command line invocations, respecting the limits that the operating system places on the maximum length of a single 

771 command line, per `getconf ARG_MAX`. 

772 

773 Time complexity is O(max(N log N, M log M)) where N is the number of datasets and M is the number of snapshots per 

774 dataset. Space complexity is O(max(N, M)). 

775 """ 

776 p, log = self.params, self.params.log 

777 src = p.src 

778 if len(basis_src_datasets) == 0: 

779 die(f"Source dataset does not exist: {src.basis_root_dataset}") 

780 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets) 

781 datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0} 

782 basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy() # shallow copy 

783 commands: dict[SnapshotLabel, list[str]] = {} 

784 for label, datasets in datasets_to_snapshot.items(): 

785 cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot") 

786 if p.recursive: 

787 # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor 

788 root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets) 

789 if root_datasets is not None: 

790 cmd.append("-r") # recursive; takes a snapshot of all datasets in the subtree(s) 

791 datasets_to_snapshot[label] = root_datasets 

792 commands[label] = cmd 

793 creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots" 

794 log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...") 

795 # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle 

796 run_ssh_cmd_parallel( 

797 self, 

798 src, 

799 ((commands[lbl], (f"{ds}@{lbl}" for ds in datasets)) for lbl, datasets in datasets_to_snapshot.items()), 

800 fn=lambda cmd, batch: run_ssh_command(self, src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch), 

801 max_batch_items=2**29, 

802 ) 

803 if is_caching_snapshots(p, src): 

804 # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls 

805 self.cache.update_last_modified_cache(basis_datasets_to_snapshot) 

806 

807 def delete_destination_snapshots_task( 

808 self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str 

809 ) -> bool: 

810 """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the 

811 --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset* 

812 policy; implements --delete-dst-snapshots.""" 

813 p, log = self.params, self.params.log 

814 src, dst = p.src, p.dst 

815 kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot" 

816 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters) 

817 props: str = self.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name" 

818 basis_src_datasets_set: set[str] = set(basis_src_datasets) 

819 num_snapshots_found, num_snapshots_deleted = 0, 0 

820 

821 def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool: # thread-safe 

822 src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset) 

823 if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks): 

824 src_kind: str = kind 

825 if not p.delete_dst_snapshots_no_crosscheck: 

826 src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot" 

827 src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset) 

828 else: 

829 src_cmd = None 

830 dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset) 

831 self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots") 

832 src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel 

833 lambda: set(run_ssh_command(self, src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []), 

834 lambda: try_ssh_command(self, dst, LOG_TRACE, cmd=dst_cmd), 

835 ) 

836 if dst_snaps_with_guids_str is None: 

837 log.warning("Third party deleted destination: %s", dst_dataset) 

838 return False 

839 dst_snaps_with_guids: list[str] = dst_snaps_with_guids_str.splitlines() 

840 num_dst_snaps_with_guids = len(dst_snaps_with_guids) 

841 basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy() 

842 if p.delete_dst_bookmarks: 

843 replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1) # treat bookmarks as snapshots 

844 # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots(). 

845 # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep* 

846 all_except: bool = p.delete_dst_snapshots_except 

847 if p.delete_dst_snapshots_except and not is_dummy(src): 

848 # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what 

849 # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep" 

850 # list, we later further refine by checking what's on the source dataset. 

851 all_except = False 

852 dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except) 

853 if p.delete_dst_bookmarks: 

854 replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1) # restore pre-filtering bookmark state 

855 if filter_needs_creation_time: 

856 dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids) 

857 basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids) 

858 if p.delete_dst_snapshots_except and not is_dummy(src): # Non-dummy Source + "Except" (Keep) Mode 

859 # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka 

860 # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset. 

861 # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP. 

862 # We only actually keep them if they are ALSO on the SRC. 

863 # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`) 

864 # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`. 

865 except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids) 

866 dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids) 

867 else: # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode 

868 # In standard delete mode: 

869 # `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST. 

870 # We delete those that are NOT on SRC. 

871 # `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`. 

872 # In dummy source + "except" (keep) mode: 

873 # `all_except` was True. 

874 # `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete. 

875 # `src_snaps_with_guids` is empty. 

876 # `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`. 

877 dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids) 

878 dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete) 

879 separator: str = "#" if p.delete_dst_bookmarks else "@" 

880 dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete) 

881 if p.delete_dst_bookmarks: 

882 delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete) 

883 else: 

884 delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete) 

885 with self.stats_lock: 

886 nonlocal num_snapshots_found 

887 num_snapshots_found += num_dst_snaps_with_guids 

888 nonlocal num_snapshots_deleted 

889 num_snapshots_deleted += len(dst_tags_to_delete) 

890 if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks: 

891 self.dst_properties[dst_dataset].snapshots_changed = 0 # invalidate cache 

892 return True 

893 

894 # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec 

895 failed: bool = False 

896 if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks: 

897 start_time_nanos = time.monotonic_ns() 

898 failed = process_datasets_in_parallel_and_fault_tolerant( 

899 log=log, 

900 datasets=dst_datasets, 

901 process_dataset=delete_destination_snapshots, # lambda 

902 skip_tree_on_error=lambda dataset: False, 

903 skip_on_error=p.skip_on_error, 

904 max_workers=max_workers, 

905 termination_event=self.termination_event, 

906 termination_handler=self.terminate, 

907 enable_barriers=False, 

908 task_name="--delete-dst-snapshots", 

909 append_exception=self.append_exception, 

910 retry_policy=p.retry_policy, 

911 dry_run=p.dry_run, 

912 is_test_mode=self.is_test_mode, 

913 ) 

914 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

915 log.info( 

916 p.dry("--delete-dst-snapshots: %s"), 

917 task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s " 

918 f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]", 

919 ) 

920 return failed 

921 

922 def delete_dst_datasets_task( 

923 self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str] 

924 ) -> tuple[list[str], list[str]]: 

925 """Deletes existing destination datasets that do not exist within the source dataset if they are included via 

926 --{include|exclude}-dataset* policy; implements --delete-dst-datasets. 

927 

928 Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors. 

929 """ 

930 p = self.params 

931 src, dst = p.src, p.dst 

932 children: dict[str, set[str]] = defaultdict(set) 

933 for dst_dataset in basis_dst_datasets: # Compute the direct children of each NON-FILTERED dataset 

934 parent: str = os.path.dirname(dst_dataset) 

935 children[parent].add(dst_dataset) 

936 to_delete: set[str] = set() 

937 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm 

938 if children[dst_dataset].issubset(to_delete): 

939 to_delete.add(dst_dataset) # all children are deletable, thus the dataset itself is deletable too 

940 to_delete = to_delete.difference( 

941 replace_prefix(src_dataset, src.root_dataset, dst.root_dataset) for src_dataset in basis_src_datasets 

942 ) 

943 delete_datasets(self, dst, to_delete) 

944 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(to_delete)) 

945 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(to_delete)) 

946 return basis_dst_datasets, sorted_dst_datasets 

947 

948 def delete_empty_dst_datasets_task( 

949 self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str] 

950 ) -> tuple[list[str], list[str]]: 

951 """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset 

952 do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets. 

953 

954 To do so, we walk the dataset list (conceptually, a tree) depth-first (i.e. sorted descending). If a dst dataset has 

955 zero snapshots and zero bookmarks and all its children are already marked as orphans, then it is itself an orphan, 

956 and we mark it as such. Walking in a reverse sorted way means that we efficiently check for zero snapshots/bookmarks 

957 not just over the direct children but the entire tree. Finally, delete all orphan datasets in an efficient batched 

958 way. 

959 """ 

960 p = self.params 

961 dst = p.dst 

962 

963 # Compute the direct children of each NON-FILTERED dataset. Thus, no non-selected dataset and no ancestor of a 

964 # non-selected dataset will ever be added to the "orphan" set. In other words, this treats non-selected dataset 

965 # subtrees as if they all had snapshots, so non-selected dataset subtrees and their ancestors are guaranteed 

966 # to not get deleted. 

967 children: dict[str, set[str]] = defaultdict(set) 

968 for dst_dataset in basis_dst_datasets: 

969 parent: str = os.path.dirname(dst_dataset) 

970 children[parent].add(dst_dataset) 

971 

972 def compute_orphans(datasets_having_snapshots: set[str]) -> set[str]: 

973 """Returns destination datasets having zero snapshots whose children are all orphans.""" 

974 orphans: set[str] = set() 

975 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm 

976 if (dst_dataset not in datasets_having_snapshots) and children[dst_dataset].issubset(orphans): 

977 orphans.add(dst_dataset) 

978 return orphans 

979 

980 # Compute candidate orphan datasets, which reduces the list of datasets for which we list snapshots via 

981 # 'zfs list -t snapshot ...' from dst_datasets to a subset of dst_datasets, which in turn reduces I/O and improves 

982 # perf. Essentially, this eliminates the I/O to list snapshots for ancestors of excluded datasets. 

983 candidate_orphans: set[str] = compute_orphans(set()) 

984 

985 # Compute destination datasets having more than zero snapshots 

986 dst_datasets_having_snapshots: set[str] = set() 

987 with_bookmarks: bool = p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst) 

988 btype: str = "bookmark,snapshot" if with_bookmarks else "snapshot" 

989 cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name") 

990 for snapshots in zfs_list_snapshots_in_parallel(self, dst, cmd, sorted(candidate_orphans), ordered=False): 

991 if with_bookmarks: 

992 replace_in_lines(snapshots, old="#", new="@", count=1) # treat bookmarks as snapshots 

993 dst_datasets_having_snapshots.update(snap[0 : snap.index("@")] for snap in snapshots) # union 

994 

995 orphans = compute_orphans(dst_datasets_having_snapshots) # compute the real orphans 

996 delete_datasets(self, dst, orphans) # finally, delete the orphan datasets in an efficient way 

997 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(orphans)) 

998 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(orphans)) 

999 return basis_dst_datasets, sorted_dst_datasets 

1000 

1001 def monitor_snapshots_task( 

1002 self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str 

1003 ) -> None: 

1004 """Monitors src and dst snapshots; implements --monitor-snapshots.""" 

1005 p, log = self.params, self.params.log 

1006 src, dst = p.src, p.dst 

1007 num_cache_hits: int = self.num_cache_hits 

1008 num_cache_misses: int = self.num_cache_misses 

1009 start_time_nanos: int = time.monotonic_ns() 

1010 run_in_parallel( 

1011 lambda: self.monitor_snapshots(dst, sorted_dst_datasets), 

1012 lambda: self.monitor_snapshots(src, sorted_src_datasets), 

1013 ) 

1014 elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos) 

1015 num_cache_hits = self.num_cache_hits - num_cache_hits 

1016 num_cache_misses = self.num_cache_misses - num_cache_misses 

1017 if num_cache_hits > 0 or num_cache_misses > 0: 

1018 msg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses) 

1019 else: 

1020 msg = "" 

1021 log.info( 

1022 "--monitor-snapshots done: %s", 

1023 f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]", 

1024 ) 

1025 

1026 def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> None: 

1027 """Checks snapshot freshness and warns or errors out when limits are exceeded. 

1028 

1029 Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name 

1030 pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to check if snapshots 

1031 are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process 

1032 exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively. 

1033 

1034 Time complexity is O(max(N log N, M log M)) where N is the number of datasets and M is the number of snapshots per 

1035 dataset. Space complexity is O(max(N, M)). 

1036 """ 

1037 p, log = self.params, self.params.log 

1038 alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts 

1039 labels: list[SnapshotLabel] = [alert.label for alert in alerts] 

1040 current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000 

1041 is_debug: bool = log.isEnabledFor(LOG_DEBUG) 

1042 if is_caching_snapshots(p, remote): 

1043 props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties 

1044 snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()} 

1045 alerts_hash: str = sha256_128_urlsafe_base64(str(tuple(alerts))) 

1046 label_hashes: dict[SnapshotLabel, str] = { 

1047 label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels 

1048 } 

1049 is_caching: bool = False 

1050 

1051 def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str: 

1052 cache_label: str = os.path.join(MONITOR_CACHE_FILE_PREFIX, alert_cfg.kind[0], label_hashes[label], alerts_hash) 

1053 return self.cache.last_modified_cache_file(r, dataset, cache_label) 

1054 

1055 def alert_msg( 

1056 kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int 

1057 ) -> str: 

1058 assert kind == "Latest" or kind == "Oldest" 

1059 lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}" 

1060 if snapshot_age_millis >= current_unixtime_millis: 

1061 return f"No snapshot exists for {dataset}@{lbl}" 

1062 msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old" 

1063 s = f": @{snapshot}" if snapshot else "" 

1064 if delta_millis == -1: 

1065 return f"{msg}{s}" 

1066 return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}" 

1067 

1068 def check_alert( 

1069 label: SnapshotLabel, alert_cfg: AlertConfig | None, creation_unixtime_secs: int, dataset: str, snapshot: str 

1070 ) -> None: # thread-safe 

1071 if alert_cfg is None: 

1072 return 

1073 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot' 

1074 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0) 

1075 cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg) 

1076 set_last_modification_time_safe( 

1077 cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed), if_more_recent=True 

1078 ) 

1079 warning_millis: int = alert_cfg.warning_millis 

1080 critical_millis: int = alert_cfg.critical_millis 

1081 alert_kind = alert_cfg.kind 

1082 snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000 

1083 m = "--monitor_snapshots: " 

1084 if snapshot_age_millis > critical_millis: 

1085 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis) 

1086 log.critical("%s", msg) 

1087 if not p.monitor_snapshots_config.dont_crit: 

1088 die(msg, exit_code=CRITICAL_STATUS) 

1089 elif snapshot_age_millis > warning_millis: 

1090 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis) 

1091 log.warning("%s", msg) 

1092 if not p.monitor_snapshots_config.dont_warn: 

1093 die(msg, exit_code=WARNING_STATUS) 

1094 elif is_debug: 

1095 msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1) 

1096 log.debug("%s", msg) 

1097 

1098 def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None: 

1099 alert: MonitorSnapshotAlert = alerts[i] 

1100 check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot) 

1101 

1102 def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None: 

1103 alert: MonitorSnapshotAlert = alerts[i] 

1104 check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot) 

1105 

1106 def find_stale_datasets_and_check_alerts() -> list[str]: 

1107 """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply, 

1108 that is, without incurring 'zfs list -t snapshots'. 

1109 

1110 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See 

1111 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed 

1112 """ 

1113 stale_datasets: list[str] = [] 

1114 time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS 

1115 for dataset in sorted_datasets: 

1116 is_stale_dataset: bool = False 

1117 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0) 

1118 for alert in alerts: 

1119 for cfg in (alert.latest, alert.oldest): 

1120 if cfg is None: 

1121 continue 

1122 if ( 

1123 snapshots_changed != 0 

1124 and snapshots_changed < time_threshold 

1125 and ( # always True 

1126 cached_unix_times := self.cache.get_snapshots_changed2( 

1127 monitor_last_modified_cache_file(remote, dataset, alert.label, cfg) 

1128 ) 

1129 ) 

1130 and snapshots_changed == cached_unix_times[1] # cached snapshots_changed aka last modified time 

1131 and snapshots_changed >= cached_unix_times[0] # creation time of minmax snapshot aka access time 

1132 ): # cached state is still valid; emit an alert if the latest/oldest snapshot is too old 

1133 lbl = alert.label 

1134 check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="") 

1135 else: # cached state is no longer valid; fallback to 'zfs list -t snapshot' 

1136 is_stale_dataset = True 

1137 if is_stale_dataset: 

1138 stale_datasets.append(dataset) 

1139 return stale_datasets 

1140 

1141 # satisfy request from local cache as much as possible 

1142 if is_caching_snapshots(p, remote): 

1143 stale_datasets: list[str] = find_stale_datasets_and_check_alerts() 

1144 with self.stats_lock: 

1145 self.num_cache_misses += len(stale_datasets) 

1146 self.num_cache_hits += len(sorted_datasets) - len(stale_datasets) 

1147 else: 

1148 stale_datasets = sorted_datasets 

1149 

1150 # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache 

1151 is_caching = is_caching_snapshots(p, remote) 

1152 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots( 

1153 remote, stale_datasets, labels, fn_latest=alert_latest_snapshot, fn_oldest=alert_oldest_snapshot 

1154 ) 

1155 for dataset in datasets_without_snapshots: 

1156 for i in range(len(alerts)): 

1157 alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="") 

1158 alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="") 

1159 

1160 def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool: 

1161 """Replicates a list of datasets.""" 

1162 assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted" 

1163 p, log = self.params, self.params.log 

1164 src, dst = p.src, p.dst 

1165 self.num_snapshots_found = 0 

1166 self.num_snapshots_replicated = 0 

1167 log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]") 

1168 start_time_nanos: int = time.monotonic_ns() 

1169 

1170 def src2dst(src_dataset: str) -> str: 

1171 return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset) 

1172 

1173 def dst2src(dst_dataset: str) -> str: 

1174 return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset) 

1175 

1176 def find_stale_datasets() -> tuple[list[str], dict[str, str]]: 

1177 """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine 

1178 which datasets can be skipped cheaply, i.e. without incurring 'zfs list -t snapshots'. 

1179 

1180 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See 

1181 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed 

1182 """ 

1183 # First, check which src datasets have changed since the last replication to that destination 

1184 cache_files: dict[str, str] = {} 

1185 stale_src_datasets1: list[str] = [] 

1186 maybe_stale_dst_datasets: list[str] = [] 

1187 userhost_dir: str = sha256_85_urlsafe_base64(p.dst.cache_namespace()) 

1188 filter_key = tuple(tuple(f) for f in p.snapshot_filters) # cache is only valid for same --include/excl-snapshot* 

1189 filter_hash_code: str = sha256_85_urlsafe_base64(str(filter_key)) 

1190 for src_dataset in src_datasets: 

1191 dst_dataset: str = src2dst(src_dataset) # cache is only valid for identical destination dataset 

1192 dst_dataset_dir: str = sha256_85_urlsafe_base64(dst_dataset) 

1193 cache_label: str = os.path.join( 

1194 REPLICATION_CACHE_FILE_PREFIX, userhost_dir, dst_dataset_dir, filter_hash_code 

1195 ) 

1196 cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label) 

1197 cache_files[src_dataset] = cache_file 

1198 snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed # get prop "for free" 

1199 if ( 

1200 snapshots_changed != 0 

1201 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS 

1202 and snapshots_changed == self.cache.get_snapshots_changed(cache_file) 

1203 ): 

1204 maybe_stale_dst_datasets.append(dst_dataset) 

1205 else: 

1206 stale_src_datasets1.append(src_dataset) 

1207 

1208 # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed 

1209 stale_src_datasets2: list[str] = [] 

1210 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets) 

1211 for dst_dataset in maybe_stale_dst_datasets: 

1212 snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0) 

1213 if ( 

1214 snapshots_changed != 0 

1215 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS 

1216 and snapshots_changed 

1217 == self.cache.get_snapshots_changed(self.cache.last_modified_cache_file(dst, dst_dataset)) 

1218 ): 

1219 log.info("Already up-to-date [cached]: %s", dst_dataset) 

1220 else: 

1221 stale_src_datasets2.append(dst2src(dst_dataset)) 

1222 assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted" 

1223 assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted" 

1224 stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2)) # merge two sorted lists 

1225 assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates" 

1226 return stale_src_datasets, cache_files 

1227 

1228 if is_caching_snapshots(p, src): 

1229 stale_src_datasets, cache_files = find_stale_datasets() 

1230 num_cache_misses = len(stale_src_datasets) 

1231 num_cache_hits = len(src_datasets) - len(stale_src_datasets) 

1232 self.num_cache_misses += num_cache_misses 

1233 self.num_cache_hits += num_cache_hits 

1234 cmsg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses) 

1235 else: 

1236 stale_src_datasets = src_datasets 

1237 cache_files = {} 

1238 cmsg = "" 

1239 

1240 # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution 

1241 failed: bool = process_datasets_in_parallel_and_fault_tolerant( 

1242 log=log, 

1243 datasets=stale_src_datasets, 

1244 process_dataset=lambda src_dataset, tid, retry: replicate_dataset(self, src_dataset, tid, retry), 

1245 skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)], 

1246 skip_on_error=p.skip_on_error, 

1247 max_workers=max_workers, 

1248 termination_event=self.termination_event, 

1249 termination_handler=self.terminate, 

1250 enable_barriers=False, 

1251 task_name="Replication", 

1252 append_exception=self.append_exception, 

1253 retry_policy=p.retry_policy, 

1254 dry_run=p.dry_run, 

1255 is_test_mode=self.is_test_mode, 

1256 ) 

1257 

1258 if is_caching_snapshots(p, src) and not failed: 

1259 # refresh "snapshots_changed" ZFS dataset property from dst 

1260 stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in stale_src_datasets] 

1261 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets) 

1262 for dst_dataset in stale_dst_datasets: # update local cache 

1263 dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0) 

1264 dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset) 

1265 src_dataset: str = dst2src(dst_dataset) 

1266 src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed 

1267 if not p.dry_run: 

1268 set_last_modification_time_safe( 

1269 cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed, if_more_recent=True 

1270 ) 

1271 set_last_modification_time_safe( 

1272 dst_cache_file, unixtime_in_secs=dst_snapshots_changed, if_more_recent=True 

1273 ) 

1274 

1275 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos 

1276 log.info( 

1277 p.dry("Replication done: %s"), 

1278 f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots" 

1279 f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]", 

1280 ) 

1281 return failed 

1282 

1283 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None: 

1284 """For testing only; for unit tests to delete datasets during replication and test correct handling of that.""" 

1285 assert delete_trigger 

1286 counter = self.delete_injection_triggers.get("before") 

1287 if counter and decrement_injection_counter(self, counter, delete_trigger): 

1288 p = self.params 

1289 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "") 

1290 run_ssh_command(self, remote, LOG_DEBUG, print_stdout=True, cmd=cmd) 

1291 

1292 def maybe_inject_params(self, error_trigger: str) -> None: 

1293 """For testing only; for unit tests to simulate errors during replication and test correct handling of them.""" 

1294 assert error_trigger 

1295 counter = self.error_injection_triggers.get("before") 

1296 if counter and decrement_injection_counter(self, counter, error_trigger): 

1297 self.inject_params = self.param_injection_triggers[error_trigger] 

1298 elif error_trigger in self.param_injection_triggers: 

1299 self.inject_params = {} 

1300 

1301 @staticmethod 

1302 def recv_option_property_names(recv_opts: list[str]) -> set[str]: 

1303 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for 

1304 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name.""" 

1305 propnames = set() 

1306 i = 0 

1307 n = len(recv_opts) 

1308 while i < n: 

1309 stripped: str = recv_opts[i].strip() 

1310 if stripped in ("-o", "-x"): 

1311 i += 1 

1312 if i == n or recv_opts[i].strip() in ("-o", "-x"): 

1313 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1314 if stripped == "-o" and "=" not in recv_opts[i]: 

1315 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1316 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0] 

1317 validate_property_name(propname, "--zfs-recv-program-opt(s)") 

1318 propnames.add(propname) 

1319 i += 1 

1320 return propnames 

1321 

1322 def root_datasets_if_recursive_zfs_snapshot_is_possible( 

1323 self, datasets: list[str], basis_datasets: list[str] 

1324 ) -> list[str] | None: 

1325 """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset 

1326 within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in 

1327 `datasets`. Returns `None` if any (unfiltered) dataset in `basis_dataset` that is a descendant of at least one of the 

1328 root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have 

1329 pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the 

1330 non-recursive 'zfs snapshot snapshot1 .. snapshot N' CLI flavor. 

1331 

1332 Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both 

1333 sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time 

1334 complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case. 

1335 See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative 

1336 impl that's easier to grok. 

1337 """ 

1338 assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted" 

1339 assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates" 

1340 assert (not self.is_test_mode) or basis_datasets == sorted(basis_datasets), "List is not sorted" 

1341 assert (not self.is_test_mode) or not has_duplicates(basis_datasets), "List contains duplicates" 

1342 assert (not self.is_test_mode) or set(datasets).issubset(set(basis_datasets)), "Not a subset" 

1343 root_datasets: list[str] = self.find_root_datasets(datasets) 

1344 i = 0 

1345 j = 0 

1346 k = 0 

1347 len_root_datasets = len(root_datasets) 

1348 len_basis_datasets = len(basis_datasets) 

1349 len_datasets = len(datasets) 

1350 while i < len_root_datasets and j < len_basis_datasets: # walk and "merge" the sorted lists, in sync 

1351 if basis_datasets[j] < root_datasets[i]: # irrelevant subtree? 

1352 j += 1 # move to next basis_datasets[j] 

1353 elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]): # relevant subtree? 

1354 while k < len_datasets and datasets[k] < basis_datasets[j]: 

1355 k += 1 # move to next datasets[k] 

1356 if k == len_datasets or datasets[k] != basis_datasets[j]: # dataset chopped off by schedule or --incl/excl*? 

1357 return None # detected filter pruning that is incompatible with 'zfs snapshot -r' 

1358 j += 1 # move to next basis_datasets[j] 

1359 else: 

1360 i += 1 # move to next root_dataset[i]; no need to check root_datasets that are no longer (or not yet) reachable 

1361 return root_datasets 

1362 

1363 @staticmethod 

1364 def find_root_datasets(sorted_datasets: list[str]) -> list[str]: 

1365 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A 

1366 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets.""" 

1367 root_datasets: list[str] = [] 

1368 skip_dataset: str = DONT_SKIP_DATASET 

1369 for dataset in sorted_datasets: 

1370 if is_descendant(dataset, of_root_dataset=skip_dataset): 

1371 continue 

1372 skip_dataset = dataset 

1373 root_datasets.append(dataset) 

1374 return root_datasets 

1375 

1376 def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]: 

1377 """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g. 

1378 bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be 

1379 created with that name, because these datasets are due per the schedule, either because the 'creation' time of their 

1380 most recent snapshot with that name pattern is now too old, or such a snapshot does not even exist. 

1381 

1382 The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple 

1383 but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An 

1384 alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property 

1385 (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of 

1386 the most recent snapshot, for each SnapshotLabel and each dataset. 

1387 """ 

1388 p, log = self.params, self.params.log 

1389 src = p.src 

1390 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config 

1391 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list) 

1392 is_caching: bool = False 

1393 interner: Interner[datetime] = Interner() # reduces memory footprint 

1394 msgs: list[tuple[datetime, str, SnapshotLabel, str]] = [] 

1395 

1396 def create_snapshot_if_latest_is_too_old( 

1397 datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int 

1398 ) -> None: # thread-safe 

1399 """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old.""" 

1400 creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz) 

1401 log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label) 

1402 duration_amount, duration_unit = config.suffix_durations[label.suffix] 

1403 next_event_dt: datetime = config.anchors.round_datetime_up_to_duration_multiple( 

1404 creation_dt + timedelta(microseconds=1), duration_amount, duration_unit 

1405 ) 

1406 msg: str = "" 

1407 if config.current_datetime >= next_event_dt: 

1408 datasets_to_snapshot[label].append(dataset) # mark it as scheduled for snapshot creation 

1409 msg = " has passed" 

1410 next_event_dt = interner.intern(next_event_dt) 

1411 msgs.append((next_event_dt, dataset, label, msg)) 

1412 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot' 

1413 # Per-label cache stores (atime=creation, mtime=snapshots_changed) so later runs can safely trust creation 

1414 # only when the label's mtime matches the current dataset-level '=' cache value. Excludes timestamp of label. 

1415 cache_file: str = self.cache.last_modified_cache_file(src, dataset, label_hashes[label]) 

1416 unixtimes: tuple[int, int] = (creation_unixtime, self.src_properties[dataset].snapshots_changed) 

1417 set_last_modification_time_safe(cache_file, unixtime_in_secs=unixtimes, if_more_recent=True) 

1418 

1419 labels: list[SnapshotLabel] = [] 

1420 config_labels: list[SnapshotLabel] = config.snapshot_labels() 

1421 for label in config_labels: 

1422 duration_amount_, _duration_unit = config.suffix_durations[label.suffix] 

1423 if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due: 

1424 datasets_to_snapshot[label] = sorted_datasets # take snapshot regardless of creation time of existing snaps 

1425 else: 

1426 labels.append(label) 

1427 if len(labels) == 0: 

1428 return datasets_to_snapshot # nothing more TBD 

1429 label_hashes: dict[SnapshotLabel, str] = { 

1430 label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels 

1431 } 

1432 

1433 # satisfy request from local cache as much as possible 

1434 cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list) 

1435 if is_caching_snapshots(p, src): 

1436 sorted_datasets_todo: list[str] = [] 

1437 time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS 

1438 for dataset in sorted_datasets: 

1439 cache: SnapshotCache = self.cache 

1440 cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset)) 

1441 if cached_snapshots_changed == 0: 

1442 sorted_datasets_todo.append(dataset) # request cannot be answered from cache 

1443 continue 

1444 if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed: # get that prop "for free" 

1445 cache.invalidate_last_modified_cache_dataset(dataset) 

1446 sorted_datasets_todo.append(dataset) # request cannot be answered from cache 

1447 continue 

1448 if cached_snapshots_changed >= time_threshold: # Avoid equal-second races: only trust matured cache entries 

1449 sorted_datasets_todo.append(dataset) # cache entry isn't mature enough to be trusted; skip cache 

1450 continue 

1451 creation_unixtimes: list[int] = [] 

1452 for label_hash in label_hashes.values(): 

1453 # For per-label files, atime stores the latest matching snapshot's creation time, while mtime stores 

1454 # the dataset-level snapshots_changed observed when this label file was written. 

1455 atime, mtime = cache.get_snapshots_changed2(cache.last_modified_cache_file(src, dataset, label_hash)) 

1456 # Sanity check: trust per-label cache only when: 

1457 # - mtime equals the dataset-level '=' cache (same snapshots_changed), and 

1458 # - atime is plausible and not later than mtime (creation <= snapshots_changed), and 

1459 # - neither atime nor mtime is zero (unknown provenance). 

1460 # Otherwise fall back to 'zfs list -t snapshot' to avoid stale creation times after newer changes. 

1461 if atime == 0 or mtime == 0 or mtime != cached_snapshots_changed or atime > mtime: 

1462 sorted_datasets_todo.append(dataset) # request cannot be answered from cache 

1463 break 

1464 creation_unixtimes.append(atime) 

1465 if len(creation_unixtimes) == len(labels): 

1466 for j, label in enumerate(labels): 

1467 create_snapshot_if_latest_is_too_old( 

1468 cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j] 

1469 ) 

1470 sorted_datasets = sorted_datasets_todo 

1471 

1472 def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None: 

1473 create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs) 

1474 

1475 def on_finish_dataset(dataset: str) -> None: 

1476 if is_caching and not p.dry_run: 

1477 set_last_modification_time_safe( 

1478 self.cache.last_modified_cache_file(src, dataset), 

1479 unixtime_in_secs=self.src_properties[dataset].snapshots_changed, 

1480 if_more_recent=True, 

1481 ) 

1482 

1483 # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache 

1484 is_caching = is_caching_snapshots(p, src) 

1485 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots( 

1486 src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset 

1487 ) 

1488 for lbl in labels: # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result 

1489 datasets_to_snapshot[lbl].sort() 

1490 if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot): # +take snaps for snapshot-less datasets 

1491 datasets_to_snapshot[lbl] = list( # inputs to merge() are sorted, and outputs are sorted too 

1492 heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots) 

1493 ) 

1494 for label, datasets in datasets_to_snapshot.items(): 

1495 assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted" 

1496 assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates" 

1497 assert label 

1498 

1499 msgs.sort() # sort by time, dataset, label 

1500 for i in range(0, len(msgs), 10_000): # reduce logging overhead via mini-batching 

1501 text = "".join( 

1502 f"\nNext scheduled snapshot time: {next_event_dt} for {dataset}@{label}{msg}" 

1503 for next_event_dt, dataset, label, msg in msgs[i : i + 10_000] 

1504 ) 

1505 log.info("Next scheduled snapshot times ...%s", text) 

1506 

1507 # sort keys to ensure that we take snapshots for dailies before hourlies, and so on 

1508 label_indexes: dict[SnapshotLabel, int] = {label: k for k, label in enumerate(config_labels)} 

1509 datasets_to_snapshot = dict(sorted(datasets_to_snapshot.items(), key=lambda kv: label_indexes[kv[0]])) 

1510 return datasets_to_snapshot 

1511 

1512 def handle_minmax_snapshots( 

1513 self, 

1514 remote: Remote, 

1515 sorted_datasets: list[str], 

1516 labels: list[SnapshotLabel], 

1517 fn_latest: Callable[[int, int, str, str], None], # callback function for latest snapshot 

1518 fn_oldest: Callable[[int, int, str, str], None] | None = None, # callback function for oldest snapshot 

1519 fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None, 

1520 ) -> list[str]: # thread-safe 

1521 """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs 

1522 the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names.""" 

1523 assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted" 

1524 p = self.params 

1525 cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o createtxg,creation,name") # sort dataset,createtxg 

1526 datasets_with_snapshots: set[str] = set() 

1527 interner: SortedInterner[str] = SortedInterner(sorted_datasets) # reduces memory footprint 

1528 for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False): 

1529 # streaming group by dataset name (consumes constant memory only) 

1530 for dataset, group in itertools.groupby(lines, key=lambda line: line.rsplit("\t", 1)[1].split("@", 1)[0]): 

1531 dataset = interner.interned(dataset) 

1532 snapshots = sorted( # fetch all snapshots of current dataset and sort by createtxg,creation,name 

1533 (int(createtxg), int(creation_unixtime_secs), name.split("@", 1)[1]) 

1534 for createtxg, creation_unixtime_secs, name in (line.split("\t", 2) for line in group) 

1535 ) # perf: sorted() is fast because Timsort is close to O(N) for nearly sorted input, which is our case 

1536 assert len(snapshots) > 0 

1537 datasets_with_snapshots.add(dataset) 

1538 snapshot_names: tuple[str, ...] = tuple(snapshot[-1] for snapshot in snapshots) 

1539 reversed_snapshot_names: tuple[str, ...] = snapshot_names[::-1] 

1540 year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX 

1541 year_with_4_digits_regex_fullmatch = year_with_4_digits_regex.fullmatch 

1542 startswith = str.startswith 

1543 endswith = str.endswith 

1544 fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False)) 

1545 for i, label in enumerate(labels): 

1546 infix: str = label.infix 

1547 start: str = label.prefix + infix 

1548 end: str = label.suffix 

1549 startlen: int = len(start) 

1550 endlen: int = len(end) 

1551 minlen: int = startlen + endlen if infix else 4 + startlen + endlen # year_with_four_digits_regex 

1552 startlen_4: int = startlen + 4 # [startlen:startlen+4] # year_with_four_digits_regex 

1553 has_infix: bool = bool(infix) 

1554 for fn, is_reverse in fns: 

1555 creation_unixtime_secs: int = 0 # find creation time of latest or oldest snapshot matching the label 

1556 minmax_snapshot: str = "" 

1557 for j, snapshot_name in enumerate(reversed_snapshot_names if is_reverse else snapshot_names): 

1558 if ( 

1559 endswith(snapshot_name, end) # aka snapshot_name.endswith(end) 

1560 and startswith(snapshot_name, start) # aka snapshot_name.startswith(start) 

1561 and len(snapshot_name) >= minlen 

1562 and (has_infix or year_with_4_digits_regex_fullmatch(snapshot_name, startlen, startlen_4)) 

1563 ): 

1564 k: int = len(snapshots) - j - 1 if is_reverse else j 

1565 creation_unixtime_secs = snapshots[k][1] 

1566 minmax_snapshot = snapshot_name 

1567 break 

1568 fn(i, creation_unixtime_secs, dataset, minmax_snapshot) 

1569 fn_on_finish_dataset(dataset) 

1570 datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots] 

1571 return datasets_without_snapshots 

1572 

1573 @staticmethod 

1574 def _cache_hits_msg(hits: int, misses: int) -> str: 

1575 total = hits + misses 

1576 return f", cache hits: {percent(hits, total, print_total=True)}, misses: {percent(misses, total, print_total=True)}" 

1577 

1578 

1579############################################################################# 

1580class DatasetProperties: 

1581 """Properties of a ZFS dataset.""" 

1582 

1583 __slots__ = ("recordsize", "snapshots_changed") # uses more compact memory layout than __dict__ 

1584 

1585 def __init__(self, recordsize: int, snapshots_changed: int) -> None: 

1586 # immutable variables: 

1587 self.recordsize: Final[int] = recordsize 

1588 

1589 # mutable variables: 

1590 self.snapshots_changed: int = snapshots_changed 

1591 

1592 

1593# Input format is [[user@]host:]dataset 

1594# 1234 5 6 

1595DATASET_LOCATOR_REGEX: Final[re.Pattern[str]] = re.compile(r"(((([^@]*)@)?([^:]+)):)?(.*)", flags=re.DOTALL) 

1596 

1597 

1598def parse_dataset_locator( 

1599 input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None 

1600) -> tuple[str, str, str, str, str]: 

1601 """Splits user@host:dataset into its components with optional checks.""" 

1602 

1603 def convert_ipv6(hostname: str) -> str: # support IPv6 without getting confused by host:dataset colon separator ... 

1604 return hostname.replace("|", ":") # ... and any colons that may be part of a (valid) ZFS dataset name 

1605 

1606 user_undefined: bool = user is None 

1607 if user is None: 

1608 user = "" 

1609 host_undefined: bool = host is None 

1610 if host is None: 

1611 host = "" 

1612 host = convert_ipv6(host) 

1613 user_host, dataset, pool = "", "", "" 

1614 

1615 if match := DATASET_LOCATOR_REGEX.fullmatch(input_text): 1615 ↛ 1632line 1615 didn't jump to line 1632 because the condition on line 1615 was always true

1616 if user_undefined: 

1617 user = match.group(4) or "" 

1618 if host_undefined: 

1619 host = match.group(5) or "" 

1620 host = convert_ipv6(host) 

1621 if host == "-": 

1622 host = "" 

1623 dataset = match.group(6) or "" 

1624 i = dataset.find("/") 

1625 pool = dataset[0:i] if i >= 0 else dataset 

1626 

1627 if user and host: 

1628 user_host = f"{user}@{host}" 

1629 elif host: 

1630 user_host = host 

1631 

1632 if validate: 

1633 validate_user_name(user, input_text) 

1634 validate_host_name(host, input_text) 

1635 if port is not None: 

1636 validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ") 

1637 validate_dataset_name(dataset, input_text) 

1638 

1639 return user, host, user_host, pool, dataset 

1640 

1641 

1642def validate_user_name(user: str, input_text: str) -> None: 

1643 """Checks that the username is safe for ssh or local usage.""" 

1644 invalid_chars: str = SHELL_CHARS + "/" 

1645 if user and (".." in user or any(c.isspace() or c in invalid_chars for c in user)): 

1646 die(f"Invalid user name: '{user}' for: '{input_text}'") 

1647 

1648 

1649def validate_host_name(host: str, input_text: str) -> None: 

1650 """Checks hostname for forbidden characters or patterns.""" 

1651 invalid_chars: str = SHELL_CHARS + "/" 

1652 if host and (host.startswith("-") or ".." in host or any(c.isspace() or c in invalid_chars for c in host)): 

1653 die(f"Invalid host name: '{host}' for: '{input_text}'") 

1654 

1655 

1656def validate_port(port: str | int | None, message: str) -> None: 

1657 """Checks that port specification is a valid integer.""" 

1658 if isinstance(port, int): 

1659 port = str(port) 

1660 if port and not port.isdigit(): 

1661 die(message + f"must be empty or a positive integer: '{port}'") 

1662 

1663 

1664############################################################################# 

1665if __name__ == "__main__": 1665 ↛ 1666line 1665 didn't jump to line 1666 because the condition on line 1665 was never true

1666 main()