Coverage for bzfs_main/configuration.py: 99%
539 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 13:30 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Configuration subsystem; All CLI option/parameter values are reachable from the "Params" class."""
17from __future__ import annotations
18import argparse
19import ast
20import hashlib
21import os
22import platform
23import random
24import re
25import shutil
26import stat
27import sys
28import tempfile
29import time
30from dataclasses import dataclass
31from datetime import datetime, tzinfo
32from logging import Logger
33from typing import (
34 TYPE_CHECKING,
35 Iterable,
36 Literal,
37 NamedTuple,
38 cast,
39)
41import bzfs_main.utils
42from bzfs_main.argparse_actions import (
43 SnapshotFilter,
44 optimize_snapshot_filters,
45 validate_no_argument_file,
46)
47from bzfs_main.argparse_cli import (
48 LOG_DIR_DEFAULT,
49 ZFS_RECV_GROUPS,
50 __version__,
51)
52from bzfs_main.detect import (
53 DISABLE_PRG,
54)
55from bzfs_main.period_anchors import (
56 PeriodAnchors,
57)
58from bzfs_main.retry import (
59 RetryPolicy,
60)
61from bzfs_main.utils import (
62 DIR_PERMISSIONS,
63 PROG_NAME,
64 SHELL_CHARS,
65 SNAPSHOT_FILTERS_VAR,
66 UNIX_TIME_INFINITY_SECS,
67 RegexList,
68 SnapshotPeriods,
69 SynchronizedBool,
70 append_if_absent,
71 compile_regexes,
72 current_datetime,
73 die,
74 get_home_directory,
75 get_timezone,
76 getenv_bool,
77 getenv_int,
78 is_included,
79 ninfix,
80 nprefix,
81 nsuffix,
82 parse_duration_to_milliseconds,
83 pid_exists,
84 validate_dataset_name,
85 validate_is_not_a_symlink,
86 validate_property_name,
87 xappend,
88)
90if TYPE_CHECKING: # pragma: no cover - for type hints only
91 from bzfs_main.connection import ConnectionPools
94#############################################################################
class LogParams:
    """Option values for logging; computes log file locations, creates the log dir tree and 'current' symlinks."""

    def __init__(self, args: argparse.Namespace) -> None:
        """Reads from ArgumentParser via args; side effects: creates log directories, a unique log file, and symlinks."""
        # immutable variables:
        # Map --quiet/--verbose flags to a log level; --quiet wins over any verbosity.
        if args.quiet:
            self.log_level: str = "ERROR"
        elif args.verbose >= 2:
            self.log_level = "TRACE"
        elif args.verbose >= 1:
            self.log_level = "DEBUG"
        else:
            self.log_level = "INFO"
        self.log_config_file: str = args.log_config_file
        if self.log_config_file:
            validate_no_argument_file(self.log_config_file, args, err_prefix="--log-config-file: ")
        # each --log-config-var entry is "name:value"; split only on the first ':' so values may contain ':'
        self.log_config_vars: dict[str, str] = dict(var.split(":", 1) for var in args.log_config_var)
        self.timestamp: str = datetime.now().isoformat(sep="_", timespec="seconds")  # 2024-09-03_12:26:15
        self.home_dir: str = get_home_directory()
        log_parent_dir: str = args.log_dir if args.log_dir else os.path.join(self.home_dir, LOG_DIR_DEFAULT)
        if LOG_DIR_DEFAULT not in os.path.basename(log_parent_dir):
            die(f"Basename of --log-dir must contain the substring '{LOG_DIR_DEFAULT}', but got: {log_parent_dir}")
        sep: str = "_" if args.log_subdir == "daily" else ":"
        timestamp: str = self.timestamp
        # truncate the ISO timestamp at the daily/hourly/minutely boundary to derive the log subdir name
        subdir: str = timestamp[0 : timestamp.rindex(sep) if args.log_subdir == "minutely" else timestamp.index(sep)]
        self.log_dir: str = os.path.join(log_parent_dir, subdir)  # 2024-09-03 (d), 2024-09-03_12 (h), 2024-09-03_12:26 (m)
        os.makedirs(log_parent_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir ", log_parent_dir)
        os.makedirs(self.log_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir subdir ", self.log_dir)
        self.log_file_prefix: str = args.log_file_prefix
        self.log_file_infix: str = args.log_file_infix
        self.log_file_suffix: str = args.log_file_suffix
        # mkstemp guarantees a unique log file name even when multiple jobs start within the same second
        fd, self.log_file = tempfile.mkstemp(
            suffix=".log",
            prefix=f"{self.log_file_prefix}{self.timestamp}{self.log_file_infix}{self.log_file_suffix}-",
            dir=self.log_dir,
        )
        os.close(fd)
        self.pv_log_file: str = self.log_file[0 : -len(".log")] + ".pv"  # sibling file for 'pv' progress output
        self.last_modified_cache_dir: str = os.path.join(log_parent_dir, ".cache", "last_modified")
        os.makedirs(os.path.dirname(self.last_modified_cache_dir), mode=DIR_PERMISSIONS, exist_ok=True)

        # Create/update "current" symlink to current_dir, which is a subdir containing further symlinks to log files.
        # For parallel usage, ensures there is no time window when the symlinks are inconsistent or do not exist.
        current: str = "current"
        dot_current_dir: str = os.path.join(log_parent_dir, f".{current}")
        current_dir: str = os.path.join(dot_current_dir, os.path.basename(self.log_file)[0 : -len(".log")])
        os.makedirs(dot_current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir: .current ", dot_current_dir)
        os.makedirs(current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir: current ", current_dir)
        _create_symlink(self.log_file, current_dir, f"{current}.log")
        _create_symlink(self.pv_log_file, current_dir, f"{current}.pv")
        _create_symlink(self.log_dir, current_dir, f"{current}.dir")
        # stage a relative symlink inside current_dir, then atomically move it into place as the "current" link
        dst_file: str = os.path.join(current_dir, current)
        os.symlink(os.path.relpath(current_dir, start=log_parent_dir), dst_file)
        os.replace(dst_file, os.path.join(log_parent_dir, current))  # atomic rename
        # best-effort cleanup of "current" subdirs left behind by prior runs; never touches the dir just created
        _delete_stale_files(dot_current_dir, prefix="", millis=10, dirs=True, exclude=os.path.basename(current_dir))
        self.params: Params | None = None  # filled in later by the caller, once Params has been constructed

    def __repr__(self) -> str:
        return str(self.__dict__)
161#############################################################################
class Params:
    """All parsed CLI options combined into a single bundle; simplifies passing around numerous settings and defaults."""

    def __init__(
        self,
        args: argparse.Namespace,
        sys_argv: list[str],
        log_params: LogParams,
        log: Logger,
        inject_params: dict[str, bool] | None = None,
    ) -> None:
        """Reads from ArgumentParser via args.

        Params:
            args: Namespace of parsed CLI options.
            sys_argv: The original command line, retained for diagnostics.
            log_params: Logging configuration computed before this object exists.
            log: Logger used for diagnostics.
            inject_params: Optional flags that simulate unavailable or failing external programs (for testing only).
        """
        # immutable variables:
        assert args is not None
        assert isinstance(sys_argv, list)
        assert log_params is not None
        assert log is not None
        self.args: argparse.Namespace = args
        self.sys_argv: list[str] = sys_argv
        self.log_params: LogParams = log_params
        self.log: Logger = log
        self.inject_params: dict[str, bool] = inject_params if inject_params is not None else {}  # for testing only
        self.one_or_more_whitespace_regex: re.Pattern = re.compile(r"\s+")
        # NOTE(review): pattern r" +" matches *one* or more spaces although the name says "two or more" -- confirm
        # against upstream whether the original pattern was r"  +" and got mangled in extraction.
        self.two_or_more_spaces_regex: re.Pattern = re.compile(r" +")
        self._unset_matching_env_vars(args)  # scrub env vars per --exclude/--include-envvar-regex before anything else
        self.xperiods: SnapshotPeriods = SnapshotPeriods()

        assert len(args.root_dataset_pairs) > 0
        self.root_dataset_pairs: list[tuple[str, str]] = args.root_dataset_pairs
        self.recursive: bool = args.recursive
        self.recursive_flag: str = "-r" if args.recursive else ""

        self.dry_run: bool = args.dryrun is not None
        self.dry_run_recv: str = "-n" if self.dry_run else ""
        self.dry_run_destroy: str = self.dry_run_recv
        self.dry_run_no_send: bool = args.dryrun == "send"
        self.verbose_zfs: bool = args.verbose >= 2
        self.verbose_destroy: str = "" if args.quiet else "-v"
        self.quiet: bool = args.quiet

        self.zfs_send_program_opts: list[str] = self._fix_send_opts(self.split_args(args.zfs_send_program_opts))
        zfs_recv_program_opts: list[str] = self.split_args(args.zfs_recv_program_opts)
        for extra_opt in args.zfs_recv_program_opt:
            zfs_recv_program_opts.append(self.validate_arg_str(extra_opt, allow_all=True))
        preserve_properties = [validate_property_name(name, "--preserve-properties") for name in args.preserve_properties]
        zfs_recv_program_opts, zfs_recv_x_names = self._fix_recv_opts(zfs_recv_program_opts, frozenset(preserve_properties))
        self.zfs_recv_program_opts: list[str] = zfs_recv_program_opts
        self.zfs_recv_x_names: list[str] = zfs_recv_x_names
        if self.verbose_zfs:
            append_if_absent(self.zfs_send_program_opts, "-v")
            append_if_absent(self.zfs_recv_program_opts, "-v")
        self.zfs_full_recv_opts: list[str] = self.zfs_recv_program_opts.copy()
        # one CopyPropertiesConfig per ZFS_RECV_GROUPS entry, unpacked in dict insertion order
        cpconfigs = [CopyPropertiesConfig(group, flag, args, self) for group, flag in ZFS_RECV_GROUPS.items()]
        self.zfs_recv_o_config, self.zfs_recv_x_config, self.zfs_set_config = cpconfigs

        self.force_rollback_to_latest_snapshot: bool = args.force_rollback_to_latest_snapshot
        self.force_rollback_to_latest_common_snapshot = SynchronizedBool(args.force_rollback_to_latest_common_snapshot)
        self.force: SynchronizedBool = SynchronizedBool(args.force)
        self.force_once: bool = args.force_once
        self.force_unmount: str = "-f" if args.force_unmount else ""
        force_hard: str = "-R" if args.force_destroy_dependents else ""
        self.force_hard: str = "-R" if args.force_hard else force_hard  # --force-hard is deprecated

        self.skip_parent: bool = args.skip_parent
        self.skip_missing_snapshots: str = args.skip_missing_snapshots
        self.skip_on_error: str = args.skip_on_error
        self.retry_policy: RetryPolicy = RetryPolicy(args)
        self.skip_replication: bool = args.skip_replication
        self.delete_dst_snapshots: bool = args.delete_dst_snapshots is not None
        self.delete_dst_bookmarks: bool = args.delete_dst_snapshots == "bookmarks"
        self.delete_dst_snapshots_no_crosscheck: bool = args.delete_dst_snapshots_no_crosscheck
        self.delete_dst_snapshots_except: bool = args.delete_dst_snapshots_except
        self.delete_dst_datasets: bool = args.delete_dst_datasets
        self.delete_empty_dst_datasets: bool = args.delete_empty_dst_datasets is not None
        self.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: bool = (
            args.delete_empty_dst_datasets == "snapshots+bookmarks"
        )
        self.compare_snapshot_lists: str = args.compare_snapshot_lists
        self.daemon_lifetime_nanos: int = 1_000_000 * parse_duration_to_milliseconds(args.daemon_lifetime)
        self.daemon_frequency: str = args.daemon_frequency
        self.enable_privilege_elevation: bool = not args.no_privilege_elevation
        self.no_stream: bool = args.no_stream
        self.resume_recv: bool = not args.no_resume_recv
        self.create_bookmarks: str = "none" if args.no_create_bookmark else args.create_bookmarks  # no_create_bookmark depr
        self.use_bookmark: bool = not args.no_use_bookmark

        self.src: Remote = Remote("src", args, self)  # src dataset, host and ssh options
        self.dst: Remote = Remote("dst", args, self)  # dst dataset, host and ssh options
        self.create_src_snapshots_config: CreateSrcSnapshotConfig = CreateSrcSnapshotConfig(args, self)
        self.monitor_snapshots_config: MonitorSnapshotsConfig = MonitorSnapshotsConfig(args, self)
        self.is_caching_snapshots: bool = args.cache_snapshots == "true"

        self.compression_program: str = self._program_name(args.compression_program)
        self.compression_program_opts: list[str] = self.split_args(args.compression_program_opts)
        for opt in ["-o", "--output-file"]:  # reject options that would make the program write arbitrary files
            if opt in self.compression_program_opts:
                die(f"--compression-program-opts: {opt} is disallowed for security reasons.")
        self.getconf_program: str = self._program_name("getconf")  # print number of CPUs on POSIX except Solaris
        self.psrinfo_program: str = self._program_name("psrinfo")  # print number of CPUs on Solaris
        self.mbuffer_program: str = self._program_name(args.mbuffer_program)
        self.mbuffer_program_opts: list[str] = self.split_args(args.mbuffer_program_opts)
        for opt in ["-i", "-I", "-o", "-O", "-l", "-L"]:  # reject options that redirect mbuffer I/O to arbitrary files
            if opt in self.mbuffer_program_opts:
                die(f"--mbuffer-program-opts: {opt} is disallowed for security reasons.")
        self.ps_program: str = self._program_name(args.ps_program)
        self.pv_program: str = self._program_name(args.pv_program)
        self.pv_program_opts: list[str] = self.split_args(args.pv_program_opts)
        for opt in ["-f", "--log-file"]:  # reject options that would make pv write arbitrary files
            if opt in self.pv_program_opts:
                die(f"--pv-program-opts: {opt} is disallowed for security reasons.")
        self.isatty: bool = getenv_bool("isatty", True)
        if args.bwlimit:
            self.pv_program_opts += [f"--rate-limit={self.validate_arg_str(args.bwlimit)}"]
        self.shell_program_local: str = "sh"
        self.shell_program: str = self._program_name(args.shell_program)
        self.ssh_program: str = self._program_name(args.ssh_program)
        self.sudo_program: str = self._program_name(args.sudo_program)
        self.uname_program: str = self._program_name("uname")
        self.zfs_program: str = self._program_name("zfs")
        self.zpool_program: str = self._program_name(args.zpool_program)

        # no point creating complex shell pipeline commands for tiny data transfers:
        self.min_pipe_transfer_size: int = getenv_int("min_pipe_transfer_size", 1024 * 1024)
        self.max_datasets_per_batch_on_list_snaps: int = getenv_int("max_datasets_per_batch_on_list_snaps", 1024)
        self.max_datasets_per_minibatch_on_list_snaps: int = getenv_int("max_datasets_per_minibatch_on_list_snaps", -1)
        self.max_snapshots_per_minibatch_on_delete_snaps = getenv_int("max_snapshots_per_minibatch_on_delete_snaps", 2**29)
        self.dedicated_tcp_connection_per_zfs_send: bool = getenv_bool("dedicated_tcp_connection_per_zfs_send", True)
        # --force-once implies a single sequential pass, hence a single worker thread
        self.threads: tuple[int, bool] = (1, False) if self.force_once else args.threads
        timeout_nanos = None if args.timeout is None else 1_000_000 * parse_duration_to_milliseconds(args.timeout)
        self.timeout_nanos: int | None = timeout_nanos
        self.no_estimate_send_size: bool = args.no_estimate_send_size
        self.remote_conf_cache_ttl_nanos: int = 1_000_000 * parse_duration_to_milliseconds(args.daemon_remote_conf_cache_ttl)
        # 0 disables progress-bar width calculations when not interactive, pv is disabled, or --quiet is given
        self.terminal_columns: int = (
            getenv_int("terminal_columns", shutil.get_terminal_size(fallback=(120, 24)).columns)
            if self.isatty and self.pv_program != DISABLE_PRG and not self.quiet
            else 0
        )

        self.os_cpu_count: int | None = os.cpu_count()
        self.os_geteuid: int = os.geteuid()
        self.prog_version: str = __version__
        self.python_version: str = sys.version
        self.platform_version: str = platform.version()
        self.platform_platform: str = platform.platform()

        # mutable variables:
        snapshot_filters = args.snapshot_filters_var if hasattr(args, SNAPSHOT_FILTERS_VAR) else [[]]
        self.snapshot_filters: list[list[SnapshotFilter]] = [optimize_snapshot_filters(f) for f in snapshot_filters]
        self.exclude_dataset_property: str | None = args.exclude_dataset_property
        self.exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.abs_exclude_datasets: list[str] = []  # deferred to validate_task() phase
        self.abs_include_datasets: list[str] = []  # deferred to validate_task() phase

        self.curr_zfs_send_program_opts: list[str] = []
        self.zfs_recv_ox_names: set[str] = set()
        self.available_programs: dict[str, dict[str, str]] = {}  # filled during program detection, per location
        self.zpool_features: dict[str, dict[str, str]] = {}
        self.connection_pools: dict[str, ConnectionPools] = {}

    def split_args(self, text: str, *items: str | Iterable[str], allow_all: bool = False) -> list[str]:
        """Splits option string on runs of one or more whitespace into an option list.

        Extra ``items`` are appended via xappend(); unless ``allow_all`` is True, every resulting option is
        checked for quote/shell metacharacters.
        """
        text = text.strip()
        opts = self.one_or_more_whitespace_regex.split(text) if text else []
        xappend(opts, items)
        if not allow_all:
            self._validate_quoting(opts)
        return opts

    def validate_arg(self, opt: str | None, allow_spaces: bool = False, allow_all: bool = False) -> str | None:
        """allow_all permits all characters, including whitespace and quotes; See squote() and dquote()."""
        if allow_all or opt is None:
            return opt
        if any(char.isspace() and (char != " " or not allow_spaces) for char in opt):
            die(f"Option must not contain a whitespace character{' other than space' if allow_spaces else ''}: {opt}")
        self._validate_quoting([opt])
        return opt

    def validate_arg_str(self, opt: str | None, allow_spaces: bool = False, allow_all: bool = False) -> str:
        """Returns validated option string, raising if missing or illegal."""
        if opt is None:
            die("Option must not be missing")
        self.validate_arg(opt, allow_spaces=allow_spaces, allow_all=allow_all)
        return opt

    @staticmethod
    def _validate_quoting(opts: list[str]) -> None:
        """Raises an error if any option contains a quote or shell metacharacter."""
        for opt in opts:
            if "'" in opt or '"' in opt or "$" in opt or "`" in opt:
                die(f"Option must not contain a single quote or double quote or dollar or backtick character: {opt}")

    @staticmethod
    def _fix_recv_opts(opts: list[str], preserve_properties: frozenset[str]) -> tuple[list[str], list[str]]:
        """Returns sanitized ``zfs recv`` options and captured ``-o/-x`` args."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="n",
            include_arg_opts={"-o", "-x"},
            preserve_properties=preserve_properties,
        )

    @staticmethod
    def _fix_send_opts(opts: list[str]) -> list[str]:
        """Returns sanitized ``zfs send`` options."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="den",
            include_arg_opts={"-X", "--exclude", "--redact"},
            exclude_arg_opts=frozenset({"-i", "-I"}),
        )[0]

    def _program_name(self, program: str) -> str:
        """Returns the validated external program name; for testing: helps simulate errors caused by external programs."""
        self.validate_arg_str(program)
        if not program:
            die(f"Program name must not be missing: {program}")
        for char in SHELL_CHARS + ":":
            if char in program:
                die(f"Program name must not contain a '{char}' character: {program}")
        if self.inject_params.get("inject_unavailable_" + program, False):
            return program + "-xxx"  # substitute a program that cannot be found on the PATH
        if self.inject_params.get("inject_failing_" + program, False):
            return "false"  # substitute a program that will error out with non-zero return code
        return program

    def _unset_matching_env_vars(self, args: argparse.Namespace) -> None:
        """Unset environment variables matching regex filters."""
        exclude_envvar_regexes: RegexList = compile_regexes(args.exclude_envvar_regex)
        include_envvar_regexes: RegexList = compile_regexes(args.include_envvar_regex)
        for envvar_name in list(os.environ.keys()):  # iterate over a copy; os.environ is mutated in the loop
            if is_included(envvar_name, exclude_envvar_regexes, include_envvar_regexes):
                os.environ.pop(envvar_name, None)
                self.log.debug("Unsetting b/c envvar regex: %s", envvar_name)

    def lock_file_name(self) -> str:
        """Returns unique path used to detect concurrently running jobs.

        Makes it such that a job that runs periodically declines to start if the same previous periodic job is still running
        without completion yet. Hashed key avoids overly long filenames while remaining deterministic.
        """
        # fmt: off
        key = (tuple(self.root_dataset_pairs), self.args.recursive, self.args.exclude_dataset_property,
               tuple(self.args.include_dataset), tuple(self.args.exclude_dataset),
               tuple(self.args.include_dataset_regex), tuple(self.args.exclude_dataset_regex),
               tuple(tuple(f) for f in self.snapshot_filters), self.args.skip_replication, self.args.create_src_snapshots,
               self.args.create_src_snapshots_plan, self.args.create_src_snapshots_timeformat,
               self.create_src_snapshots_config.anchors,
               self.args.delete_dst_datasets, self.args.delete_dst_snapshots, self.args.delete_dst_snapshots_except,
               self.args.delete_empty_dst_datasets,
               self.args.compare_snapshot_lists, self.args.monitor_snapshots,
               self.args.log_file_infix,
               self.src.basis_ssh_host, self.dst.basis_ssh_host,
               self.src.basis_ssh_user, self.dst.basis_ssh_user)
        # fmt: on
        hash_code: str = hashlib.sha256(str(key).encode("utf-8")).hexdigest()
        return os.path.join(tempfile.gettempdir(), f"{PROG_NAME}-lockfile-{hash_code}.lock")

    def dry(self, msg: str) -> str:
        """Prefix ``msg`` with 'Dry' when running in dry-run mode."""
        return bzfs_main.utils.dry(msg, self.dry_run)

    def is_program_available(self, program: str, location: str) -> bool:
        """Return True if ``program`` was detected on ``location`` host."""
        return program in self.available_programs.get(location, {})
433#############################################################################
class Remote:
    """Connection settings for either source or destination host."""

    def __init__(self, loc: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args; ``loc`` selects which ssh_src_*/ssh_dst_* option group applies."""
        # immutable variables:
        assert loc == "src" or loc == "dst"
        self.location: str = loc
        self.params: Params = p
        self.basis_ssh_user: str = getattr(args, f"ssh_{loc}_user")
        self.basis_ssh_host: str = getattr(args, f"ssh_{loc}_host")
        self.ssh_port: int = getattr(args, f"ssh_{loc}_port")
        self.ssh_config_file: str | None = p.validate_arg(getattr(args, f"ssh_{loc}_config_file"))
        if self.ssh_config_file:
            if "bzfs_ssh_config" not in os.path.basename(self.ssh_config_file):
                die(f"Basename of --ssh-{loc}-config-file must contain substring 'bzfs_ssh_config': {self.ssh_config_file}")
        # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation:
        self.ssh_extra_opts: list[str] = ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"] + (
            ["-v"] if args.verbose >= 3 else []
        )
        self.max_concurrent_ssh_sessions_per_tcp_connection: int = args.max_concurrent_ssh_sessions_per_tcp_connection
        self.reuse_ssh_connection: bool = getenv_bool("reuse_ssh_connection", True)
        if self.reuse_ssh_connection:
            # directory holding the Unix domain sockets used for ssh connection multiplexing ('ssh -S')
            self.ssh_socket_dir: str = os.path.join(get_home_directory(), ".ssh", "bzfs")
            os.makedirs(os.path.dirname(self.ssh_socket_dir), exist_ok=True)
            os.makedirs(self.ssh_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            self.socket_prefix: str = "s"
            _delete_stale_files(self.ssh_socket_dir, self.socket_prefix, ssh=True)
        self.sanitize1_regex: re.Pattern = re.compile(r"[\s\\/@$]")  # replace whitespace, /, $, \, @ with a ~ tilde char
        self.sanitize2_regex: re.Pattern = re.compile(rf"[^a-zA-Z0-9{re.escape('~.:_-')}]")  # Remove disallowed chars

        # mutable variables:
        self.root_dataset: str = ""  # deferred until run_main()
        self.basis_root_dataset: str = ""  # deferred until run_main()
        self.pool: str = ""
        self.sudo: str = ""
        self.use_zfs_delegation: bool = False
        self.ssh_user: str = ""
        self.ssh_host: str = ""
        self.ssh_user_host: str = ""
        self.is_nonlocal: bool = False

    def local_ssh_command(self) -> list[str]:
        """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing)
        command to run on the remote host, which will be appended later."""
        if self.ssh_user_host == "":
            return []  # dataset is on local host - don't use ssh

        # dataset is on remote host
        p: Params = self.params
        if p.ssh_program == DISABLE_PRG:
            die("Cannot talk to remote host because ssh CLI is disabled.")
        ssh_cmd: list[str] = [p.ssh_program] + self.ssh_extra_opts
        if self.ssh_config_file:
            ssh_cmd += ["-F", self.ssh_config_file]
        if self.ssh_port:
            ssh_cmd += ["-p", str(self.ssh_port)]
        if self.reuse_ssh_connection:
            # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and
            # 'ssh -S -M -oControlPersist=60s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
            # Generate unique private Unix domain socket file name in user's home dir and pass it to 'ssh -S /path/to/socket'
            def sanitize(name: str) -> str:
                name = self.sanitize1_regex.sub("~", name)  # replace whitespace, /, $, \, @ with a ~ tilde char
                name = self.sanitize2_regex.sub("", name)  # Remove disallowed chars
                return name

            unique: str = f"{os.getpid()}@{time.time_ns()}@{random.SystemRandom().randint(0, 999_999_999_999)}"
            socket_name: str = f"{self.socket_prefix}{unique}@{sanitize(self.ssh_host)[:45]}@{sanitize(self.ssh_user)}"
            # truncate to keep the socket path within OS limits on Unix domain socket path length
            socket_file: str = os.path.join(self.ssh_socket_dir, socket_name)[: max(100, len(self.ssh_socket_dir) + 10)]
            ssh_cmd += ["-S", socket_file]
        ssh_cmd += [self.ssh_user_host]
        return ssh_cmd

    def cache_key(self) -> tuple:
        """Returns tuple uniquely identifying this Remote for caching."""
        return self.location, self.pool, self.ssh_user_host, self.ssh_port, self.ssh_config_file

    def __repr__(self) -> str:
        return str(self.__dict__)
515#############################################################################
class CopyPropertiesConfig:
    """--zfs-recv-o* and --zfs-recv-x* option groups for copying or excluding ZFS properties on receive."""

    def __init__(self, group: str, flag: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args."""
        # immutable variables:
        def arg_of(suffix: str) -> str:
            # fetch the CLI option value named "<group>_<suffix>" from the parsed args
            return getattr(args, f"{group}_{suffix}")

        self.group: str = group
        self.flag: str = flag  # one of -o or -x
        raw_sources: str = p.validate_arg_str(arg_of("sources"))
        pieces = [piece.strip() for piece in raw_sources.strip().split(",")]
        self.sources: str = ",".join(sorted(pieces))  # canonicalize
        self.targets: str = p.validate_arg_str(arg_of("targets"))
        self.include_regexes: RegexList = compile_regexes(arg_of("include_regex"))
        self.exclude_regexes: RegexList = compile_regexes(arg_of("exclude_regex"))

    def __repr__(self) -> str:
        return f"{self.__dict__}"
535#############################################################################
class SnapshotLabel(NamedTuple):
    """Contains the individual parts that are concatenated into a ZFS snapshot name."""

    prefix: str  # bzfs_
    infix: str  # us-west-1_
    timestamp: str  # 2024-11-06_08:30:05
    suffix: str  # _hourly

    def __str__(self) -> str:  # bzfs_us-west-1_2024-11-06_08:30:05_hourly
        return "".join([self.prefix, self.infix, self.timestamp, self.suffix])

    def validate_label(self, input_text: str) -> None:
        """Validates that the composed snapshot label forms a legal name."""
        name: str = str(self)
        validate_dataset_name(name, input_text)
        if "/" in name:
            die(f"Invalid ZFS snapshot name: '{name}' for: '{input_text}*'")

        def check(key: str, value: str, underscore_at_end: bool) -> None:
            # Each part may carry at most one underscore; prefix/infix must end with it, suffix must start with it.
            if underscore_at_end:
                if not value.endswith("_"):
                    die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'")
            elif not value.startswith("_"):
                die(f"Invalid {input_text}{key}: Must start with an underscore character: '{value}'")
            if value.count("_") > 1:
                die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")

        check("prefix", self.prefix, underscore_at_end=True)  # prefix is mandatory
        if self.infix:  # infix is optional
            check("infix", self.infix, underscore_at_end=True)
        if self.suffix:  # suffix is optional
            check("suffix", self.suffix, underscore_at_end=False)
572#############################################################################
class CreateSrcSnapshotConfig:
    """Option values for --create-src-snapshots, that is, for automatically creating source snapshots."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Option values for --create-src-snapshots*; reads from ArgumentParser via args."""
        # immutable variables:
        self.skip_create_src_snapshots: bool = not args.create_src_snapshots
        self.create_src_snapshots_even_if_not_due: bool = args.create_src_snapshots_even_if_not_due
        tz_spec: str | None = args.create_src_snapshots_timezone if args.create_src_snapshots_timezone else None
        self.tz: tzinfo | None = get_timezone(tz_spec)
        self.current_datetime: datetime = current_datetime(tz_spec)
        self.timeformat: str = args.create_src_snapshots_timeformat
        self.anchors: PeriodAnchors = PeriodAnchors.parse(args)

        # Compute the schedule for upcoming periodic time events (suffix_durations). This event schedule is also used in
        # daemon mode via sleep_until_next_daemon_iteration()
        suffixes: list[str] = []
        labels: list[SnapshotLabel] = []
        # the plan is a literal nested dict: {org: {target: {period_unit: period_amount}}}
        create_src_snapshots_plan: str = args.create_src_snapshots_plan or str({"bzfs": {"onsite": {"adhoc": 1}}})
        for org, target_periods in ast.literal_eval(create_src_snapshots_plan).items():
            for target, periods in target_periods.items():
                for period_unit, period_amount in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    if not isinstance(period_amount, int) or period_amount < 0:
                        die(f"--create-src-snapshots-plan: Period amount must be a non-negative integer: {period_amount}")
                    if period_amount > 0:  # an amount of zero disables that period entirely
                        suffix: str = nsuffix(period_unit)
                        suffixes.append(suffix)
                        labels.append(SnapshotLabel(prefix=nprefix(org), infix=ninfix(target), timestamp="", suffix=suffix))
        xperiods: SnapshotPeriods = p.xperiods
        if self.skip_create_src_snapshots:
            # not creating snapshots; instead track only the --daemon-frequency period in the schedule
            duration_amount, duration_unit = p.xperiods.suffix_to_duration0(p.daemon_frequency)
            if duration_amount <= 0 or not duration_unit:
                die(f"Invalid --daemon-frequency: {p.daemon_frequency}")
            suffixes = [nsuffix(p.daemon_frequency)]
            labels = []
        suffix_durations: dict[str, tuple[int, str]] = {suffix: xperiods.suffix_to_duration1(suffix) for suffix in suffixes}

        def suffix_key(suffix: str) -> tuple[int, str]:
            """Sort key: period length in milliseconds, then suffix name; also validates period divisibility."""
            duration_amount, duration_unit = suffix_durations[suffix]
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            if suffix.endswith(("hourly", "minutely", "secondly")):
                if duration_milliseconds != 0 and 86400 * 1000 % duration_milliseconds != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 86400 seconds "
                        f"without remainder so that snapshots will be created at the same time of day every day: {suffix}"
                    )
            if suffix.endswith("monthly"):
                if duration_amount != 0 and 12 % duration_amount != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 12 months "
                        f"without remainder so that snapshots will be created at the same time every year: {suffix}"
                    )
            return duration_milliseconds, suffix

        suffixes = sorted(suffixes, key=suffix_key, reverse=True)  # take snapshots for dailies before hourlies, and so on
        self.suffix_durations: dict[str, tuple[int, str]] = {suffix: suffix_durations[suffix] for suffix in suffixes}  # sort
        suffix_indexes: dict[str, int] = {suffix: k for k, suffix in enumerate(suffixes)}
        labels.sort(key=lambda label: (suffix_indexes[label.suffix], label))  # take snapshots for dailies before hourlies
        self._snapshot_labels: list[SnapshotLabel] = labels
        for label in self.snapshot_labels():
            label.validate_label("--create-src-snapshots-plan ")

    def snapshot_labels(self) -> list[SnapshotLabel]:
        """Returns the snapshot name patterns for which snapshots shall be created."""
        timeformat: str = self.timeformat
        is_millis: bool = timeformat.endswith("%F")  # non-standard hack to append milliseconds
        if is_millis:
            timeformat = timeformat[0:-1] + "f"  # replace %F with %f (append microseconds)
        timestamp: str = self.current_datetime.strftime(timeformat)
        if is_millis:
            timestamp = timestamp[0 : -len("000")]  # replace microseconds with milliseconds
        timestamp = timestamp.replace("+", "z")  # zfs CLI does not accept the '+' character in snapshot names
        return [SnapshotLabel(label.prefix, label.infix, timestamp, label.suffix) for label in self._snapshot_labels]

    def __repr__(self) -> str:
        return str(self.__dict__)
