Coverage for bzfs_main / bzfs.py: 99%
1087 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 10:16 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata
16# /// script
17# requires-python = ">=3.9"
18# dependencies = []
19# ///
20#
21"""
22* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`,
23 data transfer, and snapshot management between two hosts.
24* Overview of the bzfs.py codebase:
25* The codebase starts with docs, definition of input data and associated argument parsing into a "Params" class.
26* All CLI option/parameter values are reachable from the "Params" class.
27* Control flow starts in main(), which kicks off a "Job".
28* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree.
29* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset().
30* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots().
31* The --create-src-snapshots-* and --delete-* and --compare-* and --monitor-* algorithms also start in run_task().
32* The main retry logic is in call_with_retries() and clear_resumable_recv_state_if_necessary().
33* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter.
34* Executing a CLI command on a local or remote host is in run_ssh_command().
35* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool.
36* Cache functionality can be found by searching for this regex: .*cach.*
37* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant().
38* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh.
39 Simply run that script whenever you change or add ArgumentParser help text.
40"""
42from __future__ import (
43 annotations,
44)
45import argparse
46import contextlib
47import fcntl
48import heapq
49import itertools
50import logging
51import os
52import re
53import subprocess
54import sys
55import threading
56import time
57from collections import (
58 Counter,
59 defaultdict,
60)
61from collections.abc import (
62 Collection,
63 Sequence,
64)
65from datetime import (
66 datetime,
67 timedelta,
68)
69from logging import (
70 Logger,
71)
72from pathlib import (
73 Path,
74)
75from subprocess import (
76 DEVNULL,
77 PIPE,
78 CalledProcessError,
79 TimeoutExpired,
80)
81from typing import (
82 Any,
83 Callable,
84 Final,
85 cast,
86 final,
87)
89import bzfs_main.loggers
90from bzfs_main.argparse_actions import (
91 has_timerange_filter,
92)
93from bzfs_main.argparse_cli import (
94 EXCLUDE_DATASET_REGEXES_DEFAULT,
95)
96from bzfs_main.compare_snapshot_lists import (
97 run_compare_snapshot_lists,
98)
99from bzfs_main.configuration import (
100 AlertConfig,
101 CreateSrcSnapshotConfig,
102 LogParams,
103 MonitorSnapshotAlert,
104 Params,
105 Remote,
106 SnapshotLabel,
107)
108from bzfs_main.detect import (
109 DISABLE_PRG,
110 RemoteConfCacheItem,
111 are_bookmarks_enabled,
112 detect_available_programs,
113 is_caching_snapshots,
114 is_dummy,
115 is_zpool_feature_enabled_or_active,
116)
117from bzfs_main.filter import (
118 SNAPSHOT_REGEX_FILTER_NAME,
119 dataset_regexes,
120 filter_datasets,
121 filter_lines,
122 filter_lines_except,
123 filter_snapshots,
124)
125from bzfs_main.loggers import (
126 get_simple_logger,
127 reset_logger,
128 set_logging_runtime_defaults,
129)
130from bzfs_main.parallel_batch_cmd import (
131 run_ssh_cmd_parallel,
132 zfs_list_snapshots_in_parallel,
133)
134from bzfs_main.progress_reporter import (
135 ProgressReporter,
136 count_num_bytes_transferred_by_zfs_send,
137)
138from bzfs_main.replication import (
139 delete_bookmarks,
140 delete_datasets,
141 delete_snapshots,
142 replicate_dataset,
143)
144from bzfs_main.snapshot_cache import (
145 MATURITY_TIME_THRESHOLD_SECS,
146 MONITOR_CACHE_FILE_PREFIX,
147 REPLICATION_CACHE_FILE_PREFIX,
148 SnapshotCache,
149 set_last_modification_time_safe,
150)
151from bzfs_main.util.connection import (
152 SHARED,
153 ConnectionPool,
154 MiniJob,
155 MiniRemote,
156 timeout,
157)
158from bzfs_main.util.parallel_iterator import (
159 run_in_parallel,
160)
161from bzfs_main.util.parallel_tasktree_policy import (
162 process_datasets_in_parallel_and_fault_tolerant,
163)
164from bzfs_main.util.retry import (
165 Retry,
166 RetryableError,
167 RetryTemplate,
168 RetryTerminationError,
169 RetryTiming,
170)
171from bzfs_main.util.utils import (
172 DESCENDANTS_RE_SUFFIX,
173 DIE_STATUS,
174 DONT_SKIP_DATASET,
175 FILE_PERMISSIONS,
176 LOG_DEBUG,
177 LOG_TRACE,
178 PROG_NAME,
179 SHELL_CHARS_AND_SLASH,
180 UMASK,
181 YEAR_WITH_FOUR_DIGITS_REGEX,
182 HashedInterner,
183 SortedInterner,
184 Subprocesses,
185 SynchronizedBool,
186 SynchronizedDict,
187 TaskTiming,
188 append_if_absent,
189 compile_regexes,
190 cut,
191 die,
192 has_duplicates,
193 human_readable_bytes,
194 human_readable_duration,
195 is_descendant,
196 percent,
197 pretty_print_formatter,
198 replace_in_lines,
199 replace_prefix,
200 sha256_85_urlsafe_base64,
201 sha256_128_urlsafe_base64,
202 stderr_to_str,
203 termination_signal_handler,
204 validate_dataset_name,
205 validate_property_name,
206 xappend,
207 xfinally,
208 xprint,
209)
# constants:
__version__: Final[str] = bzfs_main.argparse_cli.__version__  # single source of truth for the version string
CRITICAL_STATUS: Final[int] = 2  # exit status; presumably consumed by --monitor-snapshots alerting — defined here, used elsewhere
WARNING_STATUS: Final[int] = 1  # exit status; presumably consumed by --monitor-snapshots alerting — defined here, used elsewhere
STILL_RUNNING_STATUS: Final[int] = 4  # exit status used when a previous periodic job still holds the lock file
MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 9)
# Fail fast with a clear message on unsupported interpreters instead of a more obscure error later:
if sys.version_info < MIN_PYTHON_VERSION:
    print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!")
    sys.exit(DIE_STATUS)
222#############################################################################
def argument_parser() -> argparse.ArgumentParser:
    """Returns the CLI parser used by bzfs; delegates to the generated CLI module."""
    parser: argparse.ArgumentParser = bzfs_main.argparse_cli.argument_parser()
    return parser
def main() -> None:
    """API for command line clients."""
    previous_umask: int = os.umask(UMASK)
    try:
        set_logging_runtime_defaults()
        # On CTRL-C and SIGTERM, signal all descendant processes so they terminate as well.
        stop_event: threading.Event = threading.Event()
        with termination_signal_handler(termination_events=[stop_event]):
            cli_args: argparse.Namespace = argument_parser().parse_args()
            run_main(cli_args, sys.argv, termination_event=stop_event)
    except subprocess.CalledProcessError as e:
        sys.exit(normalize_called_process_error(e))
    finally:
        os.umask(previous_umask)  # restore the process-global umask we changed on entry
def run_main(
    args: argparse.Namespace,
    sys_argv: list[str] | None = None,
    log: Logger | None = None,
    termination_event: threading.Event | None = None,
) -> None:
    """API for Python clients; visible for testing; may become a public API eventually."""
    job = Job(termination_event=termination_event)
    job.run_main(args, sys_argv, log)
253#############################################################################
254@final
255class Job(MiniJob):
256 """Executes one bzfs run, coordinating snapshot replication tasks."""
    def __init__(self, termination_event: threading.Event | None = None) -> None:
        """Initializes per-run state; a Job instance coordinates one bzfs run.

        Params:
            termination_event: Externally supplied event used to request graceful shutdown; a private event is
                created if omitted.
        """
        self.params: Params  # assigned later in run_main() once CLI args have been parsed into a Params object
        self.termination_event: Final[threading.Event] = termination_event or threading.Event()
        # Retry machinery derives its stop condition from the termination event; on_before_attempt is a no-op hook here.
        self.retry_timing: Final[RetryTiming] = RetryTiming.make_from(self.termination_event).copy(
            on_before_attempt=lambda retry: None
        )
        self.task_timing: Final[TaskTiming] = TaskTiming.make_from(self.termination_event)
        self.subprocesses: Subprocesses = Subprocesses(self.termination_event.is_set)
        # Per dst host: "does this dst dataset exist?"; cleared at the start of each daemon iteration in run_tasks()
        self.all_dst_dataset_exists: Final[dict[str, dict[str, bool]]] = defaultdict(lambda: defaultdict(bool))
        self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({})  # view for the current task's dst host
        self.src_properties: dict[str, DatasetProperties] = {}  # per src dataset, filled by list_src_datasets_task()
        self.dst_properties: dict[str, DatasetProperties] = {}  # per dst dataset
        self.all_exceptions: list[str] = []  # summaries of errors tolerated so far (see append_exception())
        self.all_exceptions_count: int = 0
        self.max_exceptions_to_summarize: int = 10000  # cap max memory consumption of the error summary
        self.first_exception: BaseException | None = None  # re-raised at the end of run_tasks() if any error occurred
        self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {}  # caches detected remote config + connection pools
        self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {}  # keyed by remote location; see validate_task()
        self.max_workers: dict[str, int] = {}  # keyed by remote location; see validate_task()
        self.progress_reporter: ProgressReporter = cast(ProgressReporter, None)  # created in run_tasks()
        self.is_first_replication_task: Final[SynchronizedBool] = SynchronizedBool(True)
        self.replication_start_time_nanos: int = time.monotonic_ns()
        self.timeout_nanos: int | None = None  # timestamp aka instant in time
        self.timeout_duration_nanos: int | None = None  # duration (not a timestamp); for logging only
        self.cache: SnapshotCache = SnapshotCache(self)
        self.stats_lock: Final[threading.Lock] = threading.Lock()  # guards the num_* counters below
        self.num_cache_hits: int = 0
        self.num_cache_misses: int = 0
        self.num_snapshots_found: int = 0
        self.num_snapshots_replicated: int = 0

        self.is_test_mode: bool = False  # for testing only
        self.creation_prefix: str = ""  # for testing only
        self.use_select: bool = False  # for testing only
        self.progress_update_intervals: tuple[float, float] | None = None  # for testing only
        self.error_injection_triggers: dict[str, Counter[str]] = {}  # for testing only
        self.delete_injection_triggers: dict[str, Counter[str]] = {}  # for testing only
        self.param_injection_triggers: dict[str, dict[str, bool]] = {}  # for testing only
        self.inject_params: dict[str, bool] = {}  # for testing only
        self.injection_lock: threading.Lock = threading.Lock()  # for testing only
        self.max_command_line_bytes: int | None = None  # for testing only
