Coverage for bzfs_main/bzfs.py: 99%
983 statements
coverage.py v7.10.2, created at 2025-08-06 13:30 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata
16# /// script
17# requires-python = ">=3.8"
18# dependencies = []
19# ///
20#
21"""
22* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`,
23 data transfer, and snapshot management between two hosts.
24* Overview of the bzfs.py codebase:
25* The codebase starts with docs, definition of input data and associated argument parsing into a "Params" class.
26* All CLI option/parameter values are reachable from the "Params" class.
27* Control flow starts in main(), which kicks off a "Job".
28* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree.
29* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset().
30* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots().
31* The --create-src-snapshots-* and --delete-* and --compare-* and --monitor-* algorithms also start in run_task().
32* The main retry logic is in run_with_retries() and clear_resumable_recv_state_if_necessary().
33* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter.
34* Executing a CLI command on a local or remote host is in run_ssh_command().
35* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool.
36* Cache functionality can be found by searching for this regex: .*cach.*
37* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant().
38* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh.
39 Simply run that script whenever you change or add ArgumentParser help text.
40"""
42from __future__ import annotations
43import argparse
44import contextlib
45import fcntl
46import hashlib
47import heapq
48import itertools
49import os
50import re
51import signal
52import subprocess
53import sys
54import threading
55import time
56from collections import defaultdict
57from datetime import datetime, timedelta
58from logging import Logger
59from os.path import join as os_path_join
60from pathlib import Path
61from subprocess import CalledProcessError
62from typing import (
63 IO,
64 Any,
65 Callable,
66 Collection,
67 Counter,
68 cast,
69)
71import bzfs_main.loggers
72from bzfs_main.argparse_actions import (
73 has_timerange_filter,
74)
75from bzfs_main.argparse_cli import (
76 EXCLUDE_DATASET_REGEXES_DEFAULT,
77)
78from bzfs_main.compare_snapshot_lists import (
79 run_compare_snapshot_lists,
80)
81from bzfs_main.configuration import (
82 AlertConfig,
83 CreateSrcSnapshotConfig,
84 LogParams,
85 MonitorSnapshotAlert,
86 Params,
87 Remote,
88 SnapshotLabel,
89)
90from bzfs_main.connection import (
91 decrement_injection_counter,
92 maybe_inject_error,
93 run_ssh_command,
94 timeout,
95 try_ssh_command,
96)
97from bzfs_main.detect import (
98 DISABLE_PRG,
99 RemoteConfCacheItem,
100 are_bookmarks_enabled,
101 detect_available_programs,
102 is_caching_snapshots,
103 is_dummy,
104 is_solaris_zfs,
105 is_zpool_feature_enabled_or_active,
106)
107from bzfs_main.filter import (
108 SNAPSHOT_REGEX_FILTER_NAME,
109 dataset_regexes,
110 filter_datasets,
111 filter_lines,
112 filter_lines_except,
113 filter_snapshots,
114)
115from bzfs_main.loggers import (
116 Tee,
117 get_simple_logger,
118 reset_logger,
119)
120from bzfs_main.parallel_batch_cmd import (
121 run_ssh_cmd_parallel,
122 zfs_list_snapshots_in_parallel,
123)
124from bzfs_main.parallel_engine import (
125 process_datasets_in_parallel_and_fault_tolerant,
126)
127from bzfs_main.parallel_iterator import (
128 run_in_parallel,
129)
130from bzfs_main.period_anchors import (
131 round_datetime_up_to_duration_multiple,
132)
133from bzfs_main.progress_reporter import (
134 ProgressReporter,
135 count_num_bytes_transferred_by_zfs_send,
136)
137from bzfs_main.replication import (
138 delete_bookmarks,
139 delete_datasets,
140 delete_snapshots,
141 replicate_dataset,
142)
143from bzfs_main.retry import (
144 Retry,
145 RetryableError,
146)
147from bzfs_main.snapshot_cache import (
148 SnapshotCache,
149 set_last_modification_time_safe,
150)
151from bzfs_main.utils import (
152 DESCENDANTS_RE_SUFFIX,
153 DIE_STATUS,
154 DONT_SKIP_DATASET,
155 FILE_PERMISSIONS,
156 LOG_DEBUG,
157 LOG_TRACE,
158 PROG_NAME,
159 SHELL_CHARS,
160 YEAR_WITH_FOUR_DIGITS_REGEX,
161 SortedInterner,
162 SynchronizedBool,
163 SynchronizedDict,
164 append_if_absent,
165 compile_regexes,
166 cut,
167 die,
168 has_duplicates,
169 human_readable_bytes,
170 human_readable_duration,
171 is_descendant,
172 open_nofollow,
173 percent,
174 pretty_print_formatter,
175 replace_in_lines,
176 replace_prefix,
177 terminate_process_subtree,
178 validate_dataset_name,
179 validate_property_name,
180 xappend,
181 xfinally,
182)
184# constants:
185__version__: str = bzfs_main.argparse_cli.__version__
186CRITICAL_STATUS: int = 2
187WARNING_STATUS: int = 1
188STILL_RUNNING_STATUS: int = 4
189MIN_PYTHON_VERSION: tuple[int, int] = (3, 8)
190if sys.version_info < MIN_PYTHON_VERSION:
191 print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!")
192 sys.exit(DIE_STATUS)
193CREATE_SRC_SNAPSHOTS_PREFIX_DFLT: str = PROG_NAME + "_"
194CREATE_SRC_SNAPSHOTS_SUFFIX_DFLT: str = "_adhoc"
195TIME_THRESHOLD_SECS: float = 1.1 # 1 second ZFS creation time resolution + NTP clock skew is typically < 10ms
198#############################################################################
199def argument_parser() -> argparse.ArgumentParser:
200 """Returns the CLI parser used by bzfs."""
201 return bzfs_main.argparse_cli.argument_parser()
204def main() -> None:
205 """API for command line clients."""
206 try:
207 run_main(argument_parser().parse_args(), sys.argv)
208 except subprocess.CalledProcessError as e:
209 sys.exit(e.returncode)
212def run_main(args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None:
213 """API for Python clients; visible for testing; may become a public API eventually."""
214 Job().run_main(args, sys_argv, log)
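# --- Illustrative sketch (editor's addition, not part of bzfs.py): the two entry points above
# can also be driven from Python code rather than from the shell. An external client would import
# argument_parser() and run_main() from bzfs_main.bzfs; the dataset names and the --recursive flag
# below are hypothetical example arguments.
def _example_replicate_once() -> None:
    """Runs a single hypothetical replication of tank/data to backuppool/data, recursively."""
    argv = ["tank/data", "backuppool/data", "--recursive"]
    args = argument_parser().parse_args(argv)
    run_main(args, sys_argv=["bzfs"] + argv)  # raises SystemExit or CalledProcessError on failure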
217#############################################################################
218class Job:
219 """Executes one bzfs run, coordinating snapshot replication tasks."""
221 def __init__(self) -> None:
222 self.params: Params
223 self.all_dst_dataset_exists: dict[str, dict[str, bool]] = defaultdict(lambda: defaultdict(bool))
224 self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({})
225 self.src_properties: dict[str, DatasetProperties] = {}
226 self.dst_properties: dict[str, DatasetProperties] = {}
227 self.all_exceptions: list[str] = []
228 self.all_exceptions_count: int = 0
229 self.max_exceptions_to_summarize: int = 10000
230 self.first_exception: BaseException | None = None
231 self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {}
232 self.dedicated_tcp_connection_per_zfs_send: bool = True
233 self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {}
234 self.max_workers: dict[str, int] = {}
235 self.stats_lock: threading.Lock = threading.Lock()
236 self.num_cache_hits: int = 0
237 self.num_cache_misses: int = 0
238 self.num_snapshots_found: int = 0
239 self.num_snapshots_replicated: int = 0
240 self.control_persist_secs: int = 90
241 self.control_persist_margin_secs: int = 2
242 self.progress_reporter: ProgressReporter = cast(ProgressReporter, None)
243 self.is_first_replication_task: SynchronizedBool = SynchronizedBool(True)
244 self.replication_start_time_nanos: int = time.monotonic_ns()
245 self.timeout_nanos: int | None = None
246 self.cache: SnapshotCache = SnapshotCache(self)
248 self.is_test_mode: bool = False # for testing only
249 self.creation_prefix: str = "" # for testing only
250 self.isatty: bool | None = None # for testing only
251 self.use_select: bool = False # for testing only
252 self.progress_update_intervals: tuple[float, float] | None = None # for testing only
253 self.error_injection_triggers: dict[str, Counter[str]] = {} # for testing only
254 self.delete_injection_triggers: dict[str, Counter[str]] = {} # for testing only
255 self.param_injection_triggers: dict[str, dict[str, bool]] = {} # for testing only
256 self.inject_params: dict[str, bool] = {} # for testing only
257 self.injection_lock: threading.Lock = threading.Lock() # for testing only
258 self.max_command_line_bytes: int | None = None # for testing only
260 def shutdown(self) -> None:
261 """Exits any multiplexed ssh sessions that may be leftover."""
262 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values()
263 for i, cache_item in enumerate(cache_items):
264 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}")
266 def terminate(self, old_term_handler: Any, except_current_process: bool = False) -> None:
267 """Shuts down gracefully on SIGTERM, optionally killing descendants."""
269 def post_shutdown() -> None:
270 signal.signal(signal.SIGTERM, old_term_handler) # restore original signal handler
271 terminate_process_subtree(except_current_process=except_current_process)
273 with xfinally(post_shutdown):
274 self.shutdown()
276 def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None:
277 """Parses CLI arguments, sets up logging, and executes main job loop."""
278 assert isinstance(self.error_injection_triggers, dict)
279 assert isinstance(self.delete_injection_triggers, dict)
280 assert isinstance(self.inject_params, dict)
281 with xfinally(reset_logger): # runs reset_logger() on exit, without masking exception raised in body of `with` block
282 try:
283 log_params: LogParams = LogParams(args)
284 log = bzfs_main.loggers.get_logger(log_params=log_params, args=args, log=log)
285 log.info("%s", f"Log file is: {log_params.log_file}")
286 except BaseException as e:
287 get_simple_logger(PROG_NAME).error("Log init: %s", e, exc_info=False if isinstance(e, SystemExit) else True)
288 raise
290 aux_args: list[str] = []
291 if getattr(args, "include_snapshot_plan", None):
292 aux_args += args.include_snapshot_plan
293 if getattr(args, "delete_dst_snapshots_except_plan", None):
294 aux_args += args.delete_dst_snapshots_except_plan
295 if len(aux_args) > 0:
296 log.info("Auxiliary CLI arguments: %s", " ".join(aux_args))
297 args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args)
299 def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None:
300 log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info)
302 try:
303 log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[euid: {os.geteuid()}]")
304 if self.is_test_mode:
305 log.log(LOG_TRACE, "Parsed CLI arguments: %s", args)
306 self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params)
307 log_params.params = p
308 with open_nofollow(log_params.log_file, "a", encoding="utf-8", perm=FILE_PERMISSIONS) as log_file_fd:
309 with contextlib.redirect_stderr(cast(IO[Any], Tee(log_file_fd, sys.stderr))): # stderr to logfile+stderr
310 lock_file: str = p.lock_file_name()
311 lock_fd = os.open(lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW, FILE_PERMISSIONS)
312 with xfinally(lambda: os.close(lock_fd)):
313 try:
314 # Acquire an exclusive lock; will raise an error if lock is already held by another process.
315 # The (advisory) lock is auto-released when the process terminates or the fd is closed.
316 fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # LOCK_NB ... non-blocking
317 except BlockingIOError:
318 msg = "Exiting as same previous periodic job is still running without completion yet per "
319 die(msg + lock_file, STILL_RUNNING_STATUS)
320 with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)): # don't accumulate stale files
321 # On CTRL-C and SIGTERM, send signal to descendant processes to also terminate descendants
322 old_term_handler = signal.getsignal(signal.SIGTERM)
323 signal.signal(signal.SIGTERM, lambda sig, f: self.terminate(old_term_handler))
324 old_int_handler = signal.signal(signal.SIGINT, lambda s, f: self.terminate(old_term_handler))
325 try:
326 self.run_tasks()
327 except BaseException:
328 self.terminate(old_term_handler, except_current_process=True)
329 raise
330 finally:
331 signal.signal(signal.SIGTERM, old_term_handler) # restore original signal handler
332 signal.signal(signal.SIGINT, old_int_handler) # restore original signal handler
333 for _ in range(2 if self.max_command_line_bytes else 1):
334 self.shutdown()
335 print("", end="", file=sys.stderr)
336 sys.stderr.flush()
337 except subprocess.CalledProcessError as e:
338 log_error_on_exit(e, e.returncode)
339 raise
340 except SystemExit as e:
341 log_error_on_exit(e, e.code)
342 raise
343 except (subprocess.TimeoutExpired, UnicodeDecodeError) as e:
344 log_error_on_exit(e, DIE_STATUS)
345 raise SystemExit(DIE_STATUS) from e
346 except re.error as e:
347 log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS)
348 raise SystemExit(DIE_STATUS) from e
349 except BaseException as e:
350 log_error_on_exit(e, DIE_STATUS, exc_info=True)
351 raise SystemExit(DIE_STATUS) from e
352 finally:
353 log.info("%s", f"Log file was: {log_params.log_file}")
354 log.info("Success. Goodbye!")
355 sys.stderr.flush()
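# --- Illustrative sketch (editor's addition, not part of bzfs.py): the lock handling inside
# run_main() above boils down to the following standalone pattern, i.e. a non-blocking exclusive
# advisory flock that ensures at most one instance of the same periodic job runs at a time. The
# default lock file path below is hypothetical.
def _example_acquire_single_instance_lock(lock_file: str = "/tmp/bzfs_example.lock") -> int:
    """Returns an open fd that holds an exclusive advisory lock, or raises SystemExit if another
    process already holds it; the lock is auto-released when the fd is closed or the process exits."""
    fd = os.open(lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW, FILE_PERMISSIONS)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # non-blocking; raises BlockingIOError if already held
    except BlockingIOError:
        os.close(fd)
        raise SystemExit("Another instance is still running per " + lock_file)
    return fd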
357 def run_tasks(self) -> None:
358 """Executes replication cycles, repeating until daemon lifetime expires."""
359 p, log = self.params, self.params.log
360 self.all_exceptions = []
361 self.all_exceptions_count = 0
362 self.first_exception = None
363 self.remote_conf_cache = {}
364 self.isatty = self.isatty if self.isatty is not None else p.isatty
365 self.validate_once()
366 self.replication_start_time_nanos = time.monotonic_ns()
367 self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals)
368 with xfinally(lambda: self.progress_reporter.stop()):
369 daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos
370 while True: # loop for daemon mode
371 self.timeout_nanos = None if p.timeout_nanos is None else time.monotonic_ns() + p.timeout_nanos
372 self.all_dst_dataset_exists.clear()
373 self.progress_reporter.reset()
374 src, dst = p.src, p.dst
375 for src_root_dataset, dst_root_dataset in p.root_dataset_pairs:
376 src.root_dataset = src.basis_root_dataset = src_root_dataset
377 dst.root_dataset = dst.basis_root_dataset = dst_root_dataset
378 p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy()
379 if p.daemon_lifetime_nanos > 0:
380 self.timeout_nanos = None if p.timeout_nanos is None else time.monotonic_ns() + p.timeout_nanos
381 recurs_sep = " " if p.recursive_flag else ""
382 task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}"
383 if len(p.root_dataset_pairs) > 1:
384 log.info("Starting task: %s", task_description + " ...")
385 try:
386 try:
387 maybe_inject_error(self, cmd=[], error_trigger="retryable_run_tasks")
388 timeout(self)
389 self.validate_task()
390 self.run_task()
391 except RetryableError as retryable_error:
392 assert retryable_error.__cause__ is not None
393 raise retryable_error.__cause__ from None
394 except (CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e:
395 if p.skip_on_error == "fail" or (
396 isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0
397 ):
398 raise
399 log.error("%s", e)
400 self.append_exception(e, "task", task_description)
401 if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos):
402 break
403 if not p.skip_replication:
404 self.print_replication_stats(self.replication_start_time_nanos)
405 error_count = self.all_exceptions_count
406 if error_count > 0 and p.daemon_lifetime_nanos == 0:
407 msgs = "\n".join([f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions)])
408 log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}")
409 assert self.first_exception is not None
410 raise self.first_exception
412 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None:
413 """Records and logs an exception that was encountered while running a subtask."""
414 self.first_exception = self.first_exception or e
415 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption
416 self.all_exceptions.append(str(e))
417 self.all_exceptions_count += 1
418 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description)
420 def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool:
421 """Pauses until the next scheduled snapshot time or daemon stop."""
422 sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns()
423 if sleep_nanos <= 0:
424 return False
425 self.progress_reporter.pause()
426 p, log = self.params, self.params.log
427 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
428 curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1)
429 next_snapshotting_event_dt: datetime = min(
430 (
431 round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit, config.anchors)
432 for duration_amount, duration_unit in config.suffix_durations.values()
433 ),
434 default=curr_datetime + timedelta(days=10 * 365), # infinity
435 )
436 offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz)
437 offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000
438 sleep_nanos = min(sleep_nanos, max(0, offset_nanos))
439 log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]")
440 time.sleep(sleep_nanos / 1_000_000_000)
441 config.current_datetime = datetime.now(config.tz)
442 return daemon_stoptime_nanos - time.monotonic_ns() > 0
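# --- Illustrative sketch (editor's addition, not part of bzfs.py): the sleep computation above
# converts a datetime.timedelta into integer nanoseconds by hand because timedelta only stores
# days, seconds and microseconds. A standalone restatement of that arithmetic:
def _example_timedelta_to_nanos(offset: timedelta) -> int:
    """Same formula as in sleep_until_next_daemon_iteration(): days and seconds in ns, plus microseconds in ns."""
    return (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000

# For example, timedelta(minutes=1, microseconds=5) maps to 60_000_000_005_000 nanoseconds.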
444 def print_replication_stats(self, start_time_nanos: int) -> None:
445 """Logs overall replication statistics after a job cycle completes."""
446 p, log = self.params, self.params.log
447 elapsed_nanos = time.monotonic_ns() - start_time_nanos
448 msg = p.dry(f"Replicated {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.")
449 if p.is_program_available("pv", "local"):
450 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file)
451 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / elapsed_nanos)
452 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]."
453 log.info("%s", msg.ljust(p.terminal_columns - len("2024-01-01 23:58:45 [I] ")))
455 def validate_once(self) -> None:
456 """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times."""
457 p = self.params
458 p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts)
459 for snapshot_filter in p.snapshot_filters:
460 for _filter in snapshot_filter:
461 if _filter.name == SNAPSHOT_REGEX_FILTER_NAME:
462 exclude_snapshot_regexes = compile_regexes(_filter.options[0])
463 include_snapshot_regexes = compile_regexes(_filter.options[1] or [".*"])
464 _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes)
466 exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT]
467 if len(p.args.exclude_dataset_regex) > 0: # some patterns don't exclude anything
468 exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"]
469 include_regexes: list[str] = p.args.include_dataset_regex
471 # relative datasets need not be compiled more than once as they don't change between tasks
472 def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]:
473 abs_datasets: list[str] = []
474 rel_datasets: list[str] = []
475 for dataset in datasets:
476 (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset)
477 return abs_datasets, rel_datasets
479 p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset)
480 p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset)
481 suffix = DESCENDANTS_RE_SUFFIX
482 p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = (
483 compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix),
484 compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix),
485 )
487 if p.pv_program != DISABLE_PRG:
488 pv_program_opts_set = set(p.pv_program_opts)
489 if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}):
490 die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.")
491 if self.isatty and not p.quiet:
492 for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]:
493 if pv_program_opts_set.isdisjoint(opts):
494 die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.")
496 src, dst = p.src, p.dst
497 for remote in [src, dst]:
498 r, loc = remote, remote.location
499 validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user")
500 validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host")
501 validate_port(r.ssh_port, f"--ssh-{loc}-port ")
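# --- Illustrative sketch (editor's addition, not part of bzfs.py): validate_once() splits the
# --include/--exclude-dataset values into "absolute" names (leading '/') and "relative" names so
# that only the absolute ones must be recompiled per task in validate_task(). Standalone
# restatement of that split, with hypothetical inputs in the usage note below:
def _example_separate_abs_vs_rel(datasets: list[str]) -> tuple[list[str], list[str]]:
    abs_datasets: list[str] = []
    rel_datasets: list[str] = []
    for dataset in datasets:
        (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset)
    return abs_datasets, rel_datasets

# _example_separate_abs_vs_rel(["/tank/a", "b/c"]) returns (["/tank/a"], ["b/c"])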
503 def validate_task(self) -> None:
504 """Validates a single replication task before execution."""
505 p, log = self.params, self.params.log
506 src, dst = p.src, p.dst
507 for remote in [src, dst]:
508 r = remote
509 r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator(
510 r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port
511 )
512 r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user)
513 local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1") # ::1 is IPv6 version of loopback address
514 remote.is_nonlocal = r.ssh_host not in local_addrs
515 self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host])
517 if src.ssh_host == dst.ssh_host:
518 msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}"
519 if src.root_dataset == dst.root_dataset:
520 die(f"Source and destination dataset must not be the same! {msg}")
521 if p.recursive and (
522 is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset)
523 or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset)
524 ):
525 die(f"Source and destination dataset trees must not overlap! {msg}")
527 suffx = DESCENDANTS_RE_SUFFIX # also match descendants of a matching dataset
528 p.exclude_dataset_regexes, p.include_dataset_regexes = (
529 p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx),
530 p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx),
531 )
532 if len(p.include_dataset_regexes) == 0:
533 p.include_dataset_regexes = [(re.compile(r".*"), False)]
535 detect_available_programs(self)
537 zfs_send_program_opts: list[str] = p.curr_zfs_send_program_opts
538 if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"):
539 append_if_absent(zfs_send_program_opts, "--large-block") # solaris-11.4 does not have this feature
540 if is_solaris_zfs(p, dst):
541 p.dry_run_destroy = "" # solaris-11.4 knows no 'zfs destroy -n' flag
542 p.verbose_destroy = "" # solaris-11.4 knows no 'zfs destroy -v' flag
543 if is_solaris_zfs(p, src): # solaris-11.4 only knows -w compress
544 zfs_send_program_opts = ["-p" if opt == "--props" else opt for opt in zfs_send_program_opts]
545 zfs_send_program_opts = fix_solaris_raw_mode(zfs_send_program_opts)
546 p.curr_zfs_send_program_opts = zfs_send_program_opts
548 self.max_workers = {}
549 self.max_datasets_per_minibatch_on_list_snaps = {}
550 for r in [src, dst]:
551 cpus = int(p.available_programs[r.location].get("getconf_cpu_count", 8))
552 threads, is_percent = p.threads
553 cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads))
554 self.max_workers[r.location] = cpus
555 bs = max(1, p.max_datasets_per_batch_on_list_snaps) # 1024 by default
556 max_datasets_per_minibatch = p.max_datasets_per_minibatch_on_list_snaps
557 if max_datasets_per_minibatch <= 0:
558 max_datasets_per_minibatch = max(1, bs // cpus)
559 max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch)
560 self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch
561 log.log(
562 LOG_TRACE,
563 "%s",
564 f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, "
565 f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, "
566 f"max_workers: {self.max_workers[r.location]}, "
567 f"location: {r.location}",
568 )
569 if self.is_test_mode:
570 log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params))
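# --- Illustrative sketch (editor's addition, not part of bzfs.py): the minibatch sizing at the
# end of validate_task() divides the per-batch dataset limit across the computed worker count
# (derived from the CPU count and --threads) whenever no explicit minibatch size is given.
# Worked example with the default batch size of 1024 and a hypothetical worker count of 8:
def _example_minibatch_size(max_per_batch: int = 1024, max_per_minibatch: int = -1, workers: int = 8) -> int:
    bs = max(1, max_per_batch)
    if max_per_minibatch <= 0:
        max_per_minibatch = max(1, bs // workers)  # 1024 // 8 == 128
    return min(bs, max_per_minibatch)  # never exceed the overall batch size

# _example_minibatch_size() returns 128; _example_minibatch_size(max_per_minibatch=4096) returns 1024.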
572 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]:
573 """Returns sudo command prefix and whether root privileges are required."""
574 p: Params = self.params
575 assert isinstance(ssh_user_host, str)
576 assert isinstance(ssh_user, str)
577 assert isinstance(p.sudo_program, str)
578 assert isinstance(p.enable_privilege_elevation, bool)
580 is_root: bool = True
581 if ssh_user_host != "":
582 if ssh_user == "":
583 if os.geteuid() != 0:
584 is_root = False
585 elif ssh_user != "root":
586 is_root = False
587 elif os.geteuid() != 0:
588 is_root = False
590 if is_root:
591 sudo = "" # using sudo in an attempt to make ZFS operations work even if we are not root user?
592 use_zfs_delegation = False # or instead using 'zfs allow' delegation?
593 return sudo, use_zfs_delegation
594 elif p.enable_privilege_elevation:
595 if p.sudo_program == DISABLE_PRG:
596 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}")
597 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any
598 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit.
599 return p.sudo_program + " -n", False
600 else:
601 return "", True
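# --- Illustrative sketch (editor's addition, not part of bzfs.py): sudo_cmd() above picks one of
# three privilege strategies:
#   effectively root        -> ("", False)          run 'zfs' directly
#   not root, elevation on  -> ("<sudo> -n", False)  prefix 'zfs' commands with non-interactive sudo
#   not root, elevation off -> ("", True)            rely on 'zfs allow' delegation instead
# Condensed standalone restatement of that decision (the sudo program name is an assumption):
def _example_privilege_strategy(is_root: bool, enable_privilege_elevation: bool, sudo_program: str = "sudo") -> tuple[str, bool]:
    if is_root:
        return "", False
    if enable_privilege_elevation:
        return sudo_program + " -n", False  # '-n' fails fast instead of prompting for a password
    return "", True  # fall back to 'zfs allow' delegation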
603 def run_task(self) -> None:
604 """Replicates all snapshots for the current root dataset pair."""
606 def filter_src_datasets() -> list[str]: # apply --{include|exclude}-dataset policy
607 return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets
609 p, log = self.params, self.params.log
610 src, dst = p.src, p.dst
611 max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location])
612 recursive_sep: str = " " if p.recursive_flag else ""
613 task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..."
614 failed: bool = False
615 src_datasets: list[str] | None = None
616 basis_src_datasets: list[str] = []
617 self.src_properties = {}
618 self.dst_properties = {}
619 if not is_dummy(src): # find src dataset or all datasets in src dataset tree (with --recursive)
620 basis_src_datasets = self.list_src_datasets_task()
622 if not p.create_src_snapshots_config.skip_create_src_snapshots:
623 log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...")
624 src_datasets = filter_src_datasets() # apply include/exclude policy
625 self.create_src_snapshots_task(basis_src_datasets, src_datasets)
627 # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset
628 if not p.skip_replication:
629 if len(basis_src_datasets) == 0:
630 die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}")
631 if is_dummy(dst):
632 die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
633 src_datasets = filter_src_datasets() # apply include/exclude policy
634 failed = self.replicate_datasets(src_datasets, task_description, max_workers)
636 if failed or not (
637 p.delete_dst_datasets
638 or p.delete_dst_snapshots
639 or p.delete_empty_dst_datasets
640 or p.compare_snapshot_lists
641 or p.monitor_snapshots_config.enable_monitor_snapshots
642 ):
643 return
644 log.info("Listing dst datasets: %s", task_description)
645 if is_dummy(dst):
646 die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
647 basis_dst_datasets: list[str] = self.list_dst_datasets_task()
648 dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets) # apply include/exclude policy
650 if p.delete_dst_datasets and not failed:
651 log.info(p.dry("--delete-dst-datasets: %s"), task_description)
652 basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task(
653 basis_src_datasets, basis_dst_datasets, dst_datasets
654 )
656 if p.delete_dst_snapshots and not failed:
657 log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]")
658 failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description)
660 if p.delete_empty_dst_datasets and p.recursive and not failed:
661 log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description)
662 dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets)
664 if p.compare_snapshot_lists and not failed:
665 log.info("--compare-snapshot-lists: %s", task_description)
666 if len(basis_src_datasets) == 0 and not is_dummy(src):
667 die(f"Source dataset does not exist: {src.basis_root_dataset}")
668 src_datasets = filter_src_datasets() # apply include/exclude policy
669 run_compare_snapshot_lists(self, src_datasets, dst_datasets)
671 if p.monitor_snapshots_config.enable_monitor_snapshots and not failed:
672 log.info("--monitor-snapshots: %s", task_description)
673 src_datasets = filter_src_datasets() # apply include/exclude policy
674 self.monitor_snapshots_task(src_datasets, dst_datasets, task_description)
676 def list_src_datasets_task(self) -> list[str]:
677 """Lists datasets on the source host."""
678 p = self.params
679 src = p.src
680 basis_src_datasets: list[str] = []
681 is_caching: bool = is_caching_snapshots(p, src)
682 props: str = "volblocksize,recordsize,name"
683 props = "snapshots_changed," + props if is_caching else props
684 cmd: list[str] = p.split_args(
685 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset
686 )
687 for line in (try_ssh_command(self, src, LOG_DEBUG, cmd=cmd) or "").splitlines():
688 cols: list[str] = line.split("\t")
689 snapshots_changed, volblocksize, recordsize, src_dataset = cols if is_caching else ["-"] + cols
690 self.src_properties[src_dataset] = DatasetProperties(
691 recordsize=int(recordsize) if recordsize != "-" else -int(volblocksize),
692 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
693 )
694 basis_src_datasets.append(src_dataset)
695 assert (not self.is_test_mode) or basis_src_datasets == sorted(basis_src_datasets), "List is not sorted"
696 return basis_src_datasets
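# --- Illustrative sketch (editor's addition, not part of bzfs.py): list_src_datasets_task() parses
# the tab-separated output of 'zfs list -Hp -o <props>'. When caching is disabled the leading
# 'snapshots_changed' column is absent, so a dummy "-" column is prepended to keep the unpacking
# uniform. Standalone restatement, exercised with a fabricated sample line in the note below:
def _example_parse_zfs_list_line(line: str, is_caching: bool) -> tuple[str, int, int]:
    cols = line.split("\t")
    snapshots_changed, volblocksize, recordsize, dataset = cols if is_caching else ["-"] + cols
    recsize = int(recordsize) if recordsize != "-" else -int(volblocksize)  # volumes report volblocksize instead
    changed = int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0
    return dataset, recsize, changed

# _example_parse_zfs_list_line("-\t131072\ttank/data", is_caching=False) returns ("tank/data", 131072, 0)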
698 def list_dst_datasets_task(self) -> list[str]:
699 """Lists datasets on the destination host."""
700 p, log = self.params, self.params.log
701 dst = p.dst
702 is_caching: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots
703 props: str = "name"
704 props = "snapshots_changed," + props if is_caching else props
705 cmd: list[str] = p.split_args(
706 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset
707 )
708 basis_dst_datasets: list[str] = []
709 basis_dst_datasets_str: str | None = try_ssh_command(self, dst, LOG_TRACE, cmd=cmd)
710 if basis_dst_datasets_str is None:
711 log.warning("Destination dataset does not exist: %s", dst.root_dataset)
712 else:
713 for line in basis_dst_datasets_str.splitlines():
714 cols: list[str] = line.split("\t")
715 snapshots_changed, dst_dataset = cols if is_caching else ["-"] + cols
716 self.dst_properties[dst_dataset] = DatasetProperties(
717 recordsize=0,
718 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
719 )
720 basis_dst_datasets.append(dst_dataset)
721 assert (not self.is_test_mode) or basis_dst_datasets == sorted(basis_dst_datasets), "List is not sorted"
722 return basis_dst_datasets
724 def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None:
725 """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements
726 --create-src-snapshots.
728 The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line,
729 using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the
730 --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs
731 snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical
732 'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such
733 guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple
734 command line invocations, respecting the limits that the operating system places on the maximum length of a single
735 command line, per `getconf ARG_MAX`.
736 """
737 p, log = self.params, self.params.log
738 src = p.src
739 if len(basis_src_datasets) == 0:
740 die(f"Source dataset does not exist: {src.basis_root_dataset}")
741 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets)
742 datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0}
743 basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy() # shallow copy
744 commands: dict[SnapshotLabel, list[str]] = {}
745 for label, datasets in datasets_to_snapshot.items():
746 cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot")
747 if p.recursive:
748 # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor
749 root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets)
750 if root_datasets is not None:
751 cmd.append("-r") # recursive; takes a snapshot of all datasets in the subtree(s)
752 datasets_to_snapshot[label] = root_datasets
753 commands[label] = cmd
754 creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots"
755 log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...")
756 # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle
757 run_ssh_cmd_parallel(
758 self,
759 src,
760 [(commands[lbl], [f"{ds}@{lbl}" for ds in datasets]) for lbl, datasets in datasets_to_snapshot.items()],
761 fn=lambda cmd, batch: run_ssh_command(self, src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch),
762 max_batch_items=1 if is_solaris_zfs(p, src) else 2**29, # solaris CLI doesn't accept multiple datasets
763 )
764 # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls
765 self.cache.update_last_modified_cache(basis_datasets_to_snapshot)
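# --- Illustrative sketch (editor's addition, not part of bzfs.py): the net effect of
# create_src_snapshots_task() is one or more command lines of the shape
# 'zfs snapshot [-r] dataset1@label dataset2@label ...', where every snapshot named on the same
# command line lands in the same ZFS transaction group. Minimal restatement that only builds the
# argv (dataset names and the label are hypothetical; batching per `getconf ARG_MAX` is omitted):
def _example_build_snapshot_cmd(datasets: list[str], label: str, recursive: bool) -> list[str]:
    cmd = ["zfs", "snapshot"]
    if recursive:
        cmd.append("-r")  # snapshot the whole subtree rooted at each given dataset
    return cmd + [f"{ds}@{label}" for ds in datasets]

# _example_build_snapshot_cmd(["tank/a", "tank/b"], "bzfs_2024-01-01_00:00:00_adhoc", recursive=False)
# yields ['zfs', 'snapshot', 'tank/a@bzfs_2024-01-01_00:00:00_adhoc', 'tank/b@bzfs_2024-01-01_00:00:00_adhoc']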
767 def delete_destination_snapshots_task(
768 self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str
769 ) -> bool:
770 """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the
771 --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset*
772 policy; implements --delete-dst-snapshots."""
773 p, log = self.params, self.params.log
774 src, dst = p.src, p.dst
775 kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot"
776 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters)
777 props: str = self.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name"
778 basis_src_datasets_set: set[str] = set(basis_src_datasets)
779 num_snapshots_found, num_snapshots_deleted = 0, 0
781 def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool: # thread-safe
782 src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
783 if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks):
784 src_kind: str = kind
785 if not p.delete_dst_snapshots_no_crosscheck:
786 src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot"
787 src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset)
788 else:
789 src_cmd = None
790 dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset)
791 self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots")
792 src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel
793 lambda: set(run_ssh_command(self, src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []),
794 lambda: try_ssh_command(self, dst, LOG_TRACE, cmd=dst_cmd),
795 )
796 if dst_snaps_with_guids_str is None:
797 log.warning("Third party deleted destination: %s", dst_dataset)
798 return False
799 dst_snaps_with_guids: list[str] = dst_snaps_with_guids_str.splitlines()
800 num_dst_snaps_with_guids = len(dst_snaps_with_guids)
801 basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy()
802 if p.delete_dst_bookmarks:
803 replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1) # treat bookmarks as snapshots
804 # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots().
805 # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep*
806 all_except: bool = p.delete_dst_snapshots_except
807 if p.delete_dst_snapshots_except and not is_dummy(src):
808 # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what
809 # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep"
810 # list, we later further refine by checking what's on the source dataset.
811 all_except = False
812 dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except)
813 if p.delete_dst_bookmarks:
814 replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1) # restore pre-filtering bookmark state
815 if filter_needs_creation_time:
816 dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids)
817 basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids)
818 if p.delete_dst_snapshots_except and not is_dummy(src): # Non-dummy Source + "Except" (Keep) Mode
819 # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka
820 # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset.
821 # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP.
822 # We only actually keep them if they are ALSO on the SRC.
823 # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`)
824 # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`.
825 except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids)
826 dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids)
827 else: # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode
828 # In standard delete mode:
829 # `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST.
830 # We delete those that are NOT on SRC.
831 # `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`.
832 # In dummy source + "except" (keep) mode:
833 # `all_except` was True.
834 # `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete.
835 # `src_snaps_with_guids` is empty.
836 # `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`.
837 dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids)
838 dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete)
839 separator: str = "#" if p.delete_dst_bookmarks else "@"
840 dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete)
841 if p.delete_dst_bookmarks:
842 delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
843 else:
844 delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
845 with self.stats_lock:
846 nonlocal num_snapshots_found
847 num_snapshots_found += num_dst_snaps_with_guids
848 nonlocal num_snapshots_deleted
849 num_snapshots_deleted += len(dst_tags_to_delete)
850 if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks:
851 self.dst_properties[dst_dataset].snapshots_changed = 0 # invalidate cache
852 return True
854 # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec
855 failed: bool = False
856 if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks:
857 start_time_nanos = time.monotonic_ns()
858 failed = process_datasets_in_parallel_and_fault_tolerant(
859 log=log,
860 datasets=dst_datasets,
861 process_dataset=delete_destination_snapshots, # lambda
862 skip_tree_on_error=lambda dataset: False,
863 skip_on_error=p.skip_on_error,
864 max_workers=max_workers,
865 enable_barriers=False,
866 task_name="--delete-dst-snapshots",
867 append_exception=self.append_exception,
868 retry_policy=p.retry_policy,
869 dry_run=p.dry_run,
870 is_test_mode=self.is_test_mode,
871 )
872 elapsed_nanos = time.monotonic_ns() - start_time_nanos
873 log.info(
874 p.dry("--delete-dst-snapshots: %s"),
875 task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s "
876 f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]",
877 )
878 return failed
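# --- Illustrative sketch (editor's addition, not part of bzfs.py): stripped of I/O, bookmark
# handling and snapshot filtering, the core decision inside delete_destination_snapshots() is set
# arithmetic over snapshot GUIDs. Standard mode deletes the policy-selected dst snapshots whose
# GUID is absent on src; "except" (keep) mode with a non-dummy source deletes everything on dst
# except snapshots that are both policy-selected and present on src. Minimal restatement:
def _example_guids_to_delete(
    all_dst_guids: set[str], selected_dst_guids: set[str], src_guids: set[str], keep_mode: bool
) -> set[str]:
    if keep_mode:  # --delete-dst-snapshots-except with a non-dummy source
        return all_dst_guids - (selected_dst_guids & src_guids)
    return selected_dst_guids - src_guids  # standard delete mode

# _example_guids_to_delete({"g1", "g2", "g3"}, {"g1", "g2"}, {"g1"}, keep_mode=True) == {"g2", "g3"}
# _example_guids_to_delete({"g1", "g2", "g3"}, {"g1", "g2"}, {"g1"}, keep_mode=False) == {"g2"}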
880 def delete_dst_datasets_task(
881 self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
882 ) -> tuple[list[str], list[str]]:
883 """Deletes existing destination datasets that do not exist within the source dataset if they are included via
884 --{include|exclude}-dataset* policy; implements --delete-dst-datasets.
886 Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors.
887 """
888 p = self.params
889 src, dst = p.src, p.dst
890 children: dict[str, set[str]] = defaultdict(set)
891 for dst_dataset in basis_dst_datasets: # Compute the direct children of each NON-FILTERED dataset
892 parent: str = os.path.dirname(dst_dataset)
893 children[parent].add(dst_dataset)
894 to_delete: set[str] = set()
895 for dst_dataset in reversed(sorted_dst_datasets):
896 if children[dst_dataset].issubset(to_delete):
897 to_delete.add(dst_dataset) # all children are deletable, thus the dataset itself is deletable too
898 to_delete = to_delete.difference(
899 {replace_prefix(src_dataset, src.root_dataset, dst.root_dataset) for src_dataset in basis_src_datasets}
900 )
901 delete_datasets(self, dst, to_delete)
902 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(to_delete))
903 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(to_delete))
904 return basis_dst_datasets, sorted_dst_datasets
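# --- Illustrative sketch (editor's addition, not part of bzfs.py): delete_dst_datasets_task()
# above (and, similarly, the orphan detection in delete_empty_dst_datasets_task() below) walks the
# dataset list as a tree in reverse-sorted, i.e. bottom-up, order: a dataset becomes deletable only
# once all of its direct children are deletable, and children are computed over the NON-FILTERED
# list so that ancestors of non-selected datasets always survive. Standalone restatement with
# hypothetical dataset names in the usage notes:
def _example_bottom_up_deletable(all_datasets: list[str], selected_datasets: list[str]) -> set[str]:
    children: dict[str, set[str]] = defaultdict(set)
    for ds in all_datasets:  # direct children over ALL datasets, not just the selected ones
        children[os.path.dirname(ds)].add(ds)
    deletable: set[str] = set()
    for ds in reversed(sorted(selected_datasets)):  # visit children before their parents
        if children[ds].issubset(deletable):
            deletable.add(ds)
    return deletable

# _example_bottom_up_deletable(["a", "a/b", "a/c"], ["a", "a/b", "a/c"]) == {"a", "a/b", "a/c"}
# _example_bottom_up_deletable(["a", "a/b", "a/c"], ["a", "a/b"]) == {"a/b"}  # "a" survives: child "a/c" not selected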
906 def delete_empty_dst_datasets_task(self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str]) -> list[str]:
907 """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset
908 do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets.
910 To do so, we walk the dataset list (conceptually, a tree) depth-first (i.e. sorted descending). If a dst dataset has
911 zero snapshots and zero bookmarks and all its children are already marked as orphans, then it is itself an orphan,
912 and we mark it as such. Walking in a reverse sorted way means that we efficiently check for zero snapshots/bookmarks
913 not just over the direct children but the entire tree. Finally, delete all orphan datasets in an efficient batched
914 way.
915 """
916 p = self.params
917 dst = p.dst
918 delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: bool = (
919 p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst)
920 )
922 # Compute the direct children of each NON-FILTERED dataset. Thus, no non-selected dataset and no ancestor of a
923 # non-selected dataset will ever be added to the "orphan" set. In other words, this treats non-selected dataset
924 # subtrees as if they all had snapshots, so non-selected dataset subtrees and their ancestors are guaranteed
925 # to not get deleted.
926 children: dict[str, set[str]] = defaultdict(set)
927 for dst_dataset in basis_dst_datasets:
928 parent: str = os.path.dirname(dst_dataset)
929 children[parent].add(dst_dataset)
931 # Find and mark orphan datasets, finally delete them in an efficient way. Using two filter runs instead of one
932 # filter run is an optimization. The first run only computes candidate orphans, without incurring I/O, to reduce
933 # the list of datasets for which we list snapshots via 'zfs list -t snapshot ...' from dst_datasets to a subset
934 # of dst_datasets, which in turn reduces I/O and improves perf. Essentially, this eliminates the I/O to list
935 # snapshots for ancestors of excluded datasets. The second run computes the real orphans.
936 btype: str = "bookmark,snapshot" if delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots else "snapshot"
937 dst_datasets_having_snapshots: set[str] = set()
938 for run in range(2):
939 orphans: set[str] = set()
940 for dst_dataset in reversed(sorted_dst_datasets):
941 if children[dst_dataset].issubset(orphans):
942 # all children turned out to be orphans, thus the dataset itself could be an orphan
943 if dst_dataset not in dst_datasets_having_snapshots: # always True during first filter run
944 orphans.add(dst_dataset)
945 if run == 0:
946 # find datasets with >= 1 snapshot; update dst_datasets_having_snapshots for real use in the 2nd run
947 cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name")
948 for datasets_having_snapshots_lst in zfs_list_snapshots_in_parallel(
949 self, dst, cmd, sorted(orphans), ordered=False
950 ):
951 if delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots:
952 replace_in_lines(datasets_having_snapshots_lst, old="#", new="@", count=1) # treat bookmarks as snap
953 datasets_having_snapshots = set(cut(field=1, separator="@", lines=datasets_having_snapshots_lst))
954 dst_datasets_having_snapshots.update(datasets_having_snapshots) # union
955 else:
956 delete_datasets(self, dst, orphans)
957 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(orphans))
958 return sorted_dst_datasets
960 def monitor_snapshots_task(
961 self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str
962 ) -> None:
963 """Monitors src and dst snapshots; implements --monitor-snapshots."""
964 p, log = self.params, self.params.log
965 src, dst = p.src, p.dst
966 num_cache_hits: int = self.num_cache_hits
967 num_cache_misses: int = self.num_cache_misses
968 start_time_nanos: int = time.monotonic_ns()
969 run_in_parallel(
970 lambda: self.monitor_snapshots(dst, sorted_dst_datasets),
971 lambda: self.monitor_snapshots(src, sorted_src_datasets),
972 )
973 elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
974 if num_cache_hits != self.num_cache_hits or num_cache_misses != self.num_cache_misses:
975 total: int = self.num_cache_hits + self.num_cache_misses
976 msg = f", cache hits: {percent(self.num_cache_hits, total)}, misses: {percent(self.num_cache_misses, total)}"
977 else:
978 msg = ""
979 log.info(
980 "--monitor-snapshots done: %s",
981 f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]",
982 )
984 def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> None:
985 """Checks snapshot freshness and warns or errors out when limits are exceeded.
987 Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name
988    pattern within the selected datasets is too old with respect to the specified age limit. The purpose is to check if snapshots
989 are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process
990 exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.
991 """
992 p, log = self.params, self.params.log
993 alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts
994 labels: list[SnapshotLabel] = [alert.label for alert in alerts]
995 current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000
996 is_debug: bool = log.isEnabledFor(LOG_DEBUG)
997 if is_caching_snapshots(p, remote):
998 props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties
999 snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()}
1000 hash_code: str = hashlib.sha256(str(tuple(alerts)).encode("utf-8")).hexdigest()
1001 is_caching: bool = False
1003 def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str:
1004 cache_label = SnapshotLabel(os_path_join("===", alert_cfg.kind, str(label), hash_code), "", "", "")
1005 return self.cache.last_modified_cache_file(r, dataset, cache_label)
1007 def alert_msg(
1008 kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int
1009 ) -> str:
1010 assert kind == "Latest" or kind == "Oldest"
1011 lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}"
1012 if snapshot_age_millis >= current_unixtime_millis:
1013 return f"No snapshot exists for {dataset}@{lbl}"
1014 msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old"
1015 s = f" ({snapshot})" if snapshot else ""
1016 if delta_millis == -1:
1017 return f"{msg}{s}"
1018 return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}"
1020 def check_alert(
1021 label: SnapshotLabel,
1022 alert_cfg: AlertConfig | None,
1023 creation_unixtime_secs: int,
1024 dataset: str,
1025 snapshot: str,
1026 ) -> None:
1027 if alert_cfg is None:
1028 return
1029 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot'
1030 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
1031 cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg)
1032 set_last_modification_time_safe(cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed))
1033 warning_millis: int = alert_cfg.warning_millis
1034 critical_millis: int = alert_cfg.critical_millis
1035 alert_kind = alert_cfg.kind
1036 snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000
1037 m = "--monitor_snapshots: "
1038 if snapshot_age_millis > critical_millis:
1039 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis)
1040 log.critical("%s", msg)
1041 if not p.monitor_snapshots_config.dont_crit:
1042 die(msg, exit_code=CRITICAL_STATUS)
1043 elif snapshot_age_millis > warning_millis:
1044 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis)
1045 log.warning("%s", msg)
1046 if not p.monitor_snapshots_config.dont_warn:
1047 die(msg, exit_code=WARNING_STATUS)
1048 elif is_debug:
1049 msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1)
1050 log.debug("%s", msg)
1052 def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
1053 alert: MonitorSnapshotAlert = alerts[i]
1054 check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot)
1056 def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
1057 alert: MonitorSnapshotAlert = alerts[i]
1058 check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot)
1060 def find_stale_datasets_and_check_alerts() -> list[str]:
1061 """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply,
1062 that is, without incurring 'zfs list -t snapshots'.
1064 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
1065 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
1066 """
1067 stale_datasets: list[str] = []
1068 time_threshold: float = time.time() - TIME_THRESHOLD_SECS
1069 for dataset in sorted_datasets:
1070 is_stale_dataset: bool = False
1071 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
1072 for alert in alerts:
1073 for cfg in (alert.latest, alert.oldest):
1074 if cfg is None:
1075 continue
1076 if (
1077 snapshots_changed != 0
1078 and snapshots_changed < time_threshold
1079 and (
1080 cached_unix_times := self.cache.get_snapshots_changed2(
1081 monitor_last_modified_cache_file(remote, dataset, alert.label, cfg)
1082 )
1083 )
1084 and snapshots_changed == cached_unix_times[1] # cached snapshots_changed aka last modified time
1085 and snapshots_changed >= cached_unix_times[0] # creation time of minmax snapshot aka access time
1086 ): # cached state is still valid; emit an alert if the latest/oldest snapshot is too old
1087 lbl = alert.label
1088 check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="")
1089                        else:  # cached state is no longer valid; fall back to 'zfs list -t snapshot'
1090 is_stale_dataset = True
1091 if is_stale_dataset:
1092 stale_datasets.append(dataset)
1093 return stale_datasets
1095 # satisfy request from local cache as much as possible
1096 if is_caching_snapshots(p, remote):
1097 stale_datasets: list[str] = find_stale_datasets_and_check_alerts()
1098 with self.stats_lock:
1099 self.num_cache_misses += len(stale_datasets)
1100 self.num_cache_hits += len(sorted_datasets) - len(stale_datasets)
1101 else:
1102 stale_datasets = sorted_datasets
1104 # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
1105 is_caching = is_caching_snapshots(p, remote)
1106 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
1107 remote, stale_datasets, labels, fn_latest=alert_latest_snapshot, fn_oldest=alert_oldest_snapshot
1108 )
1109 for dataset in datasets_without_snapshots:
1110 for i in range(len(alerts)):
1111 alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
1112 alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
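# --- Illustrative sketch (editor's addition, not part of bzfs.py): monitor_snapshots() above and
# the replication cache check in replicate_datasets() below both decide whether a locally cached
# result is still trustworthy with the same basic test on the ZFS 'snapshots_changed' property: it
# must be non-zero (i.e. known/supported), older than the ~1.1s TIME_THRESHOLD_SECS skew window,
# and equal to the value recorded in the local cache (the monitor additionally cross-checks the
# cached creation time). Condensed restatement of that test:
def _example_is_cache_still_valid(snapshots_changed: int, cached_snapshots_changed: int, now_secs: float) -> bool:
    return (
        snapshots_changed != 0
        and snapshots_changed < now_secs - TIME_THRESHOLD_SECS  # not modified within the skew window
        and snapshots_changed == cached_snapshots_changed  # dataset unchanged since the cache entry was written
    )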
1114 def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool:
1115 """Replicates a list of datasets."""
1116 assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted"
1117 p, log = self.params, self.params.log
1118 src, dst = p.src, p.dst
1119 self.num_snapshots_found = 0
1120 self.num_snapshots_replicated = 0
1121 # perf/latency: no need to set up a dedicated TCP connection if no parallel replication is possible
1122 self.dedicated_tcp_connection_per_zfs_send = (
1123 p.dedicated_tcp_connection_per_zfs_send
1124 and max_workers > 1
1125 and has_siblings(src_datasets) # siblings can be replicated in parallel
1126 )
1127 log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]")
1128 start_time_nanos: int = time.monotonic_ns()
1130 def src2dst(src_dataset: str) -> str:
1131 return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset)
1133 def dst2src(dst_dataset: str) -> str:
1134 return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
1136 def find_stale_datasets() -> tuple[list[str], dict[str, str]]:
1137 """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine
1138 which datasets can be skipped cheaply, i.e. without incurring 'zfs list -t snapshots'.
1140 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
1141 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
1142 """
1143 # First, check which src datasets have changed since the last replication to that destination
1144 cache_files: dict[str, str] = {}
1145 stale_src_datasets1: list[str] = []
1146 maybe_stale_dst_datasets: list[str] = []
1147 userhost_dir: str = p.dst.ssh_user_host # cache is only valid for identical destination username+host
1148 userhost_dir = userhost_dir if userhost_dir else "-"
1149 hash_key = tuple(tuple(f) for f in p.snapshot_filters) # cache is only valid for same --include/excl-snapshot*
1150 hash_code: str = hashlib.sha256(str(hash_key).encode("utf-8")).hexdigest()
1151 for src_dataset in src_datasets:
1152 dst_dataset: str = src2dst(src_dataset) # cache is only valid for identical destination dataset
1153 cache_label = SnapshotLabel(os.path.join("==", userhost_dir, dst_dataset, hash_code), "", "", "")
1154 cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label)
1155 cache_files[src_dataset] = cache_file
1156 snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed # get prop "for free"
1157 if (
1158 snapshots_changed != 0
1159 and time.time() > snapshots_changed + TIME_THRESHOLD_SECS
1160 and snapshots_changed == self.cache.get_snapshots_changed(cache_file)
1161 ):
1162 maybe_stale_dst_datasets.append(dst_dataset)
1163 else:
1164 stale_src_datasets1.append(src_dataset)
1166 # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed
1167 stale_src_datasets2: list[str] = []
1168 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets)
1169 for dst_dataset in maybe_stale_dst_datasets:
1170 snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0)
1171 cache_file = self.cache.last_modified_cache_file(dst, dst_dataset)
1172 if (
1173 snapshots_changed != 0
1174 and time.time() > snapshots_changed + TIME_THRESHOLD_SECS
1175 and snapshots_changed == self.cache.get_snapshots_changed(cache_file)
1176 ):
1177 log.info("Already up-to-date [cached]: %s", dst_dataset)
1178 else:
1179 stale_src_datasets2.append(dst2src(dst_dataset))
1180 assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted"
1181 assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted"
1182 stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2)) # merge two sorted lists
1183 assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates"
1184 return stale_src_datasets, cache_files
1186 if is_caching_snapshots(p, src):
1187 stale_src_datasets, cache_files = find_stale_datasets()
1188 num_cache_misses = len(stale_src_datasets)
1189 num_cache_hits = len(src_datasets) - len(stale_src_datasets)
1190 self.num_cache_misses += num_cache_misses
1191 self.num_cache_hits += num_cache_hits
1192 total: int = self.num_cache_hits + self.num_cache_misses
1193 cmsg = f", cache hits: {percent(self.num_cache_hits, total)}, misses: {percent(self.num_cache_misses, total)}"
1194 else:
1195 stale_src_datasets = src_datasets
1196 cache_files = {}
1197 cmsg = ""
1199 # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution
1200 failed: bool = process_datasets_in_parallel_and_fault_tolerant(
1201 log=log,
1202 datasets=stale_src_datasets,
1203 process_dataset=lambda src_dataset, tid, retry: replicate_dataset(self, src_dataset, tid, retry),
1204 skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)],
1205 skip_on_error=p.skip_on_error,
1206 max_workers=max_workers,
1207 enable_barriers=False,
1208 task_name="Replication",
1209 append_exception=self.append_exception,
1210 retry_policy=p.retry_policy,
1211 dry_run=p.dry_run,
1212 is_test_mode=self.is_test_mode,
1213 )
1215 if is_caching_snapshots(p, src) and not failed:
1216 # refresh "snapshots_changed" ZFS dataset property from dst
1217 stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in stale_src_datasets]
1218 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets)
1219 for dst_dataset in stale_dst_datasets: # update local cache
1220 dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0)
1221 dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset)
1222 src_dataset: str = dst2src(dst_dataset)
1223 src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed
1224 if not p.dry_run:
1225 set_last_modification_time_safe(cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed)
1226 set_last_modification_time_safe(dst_cache_file, unixtime_in_secs=dst_snapshots_changed)
1228 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
1229 log.info(
1230 p.dry("Replication done: %s"),
1231 f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots"
1232 f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]",
1233 )
1234 return failed
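find_stale_datasets() above deliberately combines its two already-sorted partial results with heapq.merge() instead of re-sorting. A tiny self-contained demonstration of that idiom (dataset names are invented):

    import heapq

    stale_src_datasets1 = ["tank/home/alice", "tank/var"]  # sorted: src side missed the cache
    stale_src_datasets2 = ["tank/home/bob"]  # sorted: dst side turned out to be stale after all
    merged = list(heapq.merge(stale_src_datasets1, stale_src_datasets2))  # merges the sorted inputs without re-sorting
    print(merged)  # ['tank/home/alice', 'tank/home/bob', 'tank/var']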
1236 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None:
1237 """For testing only; for unit tests to delete datasets during replication and test correct handling of that."""
1238 assert delete_trigger
1239 counter = self.delete_injection_triggers.get("before")
1240 if counter and decrement_injection_counter(self, counter, delete_trigger):
1241 p = self.params
1242 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "")
1243 run_ssh_command(self, remote, LOG_DEBUG, print_stdout=True, cmd=cmd)
1245 def maybe_inject_params(self, error_trigger: str) -> None:
1246 """For testing only; for unit tests to simulate errors during replication and test correct handling of them."""
1247 assert error_trigger
1248 counter = self.error_injection_triggers.get("before")
1249 if counter and decrement_injection_counter(self, counter, error_trigger):
1250 self.inject_params = self.param_injection_triggers[error_trigger]
1251 elif error_trigger in self.param_injection_triggers:
1252 self.inject_params = {}
1254 @staticmethod
1255 def recv_option_property_names(recv_opts: list[str]) -> set[str]:
1256 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for
1257 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name."""
1258 propnames = set()
1259 i = 0
1260 n = len(recv_opts)
1261 while i < n:
1262 stripped: str = recv_opts[i].strip()
1263 if stripped in ("-o", "-x"):
1264 i += 1
1265 if i == n or recv_opts[i].strip() in ("-o", "-x"):
1266 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1267 if stripped == "-o" and "=" not in recv_opts[i]:
1268 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1269 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0]
1270 validate_property_name(propname, "--zfs-recv-program-opt(s)")
1271 propnames.add(propname)
1272 i += 1
1273 return propnames
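A usage sketch of the option scan above (the option strings are invented, and it assumes this static method lives on the Job class described in the module docstring):

    recv_opts = ["-u", "-o", "compression=off", "-x", "mountpoint"]
    recv_option_property_names(recv_opts)  # -> {"compression", "mountpoint"}
    # The caller can compare this set against properties it intends to add, to detect clashes before
    # invoking 'zfs receive', which rejects repeated -o/-x options for the same property name.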
1275 def root_datasets_if_recursive_zfs_snapshot_is_possible(
1276 self, datasets: list[str], basis_datasets: list[str]
1277 ) -> list[str] | None:
1278 """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset
1279 within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in
1280 `datasets`. Returns `None` if any (unfiltered) dataset in `basis_datasets` that is a descendant of at least one of the
1281 root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have
1282 pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the
1283 non-recursive 'zfs snapshot snapshot1 .. snapshot N' CLI flavor.
1285 Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both
1286 sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time
1287 complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case.
1288 See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative
1289 impl that's easier to grok.
1290 """
1291 datasets_set: set[str] = set(datasets)
1292 root_datasets: list[str] = self.find_root_datasets(datasets)
1293 len_root_datasets = len(root_datasets)
1294 len_basis_datasets = len(basis_datasets)
1295 i, j = 0, 0
1296 while i < len_root_datasets and j < len_basis_datasets: # walk and "merge" both sorted lists, in sync
1297 if basis_datasets[j] < root_datasets[i]: # irrelevant subtree?
1298 j += 1 # move to the next basis dataset
1299 elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]): # relevant subtree?
1300 if basis_datasets[j] not in datasets_set: # was dataset chopped off by schedule or --incl/exclude-dataset*?
1301 return None # detected filter pruning that is incompatible with 'zfs snapshot -r'
1302 j += 1 # move to the next basis dataset
1303 else:
1304 i += 1 # move to the next root dataset; no need to check root_datasets that are no longer (or not yet) reachable
1305 return root_datasets
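For orientation, here is an independent, deliberately naive O(N*M) restatement of the contract above. It is a sketch only, not the slow-but-correct helper referenced in the docstring; the function name and the example datasets are invented:

    from __future__ import annotations

    def naive_root_datasets_if_recursive_snapshot_is_possible(
        datasets: list[str], basis_datasets: list[str]
    ) -> list[str] | None:
        """Returns the roots of `datasets`, or None if some basis dataset underneath a root was pruned out of `datasets`."""
        datasets_set = set(datasets)
        roots = [d for d in datasets if not any(d != r and (d + "/").startswith(r + "/") for r in datasets)]
        for basis in basis_datasets:
            if basis not in datasets_set and any((basis + "/").startswith(root + "/") for root in roots):
                return None  # pruning detected that is incompatible with 'zfs snapshot -r' semantics
        return roots

    print(naive_root_datasets_if_recursive_snapshot_is_possible(["tank/a", "tank/a/b"], ["tank/a", "tank/a/b"]))  # ['tank/a']
    print(naive_root_datasets_if_recursive_snapshot_is_possible(["tank/a"], ["tank/a", "tank/a/b"]))  # None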
1307 @staticmethod
1308 def find_root_datasets(sorted_datasets: list[str]) -> list[str]:
1309 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A
1310 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets."""
1311 root_datasets: list[str] = []
1312 skip_dataset: str = DONT_SKIP_DATASET
1313 for dataset in sorted_datasets:
1314 if is_descendant(dataset, of_root_dataset=skip_dataset):
1315 continue
1316 skip_dataset = dataset
1317 root_datasets.append(dataset)
1318 return root_datasets
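Example behavior of the linear scan above (dataset names invented; it relies on the input being sorted so that every descendant immediately follows its root, and assumes these methods live on the Job class described in the module docstring):

    find_root_datasets(["tank/a", "tank/a/b", "tank/c"])  # -> ['tank/a', 'tank/c']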
1320 def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]:
1321 """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g.
1322 bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be
1323 created with that name because these datasets are due per the schedule: either the 'creation' time of their
1324 most recent snapshot with that name pattern is now too old, or no such snapshot exists yet.
1326 The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple
1327 but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An
1328 alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property
1329 (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of
1330 the most recent snapshot, for each SnapshotLabel and each dataset.
1331 """
1332 p, log = self.params, self.params.log
1333 src = p.src
1334 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
1335 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
1336 is_caching: bool = False
1337 msgs: list[tuple[datetime, str, SnapshotLabel, str]] = []
1339 def create_snapshot_if_latest_is_too_old(
1340 datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int
1341 ) -> None:
1342 """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old."""
1343 creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz)
1344 log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label)
1345 duration_amount, duration_unit = config.suffix_durations[label.suffix]
1346 next_event_dt: datetime = round_datetime_up_to_duration_multiple(
1347 creation_dt + timedelta(microseconds=1), duration_amount, duration_unit, config.anchors
1348 )
1349 msg: str = ""
1350 if config.current_datetime >= next_event_dt:
1351 datasets_to_snapshot[label].append(dataset) # mark it as scheduled for snapshot creation
1352 msg = " has passed"
1353 msgs.append((next_event_dt, dataset, label, msg))
1354 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot'
1355 cache_file: str = self.cache.last_modified_cache_file(src, dataset, label)
1356 set_last_modification_time_safe(cache_file, unixtime_in_secs=creation_unixtime, if_more_recent=True)
1358 labels: list[SnapshotLabel] = []
1359 config_labels: list[SnapshotLabel] = config.snapshot_labels()
1360 for label in config_labels:
1361 duration_amount_, duration_unit_ = config.suffix_durations[label.suffix]
1362 if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due:
1363 datasets_to_snapshot[label] = sorted_datasets # take snapshot regardless of creation time of existing snaps
1364 else:
1365 labels.append(label)
1366 if len(labels) == 0:
1367 return datasets_to_snapshot # nothing more TBD
1369 # satisfy request from local cache as much as possible
1370 cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
1371 if is_caching_snapshots(p, src):
1372 sorted_datasets_todo: list[str] = []
1373 for dataset in sorted_datasets:
1374 cache: SnapshotCache = self.cache
1375 cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset))
1376 if cached_snapshots_changed == 0:
1377 sorted_datasets_todo.append(dataset) # request cannot be answered from cache
1378 continue
1379 if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed: # get that prop "for free"
1380 cache.invalidate_last_modified_cache_dataset(dataset)
1381 sorted_datasets_todo.append(dataset) # request cannot be answered from cache
1382 continue
1383 creation_unixtimes: list[int] = []
1384 for label in labels:
1385 creation_unixtime: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset, label))
1386 if creation_unixtime == 0:
1387 sorted_datasets_todo.append(dataset) # request cannot be answered from cache
1388 break
1389 creation_unixtimes.append(creation_unixtime)
1390 if len(creation_unixtimes) == len(labels):
1391 for j, label in enumerate(labels):
1392 create_snapshot_if_latest_is_too_old(
1393 cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j]
1394 )
1395 sorted_datasets = sorted_datasets_todo
1397 def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
1398 create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs)
1400 def on_finish_dataset(dataset: str) -> None:
1401 if is_caching and not p.dry_run:
1402 set_last_modification_time_safe(
1403 self.cache.last_modified_cache_file(src, dataset),
1404 unixtime_in_secs=self.src_properties[dataset].snapshots_changed,
1405 if_more_recent=True,
1406 )
1408 # fall back to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from the local cache
1409 is_caching = is_caching_snapshots(p, src)
1410 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
1411 src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset
1412 )
1413 for lbl in labels: # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result
1414 datasets_to_snapshot[lbl].sort()
1415 if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot): # +take snaps for snapshot-less datasets
1416 datasets_to_snapshot[lbl] = list( # inputs to merge() are sorted, and outputs are sorted too
1417 heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots)
1418 )
1419 msgs.sort()
1420 prefx = "Next scheduled snapshot time: "
1421 text = "\n".join(f"{prefx}{next_event_dt} for {dataset}@{label}{msg}" for next_event_dt, dataset, label, msg in msgs)
1422 if len(text) > 0:
1423 log.info("Next scheduled snapshot times ...\n%s", text)
1424 # sort to ensure that we take snapshots for dailies before hourlies, and so on
1425 label_indexes: dict[SnapshotLabel, int] = {label: k for k, label in enumerate(config_labels)}
1426 datasets_to_snapshot = dict(sorted(datasets_to_snapshot.items(), key=lambda kv: label_indexes[kv[0]]))
1427 return datasets_to_snapshot
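The "is a snapshot due?" decision above delegates to round_datetime_up_to_duration_multiple(), which is defined elsewhere in the package. The following simplified sketch hard-codes an hourly period and ignores anchors, purely to illustrate the round-up idea; next_hourly_event is an invented stand-in, not the real helper:

    from datetime import datetime, timedelta, timezone

    def next_hourly_event(creation: datetime) -> datetime:
        """Round the latest snapshot's creation time up to the next full hour; strictly later than `creation`,
        mirroring the `creation_dt + timedelta(microseconds=1)` trick above."""
        return creation.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

    creation = datetime(2024, 11, 6, 8, 30, 5, tzinfo=timezone.utc)  # latest existing '..._hourly' snapshot
    current_datetime = datetime(2024, 11, 6, 9, 0, 1, tzinfo=timezone.utc)
    print(current_datetime >= next_hourly_event(creation))  # True -> schedule a new hourly snapshot for this dataset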
1429 def handle_minmax_snapshots(
1430 self,
1431 remote: Remote,
1432 sorted_datasets: list[str],
1433 labels: list[SnapshotLabel],
1434 fn_latest: Callable[[int, int, str, str], None], # callback function for latest snapshot
1435 fn_oldest: Callable[[int, int, str, str], None] | None = None, # callback function for oldest snapshot
1436 fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None,
1437 ) -> list[str]: # thread-safe
1438 """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs
1439 the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names."""
1440 assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
1441 p = self.params
1442 cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o createtxg,creation,name") # sort dataset,createtxg
1443 datasets_with_snapshots: set[str] = set()
1444 interner: SortedInterner[str] = SortedInterner(sorted_datasets) # reduces memory footprint
1445 for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False):
1446 # streaming group by dataset name (consumes constant memory only)
1447 for dataset, group in itertools.groupby(lines, key=lambda line: line[line.rindex("\t") + 1 : line.index("@")]):
1448 dataset = interner.interned(dataset)
1449 snapshots = sorted( # fetch all snapshots of current dataset and sort by createtxg,creation,name
1450 (int(createtxg), int(creation_unixtime_secs), name[name.index("@") + 1 :])
1451 for createtxg, creation_unixtime_secs, name in (line.split("\t", 2) for line in group)
1452 )
1453 assert len(snapshots) > 0
1454 datasets_with_snapshots.add(dataset)
1455 snapshot_names: list[str] = [snapshot[-1] for snapshot in snapshots]
1456 year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX
1457 fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False))
1458 for i, label in enumerate(labels):
1459 infix: str = label.infix
1460 start: str = label.prefix + infix
1461 end: str = label.suffix
1462 startlen: int = len(start)
1463 endlen: int = len(end)
1464 minlen: int = startlen + endlen if infix else 4 + startlen + endlen # year_with_four_digits_regex
1465 year_slice: slice = slice(startlen, startlen + 4) # [startlen:startlen+4] # year_with_four_digits_regex
1466 for fn, is_reverse in fns:
1467 creation_unixtime_secs: int = 0 # find creation time of latest or oldest snapshot matching the label
1468 minmax_snapshot: str = ""
1469 for j, s in enumerate(reversed(snapshot_names) if is_reverse else snapshot_names):
1470 if (
1471 s.endswith(end)
1472 and s.startswith(start)
1473 and len(s) >= minlen
1474 and (infix or year_with_4_digits_regex.fullmatch(s[year_slice]))
1475 ):
1476 k: int = len(snapshots) - j - 1 if is_reverse else j
1477 creation_unixtime_secs = snapshots[k][1]
1478 minmax_snapshot = s
1479 break
1480 fn(i, creation_unixtime_secs, dataset, minmax_snapshot)
1481 fn_on_finish_dataset(dataset)
1482 datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots]
1483 return datasets_without_snapshots
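The loop above streams the 'zfs list' output and groups it by dataset with itertools.groupby, so only one dataset's snapshots are held in memory at a time. A self-contained sketch of that grouping, with fabricated lines in the same createtxg<TAB>creation<TAB>name shape:

    import itertools

    lines = [  # fabricated 'zfs list -t snapshot -d 1 -Hp -o createtxg,creation,name' output, pre-grouped by dataset
        "100\t1730881805\ttank/a@bzfs_2024-11-06_08:30:05_hourly",
        "101\t1730885405\ttank/a@bzfs_2024-11-06_09:30:05_hourly",
        "90\t1730881805\ttank/b@bzfs_2024-11-06_08:30:05_daily",
    ]
    for dataset, group in itertools.groupby(lines, key=lambda line: line[line.rindex("\t") + 1 : line.index("@")]):
        snapshots = sorted(  # this dataset's snapshots, ordered by (createtxg, creation, snapshot name)
            (int(createtxg), int(creation), name[name.index("@") + 1 :])
            for createtxg, creation, name in (line.split("\t", 2) for line in group)
        )
        print(dataset, [s[-1] for s in snapshots])
    # tank/a ['bzfs_2024-11-06_08:30:05_hourly', 'bzfs_2024-11-06_09:30:05_hourly']
    # tank/b ['bzfs_2024-11-06_08:30:05_daily']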
1486#############################################################################
1487class DatasetProperties:
1488 """Properties of a ZFS dataset."""
1490 __slots__ = ("recordsize", "snapshots_changed") # uses more compact memory layout than __dict__
1492 def __init__(self, recordsize: int, snapshots_changed: int) -> None:
1493 # immutable variables:
1494 self.recordsize: int = recordsize
1496 # mutable variables:
1497 self.snapshots_changed: int = snapshots_changed
1500#############################################################################
1501def fix_solaris_raw_mode(lst: list[str]) -> list[str]:
1502 """Adjusts zfs send options for Solaris compatibility."""
1503 lst = ["-w" if opt == "--raw" else opt for opt in lst]
1504 lst = ["compress" if opt == "--compressed" else opt for opt in lst]
1505 i = lst.index("-w") if "-w" in lst else -1
1506 if i >= 0:
1507 i += 1
1508 if i == len(lst) or (lst[i] != "none" and lst[i] != "compress" and lst[i] != "crypto"):
1509 lst.insert(i, "none")
1510 return lst
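For example, the mapping above yields (illustrative, assuming the surrounding module context):

    fix_solaris_raw_mode(["--raw", "--compressed"])  # -> ['-w', 'compress']
    fix_solaris_raw_mode(["--raw"])                  # -> ['-w', 'none']  (a 'none' argument is inserted after a bare -w)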
1513def has_siblings(sorted_datasets: list[str]) -> bool:
1514 """Returns whether the (sorted) list of input datasets contains any siblings."""
1515 skip_dataset: str = DONT_SKIP_DATASET
1516 parents: set[str] = set()
1517 for dataset in sorted_datasets:
1518 assert dataset
1519 parent = os.path.dirname(dataset)
1520 if parent in parents:
1521 return True # I have a sibling if my parent already has another child
1522 parents.add(parent)
1523 if is_descendant(dataset, of_root_dataset=skip_dataset):
1524 continue
1525 if skip_dataset != DONT_SKIP_DATASET:
1526 return True # I have a sibling if I am a root dataset and another root dataset already exists
1527 skip_dataset = dataset
1528 return False
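Illustrative results of the sibling check above (dataset names invented):

    has_siblings(["tank", "tank/a", "tank/a/b"])  # -> False: a single chain; no parent has two children
    has_siblings(["tank/a", "tank/b"])            # -> True: 'tank' has two children, so subtrees can replicate in parallel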
1531def parse_dataset_locator(
1532 input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None
1533) -> tuple[str, str, str, str, str]:
1534 """Splits user@host:pool/dataset into its components with optional checks."""
1536 def convert_ipv6(hostname: str) -> str: # support IPv6 without getting confused by host:dataset colon separator ...
1537 return hostname.replace("|", ":") # ... and any colons that may be part of a (valid) ZFS dataset name
1539 user_undefined: bool = user is None
1540 if user is None:
1541 user = ""
1542 host_undefined: bool = host is None
1543 if host is None:
1544 host = ""
1545 host = convert_ipv6(host)
1546 user_host, dataset, pool = "", "", ""
1548 # Input format is [[user@]host:]dataset
1549 # regex group numbers: 4 = user, 5 = host, 6 = dataset
1550 if match := re.fullmatch(r"(((([^@]*)@)?([^:]+)):)?(.*)", input_text, re.DOTALL): 1550 ↛ 1567 (line 1550 didn't jump to line 1567 because the condition on line 1550 was always true)
1551 if user_undefined:
1552 user = match.group(4) or ""
1553 if host_undefined:
1554 host = match.group(5) or ""
1555 host = convert_ipv6(host)
1556 if host == "-":
1557 host = ""
1558 dataset = match.group(6) or ""
1559 i = dataset.find("/")
1560 pool = dataset[0:i] if i >= 0 else dataset
1562 if user and host:
1563 user_host = f"{user}@{host}"
1564 elif host:
1565 user_host = host
1567 if validate:
1568 validate_user_name(user, input_text)
1569 validate_host_name(host, input_text)
1570 if port is not None:
1571 validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ")
1572 validate_dataset_name(dataset, input_text)
1574 return user, host, user_host, pool, dataset
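For example (validation skipped so the sketch stands on its own):

    parse_dataset_locator("alice@host1:tank/data", validate=False)
    # -> ('alice', 'host1', 'alice@host1', 'tank', 'tank/data')
    # IPv6 addresses use '|' in place of ':' on the command line; e.g. 'root@fd00|db8||1:tank' yields host 'fd00:db8::1'.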
1577def validate_user_name(user: str, input_text: str) -> None:
1578 """Checks that the username is safe for ssh or local usage."""
1579 invalid_chars: str = SHELL_CHARS + "/"
1580 if user and (".." in user or any(c.isspace() or c in invalid_chars for c in user)):
1581 die(f"Invalid user name: '{user}' for: '{input_text}'")
1584def validate_host_name(host: str, input_text: str, extra_invalid_chars: str = "") -> None:
1585 """Checks hostname for forbidden characters or patterns."""
1586 invalid_chars: str = SHELL_CHARS + "/" + extra_invalid_chars
1587 if host and (host.startswith("-") or ".." in host or any(c.isspace() or c in invalid_chars for c in host)):
1588 die(f"Invalid host name: '{host}' for: '{input_text}'")
1591def validate_port(port: str | int, message: str) -> None:
1592 """Checks that port specification is a valid integer."""
1593 if isinstance(port, int):
1594 port = str(port)
1595 if port and not port.isdigit():
1596 die(message + f"must be empty or a positive integer: '{port}'")
1599#############################################################################
1600 if __name__ == "__main__": 1600 ↛ 1601 (line 1600 didn't jump to line 1601 because the condition on line 1600 was never true)
1601 main()