Coverage for bzfs_main/configuration.py: 99%
587 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Configuration subsystem; All CLI option/parameter values are reachable from the "Params" class."""
17from __future__ import (
18 annotations,
19)
20import argparse
21import ast
22import os
23import platform
24import random
25import re
26import shutil
27import stat
28import sys
29import tempfile
30import threading
31import time
32from collections.abc import (
33 Iterable,
34)
35from dataclasses import (
36 dataclass,
37)
38from datetime import (
39 datetime,
40 tzinfo,
41)
42from logging import (
43 Logger,
44)
45from typing import (
46 TYPE_CHECKING,
47 Final,
48 Literal,
49 NamedTuple,
50 cast,
51)
53import bzfs_main.utils
54from bzfs_main.argparse_actions import (
55 SnapshotFilter,
56 optimize_snapshot_filters,
57)
58from bzfs_main.argparse_cli import (
59 LOG_DIR_DEFAULT,
60 ZFS_RECV_GROUPS,
61 ZFS_RECV_O,
62 ZFS_RECV_O_INCLUDE_REGEX_DEFAULT,
63 __version__,
64)
65from bzfs_main.detect import (
66 DISABLE_PRG,
67)
68from bzfs_main.period_anchors import (
69 PeriodAnchors,
70)
71from bzfs_main.retry import (
72 RetryPolicy,
73)
74from bzfs_main.utils import (
75 DIR_PERMISSIONS,
76 FILE_PERMISSIONS,
77 PROG_NAME,
78 SHELL_CHARS,
79 SNAPSHOT_FILTERS_VAR,
80 UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH,
81 UNIX_TIME_INFINITY_SECS,
82 RegexList,
83 SnapshotPeriods,
84 SynchronizedBool,
85 append_if_absent,
86 compile_regexes,
87 current_datetime,
88 die,
89 get_home_directory,
90 get_timezone,
91 getenv_bool,
92 getenv_int,
93 is_included,
94 ninfix,
95 nprefix,
96 nsuffix,
97 parse_duration_to_milliseconds,
98 pid_exists,
99 sha256_hex,
100 sha256_urlsafe_base64,
101 urlsafe_base64,
102 validate_dataset_name,
103 validate_file_permissions,
104 validate_is_not_a_symlink,
105 validate_property_name,
106 xappend,
107)
109if TYPE_CHECKING: # pragma: no cover - for type hints only
110 from bzfs_main.connection import (
111 ConnectionPools,
112 )
# constants:
# Value returned by get_home_directory(), resolved once at module import time. LogParams uses it as the parent of the
# default log directory and Remote uses it to locate the ".ssh" directory that holds the ssh control sockets.
HOME_DIRECTORY: Final[str] = get_home_directory()
# Lock + latch pair used by Params._unset_matching_env_vars() so that os.environ is mutated at most once per process:
# the lock serializes callers, and the latch starts out True and is flipped to False by the first caller.
_UNSET_ENV_VARS_LOCK: Final[threading.Lock] = threading.Lock()
_UNSET_ENV_VARS_LATCH: Final[SynchronizedBool] = SynchronizedBool(True)
120#############################################################################
class LogParams:
    """Option values for logging.

    Constructing an instance has filesystem side effects: it creates the log directory tree, an empty log file, a
    cache directory, and the "current" symlinks that point at the files of the most recent run.
    """

    def __init__(self, args: argparse.Namespace) -> None:
        """Reads from ArgumentParser via args.

        Reads ``quiet``, ``verbose``, ``log_dir``, ``log_subdir``, ``log_file_prefix``, ``log_file_infix`` and
        ``log_file_suffix`` from ``args``. Calls die() if the basename of the log directory does not contain
        LOG_DIR_DEFAULT.
        """
        # immutable variables:
        if args.quiet:
            log_level: str = "ERROR"
        elif args.verbose >= 2:
            log_level = "TRACE"
        elif args.verbose >= 1:
            log_level = "DEBUG"
        else:
            log_level = "INFO"
        self.log_level: Final[str] = log_level
        self.timestamp: Final[str] = datetime.now().isoformat(sep="_", timespec="seconds")  # 2024-09-03_12:26:15
        self.home_dir: Final[str] = HOME_DIRECTORY
        log_parent_dir: Final[str] = args.log_dir if args.log_dir else os.path.join(self.home_dir, LOG_DIR_DEFAULT)
        if LOG_DIR_DEFAULT not in os.path.basename(log_parent_dir):
            die(f"Basename of --log-dir must contain the substring '{LOG_DIR_DEFAULT}', but got: {log_parent_dir}")

        # Cut the timestamp at the separator that corresponds to the requested granularity:
        # 2024-09-03 (d), 2024-09-03_12 (h), 2024-09-03_12:26 (m)
        sep: str = "_" if args.log_subdir == "daily" else ":"
        timestamp: str = self.timestamp
        subdir: str = timestamp[0 : timestamp.rindex(sep) if args.log_subdir == "minutely" else timestamp.index(sep)]
        self.log_dir: Final[str] = os.path.join(log_parent_dir, subdir)

        # Create parent dir and subdir; each one is checked for symlinks and permissions right after creation.
        os.makedirs(log_parent_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir ", log_parent_dir)
        validate_file_permissions(log_parent_dir, DIR_PERMISSIONS)
        os.makedirs(self.log_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir subdir ", self.log_dir)
        validate_file_permissions(self.log_dir, DIR_PERMISSIONS)

        self.log_file_prefix: Final[str] = args.log_file_prefix
        self.log_file_infix: Final[str] = args.log_file_infix
        self.log_file_suffix: Final[str] = args.log_file_suffix
        fd, self.log_file = tempfile.mkstemp(
            suffix=".log",
            prefix=f"{self.log_file_prefix}{self.timestamp}{self.log_file_infix}{self.log_file_suffix}-",
            dir=self.log_dir,
        )
        try:
            os.fchmod(fd, FILE_PERMISSIONS)
        finally:
            os.close(fd)  # mkstemp() hands ownership of fd to the caller; don't leak it if fchmod() raises
        self.pv_log_file: str = self.log_file[0 : -len(".log")] + ".pv"
        log_file_stem: str = os.path.basename(self.log_file)[0 : -len(".log")]
        # Python's standard logger naming API interprets chars such as '.', '-', ':', spaces, etc in special ways, e.g.
        # logging.getLogger("foo.bar") vs logging.getLogger("foo-bar"). Thus, we sanitize the Python logger name via a regex:
        self.logger_name_suffix: Final[str] = re.sub(r"[^A-Za-z0-9_]", repl="_", string=log_file_stem)
        cache_root_dir: str = os.path.join(log_parent_dir, ".cache")
        os.makedirs(cache_root_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_file_permissions(cache_root_dir, DIR_PERMISSIONS)
        self.last_modified_cache_dir: Final[str] = os.path.join(cache_root_dir, "mods")

        # Create/update "current" symlink to current_dir, which is a subdir containing further symlinks to log files.
        # For parallel usage, ensures there is no time window when the symlinks are inconsistent or do not exist.
        current: str = "current"
        dot_current_dir: str = os.path.join(log_parent_dir, f".{current}")
        current_dir: str = os.path.join(dot_current_dir, log_file_stem)
        os.makedirs(dot_current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir: .current ", dot_current_dir)
        try:
            os.makedirs(current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            _create_symlink(self.log_file, current_dir, f"{current}.log")
            _create_symlink(self.pv_log_file, current_dir, f"{current}.pv")
            _create_symlink(self.log_dir, current_dir, f"{current}.dir")
            # Build the new top-level symlink under a private name first, then move it into place in one step.
            dst_file: str = os.path.join(current_dir, current)
            os.symlink(os.path.relpath(current_dir, start=log_parent_dir), dst_file)
            os.replace(dst_file, os.path.join(log_parent_dir, current))  # atomic rename
            _delete_stale_files(dot_current_dir, prefix="", millis=5000, dirs=True, exclude=os.path.basename(current_dir))
        except FileNotFoundError:
            pass  # harmless concurrent cleanup
        self.params: Params | None = None

    def __repr__(self) -> str:
        """Returns the string form of the instance attribute dict."""
        return str(self.__dict__)
196#############################################################################
class Params:
    """All parsed CLI options combined into a single bundle; simplifies passing around numerous settings and defaults."""

    def __init__(
        self,
        args: argparse.Namespace,
        sys_argv: list[str],
        log_params: LogParams,
        log: Logger,
        inject_params: dict[str, bool] | None = None,
    ) -> None:
        """Reads from ArgumentParser via args.

        ``inject_params`` is for testing only; see _program_name(). Construction may remove variables from
        os.environ (see _unset_matching_env_vars()) and calls die() on disallowed option values.
        """
        # immutable variables:
        assert args is not None
        assert isinstance(sys_argv, list)
        assert log_params is not None
        assert log is not None
        self.args: Final[argparse.Namespace] = args
        self.sys_argv: Final[list[str]] = sys_argv
        self.log_params: Final[LogParams] = log_params
        self.log: Final[Logger] = log
        self.inject_params: Final[dict[str, bool]] = inject_params if inject_params is not None else {}  # for testing only
        self.one_or_more_whitespace_regex: Final[re.Pattern[str]] = re.compile(r"\s+")
        # Explicit {2,} quantifier so that the pattern really requires at least two consecutive spaces, as the
        # attribute name promises; a pattern of one space followed by '+' would also match a single space.
        self.two_or_more_spaces_regex: Final[re.Pattern[str]] = re.compile(r" {2,}")
        self._unset_matching_env_vars(args)
        self.xperiods: Final[SnapshotPeriods] = SnapshotPeriods()

        assert len(args.root_dataset_pairs) > 0
        self.root_dataset_pairs: Final[list[tuple[str, str]]] = args.root_dataset_pairs
        self.recursive: Final[bool] = args.recursive
        self.recursive_flag: Final[str] = "-r" if args.recursive else ""

        self.dry_run: Final[bool] = args.dryrun is not None
        self.dry_run_recv: Final[str] = "-n" if self.dry_run else ""
        self.dry_run_destroy: Final[str] = self.dry_run_recv
        self.dry_run_no_send: Final[bool] = args.dryrun == "send"
        self.verbose_zfs: Final[bool] = args.verbose >= 2
        self.verbose_destroy: Final[str] = "" if args.quiet else "-v"
        self.quiet: Final[bool] = args.quiet

        self.zfs_send_program_opts: list[str] = self._fix_send_opts(self.split_args(args.zfs_send_program_opts))
        zfs_recv_program_opts: list[str] = self.split_args(args.zfs_recv_program_opts)
        for extra_opt in args.zfs_recv_program_opt:
            zfs_recv_program_opts.append(self.validate_arg_str(extra_opt, allow_all=True))
        preserve_properties = [validate_property_name(name, "--preserve-properties") for name in args.preserve_properties]
        zfs_recv_program_opts, zfs_recv_x_names = self._fix_recv_opts(zfs_recv_program_opts, frozenset(preserve_properties))
        self.zfs_recv_program_opts: Final[list[str]] = zfs_recv_program_opts
        self.zfs_recv_x_names: list[str] = zfs_recv_x_names
        if self.verbose_zfs:
            append_if_absent(self.zfs_send_program_opts, "-v")
            append_if_absent(self.zfs_recv_program_opts, "-v")
        # zfs_full_recv_opts: dataset-specific dynamic -o/-x property options are computed later per dataset in
        # replication._add_recv_property_options():
        self.zfs_full_recv_opts: Final[list[str]] = self.zfs_recv_program_opts.copy()
        # One CopyPropertiesConfig per entry of ZFS_RECV_GROUPS; the unpacking below requires exactly three entries.
        cpconfigs = [CopyPropertiesConfig(group, flag, args, self) for group, flag in ZFS_RECV_GROUPS.items()]
        self.zfs_recv_o_config, self.zfs_recv_x_config, self.zfs_set_config = cpconfigs

        self.force_rollback_to_latest_snapshot: Final[bool] = args.force_rollback_to_latest_snapshot
        self.force_rollback_to_latest_common_snapshot = SynchronizedBool(args.force_rollback_to_latest_common_snapshot)
        self.force: Final[SynchronizedBool] = SynchronizedBool(args.force)
        self.force_once: Final[bool] = args.force_once
        self.force_unmount: Final[str] = "-f" if args.force_unmount else ""
        force_hard: str = "-R" if args.force_destroy_dependents else ""
        self.force_hard: Final[str] = "-R" if args.force_hard else force_hard  # --force-hard is deprecated

        self.skip_parent: bool = args.skip_parent
        self.skip_missing_snapshots: Final[str] = args.skip_missing_snapshots
        self.skip_on_error: Final[str] = args.skip_on_error
        self.retry_policy: Final[RetryPolicy] = RetryPolicy(args)
        self.skip_replication: Final[bool] = args.skip_replication
        self.delete_dst_snapshots: Final[bool] = args.delete_dst_snapshots is not None
        self.delete_dst_bookmarks: Final[bool] = args.delete_dst_snapshots == "bookmarks"
        self.delete_dst_snapshots_no_crosscheck: Final[bool] = args.delete_dst_snapshots_no_crosscheck
        self.delete_dst_snapshots_except: Final[bool] = args.delete_dst_snapshots_except
        self.delete_dst_datasets: Final[bool] = args.delete_dst_datasets
        self.delete_empty_dst_datasets: Final[bool] = args.delete_empty_dst_datasets is not None
        self.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: bool = (
            args.delete_empty_dst_datasets == "snapshots+bookmarks"
        )
        self.compare_snapshot_lists: str = args.compare_snapshot_lists
        self.daemon_lifetime_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds(args.daemon_lifetime)
        self.daemon_frequency: str = args.daemon_frequency
        self.enable_privilege_elevation: bool = not args.no_privilege_elevation
        self.no_stream: Final[bool] = args.no_stream
        self.resume_recv: Final[bool] = not args.no_resume_recv
        self.create_bookmarks: Final[str] = (
            "none" if args.no_create_bookmark else args.create_bookmarks
        )  # no_create_bookmark depr
        self.use_bookmark: Final[bool] = not args.no_use_bookmark

        self.src: Remote = Remote("src", args, self)  # src dataset, host and ssh options
        self.dst: Remote = Remote("dst", args, self)  # dst dataset, host and ssh options
        self.create_src_snapshots_config: Final[CreateSrcSnapshotConfig] = CreateSrcSnapshotConfig(args, self)
        self.monitor_snapshots_config: Final[MonitorSnapshotsConfig] = MonitorSnapshotsConfig(args, self)
        self.is_caching_snapshots: Final[bool] = args.cache_snapshots == "true"

        self.compression_program: Final[str] = self._program_name(args.compression_program)
        self.compression_program_opts: Final[list[str]] = self.split_args(args.compression_program_opts)
        for opt in {"-o", "--output-file"}.intersection(self.compression_program_opts):
            die(f"--compression-program-opts: {opt} is disallowed for security reasons.")
        self.getconf_program: Final[str] = self._program_name("getconf")  # print number of CPUs on POSIX
        self.mbuffer_program: Final[str] = self._program_name(args.mbuffer_program)
        self.mbuffer_program_opts: Final[list[str]] = self.split_args(args.mbuffer_program_opts)
        for opt in {"-i", "-I", "-o", "-O", "-l", "-L", "-t", "-T", "-a", "-A"}.intersection(self.mbuffer_program_opts):
            die(f"--mbuffer-program-opts: {opt} is disallowed for security reasons.")
        self.ps_program: Final[str] = self._program_name(args.ps_program)
        self.pv_program: Final[str] = self._program_name(args.pv_program)
        self.pv_program_opts: Final[list[str]] = self.split_args(args.pv_program_opts)
        bad_pv_opts = {"-o", "--output", "-f", "--log-file", "-S", "--stop-at-size", "-Y", "--sync", "-X", "--discard",
                       "-U", "--store-and-forward", "-d", "--watchfd", "-R", "--remote", "-P", "--pidfile"}  # fmt: skip
        for opt in bad_pv_opts.intersection(self.pv_program_opts):
            die(f"--pv-program-opts: {opt} is disallowed for security reasons.")
        self.isatty: Final[bool] = getenv_bool("isatty", True)
        if args.bwlimit:
            self.pv_program_opts.extend([f"--rate-limit={self.validate_arg_str(args.bwlimit)}"])
        self.shell_program_local: Final[str] = "sh"
        self.shell_program: str = self._program_name(args.shell_program)
        self.ssh_program: Final[str] = self._program_name(args.ssh_program)
        self.sudo_program: str = self._program_name(args.sudo_program)
        self.uname_program: Final[str] = self._program_name("uname")
        self.zfs_program: Final[str] = self._program_name("zfs")
        self.zpool_program: Final[str] = self._program_name(args.zpool_program)

        # no point creating complex shell pipeline commands for tiny data transfers:
        self.min_pipe_transfer_size: Final[int] = getenv_int("min_pipe_transfer_size", 1024 * 1024)
        self.max_datasets_per_batch_on_list_snaps: Final[int] = getenv_int("max_datasets_per_batch_on_list_snaps", 1024)
        self.max_datasets_per_minibatch_on_list_snaps: int = getenv_int("max_datasets_per_minibatch_on_list_snaps", -1)
        self.max_snapshots_per_minibatch_on_delete_snaps = getenv_int("max_snapshots_per_minibatch_on_delete_snaps", 2**29)
        self.dedicated_tcp_connection_per_zfs_send: Final[bool] = getenv_bool("dedicated_tcp_connection_per_zfs_send", True)
        # threads: with --force-once we intentionally coerce to a single-threaded run to ensure deterministic serial behavior
        self.threads: Final[tuple[int, bool]] = (1, False) if self.force_once else args.threads
        timeout_nanos = None if args.timeout is None else 1_000_000 * parse_duration_to_milliseconds(args.timeout)
        self.timeout_nanos: int | None = timeout_nanos
        self.no_estimate_send_size: Final[bool] = args.no_estimate_send_size
        self.remote_conf_cache_ttl_nanos: int = 1_000_000 * parse_duration_to_milliseconds(args.daemon_remote_conf_cache_ttl)
        # terminal_columns is 0 unless isatty is set, pv is enabled and --quiet is off
        self.terminal_columns: Final[int] = (
            getenv_int("terminal_columns", shutil.get_terminal_size(fallback=(120, 24)).columns)
            if self.isatty and self.pv_program != DISABLE_PRG and not self.quiet
            else 0
        )

        self.os_cpu_count: Final[int | None] = os.cpu_count()
        self.os_getuid: Final[int] = os.getuid()
        self.os_geteuid: Final[int] = os.geteuid()
        self.prog_version: Final[str] = __version__
        self.python_version: Final[str] = sys.version
        self.platform_version: Final[str] = platform.version()
        self.platform_platform: Final[str] = platform.platform()

        # mutable variables:
        snapshot_filters = args.snapshot_filters_var if hasattr(args, SNAPSHOT_FILTERS_VAR) else [[]]
        self.snapshot_filters: list[list[SnapshotFilter]] = [optimize_snapshot_filters(f) for f in snapshot_filters]
        self.exclude_dataset_property: str | None = args.exclude_dataset_property
        self.exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.abs_exclude_datasets: list[str] = []  # deferred to validate_task() phase
        self.abs_include_datasets: list[str] = []  # deferred to validate_task() phase

        self.curr_zfs_send_program_opts: list[str] = []
        self.zfs_recv_ox_names: set[str] = set()
        self.available_programs: dict[str, dict[str, str]] = {}
        self.zpool_features: dict[str, dict[str, dict[str, str]]] = {r.location: {} for r in [self.src, self.dst]}
        self.connection_pools: dict[str, ConnectionPools] = {}

    def split_args(self, text: str, *items: str | Iterable[str], allow_all: bool = False) -> list[str]:
        """Splits option string on runs of one or more whitespace into an option list.

        ``items`` are appended to the result via xappend(). Unless ``allow_all`` is set, every resulting option is
        checked by _validate_quoting().
        """
        text = text.strip()
        opts = self.one_or_more_whitespace_regex.split(text) if text else []
        xappend(opts, items)
        if not allow_all:
            self._validate_quoting(opts)
        return opts

    def validate_arg(self, opt: str | None, allow_spaces: bool = False, allow_all: bool = False) -> str | None:
        """allow_all permits all characters, including whitespace and quotes; See squote() and dquote().

        Returns ``opt`` unchanged (``None`` is passed through). Otherwise calls die() if ``opt`` contains a whitespace
        character (a plain space is tolerated when ``allow_spaces`` is set) or a character rejected by
        _validate_quoting().
        """
        if allow_all or opt is None:
            return opt
        if any(char.isspace() and (char != " " or not allow_spaces) for char in opt):
            die(f"Option must not contain a whitespace character{' other than space' if allow_spaces else ''}: {opt}")
        self._validate_quoting([opt])
        return opt

    def validate_arg_str(self, opt: str | None, allow_spaces: bool = False, allow_all: bool = False) -> str:
        """Returns validated option string, raising if missing or illegal."""
        if opt is None:
            die("Option must not be missing")
        self.validate_arg(opt, allow_spaces=allow_spaces, allow_all=allow_all)
        return opt

    @staticmethod
    def _validate_quoting(opts: list[str]) -> None:
        """Raises an error if any option contains a quote or shell metacharacter."""
        for opt in opts:
            if "'" in opt or '"' in opt or "$" in opt or "`" in opt:
                die(f"Option must not contain a single quote or double quote or dollar or backtick character: {opt}")

    @staticmethod
    def _fix_recv_opts(opts: list[str], preserve_properties: frozenset[str]) -> tuple[list[str], list[str]]:
        """Returns sanitized ``zfs recv`` options and captured ``-o/-x`` args."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="n",
            include_arg_opts={"-o", "-x"},
            preserve_properties=preserve_properties,
        )

    @staticmethod
    def _fix_send_opts(opts: list[str]) -> list[str]:
        """Returns sanitized ``zfs send`` options."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="den",
            include_arg_opts={"-X", "--exclude", "--redact"},
            exclude_arg_opts=frozenset({"-i", "-I"}),
        )[0]  # only the option list is needed here; the second tuple element is discarded

    def _program_name(self, program: str) -> str:
        """For testing: helps simulate errors caused by external programs.

        Validates ``program`` and returns it unchanged, unless ``inject_params`` asks for an unavailable or a
        failing substitute.
        """
        self.validate_arg_str(program)
        if not program:
            die(f"Program name must not be missing: {program}")
        for char in SHELL_CHARS + ":":
            if char in program:
                die(f"Program name must not contain a '{char}' character: {program}")
        if self.inject_params.get("inject_unavailable_" + program, False):
            return program + "-xxx"  # substitute a program that cannot be found on the PATH
        if self.inject_params.get("inject_failing_" + program, False):
            return "false"  # substitute a program that will error out with non-zero return code
        return program

    def _unset_matching_env_vars(self, args: argparse.Namespace) -> None:
        """Unset environment variables matching regex filters."""
        if len(args.exclude_envvar_regex) == 0 and len(args.include_envvar_regex) == 0:
            return  # fast path
        exclude_envvar_regexes: RegexList = compile_regexes(args.exclude_envvar_regex)
        include_envvar_regexes: RegexList = compile_regexes(args.include_envvar_regex)
        # Mutate global state at most once, atomically. First thread wins. The latch isn't strictly necessary for
        # correctness as all concurrent bzfs.Job instances in bzfs_jobrunner have identical include/exclude_envvar_regex
        # anyway. It's just for reduced latency.
        with _UNSET_ENV_VARS_LOCK:
            if _UNSET_ENV_VARS_LATCH.get_and_set(False):
                for envvar_name in list(os.environ):  # iterate over a snapshot because entries are removed below
                    # order of include vs exclude is intentionally reversed to correctly implement semantics:
                    # "unset env var iff excluded and not included (include takes precedence)."
                    if is_included(envvar_name, exclude_envvar_regexes, include_envvar_regexes):
                        os.environ.pop(envvar_name, None)
                        self.log.debug("Unsetting b/c envvar regex: %s", envvar_name)

    def lock_file_name(self) -> str:
        """Returns unique path used to detect concurrently running jobs.

        Makes it such that a job that runs periodically declines to start if the same previous periodic job is still running
        without completion yet. Hashed key avoids overly long filenames while remaining deterministic.

        Side effect: creates the ".locks" directory next to the log subdirectories if it does not exist yet.
        """
        # fmt: off
        key = (tuple(self.root_dataset_pairs), self.args.recursive, self.args.exclude_dataset_property,
               tuple(self.args.include_dataset), tuple(self.args.exclude_dataset),
               tuple(self.args.include_dataset_regex), tuple(self.args.exclude_dataset_regex),
               tuple(tuple(f) for f in self.snapshot_filters), self.args.skip_replication, self.args.create_src_snapshots,
               self.args.create_src_snapshots_plan, self.args.create_src_snapshots_timeformat,
               self.create_src_snapshots_config.anchors,
               self.args.delete_dst_datasets, self.args.delete_dst_snapshots, self.args.delete_dst_snapshots_except,
               self.args.delete_empty_dst_datasets,
               self.args.compare_snapshot_lists, self.args.monitor_snapshots,
               self.src.basis_ssh_host, self.dst.basis_ssh_host,
               self.src.basis_ssh_user, self.dst.basis_ssh_user)
        # fmt: on
        hash_code: str = sha256_hex(str(key))
        log_parent_dir: str = os.path.dirname(self.log_params.log_dir)
        locks_dir: str = os.path.join(log_parent_dir, ".locks")
        os.makedirs(locks_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--locks-dir ", locks_dir)
        validate_file_permissions(locks_dir, DIR_PERMISSIONS)
        return os.path.join(locks_dir, f"{PROG_NAME}-lockfile-{hash_code}.lock")

    def dry(self, msg: str) -> str:
        """Prefix ``msg`` with 'Dry' when running in dry-run mode."""
        return bzfs_main.utils.dry(msg, self.dry_run)

    def is_program_available(self, program: str, location: str) -> bool:
        """Return True if ``program`` was detected on ``location`` host."""
        return program in self.available_programs.get(location, {})
485#############################################################################
class Remote:
    """Connection settings for either source or destination host."""

    def __init__(self, loc: str, args: argparse.Namespace, p: Params) -> None:
        """Reads the ssh options of location ``loc`` ("src" or "dst") from ``args``.

        When ssh connection reuse is enabled this also creates the directories that hold the ssh control sockets and
        triggers a cleanup of stale socket files.
        """
        # immutable variables:
        assert loc in ("src", "dst")
        self.location: Final[str] = loc
        self.params: Final[Params] = p
        self.basis_ssh_user: str = getattr(args, f"ssh_{loc}_user")
        self.basis_ssh_host: str = getattr(args, f"ssh_{loc}_host")
        self.ssh_port: int | None = getattr(args, f"ssh_{loc}_port")
        self.ssh_config_file: Final[str | None] = p.validate_arg(getattr(args, f"ssh_{loc}_config_file"))
        if self.ssh_config_file and "bzfs_ssh_config" not in os.path.basename(self.ssh_config_file):
            die(f"Basename of --ssh-{loc}-config-file must contain substring 'bzfs_ssh_config': {self.ssh_config_file}")
        if self.ssh_config_file:
            config_file_hash: str = sha256_urlsafe_base64(os.path.abspath(self.ssh_config_file), padding=False)
        else:
            config_file_hash = ""
        self.ssh_config_file_hash: str = config_file_hash
        self.ssh_cipher: Final[str] = p.validate_arg_str(args.ssh_cipher)
        # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation:
        extra_opts: list[str] = ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"]
        if args.verbose >= 3:
            extra_opts.append("-v")
        self.ssh_extra_opts: list[str] = extra_opts
        self.max_concurrent_ssh_sessions_per_tcp_connection: Final[int] = args.max_concurrent_ssh_sessions_per_tcp_connection
        self.ssh_exit_on_shutdown: Final[bool] = args.ssh_exit_on_shutdown
        self.ssh_control_persist_secs: Final[int] = args.ssh_control_persist_secs
        self.socket_prefix: Final[str] = "s"
        self.reuse_ssh_connection: bool = getenv_bool("reuse_ssh_connection", True)
        if self.reuse_ssh_connection:
            # directory layout: <home>/.ssh/bzfs/x, where the innermost dir holds the control sockets
            dot_ssh_dir: str = os.path.join(HOME_DIRECTORY, ".ssh")
            os.makedirs(dot_ssh_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            socket_dir: str = os.path.join(dot_ssh_dir, "bzfs")
            self.ssh_socket_dir: Final[str] = socket_dir
            os.makedirs(socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(socket_dir, mode=DIR_PERMISSIONS)
            exit_on_shutdown_dir: str = os.path.join(socket_dir, "x")
            self.ssh_exit_on_shutdown_socket_dir: Final[str] = exit_on_shutdown_dir
            os.makedirs(exit_on_shutdown_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(exit_on_shutdown_dir, mode=DIR_PERMISSIONS)
            _delete_stale_files(exit_on_shutdown_dir, self.socket_prefix, ssh=True)
        self.sanitize1_regex: Final[re.Pattern[str]] = re.compile(r"[\s\\/@$]")  # replace whitespace, /, $, \, @ with ~ char
        self.sanitize2_regex: Final[re.Pattern[str]] = re.compile(rf"[^a-zA-Z0-9{re.escape('~.:_-')}]")  # remove bad chars

        # mutable variables:
        self.root_dataset: str = ""  # deferred until run_main()
        self.basis_root_dataset: str = ""  # deferred until run_main()
        self.pool: str = ""
        self.sudo: str = ""
        self.use_zfs_delegation: bool = False
        self.ssh_user: str = ""
        self.ssh_host: str = ""
        self.ssh_user_host: str = ""
        self.is_nonlocal: bool = False

    def _sanitize_socket_name_part(self, name: str) -> str:
        """Maps whitespace, /, $, \\ and @ to a ~ tilde char, then drops every remaining disallowed char."""
        tilded: str = self.sanitize1_regex.sub("~", name)
        return self.sanitize2_regex.sub("", tilded)

    def _new_socket_file(self) -> str:
        """Generates a unique private Unix domain socket file name in the user's home dir, for 'ssh -S /path/to/socket'."""
        max_rand: int = 999_999_999_999
        rand_str: str = urlsafe_base64(random.SystemRandom().randint(0, max_rand), max_value=max_rand, padding=False)
        curr_time: str = urlsafe_base64(time.time_ns(), max_value=2**64 - 1, padding=False)
        unique: str = f"{os.getpid()}@{curr_time}@{rand_str}"
        host_part: str = self._sanitize_socket_name_part(self.ssh_host)[:45]
        user_part: str = self._sanitize_socket_name_part(self.ssh_user)
        optional: str = f"@{host_part}@{user_part}"
        path: str = os.path.join(self.ssh_exit_on_shutdown_socket_dir, f"{self.socket_prefix}{unique}{optional}")
        # Truncate to the max path length, but never cut into the mandatory unique part of the name.
        # `ssh` will error out later if the max OS Unix domain socket path limit cannot be met reasonably as the
        # home directory path is too long, typically because the Unix user name is unreasonably long.
        keep: int = max(UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, len(path) - len(optional))
        return path[0:keep]

    def local_ssh_command(self, socket_file: str | None) -> list[str]:
        """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing)
        command to run on the remote host, which will be appended later."""
        if not self.ssh_user_host:
            return []  # dataset is on local host - don't use ssh

        # dataset is on remote host
        params: Params = self.params
        if params.ssh_program == DISABLE_PRG:
            die("Cannot talk to remote host because ssh CLI is disabled.")
        cmd: list[str] = [params.ssh_program, *self.ssh_extra_opts]
        if self.ssh_config_file:
            cmd.extend(["-F", self.ssh_config_file])
        if self.ssh_cipher:
            cmd.extend(["-c", self.ssh_cipher])
        if self.ssh_port:
            cmd.extend(["-p", str(self.ssh_port)])
        if self.reuse_ssh_connection:
            # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and
            # 'ssh -S -M -oControlPersist=60s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
            if not socket_file:
                socket_file = self._new_socket_file()
            cmd.extend(["-S", socket_file])
        cmd.append(self.ssh_user_host)
        return cmd

    def cache_key(self) -> tuple[str, str, int | None, str | None]:
        """Returns tuple uniquely identifying this Remote for caching."""
        return (self.location, self.ssh_user_host, self.ssh_port, self.ssh_config_file)

    def cache_namespace(self) -> str:
        """Returns cache namespace string which is a stable, unique directory component for snapshot caches that
        distinguishes endpoints by username+host+port+ssh_config_file where applicable, and uses '-' when no user/host is
        present (local mode)."""
        if self.ssh_user_host:
            return f"{self.ssh_user_host}#{self.ssh_port or ''}#{self.ssh_config_file_hash}"
        return "-"  # local mode

    def __repr__(self) -> str:
        """Returns the string form of the instance attribute dict."""
        return str(self.__dict__)
