Coverage for bzfs_main / bzfs.py: 99%

1087 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:49 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata 

16# /// script 

17# requires-python = ">=3.9" 

18# dependencies = [] 

19# /// 

20# 

21""" 

22* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`, 

23 data transfer, and snapshot management between two hosts. 

24* Overview of the bzfs.py codebase: 

25* The codebase starts with docs, definition of input data and associated argument parsing into a "Params" class. 

26* All CLI option/parameter values are reachable from the "Params" class. 

27* Control flow starts in main(), which kicks off a "Job". 

28* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree. 

29* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset(). 

30* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots(). 

31* The --create-src-snapshots-* and --delete-* and --compare-* and --monitor-* algorithms also start in run_task(). 

32* The main retry logic is in call_with_retries() and clear_resumable_recv_state_if_necessary(). 

33* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter. 

34* Executing a CLI command on a local or remote host is in run_ssh_command(). 

35* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool. 

36* Cache functionality can be found by searching for this regex: .*cach.* 

37* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant(). 

38* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh. 

39 Simply run that script whenever you change or add ArgumentParser help text. 

40""" 

41 

42from __future__ import ( 

43 annotations, 

44) 

45import argparse 

46import contextlib 

47import fcntl 

48import heapq 

49import itertools 

50import logging 

51import os 

52import re 

53import subprocess 

54import sys 

55import threading 

56import time 

57from collections import ( 

58 Counter, 

59 defaultdict, 

60) 

61from collections.abc import ( 

62 Collection, 

63 Sequence, 

64) 

65from datetime import ( 

66 datetime, 

67 timedelta, 

68) 

69from logging import ( 

70 Logger, 

71) 

72from pathlib import ( 

73 Path, 

74) 

75from subprocess import ( 

76 DEVNULL, 

77 PIPE, 

78 CalledProcessError, 

79 TimeoutExpired, 

80) 

81from typing import ( 

82 Any, 

83 Callable, 

84 Final, 

85 cast, 

86 final, 

87) 

88 

89import bzfs_main.loggers 

90from bzfs_main.argparse_actions import ( 

91 has_timerange_filter, 

92) 

93from bzfs_main.argparse_cli import ( 

94 EXCLUDE_DATASET_REGEXES_DEFAULT, 

95) 

96from bzfs_main.compare_snapshot_lists import ( 

97 run_compare_snapshot_lists, 

98) 

99from bzfs_main.configuration import ( 

100 AlertConfig, 

101 CreateSrcSnapshotConfig, 

102 LogParams, 

103 MonitorSnapshotAlert, 

104 Params, 

105 Remote, 

106 SnapshotLabel, 

107 resolve_r2r_mode, 

108) 

109from bzfs_main.detect import ( 

110 DISABLE_PRG, 

111 RemoteConfCacheItem, 

112 are_bookmarks_enabled, 

113 detect_available_programs, 

114 is_caching_snapshots, 

115 is_dummy, 

116 is_zpool_feature_enabled_or_active, 

117) 

118from bzfs_main.filter import ( 

119 SNAPSHOT_REGEX_FILTER_NAME, 

120 dataset_regexes, 

121 filter_datasets, 

122 filter_lines, 

123 filter_lines_except, 

124 filter_snapshots, 

125) 

126from bzfs_main.loggers import ( 

127 get_simple_logger, 

128 reset_logger, 

129 set_logging_runtime_defaults, 

130) 

131from bzfs_main.parallel_batch_cmd import ( 

132 run_ssh_cmd_parallel, 

133 zfs_list_snapshots_in_parallel, 

134) 

135from bzfs_main.progress_reporter import ( 

136 ProgressReporter, 

137 count_num_bytes_transferred_by_zfs_send, 

138) 

139from bzfs_main.replication import ( 

140 delete_bookmarks, 

141 delete_datasets, 

142 delete_snapshots, 

143 replicate_dataset, 

144) 

145from bzfs_main.snapshot_cache import ( 

146 MATURITY_TIME_THRESHOLD_SECS, 

147 MONITOR_CACHE_FILE_PREFIX, 

148 REPLICATION_CACHE_FILE_PREFIX, 

149 SnapshotCache, 

150 set_last_modification_time_safe, 

151) 

152from bzfs_main.util.connection import ( 

153 SHARED, 

154 ConnectionPool, 

155 MiniJob, 

156 MiniRemote, 

157 timeout, 

158) 

159from bzfs_main.util.parallel_iterator import ( 

160 run_in_parallel, 

161) 

162from bzfs_main.util.parallel_tasktree_policy import ( 

163 process_datasets_in_parallel_and_fault_tolerant, 

164) 

165from bzfs_main.util.retry import ( 

166 Retry, 

167 RetryableError, 

168 RetryTemplate, 

169 RetryTerminationError, 

170 RetryTiming, 

171) 

172from bzfs_main.util.utils import ( 

173 DESCENDANTS_RE_SUFFIX, 

174 DIE_STATUS, 

175 DONT_SKIP_DATASET, 

176 FILE_PERMISSIONS, 

177 LOG_DEBUG, 

178 LOG_TRACE, 

179 PROG_NAME, 

180 SHELL_CHARS_AND_SLASH, 

181 UMASK, 

182 YEAR_WITH_FOUR_DIGITS_REGEX, 

183 HashedInterner, 

184 SortedInterner, 

185 Subprocesses, 

186 SynchronizedBool, 

187 SynchronizedDict, 

188 TaskTiming, 

189 append_if_absent, 

190 compile_regexes, 

191 cut, 

192 die, 

193 has_duplicates, 

194 human_readable_bytes, 

195 human_readable_duration, 

196 is_descendant, 

197 percent, 

198 pretty_print_formatter, 

199 replace_in_lines, 

200 replace_prefix, 

201 sha256_85_urlsafe_base64, 

202 sha256_128_urlsafe_base64, 

203 stderr_to_str, 

204 termination_signal_handler, 

205 validate_dataset_name, 

206 validate_property_name, 

207 xappend, 

208 xfinally, 

209 xprint, 

210) 

211 

# constants:
__version__: Final[str] = bzfs_main.argparse_cli.__version__  # single source of truth lives in argparse_cli
CRITICAL_STATUS: Final[int] = 2  # exit/alert status for critical conditions (matches Nagios convention — confirm)
WARNING_STATUS: Final[int] = 1  # exit/alert status for warning conditions
STILL_RUNNING_STATUS: Final[int] = 4  # exit status when a previous periodic job still holds the lock file
MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 9)  # keep in sync with "requires-python" in the inline script metadata
if sys.version_info < MIN_PYTHON_VERSION:  # fail fast with a clear message instead of a cryptic error further down
    print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!")
    sys.exit(DIE_STATUS)

221 

222 

223############################################################################# 

def argument_parser() -> argparse.ArgumentParser:
    """Returns the CLI parser used by bzfs; delegates to the canonical parser defined in argparse_cli."""
    parser: argparse.ArgumentParser = bzfs_main.argparse_cli.argument_parser()
    return parser

227 

228 

def main() -> None:
    """API for command line clients."""
    saved_umask: int = os.umask(UMASK)
    try:
        set_logging_runtime_defaults()
        # On CTRL-C and SIGTERM, signal all descendant processes so they terminate too
        stop_event: threading.Event = threading.Event()
        with termination_signal_handler(termination_events=[stop_event]):
            parsed_args: argparse.Namespace = argument_parser().parse_args()
            run_main(parsed_args, sys.argv, termination_event=stop_event)
    except subprocess.CalledProcessError as e:
        sys.exit(normalize_called_process_error(e))
    finally:
        os.umask(saved_umask)  # restore prior process-global umask state

242 

243 

def run_main(
    args: argparse.Namespace,
    sys_argv: list[str] | None = None,
    log: Logger | None = None,
    termination_event: threading.Event | None = None,
) -> None:
    """API for Python clients; visible for testing; may become a public API eventually."""
    job = Job(termination_event=termination_event)
    job.run_main(args, sys_argv, log)

252 

253 

254############################################################################# 

255@final 

256class Job(MiniJob): 

257 """Executes one bzfs run, coordinating snapshot replication tasks.""" 

258 

    def __init__(self, termination_event: threading.Event | None = None) -> None:
        """Initializes per-run job state; all mutable state lives on this object rather than in module globals."""
        self.params: Params  # declared here, assigned later in run_main() once CLI args have been parsed
        # shared event used to request cooperative, asynchronous termination across threads and subprocesses
        self.termination_event: Final[threading.Event] = termination_event or threading.Event()
        self.retry_timing: Final[RetryTiming] = RetryTiming.make_from(self.termination_event).copy(
            on_before_attempt=lambda retry: None  # no-op pre-attempt hook
        )
        self.task_timing: Final[TaskTiming] = TaskTiming.make_from(self.termination_event)
        self.subprocesses: Subprocesses = Subprocesses(self.termination_event.is_set)
        # per dst host: dataset name -> existence flag; cleared at the start of each daemon iteration in run_tasks()
        self.all_dst_dataset_exists: Final[dict[str, dict[str, bool]]] = defaultdict(lambda: defaultdict(bool))
        self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({})  # view for the current dst host
        self.src_properties: dict[str, DatasetProperties] = {}  # src dataset name -> properties
        self.dst_properties: dict[str, DatasetProperties] = {}  # dst dataset name -> properties
        self.all_exceptions: list[str] = []  # stringified tolerated errors, summarized at the end of run_tasks()
        self.all_exceptions_count: int = 0
        self.max_exceptions_to_summarize: int = 10000  # cap max memory consumption of the error summary
        self.first_exception: BaseException | None = None  # re-raised at the end if any error was tolerated
        self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {}  # cached remote config incl. connection pools
        self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {}  # per location; computed in validate_task()
        self.max_workers: dict[str, int] = {}  # per location; computed in validate_task()
        self.progress_reporter: ProgressReporter = cast(ProgressReporter, None)  # created later in run_tasks()
        self.is_first_replication_task: Final[SynchronizedBool] = SynchronizedBool(True)
        self.replication_start_time_nanos: int = time.monotonic_ns()
        self.timeout_nanos: int | None = None  # timestamp aka instant in time
        self.timeout_duration_nanos: int | None = None  # duration (not a timestamp); for logging only
        self.cache: SnapshotCache = SnapshotCache(self)
        self.stats_lock: Final[threading.Lock] = threading.Lock()  # presumably guards the counters below — confirm at call sites
        self.num_cache_hits: int = 0
        self.num_cache_misses: int = 0
        self.num_snapshots_found: int = 0
        self.num_snapshots_replicated: int = 0

        self.is_test_mode: bool = False  # for testing only
        self.creation_prefix: str = ""  # for testing only
        self.use_select: bool = False  # for testing only
        self.progress_update_intervals: tuple[float, float] | None = None  # for testing only
        self.error_injection_triggers: dict[str, Counter[str]] = {}  # for testing only
        self.delete_injection_triggers: dict[str, Counter[str]] = {}  # for testing only
        self.param_injection_triggers: dict[str, dict[str, bool]] = {}  # for testing only
        self.inject_params: dict[str, bool] = {}  # for testing only
        self.injection_lock: threading.Lock = threading.Lock()  # for testing only
        self.max_command_line_bytes: int | None = None  # for testing only