651#############################################################################
@dataclass(frozen=True)
class AlertConfig:
    """Thresholds controlling when alerts fire for snapshot age."""

    # Which end of the snapshot series the thresholds apply to; interned capitalized form of the
    # "latest"/"oldest" keys parsed by MonitorSnapshotsConfig.
    kind: Literal["Latest", "Oldest"]
    # Age threshold in milliseconds for a warning; MonitorSnapshotsConfig substitutes
    # UNIX_TIME_INFINITY_SECS when the warning check is effectively disabled.
    warning_millis: int
    # Age threshold in milliseconds for a critical alert; MonitorSnapshotsConfig substitutes
    # UNIX_TIME_INFINITY_SECS when the critical check is effectively disabled.
    critical_millis: int
661#############################################################################
@dataclass(frozen=True)
class MonitorSnapshotAlert:
    """Alert configuration for a single monitored snapshot label."""

    # Snapshot naming pattern being monitored (built with an empty timestamp placeholder).
    label: SnapshotLabel
    # Thresholds for the most recent matching snapshot; None when unconfigured or --monitor-snapshots-no-latest-check.
    latest: AlertConfig | None
    # Thresholds for the oldest matching snapshot; None when unconfigured or --monitor-snapshots-no-oldest-check.
    oldest: AlertConfig | None
