Coverage for bzfs_main/configuration.py: 99%

539 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Configuration subsystem; All CLI option/parameter values are reachable from the "Params" class.""" 

16 

17from __future__ import annotations 

18import argparse 

19import ast 

20import hashlib 

21import os 

22import platform 

23import random 

24import re 

25import shutil 

26import stat 

27import sys 

28import tempfile 

29import time 

30from dataclasses import dataclass 

31from datetime import datetime, tzinfo 

32from logging import Logger 

33from typing import ( 

34 TYPE_CHECKING, 

35 Iterable, 

36 Literal, 

37 NamedTuple, 

38 cast, 

39) 

40 

41import bzfs_main.utils 

42from bzfs_main.argparse_actions import ( 

43 SnapshotFilter, 

44 optimize_snapshot_filters, 

45 validate_no_argument_file, 

46) 

47from bzfs_main.argparse_cli import ( 

48 LOG_DIR_DEFAULT, 

49 ZFS_RECV_GROUPS, 

50 __version__, 

51) 

52from bzfs_main.detect import ( 

53 DISABLE_PRG, 

54) 

55from bzfs_main.period_anchors import ( 

56 PeriodAnchors, 

57) 

58from bzfs_main.retry import ( 

59 RetryPolicy, 

60) 

61from bzfs_main.utils import ( 

62 DIR_PERMISSIONS, 

63 PROG_NAME, 

64 SHELL_CHARS, 

65 SNAPSHOT_FILTERS_VAR, 

66 UNIX_TIME_INFINITY_SECS, 

67 RegexList, 

68 SnapshotPeriods, 

69 SynchronizedBool, 

70 append_if_absent, 

71 compile_regexes, 

72 current_datetime, 

73 die, 

74 get_home_directory, 

75 get_timezone, 

76 getenv_bool, 

77 getenv_int, 

78 is_included, 

79 ninfix, 

80 nprefix, 

81 nsuffix, 

82 parse_duration_to_milliseconds, 

83 pid_exists, 

84 validate_dataset_name, 

85 validate_is_not_a_symlink, 

86 validate_property_name, 

87 xappend, 

88) 

89 

90if TYPE_CHECKING: # pragma: no cover - for type hints only 

91 from bzfs_main.connection import ConnectionPools 

92 

93 

94############################################################################# 

class LogParams:
    """Option values for logging."""

    def __init__(self, args: argparse.Namespace) -> None:
        """Reads from ArgumentParser via args.

        Side effects: creates the log directory tree, an empty uniquely-named log file, a
        ``.cache/last_modified`` directory, and a set of "current" symlinks pointing at the
        newest log files.
        """
        # immutable variables:
        # Map CLI verbosity/quiet flags to a log level name; --quiet wins over --verbose.
        if args.quiet:
            self.log_level: str = "ERROR"
        elif args.verbose >= 2:
            self.log_level = "TRACE"
        elif args.verbose >= 1:
            self.log_level = "DEBUG"
        else:
            self.log_level = "INFO"
        self.log_config_file: str = args.log_config_file
        if self.log_config_file:
            validate_no_argument_file(self.log_config_file, args, err_prefix="--log-config-file: ")
        # Each --log-config-var is a "name:value" pair; split on the first ':' only.
        self.log_config_vars: dict[str, str] = dict(var.split(":", 1) for var in args.log_config_var)
        self.timestamp: str = datetime.now().isoformat(sep="_", timespec="seconds")  # 2024-09-03_12:26:15
        self.home_dir: str = get_home_directory()
        log_parent_dir: str = args.log_dir if args.log_dir else os.path.join(self.home_dir, LOG_DIR_DEFAULT)
        # Guard against accidentally pointing --log-dir at an unrelated directory (stale files get deleted below).
        if LOG_DIR_DEFAULT not in os.path.basename(log_parent_dir):
            die(f"Basename of --log-dir must contain the substring '{LOG_DIR_DEFAULT}', but got: {log_parent_dir}")
        # Choose the subdir granularity by cutting the ISO timestamp at the separator matching --log-subdir:
        # 'daily' cuts at the first '_', 'hourly' at the first ':', 'minutely' at the last ':'.
        sep: str = "_" if args.log_subdir == "daily" else ":"
        timestamp: str = self.timestamp
        subdir: str = timestamp[0 : timestamp.rindex(sep) if args.log_subdir == "minutely" else timestamp.index(sep)]
        self.log_dir: str = os.path.join(log_parent_dir, subdir)  # 2024-09-03 (d), 2024-09-03_12 (h), 2024-09-03_12:26 (m)
        os.makedirs(log_parent_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir ", log_parent_dir)
        os.makedirs(self.log_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir subdir ", self.log_dir)
        self.log_file_prefix: str = args.log_file_prefix
        self.log_file_infix: str = args.log_file_infix
        self.log_file_suffix: str = args.log_file_suffix
        # mkstemp guarantees a unique file name even when multiple jobs start within the same second.
        fd, self.log_file = tempfile.mkstemp(
            suffix=".log",
            prefix=f"{self.log_file_prefix}{self.timestamp}{self.log_file_infix}{self.log_file_suffix}-",
            dir=self.log_dir,
        )
        os.close(fd)  # only the unique path is needed here; the file is (re)opened later by the logging subsystem
        self.pv_log_file: str = self.log_file[0 : -len(".log")] + ".pv"  # sibling file for 'pv' progress output
        self.last_modified_cache_dir: str = os.path.join(log_parent_dir, ".cache", "last_modified")
        os.makedirs(os.path.dirname(self.last_modified_cache_dir), mode=DIR_PERMISSIONS, exist_ok=True)

        # Create/update "current" symlink to current_dir, which is a subdir containing further symlinks to log files.
        # For parallel usage, ensures there is no time window when the symlinks are inconsistent or do not exist.
        current: str = "current"
        dot_current_dir: str = os.path.join(log_parent_dir, f".{current}")
        current_dir: str = os.path.join(dot_current_dir, os.path.basename(self.log_file)[0 : -len(".log")])
        os.makedirs(dot_current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir: .current ", dot_current_dir)
        os.makedirs(current_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("--log-dir: current ", current_dir)
        _create_symlink(self.log_file, current_dir, f"{current}.log")
        _create_symlink(self.pv_log_file, current_dir, f"{current}.pv")
        _create_symlink(self.log_dir, current_dir, f"{current}.dir")
        # Publish the fully-populated current_dir via symlink + os.replace so readers never see a half-built state.
        dst_file: str = os.path.join(current_dir, current)
        os.symlink(os.path.relpath(current_dir, start=log_parent_dir), dst_file)
        os.replace(dst_file, os.path.join(log_parent_dir, current))  # atomic rename
        # Garbage-collect current_dirs left behind by previous runs, but never the one just created.
        _delete_stale_files(dot_current_dir, prefix="", millis=10, dirs=True, exclude=os.path.basename(current_dir))
        self.params: Params | None = None  # back-reference; filled in later by the caller

    def __repr__(self) -> str:
        return str(self.__dict__)

159 

160 

161############################################################################# 

class Params:
    """All parsed CLI options combined into a single bundle; simplifies passing around numerous settings and defaults."""

    def __init__(
        self,
        args: argparse.Namespace,
        sys_argv: list[str],
        log_params: LogParams,
        log: Logger,
        inject_params: dict[str, bool] | None = None,
    ) -> None:
        """Reads from ArgumentParser via args.

        ``inject_params`` is a test-only hook that makes _program_name() substitute failing or
        unavailable programs (see _program_name).
        """
        # immutable variables:
        assert args is not None
        assert isinstance(sys_argv, list)
        assert log_params is not None
        assert log is not None
        self.args: argparse.Namespace = args
        self.sys_argv: list[str] = sys_argv
        self.log_params: LogParams = log_params
        self.log: Logger = log
        self.inject_params: dict[str, bool] = inject_params if inject_params is not None else {}  # for testing only
        self.one_or_more_whitespace_regex: re.Pattern = re.compile(r"\s+")
        self.two_or_more_spaces_regex: re.Pattern = re.compile(r" +")
        self._unset_matching_env_vars(args)
        self.xperiods: SnapshotPeriods = SnapshotPeriods()

        assert len(args.root_dataset_pairs) > 0
        self.root_dataset_pairs: list[tuple[str, str]] = args.root_dataset_pairs
        self.recursive: bool = args.recursive
        self.recursive_flag: str = "-r" if args.recursive else ""

        self.dry_run: bool = args.dryrun is not None
        self.dry_run_recv: str = "-n" if self.dry_run else ""
        self.dry_run_destroy: str = self.dry_run_recv
        self.dry_run_no_send: bool = args.dryrun == "send"
        self.verbose_zfs: bool = args.verbose >= 2
        self.verbose_destroy: str = "" if args.quiet else "-v"
        self.quiet: bool = args.quiet

        # Sanitize the raw 'zfs send'/'zfs recv' option strings from the CLI (see _fix_send_opts/_fix_recv_opts).
        self.zfs_send_program_opts: list[str] = self._fix_send_opts(self.split_args(args.zfs_send_program_opts))
        zfs_recv_program_opts: list[str] = self.split_args(args.zfs_recv_program_opts)
        for extra_opt in args.zfs_recv_program_opt:
            zfs_recv_program_opts.append(self.validate_arg_str(extra_opt, allow_all=True))
        preserve_properties = [validate_property_name(name, "--preserve-properties") for name in args.preserve_properties]
        zfs_recv_program_opts, zfs_recv_x_names = self._fix_recv_opts(zfs_recv_program_opts, frozenset(preserve_properties))
        self.zfs_recv_program_opts: list[str] = zfs_recv_program_opts
        self.zfs_recv_x_names: list[str] = zfs_recv_x_names
        if self.verbose_zfs:
            append_if_absent(self.zfs_send_program_opts, "-v")
            append_if_absent(self.zfs_recv_program_opts, "-v")
        self.zfs_full_recv_opts: list[str] = self.zfs_recv_program_opts.copy()
        # ZFS_RECV_GROUPS yields exactly the three groups unpacked below (o/x/set), in that order.
        cpconfigs = [CopyPropertiesConfig(group, flag, args, self) for group, flag in ZFS_RECV_GROUPS.items()]
        self.zfs_recv_o_config, self.zfs_recv_x_config, self.zfs_set_config = cpconfigs

        self.force_rollback_to_latest_snapshot: bool = args.force_rollback_to_latest_snapshot
        self.force_rollback_to_latest_common_snapshot = SynchronizedBool(args.force_rollback_to_latest_common_snapshot)
        self.force: SynchronizedBool = SynchronizedBool(args.force)
        self.force_once: bool = args.force_once
        self.force_unmount: str = "-f" if args.force_unmount else ""
        force_hard: str = "-R" if args.force_destroy_dependents else ""
        self.force_hard: str = "-R" if args.force_hard else force_hard  # --force-hard is deprecated

        self.skip_parent: bool = args.skip_parent
        self.skip_missing_snapshots: str = args.skip_missing_snapshots
        self.skip_on_error: str = args.skip_on_error
        self.retry_policy: RetryPolicy = RetryPolicy(args)
        self.skip_replication: bool = args.skip_replication
        self.delete_dst_snapshots: bool = args.delete_dst_snapshots is not None
        self.delete_dst_bookmarks: bool = args.delete_dst_snapshots == "bookmarks"
        self.delete_dst_snapshots_no_crosscheck: bool = args.delete_dst_snapshots_no_crosscheck
        self.delete_dst_snapshots_except: bool = args.delete_dst_snapshots_except
        self.delete_dst_datasets: bool = args.delete_dst_datasets
        self.delete_empty_dst_datasets: bool = args.delete_empty_dst_datasets is not None
        self.delete_empty_dst_datasets_if_no_bookmarks_and_no_snapshots: bool = (
            args.delete_empty_dst_datasets == "snapshots+bookmarks"
        )
        self.compare_snapshot_lists: str = args.compare_snapshot_lists
        self.daemon_lifetime_nanos: int = 1_000_000 * parse_duration_to_milliseconds(args.daemon_lifetime)
        self.daemon_frequency: str = args.daemon_frequency
        self.enable_privilege_elevation: bool = not args.no_privilege_elevation
        self.no_stream: bool = args.no_stream
        self.resume_recv: bool = not args.no_resume_recv
        self.create_bookmarks: str = "none" if args.no_create_bookmark else args.create_bookmarks  # no_create_bookmark depr
        self.use_bookmark: bool = not args.no_use_bookmark

        self.src: Remote = Remote("src", args, self)  # src dataset, host and ssh options
        self.dst: Remote = Remote("dst", args, self)  # dst dataset, host and ssh options
        self.create_src_snapshots_config: CreateSrcSnapshotConfig = CreateSrcSnapshotConfig(args, self)
        self.monitor_snapshots_config: MonitorSnapshotsConfig = MonitorSnapshotsConfig(args, self)
        self.is_caching_snapshots: bool = args.cache_snapshots == "true"

        # External helper programs; each *_program_opts list is screened for options that could redirect output
        # to attacker-chosen files (see the die() calls below).
        self.compression_program: str = self._program_name(args.compression_program)
        self.compression_program_opts: list[str] = self.split_args(args.compression_program_opts)
        for opt in ["-o", "--output-file"]:
            if opt in self.compression_program_opts:
                die(f"--compression-program-opts: {opt} is disallowed for security reasons.")
        self.getconf_program: str = self._program_name("getconf")  # print number of CPUs on POSIX except Solaris
        self.psrinfo_program: str = self._program_name("psrinfo")  # print number of CPUs on Solaris
        self.mbuffer_program: str = self._program_name(args.mbuffer_program)
        self.mbuffer_program_opts: list[str] = self.split_args(args.mbuffer_program_opts)
        for opt in ["-i", "-I", "-o", "-O", "-l", "-L"]:
            if opt in self.mbuffer_program_opts:
                die(f"--mbuffer-program-opts: {opt} is disallowed for security reasons.")
        self.ps_program: str = self._program_name(args.ps_program)
        self.pv_program: str = self._program_name(args.pv_program)
        self.pv_program_opts: list[str] = self.split_args(args.pv_program_opts)
        for opt in ["-f", "--log-file"]:
            if opt in self.pv_program_opts:
                die(f"--pv-program-opts: {opt} is disallowed for security reasons.")
        self.isatty: bool = getenv_bool("isatty", True)
        if args.bwlimit:
            self.pv_program_opts += [f"--rate-limit={self.validate_arg_str(args.bwlimit)}"]
        self.shell_program_local: str = "sh"
        self.shell_program: str = self._program_name(args.shell_program)
        self.ssh_program: str = self._program_name(args.ssh_program)
        self.sudo_program: str = self._program_name(args.sudo_program)
        self.uname_program: str = self._program_name("uname")
        self.zfs_program: str = self._program_name("zfs")
        self.zpool_program: str = self._program_name(args.zpool_program)

        # no point creating complex shell pipeline commands for tiny data transfers:
        self.min_pipe_transfer_size: int = getenv_int("min_pipe_transfer_size", 1024 * 1024)
        self.max_datasets_per_batch_on_list_snaps: int = getenv_int("max_datasets_per_batch_on_list_snaps", 1024)
        self.max_datasets_per_minibatch_on_list_snaps: int = getenv_int("max_datasets_per_minibatch_on_list_snaps", -1)
        self.max_snapshots_per_minibatch_on_delete_snaps = getenv_int("max_snapshots_per_minibatch_on_delete_snaps", 2**29)
        self.dedicated_tcp_connection_per_zfs_send: bool = getenv_bool("dedicated_tcp_connection_per_zfs_send", True)
        # --force-once implies strictly sequential execution: a single worker thread.
        self.threads: tuple[int, bool] = (1, False) if self.force_once else args.threads
        timeout_nanos = None if args.timeout is None else 1_000_000 * parse_duration_to_milliseconds(args.timeout)
        self.timeout_nanos: int | None = timeout_nanos
        self.no_estimate_send_size: bool = args.no_estimate_send_size
        self.remote_conf_cache_ttl_nanos: int = 1_000_000 * parse_duration_to_milliseconds(args.daemon_remote_conf_cache_ttl)
        # Terminal width for progress display; 0 disables progress rendering entirely.
        self.terminal_columns: int = (
            getenv_int("terminal_columns", shutil.get_terminal_size(fallback=(120, 24)).columns)
            if self.isatty and self.pv_program != DISABLE_PRG and not self.quiet
            else 0
        )

        self.os_cpu_count: int | None = os.cpu_count()
        self.os_geteuid: int = os.geteuid()
        self.prog_version: str = __version__
        self.python_version: str = sys.version
        self.platform_version: str = platform.version()
        self.platform_platform: str = platform.platform()

        # mutable variables:
        snapshot_filters = args.snapshot_filters_var if hasattr(args, SNAPSHOT_FILTERS_VAR) else [[]]
        self.snapshot_filters: list[list[SnapshotFilter]] = [optimize_snapshot_filters(f) for f in snapshot_filters]
        self.exclude_dataset_property: str | None = args.exclude_dataset_property
        self.exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_exclude_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.tmp_include_dataset_regexes: RegexList = []  # deferred to validate_task() phase
        self.abs_exclude_datasets: list[str] = []  # deferred to validate_task() phase
        self.abs_include_datasets: list[str] = []  # deferred to validate_task() phase

        self.curr_zfs_send_program_opts: list[str] = []
        self.zfs_recv_ox_names: set[str] = set()
        self.available_programs: dict[str, dict[str, str]] = {}
        self.zpool_features: dict[str, dict[str, str]] = {}
        self.connection_pools: dict[str, ConnectionPools] = {}

    def split_args(self, text: str, *items: str | Iterable[str], allow_all: bool = False) -> list[str]:
        """Splits option string on runs of one or more whitespace into an option list.

        Extra ``items`` are appended flat via xappend(); unless ``allow_all``, every resulting
        option is screened for shell-quoting characters (see _validate_quoting).
        """
        text = text.strip()
        opts = self.one_or_more_whitespace_regex.split(text) if text else []
        xappend(opts, items)
        if not allow_all:
            self._validate_quoting(opts)
        return opts

    def validate_arg(self, opt: str | None, allow_spaces: bool = False, allow_all: bool = False) -> str | None:
        """allow_all permits all characters, including whitespace and quotes; See squote() and dquote().

        Returns ``opt`` unchanged (possibly None); dies on whitespace (beyond plain spaces when
        ``allow_spaces``) or on quoting characters.
        """
        if allow_all or opt is None:
            return opt
        if any(char.isspace() and (char != " " or not allow_spaces) for char in opt):
            die(f"Option must not contain a whitespace character{' other than space' if allow_spaces else ''}: {opt}")
        self._validate_quoting([opt])
        return opt

    def validate_arg_str(self, opt: str, allow_spaces: bool = False, allow_all: bool = False) -> str:
        """Returns validated option string, raising if missing or illegal."""
        if opt is None:
            die("Option must not be missing")
        self.validate_arg(opt, allow_spaces=allow_spaces, allow_all=allow_all)
        return opt

    @staticmethod
    def _validate_quoting(opts: list[str]) -> None:
        """Raises an error if any option contains a quote or shell metacharacter."""
        for opt in opts:
            if "'" in opt or '"' in opt or "$" in opt or "`" in opt:
                die(f"Option must not contain a single quote or double quote or dollar or backtick character: {opt}")

    @staticmethod
    def _fix_recv_opts(opts: list[str], preserve_properties: frozenset[str]) -> tuple[list[str], list[str]]:
        """Returns sanitized ``zfs recv`` options and captured ``-o/-x`` args."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="n",
            include_arg_opts={"-o", "-x"},
            preserve_properties=preserve_properties,
        )

    @staticmethod
    def _fix_send_opts(opts: list[str]) -> list[str]:
        """Returns sanitized ``zfs send`` options; incremental opts (-i/-I) are stripped because
        bzfs supplies them itself."""
        return _fix_send_recv_opts(
            opts,
            exclude_long_opts={"--dryrun"},
            exclude_short_opts="den",
            include_arg_opts={"-X", "--exclude", "--redact"},
            exclude_arg_opts=frozenset({"-i", "-I"}),
        )[0]

    def _program_name(self, program: str) -> str:
        """For testing: helps simulate errors caused by external programs.

        Validates the program name, then consults inject_params to optionally substitute a
        missing ("-xxx" suffix) or failing ("false") program.
        """
        self.validate_arg_str(program)
        if not program:
            die(f"Program name must not be missing: {program}")
        for char in SHELL_CHARS + ":":
            if char in program:
                die(f"Program name must not contain a '{char}' character: {program}")
        if self.inject_params.get("inject_unavailable_" + program, False):
            return program + "-xxx"  # substitute a program that cannot be found on the PATH
        if self.inject_params.get("inject_failing_" + program, False):
            return "false"  # substitute a program that will error out with non-zero return code
        return program

    def _unset_matching_env_vars(self, args: argparse.Namespace) -> None:
        """Unset environment variables matching regex filters."""
        exclude_envvar_regexes: RegexList = compile_regexes(args.exclude_envvar_regex)
        include_envvar_regexes: RegexList = compile_regexes(args.include_envvar_regex)
        for envvar_name in list(os.environ.keys()):  # iterate over a copy; os.environ is mutated in the loop
            if is_included(envvar_name, exclude_envvar_regexes, include_envvar_regexes):
                os.environ.pop(envvar_name, None)
                self.log.debug("Unsetting b/c envvar regex: %s", envvar_name)

    def lock_file_name(self) -> str:
        """Returns unique path used to detect concurrently running jobs.

        Makes it such that a job that runs periodically declines to start if the same previous periodic job is still running
        without completion yet. Hashed key avoids overly long filenames while remaining deterministic.
        """
        # fmt: off
        key = (tuple(self.root_dataset_pairs), self.args.recursive, self.args.exclude_dataset_property,
               tuple(self.args.include_dataset), tuple(self.args.exclude_dataset),
               tuple(self.args.include_dataset_regex), tuple(self.args.exclude_dataset_regex),
               tuple(tuple(f) for f in self.snapshot_filters), self.args.skip_replication, self.args.create_src_snapshots,
               self.args.create_src_snapshots_plan, self.args.create_src_snapshots_timeformat,
               self.create_src_snapshots_config.anchors,
               self.args.delete_dst_datasets, self.args.delete_dst_snapshots, self.args.delete_dst_snapshots_except,
               self.args.delete_empty_dst_datasets,
               self.args.compare_snapshot_lists, self.args.monitor_snapshots,
               self.args.log_file_infix,
               self.src.basis_ssh_host, self.dst.basis_ssh_host,
               self.src.basis_ssh_user, self.dst.basis_ssh_user)
        # fmt: on
        hash_code: str = hashlib.sha256(str(key).encode("utf-8")).hexdigest()
        return os.path.join(tempfile.gettempdir(), f"{PROG_NAME}-lockfile-{hash_code}.lock")

    def dry(self, msg: str) -> str:
        """Prefix ``msg`` with 'Dry' when running in dry-run mode."""
        return bzfs_main.utils.dry(msg, self.dry_run)

    def is_program_available(self, program: str, location: str) -> bool:
        """Return True if ``program`` was detected on ``location`` host."""
        return program in self.available_programs.get(location, {})