300 

301 def shutdown(self) -> None: 

302 """Exits any multiplexed ssh sessions that may be leftover.""" 

303 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values() 

304 for i, cache_item in enumerate(cache_items): 

305 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}") 

306 

    def terminate(self) -> None:
        """Shuts down gracefully; also terminates descendant processes, if any."""
        # xfinally guarantees descendant process subtrees are terminated even if shutdown() raises
        with xfinally(self.subprocesses.terminate_process_subtrees):
            self.shutdown()

312 def _retry_template(self) -> RetryTemplate: 

313 p = self.params 

314 return RetryTemplate(policy=p.retry_policy.copy(timing=self.retry_timing), log=p.log) 

315 

    def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None:
        """Parses CLI arguments, sets up logging, and executes main job loop.

        Params:
            args: parsed CLI namespace.
            sys_argv: original argv, for logging only.
            log: optional caller-provided Logger; if given it is reused and never reset by this method.

        Raises SystemExit (with DIE_STATUS or the subprocess return code) on failure; CalledProcessError and
        SystemExit raised by the body are logged and re-raised unchanged.
        """
        assert isinstance(self.error_injection_triggers, dict)
        assert isinstance(self.delete_injection_triggers, dict)
        assert isinstance(self.inject_params, dict)
        logger_name_suffix: str = ""

        def _reset_logger() -> None:
            if logger_name_suffix and log is not None:  # reset Logger unless it's a Logger outside of our control
                reset_logger(log)

        with xfinally(_reset_logger):  # runs _reset_logger() on exit, without masking error raised in body of `with` block
            try:
                log_params: LogParams = LogParams(args)
                # a caller-provided logger keeps its own name; only loggers we create get a unique suffix
                logger_name_suffix = "" if log is not None else log_params.logger_name_suffix
                log = bzfs_main.loggers.get_logger(
                    log_params=log_params, args=args, log=log, logger_name_suffix=logger_name_suffix
                )
                log.info("%s", f"Log file is: {log_params.log_file}")
            except BaseException as e:
                # logging isn't set up yet, so report the log-init failure via a minimal fallback logger
                simple_log: Logger = get_simple_logger(PROG_NAME, logger_name_suffix=logger_name_suffix)
                try:
                    simple_log.error("Log init: %s", e, exc_info=not isinstance(e, SystemExit))
                finally:
                    reset_logger(simple_log)
                raise

            # snapshot "plan" options expand into additional CLI arguments; re-parse so they take effect
            aux_args: list[str] = []
            if getattr(args, "include_snapshot_plan", None):
                aux_args += args.include_snapshot_plan
            if getattr(args, "delete_dst_snapshots_except_plan", None):
                aux_args += args.delete_dst_snapshots_except_plan
            if len(aux_args) > 0:
                log.info("Auxiliary CLI arguments: %s", " ".join(aux_args))
                args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args)

            def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None:
                log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info)

            try:
                log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[uid: {os.getuid()}, euid: {os.geteuid()}]")
                if self.is_test_mode:
                    log.log(LOG_TRACE, "Parsed CLI arguments: %s", args)
                self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params)
                self.timeout_duration_nanos = p.timeout_duration_nanos
                # guard against concurrent runs of the same periodic job via an advisory file lock
                lock_file: str = p.lock_file_name()
                lock_fd = os.open(
                    lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, FILE_PERMISSIONS
                )
                with xfinally(lambda: os.close(lock_fd)):
                    try:
                        # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or
                        # another process. The (advisory) lock is auto-released when the process terminates or the fd is
                        # closed.
                        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # LOCK_NB ... non-blocking
                    except BlockingIOError:
                        msg = "Exiting as same previous periodic job is still running without completion yet per "
                        die(msg + lock_file, STILL_RUNNING_STATUS)

                    # xfinally: unlink the lock_file while still holding the flock on its fd - it's a correct and safe
                    # standard POSIX pattern:
                    # - Performing unlink() before close(fd) avoids a race where a subsequent bzfs process could recreate and
                    #   lock a fresh inode for the same path between our close() and a later unlink(). In that case, a late
                    #   unlink would delete the newer process's lock_file path.
                    # - At this point, critical work is complete; the remaining steps are shutdown mechanics that have no
                    #   side effect, so this pattern is correct, safe, and simple.
                    with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)):  # don't accumulate stale files
                        try:
                            self.run_tasks()  # do the real work
                        except BaseException:
                            self.terminate()
                            raise
                        self.shutdown()
                        with contextlib.suppress(BrokenPipeError):
                            sys.stderr.flush()
                            sys.stdout.flush()
            except subprocess.CalledProcessError as e:
                log_error_on_exit(e, e.returncode)
                raise
            except SystemExit as e:
                log_error_on_exit(e, e.code)
                raise
            except (subprocess.TimeoutExpired, UnicodeDecodeError) as e:
                log_error_on_exit(e, DIE_STATUS)
                raise SystemExit(DIE_STATUS) from e
            except re.error as e:
                log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS)
                raise SystemExit(DIE_STATUS) from e
            except BaseException as e:
                log_error_on_exit(e, DIE_STATUS, exc_info=True)
                raise SystemExit(DIE_STATUS) from e
            finally:
                log.info("%s", f"Log file was: {log_params.log_file}")
            log.info("Success. Goodbye!")
            with contextlib.suppress(BrokenPipeError):
                sys.stderr.flush()
                sys.stdout.flush()

413 

    def run_tasks(self) -> None:
        """Executes replication cycles, repeating until daemon lifetime expires.

        Each cycle iterates over all root dataset pairs; tolerated per-task errors are collected and the first
        one is re-raised after the final cycle (non-daemon mode only).
        """
        p, log = self.params, self.params.log
        # reset per-run accumulators so a reused Job object starts from a clean slate
        self.all_exceptions = []
        self.all_exceptions_count = 0
        self.first_exception = None
        self.remote_conf_cache = {}
        self.validate_once()
        self.replication_start_time_nanos = time.monotonic_ns()
        self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals)
        with xfinally(lambda: self.progress_reporter.stop()):
            daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos
            while True:  # loop for daemon mode
                self.timeout_nanos = (
                    None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
                )
                self.all_dst_dataset_exists.clear()
                self.progress_reporter.reset()
                src, dst = p.src, p.dst
                for src_root_dataset, dst_root_dataset in p.root_dataset_pairs:
                    if self.termination_event.is_set():
                        self.terminate()
                        break
                    src.root_dataset = src.basis_root_dataset = src_root_dataset
                    dst.root_dataset = dst.basis_root_dataset = dst_root_dataset
                    p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy()
                    if p.daemon_lifetime_nanos > 0:
                        # in daemon mode the timeout budget restarts for every task, not just every cycle
                        self.timeout_nanos = (
                            None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
                        )
                    recurs_sep = " " if p.recursive_flag else ""
                    task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}"
                    if len(p.root_dataset_pairs) > 1:
                        log.info("Starting task: %s", task_description + " ...")
                    try:
                        try:
                            self.maybe_inject_error(cmd=[], error_trigger="retryable_run_tasks")
                            timeout(self)
                            self.validate_task()
                            self.run_task()  # do the real work
                        except RetryableError as retryable_error:
                            # unwrap: the underlying cause is what callers/handlers below should see
                            cause: BaseException | None = retryable_error.__cause__
                            assert cause is not None
                            raise cause.with_traceback(cause.__traceback__)  # noqa: B904 re-raise of cause without chaining
                    except (CalledProcessError, TimeoutExpired, SystemExit, UnicodeDecodeError, RetryTerminationError) as e:
                        if p.skip_on_error == "fail" or (
                            isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0
                        ):
                            raise
                        # tolerate the error, record it, and continue with the next task
                        log.error("%s", e)
                        self.append_exception(e, "task", task_description)
                if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos):
                    break
        if not p.skip_replication:
            self.print_replication_stats(self.replication_start_time_nanos)
        error_count = self.all_exceptions_count
        if error_count > 0 and p.daemon_lifetime_nanos == 0:
            msgs = "\n".join(f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions))
            log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}")
            assert self.first_exception is not None
            raise self.first_exception

475 

476 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None: 

477 """Records and logs an exception that was encountered while running a subtask.""" 

478 self.first_exception = self.first_exception or e 

479 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption 

480 self.all_exceptions.append(str(e)) 

481 self.all_exceptions_count += 1 

482 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description) 

483 

484 def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool: 

485 """Pauses until next scheduled snapshot time or daemon stop; Returns True to continue daemon loop; False to stop.""" 

486 sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns() 

487 if sleep_nanos <= 0: 

488 return False 

489 self.progress_reporter.pause() 

490 p, log = self.params, self.params.log 

491 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config 

492 curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1) 

493 next_snapshotting_event_dt: datetime = min( 

494 ( 

495 config.anchors.round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit) 

496 for duration_amount, duration_unit in config.suffix_durations.values() 

497 ), 

498 default=curr_datetime + timedelta(days=10 * 365), # infinity 

499 ) 

500 offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz) 

501 offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000 

502 sleep_nanos = min(sleep_nanos, max(0, offset_nanos)) 

