Coverage for bzfs_main / configuration.py: 99%

600 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-22 08:03 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Configuration subsystem; All CLI option/parameter values are reachable from the "Params" class.""" 

16 

17from __future__ import ( 

18 annotations, 

19) 

20import argparse 

21import ast 

22import os 

23import platform 

24import random 

25import re 

26import shutil 

27import stat 

28import sys 

29import tempfile 

30import threading 

31import time 

32from collections.abc import ( 

33 Iterable, 

34) 

35from dataclasses import ( 

36 dataclass, 

37) 

38from datetime import ( 

39 datetime, 

40 tzinfo, 

41) 

42from logging import ( 

43 Logger, 

44) 

45from typing import ( 

46 Final, 

47 Literal, 

48 NamedTuple, 

49 cast, 

50 final, 

51) 

52 

53from bzfs_main.argparse_actions import ( 

54 SnapshotFilter, 

55 optimize_snapshot_filters, 

56) 

57from bzfs_main.argparse_cli import ( 

58 LOG_DIR_DEFAULT, 

59 ZFS_RECV_GROUPS, 

60 ZFS_RECV_O, 

61 ZFS_RECV_O_INCLUDE_REGEX_DEFAULT, 

62 __version__, 

63) 

64from bzfs_main.detect import ( 

65 DISABLE_PRG, 

66) 

67from bzfs_main.filter import ( 

68 SNAPSHOT_FILTERS_VAR, 

69) 

70from bzfs_main.period_anchors import ( 

71 PeriodAnchors, 

72) 

73from bzfs_main.util import ( 

74 utils, 

75) 

76from bzfs_main.util.connection import ( 

77 ConnectionPools, 

78 MiniParams, 

79 MiniRemote, 

80) 

81from bzfs_main.util.retry import ( 

82 RetryPolicy, 

83) 

84from bzfs_main.util.utils import ( 

85 DIR_PERMISSIONS, 

86 FILE_PERMISSIONS, 

87 PROG_NAME, 

88 SHELL_CHARS, 

89 UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, 

90 UNIX_TIME_INFINITY_SECS, 

91 RegexList, 

92 SnapshotPeriods, 

93 SynchronizedBool, 

94 append_if_absent, 

95 compile_regexes, 

96 current_datetime, 

97 die, 

98 get_home_directory, 

99 get_timezone, 

100 getenv_bool, 

101 getenv_int, 

102 is_included, 

103 ninfix, 

104 nprefix, 

105 nsuffix, 

106 parse_duration_to_milliseconds, 

107 pid_exists, 

108 sha256_hex, 

109 sha256_urlsafe_base64, 

110 urlsafe_base64, 

111 validate_dataset_name, 

112 validate_file_permissions, 

113 validate_is_not_a_symlink, 

114 validate_property_name, 

115 xappend, 

116) 

117 

# constants:
# Process-wide guard for the one-time unsetting of environment variables (see Params._unset_matching_env_vars):
# the lock serializes concurrent callers, and the latch ensures os.environ is mutated at most once per process.
_UNSET_ENV_VARS_LOCK: Final[threading.Lock] = threading.Lock()
_UNSET_ENV_VARS_LATCH: Final[SynchronizedBool] = SynchronizedBool(True)

121 

122 

123############################################################################# 

124@final 

125class LogParams: 

126 """Option values for logging.""" 

127 

128 def __init__(self, args: argparse.Namespace) -> None: 

129 """Reads from ArgumentParser via args.""" 

130 # immutable variables: 

131 if args.quiet: 

132 log_level: str = "ERROR" 

133 elif args.verbose >= 2: 

134 log_level = "TRACE" 

135 elif args.verbose >= 1: 

136 log_level = "DEBUG" 

137 else: 

138 log_level = "INFO" 

139 self.log_level: Final[str] = log_level 

140 self.timestamp: Final[str] = datetime.now().isoformat(sep="_", timespec="seconds") # 2024-09-03_12:26:15 

141 self.isatty: Final[bool] = getenv_bool("isatty", True) 

142 self.quiet: Final[bool] = args.quiet 

143 self.terminal_columns: Final[int] = ( 

144 getenv_int("terminal_columns", shutil.get_terminal_size(fallback=(120, 24)).columns) 

145 if self.isatty and args.pv_program != DISABLE_PRG and not self.quiet 

146 else 0 

147 ) 

148 self.home_dir: Final[str] = get_home_directory() 

149 log_parent_dir: Final[str] = args.log_dir if args.log_dir else os.path.join(self.home_dir, LOG_DIR_DEFAULT) 

150 if LOG_DIR_DEFAULT not in os.path.basename(log_parent_dir): 

151 die(f"Basename of --log-dir must contain the substring '{LOG_DIR_DEFAULT}', but got: {log_parent_dir}") 

152 sep: str = "_" if args.log_subdir == "daily" else ":" 

153 timestamp: str = self.timestamp 

154 subdir: str = timestamp[0 : timestamp.rindex(sep) if args.log_subdir == "minutely" else timestamp.index(sep)] 

155 # 2024-09-03 (d), 2024-09-03_12 (h), 2024-09-03_12:26 (m) 

156 self.log_dir: Final[str] = os.path.join(log_parent_dir, subdir) 

157 os.makedirs(log_parent_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

158 validate_is_not_a_symlink("--log-dir ", log_parent_dir) 

159 validate_file_permissions(log_parent_dir, DIR_PERMISSIONS) 

160 os.makedirs(self.log_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

161 validate_is_not_a_symlink("--log-dir subdir ", self.log_dir) 

162 validate_file_permissions(self.log_dir, DIR_PERMISSIONS) 

163 self.log_file_prefix: Final[str] = args.log_file_prefix 

164 self.log_file_infix: Final[str] = args.log_file_infix 

165 self.log_file_suffix: Final[str] = args.log_file_suffix 

166 fd, self.log_file = tempfile.mkstemp( 

167 suffix=".log", 

168 prefix=f"{self.log_file_prefix}{self.timestamp}{self.log_file_infix}{self.log_file_suffix}-", 

169 dir=self.log_dir, 

170 ) 

171 os.fchmod(fd, FILE_PERMISSIONS) 

172 os.close(fd) 

173 self.pv_log_file: Final[str] = self.log_file[0 : -len(".log")] + ".pv" 

174 log_file_stem: str = os.path.basename(self.log_file)[0 : -len(".log")] 

175 # Python's standard logger naming API interprets chars such as '.', '-', ':', spaces, etc in special ways, e.g. 

176 # logging.getLogger("foo.bar") vs logging.getLogger("foo-bar"). Thus, we sanitize the Python logger name via a regex: 

177 self.logger_name_suffix: Final[str] = re.sub(r"[^A-Za-z0-9_]", repl="_", string=log_file_stem) 

178 cache_root_dir: str = os.path.join(log_parent_dir, ".cache") 

179 os.makedirs(cache_root_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

180 validate_file_permissions(cache_root_dir, DIR_PERMISSIONS) 

181 self.last_modified_cache_dir: Final[str] = os.path.join(cache_root_dir, "mods") 

182 

183 # Create/update "current" symlink to current_dir, which is a subdir containing further symlinks to log files. 

184 # For parallel usage, ensures there is no time window when the symlinks are inconsistent or do not exist. 

185 current: str = "current" 

186 dot_current_dir: str = os.path.join(log_parent_dir, f".{current}") 

187 current_dir: str = os.path.join(dot_current_dir, log_file_stem) 

188 os.makedirs(dot_current_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

189 validate_is_not_a_symlink("--log-dir: .current ", dot_current_dir) 

190 try: 

191 os.makedirs(current_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

192 _create_symlink(self.log_file, current_dir, f"{current}.log") 

193 _create_symlink(self.pv_log_file, current_dir, f"{current}.pv") 

194 _create_symlink(self.log_dir, current_dir, f"{current}.dir") 

195 dst_file: str = os.path.join(current_dir, current) 

196 os.symlink(os.path.relpath(current_dir, start=log_parent_dir), dst_file) 

197 os.replace(dst_file, os.path.join(log_parent_dir, current)) # atomic rename 

198 _delete_stale_files(dot_current_dir, prefix="", millis=5000, dirs=True, exclude=os.path.basename(current_dir)) 

199 except FileNotFoundError: 

200 pass # harmless concurrent cleanup 

201 

202 def __repr__(self) -> str: 

203 return str(self.__dict__) 

204 

205 

206############################################################################# 

207@final 

208class Params(MiniParams): 

209 """All parsed CLI options combined into a single bundle; simplifies passing around numerous settings and defaults.""" 

210 

211 def __init__( 

212 self, 

213 args: argparse.Namespace, 

214 sys_argv: list[str], 

215 log_params: LogParams, 

216 log: Logger, 

217 inject_params: dict[str, bool] | None = None, 

218 ) -> None: 

219 """Reads from ArgumentParser via args.""" 

220 # immutable variables: 

221 assert args is not None 

222 assert isinstance(sys_argv, list) 

223 assert log_params is not None 

224 assert log is not None 

225 self.args: Final[argparse.Namespace] = args 

226 self.sys_argv: Final[list[str]] = sys_argv 

227 self.log_params: Final[LogParams] = log_params 

228 self.log: Logger = log 

229 self.inject_params: Final[dict[str, bool]] = inject_params if inject_params is not None else {} # for testing only 

230 self.one_or_more_whitespace_regex: Final[re.Pattern[str]] = re.compile(r"\s+") 

231 self.two_or_more_spaces_regex: Final[re.Pattern[str]] = re.compile(r" +") 

232 self._unset_matching_env_vars(args) 

233 self.xperiods: Final[SnapshotPeriods] = SnapshotPeriods() 

234 

235 assert len(args.root_dataset_pairs) > 0 

236 self.root_dataset_pairs: Final[list[tuple[str, str]]] = args.root_dataset_pairs 

237 self.recursive: Final[bool] = args.recursive 

238 self.recursive_flag: Final[str] = "-r" if args.recursive else "" 

239 

240 self.dry_run: Final[bool] = args.dryrun is not None 

241 self.dry_run_recv: Final[str] = "-n" if self.dry_run else "" 

242 self.dry_run_destroy: Final[str] = self.dry_run_recv 

243 self.dry_run_no_send: Final[bool] = args.dryrun == "send" 

244 self.verbose_zfs: Final[bool] = args.verbose >= 2 

245 self.verbose_destroy: Final[str] = "" if args.quiet else "-v" 

246 

247 self.zfs_send_program_opts: Final[list[str]] = self._fix_send_opts(self.split_args(args.zfs_send_program_opts)) 

248 zfs_recv_program_opts: list[str] = self.split_args(args.zfs_recv_program_opts) 

249 for extra_opt in args.zfs_recv_program_opt: 

250 zfs_recv_program_opts.append(self.validate_arg_str(extra_opt, allow_all=True)) 

251 preserve_properties = [validate_property_name(name, "--preserve-properties") for name in args.preserve_properties] 

252 zfs_recv_program_opts, zfs_recv_x_names = self._fix_recv_opts(zfs_recv_program_opts, frozenset(preserve_properties)) 

253 self.zfs_recv_program_opts: Final[list[str]] = zfs_recv_program_opts 

254 self.zfs_recv_x_names: Final[list[str]] = zfs_recv_x_names 

255 if self.verbose_zfs: 

256 append_if_absent(self.zfs_send_program_opts, "-v") 

257 append_if_absent(self.zfs_recv_program_opts, "-v") 

258 # zfs_full_recv_opts: dataset-specific dynamic -o/-x property options are computed later per dataset in 

259 # replication._add_recv_property_options(): 

260 self.zfs_full_recv_opts: Final[list[str]] = self.zfs_recv_program_opts.copy() 

261 cpconfigs = [CopyPropertiesConfig(group, flag, args, self) for group, flag in ZFS_RECV_GROUPS.items()] 

262 self.zfs_recv_o_config, self.zfs_recv_x_config, self.zfs_set_config = cpconfigs 

263 

264 self.force_rollback_to_latest_snapshot: Final[bool] = args.force_rollback_to_latest_snapshot 

265 self.force_rollback_to_latest_common_snapshot: Final[SynchronizedBool] = SynchronizedBool( 

266 args.force_rollback_to_latest_common_snapshot 

267 ) 

268 self.force: Final[SynchronizedBool] = SynchronizedBool(args.force) 

269 self.force_once: Final[bool] = args.force_once 

270 self.force_unmount: Final[str] = "-f" if args.force_unmount else "" 

271 self.force_hard: Final[str] = "-R" if args.force_destroy_dependents else "" 

272 

273 self.skip_parent: Final[bool] = args.skip_parent 

274 self.skip_missing_snapshots: Final[str] = args.skip_missing_snapshots 

275 self.skip_on_error: Final[str] = args.skip_on_error 

276 self.retry_policy: Final[RetryPolicy] = RetryPolicy.from_namespace(args) 

277 self.skip_replication: Final[bool] = args.skip_replication 

278 self.delete_dst_snapshots: Final[bool] = args.delete_dst_snapshots is not None 

279 self.delete_dst_bookmarks: Final[bool] = args.delete_dst_snapshots == "bookmarks" 

280 self.delete_dst_snapshots_no_crosscheck: Final[bool] = args.delete_dst_snapshots_no_crosscheck 

281 self.delete_dst_snapshots_except: Final[bool] = args.delete_dst_snapshots_except 

282 self.delete_dst_datasets: Final[bool] = args.delete_dst_datasets 

283 self.delete_empty_dst_datasets: Final[bool] = args.delete_empty_dst_datasets is not None 

284 self.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: Final[bool] = ( 

285 args.delete_empty_dst_datasets == "snapshots+bookmarks" 

286 ) 

287 self.compare_snapshot_lists: Final[str] = args.compare_snapshot_lists 

288 self.daemon_lifetime_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds(args.daemon_lifetime) 

289 self.daemon_frequency: Final[str] = args.daemon_frequency 

290 self.enable_privilege_elevation: Final[bool] = not args.no_privilege_elevation 

291 self.no_stream: Final[bool] = args.no_stream 

292 self.resume_recv: Final[bool] = not args.no_resume_recv 

293 self.create_bookmarks: Final[str] = args.create_bookmarks 

294 self.use_bookmark: Final[bool] = not args.no_use_bookmark 

295 

296 self.src: Final[Remote] = Remote("src", args, self) # src dataset, host and ssh options 

297 self.dst: Final[Remote] = Remote("dst", args, self) # dst dataset, host and ssh options 

298 self.create_src_snapshots_config: Final[CreateSrcSnapshotConfig] = CreateSrcSnapshotConfig(args, self) 

299 self.monitor_snapshots_config: Final[MonitorSnapshotsConfig] = MonitorSnapshotsConfig(args, self) 

300 self.is_caching_snapshots: Final[bool] = args.cache_snapshots 

301 

302 self.compression_program: Final[str] = self._program_name(args.compression_program) 

303 self.compression_program_opts: Final[list[str]] = self.split_args(args.compression_program_opts) 

304 for opt in {"-o", "--output-file"}.intersection(self.compression_program_opts): 

305 die(f"--compression-program-opts: {opt} is disallowed for security reasons.") 

306 self.getconf_program: Final[str] = self._program_name("getconf") # print number of CPUs on POSIX 

307 self.mbuffer_program: Final[str] = self._program_name(args.mbuffer_program) 

308 self.mbuffer_program_opts: Final[list[str]] = self.split_args(args.mbuffer_program_opts) 

309 for opt in {"-i", "-I", "-o", "-O", "-l", "-L", "-t", "-T", "-a", "-A"}.intersection(self.mbuffer_program_opts): 

310 die(f"--mbuffer-program-opts: {opt} is disallowed for security reasons.") 

311 self.ps_program: Final[str] = self._program_name(args.ps_program) 

312 self.pv_program: Final[str] = self._program_name(args.pv_program) 

313 self.pv_program_opts: Final[list[str]] = self.split_args(args.pv_program_opts) 

314 bad_pv_opts = {"-o", "--output", "-f", "--log-file", "-S", "--stop-at-size", "-Y", "--sync", "-X", "--discard", 

315 "-U", "--store-and-forward", "-d", "--watchfd", "-R", "--remote", "-P", "--pidfile"} # fmt: skip 

316 for opt in bad_pv_opts.intersection(self.pv_program_opts): 

317 die(f"--pv-program-opts: {opt} is disallowed for security reasons.") 

318 if args.bwlimit: 

319 self.pv_program_opts.extend([f"--rate-limit={self.validate_arg_str(args.bwlimit)}"]) 

320 self.shell_program_local: Final[str] = "sh" 

321 self.shell_program: Final[str] = self._program_name(args.shell_program) 

322 self.ssh_program: str = self._program_name(args.ssh_program) 

323 self.sudo_program: Final[str] = self._program_name(args.sudo_program) 

324 self.uname_program: Final[str] = self._program_name("uname") 

325 self.zfs_program: Final[str] = self._program_name("zfs") 

326 self.zpool_program: Final[str] = self._program_name(args.zpool_program) 

327 

328 # no point creating complex shell pipeline commands for tiny data transfers: 

329 self.min_pipe_transfer_size: Final[int] = getenv_int("min_pipe_transfer_size", 1024 * 1024) 

330 self.max_datasets_per_batch_on_list_snaps: Final[int] = getenv_int("max_datasets_per_batch_on_list_snaps", 1024) 

331 self.max_datasets_per_minibatch_on_list_snaps: int = getenv_int("max_datasets_per_minibatch_on_list_snaps", -1) 

332 self.max_snapshots_per_minibatch_on_delete_snaps = getenv_int("max_snapshots_per_minibatch_on_delete_snaps", 2**29) 

333 self.dedicated_tcp_connection_per_zfs_send: Final[bool] = getenv_bool("dedicated_tcp_connection_per_zfs_send", True) 

334 # threads: with --force-once we intentionally coerce to a single-threaded run to ensure deterministic serial behavior 

335 self.threads: Final[tuple[int, bool]] = (1, False) if self.force_once else args.threads 

336 timeout_duration_nanos = None if args.timeout is None else 1_000_000 * parse_duration_to_milliseconds(args.timeout) 

337 self.timeout_duration_nanos: int | None = timeout_duration_nanos # duration (not a timestamp); for logging only 

338 self.no_estimate_send_size: Final[bool] = args.no_estimate_send_size 

339 self.remote_conf_cache_ttl_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds( 

340 args.daemon_remote_conf_cache_ttl 

341 ) 

342 

343 self.os_cpu_count: Final[int | None] = os.cpu_count() 

344 self.os_getuid: Final[int] = os.getuid() 

345 self.os_geteuid: Final[int] = os.geteuid() 

346 self.prog_version: Final[str] = __version__ 

347 self.python_version: Final[str] = sys.version 

348 self.platform_version: Final[str] = platform.version() 

349 self.platform_platform: Final[str] = platform.platform() 

350 

351 # mutable variables: 

352 snapshot_filters = args.snapshot_filters_var if hasattr(args, SNAPSHOT_FILTERS_VAR) else [[]] 

353 self.snapshot_filters: list[list[SnapshotFilter]] = [optimize_snapshot_filters(f) for f in snapshot_filters] 

354 self.exclude_dataset_property: str | None = args.exclude_dataset_property 

355 self.exclude_dataset_regexes: RegexList = [] # deferred to validate_task() phase 

356 self.include_dataset_regexes: RegexList = [] # deferred to validate_task() phase 

357 self.tmp_exclude_dataset_regexes: RegexList = [] # deferred to validate_task() phase 

358 self.tmp_include_dataset_regexes: RegexList = [] # deferred to validate_task() phase 

359 self.abs_exclude_datasets: list[str] = [] # deferred to validate_task() phase 

360 self.abs_include_datasets: list[str] = [] # deferred to validate_task() phase 

361 

362 self.curr_zfs_send_program_opts: list[str] = [] 

363 self.zfs_recv_ox_names: set[str] = set() 

364 self.available_programs: dict[str, dict[str, str]] = {} 

365 self.zpool_features: dict[str, dict[str, dict[str, str]]] = {r.location: {} for r in [self.src, self.dst]} 

366 self.connection_pools: dict[str, ConnectionPools] = {} 

367 

368 def split_args(self, text: str, *items: str | Iterable[str], allow_all: bool = False) -> list[str]: 

369 """Splits option string on runs of one or more whitespace into an option list.""" 

370 text = text.strip() 

371 opts = self.one_or_more_whitespace_regex.split(text) if text else [] 

372 xappend(opts, items) 

373 if not allow_all: 

374 self._validate_quoting(opts) 

375 return opts 

376 

377 def validate_arg(self, opt: str, allow_spaces: bool = False, allow_all: bool = False) -> str | None: 

378 """allow_all permits all characters, including whitespace and quotes; See squote() and dquote().""" 

379 if allow_all or opt is None: 

380 return opt 

381 if any(char.isspace() and (char != " " or not allow_spaces) for char in opt): 

382 die(f"Option must not contain a whitespace character{' other than space' if allow_spaces else ''}: {opt}") 

383 self._validate_quoting([opt]) 

384 return opt 

385 

386 def validate_arg_str(self, opt: str, allow_spaces: bool = False, allow_all: bool = False) -> str: 

387 """Returns validated option string, raising if missing or illegal.""" 

388 if opt is None: 

389 die("Option must not be missing") 

390 self.validate_arg(opt, allow_spaces=allow_spaces, allow_all=allow_all) 

391 return opt 

392 

393 @staticmethod 

394 def _validate_quoting(opts: list[str]) -> None: 

395 """Raises an error if any option contains a quote or shell metacharacter.""" 

396 for opt in opts: 

397 if "'" in opt or '"' in opt or "$" in opt or "`" in opt: 

398 die(f"Option must not contain a single quote or double quote or dollar or backtick character: {opt}") 

399 

400 @staticmethod 

401 def _fix_recv_opts(opts: list[str], preserve_properties: frozenset[str]) -> tuple[list[str], list[str]]: 

402 """Returns sanitized ``zfs recv`` options and captured ``-o/-x`` args.""" 

403 return _fix_send_recv_opts( 

404 opts, 

405 exclude_long_opts={"--dryrun"}, 

406 exclude_short_opts="n", 

407 include_arg_opts={"-o", "-x"}, 

408 preserve_properties=preserve_properties, 

409 ) 

410 

411 @staticmethod 

412 def _fix_send_opts(opts: list[str]) -> list[str]: 

413 """Returns sanitized ``zfs send`` options.""" 

414 return _fix_send_recv_opts( 

415 opts, 

416 exclude_long_opts={"--dryrun"}, 

417 exclude_short_opts="den", 

418 include_arg_opts={"-X", "--exclude", "--redact"}, 

419 exclude_arg_opts=frozenset({"-i", "-I", "-t", "--resume"}), 

420 )[0] 

421 

422 def _program_name(self, program: str) -> str: 

423 """For testing: helps simulate errors caused by external programs.""" 

424 self.validate_arg_str(program) 

425 if not program: 

426 die(f"Program name must not be missing: {program}") 

427 for char in SHELL_CHARS + ":": 

428 if char in program: 

429 die(f"Program name must not contain a '{char}' character: {program}") 

430 if self.inject_params.get("inject_unavailable_" + program, False): 

431 return program + "-xxx" # substitute a program that cannot be found on the PATH 

432 if self.inject_params.get("inject_failing_" + program, False): 

433 return "false" # substitute a program that will error out with non-zero return code 

434 return program 

435 

436 def _unset_matching_env_vars(self, args: argparse.Namespace) -> None: 

437 """Unset environment variables matching regex filters.""" 

438 if len(args.exclude_envvar_regex) == 0 and len(args.include_envvar_regex) == 0: 

439 return # fast path 

440 exclude_envvar_regexes: RegexList = compile_regexes(args.exclude_envvar_regex) 

441 include_envvar_regexes: RegexList = compile_regexes(args.include_envvar_regex) 

442 # Mutate global state at most once, atomically. First thread wins. The latch isn't strictly necessary for 

443 # correctness as all concurrent bzfs.Job instances in bzfs_jobrunner have identical include/exclude_envvar_regex 

444 # anyway. It's just for reduced latency. 

445 with _UNSET_ENV_VARS_LOCK: 

446 if _UNSET_ENV_VARS_LATCH.get_and_set(False): 

447 for envvar_name in list(os.environ): 

448 # order of include vs exclude is intentionally reversed to correctly implement semantics: 

449 # "unset env var iff excluded and not included (include takes precedence)." 

450 if is_included(envvar_name, exclude_envvar_regexes, include_envvar_regexes): 

451 os.environ.pop(envvar_name, None) 

452 self.log.debug("Unsetting b/c envvar regex: %s", envvar_name) 

453 

454 def lock_file_name(self) -> str: 

455 """Returns unique path used to detect concurrently running jobs. 