432 

433############################################################################# 

class Remote:
    """Connection settings for either source or destination host."""

    def __init__(self, loc: str, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args.

        ``loc`` selects which per-location CLI options (``--ssh-src-*`` vs ``--ssh-dst-*``)
        this instance reads.
        """
        # immutable variables:
        assert loc == "src" or loc == "dst"
        self.location: str = loc
        self.params: Params = p
        self.basis_ssh_user: str = getattr(args, f"ssh_{loc}_user")
        self.basis_ssh_host: str = getattr(args, f"ssh_{loc}_host")
        self.ssh_port: int = getattr(args, f"ssh_{loc}_port")
        self.ssh_config_file: str | None = p.validate_arg(getattr(args, f"ssh_{loc}_config_file"))
        if self.ssh_config_file:
            # Require a telltale basename to prevent accidentally passing an unrelated file to 'ssh -F'.
            if "bzfs_ssh_config" not in os.path.basename(self.ssh_config_file):
                die(f"Basename of --ssh-{loc}-config-file must contain substring 'bzfs_ssh_config': {self.ssh_config_file}")
        # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation:
        self.ssh_extra_opts: list[str] = ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"] + (
            ["-v"] if args.verbose >= 3 else []
        )
        self.max_concurrent_ssh_sessions_per_tcp_connection: int = args.max_concurrent_ssh_sessions_per_tcp_connection
        self.reuse_ssh_connection: bool = getenv_bool("reuse_ssh_connection", True)
        if self.reuse_ssh_connection:
            # Directory holding the 'ssh -S' control sockets used for connection multiplexing.
            self.ssh_socket_dir: str = os.path.join(get_home_directory(), ".ssh", "bzfs")
            os.makedirs(os.path.dirname(self.ssh_socket_dir), exist_ok=True)
            os.makedirs(self.ssh_socket_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            self.socket_prefix: str = "s"
            _delete_stale_files(self.ssh_socket_dir, self.socket_prefix, ssh=True)
        self.sanitize1_regex: re.Pattern = re.compile(r"[\s\\/@$]")  # replace whitespace, /, $, \, @ with a ~ tilde char
        self.sanitize2_regex: re.Pattern = re.compile(rf"[^a-zA-Z0-9{re.escape('~.:_-')}]")  # Remove disallowed chars

        # mutable variables:
        self.root_dataset: str = ""  # deferred until run_main()
        self.basis_root_dataset: str = ""  # deferred until run_main()
        self.pool: str = ""
        self.sudo: str = ""
        self.use_zfs_delegation: bool = False
        self.ssh_user: str = ""
        self.ssh_host: str = ""
        self.ssh_user_host: str = ""
        self.is_nonlocal: bool = False

    def local_ssh_command(self) -> list[str]:
        """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing)
        command to run on the remote host, which will be appended later."""
        if self.ssh_user_host == "":
            return []  # dataset is on local host - don't use ssh

        # dataset is on remote host
        p: Params = self.params
        if p.ssh_program == DISABLE_PRG:
            die("Cannot talk to remote host because ssh CLI is disabled.")
        ssh_cmd: list[str] = [p.ssh_program] + self.ssh_extra_opts
        if self.ssh_config_file:
            ssh_cmd += ["-F", self.ssh_config_file]
        if self.ssh_port:
            ssh_cmd += ["-p", str(self.ssh_port)]
        if self.reuse_ssh_connection:
            # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and
            # 'ssh -S -M -oControlPersist=60s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
            # Generate unique private Unix domain socket file name in user's home dir and pass it to 'ssh -S /path/to/socket'
            def sanitize(name: str) -> str:
                name = self.sanitize1_regex.sub("~", name)  # replace whitespace, /, $, \, @ with a ~ tilde char
                name = self.sanitize2_regex.sub("", name)  # Remove disallowed chars
                return name

            unique: str = f"{os.getpid()}@{time.time_ns()}@{random.SystemRandom().randint(0, 999_999_999_999)}"
            socket_name: str = f"{self.socket_prefix}{unique}@{sanitize(self.ssh_host)[:45]}@{sanitize(self.ssh_user)}"
            # Truncate to stay under the OS limit on Unix domain socket path length.
            socket_file: str = os.path.join(self.ssh_socket_dir, socket_name)[: max(100, len(self.ssh_socket_dir) + 10)]
            ssh_cmd += ["-S", socket_file]
        ssh_cmd += [self.ssh_user_host]
        return ssh_cmd

    def cache_key(self) -> tuple:
        """Returns tuple uniquely identifying this Remote for caching."""
        return self.location, self.pool, self.ssh_user_host, self.ssh_port, self.ssh_config_file

    def __repr__(self) -> str:
        return str(self.__dict__)

514 

515############################################################################# 

class CopyPropertiesConfig:
    """--zfs-recv-o* and --zfs-recv-x* option groups for copying or excluding ZFS properties on receive."""

    def __init__(self, group: str, flag: str, args: argparse.Namespace, p: Params) -> None:
        """Reads the per-group option values (sources, targets, regexes) from ArgumentParser via args."""
        # immutable variables:
        self.group: str = group
        self.flag: str = flag  # one of -o or -x
        raw_sources: str = p.validate_arg_str(getattr(args, f"{group}_sources"))
        # canonicalize: strip each comma-separated token and emit the tokens in sorted order
        self.sources: str = ",".join(sorted(token.strip() for token in raw_sources.strip().split(",")))
        self.targets: str = p.validate_arg_str(getattr(args, f"{group}_targets"))
        self.include_regexes: RegexList = compile_regexes(getattr(args, f"{group}_include_regex"))
        self.exclude_regexes: RegexList = compile_regexes(getattr(args, f"{group}_exclude_regex"))

    def __repr__(self) -> str:
        return str(self.__dict__)

534 

535############################################################################# 

class SnapshotLabel(NamedTuple):
    """Contains the individual parts that are concatenated into a ZFS snapshot name."""

    prefix: str  # bzfs_
    infix: str  # us-west-1_
    timestamp: str  # 2024-11-06_08:30:05
    suffix: str  # _hourly

    def __str__(self) -> str:  # bzfs_us-west-1_2024-11-06_08:30:05_hourly
        return f"{self.prefix}{self.infix}{self.timestamp}{self.suffix}"

    def validate_label(self, input_text: str) -> None:
        """Validates that the composed snapshot label forms a legal name.

        Dies if the concatenated name is not a valid ZFS snapshot name, or if any part violates
        its underscore placement rules: prefix (mandatory) and infix (optional) must end with
        '_', suffix (optional) must start with '_', and no part may contain more than one '_'.
        """
        name: str = str(self)
        validate_dataset_name(name, input_text)
        if "/" in name:
            die(f"Invalid ZFS snapshot name: '{name}' for: '{input_text}*'")
        # The three per-part checks previously duplicated the same logic; they are folded into one
        # loop here. Behavior is unchanged, including the exact die() messages.
        for key, value in {"prefix": self.prefix, "infix": self.infix, "suffix": self.suffix}.items():
            if key != "prefix" and not value:
                continue  # infix and suffix are optional; empty values are skipped
            if key == "suffix":
                if not value.startswith("_"):
                    die(f"Invalid {input_text}{key}: Must start with an underscore character: '{value}'")
            elif not value.endswith("_"):
                die(f"Invalid {input_text}{key}: Must end with an underscore character: '{value}'")
            if value.count("_") > 1:
                die(f"Invalid {input_text}{key}: Must not contain multiple underscore characters: '{value}'")

571 

572############################################################################# 

573class CreateSrcSnapshotConfig: 

574 """Option values for --create-src-snapshots, that is, for automatically creating source snapshots.""" 

575 

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Option values for --create-src-snapshots*; reads from ArgumentParser via args.

        Parses the --create-src-snapshots-plan dict literal into SnapshotLabels and a schedule
        of period durations (suffix_durations), sorted so that longer periods (e.g. daily) are
        snapshotted before shorter ones (e.g. hourly).
        """
        # immutable variables:
        self.skip_create_src_snapshots: bool = not args.create_src_snapshots
        self.create_src_snapshots_even_if_not_due: bool = args.create_src_snapshots_even_if_not_due
        tz_spec: str | None = args.create_src_snapshots_timezone if args.create_src_snapshots_timezone else None
        self.tz: tzinfo | None = get_timezone(tz_spec)
        self.current_datetime: datetime = current_datetime(tz_spec)
        self.timeformat: str = args.create_src_snapshots_timeformat
        self.anchors: PeriodAnchors = PeriodAnchors.parse(args)

        # Compute the schedule for upcoming periodic time events (suffix_durations). This event schedule is also used in
        # daemon mode via sleep_until_next_daemon_iteration()
        suffixes: list[str] = []
        labels: list[SnapshotLabel] = []
        create_src_snapshots_plan: str = args.create_src_snapshots_plan or str({"bzfs": {"onsite": {"adhoc": 1}}})
        # literal_eval (not eval) safely parses the plan, which is a dict literal from the CLI:
        # {org: {target: {period_unit: period_amount}}}
        for org, target_periods in ast.literal_eval(create_src_snapshots_plan).items():
            for target, periods in target_periods.items():
                for period_unit, period_amount in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    if not isinstance(period_amount, int) or period_amount < 0:
                        die(f"--create-src-snapshots-plan: Period amount must be a non-negative integer: {period_amount}")
                    if period_amount > 0:  # a zero amount disables that period entirely
                        suffix: str = nsuffix(period_unit)
                        suffixes.append(suffix)
                        labels.append(SnapshotLabel(prefix=nprefix(org), infix=ninfix(target), timestamp="", suffix=suffix))
        xperiods: SnapshotPeriods = p.xperiods
        if self.skip_create_src_snapshots:
            # Not creating snapshots: the schedule degenerates to the single --daemon-frequency period.
            duration_amount, duration_unit = p.xperiods.suffix_to_duration0(p.daemon_frequency)
            if duration_amount <= 0 or not duration_unit:
                die(f"Invalid --daemon-frequency: {p.daemon_frequency}")
            suffixes = [nsuffix(p.daemon_frequency)]
            labels = []
        suffix_durations: dict[str, tuple[int, str]] = {suffix: xperiods.suffix_to_duration1(suffix) for suffix in suffixes}

        def suffix_key(suffix: str) -> tuple[int, str]:
            # Sort key: period length in milliseconds, ties broken by suffix name. Also validates
            # that sub-daily periods divide a day and monthly periods divide a year, so snapshot
            # times stay aligned across days/years.
            duration_amount, duration_unit = suffix_durations[suffix]
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            if suffix.endswith(("hourly", "minutely", "secondly")):
                if duration_milliseconds != 0 and 86400 * 1000 % duration_milliseconds != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 86400 seconds "
                        f"without remainder so that snapshots will be created at the same time of day every day: {suffix}"
                    )
            if suffix.endswith("monthly"):
                if duration_amount != 0 and 12 % duration_amount != 0:
                    die(
                        "Invalid --create-src-snapshots-plan: Period duration should be a divisor of 12 months "
                        f"without remainder so that snapshots will be created at the same time every year: {suffix}"
                    )
            return duration_milliseconds, suffix

        suffixes = sorted(suffixes, key=suffix_key, reverse=True)  # take snapshots for dailies before hourlies, and so on
        self.suffix_durations: dict[str, tuple[int, str]] = {suffix: suffix_durations[suffix] for suffix in suffixes}  # sort
        suffix_indexes: dict[str, int] = {suffix: k for k, suffix in enumerate(suffixes)}
        labels.sort(key=lambda label: (suffix_indexes[label.suffix], label))  # take snapshots for dailies before hourlies
        self._snapshot_labels: list[SnapshotLabel] = labels
        for label in self.snapshot_labels():
            label.validate_label("--create-src-snapshots-plan ")