503 log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]") 

504 self.termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination 

505 config.current_datetime = datetime.now(config.tz) 

506 return time.monotonic_ns() < daemon_stoptime_nanos and not self.termination_event.is_set() 

507 

508 def print_replication_stats(self, start_time_nanos: int) -> None: 

509 """Logs overall replication statistics after a job cycle completes.""" 

510 p, log = self.params, self.params.log 

511 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos 

512 msg = p.dry(f"zfs sent {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.") 

513 if p.is_program_available("pv", "local"): 

514 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file) 

515 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / (elapsed_nanos or 1)) 

516 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]." 

517 log.info("%s", msg.ljust(p.log_params.terminal_columns - len("2024-01-01 23:58:45 [I] "))) 

518 

    def validate_once(self) -> None:
        """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times."""
        p = self.params
        p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts)
        # compile snapshot name regex filters in place: replace string options with compiled pattern tuples
        for snapshot_filter in p.snapshot_filters:
            for _filter in snapshot_filter:
                if _filter.name == SNAPSHOT_REGEX_FILTER_NAME:
                    exclude_snapshot_regexes_strings, include_snapshot_regexes_strings = cast(
                        tuple[list[str], list[str]], _filter.options
                    )
                    exclude_snapshot_regexes = compile_regexes(exclude_snapshot_regexes_strings)
                    include_snapshot_regexes = compile_regexes(include_snapshot_regexes_strings or [".*"])  # default: all
                    _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes)

        exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT]
        if len(p.args.exclude_dataset_regex) > 0:  # some patterns don't exclude anything
            exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"]
        include_regexes: list[str] = p.args.include_dataset_regex

        # relative datasets need not be compiled more than once as they don't change between tasks
        def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]:
            # datasets starting with "/" are absolute; all others are relative to the root dataset
            abs_datasets: list[str] = []
            rel_datasets: list[str] = []
            for dataset in datasets:
                (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset)
            return abs_datasets, rel_datasets

        p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset)
        p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset)
        suffix = DESCENDANTS_RE_SUFFIX
        p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = (
            compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix),
            compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix),
        )

        # sanity-check pv options early: without these flags the progress machinery cannot work
        if p.pv_program != DISABLE_PRG:
            pv_program_opts_set = set(p.pv_program_opts)
            if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}):
                die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.")
            if not p.log_params.quiet:
                for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]:
                    if pv_program_opts_set.isdisjoint(opts):
                        die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.")

        src, dst = p.src, p.dst
        for remote in [src, dst]:
            r, loc = remote, remote.location
            validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user")
            validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host")
            validate_port(r.ssh_port, f"--ssh-{loc}-port ")

569 

    def validate_task(self) -> None:
        """Validates a single replication task before execution.

        Resolves dataset locators and sudo requirements for both remotes, rejects overlapping src/dst trees,
        finalizes per-task dataset regexes, detects available programs, and sizes the worker/minibatch pools.
        """
        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        for remote in [src, dst]:
            r = remote
            r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator(
                r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port
            )
            r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user)
            local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1")  # ::1 is IPv6 version of loopback address
            remote.is_nonlocal = r.ssh_host not in local_addrs
        # reuse the per-host existence map accumulated in prior tasks/iterations for this dst host
        self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host])

        if src.ssh_host == dst.ssh_host:
            msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}"
            if src.root_dataset == dst.root_dataset:
                die(f"Source and destination dataset must not be the same! {msg}")
            if p.recursive and (
                is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset)
                or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset)
            ):
                die(f"Source and destination dataset trees must not overlap! {msg}")

        suffx: str = DESCENDANTS_RE_SUFFIX  # also match descendants of a matching dataset
        # combine precompiled (relative) regexes from validate_once() with the per-task absolute dataset regexes
        p.exclude_dataset_regexes, p.include_dataset_regexes = (
            p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx),
            p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx),
        )
        if len(p.include_dataset_regexes) == 0:
            p.include_dataset_regexes = [(re.compile(r".*"), False)]  # default: include everything

        detect_available_programs(self)
        p.r2r_mode = resolve_r2r_mode(p)

        if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"):
            append_if_absent(p.curr_zfs_send_program_opts, "--large-block")

        # size worker pools and 'zfs list -t snapshot' minibatches per location, based on CPU count and --threads
        self.max_workers = {}
        self.max_datasets_per_minibatch_on_list_snaps = {}
        for r in [src, dst]:
            cpus: int = int(p.available_programs[r.location].get("getconf_cpu_count", 8))
            threads, is_percent = p.threads
            cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads))
            self.max_workers[r.location] = cpus
            bs: int = max(1, p.max_datasets_per_batch_on_list_snaps)  # 1024 by default
            max_datasets_per_minibatch: int = p.max_datasets_per_minibatch_on_list_snaps
            if max_datasets_per_minibatch <= 0:
                max_datasets_per_minibatch = max(1, bs // cpus)  # auto-derive from batch size and worker count
            max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch)
            self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch
            log.log(
                LOG_TRACE,
                "%s",
                f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, "
                f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, "
                f"max_workers: {self.max_workers[r.location]}, "
                f"location: {r.location}",
            )
        if self.is_test_mode:
            log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params))

631 

632 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]: 

633 """Returns sudo command prefix and whether root privileges are required.""" 

634 p: Params = self.params 

635 assert isinstance(ssh_user_host, str) 

636 assert isinstance(ssh_user, str) 

637 assert isinstance(p.sudo_program, str) 

638 assert isinstance(p.enable_privilege_elevation, bool) 

639 

640 is_root: bool = True 

641 if ssh_user_host != "": 

642 if ssh_user == "": 

643 if os.getuid() != 0: 

644 is_root = False 

645 elif ssh_user != "root": 

646 is_root = False 

647 elif os.getuid() != 0: 

648 is_root = False 

649 

650 if is_root: 

651 sudo = "" # using sudo in an attempt to make ZFS operations work even if we are not root user? 

652 use_zfs_delegation = False # or instead using 'zfs allow' delegation? 

653 return sudo, use_zfs_delegation 

654 elif p.enable_privilege_elevation: 

655 if p.sudo_program == DISABLE_PRG: 

656 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}") 

657 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any 

658 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit. 

659 return p.sudo_program + " -n", False 

660 else: 

661 return "", True 

662 

    def run_task(self) -> None:
        """Replicates all snapshots for the current root dataset pair.

        Orchestrates, in order: optional snapshot creation on the source (--create-src-snapshots), replication,
        then the optional post-replication phases (--delete-dst-datasets, --delete-dst-snapshots,
        --delete-empty-dst-datasets, --compare-snapshot-lists, --monitor-snapshots). Destructive post phases
        are skipped if an earlier phase failed.
        """

        def filter_src_datasets() -> list[str]:  # apply --{include|exclude}-dataset policy
            # Lazily computed and memoized via the enclosing `src_datasets` variable so the filtering
            # work runs at most once per task, no matter how many phases need the filtered list.
            return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets

        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location])
        recursive_sep: str = " " if p.recursive_flag else ""
        task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..."
        failed: bool = False
        src_datasets: list[str] | None = None  # memo slot for filter_src_datasets(); None means "not yet computed"
        basis_src_datasets: list[str] = []
        self.src_properties = {}
        self.dst_properties = {}
        if not is_dummy(src):  # find src dataset or all datasets in src dataset tree (with --recursive)
            basis_src_datasets = self.list_src_datasets_task()

        if not p.create_src_snapshots_config.skip_create_src_snapshots:
            log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...")
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            self.create_src_snapshots_task(basis_src_datasets, src_datasets)

        # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset
        if not p.skip_replication:
            if len(basis_src_datasets) == 0:
                die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}")
            if is_dummy(dst):
                die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            failed = self.replicate_datasets(src_datasets, task_description, max_workers)

        # Bail out early unless some post-replication phase is enabled (and replication succeeded)
        if failed or not (
            p.delete_dst_datasets
            or p.delete_dst_snapshots
            or p.delete_empty_dst_datasets
            or p.compare_snapshot_lists
            or p.monitor_snapshots_config.enable_monitor_snapshots
        ):
            return
        log.info("Listing dst datasets: %s", task_description)
        if is_dummy(dst):
            die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
        basis_dst_datasets: list[str] = self.list_dst_datasets_task()
        dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets)  # apply include/exclude policy

        if p.delete_dst_datasets and not failed:
            log.info(p.dry("--delete-dst-datasets: %s"), task_description)
            basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task(
                basis_src_datasets, basis_dst_datasets, dst_datasets
            )

        if p.delete_dst_snapshots and not failed:
            log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]")
            failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description)

        if p.delete_empty_dst_datasets and p.recursive and not failed:
            log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description)
            basis_dst_datasets, dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets)

        if p.compare_snapshot_lists and not failed:
            log.info("--compare-snapshot-lists: %s", task_description)
            if len(basis_src_datasets) == 0 and not is_dummy(src):
                die(f"Source dataset does not exist: {src.basis_root_dataset}")
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            run_compare_snapshot_lists(self, src_datasets, dst_datasets)

        if p.monitor_snapshots_config.enable_monitor_snapshots and not failed:
            log.info("--monitor-snapshots: %s", task_description)
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            self.monitor_snapshots_task(src_datasets, dst_datasets, task_description)

735 

736 def list_src_datasets_task(self) -> list[str]: 

737 """Lists datasets on the source host.""" 

738 p = self.params 

739 src = p.src 

740 basis_src_datasets: list[str] = [] 

741 is_caching: bool = is_caching_snapshots(p, src) 

742 props: str = "volblocksize,recordsize,name" 

743 props = "snapshots_changed," + props if is_caching else props 

744 cmd: list[str] = p.split_args( 

745 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset 

746 ) 

747 for line in (self.try_ssh_command_with_retries(src, LOG_DEBUG, cmd=cmd) or "").splitlines(): 

748 cols: list[str] = line.split("\t") 

749 snapshots_changed, volblocksize, recordsize, src_dataset = cols if is_caching else ["-"] + cols 

750 self.src_properties[src_dataset] = DatasetProperties( 

751 recordsize=int(recordsize) if recordsize != "-" else -int(volblocksize), 

752 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0, 

753 ) 

