Coverage for bzfs_main / bzfs.py: 99%
1087 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:49 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata
16# /// script
17# requires-python = ">=3.9"
18# dependencies = []
19# ///
20#
21"""
22* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`,
23 data transfer, and snapshot management between two hosts.
24* Overview of the bzfs.py codebase:
25* The codebase starts with docs, definition of input data and associated argument parsing into a "Params" class.
26* All CLI option/parameter values are reachable from the "Params" class.
27* Control flow starts in main(), which kicks off a "Job".
28* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree.
29* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset().
30* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots().
31* The --create-src-snapshots-* and --delete-* and --compare-* and --monitor-* algorithms also start in run_task().
32* The main retry logic is in call_with_retries() and clear_resumable_recv_state_if_necessary().
33* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter.
34* Executing a CLI command on a local or remote host is in run_ssh_command().
35* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool.
36* Cache functionality can be found by searching for this regex: .*cach.*
37* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant().
38* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh.
39 Simply run that script whenever you change or add ArgumentParser help text.
40"""
42from __future__ import (
43 annotations,
44)
45import argparse
46import contextlib
47import fcntl
48import heapq
49import itertools
50import logging
51import os
52import re
53import subprocess
54import sys
55import threading
56import time
57from collections import (
58 Counter,
59 defaultdict,
60)
61from collections.abc import (
62 Collection,
63 Sequence,
64)
65from datetime import (
66 datetime,
67 timedelta,
68)
69from logging import (
70 Logger,
71)
72from pathlib import (
73 Path,
74)
75from subprocess import (
76 DEVNULL,
77 PIPE,
78 CalledProcessError,
79 TimeoutExpired,
80)
81from typing import (
82 Any,
83 Callable,
84 Final,
85 cast,
86 final,
87)
89import bzfs_main.loggers
90from bzfs_main.argparse_actions import (
91 has_timerange_filter,
92)
93from bzfs_main.argparse_cli import (
94 EXCLUDE_DATASET_REGEXES_DEFAULT,
95)
96from bzfs_main.compare_snapshot_lists import (
97 run_compare_snapshot_lists,
98)
99from bzfs_main.configuration import (
100 AlertConfig,
101 CreateSrcSnapshotConfig,
102 LogParams,
103 MonitorSnapshotAlert,
104 Params,
105 Remote,
106 SnapshotLabel,
107 resolve_r2r_mode,
108)
109from bzfs_main.detect import (
110 DISABLE_PRG,
111 RemoteConfCacheItem,
112 are_bookmarks_enabled,
113 detect_available_programs,
114 is_caching_snapshots,
115 is_dummy,
116 is_zpool_feature_enabled_or_active,
117)
118from bzfs_main.filter import (
119 SNAPSHOT_REGEX_FILTER_NAME,
120 dataset_regexes,
121 filter_datasets,
122 filter_lines,
123 filter_lines_except,
124 filter_snapshots,
125)
126from bzfs_main.loggers import (
127 get_simple_logger,
128 reset_logger,
129 set_logging_runtime_defaults,
130)
131from bzfs_main.parallel_batch_cmd import (
132 run_ssh_cmd_parallel,
133 zfs_list_snapshots_in_parallel,
134)
135from bzfs_main.progress_reporter import (
136 ProgressReporter,
137 count_num_bytes_transferred_by_zfs_send,
138)
139from bzfs_main.replication import (
140 delete_bookmarks,
141 delete_datasets,
142 delete_snapshots,
143 replicate_dataset,
144)
145from bzfs_main.snapshot_cache import (
146 MATURITY_TIME_THRESHOLD_SECS,
147 MONITOR_CACHE_FILE_PREFIX,
148 REPLICATION_CACHE_FILE_PREFIX,
149 SnapshotCache,
150 set_last_modification_time_safe,
151)
152from bzfs_main.util.connection import (
153 SHARED,
154 ConnectionPool,
155 MiniJob,
156 MiniRemote,
157 timeout,
158)
159from bzfs_main.util.parallel_iterator import (
160 run_in_parallel,
161)
162from bzfs_main.util.parallel_tasktree_policy import (
163 process_datasets_in_parallel_and_fault_tolerant,
164)
165from bzfs_main.util.retry import (
166 Retry,
167 RetryableError,
168 RetryTemplate,
169 RetryTerminationError,
170 RetryTiming,
171)
172from bzfs_main.util.utils import (
173 DESCENDANTS_RE_SUFFIX,
174 DIE_STATUS,
175 DONT_SKIP_DATASET,
176 FILE_PERMISSIONS,
177 LOG_DEBUG,
178 LOG_TRACE,
179 PROG_NAME,
180 SHELL_CHARS_AND_SLASH,
181 UMASK,
182 YEAR_WITH_FOUR_DIGITS_REGEX,
183 HashedInterner,
184 SortedInterner,
185 Subprocesses,
186 SynchronizedBool,
187 SynchronizedDict,
188 TaskTiming,
189 append_if_absent,
190 compile_regexes,
191 cut,
192 die,
193 has_duplicates,
194 human_readable_bytes,
195 human_readable_duration,
196 is_descendant,
197 percent,
198 pretty_print_formatter,
199 replace_in_lines,
200 replace_prefix,
201 sha256_85_urlsafe_base64,
202 sha256_128_urlsafe_base64,
203 stderr_to_str,
204 termination_signal_handler,
205 validate_dataset_name,
206 validate_property_name,
207 xappend,
208 xfinally,
209 xprint,
210)
# constants:
__version__: Final[str] = bzfs_main.argparse_cli.__version__  # version string's single source of truth is argparse_cli
CRITICAL_STATUS: Final[int] = 2  # NOTE(review): presumably the exit status for "critical" monitor alerts — confirm callers
WARNING_STATUS: Final[int] = 1  # NOTE(review): presumably the exit status for "warning" monitor alerts — confirm callers
STILL_RUNNING_STATUS: Final[int] = 4  # exit status when a previous periodic job still holds the lock file (see run_main)
MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 9)
# Fail fast with an actionable message on interpreters too old to run this program:
if sys.version_info < MIN_PYTHON_VERSION:
    print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!")
    sys.exit(DIE_STATUS)
223#############################################################################
def argument_parser() -> argparse.ArgumentParser:
    """Builds and returns the ArgumentParser that defines the bzfs command line interface."""
    parser: argparse.ArgumentParser = bzfs_main.argparse_cli.argument_parser()
    return parser
def main() -> None:
    """API for command line clients."""
    saved_umask: int = os.umask(UMASK)  # apply our umask for any files this run creates
    try:
        set_logging_runtime_defaults()
        # On CTRL-C and SIGTERM, send signal to all descendant processes to terminate them
        stop_event: threading.Event = threading.Event()
        with termination_signal_handler(termination_events=[stop_event]):
            cli_args: argparse.Namespace = argument_parser().parse_args()
            run_main(cli_args, sys.argv, termination_event=stop_event)
    except subprocess.CalledProcessError as err:
        sys.exit(normalize_called_process_error(err))
    finally:
        os.umask(saved_umask)  # restore prior global state
def run_main(
    args: argparse.Namespace,
    sys_argv: list[str] | None = None,
    log: Logger | None = None,
    termination_event: threading.Event | None = None,
) -> None:
    """API for Python clients; visible for testing; may become a public API eventually."""
    job = Job(termination_event=termination_event)
    job.run_main(args, sys_argv, log)
254#############################################################################
255@final
256class Job(MiniJob):
257 """Executes one bzfs run, coordinating snapshot replication tasks."""
    def __init__(self, termination_event: threading.Event | None = None) -> None:
        """Initializes job-wide state; a fresh Job coordinates one bzfs run end-to-end."""
        self.params: Params  # assigned in run_main() once CLI args have been parsed into a Params object
        self.termination_event: Final[threading.Event] = termination_event or threading.Event()
        # Retry timing tied to the termination event; the no-op on_before_attempt hook keeps default behavior:
        self.retry_timing: Final[RetryTiming] = RetryTiming.make_from(self.termination_event).copy(
            on_before_attempt=lambda retry: None
        )
        self.task_timing: Final[TaskTiming] = TaskTiming.make_from(self.termination_event)
        self.subprocesses: Subprocesses = Subprocesses(self.termination_event.is_set)
        # Per dst user@host: remembered "does this dst dataset exist?" answers; cleared at each daemon cycle:
        self.all_dst_dataset_exists: Final[dict[str, dict[str, bool]]] = defaultdict(lambda: defaultdict(bool))
        self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({})  # view for the current task's dst
        self.src_properties: dict[str, DatasetProperties] = {}  # per src dataset, filled by list_src_datasets_task()
        self.dst_properties: dict[str, DatasetProperties] = {}  # per dst dataset
        self.all_exceptions: list[str] = []  # stringified tolerated errors, for the end-of-run summary
        self.all_exceptions_count: int = 0
        self.max_exceptions_to_summarize: int = 10000  # cap on stored error strings, to cap memory consumption
        self.first_exception: BaseException | None = None  # re-raised at the end if any task error was tolerated
        self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {}  # holds per-remote conf incl. connection pools
        self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {}  # keyed by remote location; see validate_task()
        self.max_workers: dict[str, int] = {}  # keyed by remote location; see validate_task()
        self.progress_reporter: ProgressReporter = cast(ProgressReporter, None)  # created for real in run_tasks()
        self.is_first_replication_task: Final[SynchronizedBool] = SynchronizedBool(True)
        self.replication_start_time_nanos: int = time.monotonic_ns()
        self.timeout_nanos: int | None = None  # timestamp aka instant in time
        self.timeout_duration_nanos: int | None = None  # duration (not a timestamp); for logging only
        self.cache: SnapshotCache = SnapshotCache(self)
        self.stats_lock: Final[threading.Lock] = threading.Lock()  # guards the counters below
        self.num_cache_hits: int = 0
        self.num_cache_misses: int = 0
        self.num_snapshots_found: int = 0
        self.num_snapshots_replicated: int = 0

        self.is_test_mode: bool = False  # for testing only
        self.creation_prefix: str = ""  # for testing only
        self.use_select: bool = False  # for testing only
        self.progress_update_intervals: tuple[float, float] | None = None  # for testing only
        self.error_injection_triggers: dict[str, Counter[str]] = {}  # for testing only
        self.delete_injection_triggers: dict[str, Counter[str]] = {}  # for testing only
        self.param_injection_triggers: dict[str, dict[str, bool]] = {}  # for testing only
        self.inject_params: dict[str, bool] = {}  # for testing only
        self.injection_lock: threading.Lock = threading.Lock()  # for testing only
        self.max_command_line_bytes: int | None = None  # for testing only