456 

457 Makes it such that a job that runs periodically declines to start if the same previous periodic job is still running 

458 without completion yet. Hashed key avoids overly long filenames while remaining deterministic. 

459 """ 

460 # fmt: off 

461 key = (tuple(self.root_dataset_pairs), self.args.recursive, self.args.exclude_dataset_property, 

462 tuple(self.args.include_dataset), tuple(self.args.exclude_dataset), 

463 tuple(self.args.include_dataset_regex), tuple(self.args.exclude_dataset_regex), 

464 tuple(tuple(f) for f in self.snapshot_filters), self.args.skip_replication, self.args.create_src_snapshots, 

465 self.args.create_src_snapshots_plan, self.args.create_src_snapshots_timeformat, 

466 self.create_src_snapshots_config.anchors, 

467 self.args.delete_dst_datasets, self.args.delete_dst_snapshots, self.args.delete_dst_snapshots_except, 

468 self.args.delete_empty_dst_datasets, 

469 self.args.compare_snapshot_lists, self.args.monitor_snapshots, 

470 self.src.basis_ssh_host, self.dst.basis_ssh_host, 

471 self.src.basis_ssh_user, self.dst.basis_ssh_user) 

472 # fmt: on 

473 hash_code: str = sha256_hex(str(key)) 

474 log_parent_dir: str = os.path.dirname(self.log_params.log_dir) 

475 locks_dir: str = os.path.join(log_parent_dir, ".locks") 

476 os.makedirs(locks_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

477 validate_is_not_a_symlink("--locks-dir ", locks_dir) 

478 validate_file_permissions(locks_dir, DIR_PERMISSIONS) 

479 return os.path.join(locks_dir, f"{PROG_NAME}-lockfile-{hash_code}.lock") 

480 

481 def dry(self, msg: str) -> str: 

482 """Prefix ``msg`` with 'Dry' when running in dry-run mode.""" 

483 return utils.dry(msg, self.dry_run) 

484 

485 def is_program_available(self, program: str, location: str) -> bool: 

486 """Return True if ``program`` was detected on ``location`` host.""" 

487 return program in self.available_programs.get(location, {}) 

488 

489 

490############################################################################# 

491@final 

492class Remote(MiniRemote): 

493 """Connection settings for either source or destination host.""" 

494 

495 def __init__(self, loc: str, args: argparse.Namespace, p: Params) -> None: 

496 """Reads from ArgumentParser via args.""" 

497 # immutable variables: 

498 assert loc == "src" or loc == "dst" 

499 self.location: str = loc 

500 self.params: Params = p 

501 self.basis_ssh_user: Final[str] = getattr(args, f"ssh_{loc}_user") 

502 self.basis_ssh_host: Final[str] = getattr(args, f"ssh_{loc}_host") 

503 self.ssh_port: Final[int | None] = getattr(args, f"ssh_{loc}_port") 

504 self.ssh_config_file: Final[str | None] = p.validate_arg(getattr(args, f"ssh_{loc}_config_file")) 

505 if self.ssh_config_file: 

506 if "bzfs_ssh_config" not in os.path.basename(self.ssh_config_file): 

507 die(f"Basename of --ssh-{loc}-config-file must contain substring 'bzfs_ssh_config': {self.ssh_config_file}") 

508 self.ssh_config_file_hash: Final[str] = ( 

509 sha256_urlsafe_base64(os.path.abspath(self.ssh_config_file), padding=False) if self.ssh_config_file else "" 

510 ) 

511 self.ssh_cipher: Final[str] = p.validate_arg_str(args.ssh_cipher) 

512 # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation: 

513 ssh_extra_opts: list[str] = ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"] + ( 

514 ["-v"] if args.verbose >= 3 else [] 

515 ) 

516 self.ssh_extra_opts: tuple[str, ...] = tuple(ssh_extra_opts) 

517 self.max_concurrent_ssh_sessions_per_tcp_connection: Final[int] = args.max_concurrent_ssh_sessions_per_tcp_connection 

518 self.ssh_exit_on_shutdown: bool = args.ssh_exit_on_shutdown 

519 self.ssh_control_persist_secs: int = args.ssh_control_persist_secs 

520 self.ssh_control_persist_margin_secs: int = getenv_int("ssh_control_persist_margin_secs", 2) 

521 self.socket_prefix: Final[str] = "s" 

522 self.reuse_ssh_connection: bool = getenv_bool("reuse_ssh_connection", True) 

523 self.ssh_socket_dir: str = "" 

524 if self.reuse_ssh_connection: 

525 ssh_home_dir: str = os.path.join(get_home_directory(), ".ssh") 

526 os.makedirs(ssh_home_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

527 self.ssh_socket_dir = os.path.join(ssh_home_dir, "bzfs") 

528 os.makedirs(self.ssh_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

529 validate_file_permissions(self.ssh_socket_dir, mode=DIR_PERMISSIONS) 

530 self.ssh_exit_on_shutdown_socket_dir: Final[str] = os.path.join(self.ssh_socket_dir, "x") 

531 os.makedirs(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

532 validate_file_permissions(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS) 

533 _delete_stale_files(self.ssh_exit_on_shutdown_socket_dir, self.socket_prefix, ssh=True) 

534 self.sanitize1_regex: Final[re.Pattern[str]] = re.compile(r"[\s\\/@$]") # replace whitespace, /, $, \, @ with ~ char 

535 self.sanitize2_regex: Final[re.Pattern[str]] = re.compile(rf"[^a-zA-Z0-9{re.escape('~.:_-')}]") # remove bad chars 

536 

537 # mutable variables: 

538 self.root_dataset: str = "" # deferred until run_main() 

539 self.basis_root_dataset: str = "" # deferred until run_main() 

540 self.pool: str = "" 

541 self.sudo: str = "" 

542 self.use_zfs_delegation: bool = False 

543 self.ssh_user: str = "" 

544 self.ssh_host: str = "" 

545 self.ssh_user_host: str = "" 

546 self.is_nonlocal: bool = False 

547 

548 def local_ssh_command(self, socket_file: str | None) -> list[str]: 

549 """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing) 