634 

635 def snapshot_labels(self) -> list[SnapshotLabel]: 

636 """Returns the snapshot name patterns for which snapshots shall be created.""" 

637 timeformat: str = self.timeformat 

638 is_millis: bool = timeformat.endswith("%F") # non-standard hack to append milliseconds 

639 if is_millis: 

640 timeformat = timeformat[0:-1] + "f" # replace %F with %f (append microseconds) 

641 timestamp: str = self.current_datetime.strftime(timeformat) 

642 if is_millis: 

643 timestamp = timestamp[0 : -len("000")] # replace microseconds with milliseconds 

644 timestamp = timestamp.replace("+", "z") # zfs CLI does not accept the '+' character in snapshot names 

645 return [SnapshotLabel(label.prefix, label.infix, timestamp, label.suffix) for label in self._snapshot_labels] 

646 

647 def __repr__(self) -> str: 

648 return str(self.__dict__) 

649 

650 

651############################################################################# 

@dataclass(frozen=True)
class AlertConfig:
    """Thresholds controlling when alerts fire for snapshot age."""

    kind: Literal["Latest", "Oldest"]  # whether the thresholds apply to the latest or the oldest matching snapshot
    warning_millis: int  # warning threshold in milliseconds; a disabled threshold is set to a huge sentinel value
    critical_millis: int  # critical threshold in milliseconds; a disabled threshold is set to a huge sentinel value