301 def shutdown(self) -> None:
302 """Exits any multiplexed ssh sessions that may be leftover."""
303 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values()
304 for i, cache_item in enumerate(cache_items):
305 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}")
    def terminate(self) -> None:
        """Shuts down gracefully; also terminates descendant processes, if any.

        xfinally guarantees the process subtrees are terminated even if shutdown() raises, without masking
        any error raised by shutdown() itself.
        """
        with xfinally(self.subprocesses.terminate_process_subtrees):
            self.shutdown()
312 def _retry_template(self) -> RetryTemplate:
313 p = self.params
314 return RetryTemplate(policy=p.retry_policy.copy(timing=self.retry_timing), log=p.log)
    def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None:
        """Parses CLI arguments, sets up logging, and executes main job loop.

        On failure, logs the cause and re-raises (CalledProcessError/SystemExit) or raises SystemExit(DIE_STATUS).
        """
        assert isinstance(self.error_injection_triggers, dict)
        assert isinstance(self.delete_injection_triggers, dict)
        assert isinstance(self.inject_params, dict)
        logger_name_suffix: str = ""

        def _reset_logger() -> None:
            if logger_name_suffix and log is not None:  # reset Logger unless it's a Logger outside of our control
                reset_logger(log)

        with xfinally(_reset_logger):  # runs _reset_logger() on exit, without masking error raised in body of `with` block
            try:
                log_params: LogParams = LogParams(args)
                logger_name_suffix = "" if log is not None else log_params.logger_name_suffix
                log = bzfs_main.loggers.get_logger(
                    log_params=log_params, args=args, log=log, logger_name_suffix=logger_name_suffix
                )
                log.info("%s", f"Log file is: {log_params.log_file}")
            except BaseException as e:
                # Logging isn't up yet; report the logging-init failure via a minimal fallback logger, then bail out:
                simple_log: Logger = get_simple_logger(PROG_NAME, logger_name_suffix=logger_name_suffix)
                try:
                    simple_log.error("Log init: %s", e, exc_info=not isinstance(e, SystemExit))
                finally:
                    reset_logger(simple_log)
                raise

            # Expand plan-style options into auxiliary CLI args and re-parse so downstream code sees flat options:
            aux_args: list[str] = []
            if getattr(args, "include_snapshot_plan", None):
                aux_args += args.include_snapshot_plan
            if getattr(args, "delete_dst_snapshots_except_plan", None):
                aux_args += args.delete_dst_snapshots_except_plan
            if len(aux_args) > 0:
                log.info("Auxiliary CLI arguments: %s", " ".join(aux_args))
                args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args)

            def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None:
                """Logs the terminal error together with the status code the process will exit with."""
                log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info)

            try:
                log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[uid: {os.getuid()}, euid: {os.geteuid()}]")
                if self.is_test_mode:
                    log.log(LOG_TRACE, "Parsed CLI arguments: %s", args)
                self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params)
                self.timeout_duration_nanos = p.timeout_duration_nanos
                lock_file: str = p.lock_file_name()
                # O_NOFOLLOW refuses to follow a symlink at the lock path; O_CLOEXEC keeps the fd out of child processes:
                lock_fd = os.open(
                    lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, FILE_PERMISSIONS
                )
                with xfinally(lambda: os.close(lock_fd)):
                    try:
                        # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or
                        # another process. The (advisory) lock is auto-released when the process terminates or the fd is
                        # closed.
                        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # LOCK_NB ... non-blocking
                    except BlockingIOError:
                        msg = "Exiting as same previous periodic job is still running without completion yet per "
                        die(msg + lock_file, STILL_RUNNING_STATUS)
                    # xfinally: unlink the lock_file while still holding the flock on its fd - it's a correct and safe
                    # standard POSIX pattern:
                    # - Performing unlink() before close(fd) avoids a race where a subsequent bzfs process could recreate and
                    #   lock a fresh inode for the same path between our close() and a later unlink(). In that case, a late
                    #   unlink would delete the newer process's lock_file path.
                    # - At this point, critical work is complete; the remaining steps are shutdown mechanics that have no
                    #   side effect, so this pattern is correct, safe, and simple.
                    with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)):  # don't accumulate stale files
                        try:
                            self.run_tasks()  # do the real work
                        except BaseException:
                            self.terminate()
                            raise
                        self.shutdown()
                        with contextlib.suppress(BrokenPipeError):
                            sys.stderr.flush()
                            sys.stdout.flush()
            except subprocess.CalledProcessError as e:
                log_error_on_exit(e, e.returncode)
                raise
            except SystemExit as e:
                log_error_on_exit(e, e.code)
                raise
            except (subprocess.TimeoutExpired, UnicodeDecodeError) as e:
                log_error_on_exit(e, DIE_STATUS)
                raise SystemExit(DIE_STATUS) from e
            except re.error as e:
                log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS)
                raise SystemExit(DIE_STATUS) from e
            except BaseException as e:
                log_error_on_exit(e, DIE_STATUS, exc_info=True)
                raise SystemExit(DIE_STATUS) from e
            finally:
                log.info("%s", f"Log file was: {log_params.log_file}")
            log.info("Success. Goodbye!")  # only reached when no exception escaped the try block above
            with contextlib.suppress(BrokenPipeError):
                sys.stderr.flush()
                sys.stdout.flush()
    def run_tasks(self) -> None:
        """Executes replication cycles, repeating until daemon lifetime expires.

        Each cycle runs every task in --root-dataset-pairs; tolerated task errors are collected and the first
        one is re-raised at the end (non-daemon mode only).
        """
        p, log = self.params, self.params.log
        # Reset per-run state so a reused Job object starts clean:
        self.all_exceptions = []
        self.all_exceptions_count = 0
        self.first_exception = None
        self.remote_conf_cache = {}
        self.validate_once()
        self.replication_start_time_nanos = time.monotonic_ns()
        self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals)
        with xfinally(lambda: self.progress_reporter.stop()):  # always stop the reporter thread on exit
            daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos
            while True:  # loop for daemon mode
                self.timeout_nanos = (
                    None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
                )
                self.all_dst_dataset_exists.clear()
                self.progress_reporter.reset()
                src, dst = p.src, p.dst
                for src_root_dataset, dst_root_dataset in p.root_dataset_pairs:
                    if self.termination_event.is_set():
                        self.terminate()
                        break
                    src.root_dataset = src.basis_root_dataset = src_root_dataset
                    dst.root_dataset = dst.basis_root_dataset = dst_root_dataset
                    p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy()
                    if p.daemon_lifetime_nanos > 0:
                        # in daemon mode, restart the timeout deadline for each task
                        self.timeout_nanos = (
                            None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
                        )
                    recurs_sep = " " if p.recursive_flag else ""
                    task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}"
                    if len(p.root_dataset_pairs) > 1:
                        log.info("Starting task: %s", task_description + " ...")
                    try:
                        try:
                            self.maybe_inject_error(cmd=[], error_trigger="retryable_run_tasks")
                            timeout(self)
                            self.validate_task()
                            self.run_task()  # do the real work
                        except RetryableError as retryable_error:
                            # Unwrap: surface the underlying cause to the outer handler below
                            cause: BaseException | None = retryable_error.__cause__
                            assert cause is not None
                            raise cause.with_traceback(cause.__traceback__)  # noqa: B904 re-raise of cause without chaining
                    except (CalledProcessError, TimeoutExpired, SystemExit, UnicodeDecodeError, RetryTerminationError) as e:
                        # --skip-on-error=fail aborts immediately; a timeout also aborts unless running in daemon mode:
                        if p.skip_on_error == "fail" or (
                            isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0
                        ):
                            raise
                        log.error("%s", e)
                        self.append_exception(e, "task", task_description)  # tolerate error; continue with next task
                if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos):
                    break
        if not p.skip_replication:
            self.print_replication_stats(self.replication_start_time_nanos)
        error_count = self.all_exceptions_count
        if error_count > 0 and p.daemon_lifetime_nanos == 0:
            msgs = "\n".join(f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions))
            log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}")
            assert self.first_exception is not None
            raise self.first_exception
476 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None:
477 """Records and logs an exception that was encountered while running a subtask."""
478 self.first_exception = self.first_exception or e
479 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption
480 self.all_exceptions.append(str(e))
481 self.all_exceptions_count += 1
482 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description)
    def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool:
        """Pauses until next scheduled snapshot time or daemon stop; Returns True to continue daemon loop; False to stop."""
        sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns()
        if sleep_nanos <= 0:
            return False  # daemon lifetime already expired
        self.progress_reporter.pause()
        p, log = self.params, self.params.log
        config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
        # Nudge forward by 1 microsecond so "round up" cannot return the current instant itself:
        curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1)
        # Earliest upcoming snapshot event across all configured schedule periods; effectively "infinity" if none:
        next_snapshotting_event_dt: datetime = min(
            (
                config.anchors.round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit)
                for duration_amount, duration_unit in config.suffix_durations.values()
            ),
            default=curr_datetime + timedelta(days=10 * 365),  # infinity
        )
        offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz)
        offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000
        # Sleep until the next snapshot event, but never beyond the daemon stop time, and never negative:
        sleep_nanos = min(sleep_nanos, max(0, offset_nanos))
        log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]")
        self.termination_event.wait(sleep_nanos / 1_000_000_000)  # allow early wakeup on async termination
        config.current_datetime = datetime.now(config.tz)
        return time.monotonic_ns() < daemon_stoptime_nanos and not self.termination_event.is_set()