754 basis_src_datasets.append(src_dataset) 

755 assert (not self.is_test_mode) or basis_src_datasets == sorted(basis_src_datasets), "List is not sorted" 

756 return basis_src_datasets 

757 

758 def list_dst_datasets_task(self) -> list[str]: 

759 """Lists datasets on the destination host.""" 

760 p, log = self.params, self.params.log 

761 dst = p.dst 

762 is_caching: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots 

763 props: str = "name" 

764 props = "snapshots_changed," + props if is_caching else props 

765 cmd: list[str] = p.split_args( 

766 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset 

767 ) 

768 basis_dst_datasets: list[str] = [] 

769 basis_dst_datasets_str: str | None = self.try_ssh_command_with_retries(dst, LOG_TRACE, cmd=cmd) 

770 if basis_dst_datasets_str is None: 

771 log.warning("Destination dataset does not exist: %s", dst.root_dataset) 

772 else: 

773 for line in basis_dst_datasets_str.splitlines(): 

774 cols: list[str] = line.split("\t") 

775 snapshots_changed, dst_dataset = cols if is_caching else ["-"] + cols 

776 self.dst_properties[dst_dataset] = DatasetProperties( 

777 recordsize=0, 

778 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0, 

779 ) 

780 basis_dst_datasets.append(dst_dataset) 

781 assert (not self.is_test_mode) or basis_dst_datasets == sorted(basis_dst_datasets), "List is not sorted" 

782 return basis_dst_datasets 

783 

    def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None:
        """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements
        --create-src-snapshots.

        The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line,
        using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the
        --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs
        snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical
        'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such
        guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple
        command line invocations, respecting the limits that the operating system places on the maximum length of a single
        command line, per `getconf ARG_MAX`.

        Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
        per dataset. Space complexity is O(max(N, M)).
        """
        p, log = self.params, self.params.log
        src = p.src
        if len(basis_src_datasets) == 0:
            die(f"Source dataset does not exist: {src.basis_root_dataset}")
        datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets)
        # Drop labels that currently have nothing to snapshot
        datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0}
        basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy()  # shallow copy
        commands: dict[SnapshotLabel, list[str]] = {}
        for label, datasets in datasets_to_snapshot.items():
            cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot")
            if p.recursive:
                # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor
                root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets)
                if root_datasets is not None:
                    cmd.append("-r")  # recursive; takes a snapshot of all datasets in the subtree(s)
                    datasets_to_snapshot[label] = root_datasets
            commands[label] = cmd
        # Count snapshots over the pre-collapse (basis) dict so the log reflects what will actually be created
        creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots"
        log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...")
        # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle
        run_ssh_cmd_parallel(
            self,
            src,
            ((commands[lbl], (f"{ds}@{lbl}" for ds in datasets)) for lbl, datasets in datasets_to_snapshot.items()),
            fn=lambda cmd, batch: self.run_ssh_command_with_retries(
                src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch, retry_on_generic_ssh_error=False
            ),  # retry_on_generic_ssh_error=False means only retry on SSH connect errors b/c `zfs snapshot` isn't idempotent
            max_batch_items=2**29,
        )
        if is_caching_snapshots(p, src):
            # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls
            self.cache.update_last_modified_cache(basis_datasets_to_snapshot)

832 

    def delete_destination_snapshots_task(
        self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str
    ) -> bool:
        """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the
        --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset*
        policy; implements --delete-dst-snapshots. Does not attempt to delete snapshots that carry a `zfs hold`.

        Returns True if any per-dataset deletion failed, else False.
        """
        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot"
        filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters)
        props: str = "guid,name,userrefs"
        props = self.creation_prefix + "creation," + props if filter_needs_creation_time else props
        basis_src_datasets_set: set[str] = set(basis_src_datasets)
        num_snapshots_found, num_snapshots_deleted = 0, 0  # shared counters, guarded by self.stats_lock

        def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool:  # thread-safe
            # Crosscheck against the src dataset only if it exists (and, for bookmarks, only if src supports them)
            src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
            if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks):
                src_kind: str = kind
                if not p.delete_dst_snapshots_no_crosscheck:
                    src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot"
                src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset)
            else:
                src_cmd = None
            dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset)
            self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots")
            src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel(  # list src+dst snapshots in parallel
                lambda: set(self.run_ssh_command(src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []),
                lambda: self.try_ssh_command(dst, LOG_TRACE, cmd=dst_cmd),
            )
            if dst_snaps_with_guids_str is None:
                log.warning("Third party deleted destination: %s", dst_dataset)
                return False
            held_dst_snapshots: set[str] = set()
            dst_snaps_with_guids: list[str] = []
            no_userrefs: tuple[str, ...] = ("", "-", "0")  # ZFS snapshot property userrefs > 0 indicates a zfs hold
            for line in dst_snaps_with_guids_str.splitlines():
                dst_snaps_with_guids.append(line[: line.rindex("\t")])  # strip off trailing userrefs column
                _, name, userrefs = line.rsplit("\t", 2)
                if userrefs not in no_userrefs:
                    tag: str = name[name.index("@") + 1 :]
                    held_dst_snapshots.add(tag)  # don't attempt to delete snapshots that carry a `zfs hold`
            num_dst_snaps_with_guids = len(dst_snaps_with_guids)
            basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy()
            if p.delete_dst_bookmarks:
                replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1)  # treat bookmarks as snapshots
            # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots().
            # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep*
            all_except: bool = p.delete_dst_snapshots_except
            if p.delete_dst_snapshots_except and not is_dummy(src):
                # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what
                # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep"
                # list, we later further refine by checking what's on the source dataset.
                all_except = False
            dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except)
            if p.delete_dst_bookmarks:
                replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1)  # restore pre-filtering bookmark state
            if filter_needs_creation_time:  # drop the leading creation column that was only needed for filtering
                dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids)
                basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids)
            if p.delete_dst_snapshots_except and not is_dummy(src):  # Non-dummy Source + "Except" (Keep) Mode
                # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka
                # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset.
                # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP.
                # We only actually keep them if they are ALSO on the SRC.
                # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`)
                # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`.
                except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids)
                dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids)
            else:  # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode
                # In standard delete mode:
                #   `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST.
                #   We delete those that are NOT on SRC.
                #   `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`.
                # In dummy source + "except" (keep) mode:
                #   `all_except` was True.
                #   `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete.
                #   `src_snaps_with_guids` is empty.
                #   `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`.
                dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids)
                dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete)
            separator: str = "#" if p.delete_dst_bookmarks else "@"
            dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete)
            if p.delete_dst_bookmarks:
                delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
            else:
                dst_tags_to_delete = [tag for tag in dst_tags_to_delete if tag not in held_dst_snapshots]
                delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
            with self.stats_lock:
                nonlocal num_snapshots_found
                num_snapshots_found += num_dst_snaps_with_guids
                nonlocal num_snapshots_deleted
                num_snapshots_deleted += len(dst_tags_to_delete)
                if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks:
                    self.dst_properties[dst_dataset].snapshots_changed = 0  # invalidate cache
            return True

        # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec
        failed: bool = False
        if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks:
            start_time_nanos = time.monotonic_ns()
            failed = process_datasets_in_parallel_and_fault_tolerant(
                log=log,
                datasets=dst_datasets,
                process_dataset=delete_destination_snapshots,  # lambda
                skip_tree_on_error=lambda dataset: False,
                skip_on_error=p.skip_on_error,
                max_workers=max_workers,
                timing=self.task_timing,
                termination_handler=self.terminate,
                enable_barriers=False,
                task_name="--delete-dst-snapshots",
                append_exception=self.append_exception,
                retry_template=self._retry_template(),
                dry_run=p.dry_run,
                is_test_mode=self.is_test_mode,
            )
            elapsed_nanos = time.monotonic_ns() - start_time_nanos
            log.info(
                p.dry("--delete-dst-snapshots: %s"),
                task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s "
                f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]",
            )
        return failed

957 

958 def delete_dst_datasets_task( 

959 self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str] 

960 ) -> tuple[list[str], list[str]]: 

961 """Deletes existing destination datasets that do not exist within the source dataset if they are included via 

962 --{include|exclude}-dataset* policy; implements --delete-dst-datasets. 

963 

964 Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors. 

965 """ 

966 p = self.params 

967 src, dst = p.src, p.dst 

968 children: dict[str, set[str]] = defaultdict(set) 

969 for dst_dataset in basis_dst_datasets: # Compute the direct children of each NON-FILTERED dataset 

970 parent: str = os.path.dirname(dst_dataset) 

971 children[parent].add(dst_dataset) 

972 to_delete: set[str] = set() 

973 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm 

974 if children[dst_dataset].issubset(to_delete): 

975 to_delete.add(dst_dataset) # all children are deletable, thus the dataset itself is deletable too 

976 to_delete = to_delete.difference( 

977 replace_prefix(src_dataset, src.root_dataset, dst.root_dataset) for src_dataset in basis_src_datasets 

978 ) 

979 delete_datasets(self, dst, to_delete) 

980 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(to_delete)) 

981 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(to_delete)) 

982 return basis_dst_datasets, sorted_dst_datasets 