300 def shutdown(self) -> None:
301 """Exits any multiplexed ssh sessions that may be leftover."""
302 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values()
303 for i, cache_item in enumerate(cache_items):
304 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}")
    def terminate(self) -> None:
        """Shuts down gracefully; also terminates descendant processes, if any.

        xfinally guarantees terminate_process_subtrees() runs even if shutdown() raises, without masking the
        original error raised by shutdown().
        """
        with xfinally(self.subprocesses.terminate_process_subtrees):
            self.shutdown()
311 def _retry_template(self) -> RetryTemplate:
312 p = self.params
313 return RetryTemplate(policy=p.retry_policy.copy(timing=self.retry_timing), log=p.log)
    def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None:
        """Parses CLI arguments, sets up logging, and executes main job loop.

        Params:
            args: Parsed CLI namespace (may be re-parsed below to fold in auxiliary plan arguments).
            sys_argv: Original argv, used for logging only.
            log: Caller-provided Logger; if given, it is used as-is and never reset by us.
        """
        assert isinstance(self.error_injection_triggers, dict)
        assert isinstance(self.delete_injection_triggers, dict)
        assert isinstance(self.inject_params, dict)
        logger_name_suffix: str = ""

        def _reset_logger() -> None:
            if logger_name_suffix and log is not None:  # reset Logger unless it's a Logger outside of our control
                reset_logger(log)

        with xfinally(_reset_logger):  # runs _reset_logger() on exit, without masking error raised in body of `with` block
            try:
                log_params: LogParams = LogParams(args)
                # Only derive a fresh logger name suffix when we own the logger; a caller-provided log keeps its identity
                logger_name_suffix = "" if log is not None else log_params.logger_name_suffix
                log = bzfs_main.loggers.get_logger(
                    log_params=log_params, args=args, log=log, logger_name_suffix=logger_name_suffix
                )
                log.info("%s", f"Log file is: {log_params.log_file}")
            except BaseException as e:
                # Logging setup itself failed; report via a throwaway simple logger, then re-raise unchanged
                simple_log: Logger = get_simple_logger(PROG_NAME, logger_name_suffix=logger_name_suffix)
                try:
                    simple_log.error("Log init: %s", e, exc_info=not isinstance(e, SystemExit))
                finally:
                    reset_logger(simple_log)
                raise

            # Fold auxiliary plan options back into the CLI namespace by re-parsing with them appended
            aux_args: list[str] = []
            if getattr(args, "include_snapshot_plan", None):
                aux_args += args.include_snapshot_plan
            if getattr(args, "delete_dst_snapshots_except_plan", None):
                aux_args += args.delete_dst_snapshots_except_plan
            if len(aux_args) > 0:
                log.info("Auxiliary CLI arguments: %s", " ".join(aux_args))
                args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args)

            def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None:
                log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info)

            try:
                log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[uid: {os.getuid()}, euid: {os.geteuid()}]")
                if self.is_test_mode:
                    log.log(LOG_TRACE, "Parsed CLI arguments: %s", args)
                self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params)
                self.timeout_duration_nanos = p.timeout_duration_nanos
                lock_file: str = p.lock_file_name()
                # O_NOFOLLOW/O_CLOEXEC harden the lock file open; FILE_PERMISSIONS restricts its mode
                lock_fd = os.open(
                    lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, FILE_PERMISSIONS
                )
                with xfinally(lambda: os.close(lock_fd)):
                    try:
                        # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or
                        # another process. The (advisory) lock is auto-released when the process terminates or the fd is
                        # closed.
                        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # LOCK_NB ... non-blocking
                    except BlockingIOError:
                        msg = "Exiting as same previous periodic job is still running without completion yet per "
                        die(msg + lock_file, STILL_RUNNING_STATUS)

                    # xfinally: unlink the lock_file while still holding the flock on its fd - it's a correct and safe
                    # standard POSIX pattern:
                    # - Performing unlink() before close(fd) avoids a race where a subsequent bzfs process could recreate and
                    #   lock a fresh inode for the same path between our close() and a later unlink(). In that case, a late
                    #   unlink would delete the newer process's lock_file path.
                    # - At this point, critical work is complete; the remaining steps are shutdown mechanics that have no
                    #   side effect, so this pattern is correct, safe, and simple.
                    with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)):  # don't accumulate stale files
                        try:
                            self.run_tasks()  # do the real work
                        except BaseException:
                            self.terminate()  # also kills descendant processes before re-raising
                            raise
                        self.shutdown()
                        with contextlib.suppress(BrokenPipeError):
                            sys.stderr.flush()
                            sys.stdout.flush()
            except subprocess.CalledProcessError as e:
                log_error_on_exit(e, e.returncode)
                raise
            except SystemExit as e:
                log_error_on_exit(e, e.code)
                raise
            except (subprocess.TimeoutExpired, UnicodeDecodeError) as e:
                log_error_on_exit(e, DIE_STATUS)
                raise SystemExit(DIE_STATUS) from e
            except re.error as e:
                log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS)
                raise SystemExit(DIE_STATUS) from e
            except BaseException as e:
                log_error_on_exit(e, DIE_STATUS, exc_info=True)
                raise SystemExit(DIE_STATUS) from e
            finally:
                log.info("%s", f"Log file was: {log_params.log_file}")
            # Only reached on success (all error paths above re-raise)
            log.info("Success. Goodbye!")
            with contextlib.suppress(BrokenPipeError):
                sys.stderr.flush()
                sys.stdout.flush()
    def run_tasks(self) -> None:
        """Executes replication cycles, repeating until daemon lifetime expires.

        Raises:
            BaseException: re-raises the first tolerated error at the end of a non-daemon run, after logging a summary.
        """
        p, log = self.params, self.params.log
        # Reset per-run accumulators so a reused Job instance starts clean
        self.all_exceptions = []
        self.all_exceptions_count = 0
        self.first_exception = None
        self.remote_conf_cache = {}
        self.validate_once()
        self.replication_start_time_nanos = time.monotonic_ns()
        self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals)
        with xfinally(lambda: self.progress_reporter.stop()):  # always stop the reporter thread on exit
            daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos
            while True:  # loop for daemon mode
                self.timeout_nanos = (
                    None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
                )
                self.all_dst_dataset_exists.clear()  # existence info may be stale after the previous iteration
                self.progress_reporter.reset()
                src, dst = p.src, p.dst
                for src_root_dataset, dst_root_dataset in p.root_dataset_pairs:
                    if self.termination_event.is_set():
                        self.terminate()
                        break
                    src.root_dataset = src.basis_root_dataset = src_root_dataset
                    dst.root_dataset = dst.basis_root_dataset = dst_root_dataset
                    p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy()
                    if p.daemon_lifetime_nanos > 0:
                        # In daemon mode, each task gets a fresh timeout budget
                        self.timeout_nanos = (
                            None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
                        )
                    recurs_sep = " " if p.recursive_flag else ""
                    task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}"
                    if len(p.root_dataset_pairs) > 1:
                        log.info("Starting task: %s", task_description + " ...")
                    try:
                        try:
                            self.maybe_inject_error(cmd=[], error_trigger="retryable_run_tasks")  # testing hook
                            timeout(self)
                            self.validate_task()
                            self.run_task()  # do the real work
                        except RetryableError as retryable_error:
                            # Unwrap the RetryableError so callers/handlers see the underlying cause directly
                            cause: BaseException | None = retryable_error.__cause__
                            assert cause is not None
                            raise cause.with_traceback(cause.__traceback__)  # noqa: B904 re-raise of cause without chaining
                    except (CalledProcessError, TimeoutExpired, SystemExit, UnicodeDecodeError, RetryTerminationError) as e:
                        if p.skip_on_error == "fail" or (
                            isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0
                        ):
                            raise
                        # Tolerate this task's failure and continue with the next task (--skip-on-error)
                        log.error("%s", e)
                        self.append_exception(e, "task", task_description)
                if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos):
                    break
            if not p.skip_replication:
                self.print_replication_stats(self.replication_start_time_nanos)
            error_count = self.all_exceptions_count
            if error_count > 0 and p.daemon_lifetime_nanos == 0:
                msgs = "\n".join(f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions))
                log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}")
                assert self.first_exception is not None
                raise self.first_exception
475 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None:
476 """Records and logs an exception that was encountered while running a subtask."""
477 self.first_exception = self.first_exception or e
478 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption
479 self.all_exceptions.append(str(e))
480 self.all_exceptions_count += 1
481 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description)
    def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool:
        """Pauses until next scheduled snapshot time or daemon stop; Returns True to continue daemon loop; False to stop.

        Params:
            daemon_stoptime_nanos: monotonic-clock instant at which daemon mode must end.
        """
        sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns()
        if sleep_nanos <= 0:
            return False  # daemon lifetime already expired
        self.progress_reporter.pause()  # no transfers happen while sleeping
        p, log = self.params, self.params.log
        config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
        # +1 microsecond so that "round up" cannot return the instant we just handled
        curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1)
        # Earliest upcoming snapshot boundary across all configured schedule periods
        next_snapshotting_event_dt: datetime = min(
            (
                config.anchors.round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit)
                for duration_amount, duration_unit in config.suffix_durations.values()
            ),
            default=curr_datetime + timedelta(days=10 * 365),  # infinity
        )
        offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz)
        # Exact integer nanoseconds (avoids float rounding of timedelta.total_seconds())
        offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000
        sleep_nanos = min(sleep_nanos, max(0, offset_nanos))  # sleep until next event or daemon stop, whichever is sooner
        log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]")
        self.termination_event.wait(sleep_nanos / 1_000_000_000)  # allow early wakeup on async termination
        config.current_datetime = datetime.now(config.tz)
        return time.monotonic_ns() < daemon_stoptime_nanos and not self.termination_event.is_set()