508 def print_replication_stats(self, start_time_nanos: int) -> None:
509 """Logs overall replication statistics after a job cycle completes."""
510 p, log = self.params, self.params.log
511 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
512 msg = p.dry(f"zfs sent {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.")
513 if p.is_program_available("pv", "local"):
514 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file)
515 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / (elapsed_nanos or 1))
516 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]."
517 log.info("%s", msg.ljust(p.log_params.terminal_columns - len("2024-01-01 23:58:45 [I] ")))
    def validate_once(self) -> None:
        """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times."""
        p = self.params
        p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts)
        # Compile the string-valued snapshot regex filter options into regex objects, replacing options in place:
        for snapshot_filter in p.snapshot_filters:
            for _filter in snapshot_filter:
                if _filter.name == SNAPSHOT_REGEX_FILTER_NAME:
                    exclude_snapshot_regexes_strings, include_snapshot_regexes_strings = cast(
                        tuple[list[str], list[str]], _filter.options
                    )
                    exclude_snapshot_regexes = compile_regexes(exclude_snapshot_regexes_strings)
                    include_snapshot_regexes = compile_regexes(include_snapshot_regexes_strings or [".*"])  # default: all
                    _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes)

        exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT]
        if len(p.args.exclude_dataset_regex) > 0:  # some patterns don't exclude anything
            exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"]
        include_regexes: list[str] = p.args.include_dataset_regex

        # relative datasets need not be compiled more than once as they don't change between tasks
        def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]:
            """Splits dataset names into absolute (leading '/') vs relative names."""
            abs_datasets: list[str] = []
            rel_datasets: list[str] = []
            for dataset in datasets:
                (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset)
            return abs_datasets, rel_datasets

        p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset)
        p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset)
        suffix = DESCENDANTS_RE_SUFFIX
        p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = (
            compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix),
            compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix),
        )

        # Fail fast if the configured pv options cannot produce the output our progress parsing relies on:
        if p.pv_program != DISABLE_PRG:
            pv_program_opts_set = set(p.pv_program_opts)
            if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}):
                die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.")
            if not p.log_params.quiet:
                for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]:
                    if pv_program_opts_set.isdisjoint(opts):
                        die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.")

        # Validate the ssh connection parameters of both remotes:
        src, dst = p.src, p.dst
        for remote in [src, dst]:
            r, loc = remote, remote.location
            validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user")
            validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host")
            validate_port(r.ssh_port, f"--ssh-{loc}-port ")
    def validate_task(self) -> None:
        """Validates a single replication task before execution.

        Resolves dataset locators into per-remote connection fields, guards against overlapping src/dst trees,
        compiles per-task dataset regexes, detects available programs, and sizes the parallelism knobs.
        """
        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        for remote in [src, dst]:
            r = remote
            r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator(
                r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port
            )
            r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user)
            local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1")  # ::1 is IPv6 version of loopback address
            remote.is_nonlocal = r.ssh_host not in local_addrs
        # Reuse the dst-existence answers remembered for this dst host within the current daemon cycle:
        self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host])

        if src.ssh_host == dst.ssh_host:
            msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}"
            if src.root_dataset == dst.root_dataset:
                die(f"Source and destination dataset must not be the same! {msg}")
            if p.recursive and (
                is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset)
                or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset)
            ):
                die(f"Source and destination dataset trees must not overlap! {msg}")

        suffx: str = DESCENDANTS_RE_SUFFIX  # also match descendants of a matching dataset
        # Per-task regexes = precompiled relative regexes (validate_once) + absolute dataset names of this task:
        p.exclude_dataset_regexes, p.include_dataset_regexes = (
            p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx),
            p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx),
        )
        if len(p.include_dataset_regexes) == 0:
            p.include_dataset_regexes = [(re.compile(r".*"), False)]  # empty include list means "include everything"

        detect_available_programs(self)
        p.r2r_mode = resolve_r2r_mode(p)

        if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"):
            append_if_absent(p.curr_zfs_send_program_opts, "--large-block")  # dst can receive large blocks

        # Derive per-location worker counts from CPU count and --threads, then size 'zfs list' minibatches accordingly:
        self.max_workers = {}
        self.max_datasets_per_minibatch_on_list_snaps = {}
        for r in [src, dst]:
            cpus: int = int(p.available_programs[r.location].get("getconf_cpu_count", 8))
            threads, is_percent = p.threads
            cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads))
            self.max_workers[r.location] = cpus
            bs: int = max(1, p.max_datasets_per_batch_on_list_snaps)  # 1024 by default
            max_datasets_per_minibatch: int = p.max_datasets_per_minibatch_on_list_snaps
            if max_datasets_per_minibatch <= 0:
                max_datasets_per_minibatch = max(1, bs // cpus)  # auto: spread a full batch across the workers
            max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch)
            self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch
            log.log(
                LOG_TRACE,
                "%s",
                f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, "
                f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, "
                f"max_workers: {self.max_workers[r.location]}, "
                f"location: {r.location}",
            )
        if self.is_test_mode:
            log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params))
632 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]:
633 """Returns sudo command prefix and whether root privileges are required."""
634 p: Params = self.params
635 assert isinstance(ssh_user_host, str)
636 assert isinstance(ssh_user, str)
637 assert isinstance(p.sudo_program, str)
638 assert isinstance(p.enable_privilege_elevation, bool)
640 is_root: bool = True
641 if ssh_user_host != "":
642 if ssh_user == "":
643 if os.getuid() != 0:
644 is_root = False
645 elif ssh_user != "root":
646 is_root = False
647 elif os.getuid() != 0:
648 is_root = False
650 if is_root:
651 sudo = "" # using sudo in an attempt to make ZFS operations work even if we are not root user?
652 use_zfs_delegation = False # or instead using 'zfs allow' delegation?
653 return sudo, use_zfs_delegation
654 elif p.enable_privilege_elevation:
655 if p.sudo_program == DISABLE_PRG:
656 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}")
657 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any
658 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit.
659 return p.sudo_program + " -n", False
660 else:
661 return "", True
    def run_task(self) -> None:
        """Replicates all snapshots for the current root dataset pair.

        Also runs the optional --create-src-snapshots, --delete-*, --compare-snapshot-lists and
        --monitor-snapshots steps for this pair, in that order.
        """

        def filter_src_datasets() -> list[str]:  # apply --{include|exclude}-dataset policy
            # Lazily computed at most once; closes over the mutable locals `src_datasets` and `basis_src_datasets`
            return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets

        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location])
        recursive_sep: str = " " if p.recursive_flag else ""
        task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..."
        failed: bool = False
        src_datasets: list[str] | None = None  # None means "not yet filtered"; see filter_src_datasets()
        basis_src_datasets: list[str] = []
        self.src_properties = {}
        self.dst_properties = {}
        if not is_dummy(src):  # find src dataset or all datasets in src dataset tree (with --recursive)
            basis_src_datasets = self.list_src_datasets_task()

        if not p.create_src_snapshots_config.skip_create_src_snapshots:
            log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...")
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            self.create_src_snapshots_task(basis_src_datasets, src_datasets)

        # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset
        if not p.skip_replication:
            if len(basis_src_datasets) == 0:
                die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}")
            if is_dummy(dst):
                die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            failed = self.replicate_datasets(src_datasets, task_description, max_workers)

        # Bail out early if replication failed, or if none of the follow-up steps below were requested:
        if failed or not (
            p.delete_dst_datasets
            or p.delete_dst_snapshots
            or p.delete_empty_dst_datasets
            or p.compare_snapshot_lists
            or p.monitor_snapshots_config.enable_monitor_snapshots
        ):
            return
        log.info("Listing dst datasets: %s", task_description)
        if is_dummy(dst):
            die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
        basis_dst_datasets: list[str] = self.list_dst_datasets_task()
        dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets)  # apply include/exclude policy

        if p.delete_dst_datasets and not failed:
            log.info(p.dry("--delete-dst-datasets: %s"), task_description)
            basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task(
                basis_src_datasets, basis_dst_datasets, dst_datasets
            )

        if p.delete_dst_snapshots and not failed:
            log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]")
            failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description)

        if p.delete_empty_dst_datasets and p.recursive and not failed:
            log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description)
            basis_dst_datasets, dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets)

        if p.compare_snapshot_lists and not failed:
            log.info("--compare-snapshot-lists: %s", task_description)
            if len(basis_src_datasets) == 0 and not is_dummy(src):
                die(f"Source dataset does not exist: {src.basis_root_dataset}")
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            run_compare_snapshot_lists(self, src_datasets, dst_datasets)

        if p.monitor_snapshots_config.enable_monitor_snapshots and not failed:
            log.info("--monitor-snapshots: %s", task_description)
            src_datasets = filter_src_datasets()  # apply include/exclude policy
            self.monitor_snapshots_task(src_datasets, dst_datasets, task_description)