659 

660 

661############################################################################# 

@dataclass(frozen=True)
class MonitorSnapshotAlert:
    """Alert configuration for a single monitored snapshot label."""

    label: SnapshotLabel  # snapshot naming pattern (prefix/infix/suffix) this alert applies to
    latest: AlertConfig | None  # thresholds for the most recent matching snapshot; None if the latest-check is disabled
    oldest: AlertConfig | None  # thresholds for the oldest matching snapshot; None if the oldest-check is disabled

669 

670 

671############################################################################# 

class MonitorSnapshotsConfig:
    """Option values for --monitor-snapshots*, that is, policy describing which snapshots to monitor for staleness."""

    def __init__(self, args: argparse.Namespace, p: Params) -> None:
        """Reads from ArgumentParser via args.

        args.monitor_snapshots is a Python dict literal of the form
        {org: {target: {period_unit: {"latest"|"oldest": {"warning"|"critical"|"cycles": value}}}}};
        it is flattened into self.alerts, a list of MonitorSnapshotAlert sorted by decreasing period duration.
        """
        # immutable variables:
        self.monitor_snapshots: dict = ast.literal_eval(args.monitor_snapshots)  # parse the CLI dict literal safely
        self.dont_warn: bool = args.monitor_snapshots_dont_warn
        self.dont_crit: bool = args.monitor_snapshots_dont_crit
        self.no_latest_check: bool = args.monitor_snapshots_no_latest_check
        self.no_oldest_check: bool = args.monitor_snapshots_no_oldest_check
        alerts: list[MonitorSnapshotAlert] = []
        xperiods: SnapshotPeriods = p.xperiods
        for org, target_periods in self.monitor_snapshots.items():
            prefix: str = nprefix(org)
            for target, periods in target_periods.items():
                for period_unit, alert_dicts in periods.items():  # e.g. period_unit can be "10minutely" or "minutely"
                    label = SnapshotLabel(prefix=prefix, infix=ninfix(target), timestamp="", suffix=nsuffix(period_unit))
                    alert_latest, alert_oldest = None, None
                    for alert_type, alert_dict in alert_dicts.items():
                        m = "--monitor-snapshots: "
                        if alert_type not in ["latest", "oldest"]:
                            die(f"{m}'{alert_type}' must be 'latest' or 'oldest' within {args.monitor_snapshots}")
                        warning_millis: int = 0  # 0 means "warning threshold not configured"
                        critical_millis: int = 0  # 0 means "critical threshold not configured"
                        cycles: int = 1  # number of snapshot periods of grace added on top of a configured threshold
                        for kind, value in alert_dict.items():
                            context: str = args.monitor_snapshots
                            if kind == "warning":
                                warning_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "critical":
                                critical_millis = max(0, parse_duration_to_milliseconds(str(value), context=context))
                            elif kind == "cycles":
                                cycles = max(0, int(value))
                            else:
                                die(f"{m}'{kind}' must be 'warning', 'critical' or 'cycles' within {context}")
                        if warning_millis > 0 or critical_millis > 0:  # ignore entries where neither threshold is set
                            duration_amount, duration_unit = xperiods.suffix_to_duration1(label.suffix)
                            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
                            # each configured (positive) threshold is extended by `cycles` full snapshot periods of grace:
                            warning_millis += 0 if warning_millis <= 0 else cycles * duration_milliseconds
                            critical_millis += 0 if critical_millis <= 0 else cycles * duration_milliseconds
                            # an unconfigured threshold is pushed out to "infinity" so it can never fire.
                            # NOTE(review): the constant is named in *seconds* while these fields hold *millis* —
                            # presumably it is just a huge sentinel value; confirm against its definition.
                            warning_millis = UNIX_TIME_INFINITY_SECS if warning_millis <= 0 else warning_millis
                            critical_millis = UNIX_TIME_INFINITY_SECS if critical_millis <= 0 else critical_millis
                            capitalized_alert_type = cast(Literal["Latest", "Oldest"], sys.intern(alert_type.capitalize()))
                            alert_config = AlertConfig(capitalized_alert_type, warning_millis, critical_millis)
                            if alert_type == "latest":
                                if not self.no_latest_check:  # --monitor-snapshots-no-latest-check suppresses this alert
                                    alert_latest = alert_config
                            else:
                                assert alert_type == "oldest"
                                if not self.no_oldest_check:  # --monitor-snapshots-no-oldest-check suppresses this alert
                                    alert_oldest = alert_config
                    if alert_latest is not None or alert_oldest is not None:
                        alerts.append(MonitorSnapshotAlert(label, alert_latest, alert_oldest))

        def alert_sort_key(alert: MonitorSnapshotAlert) -> tuple[int, SnapshotLabel]:
            """Orders alerts by the duration of their snapshot period, then by label."""
            duration_amount, duration_unit = xperiods.suffix_to_duration1(alert.label.suffix)
            duration_milliseconds: int = duration_amount * xperiods.suffix_milliseconds.get(duration_unit, 0)
            return duration_milliseconds, alert.label

        alerts.sort(key=alert_sort_key, reverse=True)  # check snapshots for dailies before hourlies, and so on
        self.alerts: list[MonitorSnapshotAlert] = alerts
        self.enable_monitor_snapshots: bool = len(alerts) > 0  # monitoring is active iff at least one alert was configured

    def __repr__(self) -> str:
        return str(self.__dict__)