507 def print_replication_stats(self, start_time_nanos: int) -> None:
508 """Logs overall replication statistics after a job cycle completes."""
509 p, log = self.params, self.params.log
510 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
511 msg = p.dry(f"zfs sent {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.")
512 if p.is_program_available("pv", "local"):
513 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file)
514 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / (elapsed_nanos or 1))
515 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]."
516 log.info("%s", msg.ljust(p.log_params.terminal_columns - len("2024-01-01 23:58:45 [I] ")))
    def validate_once(self) -> None:
        """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times."""
        p = self.params
        p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts)
        # Compile the string-based snapshot regex filter options into regex objects, in place
        for snapshot_filter in p.snapshot_filters:
            for _filter in snapshot_filter:
                if _filter.name == SNAPSHOT_REGEX_FILTER_NAME:
                    exclude_snapshot_regexes_strings, include_snapshot_regexes_strings = cast(
                        tuple[list[str], list[str]], _filter.options
                    )
                    exclude_snapshot_regexes = compile_regexes(exclude_snapshot_regexes_strings)
                    include_snapshot_regexes = compile_regexes(include_snapshot_regexes_strings or [".*"])  # default: all
                    _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes)

        exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT]
        if len(p.args.exclude_dataset_regex) > 0:  # some patterns don't exclude anything
            exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"]
        include_regexes: list[str] = p.args.include_dataset_regex

        # relative datasets need not be compiled more than once as they don't change between tasks
        def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]:
            # A leading "/" marks a dataset as absolute; everything else is relative to the root dataset
            abs_datasets: list[str] = []
            rel_datasets: list[str] = []
            for dataset in datasets:
                (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset)
            return abs_datasets, rel_datasets

        p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset)
        p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset)
        suffix = DESCENDANTS_RE_SUFFIX
        p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = (
            compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix),
            compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix),
        )

        # Fail fast if the configured 'pv' options cannot produce usable progress output
        if p.pv_program != DISABLE_PRG:
            pv_program_opts_set = set(p.pv_program_opts)
            if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}):
                die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.")
            if not p.log_params.quiet:
                for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]:
                    if pv_program_opts_set.isdisjoint(opts):
                        die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.")

        src, dst = p.src, p.dst
        for remote in [src, dst]:
            r, loc = remote, remote.location
            validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user")
            validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host")
            validate_port(r.ssh_port, f"--ssh-{loc}-port ")
    def validate_task(self) -> None:
        """Validates a single replication task before execution.

        Mutates per-task state on Params and both Remote objects (ssh identity, sudo prefix, compiled per-task
        dataset regexes, worker/minibatch sizing) and detects available programs on the involved hosts.
        """
        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        for remote in [src, dst]:
            r = remote
            r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator(
                r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port
            )
            r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user)
            local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1")  # ::1 is IPv6 version of loopback address
            remote.is_nonlocal = r.ssh_host not in local_addrs
        self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host])

        # Guard against replicating a dataset (tree) onto itself
        if src.ssh_host == dst.ssh_host:
            msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}"
            if src.root_dataset == dst.root_dataset:
                die(f"Source and destination dataset must not be the same! {msg}")
            if p.recursive and (
                is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset)
                or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset)
            ):
                die(f"Source and destination dataset trees must not overlap! {msg}")

        suffx: str = DESCENDANTS_RE_SUFFIX  # also match descendants of a matching dataset
        # Per-task regexes = precompiled relative regexes (validate_once) + absolute datasets resolved for this task
        p.exclude_dataset_regexes, p.include_dataset_regexes = (
            p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx),
            p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx),
        )
        if len(p.include_dataset_regexes) == 0:
            p.include_dataset_regexes = [(re.compile(r".*"), False)]  # empty include list means "include everything"

        detect_available_programs(self)

        if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"):
            append_if_absent(p.curr_zfs_send_program_opts, "--large-block")

        # Derive per-host parallelism from CPU count and the --threads setting (absolute count or percentage)
        self.max_workers = {}
        self.max_datasets_per_minibatch_on_list_snaps = {}
        for r in [src, dst]:
            cpus: int = int(p.available_programs[r.location].get("getconf_cpu_count", 8))
            threads, is_percent = p.threads
            cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads))
            self.max_workers[r.location] = cpus
            bs: int = max(1, p.max_datasets_per_batch_on_list_snaps)  # 1024 by default
            max_datasets_per_minibatch: int = p.max_datasets_per_minibatch_on_list_snaps
            if max_datasets_per_minibatch <= 0:
                max_datasets_per_minibatch = max(1, bs // cpus)  # auto-size: spread the batch across workers
            max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch)
            self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch
            log.log(
                LOG_TRACE,
                "%s",
                f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, "
                f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, "
                f"max_workers: {self.max_workers[r.location]}, "
                f"location: {r.location}",
            )
        if self.is_test_mode:
            log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params))