736 def list_src_datasets_task(self) -> list[str]:
737 """Lists datasets on the source host."""
738 p = self.params
739 src = p.src
740 basis_src_datasets: list[str] = []
741 is_caching: bool = is_caching_snapshots(p, src)
742 props: str = "volblocksize,recordsize,name"
743 props = "snapshots_changed," + props if is_caching else props
744 cmd: list[str] = p.split_args(
745 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset
746 )
747 for line in (self.try_ssh_command_with_retries(src, LOG_DEBUG, cmd=cmd) or "").splitlines():
748 cols: list[str] = line.split("\t")
749 snapshots_changed, volblocksize, recordsize, src_dataset = cols if is_caching else ["-"] + cols
750 self.src_properties[src_dataset] = DatasetProperties(
751 recordsize=int(recordsize) if recordsize != "-" else -int(volblocksize),
752 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
753 )
754 basis_src_datasets.append(src_dataset)
755 assert (not self.is_test_mode) or basis_src_datasets == sorted(basis_src_datasets), "List is not sorted"
756 return basis_src_datasets
758 def list_dst_datasets_task(self) -> list[str]:
759 """Lists datasets on the destination host."""
760 p, log = self.params, self.params.log
761 dst = p.dst
762 is_caching: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots
763 props: str = "name"
764 props = "snapshots_changed," + props if is_caching else props
765 cmd: list[str] = p.split_args(
766 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset
767 )
768 basis_dst_datasets: list[str] = []
769 basis_dst_datasets_str: str | None = self.try_ssh_command_with_retries(dst, LOG_TRACE, cmd=cmd)
770 if basis_dst_datasets_str is None:
771 log.warning("Destination dataset does not exist: %s", dst.root_dataset)
772 else:
773 for line in basis_dst_datasets_str.splitlines():
774 cols: list[str] = line.split("\t")
775 snapshots_changed, dst_dataset = cols if is_caching else ["-"] + cols
776 self.dst_properties[dst_dataset] = DatasetProperties(
777 recordsize=0,
778 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
779 )
780 basis_dst_datasets.append(dst_dataset)
781 assert (not self.is_test_mode) or basis_dst_datasets == sorted(basis_dst_datasets), "List is not sorted"
782 return basis_dst_datasets
    def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None:
        """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements
        --create-src-snapshots.

        The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line,
        using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the
        --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs
        snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical
        'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such
        guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple
        command line invocations, respecting the limits that the operating system places on the maximum length of a single
        command line, per `getconf ARG_MAX`.

        Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
        per dataset. Space complexity is O(max(N, M)).
        """
        p, log = self.params, self.params.log
        src = p.src
        if len(basis_src_datasets) == 0:
            die(f"Source dataset does not exist: {src.basis_root_dataset}")
        datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets)
        # Drop labels for which no dataset is currently due for a snapshot
        datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0}
        # Shallow copy: retains the full per-label dataset lists even after datasets_to_snapshot[label] is rebound to
        # subtree roots below; used for the snapshot count message and for the cache update at the end.
        basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy()  # shallow copy
        commands: dict[SnapshotLabel, list[str]] = {}
        for label, datasets in datasets_to_snapshot.items():
            cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot")
            if p.recursive:
                # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor
                root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets)
                if root_datasets is not None:
                    cmd.append("-r")  # recursive; takes a snapshot of all datasets in the subtree(s)
                    datasets_to_snapshot[label] = root_datasets
            commands[label] = cmd
        creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots"
        log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...")
        # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle
        run_ssh_cmd_parallel(
            self,
            src,
            ((commands[lbl], (f"{ds}@{lbl}" for ds in datasets)) for lbl, datasets in datasets_to_snapshot.items()),
            fn=lambda cmd, batch: self.run_ssh_command_with_retries(
                src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch, retry_on_generic_ssh_error=False
            ),  # retry_on_generic_ssh_error=False means only retry on SSH connect errors b/c `zfs snapshot` isn't idempotent
            max_batch_items=2**29,
        )
        if is_caching_snapshots(p, src):
            # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls
            self.cache.update_last_modified_cache(basis_datasets_to_snapshot)
    def delete_destination_snapshots_task(
        self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str
    ) -> bool:
        """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the
        --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset*
        policy; implements --delete-dst-snapshots. Does not attempt to delete snapshots that carry a `zfs hold`.

        Returns True if processing failed for at least one dataset, else False.
        """
        p, log = self.params, self.params.log
        src, dst = p.src, p.dst
        kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot"
        filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters)
        props: str = "guid,name,userrefs"
        props = self.creation_prefix + "creation," + props if filter_needs_creation_time else props
        basis_src_datasets_set: set[str] = set(basis_src_datasets)  # O(1) membership test per dst dataset
        num_snapshots_found, num_snapshots_deleted = 0, 0  # totals across all datasets; guarded by self.stats_lock

        def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool:  # thread-safe
            """Deletes the selected snapshots (or bookmarks) of a single dst dataset; returns False if the dataset
            vanished in the meantime (deleted by a third party), else True."""
            # Map the dst dataset to its corresponding src dataset for the source-side crosscheck
            src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
            if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks):
                src_kind: str = kind
                if not p.delete_dst_snapshots_no_crosscheck:
                    src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot"
                src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset)
            else:
                src_cmd = None  # src dataset absent (or bookmarks unusable); skip the source-side listing
            dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset)
            self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots")
            src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel(  # list src+dst snapshots in parallel
                lambda: set(self.run_ssh_command(src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []),
                lambda: self.try_ssh_command(dst, LOG_TRACE, cmd=dst_cmd),
            )
            if dst_snaps_with_guids_str is None:
                log.warning("Third party deleted destination: %s", dst_dataset)
                return False
            held_dst_snapshots: set[str] = set()
            dst_snaps_with_guids: list[str] = []
            no_userrefs: tuple[str, ...] = ("", "-", "0")  # ZFS snapshot property userrefs > 0 indicates a zfs hold
            for line in dst_snaps_with_guids_str.splitlines():
                dst_snaps_with_guids.append(line[: line.rindex("\t")])  # strip off trailing userrefs column
                _, name, userrefs = line.rsplit("\t", 2)
                if userrefs not in no_userrefs:
                    tag: str = name[name.index("@") + 1 :]
                    held_dst_snapshots.add(tag)  # don't attempt to delete snapshots that carry a `zfs hold`
            num_dst_snaps_with_guids = len(dst_snaps_with_guids)
            basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy()
            if p.delete_dst_bookmarks:
                replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1)  # treat bookmarks as snapshots
            # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots().
            # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep*
            all_except: bool = p.delete_dst_snapshots_except
            if p.delete_dst_snapshots_except and not is_dummy(src):
                # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what
                # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep"
                # list, we later further refine by checking what's on the source dataset.
                all_except = False
            dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except)
            if p.delete_dst_bookmarks:
                replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1)  # restore pre-filtering bookmark state
            if filter_needs_creation_time:
                # drop the leading 'creation' column that was only needed for time-range filtering
                dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids)
                basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids)
            if p.delete_dst_snapshots_except and not is_dummy(src):  # Non-dummy Source + "Except" (Keep) Mode
                # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka
                # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset.
                # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP.
                # We only actually keep them if they are ALSO on the SRC.
                # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`)
                # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`.
                except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids)
                dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids)
            else:  # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode
                # In standard delete mode:
                # `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST.
                # We delete those that are NOT on SRC.
                # `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`.
                # In dummy source + "except" (keep) mode:
                # `all_except` was True.
                # `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete.
                # `src_snaps_with_guids` is empty.
                # `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`.
                dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids)
                dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete)
            separator: str = "#" if p.delete_dst_bookmarks else "@"
            dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete)  # keep only the tag part
            if p.delete_dst_bookmarks:
                delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
            else:
                dst_tags_to_delete = [tag for tag in dst_tags_to_delete if tag not in held_dst_snapshots]
                delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
            with self.stats_lock:
                nonlocal num_snapshots_found
                num_snapshots_found += num_dst_snaps_with_guids
                nonlocal num_snapshots_deleted
                num_snapshots_deleted += len(dst_tags_to_delete)
                if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks:
                    self.dst_properties[dst_dataset].snapshots_changed = 0  # invalidate cache
            return True

        # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec
        failed: bool = False
        if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks:
            start_time_nanos = time.monotonic_ns()
            failed = process_datasets_in_parallel_and_fault_tolerant(
                log=log,
                datasets=dst_datasets,
                process_dataset=delete_destination_snapshots,  # lambda
                skip_tree_on_error=lambda dataset: False,
                skip_on_error=p.skip_on_error,
                max_workers=max_workers,
                timing=self.task_timing,
                termination_handler=self.terminate,
                enable_barriers=False,
                task_name="--delete-dst-snapshots",
                append_exception=self.append_exception,
                retry_template=self._retry_template(),
                dry_run=p.dry_run,
                is_test_mode=self.is_test_mode,
            )
            elapsed_nanos = time.monotonic_ns() - start_time_nanos
            log.info(
                p.dry("--delete-dst-snapshots: %s"),
                task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s "
                f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]",
            )
        return failed
