Coverage for bzfs_main / configuration.py: 99%

645 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:49 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Configuration subsystem; All CLI option/parameter values are reachable from the "Params" class.""" 

16 

17from __future__ import ( 

18 annotations, 

19) 

20import argparse 

21import ast 

22import os 

23import platform 

24import random 

25import re 

26import shutil 

27import stat 

28import sys 

29import tempfile 

30import threading 

31import time 

32from collections.abc import ( 

33 Iterable, 

34) 

35from dataclasses import ( 

36 dataclass, 

37) 

38from datetime import ( 

39 datetime, 

40 tzinfo, 

41) 

42from logging import ( 

43 Logger, 

44) 

45from typing import ( 

46 Final, 

47 Literal, 

48 NamedTuple, 

49 cast, 

50 final, 

51) 

52 

53from bzfs_main.argparse_actions import ( 

54 SnapshotFilter, 

55 optimize_snapshot_filters, 

56) 

57from bzfs_main.argparse_cli import ( 

58 LOG_DIR_DEFAULT, 

59 ZFS_RECV_GROUPS, 

60 ZFS_RECV_O, 

61 ZFS_RECV_O_INCLUDE_REGEX_DEFAULT, 

62 __version__, 

63) 

64from bzfs_main.detect import ( 

65 DISABLE_PRG, 

66) 

67from bzfs_main.filter import ( 

68 SNAPSHOT_FILTERS_VAR, 

69) 

70from bzfs_main.period_anchors import ( 

71 PeriodAnchors, 

72) 

73from bzfs_main.util import ( 

74 utils, 

75) 

76from bzfs_main.util.connection import ( 

77 ConnectionPools, 

78 MiniParams, 

79 MiniRemote, 

80) 

81from bzfs_main.util.retry import ( 

82 RetryPolicy, 

83) 

84from bzfs_main.util.utils import ( 

85 DIR_PERMISSIONS, 

86 FILE_PERMISSIONS, 

87 PROG_NAME, 

88 SHELL_CHARS, 

89 UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, 

90 UNIX_TIME_INFINITY_SECS, 

91 RegexList, 

92 SnapshotPeriods, 

93 SynchronizedBool, 

94 append_if_absent, 

95 compile_regexes, 

96 current_datetime, 

97 die, 

98 get_home_directory, 

99 get_timezone, 

100 getenv_bool, 

101 getenv_int, 

102 is_included, 

103 ninfix, 

104 nprefix, 

105 nsuffix, 

106 open_nofollow, 

107 parse_duration_to_milliseconds, 

108 pid_exists, 

109 sha256_hex, 

110 sha256_urlsafe_base64, 

111 urlsafe_base64, 

112 validate_dataset_name, 

113 validate_file_permissions, 

114 validate_is_not_a_symlink, 

115 validate_property_name, 

116 xappend, 

117) 

118 

# constants:
# Process-wide one-shot coordination for Params._unset_matching_env_vars(): the lock serializes mutation of os.environ,
# and the latch (initially True) ensures the scrub runs at most once per process -- the first thread flips it to False
# via get_and_set(False) and all later Params instances skip the work.
_UNSET_ENV_VARS_LOCK: Final[threading.Lock] = threading.Lock()
_UNSET_ENV_VARS_LATCH: Final[SynchronizedBool] = SynchronizedBool(True)

122 

123 

#############################################################################
@final
class LogParams:
    """Option values for logging.

    Derives the log level, log directory layout, and unique log file paths from the parsed CLI args, creates the
    directories (validating permissions and rejecting symlinks along the way), and maintains a "current" symlink tree so
    the most recent run's log files are always reachable at a stable path, even with concurrent runs.
    """

    def __init__(self, args: argparse.Namespace) -> None:
        """Reads from ArgumentParser via args."""
        # immutable variables:
        # --quiet wins over --verbose; two or more -v flags enable TRACE, one enables DEBUG, default is INFO.
        if args.quiet:
            log_level: str = "ERROR"
        elif args.verbose >= 2:
            log_level = "TRACE"
        elif args.verbose >= 1:
            log_level = "DEBUG"
        else:
            log_level = "INFO"
        self.log_level: Final[str] = log_level
        self.timestamp: Final[str] = datetime.now().isoformat(sep="_", timespec="seconds")  # 2024-09-03_12:26:15
        self.isatty: Final[bool] = getenv_bool("isatty", True)
        self.quiet: Final[bool] = args.quiet
        # Terminal width is only relevant when progress output (pv) is possible; 0 disables width-dependent rendering.
        self.terminal_columns: Final[int] = (
            getenv_int("terminal_columns", shutil.get_terminal_size(fallback=(120, 24)).columns)
            if self.isatty and args.pv_program != DISABLE_PRG and not self.quiet
            else 0
        )
        self.home_dir: Final[str] = get_home_directory()
        log_parent_dir: Final[str] = args.log_dir if args.log_dir else os.path.join(self.home_dir, LOG_DIR_DEFAULT)
        # Safety check: refuse to write logs into an arbitrary directory the user may have mistyped.
        if LOG_DIR_DEFAULT not in os.path.basename(log_parent_dir):
            die(f"Basename of --log-dir must contain the substring '{LOG_DIR_DEFAULT}', but got: {log_parent_dir}")
        sep: str = "_" if args.log_subdir == "daily" else ":"
        timestamp: str = self.timestamp
        # Truncate the ISO timestamp at the daily/hourly/minutely boundary to pick the log subdirectory name:
        subdir: str = timestamp[: timestamp.rindex(sep) if args.log_subdir == "minutely" else timestamp.index(sep)]
        # 2024-09-03 (d), 2024-09-03_12 (h), 2024-09-03_12:26 (m)
        self.log_dir: Final[str] = os.path.join(log_parent_dir, subdir)
        # Create parent dir and subdir, each time verifying it is a real directory with the expected permissions:
        os.makedirs(log_parent_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir ", log_parent_dir)
        validate_file_permissions(log_parent_dir, DIR_PERMISSIONS)
        os.makedirs(self.log_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir subdir ", self.log_dir)
        validate_file_permissions(self.log_dir, DIR_PERMISSIONS)
        self.log_file_prefix: Final[str] = args.log_file_prefix
        self.log_file_infix: Final[str] = args.log_file_infix
        self.log_file_suffix: Final[str] = args.log_file_suffix
        # mkstemp guarantees a unique log file name even when multiple jobs start within the same second:
        fd, self.log_file = tempfile.mkstemp(
            suffix=".log",
            prefix=f"{self.log_file_prefix}{self.timestamp}{self.log_file_infix}{self.log_file_suffix}-",
            dir=self.log_dir,
        )
        os.fchmod(fd, FILE_PERMISSIONS)
        os.close(fd)
        # The pv progress log shares the log file's stem, with a .pv extension instead of .log:
        self.pv_log_file: Final[str] = self.log_file[: -len(".log")] + ".pv"
        log_file_stem: str = os.path.basename(self.log_file)[: -len(".log")]
        # Python's standard logger naming API interprets chars such as '.', '-', ':', spaces, etc in special ways, e.g.
        # logging.getLogger("foo.bar") vs logging.getLogger("foo-bar"). Thus, we sanitize the Python logger name via a regex:
        self.logger_name_suffix: Final[str] = re.sub(r"[^A-Za-z0-9_]", repl="_", string=log_file_stem)
        cache_root_dir: str = os.path.join(log_parent_dir, ".cache")
        os.makedirs(cache_root_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_file_permissions(cache_root_dir, DIR_PERMISSIONS)
        self.last_modified_cache_dir: Final[str] = os.path.join(cache_root_dir, "mods")

        # Create/update "current" symlink to current_dir, which is a subdir containing further symlinks to log files.
        # For parallel usage, ensures there is no time window when the symlinks are inconsistent or do not exist.
        current: str = "current"
        dot_current_dir: str = os.path.join(log_parent_dir, f".{current}")
        current_dir: str = os.path.join(dot_current_dir, log_file_stem)
        os.makedirs(dot_current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir: .current ", dot_current_dir)
        try:
            os.makedirs(current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            _create_symlink(self.log_file, current_dir, f"{current}.log")
            _create_symlink(self.pv_log_file, current_dir, f"{current}.pv")
            _create_symlink(self.log_dir, current_dir, f"{current}.dir")
            # Build the new "current" link at a temp path first, then atomically swap it into place via os.replace():
            dst_file: str = os.path.join(current_dir, current)
            os.symlink(os.path.relpath(current_dir, start=log_parent_dir), dst_file)
            os.replace(dst_file, os.path.join(log_parent_dir, current))  # atomic rename
            # Garbage-collect stale per-run subdirs of .current, but never the one we just created:
            _delete_stale_files(dot_current_dir, prefix="", millis=5000, dirs=True, exclude=os.path.basename(current_dir))
        except FileNotFoundError:
            pass  # harmless concurrent cleanup

    def __repr__(self) -> str:
        return str(self.__dict__)

205 

206 

#############################################################################
@final
class Params(MiniParams):
    """All parsed CLI options combined into a single bundle; simplifies passing around numerous settings and defaults.

    Most attributes are set exactly once in ``__init__`` (marked ``Final``); a small set of attributes under
    "mutable variables" is deliberately left mutable and filled in later phases (see the per-line comments below).
    Several options are validated here and cause the program to terminate via ``die()`` when illegal.
    """

    def __init__(
        self,
        args: argparse.Namespace,
        sys_argv: list[str],
        log_params: LogParams,
        log: Logger,
        inject_params: dict[str, bool] | None = None,  # for testing only
    ) -> None:
        """Reads from ArgumentParser via args.

        Params:
            args: Parsed CLI namespace produced by the project's ArgumentParser.
            sys_argv: The raw command line, retained for diagnostics.
            log_params: Logging configuration computed earlier by LogParams.
            log: Logger to use for this job.
            inject_params: Test-only switches that simulate unavailable/failing external programs.
        """
        # immutable variables:
        assert args is not None
        assert isinstance(sys_argv, list)
        assert log_params is not None
        assert log is not None
        self.args: Final[argparse.Namespace] = args
        self.sys_argv: Final[list[str]] = sys_argv
        self.log_params: Final[LogParams] = log_params
        self.log: Logger = log
        self.inject_params: Final[dict[str, bool]] = inject_params if inject_params is not None else {}  # for testing only
        self.one_or_more_whitespace_regex: Final[re.Pattern[str]] = re.compile(r"\s+")
        self.two_or_more_spaces_regex: Final[re.Pattern[str]] = re.compile(r" +")
        # Scrub os.environ per --include/--exclude-envvar-regex before any getenv_* reads further down in this ctor:
        self._unset_matching_env_vars(args)
        self.xperiods: Final[SnapshotPeriods] = SnapshotPeriods()

        assert len(args.root_dataset_pairs) > 0
        self.root_dataset_pairs: Final[list[tuple[str, str]]] = args.root_dataset_pairs
        self.recursive: Final[bool] = args.recursive
        self.recursive_flag: Final[str] = "-r" if args.recursive else ""

        self.dry_run: Final[bool] = args.dryrun is not None
        self.dry_run_recv: Final[str] = "-n" if self.dry_run else ""
        self.dry_run_destroy: Final[str] = self.dry_run_recv
        self.dry_run_no_send: Final[bool] = args.dryrun == "send"
        self.verbose_zfs: Final[bool] = args.verbose >= 2
        self.verbose_destroy: Final[str] = "" if args.quiet else "-v"

        # Sanitize the raw 'zfs send'/'zfs recv' option strings; _fix_*_opts strips disallowed flags:
        self.zfs_send_program_opts: Final[list[str]] = self._fix_send_opts(self.split_args(args.zfs_send_program_opts))
        zfs_recv_program_opts: list[str] = self.split_args(args.zfs_recv_program_opts)
        for extra_opt in args.zfs_recv_program_opt:
            zfs_recv_program_opts.append(self.validate_arg_str(extra_opt, allow_all=True))
        preserve_properties = [validate_property_name(name, "--preserve-properties") for name in args.preserve_properties]
        zfs_recv_program_opts, zfs_recv_x_names = self._fix_recv_opts(zfs_recv_program_opts, frozenset(preserve_properties))
        self.zfs_recv_program_opts: Final[list[str]] = zfs_recv_program_opts
        self.zfs_recv_x_names: Final[list[str]] = zfs_recv_x_names
        if self.verbose_zfs:
            append_if_absent(self.zfs_send_program_opts, "-v")
            append_if_absent(self.zfs_recv_program_opts, "-v")
        # zfs_full_recv_opts: dataset-specific dynamic -o/-x property options are computed later per dataset in
        # replication._add_recv_property_options():
        self.zfs_full_recv_opts: Final[list[str]] = self.zfs_recv_program_opts.copy()
        # ZFS_RECV_GROUPS yields exactly the three option groups unpacked on the next line:
        cpconfigs = [CopyPropertiesConfig(group, flag, args, self) for group, flag in ZFS_RECV_GROUPS.items()]
        self.zfs_recv_o_config, self.zfs_recv_x_config, self.zfs_set_config = cpconfigs

        self.force_rollback_to_latest_snapshot: Final[bool] = args.force_rollback_to_latest_snapshot
        self.force_rollback_to_latest_common_snapshot: Final[SynchronizedBool] = SynchronizedBool(
            args.force_rollback_to_latest_common_snapshot
        )
        self.force: Final[SynchronizedBool] = SynchronizedBool(args.force)
        self.force_once: Final[bool] = args.force_once
        self.force_unmount: Final[str] = "-f" if args.force_unmount else ""
        self.force_hard: Final[str] = "-R" if args.force_destroy_dependents else ""

        self.skip_parent: Final[bool] = args.skip_parent
        self.skip_missing_snapshots: Final[str] = args.skip_missing_snapshots
        self.skip_on_error: Final[str] = args.skip_on_error
        self.retry_policy: Final[RetryPolicy] = RetryPolicy.from_namespace(args).copy(reraise=True)
        self.skip_replication: Final[bool] = args.skip_replication
        self.delete_dst_snapshots: Final[bool] = args.delete_dst_snapshots is not None
        self.delete_dst_bookmarks: Final[bool] = args.delete_dst_snapshots == "bookmarks"
        self.delete_dst_snapshots_no_crosscheck: Final[bool] = args.delete_dst_snapshots_no_crosscheck
        self.delete_dst_snapshots_except: Final[bool] = args.delete_dst_snapshots_except
        self.delete_dst_datasets: Final[bool] = args.delete_dst_datasets
        self.delete_empty_dst_datasets: Final[bool] = args.delete_empty_dst_datasets is not None
        self.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: Final[bool] = (
            args.delete_empty_dst_datasets == "snapshots+bookmarks"
        )
        self.compare_snapshot_lists: Final[str] = args.compare_snapshot_lists
        self.daemon_lifetime_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds(args.daemon_lifetime)
        self.daemon_frequency: Final[str] = args.daemon_frequency
        self.enable_privilege_elevation: Final[bool] = not args.no_privilege_elevation
        self.no_stream: Final[bool] = args.no_stream
        self.resume_recv: Final[bool] = not args.no_resume_recv
        self.create_bookmarks: Final[str] = args.create_bookmarks
        self.use_bookmark: Final[bool] = not args.no_use_bookmark
        self.r2r_mode_requested: Final[str] = args.r2r

        self.src: Final[Remote] = Remote("src", args, self)  # src dataset, host and ssh options
        self.dst: Final[Remote] = Remote("dst", args, self)  # dst dataset, host and ssh options
        self.create_src_snapshots_config: Final[CreateSrcSnapshotConfig] = CreateSrcSnapshotConfig(args, self)
        self.monitor_snapshots_config: Final[MonitorSnapshotsConfig] = MonitorSnapshotsConfig(args, self)
        self.is_caching_snapshots: Final[bool] = args.cache_snapshots

        # External helper programs; each name is validated and may be swapped out by inject_params for testing.
        # For several of them, options that redirect output or read/write arbitrary files are rejected via die():
        self.compression_program: Final[str] = self._program_name(args.compression_program)
        self.compression_program_opts: Final[list[str]] = self.split_args(args.compression_program_opts)
        for opt in {"-o", "--output-file"}.intersection(self.compression_program_opts):
            die(f"--compression-program-opts: {opt} is disallowed for security reasons.")
        self.getconf_program: Final[str] = self._program_name("getconf")  # print number of CPUs on POSIX
        self.mbuffer_program: Final[str] = self._program_name(args.mbuffer_program)
        self.mbuffer_program_opts: Final[list[str]] = self.split_args(args.mbuffer_program_opts)
        for opt in {"-i", "-I", "-o", "-O", "-l", "-L", "-t", "-T", "-a", "-A"}.intersection(self.mbuffer_program_opts):
            die(f"--mbuffer-program-opts: {opt} is disallowed for security reasons.")
        self.bwlimit: Final[str] = self.validate_arg_str(args.bwlimit) if args.bwlimit else ""
        self.ps_program: Final[str] = self._program_name(args.ps_program)
        self.pv_program: Final[str] = self._program_name(args.pv_program)
        self.pv_program_opts: Final[list[str]] = self.split_args(args.pv_program_opts)
        bad_pv_opts = {"-o", "--output", "-f", "--log-file", "-S", "--stop-at-size", "-Y", "--sync", "-X", "--discard",
                       "-U", "--store-and-forward", "-d", "--watchfd", "-R", "--remote", "-P", "--pidfile"}  # fmt: skip
        for opt in bad_pv_opts.intersection(self.pv_program_opts):
            die(f"--pv-program-opts: {opt} is disallowed for security reasons.")
        if self.bwlimit:
            # --bwlimit is implemented by handing a rate limit to pv:
            self.pv_program_opts.extend([f"--rate-limit={self.bwlimit}"])
        self.shell_program_local: Final[str] = "sh"
        self.shell_program: Final[str] = self._program_name(args.shell_program)
        self.ssh_program: str = self._program_name(args.ssh_program)
        self.sudo_program: Final[str] = self._program_name(args.sudo_program)
        self.uname_program: Final[str] = self._program_name("uname")
        self.zfs_program: Final[str] = self._program_name("zfs")
        self.zpool_program: Final[str] = self._program_name(args.zpool_program)

        # no point creating complex shell pipeline commands for tiny data transfers:
        self.min_pipe_transfer_size: Final[int] = getenv_int("min_pipe_transfer_size", 1024 * 1024)
        self.max_datasets_per_batch_on_list_snaps: Final[int] = getenv_int("max_datasets_per_batch_on_list_snaps", 1024)
        self.max_datasets_per_minibatch_on_list_snaps: int = getenv_int("max_datasets_per_minibatch_on_list_snaps", -1)
        self.max_snapshots_per_minibatch_on_delete_snaps: int = getenv_int("max_snapshots_per_minibatch_on_delete_snaps", 2**29)
        self.dedicated_tcp_connection_per_zfs_send: Final[bool] = getenv_bool("dedicated_tcp_connection_per_zfs_send", True)
        # threads: with --force-once we intentionally coerce to a single-threaded run to ensure deterministic serial behavior
        self.threads: Final[tuple[int, bool]] = (1, False) if self.force_once else args.threads
        timeout_duration_nanos = None if args.timeout is None else 1_000_000 * parse_duration_to_milliseconds(args.timeout)
        self.timeout_duration_nanos: int | None = timeout_duration_nanos  # duration (not a timestamp); for logging only
        self.no_estimate_send_size: Final[bool] = args.no_estimate_send_size
        self.remote_conf_cache_ttl_nanos: Final[int] = 1_000_000 * parse_duration_to_milliseconds(
            args.daemon_remote_conf_cache_ttl
        )

        # Snapshot of host/interpreter facts, retained for diagnostics:
        self.os_cpu_count: Final[int | None] = os.cpu_count()
        self.os_getuid: Final[int] = os.getuid()
        self.os_geteuid: Final[int] = os.geteuid()
        self.prog_version: Final[str] = __version__
        self.python_version: Final[str] = sys.version
        self.platform_version: Final[str] = platform.version()
        self.platform_platform: Final[str] = platform.platform()

        # mutable variables:
        snapshot_filters = args.snapshot_filters_var if hasattr(args, SNAPSHOT_FILTERS_VAR) else [[]]
        self.snapshot_filters: list[list[SnapshotFilter]] = [optimize_snapshot_filters(f) for f in snapshot_filters]
        self.exclude_dataset_property: str | None = args.exclude_dataset_property
        self.exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.abs_exclude_datasets: list[str] = []  # deferred to validate_task() phase
        self.abs_include_datasets: list[str] = []  # deferred to validate_task() phase
        self.r2r_mode: str = "off"  # deferred to validate_task() phase

        self.curr_zfs_send_program_opts: list[str] = []
        self.zfs_recv_ox_names: set[str] = set()
        self.available_programs: dict[str, dict[str, str]] = {}  # filled by program detection; see is_program_available()
        self.zpool_features: dict[str, dict[str, dict[str, str]]] = {r.location: {} for r in [self.src, self.dst]}
        self.connection_pools: dict[str, ConnectionPools] = {}

    def split_args(self, text: str, *items: str | Iterable[str], allow_all: bool = False) -> list[str]:
        """Splits option string on runs of one or more whitespace into an option list.

        Any extra ``items`` are appended via xappend(). Unless ``allow_all`` is True, each resulting option is checked
        for quote/shell metacharacters and the program dies on a violation.
        """
        text = text.strip()
        opts = self.one_or_more_whitespace_regex.split(text) if text else []
        xappend(opts, items)
        if not allow_all:
            self._validate_quoting(opts)
        return opts

    def validate_arg(self, opt: str | None, allow_spaces: bool = False, allow_all: bool = False) -> str | None:
        """allow_all permits all characters, including whitespace and quotes; See squote() and dquote()."""
        if allow_all or opt is None:
            return opt
        # Reject all whitespace, or everything except plain spaces when allow_spaces is set:
        if any(char.isspace() and (char != " " or not allow_spaces) for char in opt):
            die(f"Option must not contain a whitespace character{' other than space' if allow_spaces else ''}: {opt}")
        self._validate_quoting([opt])
        return opt

    def validate_arg_str(self, opt: str, allow_spaces: bool = False, allow_all: bool = False) -> str:
        """Returns validated option string, raising if missing or illegal."""
        if opt is None:
            die("Option must not be missing")
        self.validate_arg(opt, allow_spaces=allow_spaces, allow_all=allow_all)
        return opt

    @staticmethod
    def _validate_quoting(opts: list[str]) -> None:
        """Raises an error if any option contains a quote or shell metacharacter."""
        for opt in opts:
            if "'" in opt or '"' in opt or "$" in opt or "`" in opt:
                die(f"Option must not contain a single quote or double quote or dollar or backtick character: {opt}")

    @staticmethod
    def _fix_recv_opts(opts: list[str], preserve_properties: frozenset[str]) -> tuple[list[str], list[str]]:
        """Returns sanitized ``zfs recv`` options and captured ``-o/-x`` args."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="densFA",
            include_arg_opts={"-o", "-x"},
            preserve_properties=preserve_properties,
        )

    @staticmethod
    def _fix_send_opts(opts: list[str]) -> list[str]:
        """Returns sanitized ``zfs send`` options."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="den",
            include_arg_opts={"-X", "--exclude", "--redact"},
            exclude_arg_opts=frozenset({"-i", "-I", "-t", "--resume"}),
        )[0]  # only the sanitized opts list; captured -o/-x names are irrelevant for 'zfs send'

    def _program_name(self, program: str) -> str:
        """For testing: helps simulate errors caused by external programs.

        Validates that ``program`` is a plain program name (no shell metacharacters, no ':'), then optionally
        substitutes a broken name per self.inject_params so tests can simulate missing or failing programs.
        """
        self.validate_arg_str(program)
        if not program:
            die(f"Program name must not be missing: {program}")
        for char in SHELL_CHARS + ":":
            if char in program:
                die(f"Program name must not contain a '{char}' character: {program}")
        if self.inject_params.get("inject_unavailable_" + program, False):
            return program + "-xxx"  # substitute a program that cannot be found on the PATH
        if self.inject_params.get("inject_failing_" + program, False):
            return "false"  # substitute a program that will error out with non-zero return code
        return program

    def _unset_matching_env_vars(self, args: argparse.Namespace) -> None:
        """Unset environment variables matching regex filters."""
        if len(args.exclude_envvar_regex) == 0 and len(args.include_envvar_regex) == 0:
            return  # fast path
        exclude_envvar_regexes: RegexList = compile_regexes(args.exclude_envvar_regex)
        include_envvar_regexes: RegexList = compile_regexes(args.include_envvar_regex)
        # Mutate global state at most once, atomically. First thread wins. The latch isn't strictly necessary for
        # correctness as all concurrent bzfs.Job instances in bzfs_jobrunner have identical include/exclude_envvar_regex
        # anyway. It's just for reduced latency.
        with _UNSET_ENV_VARS_LOCK:
            if _UNSET_ENV_VARS_LATCH.get_and_set(False):
                for envvar_name in list(os.environ):
                    # order of include vs exclude is intentionally reversed to correctly implement semantics:
                    # "unset env var iff excluded and not included (include takes precedence)."
                    if is_included(envvar_name, exclude_envvar_regexes, include_envvar_regexes):
                        os.environ.pop(envvar_name, None)
                        self.log.debug("Unsetting b/c envvar regex: %s", envvar_name)

    def lock_file_name(self) -> str:
        """Returns unique path used to detect concurrently running jobs.

        Makes it such that a job that runs periodically declines to start if the same previous periodic job is still running
        without completion yet. Hashed key avoids overly long filenames while remaining deterministic.
        """
        # The key covers every option that defines a job's identity; two runs with the same key are "the same job".
        # fmt: off
        key = (tuple(self.root_dataset_pairs), self.args.recursive, self.args.exclude_dataset_property,
               tuple(self.args.include_dataset), tuple(self.args.exclude_dataset),
               tuple(self.args.include_dataset_regex), tuple(self.args.exclude_dataset_regex),
               tuple(tuple(f) for f in self.snapshot_filters), self.args.skip_replication, self.args.create_src_snapshots,
               self.args.create_src_snapshots_plan, self.args.create_src_snapshots_timeformat,
               self.create_src_snapshots_config.anchors,
               self.args.delete_dst_datasets, self.args.delete_dst_snapshots, self.args.delete_dst_snapshots_except,
               self.args.delete_empty_dst_datasets,
               self.args.compare_snapshot_lists, self.args.monitor_snapshots,
               self.src.basis_ssh_host, self.dst.basis_ssh_host,
               self.src.basis_ssh_user, self.dst.basis_ssh_user,
               self.src.ssh_port, self.dst.ssh_port,
               os.path.abspath(self.src.ssh_config_file) if self.src.ssh_config_file else "",
               os.path.abspath(self.dst.ssh_config_file) if self.dst.ssh_config_file else "",
               )
        # fmt: on
        hash_code: str = sha256_hex(str(key))
        log_parent_dir: str = os.path.dirname(self.log_params.log_dir)
        locks_dir: str = os.path.join(log_parent_dir, ".locks")
        os.makedirs(locks_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--locks-dir ", locks_dir)
        validate_file_permissions(locks_dir, DIR_PERMISSIONS)
        return os.path.join(locks_dir, f"{PROG_NAME}-lockfile-{hash_code}.lock")

    def dry(self, msg: str) -> str:
        """Prefix ``msg`` with 'Dry' when running in dry-run mode."""
        return utils.dry(msg, self.dry_run)

    def is_program_available(self, program: str, location: str) -> bool:
        """Return True if ``program`` was detected on ``location`` host."""
        return program in self.available_programs.get(location, {})

496 

497 

#############################################################################
@final
class Remote(MiniRemote):
    """Connection settings for either source or destination host.

    Reads the per-location ('src' or 'dst') ssh options from args, validates the optional ssh config file, and -- when
    connection reuse is enabled -- prepares the directories that hold ssh ControlPath Unix domain sockets.
    """

    def __init__(self, loc: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args."""
        # immutable variables:
        assert loc == "src" or loc == "dst"
        self.location: str = loc
        self.params: Params = p
        # Per-location arg names are constructed dynamically, e.g. args.ssh_src_user vs args.ssh_dst_user:
        self.basis_ssh_user: Final[str] = getattr(args, f"ssh_{loc}_user")
        self.basis_ssh_host: Final[str] = getattr(args, f"ssh_{loc}_host")
        self.ssh_port: Final[int | None] = getattr(args, f"ssh_{loc}_port")
        self.ssh_config_file: Final[str | None] = p.validate_arg(getattr(args, f"ssh_{loc}_config_file"))
        if self.ssh_config_file and self.ssh_config_file != "none":
            # `ssh -F none` will not read any config file per https://man7.org/linux/man-pages/man1/ssh.1.html
            if "bzfs_ssh_config" not in os.path.basename(self.ssh_config_file):
                die(f"Basename of --ssh-{loc}-config-file must contain substring 'bzfs_ssh_config': {self.ssh_config_file}")
            # Open without following symlinks merely to verify the file exists and is readable:
            with open_nofollow(self.ssh_config_file, "rb"):
                pass  # validate
        self.ssh_config_file_hash: Final[str] = (
            sha256_urlsafe_base64(os.path.abspath(self.ssh_config_file), padding=False) if self.ssh_config_file else ""
        )
        self.ssh_cipher: Final[str] = p.validate_arg_str(args.ssh_cipher)
        # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation:
        ssh_extra_opts: list[str] = ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"] + (
            ["-v"] if args.verbose >= 3 else []
        )
        self.ssh_extra_opts: tuple[str, ...] = tuple(ssh_extra_opts)
        self.max_concurrent_ssh_sessions_per_tcp_connection: Final[int] = args.max_concurrent_ssh_sessions_per_tcp_connection
        self.ssh_exit_on_shutdown: bool = args.ssh_exit_on_shutdown
        self.ssh_control_persist_secs: int = args.ssh_control_persist_secs
        self.ssh_control_persist_margin_secs: int = getenv_int("ssh_control_persist_margin_secs", 2)
        self.socket_prefix: Final[str] = "s"
        self.reuse_ssh_connection: bool = getenv_bool("reuse_ssh_connection", True)
        self.ssh_socket_dir: str = ""
        if self.reuse_ssh_connection:
            # Prepare ~/.ssh/bzfs and ~/.ssh/bzfs/x as homes for the ssh multiplexing (ControlPath) sockets:
            ssh_home_dir: str = os.path.join(get_home_directory(), ".ssh")
            os.makedirs(ssh_home_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            self.ssh_socket_dir = os.path.join(ssh_home_dir, "bzfs")
            os.makedirs(self.ssh_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(self.ssh_socket_dir, mode=DIR_PERMISSIONS)
            self.ssh_exit_on_shutdown_socket_dir: Final[str] = os.path.join(self.ssh_socket_dir, "x")
            os.makedirs(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_file_permissions(self.ssh_exit_on_shutdown_socket_dir, mode=DIR_PERMISSIONS)
            # Garbage-collect sockets left behind by dead processes:
            _delete_stale_files(self.ssh_exit_on_shutdown_socket_dir, prefix=self.socket_prefix, ssh=True)
        self.sanitize1_regex: Final[re.Pattern[str]] = re.compile(r"[\s\\/@$]")  # replace whitespace, /, $, \, @ with ~ char
        self.sanitize2_regex: Final[re.Pattern[str]] = re.compile(rf"[^a-zA-Z0-9{re.escape('~.:_-')}]")  # remove bad chars

        # mutable variables:
        self.root_dataset: str = ""  # deferred until run_main()
        self.basis_root_dataset: str = ""  # deferred until run_main()
        self.pool: str = ""
        self.sudo: str = ""
        self.use_zfs_delegation: bool = False
        self.ssh_user: str = ""
        self.ssh_host: str = ""
        self.ssh_user_host: str = ""
        self.is_nonlocal: bool = False

    def local_ssh_command(self, socket_file: str | None) -> tuple[list[str], str | None]:
        """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing)
        command to run on the remote host, which will be appended later; also returns the effective ControlPath used by the
        ssh CLI command, or ``None`` when SSH multiplexing is not active."""
        if not self.ssh_user_host:
            return [], None  # dataset is on local host - don't use ssh

        # dataset is on remote host
        p: Params = self.params
        if p.ssh_program == DISABLE_PRG:
            die("Cannot talk to remote host because ssh CLI is disabled.")
        ssh_cmd: list[str] = [p.ssh_program] + list(self.ssh_extra_opts)
        if self.ssh_config_file:
            ssh_cmd += ["-F", self.ssh_config_file]
        if self.ssh_cipher:
            ssh_cmd += ["-c", self.ssh_cipher]
        if self.ssh_port:
            ssh_cmd += ["-p", str(self.ssh_port)]

        socket_path: str | None = None
        if self.reuse_ssh_connection:
            # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and
            # 'ssh -S -M -oControlPersist=60s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
            if socket_file:
                socket_path = socket_file
            else:
                # Generate unique private Unix domain socket file name in user's home dir and pass it to 'ssh -S /path/to/socket'
                def sanitize(name: str) -> str:
                    name = self.sanitize1_regex.sub("~", name)  # replace whitespace, /, $, \, @ with a ~ tilde char
                    name = self.sanitize2_regex.sub("", name)  # Remove disallowed chars
                    return name

                # Uniqueness comes from pid + current time + random number; the host/user part is merely informational:
                max_rand: int = 999_999_999_999
                rand_str: str = urlsafe_base64(random.SystemRandom().randint(0, max_rand), max_value=max_rand, padding=False)
                curr_time: str = urlsafe_base64(time.time_ns(), max_value=2**64 - 1, padding=False)
                unique: str = f"{os.getpid()}@{curr_time}@{rand_str}"
                optional: str = f"@{sanitize(self.ssh_host)[:45]}@{sanitize(self.ssh_user)}"
                socket_name: str = f"{self.socket_prefix}{unique}{optional}"
                socket_path = os.path.join(self.ssh_exit_on_shutdown_socket_dir, socket_name)
                # If the full path is too long, drop (part of) the informational suffix, but never below the OS limit:
                socket_path = socket_path[: max(UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, len(socket_path) - len(optional))]
                # `ssh` will error out later if the max OS Unix domain socket path limit cannot be met reasonably as the
                # home directory path is too long, typically because the Unix user name is unreasonably long.
            ssh_cmd += ["-S", socket_path]
        ssh_cmd += [self.ssh_user_host]
        return ssh_cmd, socket_path

    def cache_key(self) -> tuple[str, str, int | None, str | None]:
        """Returns tuple uniquely identifying this Remote for caching."""
        return self.location, self.ssh_user_host, self.ssh_port, self.ssh_config_file

    def cache_namespace(self) -> str:
        """Returns cache namespace string which is a stable, unique directory component for caches that distinguishes
        endpoints by username+host+port+ssh_config_file where applicable, and uses '-' when no user/host is present (local
        mode)."""
        if not self.ssh_user_host:
            return "-"  # local mode
        return f"{self.ssh_user_host}#{self.ssh_port or ''}#{self.ssh_config_file_hash}"

    def is_ssh_available(self) -> bool:
        """Return True if the ssh client program required for this remote is available on the local host."""
        return self.params.is_program_available("ssh", "local")

    def __repr__(self) -> str:
        return str(self.__dict__)

623 

624 

625############################################################################# 

@final
class CopyPropertiesConfig:
    """--zfs-recv-o* and --zfs-recv-x* option groups for copying or excluding ZFS properties on receive."""

    def __init__(self, group: str, flag: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args."""
        assert group in ZFS_RECV_GROUPS
        assert ZFS_RECV_O in ZFS_RECV_GROUPS
        # immutable variables:
        self.group: Final[str] = group  # one of zfs_recv_o, zfs_recv_x
        self.flag: Final[str] = flag  # one of -o or -x
        raw_sources: str = p.validate_arg_str(getattr(args, f"{group}_sources"))
        # canonicalize: trim each comma-separated source and sort the list
        self.sources: Final[str] = ",".join(sorted(s.strip() for s in raw_sources.strip().split(",")))
        self.targets: Final[str] = p.validate_arg_str(getattr(args, f"{group}_targets"))
        regexes: list[str] | None = getattr(args, f"{group}_include_regex")
        if regexes is None:
            # only the zfs_recv_o group has a non-empty default include regex
            regexes = [ZFS_RECV_O_INCLUDE_REGEX_DEFAULT] if group == ZFS_RECV_O else []
        self.include_regexes: Final[RegexList] = compile_regexes(regexes)
        self.exclude_regexes: Final[RegexList] = compile_regexes(getattr(args, f"{group}_exclude_regex"))

    def __repr__(self) -> str:
        return str(self.__dict__)

649 

650 

651############################################################################# 

652@final 

653class SnapshotLabel(NamedTuple): 

654 """Contains the individual parts that are concatenated into a ZFS snapshot name.""" 

655 

656 prefix: str # bzfs_ 

657 infix: str # us-west_ 

658 timestamp: str # 2024-11-06_08:30:05 

659 suffix: str # _hourly 

660 

661 def __str__(self) -> str: # bzfs_us-west_2024-11-06_08:30:05_hourly 

662 return f"{self.prefix}{self.infix}{self.timestamp}{self.suffix}" 

663 

664 def notimestamp_str(self) -> str: # bzfs_us-west_hourly 

665 """Returns the concatenation of all parts except for the timestamp part.""" 

666 return f"{self.prefix}{self.infix}{self.suffix}" 

667 

668 def validate_label(self, input_text: str) -> None: 

669 """Validates that the composed snapshot label forms a legal name.""" 

670 name: str = str(self) 

671 validate_dataset_name(name, input_text) 

672 if "/" in name: 

673 die(f"Invalid ZFS snapshot name: '{name}' for: '{input_text}'") 

674 for key, value in {"prefix": self.prefix, "infix": self.infix, "suffix": self.suffix}.items(): 

675 if key == "prefix": 

676 if not value.endswith("_"): 

677 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'") 

678 if value.count("_") > 1: 

679 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'") 

680 elif key == "infix": 

681 if value: 

682 if not value.endswith("_"): 

683 die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'") 

684 if value.count("_") > 1: 

685 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'") 

686 elif value: 

687 if not value.startswith("_"): 

688 die(f"Invalid {input_text}{key}: Must start with an underscore character: '{value}'") 

689 if value.count("_") > 1: 

690 die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'") 

691 

692 

693############################################################################# 

@final
class CreateSrcSnapshotConfig:
    """Option values for --create-src-snapshots, that is, for automatically creating source snapshots."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Option values for --create-src-snapshots*; reads from ArgumentParser via args.

        Parses the --create-src-snapshots-plan dict literal into sorted SnapshotLabel objects and computes, per
        period suffix, the (amount, unit) durations that drive the periodic snapshot schedule; dies on invalid
        plan entries or an invalid --daemon-frequency.
        """
        # immutable variables:
        self.skip_create_src_snapshots: Final[bool] = not args.create_src_snapshots
        self.create_src_snapshots_even_if_not_due: Final[bool] = args.create_src_snapshots_even_if_not_due
        # empty string is normalized to None so get_timezone()/current_datetime() use their default behavior
        tz_spec: str | None = args.create_src_snapshots_timezone if args.create_src_snapshots_timezone else None
        self.tz: Final[tzinfo | None] = get_timezone(tz_spec)
        self.current_datetime: datetime = current_datetime(tz_spec)  # not Final: presumably advanced later — TODO confirm
        self.timeformat: Final[str] = args.create_src_snapshots_timeformat
        self.anchors: Final[PeriodAnchors] = PeriodAnchors.parse(args)

        # Compute the schedule for upcoming periodic time events (suffix_durations). This event schedule is also used in
        # daemon mode via sleep_until_next_daemon_iteration()
        labels: list[SnapshotLabel] = []
        # default plan: a single ad-hoc snapshot under the "bzfs" org / "onsite" target
        create_src_snapshots_plan: str = args.create_src_snapshots_plan or str({"bzfs": {"onsite": {"adhoc": 1}}})
        for org, target_periods in ast.literal_eval(create_src_snapshots_plan).items():
            for target, periods in target_periods.items():
                for period_unit, period_amount in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    if not isinstance(period_amount, int) or period_amount < 0:
                        die(f"--create-src-snapshots-plan: Period amount must be a non-negative integer: {period_amount}")
                    if period_amount > 0:  # an amount of 0 disables that period entirely
                        suffix: str = nsuffix(period_unit)
                        labels.append(SnapshotLabel(prefix=nprefix(org), infix=ninfix(target), timestamp="", suffix=suffix))
        suffixes: list[str] = list({label.suffix for label in labels})  # dedupe
        xperiods: SnapshotPeriods = p.xperiods
        if self.skip_create_src_snapshots:
            # not creating snapshots: the schedule instead tracks the single --daemon-frequency period
            duration_amount, duration_unit = p.xperiods.suffix_to_duration0(p.daemon_frequency)
            if duration_amount <= 0 or not duration_unit:
                die(f"Invalid --daemon-frequency: {p.daemon_frequency}")
            suffixes = [nsuffix(p.daemon_frequency)]
            labels = []
        suffix_durations: dict[str, tuple[int, str]] = {suffix: xperiods.suffix_to_duration1(suffix) for suffix in suffixes}

        def suffix_key(suffix: str) -> tuple[int, str]:
            # Sort key: period length in milliseconds (then name); also validates that sub-daily periods divide a
            # day evenly and monthly periods divide a year evenly, so snapshot times stay aligned across days/years.
            duration_amount, duration_unit = suffix_durations[suffix]
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            if suffix.endswith(("hourly", "minutely", "secondly")):
                if duration_milliseconds != 0 and 86400 * 1000 % duration_milliseconds != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 86400 seconds "
                        f"without remainder so that snapshots will be created at the same time of day every day: {suffix}"
                    )
            if suffix.endswith("monthly"):
                if duration_amount != 0 and 12 % duration_amount != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 12 months "
                        f"without remainder so that snapshots will be created at the same time every year: {suffix}"
                    )
            return duration_milliseconds, suffix

        suffixes.sort(key=suffix_key, reverse=True)  # take snapshots for dailies before hourlies, and so on
        self.suffix_durations: Final[dict[str, tuple[int, str]]] = {sfx: suffix_durations[sfx] for sfx in suffixes}  # sort
        suffix_indexes: dict[str, int] = {suffix: k for k, suffix in enumerate(suffixes)}
        labels.sort(key=lambda label: (suffix_indexes[label.suffix], label))  # take snapshots for dailies before hourlies
        self._snapshot_labels: Final[list[SnapshotLabel]] = labels
        for label in self.snapshot_labels():
            label.validate_label("--create-src-snapshots-plan ")

    def snapshot_labels(self) -> list[SnapshotLabel]:
        """Returns the snapshot name patterns for which snapshots shall be created.

        Stamps each stored label with the current time rendered via --create-src-snapshots-timeformat.
        """
        timeformat: str = self.timeformat
        is_millis: bool = timeformat.endswith("%F")  # non-standard hack to append milliseconds
        if is_millis:
            timeformat = timeformat[0:-1] + "f"  # replace %F with %f (append microseconds)
        timestamp: str = self.current_datetime.strftime(timeformat)
        if is_millis:
            timestamp = timestamp[: -len("000")]  # replace microseconds with milliseconds
        timestamp = timestamp.replace("+", "z")  # zfs CLI does not accept the '+' character in snapshot names
        return [SnapshotLabel(label.prefix, label.infix, timestamp, label.suffix) for label in self._snapshot_labels]

    def __repr__(self) -> str:
        return str(self.__dict__)

770 

771 

772############################################################################# 

@dataclass(frozen=True)
@final
class AlertConfig:
    """Thresholds controlling when alerts fire for snapshot age."""

    # "Latest" applies the thresholds to the most recent matching snapshot, "Oldest" to the oldest one; value is
    # the capitalized 'latest'/'oldest' alert_type parsed from --monitor-snapshots.
    kind: Literal["Latest", "Oldest"]
    # Snapshot age in milliseconds at which a warning fires; set to an "infinite" sentinel when no warning threshold
    # was configured (see MonitorSnapshotsConfig).
    warning_millis: int
    # Snapshot age in milliseconds at which a critical alert fires; same "infinite" sentinel convention as above.
    critical_millis: int

781 

782 

783############################################################################# 

@dataclass(frozen=True)
@final
class MonitorSnapshotAlert:
    """Alert configuration for a single monitored snapshot label."""

    # The snapshot name pattern (org prefix + target infix + period suffix) that this alert monitors.
    label: SnapshotLabel
    # Thresholds for the most recent matching snapshot, or None when the latest check is disabled/unconfigured.
    latest: AlertConfig | None
    # Thresholds for the oldest matching snapshot, or None when the oldest check is disabled/unconfigured.
    oldest: AlertConfig | None
    # Parsed from the 'oldest_skip_holds' bool in --monitor-snapshots; presumably makes the oldest check skip
    # snapshots with ZFS holds — TODO confirm against the monitoring implementation.
    oldest_skip_holds: bool

793 

794 

795############################################################################# 

@final
class MonitorSnapshotsConfig:
    """Option values for --monitor-snapshots*, that is, policy describing which snapshots to monitor for staleness."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args.

        Parses the --monitor-snapshots dict literal, which is nested as org -> target -> period -> alert_type
        ('latest'/'oldest') -> {'warning'|'critical'|'cycles'|'oldest_skip_holds': value}, into a list of
        MonitorSnapshotAlert objects sorted by descending period length; dies on malformed input.
        """
        # immutable variables:
        self.monitor_snapshots: Final[dict] = ast.literal_eval(args.monitor_snapshots)
        self.dont_warn: Final[bool] = args.monitor_snapshots_dont_warn
        self.dont_crit: Final[bool] = args.monitor_snapshots_dont_crit
        self.no_latest_check: Final[bool] = args.monitor_snapshots_no_latest_check
        self.no_oldest_check: Final[bool] = args.monitor_snapshots_no_oldest_check
        alerts: list[MonitorSnapshotAlert] = []
        xperiods: SnapshotPeriods = p.xperiods
        for org, target_periods in self.monitor_snapshots.items():
            prefix: str = nprefix(org)
            for target, periods in target_periods.items():
                for period_unit, alert_dicts in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    label = SnapshotLabel(prefix=prefix, infix=ninfix(target), timestamp="", suffix=nsuffix(period_unit))
                    alert_latest, alert_oldest = None, None
                    oldest_skip_holds: bool = False
                    for alert_type, alert_dict in alert_dicts.items():
                        m = "--monitor-snapshots: "
                        if alert_type not in ["latest", "oldest"]:
                            die(f"{m}'{alert_type}' must be 'latest' or 'oldest' within {args.monitor_snapshots}")
                        warning_millis: int = 0
                        critical_millis: int = 0
                        cycles: int = 1  # number of periods of grace before an alert fires; default is one period
                        for kind, value in alert_dict.items():
                            context: str = args.monitor_snapshots
                            if kind == "warning":
                                warning_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "critical":
                                critical_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "cycles":
                                cycles = max(0, int(value))
                            elif kind == "oldest_skip_holds" and alert_type == "oldest":
                                # 'oldest_skip_holds' is only accepted within an 'oldest' alert dict
                                if not isinstance(value, bool):
                                    die(f"{m}'{kind}' must be a bool within {context}")
                                oldest_skip_holds = value
                            else:
                                die(f"{m}'{kind}' must be 'warning', 'critical' or 'cycles' within {context}")
                        if warning_millis > 0 or critical_millis > 0:
                            # effective threshold = configured duration + cycles * period length of this label's suffix
                            duration_amount, duration_unit = xperiods.suffix_to_duration1(label.suffix)
                            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
                            warning_millis += 0 if warning_millis <= 0 else cycles * duration_milliseconds
                            critical_millis += 0 if critical_millis <= 0 else cycles * duration_milliseconds
                            # unconfigured thresholds become "infinite" so they never fire
                            # NOTE(review): constant name says SECS but is assigned to millis variables — confirm units
                            warning_millis = UNIX_TIME_INFINITY_SECS if warning_millis <= 0 else warning_millis
                            critical_millis = UNIX_TIME_INFINITY_SECS if critical_millis <= 0 else critical_millis
                            # sys.intern() enables identity fast-path comparisons on the alert kind
                            capitalized_alert_type = cast(Literal["Latest", "Oldest"], sys.intern(alert_type.capitalize()))
                            alert_config = AlertConfig(capitalized_alert_type, warning_millis, critical_millis)
                            if alert_type == "latest":
                                if not self.no_latest_check:
                                    alert_latest = alert_config
                            else:
                                assert alert_type == "oldest"
                                if not self.no_oldest_check:
                                    alert_oldest = alert_config
                    if alert_latest is not None or alert_oldest is not None:
                        alerts.append(MonitorSnapshotAlert(label, alert_latest, alert_oldest, oldest_skip_holds))

        def alert_sort_key(alert: MonitorSnapshotAlert) -> tuple[int, SnapshotLabel]:
            # Sort key: the label's period length in milliseconds, then the label itself as a tiebreaker.
            duration_amount, duration_unit = xperiods.suffix_to_duration1(alert.label.suffix)
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            return duration_milliseconds, alert.label

        alerts.sort(key=alert_sort_key, reverse=True)  # check snapshots for dailies before hourlies, and so on
        self.alerts: Final[list[MonitorSnapshotAlert]] = alerts
        self.enable_monitor_snapshots: Final[bool] = len(alerts) > 0

    def __repr__(self) -> str:
        return str(self.__dict__)

868 

869 

870############################################################################# 

def _fix_send_recv_opts(
    opts: list[str],
    *,
    exclude_long_opts: set[str],
    exclude_short_opts: str,
    include_arg_opts: set[str],
    exclude_arg_opts: frozenset[str] = frozenset(),
    preserve_properties: frozenset[str] = frozenset(),
) -> tuple[list[str], list[str]]:
    """These opts are instead managed via bzfs CLI args --dryrun, etc.

    Filters the given zfs send/recv options: drops excluded long options, drops excluded arg-taking options together
    with their argument, strips excluded flag characters out of combined short-option clusters, and passes through
    arg-taking options like -o/-x while tracking which preserve_properties names remain un-excluded. Returns the
    filtered options plus the sorted remaining property names.
    """
    assert "-" not in exclude_short_opts
    kept: list[str] = []
    remaining_names: set[str] = set(preserve_properties)
    pending: list[str] = list(reversed(opts))  # consume left-to-right via a stack
    while pending:
        opt: str = pending.pop()
        if opt in exclude_arg_opts:  # example: {"-X", "--exclude"}
            if pending:
                pending.pop()  # also swallow this option's argument
        elif opt in include_arg_opts:  # example: {"-o", "-x"}
            kept.append(opt)
            if pending:
                arg = pending.pop()
                if opt == "-o" and "=" in arg and arg.split("=", 1)[0] in preserve_properties:
                    die(f"--preserve-properties: Disallowed ZFS property found in --zfs-recv-program-opt(s): -o {arg}")
                if opt == "-x":
                    remaining_names.discard(arg)  # caller already excludes this property explicitly
                kept.append(arg)
        elif opt not in exclude_long_opts:  # example: {"--dryrun", "--verbose"}
            if opt.startswith("-") and opt != "-" and not opt.startswith("--"):
                # strip excluded flag chars from a short-option cluster, e.g. exclude "n": "-nv" -> "-v"
                stripped = "".join(ch for ch in opt if ch not in exclude_short_opts)
                if stripped == "-":
                    continue  # every flag in the cluster was excluded; drop the option entirely
                opt = stripped
            kept.append(opt)
    return kept, sorted(remaining_names)

909 

910 

# Extracts the leading decimal PID component embedded in an ssh master Unix domain socket file name (after its
# prefix); _delete_stale_files() uses it to decide whether the owning bzfs process is still alive.
_SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX: Final[re.Pattern[str]] = re.compile(r"^[0-9]+")  # see local_ssh_command()

912 

913 

def _delete_stale_files(
    root_dir: str,
    *,
    prefix: str,
    millis: int = 60 * 60 * 1000,
    dirs: bool = False,
    exclude: str | None = None,
    ssh: bool = False,
) -> None:
    """Cleans up obsolete files; For example caused by abnormal termination, OS crash.

    Scans the immediate children of ``root_dir`` and deletes each entry whose name starts with ``prefix`` and whose
    mtime is at least ``millis`` milliseconds in the past. If ``dirs`` is True only directories are deleted
    (recursively); otherwise only non-directories are deleted. An entry named ``exclude`` is always spared. With
    ``ssh=True``, Unix domain sockets are not deleted on age alone: they are only removed once the bzfs process
    whose PID is embedded at the start of the socket file name (after ``prefix``) no longer exists, or the socket
    is older than 31 days.
    """
    seconds: float = millis / 1000
    now: float = time.time()
    validate_is_not_a_symlink("", root_dir)  # refuse to operate through a symlinked root dir
    with os.scandir(root_dir) as iterator:
        for entry in iterator:
            if entry.name == exclude or not entry.name.startswith(prefix):
                continue
            try:
                # follow_symlinks=False: judge the entry itself, never its symlink target
                stats = entry.stat(follow_symlinks=False)
                is_dir = entry.is_dir(follow_symlinks=False)
                if ((dirs and is_dir) or (not dirs and not is_dir)) and now - stats.st_mtime >= seconds:
                    if dirs:
                        shutil.rmtree(entry.path, ignore_errors=True)
                    elif not (ssh and stat.S_ISSOCK(stats.st_mode)):
                        os.remove(entry.path)
                    elif match := _SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX.match(entry.name[len(prefix) :]):
                        # ssh master socket: the file name embeds the PID of the bzfs process that created it
                        pid: int = int(match.group(0))
                        if pid_exists(pid) is False or now - stats.st_mtime >= 31 * 24 * 60 * 60:
                            os.remove(entry.path)  # bzfs process is no longer alive; its ssh master process isn't either
            except FileNotFoundError:
                pass  # harmless; entry vanished between scandir() and stat()/remove()

945 

946 

def _create_symlink(src: str, dst_dir: str, dst: str) -> None:
    """Creates a symlink named dst inside dst_dir that points at src via a relative path."""
    relative_target: str = os.path.relpath(src, start=dst_dir)
    link_location: str = os.path.join(dst_dir, dst)
    os.symlink(src=relative_target, dst=link_location)

951 

952 

def resolve_r2r_mode(p: Params) -> str:
    """Returns the effective r2r mode for the current task and emits fallback warnings.

    Starts from the requested --r2r mode and downgrades to 'off' whenever remote-to-remote replication is
    unnecessary (skip-replication, local endpoints) or infeasible (missing sh/ssh programs, incompatible
    --ssh-*-config-file usage), logging a warning for each infeasibility fallback.
    """
    mode: str = p.r2r_mode_requested
    assert mode in ("off", "pull", "push"), mode
    src, dst = p.src, p.dst
    log = p.log

    if p.skip_replication or mode == "off":
        return "off"
    if not (src.ssh_user_host or dst.ssh_user_host):
        return "off"  # we'll do local replication (there's no need for r2r)
    if not (src.is_nonlocal and dst.is_nonlocal):
        return "off"  # at least one of them is local to this host (there's no need for r2r)

    # the endpoint that will drive the remote-to-remote transfer
    driver: Remote = src if mode == "push" else dst
    if not p.is_program_available("sh", driver.location):
        log.warning(
            f"--r2r={mode} requires sh on {driver.location} host: {driver.ssh_user_host or 'localhost'} for remote-to-remote "
            "replication; falling back to --r2r=off."
        )
        return "off"

    if is_same_remote(src, dst):  # are user@host, port, ssh_config file the same?
        return mode  # perf: we'll do local replication on that host

    if not p.is_program_available("ssh", driver.location):
        log.warning(
            f"--r2r={mode} requires ssh on {driver.location} host: {driver.ssh_user_host or 'localhost'} "
            "for remote-to-remote replication; falling back to --r2r=off."
        )
        return "off"

    if mode == "pull" and src.ssh_config_file and src.ssh_config_file != "none":
        log.warning(
            "--r2r=pull cannot use --ssh-src-config-file for remote-to-remote ssh; cowardly falling back to --r2r=off."
        )
        return "off"
    if mode == "push" and dst.ssh_config_file and dst.ssh_config_file != "none":
        log.warning(
            "--r2r=push cannot use --ssh-dst-config-file for remote-to-remote ssh; cowardly falling back to --r2r=off."
        )
        return "off"

    return mode

1003 

1004 

def is_same_remote(src: Remote, dst: Remote) -> bool:
    """Returns whether this remote is the same as the other remote."""
    if not src.ssh_user_host and not dst.ssh_user_host:
        return True  # both are local
    same_endpoint: bool = src.ssh_user_host == dst.ssh_user_host
    same_port: bool = src.ssh_port == dst.ssh_port
    same_config: bool = src.ssh_config_file == dst.ssh_config_file
    return same_endpoint and same_port and same_config