Coverage for bzfs_main/configuration.py: 99%

587 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Configuration subsystem; All CLI option/parameter values are reachable from the "Params" class.""" 

16 

17from __future__ import ( 

18 annotations, 

19) 

20import argparse 

21import ast 

22import os 

23import platform 

24import random 

25import re 

26import shutil 

27import stat 

28import sys 

29import tempfile 

30import threading 

31import time 

32from collections.abc import ( 

33 Iterable, 

34) 

35from dataclasses import ( 

36 dataclass, 

37) 

38from datetime import ( 

39 datetime, 

40 tzinfo, 

41) 

42from logging import ( 

43 Logger, 

44) 

45from typing import ( 

46 TYPE_CHECKING, 

47 Final, 

48 Literal, 

49 NamedTuple, 

50 cast, 

51) 

52 

53import bzfs_main.utils 

54from bzfs_main.argparse_actions import ( 

55 SnapshotFilter, 

56 optimize_snapshot_filters, 

57) 

58from bzfs_main.argparse_cli import ( 

59 LOG_DIR_DEFAULT, 

60 ZFS_RECV_GROUPS, 

61 ZFS_RECV_O, 

62 ZFS_RECV_O_INCLUDE_REGEX_DEFAULT, 

63 __version__, 

64) 

65from bzfs_main.detect import ( 

66 DISABLE_PRG, 

67) 

68from bzfs_main.period_anchors import ( 

69 PeriodAnchors, 

70) 

71from bzfs_main.retry import ( 

72 RetryPolicy, 

73) 

74from bzfs_main.utils import ( 

75 DIR_PERMISSIONS, 

76 FILE_PERMISSIONS, 

77 PROG_NAME, 

78 SHELL_CHARS, 

79 SNAPSHOT_FILTERS_VAR, 

80 UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, 

81 UNIX_TIME_INFINITY_SECS, 

82 RegexList, 

83 SnapshotPeriods, 

84 SynchronizedBool, 

85 append_if_absent, 

86 compile_regexes, 

87 current_datetime, 

88 die, 

89 get_home_directory, 

90 get_timezone, 

91 getenv_bool, 

92 getenv_int, 

93 is_included, 

94 ninfix, 

95 nprefix, 

96 nsuffix, 

97 parse_duration_to_milliseconds, 

98 pid_exists, 

99 sha256_hex, 

100 sha256_urlsafe_base64, 

101 urlsafe_base64, 

102 validate_dataset_name, 

103 validate_file_permissions, 

104 validate_is_not_a_symlink, 

105 validate_property_name, 

106 xappend, 

107) 

108 

109if TYPE_CHECKING: # pragma: no cover - for type hints only 

110 from bzfs_main.connection import ( 

111 ConnectionPools, 

112 ) 

113 

# constants:
# Cached once at import time; used as the stable base for log and ssh socket directories.
HOME_DIRECTORY: Final[str] = get_home_directory()
# Guards the one-shot unsetting of environment variables in Params._unset_matching_env_vars().
_UNSET_ENV_VARS_LOCK: Final[threading.Lock] = threading.Lock()
# True until the first thread has performed the env-var unsetting; subsequent threads skip the work.
_UNSET_ENV_VARS_LATCH: Final[SynchronizedBool] = SynchronizedBool(True)

118 

119 

#############################################################################
class LogParams:
    """Option values for logging.

    Side effects: creates the log directory tree, the log file itself, a cache dir, and a set of
    'current' convenience symlinks underneath the log parent directory.
    """

    def __init__(self, args: argparse.Namespace) -> None:
        """Reads from ArgumentParser via args."""
        # immutable variables:
        # Map --quiet/--verbose CLI flags to a log level name; --quiet wins over --verbose.
        if args.quiet:
            log_level: str = "ERROR"
        elif args.verbose >= 2:
            log_level = "TRACE"
        elif args.verbose >= 1:
            log_level = "DEBUG"
        else:
            log_level = "INFO"
        self.log_level: Final[str] = log_level
        self.timestamp: Final[str] = datetime.now().isoformat(sep="_", timespec="seconds")  # 2024-09-03_12:26:15
        self.home_dir: Final[str] = HOME_DIRECTORY
        log_parent_dir: Final[str] = args.log_dir if args.log_dir else os.path.join(self.home_dir, LOG_DIR_DEFAULT)
        # Safety check: refuse log dirs whose basename doesn't look like a bzfs log dir.
        if LOG_DIR_DEFAULT not in os.path.basename(log_parent_dir):
            die(f"Basename of --log-dir must contain the substring '{LOG_DIR_DEFAULT}', but got: {log_parent_dir}")
        # Choose the separator char at which to cut the ISO timestamp to form the subdir name:
        # 'daily' cuts at the first '_' (date only); otherwise cut at a ':' within the time part.
        sep: str = "_" if args.log_subdir == "daily" else ":"
        timestamp: str = self.timestamp
        subdir: str = timestamp[0 : timestamp.rindex(sep) if args.log_subdir == "minutely" else timestamp.index(sep)]
        # 2024-09-03 (d), 2024-09-03_12 (h), 2024-09-03_12:26 (m)
        self.log_dir: Final[str] = os.path.join(log_parent_dir, subdir)
        # Create and validate both the parent dir and the per-period subdir (must not be symlinks).
        os.makedirs(log_parent_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir ", log_parent_dir)
        validate_file_permissions(log_parent_dir, DIR_PERMISSIONS)
        os.makedirs(self.log_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir subdir ", self.log_dir)
        validate_file_permissions(self.log_dir, DIR_PERMISSIONS)
        self.log_file_prefix: Final[str] = args.log_file_prefix
        self.log_file_infix: Final[str] = args.log_file_infix
        self.log_file_suffix: Final[str] = args.log_file_suffix
        # mkstemp guarantees a unique log file name even when multiple jobs start in the same second.
        fd, self.log_file = tempfile.mkstemp(
            suffix=".log",
            prefix=f"{self.log_file_prefix}{self.timestamp}{self.log_file_infix}{self.log_file_suffix}-",
            dir=self.log_dir,
        )
        os.fchmod(fd, FILE_PERMISSIONS)
        os.close(fd)
        # The pv progress log lives next to the main log, with a .pv extension instead of .log.
        self.pv_log_file: str = self.log_file[0 : -len(".log")] + ".pv"
        log_file_stem: str = os.path.basename(self.log_file)[0 : -len(".log")]
        # Python's standard logger naming API interprets chars such as '.', '-', ':', spaces, etc in special ways, e.g.
        # logging.getLogger("foo.bar") vs logging.getLogger("foo-bar"). Thus, we sanitize the Python logger name via a regex:
        self.logger_name_suffix: Final[str] = re.sub(r"[^A-Za-z0-9_]", repl="_", string=log_file_stem)
        cache_root_dir: str = os.path.join(log_parent_dir, ".cache")
        os.makedirs(cache_root_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_file_permissions(cache_root_dir, DIR_PERMISSIONS)
        self.last_modified_cache_dir: Final[str] = os.path.join(cache_root_dir, "mods")

        # Create/update "current" symlink to current_dir, which is a subdir containing further symlinks to log files.
        # For parallel usage, ensures there is no time window when the symlinks are inconsistent or do not exist.
        current: str = "current"
        dot_current_dir: str = os.path.join(log_parent_dir, f".{current}")
        current_dir: str = os.path.join(dot_current_dir, log_file_stem)
        os.makedirs(dot_current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir: .current ", dot_current_dir)
        try:
            os.makedirs(current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            _create_symlink(self.log_file, current_dir, f"{current}.log")
            _create_symlink(self.pv_log_file, current_dir, f"{current}.pv")
            _create_symlink(self.log_dir, current_dir, f"{current}.dir")
            dst_file: str = os.path.join(current_dir, current)
            # Build the new symlink inside current_dir, then atomically swap it into place so readers
            # always see either the old or the new fully-formed link, never a missing one.
            os.symlink(os.path.relpath(current_dir, start=log_parent_dir), dst_file)
            os.replace(dst_file, os.path.join(log_parent_dir, current))  # atomic rename
            # Garbage-collect "current" dirs of earlier runs, keeping only this run's dir.
            _delete_stale_files(dot_current_dir, prefix="", millis=5000, dirs=True, exclude=os.path.basename(current_dir))
        except FileNotFoundError:
            pass  # harmless concurrent cleanup
        # Back-reference filled in later by the caller once the Params object exists.
        self.params: Params | None = None

    def __repr__(self) -> str:
        return str(self.__dict__)

194 

195 

#############################################################################
class Params:
    """All parsed CLI options combined into a single bundle; simplifies passing around numerous settings and defaults.

    Constructed once per job from the argparse result; most fields are effectively immutable after
    construction (marked Final), while a handful are mutated later during job validation/execution.
    """

    def __init__(
        self,
        args: argparse.Namespace,
        sys_argv: list[str],
        log_params: LogParams,
        log: Logger,
        inject_params: dict[str, bool] | None = None,
    ) -> None:
        """Reads from ArgumentParser via args.

        ``inject_params`` is a test-only hook used by _program_name() to simulate unavailable or
        failing external programs.
        """
        # immutable variables:
        assert args is not None
        assert isinstance(sys_argv, list)
        assert log_params is not None
        assert log is not None
        self.args: Final[argparse.Namespace] = args
        self.sys_argv: Final[list[str]] = sys_argv
        self.log_params: Final[LogParams] = log_params
        self.log: Final[Logger] = log
        self.inject_params: Final[dict[str, bool]] = inject_params if inject_params is not None else {}  # for testing only
        self.one_or_more_whitespace_regex: Final[re.Pattern[str]] = re.compile(r"\s+")
        self.two_or_more_spaces_regex: Final[re.Pattern[str]] = re.compile(r" +")
        # Must run before anything below reads the environment (e.g. getenv_bool/getenv_int calls).
        self._unset_matching_env_vars(args)
        self.xperiods: Final[SnapshotPeriods] = SnapshotPeriods()

        assert len(args.root_dataset_pairs) > 0
        self.root_dataset_pairs: Final[list[tuple[str, str]]] = args.root_dataset_pairs
        self.recursive: Final[bool] = args.recursive
        self.recursive_flag: Final[str] = "-r" if args.recursive else ""

        # args.dryrun is None (off), "send", or another non-None mode; precompute the CLI flag forms.
        self.dry_run: Final[bool] = args.dryrun is not None
        self.dry_run_recv: Final[str] = "-n" if self.dry_run else ""
        self.dry_run_destroy: Final[str] = self.dry_run_recv
        self.dry_run_no_send: Final[bool] = args.dryrun == "send"
        self.verbose_zfs: Final[bool] = args.verbose >= 2
        self.verbose_destroy: Final[str] = "" if args.quiet else "-v"
        self.quiet: Final[bool] = args.quiet

        # Sanitize user-provided zfs send/recv options (strip dry-run flags, capture -o/-x args, etc).
        self.zfs_send_program_opts: list[str] = self._fix_send_opts(self.split_args(args.zfs_send_program_opts))
        zfs_recv_program_opts: list[str] = self.split_args(args.zfs_recv_program_opts)
        for extra_opt in args.zfs_recv_program_opt:
            zfs_recv_program_opts.append(self.validate_arg_str(extra_opt, allow_all=True))
        preserve_properties: list[str] = [
            validate_property_name(name, "--preserve-properties") for name in args.preserve_properties
        ]
        zfs_recv_program_opts, zfs_recv_x_names = self._fix_recv_opts(zfs_recv_program_opts, frozenset(preserve_properties))
        self.zfs_recv_program_opts: Final[list[str]] = zfs_recv_program_opts
        self.zfs_recv_x_names: list[str] = zfs_recv_x_names
        if self.verbose_zfs:
            append_if_absent(self.zfs_send_program_opts, "-v")
            append_if_absent(self.zfs_recv_program_opts, "-v")
        # zfs_full_recv_opts: dataset-specific dynamic -o/-x property options are computed later per dataset in
        # replication._add_recv_property_options():
        self.zfs_full_recv_opts: Final[list[str]] = self.zfs_recv_program_opts.copy()
        # ZFS_RECV_GROUPS is an ordered mapping; unpacking relies on its three-group order (o, x, set).
        cpconfigs: list[CopyPropertiesConfig] = [
            CopyPropertiesConfig(group, flag, args, self) for group, flag in ZFS_RECV_GROUPS.items()
        ]
        self.zfs_recv_o_config, self.zfs_recv_x_config, self.zfs_set_config = cpconfigs

        self.force_rollback_to_latest_snapshot: Final[bool] = args.force_rollback_to_latest_snapshot
        self.force_rollback_to_latest_common_snapshot = SynchronizedBool(args.force_rollback_to_latest_common_snapshot)
        self.force: Final[SynchronizedBool] = SynchronizedBool(args.force)
        self.force_once: Final[bool] = args.force_once
        self.force_unmount: Final[str] = "-f" if args.force_unmount else ""
        force_hard: str = "-R" if args.force_destroy_dependents else ""
        self.force_hard: Final[str] = "-R" if args.force_hard else force_hard  # --force-hard is deprecated

        self.skip_parent: bool = args.skip_parent
        self.skip_missing_snapshots: Final[str] = args.skip_missing_snapshots
        self.skip_on_error: Final[str] = args.skip_on_error
        self.retry_policy: Final[RetryPolicy] = RetryPolicy(args)
        self.skip_replication: Final[bool] = args.skip_replication
        # args.delete_dst_snapshots is None (off), "bookmarks", or another non-None mode.
        self.delete_dst_snapshots: Final[bool] = args.delete_dst_snapshots is not None
        self.delete_dst_bookmarks: Final[bool] = args.delete_dst_snapshots == "bookmarks"
        self.delete_dst_snapshots_no_crosscheck: Final[bool] = args.delete_dst_snapshots_no_crosscheck
        self.delete_dst_snapshots_except: Final[bool] = args.delete_dst_snapshots_except
        self.delete_dst_datasets: Final[bool] = args.delete_dst_datasets
        self.delete_empty_dst_datasets: Final[bool] = args.delete_empty_dst_datasets is not None
        self.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: bool = (
            args.delete_empty_dst_datasets == "snapshots+bookmarks"
        )
        self.compare_snapshot_lists: str = args.compare_snapshot_lists
        self.daemon_lifetime_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds(args.daemon_lifetime)
        self.daemon_frequency: str = args.daemon_frequency
        self.enable_privilege_elevation: bool = not args.no_privilege_elevation
        self.no_stream: Final[bool] = args.no_stream
        self.resume_recv: Final[bool] = not args.no_resume_recv
        self.create_bookmarks: Final[str] = (
            "none" if args.no_create_bookmark else args.create_bookmarks
        )  # no_create_bookmark depr
        self.use_bookmark: Final[bool] = not args.no_use_bookmark

        self.src: Remote = Remote("src", args, self)  # src dataset, host and ssh options
        self.dst: Remote = Remote("dst", args, self)  # dst dataset, host and ssh options
        self.create_src_snapshots_config: Final[CreateSrcSnapshotConfig] = CreateSrcSnapshotConfig(args, self)
        self.monitor_snapshots_config: Final[MonitorSnapshotsConfig] = MonitorSnapshotsConfig(args, self)
        self.is_caching_snapshots: Final[bool] = args.cache_snapshots == "true"

        # External programs: each name is validated and may be substituted by the test-only inject hooks.
        # Options for the helper programs are screened against flags that could redirect I/O or
        # otherwise escape the intended pipeline (hence "disallowed for security reasons").
        self.compression_program: Final[str] = self._program_name(args.compression_program)
        self.compression_program_opts: Final[list[str]] = self.split_args(args.compression_program_opts)
        for opt in {"-o", "--output-file"}.intersection(self.compression_program_opts):
            die(f"--compression-program-opts: {opt} is disallowed for security reasons.")
        self.getconf_program: Final[str] = self._program_name("getconf")  # print number of CPUs on POSIX
        self.mbuffer_program: Final[str] = self._program_name(args.mbuffer_program)
        self.mbuffer_program_opts: Final[list[str]] = self.split_args(args.mbuffer_program_opts)
        for opt in {"-i", "-I", "-o", "-O", "-l", "-L", "-t", "-T", "-a", "-A"}.intersection(self.mbuffer_program_opts):
            die(f"--mbuffer-program-opts: {opt} is disallowed for security reasons.")
        self.ps_program: Final[str] = self._program_name(args.ps_program)
        self.pv_program: Final[str] = self._program_name(args.pv_program)
        self.pv_program_opts: Final[list[str]] = self.split_args(args.pv_program_opts)
        bad_pv_opts = {"-o", "--output", "-f", "--log-file", "-S", "--stop-at-size", "-Y", "--sync", "-X", "--discard",
                       "-U", "--store-and-forward", "-d", "--watchfd", "-R", "--remote", "-P", "--pidfile"}  # fmt: skip
        for opt in bad_pv_opts.intersection(self.pv_program_opts):
            die(f"--pv-program-opts: {opt} is disallowed for security reasons.")
        self.isatty: Final[bool] = getenv_bool("isatty", True)
        if args.bwlimit:
            self.pv_program_opts.extend([f"--rate-limit={self.validate_arg_str(args.bwlimit)}"])
        self.shell_program_local: Final[str] = "sh"
        self.shell_program: str = self._program_name(args.shell_program)
        self.ssh_program: Final[str] = self._program_name(args.ssh_program)
        self.sudo_program: str = self._program_name(args.sudo_program)
        self.uname_program: Final[str] = self._program_name("uname")
        self.zfs_program: Final[str] = self._program_name("zfs")
        self.zpool_program: Final[str] = self._program_name(args.zpool_program)

        # no point creating complex shell pipeline commands for tiny data transfers:
        self.min_pipe_transfer_size: Final[int] = getenv_int("min_pipe_transfer_size", 1024 * 1024)
        self.max_datasets_per_batch_on_list_snaps: Final[int] = getenv_int("max_datasets_per_batch_on_list_snaps", 1024)
        self.max_datasets_per_minibatch_on_list_snaps: int = getenv_int("max_datasets_per_minibatch_on_list_snaps", -1)
        self.max_snapshots_per_minibatch_on_delete_snaps = getenv_int("max_snapshots_per_minibatch_on_delete_snaps", 2**29)
        self.dedicated_tcp_connection_per_zfs_send: Final[bool] = getenv_bool("dedicated_tcp_connection_per_zfs_send", True)
        # threads: with --force-once we intentionally coerce to a single-threaded run to ensure deterministic serial behavior
        self.threads: Final[tuple[int, bool]] = (1, False) if self.force_once else args.threads
        timeout_nanos: int | None = None if args.timeout is None else 1_000_000 * parse_duration_to_milliseconds(args.timeout)
        self.timeout_nanos: int | None = timeout_nanos
        self.no_estimate_send_size: Final[bool] = args.no_estimate_send_size
        self.remote_conf_cache_ttl_nanos: int = 1_000_000 * parse_duration_to_milliseconds(args.daemon_remote_conf_cache_ttl)
        # 0 disables progress-bar width computation when not attached to a terminal, pv is disabled, or --quiet.
        self.terminal_columns: Final[int] = (
            getenv_int("terminal_columns", shutil.get_terminal_size(fallback=(120, 24)).columns)
            if self.isatty and self.pv_program != DISABLE_PRG and not self.quiet
            else 0
        )

        self.os_cpu_count: Final[int | None] = os.cpu_count()
        self.os_getuid: Final[int] = os.getuid()
        self.os_geteuid: Final[int] = os.geteuid()
        self.prog_version: Final[str] = __version__
        self.python_version: Final[str] = sys.version
        self.platform_version: Final[str] = platform.version()
        self.platform_platform: Final[str] = platform.platform()

        # mutable variables:
        snapshot_filters = args.snapshot_filters_var if hasattr(args, SNAPSHOT_FILTERS_VAR) else [[]]
        self.snapshot_filters: list[list[SnapshotFilter]] = [optimize_snapshot_filters(f) for f in snapshot_filters]
        self.exclude_dataset_property: str | None = args.exclude_dataset_property
        self.exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.abs_exclude_datasets: list[str] = []  # deferred to validate_task() phase
        self.abs_include_datasets: list[str] = []  # deferred to validate_task() phase

        self.curr_zfs_send_program_opts: list[str] = []
        self.zfs_recv_ox_names: set[str] = set()
        self.available_programs: dict[str, dict[str, str]] = {}
        self.zpool_features: dict[str, dict[str, dict[str, str]]] = {r.location: {} for r in [self.src, self.dst]}
        self.connection_pools: dict[str, ConnectionPools] = {}

    def split_args(self, text: str, *items: str | Iterable[str], allow_all: bool = False) -> list[str]:
        """Splits option string on runs of one or more whitespace into an option list.

        Extra ``items`` are appended to the result; unless ``allow_all``, all resulting options are
        screened for shell metacharacters via _validate_quoting().
        """
        text = text.strip()
        opts = self.one_or_more_whitespace_regex.split(text) if text else []
        xappend(opts, items)
        if not allow_all:
            self._validate_quoting(opts)
        return opts

    def validate_arg(self, opt: str, allow_spaces: bool = False, allow_all: bool = False) -> str | None:
        """allow_all permits all characters, including whitespace and quotes; See squote() and dquote().

        Returns ``opt`` unchanged (possibly None); dies on whitespace (unless allowed) or quoting chars.
        """
        if allow_all or opt is None:
            return opt
        if any(char.isspace() and (char != " " or not allow_spaces) for char in opt):
            die(f"Option must not contain a whitespace character{' other than space' if allow_spaces else ''}: {opt}")
        self._validate_quoting([opt])
        return opt

    def validate_arg_str(self, opt: str, allow_spaces: bool = False, allow_all: bool = False) -> str:
        """Returns validated option string, raising if missing or illegal."""
        if opt is None:
            die("Option must not be missing")
        self.validate_arg(opt, allow_spaces=allow_spaces, allow_all=allow_all)
        return opt

    @staticmethod
    def _validate_quoting(opts: list[str]) -> None:
        """Raises an error if any option contains a quote or shell metacharacter."""
        for opt in opts:
            if "'" in opt or '"' in opt or "$" in opt or "`" in opt:
                die(f"Option must not contain a single quote or double quote or dollar or backtick character: {opt}")

    @staticmethod
    def _fix_recv_opts(opts: list[str], preserve_properties: frozenset[str]) -> tuple[list[str], list[str]]:
        """Returns sanitized ``zfs recv`` options and captured ``-o/-x`` args."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="n",
            include_arg_opts={"-o", "-x"},
            preserve_properties=preserve_properties,
        )

    @staticmethod
    def _fix_send_opts(opts: list[str]) -> list[str]:
        """Returns sanitized ``zfs send`` options (dry-run and incremental flags stripped)."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="den",
            include_arg_opts={"-X", "--exclude", "--redact"},
            exclude_arg_opts=frozenset({"-i", "-I"}),
        )[0]

    def _program_name(self, program: str) -> str:
        """For testing: helps simulate errors caused by external programs.

        Validates the program name (no shell metacharacters, no ':'), then optionally substitutes
        a bogus or failing program name per the inject_params test hooks.
        """
        self.validate_arg_str(program)
        if not program:
            die(f"Program name must not be missing: {program}")
        for char in SHELL_CHARS + ":":
            if char in program:
                die(f"Program name must not contain a '{char}' character: {program}")
        if self.inject_params.get("inject_unavailable_" + program, False):
            return program + "-xxx"  # substitute a program that cannot be found on the PATH
        if self.inject_params.get("inject_failing_" + program, False):
            return "false"  # substitute a program that will error out with non-zero return code
        return program

    def _unset_matching_env_vars(self, args: argparse.Namespace) -> None:
        """Unset environment variables matching regex filters."""
        if len(args.exclude_envvar_regex) == 0 and len(args.include_envvar_regex) == 0:
            return  # fast path
        exclude_envvar_regexes: RegexList = compile_regexes(args.exclude_envvar_regex)
        include_envvar_regexes: RegexList = compile_regexes(args.include_envvar_regex)
        # Mutate global state at most once, atomically. First thread wins. The latch isn't strictly necessary for
        # correctness as all concurrent bzfs.Job instances in bzfs_jobrunner have identical include/exclude_envvar_regex
        # anyway. It's just for reduced latency.
        with _UNSET_ENV_VARS_LOCK:
            if _UNSET_ENV_VARS_LATCH.get_and_set(False):
                for envvar_name in list(os.environ):
                    # order of include vs exclude is intentionally reversed to correctly implement semantics:
                    # "unset env var iff excluded and not included (include takes precedence)."
                    if is_included(envvar_name, exclude_envvar_regexes, include_envvar_regexes):
                        os.environ.pop(envvar_name, None)
                        self.log.debug("Unsetting b/c envvar regex: %s", envvar_name)

    def lock_file_name(self) -> str:
        """Returns unique path used to detect concurrently running jobs.

        Makes it such that a job that runs periodically declines to start if the same previous periodic job is still running
        without completion yet. Hashed key avoids overly long filenames while remaining deterministic.
        """
        # fmt: off
        key = (tuple(self.root_dataset_pairs), self.args.recursive, self.args.exclude_dataset_property,
               tuple(self.args.include_dataset), tuple(self.args.exclude_dataset),
               tuple(self.args.include_dataset_regex), tuple(self.args.exclude_dataset_regex),
               tuple(tuple(f) for f in self.snapshot_filters), self.args.skip_replication, self.args.create_src_snapshots,
               self.args.create_src_snapshots_plan, self.args.create_src_snapshots_timeformat,
               self.create_src_snapshots_config.anchors,
               self.args.delete_dst_datasets, self.args.delete_dst_snapshots, self.args.delete_dst_snapshots_except,
               self.args.delete_empty_dst_datasets,
               self.args.compare_snapshot_lists, self.args.monitor_snapshots,
               self.src.basis_ssh_host, self.dst.basis_ssh_host,
               self.src.basis_ssh_user, self.dst.basis_ssh_user)
        # fmt: on
        hash_code: str = sha256_hex(str(key))
        log_parent_dir: str = os.path.dirname(self.log_params.log_dir)
        locks_dir: str = os.path.join(log_parent_dir, ".locks")
        os.makedirs(locks_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--locks-dir ", locks_dir)
        validate_file_permissions(locks_dir, DIR_PERMISSIONS)
        return os.path.join(locks_dir, f"{PROG_NAME}-lockfile-{hash_code}.lock")

    def dry(self, msg: str) -> str:
        """Prefix ``msg`` with 'Dry' when running in dry-run mode."""
        return bzfs_main.utils.dry(msg, self.dry_run)

    def is_program_available(self, program: str, location: str) -> bool:
        """Return True if ``program`` was detected on ``location`` host."""
        return program in self.available_programs.get(location, {})

483 

484 

#############################################################################
class Remote:
    """Connection settings for either source or destination host.

    Side effects: when ssh connection reuse is enabled, creates the ~/.ssh/bzfs socket directory tree
    and garbage-collects stale socket files from prior runs.
    """

    def __init__(self, loc: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args; ``loc`` selects the src or dst option namespace."""
        # immutable variables:
        assert loc == "src" or loc == "dst"
        self.location: Final[str] = loc
        self.params: Final[Params] = p
        # "basis" values are the raw CLI values; the effective ssh_user/ssh_host are resolved later.
        self.basis_ssh_user: str = getattr(args, f"ssh_{loc}_user")
        self.basis_ssh_host: str = getattr(args, f"ssh_{loc}_host")
        self.ssh_port: int | None = getattr(args, f"ssh_{loc}_port")
        self.ssh_config_file: Final[str | None] = p.validate_arg(getattr(args, f"ssh_{loc}_config_file"))
        if self.ssh_config_file:
            # Safety check mirroring the --log-dir basename check: refuse arbitrary config file names.
            if "bzfs_ssh_config" not in os.path.basename(self.ssh_config_file):
                die(f"Basename of --ssh-{loc}-config-file must contain substring 'bzfs_ssh_config': {self.ssh_config_file}")
        # Stable hash of the absolute config file path; used as a component of cache_namespace().
        self.ssh_config_file_hash: str = (
            sha256_urlsafe_base64(os.path.abspath(self.ssh_config_file), padding=False) if self.ssh_config_file else ""
        )
        self.ssh_cipher: Final[str] = p.validate_arg_str(args.ssh_cipher)
        # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation:
        self.ssh_extra_opts: list[str] = ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"] + (
            ["-v"] if args.verbose >= 3 else []
        )
        self.max_concurrent_ssh_sessions_per_tcp_connection: Final[int] = args.max_concurrent_ssh_sessions_per_tcp_connection
        self.ssh_exit_on_shutdown: Final[bool] = args.ssh_exit_on_shutdown
        self.ssh_control_persist_secs: Final[int] = args.ssh_control_persist_secs
        self.socket_prefix: Final[str] = "s"
        self.reuse_ssh_connection: bool = getenv_bool("reuse_ssh_connection", True)
        if self.reuse_ssh_connection:
            ssh_home_dir: str = os.path.join(HOME_DIRECTORY, ".ssh")
            os.makedirs(ssh_home_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            self.ssh_socket_dir: Final[str] = os.path.join(ssh_home_dir, "bzfs")
            os.makedirs(self.ssh_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(self.ssh_socket_dir, mode=DIR_PERMISSIONS)
            self.ssh_exit_on_shutdown_socket_dir: Final[str] = os.path.join(self.ssh_socket_dir, "x")
            os.makedirs(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS)
            # Remove leftover socket files of prior runs whose processes are gone.
            _delete_stale_files(self.ssh_exit_on_shutdown_socket_dir, self.socket_prefix, ssh=True)
        self.sanitize1_regex: Final[re.Pattern[str]] = re.compile(r"[\s\\/@$]")  # replace whitespace, /, $, \, @ with ~ char
        self.sanitize2_regex: Final[re.Pattern[str]] = re.compile(rf"[^a-zA-Z0-9{re.escape('~.:_-')}]")  # remove bad chars

        # mutable variables:
        self.root_dataset: str = ""  # deferred until run_main()
        self.basis_root_dataset: str = ""  # deferred until run_main()
        self.pool: str = ""
        self.sudo: str = ""
        self.use_zfs_delegation: bool = False
        self.ssh_user: str = ""
        self.ssh_host: str = ""
        self.ssh_user_host: str = ""
        self.is_nonlocal: bool = False

    def local_ssh_command(self, socket_file: str | None) -> list[str]:
        """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing)
        command to run on the remote host, which will be appended later.

        Returns an empty list when the dataset is local (no ssh needed). When connection reuse is on
        and no ``socket_file`` is given, generates a fresh unique control socket path.
        """
        if not self.ssh_user_host:
            return []  # dataset is on local host - don't use ssh

        # dataset is on remote host
        p: Params = self.params
        if p.ssh_program == DISABLE_PRG:
            die("Cannot talk to remote host because ssh CLI is disabled.")
        ssh_cmd: list[str] = [p.ssh_program] + self.ssh_extra_opts
        if self.ssh_config_file:
            ssh_cmd += ["-F", self.ssh_config_file]
        if self.ssh_cipher:
            ssh_cmd += ["-c", self.ssh_cipher]
        if self.ssh_port:
            ssh_cmd += ["-p", str(self.ssh_port)]
        if self.reuse_ssh_connection:
            # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and
            # 'ssh -S -M -oControlPersist=60s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
            if not socket_file:
                # Generate unique private Unix domain socket file name in user's home dir and pass it to 'ssh -S /path/to/socket'
                def sanitize(name: str) -> str:
                    name = self.sanitize1_regex.sub("~", name)  # replace whitespace, /, $, \, @ with a ~ tilde char
                    name = self.sanitize2_regex.sub("", name)  # Remove disallowed chars
                    return name

                max_rand: int = 999_999_999_999
                rand_str: str = urlsafe_base64(random.SystemRandom().randint(0, max_rand), max_value=max_rand, padding=False)
                curr_time: str = urlsafe_base64(time.time_ns(), max_value=2**64 - 1, padding=False)
                unique: str = f"{os.getpid()}@{curr_time}@{rand_str}"
                optional: str = f"@{sanitize(self.ssh_host)[:45]}@{sanitize(self.ssh_user)}"
                socket_name: str = f"{self.socket_prefix}{unique}{optional}"
                socket_file = os.path.join(self.ssh_exit_on_shutdown_socket_dir, socket_name)
                # Trim the human-readable "optional" suffix when the path is long.
                # NOTE(review): max() keeps at least UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH chars rather than
                # capping at it — presumably intentional per the comment below, but confirm min/max intent.
                socket_file = socket_file[0 : max(UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, len(socket_file) - len(optional))]
                # `ssh` will error out later if the max OS Unix domain socket path limit cannot be met reasonably as the
                # home directory path is too long, typically because the Unix user name is unreasonably long.
            ssh_cmd += ["-S", socket_file]
        ssh_cmd += [self.ssh_user_host]
        return ssh_cmd

    def cache_key(self) -> tuple[str, str, int | None, str | None]:
        """Returns tuple uniquely identifying this Remote for caching."""
        return self.location, self.ssh_user_host, self.ssh_port, self.ssh_config_file

    def cache_namespace(self) -> str:
        """Returns cache namespace string which is a stable, unique directory component for snapshot caches that
        distinguishes endpoints by username+host+port+ssh_config_file where applicable, and uses '-' when no user/host is
        present (local mode)."""
        if not self.ssh_user_host:
            return "-"  # local mode
        return f"{self.ssh_user_host}#{self.ssh_port or ''}#{self.ssh_config_file_hash}"

    def __repr__(self) -> str:
        return str(self.__dict__)

594 

595 

#############################################################################
class CopyPropertiesConfig:
    """--zfs-recv-o* and --zfs-recv-x* option groups for copying or excluding ZFS properties on receive."""

    def __init__(self, group: str, flag: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args; ``group`` names the CLI option group to read."""
        assert group in ZFS_RECV_GROUPS
        # immutable variables:
        self.group: Final[str] = group  # one of zfs_recv_o, zfs_recv_x
        self.flag: Final[str] = flag  # one of -o or -x

        def opt(suffix: str) -> object:
            # Fetches the per-group CLI attribute, e.g. args.zfs_recv_o_sources
            return getattr(args, f"{group}_{suffix}")

        raw_sources: str = p.validate_arg_str(opt("sources"))
        # canonicalize: trim each comma-separated entry and sort for a stable representation
        self.sources: Final[str] = ",".join(sorted(entry.strip() for entry in raw_sources.strip().split(",")))
        self.targets: Final[str] = p.validate_arg_str(opt("targets"))
        include_regexes: list[str] | None = opt("include_regex")
        assert ZFS_RECV_O in ZFS_RECV_GROUPS
        if include_regexes is None:
            # default: the -o group copies properties matching the default regex; other groups copy none
            include_regexes = [ZFS_RECV_O_INCLUDE_REGEX_DEFAULT] if group == ZFS_RECV_O else []
        self.include_regexes: Final[RegexList] = compile_regexes(include_regexes)
        self.exclude_regexes: Final[RegexList] = compile_regexes(opt("exclude_regex"))

    def __repr__(self) -> str:
        return str(self.__dict__)

619 

620 

621############################################################################# 

622class SnapshotLabel(NamedTuple): 

623 """Contains the individual parts that are concatenated into a ZFS snapshot name.""" 

624 

625 prefix: str # bzfs_ 

626 infix: str # us-west-1_ 

627 timestamp: str # 2024-11-06_08:30:05 

628 suffix: str # _hourly 

629 

630 def __str__(self) -> str: # bzfs_us-west-1_2024-11-06_08:30:05_hourly 

631 return f"{self.prefix}{self.infix}{self.timestamp}{self.suffix}" 

632 

633 def notimestamp_str(self) -> str: # bzfs_us-west-1_hourly 

634 """Returns the concatenation of all parts except for the timestamp part.""" 

635 return f"{self.prefix}{self.infix}{self.suffix}" 

636 

637 def validate_label(self, input_text: str) -> None: 

638 """Validates that the composed snapshot label forms a legal name.""" 

639 name: str = str(self) 

640 validate_dataset_name(name, input_text) 

641 if "/" in name: 

642 die(f"Invalid ZFS snapshot name: '{name}' for: '{input_text}'") 

643 for key, value in {"prefix": self.prefix, "infix": self.infix, "suffix": self.suffix}.items(): 

644 if key == "prefix": 

645 if not value.endswith("_"): 

646 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'") 

647 if value.count("_") > 1: 

648 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'") 

649 elif key == "infix": 

650 if value: 

651 if not value.endswith("_"): 

652 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'") 

653 if value.count("_") > 1: 

654 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'") 

655 elif value: 

656 if not value.startswith("_"): 

657 die(f"Invalid {input_text}{key}: Must start with an underscore character: '{value}'") 

658 if value.count("_") > 1: 

659 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'") 

660 

661 

#############################################################################
class CreateSrcSnapshotConfig:
    """Option values for --create-src-snapshots, that is, for automatically creating source snapshots."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Option values for --create-src-snapshots*; reads from ArgumentParser via args.

        Parses the --create-src-snapshots-plan dict literal (org -> target -> period -> amount) into the sorted
        period schedule `suffix_durations` and the snapshot name patterns `_snapshot_labels`.
        """
        # immutable variables:
        self.skip_create_src_snapshots: Final[bool] = not args.create_src_snapshots
        self.create_src_snapshots_even_if_not_due: Final[bool] = args.create_src_snapshots_even_if_not_due
        # empty CLI string is normalized to None -- presumably meaning "local timezone"; confirm get_timezone()
        tz_spec: str | None = args.create_src_snapshots_timezone if args.create_src_snapshots_timezone else None
        self.tz: tzinfo | None = get_timezone(tz_spec)
        self.current_datetime: datetime = current_datetime(tz_spec)
        self.timeformat: Final[str] = args.create_src_snapshots_timeformat
        self.anchors: Final[PeriodAnchors] = PeriodAnchors.parse(args)

        # Compute the schedule for upcoming periodic time events (suffix_durations). This event schedule is also used in
        # daemon mode via sleep_until_next_daemon_iteration()
        suffixes: list[str] = []
        labels: list[SnapshotLabel] = []
        # default plan creates a single ad-hoc snapshot per run when the user supplies no plan
        create_src_snapshots_plan: str = args.create_src_snapshots_plan or str({"bzfs": {"onsite": {"adhoc": 1}}})
        for org, target_periods in ast.literal_eval(create_src_snapshots_plan).items():
            for target, periods in target_periods.items():
                for period_unit, period_amount in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    if not isinstance(period_amount, int) or period_amount < 0:
                        die(f"--create-src-snapshots-plan: Period amount must be a non-negative integer: {period_amount}")
                    if period_amount > 0:  # a zero amount disables that period entirely
                        suffix: str = nsuffix(period_unit)
                        suffixes.append(suffix)
                        labels.append(SnapshotLabel(prefix=nprefix(org), infix=ninfix(target), timestamp="", suffix=suffix))
        xperiods: SnapshotPeriods = p.xperiods
        if self.skip_create_src_snapshots:
            # not creating snapshots: the event schedule is driven solely by --daemon-frequency, with no labels
            duration_amount, duration_unit = p.xperiods.suffix_to_duration0(p.daemon_frequency)
            if duration_amount <= 0 or not duration_unit:
                die(f"Invalid --daemon-frequency: {p.daemon_frequency}")
            suffixes = [nsuffix(p.daemon_frequency)]
            labels = []
        suffix_durations: dict[str, tuple[int, str]] = {suffix: xperiods.suffix_to_duration1(suffix) for suffix in suffixes}

        def suffix_key(suffix: str) -> tuple[int, str]:
            # Sort key: period length in milliseconds, then lexicographic suffix; also validates that sub-daily
            # periods divide a day and that monthly periods divide a year, so snapshot times stay aligned.
            duration_amount, duration_unit = suffix_durations[suffix]
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            if suffix.endswith(("hourly", "minutely", "secondly")):
                if duration_milliseconds != 0 and 86400 * 1000 % duration_milliseconds != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 86400 seconds "
                        f"without remainder so that snapshots will be created at the same time of day every day: {suffix}"
                    )
            if suffix.endswith("monthly"):
                if duration_amount != 0 and 12 % duration_amount != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 12 months "
                        f"without remainder so that snapshots will be created at the same time every year: {suffix}"
                    )
            return duration_milliseconds, suffix

        suffixes = sorted(suffixes, key=suffix_key, reverse=True)  # take snapshots for dailies before hourlies, and so on
        self.suffix_durations: Final[dict[str, tuple[int, str]]] = {sfx: suffix_durations[sfx] for sfx in suffixes}  # sort
        suffix_indexes: dict[str, int] = {suffix: k for k, suffix in enumerate(suffixes)}
        labels.sort(key=lambda label: (suffix_indexes[label.suffix], label))  # take snapshots for dailies before hourlies
        self._snapshot_labels: Final[list[SnapshotLabel]] = labels
        for label in self.snapshot_labels():
            label.validate_label("--create-src-snapshots-plan ")

    def snapshot_labels(self) -> list[SnapshotLabel]:
        """Returns the snapshot name patterns for which snapshots shall be created."""
        timeformat: str = self.timeformat
        is_millis: bool = timeformat.endswith("%F")  # non-standard hack to append milliseconds
        if is_millis:
            timeformat = timeformat[0:-1] + "f"  # replace %F with %f (append microseconds)
        timestamp: str = self.current_datetime.strftime(timeformat)
        if is_millis:
            timestamp = timestamp[0 : -len("000")]  # replace microseconds with milliseconds
        timestamp = timestamp.replace("+", "z")  # zfs CLI does not accept the '+' character in snapshot names
        return [SnapshotLabel(label.prefix, label.infix, timestamp, label.suffix) for label in self._snapshot_labels]

    def __repr__(self) -> str:
        """Returns a debug representation of all attributes."""
        return str(self.__dict__)

739 

740 

#############################################################################
@dataclass(frozen=True)
class AlertConfig:
    """Thresholds controlling when alerts fire for snapshot age."""

    kind: Literal["Latest", "Oldest"]  # whether thresholds apply to the most recent or the most ancient matching snapshot
    warning_millis: int  # snapshot age in millis at/above which a warning fires; UNIX_TIME_INFINITY_SECS if unspecified
    critical_millis: int  # snapshot age in millis at/above which a critical alert fires; ditto

749 

750 

#############################################################################
@dataclass(frozen=True)
class MonitorSnapshotAlert:
    """Alert configuration for a single monitored snapshot label."""

    label: SnapshotLabel  # the snapshot naming pattern being monitored (timestamp part is empty)
    latest: AlertConfig | None  # thresholds for the most recent matching snapshot; None if latest-checks are disabled
    oldest: AlertConfig | None  # thresholds for the oldest matching snapshot; None if oldest-checks are disabled

759 

760 

#############################################################################
class MonitorSnapshotsConfig:
    """Option values for --monitor-snapshots*, that is, policy describing which snapshots to monitor for staleness."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args.

        Parses the --monitor-snapshots plan (a nested dict literal of org -> target -> period -> alert dicts) into a
        flat list of MonitorSnapshotAlert objects, sorted by descending period length.
        """
        # immutable variables:
        self.monitor_snapshots: Final[dict] = ast.literal_eval(args.monitor_snapshots)
        self.dont_warn: Final[bool] = args.monitor_snapshots_dont_warn
        self.dont_crit: Final[bool] = args.monitor_snapshots_dont_crit
        self.no_latest_check: Final[bool] = args.monitor_snapshots_no_latest_check
        self.no_oldest_check: Final[bool] = args.monitor_snapshots_no_oldest_check
        alerts: list[MonitorSnapshotAlert] = []
        xperiods: SnapshotPeriods = p.xperiods
        for org, target_periods in self.monitor_snapshots.items():
            prefix: str = nprefix(org)
            for target, periods in target_periods.items():
                for period_unit, alert_dicts in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    label = SnapshotLabel(prefix=prefix, infix=ninfix(target), timestamp="", suffix=nsuffix(period_unit))
                    alert_latest, alert_oldest = None, None
                    for alert_type, alert_dict in alert_dicts.items():
                        m = "--monitor-snapshots: "
                        if alert_type not in ["latest", "oldest"]:
                            die(f"{m}'{alert_type}' must be 'latest' or 'oldest' within {args.monitor_snapshots}")
                        warning_millis: int = 0
                        critical_millis: int = 0
                        cycles: int = 1
                        for kind, value in alert_dict.items():
                            context: str = args.monitor_snapshots
                            if kind == "warning":
                                warning_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "critical":
                                critical_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "cycles":
                                cycles = max(0, int(value))
                            else:
                                die(f"{m}'{kind}' must be 'warning', 'critical' or 'cycles' within {context}")
                        if warning_millis > 0 or critical_millis > 0:
                            duration_amount, duration_unit = xperiods.suffix_to_duration1(label.suffix)
                            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
                            # grant `cycles` full periods of grace on top of each user-specified threshold
                            warning_millis += 0 if warning_millis <= 0 else cycles * duration_milliseconds
                            critical_millis += 0 if critical_millis <= 0 else cycles * duration_milliseconds
                            # a threshold that remains zero was not specified; push it out so it effectively never fires
                            # NOTE(review): constant name says "SECS" but is assigned to millis variables -- confirm units
                            warning_millis = UNIX_TIME_INFINITY_SECS if warning_millis <= 0 else warning_millis
                            critical_millis = UNIX_TIME_INFINITY_SECS if critical_millis <= 0 else critical_millis
                            capitalized_alert_type = cast(Literal["Latest", "Oldest"], sys.intern(alert_type.capitalize()))
                            alert_config = AlertConfig(capitalized_alert_type, warning_millis, critical_millis)
                            if alert_type == "latest":
                                if not self.no_latest_check:
                                    alert_latest = alert_config
                            else:
                                assert alert_type == "oldest"
                                if not self.no_oldest_check:
                                    alert_oldest = alert_config
                    if alert_latest is not None or alert_oldest is not None:
                        alerts.append(MonitorSnapshotAlert(label, alert_latest, alert_oldest))

        def alert_sort_key(alert: MonitorSnapshotAlert) -> tuple[int, SnapshotLabel]:
            # Sort key: period length in milliseconds, then the label itself as tiebreaker.
            duration_amount, duration_unit = xperiods.suffix_to_duration1(alert.label.suffix)
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            return duration_milliseconds, alert.label

        alerts.sort(key=alert_sort_key, reverse=True)  # check snapshots for dailies before hourlies, and so on
        self.alerts: Final[list[MonitorSnapshotAlert]] = alerts
        self.enable_monitor_snapshots: Final[bool] = len(alerts) > 0

    def __repr__(self) -> str:
        """Returns a debug representation of all attributes."""
        return str(self.__dict__)

828 

829 

830############################################################################# 

831def _fix_send_recv_opts( 

832 opts: list[str], 

833 exclude_long_opts: set[str], 

834 exclude_short_opts: str, 

835 include_arg_opts: set[str], 

836 exclude_arg_opts: frozenset[str] = frozenset(), 

837 preserve_properties: frozenset[str] = frozenset(), 

838) -> tuple[list[str], list[str]]: 

839 """These opts are instead managed via bzfs CLI args --dryrun, etc.""" 

840 assert "-" not in exclude_short_opts 

841 results: list[str] = [] 

842 x_names: set[str] = set(preserve_properties) 

843 i = 0 

844 n = len(opts) 

845 while i < n: 

846 opt: str = opts[i] 

847 i += 1 

848 if opt in exclude_arg_opts: # example: {"-X", "--exclude"} 

849 i += 1 

850 continue 

851 elif opt in include_arg_opts: # example: {"-o", "-x"} 

852 results.append(opt) 

853 if i < n: 

854 if opt == "-o" and "=" in opts[i] and opts[i].split("=", 1)[0] in preserve_properties: 

855 die(f"--preserve-properties: Disallowed ZFS property found in --zfs-recv-program-opt(s): -o {opts[i]}") 

856 if opt == "-x": 

857 x_names.discard(opts[i]) 

858 results.append(opts[i]) 

859 i += 1 

860 elif opt not in exclude_long_opts: # example: {"--dryrun", "--verbose"} 

861 if opt.startswith("-") and opt != "-" and not opt.startswith("--"): 

862 for char in exclude_short_opts: # example: "den" 

863 opt = opt.replace(char, "") 

864 if opt == "-": 

865 continue 

866 results.append(opt) 

867 return results, sorted(x_names) 

868 

869 

# Matches the leading decimal digits of a socket file name (after its prefix has been removed); _delete_stale_files()
# interprets those digits as the PID of the bzfs process owning the ssh master socket; see local_ssh_command()
SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX: Final[re.Pattern[str]] = re.compile(r"^[0-9]+")  # see local_ssh_command()

871 

872 

def _delete_stale_files(
    root_dir: str,
    prefix: str,
    millis: int = 60 * 60 * 1000,
    dirs: bool = False,
    exclude: str | None = None,
    ssh: bool = False,
) -> None:
    """Cleans up obsolete files; For example caused by abnormal termination, OS crash.

    Params:
        root_dir: directory to scan; only its direct children are considered.
        prefix: only entries whose name starts with this prefix are eligible for deletion.
        millis: minimum age, based on mtime, before an entry counts as stale; defaults to one hour.
        dirs: if True, delete stale directories (recursively); if False, delete stale non-directories.
        exclude: an entry name that is always skipped, regardless of age.
        ssh: if True, Unix domain sockets are only deleted when the bzfs PID embedded in their name is dead,
            or when the socket is older than 31 days.
    """
    seconds: float = millis / 1000
    now: float = time.time()
    validate_is_not_a_symlink("", root_dir)  # presumably rejects a symlinked root_dir -- see utils helper
    with os.scandir(root_dir) as iterator:
        for entry in iterator:
            if entry.name == exclude or not entry.name.startswith(prefix):
                continue
            try:
                stats = entry.stat(follow_symlinks=False)
                is_dir = entry.is_dir(follow_symlinks=False)
                if ((dirs and is_dir) or (not dirs and not is_dir)) and now - stats.st_mtime >= seconds:
                    if dirs:
                        shutil.rmtree(entry.path, ignore_errors=True)
                    elif not (ssh and stat.S_ISSOCK(stats.st_mode)):
                        os.remove(entry.path)
                    elif match := SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX.match(entry.name[len(prefix) :]):
                        # ssh master socket names carry the owning bzfs PID right after the prefix
                        pid: int = int(match.group(0))
                        if pid_exists(pid) is False or now - stats.st_mtime >= 31 * 24 * 60 * 60:
                            os.remove(entry.path)  # bzfs process is no longer alive; its ssh master process isn't either
            except FileNotFoundError:
                pass  # harmless; entry vanished between scandir() and stat()/remove()

903 

904 

905def _create_symlink(src: str, dst_dir: str, dst: str) -> None: 

906 """Creates dst symlink pointing to src using a relative path.""" 

907 rel_path: str = os.path.relpath(src, start=dst_dir) 

908 os.symlink(src=rel_path, dst=os.path.join(dst_dir, dst))