958 def delete_dst_datasets_task(
959 self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
960 ) -> tuple[list[str], list[str]]:
961 """Deletes existing destination datasets that do not exist within the source dataset if they are included via
962 --{include|exclude}-dataset* policy; implements --delete-dst-datasets.
964 Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors.
965 """
966 p = self.params
967 src, dst = p.src, p.dst
968 children: dict[str, set[str]] = defaultdict(set)
969 for dst_dataset in basis_dst_datasets: # Compute the direct children of each NON-FILTERED dataset
970 parent: str = os.path.dirname(dst_dataset)
971 children[parent].add(dst_dataset)
972 to_delete: set[str] = set()
973 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm
974 if children[dst_dataset].issubset(to_delete):
975 to_delete.add(dst_dataset) # all children are deletable, thus the dataset itself is deletable too
976 to_delete = to_delete.difference(
977 replace_prefix(src_dataset, src.root_dataset, dst.root_dataset) for src_dataset in basis_src_datasets
978 )
979 delete_datasets(self, dst, to_delete)
980 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(to_delete))
981 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(to_delete))
982 return basis_dst_datasets, sorted_dst_datasets
984 def delete_empty_dst_datasets_task(
985 self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
986 ) -> tuple[list[str], list[str]]:
987 """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset
988 do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets.
990 To do so, we walk the dataset list (conceptually, a tree) depth-first (i.e. sorted descending). If a dst dataset has
991 zero snapshots and zero bookmarks and all its children are already marked as orphans, then it is itself an orphan,
992 and we mark it as such. Walking in a reverse sorted way means that we efficiently check for zero snapshots/bookmarks
993 not just over the direct children but the entire tree. Finally, delete all orphan datasets in an efficient batched
994 way.
995 """
996 p = self.params
997 dst = p.dst
999 # Compute the direct children of each NON-FILTERED dataset. Thus, no non-selected dataset and no ancestor of a
1000 # non-selected dataset will ever be added to the "orphan" set. In other words, this treats non-selected dataset
1001 # subtrees as if they all had snapshots, so non-selected dataset subtrees and their ancestors are guaranteed
1002 # to not get deleted.
1003 children: dict[str, set[str]] = defaultdict(set)
1004 for dst_dataset in basis_dst_datasets:
1005 parent: str = os.path.dirname(dst_dataset)
1006 children[parent].add(dst_dataset)
1008 def compute_orphans(datasets_having_snapshots: set[str]) -> set[str]:
1009 """Returns destination datasets having zero snapshots whose children are all orphans."""
1010 orphans: set[str] = set()
1011 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm
1012 if (dst_dataset not in datasets_having_snapshots) and children[dst_dataset].issubset(orphans):
1013 orphans.add(dst_dataset)
1014 return orphans
1016 # Compute candidate orphan datasets, which reduces the list of datasets for which we list snapshots via
1017 # 'zfs list -t snapshot ...' from dst_datasets to a subset of dst_datasets, which in turn reduces I/O and improves
1018 # perf. Essentially, this eliminates the I/O to list snapshots for ancestors of excluded datasets.
1019 candidate_orphans: set[str] = compute_orphans(set())
1021 # Compute destination datasets having more than zero snapshots
1022 dst_datasets_having_snapshots: set[str] = set()
1023 with_bookmarks: bool = p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst)
1024 btype: str = "bookmark,snapshot" if with_bookmarks else "snapshot"
1025 cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name")
1026 for snapshots in zfs_list_snapshots_in_parallel(self, dst, cmd, sorted(candidate_orphans), ordered=False):
1027 if with_bookmarks:
1028 replace_in_lines(snapshots, old="#", new="@", count=1) # treat bookmarks as snapshots
1029 dst_datasets_having_snapshots.update(snap[: snap.index("@")] for snap in snapshots) # union
1031 orphans: set[str] = compute_orphans(dst_datasets_having_snapshots) # compute the real orphans
1032 delete_datasets(self, dst, orphans) # finally, delete the orphan datasets in an efficient way
1033 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(orphans))
1034 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(orphans))
1035 return basis_dst_datasets, sorted_dst_datasets
1037 def monitor_snapshots_task(
1038 self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str
1039 ) -> None:
1040 """Monitors src and dst snapshots; implements --monitor-snapshots."""
1041 p, log = self.params, self.params.log
1042 src, dst = p.src, p.dst
1043 num_cache_hits: int = self.num_cache_hits
1044 num_cache_misses: int = self.num_cache_misses
1045 start_time_nanos: int = time.monotonic_ns()
1046 dst_alert, src_alert = run_in_parallel(
1047 lambda: self.monitor_snapshots(dst, sorted_dst_datasets),
1048 lambda: self.monitor_snapshots(src, sorted_src_datasets),
1049 )
1050 exit_code, _exit_kind, exit_msg = min(dst_alert, src_alert)
1051 if exit_code != 0:
1052 die(exit_msg, -exit_code)
1053 elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
1054 num_cache_hits = self.num_cache_hits - num_cache_hits
1055 num_cache_misses = self.num_cache_misses - num_cache_misses
1056 if num_cache_hits > 0 or num_cache_misses > 0:
1057 msg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
1058 else:
1059 msg = ""
1060 log.info(
1061 "--monitor-snapshots done: %s",
1062 f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]",
1063 )
    def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> tuple[int, str, str]:
        """Checks snapshot freshness and warns or errors out when limits are exceeded.

        Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name
        pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to check if snapshots
        are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process
        exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.

        Returns the worst alert seen as a (-exit_code, exit_kind, exit_msg) tuple; (0, "", "") means all checks passed.

        Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
        per dataset. Space complexity is O(max(N, M)).
        """
        p, log = self.params, self.params.log
        alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts
        labels: list[SnapshotLabel] = [alert.label for alert in alerts]
        oldest_skip_holds: list[bool] = [alert.oldest_skip_holds for alert in alerts]
        current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000
        is_debug: bool = log.isEnabledFor(LOG_DEBUG)
        if is_caching_snapshots(p, remote):
            # Hashes of alert config and labels make cache file paths unique per monitoring configuration
            props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties
            snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()}
            alerts_hash: str = sha256_128_urlsafe_base64(str(tuple(alerts)))
            label_hashes: dict[SnapshotLabel, str] = {
                label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
            }
        is_caching: bool = False  # flipped to the real value later, just before the 'zfs list -t snapshot' fallback
        worst_alert: tuple[int, str, str] = (0, "", "")  # -exit_code, exit_kind, exit_msg

        def record_alert(exit_code: int, exit_kind: str, exit_msg: str) -> None:
            """Remembers the most severe alert encountered so far."""
            nonlocal worst_alert
            worst_alert = min(worst_alert, (-exit_code, exit_kind, exit_msg))  # min() sorts "Latest" before "Oldest" on tie

        def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str:
            """Returns the path of the cache file that tracks this (dataset, label, alert config) combination."""
            cache_label: str = os.path.join(MONITOR_CACHE_FILE_PREFIX, alert_cfg.kind[0], label_hashes[label], alerts_hash)
            return self.cache.last_modified_cache_file(r, dataset, cache_label)

        def alert_msg(
            kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int
        ) -> str:
            """Formats a human-readable alert message; delta_millis == -1 omits the age-limit clause."""
            assert kind == "Latest" or kind == "Oldest"
            lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}"
            # creation time of 0 implies an age >= current time, i.e. no matching snapshot exists at all
            if snapshot_age_millis >= current_unixtime_millis:
                return f"No snapshot exists for {dataset}@{lbl}"
            msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old"
            s = f": @{snapshot}" if snapshot else ""
            if delta_millis == -1:
                return f"{msg}{s}"
            return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}"

        def check_alert(
            label: SnapshotLabel, alert_cfg: AlertConfig | None, creation_unixtime_secs: int, dataset: str, snapshot: str
        ) -> None:  # thread-safe
            """Compares the snapshot age against the warning/critical thresholds and records/logs accordingly."""
            if alert_cfg is None:
                return
            if is_caching and not p.dry_run:  # update cache with latest state from 'zfs list -t snapshot'
                snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
                cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg)
                set_last_modification_time_safe(
                    cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed), if_more_recent=True
                )
            warning_millis: int = alert_cfg.warning_millis
            critical_millis: int = alert_cfg.critical_millis
            alert_kind = alert_cfg.kind
            snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000
            m = "--monitor_snapshots: "
            if snapshot_age_millis > critical_millis:
                msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis)
                log.critical("%s", msg)
                if not p.monitor_snapshots_config.dont_crit:
                    record_alert(CRITICAL_STATUS, alert_kind, msg)
            elif snapshot_age_millis > warning_millis:
                msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis)
                log.warning("%s", msg)
                if not p.monitor_snapshots_config.dont_warn:
                    record_alert(WARNING_STATUS, alert_kind, msg)
            elif is_debug:
                msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1)
                log.debug("%s", msg)

        def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            """Checks the i-th alert's "latest snapshot" age limit."""
            alert: MonitorSnapshotAlert = alerts[i]
            check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot)

        def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            """Checks the i-th alert's "oldest snapshot" age limit."""
            alert: MonitorSnapshotAlert = alerts[i]
            check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot)

        def find_stale_datasets_and_check_alerts() -> list[str]:
            """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply,
            that is, without incurring 'zfs list -t snapshots'.

            This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
            https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
            """
            stale_datasets: list[str] = []
            time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
            for dataset in sorted_datasets:
                is_stale_dataset: bool = False
                snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
                for alert in alerts:
                    for cfg in (alert.latest, alert.oldest):
                        if cfg is None:
                            continue
                        if (
                            snapshots_changed != 0
                            and snapshots_changed < time_threshold
                            and (  # always True
                                cached_unix_times := self.cache.get_snapshots_changed2(
                                    monitor_last_modified_cache_file(remote, dataset, alert.label, cfg)
                                )
                            )
                            and snapshots_changed == cached_unix_times[1]  # cached snapshots_changed aka last modified time
                            and snapshots_changed >= cached_unix_times[0]  # creation time of minmax snapshot aka access time
                        ):  # cached state is still valid; emit an alert if the latest/oldest snapshot is too old
                            lbl = alert.label
                            check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="")
                        else:  # cached state is no longer valid; fallback to 'zfs list -t snapshot'
                            is_stale_dataset = True
                if is_stale_dataset:
                    stale_datasets.append(dataset)
            return stale_datasets

        # satisfy request from local cache as much as possible
        if is_caching_snapshots(p, remote):
            stale_datasets: list[str] = find_stale_datasets_and_check_alerts()
            with self.stats_lock:
                self.num_cache_misses += len(stale_datasets)
                self.num_cache_hits += len(sorted_datasets) - len(stale_datasets)
        else:
            stale_datasets = sorted_datasets

        # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
        is_caching = is_caching_snapshots(p, remote)
        datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
            remote,
            stale_datasets,
            labels,
            fn_latest=alert_latest_snapshot,
            fn_oldest=alert_oldest_snapshot,
            fn_oldest_skip_holds=oldest_skip_holds,
        )
        # a dataset with no snapshots at all still triggers both checks, with creation time 0 (i.e. "no snapshot exists")
        for dataset in datasets_without_snapshots:
            for i in range(len(alerts)):
                alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
                alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
        return worst_alert
