Coverage for bzfs_main / configuration.py: 99%
645 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:49 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Configuration subsystem; All CLI option/parameter values are reachable from the "Params" class."""
17from __future__ import (
18 annotations,
19)
20import argparse
21import ast
22import os
23import platform
24import random
25import re
26import shutil
27import stat
28import sys
29import tempfile
30import threading
31import time
32from collections.abc import (
33 Iterable,
34)
35from dataclasses import (
36 dataclass,
37)
38from datetime import (
39 datetime,
40 tzinfo,
41)
42from logging import (
43 Logger,
44)
45from typing import (
46 Final,
47 Literal,
48 NamedTuple,
49 cast,
50 final,
51)
53from bzfs_main.argparse_actions import (
54 SnapshotFilter,
55 optimize_snapshot_filters,
56)
57from bzfs_main.argparse_cli import (
58 LOG_DIR_DEFAULT,
59 ZFS_RECV_GROUPS,
60 ZFS_RECV_O,
61 ZFS_RECV_O_INCLUDE_REGEX_DEFAULT,
62 __version__,
63)
64from bzfs_main.detect import (
65 DISABLE_PRG,
66)
67from bzfs_main.filter import (
68 SNAPSHOT_FILTERS_VAR,
69)
70from bzfs_main.period_anchors import (
71 PeriodAnchors,
72)
73from bzfs_main.util import (
74 utils,
75)
76from bzfs_main.util.connection import (
77 ConnectionPools,
78 MiniParams,
79 MiniRemote,
80)
81from bzfs_main.util.retry import (
82 RetryPolicy,
83)
84from bzfs_main.util.utils import (
85 DIR_PERMISSIONS,
86 FILE_PERMISSIONS,
87 PROG_NAME,
88 SHELL_CHARS,
89 UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH,
90 UNIX_TIME_INFINITY_SECS,
91 RegexList,
92 SnapshotPeriods,
93 SynchronizedBool,
94 append_if_absent,
95 compile_regexes,
96 current_datetime,
97 die,
98 get_home_directory,
99 get_timezone,
100 getenv_bool,
101 getenv_int,
102 is_included,
103 ninfix,
104 nprefix,
105 nsuffix,
106 open_nofollow,
107 parse_duration_to_milliseconds,
108 pid_exists,
109 sha256_hex,
110 sha256_urlsafe_base64,
111 urlsafe_base64,
112 validate_dataset_name,
113 validate_file_permissions,
114 validate_is_not_a_symlink,
115 validate_property_name,
116 xappend,
117)
# constants:
# Process-wide synchronization primitives guarding the one-time environment variable cleanup performed by
# Params._unset_matching_env_vars(): the lock serializes concurrent Params constructions, and the latch ensures
# os.environ is mutated at most once per process even when many jobs run concurrently (first thread wins).
_UNSET_ENV_VARS_LOCK: Final[threading.Lock] = threading.Lock()
_UNSET_ENV_VARS_LATCH: Final[SynchronizedBool] = SynchronizedBool(True)
124#############################################################################
@final
class LogParams:
    """Option values for logging."""

    def __init__(self, args: argparse.Namespace) -> None:
        """Reads from ArgumentParser via args.

        Derives the log level from --quiet/--verbose, computes the log directory and unique log file names, creates those
        directories with strict permissions, and maintains a 'current' symlink tree pointing at the most recent log files.
        """
        # immutable variables:
        # Map CLI verbosity flags to a log level; --quiet takes precedence over --verbose.
        if args.quiet:
            log_level: str = "ERROR"
        elif args.verbose >= 2:
            log_level = "TRACE"
        elif args.verbose >= 1:
            log_level = "DEBUG"
        else:
            log_level = "INFO"
        self.log_level: Final[str] = log_level
        self.timestamp: Final[str] = datetime.now().isoformat(sep="_", timespec="seconds")  # 2024-09-03_12:26:15
        self.isatty: Final[bool] = getenv_bool("isatty", True)
        self.quiet: Final[bool] = args.quiet
        # Terminal width is only relevant for interactive progress display; 0 disables it (non-tty, pv disabled, or quiet).
        self.terminal_columns: Final[int] = (
            getenv_int("terminal_columns", shutil.get_terminal_size(fallback=(120, 24)).columns)
            if self.isatty and args.pv_program != DISABLE_PRG and not self.quiet
            else 0
        )
        self.home_dir: Final[str] = get_home_directory()
        log_parent_dir: Final[str] = args.log_dir if args.log_dir else os.path.join(self.home_dir, LOG_DIR_DEFAULT)
        if LOG_DIR_DEFAULT not in os.path.basename(log_parent_dir):
            die(f"Basename of --log-dir must contain the substring '{LOG_DIR_DEFAULT}', but got: {log_parent_dir}")
        sep: str = "_" if args.log_subdir == "daily" else ":"
        timestamp: str = self.timestamp
        # Truncate the ISO timestamp at the chosen separator to obtain a daily/hourly/minutely subdir name:
        subdir: str = timestamp[: timestamp.rindex(sep) if args.log_subdir == "minutely" else timestamp.index(sep)]
        # 2024-09-03 (d), 2024-09-03_12 (h), 2024-09-03_12:26 (m)
        self.log_dir: Final[str] = os.path.join(log_parent_dir, subdir)
        # Create dirs and reject symlinks/loose permissions before writing any log data:
        os.makedirs(log_parent_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir ", log_parent_dir)
        validate_file_permissions(log_parent_dir, DIR_PERMISSIONS)
        os.makedirs(self.log_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir subdir ", self.log_dir)
        validate_file_permissions(self.log_dir, DIR_PERMISSIONS)
        self.log_file_prefix: Final[str] = args.log_file_prefix
        self.log_file_infix: Final[str] = args.log_file_infix
        self.log_file_suffix: Final[str] = args.log_file_suffix
        # mkstemp() guarantees a unique log file name even across concurrently starting jobs:
        fd, self.log_file = tempfile.mkstemp(
            suffix=".log",
            prefix=f"{self.log_file_prefix}{self.timestamp}{self.log_file_infix}{self.log_file_suffix}-",
            dir=self.log_dir,
        )
        os.fchmod(fd, FILE_PERMISSIONS)
        os.close(fd)
        self.pv_log_file: Final[str] = self.log_file[: -len(".log")] + ".pv"
        log_file_stem: str = os.path.basename(self.log_file)[: -len(".log")]
        # Python's standard logger naming API interprets chars such as '.', '-', ':', spaces, etc in special ways, e.g.
        # logging.getLogger("foo.bar") vs logging.getLogger("foo-bar"). Thus, we sanitize the Python logger name via a regex:
        self.logger_name_suffix: Final[str] = re.sub(r"[^A-Za-z0-9_]", repl="_", string=log_file_stem)
        cache_root_dir: str = os.path.join(log_parent_dir, ".cache")
        os.makedirs(cache_root_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_file_permissions(cache_root_dir, DIR_PERMISSIONS)
        self.last_modified_cache_dir: Final[str] = os.path.join(cache_root_dir, "mods")

        # Create/update "current" symlink to current_dir, which is a subdir containing further symlinks to log files.
        # For parallel usage, ensures there is no time window when the symlinks are inconsistent or do not exist.
        current: str = "current"
        dot_current_dir: str = os.path.join(log_parent_dir, f".{current}")
        current_dir: str = os.path.join(dot_current_dir, log_file_stem)
        os.makedirs(dot_current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir: .current ", dot_current_dir)
        try:
            os.makedirs(current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            _create_symlink(self.log_file, current_dir, f"{current}.log")
            _create_symlink(self.pv_log_file, current_dir, f"{current}.pv")
            _create_symlink(self.log_dir, current_dir, f"{current}.dir")
            dst_file: str = os.path.join(current_dir, current)
            os.symlink(os.path.relpath(current_dir, start=log_parent_dir), dst_file)
            os.replace(dst_file, os.path.join(log_parent_dir, current))  # atomic rename
            _delete_stale_files(dot_current_dir, prefix="", millis=5000, dirs=True, exclude=os.path.basename(current_dir))
        except FileNotFoundError:
            pass  # harmless concurrent cleanup

    def __repr__(self) -> str:
        return str(self.__dict__)
207#############################################################################
@final
class Params(MiniParams):
    """All parsed CLI options combined into a single bundle; simplifies passing around numerous settings and defaults."""

    def __init__(
        self,
        args: argparse.Namespace,
        sys_argv: list[str],
        log_params: LogParams,
        log: Logger,
        inject_params: dict[str, bool] | None = None,  # for testing only
    ) -> None:
        """Reads from ArgumentParser via args.

        May terminate the process via die() when an option value fails validation. Note that construction has side
        effects: it may unset environment variables (see _unset_matching_env_vars()) and constructs the src/dst Remote
        objects, which in turn prepare ssh socket directories.
        """
        # immutable variables:
        assert args is not None
        assert isinstance(sys_argv, list)
        assert log_params is not None
        assert log is not None
        self.args: Final[argparse.Namespace] = args
        self.sys_argv: Final[list[str]] = sys_argv
        self.log_params: Final[LogParams] = log_params
        self.log: Logger = log
        self.inject_params: Final[dict[str, bool]] = inject_params if inject_params is not None else {}  # for testing only
        self.one_or_more_whitespace_regex: Final[re.Pattern[str]] = re.compile(r"\s+")
        self.two_or_more_spaces_regex: Final[re.Pattern[str]] = re.compile(r" +")
        self._unset_matching_env_vars(args)
        self.xperiods: Final[SnapshotPeriods] = SnapshotPeriods()

        assert len(args.root_dataset_pairs) > 0
        self.root_dataset_pairs: Final[list[tuple[str, str]]] = args.root_dataset_pairs
        self.recursive: Final[bool] = args.recursive
        self.recursive_flag: Final[str] = "-r" if args.recursive else ""

        # --dryrun is None (off), "send", or another non-None mode; map to booleans and zfs CLI flags:
        self.dry_run: Final[bool] = args.dryrun is not None
        self.dry_run_recv: Final[str] = "-n" if self.dry_run else ""
        self.dry_run_destroy: Final[str] = self.dry_run_recv
        self.dry_run_no_send: Final[bool] = args.dryrun == "send"
        self.verbose_zfs: Final[bool] = args.verbose >= 2
        self.verbose_destroy: Final[str] = "" if args.quiet else "-v"

        # Sanitize 'zfs send'/'zfs recv' option lists and capture extra -o/-x property names:
        self.zfs_send_program_opts: Final[list[str]] = self._fix_send_opts(self.split_args(args.zfs_send_program_opts))
        zfs_recv_program_opts: list[str] = self.split_args(args.zfs_recv_program_opts)
        for extra_opt in args.zfs_recv_program_opt:
            zfs_recv_program_opts.append(self.validate_arg_str(extra_opt, allow_all=True))
        preserve_properties = [validate_property_name(name, "--preserve-properties") for name in args.preserve_properties]
        zfs_recv_program_opts, zfs_recv_x_names = self._fix_recv_opts(zfs_recv_program_opts, frozenset(preserve_properties))
        self.zfs_recv_program_opts: Final[list[str]] = zfs_recv_program_opts
        self.zfs_recv_x_names: Final[list[str]] = zfs_recv_x_names
        if self.verbose_zfs:
            append_if_absent(self.zfs_send_program_opts, "-v")
            append_if_absent(self.zfs_recv_program_opts, "-v")
        # zfs_full_recv_opts: dataset-specific dynamic -o/-x property options are computed later per dataset in
        # replication._add_recv_property_options():
        self.zfs_full_recv_opts: Final[list[str]] = self.zfs_recv_program_opts.copy()
        cpconfigs = [CopyPropertiesConfig(group, flag, args, self) for group, flag in ZFS_RECV_GROUPS.items()]
        self.zfs_recv_o_config, self.zfs_recv_x_config, self.zfs_set_config = cpconfigs

        self.force_rollback_to_latest_snapshot: Final[bool] = args.force_rollback_to_latest_snapshot
        self.force_rollback_to_latest_common_snapshot: Final[SynchronizedBool] = SynchronizedBool(
            args.force_rollback_to_latest_common_snapshot
        )
        self.force: Final[SynchronizedBool] = SynchronizedBool(args.force)
        self.force_once: Final[bool] = args.force_once
        self.force_unmount: Final[str] = "-f" if args.force_unmount else ""
        self.force_hard: Final[str] = "-R" if args.force_destroy_dependents else ""

        self.skip_parent: Final[bool] = args.skip_parent
        self.skip_missing_snapshots: Final[str] = args.skip_missing_snapshots
        self.skip_on_error: Final[str] = args.skip_on_error
        self.retry_policy: Final[RetryPolicy] = RetryPolicy.from_namespace(args).copy(reraise=True)
        self.skip_replication: Final[bool] = args.skip_replication
        # --delete-dst-snapshots is None (off), "bookmarks", or another non-None mode:
        self.delete_dst_snapshots: Final[bool] = args.delete_dst_snapshots is not None
        self.delete_dst_bookmarks: Final[bool] = args.delete_dst_snapshots == "bookmarks"
        self.delete_dst_snapshots_no_crosscheck: Final[bool] = args.delete_dst_snapshots_no_crosscheck
        self.delete_dst_snapshots_except: Final[bool] = args.delete_dst_snapshots_except
        self.delete_dst_datasets: Final[bool] = args.delete_dst_datasets
        self.delete_empty_dst_datasets: Final[bool] = args.delete_empty_dst_datasets is not None
        self.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: Final[bool] = (
            args.delete_empty_dst_datasets == "snapshots+bookmarks"
        )
        self.compare_snapshot_lists: Final[str] = args.compare_snapshot_lists
        self.daemon_lifetime_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds(args.daemon_lifetime)
        self.daemon_frequency: Final[str] = args.daemon_frequency
        self.enable_privilege_elevation: Final[bool] = not args.no_privilege_elevation
        self.no_stream: Final[bool] = args.no_stream
        self.resume_recv: Final[bool] = not args.no_resume_recv
        self.create_bookmarks: Final[str] = args.create_bookmarks
        self.use_bookmark: Final[bool] = not args.no_use_bookmark
        self.r2r_mode_requested: Final[str] = args.r2r

        self.src: Final[Remote] = Remote("src", args, self)  # src dataset, host and ssh options
        self.dst: Final[Remote] = Remote("dst", args, self)  # dst dataset, host and ssh options
        self.create_src_snapshots_config: Final[CreateSrcSnapshotConfig] = CreateSrcSnapshotConfig(args, self)
        self.monitor_snapshots_config: Final[MonitorSnapshotsConfig] = MonitorSnapshotsConfig(args, self)
        self.is_caching_snapshots: Final[bool] = args.cache_snapshots

        # External helper programs; options that could redirect I/O or otherwise escape are rejected via die():
        self.compression_program: Final[str] = self._program_name(args.compression_program)
        self.compression_program_opts: Final[list[str]] = self.split_args(args.compression_program_opts)
        for opt in {"-o", "--output-file"}.intersection(self.compression_program_opts):
            die(f"--compression-program-opts: {opt} is disallowed for security reasons.")
        self.getconf_program: Final[str] = self._program_name("getconf")  # print number of CPUs on POSIX
        self.mbuffer_program: Final[str] = self._program_name(args.mbuffer_program)
        self.mbuffer_program_opts: Final[list[str]] = self.split_args(args.mbuffer_program_opts)
        for opt in {"-i", "-I", "-o", "-O", "-l", "-L", "-t", "-T", "-a", "-A"}.intersection(self.mbuffer_program_opts):
            die(f"--mbuffer-program-opts: {opt} is disallowed for security reasons.")
        self.bwlimit: Final[str] = self.validate_arg_str(args.bwlimit) if args.bwlimit else ""
        self.ps_program: Final[str] = self._program_name(args.ps_program)
        self.pv_program: Final[str] = self._program_name(args.pv_program)
        self.pv_program_opts: Final[list[str]] = self.split_args(args.pv_program_opts)
        bad_pv_opts = {"-o", "--output", "-f", "--log-file", "-S", "--stop-at-size", "-Y", "--sync", "-X", "--discard",
                       "-U", "--store-and-forward", "-d", "--watchfd", "-R", "--remote", "-P", "--pidfile"}  # fmt: skip
        for opt in bad_pv_opts.intersection(self.pv_program_opts):
            die(f"--pv-program-opts: {opt} is disallowed for security reasons.")
        if self.bwlimit:
            self.pv_program_opts.extend([f"--rate-limit={self.bwlimit}"])
        self.shell_program_local: Final[str] = "sh"
        self.shell_program: Final[str] = self._program_name(args.shell_program)
        self.ssh_program: str = self._program_name(args.ssh_program)
        self.sudo_program: Final[str] = self._program_name(args.sudo_program)
        self.uname_program: Final[str] = self._program_name("uname")
        self.zfs_program: Final[str] = self._program_name("zfs")
        self.zpool_program: Final[str] = self._program_name(args.zpool_program)

        # no point creating complex shell pipeline commands for tiny data transfers:
        self.min_pipe_transfer_size: Final[int] = getenv_int("min_pipe_transfer_size", 1024 * 1024)
        self.max_datasets_per_batch_on_list_snaps: Final[int] = getenv_int("max_datasets_per_batch_on_list_snaps", 1024)
        self.max_datasets_per_minibatch_on_list_snaps: int = getenv_int("max_datasets_per_minibatch_on_list_snaps", -1)
        self.max_snapshots_per_minibatch_on_delete_snaps: int = getenv_int("max_snapshots_per_minibatch_on_delete_snaps", 2**29)
        self.dedicated_tcp_connection_per_zfs_send: Final[bool] = getenv_bool("dedicated_tcp_connection_per_zfs_send", True)
        # threads: with --force-once we intentionally coerce to a single-threaded run to ensure deterministic serial behavior
        self.threads: Final[tuple[int, bool]] = (1, False) if self.force_once else args.threads
        timeout_duration_nanos = None if args.timeout is None else 1_000_000 * parse_duration_to_milliseconds(args.timeout)
        self.timeout_duration_nanos: int | None = timeout_duration_nanos  # duration (not a timestamp); for logging only
        self.no_estimate_send_size: Final[bool] = args.no_estimate_send_size
        self.remote_conf_cache_ttl_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds(
            args.daemon_remote_conf_cache_ttl
        )

        # Host/runtime facts, captured once at construction time (used e.g. for logging/diagnostics):
        self.os_cpu_count: Final[int | None] = os.cpu_count()
        self.os_getuid: Final[int] = os.getuid()
        self.os_geteuid: Final[int] = os.geteuid()
        self.prog_version: Final[str] = __version__
        self.python_version: Final[str] = sys.version
        self.platform_version: Final[str] = platform.version()
        self.platform_platform: Final[str] = platform.platform()

        # mutable variables:
        snapshot_filters = args.snapshot_filters_var if hasattr(args, SNAPSHOT_FILTERS_VAR) else [[]]
        self.snapshot_filters: list[list[SnapshotFilter]] = [optimize_snapshot_filters(f) for f in snapshot_filters]
        self.exclude_dataset_property: str | None = args.exclude_dataset_property
        self.exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.abs_exclude_datasets: list[str] = []  # deferred to validate_task() phase
        self.abs_include_datasets: list[str] = []  # deferred to validate_task() phase
        self.r2r_mode: str = "off"  # deferred to validate_task() phase

        self.curr_zfs_send_program_opts: list[str] = []
        self.zfs_recv_ox_names: set[str] = set()
        self.available_programs: dict[str, dict[str, str]] = {}
        self.zpool_features: dict[str, dict[str, dict[str, str]]] = {r.location: {} for r in [self.src, self.dst]}
        self.connection_pools: dict[str, ConnectionPools] = {}

    def split_args(self, text: str, *items: str | Iterable[str], allow_all: bool = False) -> list[str]:
        """Splits option string on runs of one or more whitespace into an option list; any extra ``items`` are appended
        via xappend(); unless ``allow_all`` is True, the resulting options are checked for quoting characters."""
        text = text.strip()
        opts = self.one_or_more_whitespace_regex.split(text) if text else []
        xappend(opts, items)
        if not allow_all:
            self._validate_quoting(opts)
        return opts

    def validate_arg(self, opt: str | None, allow_spaces: bool = False, allow_all: bool = False) -> str | None:
        """allow_all permits all characters, including whitespace and quotes; See squote() and dquote().

        Returns ``opt`` unchanged (possibly None); dies on whitespace (unless permitted) or quoting characters.
        """
        if allow_all or opt is None:
            return opt
        if any(char.isspace() and (char != " " or not allow_spaces) for char in opt):
            die(f"Option must not contain a whitespace character{' other than space' if allow_spaces else ''}: {opt}")
        self._validate_quoting([opt])
        return opt

    def validate_arg_str(self, opt: str, allow_spaces: bool = False, allow_all: bool = False) -> str:
        """Returns validated option string, raising if missing or illegal."""
        if opt is None:
            die("Option must not be missing")
        self.validate_arg(opt, allow_spaces=allow_spaces, allow_all=allow_all)
        return opt

    @staticmethod
    def _validate_quoting(opts: list[str]) -> None:
        """Raises an error if any option contains a quote or shell metacharacter."""
        for opt in opts:
            if "'" in opt or '"' in opt or "$" in opt or "`" in opt:
                die(f"Option must not contain a single quote or double quote or dollar or backtick character: {opt}")

    @staticmethod
    def _fix_recv_opts(opts: list[str], preserve_properties: frozenset[str]) -> tuple[list[str], list[str]]:
        """Returns sanitized ``zfs recv`` options and captured ``-o/-x`` args."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="densFA",
            include_arg_opts={"-o", "-x"},
            preserve_properties=preserve_properties,
        )

    @staticmethod
    def _fix_send_opts(opts: list[str]) -> list[str]:
        """Returns sanitized ``zfs send`` options."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="den",
            include_arg_opts={"-X", "--exclude", "--redact"},
            exclude_arg_opts=frozenset({"-i", "-I", "-t", "--resume"}),
        )[0]  # only the sanitized opts list is needed; captured args are discarded here

    def _program_name(self, program: str) -> str:
        """For testing: helps simulate errors caused by external programs.

        Validates that ``program`` is a plain program name without shell metacharacters, then (test-only) optionally
        substitutes a missing or failing program per self.inject_params.
        """
        self.validate_arg_str(program)
        if not program:
            die(f"Program name must not be missing: {program}")
        for char in SHELL_CHARS + ":":
            if char in program:
                die(f"Program name must not contain a '{char}' character: {program}")
        if self.inject_params.get("inject_unavailable_" + program, False):
            return program + "-xxx"  # substitute a program that cannot be found on the PATH
        if self.inject_params.get("inject_failing_" + program, False):
            return "false"  # substitute a program that will error out with non-zero return code
        return program

    def _unset_matching_env_vars(self, args: argparse.Namespace) -> None:
        """Unset environment variables matching regex filters."""
        if len(args.exclude_envvar_regex) == 0 and len(args.include_envvar_regex) == 0:
            return  # fast path
        exclude_envvar_regexes: RegexList = compile_regexes(args.exclude_envvar_regex)
        include_envvar_regexes: RegexList = compile_regexes(args.include_envvar_regex)
        # Mutate global state at most once, atomically. First thread wins. The latch isn't strictly necessary for
        # correctness as all concurrent bzfs.Job instances in bzfs_jobrunner have identical include/exclude_envvar_regex
        # anyway. It's just for reduced latency.
        with _UNSET_ENV_VARS_LOCK:
            if _UNSET_ENV_VARS_LATCH.get_and_set(False):
                for envvar_name in list(os.environ):
                    # order of include vs exclude is intentionally reversed to correctly implement semantics:
                    # "unset env var iff excluded and not included (include takes precedence)."
                    if is_included(envvar_name, exclude_envvar_regexes, include_envvar_regexes):
                        os.environ.pop(envvar_name, None)
                        self.log.debug("Unsetting b/c envvar regex: %s", envvar_name)

    def lock_file_name(self) -> str:
        """Returns unique path used to detect concurrently running jobs.

        Makes it such that a job that runs periodically declines to start if the same previous periodic job is still running
        without completion yet. Hashed key avoids overly long filenames while remaining deterministic.
        """
        # The key contains every option that defines a job's "identity"; two runs with the same key are the same job:
        # fmt: off
        key = (tuple(self.root_dataset_pairs), self.args.recursive, self.args.exclude_dataset_property,
               tuple(self.args.include_dataset), tuple(self.args.exclude_dataset),
               tuple(self.args.include_dataset_regex), tuple(self.args.exclude_dataset_regex),
               tuple(tuple(f) for f in self.snapshot_filters), self.args.skip_replication, self.args.create_src_snapshots,
               self.args.create_src_snapshots_plan, self.args.create_src_snapshots_timeformat,
               self.create_src_snapshots_config.anchors,
               self.args.delete_dst_datasets, self.args.delete_dst_snapshots, self.args.delete_dst_snapshots_except,
               self.args.delete_empty_dst_datasets,
               self.args.compare_snapshot_lists, self.args.monitor_snapshots,
               self.src.basis_ssh_host, self.dst.basis_ssh_host,
               self.src.basis_ssh_user, self.dst.basis_ssh_user,
               self.src.ssh_port, self.dst.ssh_port,
               os.path.abspath(self.src.ssh_config_file) if self.src.ssh_config_file else "",
               os.path.abspath(self.dst.ssh_config_file) if self.dst.ssh_config_file else "",
               )
        # fmt: on
        hash_code: str = sha256_hex(str(key))
        log_parent_dir: str = os.path.dirname(self.log_params.log_dir)
        locks_dir: str = os.path.join(log_parent_dir, ".locks")
        os.makedirs(locks_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--locks-dir ", locks_dir)
        validate_file_permissions(locks_dir, DIR_PERMISSIONS)
        return os.path.join(locks_dir, f"{PROG_NAME}-lockfile-{hash_code}.lock")

    def dry(self, msg: str) -> str:
        """Prefix ``msg`` with 'Dry' when running in dry-run mode."""
        return utils.dry(msg, self.dry_run)

    def is_program_available(self, program: str, location: str) -> bool:
        """Returns True if ``program`` was detected on ``location`` host."""
        return program in self.available_programs.get(location, {})
498#############################################################################
@final
class Remote(MiniRemote):
    """Connection settings for either source or destination host."""

    def __init__(self, loc: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args.

        ``loc`` is "src" or "dst" and selects which group of ssh_* CLI options is read. May die() on invalid
        --ssh-*-config-file values; when ssh connection reuse is enabled, also prepares ~/.ssh/bzfs socket directories.
        """
        # immutable variables:
        assert loc == "src" or loc == "dst"
        self.location: str = loc
        self.params: Params = p
        self.basis_ssh_user: Final[str] = getattr(args, f"ssh_{loc}_user")
        self.basis_ssh_host: Final[str] = getattr(args, f"ssh_{loc}_host")
        self.ssh_port: Final[int | None] = getattr(args, f"ssh_{loc}_port")
        self.ssh_config_file: Final[str | None] = p.validate_arg(getattr(args, f"ssh_{loc}_config_file"))
        if self.ssh_config_file and self.ssh_config_file != "none":
            # `ssh -F none` will not read any config file per https://man7.org/linux/man-pages/man1/ssh.1.html
            if "bzfs_ssh_config" not in os.path.basename(self.ssh_config_file):
                die(f"Basename of --ssh-{loc}-config-file must contain substring 'bzfs_ssh_config': {self.ssh_config_file}")
            with open_nofollow(self.ssh_config_file, "rb"):
                pass  # validate
        self.ssh_config_file_hash: Final[str] = (
            sha256_urlsafe_base64(os.path.abspath(self.ssh_config_file), padding=False) if self.ssh_config_file else ""
        )
        self.ssh_cipher: Final[str] = p.validate_arg_str(args.ssh_cipher)
        # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation:
        ssh_extra_opts: list[str] = ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"] + (
            ["-v"] if args.verbose >= 3 else []
        )
        self.ssh_extra_opts: tuple[str, ...] = tuple(ssh_extra_opts)
        self.max_concurrent_ssh_sessions_per_tcp_connection: Final[int] = args.max_concurrent_ssh_sessions_per_tcp_connection
        self.ssh_exit_on_shutdown: bool = args.ssh_exit_on_shutdown
        self.ssh_control_persist_secs: int = args.ssh_control_persist_secs
        self.ssh_control_persist_margin_secs: int = getenv_int("ssh_control_persist_margin_secs", 2)
        self.socket_prefix: Final[str] = "s"  # Unix domain socket file names created below start with this prefix
        self.reuse_ssh_connection: bool = getenv_bool("reuse_ssh_connection", True)
        self.ssh_socket_dir: str = ""
        if self.reuse_ssh_connection:
            # Prepare ~/.ssh/bzfs dirs that hold the Unix domain sockets used for ssh connection multiplexing:
            ssh_home_dir: str = os.path.join(get_home_directory(), ".ssh")
            os.makedirs(ssh_home_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            self.ssh_socket_dir = os.path.join(ssh_home_dir, "bzfs")
            os.makedirs(self.ssh_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(self.ssh_socket_dir, mode=DIR_PERMISSIONS)
            self.ssh_exit_on_shutdown_socket_dir: Final[str] = os.path.join(self.ssh_socket_dir, "x")
            os.makedirs(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS)
            _delete_stale_files(self.ssh_exit_on_shutdown_socket_dir, prefix=self.socket_prefix, ssh=True)
        self.sanitize1_regex: Final[re.Pattern[str]] = re.compile(r"[\s\\/@$]")  # replace whitespace, /, $, \, @ with ~ char
        self.sanitize2_regex: Final[re.Pattern[str]] = re.compile(rf"[^a-zA-Z0-9{re.escape('~.:_-')}]")  # remove bad chars

        # mutable variables:
        self.root_dataset: str = ""  # deferred until run_main()
        self.basis_root_dataset: str = ""  # deferred until run_main()
        self.pool: str = ""
        self.sudo: str = ""
        self.use_zfs_delegation: bool = False
        self.ssh_user: str = ""
        self.ssh_host: str = ""
        self.ssh_user_host: str = ""
        self.is_nonlocal: bool = False

    def local_ssh_command(self, socket_file: str | None) -> tuple[list[str], str | None]:
        """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing)
        command to run on the remote host, which will be appended later; also returns the effective ControlPath used by the
        ssh CLI command, or ``None`` when SSH multiplexing is not active."""
        if not self.ssh_user_host:
            return [], None  # dataset is on local host - don't use ssh

        # dataset is on remote host
        p: Params = self.params
        if p.ssh_program == DISABLE_PRG:
            die("Cannot talk to remote host because ssh CLI is disabled.")
        ssh_cmd: list[str] = [p.ssh_program] + list(self.ssh_extra_opts)
        if self.ssh_config_file:
            ssh_cmd += ["-F", self.ssh_config_file]
        if self.ssh_cipher:
            ssh_cmd += ["-c", self.ssh_cipher]
        if self.ssh_port:
            ssh_cmd += ["-p", str(self.ssh_port)]

        socket_path: str | None = None
        if self.reuse_ssh_connection:
            # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and
            # 'ssh -S -M -oControlPersist=60s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
            if socket_file:
                socket_path = socket_file
            else:
                # Generate unique private Unix domain socket file name in user's home dir and pass it to 'ssh -S /path/to/socket'
                def sanitize(name: str) -> str:
                    name = self.sanitize1_regex.sub("~", name)  # replace whitespace, /, $, \, @ with a ~ tilde char
                    name = self.sanitize2_regex.sub("", name)  # Remove disallowed chars
                    return name

                max_rand: int = 999_999_999_999
                rand_str: str = urlsafe_base64(random.SystemRandom().randint(0, max_rand), max_value=max_rand, padding=False)
                curr_time: str = urlsafe_base64(time.time_ns(), max_value=2**64 - 1, padding=False)
                unique: str = f"{os.getpid()}@{curr_time}@{rand_str}"
                optional: str = f"@{sanitize(self.ssh_host)[:45]}@{sanitize(self.ssh_user)}"
                socket_name: str = f"{self.socket_prefix}{unique}{optional}"
                socket_path = os.path.join(self.ssh_exit_on_shutdown_socket_dir, socket_name)
                socket_path = socket_path[: max(UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, len(socket_path) - len(optional))]
                # `ssh` will error out later if the max OS Unix domain socket path limit cannot be met reasonably as the
                # home directory path is too long, typically because the Unix user name is unreasonably long.
            ssh_cmd += ["-S", socket_path]
        ssh_cmd += [self.ssh_user_host]
        return ssh_cmd, socket_path

    def cache_key(self) -> tuple[str, str, int | None, str | None]:
        """Returns tuple uniquely identifying this Remote for caching."""
        return self.location, self.ssh_user_host, self.ssh_port, self.ssh_config_file

    def cache_namespace(self) -> str:
        """Returns cache namespace string which is a stable, unique directory component for caches that distinguishes
        endpoints by username+host+port+ssh_config_file where applicable, and uses '-' when no user/host is present (local
        mode)."""
        if not self.ssh_user_host:
            return "-"  # local mode
        return f"{self.ssh_user_host}#{self.ssh_port or ''}#{self.ssh_config_file_hash}"

    def is_ssh_available(self) -> bool:
        """Return True if the ssh client program required for this remote is available on the local host."""
        return self.params.is_program_available("ssh", "local")

    def __repr__(self) -> str:
        return str(self.__dict__)
625#############################################################################
@final
class CopyPropertiesConfig:
    """--zfs-recv-o* and --zfs-recv-x* option groups for copying or excluding ZFS properties on receive."""

    def __init__(self, group: str, flag: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args; ``group`` selects which set of args attributes is consumed."""
        assert group in ZFS_RECV_GROUPS
        # immutable variables:
        self.group: Final[str] = group  # one of zfs_recv_o, zfs_recv_x
        self.flag: Final[str] = flag  # one of -o or -x
        raw_sources: str = p.validate_arg_str(getattr(args, f"{group}_sources"))
        canonical: list[str] = sorted(part.strip() for part in raw_sources.strip().split(","))
        self.sources: Final[str] = ",".join(canonical)  # canonicalize
        self.targets: Final[str] = p.validate_arg_str(getattr(args, f"{group}_targets"))
        includes: list[str] | None = getattr(args, f"{group}_include_regex")
        assert ZFS_RECV_O in ZFS_RECV_GROUPS
        if includes is None:
            # Default include list is nonempty only for the -o group:
            includes = [ZFS_RECV_O_INCLUDE_REGEX_DEFAULT] if group == ZFS_RECV_O else []
        self.include_regexes: Final[RegexList] = compile_regexes(includes)
        self.exclude_regexes: Final[RegexList] = compile_regexes(getattr(args, f"{group}_exclude_regex"))

    def __repr__(self) -> str:
        return str(self.__dict__)
651#############################################################################
652@final
653class SnapshotLabel(NamedTuple):
654 """Contains the individual parts that are concatenated into a ZFS snapshot name."""
656 prefix: str # bzfs_
657 infix: str # us-west_
658 timestamp: str # 2024-11-06_08:30:05
659 suffix: str # _hourly
661 def __str__(self) -> str: # bzfs_us-west_2024-11-06_08:30:05_hourly
662 return f"{self.prefix}{self.infix}{self.timestamp}{self.suffix}"
664 def notimestamp_str(self) -> str: # bzfs_us-west_hourly
665 """Returns the concatenation of all parts except for the timestamp part."""
666 return f"{self.prefix}{self.infix}{self.suffix}"
668 def validate_label(self, input_text: str) -> None:
669 """Validates that the composed snapshot label forms a legal name."""
670 name: str = str(self)
671 validate_dataset_name(name, input_text)
672 if "/" in name:
673 die(f"Invalid ZFS snapshot name: '{name}' for: '{input_text}'")
674 for key, value in {"prefix": self.prefix, "infix": self.infix, "suffix": self.suffix}.items():
675 if key == "prefix":
676 if not value.endswith("_"):
677 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'")
678 if value.count("_") > 1:
679 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")
680 elif key == "infix":
681 if value:
682 if not value.endswith("_"):
683 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'")
684 if value.count("_") > 1:
685 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")
686 elif value:
687 if not value.startswith("_"):
688 die(f"Invalid {input_text}{key}: Must start with an underscore character: '{value}'")
689 if value.count("_") > 1:
690 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")
693#############################################################################
694@final
695class CreateSrcSnapshotConfig:
696 """Option values for --create-src-snapshots, that is, for automatically creating source snapshots."""
698 def __init__(self, args: argparse.Namespace, p: Params) -> None:
699 """Option values for --create-src-snapshots*; reads from ArgumentParser via args."""
700 # immutable variables:
701 self.skip_create_src_snapshots: Final[bool] = not args.create_src_snapshots
702 self.create_src_snapshots_even_if_not_due: Final[bool] = args.create_src_snapshots_even_if_not_due
703 tz_spec: str | None = args.create_src_snapshots_timezone if args.create_src_snapshots_timezone else None
704 self.tz: Final[tzinfo | None] = get_timezone(tz_spec)
705 self.current_datetime: datetime = current_datetime(tz_spec)
706 self.timeformat: Final[str] = args.create_src_snapshots_timeformat
707 self.anchors: Final[PeriodAnchors] = PeriodAnchors.parse(args)
709 # Compute the schedule for upcoming periodic time events (suffix_durations). This event schedule is also used in
710 # daemon mode via sleep_until_next_daemon_iteration()
711 labels: list[SnapshotLabel] = []
712 create_src_snapshots_plan: str = args.create_src_snapshots_plan or str({"bzfs": {"onsite": {"adhoc": 1}}})
713 for org, target_periods in ast.literal_eval(create_src_snapshots_plan).items():
714 for target, periods in target_periods.items():
715 for period_unit, period_amount in periods.items(): # e.g. period_unit can be "10minutely" or "minutely"
716 if not isinstance(period_amount, int) or period_amount < 0:
717 die(f"--create-src-snapshots-plan: Period amount must be a non-negative integer: {period_amount}")
718 if period_amount > 0:
719 suffix: str = nsuffix(period_unit)
720 labels.append(SnapshotLabel(prefix=nprefix(org), infix=ninfix(target), timestamp="", suffix=suffix))
721 suffixes: list[str] = list({label.suffix for label in labels}) # dedupe
722 xperiods: SnapshotPeriods = p.xperiods
723 if self.skip_create_src_snapshots:
724 duration_amount, duration_unit = p.xperiods.suffix_to_duration0(p.daemon_frequency)
725 if duration_amount <= 0 or not duration_unit:
726 die(f"Invalid --daemon-frequency: {p.daemon_frequency}")
727 suffixes = [nsuffix(p.daemon_frequency)]
728 labels = []
729 suffix_durations: dict[str, tuple[int, str]] = {suffix: xperiods.suffix_to_duration1(suffix) for suffix in suffixes}
731 def suffix_key(suffix: str) -> tuple[int, str]:
732 duration_amount, duration_unit = suffix_durations[suffix]
733 duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
734 if suffix.endswith(("hourly", "minutely", "secondly")):
735 if duration_milliseconds != 0 and 86400 * 1000 % duration_milliseconds != 0:
736 die(
737 "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 86400 seconds "
738 f"without remainder so that snapshots will be created at the same time of day every day: {suffix}"
739 )
740 if suffix.endswith("monthly"):
741 if duration_amount != 0 and 12 % duration_amount != 0:
742 die(
743 "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 12 months "
744 f"without remainder so that snapshots will be created at the same time every year: {suffix}"
745 )
746 return duration_milliseconds, suffix
748 suffixes.sort(key=suffix_key, reverse=True) # take snapshots for dailies before hourlies, and so on
749 self.suffix_durations: Final[dict[str, tuple[int, str]]] = {sfx: suffix_durations[sfx] for sfx in suffixes} # sort
750 suffix_indexes: dict[str, int] = {suffix: k for k, suffix in enumerate(suffixes)}
751 labels.sort(key=lambda label: (suffix_indexes[label.suffix], label)) # take snapshots for dailies before hourlies
752 self._snapshot_labels: Final[list[SnapshotLabel]] = labels
753 for label in self.snapshot_labels():
754 label.validate_label("--create-src-snapshots-plan ")
756 def snapshot_labels(self) -> list[SnapshotLabel]:
757 """Returns the snapshot name patterns for which snapshots shall be created."""
758 timeformat: str = self.timeformat
759 is_millis: bool = timeformat.endswith("%F") # non-standard hack to append milliseconds
760 if is_millis:
761 timeformat = timeformat[0:-1] + "f" # replace %F with %f (append microseconds)
762 timestamp: str = self.current_datetime.strftime(timeformat)
763 if is_millis:
764 timestamp = timestamp[: -len("000")] # replace microseconds with milliseconds
765 timestamp = timestamp.replace("+", "z") # zfs CLI does not accept the '+' character in snapshot names
766 return [SnapshotLabel(label.prefix, label.infix, timestamp, label.suffix) for label in self._snapshot_labels]
768 def __repr__(self) -> str:
769 return str(self.__dict__)
772#############################################################################
773@dataclass(frozen=True)
774@final
775class AlertConfig:
776 """Thresholds controlling when alerts fire for snapshot age."""
778 kind: Literal["Latest", "Oldest"]
779 warning_millis: int
780 critical_millis: int
783#############################################################################
784@dataclass(frozen=True)
785@final
786class MonitorSnapshotAlert:
787 """Alert configuration for a single monitored snapshot label."""
789 label: SnapshotLabel
790 latest: AlertConfig | None
791 oldest: AlertConfig | None
792 oldest_skip_holds: bool
795#############################################################################
796@final
797class MonitorSnapshotsConfig:
798 """Option values for --monitor-snapshots*, that is, policy describing which snapshots to monitor for staleness."""
800 def __init__(self, args: argparse.Namespace, p: Params) -> None:
801 """Reads from ArgumentParser via args."""
802 # immutable variables:
803 self.monitor_snapshots: Final[dict] = ast.literal_eval(args.monitor_snapshots)
804 self.dont_warn: Final[bool] = args.monitor_snapshots_dont_warn
805 self.dont_crit: Final[bool] = args.monitor_snapshots_dont_crit
806 self.no_latest_check: Final[bool] = args.monitor_snapshots_no_latest_check
807 self.no_oldest_check: Final[bool] = args.monitor_snapshots_no_oldest_check
808 alerts: list[MonitorSnapshotAlert] = []
809 xperiods: SnapshotPeriods = p.xperiods
810 for org, target_periods in self.monitor_snapshots.items():
811 prefix: str = nprefix(org)
812 for target, periods in target_periods.items():
813 for period_unit, alert_dicts in periods.items(): # e.g. period_unit can be "10minutely" or "minutely"
814 label = SnapshotLabel(prefix=prefix, infix=ninfix(target), timestamp="", suffix=nsuffix(period_unit))
815 alert_latest, alert_oldest = None, None
816 oldest_skip_holds: bool = False
817 for alert_type, alert_dict in alert_dicts.items():
818 m = "--monitor-snapshots: "
819 if alert_type not in ["latest", "oldest"]:
820 die(f"{m}'{alert_type}' must be 'latest' or 'oldest' within {args.monitor_snapshots}")
821 warning_millis: int = 0
822 critical_millis: int = 0
823 cycles: int = 1
824 for kind, value in alert_dict.items():
825 context: str = args.monitor_snapshots
826 if kind == "warning":
827 warning_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
828 elif kind == "critical":
829 critical_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
830 elif kind == "cycles":
831 cycles = max(0, int(value))
832 elif kind == "oldest_skip_holds" and alert_type == "oldest":
833 if not isinstance(value, bool):
834 die(f"{m}'{kind}' must be a bool within {context}")
835 oldest_skip_holds = value
836 else:
837 die(f"{m}'{kind}' must be 'warning', 'critical' or 'cycles' within {context}")
838 if warning_millis > 0 or critical_millis > 0:
839 duration_amount, duration_unit = xperiods.suffix_to_duration1(label.suffix)
840 duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
841 warning_millis += 0 if warning_millis <= 0 else cycles * duration_milliseconds
842 critical_millis += 0 if critical_millis <= 0 else cycles * duration_milliseconds
843 warning_millis = UNIX_TIME_INFINITY_SECS if warning_millis <= 0 else warning_millis
844 critical_millis = UNIX_TIME_INFINITY_SECS if critical_millis <= 0 else critical_millis
845 capitalized_alert_type = cast(Literal["Latest", "Oldest"], sys.intern(alert_type.capitalize()))
846 alert_config = AlertConfig(capitalized_alert_type, warning_millis, critical_millis)
847 if alert_type == "latest":
848 if not self.no_latest_check:
849 alert_latest = alert_config
850 else:
851 assert alert_type == "oldest"
852 if not self.no_oldest_check:
853 alert_oldest = alert_config
854 if alert_latest is not None or alert_oldest is not None:
855 alerts.append(MonitorSnapshotAlert(label, alert_latest, alert_oldest, oldest_skip_holds))
857 def alert_sort_key(alert: MonitorSnapshotAlert) -> tuple[int, SnapshotLabel]:
858 duration_amount, duration_unit = xperiods.suffix_to_duration1(alert.label.suffix)
859 duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
860 return duration_milliseconds, alert.label
862 alerts.sort(key=alert_sort_key, reverse=True) # check snapshots for dailies before hourlies, and so on
863 self.alerts: Final[list[MonitorSnapshotAlert]] = alerts
864 self.enable_monitor_snapshots: Final[bool] = len(alerts) > 0
866 def __repr__(self) -> str:
867 return str(self.__dict__)
870#############################################################################
871def _fix_send_recv_opts(
872 opts: list[str],
873 *,
874 exclude_long_opts: set[str],
875 exclude_short_opts: str,
876 include_arg_opts: set[str],
877 exclude_arg_opts: frozenset[str] = frozenset(),
878 preserve_properties: frozenset[str] = frozenset(),
879) -> tuple[list[str], list[str]]:
880 """These opts are instead managed via bzfs CLI args --dryrun, etc."""
881 assert "-" not in exclude_short_opts
882 results: list[str] = []
883 x_names: set[str] = set(preserve_properties)
884 i = 0
885 n = len(opts)
886 while i < n:
887 opt: str = opts[i]
888 i += 1
889 if opt in exclude_arg_opts: # example: {"-X", "--exclude"}
890 i += 1
891 continue
892 elif opt in include_arg_opts: # example: {"-o", "-x"}
893 results.append(opt)
894 if i < n:
895 if opt == "-o" and "=" in opts[i] and opts[i].split("=", 1)[0] in preserve_properties:
896 die(f"--preserve-properties: Disallowed ZFS property found in --zfs-recv-program-opt(s): -o {opts[i]}")
897 if opt == "-x":
898 x_names.discard(opts[i])
899 results.append(opts[i])
900 i += 1
901 elif opt not in exclude_long_opts: # example: {"--dryrun", "--verbose"}
902 if opt.startswith("-") and opt != "-" and not opt.startswith("--"):
903 for char in exclude_short_opts: # example: "den"
904 opt = opt.replace(char, "")
905 if opt == "-":
906 continue
907 results.append(opt)
908 return results, sorted(x_names)
# Matches the leading decimal PID component encoded into ssh master Unix domain socket file names
_SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX: Final[re.Pattern[str]] = re.compile(r"^[0-9]+")  # see local_ssh_command()
def _delete_stale_files(
    root_dir: str,
    *,
    prefix: str,
    millis: int = 60 * 60 * 1000,
    dirs: bool = False,
    exclude: str | None = None,
    ssh: bool = False,
) -> None:
    """Cleans up obsolete files; For example caused by abnormal termination, OS crash."""
    cutoff_secs: float = millis / 1000
    current_time: float = time.time()
    validate_is_not_a_symlink("", root_dir)
    with os.scandir(root_dir) as entries:
        for entry in entries:
            if entry.name == exclude or not entry.name.startswith(prefix):
                continue
            try:
                stats = entry.stat(follow_symlinks=False)
                entry_is_dir = entry.is_dir(follow_symlinks=False)
                # only touch entries of the requested kind (file vs dir) that are older than the cutoff
                if entry_is_dir != dirs or current_time - stats.st_mtime < cutoff_secs:
                    continue
                if dirs:
                    shutil.rmtree(entry.path, ignore_errors=True)
                elif not (ssh and stat.S_ISSOCK(stats.st_mode)):
                    os.remove(entry.path)
                else:
                    # ssh master Unix domain socket: only remove once the owning bzfs process has died,
                    # or after ~31 days as a safety net
                    match = _SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX.match(entry.name[len(prefix) :])
                    if match:
                        pid: int = int(match.group(0))
                        if pid_exists(pid) is False or current_time - stats.st_mtime >= 31 * 24 * 60 * 60:
                            os.remove(entry.path)  # bzfs process is no longer alive; its ssh master process isn't either
            except FileNotFoundError:
                pass  # harmless; entry vanished concurrently
947def _create_symlink(src: str, dst_dir: str, dst: str) -> None:
948 """Creates dst symlink pointing to src using a relative path."""
949 rel_path: str = os.path.relpath(src, start=dst_dir)
950 os.symlink(src=rel_path, dst=os.path.join(dst_dir, dst))
def resolve_r2r_mode(p: Params) -> str:
    """Returns the effective r2r mode for the current task and emits fallback warnings."""
    src, dst = p.src, p.dst
    log = p.log
    requested: str = p.r2r_mode_requested
    assert requested in ("off", "pull", "push"), requested

    if p.skip_replication or requested == "off":
        return "off"

    if not (src.ssh_user_host or dst.ssh_user_host):
        return "off"  # we'll do local replication (there's no need for r2r)

    if not (src.is_nonlocal and dst.is_nonlocal):
        return "off"  # at least one of them is local to this host (there's no need for r2r)

    # the remote that will execute the replication: dst when pulling, src when pushing
    executor: Remote = dst if requested == "pull" else src
    if not p.is_program_available("sh", executor.location):
        log.warning(
            f"--r2r={requested} requires sh on {executor.location} host: {executor.ssh_user_host or 'localhost'} for remote-to-remote "
            "replication; falling back to --r2r=off."
        )
        return "off"

    if is_same_remote(src, dst):  # are user@host, port, ssh_config file the same?
        return requested  # perf: we'll do local replication on that host

    if not p.is_program_available("ssh", executor.location):
        log.warning(
            f"--r2r={requested} requires ssh on {executor.location} host: {executor.ssh_user_host or 'localhost'} "
            "for remote-to-remote replication; falling back to --r2r=off."
        )
        return "off"

    if requested == "pull" and src.ssh_config_file and src.ssh_config_file != "none":
        log.warning(
            "--r2r=pull cannot use --ssh-src-config-file for remote-to-remote ssh; cowardly falling back to --r2r=off."
        )
        return "off"

    if requested == "push" and dst.ssh_config_file and dst.ssh_config_file != "none":
        log.warning(
            "--r2r=push cannot use --ssh-dst-config-file for remote-to-remote ssh; cowardly falling back to --r2r=off."
        )
        return "off"

    return requested
def is_same_remote(src: Remote, dst: Remote) -> bool:
    """Returns whether this remote is the same as the other remote."""
    # both local: note the equality test (not just falsiness) so "" vs None counts as different endpoints
    if not src.ssh_user_host and src.ssh_user_host == dst.ssh_user_host:
        return True
    same_endpoint = src.ssh_user_host == dst.ssh_user_host
    same_port = src.ssh_port == dst.ssh_port
    same_config = src.ssh_config_file == dst.ssh_config_file
    return same_endpoint and same_port and same_config