983 

    def delete_empty_dst_datasets_task(
        self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
    ) -> tuple[list[str], list[str]]:
        """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset
        do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets.

        To do so, we walk the dataset list (conceptually, a tree) depth-first (i.e. sorted descending). If a dst dataset has
        zero snapshots and zero bookmarks and all its children are already marked as orphans, then it is itself an orphan,
        and we mark it as such. Walking in a reverse sorted way means that we efficiently check for zero snapshots/bookmarks
        not just over the direct children but the entire tree. Finally, delete all orphan datasets in an efficient batched
        way.

        Returns the updated (basis_dst_datasets, sorted_dst_datasets) lists with deleted datasets removed.
        """
        p = self.params
        dst = p.dst

        # Compute the direct children of each NON-FILTERED dataset. Thus, no non-selected dataset and no ancestor of a
        # non-selected dataset will ever be added to the "orphan" set. In other words, this treats non-selected dataset
        # subtrees as if they all had snapshots, so non-selected dataset subtrees and their ancestors are guaranteed
        # to not get deleted.
        children: dict[str, set[str]] = defaultdict(set)
        for dst_dataset in basis_dst_datasets:
            parent: str = os.path.dirname(dst_dataset)
            children[parent].add(dst_dataset)

        def compute_orphans(datasets_having_snapshots: set[str]) -> set[str]:
            """Returns destination datasets having zero snapshots whose children are all orphans."""
            orphans: set[str] = set()
            for dst_dataset in reversed(sorted_dst_datasets):  # Reverse order facilitates efficient O(N) time algorithm
                if (dst_dataset not in datasets_having_snapshots) and children[dst_dataset].issubset(orphans):
                    orphans.add(dst_dataset)
            return orphans

        # Compute candidate orphan datasets, which reduces the list of datasets for which we list snapshots via
        # 'zfs list -t snapshot ...' from dst_datasets to a subset of dst_datasets, which in turn reduces I/O and improves
        # perf. Essentially, this eliminates the I/O to list snapshots for ancestors of excluded datasets.
        candidate_orphans: set[str] = compute_orphans(set())

        # Compute destination datasets having more than zero snapshots
        dst_datasets_having_snapshots: set[str] = set()
        with_bookmarks: bool = p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst)
        btype: str = "bookmark,snapshot" if with_bookmarks else "snapshot"
        cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name")
        for snapshots in zfs_list_snapshots_in_parallel(self, dst, cmd, sorted(candidate_orphans), ordered=False):
            if with_bookmarks:
                replace_in_lines(snapshots, old="#", new="@", count=1)  # treat bookmarks as snapshots
            dst_datasets_having_snapshots.update(snap[: snap.index("@")] for snap in snapshots)  # union

        orphans: set[str] = compute_orphans(dst_datasets_having_snapshots)  # compute the real orphans
        delete_datasets(self, dst, orphans)  # finally, delete the orphan datasets in an efficient way
        sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(orphans))
        basis_dst_datasets = sorted(set(basis_dst_datasets).difference(orphans))
        return basis_dst_datasets, sorted_dst_datasets

1036 

1037 def monitor_snapshots_task( 

1038 self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str 

1039 ) -> None: 

1040 """Monitors src and dst snapshots; implements --monitor-snapshots.""" 

1041 p, log = self.params, self.params.log 

1042 src, dst = p.src, p.dst 

1043 num_cache_hits: int = self.num_cache_hits 

1044 num_cache_misses: int = self.num_cache_misses 

1045 start_time_nanos: int = time.monotonic_ns() 

1046 dst_alert, src_alert = run_in_parallel( 

1047 lambda: self.monitor_snapshots(dst, sorted_dst_datasets), 

1048 lambda: self.monitor_snapshots(src, sorted_src_datasets), 

1049 ) 

1050 exit_code, _exit_kind, exit_msg = min(dst_alert, src_alert) 

1051 if exit_code != 0: 

1052 die(exit_msg, -exit_code) 

1053 elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos) 

1054 num_cache_hits = self.num_cache_hits - num_cache_hits 

1055 num_cache_misses = self.num_cache_misses - num_cache_misses 

1056 if num_cache_hits > 0 or num_cache_misses > 0: 

1057 msg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses) 

1058 else: 

1059 msg = "" 

1060 log.info( 

1061 "--monitor-snapshots done: %s", 

1062 f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]", 

1063 ) 

1064 

    def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> tuple[int, str, str]:
        """Checks snapshot freshness and warns or errors out when limits are exceeded.

        Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name
        pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to check if snapshots
        are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process
        exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.

        Returns the worst alert seen, as a (-exit_code, exit_kind, exit_msg) tuple; (0, "", "") means all OK.

        Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
        per dataset. Space complexity is O(max(N, M)).
        """
        p, log = self.params, self.params.log
        alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts
        labels: list[SnapshotLabel] = [alert.label for alert in alerts]
        oldest_skip_holds: list[bool] = [alert.oldest_skip_holds for alert in alerts]
        current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000
        is_debug: bool = log.isEnabledFor(LOG_DEBUG)
        if is_caching_snapshots(p, remote):
            # Cache-related state; only defined (and only used) when caching is enabled for this remote
            props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties
            snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()}
            alerts_hash: str = sha256_128_urlsafe_base64(str(tuple(alerts)))
            label_hashes: dict[SnapshotLabel, str] = {
                label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
            }
        is_caching: bool = False  # disabled during the cache-lookup phase; enabled before the 'zfs list' fallback
        worst_alert: tuple[int, str, str] = (0, "", "")  # -exit_code, exit_kind, exit_msg

        def record_alert(exit_code: int, exit_kind: str, exit_msg: str) -> None:
            nonlocal worst_alert
            worst_alert = min(worst_alert, (-exit_code, exit_kind, exit_msg))  # min() sorts "Latest" before "Oldest" on tie

        def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str:
            """Returns the path of the local cache file tracking this (dataset, label, alert config) combination."""
            cache_label: str = os.path.join(MONITOR_CACHE_FILE_PREFIX, alert_cfg.kind[0], label_hashes[label], alerts_hash)
            return self.cache.last_modified_cache_file(r, dataset, cache_label)

        def alert_msg(
            kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int
        ) -> str:
            """Formats a human-readable alert message; delta_millis == -1 omits the 'should be at most' clause."""
            assert kind == "Latest" or kind == "Oldest"
            lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}"
            if snapshot_age_millis >= current_unixtime_millis:
                return f"No snapshot exists for {dataset}@{lbl}"
            msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old"
            s = f": @{snapshot}" if snapshot else ""
            if delta_millis == -1:
                return f"{msg}{s}"
            return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}"

        def check_alert(
            label: SnapshotLabel, alert_cfg: AlertConfig | None, creation_unixtime_secs: int, dataset: str, snapshot: str
        ) -> None:  # thread-safe
            if alert_cfg is None:
                return
            if is_caching and not p.dry_run:  # update cache with latest state from 'zfs list -t snapshot'
                snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
                cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg)
                set_last_modification_time_safe(
                    cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed), if_more_recent=True
                )
            warning_millis: int = alert_cfg.warning_millis
            critical_millis: int = alert_cfg.critical_millis
            alert_kind = alert_cfg.kind
            snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000
            m = "--monitor_snapshots: "
            if snapshot_age_millis > critical_millis:
                msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis)
                log.critical("%s", msg)
                if not p.monitor_snapshots_config.dont_crit:
                    record_alert(CRITICAL_STATUS, alert_kind, msg)
            elif snapshot_age_millis > warning_millis:
                msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis)
                log.warning("%s", msg)
                if not p.monitor_snapshots_config.dont_warn:
                    record_alert(WARNING_STATUS, alert_kind, msg)
            elif is_debug:
                msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1)
                log.debug("%s", msg)

        def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            alert: MonitorSnapshotAlert = alerts[i]
            check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot)

        def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            alert: MonitorSnapshotAlert = alerts[i]
            check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot)

        def find_stale_datasets_and_check_alerts() -> list[str]:
            """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply,
            that is, without incurring 'zfs list -t snapshots'.

            This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
            https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
            """
            stale_datasets: list[str] = []
            time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
            for dataset in sorted_datasets:
                is_stale_dataset: bool = False
                snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
                for alert in alerts:
                    for cfg in (alert.latest, alert.oldest):
                        if cfg is None:
                            continue
                        if (
                            snapshots_changed != 0
                            and snapshots_changed < time_threshold
                            and (  # always True
                                cached_unix_times := self.cache.get_snapshots_changed2(
                                    monitor_last_modified_cache_file(remote, dataset, alert.label, cfg)
                                )
                            )
                            and snapshots_changed == cached_unix_times[1]  # cached snapshots_changed aka last modified time
                            and snapshots_changed >= cached_unix_times[0]  # creation time of minmax snapshot aka access time
                        ):  # cached state is still valid; emit an alert if the latest/oldest snapshot is too old
                            lbl = alert.label
                            check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="")
                        else:  # cached state is no longer valid; fallback to 'zfs list -t snapshot'
                            is_stale_dataset = True
                if is_stale_dataset:
                    stale_datasets.append(dataset)
            return stale_datasets

        # satisfy request from local cache as much as possible
        if is_caching_snapshots(p, remote):
            stale_datasets: list[str] = find_stale_datasets_and_check_alerts()
            with self.stats_lock:
                self.num_cache_misses += len(stale_datasets)
                self.num_cache_hits += len(sorted_datasets) - len(stale_datasets)
        else:
            stale_datasets = sorted_datasets

        # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
        is_caching = is_caching_snapshots(p, remote)
        datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
            remote,
            stale_datasets,
            labels,
            fn_latest=alert_latest_snapshot,
            fn_oldest=alert_oldest_snapshot,
            fn_oldest_skip_holds=oldest_skip_holds,
        )
        # Datasets with zero snapshots still get alerts (creation_unixtime_secs=0 yields "No snapshot exists" messages)
        for dataset in datasets_without_snapshots:
            for i in range(len(alerts)):
                alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
                alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
        return worst_alert