630 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]:
631 """Returns sudo command prefix and whether root privileges are required."""
632 p: Params = self.params
633 assert isinstance(ssh_user_host, str)
634 assert isinstance(ssh_user, str)
635 assert isinstance(p.sudo_program, str)
636 assert isinstance(p.enable_privilege_elevation, bool)
638 is_root: bool = True
639 if ssh_user_host != "":
640 if ssh_user == "":
641 if os.getuid() != 0:
642 is_root = False
643 elif ssh_user != "root":
644 is_root = False
645 elif os.getuid() != 0:
646 is_root = False
648 if is_root:
649 sudo = "" # using sudo in an attempt to make ZFS operations work even if we are not root user?
650 use_zfs_delegation = False # or instead using 'zfs allow' delegation?
651 return sudo, use_zfs_delegation
652 elif p.enable_privilege_elevation:
653 if p.sudo_program == DISABLE_PRG:
654 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}")
655 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any
656 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit.
657 return p.sudo_program + " -n", False
658 else:
659 return "", True
    def run_task(self) -> None:
        """Replicates all snapshots for the current root dataset pair.

        Orchestrates, in order: optional snapshot creation on src, replication to dst, then the optional
        --delete-*, --compare-snapshot-lists and --monitor-snapshots phases. Post-replication phases are skipped
        when replication failed.
        """

        def filter_src_datasets() -> list[str]:  # apply --{include|exclude}-dataset policy
            # Lazily computes and memoizes the filtered src dataset list via the src_datasets local
            return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets

        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location])
        recursive_sep: str = " " if p.recursive_flag else ""
        task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..."
        failed: bool = False
        src_datasets: list[str] | None = None  # None means "not filtered yet"; see filter_src_datasets()
        basis_src_datasets: list[str] = []
        self.src_properties = {}
        self.dst_properties = {}
        if not is_dummy(src):  # find src dataset or all datasets in src dataset tree (with --recursive)
            basis_src_datasets = self.list_src_datasets_task()

        if not p.create_src_snapshots_config.skip_create_src_snapshots:
            log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...")
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            self.create_src_snapshots_task(basis_src_datasets, src_datasets)

        # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset
        if not p.skip_replication:
            if len(basis_src_datasets) == 0:
                die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}")
            if is_dummy(dst):
                die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            failed = self.replicate_datasets(src_datasets, task_description, max_workers)

        # Nothing more to do unless one of the post-replication phases is enabled (and replication didn't fail)
        if failed or not (
            p.delete_dst_datasets
            or p.delete_dst_snapshots
            or p.delete_empty_dst_datasets
            or p.compare_snapshot_lists
            or p.monitor_snapshots_config.enable_monitor_snapshots
        ):
            return
        log.info("Listing dst datasets: %s", task_description)
        if is_dummy(dst):
            die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
        basis_dst_datasets: list[str] = self.list_dst_datasets_task()
        dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets)  # apply include/exclude policy

        if p.delete_dst_datasets and not failed:
            log.info(p.dry("--delete-dst-datasets: %s"), task_description)
            basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task(
                basis_src_datasets, basis_dst_datasets, dst_datasets
            )

        if p.delete_dst_snapshots and not failed:
            log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]")
            failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description)

        if p.delete_empty_dst_datasets and p.recursive and not failed:
            log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description)
            basis_dst_datasets, dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets)

        if p.compare_snapshot_lists and not failed:
            log.info("--compare-snapshot-lists: %s", task_description)
            if len(basis_src_datasets) == 0 and not is_dummy(src):
                die(f"Source dataset does not exist: {src.basis_root_dataset}")
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            run_compare_snapshot_lists(self, src_datasets, dst_datasets)

        if p.monitor_snapshots_config.enable_monitor_snapshots and not failed:
            log.info("--monitor-snapshots: %s", task_description)
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            self.monitor_snapshots_task(src_datasets, dst_datasets, task_description)
734 def list_src_datasets_task(self) -> list[str]:
735 """Lists datasets on the source host."""
736 p = self.params
737 src = p.src
738 basis_src_datasets: list[str] = []
739 is_caching: bool = is_caching_snapshots(p, src)
740 props: str = "volblocksize,recordsize,name"
741 props = "snapshots_changed," + props if is_caching else props
742 cmd: list[str] = p.split_args(
743 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset
744 )
745 for line in (self.try_ssh_command_with_retries(src, LOG_DEBUG, cmd=cmd) or "").splitlines():
746 cols: list[str] = line.split("\t")
747 snapshots_changed, volblocksize, recordsize, src_dataset = cols if is_caching else ["-"] + cols
748 self.src_properties[src_dataset] = DatasetProperties(
749 recordsize=int(recordsize) if recordsize != "-" else -int(volblocksize),
750 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
751 )
752 basis_src_datasets.append(src_dataset)
753 assert (not self.is_test_mode) or basis_src_datasets == sorted(basis_src_datasets), "List is not sorted"
754 return basis_src_datasets
756 def list_dst_datasets_task(self) -> list[str]:
757 """Lists datasets on the destination host."""
758 p, log = self.params, self.params.log
759 dst = p.dst
760 is_caching: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots
761 props: str = "name"
762 props = "snapshots_changed," + props if is_caching else props
763 cmd: list[str] = p.split_args(
764 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset
765 )
766 basis_dst_datasets: list[str] = []
767 basis_dst_datasets_str: str | None = self.try_ssh_command_with_retries(dst, LOG_TRACE, cmd=cmd)
768 if basis_dst_datasets_str is None:
769 log.warning("Destination dataset does not exist: %s", dst.root_dataset)
770 else:
771 for line in basis_dst_datasets_str.splitlines():
772 cols: list[str] = line.split("\t")
773 snapshots_changed, dst_dataset = cols if is_caching else ["-"] + cols
774 self.dst_properties[dst_dataset] = DatasetProperties(
775 recordsize=0,
776 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
777 )
778 basis_dst_datasets.append(dst_dataset)
779 assert (not self.is_test_mode) or basis_dst_datasets == sorted(basis_dst_datasets), "List is not sorted"
780 return basis_dst_datasets
    def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None:
        """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements
        --create-src-snapshots.

        The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line,
        using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the
        --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs
        snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical
        'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such
        guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple
        command line invocations, respecting the limits that the operating system places on the maximum length of a single
        command line, per `getconf ARG_MAX`.

        Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
        per dataset. Space complexity is O(max(N, M)).

        Dies if no source dataset exists at all.
        """
        p, log = self.params, self.params.log
        src = p.src
        if len(basis_src_datasets) == 0:
            die(f"Source dataset does not exist: {src.basis_root_dataset}")
        datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets)
        # drop labels for which no dataset is currently due for a snapshot
        datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0}
        # retain the full per-label dataset lists before they are potentially collapsed to subtree roots below
        basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy()  # shallow copy
        commands: dict[SnapshotLabel, list[str]] = {}
        for label, datasets in datasets_to_snapshot.items():
            cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot")
            if p.recursive:
                # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor
                root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets)
                if root_datasets is not None:
                    cmd.append("-r")  # recursive; takes a snapshot of all datasets in the subtree(s)
                    datasets_to_snapshot[label] = root_datasets  # replacing values (not keys) during iteration is safe
            commands[label] = cmd
        creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots"
        log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...")
        # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle
        run_ssh_cmd_parallel(
            self,
            src,
            ((commands[lbl], (f"{ds}@{lbl}" for ds in datasets)) for lbl, datasets in datasets_to_snapshot.items()),
            fn=lambda cmd, batch: self.run_ssh_command_with_retries(
                src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch, retry_on_generic_ssh_error=False
            ),  # retry_on_generic_ssh_error=False means only retry on SSH connect errors b/c `zfs snapshot` isn't idempotent
            max_batch_items=2**29,
        )
        if is_caching_snapshots(p, src):
            # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls
            self.cache.update_last_modified_cache(basis_datasets_to_snapshot)
    def delete_destination_snapshots_task(
        self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str
    ) -> bool:
        """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the
        --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset*
        policy; implements --delete-dst-snapshots. Does not attempt to delete snapshots that carry a `zfs hold`.

        Returns True if any per-dataset deletion task failed, else False.
        """
        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot"
        filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters)
        # only fetch the 'creation' property when a time-range snapshot filter actually needs it
        props: str = "guid,name,userrefs"
        props = self.creation_prefix + "creation," + props if filter_needs_creation_time else props
        basis_src_datasets_set: set[str] = set(basis_src_datasets)  # O(1) membership test per dst dataset
        num_snapshots_found, num_snapshots_deleted = 0, 0  # aggregated under self.stats_lock by the worker below

        def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool:  # thread-safe
            """Worker: deletes policy-selected snapshots/bookmarks of one dst dataset; returns False if dst vanished."""
            src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
            # Only crosscheck against the source if the corresponding src dataset exists (and bookmarks are usable there)
            if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks):
                src_kind: str = kind
                if not p.delete_dst_snapshots_no_crosscheck:
                    src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot"
                src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset)
            else:
                src_cmd = None
            dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset)
            self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots")
            src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel(  # list src+dst snapshots in parallel
                lambda: set(self.run_ssh_command(src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []),
                lambda: self.try_ssh_command(dst, LOG_TRACE, cmd=dst_cmd),
            )
            if dst_snaps_with_guids_str is None:
                log.warning("Third party deleted destination: %s", dst_dataset)
                return False
            held_dst_snapshots: set[str] = set()
            dst_snaps_with_guids: list[str] = []
            no_userrefs: tuple[str, ...] = ("", "-", "0")  # ZFS snapshot property userrefs > 0 indicates a zfs hold
            for line in dst_snaps_with_guids_str.splitlines():
                dst_snaps_with_guids.append(line[: line.rindex("\t")])  # strip off trailing userrefs column
                _, name, userrefs = line.rsplit("\t", 2)
                if userrefs not in no_userrefs:
                    tag: str = name[name.index("@") + 1 :]
                    held_dst_snapshots.add(tag)  # don't attempt to delete snapshots that carry a `zfs hold`
            num_dst_snaps_with_guids = len(dst_snaps_with_guids)
            basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy()
            if p.delete_dst_bookmarks:
                replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1)  # treat bookmarks as snapshots
            # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots().
            # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep*
            all_except: bool = p.delete_dst_snapshots_except
            if p.delete_dst_snapshots_except and not is_dummy(src):
                # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what
                # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep"
                # list, we later further refine by checking what's on the source dataset.
                all_except = False
            dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except)
            if p.delete_dst_bookmarks:
                replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1)  # restore pre-filtering bookmark state
            if filter_needs_creation_time:
                # drop the leading 'creation' column now that time-based filtering is done
                dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids)
                basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids)
            if p.delete_dst_snapshots_except and not is_dummy(src):  # Non-dummy Source + "Except" (Keep) Mode
                # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka
                # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset.
                # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP.
                # We only actually keep them if they are ALSO on the SRC.
                # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`)
                # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`.
                except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids)
                dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids)
            else:  # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode
                # In standard delete mode:
                # `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST.
                # We delete those that are NOT on SRC.
                # `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`.
                # In dummy source + "except" (keep) mode:
                # `all_except` was True.
                # `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete.
                # `src_snaps_with_guids` is empty.
                # `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`.
                dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids)
                dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete)
            separator: str = "#" if p.delete_dst_bookmarks else "@"
            dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete)
            if p.delete_dst_bookmarks:
                delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
            else:
                dst_tags_to_delete = [tag for tag in dst_tags_to_delete if tag not in held_dst_snapshots]
                delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
            with self.stats_lock:
                nonlocal num_snapshots_found
                num_snapshots_found += num_dst_snaps_with_guids
                nonlocal num_snapshots_deleted
                num_snapshots_deleted += len(dst_tags_to_delete)
                if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks:
                    self.dst_properties[dst_dataset].snapshots_changed = 0  # invalidate cache
            return True

        # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec
        failed: bool = False
        if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks:
            start_time_nanos = time.monotonic_ns()
            failed = process_datasets_in_parallel_and_fault_tolerant(
                log=log,
                datasets=dst_datasets,
                process_dataset=delete_destination_snapshots,  # lambda
                skip_tree_on_error=lambda dataset: False,
                skip_on_error=p.skip_on_error,
                max_workers=max_workers,
                timing=self.task_timing,
                termination_handler=self.terminate,
                enable_barriers=False,
                task_name="--delete-dst-snapshots",
                append_exception=self.append_exception,
                retry_template=self._retry_template(),
                dry_run=p.dry_run,
                is_test_mode=self.is_test_mode,
            )
            elapsed_nanos = time.monotonic_ns() - start_time_nanos
            log.info(
                p.dry("--delete-dst-snapshots: %s"),
                task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s "
                f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]",
            )
        return failed