550 command to run on the remote host, which will be appended later.""" 

551 if not self.ssh_user_host: 

552 return [] # dataset is on local host - don't use ssh 

553 

554 # dataset is on remote host 

555 p: Params = self.params 

556 if p.ssh_program == DISABLE_PRG: 

557 die("Cannot talk to remote host because ssh CLI is disabled.") 

558 ssh_cmd: list[str] = [p.ssh_program] + list(self.ssh_extra_opts) 

559 if self.ssh_config_file: 

560 ssh_cmd += ["-F", self.ssh_config_file] 

561 if self.ssh_cipher: 561 ↛ 563line 561 didn't jump to line 563 because the condition on line 561 was always true

562 ssh_cmd += ["-c", self.ssh_cipher] 

563 if self.ssh_port: 

564 ssh_cmd += ["-p", str(self.ssh_port)] 

565 if self.reuse_ssh_connection: 

566 # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and 

567 # 'ssh -S -M -oControlPersist=60s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing 

568 if not socket_file: 

569 # Generate unique private Unix domain socket file name in user's home dir and pass it to 'ssh -S /path/to/socket' 

570 def sanitize(name: str) -> str: 

571 name = self.sanitize1_regex.sub("~", name) # replace whitespace, /, $, \, @ with a ~ tilde char 

572 name = self.sanitize2_regex.sub("", name) # Remove disallowed chars 

573 return name 

574 

575 max_rand: int = 999_999_999_999 

576 rand_str: str = urlsafe_base64(random.SystemRandom().randint(0, max_rand), max_value=max_rand, padding=False) 

577 curr_time: str = urlsafe_base64(time.time_ns(), max_value=2**64 - 1, padding=False) 

578 unique: str = f"{os.getpid()}@{curr_time}@{rand_str}" 

579 optional: str = f"@{sanitize(self.ssh_host)[:45]}@{sanitize(self.ssh_user)}" 

580 socket_name: str = f"{self.socket_prefix}{unique}{optional}" 

581 socket_file = os.path.join(self.ssh_exit_on_shutdown_socket_dir, socket_name) 

582 socket_file = socket_file[0 : max(UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, len(socket_file) - len(optional))] 

583 # `ssh` will error out later if the max OS Unix domain socket path limit cannot be met reasonably as the 

584 # home directory path is too long, typically because the Unix user name is unreasonably long. 

585 ssh_cmd += ["-S", socket_file] 

586 ssh_cmd += [self.ssh_user_host] 

587 return ssh_cmd 

588 

589 def cache_key(self) -> tuple[str, str, int | None, str | None]: 

590 """Returns tuple uniquely identifying this Remote for caching.""" 

591 return self.location, self.ssh_user_host, self.ssh_port, self.ssh_config_file 

592 

593 def cache_namespace(self) -> str: 

594 """Returns cache namespace string which is a stable, unique directory component for caches that distinguishes 