1210 

    def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool:
        """Replicates a list of datasets.

        When the "snapshots_changed" cache is enabled, first cheaply determines which datasets are already up-to-date
        (without running 'zfs list -t snapshot'), then replicates only the remaining (stale) datasets in parallel with
        retries and fault tolerance, and finally refreshes the local cache for the datasets actually processed.

        Params:
            src_datasets: Sorted list of source datasets to replicate.
            task_description: Human-readable description included in log messages.
            max_workers: Maximum number of datasets to process in parallel.

        Returns:
            The `failed` flag reported by the parallel runner (True indicates at least one dataset failed).
        """
        assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted"
        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        self.num_snapshots_found = 0
        self.num_snapshots_replicated = 0
        log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]")
        start_time_nanos: int = time.monotonic_ns()

        # Translate a source dataset name into its corresponding destination dataset name (and vice versa):
        def src2dst(src_dataset: str) -> str:
            return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset)

        def dst2src(dst_dataset: str) -> str:
            return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)

        def find_stale_datasets() -> tuple[list[str], dict[str, str]]:
            """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine
            which datasets can be skipped cheaply, i.e. without incurring 'zfs list -t snapshots'.

            This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
            https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed

            Returns a tuple of (sorted list of stale src datasets, map from src dataset to its cache file path).
            """
            # First, check which src datasets have changed since the last replication to that destination
            cache_files: dict[str, str] = {}
            stale_src_datasets1: list[str] = []
            maybe_stale_dst_datasets: list[str] = []
            userhost_dir: str = sha256_85_urlsafe_base64(p.dst.cache_namespace())
            filter_key = tuple(tuple(f) for f in p.snapshot_filters)  # cache is only valid for same --include/excl-snapshot*
            filter_hash_code: str = sha256_85_urlsafe_base64(str(filter_key))
            for src_dataset in src_datasets:
                dst_dataset: str = src2dst(src_dataset)  # cache is only valid for identical destination dataset
                dst_dataset_dir: str = sha256_85_urlsafe_base64(dst_dataset)
                cache_label: str = os.path.join(
                    REPLICATION_CACHE_FILE_PREFIX, userhost_dir, dst_dataset_dir, filter_hash_code
                )
                cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label)
                cache_files[src_dataset] = cache_file
                snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed  # get prop "for free"
                # Only trust a cache entry that is nonzero, matured (older than the maturity threshold, to avoid
                # equal-second races), and identical to the cached value recorded after the previous replication:
                if (
                    snapshots_changed != 0
                    and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
                    and snapshots_changed == self.cache.get_snapshots_changed(cache_file)
                ):
                    maybe_stale_dst_datasets.append(dst_dataset)
                else:
                    stale_src_datasets1.append(src_dataset)

            # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed
            stale_src_datasets2: list[str] = []
            dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets)
            for dst_dataset in maybe_stale_dst_datasets:
                snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0)
                if (
                    snapshots_changed != 0
                    and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
                    and snapshots_changed
                    == self.cache.get_snapshots_changed(self.cache.last_modified_cache_file(dst, dst_dataset))
                ):
                    log.info("Already up-to-date [cached]: %s", dst_dataset)
                else:
                    stale_src_datasets2.append(dst2src(dst_dataset))
            assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted"
            assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted"
            stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2))  # merge two sorted lists
            assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates"
            return stale_src_datasets, cache_files

        if is_caching_snapshots(p, src):
            stale_src_datasets, cache_files = find_stale_datasets()
            num_cache_misses = len(stale_src_datasets)
            num_cache_hits = len(src_datasets) - len(stale_src_datasets)
            self.num_cache_misses += num_cache_misses
            self.num_cache_hits += num_cache_hits
            cmsg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
        else:
            stale_src_datasets = src_datasets  # cache disabled: every dataset must be examined
            cache_files = {}
            cmsg = ""

        # Datasets that were actually replicated (not skipped); appended to concurrently, hence the lock:
        done_src_datasets: list[str] = []
        done_src_datasets_lock: threading.Lock = threading.Lock()

        def _process_dataset_fn(src_dataset: str, tid: str, retry: Retry) -> bool:
            result: bool = replicate_dataset(job=self, src_dataset=src_dataset, tid=tid, retry=retry)
            with done_src_datasets_lock:
                done_src_datasets.append(src_dataset)  # record datasets that were actually replicated (not skipped)
            return result

        # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution
        failed: bool = process_datasets_in_parallel_and_fault_tolerant(
            log=log,
            datasets=stale_src_datasets,
            process_dataset=_process_dataset_fn,
            skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)],
            skip_on_error=p.skip_on_error,
            max_workers=max_workers,
            timing=self.task_timing,
            termination_handler=self.terminate,
            enable_barriers=False,
            task_name="Replication",
            append_exception=self.append_exception,
            retry_template=self._retry_template(),
            dry_run=p.dry_run,
            is_test_mode=self.is_test_mode,
        )

        if is_caching_snapshots(p, src) and len(done_src_datasets) > 0:
            # refresh "snapshots_changed" ZFS dataset property from dst
            stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in sorted(done_src_datasets)]
            dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets)
            for dst_dataset in stale_dst_datasets:  # update local cache
                dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0)
                dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset)
                src_dataset: str = dst2src(dst_dataset)
                src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed
                if not p.dry_run:
                    set_last_modification_time_safe(
                        cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed, if_more_recent=True
                    )
                    set_last_modification_time_safe(
                        dst_cache_file, unixtime_in_secs=dst_snapshots_changed, if_more_recent=True
                    )

        elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
        log.info(
            p.dry("Replication done: %s"),
            f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots"
            f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]",
        )
        return failed

1342 

1343 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None: 

1344 """For testing only; for unit tests to delete datasets during replication and test correct handling of that.""" 

1345 assert delete_trigger 

1346 counter = self.delete_injection_triggers.get("before") 

1347 if counter and self.decrement_injection_counter(counter, delete_trigger): 

1348 p = self.params 

1349 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "") 

1350 self.run_ssh_command(remote, LOG_DEBUG, print_stdout=True, cmd=cmd) 

1351 

1352 def maybe_inject_params(self, error_trigger: str) -> None: 

1353 """For testing only; for unit tests to simulate errors during replication and test correct handling of them.""" 

1354 assert error_trigger 

1355 counter = self.error_injection_triggers.get("before") 

1356 if counter and self.decrement_injection_counter(counter, error_trigger): 

1357 self.inject_params = self.param_injection_triggers[error_trigger] 

1358 elif error_trigger in self.param_injection_triggers: 

1359 self.inject_params = {} 

1360 

1361 @staticmethod 

1362 def recv_option_property_names(recv_opts: list[str]) -> set[str]: 

1363 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for 

1364 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name.""" 

1365 propnames: set[str] = set() 

1366 i = 0 

1367 n = len(recv_opts) 

1368 while i < n: 

1369 stripped: str = recv_opts[i].strip() 

1370 if stripped in ("-o", "-x"): 

1371 i += 1 

1372 if i == n or recv_opts[i].strip() in ("-o", "-x"): 

1373 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1374 if stripped == "-o" and "=" not in recv_opts[i]: 

1375 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}") 

1376 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0] 

1377 validate_property_name(propname, "--zfs-recv-program-opt(s)") 

1378 propnames.add(propname) 

1379 i += 1 

1380 return propnames 

1381 

    def root_datasets_if_recursive_zfs_snapshot_is_possible(
        self, datasets: list[str], basis_datasets: list[str]
    ) -> list[str] | None:
        """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset
        within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in
        `datasets`. Returns `None` if any (unfiltered) dataset in `basis_dataset` that is a descendant of at least one of the
        root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have
        pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the
        non-recursive 'zfs snapshot snapshot1 .. snapshot N' CLI flavor.

        Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both
        sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time
        complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case.
        See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative
        impl that's easier to grok.
        """
        assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
        assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
        assert (not self.is_test_mode) or basis_datasets == sorted(basis_datasets), "List is not sorted"
        assert (not self.is_test_mode) or not has_duplicates(basis_datasets), "List contains duplicates"
        assert (not self.is_test_mode) or set(datasets).issubset(set(basis_datasets)), "Not a subset"
        root_datasets: list[str] = self.find_root_datasets(datasets)
        # Three cursors walk three sorted lists in lockstep:
        i = 0  # cursor into root_datasets
        j = 0  # cursor into basis_datasets
        k = 0  # cursor into datasets
        len_root_datasets = len(root_datasets)
        len_basis_datasets = len(basis_datasets)
        len_datasets = len(datasets)
        while i < len_root_datasets and j < len_basis_datasets:  # walk and "merge" the sorted lists, in sync
            if basis_datasets[j] < root_datasets[i]:  # irrelevant subtree?
                j += 1  # move to next basis_datasets[j]
            elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]):  # relevant subtree?
                # every basis dataset under a root must also be present in the filtered `datasets` list:
                while k < len_datasets and datasets[k] < basis_datasets[j]:
                    k += 1  # move to next datasets[k]
                if k == len_datasets or datasets[k] != basis_datasets[j]:  # dataset chopped off by schedule or --incl/excl*?
                    return None  # detected filter pruning that is incompatible with 'zfs snapshot -r'
                j += 1  # move to next basis_datasets[j]
            else:
                i += 1  # move to next root_dataset[i]; no need to check root_datasets that are no longer (or not yet) reachable
        return root_datasets

1422 

1423 @staticmethod 

1424 def find_root_datasets(sorted_datasets: list[str]) -> list[str]: 

1425 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A 

1426 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets.""" 

1427 root_datasets: list[str] = [] 

1428 skip_dataset: str = DONT_SKIP_DATASET 

1429 for dataset in sorted_datasets: 

1430 if is_descendant(dataset, of_root_dataset=skip_dataset): 

1431 continue 

1432 skip_dataset = dataset 

1433 root_datasets.append(dataset) 

1434 return root_datasets 

