Coverage for bzfs_main/bzfs.py: 99%
990 statements
coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata
16# /// script
17# requires-python = ">=3.9"
18# dependencies = []
19# ///
20#
21"""
22* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`,
23 data transfer, and snapshot management between two hosts.
24* Overview of the bzfs.py codebase:
25* The codebase starts with docs, the definition of the input data, and the associated argument parsing into a "Params" class.
26* All CLI option/parameter values are reachable from the "Params" class.
27* Control flow starts in main(), which kicks off a "Job".
28* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree.
29* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset().
30* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots().
31* The --create-src-snapshots-* and --delete-* and --compare-* and --monitor-* algorithms also start in run_task().
32* The main retry logic is in run_with_retries() and clear_resumable_recv_state_if_necessary().
33* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter.
34* Executing a CLI command on a local or remote host is in run_ssh_command().
35* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool.
36* Cache functionality can be found by searching for this regex: .*cach.*
37* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant().
38* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh.
39 Simply run that script whenever you change or add ArgumentParser help text.
40"""
42from __future__ import (
43 annotations,
44)
45import argparse
46import contextlib
47import fcntl
48import heapq
49import itertools
50import os
51import re
52import subprocess
53import sys
54import threading
55import time
56from collections import (
57 Counter,
58 defaultdict,
59)
60from collections.abc import (
61 Collection,
62)
63from datetime import (
64 datetime,
65 timedelta,
66)
67from logging import (
68 Logger,
69)
70from pathlib import (
71 Path,
72)
73from subprocess import (
74 CalledProcessError,
75)
76from typing import (
77 Any,
78 Callable,
79 Final,
80 cast,
81)
83import bzfs_main.loggers
84from bzfs_main.argparse_actions import (
85 has_timerange_filter,
86)
87from bzfs_main.argparse_cli import (
88 EXCLUDE_DATASET_REGEXES_DEFAULT,
89)
90from bzfs_main.compare_snapshot_lists import (
91 run_compare_snapshot_lists,
92)
93from bzfs_main.configuration import (
94 AlertConfig,
95 CreateSrcSnapshotConfig,
96 LogParams,
97 MonitorSnapshotAlert,
98 Params,
99 Remote,
100 SnapshotLabel,
101)
102from bzfs_main.connection import (
103 decrement_injection_counter,
104 maybe_inject_error,
105 run_ssh_command,
106 timeout,
107 try_ssh_command,
108)
109from bzfs_main.detect import (
110 DISABLE_PRG,
111 RemoteConfCacheItem,
112 are_bookmarks_enabled,
113 detect_available_programs,
114 is_caching_snapshots,
115 is_dummy,
116 is_zpool_feature_enabled_or_active,
117)
118from bzfs_main.filter import (
119 SNAPSHOT_REGEX_FILTER_NAME,
120 dataset_regexes,
121 filter_datasets,
122 filter_lines,
123 filter_lines_except,
124 filter_snapshots,
125)
126from bzfs_main.loggers import (
127 get_simple_logger,
128 reset_logger,
129 set_logging_runtime_defaults,
130)
131from bzfs_main.parallel_batch_cmd import (
132 run_ssh_cmd_parallel,
133 zfs_list_snapshots_in_parallel,
134)
135from bzfs_main.parallel_iterator import (
136 run_in_parallel,
137)
138from bzfs_main.parallel_tasktree_policy import (
139 process_datasets_in_parallel_and_fault_tolerant,
140)
141from bzfs_main.progress_reporter import (
142 ProgressReporter,
143 count_num_bytes_transferred_by_zfs_send,
144)
145from bzfs_main.replication import (
146 delete_bookmarks,
147 delete_datasets,
148 delete_snapshots,
149 replicate_dataset,
150)
151from bzfs_main.retry import (
152 Retry,
153 RetryableError,
154)
155from bzfs_main.snapshot_cache import (
156 MONITOR_CACHE_FILE_PREFIX,
157 REPLICATION_CACHE_FILE_PREFIX,
158 SnapshotCache,
159 set_last_modification_time_safe,
160)
161from bzfs_main.utils import (
162 DESCENDANTS_RE_SUFFIX,
163 DIE_STATUS,
164 DONT_SKIP_DATASET,
165 FILE_PERMISSIONS,
166 LOG_DEBUG,
167 LOG_TRACE,
168 PROG_NAME,
169 SHELL_CHARS,
170 UMASK,
171 YEAR_WITH_FOUR_DIGITS_REGEX,
172 Interner,
173 SortedInterner,
174 Subprocesses,
175 SynchronizedBool,
176 SynchronizedDict,
177 append_if_absent,
178 compile_regexes,
179 cut,
180 die,
181 has_duplicates,
182 human_readable_bytes,
183 human_readable_duration,
184 is_descendant,
185 percent,
186 pretty_print_formatter,
187 replace_in_lines,
188 replace_prefix,
189 sha256_85_urlsafe_base64,
190 sha256_128_urlsafe_base64,
191 termination_signal_handler,
192 validate_dataset_name,
193 validate_property_name,
194 xappend,
195 xfinally,
196)
198# constants:
199__version__: Final[str] = bzfs_main.argparse_cli.__version__
200CRITICAL_STATUS: Final[int] = 2
201WARNING_STATUS: Final[int] = 1
202STILL_RUNNING_STATUS: Final[int] = 4
203MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 9)
204if sys.version_info < MIN_PYTHON_VERSION:
205 print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!")
206 sys.exit(DIE_STATUS)
207CREATE_SRC_SNAPSHOTS_PREFIX_DFLT: Final[str] = PROG_NAME + "_"
208CREATE_SRC_SNAPSHOTS_SUFFIX_DFLT: Final[str] = "_adhoc"
209MATURITY_TIME_THRESHOLD_SECS: Final[float] = 1.1 # 1 sec ZFS creation time resolution + NTP clock skew is typically < 10ms
212#############################################################################
213def argument_parser() -> argparse.ArgumentParser:
214 """Returns the CLI parser used by bzfs."""
215 return bzfs_main.argparse_cli.argument_parser()
218def main() -> None:
219 """API for command line clients."""
220 prev_umask: int = os.umask(UMASK)
221 try:
222 set_logging_runtime_defaults()
223 # On CTRL-C and SIGTERM, send signal to all descendant processes to terminate them
224 termination_event: threading.Event = threading.Event()
225 with termination_signal_handler(termination_event=termination_event):
226 run_main(argument_parser().parse_args(), sys.argv, termination_event=termination_event)
227 except subprocess.CalledProcessError as e:
228 ret: int = e.returncode
229 ret = DIE_STATUS if isinstance(ret, int) and 1 <= ret <= STILL_RUNNING_STATUS else ret
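# Return codes 1..STILL_RUNNING_STATUS are coerced to the generic DIE_STATUS so they cannot be mistaken for the
# WARNING_STATUS (1), CRITICAL_STATUS (2) or STILL_RUNNING_STATUS (4) codes that this tool reserves for its own
# semantics; any other return code is passed through unchanged.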
230 sys.exit(ret)
231 finally:
232 os.umask(prev_umask) # restore prior global state
235def run_main(
236 args: argparse.Namespace,
237 sys_argv: list[str] | None = None,
238 log: Logger | None = None,
239 termination_event: threading.Event | None = None,
240) -> None:
241 """API for Python clients; visible for testing; may become a public API eventually."""
242 Job(termination_event=termination_event).run_main(args, sys_argv, log)
245#############################################################################
246class Job:
247 """Executes one bzfs run, coordinating snapshot replication tasks."""
249 def __init__(self, termination_event: threading.Event | None = None) -> None:
250 self.params: Params
251 self.termination_event: Final[threading.Event] = termination_event or threading.Event()
252 self.subprocesses: Final[Subprocesses] = Subprocesses()
253 self.all_dst_dataset_exists: Final[dict[str, dict[str, bool]]] = defaultdict(lambda: defaultdict(bool))
254 self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({})
255 self.src_properties: dict[str, DatasetProperties] = {}
256 self.dst_properties: dict[str, DatasetProperties] = {}
257 self.all_exceptions: list[str] = []
258 self.all_exceptions_count: int = 0
259 self.max_exceptions_to_summarize: int = 10000
260 self.first_exception: BaseException | None = None
261 self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {}
262 self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {}
263 self.max_workers: dict[str, int] = {}
264 self.control_persist_margin_secs: int = 2
265 self.progress_reporter: ProgressReporter = cast(ProgressReporter, None)
266 self.is_first_replication_task: Final[SynchronizedBool] = SynchronizedBool(True)
267 self.replication_start_time_nanos: int = time.monotonic_ns()
268 self.timeout_nanos: int | None = None
269 self.cache: SnapshotCache = SnapshotCache(self)
270 self.stats_lock: Final[threading.Lock] = threading.Lock()
271 self.num_cache_hits: int = 0
272 self.num_cache_misses: int = 0
273 self.num_snapshots_found: int = 0
274 self.num_snapshots_replicated: int = 0
276 self.is_test_mode: bool = False # for testing only
277 self.creation_prefix: str = "" # for testing only
278 self.isatty: bool | None = None # for testing only
279 self.use_select: bool = False # for testing only
280 self.progress_update_intervals: tuple[float, float] | None = None # for testing only
281 self.error_injection_triggers: dict[str, Counter[str]] = {} # for testing only
282 self.delete_injection_triggers: dict[str, Counter[str]] = {} # for testing only
283 self.param_injection_triggers: dict[str, dict[str, bool]] = {} # for testing only
284 self.inject_params: dict[str, bool] = {} # for testing only
285 self.injection_lock: threading.Lock = threading.Lock() # for testing only
286 self.max_command_line_bytes: int | None = None # for testing only
288 def shutdown(self) -> None:
289 """Exits any multiplexed ssh sessions that may be leftover."""
290 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values()
291 for i, cache_item in enumerate(cache_items):
292 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}")
294 def terminate(self) -> None:
295 """Shuts down gracefully; also terminates descendant processes, if any."""
296 with xfinally(self.subprocesses.terminate_process_subtrees):
297 self.shutdown()
299 def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None:
300 """Parses CLI arguments, sets up logging, and executes main job loop."""
301 assert isinstance(self.error_injection_triggers, dict)
302 assert isinstance(self.delete_injection_triggers, dict)
303 assert isinstance(self.inject_params, dict)
304 logger_name_suffix: str = ""
306 def _reset_logger() -> None:
307 if logger_name_suffix and log is not None: # reset Logger unless it's a Logger outside of our control
308 reset_logger(log)
310 with xfinally(_reset_logger): # runs _reset_logger() on exit, without masking error raised in body of `with` block
311 try:
312 log_params: LogParams = LogParams(args)
313 logger_name_suffix = "" if log is not None else log_params.logger_name_suffix
314 log = bzfs_main.loggers.get_logger(
315 log_params=log_params, args=args, log=log, logger_name_suffix=logger_name_suffix
316 )
317 log.info("%s", f"Log file is: {log_params.log_file}")
318 except BaseException as e:
319 simple_log: Logger = get_simple_logger(PROG_NAME, logger_name_suffix=logger_name_suffix)
320 try:
321 simple_log.error("Log init: %s", e, exc_info=not isinstance(e, SystemExit))
322 finally:
323 reset_logger(simple_log)
324 raise
326 aux_args: list[str] = []
327 if getattr(args, "include_snapshot_plan", None):
328 aux_args += args.include_snapshot_plan
329 if getattr(args, "delete_dst_snapshots_except_plan", None):
330 aux_args += args.delete_dst_snapshots_except_plan
331 if len(aux_args) > 0:
332 log.info("Auxiliary CLI arguments: %s", " ".join(aux_args))
333 args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args)
335 def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None:
336 log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info)
338 try:
339 log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[uid: {os.getuid()}, euid: {os.geteuid()}]")
340 if self.is_test_mode:
341 log.log(LOG_TRACE, "Parsed CLI arguments: %s", args)
342 self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params)
343 log_params.params = p
344 lock_file: str = p.lock_file_name()
345 lock_fd = os.open(
346 lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, FILE_PERMISSIONS
347 )
348 with xfinally(lambda: os.close(lock_fd)):
349 try:
350 # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or
351 # another process. The (advisory) lock is auto-released when the process terminates or the fd is
352 # closed.
353 fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # LOCK_NB ... non-blocking
354 except BlockingIOError:
355 msg = "Exiting as same previous periodic job is still running without completion yet per "
356 die(msg + lock_file, STILL_RUNNING_STATUS)
358 # xfinally: unlink the lock_file while still holding the flock on its fd - it's a correct and safe
359 # standard POSIX pattern:
360 # - Performing unlink() before close(fd) avoids a race where a subsequent bzfs process could recreate and
361 # lock a fresh inode for the same path between our close() and a later unlink(). In that case, a late
362 # unlink would delete the newer process's lock_file path.
363 # - At this point, critical work is complete; the remaining steps are shutdown mechanics that have no
364 # side effect, so this pattern is correct, safe, and simple.
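# Condensed sketch (a simplification of the xfinally-based code below, shown only to illustrate the ordering):
#
#   fd = os.open(lock_file, os.O_WRONLY | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, FILE_PERMISSIONS)
#   try:
#       fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # raises BlockingIOError if another job holds the lock
#       try:
#           ...  # critical work runs here while the lock is held
#       finally:
#           Path(lock_file).unlink(missing_ok=True)  # unlink first, while the flock is still held
#   finally:
#       os.close(fd)  # closing the fd releases the advisory lock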
365 with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)): # don't accumulate stale files
366 try:
367 self.run_tasks() # do the real work
368 except BaseException:
369 self.terminate()
370 raise
371 self.shutdown()
372 with contextlib.suppress(BrokenPipeError):
373 sys.stderr.flush()
374 except subprocess.CalledProcessError as e:
375 log_error_on_exit(e, e.returncode)
376 raise
377 except SystemExit as e:
378 log_error_on_exit(e, e.code)
379 raise
380 except (subprocess.TimeoutExpired, UnicodeDecodeError) as e:
381 log_error_on_exit(e, DIE_STATUS)
382 raise SystemExit(DIE_STATUS) from e
383 except re.error as e:
384 log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS)
385 raise SystemExit(DIE_STATUS) from e
386 except BaseException as e:
387 log_error_on_exit(e, DIE_STATUS, exc_info=True)
388 raise SystemExit(DIE_STATUS) from e
389 finally:
390 log.info("%s", f"Log file was: {log_params.log_file}")
391 log.info("Success. Goodbye!")
392 with contextlib.suppress(BrokenPipeError):
393 sys.stderr.flush()
395 def run_tasks(self) -> None:
396 """Executes replication cycles, repeating until daemon lifetime expires."""
397 p, log = self.params, self.params.log
398 self.all_exceptions = []
399 self.all_exceptions_count = 0
400 self.first_exception = None
401 self.remote_conf_cache = {}
402 self.isatty = self.isatty if self.isatty is not None else p.isatty
403 self.validate_once()
404 self.replication_start_time_nanos = time.monotonic_ns()
405 self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals)
406 with xfinally(lambda: self.progress_reporter.stop()):
407 daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos
408 while True: # loop for daemon mode
409 self.timeout_nanos = None if p.timeout_nanos is None else time.monotonic_ns() + p.timeout_nanos
410 self.all_dst_dataset_exists.clear()
411 self.progress_reporter.reset()
412 src, dst = p.src, p.dst
413 for src_root_dataset, dst_root_dataset in p.root_dataset_pairs:
414 if self.termination_event.is_set():
415 self.terminate()
416 break
417 src.root_dataset = src.basis_root_dataset = src_root_dataset
418 dst.root_dataset = dst.basis_root_dataset = dst_root_dataset
419 p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy()
420 if p.daemon_lifetime_nanos > 0:
421 self.timeout_nanos = None if p.timeout_nanos is None else time.monotonic_ns() + p.timeout_nanos
422 recurs_sep = " " if p.recursive_flag else ""
423 task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}"
424 if len(p.root_dataset_pairs) > 1:
425 log.info("Starting task: %s", task_description + " ...")
426 try:
427 try:
428 maybe_inject_error(self, cmd=[], error_trigger="retryable_run_tasks")
429 timeout(self)
430 self.validate_task()
431 self.run_task() # do the real work
432 except RetryableError as retryable_error:
433 cause: BaseException | None = retryable_error.__cause__
434 assert cause is not None
435 raise cause.with_traceback(cause.__traceback__) # noqa: B904 re-raise of cause without chaining
436 except (CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e:
437 if p.skip_on_error == "fail" or (
438 isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0
439 ):
440 raise
441 log.error("%s", e)
442 self.append_exception(e, "task", task_description)
443 if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos):
444 break
445 if not p.skip_replication:
446 self.print_replication_stats(self.replication_start_time_nanos)
447 error_count = self.all_exceptions_count
448 if error_count > 0 and p.daemon_lifetime_nanos == 0:
449 msgs = "\n".join([f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions)])
450 log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}")
451 assert self.first_exception is not None
452 raise self.first_exception
454 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None:
455 """Records and logs an exception that was encountered while running a subtask."""
456 self.first_exception = self.first_exception or e
457 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption
458 self.all_exceptions.append(str(e))
459 self.all_exceptions_count += 1
460 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description)
462 def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool:
463 """Pauses until next scheduled snapshot time or daemon stop; Returns True to continue daemon loop; False to stop."""
464 sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns()
465 if sleep_nanos <= 0:
466 return False
467 self.progress_reporter.pause()
468 p, log = self.params, self.params.log
469 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
470 curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1)
471 next_snapshotting_event_dt: datetime = min(
472 (
473 config.anchors.round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit)
474 for duration_amount, duration_unit in config.suffix_durations.values()
475 ),
476 default=curr_datetime + timedelta(days=10 * 365), # infinity
477 )
478 offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz)
479 offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000
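# e.g. an offset of timedelta(minutes=5) yields (0 * 86400 + 300) * 1_000_000_000 + 0 = 300_000_000_000 ns;
# integer arithmetic on days/seconds/microseconds avoids the float rounding of timedelta.total_seconds().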
480 sleep_nanos = min(sleep_nanos, max(0, offset_nanos))
481 log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]")
482 self.termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination
483 config.current_datetime = datetime.now(config.tz)
484 return time.monotonic_ns() < daemon_stoptime_nanos and not self.termination_event.is_set()
486 def print_replication_stats(self, start_time_nanos: int) -> None:
487 """Logs overall replication statistics after a job cycle completes."""
488 p, log = self.params, self.params.log
489 elapsed_nanos = time.monotonic_ns() - start_time_nanos
490 msg = p.dry(f"zfs sent {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.")
491 if p.is_program_available("pv", "local"):
492 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file)
493 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / (elapsed_nanos or 1))
494 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]."
495 log.info("%s", msg.ljust(p.terminal_columns - len("2024-01-01 23:58:45 [I] ")))
497 def validate_once(self) -> None:
498 """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times."""
499 p = self.params
500 p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts)
501 for snapshot_filter in p.snapshot_filters:
502 for _filter in snapshot_filter:
503 if _filter.name == SNAPSHOT_REGEX_FILTER_NAME:
504 exclude_snapshot_regexes = compile_regexes(_filter.options[0])
505 include_snapshot_regexes = compile_regexes(_filter.options[1] or [".*"])
506 _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes)
508 exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT]
509 if len(p.args.exclude_dataset_regex) > 0: # some patterns don't exclude anything
510 exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"]
511 include_regexes: list[str] = p.args.include_dataset_regex
513 # relative datasets need not be compiled more than once as they don't change between tasks
514 def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]:
515 abs_datasets: list[str] = []
516 rel_datasets: list[str] = []
517 for dataset in datasets:
518 (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset)
519 return abs_datasets, rel_datasets
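# e.g. separate_abs_vs_rel_datasets(["/tank/data", "data", "/tank/logs"]) returns
# (["/tank/data", "/tank/logs"], ["data"]); a leading "/" marks a dataset as absolute.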
521 p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset)
522 p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset)
523 suffix = DESCENDANTS_RE_SUFFIX
524 p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = (
525 compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix),
526 compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix),
527 )
529 if p.pv_program != DISABLE_PRG:
530 pv_program_opts_set = set(p.pv_program_opts)
531 if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}):
532 die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.")
533 if self.isatty and not p.quiet:
534 for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]:
535 if pv_program_opts_set.isdisjoint(opts):
536 die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.")
538 src, dst = p.src, p.dst
539 for remote in [src, dst]:
540 r, loc = remote, remote.location
541 validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user")
542 validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host")
543 validate_port(r.ssh_port, f"--ssh-{loc}-port ")
545 def validate_task(self) -> None:
546 """Validates a single replication task before execution."""
547 p, log = self.params, self.params.log
548 src, dst = p.src, p.dst
549 for remote in [src, dst]:
550 r = remote
551 r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator(
552 r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port
553 )
554 r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user)
555 local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1") # ::1 is IPv6 version of loopback address
556 remote.is_nonlocal = r.ssh_host not in local_addrs
557 self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host])
559 if src.ssh_host == dst.ssh_host:
560 msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}"
561 if src.root_dataset == dst.root_dataset:
562 die(f"Source and destination dataset must not be the same! {msg}")
563 if p.recursive and (
564 is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset)
565 or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset)
566 ):
567 die(f"Source and destination dataset trees must not overlap! {msg}")
569 suffx = DESCENDANTS_RE_SUFFIX # also match descendants of a matching dataset
570 p.exclude_dataset_regexes, p.include_dataset_regexes = (
571 p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx),
572 p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx),
573 )
574 if len(p.include_dataset_regexes) == 0:
575 p.include_dataset_regexes = [(re.compile(r".*"), False)]
577 detect_available_programs(self)
579 zfs_send_program_opts: list[str] = p.curr_zfs_send_program_opts
580 if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"):
581 append_if_absent(zfs_send_program_opts, "--large-block")
582 p.curr_zfs_send_program_opts = zfs_send_program_opts
584 self.max_workers = {}
585 self.max_datasets_per_minibatch_on_list_snaps = {}
586 for r in [src, dst]:
587 cpus = int(p.available_programs[r.location].get("getconf_cpu_count", 8))
588 threads, is_percent = p.threads
589 cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads))
590 self.max_workers[r.location] = cpus
591 bs = max(1, p.max_datasets_per_batch_on_list_snaps) # 1024 by default
592 max_datasets_per_minibatch = p.max_datasets_per_minibatch_on_list_snaps
593 if max_datasets_per_minibatch <= 0:
594 max_datasets_per_minibatch = max(1, bs // cpus)
595 max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch)
596 self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch
597 log.log(
598 LOG_TRACE,
599 "%s",
600 f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, "
601 f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, "
602 f"max_workers: {self.max_workers[r.location]}, "
603 f"location: {r.location}",
604 )
605 if self.is_test_mode:
606 log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params))
608 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]:
609 """Returns sudo command prefix and whether root privileges are required."""
610 p: Params = self.params
611 assert isinstance(ssh_user_host, str)
612 assert isinstance(ssh_user, str)
613 assert isinstance(p.sudo_program, str)
614 assert isinstance(p.enable_privilege_elevation, bool)
616 is_root: bool = True
617 if ssh_user_host != "":
618 if ssh_user == "":
619 if os.getuid() != 0:
620 is_root = False
621 elif ssh_user != "root":
622 is_root = False
623 elif os.getuid() != 0:
624 is_root = False
626 if is_root:
627 sudo = "" # using sudo in an attempt to make ZFS operations work even if we are not root user?
628 use_zfs_delegation = False # or instead using 'zfs allow' delegation?
629 return sudo, use_zfs_delegation
630 elif p.enable_privilege_elevation:
631 if p.sudo_program == DISABLE_PRG:
632 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}")
633 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any
634 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit.
635 return p.sudo_program + " -n", False
636 else:
637 return "", True
639 def run_task(self) -> None:
640 """Replicates all snapshots for the current root dataset pair."""
642 def filter_src_datasets() -> list[str]: # apply --{include|exclude}-dataset policy
643 return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets
645 p, log = self.params, self.params.log
646 src, dst = p.src, p.dst
647 max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location])
648 recursive_sep: str = " " if p.recursive_flag else ""
649 task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..."
650 failed: bool = False
651 src_datasets: list[str] | None = None
652 basis_src_datasets: list[str] = []
653 self.src_properties = {}
654 self.dst_properties = {}
655 if not is_dummy(src): # find src dataset or all datasets in src dataset tree (with --recursive)
656 basis_src_datasets = self.list_src_datasets_task()
658 if not p.create_src_snapshots_config.skip_create_src_snapshots:
659 log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...")
660 src_datasets = filter_src_datasets() # apply include/exclude policy
661 self.create_src_snapshots_task(basis_src_datasets, src_datasets)
663 # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset
664 if not p.skip_replication:
665 if len(basis_src_datasets) == 0:
666 die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}")
667 if is_dummy(dst):
668 die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
669 src_datasets = filter_src_datasets() # apply include/exclude policy
670 failed = self.replicate_datasets(src_datasets, task_description, max_workers)
672 if failed or not (
673 p.delete_dst_datasets
674 or p.delete_dst_snapshots
675 or p.delete_empty_dst_datasets
676 or p.compare_snapshot_lists
677 or p.monitor_snapshots_config.enable_monitor_snapshots
678 ):
679 return
680 log.info("Listing dst datasets: %s", task_description)
681 if is_dummy(dst):
682 die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
683 basis_dst_datasets: list[str] = self.list_dst_datasets_task()
684 dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets) # apply include/exclude policy
686 if p.delete_dst_datasets and not failed:
687 log.info(p.dry("--delete-dst-datasets: %s"), task_description)
688 basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task(
689 basis_src_datasets, basis_dst_datasets, dst_datasets
690 )
692 if p.delete_dst_snapshots and not failed:
693 log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]")
694 failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description)
696 if p.delete_empty_dst_datasets and p.recursive and not failed:
697 log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description)
698 basis_dst_datasets, dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets)
700 if p.compare_snapshot_lists and not failed:
701 log.info("--compare-snapshot-lists: %s", task_description)
702 if len(basis_src_datasets) == 0 and not is_dummy(src):
703 die(f"Source dataset does not exist: {src.basis_root_dataset}")
704 src_datasets = filter_src_datasets() # apply include/exclude policy
705 run_compare_snapshot_lists(self, src_datasets, dst_datasets)
707 if p.monitor_snapshots_config.enable_monitor_snapshots and not failed:
708 log.info("--monitor-snapshots: %s", task_description)
709 src_datasets = filter_src_datasets() # apply include/exclude policy
710 self.monitor_snapshots_task(src_datasets, dst_datasets, task_description)
712 def list_src_datasets_task(self) -> list[str]:
713 """Lists datasets on the source host."""
714 p = self.params
715 src = p.src
716 basis_src_datasets: list[str] = []
717 is_caching: bool = is_caching_snapshots(p, src)
718 props: str = "volblocksize,recordsize,name"
719 props = "snapshots_changed," + props if is_caching else props
720 cmd: list[str] = p.split_args(
721 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset
722 )
723 for line in (try_ssh_command(self, src, LOG_DEBUG, cmd=cmd) or "").splitlines():
724 cols: list[str] = line.split("\t")
725 snapshots_changed, volblocksize, recordsize, src_dataset = cols if is_caching else ["-"] + cols
726 self.src_properties[src_dataset] = DatasetProperties(
727 recordsize=int(recordsize) if recordsize != "-" else -int(volblocksize),
728 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
729 )
730 basis_src_datasets.append(src_dataset)
731 assert (not self.is_test_mode) or basis_src_datasets == sorted(basis_src_datasets), "List is not sorted"
732 return basis_src_datasets
734 def list_dst_datasets_task(self) -> list[str]:
735 """Lists datasets on the destination host."""
736 p, log = self.params, self.params.log
737 dst = p.dst
738 is_caching: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots
739 props: str = "name"
740 props = "snapshots_changed," + props if is_caching else props
741 cmd: list[str] = p.split_args(
742 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset
743 )
744 basis_dst_datasets: list[str] = []
745 basis_dst_datasets_str: str | None = try_ssh_command(self, dst, LOG_TRACE, cmd=cmd)
746 if basis_dst_datasets_str is None:
747 log.warning("Destination dataset does not exist: %s", dst.root_dataset)
748 else:
749 for line in basis_dst_datasets_str.splitlines():
750 cols: list[str] = line.split("\t")
751 snapshots_changed, dst_dataset = cols if is_caching else ["-"] + cols
752 self.dst_properties[dst_dataset] = DatasetProperties(
753 recordsize=0,
754 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
755 )
756 basis_dst_datasets.append(dst_dataset)
757 assert (not self.is_test_mode) or basis_dst_datasets == sorted(basis_dst_datasets), "List is not sorted"
758 return basis_dst_datasets
760 def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None:
761 """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements
762 --create-src-snapshots.
764 The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line,
765 using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the
766 --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs
767 snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical
768 'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such
769 guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple
770 command line invocations, respecting the limits that the operating system places on the maximum length of a single
771 command line, per `getconf ARG_MAX`.
773 Time complexity is O(max(N log N, M log M)) where N is the number of datasets and M is the number of snapshots per
774 dataset. Space complexity is O(max(N, M)).
775 """
776 p, log = self.params, self.params.log
777 src = p.src
778 if len(basis_src_datasets) == 0:
779 die(f"Source dataset does not exist: {src.basis_root_dataset}")
780 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets)
781 datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0}
782 basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy() # shallow copy
783 commands: dict[SnapshotLabel, list[str]] = {}
784 for label, datasets in datasets_to_snapshot.items():
785 cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot")
786 if p.recursive:
787 # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor
788 root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets)
789 if root_datasets is not None:
790 cmd.append("-r") # recursive; takes a snapshot of all datasets in the subtree(s)
791 datasets_to_snapshot[label] = root_datasets
792 commands[label] = cmd
793 creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots"
794 log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...")
795 # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle
796 run_ssh_cmd_parallel(
797 self,
798 src,
799 ((commands[lbl], (f"{ds}@{lbl}" for ds in datasets)) for lbl, datasets in datasets_to_snapshot.items()),
800 fn=lambda cmd, batch: run_ssh_command(self, src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch),
801 max_batch_items=2**29,
802 )
803 if is_caching_snapshots(p, src):
804 # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls
805 self.cache.update_last_modified_cache(basis_datasets_to_snapshot)
807 def delete_destination_snapshots_task(
808 self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str
809 ) -> bool:
810 """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the
811 --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset*
812 policy; implements --delete-dst-snapshots."""
813 p, log = self.params, self.params.log
814 src, dst = p.src, p.dst
815 kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot"
816 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters)
817 props: str = self.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name"
818 basis_src_datasets_set: set[str] = set(basis_src_datasets)
819 num_snapshots_found, num_snapshots_deleted = 0, 0
821 def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool: # thread-safe
822 src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
823 if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks):
824 src_kind: str = kind
825 if not p.delete_dst_snapshots_no_crosscheck:
826 src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot"
827 src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset)
828 else:
829 src_cmd = None
830 dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset)
831 self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots")
832 src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel
833 lambda: set(run_ssh_command(self, src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []),
834 lambda: try_ssh_command(self, dst, LOG_TRACE, cmd=dst_cmd),
835 )
836 if dst_snaps_with_guids_str is None:
837 log.warning("Third party deleted destination: %s", dst_dataset)
838 return False
839 dst_snaps_with_guids: list[str] = dst_snaps_with_guids_str.splitlines()
840 num_dst_snaps_with_guids = len(dst_snaps_with_guids)
841 basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy()
842 if p.delete_dst_bookmarks:
843 replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1) # treat bookmarks as snapshots
844 # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots().
845 # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep*
846 all_except: bool = p.delete_dst_snapshots_except
847 if p.delete_dst_snapshots_except and not is_dummy(src):
848 # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what
849 # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep"
850 # list, we later further refine by checking what's on the source dataset.
851 all_except = False
852 dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except)
853 if p.delete_dst_bookmarks:
854 replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1) # restore pre-filtering bookmark state
855 if filter_needs_creation_time:
856 dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids)
857 basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids)
858 if p.delete_dst_snapshots_except and not is_dummy(src): # Non-dummy Source + "Except" (Keep) Mode
859 # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka
860 # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset.
861 # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP.
862 # We only actually keep them if they are ALSO on the SRC.
863 # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`)
864 # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`.
865 except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids)
866 dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids)
867 else: # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode
868 # In standard delete mode:
869 # `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST.
870 # We delete those that are NOT on SRC.
871 # `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`.
872 # In dummy source + "except" (keep) mode:
873 # `all_except` was True.
874 # `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete.
875 # `src_snaps_with_guids` is empty.
876 # `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`.
877 dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids)
878 dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete)
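# Illustrative set algebra for the standard delete mode (GUIDs are hypothetical): if the policy-selected dst
# snapshots carry guids {g1, g2, g3} and the src dataset still has {g1, g3}, then dst_guids_to_delete is {g2}
# and only the dst snapshot tagged with g2 is deleted below.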
879 separator: str = "#" if p.delete_dst_bookmarks else "@"
880 dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete)
881 if p.delete_dst_bookmarks:
882 delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
883 else:
884 delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
885 with self.stats_lock:
886 nonlocal num_snapshots_found
887 num_snapshots_found += num_dst_snaps_with_guids
888 nonlocal num_snapshots_deleted
889 num_snapshots_deleted += len(dst_tags_to_delete)
890 if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks:
891 self.dst_properties[dst_dataset].snapshots_changed = 0 # invalidate cache
892 return True
894 # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec
895 failed: bool = False
896 if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks:
897 start_time_nanos = time.monotonic_ns()
898 failed = process_datasets_in_parallel_and_fault_tolerant(
899 log=log,
900 datasets=dst_datasets,
901 process_dataset=delete_destination_snapshots, # lambda
902 skip_tree_on_error=lambda dataset: False,
903 skip_on_error=p.skip_on_error,
904 max_workers=max_workers,
905 termination_event=self.termination_event,
906 termination_handler=self.terminate,
907 enable_barriers=False,
908 task_name="--delete-dst-snapshots",
909 append_exception=self.append_exception,
910 retry_policy=p.retry_policy,
911 dry_run=p.dry_run,
912 is_test_mode=self.is_test_mode,
913 )
914 elapsed_nanos = time.monotonic_ns() - start_time_nanos
915 log.info(
916 p.dry("--delete-dst-snapshots: %s"),
917 task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s "
918 f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]",
919 )
920 return failed
922 def delete_dst_datasets_task(
923 self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
924 ) -> tuple[list[str], list[str]]:
925 """Deletes existing destination datasets that do not exist within the source dataset if they are included via
926 --{include|exclude}-dataset* policy; implements --delete-dst-datasets.
928 Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors.
929 """
930 p = self.params
931 src, dst = p.src, p.dst
932 children: dict[str, set[str]] = defaultdict(set)
933 for dst_dataset in basis_dst_datasets: # Compute the direct children of each NON-FILTERED dataset
934 parent: str = os.path.dirname(dst_dataset)
935 children[parent].add(dst_dataset)
936 to_delete: set[str] = set()
937 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm
938 if children[dst_dataset].issubset(to_delete):
939 to_delete.add(dst_dataset) # all children are deletable, thus the dataset itself is deletable too
940 to_delete = to_delete.difference(
941 replace_prefix(src_dataset, src.root_dataset, dst.root_dataset) for src_dataset in basis_src_datasets
942 )
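# Illustrative walk (hypothetical names): dst holds d, d/a, d/a/x, d/b while src still holds the counterparts
# of d and d/b. The reverse-sorted walk above marks d/b, d/a/x, d/a and finally d (all of whose children are
# already marked); subtracting the datasets that exist on src leaves {d/a, d/a/x}, i.e. exactly the subtree
# that disappeared from the source gets deleted.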
943 delete_datasets(self, dst, to_delete)
944 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(to_delete))
945 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(to_delete))
946 return basis_dst_datasets, sorted_dst_datasets
948 def delete_empty_dst_datasets_task(
949 self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
950 ) -> tuple[list[str], list[str]]:
951 """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset
952 do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets.
954 To do so, we walk the dataset list (conceptually, a tree) depth-first (i.e. sorted descending). If a dst dataset has
955 zero snapshots and zero bookmarks and all its children are already marked as orphans, then it is itself an orphan,
956 and we mark it as such. Walking in a reverse sorted way means that we efficiently check for zero snapshots/bookmarks
957 not just over the direct children but the entire tree. Finally, delete all orphan datasets in an efficient batched
958 way.
959 """
960 p = self.params
961 dst = p.dst
963 # Compute the direct children of each NON-FILTERED dataset. Thus, no non-selected dataset and no ancestor of a
964 # non-selected dataset will ever be added to the "orphan" set. In other words, this treats non-selected dataset
965 # subtrees as if they all had snapshots, so non-selected dataset subtrees and their ancestors are guaranteed
966 # to not get deleted.
967 children: dict[str, set[str]] = defaultdict(set)
968 for dst_dataset in basis_dst_datasets:
969 parent: str = os.path.dirname(dst_dataset)
970 children[parent].add(dst_dataset)
972 def compute_orphans(datasets_having_snapshots: set[str]) -> set[str]:
973 """Returns destination datasets having zero snapshots whose children are all orphans."""
974 orphans: set[str] = set()
975 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm
976 if (dst_dataset not in datasets_having_snapshots) and children[dst_dataset].issubset(orphans):
977 orphans.add(dst_dataset)
978 return orphans
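# Illustrative example (hypothetical names): for datasets e, e/a, e/b where only e/b has snapshots,
# compute_orphans({"e/b"}) returns {"e/a"}: e/a has no snapshots and no children, while e is kept because its
# child e/b is not an orphan.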
980 # Compute candidate orphan datasets, which reduces the list of datasets for which we list snapshots via
981 # 'zfs list -t snapshot ...' from dst_datasets to a subset of dst_datasets, which in turn reduces I/O and improves
982 # perf. Essentially, this eliminates the I/O to list snapshots for ancestors of excluded datasets.
983 candidate_orphans: set[str] = compute_orphans(set())
985 # Compute destination datasets having more than zero snapshots
986 dst_datasets_having_snapshots: set[str] = set()
987 with_bookmarks: bool = p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst)
988 btype: str = "bookmark,snapshot" if with_bookmarks else "snapshot"
989 cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name")
990 for snapshots in zfs_list_snapshots_in_parallel(self, dst, cmd, sorted(candidate_orphans), ordered=False):
991 if with_bookmarks:
992 replace_in_lines(snapshots, old="#", new="@", count=1) # treat bookmarks as snapshots
993 dst_datasets_having_snapshots.update(snap[0 : snap.index("@")] for snap in snapshots) # union
995 orphans = compute_orphans(dst_datasets_having_snapshots) # compute the real orphans
996 delete_datasets(self, dst, orphans) # finally, delete the orphan datasets in an efficient way
997 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(orphans))
998 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(orphans))
999 return basis_dst_datasets, sorted_dst_datasets
1001 def monitor_snapshots_task(
1002 self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str
1003 ) -> None:
1004 """Monitors src and dst snapshots; implements --monitor-snapshots."""
1005 p, log = self.params, self.params.log
1006 src, dst = p.src, p.dst
1007 num_cache_hits: int = self.num_cache_hits
1008 num_cache_misses: int = self.num_cache_misses
1009 start_time_nanos: int = time.monotonic_ns()
1010 run_in_parallel(
1011 lambda: self.monitor_snapshots(dst, sorted_dst_datasets),
1012 lambda: self.monitor_snapshots(src, sorted_src_datasets),
1013 )
1014 elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
1015 num_cache_hits = self.num_cache_hits - num_cache_hits
1016 num_cache_misses = self.num_cache_misses - num_cache_misses
1017 if num_cache_hits > 0 or num_cache_misses > 0:
1018 msg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
1019 else:
1020 msg = ""
1021 log.info(
1022 "--monitor-snapshots done: %s",
1023 f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]",
1024 )
1026 def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> None:
1027 """Checks snapshot freshness and warns or errors out when limits are exceeded.
1029 Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name
1030 pattern within the selected datasets is too old with respect to the specified age limit. The purpose is to check if snapshots
1031 are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process
1032 exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.
1034 Time complexity is O(max(N log N, M log M)) where N is the number of datasets and M is the number of snapshots per
1035 dataset. Space complexity is O(max(N, M)).
1036 """
1037 p, log = self.params, self.params.log
1038 alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts
1039 labels: list[SnapshotLabel] = [alert.label for alert in alerts]
1040 current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000
1041 is_debug: bool = log.isEnabledFor(LOG_DEBUG)
1042 if is_caching_snapshots(p, remote):
1043 props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties
1044 snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()}
1045 alerts_hash: str = sha256_128_urlsafe_base64(str(tuple(alerts)))
1046 label_hashes: dict[SnapshotLabel, str] = {
1047 label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
1048 }
1049 is_caching: bool = False
1051 def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str:
1052 cache_label: str = os.path.join(MONITOR_CACHE_FILE_PREFIX, alert_cfg.kind[0], label_hashes[label], alerts_hash)
1053 return self.cache.last_modified_cache_file(r, dataset, cache_label)
1055 def alert_msg(
1056 kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int
1057 ) -> str:
1058 assert kind == "Latest" or kind == "Oldest"
1059 lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}"
1060 if snapshot_age_millis >= current_unixtime_millis:
1061 return f"No snapshot exists for {dataset}@{lbl}"
1062 msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old"
1063 s = f": @{snapshot}" if snapshot else ""
1064 if delta_millis == -1:
1065 return f"{msg}{s}"
1066 return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}"
1068 def check_alert(
1069 label: SnapshotLabel, alert_cfg: AlertConfig | None, creation_unixtime_secs: int, dataset: str, snapshot: str
1070 ) -> None: # thread-safe
1071 if alert_cfg is None:
1072 return
1073 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot'
1074 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
1075 cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg)
1076 set_last_modification_time_safe(
1077 cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed), if_more_recent=True
1078 )
1079 warning_millis: int = alert_cfg.warning_millis
1080 critical_millis: int = alert_cfg.critical_millis
1081 alert_kind = alert_cfg.kind
1082 snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000
1083 m = "--monitor_snapshots: "
1084 if snapshot_age_millis > critical_millis:
1085 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis)
1086 log.critical("%s", msg)
1087 if not p.monitor_snapshots_config.dont_crit:
1088 die(msg, exit_code=CRITICAL_STATUS)
1089 elif snapshot_age_millis > warning_millis:
1090 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis)
1091 log.warning("%s", msg)
1092 if not p.monitor_snapshots_config.dont_warn:
1093 die(msg, exit_code=WARNING_STATUS)
1094 elif is_debug:
1095 msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1)
1096 log.debug("%s", msg)
1098 def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
1099 alert: MonitorSnapshotAlert = alerts[i]
1100 check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot)
1102 def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
1103 alert: MonitorSnapshotAlert = alerts[i]
1104 check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot)
1106 def find_stale_datasets_and_check_alerts() -> list[str]:
1107 """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply,
1108 that is, without incurring 'zfs list -t snapshot'.
1110 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
1111 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
1112 """
1113 stale_datasets: list[str] = []
1114 time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
1115 for dataset in sorted_datasets:
1116 is_stale_dataset: bool = False
1117 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
1118 for alert in alerts:
1119 for cfg in (alert.latest, alert.oldest):
1120 if cfg is None:
1121 continue
1122 if (
1123 snapshots_changed != 0
1124 and snapshots_changed < time_threshold
1125 and ( # always True
1126 cached_unix_times := self.cache.get_snapshots_changed2(
1127 monitor_last_modified_cache_file(remote, dataset, alert.label, cfg)
1128 )
1129 )
1130 and snapshots_changed == cached_unix_times[1] # cached snapshots_changed aka last modified time
1131 and snapshots_changed >= cached_unix_times[0] # creation time of minmax snapshot aka access time
1132 ): # cached state is still valid; emit an alert if the latest/oldest snapshot is too old
1133 lbl = alert.label
1134 check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="")
1135 else: # cached state is no longer valid; fallback to 'zfs list -t snapshot'
1136 is_stale_dataset = True
1137 if is_stale_dataset:
1138 stale_datasets.append(dataset)
1139 return stale_datasets
1141 # satisfy request from local cache as much as possible
1142 if is_caching_snapshots(p, remote):
1143 stale_datasets: list[str] = find_stale_datasets_and_check_alerts()
1144 with self.stats_lock:
1145 self.num_cache_misses += len(stale_datasets)
1146 self.num_cache_hits += len(sorted_datasets) - len(stale_datasets)
1147 else:
1148 stale_datasets = sorted_datasets
1150 # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
1151 is_caching = is_caching_snapshots(p, remote)
1152 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
1153 remote, stale_datasets, labels, fn_latest=alert_latest_snapshot, fn_oldest=alert_oldest_snapshot
1154 )
1155 for dataset in datasets_without_snapshots:
1156 for i in range(len(alerts)):
1157 alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
1158 alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
1160 def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool:
1161 """Replicates a list of datasets."""
1162 assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted"
1163 p, log = self.params, self.params.log
1164 src, dst = p.src, p.dst
1165 self.num_snapshots_found = 0
1166 self.num_snapshots_replicated = 0
1167 log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]")
1168 start_time_nanos: int = time.monotonic_ns()
1170 def src2dst(src_dataset: str) -> str:
1171 return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset)
1173 def dst2src(dst_dataset: str) -> str:
1174 return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
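        # Illustrative mapping with hypothetical dataset names: given src.root_dataset="tank/src" and
        # dst.root_dataset="pool/backup", src2dst("tank/src/home/alice") yields "pool/backup/home/alice",
        # and dst2src() is its inverse.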
1176 def find_stale_datasets() -> tuple[list[str], dict[str, str]]:
1177 """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine
1178 which datasets can be skipped cheaply, i.e. without incurring a 'zfs list -t snapshot' call.
1180 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
1181 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
1182 """
1183 # First, check which src datasets have changed since the last replication to that destination
1184 cache_files: dict[str, str] = {}
1185 stale_src_datasets1: list[str] = []
1186 maybe_stale_dst_datasets: list[str] = []
1187 userhost_dir: str = sha256_85_urlsafe_base64(p.dst.cache_namespace())
1188 filter_key = tuple(tuple(f) for f in p.snapshot_filters) # cache is only valid for same --include/excl-snapshot*
1189 filter_hash_code: str = sha256_85_urlsafe_base64(str(filter_key))
1190 for src_dataset in src_datasets:
1191 dst_dataset: str = src2dst(src_dataset) # cache is only valid for identical destination dataset
1192 dst_dataset_dir: str = sha256_85_urlsafe_base64(dst_dataset)
1193 cache_label: str = os.path.join(
1194 REPLICATION_CACHE_FILE_PREFIX, userhost_dir, dst_dataset_dir, filter_hash_code
1195 )
1196 cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label)
1197 cache_files[src_dataset] = cache_file
1198 snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed # get prop "for free"
1199 if (
1200 snapshots_changed != 0
1201 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
1202 and snapshots_changed == self.cache.get_snapshots_changed(cache_file)
1203 ):
1204 maybe_stale_dst_datasets.append(dst_dataset)
1205 else:
1206 stale_src_datasets1.append(src_dataset)
1208 # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed
1209 stale_src_datasets2: list[str] = []
1210 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets)
1211 for dst_dataset in maybe_stale_dst_datasets:
1212 snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0)
1213 if (
1214 snapshots_changed != 0
1215 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
1216 and snapshots_changed
1217 == self.cache.get_snapshots_changed(self.cache.last_modified_cache_file(dst, dst_dataset))
1218 ):
1219 log.info("Already up-to-date [cached]: %s", dst_dataset)
1220 else:
1221 stale_src_datasets2.append(dst2src(dst_dataset))
1222 assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted"
1223 assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted"
1224 stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2)) # merge two sorted lists
1225 assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates"
1226 return stale_src_datasets, cache_files
1228 if is_caching_snapshots(p, src):
1229 stale_src_datasets, cache_files = find_stale_datasets()
1230 num_cache_misses = len(stale_src_datasets)
1231 num_cache_hits = len(src_datasets) - len(stale_src_datasets)
1232 self.num_cache_misses += num_cache_misses
1233 self.num_cache_hits += num_cache_hits
1234 cmsg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
1235 else:
1236 stale_src_datasets = src_datasets
1237 cache_files = {}
1238 cmsg = ""
1240 # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution
1241 failed: bool = process_datasets_in_parallel_and_fault_tolerant(
1242 log=log,
1243 datasets=stale_src_datasets,
1244 process_dataset=lambda src_dataset, tid, retry: replicate_dataset(self, src_dataset, tid, retry),
1245 skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)],
1246 skip_on_error=p.skip_on_error,
1247 max_workers=max_workers,
1248 termination_event=self.termination_event,
1249 termination_handler=self.terminate,
1250 enable_barriers=False,
1251 task_name="Replication",
1252 append_exception=self.append_exception,
1253 retry_policy=p.retry_policy,
1254 dry_run=p.dry_run,
1255 is_test_mode=self.is_test_mode,
1256 )
1258 if is_caching_snapshots(p, src) and not failed:
1259 # refresh "snapshots_changed" ZFS dataset property from dst
1260 stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in stale_src_datasets]
1261 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets)
1262 for dst_dataset in stale_dst_datasets: # update local cache
1263 dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0)
1264 dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset)
1265 src_dataset: str = dst2src(dst_dataset)
1266 src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed
1267 if not p.dry_run:
1268 set_last_modification_time_safe(
1269 cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed, if_more_recent=True
1270 )
1271 set_last_modification_time_safe(
1272 dst_cache_file, unixtime_in_secs=dst_snapshots_changed, if_more_recent=True
1273 )
1275 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
1276 log.info(
1277 p.dry("Replication done: %s"),
1278 f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots"
1279 f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]",
1280 )
1281 return failed
1283 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None:
1284 """For testing only; for unit tests to delete datasets during replication and test correct handling of that."""
1285 assert delete_trigger
1286 counter = self.delete_injection_triggers.get("before")
1287 if counter and decrement_injection_counter(self, counter, delete_trigger):
1288 p = self.params
1289 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "")
1290 run_ssh_command(self, remote, LOG_DEBUG, print_stdout=True, cmd=cmd)
1292 def maybe_inject_params(self, error_trigger: str) -> None:
1293 """For testing only; for unit tests to simulate errors during replication and test correct handling of them."""
1294 assert error_trigger
1295 counter = self.error_injection_triggers.get("before")
1296 if counter and decrement_injection_counter(self, counter, error_trigger):
1297 self.inject_params = self.param_injection_triggers[error_trigger]
1298 elif error_trigger in self.param_injection_triggers:
1299 self.inject_params = {}
1301 @staticmethod
1302 def recv_option_property_names(recv_opts: list[str]) -> set[str]:
1303 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for
1304 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name."""
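        # Example (illustrative input, assuming the property names pass validate_property_name()):
        #   recv_option_property_names(["-o", "compression=lz4", "-x", "mountpoint", "-u"])
        #   returns {"compression", "mountpoint"}; a trailing "-o"/"-x" without a value, or a "-o" value without
        #   "=", triggers die() instead.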
1305 propnames = set()
1306 i = 0
1307 n = len(recv_opts)
1308 while i < n:
1309 stripped: str = recv_opts[i].strip()
1310 if stripped in ("-o", "-x"):
1311 i += 1
1312 if i == n or recv_opts[i].strip() in ("-o", "-x"):
1313 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1314 if stripped == "-o" and "=" not in recv_opts[i]:
1315 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1316 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0]
1317 validate_property_name(propname, "--zfs-recv-program-opt(s)")
1318 propnames.add(propname)
1319 i += 1
1320 return propnames
1322 def root_datasets_if_recursive_zfs_snapshot_is_possible(
1323 self, datasets: list[str], basis_datasets: list[str]
1324 ) -> list[str] | None:
1325 """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset
1326 within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in
1327 `datasets`. Returns `None` if any (unfiltered) dataset in `basis_datasets` that is a descendant of at least one of the
1328 root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have
1329 pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the
1330 non-recursive 'zfs snapshot snapshot1 .. snapshotN' CLI flavor.
1332 Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both
1333 sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time
1334 complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case.
1335 See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative
1336 impl that's easier to grok.
1337 """
1338 assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
1339 assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
1340 assert (not self.is_test_mode) or basis_datasets == sorted(basis_datasets), "List is not sorted"
1341 assert (not self.is_test_mode) or not has_duplicates(basis_datasets), "List contains duplicates"
1342 assert (not self.is_test_mode) or set(datasets).issubset(set(basis_datasets)), "Not a subset"
1343 root_datasets: list[str] = self.find_root_datasets(datasets)
1344 i = 0
1345 j = 0
1346 k = 0
1347 len_root_datasets = len(root_datasets)
1348 len_basis_datasets = len(basis_datasets)
1349 len_datasets = len(datasets)
1350 while i < len_root_datasets and j < len_basis_datasets: # walk and "merge" the sorted lists, in sync
1351 if basis_datasets[j] < root_datasets[i]: # irrelevant subtree?
1352 j += 1 # move to next basis_datasets[j]
1353 elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]): # relevant subtree?
1354 while k < len_datasets and datasets[k] < basis_datasets[j]:
1355 k += 1 # move to next datasets[k]
1356 if k == len_datasets or datasets[k] != basis_datasets[j]: # dataset chopped off by schedule or --incl/excl*?
1357 return None # detected filter pruning that is incompatible with 'zfs snapshot -r'
1358 j += 1 # move to next basis_datasets[j]
1359 else:
1360 i += 1 # move to next root_dataset[i]; no need to check root_datasets that are no longer (or not yet) reachable
1361 return root_datasets
1363 @staticmethod
1364 def find_root_datasets(sorted_datasets: list[str]) -> list[str]:
1365 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A
1366 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets."""
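        # Example (illustrative): find_root_datasets(["tank/a", "tank/a/b", "tank/a/b/c", "tank/d"]) returns
        # ["tank/a", "tank/d"]; every other input entry is a descendant of one of these two roots.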
1367 root_datasets: list[str] = []
1368 skip_dataset: str = DONT_SKIP_DATASET
1369 for dataset in sorted_datasets:
1370 if is_descendant(dataset, of_root_dataset=skip_dataset):
1371 continue
1372 skip_dataset = dataset
1373 root_datasets.append(dataset)
1374 return root_datasets
1376 def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]:
1377 """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g.
1378 bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be
1379 created with that name, because these datasets are due per the schedule, either because the 'creation' time of their
1380 most recent snapshot with that name pattern is now too old, or such a snapshot does not even exist.
1382 The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple
1383 but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An
1384 alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property
1385 (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of
1386 the most recent snapshot, for each SnapshotLabel and each dataset.
1387 """
1388 p, log = self.params, self.params.log
1389 src = p.src
1390 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
1391 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
1392 is_caching: bool = False
1393 interner: Interner[datetime] = Interner() # reduces memory footprint
1394 msgs: list[tuple[datetime, str, SnapshotLabel, str]] = []
1396 def create_snapshot_if_latest_is_too_old(
1397 datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int
1398 ) -> None: # thread-safe
1399 """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old."""
1400 creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz)
1401 log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label)
1402 duration_amount, duration_unit = config.suffix_durations[label.suffix]
1403 next_event_dt: datetime = config.anchors.round_datetime_up_to_duration_multiple(
1404 creation_dt + timedelta(microseconds=1), duration_amount, duration_unit
1405 )
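            # Worked example with hypothetical times and anchors: for an "_hourly" label with a (1, hour) duration and
            # anchors at the top of the hour, a latest snapshot created at 08:30:05 rounds up to next_event_dt
            # 09:00:00; once config.current_datetime reaches 09:00:00, the dataset is appended below, i.e. a new
            # hourly snapshot is considered due.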
1406 msg: str = ""
1407 if config.current_datetime >= next_event_dt:
1408 datasets_to_snapshot[label].append(dataset) # mark it as scheduled for snapshot creation
1409 msg = " has passed"
1410 next_event_dt = interner.intern(next_event_dt)
1411 msgs.append((next_event_dt, dataset, label, msg))
1412 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot'
1413 # Per-label cache stores (atime=creation, mtime=snapshots_changed) so later runs can safely trust creation
1414 # only when the label's mtime matches the current dataset-level '=' cache value; the per-label cache file name excludes the label's timestamp (via notimestamp_str()).
1415 cache_file: str = self.cache.last_modified_cache_file(src, dataset, label_hashes[label])
1416 unixtimes: tuple[int, int] = (creation_unixtime, self.src_properties[dataset].snapshots_changed)
1417 set_last_modification_time_safe(cache_file, unixtime_in_secs=unixtimes, if_more_recent=True)
1419 labels: list[SnapshotLabel] = []
1420 config_labels: list[SnapshotLabel] = config.snapshot_labels()
1421 for label in config_labels:
1422 duration_amount_, _duration_unit = config.suffix_durations[label.suffix]
1423 if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due:
1424 datasets_to_snapshot[label] = sorted_datasets # take snapshot regardless of creation time of existing snaps
1425 else:
1426 labels.append(label)
1427 if len(labels) == 0:
1428 return datasets_to_snapshot # nothing more TBD
1429 label_hashes: dict[SnapshotLabel, str] = {
1430 label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
1431 }
1433 # satisfy request from local cache as much as possible
1434 cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
1435 if is_caching_snapshots(p, src):
1436 sorted_datasets_todo: list[str] = []
1437 time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
1438 for dataset in sorted_datasets:
1439 cache: SnapshotCache = self.cache
1440 cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset))
1441 if cached_snapshots_changed == 0:
1442 sorted_datasets_todo.append(dataset) # request cannot be answered from cache
1443 continue
1444 if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed: # get that prop "for free"
1445 cache.invalidate_last_modified_cache_dataset(dataset)
1446 sorted_datasets_todo.append(dataset) # request cannot be answered from cache
1447 continue
1448 if cached_snapshots_changed >= time_threshold: # Avoid equal-second races: only trust matured cache entries
1449 sorted_datasets_todo.append(dataset) # cache entry isn't mature enough to be trusted; skip cache
1450 continue
1451 creation_unixtimes: list[int] = []
1452 for label_hash in label_hashes.values():
1453 # For per-label files, atime stores the latest matching snapshot's creation time, while mtime stores
1454 # the dataset-level snapshots_changed observed when this label file was written.
1455 atime, mtime = cache.get_snapshots_changed2(cache.last_modified_cache_file(src, dataset, label_hash))
1456 # Sanity check: trust per-label cache only when:
1457 # - mtime equals the dataset-level '=' cache (same snapshots_changed), and
1458 # - atime is plausible and not later than mtime (creation <= snapshots_changed), and
1459 # - neither atime nor mtime is zero (unknown provenance).
1460 # Otherwise fall back to 'zfs list -t snapshot' to avoid stale creation times after newer changes.
1461 if atime == 0 or mtime == 0 or mtime != cached_snapshots_changed or atime > mtime:
1462 sorted_datasets_todo.append(dataset) # request cannot be answered from cache
1463 break
1464 creation_unixtimes.append(atime)
1465 if len(creation_unixtimes) == len(labels):
1466 for j, label in enumerate(labels):
1467 create_snapshot_if_latest_is_too_old(
1468 cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j]
1469 )
1470 sorted_datasets = sorted_datasets_todo
1472 def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
1473 create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs)
1475 def on_finish_dataset(dataset: str) -> None:
1476 if is_caching and not p.dry_run:
1477 set_last_modification_time_safe(
1478 self.cache.last_modified_cache_file(src, dataset),
1479 unixtime_in_secs=self.src_properties[dataset].snapshots_changed,
1480 if_more_recent=True,
1481 )
1483 # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
1484 is_caching = is_caching_snapshots(p, src)
1485 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
1486 src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset
1487 )
1488 for lbl in labels: # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result
1489 datasets_to_snapshot[lbl].sort()
1490 if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot): # +take snaps for snapshot-less datasets
1491 datasets_to_snapshot[lbl] = list( # inputs to merge() are sorted, and outputs are sorted too
1492 heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots)
1493 )
1494 for label, datasets in datasets_to_snapshot.items():
1495 assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
1496 assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
1497 assert label
1499 msgs.sort() # sort by time, dataset, label
1500 for i in range(0, len(msgs), 10_000): # reduce logging overhead via mini-batching
1501 text = "".join(
1502 f"\nNext scheduled snapshot time: {next_event_dt} for {dataset}@{label}{msg}"
1503 for next_event_dt, dataset, label, msg in msgs[i : i + 10_000]
1504 )
1505 log.info("Next scheduled snapshot times ...%s", text)
1507 # sort keys to ensure that we take snapshots for dailies before hourlies, and so on
1508 label_indexes: dict[SnapshotLabel, int] = {label: k for k, label in enumerate(config_labels)}
1509 datasets_to_snapshot = dict(sorted(datasets_to_snapshot.items(), key=lambda kv: label_indexes[kv[0]]))
1510 return datasets_to_snapshot
1512 def handle_minmax_snapshots(
1513 self,
1514 remote: Remote,
1515 sorted_datasets: list[str],
1516 labels: list[SnapshotLabel],
1517 fn_latest: Callable[[int, int, str, str], None], # callback function for latest snapshot
1518 fn_oldest: Callable[[int, int, str, str], None] | None = None, # callback function for oldest snapshot
1519 fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None,
1520 ) -> list[str]: # thread-safe
1521 """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs
1522 the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names."""
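        # Matching sketch with hypothetical names: a label with prefix "bzfs_", empty infix, and suffix "_hourly"
        # matches "bzfs_2024-11-06_08:30:05_hourly" (starts with the prefix, ends with the suffix, and the four
        # characters right after the prefix form a 4-digit year), but matches neither "bzfs_2024-11-06_08:30:05_daily"
        # nor "other_2024-11-06_08:30:05_hourly".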
1523 assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
1524 p = self.params
1525 cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o createtxg,creation,name") # sort dataset,createtxg
1526 datasets_with_snapshots: set[str] = set()
1527 interner: SortedInterner[str] = SortedInterner(sorted_datasets) # reduces memory footprint
1528 for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False):
1529 # streaming group by dataset name (consumes constant memory only)
1530 for dataset, group in itertools.groupby(lines, key=lambda line: line.rsplit("\t", 1)[1].split("@", 1)[0]):
1531 dataset = interner.interned(dataset)
1532 snapshots = sorted( # fetch all snapshots of current dataset and sort by createtxg,creation,name
1533 (int(createtxg), int(creation_unixtime_secs), name.split("@", 1)[1])
1534 for createtxg, creation_unixtime_secs, name in (line.split("\t", 2) for line in group)
1535 ) # perf: sorted() is fast because Timsort is close to O(N) for nearly sorted input, which is our case
1536 assert len(snapshots) > 0
1537 datasets_with_snapshots.add(dataset)
1538 snapshot_names: tuple[str, ...] = tuple(snapshot[-1] for snapshot in snapshots)
1539 reversed_snapshot_names: tuple[str, ...] = snapshot_names[::-1]
1540 year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX
1541 year_with_4_digits_regex_fullmatch = year_with_4_digits_regex.fullmatch
1542 startswith = str.startswith
1543 endswith = str.endswith
1544 fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False))
1545 for i, label in enumerate(labels):
1546 infix: str = label.infix
1547 start: str = label.prefix + infix
1548 end: str = label.suffix
1549 startlen: int = len(start)
1550 endlen: int = len(end)
1551 minlen: int = startlen + endlen if infix else 4 + startlen + endlen # year_with_four_digits_regex
1552 startlen_4: int = startlen + 4 # [startlen:startlen+4] # year_with_four_digits_regex
1553 has_infix: bool = bool(infix)
1554 for fn, is_reverse in fns:
1555 creation_unixtime_secs: int = 0 # find creation time of latest or oldest snapshot matching the label
1556 minmax_snapshot: str = ""
1557 for j, snapshot_name in enumerate(reversed_snapshot_names if is_reverse else snapshot_names):
1558 if (
1559 endswith(snapshot_name, end) # aka snapshot_name.endswith(end)
1560 and startswith(snapshot_name, start) # aka snapshot_name.startswith(start)
1561 and len(snapshot_name) >= minlen
1562 and (has_infix or year_with_4_digits_regex_fullmatch(snapshot_name, startlen, startlen_4))
1563 ):
1564 k: int = len(snapshots) - j - 1 if is_reverse else j
1565 creation_unixtime_secs = snapshots[k][1]
1566 minmax_snapshot = snapshot_name
1567 break
1568 fn(i, creation_unixtime_secs, dataset, minmax_snapshot)
1569 fn_on_finish_dataset(dataset)
1570 datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots]
1571 return datasets_without_snapshots
1573 @staticmethod
1574 def _cache_hits_msg(hits: int, misses: int) -> str:
1575 total = hits + misses
1576 return f", cache hits: {percent(hits, total, print_total=True)}, misses: {percent(misses, total, print_total=True)}"
1579#############################################################################
1580class DatasetProperties:
1581 """Properties of a ZFS dataset."""
1583 __slots__ = ("recordsize", "snapshots_changed") # uses more compact memory layout than __dict__
1585 def __init__(self, recordsize: int, snapshots_changed: int) -> None:
1586 # immutable variables:
1587 self.recordsize: Final[int] = recordsize
1589 # mutable variables:
1590 self.snapshots_changed: int = snapshots_changed
1593# Input format is [[user@]host:]dataset
1594# 1234 5 6
1595DATASET_LOCATOR_REGEX: Final[re.Pattern[str]] = re.compile(r"(((([^@]*)@)?([^:]+)):)?(.*)", flags=re.DOTALL)
1598def parse_dataset_locator(
1599 input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None
1600) -> tuple[str, str, str, str, str]:
1601 """Splits user@host:dataset into its components with optional checks."""
1603 def convert_ipv6(hostname: str) -> str: # support IPv6 without getting confused by host:dataset colon separator ...
1604 return hostname.replace("|", ":") # ... and any colons that may be part of a (valid) ZFS dataset name
1606 user_undefined: bool = user is None
1607 if user is None:
1608 user = ""
1609 host_undefined: bool = host is None
1610 if host is None:
1611 host = ""
1612 host = convert_ipv6(host)
1613 user_host, dataset, pool = "", "", ""
1615 if match := DATASET_LOCATOR_REGEX.fullmatch(input_text):  # coverage: 1615 ↛ 1632 (line 1615 didn't jump to line 1632 because the condition on line 1615 was always true)
1616 if user_undefined:
1617 user = match.group(4) or ""
1618 if host_undefined:
1619 host = match.group(5) or ""
1620 host = convert_ipv6(host)
1621 if host == "-":
1622 host = ""
1623 dataset = match.group(6) or ""
1624 i = dataset.find("/")
1625 pool = dataset[0:i] if i >= 0 else dataset
1627 if user and host:
1628 user_host = f"{user}@{host}"
1629 elif host:
1630 user_host = host
1632 if validate:
1633 validate_user_name(user, input_text)
1634 validate_host_name(host, input_text)
1635 if port is not None:
1636 validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ")
1637 validate_dataset_name(dataset, input_text)
1639 return user, host, user_host, pool, dataset
1642def validate_user_name(user: str, input_text: str) -> None:
1643 """Checks that the username is safe for ssh or local usage."""
1644 invalid_chars: str = SHELL_CHARS + "/"
1645 if user and (".." in user or any(c.isspace() or c in invalid_chars for c in user)):
1646 die(f"Invalid user name: '{user}' for: '{input_text}'")
1649def validate_host_name(host: str, input_text: str) -> None:
1650 """Checks hostname for forbidden characters or patterns."""
1651 invalid_chars: str = SHELL_CHARS + "/"
1652 if host and (host.startswith("-") or ".." in host or any(c.isspace() or c in invalid_chars for c in host)):
1653 die(f"Invalid host name: '{host}' for: '{input_text}'")
1656def validate_port(port: str | int | None, message: str) -> None:
1657 """Checks that port specification is a valid integer."""
1658 if isinstance(port, int):
1659 port = str(port)
1660 if port and not port.isdigit():
1661 die(message + f"must be empty or a positive integer: '{port}'")
1664#############################################################################
1665 if __name__ == "__main__":  # coverage: 1665 ↛ 1666 (line 1665 didn't jump to line 1666 because the condition on line 1665 was never true)
1666 main()