596#############################################################################
class CopyPropertiesConfig:
    """--zfs-recv-o* and --zfs-recv-x* option groups for copying or excluding ZFS properties on receive."""

    def __init__(self, group: str, flag: str, args: argparse.Namespace, p: Params) -> None:
        """Reads the ``<group>_sources``, ``<group>_targets``, ``<group>_include_regex`` and ``<group>_exclude_regex``
        attributes from ``args``, where ``group`` must be a key of ZFS_RECV_GROUPS."""
        assert group in ZFS_RECV_GROUPS

        def option(name: str):
            """Returns the parsed CLI value of this group's option with the given name suffix."""
            return getattr(args, f"{group}_{name}")

        # immutable variables:
        self.group: Final[str] = group  # a key of ZFS_RECV_GROUPS
        self.flag: Final[str] = flag  # the value that ZFS_RECV_GROUPS associates with the group, e.g. -o or -x
        raw_sources: str = p.validate_arg_str(option("sources"))
        # canonicalize: strip each comma-separated item and sort, so that equivalent inputs compare equal
        self.sources: Final[str] = ",".join(sorted(item.strip() for item in raw_sources.strip().split(",")))
        self.targets: Final[str] = p.validate_arg_str(option("targets"))
        include_patterns: list[str] | None = option("include_regex")
        assert ZFS_RECV_O in ZFS_RECV_GROUPS
        if include_patterns is None:
            # only the ZFS_RECV_O group has a non-empty default include list
            if group == ZFS_RECV_O:
                include_patterns = [ZFS_RECV_O_INCLUDE_REGEX_DEFAULT]
            else:
                include_patterns = []
        self.include_regexes: Final[RegexList] = compile_regexes(include_patterns)
        self.exclude_regexes: Final[RegexList] = compile_regexes(option("exclude_regex"))

    def __repr__(self) -> str:
        """Returns the string form of the instance attribute dict."""
        return str(self.__dict__)