1211 def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool:
1212 """Replicates a list of datasets."""
1213 assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted"
1214 p, log = self.params, self.params.log
1215 src, dst = p.src, p.dst
1216 self.num_snapshots_found = 0
1217 self.num_snapshots_replicated = 0
1218 log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]")
1219 start_time_nanos: int = time.monotonic_ns()
1221 def src2dst(src_dataset: str) -> str:
1222 return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset)
1224 def dst2src(dst_dataset: str) -> str:
1225 return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
1227 def find_stale_datasets() -> tuple[list[str], dict[str, str]]:
1228 """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine
1229 which datasets can be skipped cheaply, i.e. without incurring 'zfs list -t snapshots'.
1231 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
1232 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
1233 """
1234 # First, check which src datasets have changed since the last replication to that destination
1235 cache_files: dict[str, str] = {}
1236 stale_src_datasets1: list[str] = []
1237 maybe_stale_dst_datasets: list[str] = []
1238 userhost_dir: str = sha256_85_urlsafe_base64(p.dst.cache_namespace())
1239 filter_key = tuple(tuple(f) for f in p.snapshot_filters) # cache is only valid for same --include/excl-snapshot*
1240 filter_hash_code: str = sha256_85_urlsafe_base64(str(filter_key))
1241 for src_dataset in src_datasets:
1242 dst_dataset: str = src2dst(src_dataset) # cache is only valid for identical destination dataset
1243 dst_dataset_dir: str = sha256_85_urlsafe_base64(dst_dataset)
1244 cache_label: str = os.path.join(
1245 REPLICATION_CACHE_FILE_PREFIX, userhost_dir, dst_dataset_dir, filter_hash_code
1246 )
1247 cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label)
1248 cache_files[src_dataset] = cache_file
1249 snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed # get prop "for free"
1250 if (
1251 snapshots_changed != 0
1252 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
1253 and snapshots_changed == self.cache.get_snapshots_changed(cache_file)
1254 ):
1255 maybe_stale_dst_datasets.append(dst_dataset)
1256 else:
1257 stale_src_datasets1.append(src_dataset)
1259 # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed
1260 stale_src_datasets2: list[str] = []
1261 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets)
1262 for dst_dataset in maybe_stale_dst_datasets:
1263 snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0)
1264 if (
1265 snapshots_changed != 0
1266 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
1267 and snapshots_changed
1268 == self.cache.get_snapshots_changed(self.cache.last_modified_cache_file(dst, dst_dataset))
1269 ):
1270 log.info("Already up-to-date [cached]: %s", dst_dataset)
1271 else:
1272 stale_src_datasets2.append(dst2src(dst_dataset))
1273 assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted"
1274 assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted"
1275 stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2)) # merge two sorted lists
1276 assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates"
1277 return stale_src_datasets, cache_files
1279 if is_caching_snapshots(p, src):
1280 stale_src_datasets, cache_files = find_stale_datasets()
1281 num_cache_misses = len(stale_src_datasets)
1282 num_cache_hits = len(src_datasets) - len(stale_src_datasets)
1283 self.num_cache_misses += num_cache_misses
1284 self.num_cache_hits += num_cache_hits
1285 cmsg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
1286 else:
1287 stale_src_datasets = src_datasets
1288 cache_files = {}
1289 cmsg = ""
1291 done_src_datasets: list[str] = []
1292 done_src_datasets_lock: threading.Lock = threading.Lock()
1294 def _process_dataset_fn(src_dataset: str, tid: str, retry: Retry) -> bool:
1295 result: bool = replicate_dataset(job=self, src_dataset=src_dataset, tid=tid, retry=retry)
1296 with done_src_datasets_lock:
1297 done_src_datasets.append(src_dataset) # record datasets that were actually replicated (not skipped)
1298 return result
1300 # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution
1301 failed: bool = process_datasets_in_parallel_and_fault_tolerant(
1302 log=log,
1303 datasets=stale_src_datasets,
1304 process_dataset=_process_dataset_fn,
1305 skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)],
1306 skip_on_error=p.skip_on_error,
1307 max_workers=max_workers,
1308 timing=self.task_timing,
1309 termination_handler=self.terminate,
1310 enable_barriers=False,
1311 task_name="Replication",
1312 append_exception=self.append_exception,
1313 retry_template=self._retry_template(),
1314 dry_run=p.dry_run,
1315 is_test_mode=self.is_test_mode,
1316 )
1318 if is_caching_snapshots(p, src) and len(done_src_datasets) > 0:
1319 # refresh "snapshots_changed" ZFS dataset property from dst
1320 stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in sorted(done_src_datasets)]
1321 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets)
1322 for dst_dataset in stale_dst_datasets: # update local cache
1323 dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0)
1324 dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset)
1325 src_dataset: str = dst2src(dst_dataset)
1326 src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed
1327 if not p.dry_run:
1328 set_last_modification_time_safe(
1329 cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed, if_more_recent=True
1330 )
1331 set_last_modification_time_safe(
1332 dst_cache_file, unixtime_in_secs=dst_snapshots_changed, if_more_recent=True
1333 )
1335 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
1336 log.info(
1337 p.dry("Replication done: %s"),
1338 f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots"
1339 f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]",
1340 )
1341 return failed
1343 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None:
1344 """For testing only; for unit tests to delete datasets during replication and test correct handling of that."""
1345 assert delete_trigger
1346 counter = self.delete_injection_triggers.get("before")
1347 if counter and self.decrement_injection_counter(counter, delete_trigger):
1348 p = self.params
1349 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "")
1350 self.run_ssh_command(remote, LOG_DEBUG, print_stdout=True, cmd=cmd)
1352 def maybe_inject_params(self, error_trigger: str) -> None:
1353 """For testing only; for unit tests to simulate errors during replication and test correct handling of them."""
1354 assert error_trigger
1355 counter = self.error_injection_triggers.get("before")
1356 if counter and self.decrement_injection_counter(counter, error_trigger):
1357 self.inject_params = self.param_injection_triggers[error_trigger]
1358 elif error_trigger in self.param_injection_triggers:
1359 self.inject_params = {}
1361 @staticmethod
1362 def recv_option_property_names(recv_opts: list[str]) -> set[str]:
1363 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for
1364 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name."""
1365 propnames: set[str] = set()
1366 i = 0
1367 n = len(recv_opts)
1368 while i < n:
1369 stripped: str = recv_opts[i].strip()
1370 if stripped in ("-o", "-x"):
1371 i += 1
1372 if i == n or recv_opts[i].strip() in ("-o", "-x"):
1373 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1374 if stripped == "-o" and "=" not in recv_opts[i]:
1375 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1376 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0]
1377 validate_property_name(propname, "--zfs-recv-program-opt(s)")
1378 propnames.add(propname)
1379 i += 1
1380 return propnames
    def root_datasets_if_recursive_zfs_snapshot_is_possible(
        self, datasets: list[str], basis_datasets: list[str]
    ) -> list[str] | None:
        """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset
        within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in
        `datasets`. Returns `None` if any (unfiltered) dataset in `basis_dataset` that is a descendant of at least one of the
        root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have
        pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the
        non-recursive 'zfs snapshot snapshot1 .. snapshot N' CLI flavor.

        Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both
        sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time
        complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case.
        See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative
        impl that's easier to grok.
        """
        assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
        assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
        assert (not self.is_test_mode) or basis_datasets == sorted(basis_datasets), "List is not sorted"
        assert (not self.is_test_mode) or not has_duplicates(basis_datasets), "List contains duplicates"
        assert (not self.is_test_mode) or set(datasets).issubset(set(basis_datasets)), "Not a subset"
        root_datasets: list[str] = self.find_root_datasets(datasets)
        i = 0  # cursor into root_datasets
        j = 0  # cursor into basis_datasets
        k = 0  # cursor into datasets
        len_root_datasets = len(root_datasets)
        len_basis_datasets = len(basis_datasets)
        len_datasets = len(datasets)
        while i < len_root_datasets and j < len_basis_datasets:  # walk and "merge" the sorted lists, in sync
            if basis_datasets[j] < root_datasets[i]:  # irrelevant subtree?
                j += 1  # move to next basis_datasets[j]
            elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]):  # relevant subtree?
                # basis_datasets[j] lies under a root dataset, so it must also be present in `datasets`:
                while k < len_datasets and datasets[k] < basis_datasets[j]:
                    k += 1  # move to next datasets[k]
                if k == len_datasets or datasets[k] != basis_datasets[j]:  # dataset chopped off by schedule or --incl/excl*?
                    return None  # detected filter pruning that is incompatible with 'zfs snapshot -r'
                j += 1  # move to next basis_datasets[j]
            else:
                i += 1  # move to next root_dataset[i]; no need to check root_datasets that are no longer (or not yet) reachable
        return root_datasets
1423 @staticmethod
1424 def find_root_datasets(sorted_datasets: list[str]) -> list[str]:
1425 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A
1426 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets."""
1427 root_datasets: list[str] = []
1428 skip_dataset: str = DONT_SKIP_DATASET
1429 for dataset in sorted_datasets:
1430 if is_descendant(dataset, of_root_dataset=skip_dataset):
1431 continue
1432 skip_dataset = dataset
1433 root_datasets.append(dataset)
1434 return root_datasets
    def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]:
        """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g.
        bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be
        created with that name, because these datasets are due per the schedule, either because the 'creation' time of their
        most recent snapshot with that name pattern is now too old, or such a snapshot does not even exist.

        The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple
        but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An
        alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property
        (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of
        the most recent snapshot, for each SnapshotLabel and each dataset.
        """
        p, log = self.params, self.params.log
        src = p.src
        config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
        datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
        is_caching: bool = False  # flipped to the real value just before the 'zfs list -t snapshot' fallback phase below
        interner: HashedInterner[datetime] = HashedInterner()  # reduces memory footprint
        msgs: list[tuple[datetime, str, SnapshotLabel, str]] = []  # accumulates "next scheduled time" log entries

        def create_snapshot_if_latest_is_too_old(
            datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int
        ) -> None:  # thread-safe
            """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old."""
            creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz)
            log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label)
            duration_amount, duration_unit = config.suffix_durations[label.suffix]
            # next scheduled snapshot time is the next duration multiple strictly after the latest snapshot's creation
            next_event_dt: datetime = config.anchors.round_datetime_up_to_duration_multiple(
                creation_dt + timedelta(microseconds=1), duration_amount, duration_unit
            )
            msg: str = ""
            if config.current_datetime >= next_event_dt:
                datasets_to_snapshot[label].append(dataset)  # mark it as scheduled for snapshot creation
                msg = " has passed"
            next_event_dt = interner.intern(next_event_dt)
            msgs.append((next_event_dt, dataset, label, msg))
            if is_caching and not p.dry_run:  # update cache with latest state from 'zfs list -t snapshot'
                # Per-label cache stores (atime=creation, mtime=snapshots_changed) so later runs can safely trust creation
                # only when the label's mtime matches the current dataset-level '=' cache value. Excludes timestamp of label.
                cache_file: str = self.cache.last_modified_cache_file(src, dataset, label_hashes[label])
                unixtimes: tuple[int, int] = (creation_unixtime, self.src_properties[dataset].snapshots_changed)
                set_last_modification_time_safe(cache_file, unixtime_in_secs=unixtimes, if_more_recent=True)

        # Partition labels: labels with zero duration (or forced creation) are scheduled for all datasets immediately;
        # only the remaining labels need a per-dataset due check.
        labels: list[SnapshotLabel] = []
        config_labels: list[SnapshotLabel] = config.snapshot_labels()
        for label in config_labels:
            duration_amount_, _duration_unit = config.suffix_durations[label.suffix]
            if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due:
                datasets_to_snapshot[label] = sorted_datasets  # take snapshot regardless of creation time of existing snaps
            else:
                labels.append(label)
        if len(labels) == 0:
            return datasets_to_snapshot  # nothing more TBD
        # stable per-label cache filename component; derived from the label without its timestamp
        label_hashes: dict[SnapshotLabel, str] = {
            label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
        }

        # satisfy request from local cache as much as possible
        cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
        if is_caching_snapshots(p, src):
            sorted_datasets_todo: list[str] = []
            time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
            for dataset in sorted_datasets:
                cache: SnapshotCache = self.cache
                cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset))
                if cached_snapshots_changed == 0:
                    sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                    continue
                if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed:  # get that prop "for free"
                    cache.invalidate_last_modified_cache_dataset(dataset)
                    sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                    continue
                if cached_snapshots_changed >= time_threshold:  # Avoid equal-second races: only trust matured cache entries
                    sorted_datasets_todo.append(dataset)  # cache entry isn't mature enough to be trusted; skip cache
                    continue
                creation_unixtimes: list[int] = []
                for label_hash in label_hashes.values():
                    # For per-label files, atime stores the latest matching snapshot's creation time, while mtime stores
                    # the dataset-level snapshots_changed observed when this label file was written.
                    atime, mtime = cache.get_snapshots_changed2(cache.last_modified_cache_file(src, dataset, label_hash))
                    # Sanity check: trust per-label cache only when:
                    # - mtime equals the dataset-level '=' cache (same snapshots_changed), and
                    # - atime is plausible and not later than mtime (creation <= snapshots_changed), and
                    # - neither atime nor mtime is zero (unknown provenance).
                    # Otherwise fall back to 'zfs list -t snapshot' to avoid stale creation times after newer changes.
                    if atime == 0 or mtime == 0 or mtime != cached_snapshots_changed or atime > mtime:
                        sorted_datasets_todo.append(dataset)  # request cannot be answered from cache
                        break
                    creation_unixtimes.append(atime)
                if len(creation_unixtimes) == len(labels):  # all labels answered from cache for this dataset?
                    for j, label in enumerate(labels):
                        create_snapshot_if_latest_is_too_old(
                            cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j]
                        )
            sorted_datasets = sorted_datasets_todo  # only the cache misses remain for the 'zfs list' fallback

        def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
            # callback for handle_minmax_snapshots(); i is the index into `labels`
            create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs)

        def on_finish_dataset(dataset: str) -> None:
            # record the dataset-level snapshots_changed in the local cache once all its snapshots have been examined
            if is_caching_snapshots(p, src) and not p.dry_run:
                set_last_modification_time_safe(
                    self.cache.last_modified_cache_file(src, dataset),
                    unixtime_in_secs=self.src_properties[dataset].snapshots_changed,
                    if_more_recent=True,
                )

        # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
        is_caching = is_caching_snapshots(p, src)
        datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
            src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset
        )
        for lbl in labels:  # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result
            datasets_to_snapshot[lbl].sort()
            if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot):  # +take snaps for snapshot-less datasets
                datasets_to_snapshot[lbl] = list(  # inputs to merge() are sorted, and outputs are sorted too
                    heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots)
                )
        for label, datasets in datasets_to_snapshot.items():
            assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
            assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
            assert label

        msgs.sort()  # sort by time, dataset, label
        for i in range(0, len(msgs), 10_000):  # reduce logging overhead via mini-batching
            text = "".join(
                f"\nNext scheduled snapshot time: {next_event_dt} for {dataset}@{label}{msg}"
                for next_event_dt, dataset, label, msg in msgs[i : i + 10_000]
            )
            log.info("Next scheduled snapshot times ...%s", text)

        # sort keys to ensure that we take snapshots for dailies before hourlies, and so on
        datasets_to_snapshot = {lbl: datasets_to_snapshot[lbl] for lbl in config_labels if lbl in datasets_to_snapshot}
        return datasets_to_snapshot
    def handle_minmax_snapshots(
        self,
        remote: Remote,
        sorted_datasets: list[str],
        labels: list[SnapshotLabel],
        *,
        fn_latest: Callable[[int, int, str, str], None],  # callback function for latest snapshot
        fn_oldest: Callable[[int, int, str, str], None] | None = None,  # callback function for oldest snapshot
        fn_oldest_skip_holds: Sequence[bool] = (),
        fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None,
    ) -> list[str]:  # thread-safe
        """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs
        the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names.

        If the (optional) fn_oldest_skip_holds=True for a given label, then snapshots for that label that carry a 'zfs hold'
        are skipped (ignored) when finding the oldest snapshot for fn_oldest. This can be useful for monitor_snapshots(),
        given that users often intentionally retain holds for longer than the "normal" snapshot retention period, and this
        shouldn't necessarily cause monitoring to emit alerts.

        Callbacks are invoked as fn(label_index, creation_unixtime_secs, dataset, snapshot_name); creation_unixtime_secs is
        0 and snapshot_name is "" when no snapshot matches the label. Returns the (sorted) sublist of input datasets that
        have no snapshots at all.
        """
        if fn_oldest is not None:
            assert len(labels) == len(fn_oldest_skip_holds)
        assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        no_userrefs: tuple[str, ...] = ("", "-", "0")  # ZFS snapshot property userrefs > 0 indicates a zfs hold

        def extract_fields(line: str) -> tuple[int, int, str, bool]:
            # Parses one tab-separated 'zfs list' output line; userrefs column is only present when holds matter.
            fields: list[str] = line.split("\t")
            if len(fields) == 3:
                name, createtxg, creation_unixtime_secs = fields
                userrefs = ""
            else:
                name, createtxg, creation_unixtime_secs, userrefs = fields
            return (
                int(createtxg),
                int(creation_unixtime_secs),
                name.split("@", 1)[1],
                userrefs in no_userrefs,
            )

        p = self.params
        props: str = "name,createtxg,creation"
        # fetch the extra userrefs column only when at least one label wants holds skipped
        props = props if fn_oldest is None or not any(fn_oldest_skip_holds) else props + ",userrefs"
        cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o {props}")  # sorts by dataset,creation
        datasets_with_snapshots: set[str] = set()
        interner: SortedInterner[str] = SortedInterner(sorted_datasets)  # reduces memory footprint
        for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False):
            # streaming group by dataset name (consumes constant memory only)
            for dataset, group in itertools.groupby(lines, key=lambda line: line.split("\t", 1)[0].split("@", 1)[0]):
                dataset = interner.interned(dataset)
                snapshots = sorted(  # fetch all snapshots of current dataset and sort by createtxg,creation,name
                    extract_fields(line) for line in group
                )  # perf: sorted() is fast because Timsort is close to O(N) for nearly sorted input, which is our case
                assert len(snapshots) > 0
                datasets_with_snapshots.add(dataset)
                snapshot_names: tuple[str, ...] = tuple(snapshot[2] for snapshot in snapshots)
                # perf: hoist attribute/bound-method lookups out of the hot loops below
                year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX
                year_with_4_digits_regex_fullmatch = year_with_4_digits_regex.fullmatch
                startswith = str.startswith
                endswith = str.endswith
                # scan newest-first for fn_latest (is_reverse=True), oldest-first for fn_oldest
                fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False))
                for i, label in enumerate(labels):
                    infix: str = label.infix
                    start: str = label.prefix + infix
                    end: str = label.suffix
                    startlen: int = len(start)
                    endlen: int = len(end)
                    minlen: int = startlen + endlen if infix else 4 + startlen + endlen  # year_with_four_digits_regex
                    startlen_4: int = startlen + 4  # [startlen:startlen+4]  # year_with_four_digits_regex
                    has_infix: bool = bool(infix)
                    for fn, is_reverse in fns:
                        creation_unixtime_secs: int = 0  # find creation time of latest or oldest snapshot matching the label
                        minmax_snapshot: str = ""
                        no_skip_holds: bool = is_reverse or not fn_oldest_skip_holds[i]
                        for j in range(len(snapshot_names) - 1, -1, -1) if is_reverse else range(len(snapshot_names)):
                            snapshot_name: str = snapshot_names[j]
                            if (
                                endswith(snapshot_name, end)  # aka snapshot_name.endswith(end)
                                and startswith(snapshot_name, start)  # aka snapshot_name.startswith(start)
                                and len(snapshot_name) >= minlen
                                and (has_infix or year_with_4_digits_regex_fullmatch(snapshot_name, startlen, startlen_4))
                                and (no_skip_holds or snapshots[j][3])
                            ):
                                creation_unixtime_secs = snapshots[j][1]
                                minmax_snapshot = snapshot_name
                                break
                        fn(i, creation_unixtime_secs, dataset, minmax_snapshot)
                fn_on_finish_dataset(dataset)
        datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots]
        return datasets_without_snapshots