956 def delete_dst_datasets_task(
957 self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
958 ) -> tuple[list[str], list[str]]:
959 """Deletes existing destination datasets that do not exist within the source dataset if they are included via
960 --{include|exclude}-dataset* policy; implements --delete-dst-datasets.
962 Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors.
963 """
964 p = self.params
965 src, dst = p.src, p.dst
966 children: dict[str, set[str]] = defaultdict(set)
967 for dst_dataset in basis_dst_datasets: # Compute the direct children of each NON-FILTERED dataset
968 parent: str = os.path.dirname(dst_dataset)
969 children[parent].add(dst_dataset)
970 to_delete: set[str] = set()
971 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm
972 if children[dst_dataset].issubset(to_delete):
973 to_delete.add(dst_dataset) # all children are deletable, thus the dataset itself is deletable too
974 to_delete = to_delete.difference(
975 replace_prefix(src_dataset, src.root_dataset, dst.root_dataset) for src_dataset in basis_src_datasets
976 )
977 delete_datasets(self, dst, to_delete)
978 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(to_delete))
979 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(to_delete))
980 return basis_dst_datasets, sorted_dst_datasets
    def delete_empty_dst_datasets_task(
        self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
    ) -> tuple[list[str], list[str]]:
        """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset
        do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets.

        To do so, we walk the dataset list (conceptually, a tree) depth-first (i.e. sorted descending). If a dst dataset has
        zero snapshots and zero bookmarks and all its children are already marked as orphans, then it is itself an orphan,
        and we mark it as such. Walking in a reverse sorted way means that we efficiently check for zero snapshots/bookmarks
        not just over the direct children but the entire tree. Finally, delete all orphan datasets in an efficient batched
        way.

        Returns the updated (basis_dst_datasets, sorted_dst_datasets) pair with all deleted orphans removed.
        """
        p = self.params
        dst = p.dst

        # Compute the direct children of each NON-FILTERED dataset. Thus, no non-selected dataset and no ancestor of a
        # non-selected dataset will ever be added to the "orphan" set. In other words, this treats non-selected dataset
        # subtrees as if they all had snapshots, so non-selected dataset subtrees and their ancestors are guaranteed
        # to not get deleted.
        children: dict[str, set[str]] = defaultdict(set)
        for dst_dataset in basis_dst_datasets:
            parent: str = os.path.dirname(dst_dataset)
            children[parent].add(dst_dataset)

        def compute_orphans(datasets_having_snapshots: set[str]) -> set[str]:
            """Returns destination datasets having zero snapshots whose children are all orphans."""
            orphans: set[str] = set()
            for dst_dataset in reversed(sorted_dst_datasets):  # Reverse order facilitates efficient O(N) time algorithm
                if (dst_dataset not in datasets_having_snapshots) and children[dst_dataset].issubset(orphans):
                    orphans.add(dst_dataset)
            return orphans

        # Compute candidate orphan datasets, which reduces the list of datasets for which we list snapshots via
        # 'zfs list -t snapshot ...' from dst_datasets to a subset of dst_datasets, which in turn reduces I/O and improves
        # perf. Essentially, this eliminates the I/O to list snapshots for ancestors of excluded datasets.
        candidate_orphans: set[str] = compute_orphans(set())  # empty set: "nothing has snapshots yet" yields the superset

        # Compute destination datasets having more than zero snapshots
        dst_datasets_having_snapshots: set[str] = set()
        with_bookmarks: bool = p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst)
        btype: str = "bookmark,snapshot" if with_bookmarks else "snapshot"
        cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name")
        for snapshots in zfs_list_snapshots_in_parallel(self, dst, cmd, sorted(candidate_orphans), ordered=False):
            if with_bookmarks:
                replace_in_lines(snapshots, old="#", new="@", count=1)  # treat bookmarks as snapshots
            dst_datasets_having_snapshots.update(snap[: snap.index("@")] for snap in snapshots)  # union

        orphans: set[str] = compute_orphans(dst_datasets_having_snapshots)  # compute the real orphans
        delete_datasets(self, dst, orphans)  # finally, delete the orphan datasets in an efficient way
        sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(orphans))
        basis_dst_datasets = sorted(set(basis_dst_datasets).difference(orphans))
        return basis_dst_datasets, sorted_dst_datasets
1035 def monitor_snapshots_task(
1036 self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str
1037 ) -> None:
1038 """Monitors src and dst snapshots; implements --monitor-snapshots."""
1039 p, log = self.params, self.params.log
1040 src, dst = p.src, p.dst
1041 num_cache_hits: int = self.num_cache_hits
1042 num_cache_misses: int = self.num_cache_misses
1043 start_time_nanos: int = time.monotonic_ns()
1044 dst_alert, src_alert = run_in_parallel(
1045 lambda: self.monitor_snapshots(dst, sorted_dst_datasets),
1046 lambda: self.monitor_snapshots(src, sorted_src_datasets),
1047 )
1048 exit_code, _exit_kind, exit_msg = min(dst_alert, src_alert)
1049 if exit_code != 0:
1050 die(exit_msg, -exit_code)
1051 elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
1052 num_cache_hits = self.num_cache_hits - num_cache_hits
1053 num_cache_misses = self.num_cache_misses - num_cache_misses
1054 if num_cache_hits > 0 or num_cache_misses > 0:
1055 msg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
1056 else:
1057 msg = ""
1058 log.info(
1059 "--monitor-snapshots done: %s",
1060 f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]",
1061 )
    def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> tuple[int, str, str]:
        """Checks snapshot freshness and warns or errors out when limits are exceeded.

        Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name
        pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to check if snapshots
        are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process
        exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.

        Returns the worst alert observed as a (-exit_code, exit_kind, exit_msg) triple; (0, "", "") means all is OK.

        Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
        per dataset. Space complexity is O(max(N, M)).
        """
        p, log = self.params, self.params.log
        alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts
        labels: list[SnapshotLabel] = [alert.label for alert in alerts]
        oldest_skip_holds: list[bool] = [alert.oldest_skip_holds for alert in alerts]
        current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000
        is_debug: bool = log.isEnabledFor(LOG_DEBUG)
        if is_caching_snapshots(p, remote):
            # pick the property table matching the remote side; 'snapshots_changed' drives cache validity checks below
            props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties
            snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()}
            alerts_hash: str = sha256_128_urlsafe_base64(str(tuple(alerts)))
            label_hashes: dict[SnapshotLabel, str] = {
                label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
            }
        is_caching: bool = False  # enabled further below, after the cache-driven fast path has run
        worst_alert: tuple[int, str, str] = (0, "", "")  # -exit_code, exit_kind, exit_msg

        def record_alert(exit_code: int, exit_kind: str, exit_msg: str) -> None:
            """Remembers the most severe alert seen so far; tuples compare element-wise so the most negative wins."""
            nonlocal worst_alert
            worst_alert = min(worst_alert, (-exit_code, exit_kind, exit_msg))  # min() sorts "Latest" before "Oldest" on tie

        def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str:
            """Returns the cache file path for (dataset, label, alert kind); path encodes label and alerts hashes."""
            cache_label: str = os.path.join(MONITOR_CACHE_FILE_PREFIX, alert_cfg.kind[0], label_hashes[label], alerts_hash)
            return self.cache.last_modified_cache_file(r, dataset, cache_label)

        def alert_msg(
            kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int
        ) -> str:
            """Formats a human-readable alert message; delta_millis == -1 omits the threshold clause."""
            assert kind == "Latest" or kind == "Oldest"
            lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}"
            if snapshot_age_millis >= current_unixtime_millis:
                # age >= "now" only happens for the creation_unixtime_secs=0 sentinel, i.e. no matching snapshot exists
                return f"No snapshot exists for {dataset}@{lbl}"
            msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old"
            s = f": @{snapshot}" if snapshot else ""
            if delta_millis == -1:
                return f"{msg}{s}"
            return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}"

        def check_alert(
            label: SnapshotLabel, alert_cfg: AlertConfig | None, creation_unixtime_secs: int, dataset: str, snapshot: str
        ) -> None:  # thread-safe
            """Compares snapshot age against warning/critical thresholds, logging and recording alerts accordingly."""
            if alert_cfg is None:
                return
            if is_caching and not p.dry_run:  # update cache with latest state from 'zfs list -t snapshot'
                snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
                cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg)
                set_last_modification_time_safe(
                    cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed), if_more_recent=True
                )
            warning_millis: int = alert_cfg.warning_millis
            critical_millis: int = alert_cfg.critical_millis
            alert_kind = alert_cfg.kind
            snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000
            m = "--monitor_snapshots: "
            if snapshot_age_millis > critical_millis:
                msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis)
                log.critical("%s", msg)
                if not p.monitor_snapshots_config.dont_crit:
                    record_alert(CRITICAL_STATUS, alert_kind, msg)
            elif snapshot_age_millis > warning_millis:
                msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis)
                log.warning("%s", msg)
                if not p.monitor_snapshots_config.dont_warn:
                    record_alert(WARNING_STATUS, alert_kind, msg)
            elif is_debug:
                msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1)
                log.debug("%s", msg)

        def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            """Callback invoked with the most recent snapshot matching alerts[i]."""
            alert: MonitorSnapshotAlert = alerts[i]
            check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot)

        def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            """Callback invoked with the oldest snapshot matching alerts[i]."""
            alert: MonitorSnapshotAlert = alerts[i]
            check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot)

        def find_stale_datasets_and_check_alerts() -> list[str]:
            """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply,
            that is, without incurring 'zfs list -t snapshots'.

            This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
            https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
            """
            stale_datasets: list[str] = []
            time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
            for dataset in sorted_datasets:
                is_stale_dataset: bool = False
                snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
                for alert in alerts:
                    for cfg in (alert.latest, alert.oldest):
                        if cfg is None:
                            continue
                        if (
                            snapshots_changed != 0
                            # only trust values older than the maturity threshold, to sidestep in-flight updates
                            and snapshots_changed < time_threshold
                            and (  # always True
                                cached_unix_times := self.cache.get_snapshots_changed2(
                                    monitor_last_modified_cache_file(remote, dataset, alert.label, cfg)
                                )
                            )
                            and snapshots_changed == cached_unix_times[1]  # cached snapshots_changed aka last modified time
                            and snapshots_changed >= cached_unix_times[0]  # creation time of minmax snapshot aka access time
                        ):  # cached state is still valid; emit an alert if the latest/oldest snapshot is too old
                            lbl = alert.label
                            check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="")
                        else:  # cached state is no longer valid; fallback to 'zfs list -t snapshot'
                            is_stale_dataset = True
                if is_stale_dataset:
                    stale_datasets.append(dataset)
            return stale_datasets

        # satisfy request from local cache as much as possible
        if is_caching_snapshots(p, remote):
            stale_datasets: list[str] = find_stale_datasets_and_check_alerts()
            with self.stats_lock:
                self.num_cache_misses += len(stale_datasets)
                self.num_cache_hits += len(sorted_datasets) - len(stale_datasets)
        else:
            stale_datasets = sorted_datasets

        # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
        is_caching = is_caching_snapshots(p, remote)
        datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
            remote,
            stale_datasets,
            labels,
            fn_latest=alert_latest_snapshot,
            fn_oldest=alert_oldest_snapshot,
            fn_oldest_skip_holds=oldest_skip_holds,
        )
        for dataset in datasets_without_snapshots:
            # a dataset with no snapshots at all gets creation time 0, i.e. "infinitely old", triggering the no-snapshot msg
            for i in range(len(alerts)):
                alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
                alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
        return worst_alert