621#############################################################################
622class SnapshotLabel(NamedTuple):
623 """Contains the individual parts that are concatenated into a ZFS snapshot name."""
625 prefix: str # bzfs_
626 infix: str # us-west-1_
627 timestamp: str # 2024-11-06_08:30:05
628 suffix: str # _hourly
630 def __str__(self) -> str: # bzfs_us-west-1_2024-11-06_08:30:05_hourly
631 return f"{self.prefix}{self.infix}{self.timestamp}{self.suffix}"
633 def notimestamp_str(self) -> str: # bzfs_us-west-1_hourly
634 """Returns the concatenation of all parts except for the timestamp part."""
635 return f"{self.prefix}{self.infix}{self.suffix}"
637 def validate_label(self, input_text: str) -> None:
638 """Validates that the composed snapshot label forms a legal name."""
639 name: str = str(self)
640 validate_dataset_name(name, input_text)
641 if "/" in name:
642 die(f"Invalid ZFS snapshot name: '{name}' for: '{input_text}'")
643 for key, value in {"prefix": self.prefix, "infix": self.infix, "suffix": self.suffix}.items():
644 if key == "prefix":
645 if not value.endswith("_"):
646 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'")
647 if value.count("_") > 1:
648 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")
649 elif key == "infix":
650 if value:
651 if not value.endswith("_"):
652 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'")
653 if value.count("_") > 1:
654 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")
655 elif value:
656 if not value.startswith("_"):
657 die(f"Invalid {input_text}{key}: Must start with an underscore character: '{value}'")
658 if value.count("_") > 1:
659 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")
662#############################################################################
class CreateSrcSnapshotConfig:
    """Settings for --create-src-snapshots*, i.e. for creating source snapshots automatically."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Derives the --create-src-snapshots* settings from the parsed CLI ``args`` and from ``p``."""
        # immutable variables:
        self.skip_create_src_snapshots: Final[bool] = not args.create_src_snapshots
        self.create_src_snapshots_even_if_not_due: Final[bool] = args.create_src_snapshots_even_if_not_due
        tz_spec: str | None = args.create_src_snapshots_timezone or None
        self.tz: tzinfo | None = get_timezone(tz_spec)
        self.current_datetime: datetime = current_datetime(tz_spec)
        self.timeformat: Final[str] = args.create_src_snapshots_timeformat
        self.anchors: Final[PeriodAnchors] = PeriodAnchors.parse(args)

        # Build the schedule of upcoming periodic time events (suffix_durations). The same event schedule is also used
        # in daemon mode via sleep_until_next_daemon_iteration()
        fallback_plan: str = str({"bzfs": {"onsite": {"adhoc": 1}}})
        plan_text: str = args.create_src_snapshots_plan or fallback_plan
        all_suffixes: list[str] = []
        label_templates: list[SnapshotLabel] = []
        for org, target_periods in ast.literal_eval(plan_text).items():
            for target, periods in target_periods.items():
                for period_unit, period_amount in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    if not isinstance(period_amount, int) or period_amount < 0:
                        die(f"--create-src-snapshots-plan: Period amount must be a non-negative integer: {period_amount}")
                    if period_amount > 0:  # an amount of zero contributes neither a suffix nor a label
                        sfx: str = nsuffix(period_unit)
                        all_suffixes.append(sfx)
                        label_templates.append(
                            SnapshotLabel(prefix=nprefix(org), infix=ninfix(target), timestamp="", suffix=sfx)
                        )

        xperiods: SnapshotPeriods = p.xperiods
        if self.skip_create_src_snapshots:
            # no snapshots get created; the schedule then consists solely of the daemon frequency
            freq_amount, freq_unit = xperiods.suffix_to_duration0(p.daemon_frequency)
            if freq_amount <= 0 or not freq_unit:
                die(f"Invalid --daemon-frequency: {p.daemon_frequency}")
            all_suffixes = [nsuffix(p.daemon_frequency)]
            label_templates = []
        durations: dict[str, tuple[int, str]] = {s: xperiods.suffix_to_duration1(s) for s in all_suffixes}

        def by_duration(suffix: str) -> tuple[int, str]:
            """Sort key (period length in millis, suffix); dies on a period that does not evenly divide a day or a year."""
            amount, unit = durations[suffix]
            period_millis: int = amount * xperiods.suffix_milliseconds.get(unit, 0)
            is_sub_daily: bool = suffix.endswith(("hourly", "minutely", "secondly"))
            if is_sub_daily and period_millis != 0 and 86400 * 1000 % period_millis != 0:
                die(
                    "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 86400 seconds "
                    f"without remainder so that snapshots will be created at the same time of day every day: {suffix}"
                )
            if suffix.endswith("monthly") and amount != 0 and 12 % amount != 0:
                die(
                    "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 12 months "
                    f"without remainder so that snapshots will be created at the same time every year: {suffix}"
                )
            return period_millis, suffix

        # longest period first: take snapshots for dailies before hourlies, and so on
        ordered_suffixes: list[str] = sorted(all_suffixes, key=by_duration, reverse=True)
        self.suffix_durations: Final[dict[str, tuple[int, str]]] = {s: durations[s] for s in ordered_suffixes}  # sort
        rank_of: dict[str, int] = {s: position for position, s in enumerate(ordered_suffixes)}

        def by_rank(label: SnapshotLabel) -> tuple[int, SnapshotLabel]:
            return rank_of[label.suffix], label

        label_templates.sort(key=by_rank)  # take snapshots for dailies before hourlies
        self._snapshot_labels: Final[list[SnapshotLabel]] = label_templates
        for candidate in self.snapshot_labels():
            candidate.validate_label("--create-src-snapshots-plan ")

    def snapshot_labels(self) -> list[SnapshotLabel]:
        """Returns the configured labels with the timestamp part filled in from ``current_datetime`` and ``timeformat``."""
        fmt: str = self.timeformat
        wants_millis: bool = fmt.endswith("%F")  # non-standard hack to append milliseconds
        if wants_millis:
            fmt = fmt[:-1] + "f"  # replace %F with %f (append microseconds)
        stamp: str = self.current_datetime.strftime(fmt)
        if wants_millis:
            stamp = stamp[:-3]  # cut the last three digits: microseconds become milliseconds
        stamp = stamp.replace("+", "z")  # zfs CLI does not accept the '+' character in snapshot names
        return [SnapshotLabel(tpl.prefix, tpl.infix, stamp, tpl.suffix) for tpl in self._snapshot_labels]

    def __repr__(self) -> str:
        return str(vars(self))