1660 @staticmethod
1661 def _cache_hits_msg(hits: int, misses: int) -> str:
1662 total = hits + misses
1663 return f", cache hits: {percent(hits, total, print_total=True)}, misses: {percent(misses, total, print_total=True)}"
    def run_ssh_command(
        self,
        remote: MiniRemote,
        loglevel: int = logging.INFO,
        is_dry: bool = False,
        check: bool = True,
        print_stdout: bool = False,
        print_stderr: bool = True,
        cmd: list[str] | None = None,
        retry_on_generic_ssh_error: bool = True,
    ) -> str:
        """Runs the given CLI cmd via ssh on the given remote, and returns stdout.

        Params:
            remote: target host; its location selects the shared connection pool to draw from.
            loglevel: log level used when logging the command.
            is_dry: if True, nothing is actually executed and "" is returned.
            check: passed through to the connection; controls whether a non-zero exit raises.
            print_stdout/print_stderr: whether to forward the subprocess output streams to our stdout/stderr.
            cmd: non-empty command argv; required despite the None default.
            retry_on_generic_ssh_error: if True, an error within SSH itself (stderr starting with 'ssh: ',
                exit code 255) is wrapped in RetryableError so callers may retry the transport failure.

        Raises:
            RetryableError: on generic SSH transport errors when retry_on_generic_ssh_error is True.
            subprocess.CalledProcessError: if the command fails and check is True.
            subprocess.TimeoutExpired: if the command times out.
        """
        assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0
        conn_pool: ConnectionPool = self.params.connection_pools[remote.location].pool(SHARED)
        with conn_pool.connection() as conn:  # borrow a pooled ssh connection for the duration of the call
            log: logging.Logger = self.params.log
            try:
                process: subprocess.CompletedProcess[str] = conn.run_ssh_command(
                    cmd=cmd,
                    job=self,
                    loglevel=loglevel,
                    is_dry=is_dry,
                    check=check,
                    stdin=DEVNULL,
                    stdout=PIPE,
                    stderr=PIPE,
                    text=True,
                )
            except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
                # forward captured output before propagating so the failure context isn't lost
                xprint(log, stderr_to_str(e.stdout) if print_stdout else e.stdout, run=print_stdout, file=sys.stdout, end="")
                xprint(log, stderr_to_str(e.stderr) if print_stderr else e.stderr, run=print_stderr, file=sys.stderr, end="")
                if retry_on_generic_ssh_error and isinstance(e, subprocess.CalledProcessError):
                    stderr: str = stderr_to_str(e.stderr)
                    if stderr.startswith("ssh: "):
                        assert e.returncode == 255, e.returncode  # error within SSH itself (not during the remote command)
                        raise RetryableError(display_msg="ssh") from e
                raise
            else:
                if is_dry:
                    return ""
                xprint(log, process.stdout, run=print_stdout, file=sys.stdout, end="")
                xprint(log, process.stderr, run=print_stderr, file=sys.stderr, end="")
                return process.stdout