1209 def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool:
1210 """Replicates a list of datasets."""
1211 assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted"
1212 p, log = self.params, self.params.log
1213 src, dst = p.src, p.dst
1214 self.num_snapshots_found = 0
1215 self.num_snapshots_replicated = 0
1216 log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]")
1217 start_time_nanos: int = time.monotonic_ns()
1219 def src2dst(src_dataset: str) -> str:
1220 return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset)
1222 def dst2src(dst_dataset: str) -> str:
1223 return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
1225 def find_stale_datasets() -> tuple[list[str], dict[str, str]]:
1226 """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine
1227 which datasets can be skipped cheaply, i.e. without incurring 'zfs list -t snapshots'.
1229 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
1230 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
1231 """
1232 # First, check which src datasets have changed since the last replication to that destination
1233 cache_files: dict[str, str] = {}
1234 stale_src_datasets1: list[str] = []
1235 maybe_stale_dst_datasets: list[str] = []
1236 userhost_dir: str = sha256_85_urlsafe_base64(p.dst.cache_namespace())
1237 filter_key = tuple(tuple(f) for f in p.snapshot_filters) # cache is only valid for same --include/excl-snapshot*
1238 filter_hash_code: str = sha256_85_urlsafe_base64(str(filter_key))
1239 for src_dataset in src_datasets:
1240 dst_dataset: str = src2dst(src_dataset) # cache is only valid for identical destination dataset
1241 dst_dataset_dir: str = sha256_85_urlsafe_base64(dst_dataset)
1242 cache_label: str = os.path.join(
1243 REPLICATION_CACHE_FILE_PREFIX, userhost_dir, dst_dataset_dir, filter_hash_code
1244 )
1245 cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label)
1246 cache_files[src_dataset] = cache_file
1247 snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed # get prop "for free"
1248 if (
1249 snapshots_changed != 0
1250 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
1251 and snapshots_changed == self.cache.get_snapshots_changed(cache_file)
1252 ):
1253 maybe_stale_dst_datasets.append(dst_dataset)
1254 else:
1255 stale_src_datasets1.append(src_dataset)
1257 # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed
1258 stale_src_datasets2: list[str] = []
1259 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets)
1260 for dst_dataset in maybe_stale_dst_datasets:
1261 snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0)
1262 if (
1263 snapshots_changed != 0
1264 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
1265 and snapshots_changed
1266 == self.cache.get_snapshots_changed(self.cache.last_modified_cache_file(dst, dst_dataset))
1267 ):
1268 log.info("Already up-to-date [cached]: %s", dst_dataset)
1269 else:
1270 stale_src_datasets2.append(dst2src(dst_dataset))
1271 assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted"
1272 assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted"
1273 stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2)) # merge two sorted lists
1274 assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates"
1275 return stale_src_datasets, cache_files
1277 if is_caching_snapshots(p, src):
1278 stale_src_datasets, cache_files = find_stale_datasets()
1279 num_cache_misses = len(stale_src_datasets)
1280 num_cache_hits = len(src_datasets) - len(stale_src_datasets)
1281 self.num_cache_misses += num_cache_misses
1282 self.num_cache_hits += num_cache_hits
1283 cmsg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
1284 else:
1285 stale_src_datasets = src_datasets
1286 cache_files = {}
1287 cmsg = ""
1289 done_src_datasets: list[str] = []
1290 done_src_datasets_lock: threading.Lock = threading.Lock()
1292 def _process_dataset_fn(src_dataset: str, tid: str, retry: Retry) -> bool:
1293 result: bool = replicate_dataset(job=self, src_dataset=src_dataset, tid=tid, retry=retry)
1294 with done_src_datasets_lock:
1295 done_src_datasets.append(src_dataset) # record datasets that were actually replicated (not skipped)
1296 return result
1298 # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution
1299 failed: bool = process_datasets_in_parallel_and_fault_tolerant(
1300 log=log,
1301 datasets=stale_src_datasets,
1302 process_dataset=_process_dataset_fn,
1303 skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)],
1304 skip_on_error=p.skip_on_error,
1305 max_workers=max_workers,
1306 timing=self.task_timing,
1307 termination_handler=self.terminate,
1308 enable_barriers=False,
1309 task_name="Replication",
1310 append_exception=self.append_exception,
1311 retry_template=self._retry_template(),
1312 dry_run=p.dry_run,
1313 is_test_mode=self.is_test_mode,
1314 )
1316 if is_caching_snapshots(p, src) and len(done_src_datasets) > 0:
1317 # refresh "snapshots_changed" ZFS dataset property from dst
1318 stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in sorted(done_src_datasets)]
1319 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets)
1320 for dst_dataset in stale_dst_datasets: # update local cache
1321 dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0)
1322 dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset)
1323 src_dataset: str = dst2src(dst_dataset)
1324 src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed
1325 if not p.dry_run:
1326 set_last_modification_time_safe(
1327 cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed, if_more_recent=True
1328 )
1329 set_last_modification_time_safe(
1330 dst_cache_file, unixtime_in_secs=dst_snapshots_changed, if_more_recent=True
1331 )
1333 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
1334 log.info(
1335 p.dry("Replication done: %s"),
1336 f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots"
1337 f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]",
1338 )
1339 return failed
1341 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None:
1342 """For testing only; for unit tests to delete datasets during replication and test correct handling of that."""
1343 assert delete_trigger
1344 counter = self.delete_injection_triggers.get("before")
1345 if counter and self.decrement_injection_counter(counter, delete_trigger):
1346 p = self.params
1347 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "")
1348 self.run_ssh_command(remote, LOG_DEBUG, print_stdout=True, cmd=cmd)
1350 def maybe_inject_params(self, error_trigger: str) -> None:
1351 """For testing only; for unit tests to simulate errors during replication and test correct handling of them."""
1352 assert error_trigger
1353 counter = self.error_injection_triggers.get("before")
1354 if counter and self.decrement_injection_counter(counter, error_trigger):
1355 self.inject_params = self.param_injection_triggers[error_trigger]
1356 elif error_trigger in self.param_injection_triggers:
1357 self.inject_params = {}
1359 @staticmethod
1360 def recv_option_property_names(recv_opts: list[str]) -> set[str]:
1361 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for
1362 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name."""
1363 propnames: set[str] = set()
1364 i = 0
1365 n = len(recv_opts)
1366 while i < n:
1367 stripped: str = recv_opts[i].strip()
1368 if stripped in ("-o", "-x"):
1369 i += 1
1370 if i == n or recv_opts[i].strip() in ("-o", "-x"):
1371 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1372 if stripped == "-o" and "=" not in recv_opts[i]:
1373 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1374 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0]
1375 validate_property_name(propname, "--zfs-recv-program-opt(s)")
1376 propnames.add(propname)
1377 i += 1
1378 return propnames
    def root_datasets_if_recursive_zfs_snapshot_is_possible(
        self, datasets: list[str], basis_datasets: list[str]
    ) -> list[str] | None:
        """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset
        within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in
        `datasets`. Returns `None` if any (unfiltered) dataset in `basis_dataset` that is a descendant of at least one of the
        root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have
        pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the
        non-recursive 'zfs snapshot snapshot1 .. snapshot N' CLI flavor.

        Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both
        sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time
        complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case.
        See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative
        impl that's easier to grok.
        """
        assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
        assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
        assert (not self.is_test_mode) or basis_datasets == sorted(basis_datasets), "List is not sorted"
        assert (not self.is_test_mode) or not has_duplicates(basis_datasets), "List contains duplicates"
        assert (not self.is_test_mode) or set(datasets).issubset(set(basis_datasets)), "Not a subset"
        root_datasets: list[str] = self.find_root_datasets(datasets)
        i = 0  # cursor into root_datasets
        j = 0  # cursor into basis_datasets
        k = 0  # cursor into datasets
        len_root_datasets = len(root_datasets)
        len_basis_datasets = len(basis_datasets)
        len_datasets = len(datasets)
        while i < len_root_datasets and j < len_basis_datasets:  # walk and "merge" the sorted lists, in sync
            if basis_datasets[j] < root_datasets[i]:  # irrelevant subtree?
                j += 1  # move to next basis_datasets[j]
            elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]):  # relevant subtree?
                # basis_datasets[j] lies under a root dataset, so it must also appear in the filtered `datasets` list;
                # scan forward in `datasets` (cursor k never moves backwards) to check for its presence:
                while k < len_datasets and datasets[k] < basis_datasets[j]:
                    k += 1  # move to next datasets[k]
                if k == len_datasets or datasets[k] != basis_datasets[j]:  # dataset chopped off by schedule or --incl/excl*?
                    return None  # detected filter pruning that is incompatible with 'zfs snapshot -r'
                j += 1  # move to next basis_datasets[j]
            else:
                i += 1  # move to next root_dataset[i]; no need to check root_datasets that are no longer (or not yet) reachable
        return root_datasets
1421 @staticmethod
1422 def find_root_datasets(sorted_datasets: list[str]) -> list[str]:
1423 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A
1424 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets."""
1425 root_datasets: list[str] = []
1426 skip_dataset: str = DONT_SKIP_DATASET
1427 for dataset in sorted_datasets:
1428 if is_descendant(dataset, of_root_dataset=skip_dataset):
1429 continue
1430 skip_dataset = dataset
1431 root_datasets.append(dataset)
1432 return root_datasets
    def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]:
        """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g.
        bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be
        created with that name, because these datasets are due per the schedule, either because the 'creation' time of their
        most recent snapshot with that name pattern is now too old, or such a snapshot does not even exist.

        The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple
        but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An
        alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property
        (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of
        the most recent snapshot, for each SnapshotLabel and each dataset.
        """
        p, log = self.params, self.params.log
        src = p.src
        config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
        datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
        is_caching: bool = False  # set to its real value further below, just before the 'zfs list -t snapshot' fallback runs
        interner: HashedInterner[datetime] = HashedInterner()  # reduces memory footprint
        msgs: list[tuple[datetime, str, SnapshotLabel, str]] = []

        def create_snapshot_if_latest_is_too_old(
            datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int
        ) -> None:  # thread-safe
            """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old."""
            creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz)
            log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label)
            duration_amount, duration_unit = config.suffix_durations[label.suffix]
            next_event_dt: datetime = config.anchors.round_datetime_up_to_duration_multiple(
                creation_dt + timedelta(microseconds=1), duration_amount, duration_unit
            )
            msg: str = ""
            if config.current_datetime >= next_event_dt:
                datasets_to_snapshot[label].append(dataset)  # mark it as scheduled for snapshot creation
                msg = " has passed"
            next_event_dt = interner.intern(next_event_dt)
            msgs.append((next_event_dt, dataset, label, msg))
            if is_caching and not p.dry_run:  # update cache with latest state from 'zfs list -t snapshot'
                # Per-label cache stores (atime=creation, mtime=snapshots_changed) so later runs can safely trust creation
                # only when the label's mtime matches the current dataset-level '=' cache value. Excludes timestamp of label.
                cache_file: str = self.cache.last_modified_cache_file(src, dataset, label_hashes[label])
                unixtimes: tuple[int, int] = (creation_unixtime, self.src_properties[dataset].snapshots_changed)
                set_last_modification_time_safe(cache_file, unixtime_in_secs=unixtimes, if_more_recent=True)

        labels: list[SnapshotLabel] = []  # labels that are actually subject to scheduling (i.e. have a nonzero period)
        config_labels: list[SnapshotLabel] = config.snapshot_labels()
        for label in config_labels:
            duration_amount_, _duration_unit = config.suffix_durations[label.suffix]
            if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due:
                datasets_to_snapshot[label] = sorted_datasets  # take snapshot regardless of creation time of existing snaps
            else:
                labels.append(label)
        if len(labels) == 0:
            return datasets_to_snapshot  # nothing more TBD
        label_hashes: dict[SnapshotLabel, str] = {
            label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
        }

        # satisfy request from local cache as much as possible
        cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
        if is_caching_snapshots(p, src):
            sorted_datasets_todo: list[str] = []  # datasets whose request cannot be answered from the local cache
            time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
            for dataset in sorted_datasets:
                cache: SnapshotCache = self.cache
                cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset))
                if cached_snapshots_changed == 0:
                    sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                    continue
                if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed:  # get that prop "for free"
                    cache.invalidate_last_modified_cache_dataset(dataset)
                    sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                    continue
                if cached_snapshots_changed >= time_threshold:  # Avoid equal-second races: only trust matured cache entries
                    sorted_datasets_todo.append(dataset)  # cache entry isn't mature enough to be trusted; skip cache
                    continue
                creation_unixtimes: list[int] = []
                for label_hash in label_hashes.values():
                    # For per-label files, atime stores the latest matching snapshot's creation time, while mtime stores
                    # the dataset-level snapshots_changed observed when this label file was written.
                    atime, mtime = cache.get_snapshots_changed2(cache.last_modified_cache_file(src, dataset, label_hash))
                    # Sanity check: trust per-label cache only when:
                    # - mtime equals the dataset-level '=' cache (same snapshots_changed), and
                    # - atime is plausible and not later than mtime (creation <= snapshots_changed), and
                    # - neither atime nor mtime is zero (unknown provenance).
                    # Otherwise fall back to 'zfs list -t snapshot' to avoid stale creation times after newer changes.
                    if atime == 0 or mtime == 0 or mtime != cached_snapshots_changed or atime > mtime:
                        sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                        break
                    creation_unixtimes.append(atime)
                if len(creation_unixtimes) == len(labels):  # every label was answerable from the local cache
                    for j, label in enumerate(labels):
                        create_snapshot_if_latest_is_too_old(
                            cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j]
                        )
            sorted_datasets = sorted_datasets_todo  # only the cache misses still need 'zfs list -t snapshot'

        def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            """Callback receiving the latest snapshot per (dataset, label) from handle_minmax_snapshots()."""
            create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs)

        def on_finish_dataset(dataset: str) -> None:
            """Persists the dataset-level 'snapshots_changed' value to the local cache after a dataset is processed."""
            if is_caching_snapshots(p, src) and not p.dry_run:
                set_last_modification_time_safe(
                    self.cache.last_modified_cache_file(src, dataset),
                    unixtime_in_secs=self.src_properties[dataset].snapshots_changed,
                    if_more_recent=True,
                )

        # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
        is_caching = is_caching_snapshots(p, src)
        datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
            src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset
        )
        for lbl in labels:  # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result
            datasets_to_snapshot[lbl].sort()
            if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot):  # +take snaps for snapshot-less datasets
                datasets_to_snapshot[lbl] = list(  # inputs to merge() are sorted, and outputs are sorted too
                    heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots)
                )
        for label, datasets in datasets_to_snapshot.items():
            assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
            assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
            assert label

        msgs.sort()  # sort by time, dataset, label
        for i in range(0, len(msgs), 10_000):  # reduce logging overhead via mini-batching
            text = "".join(
                f"\nNext scheduled snapshot time: {next_event_dt} for {dataset}@{label}{msg}"
                for next_event_dt, dataset, label, msg in msgs[i : i + 10_000]
            )
            log.info("Next scheduled snapshot times ...%s", text)

        # sort keys to ensure that we take snapshots for dailies before hourlies, and so on
        label_indexes: dict[SnapshotLabel, int] = {label: k for k, label in enumerate(config_labels)}
        datasets_to_snapshot = dict(sorted(datasets_to_snapshot.items(), key=lambda kv: label_indexes[kv[0]]))
        return datasets_to_snapshot
    def handle_minmax_snapshots(
        self,
        remote: Remote,
        sorted_datasets: list[str],
        labels: list[SnapshotLabel],
        *,
        fn_latest: Callable[[int, int, str, str], None],  # callback function for latest snapshot
        fn_oldest: Callable[[int, int, str, str], None] | None = None,  # callback function for oldest snapshot
        fn_oldest_skip_holds: Sequence[bool] = (),
        fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None,
    ) -> list[str]:  # thread-safe
        """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs
        the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names.

        If the (optional) fn_oldest_skip_holds=True for a given label, then snapshots for that label that carry a 'zfs hold'
        are skipped (ignored) when finding the oldest snapshot for fn_oldest. This can be useful for monitor_snapshots(),
        given that users often intentionally retain holds for longer than the "normal" snapshot retention period, and this
        shouldn't necessarily cause monitoring to emit alerts.

        Each callback receives (label_index, creation_unixtime_secs, dataset, snapshot_name), where creation_unixtime_secs
        is 0 and snapshot_name is "" if no snapshot matches the label. Returns the (sub)list of datasets that have no
        snapshots at all.
        """
        if fn_oldest is not None:
            assert len(labels) == len(fn_oldest_skip_holds)
        assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        no_userrefs: tuple[str, ...] = ("", "-", "0")  # ZFS snapshot property userrefs > 0 indicates a zfs hold

        def extract_fields(line: str) -> tuple[int, int, str, bool]:
            """Parses one tab-separated 'zfs list' line into (createtxg, creation, snapshot_name, has_no_hold)."""
            fields: list[str] = line.split("\t")
            if len(fields) == 3:  # the optional 'userrefs' column was not requested
                name, createtxg, creation_unixtime_secs = fields
                userrefs = ""
            else:
                name, createtxg, creation_unixtime_secs, userrefs = fields
            return (
                int(createtxg),
                int(creation_unixtime_secs),
                name.split("@", 1)[1],
                userrefs in no_userrefs,
            )

        p = self.params
        props: str = "name,createtxg,creation"
        props = props if fn_oldest is None or not any(fn_oldest_skip_holds) else props + ",userrefs"
        cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o {props}")  # sorts by dataset,creation
        datasets_with_snapshots: set[str] = set()
        interner: SortedInterner[str] = SortedInterner(sorted_datasets)  # reduces memory footprint
        for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False):
            # streaming group by dataset name (consumes constant memory only)
            for dataset, group in itertools.groupby(lines, key=lambda line: line.split("\t", 1)[0].split("@", 1)[0]):
                dataset = interner.interned(dataset)
                snapshots = sorted(  # fetch all snapshots of current dataset and sort by createtxg,creation,name
                    extract_fields(line) for line in group
                )  # perf: sorted() is fast because Timsort is close to O(N) for nearly sorted input, which is our case
                assert len(snapshots) > 0
                datasets_with_snapshots.add(dataset)
                snapshot_names: tuple[str, ...] = tuple(snapshot[2] for snapshot in snapshots)
                year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX
                year_with_4_digits_regex_fullmatch = year_with_4_digits_regex.fullmatch
                startswith = str.startswith  # perf: hoist attribute lookups out of the hot loop below
                endswith = str.endswith
                fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False))
                for i, label in enumerate(labels):
                    infix: str = label.infix
                    start: str = label.prefix + infix
                    end: str = label.suffix
                    startlen: int = len(start)
                    endlen: int = len(end)
                    minlen: int = startlen + endlen if infix else 4 + startlen + endlen  # year_with_four_digits_regex
                    startlen_4: int = startlen + 4  # [startlen:startlen+4] # year_with_four_digits_regex
                    has_infix: bool = bool(infix)
                    for fn, is_reverse in fns:  # is_reverse=True scans newest-first (latest); False oldest-first
                        creation_unixtime_secs: int = 0  # find creation time of latest or oldest snapshot matching the label
                        minmax_snapshot: str = ""
                        no_skip_holds: bool = is_reverse or not fn_oldest_skip_holds[i]
                        for j in range(len(snapshot_names) - 1, -1, -1) if is_reverse else range(len(snapshot_names)):
                            snapshot_name: str = snapshot_names[j]
                            if (
                                endswith(snapshot_name, end)  # aka snapshot_name.endswith(end)
                                and startswith(snapshot_name, start)  # aka snapshot_name.startswith(start)
                                and len(snapshot_name) >= minlen
                                and (has_infix or year_with_4_digits_regex_fullmatch(snapshot_name, startlen, startlen_4))
                                and (no_skip_holds or snapshots[j][3])
                            ):
                                creation_unixtime_secs = snapshots[j][1]
                                minmax_snapshot = snapshot_name
                                break
                        fn(i, creation_unixtime_secs, dataset, minmax_snapshot)
                fn_on_finish_dataset(dataset)
        datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots]
        return datasets_without_snapshots