738 

739 

740############################################################################# 

741def _fix_send_recv_opts( 

742 opts: list[str], 

743 exclude_long_opts: set[str], 

744 exclude_short_opts: str, 

745 include_arg_opts: set[str], 

746 exclude_arg_opts: frozenset[str] = frozenset(), 

747 preserve_properties: frozenset[str] = frozenset(), 

748) -> tuple[list[str], list[str]]: 

749 """These opts are instead managed via bzfs CLI args --dryrun, etc.""" 

750 assert "-" not in exclude_short_opts 

751 results: list[str] = [] 

752 x_names: set[str] = set(preserve_properties) 

753 i = 0 

754 n = len(opts) 

755 while i < n: 

756 opt: str = opts[i] 

757 i += 1 

758 if opt in exclude_arg_opts: # example: {"-X", "--exclude"} 

759 i += 1 

760 continue 

761 elif opt in include_arg_opts: # example: {"-o", "-x"} 

762 results.append(opt) 

763 if i < n: 

764 if opt == "-o" and "=" in opts[i] and opts[i].split("=", 1)[0] in preserve_properties: 

765 die(f"--preserve-properties: Disallowed ZFS property found in --zfs-recv-program-opt(s): -o {opts[i]}") 

766 if opt == "-x": 

767 x_names.discard(opts[i]) 