1709 def try_ssh_command(
1710 self,
1711 remote: MiniRemote,
1712 loglevel: int,
1713 is_dry: bool = False,
1714 print_stdout: bool = False,
1715 cmd: list[str] | None = None,
1716 exists: bool = True,
1717 error_trigger: str | None = None,
1718 ) -> str | None:
1719 """Convenience method that helps retry/react to a dataset or pool that potentially doesn't exist anymore."""
1720 assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0
1721 log = self.params.log
1722 try:
1723 self.maybe_inject_error(cmd=cmd, error_trigger=error_trigger)
1724 return self.run_ssh_command(remote=remote, loglevel=loglevel, is_dry=is_dry, print_stdout=print_stdout, cmd=cmd)
1725 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
1726 if not isinstance(e, UnicodeDecodeError):
1727 stderr: str = stderr_to_str(e.stderr)
1728 if exists and (
1729 ": dataset does not exist" in stderr
1730 or ": filesystem does not exist" in stderr # solaris 11.4.0
1731 or ": no such pool" in stderr
1732 or "does not have any resumable receive state to abort" in stderr # harmless `zfs receive -A` race
1733 ):
1734 return None
1735 log.warning("%s", stderr.rstrip())
1736 raise RetryableError("Subprocess failed") from e
1738 def try_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str | None:
1739 """Convenience method that auto-retries try_ssh_command() on failure."""
1740 return self._retry_template().call_with_retries(fn=lambda retry: self.try_ssh_command(*args, **kwargs))
1742 def run_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str:
1743 """Convenience method that auto-retries run_ssh_command() on transport failure (not on remote command failure)."""
1744 return self._retry_template().call_with_retries(fn=lambda retry: self.run_ssh_command(*args, **kwargs))
1746 def maybe_inject_error(self, cmd: list[str], error_trigger: str | None = None) -> None:
1747 """For testing only; for unit tests to simulate errors during replication and test correct handling of them."""
1748 if error_trigger:
1749 counter = self.error_injection_triggers.get("before")
1750 if counter and self.decrement_injection_counter(counter, error_trigger):
1751 try:
1752 raise CalledProcessError(returncode=1, cmd=" ".join(cmd), stderr=error_trigger + ":dataset is busy")
1753 except subprocess.CalledProcessError as e:
1754 if error_trigger.startswith("retryable_"):
1755 raise RetryableError("Subprocess failed") from e
1756 else:
1757 raise
1759 def decrement_injection_counter(self, counter: Counter[str], trigger: str) -> bool:
1760 """For testing only."""
1761 with self.injection_lock:
1762 if counter[trigger] <= 0:
1763 return False
1764 counter[trigger] -= 1
1765 return True
1768#############################################################################
@final
class DatasetProperties:
    """Properties of a ZFS dataset."""

    # __slots__ avoids a per-instance __dict__, which matters when tracking many datasets
    __slots__ = ("recordsize", "snapshots_changed")

    def __init__(self, recordsize: int, snapshots_changed: int) -> None:
        self.recordsize: Final[int] = recordsize  # immutable after construction
        self.snapshots_changed: int = snapshots_changed  # mutable; updated as snapshots come and go
1783#############################################################################
1784# Input format is [[user@]host:]dataset
1785# 1234 5 6
1786_DATASET_LOCATOR_REGEX: Final[re.Pattern[str]] = re.compile(r"(((([^@]*)@)?([^:]+)):)?(.*)", flags=re.DOTALL)
1789def parse_dataset_locator(
1790 input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None
1791) -> tuple[str, str, str, str, str]:
1792 """Splits user@host:dataset into its components with optional checks."""
1794 def convert_ipv6(hostname: str) -> str: # support IPv6 without getting confused by host:dataset colon separator ...
1795 return hostname.replace("|", ":") # ... and any colons that may be part of a (valid) ZFS dataset name
1797 user_undefined: bool = user is None
1798 if user is None:
1799 user = ""
1800 host_undefined: bool = host is None
1801 if host is None:
1802 host = ""
1803 host = convert_ipv6(host)
1804 user_host, dataset, pool = "", "", ""
1806 if match := _DATASET_LOCATOR_REGEX.fullmatch(input_text): 1806 ↛ 1823line 1806 didn't jump to line 1823 because the condition on line 1806 was always true
1807 if user_undefined:
1808 user = match.group(4) or ""
1809 if host_undefined:
1810 host = match.group(5) or ""
1811 host = convert_ipv6(host)
1812 if host == "-":
1813 host = ""
1814 dataset = match.group(6) or ""
1815 i = dataset.find("/")
1816 pool = dataset[0:i] if i >= 0 else dataset
1818 if user and host:
1819 user_host = f"{user}@{host}"
1820 elif host:
1821 user_host = host
1823 if validate:
1824 validate_user_name(user, input_text)
1825 validate_host_name(host, input_text)
1826 if port is not None:
1827 validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ")
1828 validate_dataset_name(dataset, input_text)
1830 return user, host, user_host, pool, dataset
def validate_user_name(user: str, input_text: str) -> None:
    """Checks that the username is safe for ssh or local usage."""
    if not user:
        return  # empty user means "current user"; always acceptable
    has_forbidden_char = any(c.isspace() or c in SHELL_CHARS_AND_SLASH for c in user)
    if user.startswith("-") or ".." in user or has_forbidden_char:
        die(f"Invalid user name: '{user}' for: '{input_text}'")
def validate_host_name(host: str, input_text: str) -> None:
    """Checks hostname for forbidden characters or patterns."""
    if not host:
        return  # empty host means "local host"; always acceptable
    has_forbidden_char = any(c.isspace() or c in SHELL_CHARS_AND_SLASH for c in host)
    if host.startswith("-") or ".." in host or has_forbidden_char:
        die(f"Invalid host name: '{host}' for: '{input_text}'")
1847def validate_port(port: str | int | None, message: str) -> None:
1848 """Checks that port specification is a valid integer."""
1849 if isinstance(port, int):
1850 port = str(port)
1851 if port and not port.isdigit():
1852 die(message + f"must be empty or a positive integer: '{port}'")
def normalize_called_process_error(error: subprocess.CalledProcessError) -> int:
    """Normalizes `CalledProcessError.returncode` to avoid reserved exit codes so callers don't misclassify them."""
    code: int = error.returncode
    if isinstance(code, int) and 1 <= code <= STILL_RUNNING_STATUS:
        return DIE_STATUS  # remap the reserved range onto the generic fatal-error status
    return code
1862#############################################################################
1863if __name__ == "__main__": 1863 ↛ 1864line 1863 didn't jump to line 1864 because the condition on line 1863 was never true
1864 main()