1659 @staticmethod
1660 def _cache_hits_msg(hits: int, misses: int) -> str:
1661 total = hits + misses
1662 return f", cache hits: {percent(hits, total, print_total=True)}, misses: {percent(misses, total, print_total=True)}"
    def run_ssh_command(
        self,
        remote: MiniRemote,
        loglevel: int = logging.INFO,
        is_dry: bool = False,
        check: bool = True,
        print_stdout: bool = False,
        print_stderr: bool = True,
        cmd: list[str] | None = None,
        retry_on_generic_ssh_error: bool = True,
    ) -> str:
        """Runs the given CLI cmd via ssh on the given remote, and returns stdout.

        If the failure originated within SSH itself (stderr starts with 'ssh: ', exit code 255) and
        `retry_on_generic_ssh_error` is True, raises RetryableError; otherwise propagates the original
        CalledProcessError/TimeoutExpired unchanged.
        """
        assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0
        conn_pool: ConnectionPool = self.params.connection_pools[remote.location].pool(SHARED)
        with conn_pool.connection() as conn:  # borrow a pooled connection for the duration of this command
            log: logging.Logger = self.params.log
            try:
                process: subprocess.CompletedProcess[str] = conn.run_ssh_command(
                    cmd=cmd,
                    job=self,
                    loglevel=loglevel,
                    is_dry=is_dry,
                    check=check,
                    stdin=DEVNULL,
                    stdout=PIPE,
                    stderr=PIPE,
                    text=True,
                )
            except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
                # forward any captured output before deciding how to propagate the failure
                xprint(log, stderr_to_str(e.stdout) if print_stdout else e.stdout, run=print_stdout, file=sys.stdout, end="")
                xprint(log, stderr_to_str(e.stderr) if print_stderr else e.stderr, run=print_stderr, file=sys.stderr, end="")
                if retry_on_generic_ssh_error and isinstance(e, subprocess.CalledProcessError):
                    stderr: str = stderr_to_str(e.stderr)
                    if stderr.startswith("ssh: "):
                        assert e.returncode == 255, e.returncode  # error within SSH itself (not during the remote command)
                        raise RetryableError(display_msg="ssh") from e
                raise
            else:
                if is_dry:
                    return ""  # a dry run executes nothing, hence produces no output
                xprint(log, process.stdout, run=print_stdout, file=sys.stdout, end="")
                xprint(log, process.stderr, run=print_stderr, file=sys.stderr, end="")
                return process.stdout