741#############################################################################
@dataclass(frozen=True)
class AlertConfig:
    """Immutable pair of snapshot-age thresholds, tagged with the kind of snapshot they apply to."""

    kind: Literal["Latest", "Oldest"]  # which snapshot the thresholds refer to
    warning_millis: int  # age threshold for a warning
    critical_millis: int  # age threshold for a critical alert
751#############################################################################
@dataclass(frozen=True)
class MonitorSnapshotAlert:
    """Immutable alert settings that belong to one monitored snapshot label."""

    label: SnapshotLabel  # the monitored label
    latest: AlertConfig | None  # settings for the "latest" check, or None if there are none
    oldest: AlertConfig | None  # settings for the "oldest" check, or None if there are none
761#############################################################################
class MonitorSnapshotsConfig:
    """Settings for --monitor-snapshots*, i.e. the policy that says which snapshots are checked for staleness."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Builds the list of alerts from the parsed CLI ``args``; dies on unknown keys in --monitor-snapshots."""
        # immutable variables:
        self.monitor_snapshots: Final[dict] = ast.literal_eval(args.monitor_snapshots)
        self.dont_warn: Final[bool] = args.monitor_snapshots_dont_warn
        self.dont_crit: Final[bool] = args.monitor_snapshots_dont_crit
        self.no_latest_check: Final[bool] = args.monitor_snapshots_no_latest_check
        self.no_oldest_check: Final[bool] = args.monitor_snapshots_no_oldest_check

        xperiods: SnapshotPeriods = p.xperiods
        context: str = args.monitor_snapshots  # raw option text, quoted in error messages
        msg_prefix: str = "--monitor-snapshots: "
        collected: list[MonitorSnapshotAlert] = []

        def period_millis(suffix: str) -> int:
            """Returns the length in milliseconds of the period denoted by ``suffix``; unknown units count as zero."""
            amount, unit = xperiods.suffix_to_duration1(suffix)
            return amount * xperiods.suffix_milliseconds.get(unit, 0)

        for org, target_periods in self.monitor_snapshots.items():
            prefix: str = nprefix(org)
            for target, periods in target_periods.items():
                for period_unit, alert_dicts in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    label = SnapshotLabel(prefix=prefix, infix=ninfix(target), timestamp="", suffix=nsuffix(period_unit))
                    alert_latest: AlertConfig | None = None
                    alert_oldest: AlertConfig | None = None
                    for alert_type, alert_dict in alert_dicts.items():
                        if alert_type not in ("latest", "oldest"):
                            die(f"{msg_prefix}'{alert_type}' must be 'latest' or 'oldest' within {context}")
                        warning_millis: int = 0
                        critical_millis: int = 0
                        cycles: int = 1
                        for kind, value in alert_dict.items():
                            if kind == "warning":
                                warning_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "critical":
                                critical_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "cycles":
                                cycles = max(0, int(value))
                            else:
                                die(f"{msg_prefix}'{kind}' must be 'warning', 'critical' or 'cycles' within {context}")
                        if warning_millis > 0 or critical_millis > 0:
                            # every configured threshold is extended by 'cycles' times the period length
                            extra_millis: int = cycles * period_millis(label.suffix)
                            if warning_millis > 0:
                                warning_millis += extra_millis
                            if critical_millis > 0:
                                critical_millis += extra_millis
                        # a threshold that is not configured is replaced by the "infinity" constant
                        if warning_millis <= 0:
                            warning_millis = UNIX_TIME_INFINITY_SECS
                        if critical_millis <= 0:
                            critical_millis = UNIX_TIME_INFINITY_SECS
                        alert_kind = cast(Literal["Latest", "Oldest"], sys.intern(alert_type.capitalize()))
                        alert_config = AlertConfig(alert_kind, warning_millis, critical_millis)
                        if alert_type == "latest":
                            if not self.no_latest_check:
                                alert_latest = alert_config
                        else:
                            assert alert_type == "oldest"
                            if not self.no_oldest_check:
                                alert_oldest = alert_config
                    if not (alert_latest is None and alert_oldest is None):
                        collected.append(MonitorSnapshotAlert(label, alert_latest, alert_oldest))

        def alert_sort_key(alert: MonitorSnapshotAlert) -> tuple[int, SnapshotLabel]:
            return period_millis(alert.label.suffix), alert.label

        collected.sort(key=alert_sort_key, reverse=True)  # check snapshots for dailies before hourlies, and so on
        self.alerts: Final[list[MonitorSnapshotAlert]] = collected
        self.enable_monitor_snapshots: Final[bool] = len(collected) > 0

    def __repr__(self) -> str:
        return str(vars(self))
