Coverage for bzfs_main / configuration.py: 99%
600 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Configuration subsystem; All CLI option/parameter values are reachable from the "Params" class."""
17from __future__ import (
18 annotations,
19)
20import argparse
21import ast
22import os
23import platform
24import random
25import re
26import shutil
27import stat
28import sys
29import tempfile
30import threading
31import time
32from collections.abc import (
33 Iterable,
34)
35from dataclasses import (
36 dataclass,
37)
38from datetime import (
39 datetime,
40 tzinfo,
41)
42from logging import (
43 Logger,
44)
45from typing import (
46 Final,
47 Literal,
48 NamedTuple,
49 cast,
50 final,
51)
53from bzfs_main.argparse_actions import (
54 SnapshotFilter,
55 optimize_snapshot_filters,
56)
57from bzfs_main.argparse_cli import (
58 LOG_DIR_DEFAULT,
59 ZFS_RECV_GROUPS,
60 ZFS_RECV_O,
61 ZFS_RECV_O_INCLUDE_REGEX_DEFAULT,
62 __version__,
63)
64from bzfs_main.detect import (
65 DISABLE_PRG,
66)
67from bzfs_main.filter import (
68 SNAPSHOT_FILTERS_VAR,
69)
70from bzfs_main.period_anchors import (
71 PeriodAnchors,
72)
73from bzfs_main.util import (
74 utils,
75)
76from bzfs_main.util.connection import (
77 ConnectionPools,
78 MiniParams,
79 MiniRemote,
80)
81from bzfs_main.util.retry import (
82 RetryPolicy,
83)
84from bzfs_main.util.utils import (
85 DIR_PERMISSIONS,
86 FILE_PERMISSIONS,
87 PROG_NAME,
88 SHELL_CHARS,
89 UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH,
90 UNIX_TIME_INFINITY_SECS,
91 RegexList,
92 SnapshotPeriods,
93 SynchronizedBool,
94 append_if_absent,
95 compile_regexes,
96 current_datetime,
97 die,
98 get_home_directory,
99 get_timezone,
100 getenv_bool,
101 getenv_int,
102 is_included,
103 ninfix,
104 nprefix,
105 nsuffix,
106 parse_duration_to_milliseconds,
107 pid_exists,
108 sha256_hex,
109 sha256_urlsafe_base64,
110 urlsafe_base64,
111 validate_dataset_name,
112 validate_file_permissions,
113 validate_is_not_a_symlink,
114 validate_property_name,
115 xappend,
116)
# constants:
# Module-level synchronization state for the one-time environment-variable cleanup performed by
# Params._unset_matching_env_vars(): the lock serializes concurrent Params constructions, and the
# latch ensures os.environ is mutated at most once per process (first thread wins).
_UNSET_ENV_VARS_LOCK: Final[threading.Lock] = threading.Lock()
_UNSET_ENV_VARS_LATCH: Final[SynchronizedBool] = SynchronizedBool(True)
123#############################################################################
@final
class LogParams:
    """Option values for logging."""

    def __init__(self, args: argparse.Namespace) -> None:
        """Reads from ArgumentParser via args."""
        # immutable variables:
        if args.quiet:
            level: str = "ERROR"
        else:
            level = "TRACE" if args.verbose >= 2 else ("DEBUG" if args.verbose >= 1 else "INFO")
        self.log_level: Final[str] = level
        self.timestamp: Final[str] = datetime.now().isoformat(sep="_", timespec="seconds")  # 2024-09-03_12:26:15
        self.isatty: Final[bool] = getenv_bool("isatty", True)
        self.quiet: Final[bool] = args.quiet
        # Only probe the terminal width when progress output is actually going to a terminal:
        if self.isatty and args.pv_program != DISABLE_PRG and not self.quiet:
            columns: int = getenv_int("terminal_columns", shutil.get_terminal_size(fallback=(120, 24)).columns)
        else:
            columns = 0
        self.terminal_columns: Final[int] = columns
        self.home_dir: Final[str] = get_home_directory()
        log_parent_dir: Final[str] = args.log_dir or os.path.join(self.home_dir, LOG_DIR_DEFAULT)
        if LOG_DIR_DEFAULT not in os.path.basename(log_parent_dir):
            die(f"Basename of --log-dir must contain the substring '{LOG_DIR_DEFAULT}', but got: {log_parent_dir}")
        # Derive the log subdirectory name by truncating the timestamp at day/hour/minute granularity:
        delimiter: str = "_" if args.log_subdir == "daily" else ":"
        stamp: str = self.timestamp
        cut: int = stamp.rindex(delimiter) if args.log_subdir == "minutely" else stamp.index(delimiter)
        subdir: str = stamp[:cut]  # 2024-09-03 (d), 2024-09-03_12 (h), 2024-09-03_12:26 (m)
        self.log_dir: Final[str] = os.path.join(log_parent_dir, subdir)
        os.makedirs(log_parent_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir ", log_parent_dir)
        validate_file_permissions(log_parent_dir, DIR_PERMISSIONS)
        os.makedirs(self.log_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir subdir ", self.log_dir)
        validate_file_permissions(self.log_dir, DIR_PERMISSIONS)
        self.log_file_prefix: Final[str] = args.log_file_prefix
        self.log_file_infix: Final[str] = args.log_file_infix
        self.log_file_suffix: Final[str] = args.log_file_suffix
        # Atomically create a unique log file with restricted permissions:
        fd, self.log_file = tempfile.mkstemp(
            suffix=".log",
            prefix=f"{self.log_file_prefix}{self.timestamp}{self.log_file_infix}{self.log_file_suffix}-",
            dir=self.log_dir,
        )
        os.fchmod(fd, FILE_PERMISSIONS)
        os.close(fd)
        self.pv_log_file: Final[str] = self.log_file[: -len(".log")] + ".pv"
        stem: str = os.path.basename(self.log_file)[: -len(".log")]
        # Python's standard logger naming API interprets chars such as '.', '-', ':', spaces, etc in special ways, e.g.
        # logging.getLogger("foo.bar") vs logging.getLogger("foo-bar"). Thus, we sanitize the Python logger name via a regex:
        self.logger_name_suffix: Final[str] = re.sub(r"[^A-Za-z0-9_]", repl="_", string=stem)
        cache_root: str = os.path.join(log_parent_dir, ".cache")
        os.makedirs(cache_root, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_file_permissions(cache_root, DIR_PERMISSIONS)
        self.last_modified_cache_dir: Final[str] = os.path.join(cache_root, "mods")

        # Create/update "current" symlink to current_dir, which is a subdir containing further symlinks to log files.
        # For parallel usage, ensures there is no time window when the symlinks are inconsistent or do not exist.
        current: str = "current"
        dot_current_dir: str = os.path.join(log_parent_dir, f".{current}")
        current_dir: str = os.path.join(dot_current_dir, stem)
        os.makedirs(dot_current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir: .current ", dot_current_dir)
        try:
            os.makedirs(current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            _create_symlink(self.log_file, current_dir, f"{current}.log")
            _create_symlink(self.pv_log_file, current_dir, f"{current}.pv")
            _create_symlink(self.log_dir, current_dir, f"{current}.dir")
            dst_file: str = os.path.join(current_dir, current)
            os.symlink(os.path.relpath(current_dir, start=log_parent_dir), dst_file)
            os.replace(dst_file, os.path.join(log_parent_dir, current))  # atomic rename
            _delete_stale_files(dot_current_dir, prefix="", millis=5000, dirs=True, exclude=os.path.basename(current_dir))
        except FileNotFoundError:
            pass  # harmless concurrent cleanup

    def __repr__(self) -> str:
        return str(self.__dict__)
206#############################################################################
@final
class Params(MiniParams):
    """All parsed CLI options combined into a single bundle; simplifies passing around numerous settings and defaults."""

    def __init__(
        self,
        args: argparse.Namespace,
        sys_argv: list[str],
        log_params: LogParams,
        log: Logger,
        inject_params: dict[str, bool] | None = None,
    ) -> None:
        """Reads from ArgumentParser via args; calls die() on invalid or disallowed option values."""
        # immutable variables:
        assert args is not None
        assert isinstance(sys_argv, list)
        assert log_params is not None
        assert log is not None
        self.args: Final[argparse.Namespace] = args
        self.sys_argv: Final[list[str]] = sys_argv
        self.log_params: Final[LogParams] = log_params
        self.log: Logger = log
        self.inject_params: Final[dict[str, bool]] = inject_params if inject_params is not None else {}  # for testing only
        self.one_or_more_whitespace_regex: Final[re.Pattern[str]] = re.compile(r"\s+")
        self.two_or_more_spaces_regex: Final[re.Pattern[str]] = re.compile(r" +")
        self._unset_matching_env_vars(args)
        self.xperiods: Final[SnapshotPeriods] = SnapshotPeriods()

        assert len(args.root_dataset_pairs) > 0
        self.root_dataset_pairs: Final[list[tuple[str, str]]] = args.root_dataset_pairs
        self.recursive: Final[bool] = args.recursive
        self.recursive_flag: Final[str] = "-r" if args.recursive else ""

        self.dry_run: Final[bool] = args.dryrun is not None
        self.dry_run_recv: Final[str] = "-n" if self.dry_run else ""
        self.dry_run_destroy: Final[str] = self.dry_run_recv
        self.dry_run_no_send: Final[bool] = args.dryrun == "send"
        self.verbose_zfs: Final[bool] = args.verbose >= 2
        self.verbose_destroy: Final[str] = "" if args.quiet else "-v"

        self.zfs_send_program_opts: Final[list[str]] = self._fix_send_opts(self.split_args(args.zfs_send_program_opts))
        zfs_recv_program_opts: list[str] = self.split_args(args.zfs_recv_program_opts)
        for extra_opt in args.zfs_recv_program_opt:
            zfs_recv_program_opts.append(self.validate_arg_str(extra_opt, allow_all=True))
        preserve_properties = [validate_property_name(name, "--preserve-properties") for name in args.preserve_properties]
        zfs_recv_program_opts, zfs_recv_x_names = self._fix_recv_opts(zfs_recv_program_opts, frozenset(preserve_properties))
        self.zfs_recv_program_opts: Final[list[str]] = zfs_recv_program_opts
        self.zfs_recv_x_names: Final[list[str]] = zfs_recv_x_names
        if self.verbose_zfs:
            append_if_absent(self.zfs_send_program_opts, "-v")
            append_if_absent(self.zfs_recv_program_opts, "-v")
        # zfs_full_recv_opts: dataset-specific dynamic -o/-x property options are computed later per dataset in
        # replication._add_recv_property_options():
        self.zfs_full_recv_opts: Final[list[str]] = self.zfs_recv_program_opts.copy()
        cpconfigs = [CopyPropertiesConfig(group, flag, args, self) for group, flag in ZFS_RECV_GROUPS.items()]
        self.zfs_recv_o_config, self.zfs_recv_x_config, self.zfs_set_config = cpconfigs

        self.force_rollback_to_latest_snapshot: Final[bool] = args.force_rollback_to_latest_snapshot
        self.force_rollback_to_latest_common_snapshot: Final[SynchronizedBool] = SynchronizedBool(
            args.force_rollback_to_latest_common_snapshot
        )
        self.force: Final[SynchronizedBool] = SynchronizedBool(args.force)
        self.force_once: Final[bool] = args.force_once
        self.force_unmount: Final[str] = "-f" if args.force_unmount else ""
        self.force_hard: Final[str] = "-R" if args.force_destroy_dependents else ""

        self.skip_parent: Final[bool] = args.skip_parent
        self.skip_missing_snapshots: Final[str] = args.skip_missing_snapshots
        self.skip_on_error: Final[str] = args.skip_on_error
        self.retry_policy: Final[RetryPolicy] = RetryPolicy.from_namespace(args)
        self.skip_replication: Final[bool] = args.skip_replication
        self.delete_dst_snapshots: Final[bool] = args.delete_dst_snapshots is not None
        self.delete_dst_bookmarks: Final[bool] = args.delete_dst_snapshots == "bookmarks"
        self.delete_dst_snapshots_no_crosscheck: Final[bool] = args.delete_dst_snapshots_no_crosscheck
        self.delete_dst_snapshots_except: Final[bool] = args.delete_dst_snapshots_except
        self.delete_dst_datasets: Final[bool] = args.delete_dst_datasets
        self.delete_empty_dst_datasets: Final[bool] = args.delete_empty_dst_datasets is not None
        self.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: Final[bool] = (
            args.delete_empty_dst_datasets == "snapshots+bookmarks"
        )
        self.compare_snapshot_lists: Final[str] = args.compare_snapshot_lists
        self.daemon_lifetime_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds(args.daemon_lifetime)
        self.daemon_frequency: Final[str] = args.daemon_frequency
        self.enable_privilege_elevation: Final[bool] = not args.no_privilege_elevation
        self.no_stream: Final[bool] = args.no_stream
        self.resume_recv: Final[bool] = not args.no_resume_recv
        self.create_bookmarks: Final[str] = args.create_bookmarks
        self.use_bookmark: Final[bool] = not args.no_use_bookmark

        self.src: Final[Remote] = Remote("src", args, self)  # src dataset, host and ssh options
        self.dst: Final[Remote] = Remote("dst", args, self)  # dst dataset, host and ssh options
        self.create_src_snapshots_config: Final[CreateSrcSnapshotConfig] = CreateSrcSnapshotConfig(args, self)
        self.monitor_snapshots_config: Final[MonitorSnapshotsConfig] = MonitorSnapshotsConfig(args, self)
        self.is_caching_snapshots: Final[bool] = args.cache_snapshots

        # External helper programs; options that could redirect output or otherwise subvert the pipeline are rejected:
        self.compression_program: Final[str] = self._program_name(args.compression_program)
        self.compression_program_opts: Final[list[str]] = self.split_args(args.compression_program_opts)
        for opt in {"-o", "--output-file"}.intersection(self.compression_program_opts):
            die(f"--compression-program-opts: {opt} is disallowed for security reasons.")
        self.getconf_program: Final[str] = self._program_name("getconf")  # print number of CPUs on POSIX
        self.mbuffer_program: Final[str] = self._program_name(args.mbuffer_program)
        self.mbuffer_program_opts: Final[list[str]] = self.split_args(args.mbuffer_program_opts)
        for opt in {"-i", "-I", "-o", "-O", "-l", "-L", "-t", "-T", "-a", "-A"}.intersection(self.mbuffer_program_opts):
            die(f"--mbuffer-program-opts: {opt} is disallowed for security reasons.")
        self.ps_program: Final[str] = self._program_name(args.ps_program)
        self.pv_program: Final[str] = self._program_name(args.pv_program)
        self.pv_program_opts: Final[list[str]] = self.split_args(args.pv_program_opts)
        bad_pv_opts = {"-o", "--output", "-f", "--log-file", "-S", "--stop-at-size", "-Y", "--sync", "-X", "--discard",
                       "-U", "--store-and-forward", "-d", "--watchfd", "-R", "--remote", "-P", "--pidfile"}  # fmt: skip
        for opt in bad_pv_opts.intersection(self.pv_program_opts):
            die(f"--pv-program-opts: {opt} is disallowed for security reasons.")
        if args.bwlimit:
            self.pv_program_opts.append(f"--rate-limit={self.validate_arg_str(args.bwlimit)}")
        self.shell_program_local: Final[str] = "sh"
        self.shell_program: Final[str] = self._program_name(args.shell_program)
        self.ssh_program: str = self._program_name(args.ssh_program)
        self.sudo_program: Final[str] = self._program_name(args.sudo_program)
        self.uname_program: Final[str] = self._program_name("uname")
        self.zfs_program: Final[str] = self._program_name("zfs")
        self.zpool_program: Final[str] = self._program_name(args.zpool_program)

        # no point creating complex shell pipeline commands for tiny data transfers:
        self.min_pipe_transfer_size: Final[int] = getenv_int("min_pipe_transfer_size", 1024 * 1024)
        self.max_datasets_per_batch_on_list_snaps: Final[int] = getenv_int("max_datasets_per_batch_on_list_snaps", 1024)
        self.max_datasets_per_minibatch_on_list_snaps: int = getenv_int("max_datasets_per_minibatch_on_list_snaps", -1)
        self.max_snapshots_per_minibatch_on_delete_snaps: int = getenv_int("max_snapshots_per_minibatch_on_delete_snaps", 2**29)
        self.dedicated_tcp_connection_per_zfs_send: Final[bool] = getenv_bool("dedicated_tcp_connection_per_zfs_send", True)
        # threads: with --force-once we intentionally coerce to a single-threaded run to ensure deterministic serial behavior
        self.threads: Final[tuple[int, bool]] = (1, False) if self.force_once else args.threads
        timeout_duration_nanos = None if args.timeout is None else 1_000_000 * parse_duration_to_milliseconds(args.timeout)
        self.timeout_duration_nanos: int | None = timeout_duration_nanos  # duration (not a timestamp); for logging only
        self.no_estimate_send_size: Final[bool] = args.no_estimate_send_size
        self.remote_conf_cache_ttl_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds(
            args.daemon_remote_conf_cache_ttl
        )

        self.os_cpu_count: Final[int | None] = os.cpu_count()
        self.os_getuid: Final[int] = os.getuid()
        self.os_geteuid: Final[int] = os.geteuid()
        self.prog_version: Final[str] = __version__
        self.python_version: Final[str] = sys.version
        self.platform_version: Final[str] = platform.version()
        self.platform_platform: Final[str] = platform.platform()

        # mutable variables:
        snapshot_filters = args.snapshot_filters_var if hasattr(args, SNAPSHOT_FILTERS_VAR) else [[]]
        self.snapshot_filters: list[list[SnapshotFilter]] = [optimize_snapshot_filters(f) for f in snapshot_filters]
        self.exclude_dataset_property: str | None = args.exclude_dataset_property
        self.exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.abs_exclude_datasets: list[str] = []  # deferred to validate_task() phase
        self.abs_include_datasets: list[str] = []  # deferred to validate_task() phase

        self.curr_zfs_send_program_opts: list[str] = []
        self.zfs_recv_ox_names: set[str] = set()
        self.available_programs: dict[str, dict[str, str]] = {}
        self.zpool_features: dict[str, dict[str, dict[str, str]]] = {r.location: {} for r in [self.src, self.dst]}
        self.connection_pools: dict[str, ConnectionPools] = {}

    def split_args(self, text: str, *items: str | Iterable[str], allow_all: bool = False) -> list[str]:
        """Splits option string on runs of one or more whitespace into an option list."""
        text = text.strip()
        opts = self.one_or_more_whitespace_regex.split(text) if text else []
        xappend(opts, items)
        if not allow_all:
            self._validate_quoting(opts)
        return opts

    def validate_arg(self, opt: str | None, allow_spaces: bool = False, allow_all: bool = False) -> str | None:
        """allow_all permits all characters, including whitespace and quotes; See squote() and dquote()."""
        # NB: annotated str | None because None is explicitly passed through (see the `opt is None` check below).
        if allow_all or opt is None:
            return opt
        if any(char.isspace() and (char != " " or not allow_spaces) for char in opt):
            die(f"Option must not contain a whitespace character{' other than space' if allow_spaces else ''}: {opt}")
        self._validate_quoting([opt])
        return opt

    def validate_arg_str(self, opt: str | None, allow_spaces: bool = False, allow_all: bool = False) -> str:
        """Returns validated option string, raising if missing or illegal."""
        if opt is None:
            die("Option must not be missing")
        self.validate_arg(opt, allow_spaces=allow_spaces, allow_all=allow_all)
        return opt

    @staticmethod
    def _validate_quoting(opts: list[str]) -> None:
        """Raises an error if any option contains a quote or shell metacharacter."""
        for opt in opts:
            if "'" in opt or '"' in opt or "$" in opt or "`" in opt:
                die(f"Option must not contain a single quote or double quote or dollar or backtick character: {opt}")

    @staticmethod
    def _fix_recv_opts(opts: list[str], preserve_properties: frozenset[str]) -> tuple[list[str], list[str]]:
        """Returns sanitized ``zfs recv`` options and captured ``-o/-x`` args."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="n",
            include_arg_opts={"-o", "-x"},
            preserve_properties=preserve_properties,
        )

    @staticmethod
    def _fix_send_opts(opts: list[str]) -> list[str]:
        """Returns sanitized ``zfs send`` options."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="den",
            include_arg_opts={"-X", "--exclude", "--redact"},
            exclude_arg_opts=frozenset({"-i", "-I", "-t", "--resume"}),
        )[0]

    def _program_name(self, program: str) -> str:
        """Returns the validated program name; for testing: helps simulate errors caused by external programs."""
        self.validate_arg_str(program)
        if not program:
            die(f"Program name must not be missing: {program}")
        for char in SHELL_CHARS + ":":
            if char in program:
                die(f"Program name must not contain a '{char}' character: {program}")
        if self.inject_params.get("inject_unavailable_" + program, False):
            return program + "-xxx"  # substitute a program that cannot be found on the PATH
        if self.inject_params.get("inject_failing_" + program, False):
            return "false"  # substitute a program that will error out with non-zero return code
        return program

    def _unset_matching_env_vars(self, args: argparse.Namespace) -> None:
        """Unset environment variables matching regex filters."""
        if len(args.exclude_envvar_regex) == 0 and len(args.include_envvar_regex) == 0:
            return  # fast path
        exclude_envvar_regexes: RegexList = compile_regexes(args.exclude_envvar_regex)
        include_envvar_regexes: RegexList = compile_regexes(args.include_envvar_regex)
        # Mutate global state at most once, atomically. First thread wins. The latch isn't strictly necessary for
        # correctness as all concurrent bzfs.Job instances in bzfs_jobrunner have identical include/exclude_envvar_regex
        # anyway. It's just for reduced latency.
        with _UNSET_ENV_VARS_LOCK:
            if _UNSET_ENV_VARS_LATCH.get_and_set(False):
                for envvar_name in list(os.environ):
                    # order of include vs exclude is intentionally reversed to correctly implement semantics:
                    # "unset env var iff excluded and not included (include takes precedence)."
                    if is_included(envvar_name, exclude_envvar_regexes, include_envvar_regexes):
                        os.environ.pop(envvar_name, None)
                        self.log.debug("Unsetting b/c envvar regex: %s", envvar_name)

    def lock_file_name(self) -> str:
        """Returns unique path used to detect concurrently running jobs.

        Makes it such that a job that runs periodically declines to start if the same previous periodic job is still running
        without completion yet. Hashed key avoids overly long filenames while remaining deterministic.
        """
        # fmt: off
        key = (tuple(self.root_dataset_pairs), self.args.recursive, self.args.exclude_dataset_property,
               tuple(self.args.include_dataset), tuple(self.args.exclude_dataset),
               tuple(self.args.include_dataset_regex), tuple(self.args.exclude_dataset_regex),
               tuple(tuple(f) for f in self.snapshot_filters), self.args.skip_replication, self.args.create_src_snapshots,
               self.args.create_src_snapshots_plan, self.args.create_src_snapshots_timeformat,
               self.create_src_snapshots_config.anchors,
               self.args.delete_dst_datasets, self.args.delete_dst_snapshots, self.args.delete_dst_snapshots_except,
               self.args.delete_empty_dst_datasets,
               self.args.compare_snapshot_lists, self.args.monitor_snapshots,
               self.src.basis_ssh_host, self.dst.basis_ssh_host,
               self.src.basis_ssh_user, self.dst.basis_ssh_user)
        # fmt: on
        hash_code: str = sha256_hex(str(key))
        log_parent_dir: str = os.path.dirname(self.log_params.log_dir)
        locks_dir: str = os.path.join(log_parent_dir, ".locks")
        os.makedirs(locks_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--locks-dir ", locks_dir)
        validate_file_permissions(locks_dir, DIR_PERMISSIONS)
        return os.path.join(locks_dir, f"{PROG_NAME}-lockfile-{hash_code}.lock")

    def dry(self, msg: str) -> str:
        """Prefix ``msg`` with 'Dry' when running in dry-run mode."""
        return utils.dry(msg, self.dry_run)

    def is_program_available(self, program: str, location: str) -> bool:
        """Return True if ``program`` was detected on ``location`` host."""
        return program in self.available_programs.get(location, {})
490#############################################################################
@final
class Remote(MiniRemote):
    """Connection settings for either source or destination host."""

    def __init__(self, loc: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args."""
        # immutable variables:
        assert loc in ("src", "dst")
        self.location: str = loc
        self.params: Params = p
        self.basis_ssh_user: Final[str] = getattr(args, f"ssh_{loc}_user")
        self.basis_ssh_host: Final[str] = getattr(args, f"ssh_{loc}_host")
        self.ssh_port: Final[int | None] = getattr(args, f"ssh_{loc}_port")
        self.ssh_config_file: Final[str | None] = p.validate_arg(getattr(args, f"ssh_{loc}_config_file"))
        if self.ssh_config_file and "bzfs_ssh_config" not in os.path.basename(self.ssh_config_file):
            die(f"Basename of --ssh-{loc}-config-file must contain substring 'bzfs_ssh_config': {self.ssh_config_file}")
        self.ssh_config_file_hash: Final[str] = (
            sha256_urlsafe_base64(os.path.abspath(self.ssh_config_file), padding=False) if self.ssh_config_file else ""
        )
        self.ssh_cipher: Final[str] = p.validate_arg_str(args.ssh_cipher)
        # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation:
        base_opts: list[str] = ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"]
        if args.verbose >= 3:
            base_opts.append("-v")
        self.ssh_extra_opts: tuple[str, ...] = tuple(base_opts)
        self.max_concurrent_ssh_sessions_per_tcp_connection: Final[int] = args.max_concurrent_ssh_sessions_per_tcp_connection
        self.ssh_exit_on_shutdown: bool = args.ssh_exit_on_shutdown
        self.ssh_control_persist_secs: int = args.ssh_control_persist_secs
        self.ssh_control_persist_margin_secs: int = getenv_int("ssh_control_persist_margin_secs", 2)
        self.socket_prefix: Final[str] = "s"
        self.reuse_ssh_connection: bool = getenv_bool("reuse_ssh_connection", True)
        self.ssh_socket_dir: str = ""
        if self.reuse_ssh_connection:
            # Set up the directories that hold the multiplexing control sockets:
            dot_ssh_dir: str = os.path.join(get_home_directory(), ".ssh")
            os.makedirs(dot_ssh_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            self.ssh_socket_dir = os.path.join(dot_ssh_dir, "bzfs")
            os.makedirs(self.ssh_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(self.ssh_socket_dir, mode=DIR_PERMISSIONS)
            self.ssh_exit_on_shutdown_socket_dir: Final[str] = os.path.join(self.ssh_socket_dir, "x")
            os.makedirs(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS)
            _delete_stale_files(self.ssh_exit_on_shutdown_socket_dir, self.socket_prefix, ssh=True)
        self.sanitize1_regex: Final[re.Pattern[str]] = re.compile(r"[\s\\/@$]")  # replace whitespace, /, $, \, @ with ~ char
        self.sanitize2_regex: Final[re.Pattern[str]] = re.compile(rf"[^a-zA-Z0-9{re.escape('~.:_-')}]")  # remove bad chars

        # mutable variables:
        self.root_dataset: str = ""  # deferred until run_main()
        self.basis_root_dataset: str = ""  # deferred until run_main()
        self.pool: str = ""
        self.sudo: str = ""
        self.use_zfs_delegation: bool = False
        self.ssh_user: str = ""
        self.ssh_host: str = ""
        self.ssh_user_host: str = ""
        self.is_nonlocal: bool = False

    def local_ssh_command(self, socket_file: str | None) -> list[str]:
        """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing)
        command to run on the remote host, which will be appended later."""
        if not self.ssh_user_host:
            return []  # dataset is on local host - don't use ssh

        # dataset is on remote host
        p: Params = self.params
        if p.ssh_program == DISABLE_PRG:
            die("Cannot talk to remote host because ssh CLI is disabled.")
        cmd: list[str] = [p.ssh_program, *self.ssh_extra_opts]
        if self.ssh_config_file:
            cmd += ["-F", self.ssh_config_file]
        if self.ssh_cipher:
            cmd += ["-c", self.ssh_cipher]
        if self.ssh_port:
            cmd += ["-p", str(self.ssh_port)]
        if self.reuse_ssh_connection:
            # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and
            # 'ssh -S -M -oControlPersist=60s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
            if not socket_file:
                # Generate unique private Unix domain socket file name in user's home dir and pass it to 'ssh -S /path/to/socket'
                def sanitize(name: str) -> str:
                    name = self.sanitize1_regex.sub("~", name)  # replace whitespace, /, $, \, @ with a ~ tilde char
                    return self.sanitize2_regex.sub("", name)  # remove disallowed chars

                max_rand: int = 999_999_999_999
                rand_str: str = urlsafe_base64(random.SystemRandom().randint(0, max_rand), max_value=max_rand, padding=False)
                curr_time: str = urlsafe_base64(time.time_ns(), max_value=2**64 - 1, padding=False)
                unique: str = f"{os.getpid()}@{curr_time}@{rand_str}"
                optional: str = f"@{sanitize(self.ssh_host)[:45]}@{sanitize(self.ssh_user)}"
                socket_file = os.path.join(self.ssh_exit_on_shutdown_socket_dir, f"{self.socket_prefix}{unique}{optional}")
                socket_file = socket_file[: max(UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, len(socket_file) - len(optional))]
                # `ssh` will error out later if the max OS Unix domain socket path limit cannot be met reasonably as the
                # home directory path is too long, typically because the Unix user name is unreasonably long.
            cmd += ["-S", socket_file]
        cmd.append(self.ssh_user_host)
        return cmd

    def cache_key(self) -> tuple[str, str, int | None, str | None]:
        """Returns tuple uniquely identifying this Remote for caching."""
        return self.location, self.ssh_user_host, self.ssh_port, self.ssh_config_file

    def cache_namespace(self) -> str:
        """Returns cache namespace string which is a stable, unique directory component for caches that distinguishes
        endpoints by username+host+port+ssh_config_file where applicable, and uses '-' when no user/host is present (local
        mode)."""
        if not self.ssh_user_host:
            return "-"  # local mode
        return f"{self.ssh_user_host}#{self.ssh_port or ''}#{self.ssh_config_file_hash}"

    def is_ssh_available(self) -> bool:
        """Return True if the ssh client program required for this remote is available on the local host."""
        return self.params.is_program_available("ssh", "local")

    def __repr__(self) -> str:
        return str(self.__dict__)
609#############################################################################
@final
class CopyPropertiesConfig:
    """--zfs-recv-o* and --zfs-recv-x* option groups for copying or excluding ZFS properties on receive."""

    def __init__(self, group: str, flag: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args."""
        assert group in ZFS_RECV_GROUPS
        # immutable variables:
        self.group: Final[str] = group  # one of zfs_recv_o, zfs_recv_x
        self.flag: Final[str] = flag  # one of -o or -x
        raw_sources: str = p.validate_arg_str(getattr(args, f"{group}_sources"))
        self.sources: Final[str] = ",".join(sorted(s.strip() for s in raw_sources.strip().split(",")))  # canonicalize
        self.targets: Final[str] = p.validate_arg_str(getattr(args, f"{group}_targets"))
        regexes: list[str] | None = getattr(args, f"{group}_include_regex")
        assert ZFS_RECV_O in ZFS_RECV_GROUPS
        if regexes is None:
            # the zfs_recv_o group defaults to a non-empty include regex; all other groups default to matching nothing
            regexes = [ZFS_RECV_O_INCLUDE_REGEX_DEFAULT] if group == ZFS_RECV_O else []
        self.include_regexes: Final[RegexList] = compile_regexes(regexes)
        self.exclude_regexes: Final[RegexList] = compile_regexes(getattr(args, f"{group}_exclude_regex"))

    def __repr__(self) -> str:
        return str(self.__dict__)
635#############################################################################
636@final
637class SnapshotLabel(NamedTuple):
638 """Contains the individual parts that are concatenated into a ZFS snapshot name."""
640 prefix: str # bzfs_
641 infix: str # us-west-1_
642 timestamp: str # 2024-11-06_08:30:05
643 suffix: str # _hourly
645 def __str__(self) -> str: # bzfs_us-west-1_2024-11-06_08:30:05_hourly
646 return f"{self.prefix}{self.infix}{self.timestamp}{self.suffix}"
648 def notimestamp_str(self) -> str: # bzfs_us-west-1_hourly
649 """Returns the concatenation of all parts except for the timestamp part."""
650 return f"{self.prefix}{self.infix}{self.suffix}"
652 def validate_label(self, input_text: str) -> None:
653 """Validates that the composed snapshot label forms a legal name."""
654 name: str = str(self)
655 validate_dataset_name(name, input_text)
656 if "/" in name:
657 die(f"Invalid ZFS snapshot name: '{name}' for: '{input_text}'")
658 for key, value in {"prefix": self.prefix, "infix": self.infix, "suffix": self.suffix}.items():
659 if key == "prefix":
660 if not value.endswith("_"):
661 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'")
662 if value.count("_") > 1:
663 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")
664 elif key == "infix":
665 if value:
666 if not value.endswith("_"):
667 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'")
668 if value.count("_") > 1:
669 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")
670 elif value:
671 if not value.startswith("_"):
672 die(f"Invalid {input_text}{key}: Must start with an underscore character: '{value}'")
673 if value.count("_") > 1:
674 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")
677#############################################################################
678@final
679class CreateSrcSnapshotConfig:
680 """Option values for --create-src-snapshots, that is, for automatically creating source snapshots."""
682 def __init__(self, args: argparse.Namespace, p: Params) -> None:
683 """Option values for --create-src-snapshots*; reads from ArgumentParser via args."""
684 # immutable variables:
685 self.skip_create_src_snapshots: Final[bool] = not args.create_src_snapshots
686 self.create_src_snapshots_even_if_not_due: Final[bool] = args.create_src_snapshots_even_if_not_due
687 tz_spec: str | None = args.create_src_snapshots_timezone if args.create_src_snapshots_timezone else None
688 self.tz: Final[tzinfo | None] = get_timezone(tz_spec)
689 self.current_datetime: datetime = current_datetime(tz_spec)
690 self.timeformat: Final[str] = args.create_src_snapshots_timeformat
691 self.anchors: Final[PeriodAnchors] = PeriodAnchors.parse(args)
693 # Compute the schedule for upcoming periodic time events (suffix_durations). This event schedule is also used in
694 # daemon mode via sleep_until_next_daemon_iteration()
695 suffixes: list[str] = []
696 labels: list[SnapshotLabel] = []
697 create_src_snapshots_plan: str = args.create_src_snapshots_plan or str({"bzfs": {"onsite": {"adhoc": 1}}})
698 for org, target_periods in ast.literal_eval(create_src_snapshots_plan).items():
699 for target, periods in target_periods.items():
700 for period_unit, period_amount in periods.items(): # e.g. period_unit can be "10minutely" or "minutely"
701 if not isinstance(period_amount, int) or period_amount < 0:
702 die(f"--create-src-snapshots-plan: Period amount must be a non-negative integer: {period_amount}")
703 if period_amount > 0:
704 suffix: str = nsuffix(period_unit)
705 suffixes.append(suffix)
706 labels.append(SnapshotLabel(prefix=nprefix(org), infix=ninfix(target), timestamp="", suffix=suffix))
707 xperiods: SnapshotPeriods = p.xperiods
708 if self.skip_create_src_snapshots:
709 duration_amount, duration_unit = p.xperiods.suffix_to_duration0(p.daemon_frequency)
710 if duration_amount <= 0 or not duration_unit:
711 die(f"Invalid --daemon-frequency: {p.daemon_frequency}")
712 suffixes = [nsuffix(p.daemon_frequency)]
713 labels = []
714 suffix_durations: dict[str, tuple[int, str]] = {suffix: xperiods.suffix_to_duration1(suffix) for suffix in suffixes}
716 def suffix_key(suffix: str) -> tuple[int, str]:
717 duration_amount, duration_unit = suffix_durations[suffix]
718 duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
719 if suffix.endswith(("hourly", "minutely", "secondly")):
720 if duration_milliseconds != 0 and 86400 * 1000 % duration_milliseconds != 0:
721 die(
722 "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 86400 seconds "
723 f"without remainder so that snapshots will be created at the same time of day every day: {suffix}"
724 )
725 if suffix.endswith("monthly"):
726 if duration_amount != 0 and 12 % duration_amount != 0:
727 die(
728 "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 12 months "
729 f"without remainder so that snapshots will be created at the same time every year: {suffix}"
730 )
731 return duration_milliseconds, suffix
733 suffixes = sorted(suffixes, key=suffix_key, reverse=True) # take snapshots for dailies before hourlies, and so on
734 self.suffix_durations: Final[dict[str, tuple[int, str]]] = {sfx: suffix_durations[sfx] for sfx in suffixes} # sort
735 suffix_indexes: dict[str, int] = {suffix: k for k, suffix in enumerate(suffixes)}
736 labels.sort(key=lambda label: (suffix_indexes[label.suffix], label)) # take snapshots for dailies before hourlies
737 self._snapshot_labels: Final[list[SnapshotLabel]] = labels
738 for label in self.snapshot_labels():
739 label.validate_label("--create-src-snapshots-plan ")
741 def snapshot_labels(self) -> list[SnapshotLabel]:
742 """Returns the snapshot name patterns for which snapshots shall be created."""
743 timeformat: str = self.timeformat
744 is_millis: bool = timeformat.endswith("%F") # non-standard hack to append milliseconds
745 if is_millis:
746 timeformat = timeformat[0:-1] + "f" # replace %F with %f (append microseconds)
747 timestamp: str = self.current_datetime.strftime(timeformat)
748 if is_millis:
749 timestamp = timestamp[0 : -len("000")] # replace microseconds with milliseconds
750 timestamp = timestamp.replace("+", "z") # zfs CLI does not accept the '+' character in snapshot names
751 return [SnapshotLabel(label.prefix, label.infix, timestamp, label.suffix) for label in self._snapshot_labels]
753 def __repr__(self) -> str:
754 return str(self.__dict__)
757#############################################################################
758@dataclass(frozen=True)
759@final
760class AlertConfig:
761 """Thresholds controlling when alerts fire for snapshot age."""
763 kind: Literal["Latest", "Oldest"]
764 warning_millis: int
765 critical_millis: int
768#############################################################################
769@dataclass(frozen=True)
770@final
771class MonitorSnapshotAlert:
772 """Alert configuration for a single monitored snapshot label."""
774 label: SnapshotLabel
775 latest: AlertConfig | None
776 oldest: AlertConfig | None
779#############################################################################
780@final
781class MonitorSnapshotsConfig:
782 """Option values for --monitor-snapshots*, that is, policy describing which snapshots to monitor for staleness."""
784 def __init__(self, args: argparse.Namespace, p: Params) -> None:
785 """Reads from ArgumentParser via args."""
786 # immutable variables:
787 self.monitor_snapshots: Final[dict] = ast.literal_eval(args.monitor_snapshots)
788 self.dont_warn: Final[bool] = args.monitor_snapshots_dont_warn
789 self.dont_crit: Final[bool] = args.monitor_snapshots_dont_crit
790 self.no_latest_check: Final[bool] = args.monitor_snapshots_no_latest_check
791 self.no_oldest_check: Final[bool] = args.monitor_snapshots_no_oldest_check
792 alerts: list[MonitorSnapshotAlert] = []
793 xperiods: SnapshotPeriods = p.xperiods
794 for org, target_periods in self.monitor_snapshots.items():
795 prefix: str = nprefix(org)
796 for target, periods in target_periods.items():
797 for period_unit, alert_dicts in periods.items(): # e.g. period_unit can be "10minutely" or "minutely"
798 label = SnapshotLabel(prefix=prefix, infix=ninfix(target), timestamp="", suffix=nsuffix(period_unit))
799 alert_latest, alert_oldest = None, None
800 for alert_type, alert_dict in alert_dicts.items():
801 m = "--monitor-snapshots: "
802 if alert_type not in ["latest", "oldest"]:
803 die(f"{m}'{alert_type}' must be 'latest' or 'oldest' within {args.monitor_snapshots}")
804 warning_millis: int = 0
805 critical_millis: int = 0
806 cycles: int = 1
807 for kind, value in alert_dict.items():
808 context: str = args.monitor_snapshots
809 if kind == "warning":
810 warning_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
811 elif kind == "critical":
812 critical_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
813 elif kind == "cycles":
814 cycles = max(0, int(value))
815 else:
816 die(f"{m}'{kind}' must be 'warning', 'critical' or 'cycles' within {context}")
817 if warning_millis > 0 or critical_millis > 0:
818 duration_amount, duration_unit = xperiods.suffix_to_duration1(label.suffix)
819 duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
820 warning_millis += 0 if warning_millis <= 0 else cycles * duration_milliseconds
821 critical_millis += 0 if critical_millis <= 0 else cycles * duration_milliseconds
822 warning_millis = UNIX_TIME_INFINITY_SECS if warning_millis <= 0 else warning_millis
823 critical_millis = UNIX_TIME_INFINITY_SECS if critical_millis <= 0 else critical_millis
824 capitalized_alert_type = cast(Literal["Latest", "Oldest"], sys.intern(alert_type.capitalize()))
825 alert_config = AlertConfig(capitalized_alert_type, warning_millis, critical_millis)
826 if alert_type == "latest":
827 if not self.no_latest_check:
828 alert_latest = alert_config
829 else:
830 assert alert_type == "oldest"
831 if not self.no_oldest_check:
832 alert_oldest = alert_config
833 if alert_latest is not None or alert_oldest is not None:
834 alerts.append(MonitorSnapshotAlert(label, alert_latest, alert_oldest))
836 def alert_sort_key(alert: MonitorSnapshotAlert) -> tuple[int, SnapshotLabel]:
837 duration_amount, duration_unit = xperiods.suffix_to_duration1(alert.label.suffix)
838 duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
839 return duration_milliseconds, alert.label
841 alerts.sort(key=alert_sort_key, reverse=True) # check snapshots for dailies before hourlies, and so on
842 self.alerts: Final[list[MonitorSnapshotAlert]] = alerts
843 self.enable_monitor_snapshots: Final[bool] = len(alerts) > 0
845 def __repr__(self) -> str:
846 return str(self.__dict__)
849#############################################################################
850def _fix_send_recv_opts(
851 opts: list[str],
852 exclude_long_opts: set[str],
853 exclude_short_opts: str,
854 include_arg_opts: set[str],
855 exclude_arg_opts: frozenset[str] = frozenset(),
856 preserve_properties: frozenset[str] = frozenset(),
857) -> tuple[list[str], list[str]]:
858 """These opts are instead managed via bzfs CLI args --dryrun, etc."""
859 assert "-" not in exclude_short_opts
860 results: list[str] = []
861 x_names: set[str] = set(preserve_properties)
862 i = 0
863 n = len(opts)
864 while i < n:
865 opt: str = opts[i]
866 i += 1
867 if opt in exclude_arg_opts: # example: {"-X", "--exclude"}
868 i += 1
869 continue
870 elif opt in include_arg_opts: # example: {"-o", "-x"}
871 results.append(opt)
872 if i < n:
873 if opt == "-o" and "=" in opts[i] and opts[i].split("=", 1)[0] in preserve_properties:
874 die(f"--preserve-properties: Disallowed ZFS property found in --zfs-recv-program-opt(s): -o {opts[i]}")
875 if opt == "-x":
876 x_names.discard(opts[i])
877 results.append(opts[i])
878 i += 1
879 elif opt not in exclude_long_opts: # example: {"--dryrun", "--verbose"}
880 if opt.startswith("-") and opt != "-" and not opt.startswith("--"):
881 for char in exclude_short_opts: # example: "den"
882 opt = opt.replace(char, "")
883 if opt == "-":
884 continue
885 results.append(opt)
886 return results, sorted(x_names)
# Matches the leading decimal PID component embedded in an ssh master domain socket file name.
_SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX: Final[re.Pattern[str]] = re.compile(r"^[0-9]+")  # see local_ssh_command()
def _delete_stale_files(
    root_dir: str,
    prefix: str,
    millis: int = 60 * 60 * 1000,
    dirs: bool = False,
    exclude: str | None = None,
    ssh: bool = False,
) -> None:
    """Cleans up obsolete files; For example caused by abnormal termination, OS crash."""
    max_age_secs: float = millis / 1000
    current_time: float = time.time()
    validate_is_not_a_symlink("", root_dir)
    with os.scandir(root_dir) as entries:
        for dir_entry in entries:
            name: str = dir_entry.name
            if name == exclude or not name.startswith(prefix):
                continue
            try:
                file_stats = dir_entry.stat(follow_symlinks=False)
                entry_is_dir: bool = dir_entry.is_dir(follow_symlinks=False)
                if entry_is_dir != dirs or current_time - file_stats.st_mtime < max_age_secs:
                    continue  # wrong kind of entry, or not yet stale
                if dirs:
                    shutil.rmtree(dir_entry.path, ignore_errors=True)
                elif not (ssh and stat.S_ISSOCK(file_stats.st_mode)):
                    os.remove(dir_entry.path)
                else:  # ssh master domain socket: only remove it once its owning process is gone
                    pid_match = _SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX.match(name[len(prefix):])
                    if pid_match:
                        pid: int = int(pid_match.group(0))
                        if pid_exists(pid) is False or current_time - file_stats.st_mtime >= 31 * 24 * 60 * 60:
                            os.remove(dir_entry.path)  # bzfs process is no longer alive; its ssh master process isn't either
            except FileNotFoundError:
                pass  # harmless
924def _create_symlink(src: str, dst_dir: str, dst: str) -> None:
925 """Creates dst symlink pointing to src using a relative path."""
926 rel_path: str = os.path.relpath(src, start=dst_dir)
927 os.symlink(src=rel_path, dst=os.path.join(dst_dir, dst))