1435 

    def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]:
        """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g.
        bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be
        created with that name, because these datasets are due per the schedule, either because the 'creation' time of their
        most recent snapshot with that name pattern is now too old, or such a snapshot does not even exist.

        The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple
        but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An
        alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property
        (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of
        the most recent snapshot, for each SnapshotLabel and each dataset.
        """
        p, log = self.params, self.params.log
        src = p.src
        config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
        datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
        # is_caching stays False while answering from the cache below, and is flipped on right before the
        # 'zfs list -t snapshot' fallback, so that create_snapshot_if_latest_is_too_old() only writes cache entries
        # when fed with fresh (non-cached) data:
        is_caching: bool = False
        interner: HashedInterner[datetime] = HashedInterner()  # reduces memory footprint
        msgs: list[tuple[datetime, str, SnapshotLabel, str]] = []  # accumulates "next scheduled time" log lines

        def create_snapshot_if_latest_is_too_old(
            datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int
        ) -> None:  # thread-safe
            """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old."""
            creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz)
            log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label)
            duration_amount, duration_unit = config.suffix_durations[label.suffix]
            # The next due time is the next multiple of the label's duration, strictly after the latest creation time:
            next_event_dt: datetime = config.anchors.round_datetime_up_to_duration_multiple(
                creation_dt + timedelta(microseconds=1), duration_amount, duration_unit
            )
            msg: str = ""
            if config.current_datetime >= next_event_dt:
                datasets_to_snapshot[label].append(dataset)  # mark it as scheduled for snapshot creation
                msg = " has passed"
            next_event_dt = interner.intern(next_event_dt)
            msgs.append((next_event_dt, dataset, label, msg))
            if is_caching and not p.dry_run:  # update cache with latest state from 'zfs list -t snapshot'
                # Per-label cache stores (atime=creation, mtime=snapshots_changed) so later runs can safely trust creation
                # only when the label's mtime matches the current dataset-level '=' cache value. Excludes timestamp of label.
                cache_file: str = self.cache.last_modified_cache_file(src, dataset, label_hashes[label])
                unixtimes: tuple[int, int] = (creation_unixtime, self.src_properties[dataset].snapshots_changed)
                set_last_modification_time_safe(cache_file, unixtime_in_secs=unixtimes, if_more_recent=True)

        # Partition labels: those that are due unconditionally vs. those whose due-ness depends on snapshot age:
        labels: list[SnapshotLabel] = []
        config_labels: list[SnapshotLabel] = config.snapshot_labels()
        for label in config_labels:
            duration_amount_, _duration_unit = config.suffix_durations[label.suffix]
            if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due:
                datasets_to_snapshot[label] = sorted_datasets  # take snapshot regardless of creation time of existing snaps
            else:
                labels.append(label)
        if len(labels) == 0:
            return datasets_to_snapshot  # nothing more TBD
        label_hashes: dict[SnapshotLabel, str] = {
            label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
        }

        # satisfy request from local cache as much as possible
        cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
        if is_caching_snapshots(p, src):
            sorted_datasets_todo: list[str] = []  # datasets the cache cannot answer; they fall through to 'zfs list'
            time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
            for dataset in sorted_datasets:
                cache: SnapshotCache = self.cache
                cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset))
                if cached_snapshots_changed == 0:
                    sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                    continue
                if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed:  # get that prop "for free"
                    cache.invalidate_last_modified_cache_dataset(dataset)
                    sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                    continue
                if cached_snapshots_changed >= time_threshold:  # Avoid equal-second races: only trust matured cache entries
                    sorted_datasets_todo.append(dataset)  # cache entry isn't mature enough to be trusted; skip cache
                    continue
                creation_unixtimes: list[int] = []
                for label_hash in label_hashes.values():
                    # For per-label files, atime stores the latest matching snapshot's creation time, while mtime stores
                    # the dataset-level snapshots_changed observed when this label file was written.
                    atime, mtime = cache.get_snapshots_changed2(cache.last_modified_cache_file(src, dataset, label_hash))
                    # Sanity check: trust per-label cache only when:
                    # - mtime equals the dataset-level '=' cache (same snapshots_changed), and
                    # - atime is plausible and not later than mtime (creation <= snapshots_changed), and
                    # - neither atime nor mtime is zero (unknown provenance).
                    # Otherwise fall back to 'zfs list -t snapshot' to avoid stale creation times after newer changes.
                    if atime == 0 or mtime == 0 or mtime != cached_snapshots_changed or atime > mtime:
                        sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                        break
                    creation_unixtimes.append(atime)
                if len(creation_unixtimes) == len(labels):  # cache answered all labels for this dataset
                    for j, label in enumerate(labels):
                        create_snapshot_if_latest_is_too_old(
                            cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j]
                        )
            sorted_datasets = sorted_datasets_todo

        def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            # callback invoked by handle_minmax_snapshots() with the latest snapshot of labels[i] for the dataset
            create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs)

        def on_finish_dataset(dataset: str) -> None:
            # record the dataset-level snapshots_changed value after the dataset has been fully examined
            if is_caching_snapshots(p, src) and not p.dry_run:
                set_last_modification_time_safe(
                    self.cache.last_modified_cache_file(src, dataset),
                    unixtime_in_secs=self.src_properties[dataset].snapshots_changed,
                    if_more_recent=True,
                )

        # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
        is_caching = is_caching_snapshots(p, src)
        datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
            src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset
        )
        for lbl in labels:  # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result
            datasets_to_snapshot[lbl].sort()
            if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot):  # +take snaps for snapshot-less datasets
                datasets_to_snapshot[lbl] = list(  # inputs to merge() are sorted, and outputs are sorted too
                    heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots)
                )
        for label, datasets in datasets_to_snapshot.items():
            assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
            assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
            assert label

        msgs.sort()  # sort by time, dataset, label
        for i in range(0, len(msgs), 10_000):  # reduce logging overhead via mini-batching
            text = "".join(
                f"\nNext scheduled snapshot time: {next_event_dt} for {dataset}@{label}{msg}"
                for next_event_dt, dataset, label, msg in msgs[i : i + 10_000]
            )
            log.info("Next scheduled snapshot times ...%s", text)

        # sort keys to ensure that we take snapshots for dailies before hourlies, and so on
        datasets_to_snapshot = {lbl: datasets_to_snapshot[lbl] for lbl in config_labels if lbl in datasets_to_snapshot}
        return datasets_to_snapshot

1570 

    def handle_minmax_snapshots(
        self,
        remote: Remote,
        sorted_datasets: list[str],
        labels: list[SnapshotLabel],
        *,
        fn_latest: Callable[[int, int, str, str], None],  # callback function for latest snapshot
        fn_oldest: Callable[[int, int, str, str], None] | None = None,  # callback function for oldest snapshot
        fn_oldest_skip_holds: Sequence[bool] = (),
        fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None,
    ) -> list[str]:  # thread-safe
        """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs
        the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names.

        Callbacks receive (label_index, creation_unixtime_secs, dataset, snapshot_name); creation_unixtime_secs is 0 and
        snapshot_name is "" when no snapshot of the dataset matches the label.

        If the (optional) fn_oldest_skip_holds=True for a given label, then snapshots for that label that carry a 'zfs hold'
        are skipped (ignored) when finding the oldest snapshot for fn_oldest. This can be useful for monitor_snapshots(),
        given that users often intentionally retain holds for longer than the "normal" snapshot retention period, and this
        shouldn't necessarily cause monitoring to emit alerts.

        Returns the (sorted) sublist of `sorted_datasets` that have no snapshots at all.
        """
        if fn_oldest is not None:
            assert len(labels) == len(fn_oldest_skip_holds)
        assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        no_userrefs: tuple[str, ...] = ("", "-", "0")  # ZFS snapshot property userrefs > 0 indicates a zfs hold

        def extract_fields(line: str) -> tuple[int, int, str, bool]:
            # Parses one tab-separated 'zfs list' output line; the userrefs column is only present when requested below.
            fields: list[str] = line.split("\t")
            if len(fields) == 3:
                name, createtxg, creation_unixtime_secs = fields
                userrefs = ""
            else:
                name, createtxg, creation_unixtime_secs, userrefs = fields
            return (
                int(createtxg),
                int(creation_unixtime_secs),
                name.split("@", 1)[1],
                userrefs in no_userrefs,  # True means "not held by a zfs hold"
            )

        p = self.params
        props: str = "name,createtxg,creation"
        # only pay for the extra userrefs column when at least one label wants holds to be skipped:
        props = props if fn_oldest is None or not any(fn_oldest_skip_holds) else props + ",userrefs"
        cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o {props}")  # sorts by dataset,creation
        datasets_with_snapshots: set[str] = set()
        interner: SortedInterner[str] = SortedInterner(sorted_datasets)  # reduces memory footprint
        for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False):
            # streaming group by dataset name (consumes constant memory only)
            for dataset, group in itertools.groupby(lines, key=lambda line: line.split("\t", 1)[0].split("@", 1)[0]):
                dataset = interner.interned(dataset)
                snapshots = sorted(  # fetch all snapshots of current dataset and sort by createtxg,creation,name
                    extract_fields(line) for line in group
                )  # perf: sorted() is fast because Timsort is close to O(N) for nearly sorted input, which is our case
                assert len(snapshots) > 0
                datasets_with_snapshots.add(dataset)
                snapshot_names: tuple[str, ...] = tuple(snapshot[2] for snapshot in snapshots)
                # perf: hoist attribute/global lookups into locals for the hot matching loop below:
                year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX
                year_with_4_digits_regex_fullmatch = year_with_4_digits_regex.fullmatch
                startswith = str.startswith
                endswith = str.endswith
                fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False))
                for i, label in enumerate(labels):
                    infix: str = label.infix
                    start: str = label.prefix + infix
                    end: str = label.suffix
                    startlen: int = len(start)
                    endlen: int = len(end)
                    minlen: int = startlen + endlen if infix else 4 + startlen + endlen  # year_with_four_digits_regex
                    startlen_4: int = startlen + 4  # [startlen:startlen+4] # year_with_four_digits_regex
                    has_infix: bool = bool(infix)
                    for fn, is_reverse in fns:
                        # is_reverse=True scans newest-first (latest snapshot); False scans oldest-first
                        creation_unixtime_secs: int = 0  # find creation time of latest or oldest snapshot matching the label
                        minmax_snapshot: str = ""
                        # fn_oldest_skip_holds[i] is only consulted for the oldest-scan (is_reverse short-circuits):
                        no_skip_holds: bool = is_reverse or not fn_oldest_skip_holds[i]
                        for j in range(len(snapshot_names) - 1, -1, -1) if is_reverse else range(len(snapshot_names)):
                            snapshot_name: str = snapshot_names[j]
                            if (
                                endswith(snapshot_name, end)  # aka snapshot_name.endswith(end)
                                and startswith(snapshot_name, start)  # aka snapshot_name.startswith(start)
                                and len(snapshot_name) >= minlen
                                and (has_infix or year_with_4_digits_regex_fullmatch(snapshot_name, startlen, startlen_4))
                                and (no_skip_holds or snapshots[j][3])
                            ):
                                creation_unixtime_secs = snapshots[j][1]
                                minmax_snapshot = snapshot_name
                                break
                        fn(i, creation_unixtime_secs, dataset, minmax_snapshot)
                fn_on_finish_dataset(dataset)
        datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots]
        return datasets_without_snapshots

1659 

1660 @staticmethod 