830#############################################################################
def _fix_send_recv_opts(
    opts: list[str],
    exclude_long_opts: set[str],
    exclude_short_opts: str,
    include_arg_opts: set[str],
    exclude_arg_opts: frozenset[str] = frozenset(),
    preserve_properties: frozenset[str] = frozenset(),
) -> tuple[list[str], list[str]]:
    """Filters out the opts that are instead managed via bzfs CLI args --dryrun, etc.

    Returns the surviving opts in their original order, plus the sorted names from ``preserve_properties`` that no
    ``-x`` option in ``opts`` mentions. Dies if ``-o`` assigns a property listed in ``preserve_properties``.
    """
    assert "-" not in exclude_short_opts
    kept: list[str] = []
    unmentioned: set[str] = set(preserve_properties)
    remaining = iter(opts)
    for opt in remaining:
        if opt in exclude_arg_opts:  # example: {"-X", "--exclude"}
            next(remaining, None)  # the argument of a dropped option is dropped as well
            continue
        if opt in include_arg_opts:  # example: {"-o", "-x"}
            kept.append(opt)
            arg: str | None = next(remaining, None)
            if arg is not None:
                if opt == "-o" and "=" in arg and arg.split("=", 1)[0] in preserve_properties:
                    die(f"--preserve-properties: Disallowed ZFS property found in --zfs-recv-program-opt(s): -o {arg}")
                if opt == "-x":
                    unmentioned.discard(arg)
                kept.append(arg)
            continue
        if opt in exclude_long_opts:  # example: {"--dryrun", "--verbose"}
            continue
        if opt.startswith("-") and opt != "-" and not opt.startswith("--"):
            # bundle of short flags: remove each excluded flag character, example: "den"
            opt = "".join(char for char in opt if char not in exclude_short_opts)
            if opt == "-":
                continue  # all flags of the bundle were excluded
        kept.append(opt)
    return kept, sorted(unmentioned)