768 results.append(opts[i]) 

769 i += 1 

770 elif opt not in exclude_long_opts: # example: {"--dryrun", "--verbose"} 

771 if opt.startswith("-") and opt != "-" and not opt.startswith("--"): 

772 for char in exclude_short_opts: # example: "den" 

773 opt = opt.replace(char, "") 

774 if opt == "-": 

775 continue 

776 results.append(opt) 

777 return results, sorted(x_names) 

778 

779 

# Matches the decimal PID prefix embedded in an ssh master Unix domain socket file name;
# see socket_name in local_ssh_command()
SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX: re.Pattern[str] = re.compile(r"^[0-9]+")

781 

782 

def _delete_stale_files(
    root_dir: str,
    prefix: str,
    millis: int = 60 * 60 * 1000,  # default: entries older than one hour are stale
    dirs: bool = False,  # True: delete stale directories; False: delete stale non-directories
    exclude: str | None = None,  # an entry name that is always left untouched
    ssh: bool = False,  # True: treat ssh master Unix domain sockets specially (see body)
) -> None:
    """Cleans up obsolete files; For example caused by abnormal termination, OS crash.

    Only direct children of root_dir whose name starts with `prefix` are considered; the entry named
    `exclude` is always kept. An entry is stale once its mtime is at least `millis` milliseconds old.
    With ssh=True, Unix domain sockets are not removed on age alone; instead the PID embedded in the
    socket file name (right after the prefix) is checked, and the socket is removed only if that
    process is gone or the socket is older than 31 days.
    """
    seconds: float = millis / 1000
    now: float = time.time()
    validate_is_not_a_symlink("", root_dir)  # refuse to traverse a symlinked root_dir
    for entry in os.scandir(root_dir):
        if entry.name == exclude or not entry.name.startswith(prefix):
            continue  # only entries managed by this cleaner (matching prefix) are eligible
        try:
            # select entries of the requested kind (dir vs non-dir) whose mtime is at least `millis` old
            if ((dirs and entry.is_dir()) or (not dirs and not entry.is_dir())) and now - entry.stat().st_mtime >= seconds:
                if dirs:
                    shutil.rmtree(entry.path, ignore_errors=True)
                elif not (ssh and stat.S_ISSOCK(entry.stat().st_mode)):
                    os.remove(entry.path)
                elif match := SSH_MASTER_DOMAIN_SOCKET_FILE_PID_REGEX.match(entry.name[len(prefix) :]):
                    # ssh master socket: its file name encodes the owning bzfs PID right after the prefix
                    pid: int = int(match.group(0))
                    # NOTE(review): the `is False` comparison suggests pid_exists() can return None for
                    # "undetermined"; only a definitive False (or a >31 day old socket) triggers removal — confirm.
                    if pid_exists(pid) is False or now - entry.stat().st_mtime >= 31 * 24 * 60 * 60:
                        os.remove(entry.path)  # bzfs process is nomore alive hence its ssh master process isn't alive either
        except FileNotFoundError:
            pass  # harmless; the entry vanished concurrently between scandir() and stat()/remove()

810 

811 

812def _create_symlink(src: str, dst_dir: str, dst: str) -> None: 

813 """Creates dst symlink pointing to src using a relative path.""" 

814 rel_path: str = os.path.relpath(src, start=dst_dir) 

815 os.symlink(src=rel_path, dst=os.path.join(dst_dir, dst))