1661 def _cache_hits_msg(hits: int, misses: int) -> str: 

1662 total = hits + misses 

1663 return f", cache hits: {percent(hits, total, print_total=True)}, misses: {percent(misses, total, print_total=True)}" 

1664 

    def run_ssh_command(
        self,
        remote: MiniRemote,
        loglevel: int = logging.INFO,
        is_dry: bool = False,
        check: bool = True,
        print_stdout: bool = False,
        print_stderr: bool = True,
        cmd: list[str] | None = None,
        retry_on_generic_ssh_error: bool = True,
    ) -> str:
        """Runs the given CLI cmd via ssh on the given remote, and returns stdout.

        Params:
            remote: The remote host to run the command on.
            loglevel: Log level passed down to the connection's command runner.
            is_dry: If True, the command's result is discarded and "" is returned.
            check: If True, a non-zero exit status raises subprocess.CalledProcessError.
            print_stdout, print_stderr: Whether to forward the process output streams to the logger.
            cmd: The command as an argv list; must be non-empty.
            retry_on_generic_ssh_error: If True, converts an SSH transport failure (stderr starting with "ssh: ")
                into a RetryableError so callers can retry.

        Raises:
            subprocess.CalledProcessError, subprocess.TimeoutExpired: on remote command failure/timeout.
            RetryableError: on an error within SSH itself (exit code 255), when retry_on_generic_ssh_error is True.
        """
        assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0
        conn_pool: ConnectionPool = self.params.connection_pools[remote.location].pool(SHARED)
        with conn_pool.connection() as conn:  # borrow an ssh connection from the pool for the duration of the call
            log: logging.Logger = self.params.log
            try:
                process: subprocess.CompletedProcess[str] = conn.run_ssh_command(
                    cmd=cmd,
                    job=self,
                    loglevel=loglevel,
                    is_dry=is_dry,
                    check=check,
                    stdin=DEVNULL,
                    stdout=PIPE,
                    stderr=PIPE,
                    text=True,
                )
            except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
                # forward whatever output the failed process produced before deciding how to escalate:
                xprint(log, stderr_to_str(e.stdout) if print_stdout else e.stdout, run=print_stdout, file=sys.stdout, end="")
                xprint(log, stderr_to_str(e.stderr) if print_stderr else e.stderr, run=print_stderr, file=sys.stderr, end="")
                if retry_on_generic_ssh_error and isinstance(e, subprocess.CalledProcessError):
                    stderr: str = stderr_to_str(e.stderr)
                    if stderr.startswith("ssh: "):
                        assert e.returncode == 255, e.returncode  # error within SSH itself (not during the remote command)
                        raise RetryableError(display_msg="ssh") from e
                raise
            else:
                if is_dry:
                    return ""
                xprint(log, process.stdout, run=print_stdout, file=sys.stdout, end="")
                xprint(log, process.stderr, run=print_stderr, file=sys.stderr, end="")
                return process.stdout

1708 

1709 def try_ssh_command( 

1710 self, 

1711 remote: MiniRemote, 

1712 loglevel: int, 

1713 is_dry: bool = False, 

1714 print_stdout: bool = False, 

1715 cmd: list[str] | None = None, 

1716 exists: bool = True, 

1717 error_trigger: str | None = None, 

1718 ) -> str | None: 

1719 """Convenience method that helps retry/react to a dataset or pool that potentially doesn't exist anymore.""" 

1720 assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0 

1721 log = self.params.log 

1722 try: 

1723 self.maybe_inject_error(cmd=cmd, error_trigger=error_trigger) 

1724 return self.run_ssh_command(remote=remote, loglevel=loglevel, is_dry=is_dry, print_stdout=print_stdout, cmd=cmd) 

1725 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

1726 if not isinstance(e, UnicodeDecodeError): 

1727 stderr: str = stderr_to_str(e.stderr) 

1728 if exists and ( 

1729 ": dataset does not exist" in stderr 

1730 or ": filesystem does not exist" in stderr # solaris 11.4.0 

1731 or ": no such pool" in stderr 

1732 or "does not have any resumable receive state to abort" in stderr # harmless `zfs receive -A` race 

1733 ): 

1734 return None 

1735 log.warning("%s", stderr.rstrip()) 

1736 raise RetryableError("Subprocess failed") from e 

1737 

1738 def try_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str | None: 

1739 """Convenience method that auto-retries try_ssh_command() on failure.""" 

1740 return self._retry_template().call_with_retries(fn=lambda retry: self.try_ssh_command(*args, **kwargs)) 

1741 

1742 def run_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str: 

1743 """Convenience method that auto-retries run_ssh_command() on transport failure (not on remote command failure).""" 

1744 return self._retry_template().call_with_retries(fn=lambda retry: self.run_ssh_command(*args, **kwargs)) 

1745 

1746 def maybe_inject_error(self, cmd: list[str], error_trigger: str | None = None) -> None: 

1747 """For testing only; for unit tests to simulate errors during replication and test correct handling of them.""" 

1748 if error_trigger: 

1749 counter = self.error_injection_triggers.get("before") 

1750 if counter and self.decrement_injection_counter(counter, error_trigger): 

1751 try: 

1752 raise CalledProcessError(returncode=1, cmd=" ".join(cmd), stderr=error_trigger + ":dataset is busy") 

1753 except subprocess.CalledProcessError as e: 

1754 if error_trigger.startswith("retryable_"): 

1755 raise RetryableError("Subprocess failed") from e 

1756 else: 

1757 raise 

1758 

1759 def decrement_injection_counter(self, counter: Counter[str], trigger: str) -> bool: 

1760 """For testing only.""" 

1761 with self.injection_lock: 

1762 if counter[trigger] <= 0: 

1763 return False 

1764 counter[trigger] -= 1 

1765 return True 

1766 

1767 

1768############################################################################# 

@final
class DatasetProperties:
    """Value object holding the per-dataset ZFS properties that bzfs tracks for each source dataset."""

    __slots__ = ("recordsize", "snapshots_changed")  # compact memory layout; avoids a per-instance __dict__

    def __init__(self, recordsize: int, snapshots_changed: int) -> None:
        """Params: recordsize (immutable after construction), snapshots_changed (updated over time)."""
        # immutable variables:
        self.recordsize: Final[int] = recordsize

        # mutable variables:
        self.snapshots_changed: int = snapshots_changed

1781 

1782 

1783############################################################################# 

# Input format is [[user@]host:]dataset
#                 1234      5   6         <- regex group numbers
_DATASET_LOCATOR_REGEX: Final[re.Pattern[str]] = re.compile(r"(((([^@]*)@)?([^:]+)):)?(.*)", flags=re.DOTALL)


def parse_dataset_locator(
    input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None
) -> tuple[str, str, str, str, str]:
    """Splits a [[user@]host:]dataset locator into its components, with optional checks.

    Caller-supplied `user`/`host` values take precedence over the ones parsed from `input_text`. A host of "-" means
    "local". Returns the tuple (user, host, user_host, pool, dataset).
    """

    def convert_ipv6(hostname: str) -> str:
        # Callers encode IPv6 colons as '|' so they don't clash with the host:dataset colon separator, nor with colons
        # that may legitimately appear inside a ZFS dataset name; decode them back here.
        return hostname.replace("|", ":")

    user_undefined: bool = user is None
    user = "" if user is None else user
    host_undefined: bool = host is None
    host = convert_ipv6("" if host is None else host)
    user_host = ""
    dataset = ""
    pool = ""

    match = _DATASET_LOCATOR_REGEX.fullmatch(input_text)
    if match:
        if user_undefined:
            user = match.group(4) or ""
        if host_undefined:
            host = convert_ipv6(match.group(5) or "")
        if host == "-":
            host = ""  # '-' is the conventional placeholder for "local host"
        dataset = match.group(6) or ""
        slash = dataset.find("/")
        pool = dataset if slash < 0 else dataset[0:slash]  # pool is the first path component

        if user and host:
            user_host = f"{user}@{host}"
        elif host:
            user_host = host

    if validate:
        validate_user_name(user, input_text)
        validate_host_name(host, input_text)
        if port is not None:
            validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ")
        validate_dataset_name(dataset, input_text)

    return user, host, user_host, pool, dataset

1831 

1832 

def validate_user_name(user: str, input_text: str) -> None:
    """Checks that the username is safe for ssh or local usage; dies on violation."""
    forbidden: str = SHELL_CHARS_AND_SLASH
    # Reject leading '-' (could be parsed as a CLI flag), path traversal via '..', whitespace,
    # and shell metacharacters; an empty user is always acceptable.
    has_unsafe_char = any(ch.isspace() or ch in forbidden for ch in user)
    if user and (user.startswith("-") or ".." in user or has_unsafe_char):
        die(f"Invalid user name: '{user}' for: '{input_text}'")

1838 

1839 

def validate_host_name(host: str, input_text: str) -> None:
    """Checks hostname for forbidden characters or patterns; dies on violation."""
    forbidden: str = SHELL_CHARS_AND_SLASH
    # Same policy as validate_user_name: reject leading '-', '..', whitespace and shell
    # metacharacters; an empty host (local mode) is always acceptable.
    has_unsafe_char = any(ch.isspace() or ch in forbidden for ch in host)
    if host and (host.startswith("-") or ".." in host or has_unsafe_char):
        die(f"Invalid host name: '{host}' for: '{input_text}'")

1845 

1846 

1847def validate_port(port: str | int | None, message: str) -> None: 

1848 """Checks that port specification is a valid integer.""" 

1849 if isinstance(port, int): 

1850 port = str(port) 

1851 if port and not port.isdigit(): 

1852 die(message + f"must be empty or a positive integer: '{port}'") 

1853 

1854 

def normalize_called_process_error(error: subprocess.CalledProcessError) -> int:
    """Normalizes `CalledProcessError.returncode` to avoid reserved exit codes so callers don't misclassify them."""
    code: int = error.returncode
    # Exit codes in [1, STILL_RUNNING_STATUS] are reserved; collapse them to DIE_STATUS.
    if isinstance(code, int) and 1 <= code <= STILL_RUNNING_STATUS:
        return DIE_STATUS
    return code

1860 

1861 

#############################################################################
# CLI entry point: run main() only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()