# Matches the leading run of decimal digits of a string. _delete_stale_files() applies it to the part of a file name
# that follows the prefix and interprets the matched digits as a process id.
SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX: Final[re.Pattern[str]] = re.compile(r"^[0-9]+")  # see local_ssh_command()
def _delete_stale_files(
    root_dir: str,
    prefix: str,
    millis: int = 60 * 60 * 1000,
    dirs: bool = False,
    exclude: str | None = None,
    ssh: bool = False,
) -> None:
    """Removes obsolete entries of ``root_dir``, e.g. leftovers of an abnormal termination or an OS crash.

    Only entries whose name starts with ``prefix``, differs from ``exclude`` and whose mtime is at least ``millis``
    old are considered. With ``dirs`` only directories are removed (recursively), otherwise only non-directories.
    With ``ssh``, a socket file is removed only if the pid that follows the prefix in its name does not exist anymore,
    or if the file is at least 31 days old.
    """
    min_age_secs: float = millis / 1000
    now: float = time.time()
    validate_is_not_a_symlink("", root_dir)
    with os.scandir(root_dir) as entries:
        for entry in entries:
            name: str = entry.name
            if name == exclude or not name.startswith(prefix):
                continue
            try:
                st = entry.stat(follow_symlinks=False)
                if bool(dirs) != entry.is_dir(follow_symlinks=False):
                    continue  # wrong kind of entry for this invocation
                age_secs: float = now - st.st_mtime
                if age_secs < min_age_secs:
                    continue  # not stale yet
                if dirs:
                    shutil.rmtree(entry.path, ignore_errors=True)
                    continue
                if not (ssh and stat.S_ISSOCK(st.st_mode)):
                    os.remove(entry.path)
                    continue
                pid_match = SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX.match(name[len(prefix) :])
                if pid_match:
                    pid: int = int(pid_match.group(0))
                    if pid_exists(pid) is False or age_secs >= 31 * 24 * 60 * 60:
                        os.remove(entry.path)  # bzfs process is no longer alive; its ssh master process isn't either
            except FileNotFoundError:
                pass  # harmless
def _create_symlink(src: str, dst_dir: str, dst: str) -> None:
    """Creates the symlink ``dst`` inside ``dst_dir``; its target is ``src`` expressed relative to ``dst_dir``."""
    relative_target: str = os.path.relpath(src, start=dst_dir)
    link_path: str = os.path.join(dst_dir, dst)
    os.symlink(src=relative_target, dst=link_path)