671#############################################################################
class MonitorSnapshotsConfig:
    """Option values for --monitor-snapshots*, that is, policy describing which snapshots to monitor for staleness."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args.

        Parses the --monitor-snapshots option, a Python dict literal of the shape
        {org: {target: {period_unit: {"latest"|"oldest": {"warning"|"critical"|"cycles": value}}}}},
        into a list of MonitorSnapshotAlert objects sorted by descending period duration.
        Calls die() on malformed input.
        """
        # immutable variables:
        # literal_eval parses the dict literal without executing code; the nested structure is
        # validated incrementally by the loops below.
        self.monitor_snapshots: dict = ast.literal_eval(args.monitor_snapshots)
        self.dont_warn: bool = args.monitor_snapshots_dont_warn
        self.dont_crit: bool = args.monitor_snapshots_dont_crit
        self.no_latest_check: bool = args.monitor_snapshots_no_latest_check
        self.no_oldest_check: bool = args.monitor_snapshots_no_oldest_check
        alerts: list[MonitorSnapshotAlert] = []
        xperiods: SnapshotPeriods = p.xperiods
        for org, target_periods in self.monitor_snapshots.items():
            prefix: str = nprefix(org)
            for target, periods in target_periods.items():
                for period_unit, alert_dicts in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    # timestamp is left empty: monitoring matches on prefix/infix/suffix only.
                    label = SnapshotLabel(prefix=prefix, infix=ninfix(target), timestamp="", suffix=nsuffix(period_unit))
                    alert_latest, alert_oldest = None, None
                    for alert_type, alert_dict in alert_dicts.items():
                        m = "--monitor-snapshots: "
                        if alert_type not in ["latest", "oldest"]:
                            die(f"{m}'{alert_type}' must be 'latest' or 'oldest' within {args.monitor_snapshots}")
                        warning_millis: int = 0
                        critical_millis: int = 0
                        cycles: int = 1  # default: one period of grace beyond the threshold durations
                        for kind, value in alert_dict.items():
                            context: str = args.monitor_snapshots
                            if kind == "warning":
                                warning_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "critical":
                                critical_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "cycles":
                                cycles = max(0, int(value))
                            else:
                                die(f"{m}'{kind}' must be 'warning', 'critical' or 'cycles' within {context}")
                        if warning_millis > 0 or critical_millis > 0:
                            # Add `cycles` full periods of the label's own frequency on top of each
                            # configured (nonzero) threshold.
                            duration_amount, duration_unit = xperiods.suffix_to_duration1(label.suffix)
                            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
                            warning_millis += 0 if warning_millis <= 0 else cycles * duration_milliseconds
                            critical_millis += 0 if critical_millis <= 0 else cycles * duration_milliseconds
                            # Unconfigured thresholds become "infinitely far in the future" so they never fire.
                            # NOTE(review): the sentinel's name says seconds while these variables hold millis;
                            # presumably any huge value works as "never" — confirm intended units.
                            warning_millis = UNIX_TIME_INFINITY_SECS if warning_millis <= 0 else warning_millis
                            critical_millis = UNIX_TIME_INFINITY_SECS if critical_millis <= 0 else critical_millis
                            capitalized_alert_type = cast(Literal["Latest", "Oldest"], sys.intern(alert_type.capitalize()))
                            alert_config = AlertConfig(capitalized_alert_type, warning_millis, critical_millis)
                            if alert_type == "latest":
                                if not self.no_latest_check:
                                    alert_latest = alert_config
                            else:
                                assert alert_type == "oldest"
                                if not self.no_oldest_check:
                                    alert_oldest = alert_config
                    # Only emit an alert if at least one check survived the no_*_check suppression flags.
                    if alert_latest is not None or alert_oldest is not None:
                        alerts.append(MonitorSnapshotAlert(label, alert_latest, alert_oldest))

        def alert_sort_key(alert: MonitorSnapshotAlert) -> tuple[int, SnapshotLabel]:
            # Sort primarily by the label's period length in milliseconds, tie-break on the label itself.
            duration_amount, duration_unit = xperiods.suffix_to_duration1(alert.label.suffix)
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            return duration_milliseconds, alert.label

        alerts.sort(key=alert_sort_key, reverse=True)  # check snapshots for dailies before hourlies, and so on
        self.alerts: list[MonitorSnapshotAlert] = alerts
        self.enable_monitor_snapshots: bool = len(alerts) > 0

    def __repr__(self) -> str:
        return str(self.__dict__)
