Coverage for bzfs_main / bzfs.py: 99% (1052 statements), coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata
16# /// script
17# requires-python = ">=3.9"
18# dependencies = []
19# ///
20#
"""
* Main CLI entry point for replicating and managing ZFS snapshots. It handles the low-level mechanics of `zfs send/receive`,
  data transfer, and snapshot management between two hosts.
* Overview of the bzfs.py codebase:
* The codebase starts with docs, the definition of the input data, and the associated argument parsing into a "Params" class.
* All CLI option/parameter values are reachable from the "Params" class.
* Control flow starts in main(), which kicks off a "Job".
* A Job runs one or more "tasks" via run_tasks(), each task replicating a separate dataset tree.
* The core replication algorithm is in run_task() and especially in replicate_datasets() and replicate_dataset().
* The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots().
* The --create-src-snapshots-*, --delete-*, --compare-* and --monitor-* algorithms also start in run_task().
* The main retry logic is in call_with_retries() and clear_resumable_recv_state_if_necessary().
* Progress reporting for use during `zfs send/recv` data transfers is in class ProgressReporter.
* Executing a CLI command on a local or remote host is in run_ssh_command().
* Network connection management is in refresh_ssh_connection_if_necessary() and class ConnectionPool.
* Cache functionality can be found by searching for this regex: .*cach.*
* The parallel processing engine is in itr_ssh_cmd_parallel() and process_datasets_in_parallel_and_fault_tolerant().
* README.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via update_readme.sh.
  Simply run that script whenever you change or add ArgumentParser help text.
"""
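# Illustrative sketch only (not part of the module): how a Python client might drive this module, assuming the
# ArgumentParser accepts SRC_DATASET DST_DATASET positional pairs; the dataset names below are hypothetical:
#
#   from bzfs_main.bzfs import argument_parser, run_main
#   args = argument_parser().parse_args(["tank/data", "backuppool/data"])
#   run_main(args, sys_argv=["bzfs", "tank/data", "backuppool/data"])  # raises SystemExit on failure, mirroring the CLI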
42from __future__ import (
43 annotations,
44)
45import argparse
46import contextlib
47import fcntl
48import heapq
49import itertools
50import logging
51import os
52import re
53import subprocess
54import sys
55import threading
56import time
57from collections import (
58 Counter,
59 defaultdict,
60)
61from collections.abc import (
62 Collection,
63)
64from datetime import (
65 datetime,
66 timedelta,
67)
68from logging import (
69 Logger,
70)
71from pathlib import (
72 Path,
73)
74from subprocess import (
75 DEVNULL,
76 PIPE,
77 CalledProcessError,
78)
79from typing import (
80 Any,
81 Callable,
82 Final,
83 cast,
84 final,
85)
87import bzfs_main.loggers
88from bzfs_main.argparse_actions import (
89 has_timerange_filter,
90)
91from bzfs_main.argparse_cli import (
92 EXCLUDE_DATASET_REGEXES_DEFAULT,
93)
94from bzfs_main.compare_snapshot_lists import (
95 run_compare_snapshot_lists,
96)
97from bzfs_main.configuration import (
98 AlertConfig,
99 CreateSrcSnapshotConfig,
100 LogParams,
101 MonitorSnapshotAlert,
102 Params,
103 Remote,
104 SnapshotLabel,
105)
106from bzfs_main.detect import (
107 DISABLE_PRG,
108 RemoteConfCacheItem,
109 are_bookmarks_enabled,
110 detect_available_programs,
111 is_caching_snapshots,
112 is_dummy,
113 is_zpool_feature_enabled_or_active,
114)
115from bzfs_main.filter import (
116 SNAPSHOT_REGEX_FILTER_NAME,
117 dataset_regexes,
118 filter_datasets,
119 filter_lines,
120 filter_lines_except,
121 filter_snapshots,
122)
123from bzfs_main.loggers import (
124 get_simple_logger,
125 reset_logger,
126 set_logging_runtime_defaults,
127)
128from bzfs_main.parallel_batch_cmd import (
129 run_ssh_cmd_parallel,
130 zfs_list_snapshots_in_parallel,
131)
132from bzfs_main.progress_reporter import (
133 ProgressReporter,
134 count_num_bytes_transferred_by_zfs_send,
135)
136from bzfs_main.replication import (
137 delete_bookmarks,
138 delete_datasets,
139 delete_snapshots,
140 replicate_dataset,
141)
142from bzfs_main.snapshot_cache import (
143 MATURITY_TIME_THRESHOLD_SECS,
144 MONITOR_CACHE_FILE_PREFIX,
145 REPLICATION_CACHE_FILE_PREFIX,
146 SnapshotCache,
147 set_last_modification_time_safe,
148)
149from bzfs_main.util.connection import (
150 SHARED,
151 ConnectionPool,
152 MiniJob,
153 MiniRemote,
154 timeout,
155)
156from bzfs_main.util.parallel_iterator import (
157 run_in_parallel,
158)
159from bzfs_main.util.parallel_tasktree_policy import (
160 process_datasets_in_parallel_and_fault_tolerant,
161)
162from bzfs_main.util.retry import (
163 Retry,
164 RetryableError,
165 RetryConfig,
166 RetryOptions,
167 call_with_retries,
168)
169from bzfs_main.util.utils import (
170 DESCENDANTS_RE_SUFFIX,
171 DIE_STATUS,
172 DONT_SKIP_DATASET,
173 FILE_PERMISSIONS,
174 LOG_DEBUG,
175 LOG_TRACE,
176 PROG_NAME,
177 SHELL_CHARS,
178 UMASK,
179 YEAR_WITH_FOUR_DIGITS_REGEX,
180 HashedInterner,
181 SortedInterner,
182 Subprocesses,
183 SynchronizedBool,
184 SynchronizedDict,
185 append_if_absent,
186 compile_regexes,
187 cut,
188 die,
189 has_duplicates,
190 human_readable_bytes,
191 human_readable_duration,
192 is_descendant,
193 percent,
194 pretty_print_formatter,
195 replace_in_lines,
196 replace_prefix,
197 sha256_85_urlsafe_base64,
198 sha256_128_urlsafe_base64,
199 stderr_to_str,
200 termination_signal_handler,
201 validate_dataset_name,
202 validate_property_name,
203 xappend,
204 xfinally,
205 xprint,
206)
208# constants:
209__version__: Final[str] = bzfs_main.argparse_cli.__version__
210CRITICAL_STATUS: Final[int] = 2
211WARNING_STATUS: Final[int] = 1
212STILL_RUNNING_STATUS: Final[int] = 4
213MIN_PYTHON_VERSION: Final[tuple[int, int]] = (3, 9)
214if sys.version_info < MIN_PYTHON_VERSION:
215 print(f"ERROR: {PROG_NAME} requires Python version >= {'.'.join(map(str, MIN_PYTHON_VERSION))}!")
216 sys.exit(DIE_STATUS)
219#############################################################################
220def argument_parser() -> argparse.ArgumentParser:
221 """Returns the CLI parser used by bzfs."""
222 return bzfs_main.argparse_cli.argument_parser()
225def main() -> None:
226 """API for command line clients."""
227 prev_umask: int = os.umask(UMASK)
228 try:
229 set_logging_runtime_defaults()
230 # On CTRL-C and SIGTERM, send signal to all descendant processes to terminate them
231 termination_event: threading.Event = threading.Event()
232 with termination_signal_handler(termination_events=[termination_event]):
233 run_main(argument_parser().parse_args(), sys.argv, termination_event=termination_event)
234 except subprocess.CalledProcessError as e:
235 ret: int = e.returncode
236 ret = DIE_STATUS if isinstance(ret, int) and 1 <= ret <= STILL_RUNNING_STATUS else ret
237 sys.exit(ret)
238 finally:
239 os.umask(prev_umask) # restore prior global state
242def run_main(
243 args: argparse.Namespace,
244 sys_argv: list[str] | None = None,
245 log: Logger | None = None,
246 termination_event: threading.Event | None = None,
247) -> None:
248 """API for Python clients; visible for testing; may become a public API eventually."""
249 Job(termination_event=termination_event).run_main(args, sys_argv, log)
252#############################################################################
253@final
254class Job(MiniJob):
255 """Executes one bzfs run, coordinating snapshot replication tasks."""
257 def __init__(self, termination_event: threading.Event | None = None) -> None:
258 self.params: Params
259 self.termination_event: Final[threading.Event] = termination_event or threading.Event()
260 self.subprocesses: Subprocesses = Subprocesses(termination_event=self.termination_event)
261 self.retry_options: Final[RetryOptions] = RetryOptions(config=RetryConfig(termination_event=self.termination_event))
262 self.all_dst_dataset_exists: Final[dict[str, dict[str, bool]]] = defaultdict(lambda: defaultdict(bool))
263 self.dst_dataset_exists: SynchronizedDict[str, bool] = SynchronizedDict({})
264 self.src_properties: dict[str, DatasetProperties] = {}
265 self.dst_properties: dict[str, DatasetProperties] = {}
266 self.all_exceptions: list[str] = []
267 self.all_exceptions_count: int = 0
268 self.max_exceptions_to_summarize: int = 10000
269 self.first_exception: BaseException | None = None
270 self.remote_conf_cache: dict[tuple, RemoteConfCacheItem] = {}
271 self.max_datasets_per_minibatch_on_list_snaps: dict[str, int] = {}
272 self.max_workers: dict[str, int] = {}
273 self.progress_reporter: ProgressReporter = cast(ProgressReporter, None)
274 self.is_first_replication_task: Final[SynchronizedBool] = SynchronizedBool(True)
275 self.replication_start_time_nanos: int = time.monotonic_ns()
276 self.timeout_nanos: int | None = None # timestamp aka instant in time
277 self.timeout_duration_nanos: int | None = None # duration (not a timestamp); for logging only
278 self.cache: SnapshotCache = SnapshotCache(self)
279 self.stats_lock: Final[threading.Lock] = threading.Lock()
280 self.num_cache_hits: int = 0
281 self.num_cache_misses: int = 0
282 self.num_snapshots_found: int = 0
283 self.num_snapshots_replicated: int = 0
285 self.is_test_mode: bool = False # for testing only
286 self.creation_prefix: str = "" # for testing only
287 self.use_select: bool = False # for testing only
288 self.progress_update_intervals: tuple[float, float] | None = None # for testing only
289 self.error_injection_triggers: dict[str, Counter[str]] = {} # for testing only
290 self.delete_injection_triggers: dict[str, Counter[str]] = {} # for testing only
291 self.param_injection_triggers: dict[str, dict[str, bool]] = {} # for testing only
292 self.inject_params: dict[str, bool] = {} # for testing only
293 self.injection_lock: threading.Lock = threading.Lock() # for testing only
294 self.max_command_line_bytes: int | None = None # for testing only
296 def shutdown(self) -> None:
297 """Exits any multiplexed ssh sessions that may be leftover."""
298 cache_items: Collection[RemoteConfCacheItem] = self.remote_conf_cache.values()
299 for i, cache_item in enumerate(cache_items):
300 cache_item.connection_pools.shutdown(f"{i + 1}/{len(cache_items)}")
302 def terminate(self) -> None:
303 """Shuts down gracefully; also terminates descendant processes, if any."""
304 with xfinally(self.subprocesses.terminate_process_subtrees):
305 self.shutdown()
307 def run_main(self, args: argparse.Namespace, sys_argv: list[str] | None = None, log: Logger | None = None) -> None:
308 """Parses CLI arguments, sets up logging, and executes main job loop."""
309 assert isinstance(self.error_injection_triggers, dict)
310 assert isinstance(self.delete_injection_triggers, dict)
311 assert isinstance(self.inject_params, dict)
312 logger_name_suffix: str = ""
314 def _reset_logger() -> None:
315 if logger_name_suffix and log is not None: # reset Logger unless it's a Logger outside of our control
316 reset_logger(log)
318 with xfinally(_reset_logger): # runs _reset_logger() on exit, without masking error raised in body of `with` block
319 try:
320 log_params: LogParams = LogParams(args)
321 logger_name_suffix = "" if log is not None else log_params.logger_name_suffix
322 log = bzfs_main.loggers.get_logger(
323 log_params=log_params, args=args, log=log, logger_name_suffix=logger_name_suffix
324 )
325 log.info("%s", f"Log file is: {log_params.log_file}")
326 except BaseException as e:
327 simple_log: Logger = get_simple_logger(PROG_NAME, logger_name_suffix=logger_name_suffix)
328 try:
329 simple_log.error("Log init: %s", e, exc_info=not isinstance(e, SystemExit))
330 finally:
331 reset_logger(simple_log)
332 raise
334 aux_args: list[str] = []
335 if getattr(args, "include_snapshot_plan", None):
336 aux_args += args.include_snapshot_plan
337 if getattr(args, "delete_dst_snapshots_except_plan", None):
338 aux_args += args.delete_dst_snapshots_except_plan
339 if len(aux_args) > 0:
340 log.info("Auxiliary CLI arguments: %s", " ".join(aux_args))
341 args = argument_parser().parse_args(xappend(aux_args, "--", args.root_dataset_pairs), namespace=args)
343 def log_error_on_exit(error: Any, status_code: Any, exc_info: bool = False) -> None:
344 log.error("%s%s", f"Exiting {PROG_NAME} with status code {status_code}. Cause: ", error, exc_info=exc_info)
346 try:
347 log.info("CLI arguments: %s %s", " ".join(sys_argv or []), f"[uid: {os.getuid()}, euid: {os.geteuid()}]")
348 if self.is_test_mode:
349 log.log(LOG_TRACE, "Parsed CLI arguments: %s", args)
350 self.params = p = Params(args, sys_argv or [], log_params, log, self.inject_params)
351 self.timeout_duration_nanos = p.timeout_duration_nanos
352 lock_file: str = p.lock_file_name()
353 lock_fd = os.open(
354 lock_file, os.O_WRONLY | os.O_TRUNC | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, FILE_PERMISSIONS
355 )
356 with xfinally(lambda: os.close(lock_fd)):
357 try:
358 # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or
359 # another process. The (advisory) lock is auto-released when the process terminates or the fd is
360 # closed.
361 fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # LOCK_NB ... non-blocking
362 except BlockingIOError:
msg = "Exiting because a previous instance of this periodic job is still running and has not yet completed, per "
die(msg + lock_file, STILL_RUNNING_STATUS)
366 # xfinally: unlink the lock_file while still holding the flock on its fd - it's a correct and safe
367 # standard POSIX pattern:
368 # - Performing unlink() before close(fd) avoids a race where a subsequent bzfs process could recreate and
369 # lock a fresh inode for the same path between our close() and a later unlink(). In that case, a late
370 # unlink would delete the newer process's lock_file path.
371 # - At this point, critical work is complete; the remaining steps are shutdown mechanics that have no
372 # side effect, so this pattern is correct, safe, and simple.
373 with xfinally(lambda: Path(lock_file).unlink(missing_ok=True)): # don't accumulate stale files
374 try:
375 self.run_tasks() # do the real work
376 except BaseException:
377 self.terminate()
378 raise
379 self.shutdown()
380 with contextlib.suppress(BrokenPipeError):
381 sys.stderr.flush()
382 sys.stdout.flush()
383 except subprocess.CalledProcessError as e:
384 log_error_on_exit(e, e.returncode)
385 raise
386 except SystemExit as e:
387 log_error_on_exit(e, e.code)
388 raise
389 except (subprocess.TimeoutExpired, UnicodeDecodeError) as e:
390 log_error_on_exit(e, DIE_STATUS)
391 raise SystemExit(DIE_STATUS) from e
392 except re.error as e:
393 log_error_on_exit(f"{e} within regex {e.pattern!r}", DIE_STATUS)
394 raise SystemExit(DIE_STATUS) from e
395 except BaseException as e:
396 log_error_on_exit(e, DIE_STATUS, exc_info=True)
397 raise SystemExit(DIE_STATUS) from e
398 finally:
399 log.info("%s", f"Log file was: {log_params.log_file}")
400 log.info("Success. Goodbye!")
401 with contextlib.suppress(BrokenPipeError):
402 sys.stderr.flush()
403 sys.stdout.flush()
405 def run_tasks(self) -> None:
406 """Executes replication cycles, repeating until daemon lifetime expires."""
407 p, log = self.params, self.params.log
408 self.all_exceptions = []
409 self.all_exceptions_count = 0
410 self.first_exception = None
411 self.remote_conf_cache = {}
412 self.validate_once()
413 self.replication_start_time_nanos = time.monotonic_ns()
414 self.progress_reporter = ProgressReporter(log, p.pv_program_opts, self.use_select, self.progress_update_intervals)
415 with xfinally(lambda: self.progress_reporter.stop()):
416 daemon_stoptime_nanos: int = time.monotonic_ns() + p.daemon_lifetime_nanos
417 while True: # loop for daemon mode
418 self.timeout_nanos = (
419 None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
420 )
421 self.all_dst_dataset_exists.clear()
422 self.progress_reporter.reset()
423 src, dst = p.src, p.dst
424 for src_root_dataset, dst_root_dataset in p.root_dataset_pairs:
425 if self.termination_event.is_set():
426 self.terminate()
427 break
428 src.root_dataset = src.basis_root_dataset = src_root_dataset
429 dst.root_dataset = dst.basis_root_dataset = dst_root_dataset
430 p.curr_zfs_send_program_opts = p.zfs_send_program_opts.copy()
431 if p.daemon_lifetime_nanos > 0:
432 self.timeout_nanos = (
433 None if p.timeout_duration_nanos is None else time.monotonic_ns() + p.timeout_duration_nanos
434 )
435 recurs_sep = " " if p.recursive_flag else ""
436 task_description = f"{src.basis_root_dataset} {p.recursive_flag}{recurs_sep}--> {dst.basis_root_dataset}"
437 if len(p.root_dataset_pairs) > 1:
438 log.info("Starting task: %s", task_description + " ...")
439 try:
440 try:
441 self.maybe_inject_error(cmd=[], error_trigger="retryable_run_tasks")
442 timeout(self)
443 self.validate_task()
444 self.run_task() # do the real work
445 except RetryableError as retryable_error:
446 cause: BaseException | None = retryable_error.__cause__
447 assert cause is not None
448 raise cause.with_traceback(cause.__traceback__) # noqa: B904 re-raise of cause without chaining
449 except (subprocess.CalledProcessError, subprocess.TimeoutExpired, SystemExit, UnicodeDecodeError) as e:
450 if p.skip_on_error == "fail" or (
451 isinstance(e, subprocess.TimeoutExpired) and p.daemon_lifetime_nanos == 0
452 ):
453 raise
454 log.error("%s", e)
455 self.append_exception(e, "task", task_description)
456 if not self.sleep_until_next_daemon_iteration(daemon_stoptime_nanos):
457 break
458 if not p.skip_replication:
459 self.print_replication_stats(self.replication_start_time_nanos)
460 error_count = self.all_exceptions_count
461 if error_count > 0 and p.daemon_lifetime_nanos == 0:
462 msgs = "\n".join(f"{i + 1}/{error_count}: {e}" for i, e in enumerate(self.all_exceptions))
463 log.error("%s", f"Tolerated {error_count} errors. Error Summary: \n{msgs}")
464 assert self.first_exception is not None
465 raise self.first_exception
467 def append_exception(self, e: BaseException, task_name: str, task_description: str) -> None:
468 """Records and logs an exception that was encountered while running a subtask."""
469 self.first_exception = self.first_exception or e
470 if len(self.all_exceptions) < self.max_exceptions_to_summarize: # cap max memory consumption
471 self.all_exceptions.append(str(e))
472 self.all_exceptions_count += 1
473 self.params.log.error(f"#{self.all_exceptions_count}: Done with %s: %s", task_name, task_description)
475 def sleep_until_next_daemon_iteration(self, daemon_stoptime_nanos: int) -> bool:
476 """Pauses until next scheduled snapshot time or daemon stop; Returns True to continue daemon loop; False to stop."""
477 sleep_nanos: int = daemon_stoptime_nanos - time.monotonic_ns()
478 if sleep_nanos <= 0:
479 return False
480 self.progress_reporter.pause()
481 p, log = self.params, self.params.log
482 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
483 curr_datetime: datetime = config.current_datetime + timedelta(microseconds=1)
484 next_snapshotting_event_dt: datetime = min(
485 (
486 config.anchors.round_datetime_up_to_duration_multiple(curr_datetime, duration_amount, duration_unit)
487 for duration_amount, duration_unit in config.suffix_durations.values()
488 ),
489 default=curr_datetime + timedelta(days=10 * 365), # infinity
490 )
491 offset: timedelta = next_snapshotting_event_dt - datetime.now(config.tz)
492 offset_nanos: int = (offset.days * 86400 + offset.seconds) * 1_000_000_000 + offset.microseconds * 1_000
493 sleep_nanos = min(sleep_nanos, max(0, offset_nanos))
494 log.info("Daemon sleeping for: %s%s", human_readable_duration(sleep_nanos), f" ... [Log {p.log_params.log_file}]")
495 self.termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination
496 config.current_datetime = datetime.now(config.tz)
497 return time.monotonic_ns() < daemon_stoptime_nanos and not self.termination_event.is_set()
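# Worked example for the offset arithmetic above: if the next snapshotting event is 2 minutes 30 seconds away, then
# offset == timedelta(seconds=150) and offset_nanos == (0 * 86400 + 150) * 1_000_000_000 + 0 * 1_000 ==
# 150_000_000_000, i.e. the daemon sleeps at most 150 seconds, or less if daemon_stoptime_nanos or an asynchronous
# termination request arrives sooner.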
499 def print_replication_stats(self, start_time_nanos: int) -> None:
500 """Logs overall replication statistics after a job cycle completes."""
501 p, log = self.params, self.params.log
502 elapsed_nanos = time.monotonic_ns() - start_time_nanos
503 msg = p.dry(f"zfs sent {self.num_snapshots_replicated} snapshots in {human_readable_duration(elapsed_nanos)}.")
504 if p.is_program_available("pv", "local"):
505 sent_bytes: int = count_num_bytes_transferred_by_zfs_send(p.log_params.pv_log_file)
506 sent_bytes_per_sec: int = round(1_000_000_000 * sent_bytes / (elapsed_nanos or 1))
507 msg += f" zfs sent {human_readable_bytes(sent_bytes)} [{human_readable_bytes(sent_bytes_per_sec)}/s]."
508 log.info("%s", msg.ljust(p.log_params.terminal_columns - len("2024-01-01 23:58:45 [I] ")))
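# Worked example for the throughput formula above (hypothetical numbers): with sent_bytes == 1_073_741_824 (1 GiB)
# transferred in elapsed_nanos == 2_000_000_000 (2 seconds), sent_bytes_per_sec == round(1e9 * 1_073_741_824 / 2e9)
# == 536_870_912 bytes/second, i.e. 512 MiB/s.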
510 def validate_once(self) -> None:
511 """Validates CLI parameters and compiles regex lists one time only, which will later be reused many times."""
512 p = self.params
513 p.zfs_recv_ox_names = self.recv_option_property_names(p.zfs_recv_program_opts)
514 for snapshot_filter in p.snapshot_filters:
515 for _filter in snapshot_filter:
516 if _filter.name == SNAPSHOT_REGEX_FILTER_NAME:
517 exclude_snapshot_regexes = compile_regexes(_filter.options[0])
518 include_snapshot_regexes = compile_regexes(_filter.options[1] or [".*"])
519 _filter.options = (exclude_snapshot_regexes, include_snapshot_regexes)
521 exclude_regexes: list[str] = [EXCLUDE_DATASET_REGEXES_DEFAULT]
522 if len(p.args.exclude_dataset_regex) > 0: # some patterns don't exclude anything
523 exclude_regexes = [regex for regex in p.args.exclude_dataset_regex if regex != "" and regex != "!.*"]
524 include_regexes: list[str] = p.args.include_dataset_regex
526 # relative datasets need not be compiled more than once as they don't change between tasks
527 def separate_abs_vs_rel_datasets(datasets: list[str]) -> tuple[list[str], list[str]]:
528 abs_datasets: list[str] = []
529 rel_datasets: list[str] = []
530 for dataset in datasets:
531 (abs_datasets if dataset.startswith("/") else rel_datasets).append(dataset)
532 return abs_datasets, rel_datasets
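# For example, separate_abs_vs_rel_datasets(["/tank/foo/bar", "foo", "foo/bar"]) returns
# (["/tank/foo/bar"], ["foo", "foo/bar"]): a leading "/" marks a dataset as absolute, everything else is relative.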
534 p.abs_exclude_datasets, rel_exclude_datasets = separate_abs_vs_rel_datasets(p.args.exclude_dataset)
535 p.abs_include_datasets, rel_include_datasets = separate_abs_vs_rel_datasets(p.args.include_dataset)
536 suffix = DESCENDANTS_RE_SUFFIX
537 p.tmp_exclude_dataset_regexes, p.tmp_include_dataset_regexes = (
538 compile_regexes(exclude_regexes + dataset_regexes(p.src, p.dst, rel_exclude_datasets), suffix=suffix),
539 compile_regexes(include_regexes + dataset_regexes(p.src, p.dst, rel_include_datasets), suffix=suffix),
540 )
542 if p.pv_program != DISABLE_PRG:
543 pv_program_opts_set = set(p.pv_program_opts)
544 if pv_program_opts_set.isdisjoint({"--bytes", "-b", "--bits", "-8"}):
545 die("--pv-program-opts must contain one of --bytes or --bits for progress metrics to function.")
546 if not p.log_params.quiet:
547 for opts in [["--eta", "-e"], ["--fineta", "-I"], ["--average-rate", "-a"]]:
548 if pv_program_opts_set.isdisjoint(opts):
549 die(f"--pv-program-opts must contain one of {', '.join(opts)} for progress report line to function.")
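# For instance, a --pv-program-opts value like "--bytes --eta --fineta --average-rate" satisfies all of the above
# checks; these are standard pv(1) flags, shown here purely as an illustrative (not necessarily default) value.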
551 src, dst = p.src, p.dst
552 for remote in [src, dst]:
553 r, loc = remote, remote.location
554 validate_user_name(r.basis_ssh_user, f"--ssh-{loc}-user")
555 validate_host_name(r.basis_ssh_host, f"--ssh-{loc}-host")
556 validate_port(r.ssh_port, f"--ssh-{loc}-port ")
558 def validate_task(self) -> None:
559 """Validates a single replication task before execution."""
560 p, log = self.params, self.params.log
561 src, dst = p.src, p.dst
562 for remote in [src, dst]:
563 r = remote
564 r.ssh_user, r.ssh_host, r.ssh_user_host, r.pool, r.root_dataset = parse_dataset_locator(
565 r.basis_root_dataset, user=r.basis_ssh_user, host=r.basis_ssh_host, port=r.ssh_port
566 )
567 r.sudo, r.use_zfs_delegation = self.sudo_cmd(r.ssh_user_host, r.ssh_user)
568 local_addrs = ("",) if self.is_test_mode else ("", "127.0.0.1", "::1") # ::1 is IPv6 version of loopback address
569 remote.is_nonlocal = r.ssh_host not in local_addrs
570 self.dst_dataset_exists = SynchronizedDict(self.all_dst_dataset_exists[dst.ssh_user_host])
572 if src.ssh_host == dst.ssh_host:
573 msg = f"src: {src.basis_root_dataset}, dst: {dst.basis_root_dataset}"
574 if src.root_dataset == dst.root_dataset:
575 die(f"Source and destination dataset must not be the same! {msg}")
576 if p.recursive and (
577 is_descendant(src.root_dataset, of_root_dataset=dst.root_dataset)
578 or is_descendant(dst.root_dataset, of_root_dataset=src.root_dataset)
579 ):
580 die(f"Source and destination dataset trees must not overlap! {msg}")
582 suffx = DESCENDANTS_RE_SUFFIX # also match descendants of a matching dataset
583 p.exclude_dataset_regexes, p.include_dataset_regexes = (
584 p.tmp_exclude_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_exclude_datasets), suffix=suffx),
585 p.tmp_include_dataset_regexes + compile_regexes(dataset_regexes(src, dst, p.abs_include_datasets), suffix=suffx),
586 )
587 if len(p.include_dataset_regexes) == 0:
588 p.include_dataset_regexes = [(re.compile(r".*"), False)]
590 detect_available_programs(self)
592 if is_zpool_feature_enabled_or_active(p, dst, "feature@large_blocks"):
593 append_if_absent(p.curr_zfs_send_program_opts, "--large-block")
595 self.max_workers = {}
596 self.max_datasets_per_minibatch_on_list_snaps = {}
597 for r in [src, dst]:
598 cpus = int(p.available_programs[r.location].get("getconf_cpu_count", 8))
599 threads, is_percent = p.threads
600 cpus = max(1, round(cpus * threads / 100.0) if is_percent else round(threads))
601 self.max_workers[r.location] = cpus
602 bs = max(1, p.max_datasets_per_batch_on_list_snaps) # 1024 by default
603 max_datasets_per_minibatch = p.max_datasets_per_minibatch_on_list_snaps
604 if max_datasets_per_minibatch <= 0:
605 max_datasets_per_minibatch = max(1, bs // cpus)
606 max_datasets_per_minibatch = min(bs, max_datasets_per_minibatch)
607 self.max_datasets_per_minibatch_on_list_snaps[r.location] = max_datasets_per_minibatch
608 log.log(
609 LOG_TRACE,
610 "%s",
611 f"max_datasets_per_batch_on_list_snaps: {p.max_datasets_per_batch_on_list_snaps}, "
612 f"max_datasets_per_minibatch_on_list_snaps: {max_datasets_per_minibatch}, "
613 f"max_workers: {self.max_workers[r.location]}, "
614 f"location: {r.location}",
615 )
616 if self.is_test_mode:
617 log.log(LOG_TRACE, "Validated Param values: %s", pretty_print_formatter(self.params))
619 def sudo_cmd(self, ssh_user_host: str, ssh_user: str) -> tuple[str, bool]:
620 """Returns sudo command prefix and whether root privileges are required."""
621 p: Params = self.params
622 assert isinstance(ssh_user_host, str)
623 assert isinstance(ssh_user, str)
624 assert isinstance(p.sudo_program, str)
625 assert isinstance(p.enable_privilege_elevation, bool)
627 is_root: bool = True
628 if ssh_user_host != "":
629 if ssh_user == "":
630 if os.getuid() != 0:
631 is_root = False
632 elif ssh_user != "root":
633 is_root = False
634 elif os.getuid() != 0:
635 is_root = False
637 if is_root:
sudo = ""  # no sudo needed, as we effectively are the root user anyway
use_zfs_delegation = False  # nor is 'zfs allow' delegation needed
640 return sudo, use_zfs_delegation
641 elif p.enable_privilege_elevation:
642 if p.sudo_program == DISABLE_PRG:
643 die(f"sudo CLI is not available on host: {ssh_user_host or 'localhost'}")
644 # The '-n' option makes 'sudo' safer and more fail-fast. It avoids having sudo prompt the user for input of any
645 # kind. If a password is required for the sudo command to run, sudo will display an error message and exit.
646 return p.sudo_program + " -n", False
647 else:
648 return "", True
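# Decision summary for sudo_cmd() above, returning (sudo_cmd_prefix, use_zfs_delegation); a sketch, not a spec:
#   effectively root (e.g. local uid 0, or remote ssh_user == "root")  -> ("", False)     neither sudo nor delegation
#   not root, p.enable_privilege_elevation, sudo program available     -> (p.sudo_program + " -n", False)
#   not root, p.enable_privilege_elevation, sudo program disabled      -> die()
#   not root, not p.enable_privilege_elevation                         -> ("", True)      rely on 'zfs allow' delegation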
650 def run_task(self) -> None:
651 """Replicates all snapshots for the current root dataset pair."""
653 def filter_src_datasets() -> list[str]: # apply --{include|exclude}-dataset policy
654 return filter_datasets(self, src, basis_src_datasets) if src_datasets is None else src_datasets
656 p, log = self.params, self.params.log
657 src, dst = p.src, p.dst
658 max_workers: int = min(self.max_workers[src.location], self.max_workers[dst.location])
659 recursive_sep: str = " " if p.recursive_flag else ""
660 task_description: str = f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}--> {dst.basis_root_dataset} ..."
661 failed: bool = False
662 src_datasets: list[str] | None = None
663 basis_src_datasets: list[str] = []
664 self.src_properties = {}
665 self.dst_properties = {}
666 if not is_dummy(src): # find src dataset or all datasets in src dataset tree (with --recursive)
667 basis_src_datasets = self.list_src_datasets_task()
669 if not p.create_src_snapshots_config.skip_create_src_snapshots:
670 log.info(p.dry("--create-src-snapshots: %s"), f"{src.basis_root_dataset} {p.recursive_flag}{recursive_sep}...")
671 src_datasets = filter_src_datasets() # apply include/exclude policy
672 self.create_src_snapshots_task(basis_src_datasets, src_datasets)
674 # Optionally, replicate src.root_dataset (optionally including its descendants) to dst.root_dataset
675 if not p.skip_replication:
676 if len(basis_src_datasets) == 0:
677 die(f"Replication: Source dataset does not exist: {src.basis_root_dataset}")
678 if is_dummy(dst):
679 die("Replication: Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
680 src_datasets = filter_src_datasets() # apply include/exclude policy
681 failed = self.replicate_datasets(src_datasets, task_description, max_workers)
683 if failed or not (
684 p.delete_dst_datasets
685 or p.delete_dst_snapshots
686 or p.delete_empty_dst_datasets
687 or p.compare_snapshot_lists
688 or p.monitor_snapshots_config.enable_monitor_snapshots
689 ):
690 return
691 log.info("Listing dst datasets: %s", task_description)
692 if is_dummy(dst):
693 die("Destination may be a dummy dataset only if exclusively creating snapshots on the source!")
694 basis_dst_datasets: list[str] = self.list_dst_datasets_task()
695 dst_datasets: list[str] = filter_datasets(self, dst, basis_dst_datasets) # apply include/exclude policy
697 if p.delete_dst_datasets and not failed:
698 log.info(p.dry("--delete-dst-datasets: %s"), task_description)
699 basis_dst_datasets, dst_datasets = self.delete_dst_datasets_task(
700 basis_src_datasets, basis_dst_datasets, dst_datasets
701 )
703 if p.delete_dst_snapshots and not failed:
704 log.info(p.dry("--delete-dst-snapshots: %s"), task_description + f" [{len(dst_datasets)} datasets]")
705 failed = self.delete_destination_snapshots_task(basis_src_datasets, dst_datasets, max_workers, task_description)
707 if p.delete_empty_dst_datasets and p.recursive and not failed:
708 log.info(p.dry("--delete-empty-dst-datasets: %s"), task_description)
709 basis_dst_datasets, dst_datasets = self.delete_empty_dst_datasets_task(basis_dst_datasets, dst_datasets)
711 if p.compare_snapshot_lists and not failed:
712 log.info("--compare-snapshot-lists: %s", task_description)
713 if len(basis_src_datasets) == 0 and not is_dummy(src):
714 die(f"Source dataset does not exist: {src.basis_root_dataset}")
715 src_datasets = filter_src_datasets() # apply include/exclude policy
716 run_compare_snapshot_lists(self, src_datasets, dst_datasets)
718 if p.monitor_snapshots_config.enable_monitor_snapshots and not failed:
719 log.info("--monitor-snapshots: %s", task_description)
720 src_datasets = filter_src_datasets() # apply include/exclude policy
721 self.monitor_snapshots_task(src_datasets, dst_datasets, task_description)
723 def list_src_datasets_task(self) -> list[str]:
724 """Lists datasets on the source host."""
725 p = self.params
726 src = p.src
727 basis_src_datasets: list[str] = []
728 is_caching: bool = is_caching_snapshots(p, src)
729 props: str = "volblocksize,recordsize,name"
730 props = "snapshots_changed," + props if is_caching else props
731 cmd: list[str] = p.split_args(
732 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", src.root_dataset
733 )
734 for line in (self.try_ssh_command_with_retries(src, LOG_DEBUG, cmd=cmd) or "").splitlines():
735 cols: list[str] = line.split("\t")
736 snapshots_changed, volblocksize, recordsize, src_dataset = cols if is_caching else ["-"] + cols
737 self.src_properties[src_dataset] = DatasetProperties(
738 recordsize=int(recordsize) if recordsize != "-" else -int(volblocksize),
739 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
740 )
741 basis_src_datasets.append(src_dataset)
742 assert (not self.is_test_mode) or basis_src_datasets == sorted(basis_src_datasets), "List is not sorted"
743 return basis_src_datasets
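# Illustrative parse of one 'zfs list' output line produced by the command above, with caching enabled (hypothetical
# values): "1712345678\t-\t131072\ttank/data" splits into snapshots_changed=1712345678, volblocksize="-",
# recordsize=131072, src_dataset="tank/data". For a zvol, recordsize is "-" and the negated volblocksize is stored
# instead, so a negative recordsize in src_properties distinguishes volumes from filesystems.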
745 def list_dst_datasets_task(self) -> list[str]:
746 """Lists datasets on the destination host."""
747 p, log = self.params, self.params.log
748 dst = p.dst
749 is_caching: bool = is_caching_snapshots(p, dst) and p.monitor_snapshots_config.enable_monitor_snapshots
750 props: str = "name"
751 props = "snapshots_changed," + props if is_caching else props
752 cmd: list[str] = p.split_args(
753 f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o {props} {p.recursive_flag}", dst.root_dataset
754 )
755 basis_dst_datasets: list[str] = []
756 basis_dst_datasets_str: str | None = self.try_ssh_command_with_retries(dst, LOG_TRACE, cmd=cmd)
757 if basis_dst_datasets_str is None:
758 log.warning("Destination dataset does not exist: %s", dst.root_dataset)
759 else:
760 for line in basis_dst_datasets_str.splitlines():
761 cols: list[str] = line.split("\t")
762 snapshots_changed, dst_dataset = cols if is_caching else ["-"] + cols
763 self.dst_properties[dst_dataset] = DatasetProperties(
764 recordsize=0,
765 snapshots_changed=int(snapshots_changed) if snapshots_changed and snapshots_changed != "-" else 0,
766 )
767 basis_dst_datasets.append(dst_dataset)
768 assert (not self.is_test_mode) or basis_dst_datasets == sorted(basis_dst_datasets), "List is not sorted"
769 return basis_dst_datasets
771 def create_src_snapshots_task(self, basis_src_datasets: list[str], src_datasets: list[str]) -> None:
772 """Atomically creates a new snapshot of the src datasets selected by --{include|exclude}-dataset* policy; implements
773 --create-src-snapshots.
775 The implementation attempts to fit as many datasets as possible into a single (atomic) 'zfs snapshot' command line,
776 using lexicographical sort order, and using 'zfs snapshot -r' to the extent that this is compatible with the
777 --{include|exclude}-dataset* pruning policy. The snapshots of all datasets that fit within the same single 'zfs
778 snapshot' CLI invocation will be taken within the same ZFS transaction group, and correspondingly have identical
779 'createtxg' ZFS property (but not necessarily identical 'creation' ZFS time property as ZFS actually provides no such
780 guarantee), and thus be consistent. Dataset names that can't fit into a single command line are spread over multiple
781 command line invocations, respecting the limits that the operating system places on the maximum length of a single
782 command line, per `getconf ARG_MAX`.
784 Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
785 per dataset. Space complexity is O(max(N, M)).
786 """
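# Illustrative CLI flavors generated below (dataset and snapshot label names are hypothetical):
#   non-recursive batch:  zfs snapshot tank/a@bzfs_hourly_<timestamp>  tank/a/b@bzfs_hourly_<timestamp>
#   recursive flavor:     zfs snapshot -r tank/a@bzfs_hourly_<timestamp>   # when the whole subtree is selected
# Batches are sized so that each command line stays within `getconf ARG_MAX`, and each invocation is atomic in ZFS.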
787 p, log = self.params, self.params.log
788 src = p.src
789 if len(basis_src_datasets) == 0:
790 die(f"Source dataset does not exist: {src.basis_root_dataset}")
791 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = self.find_datasets_to_snapshot(src_datasets)
792 datasets_to_snapshot = {label: datasets for label, datasets in datasets_to_snapshot.items() if len(datasets) > 0}
793 basis_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = datasets_to_snapshot.copy() # shallow copy
794 commands: dict[SnapshotLabel, list[str]] = {}
795 for label, datasets in datasets_to_snapshot.items():
796 cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} snapshot")
797 if p.recursive:
798 # Run 'zfs snapshot -r' on the roots of subtrees if possible, else fallback to non-recursive CLI flavor
799 root_datasets = self.root_datasets_if_recursive_zfs_snapshot_is_possible(datasets, basis_src_datasets)
800 if root_datasets is not None:
801 cmd.append("-r") # recursive; takes a snapshot of all datasets in the subtree(s)
802 datasets_to_snapshot[label] = root_datasets
803 commands[label] = cmd
804 creation_msg = f"Creating {sum(len(datasets) for datasets in basis_datasets_to_snapshot.values())} snapshots"
805 log.info(p.dry("--create-src-snapshots: %s"), f"{creation_msg} within {len(src_datasets)} datasets ...")
806 # create snapshots in large (parallel) batches, without using a command line that's too big for the OS to handle
807 run_ssh_cmd_parallel(
808 self,
809 src,
810 ((commands[lbl], (f"{ds}@{lbl}" for ds in datasets)) for lbl, datasets in datasets_to_snapshot.items()),
811 fn=lambda cmd, batch: self.run_ssh_command_with_retries(
812 src, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch, retry_on_generic_ssh_error=False
813 ), # retry_on_generic_ssh_error=False means only retry on SSH connect errors b/c `zfs snapshot` isn't idempotent
814 max_batch_items=2**29,
815 )
816 if is_caching_snapshots(p, src):
817 # perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls
818 self.cache.update_last_modified_cache(basis_datasets_to_snapshot)
820 def delete_destination_snapshots_task(
821 self, basis_src_datasets: list[str], dst_datasets: list[str], max_workers: int, task_description: str
822 ) -> bool:
823 """Deletes existing destination snapshots that do not exist within the source dataset if they are included by the
824 --{include|exclude}-snapshot-* policy, and the destination dataset is included via --{include|exclude}-dataset*
825 policy; implements --delete-dst-snapshots."""
826 p, log = self.params, self.params.log
827 src, dst = p.src, p.dst
828 kind: str = "bookmark" if p.delete_dst_bookmarks else "snapshot"
829 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters)
830 props: str = self.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name"
831 basis_src_datasets_set: set[str] = set(basis_src_datasets)
832 num_snapshots_found, num_snapshots_deleted = 0, 0
834 def delete_destination_snapshots(dst_dataset: str, tid: str, retry: Retry) -> bool: # thread-safe
835 src_dataset: str = replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
836 if src_dataset in basis_src_datasets_set and (are_bookmarks_enabled(p, src) or not p.delete_dst_bookmarks):
837 src_kind: str = kind
838 if not p.delete_dst_snapshots_no_crosscheck:
839 src_kind = "snapshot,bookmark" if are_bookmarks_enabled(p, src) else "snapshot"
840 src_cmd = p.split_args(f"{p.zfs_program} list -t {src_kind} -d 1 -s name -Hp -o guid", src_dataset)
841 else:
842 src_cmd = None
843 dst_cmd = p.split_args(f"{p.zfs_program} list -t {kind} -d 1 -s createtxg -Hp -o {props}", dst_dataset)
844 self.maybe_inject_delete(dst, dataset=dst_dataset, delete_trigger="zfs_list_delete_dst_snapshots")
845 src_snaps_with_guids, dst_snaps_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel
846 lambda: set(self.run_ssh_command(src, LOG_TRACE, cmd=src_cmd).splitlines() if src_cmd else []),
847 lambda: self.try_ssh_command(dst, LOG_TRACE, cmd=dst_cmd),
848 )
849 if dst_snaps_with_guids_str is None:
850 log.warning("Third party deleted destination: %s", dst_dataset)
851 return False
852 dst_snaps_with_guids: list[str] = dst_snaps_with_guids_str.splitlines()
853 num_dst_snaps_with_guids = len(dst_snaps_with_guids)
854 basis_dst_snaps_with_guids: list[str] = dst_snaps_with_guids.copy()
855 if p.delete_dst_bookmarks:
856 replace_in_lines(dst_snaps_with_guids, old="#", new="@", count=1) # treat bookmarks as snapshots
857 # The check against the source dataset happens *after* filtering the dst snapshots with filter_snapshots().
858 # `p.delete_dst_snapshots_except` means the user wants to specify snapshots to *retain* aka *keep*
859 all_except: bool = p.delete_dst_snapshots_except
860 if p.delete_dst_snapshots_except and not is_dummy(src):
861 # However, as here we are in "except" mode AND the source is NOT a dummy, we first filter to get what
862 # the policy says to *keep* (so all_except=False for the filter_snapshots() call), then from that "keep"
863 # list, we later further refine by checking what's on the source dataset.
864 all_except = False
865 dst_snaps_with_guids = filter_snapshots(self, dst_snaps_with_guids, all_except=all_except)
866 if p.delete_dst_bookmarks:
867 replace_in_lines(dst_snaps_with_guids, old="@", new="#", count=1) # restore pre-filtering bookmark state
868 if filter_needs_creation_time:
869 dst_snaps_with_guids = cut(field=2, lines=dst_snaps_with_guids)
870 basis_dst_snaps_with_guids = cut(field=2, lines=basis_dst_snaps_with_guids)
871 if p.delete_dst_snapshots_except and not is_dummy(src): # Non-dummy Source + "Except" (Keep) Mode
872 # Retain dst snapshots that match snapshot filter policy AND are on src dataset, aka
873 # Delete dst snapshots except snapshots that match snapshot filter policy AND are on src dataset.
874 # Concretely, `dst_snaps_with_guids` contains GUIDs of DST snapshots that the filter policy says to KEEP.
875 # We only actually keep them if they are ALSO on the SRC.
876 # So, snapshots to DELETE (`dst_tags_to_delete`) are ALL snapshots on DST (`basis_dst_snaps_with_guids`)
877 # EXCEPT those whose GUIDs are in `dst_snaps_with_guids` AND ALSO in `src_snaps_with_guids`.
878 except_dst_guids: set[str] = set(cut(field=1, lines=dst_snaps_with_guids)).intersection(src_snaps_with_guids)
879 dst_tags_to_delete: list[str] = filter_lines_except(basis_dst_snaps_with_guids, except_dst_guids)
880 else: # Standard Delete Mode OR Dummy Source + "Except" (Keep) Mode
881 # In standard delete mode:
882 # `dst_snaps_with_guids` contains GUIDs of policy-selected snapshots on DST.
883 # We delete those that are NOT on SRC.
884 # `dst_tags_to_delete` = `dst_snaps_with_guids` - `src_snaps_with_guids`.
885 # In dummy source + "except" (keep) mode:
886 # `all_except` was True.
887 # `dst_snaps_with_guids` contains snaps NOT matching the "keep" policy -- these are the ones to delete.
888 # `src_snaps_with_guids` is empty.
889 # `dst_tags_to_delete` = `dst_snaps_with_guids` - {} = `dst_snaps_with_guids`.
890 dst_guids_to_delete = set(cut(field=1, lines=dst_snaps_with_guids)).difference(src_snaps_with_guids)
891 dst_tags_to_delete = filter_lines(dst_snaps_with_guids, dst_guids_to_delete)
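# Worked example for the standard delete mode above (hypothetical GUIDs and names): with policy-selected dst lines
# ["g1\tpool/dst@s1", "g2\tpool/dst@s2"] and src_snaps_with_guids == {"g1"}, dst_guids_to_delete == {"g2"}, so only
# the line for @s2 survives filter_lines(); the subsequent cut(field=2, separator="@") yields the tag "s2" to delete.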
892 separator: str = "#" if p.delete_dst_bookmarks else "@"
893 dst_tags_to_delete = cut(field=2, separator=separator, lines=dst_tags_to_delete)
894 if p.delete_dst_bookmarks:
895 delete_bookmarks(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
896 else:
897 delete_snapshots(self, dst, dst_dataset, snapshot_tags=dst_tags_to_delete)
898 with self.stats_lock:
899 nonlocal num_snapshots_found
900 num_snapshots_found += num_dst_snaps_with_guids
901 nonlocal num_snapshots_deleted
902 num_snapshots_deleted += len(dst_tags_to_delete)
903 if len(dst_tags_to_delete) > 0 and not p.delete_dst_bookmarks:
904 self.dst_properties[dst_dataset].snapshots_changed = 0 # invalidate cache
905 return True
907 # Run delete_destination_snapshots(dataset) for each dataset, while handling errors, retries + parallel exec
908 failed: bool = False
909 if are_bookmarks_enabled(p, dst) or not p.delete_dst_bookmarks:
910 start_time_nanos = time.monotonic_ns()
911 failed = process_datasets_in_parallel_and_fault_tolerant(
912 log=log,
913 datasets=dst_datasets,
914 process_dataset=delete_destination_snapshots, # lambda
915 skip_tree_on_error=lambda dataset: False,
916 skip_on_error=p.skip_on_error,
917 max_workers=max_workers,
918 termination_event=self.termination_event,
919 termination_handler=self.terminate,
920 enable_barriers=False,
921 task_name="--delete-dst-snapshots",
922 append_exception=self.append_exception,
923 retry_policy=p.retry_policy,
924 retry_options=self.retry_options,
925 dry_run=p.dry_run,
926 is_test_mode=self.is_test_mode,
927 )
928 elapsed_nanos = time.monotonic_ns() - start_time_nanos
929 log.info(
930 p.dry("--delete-dst-snapshots: %s"),
931 task_description + f" [Deleted {num_snapshots_deleted} out of {num_snapshots_found} {kind}s "
932 f"within {len(dst_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}]",
933 )
934 return failed
936 def delete_dst_datasets_task(
937 self, basis_src_datasets: list[str], basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
938 ) -> tuple[list[str], list[str]]:
939 """Deletes existing destination datasets that do not exist within the source dataset if they are included via
940 --{include|exclude}-dataset* policy; implements --delete-dst-datasets.
942 Do not recurse without --recursive. With --recursive, never delete non-selected dataset subtrees or their ancestors.
943 """
944 p = self.params
945 src, dst = p.src, p.dst
946 children: dict[str, set[str]] = defaultdict(set)
947 for dst_dataset in basis_dst_datasets: # Compute the direct children of each NON-FILTERED dataset
948 parent: str = os.path.dirname(dst_dataset)
949 children[parent].add(dst_dataset)
950 to_delete: set[str] = set()
951 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm
952 if children[dst_dataset].issubset(to_delete):
953 to_delete.add(dst_dataset) # all children are deletable, thus the dataset itself is deletable too
954 to_delete = to_delete.difference(
955 replace_prefix(src_dataset, src.root_dataset, dst.root_dataset) for src_dataset in basis_src_datasets
956 )
957 delete_datasets(self, dst, to_delete)
958 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(to_delete))
959 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(to_delete))
960 return basis_dst_datasets, sorted_dst_datasets
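# Worked example for the reverse-order walk above (hypothetical names): with selected dst datasets ["d", "d/a", "d/b"],
# the reverse walk visits the leaves "d/b" and "d/a" first (no children, hence trivially deletable) and then "d",
# which becomes deletable because both of its children are already in to_delete; any dataset whose src counterpart
# exists is subsequently removed from to_delete by the difference() step.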
962 def delete_empty_dst_datasets_task(
963 self, basis_dst_datasets: list[str], sorted_dst_datasets: list[str]
964 ) -> tuple[list[str], list[str]]:
965 """Deletes any existing destination dataset that has no snapshot and no bookmark if all descendants of that dataset
966 do not have a snapshot or bookmark either; implements --delete-empty-dst-datasets.
968 To do so, we walk the dataset list (conceptually, a tree) depth-first (i.e. sorted descending). If a dst dataset has
969 zero snapshots and zero bookmarks and all its children are already marked as orphans, then it is itself an orphan,
970 and we mark it as such. Walking in a reverse sorted way means that we efficiently check for zero snapshots/bookmarks
971 not just over the direct children but the entire tree. Finally, delete all orphan datasets in an efficient batched
972 way.
973 """
974 p = self.params
975 dst = p.dst
977 # Compute the direct children of each NON-FILTERED dataset. Thus, no non-selected dataset and no ancestor of a
978 # non-selected dataset will ever be added to the "orphan" set. In other words, this treats non-selected dataset
979 # subtrees as if they all had snapshots, so non-selected dataset subtrees and their ancestors are guaranteed
980 # to not get deleted.
981 children: dict[str, set[str]] = defaultdict(set)
982 for dst_dataset in basis_dst_datasets:
983 parent: str = os.path.dirname(dst_dataset)
984 children[parent].add(dst_dataset)
986 def compute_orphans(datasets_having_snapshots: set[str]) -> set[str]:
987 """Returns destination datasets having zero snapshots whose children are all orphans."""
988 orphans: set[str] = set()
989 for dst_dataset in reversed(sorted_dst_datasets): # Reverse order facilitates efficient O(N) time algorithm
990 if (dst_dataset not in datasets_having_snapshots) and children[dst_dataset].issubset(orphans):
991 orphans.add(dst_dataset)
992 return orphans
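# For example (hypothetical names), with sorted_dst_datasets == ["d", "d/a", "d/a/x", "d/b"] and snapshots existing
# only on "d/a/x", compute_orphans() marks "d/b" as an orphan, but neither "d/a" (its child "d/a/x" has snapshots)
# nor "d" (its child "d/a" is not an orphan), so only "d/b" would get deleted.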
994 # Compute candidate orphan datasets, which reduces the list of datasets for which we list snapshots via
995 # 'zfs list -t snapshot ...' from dst_datasets to a subset of dst_datasets, which in turn reduces I/O and improves
996 # perf. Essentially, this eliminates the I/O to list snapshots for ancestors of excluded datasets.
997 candidate_orphans: set[str] = compute_orphans(set())
999 # Compute destination datasets having more than zero snapshots
1000 dst_datasets_having_snapshots: set[str] = set()
1001 with_bookmarks: bool = p.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots and are_bookmarks_enabled(p, dst)
1002 btype: str = "bookmark,snapshot" if with_bookmarks else "snapshot"
1003 cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {btype} -d 1 -S name -Hp -o name")
1004 for snapshots in zfs_list_snapshots_in_parallel(self, dst, cmd, sorted(candidate_orphans), ordered=False):
1005 if with_bookmarks:
1006 replace_in_lines(snapshots, old="#", new="@", count=1) # treat bookmarks as snapshots
1007 dst_datasets_having_snapshots.update(snap[0 : snap.index("@")] for snap in snapshots) # union
1009 orphans = compute_orphans(dst_datasets_having_snapshots) # compute the real orphans
1010 delete_datasets(self, dst, orphans) # finally, delete the orphan datasets in an efficient way
1011 sorted_dst_datasets = sorted(set(sorted_dst_datasets).difference(orphans))
1012 basis_dst_datasets = sorted(set(basis_dst_datasets).difference(orphans))
1013 return basis_dst_datasets, sorted_dst_datasets
1015 def monitor_snapshots_task(
1016 self, sorted_src_datasets: list[str], sorted_dst_datasets: list[str], task_description: str
1017 ) -> None:
1018 """Monitors src and dst snapshots; implements --monitor-snapshots."""
1019 p, log = self.params, self.params.log
1020 src, dst = p.src, p.dst
1021 num_cache_hits: int = self.num_cache_hits
1022 num_cache_misses: int = self.num_cache_misses
1023 start_time_nanos: int = time.monotonic_ns()
1024 run_in_parallel(
1025 lambda: self.monitor_snapshots(dst, sorted_dst_datasets),
1026 lambda: self.monitor_snapshots(src, sorted_src_datasets),
1027 )
1028 elapsed: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
1029 num_cache_hits = self.num_cache_hits - num_cache_hits
1030 num_cache_misses = self.num_cache_misses - num_cache_misses
1031 if num_cache_hits > 0 or num_cache_misses > 0:
1032 msg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
1033 else:
1034 msg = ""
1035 log.info(
1036 "--monitor-snapshots done: %s",
1037 f"{task_description} [{len(sorted_src_datasets) + len(sorted_dst_datasets)} datasets; took {elapsed}{msg}]",
1038 )
1040 def monitor_snapshots(self, remote: Remote, sorted_datasets: list[str]) -> None:
1041 """Checks snapshot freshness and warns or errors out when limits are exceeded.
1043 Alerts the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified snapshot name
1044 pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to check if snapshots
1045 are successfully taken on schedule, successfully replicated on schedule, and successfully pruned on schedule. Process
1046 exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.
1048 Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of snapshots
1049 per dataset. Space complexity is O(max(N, M)).
1050 """
1051 p, log = self.params, self.params.log
1052 alerts: list[MonitorSnapshotAlert] = p.monitor_snapshots_config.alerts
1053 labels: list[SnapshotLabel] = [alert.label for alert in alerts]
1054 current_unixtime_millis: float = p.create_src_snapshots_config.current_datetime.timestamp() * 1000
1055 is_debug: bool = log.isEnabledFor(LOG_DEBUG)
1056 if is_caching_snapshots(p, remote):
1057 props: dict[str, DatasetProperties] = self.dst_properties if remote is p.dst else self.src_properties
1058 snapshots_changed_dict: dict[str, int] = {dataset: vals.snapshots_changed for dataset, vals in props.items()}
1059 alerts_hash: str = sha256_128_urlsafe_base64(str(tuple(alerts)))
1060 label_hashes: dict[SnapshotLabel, str] = {
1061 label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
1062 }
1063 is_caching: bool = False
1065 def monitor_last_modified_cache_file(r: Remote, dataset: str, label: SnapshotLabel, alert_cfg: AlertConfig) -> str:
1066 cache_label: str = os.path.join(MONITOR_CACHE_FILE_PREFIX, alert_cfg.kind[0], label_hashes[label], alerts_hash)
1067 return self.cache.last_modified_cache_file(r, dataset, cache_label)
1069 def alert_msg(
1070 kind: str, dataset: str, snapshot: str, label: SnapshotLabel, snapshot_age_millis: float, delta_millis: int
1071 ) -> str:
1072 assert kind == "Latest" or kind == "Oldest"
1073 lbl = f"{label.prefix}{label.infix}<timestamp>{label.suffix}"
1074 if snapshot_age_millis >= current_unixtime_millis:
1075 return f"No snapshot exists for {dataset}@{lbl}"
1076 msg = f"{kind} snapshot for {dataset}@{lbl} is {human_readable_duration(snapshot_age_millis, unit='ms')} old"
1077 s = f": @{snapshot}" if snapshot else ""
1078 if delta_millis == -1:
1079 return f"{msg}{s}"
1080 return f"{msg} but should be at most {human_readable_duration(delta_millis, unit='ms')} old{s}"
1082 def check_alert(
1083 label: SnapshotLabel, alert_cfg: AlertConfig | None, creation_unixtime_secs: int, dataset: str, snapshot: str
1084 ) -> None: # thread-safe
1085 if alert_cfg is None:
1086 return
1087 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot'
1088 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
1089 cache_file: str = monitor_last_modified_cache_file(remote, dataset, label, alert_cfg)
1090 set_last_modification_time_safe(
1091 cache_file, unixtime_in_secs=(creation_unixtime_secs, snapshots_changed), if_more_recent=True
1092 )
1093 warning_millis: int = alert_cfg.warning_millis
1094 critical_millis: int = alert_cfg.critical_millis
1095 alert_kind = alert_cfg.kind
1096 snapshot_age_millis: float = current_unixtime_millis - creation_unixtime_secs * 1000
m = "--monitor-snapshots: "
1098 if snapshot_age_millis > critical_millis:
1099 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, critical_millis)
1100 log.critical("%s", msg)
1101 if not p.monitor_snapshots_config.dont_crit:
1102 die(msg, exit_code=CRITICAL_STATUS)
1103 elif snapshot_age_millis > warning_millis:
1104 msg = m + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, warning_millis)
1105 log.warning("%s", msg)
1106 if not p.monitor_snapshots_config.dont_warn:
1107 die(msg, exit_code=WARNING_STATUS)
1108 elif is_debug:
1109 msg = m + "OK. " + alert_msg(alert_kind, dataset, snapshot, label, snapshot_age_millis, delta_millis=-1)
1110 log.debug("%s", msg)
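# Worked threshold example for check_alert() above (hypothetical config): with warning_millis == 3_600_000 (1 hour)
# and critical_millis == 7_200_000 (2 hours), a latest snapshot that is 90 minutes old (snapshot_age_millis ==
# 5_400_000) exceeds the warning but not the critical limit, so a warning is logged and, unless dont_warn is set,
# the process exits with WARNING_STATUS (1); at 3 hours old it would exit with CRITICAL_STATUS (2) instead.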
1112 def alert_latest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
1113 alert: MonitorSnapshotAlert = alerts[i]
1114 check_alert(alert.label, alert.latest, creation_unixtime_secs, dataset, snapshot)
1116 def alert_oldest_snapshot(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
1117 alert: MonitorSnapshotAlert = alerts[i]
1118 check_alert(alert.label, alert.oldest, creation_unixtime_secs, dataset, snapshot)
1120 def find_stale_datasets_and_check_alerts() -> list[str]:
1121 """If the cache is enabled, check which datasets have changed to determine which datasets can be skipped cheaply,
that is, without incurring 'zfs list -t snapshot'.
1124 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
1125 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
1126 """
1127 stale_datasets: list[str] = []
1128 time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
1129 for dataset in sorted_datasets:
1130 is_stale_dataset: bool = False
1131 snapshots_changed: int = snapshots_changed_dict.get(dataset, 0)
1132 for alert in alerts:
1133 for cfg in (alert.latest, alert.oldest):
1134 if cfg is None:
1135 continue
1136 if (
1137 snapshots_changed != 0
1138 and snapshots_changed < time_threshold
1139 and ( # always True
1140 cached_unix_times := self.cache.get_snapshots_changed2(
1141 monitor_last_modified_cache_file(remote, dataset, alert.label, cfg)
1142 )
1143 )
1144 and snapshots_changed == cached_unix_times[1] # cached snapshots_changed aka last modified time
1145 and snapshots_changed >= cached_unix_times[0] # creation time of minmax snapshot aka access time
1146 ): # cached state is still valid; emit an alert if the latest/oldest snapshot is too old
1147 lbl = alert.label
1148 check_alert(lbl, cfg, creation_unixtime_secs=cached_unix_times[0], dataset=dataset, snapshot="")
1149 else: # cached state is no longer valid; fallback to 'zfs list -t snapshot'
1150 is_stale_dataset = True
1151 if is_stale_dataset:
1152 stale_datasets.append(dataset)
1153 return stale_datasets
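# Illustrative sketch (not part of bzfs): the cache trust rule used above, reduced to a pure
# predicate. 'snapshots_changed' is the current value of the ZFS "snapshots_changed" dataset
# property; 'cached' is the (atime=creation time, mtime=snapshots_changed) pair read from the
# per-label cache file. 'maturity_secs' stands in for MATURITY_TIME_THRESHOLD_SECS (value assumed).
import time

def _example_is_cache_trustworthy(snapshots_changed: int, cached: tuple[int, int], maturity_secs: float = 1.1) -> bool:
    creation, cached_changed = cached
    return (
        snapshots_changed != 0
        and snapshots_changed < time.time() - maturity_secs  # entry has matured; avoids same-second races
        and snapshots_changed == cached_changed  # cache was written for exactly this dataset state
        and snapshots_changed >= creation  # a snapshot cannot be created after the last recorded change
    )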
1155 # satisfy request from local cache as much as possible
1156 if is_caching_snapshots(p, remote):
1157 stale_datasets: list[str] = find_stale_datasets_and_check_alerts()
1158 with self.stats_lock:
1159 self.num_cache_misses += len(stale_datasets)
1160 self.num_cache_hits += len(sorted_datasets) - len(stale_datasets)
1161 else:
1162 stale_datasets = sorted_datasets
1164 # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
1165 is_caching = is_caching_snapshots(p, remote)
1166 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
1167 remote, stale_datasets, labels, fn_latest=alert_latest_snapshot, fn_oldest=alert_oldest_snapshot
1168 )
1169 for dataset in datasets_without_snapshots:
1170 for i in range(len(alerts)):
1171 alert_latest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
1172 alert_oldest_snapshot(i, creation_unixtime_secs=0, dataset=dataset, snapshot="")
1174 def replicate_datasets(self, src_datasets: list[str], task_description: str, max_workers: int) -> bool:
1175 """Replicates a list of datasets."""
1176 assert (not self.is_test_mode) or src_datasets == sorted(src_datasets), "List is not sorted"
1177 p, log = self.params, self.params.log
1178 src, dst = p.src, p.dst
1179 self.num_snapshots_found = 0
1180 self.num_snapshots_replicated = 0
1181 log.info("Starting replication task: %s", task_description + f" [{len(src_datasets)} datasets]")
1182 start_time_nanos: int = time.monotonic_ns()
1184 def src2dst(src_dataset: str) -> str:
1185 return replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset)
1187 def dst2src(dst_dataset: str) -> str:
1188 return replace_prefix(dst_dataset, old_prefix=dst.root_dataset, new_prefix=src.root_dataset)
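# Illustrative sketch (not part of bzfs): src2dst()/dst2src() above merely swap the configured
# root dataset prefixes; assuming replace_prefix() behaves like the helper below, with src root
# "tank/src" and dst root "backup/dst", "tank/src/home/alice" maps to "backup/dst/home/alice".
def _example_replace_prefix(s: str, old_prefix: str, new_prefix: str) -> str:
    assert s == old_prefix or s.startswith(old_prefix + "/")
    return new_prefix + s[len(old_prefix):]

assert _example_replace_prefix("tank/src/home/alice", "tank/src", "backup/dst") == "backup/dst/home/alice"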
1190 def find_stale_datasets() -> tuple[list[str], dict[str, str]]:
1191 """If the cache is enabled on replication, check which src datasets or dst datasets have changed to determine
1192 which datasets can be skipped cheaply, i.e. without incurring 'zfs list -t snapshots'.
1194 This is done by comparing the "snapshots_changed" ZFS dataset property with the local cache. See
1195 https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed
1196 """
1197 # First, check which src datasets have changed since the last replication to that destination
1198 cache_files: dict[str, str] = {}
1199 stale_src_datasets1: list[str] = []
1200 maybe_stale_dst_datasets: list[str] = []
1201 userhost_dir: str = sha256_85_urlsafe_base64(p.dst.cache_namespace())
1202 filter_key = tuple(tuple(f) for f in p.snapshot_filters) # cache is only valid for the same --include/--exclude-snapshot* filters
1203 filter_hash_code: str = sha256_85_urlsafe_base64(str(filter_key))
1204 for src_dataset in src_datasets:
1205 dst_dataset: str = src2dst(src_dataset) # cache is only valid for identical destination dataset
1206 dst_dataset_dir: str = sha256_85_urlsafe_base64(dst_dataset)
1207 cache_label: str = os.path.join(
1208 REPLICATION_CACHE_FILE_PREFIX, userhost_dir, dst_dataset_dir, filter_hash_code
1209 )
1210 cache_file: str = self.cache.last_modified_cache_file(src, src_dataset, cache_label)
1211 cache_files[src_dataset] = cache_file
1212 snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed # get prop "for free"
1213 if (
1214 snapshots_changed != 0
1215 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
1216 and snapshots_changed == self.cache.get_snapshots_changed(cache_file)
1217 ):
1218 maybe_stale_dst_datasets.append(dst_dataset)
1219 else:
1220 stale_src_datasets1.append(src_dataset)
1222 # For each src dataset that hasn't changed, check if the corresponding dst dataset has changed
1223 stale_src_datasets2: list[str] = []
1224 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, maybe_stale_dst_datasets)
1225 for dst_dataset in maybe_stale_dst_datasets:
1226 snapshots_changed = dst_snapshots_changed_dict.get(dst_dataset, 0)
1227 if (
1228 snapshots_changed != 0
1229 and time.time() > snapshots_changed + MATURITY_TIME_THRESHOLD_SECS
1230 and snapshots_changed
1231 == self.cache.get_snapshots_changed(self.cache.last_modified_cache_file(dst, dst_dataset))
1232 ):
1233 log.info("Already up-to-date [cached]: %s", dst_dataset)
1234 else:
1235 stale_src_datasets2.append(dst2src(dst_dataset))
1236 assert (not self.is_test_mode) or stale_src_datasets1 == sorted(stale_src_datasets1), "List is not sorted"
1237 assert (not self.is_test_mode) or stale_src_datasets2 == sorted(stale_src_datasets2), "List is not sorted"
1238 stale_src_datasets = list(heapq.merge(stale_src_datasets1, stale_src_datasets2)) # merge two sorted lists
1239 assert (not self.is_test_mode) or not has_duplicates(stale_src_datasets), "List contains duplicates"
1240 return stale_src_datasets, cache_files
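# Illustrative sketch (not part of bzfs): how a replication cache path can be composed from stable
# hashes of (destination namespace, destination dataset, snapshot filters), mirroring the structure
# above. _example_hash() only approximates sha256_85_urlsafe_base64(), and "=replication" is a
# placeholder for REPLICATION_CACHE_FILE_PREFIX; both real definitions live elsewhere in bzfs.
import base64
import hashlib
import os

def _example_hash(text: str) -> str:  # truncated urlsafe-base64 SHA-256 digest (assumed semantics)
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")[:14]

def _example_cache_label(dst_namespace: str, dst_dataset: str, snapshot_filters_repr: str) -> str:
    return os.path.join(
        "=replication", _example_hash(dst_namespace), _example_hash(dst_dataset), _example_hash(snapshot_filters_repr)
    )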
1242 if is_caching_snapshots(p, src):
1243 stale_src_datasets, cache_files = find_stale_datasets()
1244 num_cache_misses = len(stale_src_datasets)
1245 num_cache_hits = len(src_datasets) - len(stale_src_datasets)
1246 self.num_cache_misses += num_cache_misses
1247 self.num_cache_hits += num_cache_hits
1248 cmsg = self._cache_hits_msg(hits=num_cache_hits, misses=num_cache_misses)
1249 else:
1250 stale_src_datasets = src_datasets
1251 cache_files = {}
1252 cmsg = ""
1254 done_src_datasets: list[str] = []
1255 done_src_datasets_lock: threading.Lock = threading.Lock()
1257 def _process_dataset_fn(src_dataset: str, tid: str, retry: Retry) -> bool:
1258 result: bool = replicate_dataset(job=self, src_dataset=src_dataset, tid=tid, retry=retry)
1259 with done_src_datasets_lock:
1260 done_src_datasets.append(src_dataset) # record datasets that were actually replicated (not skipped)
1261 return result
1263 # Run replicate_dataset(dataset) for each dataset, while taking care of errors, retries + parallel execution
1264 failed: bool = process_datasets_in_parallel_and_fault_tolerant(
1265 log=log,
1266 datasets=stale_src_datasets,
1267 process_dataset=_process_dataset_fn,
1268 skip_tree_on_error=lambda dataset: not self.dst_dataset_exists[src2dst(dataset)],
1269 skip_on_error=p.skip_on_error,
1270 max_workers=max_workers,
1271 termination_event=self.termination_event,
1272 termination_handler=self.terminate,
1273 enable_barriers=False,
1274 task_name="Replication",
1275 append_exception=self.append_exception,
1276 retry_policy=p.retry_policy,
1277 retry_options=self.retry_options,
1278 dry_run=p.dry_run,
1279 is_test_mode=self.is_test_mode,
1280 )
1282 if is_caching_snapshots(p, src) and len(done_src_datasets) > 0:
1283 # refresh "snapshots_changed" ZFS dataset property from dst
1284 stale_dst_datasets: list[str] = [src2dst(src_dataset) for src_dataset in sorted(done_src_datasets)]
1285 dst_snapshots_changed_dict: dict[str, int] = self.cache.zfs_get_snapshots_changed(dst, stale_dst_datasets)
1286 for dst_dataset in stale_dst_datasets: # update local cache
1287 dst_snapshots_changed: int = dst_snapshots_changed_dict.get(dst_dataset, 0)
1288 dst_cache_file: str = self.cache.last_modified_cache_file(dst, dst_dataset)
1289 src_dataset: str = dst2src(dst_dataset)
1290 src_snapshots_changed: int = self.src_properties[src_dataset].snapshots_changed
1291 if not p.dry_run:
1292 set_last_modification_time_safe(
1293 cache_files[src_dataset], unixtime_in_secs=src_snapshots_changed, if_more_recent=True
1294 )
1295 set_last_modification_time_safe(
1296 dst_cache_file, unixtime_in_secs=dst_snapshots_changed, if_more_recent=True
1297 )
1299 elapsed_nanos: int = time.monotonic_ns() - start_time_nanos
1300 log.info(
1301 p.dry("Replication done: %s"),
1302 f"{task_description} [Replicated {self.num_snapshots_replicated} out of {self.num_snapshots_found} snapshots"
1303 f" within {len(src_datasets)} datasets; took {human_readable_duration(elapsed_nanos)}{cmsg}]",
1304 )
1305 return failed
1307 def maybe_inject_delete(self, remote: Remote, dataset: str, delete_trigger: str) -> None:
1308 """For testing only; for unit tests to delete datasets during replication and test correct handling of that."""
1309 assert delete_trigger
1310 counter = self.delete_injection_triggers.get("before")
1311 if counter and self.decrement_injection_counter(counter, delete_trigger):
1312 p = self.params
1313 cmd = p.split_args(f"{remote.sudo} {p.zfs_program} destroy -r", p.force_unmount, p.force_hard, dataset or "")
1314 self.run_ssh_command(remote, LOG_DEBUG, print_stdout=True, cmd=cmd)
1316 def maybe_inject_params(self, error_trigger: str) -> None:
1317 """For testing only; for unit tests to simulate errors during replication and test correct handling of them."""
1318 assert error_trigger
1319 counter = self.error_injection_triggers.get("before")
1320 if counter and self.decrement_injection_counter(counter, error_trigger):
1321 self.inject_params = self.param_injection_triggers[error_trigger]
1322 elif error_trigger in self.param_injection_triggers:
1323 self.inject_params = {}
1325 @staticmethod
1326 def recv_option_property_names(recv_opts: list[str]) -> set[str]:
1327 """Extracts -o and -x property names that are already specified on the command line; This can be used to check for
1328 dupes because 'zfs receive' does not accept multiple -o or -x options with the same property name."""
1329 propnames: set[str] = set()
1330 i = 0
1331 n = len(recv_opts)
1332 while i < n:
1333 stripped: str = recv_opts[i].strip()
1334 if stripped in ("-o", "-x"):
1335 i += 1
1336 if i == n or recv_opts[i].strip() in ("-o", "-x"):
1337 die(f"Missing value for {stripped} option in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1338 if stripped == "-o" and "=" not in recv_opts[i]:
1339 die(f"Missing value for {stripped} name=value pair in --zfs-recv-program-opt(s): {' '.join(recv_opts)}")
1340 propname: str = recv_opts[i] if stripped == "-x" else recv_opts[i].split("=", 1)[0]
1341 validate_property_name(propname, "--zfs-recv-program-opt(s)")
1342 propnames.add(propname)
1343 i += 1
1344 return propnames
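# Illustrative sketch (not part of bzfs): a minimal standalone version of the -o/-x scan above,
# showing how property names are extracted from interleaved 'zfs receive' options. Unlike the
# real method, this sketch assumes well-formed input and performs no validation.
def _example_recv_propnames(recv_opts: list[str]) -> set[str]:
    names: set[str] = set()
    i = 0
    while i < len(recv_opts):
        opt = recv_opts[i].strip()
        if opt in ("-o", "-x"):
            i += 1  # the property name (or name=value pair) follows the flag
            names.add(recv_opts[i] if opt == "-x" else recv_opts[i].split("=", 1)[0])
        i += 1
    return names

assert _example_recv_propnames(["-u", "-o", "compression=zstd", "-x", "mountpoint"]) == {"compression", "mountpoint"}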
1346 def root_datasets_if_recursive_zfs_snapshot_is_possible(
1347 self, datasets: list[str], basis_datasets: list[str]
1348 ) -> list[str] | None:
1349 """Returns the root datasets within the (filtered) `datasets` list if no incompatible pruning is detected. A dataset
1350 within `datasets` is considered a root dataset if it has no parent, i.e. it is not a descendant of any dataset in
1351 `datasets`. Returns `None` if any (unfiltered) dataset in `basis_datasets` that is a descendant of at least one of the
1352 root datasets is missing in `datasets`, indicating that --include/exclude-dataset* or the snapshot schedule have
1353 pruned a dataset in a way that is incompatible with 'zfs snapshot -r' CLI semantics, thus requiring a switch to the
1354 non-recursive 'zfs snapshot snapshot1 .. snapshot N' CLI flavor.
1356 Assumes that set(datasets).issubset(set(basis_datasets)). Also assumes that datasets and basis_datasets are both
1357 sorted (and thus the output root_datasets list is sorted too), which is why this algorithm is efficient - O(N) time
1358 complexity. The impl is akin to the merge algorithm of a merge sort, adapted to our specific use case.
1359 See root_datasets_if_recursive_zfs_snapshot_is_possible_slow_but_correct() in the unit test suite for an alternative
1360 impl that's easier to grok.
1361 """
1362 assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
1363 assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
1364 assert (not self.is_test_mode) or basis_datasets == sorted(basis_datasets), "List is not sorted"
1365 assert (not self.is_test_mode) or not has_duplicates(basis_datasets), "List contains duplicates"
1366 assert (not self.is_test_mode) or set(datasets).issubset(set(basis_datasets)), "Not a subset"
1367 root_datasets: list[str] = self.find_root_datasets(datasets)
1368 i = 0
1369 j = 0
1370 k = 0
1371 len_root_datasets = len(root_datasets)
1372 len_basis_datasets = len(basis_datasets)
1373 len_datasets = len(datasets)
1374 while i < len_root_datasets and j < len_basis_datasets: # walk and "merge" the sorted lists, in sync
1375 if basis_datasets[j] < root_datasets[i]: # irrelevant subtree?
1376 j += 1 # move to next basis_datasets[j]
1377 elif is_descendant(basis_datasets[j], of_root_dataset=root_datasets[i]): # relevant subtree?
1378 while k < len_datasets and datasets[k] < basis_datasets[j]:
1379 k += 1 # move to next datasets[k]
1380 if k == len_datasets or datasets[k] != basis_datasets[j]: # dataset chopped off by schedule or --incl/excl*?
1381 return None # detected filter pruning that is incompatible with 'zfs snapshot -r'
1382 j += 1 # move to next basis_datasets[j]
1383 else:
1384 i += 1 # move to next root_dataset[i]; no need to check root_datasets that are no longer (or not yet) reachable
1385 return root_datasets
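# Illustrative sketch (not part of bzfs): an O(N^2), easy-to-verify equivalent of the merge-style
# check above, in the spirit of the *_slow_but_correct() helper mentioned in the docstring.
def _example_roots_if_recursive_possible(datasets: list[str], basis_datasets: list[str]) -> list[str] | None:
    roots = [d for d in datasets if not any(d.startswith(parent + "/") for parent in datasets if parent != d)]
    covered = [b for b in basis_datasets if any(b == r or b.startswith(r + "/") for r in roots)]
    return roots if set(covered).issubset(set(datasets)) else None

# "tank/a/b" was pruned from datasets but still exists in basis_datasets, so 'zfs snapshot -r tank/a'
# would snapshot too much; the check therefore signals a fallback to the non-recursive flavor (None):
assert _example_roots_if_recursive_possible(["tank/a"], ["tank/a", "tank/a/b"]) is None
assert _example_roots_if_recursive_possible(["tank/a", "tank/a/b"], ["tank/a", "tank/a/b"]) == ["tank/a"]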
1387 @staticmethod
1388 def find_root_datasets(sorted_datasets: list[str]) -> list[str]:
1389 """Returns the roots of the subtrees in the (sorted) input datasets; The output root dataset list is sorted, too; A
1390 dataset is a root dataset if it has no parent, i.e. it is not a descendant of any dataset in the input datasets."""
1391 root_datasets: list[str] = []
1392 skip_dataset: str = DONT_SKIP_DATASET
1393 for dataset in sorted_datasets:
1394 if is_descendant(dataset, of_root_dataset=skip_dataset):
1395 continue
1396 skip_dataset = dataset
1397 root_datasets.append(dataset)
1398 return root_datasets
1400 def find_datasets_to_snapshot(self, sorted_datasets: list[str]) -> dict[SnapshotLabel, list[str]]:
1401 """Given a (sorted) list of source datasets, returns a dict where the key is a snapshot name (aka SnapshotLabel, e.g.
1402 bzfs_2024-11-06_08:30:05_hourly) and the value is the (sorted) (sub)list of datasets for which a snapshot needs to be
1403 created with that name because these datasets are due per the schedule: either the 'creation' time of their
1404 most recent snapshot with that name pattern is now too old, or such a snapshot does not exist at all.
1406 The baseline implementation uses the 'zfs list -t snapshot' CLI to find the most recent snapshots, which is simple
1407 but doesn't scale well with the number of snapshots, at least if the goal is to take snapshots every second. An
1408 alternative, much more scalable, implementation queries the standard ZFS "snapshots_changed" dataset property
1409 (requires zfs >= 2.2.0), in combination with a local cache that stores this property, as well as the creation time of
1410 the most recent snapshot, for each SnapshotLabel and each dataset.
1411 """
1412 p, log = self.params, self.params.log
1413 src = p.src
1414 config: CreateSrcSnapshotConfig = p.create_src_snapshots_config
1415 datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
1416 is_caching: bool = False
1417 interner: HashedInterner[datetime] = HashedInterner() # reduces memory footprint
1418 msgs: list[tuple[datetime, str, SnapshotLabel, str]] = []
1420 def create_snapshot_if_latest_is_too_old(
1421 datasets_to_snapshot: dict[SnapshotLabel, list[str]], dataset: str, label: SnapshotLabel, creation_unixtime: int
1422 ) -> None: # thread-safe
1423 """Schedules creation of a snapshot for the given label if the label's existing latest snapshot is too old."""
1424 creation_dt: datetime = datetime.fromtimestamp(creation_unixtime, tz=config.tz)
1425 log.log(LOG_TRACE, "Latest snapshot creation: %s for %s", creation_dt, label)
1426 duration_amount, duration_unit = config.suffix_durations[label.suffix]
1427 next_event_dt: datetime = config.anchors.round_datetime_up_to_duration_multiple(
1428 creation_dt + timedelta(microseconds=1), duration_amount, duration_unit
1429 )
1430 msg: str = ""
1431 if config.current_datetime >= next_event_dt:
1432 datasets_to_snapshot[label].append(dataset) # mark it as scheduled for snapshot creation
1433 msg = " has passed"
1434 next_event_dt = interner.intern(next_event_dt)
1435 msgs.append((next_event_dt, dataset, label, msg))
1436 if is_caching and not p.dry_run: # update cache with latest state from 'zfs list -t snapshot'
1437 # Per-label cache stores (atime=creation, mtime=snapshots_changed) so later runs can safely trust creation
1438 # only when the label's mtime matches the current dataset-level '=' cache value. Excludes timestamp of label.
1439 cache_file: str = self.cache.last_modified_cache_file(src, dataset, label_hashes[label])
1440 unixtimes: tuple[int, int] = (creation_unixtime, self.src_properties[dataset].snapshots_changed)
1441 set_last_modification_time_safe(cache_file, unixtime_in_secs=unixtimes, if_more_recent=True)
1443 labels: list[SnapshotLabel] = []
1444 config_labels: list[SnapshotLabel] = config.snapshot_labels()
1445 for label in config_labels:
1446 duration_amount_, _duration_unit = config.suffix_durations[label.suffix]
1447 if duration_amount_ == 0 or config.create_src_snapshots_even_if_not_due:
1448 datasets_to_snapshot[label] = sorted_datasets # take snapshot regardless of creation time of existing snaps
1449 else:
1450 labels.append(label)
1451 if len(labels) == 0:
1452 return datasets_to_snapshot # nothing more TBD
1453 label_hashes: dict[SnapshotLabel, str] = {
1454 label: sha256_128_urlsafe_base64(label.notimestamp_str()) for label in labels
1455 }
1457 # satisfy request from local cache as much as possible
1458 cached_datasets_to_snapshot: dict[SnapshotLabel, list[str]] = defaultdict(list)
1459 if is_caching_snapshots(p, src):
1460 sorted_datasets_todo: list[str] = []
1461 time_threshold: float = time.time() - MATURITY_TIME_THRESHOLD_SECS
1462 for dataset in sorted_datasets:
1463 cache: SnapshotCache = self.cache
1464 cached_snapshots_changed: int = cache.get_snapshots_changed(cache.last_modified_cache_file(src, dataset))
1465 if cached_snapshots_changed == 0:
1466 sorted_datasets_todo.append(dataset) # request cannot be answered from cache
1467 continue
1468 if cached_snapshots_changed != self.src_properties[dataset].snapshots_changed: # get that prop "for free"
1469 cache.invalidate_last_modified_cache_dataset(dataset)
1470 sorted_datasets_todo.append(dataset) # request cannot be answered from cache
1471 continue
1472 if cached_snapshots_changed >= time_threshold: # Avoid equal-second races: only trust matured cache entries
1473 sorted_datasets_todo.append(dataset) # cache entry isn't mature enough to be trusted; skip cache
1474 continue
1475 creation_unixtimes: list[int] = []
1476 for label_hash in label_hashes.values():
1477 # For per-label files, atime stores the latest matching snapshot's creation time, while mtime stores
1478 # the dataset-level snapshots_changed observed when this label file was written.
1479 atime, mtime = cache.get_snapshots_changed2(cache.last_modified_cache_file(src, dataset, label_hash))
1480 # Sanity check: trust per-label cache only when:
1481 # - mtime equals the dataset-level '=' cache (same snapshots_changed), and
1482 # - atime is plausible and not later than mtime (creation <= snapshots_changed), and
1483 # - neither atime nor mtime is zero (unknown provenance).
1484 # Otherwise fall back to 'zfs list -t snapshot' to avoid stale creation times after newer changes.
1485 if atime == 0 or mtime == 0 or mtime != cached_snapshots_changed or atime > mtime:
1486 sorted_datasets_todo.append(dataset) # request cannot be answered from cache
1487 break
1488 creation_unixtimes.append(atime)
1489 if len(creation_unixtimes) == len(labels):
1490 for j, label in enumerate(labels):
1491 create_snapshot_if_latest_is_too_old(
1492 cached_datasets_to_snapshot, dataset, label, creation_unixtimes[j]
1493 )
1494 sorted_datasets = sorted_datasets_todo
1496 def create_snapshot_fn(i: int, creation_unixtime_secs: int, dataset: str, snapshot: str) -> None:
1497 create_snapshot_if_latest_is_too_old(datasets_to_snapshot, dataset, labels[i], creation_unixtime_secs)
1499 def on_finish_dataset(dataset: str) -> None:
1500 if is_caching_snapshots(p, src) and not p.dry_run:
1501 set_last_modification_time_safe(
1502 self.cache.last_modified_cache_file(src, dataset),
1503 unixtime_in_secs=self.src_properties[dataset].snapshots_changed,
1504 if_more_recent=True,
1505 )
1507 # fallback to 'zfs list -t snapshot' for any remaining datasets, as these couldn't be satisfied from local cache
1508 is_caching = is_caching_snapshots(p, src)
1509 datasets_without_snapshots: list[str] = self.handle_minmax_snapshots(
1510 src, sorted_datasets, labels, fn_latest=create_snapshot_fn, fn_on_finish_dataset=on_finish_dataset
1511 )
1512 for lbl in labels: # merge (sorted) results from local cache + 'zfs list -t snapshot' into (sorted) combined result
1513 datasets_to_snapshot[lbl].sort()
1514 if datasets_without_snapshots or (lbl in cached_datasets_to_snapshot): # +take snaps for snapshot-less datasets
1515 datasets_to_snapshot[lbl] = list( # inputs to merge() are sorted, and outputs are sorted too
1516 heapq.merge(datasets_to_snapshot[lbl], cached_datasets_to_snapshot[lbl], datasets_without_snapshots)
1517 )
1518 for label, datasets in datasets_to_snapshot.items():
1519 assert (not self.is_test_mode) or datasets == sorted(datasets), "List is not sorted"
1520 assert (not self.is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
1521 assert label
1523 msgs.sort() # sort by time, dataset, label
1524 for i in range(0, len(msgs), 10_000): # reduce logging overhead via mini-batching
1525 text = "".join(
1526 f"\nNext scheduled snapshot time: {next_event_dt} for {dataset}@{label}{msg}"
1527 for next_event_dt, dataset, label, msg in msgs[i : i + 10_000]
1528 )
1529 log.info("Next scheduled snapshot times ...%s", text)
1531 # sort keys to ensure that we take snapshots for dailies before hourlies, and so on
1532 label_indexes: dict[SnapshotLabel, int] = {label: k for k, label in enumerate(config_labels)}
1533 datasets_to_snapshot = dict(sorted(datasets_to_snapshot.items(), key=lambda kv: label_indexes[kv[0]]))
1534 return datasets_to_snapshot
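# Illustrative sketch (not part of bzfs): the per-label cache trust rule used in the loop above.
# A per-label cache file stores (atime=creation time of the latest matching snapshot,
# mtime=snapshots_changed observed when the file was written); the creation time is trusted only
# if it provably belongs to the currently cached dataset state.
def _example_trust_label_cache(atime: int, mtime: int, cached_snapshots_changed: int) -> bool:
    return not (atime == 0 or mtime == 0 or mtime != cached_snapshots_changed or atime > mtime)

assert _example_trust_label_cache(atime=100, mtime=150, cached_snapshots_changed=150)
assert not _example_trust_label_cache(atime=200, mtime=150, cached_snapshots_changed=150)  # creation after last change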
1536 def handle_minmax_snapshots(
1537 self,
1538 remote: Remote,
1539 sorted_datasets: list[str],
1540 labels: list[SnapshotLabel],
1541 fn_latest: Callable[[int, int, str, str], None], # callback function for latest snapshot
1542 fn_oldest: Callable[[int, int, str, str], None] | None = None, # callback function for oldest snapshot
1543 fn_on_finish_dataset: Callable[[str], None] = lambda dataset: None,
1544 ) -> list[str]: # thread-safe
1545 """For each dataset in `sorted_datasets`, for each label in `labels`, finds the latest and oldest snapshot, and runs
1546 the callback functions on them; Ignores the timestamp of the input labels and the timestamp of the snapshot names."""
1547 assert (not self.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
1548 p = self.params
1549 cmd = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -Hp -o createtxg,creation,name") # sort dataset,createtxg
1550 datasets_with_snapshots: set[str] = set()
1551 interner: SortedInterner[str] = SortedInterner(sorted_datasets) # reduces memory footprint
1552 for lines in zfs_list_snapshots_in_parallel(self, remote, cmd, sorted_datasets, ordered=False):
1553 # streaming group by dataset name (consumes constant memory only)
1554 for dataset, group in itertools.groupby(lines, key=lambda line: line.rsplit("\t", 1)[1].split("@", 1)[0]):
1555 dataset = interner.interned(dataset)
1556 snapshots = sorted( # fetch all snapshots of current dataset and sort by createtxg,creation,name
1557 (int(createtxg), int(creation_unixtime_secs), name.split("@", 1)[1])
1558 for createtxg, creation_unixtime_secs, name in (line.split("\t", 2) for line in group)
1559 ) # perf: sorted() is fast because Timsort is close to O(N) for nearly sorted input, which is our case
1560 assert len(snapshots) > 0
1561 datasets_with_snapshots.add(dataset)
1562 snapshot_names: tuple[str, ...] = tuple(snapshot[-1] for snapshot in snapshots)
1563 reversed_snapshot_names: tuple[str, ...] = snapshot_names[::-1]
1564 year_with_4_digits_regex: re.Pattern[str] = YEAR_WITH_FOUR_DIGITS_REGEX
1565 year_with_4_digits_regex_fullmatch = year_with_4_digits_regex.fullmatch
1566 startswith = str.startswith
1567 endswith = str.endswith
1568 fns = ((fn_latest, True),) if fn_oldest is None else ((fn_latest, True), (fn_oldest, False))
1569 for i, label in enumerate(labels):
1570 infix: str = label.infix
1571 start: str = label.prefix + infix
1572 end: str = label.suffix
1573 startlen: int = len(start)
1574 endlen: int = len(end)
1575 minlen: int = startlen + endlen if infix else 4 + startlen + endlen # year_with_four_digits_regex
1576 startlen_4: int = startlen + 4 # [startlen:startlen+4] # year_with_four_digits_regex
1577 has_infix: bool = bool(infix)
1578 for fn, is_reverse in fns:
1579 creation_unixtime_secs: int = 0 # find creation time of latest or oldest snapshot matching the label
1580 minmax_snapshot: str = ""
1581 for j, snapshot_name in enumerate(reversed_snapshot_names if is_reverse else snapshot_names):
1582 if (
1583 endswith(snapshot_name, end) # aka snapshot_name.endswith(end)
1584 and startswith(snapshot_name, start) # aka snapshot_name.startswith(start)
1585 and len(snapshot_name) >= minlen
1586 and (has_infix or year_with_4_digits_regex_fullmatch(snapshot_name, startlen, startlen_4))
1587 ):
1588 k: int = len(snapshots) - j - 1 if is_reverse else j
1589 creation_unixtime_secs = snapshots[k][1]
1590 minmax_snapshot = snapshot_name
1591 break
1592 fn(i, creation_unixtime_secs, dataset, minmax_snapshot)
1593 fn_on_finish_dataset(dataset)
1594 datasets_without_snapshots = [dataset for dataset in sorted_datasets if dataset not in datasets_with_snapshots]
1595 return datasets_without_snapshots
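# Illustrative sketch (not part of bzfs): the streaming group-by above relies on 'zfs list -d 1'
# emitting each dataset's snapshots contiguously, so snapshots can be grouped per dataset without
# materializing the whole listing. The sample lines use the same createtxg\tcreation\tname format.
import itertools

_sample_lines = [
    "10\t1700000000\ttank/a@bzfs_2023-11-14_00:00:00_hourly",
    "11\t1700003600\ttank/a@bzfs_2023-11-14_01:00:00_hourly",
    "12\t1700000000\ttank/b@bzfs_2023-11-14_00:00:00_daily",
]
_groups = {
    dataset: [line.split("\t", 2) for line in group]
    for dataset, group in itertools.groupby(_sample_lines, key=lambda line: line.rsplit("\t", 1)[1].split("@", 1)[0])
}
assert sorted(_groups) == ["tank/a", "tank/b"] and len(_groups["tank/a"]) == 2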
1597 @staticmethod
1598 def _cache_hits_msg(hits: int, misses: int) -> str:
1599 total = hits + misses
1600 return f", cache hits: {percent(hits, total, print_total=True)}, misses: {percent(misses, total, print_total=True)}"
1602 def run_ssh_command(
1603 self,
1604 remote: MiniRemote,
1605 loglevel: int = logging.INFO,
1606 is_dry: bool = False,
1607 check: bool = True,
1608 print_stdout: bool = False,
1609 print_stderr: bool = True,
1610 cmd: list[str] | None = None,
1611 retry_on_generic_ssh_error: bool = True,
1612 ) -> str:
1613 """Runs the given CLI cmd via ssh on the given remote, and returns stdout."""
1614 assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0
1615 conn_pool: ConnectionPool = self.params.connection_pools[remote.location].pool(SHARED)
1616 with conn_pool.connection() as conn:
1617 log: logging.Logger = self.params.log
1618 try:
1619 process: subprocess.CompletedProcess[str] = conn.run_ssh_command(
1620 cmd=cmd,
1621 job=self,
1622 loglevel=loglevel,
1623 is_dry=is_dry,
1624 check=check,
1625 stdin=DEVNULL,
1626 stdout=PIPE,
1627 stderr=PIPE,
1628 text=True,
1629 )
1630 except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
1631 xprint(log, stderr_to_str(e.stdout) if print_stdout else e.stdout, run=print_stdout, file=sys.stdout, end="")
1632 xprint(log, stderr_to_str(e.stderr) if print_stderr else e.stderr, run=print_stderr, file=sys.stderr, end="")
1633 if retry_on_generic_ssh_error and isinstance(e, subprocess.CalledProcessError):
1634 stderr: str = stderr_to_str(e.stderr)
1635 if stderr.startswith("ssh: "):
1636 assert e.returncode == 255, e.returncode # error within SSH itself (not during the remote command)
1637 raise RetryableError("Subprocess failed", display_msg="ssh") from e
1638 raise
1639 else:
1640 if is_dry:
1641 return ""
1642 xprint(log, process.stdout, run=print_stdout, file=sys.stdout, end="")
1643 xprint(log, process.stderr, run=print_stderr, file=sys.stderr, end="")
1644 return process.stdout
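# Illustrative sketch (not part of bzfs): the retry rule above treats only failures of the ssh
# transport itself as retryable. OpenSSH reports such failures with exit status 255 and an
# "ssh: ..." stderr prefix, whereas a failing remote command propagates its own exit status.
def _example_is_ssh_transport_error(returncode: int, stderr: str) -> bool:
    return returncode == 255 and stderr.startswith("ssh: ")

assert _example_is_ssh_transport_error(255, "ssh: connect to host example port 22: Connection refused\n")
assert not _example_is_ssh_transport_error(1, "cannot open 'tank/nonexistent': dataset does not exist\n")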
1646 def try_ssh_command(
1647 self,
1648 remote: MiniRemote,
1649 loglevel: int,
1650 is_dry: bool = False,
1651 print_stdout: bool = False,
1652 cmd: list[str] | None = None,
1653 exists: bool = True,
1654 error_trigger: str | None = None,
1655 ) -> str | None:
1656 """Convenience method that helps retry/react to a dataset or pool that potentially doesn't exist anymore."""
1657 assert cmd is not None and isinstance(cmd, list) and len(cmd) > 0
1658 log = self.params.log
1659 try:
1660 self.maybe_inject_error(cmd=cmd, error_trigger=error_trigger)
1661 return self.run_ssh_command(remote=remote, loglevel=loglevel, is_dry=is_dry, print_stdout=print_stdout, cmd=cmd)
1662 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
1663 if not isinstance(e, UnicodeDecodeError):
1664 stderr: str = stderr_to_str(e.stderr)
1665 if exists and (
1666 ": dataset does not exist" in stderr
1667 or ": filesystem does not exist" in stderr # solaris 11.4.0
1668 or ": no such pool" in stderr
1669 or "does not have any resumable receive state to abort" in stderr # harmless `zfs receive -A` race
1670 ):
1671 return None
1672 log.warning("%s", stderr.rstrip())
1673 raise RetryableError("Subprocess failed") from e
1675 def try_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str | None:
1676 """Convenience method that auto-retries try_ssh_command() on failure."""
1677 p = self.params
1678 return call_with_retries(
1679 fn=lambda retry: self.try_ssh_command(*args, **kwargs),
1680 policy=p.retry_policy,
1681 config=self.retry_options.config,
1682 giveup=self.retry_options.giveup,
1683 after_attempt=self.retry_options.after_attempt,
1684 log=p.log,
1685 )
1687 def run_ssh_command_with_retries(self, *args: Any, **kwargs: Any) -> str:
1688 """Convenience method that auto-retries run_ssh_command() on transport failure (not on remote command failure)."""
1689 p = self.params
1690 return call_with_retries(
1691 fn=lambda retry: self.run_ssh_command(*args, **kwargs),
1692 policy=p.retry_policy,
1693 config=self.retry_options.config,
1694 giveup=self.retry_options.giveup,
1695 after_attempt=self.retry_options.after_attempt,
1696 log=p.log,
1697 )
1699 def maybe_inject_error(self, cmd: list[str], error_trigger: str | None = None) -> None:
1700 """For testing only; for unit tests to simulate errors during replication and test correct handling of them."""
1701 if error_trigger:
1702 counter = self.error_injection_triggers.get("before")
1703 if counter and self.decrement_injection_counter(counter, error_trigger):
1704 try:
1705 raise CalledProcessError(returncode=1, cmd=" ".join(cmd), stderr=error_trigger + ":dataset is busy")
1706 except subprocess.CalledProcessError as e:
1707 if error_trigger.startswith("retryable_"):
1708 raise RetryableError("Subprocess failed") from e
1709 else:
1710 raise
1712 def decrement_injection_counter(self, counter: Counter[str], trigger: str) -> bool:
1713 """For testing only."""
1714 with self.injection_lock:
1715 if counter[trigger] <= 0:
1716 return False
1717 counter[trigger] -= 1
1718 return True
1721#############################################################################
1722@final
1723class DatasetProperties:
1724 """Properties of a ZFS dataset."""
1726 __slots__ = ("recordsize", "snapshots_changed") # uses more compact memory layout than __dict__
1728 def __init__(self, recordsize: int, snapshots_changed: int) -> None:
1729 # immutable variables:
1730 self.recordsize: Final[int] = recordsize
1732 # mutable variables:
1733 self.snapshots_changed: int = snapshots_changed
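# Illustrative note (not part of bzfs): __slots__ trades per-instance flexibility for memory;
# slotted instances carry no __dict__, which matters when one such object is kept per dataset.
class _ExampleSlotted:
    __slots__ = ("value",)

    def __init__(self, value: int) -> None:
        self.value = value

assert not hasattr(_ExampleSlotted(1), "__dict__")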
1736#############################################################################
1737# Input format is [[user@]host:]dataset
1738# 1234 5 6
1739_DATASET_LOCATOR_REGEX: Final[re.Pattern[str]] = re.compile(r"(((([^@]*)@)?([^:]+)):)?(.*)", flags=re.DOTALL)
1742def parse_dataset_locator(
1743 input_text: str, validate: bool = True, user: str | None = None, host: str | None = None, port: int | None = None
1744) -> tuple[str, str, str, str, str]:
1745 """Splits user@host:dataset into its components with optional checks."""
1747 def convert_ipv6(hostname: str) -> str: # support IPv6 without getting confused by host:dataset colon separator ...
1748 return hostname.replace("|", ":") # ... and any colons that may be part of a (valid) ZFS dataset name
1750 user_undefined: bool = user is None
1751 if user is None:
1752 user = ""
1753 host_undefined: bool = host is None
1754 if host is None:
1755 host = ""
1756 host = convert_ipv6(host)
1757 user_host, dataset, pool = "", "", ""
1759 if match := _DATASET_LOCATOR_REGEX.fullmatch(input_text):
1760 if user_undefined:
1761 user = match.group(4) or ""
1762 if host_undefined:
1763 host = match.group(5) or ""
1764 host = convert_ipv6(host)
1765 if host == "-":
1766 host = ""
1767 dataset = match.group(6) or ""
1768 i = dataset.find("/")
1769 pool = dataset[0:i] if i >= 0 else dataset
1771 if user and host:
1772 user_host = f"{user}@{host}"
1773 elif host:
1774 user_host = host
1776 if validate:
1777 validate_user_name(user, input_text)
1778 validate_host_name(host, input_text)
1779 if port is not None:
1780 validate_port(port, f"Invalid port number: '{port}' for: '{input_text}' - ")
1781 validate_dataset_name(dataset, input_text)
1783 return user, host, user_host, pool, dataset
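# Illustrative usage sketch (not part of bzfs): hypothetical inputs and the resulting
# (user, host, user_host, pool, dataset) tuples, per the parsing rules above; IPv6 colons are
# written as '|' so they don't clash with the host:dataset separator.
#
#   parse_dataset_locator("alice@box:tank/data", validate=False)
#     -> ("alice", "box", "alice@box", "tank", "tank/data")
#   parse_dataset_locator("fe80||1:tank/data", validate=False)
#     -> ("", "fe80::1", "fe80::1", "tank", "tank/data")
#   parse_dataset_locator("tank/data", validate=False)
#     -> ("", "", "", "tank", "tank/data")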
1786def validate_user_name(user: str, input_text: str) -> None:
1787 """Checks that the username is safe for ssh or local usage."""
1788 invalid_chars: str = SHELL_CHARS + "/"
1789 if user and (".." in user or any(c.isspace() or c in invalid_chars for c in user)):
1790 die(f"Invalid user name: '{user}' for: '{input_text}'")
1793def validate_host_name(host: str, input_text: str) -> None:
1794 """Checks hostname for forbidden characters or patterns."""
1795 invalid_chars: str = SHELL_CHARS + "/"
1796 if host and (host.startswith("-") or ".." in host or any(c.isspace() or c in invalid_chars for c in host)):
1797 die(f"Invalid host name: '{host}' for: '{input_text}'")
1800def validate_port(port: str | int | None, message: str) -> None:
1801 """Checks that port specification is a valid integer."""
1802 if isinstance(port, int):
1803 port = str(port)
1804 if port and not port.isdigit():
1805 die(message + f"must be empty or a positive integer: '{port}'")
1808#############################################################################
1809 if __name__ == "__main__":
1810 main()