1708 def try_ssh_command(
1709 self,
1710 remote: MiniRemote,
1711 loglevel: int,
1712 is_dry: bool = False,
1713 print_stdout: bool = False,
1714 cmd: list[str] | None = None,
1715 exists: bool = True,
1716 error_trigger: str | None = None,
1717 ) -> str | None:
1718 """Convenience method that helps retry/react to a dataset or pool that potentially doesn't exist anymore."""
1719 assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0
1720 log = self.params.log
1721 try:
1722 self.maybe_inject_error(cmd=cmd, error_trigger=error_trigger)
1723 return self.run_ssh_command(remote=remote, loglevel=loglevel, is_dry=is_dry, print_stdout=print_stdout, cmd=cmd)
1724 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
1725 if not isinstance(e, UnicodeDecodeError):
1726 stderr: str = stderr_to_str(e.stderr)
1727 if exists and (
1728 ": dataset does not exist" in stderr
1729 or ": filesystem does not exist" in stderr # solaris 11.4.0
1730 or ": no such pool" in stderr
1731 or "does not have any resumable receive state to abort" in stderr # harmless `zfs receive -A` race
1732 ):
1733 return None
1734 log.warning("%s", stderr.rstrip())
1735 raise RetryableError("Subprocess failed") from e
1737 def try_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str | None:
1738 """Convenience method that auto-retries try_ssh_command() on failure."""
1739 return self._retry_template().call_with_retries(fn=lambda retry: self.try_ssh_command(*args, **kwargs))
1741 def run_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str:
1742 """Convenience method that auto-retries run_ssh_command() on transport failure (not on remote command failure)."""
1743 return self._retry_template().call_with_retries(fn=lambda retry: self.run_ssh_command(*args, **kwargs))
1745 def maybe_inject_error(self, cmd: list[str], error_trigger: str | None = None) -> None:
1746 """For testing only; for unit tests to simulate errors during replication and test correct handling of them."""
1747 if error_trigger:
1748 counter = self.error_injection_triggers.get("before")
1749 if counter and self.decrement_injection_counter(counter, error_trigger):
1750 try:
1751 raise CalledProcessError(returncode=1, cmd=" ".join(cmd), stderr=error_trigger + ":dataset is busy")
1752 except subprocess.CalledProcessError as e:
1753 if error_trigger.startswith("retryable_"):
1754 raise RetryableError("Subprocess failed") from e
1755 else:
1756 raise
1758 def decrement_injection_counter(self, counter: Counter[str], trigger: str) -> bool:
1759 """For testing only."""
1760 with self.injection_lock:
1761 if counter[trigger] <= 0:
1762 return False
1763 counter[trigger] -= 1
1764 return True
#############################################################################
@final
class DatasetProperties:
    """Holds the per-dataset ZFS properties that this module tracks."""

    __slots__ = ("recordsize", "snapshots_changed")  # uses more compact memory layout than __dict__

    def __init__(self, recordsize: int, snapshots_changed: int) -> None:
        # 'recordsize' is immutable after construction:
        self.recordsize: Final[int] = recordsize
        # 'snapshots_changed' is mutable and updated as snapshots come and go:
        self.snapshots_changed: int = snapshots_changed
1782#############################################################################
1783# Input format is [[user@]host:]dataset
1784# 1234 5 6
1785_DATASET_LOCATOR_REGEX: Final[re.Pattern[str]] = re.compile(r"(((([^@]*)@)?([^:]+)):)?(.*)", flags=re.DOTALL)
1788def parse_dataset_locator(
1789 input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None
1790) -> tuple[str, str, str, str, str]:
1791 """Splits user@host:dataset into its components with optional checks."""
1793 def convert_ipv6(hostname: str) -> str: # support IPv6 without getting confused by host:dataset colon separator ...
1794 return hostname.replace("|", ":") # ... and any colons that may be part of a (valid) ZFS dataset name
1796 user_undefined: bool = user is None
1797 if user is None:
1798 user = ""
1799 host_undefined: bool = host is None
1800 if host is None:
1801 host = ""
1802 host = convert_ipv6(host)
1803 user_host, dataset, pool = "", "", ""
1805 if match := _DATASET_LOCATOR_REGEX.fullmatch(input_text): 1805 ↛ 1822line 1805 didn't jump to line 1822 because the condition on line 1805 was always true
1806 if user_undefined:
1807 user = match.group(4) or ""
1808 if host_undefined:
1809 host = match.group(5) or ""
1810 host = convert_ipv6(host)
1811 if host == "-":
1812 host = ""
1813 dataset = match.group(6) or ""
1814 i = dataset.find("/")
1815 pool = dataset[0:i] if i >= 0 else dataset
1817 if user and host:
1818 user_host = f"{user}@{host}"
1819 elif host:
1820 user_host = host
1822 if validate:
1823 validate_user_name(user, input_text)
1824 validate_host_name(host, input_text)
1825 if port is not None:
1826 validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ")
1827 validate_dataset_name(dataset, input_text)
1829 return user, host, user_host, pool, dataset
def validate_user_name(user: str, input_text: str) -> None:
    """Checks that the username is safe for ssh or local usage."""
    forbidden: str = SHELL_CHARS_AND_SLASH
    looks_bad = user != "" and (
        user.startswith("-") or ".." in user or any(ch.isspace() or ch in forbidden for ch in user)
    )
    if looks_bad:
        die(f"Invalid user name: '{user}' for: '{input_text}'")
def validate_host_name(host: str, input_text: str) -> None:
    """Checks hostname for forbidden characters or patterns."""
    forbidden: str = SHELL_CHARS_AND_SLASH
    looks_bad = host != "" and (
        host.startswith("-") or ".." in host or any(ch.isspace() or ch in forbidden for ch in host)
    )
    if looks_bad:
        die(f"Invalid host name: '{host}' for: '{input_text}'")
1846def validate_port(port: str | int | None, message: str) -> None:
1847 """Checks that port specification is a valid integer."""
1848 if isinstance(port, int):
1849 port = str(port)
1850 if port and not port.isdigit():
1851 die(message + f"must be empty or a positive integer: '{port}'")
def normalize_called_process_error(error: subprocess.CalledProcessError) -> int:
    """Normalizes `CalledProcessError.returncode` to avoid reserved exit codes so callers don't misclassify them."""
    code: int = error.returncode
    if isinstance(code, int) and 1 <= code <= STILL_RUNNING_STATUS:
        return DIE_STATUS  # remap return codes that collide with this tool's reserved exit code range
    return code
#############################################################################
if __name__ == "__main__":  # CLI entry point when executed as a script
    main()