740#############################################################################
741def _fix_send_recv_opts(
742 opts: list[str],
743 exclude_long_opts: set[str],
744 exclude_short_opts: str,
745 include_arg_opts: set[str],
746 exclude_arg_opts: frozenset[str] = frozenset(),
747 preserve_properties: frozenset[str] = frozenset(),
748) -> tuple[list[str], list[str]]:
749 """These opts are instead managed via bzfs CLI args --dryrun, etc."""
750 assert "-" not in exclude_short_opts
751 results: list[str] = []
752 x_names: set[str] = set(preserve_properties)
753 i = 0
754 n = len(opts)
755 while i < n:
756 opt: str = opts[i]
757 i += 1
758 if opt in exclude_arg_opts: # example: {"-X", "--exclude"}
759 i += 1
760 continue
761 elif opt in include_arg_opts: # example: {"-o", "-x"}
762 results.append(opt)
763 if i < n:
764 if opt == "-o" and "=" in opts[i] and opts[i].split("=", 1)[0] in preserve_properties:
765 die(f"--preserve-properties: Disallowed ZFS property found in --zfs-recv-program-opt(s): -o {opts[i]}")
766 if opt == "-x":
767 x_names.discard(opts[i])
768 results.append(opts[i])
769 i += 1
770 elif opt not in exclude_long_opts: # example: {"--dryrun", "--verbose"}
771 if opt.startswith("-") and opt != "-" and not opt.startswith("--"):
772 for char in exclude_short_opts: # example: "den"
773 opt = opt.replace(char, "")
774 if opt == "-":
775 continue
776 results.append(opt)
777 return results, sorted(x_names)
# Extracts the leading decimal PID embedded at the start of an ssh master domain socket file name
# (after the common file name prefix has been stripped); used by _delete_stale_files(ssh=True).
SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX: re.Pattern[str] = re.compile(r"^[0-9]+")  # see socket_name in local_ssh_command()
def _delete_stale_files(
    root_dir: str,
    prefix: str,
    millis: int = 60 * 60 * 1000,
    dirs: bool = False,
    exclude: str | None = None,
    ssh: bool = False,
) -> None:
    """Cleans up obsolete files; For example caused by abnormal termination, OS crash."""
    max_age_secs: float = millis / 1000
    current_time: float = time.time()
    validate_is_not_a_symlink("", root_dir)
    for entry in os.scandir(root_dir):
        name: str = entry.name
        if name == exclude or not name.startswith(prefix):
            continue  # only touch entries we own, and never the explicitly excluded one
        try:
            # is_dir() is checked first so stat() is only consulted for entries of the requested kind.
            if entry.is_dir() == dirs and current_time - entry.stat().st_mtime >= max_age_secs:
                if dirs:
                    shutil.rmtree(entry.path, ignore_errors=True)
                elif not (ssh and stat.S_ISSOCK(entry.stat().st_mode)):
                    os.remove(entry.path)
                elif match := SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX.match(name[len(prefix) :]):
                    # ssh master sockets embed the owning bzfs PID; only reap the socket once that
                    # process is provably gone, or after a generous 31 day safety margin.
                    pid: int = int(match.group(0))
                    if pid_exists(pid) is False or current_time - entry.stat().st_mtime >= 31 * 24 * 60 * 60:
                        os.remove(entry.path)  # bzfs process is nomore alive hence its ssh master process isn't alive either
        except FileNotFoundError:
            pass  # harmless
812def _create_symlink(src: str, dst_dir: str, dst: str) -> None:
813 """Creates dst symlink pointing to src using a relative path."""
814 rel_path: str = os.path.relpath(src, start=dst_dir)
815 os.symlink(src=rel_path, dst=os.path.join(dst_dir, dst))