595 endpoints by username+host+port+ssh_config_file where applicable, and uses '-' when no user/host is present (local 

596 mode).""" 

597 if not self.ssh_user_host: 

598 return "-" # local mode 

599 return f"{self.ssh_user_host}#{self.ssh_port or ''}#{self.ssh_config_file_hash}" 

600 

601 def is_ssh_available(self) -> bool: 

602 """Return True if the ssh client program required for this remote is available on the local host.""" 

603 return self.params.is_program_available("ssh", "local") 

604 

605 def __repr__(self) -> str: 

606 return str(self.__dict__) 

607 

608 

609############################################################################# 

610@final 

611class CopyPropertiesConfig: 

612 """--zfs-recv-o* and --zfs-recv-x* option groups for copying or excluding ZFS properties on receive.""" 

613 

614 def __init__(self, group: str, flag: str, args: argparse.Namespace, p: Params) -> None: 

615 """Reads from ArgumentParser via args.""" 

616 assert group in ZFS_RECV_GROUPS 

617 # immutable variables: 

618 grup: str = group 

619 self.group: Final[str] = group # one of zfs_recv_o, zfs_recv_x 

620 self.flag: Final[str] = flag # one of -o or -x 

621 sources: str = p.validate_arg_str(getattr(args, f"{grup}_sources")) 

622 self.sources: Final[str] = ",".join(sorted([s.strip() for s in sources.strip().split(",")])) # canonicalize 

623 self.targets: Final[str] = p.validate_arg_str(getattr(args, f"{grup}_targets")) 

624 include_regexes: list[str] | None = getattr(args, f"{grup}_include_regex") 

625 assert ZFS_RECV_O in ZFS_RECV_GROUPS 

626 if include_regexes is None: 

627 include_regexes = [ZFS_RECV_O_INCLUDE_REGEX_DEFAULT] if grup == ZFS_RECV_O else [] 

628 self.include_regexes: Final[RegexList] = compile_regexes(include_regexes) 

629 self.exclude_regexes: Final[RegexList] = compile_regexes(getattr(args, f"{grup}_exclude_regex")) 

630 

631 def __repr__(self) -> str: 

632 return str(self.__dict__) 

633 

634 

635############################################################################# 

636@final 

637class SnapshotLabel(NamedTuple): 

638 """Contains the individual parts that are concatenated into a ZFS snapshot name.""" 

639 

640 prefix: str # bzfs_ 

641 infix: str # us-west-1_ 

642 timestamp: str # 2024-11-06_08:30:05 

643 suffix: str # _hourly 

644 

645 def __str__(self) -> str: # bzfs_us-west-1_2024-11-06_08:30:05_hourly 

646 return f"{self.prefix}{self.infix}{self.timestamp}{self.suffix}" 

647 

648 def notimestamp_str(self) -> str: # bzfs_us-west-1_hourly 

649 """Returns the concatenation of all parts except for the timestamp part.""" 

650 return f"{self.prefix}{self.infix}{self.suffix}" 

651 

652 def validate_label(self, input_text: str) -> None: 

653 """Validates that the composed snapshot label forms a legal name.""" 

654 name: str = str(self) 

655 validate_dataset_name(name, input_text) 

656 if "/" in name: 

657 die(f"Invalid ZFS snapshot name: '{name}' for: '{input_text}'") 

658 for key, value in {"prefix": self.prefix, "infix": self.infix, "suffix": self.suffix}.items(): 

659 if key == "prefix": 

660 if not value.endswith("_"): 

661 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'") 

662 if value.count("_") > 1: 

663 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'") 

664 elif key == "infix": 

665 if value: 

666 if not value.endswith("_"): 

667 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'") 

668 if value.count("_") > 1: 

669 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'") 

670 elif value: 

671 if not value.startswith("_"): 

672 die(f"Invalid {input_text}{key}: Must start with an underscore character: '{value}'") 

673 if value.count("_") > 1: 

674 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'") 

675 

676 

677############################################################################# 

@final
class CreateSrcSnapshotConfig:
    """Option values for --create-src-snapshots, that is, for automatically creating source snapshots."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Option values for --create-src-snapshots*; reads from ArgumentParser via args.

        Parses the --create-src-snapshots-plan dict literal of the form {org: {target: {period_unit: period_amount}}},
        derives the periodic schedule (suffix_durations) and the snapshot name patterns (labels), and validates both.
        Exits via die() on invalid plan entries or an invalid --daemon-frequency.
        """
        # immutable variables:
        self.skip_create_src_snapshots: Final[bool] = not args.create_src_snapshots
        self.create_src_snapshots_even_if_not_due: Final[bool] = args.create_src_snapshots_even_if_not_due
        # empty string is normalized to None so get_timezone()/current_datetime() fall back to their defaults
        tz_spec: str | None = args.create_src_snapshots_timezone if args.create_src_snapshots_timezone else None
        self.tz: Final[tzinfo | None] = get_timezone(tz_spec)
        self.current_datetime: datetime = current_datetime(tz_spec)
        self.timeformat: Final[str] = args.create_src_snapshots_timeformat
        self.anchors: Final[PeriodAnchors] = PeriodAnchors.parse(args)

        # Compute the schedule for upcoming periodic time events (suffix_durations). This event schedule is also used in
        # daemon mode via sleep_until_next_daemon_iteration()
        suffixes: list[str] = []
        labels: list[SnapshotLabel] = []
        # default plan creates a single ad-hoc snapshot per run when no plan was given on the CLI
        create_src_snapshots_plan: str = args.create_src_snapshots_plan or str({"bzfs": {"onsite": {"adhoc": 1}}})
        for org, target_periods in ast.literal_eval(create_src_snapshots_plan).items():
            for target, periods in target_periods.items():
                for period_unit, period_amount in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    if not isinstance(period_amount, int) or period_amount < 0:
                        die(f"--create-src-snapshots-plan: Period amount must be a non-negative integer: {period_amount}")
                    if period_amount > 0:  # a zero amount disables that period without being an error
                        suffix: str = nsuffix(period_unit)
                        suffixes.append(suffix)
                        labels.append(SnapshotLabel(prefix=nprefix(org), infix=ninfix(target), timestamp="", suffix=suffix))
        xperiods: SnapshotPeriods = p.xperiods
        if self.skip_create_src_snapshots:
            # not creating snapshots: the schedule is driven solely by --daemon-frequency, and no labels are needed
            duration_amount, duration_unit = p.xperiods.suffix_to_duration0(p.daemon_frequency)
            if duration_amount <= 0 or not duration_unit:
                die(f"Invalid --daemon-frequency: {p.daemon_frequency}")
            suffixes = [nsuffix(p.daemon_frequency)]
            labels = []
        suffix_durations: dict[str, tuple[int, str]] = {suffix: xperiods.suffix_to_duration1(suffix) for suffix in suffixes}

        def suffix_key(suffix: str) -> tuple[int, str]:
            # Sort key: period length in milliseconds (then suffix name as tie-breaker). Also validates that sub-daily
            # periods divide a day evenly and monthly periods divide a year evenly, so snapshot times stay aligned.
            duration_amount, duration_unit = suffix_durations[suffix]
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            if suffix.endswith(("hourly", "minutely", "secondly")):
                if duration_milliseconds != 0 and 86400 * 1000 % duration_milliseconds != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 86400 seconds "
                        f"without remainder so that snapshots will be created at the same time of day every day: {suffix}"
                    )
            if suffix.endswith("monthly"):
                if duration_amount != 0 and 12 % duration_amount != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 12 months "
                        f"without remainder so that snapshots will be created at the same time every year: {suffix}"
                    )
            return duration_milliseconds, suffix

        suffixes = sorted(suffixes, key=suffix_key, reverse=True)  # take snapshots for dailies before hourlies, and so on
        self.suffix_durations: Final[dict[str, tuple[int, str]]] = {sfx: suffix_durations[sfx] for sfx in suffixes}  # sort
        suffix_indexes: dict[str, int] = {suffix: k for k, suffix in enumerate(suffixes)}
        labels.sort(key=lambda label: (suffix_indexes[label.suffix], label))  # take snapshots for dailies before hourlies
        self._snapshot_labels: Final[list[SnapshotLabel]] = labels
        for label in self.snapshot_labels():
            label.validate_label("--create-src-snapshots-plan ")

    def snapshot_labels(self) -> list[SnapshotLabel]:
        """Returns the snapshot name patterns for which snapshots shall be created.

        The timestamp part is rendered from self.current_datetime using self.timeformat at call time.
        """
        timeformat: str = self.timeformat
        is_millis: bool = timeformat.endswith("%F")  # non-standard hack to append milliseconds
        if is_millis:
            timeformat = timeformat[0:-1] + "f"  # replace %F with %f (append microseconds)
        timestamp: str = self.current_datetime.strftime(timeformat)
        if is_millis:
            timestamp = timestamp[0 : -len("000")]  # replace microseconds with milliseconds
        timestamp = timestamp.replace("+", "z")  # zfs CLI does not accept the '+' character in snapshot names
        return [SnapshotLabel(label.prefix, label.infix, timestamp, label.suffix) for label in self._snapshot_labels]

    def __repr__(self) -> str:
        return str(self.__dict__)

755 

756 

757############################################################################# 

@dataclass(frozen=True)
@final
class AlertConfig:
    """Thresholds controlling when alerts fire for snapshot age."""

    kind: Literal["Latest", "Oldest"]  # whether the thresholds apply to the latest or the oldest matching snapshot
    warning_millis: int  # warning threshold in milliseconds
    critical_millis: int  # critical threshold in milliseconds

766 

767 

768############################################################################# 

@dataclass(frozen=True)
@final
class MonitorSnapshotAlert:
    """Alert configuration for a single monitored snapshot label."""

    label: SnapshotLabel  # the snapshot name pattern being monitored
    latest: AlertConfig | None  # thresholds for the most recent matching snapshot, or None if the check is disabled
    oldest: AlertConfig | None  # thresholds for the oldest matching snapshot, or None if the check is disabled

777 

778 

779############################################################################# 

@final
class MonitorSnapshotsConfig:
    """Option values for --monitor-snapshots*, that is, policy describing which snapshots to monitor for staleness."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args.

        Parses the --monitor-snapshots dict literal of the form
        {org: {target: {period_unit: {"latest"|"oldest": {"warning"|"critical"|"cycles": value}}}}} into a list of
        MonitorSnapshotAlert objects, sorted by descending period duration. Exits via die() on malformed entries.
        """
        # immutable variables:
        self.monitor_snapshots: Final[dict] = ast.literal_eval(args.monitor_snapshots)
        self.dont_warn: Final[bool] = args.monitor_snapshots_dont_warn
        self.dont_crit: Final[bool] = args.monitor_snapshots_dont_crit
        self.no_latest_check: Final[bool] = args.monitor_snapshots_no_latest_check
        self.no_oldest_check: Final[bool] = args.monitor_snapshots_no_oldest_check
        alerts: list[MonitorSnapshotAlert] = []
        xperiods: SnapshotPeriods = p.xperiods
        for org, target_periods in self.monitor_snapshots.items():
            prefix: str = nprefix(org)
            for target, periods in target_periods.items():
                for period_unit, alert_dicts in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    label = SnapshotLabel(prefix=prefix, infix=ninfix(target), timestamp="", suffix=nsuffix(period_unit))
                    alert_latest, alert_oldest = None, None
                    for alert_type, alert_dict in alert_dicts.items():
                        m = "--monitor-snapshots: "
                        if alert_type not in ["latest", "oldest"]:
                            die(f"{m}'{alert_type}' must be 'latest' or 'oldest' within {args.monitor_snapshots}")
                        warning_millis: int = 0
                        critical_millis: int = 0
                        cycles: int = 1
                        for kind, value in alert_dict.items():
                            context: str = args.monitor_snapshots
                            if kind == "warning":
                                warning_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "critical":
                                critical_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "cycles":
                                cycles = max(0, int(value))
                            else:
                                die(f"{m}'{kind}' must be 'warning', 'critical' or 'cycles' within {context}")
                        if warning_millis > 0 or critical_millis > 0:
                            # add `cycles` full periods of slack to each configured (non-zero) threshold
                            duration_amount, duration_unit = xperiods.suffix_to_duration1(label.suffix)
                            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
                            warning_millis += 0 if warning_millis <= 0 else cycles * duration_milliseconds
                            critical_millis += 0 if critical_millis <= 0 else cycles * duration_milliseconds
                            # an unconfigured threshold is effectively disabled by setting it to "infinity"
                            # NOTE(review): UNIX_TIME_INFINITY_SECS is named in seconds but assigned to *_millis
                            # variables — presumably large enough either way; confirm the constant's unit/intent
                            warning_millis = UNIX_TIME_INFINITY_SECS if warning_millis <= 0 else warning_millis
                            critical_millis = UNIX_TIME_INFINITY_SECS if critical_millis <= 0 else critical_millis
                            capitalized_alert_type = cast(Literal["Latest", "Oldest"], sys.intern(alert_type.capitalize()))
                            alert_config = AlertConfig(capitalized_alert_type, warning_millis, critical_millis)
                            if alert_type == "latest":
                                if not self.no_latest_check:
                                    alert_latest = alert_config
                            else:
                                assert alert_type == "oldest"
                                if not self.no_oldest_check:
                                    alert_oldest = alert_config
                    if alert_latest is not None or alert_oldest is not None:
                        alerts.append(MonitorSnapshotAlert(label, alert_latest, alert_oldest))

        def alert_sort_key(alert: MonitorSnapshotAlert) -> tuple[int, SnapshotLabel]:
            # sort by period length in milliseconds, with the label as a deterministic tie-breaker
            duration_amount, duration_unit = xperiods.suffix_to_duration1(alert.label.suffix)
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            return duration_milliseconds, alert.label

        alerts.sort(key=alert_sort_key, reverse=True)  # check snapshots for dailies before hourlies, and so on
        self.alerts: Final[list[MonitorSnapshotAlert]] = alerts
        self.enable_monitor_snapshots: Final[bool] = len(alerts) > 0

    def __repr__(self) -> str:
        return str(self.__dict__)

847 

848 

849############################################################################# 

850def _fix_send_recv_opts( 

851 opts: list[str], 

852 exclude_long_opts: set[str], 

853 exclude_short_opts: str, 

854 include_arg_opts: set[str], 

855 exclude_arg_opts: frozenset[str] = frozenset(), 

856 preserve_properties: frozenset[str] = frozenset(), 

857) -> tuple[list[str], list[str]]: 

858 """These opts are instead managed via bzfs CLI args --dryrun, etc.""" 

859 assert "-" not in exclude_short_opts 

860 results: list[str] = [] 

861 x_names: set[str] = set(preserve_properties) 

862 i = 0 

863 n = len(opts) 

864 while i < n: 

865 opt: str = opts[i] 

866 i += 1 

867 if opt in exclude_arg_opts: # example: {"-X", "--exclude"} 

868 i += 1 

869 continue 

870 elif opt in include_arg_opts: # example: {"-o", "-x"} 

871 results.append(opt) 

872 if i < n: 

873 if opt == "-o" and "=" in opts[i] and opts[i].split("=", 1)[0] in preserve_properties: 

874 die(f"--preserve-properties: Disallowed ZFS property found in --zfs-recv-program-opt(s): -o {opts[i]}") 

875 if opt == "-x": 

876 x_names.discard(opts[i]) 

877 results.append(opts[i]) 

878 i += 1 

879 elif opt not in exclude_long_opts: # example: {"--dryrun", "--verbose"} 

880 if opt.startswith("-") and opt != "-" and not opt.startswith("--"): 

881 for char in exclude_short_opts: # example: "den" 

882 opt = opt.replace(char, "") 

883 if opt == "-": 

884 continue 

885 results.append(opt) 

886 return results, sorted(x_names) 

887 

888 

# Matches the leading decimal PID component embedded in an ssh master Unix domain socket file name
_SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX: Final[re.Pattern[str]] = re.compile(r"^[0-9]+")  # see local_ssh_command()

890 

891 

def _delete_stale_files(
    root_dir: str,
    prefix: str,
    millis: int = 60 * 60 * 1000,
    dirs: bool = False,
    exclude: str | None = None,
    ssh: bool = False,
) -> None:
    """Cleans up obsolete files; For example caused by abnormal termination, OS crash.

    Scans the immediate children of `root_dir` and deletes each entry whose name starts with `prefix` and whose
    mtime is at least `millis` milliseconds old. With dirs=True only directories are deleted (recursively),
    otherwise only non-directories. An entry named `exclude` is always spared. With ssh=True, Unix domain sockets
    get special treatment: a socket whose name encodes a PID is only removed once that PID is gone or the socket
    is older than 31 days (its ssh master process is then presumed dead as well).
    """
    seconds: float = millis / 1000
    now: float = time.time()
    validate_is_not_a_symlink("", root_dir)
    with os.scandir(root_dir) as iterator:
        for entry in iterator:
            if entry.name == exclude or not entry.name.startswith(prefix):
                continue
            try:
                # stat without following symlinks so a symlinked entry is judged (and removed) as the link itself
                stats = entry.stat(follow_symlinks=False)
                is_dir = entry.is_dir(follow_symlinks=False)
                if ((dirs and is_dir) or (not dirs and not is_dir)) and now - stats.st_mtime >= seconds:
                    if dirs:
                        shutil.rmtree(entry.path, ignore_errors=True)
                    elif not (ssh and stat.S_ISSOCK(stats.st_mode)):
                        os.remove(entry.path)
                    elif match := _SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX.match(entry.name[len(prefix) :]):
                        pid: int = int(match.group(0))
                        # pid_exists() may return None when unknown; only `False` proves the process is gone
                        if pid_exists(pid) is False or now - stats.st_mtime >= 31 * 24 * 60 * 60:
                            os.remove(entry.path)  # bzfs process is no longer alive; its ssh master process isn't either
            except FileNotFoundError:
                pass  # harmless; entry vanished between scandir() and stat()/remove() (benign race)

922 

923 

924def _create_symlink(src: str, dst_dir: str, dst: str) -> None: 

925 """Creates dst symlink pointing to src using a relative path.""" 

926 rel_path: str = os.path.relpath(src, start=dst_dir) 

927 os.symlink(src=rel_path, dst=os.path.join(dst_dir, dst))