Coverage for bzfs_main / bzfs_jobrunner.py: 99%

668 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 10:16 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata 

16# /// script 

17# requires-python = ">=3.9" 

18# dependencies = [] 

19# /// 

20# 

21""" 

22* High-level orchestrator that calls `bzfs` as part of complex, periodic workflows to manage backup, replication, and pruning 

23 jobs across a fleet of multiple source and destination hosts; driven by a fleet-wide job config file 

24 (e.g., `bzfs_job_example.py`). 

25* Overview of the bzfs_jobrunner.py codebase: 

26* The codebase starts with docs, definition of input data and associated argument parsing of CLI options/parameters. 

27* Control flow starts in main(), far below ..., which kicks off a "Job". 

28* A Job creates zero or more "subjobs" for each local or remote host, via run_main(). 

29* It executes the subjobs, serially or in parallel, via run_subjobs(), which in turn delegates parallel job coordination to 

30 bzfs.process_datasets_in_parallel_and_fault_tolerant(). 

31* README_bzfs_jobrunner.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via 

32 update_readme.sh. Simply run that script whenever you change or add ArgumentParser help text. 

33""" 

34 

35from __future__ import ( 

36 annotations, 

37) 

38import argparse 

39import contextlib 

40import os 

41import platform 

42import pwd 

43import random 

44import socket 

45import subprocess 

46import sys 

47import threading 

48import time 

49import uuid 

50from ast import ( 

51 literal_eval, 

52) 

53from collections.abc import ( 

54 Iterable, 

55) 

56from logging import ( 

57 Logger, 

58) 

59from subprocess import ( 

60 DEVNULL, 

61 PIPE, 

62) 

63from typing import ( 

64 Any, 

65 Final, 

66 NoReturn, 

67 TypeVar, 

68 Union, 

69 final, 

70) 

71 

72import bzfs_main.argparse_actions 

73from bzfs_main import ( 

74 bzfs, 

75) 

76from bzfs_main.argparse_cli import ( 

77 PROG_AUTHOR, 

78) 

79from bzfs_main.detect import ( 

80 DUMMY_DATASET, 

81) 

82from bzfs_main.loggers import ( 

83 get_simple_logger, 

84 reset_logger, 

85 set_logging_runtime_defaults, 

86) 

87from bzfs_main.util import ( 

88 check_range, 

89 utils, 

90) 

91from bzfs_main.util.parallel_tasktree import ( 

92 BARRIER_CHAR, 

93) 

94from bzfs_main.util.parallel_tasktree_policy import ( 

95 process_datasets_in_parallel_and_fault_tolerant, 

96) 

97from bzfs_main.util.utils import ( 

98 DIE_STATUS, 

99 LOG_TRACE, 

100 UMASK, 

101 JobStats, 

102 Subprocesses, 

103 TaskTiming, 

104 dry, 

105 format_dict, 

106 format_obj, 

107 getenv_bool, 

108 human_readable_duration, 

109 percent, 

110 shuffle_dict, 

111 terminate_process_subtree, 

112 termination_signal_handler, 

113 validate_dataset_name, 

114) 

115from bzfs_main.util.utils import PROG_NAME as BZFS_PROG_NAME 

116 

# constants:
PROG_NAME: Final[str] = "bzfs_jobrunner"
# Magic tokens that may appear inside --dst-root-datasets values; they are substituted at runtime with the actual
# source/destination hostname (see the --dst-root-datasets CLI help text below).
# The "# noqa: S105" markers suppress a false-positive "hardcoded password" lint on these token strings.
SRC_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^SRC_HOST"  # noqa: S105
DST_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^DST_HOST"  # noqa: S105
SEP: Final[str] = ","  # list separator for comma-joined values; usage is later in the file (not visible in this chunk)
POSIX_END_OF_OPTIONS_MARKER: Final[str] = "--"  # args following -- are treated as operands, even if they begin with a hyphen

123 

124 

def argument_parser() -> argparse.ArgumentParser:
    """Returns the CLI parser used by bzfs_jobrunner.

    Note: README_bzfs_jobrunner.md is mostly auto-generated from these help texts via update_readme.sh (see module
    docstring), so edits to any help string here propagate into the documentation.
    """
    # fmt: off
    parser = argparse.ArgumentParser(
        prog=PROG_NAME,
        allow_abbrev=False,
        formatter_class=argparse.RawTextHelpFormatter,
        description=f"""
This program is a convenience wrapper around [bzfs](README.md) that simplifies periodic ZFS snapshot creation, replication,
pruning, and monitoring, across a fleet of N source hosts and M destination hosts, using a single fleet-wide shared
[jobconfig](bzfs_tests/bzfs_job_example.py) script.
For example, this simplifies the deployment of an efficient geo-replicated backup service where each of the M destination
hosts is located in a separate geographic region and receives replicas from (the same set of) N source hosts. It also
simplifies low latency replication from a primary to a secondary or to M read replicas, or backup to removable drives, etc.

This program can be used to efficiently replicate ...

a) within a single machine (local mode), or

b) from a single source host to one or more destination hosts (pull or push or pull-push mode), or

c) from multiple source hosts to a single destination host (pull or push or pull-push mode), or

d) from N source hosts to M destination hosts (pull or push or pull-push mode, N and M can be large, M=2 or M=3 are typical
geo-replication factors)

You can run this program on a single third-party host and have that talk to all source hosts and destination hosts, which is
convenient for basic use cases and for testing.
However, typically, a cron job on each source host runs `{PROG_NAME}` periodically to create new snapshots (via
--create-src-snapshots) and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and
--prune-src-bookmarks), whereas another cron job on each destination host runs `{PROG_NAME}` periodically to prune
outdated destination snapshots (via --prune-dst-snapshots), and to replicate the recently created snapshots from the source
to the destination (via --replicate).
Yet another cron job on each source and each destination runs `{PROG_NAME}` periodically to alert the user if the latest or
oldest snapshot is somehow too old (via --monitor-src-snapshots and --monitor-dst-snapshots).
The frequency of these periodic activities can vary by activity, and is typically every second, minute, hour, day, week,
month and/or year (or multiples thereof).

Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto all
source hosts and all destination hosts, and add crontab entries (or systemd timers or Monit entries or similar), along these
lines:

* crontab on source hosts:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --monitor-src-snapshots`


* crontab on destination hosts:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --replicate --prune-dst-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --monitor-dst-snapshots`


### Applying Actions to a Subset of Hosts

`--src-host` and `--dst-host` are ways to tell `{PROG_NAME}` to only perform actions for a specified subset of source hosts
and destination hosts, e.g. only replicate from a certain subset of source hosts to a certain subset of destination hosts.
Each `{PROG_NAME}` invocation will do all actions that apply to the final effective value of `--src-hosts` and `--dst-hosts`,
after the subsetting step has been applied using the `--src-host` and `--dst-host` parameters.


### High Frequency Replication (Experimental Feature)

Taking snapshots, and/or replicating, from every N milliseconds to every 10 seconds or so is considered high frequency. For
such use cases, consider that `zfs list -t snapshot` performance degrades as more and more snapshots currently exist within
the selected datasets, so try to keep the number of currently existing snapshots small, and prune them at a frequency that
is proportional to the frequency with which snapshots are created. Consider using `--skip-parent` and `--exclude-dataset*`
filters to limit the selected datasets only to those that require this level of frequency.

In addition, use the `--daemon-*` options to reduce startup overhead, in combination with splitting the crontab entry (or
better: high frequency systemd timer) into multiple processes, from a single source host to a single destination host,
along these lines:

* crontab on source hosts:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --create-src-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-bookmarks`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --monitor-src-snapshots`


* crontab on destination hosts:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --replicate`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --prune-dst-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --monitor-dst-snapshots`

The daemon processes work like non-daemon processes except that they loop, handle time events and sleep between events, and
finally exit after, say, 86400 seconds (whatever you specify via `--daemon-lifetime`). The daemons will subsequently be
auto-restarted by 'cron', or earlier if they fail. While the daemons are running, 'cron' will attempt to start new
(unnecessary) daemons but this is benign as these new processes immediately exit with a message like this:
"Exiting as same previous periodic job is still running without completion yet"
""")

    # commands:
    parser.add_argument(
        "--create-src-snapshots", action="store_true",
        help="Take snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--replicate", action="store_true",
        help="Replicate snapshots from the selected source hosts to the selected destinations hosts as necessary. For pull "
             "mode (recommended), this command should be called by a program (or cron job) running on each dst "
             "host; for push mode, on the src host; for pull-push mode on a third-party host.\n\n")
    parser.add_argument(
        "--prune-src-snapshots", action="store_true",
        help="Prune snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--prune-src-bookmarks", action="store_true",
        help="Prune bookmarks on the selected source hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--prune-dst-snapshots", action="store_true",
        help="Prune snapshots on the selected destination hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each dst host.\n\n")
    parser.add_argument(
        "--monitor-src-snapshots", action="store_true",
        help="Alert the user if snapshots on the selected source hosts are too old, using --monitor-snapshot-plan (see "
             "below). Typically, this command should be called by a program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--monitor-dst-snapshots", action="store_true",
        help="Alert the user if snapshots on the selected destination hosts are too old, using --monitor-snapshot-plan (see "
             "below). Typically, this command should be called by a program (or cron job) running on each dst host.\n\n")

    # options:
    parser.add_argument(
        "--localhost", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
        help="Hostname of localhost. Default is the hostname without the domain name, querying the Operating System.\n\n")
    parser.add_argument(
        "--src-hosts", default=None, metavar="LIST_STRING",
        help="Hostnames of the sources to operate on.\n\n")
    parser.add_argument(
        "--src-host", default=None, action="append", metavar="STRING",
        help="For subsetting --src-hosts; Can be specified multiple times; Indicates to only use the --src-hosts that are "
             "contained in the specified --src-host values (optional).\n\n"
             "Example: `--src-host=src1 --src-host=src2 --src-hosts=\"['src1', 'src2', 'src3', 'src4']\"` indicates to "
             "effectively only use `--src-hosts=\"['src1', 'src2']\"`.\n\n")
    # example dict reused verbatim inside the help texts of --dst-hosts and --retain-dst-targets below
    dst_hosts_example = {"nas": ["onsite"], "bak-us-west": ["us-west"],
                         "bak-eu-west": ["eu-west"], "archive": ["offsite"]}
    parser.add_argument(
        "--dst-hosts", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
             "(the infix portion of snapshot name). As hostname use the real output of the `hostname` CLI. "
             "The target is an arbitrary user-defined name that serves as an abstraction of the destination hostnames for "
             "a group of snapshots, like target 'onsite', 'offsite', 'hotspare', a geographically independent datacenter like "
             "'us-west', or similar. Rather than the snapshot name embedding (i.e. hardcoding) a list of destination "
             "hostnames where it should be sent to, the snapshot name embeds the user-defined target name, which is later "
             "mapped by this jobconfig to a list of destination hostnames.\n\n"
             f"Example: `{format_dict(dst_hosts_example)}`.\n\n"
             "With this, given a snapshot name, we can find the destination hostnames to which the snapshot shall be "
             "replicated. Also, given a snapshot name and its own name, a destination host can determine if it shall "
             "replicate the given snapshot from the source host, or if the snapshot is intended for another destination "
             "host, in which case it skips the snapshot. A destination host will receive replicas of snapshots for all "
             "targets that map to that destination host.\n\n"
             "Removing a mapping can be used to temporarily suspend replication to a given destination host.\n\n")
    parser.add_argument(
        "--dst-host", default=None, action="append", metavar="STRING",
        help="For subsetting --dst-hosts; Can be specified multiple times; Indicates to only use the --dst-hosts keys that "
             "are contained in the specified --dst-host values (optional).\n\n"
             "Example: "
             "`--dst-host=dst1 --dst-host=dst2 --dst-hosts=\"{'dst1': ..., 'dst2': ..., 'dst3': ..., 'dst4': ...}\"` "
             "indicates to effectively only use `--dst-hosts=\"{'dst1': ..., 'dst2': ...}\"`.\n\n")
    parser.add_argument(
        "--retain-dst-targets", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
             "(the infix portion of snapshot name).\n\n"
             f"Example: `{format_dict(dst_hosts_example)}`. Has same format as --dst-hosts.\n\n"
             "As part of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has "
             "no mapping to that destination host in this dictionary. Do not remove a mapping here unless you are sure it's "
             "ok to delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n")
    dst_root_datasets_example = {
        "nas": "tank2/bak",
        "bak-us-west": "backups/bak001",
        "bak-eu-west": "backups/bak999",
        "archive": f"archives/zoo/{SRC_MAGIC_SUBSTITUTION_TOKEN}",
        "hotspare": "",
    }
    parser.add_argument(
        "--dst-root-datasets", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root "
             "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that "
             "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, "
             "whereas for cloning, master slave replication, or replication from a primary to a secondary, this can also be "
             "the empty string.\n\n"
             f"`{SRC_MAGIC_SUBSTITUTION_TOKEN}` and `{DST_MAGIC_SUBSTITUTION_TOKEN}` are optional magic substitution tokens "
             "that will be auto-replaced at runtime with the actual hostname. This can be used to force the use of a "
             "separate destination root dataset per source host or per destination host.\n\n"
             f"Example: `{format_dict(dst_root_datasets_example)}`\n\n")
    src_snapshot_plan_example = {
        "prod": {
            "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
            "us-west": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
            "eu-west": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
        },
        "test": {
            "offsite": {"12hourly": 42, "weekly": 12},
        },
    }
    parser.add_argument(
        "--src-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. "
             "Snapshots that do not match a retention period will be deleted. A zero or missing retention period indicates "
             "that no snapshots shall be retained (or even be created) for the given period.\n\n"
             f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and "
             "the intended logical target 'onsite', create and then retain secondly snapshots that were created less "
             "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analog for "
             "the latest 40 minutely snapshots, 36 hourly snapshots, etc. "
             "It will also create and retain snapshots for the targets 'us-west' and 'eu-west' within the 'prod' "
             "organization. "
             "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, "
             "and name them as being intended for the 'offsite' replication target. "
             "The example creates snapshots with names like "
             "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, "
             "`prod_us-west_<timestamp>_hourly`, `prod_us-west_<timestamp>_daily`, "
             "`prod_eu-west_<timestamp>_hourly`, `prod_eu-west_<timestamp>_daily`, "
             "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n")
    parser.add_argument(
        "--src-bookmark-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for bookmarks to be used if pruning src. Has same format as --src-snapshot-plan.\n\n")
    parser.add_argument(
        "--dst-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for snapshots to be used if pruning dst. Has same format as --src-snapshot-plan.\n\n")
    monitor_snapshot_plan_example = {
        "prod": {
            "onsite": {
                "100millisecondly": {"warning": "650 milliseconds", "critical": "2 seconds"},
                "secondly": {"warning": "2 seconds", "critical": "14 seconds"},
                "minutely": {"warning": "30 seconds", "critical": "300 seconds"},
                "hourly": {"warning": "30 minutes", "critical": "300 minutes"},
                "daily": {"warning": "4 hours", "critical": "8 hours"},
                "weekly": {"warning": "2 days", "critical": "8 days"},
                "monthly": {"warning": "2 days", "critical": "8 days"},
                "yearly": {"warning": "5 days", "critical": "14 days"},
                "10minutely": {"warning": "0 minutes", "critical": "0 minutes"},
            },
            "": {
                "daily": {"warning": "4 hours", "critical": "8 hours"},
            },
        },
    }
    parser.add_argument(
        "--monitor-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Alert the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified "
             "snapshot pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to "
             "check if snapshots are successfully taken on schedule, successfully replicated on schedule, and successfully "
             "pruned on schedule. "
             "Process exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.\n\n"
             f"Example DICT_STRING: `{format_dict(monitor_snapshot_plan_example)}`. "
             "This example alerts the user if the *latest* src or dst snapshot named `prod_onsite_<timestamp>_hourly` is "
             "more than 30 minutes late (i.e. more than 30+60=90 minutes old) [warning] or more than 300 minutes late (i.e. "
             "more than 300+60=360 minutes old) [critical]. In addition, the example alerts the user if the *oldest* src or "
             "dst snapshot named `prod_onsite_<timestamp>_hourly` is more than 30 + 60x36 minutes old [warning] or more "
             "than 300 + 60x36 minutes old [critical], where 36 is the number of period cycles specified in "
             "`src_snapshot_plan` or `dst_snapshot_plan`, respectively. "
             "Analog for the latest snapshot named `prod_<timestamp>_daily`, and so on.\n\n"
             "Note: A duration that is missing or zero (e.g. '0 minutes') indicates that no snapshots shall be checked for "
             "the given snapshot name pattern.\n\n")
    # per-location (src/dst) SSH connection options:
    locations = ["src", "dst"]
    for loc in locations:
        parser.add_argument(
            f"--ssh-{loc}-user", default="", metavar="STRING",
            help=f"Remote SSH username on {loc} hosts to connect to (optional). Examples: 'root', 'alice'.\n\n")
    for loc in locations:
        parser.add_argument(
            f"--ssh-{loc}-port", type=int, min=1, max=65535, action=check_range.CheckRange, metavar="INT",
            help=f"Remote SSH port on {loc} host to connect to (optional).\n\n")
    for loc in locations:
        parser.add_argument(
            f"--ssh-{loc}-config-file", type=str, action=bzfs_main.argparse_actions.SSHConfigFileNameAction, metavar="FILE",
            help=f"Path to SSH ssh_config(5) file to connect to {loc} (optional); will be passed into ssh -F CLI. "
                 "The basename must contain the substring 'bzfs_ssh_config'.\n\n")
    parser.add_argument(
        "--job-id", required=True, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
        help="The identifier that remains constant across all runs of this particular job; will be included in the log file "
             "name infix. Example: mytestjob\n\n")
    parser.add_argument(
        "--job-run", default="", action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
        help="The identifier of this particular run of the overall job; will be included in the log file name suffix. "
             "Default is a hex UUID. Example: 0badc0f003a011f0a94aef02ac16083c\n\n")
    workers_default = 100  # percent
    parser.add_argument(
        "--workers", min=1, default=(workers_default, True), action=bzfs_main.argparse_actions.CheckPercentRange,
        metavar="INT[%]",
        help="The maximum number of jobs to run in parallel at any time; can be given as a positive integer, "
             f"optionally followed by the %% percent character (min: %(min)s, default: {workers_default}%%). Percentages "
             "are relative to the number of CPU cores on the machine. Example: 200%% uses twice as many parallel jobs as "
             "there are cores on the machine; 75%% uses num_procs = num_cores * 0.75. Examples: 1, 4, 75%%, 150%%\n\n")
    parser.add_argument(
        "--work-period-seconds", type=float, min=0, default=0, action=check_range.CheckRange, metavar="FLOAT",
        help="Reduces bandwidth spikes by spreading out the start of worker jobs over this much time; "
             "0 disables this feature (default: %(default)s). Examples: 0, 60, 86400\n\n")
    parser.add_argument(
        "--jitter", action="store_true",
        help="Randomize job start time and host order to avoid potential thundering herd problems in large distributed "
             "systems (optional). Randomizing job start time is only relevant if --work-period-seconds > 0.\n\n")
    parser.add_argument(
        "--worker-timeout-seconds", type=float, min=0.001, default=None, action=check_range.CheckRange, metavar="FLOAT",
        help="If this much time has passed after a worker process has started executing, kill the straggling worker "
             "(optional). Other workers remain unaffected. Examples: 60, 3600\n\n")
    parser.add_argument(
        "--spawn-process-per-job", action="store_true",
        help="Spawn a Python process per subjob instead of a Python thread per subjob (optional). The former is only "
             "recommended for a job operating in parallel on a large number of hosts as it helps avoid exceeding "
             "per-process limits such as the default max number of open file descriptors, at the expense of increased "
             "startup latency.\n\n")
    parser.add_argument(
        "--jobrunner-dryrun", action="store_true",
        help="Do a dry run (aka 'no-op') to print what operations would happen if the command were to be executed "
             "for real (optional). This option treats both the ZFS source and destination as read-only. Can also be used to "
             "check if the configuration options are valid.\n\n")
    parser.add_argument(
        "--jobrunner-log-level", choices=["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"], default="INFO",
        help="Only emit jobrunner messages with equal or higher priority than this log level. Default is '%(default)s'.\n\n")
    parser.add_argument(
        "--daemon-replication-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument(
        "--daemon-prune-src-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument(
        "--daemon-prune-dst-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument(
        "--daemon-monitor-snapshots-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall monitor snapshot age if --daemon-lifetime is nonzero.\n\n")
    # bzfs options that the jobrunner computes or owns itself; registering them with RejectArgumentAction (defined
    # elsewhere in this file) makes the parser reject them if a user passes them explicitly:
    bad_opts = ["--daemon-frequency", "--include-snapshot-plan", "--create-src-snapshots-plan", "--skip-replication",
                "--include-snapshot-regex", "--exclude-snapshot-regex", "--include-snapshot-times-and-ranks",
                "--log-file-prefix", "--log-file-infix", "--log-file-suffix",
                "--delete-dst-datasets", "--delete-dst-snapshots", "--delete-dst-snapshots-except",
                "--delete-dst-snapshots-except-plan", "--delete-empty-dst-datasets",
                "--monitor-snapshots", "--timeout"]
    for loc in locations:
        bad_opts += [f"--ssh-{loc}-host"]  # reject this arg as jobrunner will auto-generate it
    for bad_opt in bad_opts:
        parser.add_argument(bad_opt, action=RejectArgumentAction, nargs=0, help=argparse.SUPPRESS)
    # NOTE(review): `bzfs_main.argparse_cli` is reachable as an attribute only because of the
    # `from bzfs_main.argparse_cli import PROG_AUTHOR` at the top of the file — confirm before removing that import.
    parser.add_argument(
        "--version", action="version", version=f"{PROG_NAME}-{bzfs_main.argparse_cli.__version__}, by {PROG_AUTHOR}",
        help="Display version information and exit.\n\n")
    parser.add_argument(
        "--help, -h", action="help",  # trick to ensure both --help and -h are shown in the help msg
        help="Show this help message and exit.\n\n")
    parser.add_argument(
        "--root-dataset-pairs", required=True, nargs="+", action=bzfs_main.argparse_actions.DatasetPairsAction,
        metavar="SRC_DATASET DST_DATASET",
        help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be "
             "auto-appended later).\n\n")
    return parser
    # fmt: on

482 

483 

484############################################################################# 

def main() -> None:
    """API for command line clients.

    Tightens the process umask for the duration of the run, installs a termination handler so that CTRL-C and
    SIGTERM propagate to all descendant processes, then hands control to Job.run_main(); the previous umask is
    restored on exit no matter what.
    """
    saved_umask: int = os.umask(UMASK)
    try:
        set_logging_runtime_defaults()
        # This event is set by the signal handler below on CTRL-C/SIGTERM, which triggers termination of descendants.
        stop_event: threading.Event = threading.Event()
        with termination_signal_handler(termination_events=[stop_event]):
            job = Job(log=None, termination_event=stop_event)
            job.run_main(sys_argv=sys.argv)
    finally:
        os.umask(saved_umask)  # restore the process-wide umask we changed above

496 

497 

498############################################################################# 

499@final 

500class Job: 

501 """Coordinates subjobs per the CLI flags; Each subjob handles one host pair and may run in its own process or thread.""" 

502 

    def __init__(self, log: Logger | None, termination_event: threading.Event) -> None:
        """Initializes job state; pass ``log=None`` to have the Job create (and later reset) its own simple logger."""
        # immutable variables:
        self.log_was_None: Final[bool] = log is None  # remembered so run_main() only resets a logger we created ourselves
        self.log: Final[Logger] = get_simple_logger(PROG_NAME) if log is None else log
        self.termination_event: Final[threading.Event] = termination_event  # set on CTRL-C/SIGTERM (see main())
        self.timing: Final[TaskTiming] = TaskTiming.make_from(self.termination_event)
        self.subprocesses: Final[Subprocesses] = Subprocesses(termination_event.is_set)
        self.jobrunner_dryrun: bool = False  # default; overwritten from --jobrunner-dryrun in _run_main()
        self.spawn_process_per_job: bool = False  # default; presumably overwritten from --spawn-process-per-job — confirm
        self.loopback_address: Final[str] = _detect_loopback_address()

        # mutable variables:
        self.first_exception: int | None = None  # reset per run in _run_main(); set when a subjob fails
        self.worst_exception: int | None = None  # reset per run in _run_main(); tracks the most severe failure status
        self.stats: JobStats = JobStats(jobs_all=0)
        self.cache_existing_dst_pools: set[str] = set()  # dst pools verified to exist (semantics per usage later in file)
        self.cache_known_dst_pools: set[str] = set()  # dst pools already checked, whether existing or not — confirm
        self.is_test_mode: bool = False  # for testing only

522 

    def run_main(self, sys_argv: list[str]) -> None:
        """API for Python clients; visible for testing; may become a public API eventually.

        Delegates to _run_main() and, on the way out, resets the logger iff this Job created the logger itself
        (i.e. ``log=None`` was passed to __init__); a caller-supplied Logger is left untouched.
        """
        try:
            self._run_main(sys_argv)
        finally:
            if self.log_was_None:  # reset Logger unless it's a Logger outside of our control
                reset_logger(self.log)

530 

def _run_main(self, sys_argv: list[str]) -> None:
    """Parses and validates CLI options, then expands them into per-host `bzfs` subjobs and runs those subjobs.

    Params:
        sys_argv: Full CLI argument vector; unknown options are forwarded verbatim to each `bzfs` subjob.

    Exits the process with the worst subjob exit code if any subjob failed (see ``get_worst_exception``).
    """
    # --- Parse CLI and validate plan/host inputs ---
    self.first_exception = None
    self.worst_exception = None
    log: Logger = self.log
    log.info("CLI arguments: %s", " ".join(sys_argv))
    nsp = argparse.Namespace(no_argument_file=True)  # disable --root-dataset-pairs='+file' option in DatasetPairsAction
    args, unknown_args = argument_parser().parse_known_args(sys_argv[1:], nsp)  # forward all unknown args to `bzfs`
    log.setLevel(args.jobrunner_log_level)
    self.jobrunner_dryrun = args.jobrunner_dryrun
    assert len(args.root_dataset_pairs) > 0
    src_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_snapshot_plan), "--src-snapshot-plan")
    src_bookmark_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_bookmark_plan), "--src-bookmark-plan")
    dst_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.dst_snapshot_plan), "--dst-snapshot-plan")
    monitor_snapshot_plan: dict = self.validate_monitor_snapshot_plan(literal_eval(args.monitor_snapshot_plan))
    localhostname: str = args.localhost if args.localhost else socket.gethostname()
    self.validate_host_name(localhostname, "--localhost")
    log.debug("localhostname: %s", localhostname)
    src_hosts: list[str] = self.validate_src_hosts(self.parse_src_hosts_from_cli_or_stdin(args.src_hosts))
    basis_src_hosts: list[str] = src_hosts
    nb_src_hosts: int = len(basis_src_hosts)
    log.debug("src_hosts before subsetting: %s", src_hosts)
    if args.src_host is not None:  # retain only the src hosts that are also contained in args.src_host
        assert isinstance(args.src_host, list)
        retain_src_hosts: set[str] = set(args.src_host)
        self.validate_is_subset(retain_src_hosts, src_hosts, "--src-host", "--src-hosts")
        src_hosts = [host for host in src_hosts if host in retain_src_hosts]
    dst_hosts: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.dst_hosts))
    nb_dst_hosts: int = len(dst_hosts)
    if args.dst_host is not None:  # retain only the dst hosts that are also contained in args.dst_host
        assert isinstance(args.dst_host, list)
        retain_dst_hosts: set[str] = set(args.dst_host)
        self.validate_is_subset(retain_dst_hosts, dst_hosts.keys(), "--dst-host", "--dst-hosts.keys")
        dst_hosts = {dst_host: lst for dst_host, lst in dst_hosts.items() if dst_host in retain_dst_hosts}
    retain_dst_targets: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.retain_dst_targets))
    self.validate_is_subset(dst_hosts.keys(), retain_dst_targets.keys(), "--dst-hosts.keys", "--retain-dst-targets.keys")
    dst_root_datasets: dict[str, str] = self.validate_dst_root_datasets(literal_eval(args.dst_root_datasets))
    self.validate_is_subset(
        dst_root_datasets.keys(), retain_dst_targets.keys(), "--dst-root-dataset.keys", "--retain-dst-targets.keys"
    )
    self.validate_is_subset(dst_hosts.keys(), dst_root_datasets.keys(), "--dst-hosts.keys", "--dst-root-dataset.keys")
    # --- Refuse configurations where multiple src hosts could collide on the same dst dataset ---
    bad_root_datasets: dict[str, str] = {
        dst_host: root_dataset
        for dst_host in sorted(dst_hosts.keys())
        if SRC_MAGIC_SUBSTITUTION_TOKEN not in (root_dataset := dst_root_datasets[dst_host])
    }
    if len(src_hosts) > 1 and len(bad_root_datasets) > 0:
        self.die(
            "Cowardly refusing to proceed as multiple source hosts must not be configured to write to the same "
            "destination dataset. "
            f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(src_hosts)}"
        )
    bad_root_datasets = {
        dst_host: root_dataset
        for dst_host, root_dataset in sorted(dst_root_datasets.items())
        if root_dataset and SRC_MAGIC_SUBSTITUTION_TOKEN not in root_dataset
    }
    if len(basis_src_hosts) > 1 and len(bad_root_datasets) > 0:
        self.die(
            "Cowardly refusing to proceed as multiple source hosts are defined in the configuration, but "
            f"not all non-empty root datasets in --dst-root-datasets contain the '{SRC_MAGIC_SUBSTITUTION_TOKEN}' "
            "substitution token to prevent collisions on writing destination datasets. "
            f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(basis_src_hosts)}"
        )
    if args.jitter:  # randomize host order to avoid potential thundering herd problems in large distributed systems
        random.SystemRandom().shuffle(src_hosts)
        dst_hosts = shuffle_dict(dst_hosts)
    ssh_src_user: str = args.ssh_src_user
    ssh_dst_user: str = args.ssh_dst_user
    ssh_src_port: int | None = args.ssh_src_port
    ssh_dst_port: int | None = args.ssh_dst_port
    ssh_src_config_file: str | None = args.ssh_src_config_file
    ssh_dst_config_file: str | None = args.ssh_dst_config_file
    job_id: str = _sanitize(args.job_id)
    job_run: str = _sanitize(args.job_run) if args.job_run else uuid.uuid1().hex
    workers, workers_is_percent = args.workers
    max_workers: int = max(1, round((os.cpu_count() or 1) * workers / 100.0) if workers_is_percent else round(workers))
    worker_timeout_seconds: int = args.worker_timeout_seconds
    self.spawn_process_per_job = args.spawn_process_per_job
    username: str = pwd.getpwuid(os.getuid()).pw_name
    assert username
    loopback_ids: set[str] = {"localhost", "127.0.0.1", "::1", socket.gethostname()}  # ::1 is IPv6 loopback address
    loopback_ids.update(self.get_localhost_ips())  # union
    loopback_ids.add(localhostname)
    loopback_ids = set() if getenv_bool("disable_loopback", False) else loopback_ids
    log.log(LOG_TRACE, "loopback_ids: %s", sorted(loopback_ids))

    # --- Helper closures used while generating subjob names and host:dataset strings ---
    def zero_pad(number: int, width: int = 6) -> str:
        """Pads number with leading '0' chars to the given width."""
        return f"{number:0{width}d}"

    def jpad(jj: int, tag: str) -> str:
        """Returns ``tag`` prefixed with slash and zero padded index."""
        return "/" + zero_pad(jj) + tag

    def runpad() -> str:
        """Returns standardized subjob count suffix."""
        return job_run + SEP + zero_pad(len(subjobs))

    def update_subjob_name(tag: str) -> str:
        """Derives next subjob name based on ``tag`` and index ``j``."""
        if j <= 0:
            return subjob_name
        elif j == 1:
            return subjob_name + jpad(j - 1, tag)
        else:
            # more than one sibling subjob was created: subsequent subjobs must wait behind a barrier
            return subjob_name + "/" + BARRIER_CHAR

    def resolve_dataset(hostname: str, dataset: str, is_src: bool = True) -> str:
        """Returns host:dataset string resolving IPv6 and localhost cases."""
        assert hostname
        assert dataset
        ssh_user = ssh_src_user if is_src else ssh_dst_user
        ssh_user = ssh_user if ssh_user else username
        lb: str = self.loopback_address
        loopbck_ids: set[str] = loopback_ids
        # "-" means local (no ssh); only eligible when connecting to self under the same user account
        hostname = hostname if hostname not in loopbck_ids else (lb if lb else hostname) if username != ssh_user else "-"
        hostname = convert_ipv6(hostname)
        return f"{hostname}:{dataset}"

    def resolve_dst_dataset(dst_hostname: str, dst_dataset: str) -> str:
        """Expands ``dst_dataset`` relative to ``dst_hostname`` roots."""
        assert dst_hostname
        assert dst_dataset
        root_dataset: str | None = dst_root_datasets.get(dst_hostname)
        assert root_dataset is not None, dst_hostname  # f"Hostname '{dst_hostname}' missing in --dst-root-datasets"
        root_dataset = root_dataset.replace(SRC_MAGIC_SUBSTITUTION_TOKEN, src_host)
        root_dataset = root_dataset.replace(DST_MAGIC_SUBSTITUTION_TOKEN, dst_hostname)
        resolved_dst_dataset: str = f"{root_dataset}/{dst_dataset}" if root_dataset else dst_dataset
        validate_dataset_name(resolved_dst_dataset, dst_dataset)
        return resolve_dataset(dst_hostname, resolved_dst_dataset, is_src=False)

    # sanity check: all host names must be non-empty before any subjob is generated
    for src_host in src_hosts:
        assert src_host
        for dst_hostname in dst_hosts:
            assert dst_hostname
    dummy: Final[str] = DUMMY_DATASET
    lhn: Final[str] = localhostname
    bzfs_prog_header: Final[list[str]] = [BZFS_PROG_NAME, "--no-argument-file"] + unknown_args
    # --- Generate one group of subjobs per src host; subjob names encode ordering/barrier structure ---
    subjobs: dict[str, list[str]] = {}
    for i, src_host in enumerate(src_hosts):
        subjob_name: str = zero_pad(i) + "src-host"
        src_log_suffix: str = _log_suffix(localhostname, src_host, "")
        j: int = 0
        opts: list[str]

        if args.create_src_snapshots:
            opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"]
            self.add_log_file_opts(opts, "create-src-snapshots", job_id, runpad(), src_log_suffix)
            self.add_ssh_opts(
                opts, ssh_src_user=ssh_src_user, ssh_src_port=ssh_src_port, ssh_src_config_file=ssh_src_config_file
            )
            opts += [POSIX_END_OF_OPTIONS_MARKER]
            opts += _flatten(_dedupe([(resolve_dataset(src_host, src), dummy) for src, dst in args.root_dataset_pairs]))
            subjob_name += "/create-src-snapshots"
            subjobs[subjob_name] = bzfs_prog_header + opts

        if args.replicate:
            j = 0
            marker: str = "replicate"
            for dst_hostname, targets in dst_hosts.items():
                opts = self.replication_opts(
                    dst_snapshot_plan, set(targets), lhn, src_host, dst_hostname, marker, job_id, runpad()
                )
                if len(opts) > 0:
                    opts += [f"--daemon-frequency={args.daemon_replication_frequency}"]
                    self.add_ssh_opts(
                        opts,
                        ssh_src_user=ssh_src_user,
                        ssh_dst_user=ssh_dst_user,
                        ssh_src_port=ssh_src_port,
                        ssh_dst_port=ssh_dst_port,
                        ssh_src_config_file=ssh_src_config_file,
                        ssh_dst_config_file=ssh_dst_config_file,
                    )
                    opts += [POSIX_END_OF_OPTIONS_MARKER]
                    dataset_pairs: list[tuple[str, str]] = [
                        (resolve_dataset(src_host, src), resolve_dst_dataset(dst_hostname, dst))
                        for src, dst in args.root_dataset_pairs
                    ]
                    dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
                    if len(dataset_pairs) > 0:
                        subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
                        j += 1
            subjob_name = update_subjob_name(marker)

        def prune_src(
            opts: list[str], retention_plan: dict, tag: str, src_host: str = src_host, logsuffix: str = src_log_suffix
        ) -> None:
            """Creates prune subjob options for ``tag`` using ``retention_plan``."""
            opts += ["--skip-replication", f"--delete-dst-snapshots-except-plan={retention_plan}"]
            opts += [f"--daemon-frequency={args.daemon_prune_src_frequency}"]
            self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
            self.add_ssh_opts(  # i.e. dst=src, src=dummy
                opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
            )
            opts += [POSIX_END_OF_OPTIONS_MARKER]
            opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
            nonlocal subjob_name
            subjob_name += f"/{tag}"
            subjobs[subjob_name] = bzfs_prog_header + opts

        if args.prune_src_snapshots:
            prune_src(["--delete-dst-snapshots"], src_snapshot_plan, tag="prune-src-snapshots")

        if args.prune_src_bookmarks:
            prune_src(["--delete-dst-snapshots=bookmarks"], src_bookmark_plan, tag="prune-src-bookmarks")

        if args.prune_dst_snapshots:
            self.validate_true(
                retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!"
            )
            j = 0
            marker = "prune-dst-snapshots"
            for dst_hostname, _ in dst_hosts.items():
                curr_retain_targets: set[str] = set(retain_dst_targets[dst_hostname])
                curr_dst_snapshot_plan = {  # only retain targets that belong to the host
                    org: {target: periods for target, periods in target_periods.items() if target in curr_retain_targets}
                    for org, target_periods in dst_snapshot_plan.items()
                }
                opts = ["--delete-dst-snapshots", "--skip-replication"]
                opts += [f"--delete-dst-snapshots-except-plan={curr_dst_snapshot_plan}"]
                opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"]
                self.add_log_file_opts(opts, marker, job_id, runpad(), _log_suffix(lhn, src_host, dst_hostname))
                self.add_ssh_opts(
                    opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
                )
                opts += [POSIX_END_OF_OPTIONS_MARKER]
                dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
                dataset_pairs = _dedupe(dataset_pairs)
                dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
                if len(dataset_pairs) > 0:
                    subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
                    j += 1
            subjob_name = update_subjob_name(marker)

        def monitor_snapshots_opts(tag: str, monitor_plan: dict, logsuffix: str) -> list[str]:
            """Returns monitor subjob options for ``tag`` and ``monitor_plan``."""
            opts = [f"--monitor-snapshots={monitor_plan}", "--skip-replication"]
            opts += [f"--daemon-frequency={args.daemon_monitor_snapshots_frequency}"]
            self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
            return opts

        def build_monitor_plan(monitor_plan: dict, snapshot_plan: dict, cycles_prefix: str) -> dict:
            """Expands ``monitor_plan`` with cycle defaults from ``snapshot_plan``."""

            def alert_dicts(alertdict: dict, cycles: int) -> dict:
                """Returns alert dictionaries with explicit ``cycles`` value."""
                latest_dict = alertdict.copy()
                for prefix in ("src_snapshot_", "dst_snapshot_", ""):
                    latest_dict.pop(f"{prefix}cycles", None)
                oldest_dict = latest_dict.copy()
                oldest_dict["cycles"] = int(alertdict.get(f"{cycles_prefix}cycles", cycles))
                latest_dict.pop("oldest_skip_holds", None)
                return {"latest": latest_dict, "oldest": oldest_dict}

            return {
                org: {
                    target: {
                        periodunit: alert_dicts(alertdict, snapshot_plan.get(org, {}).get(target, {}).get(periodunit, 1))
                        for periodunit, alertdict in periods.items()
                    }
                    for target, periods in target_periods.items()
                }
                for org, target_periods in monitor_plan.items()
            }

        if args.monitor_src_snapshots:
            marker = "monitor-src-snapshots"
            monitor_plan = build_monitor_plan(monitor_snapshot_plan, src_snapshot_plan, "src_snapshot_")
            opts = monitor_snapshots_opts(marker, monitor_plan, src_log_suffix)
            self.add_ssh_opts(  # i.e. dst=src, src=dummy
                opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
            )
            opts += [POSIX_END_OF_OPTIONS_MARKER]
            opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
            subjob_name += "/" + marker
            subjobs[subjob_name] = bzfs_prog_header + opts

        if args.monitor_dst_snapshots:
            j = 0
            marker = "monitor-dst-snapshots"
            for dst_hostname, targets in dst_hosts.items():
                monitor_targets: set[str] = set(targets).intersection(set(retain_dst_targets[dst_hostname]))
                monitor_plan = {  # only retain targets that belong to the host
                    org: {target: periods for target, periods in target_periods.items() if target in monitor_targets}
                    for org, target_periods in monitor_snapshot_plan.items()
                }
                monitor_plan = build_monitor_plan(monitor_plan, dst_snapshot_plan, "dst_snapshot_")
                opts = monitor_snapshots_opts(marker, monitor_plan, _log_suffix(lhn, src_host, dst_hostname))
                self.add_ssh_opts(
                    opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
                )
                opts += [POSIX_END_OF_OPTIONS_MARKER]
                dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
                dataset_pairs = _dedupe(dataset_pairs)
                dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
                if len(dataset_pairs) > 0:
                    subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
                    j += 1
            subjob_name = update_subjob_name(marker)

    # --- Execute all generated subjobs and propagate the worst exit code ---
    msg = f"Ready to run {len(subjobs)} subjobs using {len(src_hosts)}/{nb_src_hosts} src hosts: {src_hosts}, "
    msg += f"{len(dst_hosts)}/{nb_dst_hosts} dst hosts: {list(dst_hosts.keys())}"
    log.info("%s", dry(msg, is_dry_run=self.jobrunner_dryrun))
    log.log(LOG_TRACE, "subjobs: \n%s", _pretty_print_formatter(subjobs))
    self.run_subjobs(subjobs, max_workers, worker_timeout_seconds, args.work_period_seconds, args.jitter)
    ex = self.worst_exception
    if isinstance(ex, int):
        assert ex != 0
        sys.exit(ex)
    assert ex is None, ex
    log.info("Succeeded. Bye!")

def replication_opts(
    self,
    dst_snapshot_plan: dict[str, dict[str, dict[str, int]]],
    targets: set[str],
    localhostname: str,
    src_hostname: str,
    dst_hostname: str,
    tag: str,
    job_id: str,
    job_run: str,
) -> list[str]:
    """Returns CLI options for one replication subjob."""
    log = self.log
    log.debug("%s", f"Replicating targets {sorted(targets)} from {src_hostname} to {dst_hostname} ...")
    # Keep only targets that belong to the destination host, and within those only periods with a positive
    # duration amount; afterwards drop any org whose surviving targets all have empty period dicts.
    include_snapshot_plan: dict = {}
    for org, target_periods in dst_snapshot_plan.items():
        relevant_targets: dict = {}
        for target, periods in target_periods.items():
            if target in targets:
                relevant_targets[target] = {
                    duration_unit: duration_amount
                    for duration_unit, duration_amount in periods.items()
                    if duration_amount > 0
                }
        if any(len(periods) > 0 for periods in relevant_targets.values()):
            include_snapshot_plan[org] = relevant_targets
    opts: list[str] = []
    if len(include_snapshot_plan) > 0:
        opts.append(f"--include-snapshot-plan={include_snapshot_plan}")
        self.add_log_file_opts(opts, tag, job_id, job_run, _log_suffix(localhostname, src_hostname, dst_hostname))
    return opts

880 

def skip_nonexisting_local_dst_pools(
    self, root_dataset_pairs: list[tuple[str, str]], timeout_secs: float | None = None
) -> list[tuple[str, str]]:
    """Skip datasets that point to removable destination drives that are not currently (locally) attached, if any.

    Params:
        root_dataset_pairs: (src, dst) pairs whose dst may start with the "-:" local-host marker.
        timeout_secs: optional timeout forwarded to the local `zfs list` subprocess call.

    Returns: the subset of pairs whose dst zpool is known (or assumed) to exist; skipped pairs are logged.
    Results are memoized in ``self.cache_known_dst_pools`` / ``self.cache_existing_dst_pools`` across calls.
    """

    def zpool(dataset: str) -> str:
        """Returns pool name portion of ``dataset``."""
        return dataset.split("/", 1)[0]

    assert len(root_dataset_pairs) > 0
    # Only probe pools we have not classified in a previous invocation.
    unknown_dst_pools = {zpool(dst) for src, dst in root_dataset_pairs}
    unknown_dst_pools = unknown_dst_pools.difference(self.cache_known_dst_pools)

    # Here we treat a zpool as existing if the zpool isn't local, aka if it isn't prefixed with "-:". A remote host
    # will raise an appropriate error if it turns out that the remote zpool doesn't actually exist.
    unknown_local_dst_pools = {pool for pool in unknown_dst_pools if pool.startswith("-:")}
    if len(unknown_local_dst_pools) > 0:  # `zfs list` if local
        existing_pools = {pool[len("-:") :] for pool in unknown_local_dst_pools}
        cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(existing_pools)
        sp = subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout_secs)
        if sp.returncode not in (0, 1):  # 1 means dataset not found
            self.die(f"Unexpected error {sp.returncode} on checking for existing local dst pools: {sp.stderr.strip()}")
        # re-add the "-:" marker so the cache entries compare equal to the pool names derived from dst datasets
        existing_pools = {"-:" + pool for pool in sp.stdout.splitlines() if pool}
        self.cache_existing_dst_pools.update(existing_pools)  # union
    unknown_remote_dst_pools = unknown_dst_pools.difference(unknown_local_dst_pools)
    self.cache_existing_dst_pools.update(unknown_remote_dst_pools)  # union
    self.cache_known_dst_pools.update(unknown_dst_pools)  # union
    results: list[tuple[str, str]] = []
    for src, dst in root_dataset_pairs:
        if zpool(dst) in self.cache_existing_dst_pools:
            results.append((src, dst))
        else:
            self.log.warning("Skipping dst dataset for which local dst pool does not exist: %s", dst)
    return results

915 

916 @staticmethod 

917 def add_ssh_opts( 

918 opts: list[str], 

919 *, 

920 ssh_src_user: str | None = None, 

921 ssh_dst_user: str | None = None, 

922 ssh_src_port: int | None = None, 

923 ssh_dst_port: int | None = None, 

924 ssh_src_config_file: str | None = None, 

925 ssh_dst_config_file: str | None = None, 

926 ) -> None: 

927 """Appends ssh related options to ``opts`` if specified.""" 

928 assert isinstance(opts, list) 

929 opts += [f"--ssh-src-user={ssh_src_user}"] if ssh_src_user else [] 

930 opts += [f"--ssh-dst-user={ssh_dst_user}"] if ssh_dst_user else [] 

931 opts += [f"--ssh-src-port={ssh_src_port}"] if ssh_src_port is not None else [] 

932 opts += [f"--ssh-dst-port={ssh_dst_port}"] if ssh_dst_port is not None else [] 

933 opts += [f"--ssh-src-config-file={ssh_src_config_file}"] if ssh_src_config_file else [] 

934 opts += [f"--ssh-dst-config-file={ssh_dst_config_file}"] if ssh_dst_config_file else [] 

935 

@staticmethod
def add_log_file_opts(opts: list[str], tag: str, job_id: str, job_run: str, logsuffix: str) -> None:
    """Appends standard log-file CLI options (prefix, infix, suffix) to ``opts`` in place."""
    opts.extend(
        [
            f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}",
            f"--log-file-infix={SEP}{job_id}",
            f"--log-file-suffix={SEP}{job_run}{logsuffix}{SEP}",
        ]
    )

942 

def run_subjobs(
    self,
    subjobs: dict[str, list[str]],
    max_workers: int,
    timeout_secs: float | None,
    work_period_seconds: float,
    jitter: bool,
) -> None:
    """Executes subjobs sequentially or in parallel, respecting '~' barriers.

    Design note on subjob failure, isolation and termination:
    - On subjob failure the subjob's subtree is skipped.
    - Subjob failures are converted to return codes (not exceptions) so the policy does not invoke a termination handler
      on such failures and sibling subjobs continue unaffected. This preserves per-subjob isolation, both within a single
      process (thread-per-subjob mode; default) as well as in process-per-subjob mode (``--spawn-process-per-job``).
    - The first failure is retained for diagnostics.
    - Process exit code precedence is: fatal/non-monitor failure > monitor CRITICAL/WARNING > STILL_RUNNING > success.
    """
    self.stats = JobStats(len(subjobs))
    log = self.log
    # spread subjob start times evenly over the work period; with jitter, one extra interval absorbs the random delay
    num_intervals = 1 + len(subjobs) if jitter else len(subjobs)
    interval_nanos = 0 if len(subjobs) == 0 else round(1_000_000_000 * max(0.0, work_period_seconds) / num_intervals)
    assert interval_nanos >= 0
    if jitter:  # randomize job start time to avoid potential thundering herd problems in large distributed systems
        sleep_nanos = random.SystemRandom().randint(0, interval_nanos)
        log.info("Jitter: Delaying job start time by sleeping for %s ...", human_readable_duration(sleep_nanos))
        self.timing.sleep(sleep_nanos)  # allows early wakeup on async termination
    sorted_subjobs: list[str] = sorted(subjobs.keys())
    spawn_process_per_job: bool = self.spawn_process_per_job
    log.log(LOG_TRACE, "%s: %s", "spawn_process_per_job", spawn_process_per_job)
    # a truthy return value indicates overall failure; record it as DIE_STATUS in first/worst exception below
    if process_datasets_in_parallel_and_fault_tolerant(
        log=log,
        datasets=sorted_subjobs,
        process_dataset=lambda subjob, tid, retry: self.run_subjob(
            subjobs[subjob], name=subjob, timeout_secs=timeout_secs, spawn_process_per_job=spawn_process_per_job
        )
        == 0,
        skip_tree_on_error=lambda subjob: True,
        skip_on_error="dataset",
        max_workers=max_workers,
        interval_nanos=lambda last_update_nanos, dataset, submit_count: interval_nanos,
        timing=self.timing,
        termination_handler=self.subprocesses.terminate_process_subtrees,
        task_name="Subjob",
        dry_run=False,
        is_test_mode=self.is_test_mode,
    ):
        self.first_exception = DIE_STATUS if self.first_exception is None else self.first_exception
        self.worst_exception = self.get_worst_exception(self.worst_exception, DIE_STATUS)
    stats = self.stats
    jobs_skipped = stats.jobs_all - stats.jobs_started
    msg = f"{stats}, skipped:" + percent(jobs_skipped, total=stats.jobs_all, print_total=True)
    log.info("Final Progress: %s", msg)
    # invariants: nothing is still running, and every started job has completed
    assert stats.jobs_running == 0, msg
    assert stats.jobs_completed == stats.jobs_started, msg
    skipped_jobs_dict = {subjob: subjobs[subjob] for subjob in sorted_subjobs if subjob not in stats.started_job_names}
    if len(skipped_jobs_dict) > 0:
        log.debug("Skipped subjobs: \n%s", _pretty_print_formatter(skipped_jobs_dict))
    assert jobs_skipped == len(skipped_jobs_dict), msg

1002 

def run_subjob(
    self, cmd: list[str], name: str, timeout_secs: float | None, spawn_process_per_job: bool
) -> int | None:  # thread-safe
    """Executes one worker job and updates shared Stats.

    Params:
        cmd: full CLI argument list of the worker job.
        name: subjob name used for progress accounting.
        timeout_secs: optional per-job timeout, forwarded to the chosen execution mode.
        spawn_process_per_job: if True run the job in a child process, else in the current thread.

    Returns: the job's exit code (0 on success), or None if no code was obtained; re-raises unexpected exceptions.
    """
    start_time_nanos = time.monotonic_ns()
    returncode = None
    log = self.log
    cmd_str = " ".join(cmd)
    stats = self.stats
    try:
        msg: str = stats.submit_job(name)
        log.log(LOG_TRACE, "Starting worker job: %s", cmd_str)
        log.info("Progress: %s", msg)
        start_time_nanos = time.monotonic_ns()  # restart the clock now that bookkeeping/logging overhead is done
        if spawn_process_per_job:
            returncode = self.run_worker_job_spawn_process_per_job(cmd, timeout_secs)
        else:
            returncode = self.run_worker_job_in_current_thread(cmd, timeout_secs)
    except BaseException as e:
        log.error("Worker job failed with unexpected exception: %s for command: %s", e, cmd_str)
        raise
    else:
        elapsed_human: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
        if returncode != 0:
            # record first and worst failure under the stats lock since sibling subjobs run concurrently
            with stats.lock:
                if self.first_exception is None:
                    self.first_exception = DIE_STATUS if returncode is None else returncode
                self.worst_exception = self.get_worst_exception(self.worst_exception, returncode)
            log.error("Worker job failed with exit code %s in %s: %s", returncode, elapsed_human, cmd_str)
        else:
            log.debug("Worker job succeeded in %s: %s", elapsed_human, cmd_str)
        return returncode
    finally:
        # runs on success and on exception alike; a None returncode counts as failed here
        msg = stats.complete_job(failed=returncode != 0, elapsed_nanos=time.monotonic_ns() - start_time_nanos)
        log.info("Progress: %s", msg)

1038 

def run_worker_job_in_current_thread(self, cmd: list[str], timeout_secs: float | None) -> int | None:
    """Runs ``bzfs`` in-process and return its exit code.

    Params:
        cmd: CLI argument list; a ``--timeout`` option is injected right after the program name if requested.
        timeout_secs: optional timeout, converted to milliseconds and delegated to `bzfs` itself.

    Returns: 0 on success (or dryrun), the `bzfs` exit code on SystemExit/CalledProcessError, DIE_STATUS otherwise.
    """
    log = self.log
    if timeout_secs is not None:
        cmd = cmd[0:1] + [f"--timeout={round(1000 * timeout_secs)}milliseconds"] + cmd[1:]
    try:
        if not self.jobrunner_dryrun:
            self._bzfs_run_main(cmd)
        return 0
    except subprocess.CalledProcessError as e:
        return bzfs.normalize_called_process_error(e)
    except SystemExit as e:
        assert e.code is None or isinstance(e.code, int)
        return e.code
    except BaseException:
        # failures must surface as return codes, not exceptions, to preserve per-subjob isolation
        log.exception("Worker job failed with unexpected exception for command: %s", " ".join(cmd))
        return DIE_STATUS

1056 

def _bzfs_run_main(self, cmd: list[str]) -> None:
    """Delegates execution to :mod:`bzfs` using parsed arguments.

    Shares the jobrunner's termination_event with the nested `bzfs` job so that an async
    termination request propagates; also propagates the test-mode flag.
    """
    bzfs_job = bzfs.Job(termination_event=self.termination_event)
    bzfs_job.is_test_mode = self.is_test_mode
    bzfs_job.run_main(bzfs.argument_parser().parse_args(cmd[1:]), cmd)

1062 

def run_worker_job_spawn_process_per_job(self, cmd: list[str], timeout_secs: float | None) -> int | None:
    """Spawns a subprocess for the worker job and waits for completion.

    Params:
        cmd: CLI argument list; a leading bzfs program name is rewritten to a ``python -m bzfs_main...`` invocation.
        timeout_secs: optional wall-clock limit; on expiry (or async termination) the child is escalated
            through SIGTERM, then subtree SIGTERM, then SIGKILL, with progressively shorter grace periods.

    Returns: the child's exit code, or 0 immediately in dryrun mode.
    """
    log = self.log
    if len(cmd) > 0 and cmd[0] == BZFS_PROG_NAME:
        cmd = [sys.executable, "-m", "bzfs_main." + cmd[0]] + cmd[1:]
    if self.jobrunner_dryrun:
        return 0

    with self.subprocesses.popen_and_track(cmd, stdin=subprocess.DEVNULL, text=True) as proc:
        try:
            if self.termination_event.is_set():
                timeout_secs = 1.0 if timeout_secs is None else timeout_secs
                raise subprocess.TimeoutExpired(cmd, timeout_secs)  # do not wait for normal completion
            proc.communicate(timeout=timeout_secs)  # Wait for the subprocess to complete and exit normally
        except subprocess.TimeoutExpired:
            cmd_str = " ".join(cmd)
            if self.termination_event.is_set():
                log.error("%s", f"Terminating worker job due to async termination request: {cmd_str}")
            else:
                log.error("%s", f"Terminating worker job as it failed to complete within {timeout_secs}s: {cmd_str}")
            proc.terminate()  # Sends SIGTERM signal to job subprocess
            # communicate(timeout=None) never raises TimeoutExpired, so timeout_secs is non-None on this path
            assert timeout_secs is not None
            timeout_secs = min(1.0, timeout_secs)  # grace period for orderly shutdown after SIGTERM
            try:
                proc.communicate(timeout=timeout_secs)  # Wait for the subprocess to exit
            except subprocess.TimeoutExpired:
                log.error("%s", f"Killing worker job as it failed to terminate within {timeout_secs}s: {cmd_str}")
                terminate_process_subtree(root_pids=[proc.pid])  # Send SIGTERM to process subtree
                proc.kill()  # Sends SIGKILL signal to job subprocess because SIGTERM wasn't enough
                timeout_secs = min(0.025, timeout_secs)  # SIGKILL cannot be ignored, so only wait briefly
                with contextlib.suppress(subprocess.TimeoutExpired):
                    proc.communicate(timeout=timeout_secs)  # Wait for the subprocess to exit
        return proc.returncode

1096 

@staticmethod
def get_worst_exception(existing_code: int | None, new_code: int | None) -> int:
    """Process exit code precedence is: fatal/non-monitor failure > monitor CRITICAL/WARNING > STILL_RUNNING > success."""
    new_code = DIE_STATUS if new_code is None else new_code
    if existing_code is None:
        return new_code

    def severity(code: int) -> int:
        """Maps an exit code to its precedence class: 2=fatal, 1=monitor alert, 0=success/still-running."""
        if code not in (0, bzfs.WARNING_STATUS, bzfs.CRITICAL_STATUS, bzfs.STILL_RUNNING_STATUS):
            return 2  # fatal/non-monitor failure
        if code in (bzfs.WARNING_STATUS, bzfs.CRITICAL_STATUS):
            return 1  # monitor CRITICAL/WARNING
        return 0  # success or STILL_RUNNING

    existing_severity = severity(existing_code)
    new_severity = severity(new_code)
    if existing_severity != new_severity:
        # the higher precedence class wins outright
        return existing_code if existing_severity > new_severity else new_code
    # within the same precedence class the numerically larger code wins
    return max(existing_code, new_code)

1125 

def validate_src_hosts(self, src_hosts: list[str]) -> list[str]:
    """Checks ``src_hosts`` contains valid hostnames; returns the input unchanged on success."""
    context = "--src-hosts"
    self.validate_type(src_hosts, list, context)
    # validate each entry individually; validators raise/die on the first offending hostname
    for hostname in src_hosts:
        self.validate_host_name(hostname, context)
    return src_hosts

1133 

def validate_dst_hosts(self, dst_hosts: dict[str, list[str]]) -> dict[str, list[str]]:
    """Checks destination hosts dictionary; returns the input unchanged on success."""
    context = "--dst-hosts"
    self.validate_type(dst_hosts, dict, context)
    # each key must be a valid hostname; each value must be a list of target strings
    for hostname, target_list in dst_hosts.items():
        self.validate_host_name(hostname, context)
        self.validate_type(target_list, list, f"{context} targets")
        for tgt in target_list:
            self.validate_type(tgt, str, f"{context} target")
    return dst_hosts

1144 

def validate_dst_root_datasets(self, dst_root_datasets: dict[str, str]) -> dict[str, str]:
    """Checks that each destination root dataset string is valid; returns the input unchanged on success."""
    context = "--dst-root-datasets"
    self.validate_type(dst_root_datasets, dict, context)
    # each key must be a valid hostname; each value must be a (possibly empty) root dataset string
    for hostname, root_dataset in dst_root_datasets.items():
        self.validate_host_name(hostname, context)
        self.validate_type(root_dataset, str, f"{context} root dataset")
    return dst_root_datasets

1153 

def validate_snapshot_plan(
    self, snapshot_plan: dict[str, dict[str, dict[str, int]]], context: str
) -> dict[str, dict[str, dict[str, int]]]:
    """Checks snapshot plan structure and value types; returns the input unchanged on success.

    The expected shape is {org: {target: {period_unit: period_amount}}} with non-empty string keys
    at the innermost level and non-negative integer amounts.
    """
    self.validate_type(snapshot_plan, dict, context)
    for org, target_periods in snapshot_plan.items():
        self.validate_type(org, str, f"{context} org")
        self.validate_type(target_periods, dict, f"{context} target_periods")
        for target, period_dict in target_periods.items():
            self.validate_type(target, str, f"{context} org/target")
            self.validate_type(period_dict, dict, f"{context} org/periods")
            for unit, amount in period_dict.items():
                self.validate_non_empty_string(unit, f"{context} org/target/period_unit")
                self.validate_non_negative_int(amount, f"{context} org/target/period_amount")
    return snapshot_plan

1169 

1170 def validate_monitor_snapshot_plan( 

1171 self, monitor_snapshot_plan: dict[str, dict[str, dict[str, dict[str, str | int]]]] 

1172 ) -> dict[str, dict[str, dict[str, dict[str, str | int]]]]: 

1173 """Checks snapshot monitoring plan configuration.""" 

1174 context = "--monitor-snapshot-plan" 

1175 self.validate_type(monitor_snapshot_plan, dict, context) 

1176 for org, target_periods in monitor_snapshot_plan.items(): 

1177 self.validate_type(org, str, f"{context} org") 

1178 self.validate_type(target_periods, dict, f"{context} target_periods") 

1179 for target, periods in target_periods.items(): 

1180 self.validate_type(target, str, f"{context} org/target") 

1181 self.validate_type(periods, dict, f"{context} org/periods") 

1182 for period_unit, alert_dict in periods.items(): 

1183 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit") 

1184 self.validate_type(alert_dict, dict, f"{context} org/target/alert_dict") 

1185 for key, value in alert_dict.items(): 

1186 self.validate_non_empty_string(key, f"{context} org/target/alert_dict/key") 

1187 self.validate_type(value, Union[str, int], f"{context} org/target/alert_dict/value") 

1188 return monitor_snapshot_plan 

1189 

1190 def validate_is_subset(self, x: Iterable[str], y: Iterable[str], x_name: str, y_name: str) -> None: 

1191 """Raises error if ``x`` contains an item not present in ``y``.""" 

1192 if isinstance(x, str) or not isinstance(x, Iterable): 

1193 self.die(f"{x_name} must be an Iterable") 

1194 if isinstance(y, str) or not isinstance(y, Iterable): 

1195 self.die(f"{y_name} must be an Iterable") 

1196 if not set(x).issubset(set(y)): 

1197 diff = sorted(set(x).difference(set(y))) 

1198 self.die(f"{x_name} must be a subset of {y_name}. diff: {diff}, {x_name}: {sorted(x)}, {y_name}: {sorted(y)}") 

1199 

1200 def validate_host_name(self, hostname: str, context: str) -> None: 

1201 """Checks host name string.""" 

1202 self.validate_non_empty_string(hostname, f"{context} hostname") 

1203 bzfs.validate_host_name(hostname, context) 

1204 

1205 def validate_non_empty_string(self, value: str, name: str) -> None: 

1206 """Checks that ``value`` is a non-empty string.""" 

1207 self.validate_type(value, str, name) 

1208 if not value: 

1209 self.die(f"{name} must not be empty!") 

1210 

1211 def validate_non_negative_int(self, value: int, name: str) -> None: 

1212 """Checks ``value`` is an int >= 0.""" 

1213 self.validate_type(value, int, name) 

1214 if value < 0: 

1215 self.die(f"{name} must be a non-negative integer: {value}") 

1216 

1217 def validate_true(self, expr: Any, msg: str) -> None: 

1218 """Raises error if ``expr`` evaluates to ``False``.""" 

1219 if not bool(expr): 

1220 self.die(msg) 

1221 

1222 def validate_type(self, value: Any, expected_type: Any, name: str) -> None: 

1223 """Checks ``value`` is instance of ``expected_type`` or union thereof.""" 

1224 if hasattr(expected_type, "__origin__") and expected_type.__origin__ is Union: # for compat with python < 3.10 

1225 union_types = expected_type.__args__ 

1226 for t in union_types: 

1227 if isinstance(value, t): 

1228 return 

1229 type_msg = " or ".join([t.__name__ for t in union_types]) 

1230 self.die(f"{name} must be of type {type_msg} but got {type(value).__name__}: {value}") 

1231 elif not isinstance(value, expected_type): 

1232 self.die(f"{name} must be of type {expected_type.__name__} but got {type(value).__name__}: {value}") 

1233 

1234 def parse_src_hosts_from_cli_or_stdin(self, raw_src_hosts: str | None) -> list: 

1235 """Resolve --src-hosts from CLI or stdin with robust TTY/empty handling.""" 

1236 if raw_src_hosts is None: 

1237 # If stdin is an interactive TTY, don't block waiting for input; fail clearly instead 

1238 try: 

1239 is_tty: bool = getattr(sys.stdin, "isatty", lambda: False)() 

1240 except Exception: 

1241 is_tty = False 

1242 if is_tty: 

1243 self.die("Missing --src-hosts and stdin is a TTY. Provide --src-hosts or pipe the list on stdin.") 

1244 stdin_text: str = sys.stdin.read() 

1245 if not stdin_text.strip(): # avoid literal_eval("") SyntaxError and provide a clear message 

1246 self.die("Missing --src-hosts and stdin is empty. Provide --src-hosts or pipe a list on stdin.") 

1247 raw_src_hosts = stdin_text 

1248 try: 

1249 value = literal_eval(raw_src_hosts) 

1250 except Exception as e: 

1251 self.die(f"Invalid --src-hosts format: {e} for input: {raw_src_hosts}") 

1252 if not isinstance(value, list): 

1253 example: str = format_obj(["hostname1", "hostname2"]) 

1254 self.die(f"Invalid --src-hosts: expected a Python list literal, e.g. {example} but got: {format_obj(value)}") 

1255 return value 

1256 

1257 def die(self, msg: str) -> NoReturn: 

1258 """Log ``msg`` and exit the program.""" 

1259 self.log.error("%s", msg) 

1260 utils.die(msg) 

1261 

1262 def get_localhost_ips(self) -> set[str]: 

1263 """Returns all network addresses of the local host, i.e. all configured addresses on all network interfaces, without 

1264 depending on name resolution.""" 

1265 ips: set[str] = set() 

1266 if platform.system() == "Linux": 

1267 try: 

1268 proc = subprocess.run(["hostname", "-I"], stdin=DEVNULL, stdout=PIPE, text=True, check=True) # noqa: S607 

1269 except Exception as e: 

1270 self.log.warning("Cannot run 'hostname -I' on localhost: %s", e) 

1271 else: 

1272 ips = {ip for ip in proc.stdout.strip().split() if ip} 

1273 self.log.log(LOG_TRACE, "localhost_ips: %s", sorted(ips)) 

1274 return ips 

1275 

1276 

1277############################################################################# 

@final
class RejectArgumentAction(argparse.Action):
    """Argparse Action that unconditionally rejects any option it guards."""

    def __call__(
        self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None
    ) -> None:
        """Always aborts parsing; a protected option must never be overridden by the caller."""
        message = f"Security: Overriding protected argument '{option_string}' is not allowed."
        parser.error(message)  # argparse prints usage + message and raises SystemExit

1287 

1288 

1289############################################################################# 

1290def _dedupe(root_dataset_pairs: list[tuple[str, str]]) -> list[tuple[str, str]]: 

1291 """Returns a list with duplicate dataset pairs removed while preserving order.""" 

1292 return list(dict.fromkeys(root_dataset_pairs)) 

1293 

1294 

_T = TypeVar("_T")  # generic element type threaded through _flatten()

1296 

1297 

1298def _flatten(root_dataset_pairs: Iterable[Iterable[_T]]) -> list[_T]: 

1299 """Flattens an iterable of pairs into a single list.""" 

1300 return [item for pair in root_dataset_pairs for item in pair] 

1301 

1302 

def _sanitize(filename: str) -> str:
    """Masks characters that could break file names or paths, substituting '!' for each occurrence."""
    # NOTE: substrings are replaced in this fixed order; ".." must go before any later single-char tokens
    for token in (" ", "..", "/", "\\", SEP):
        filename = filename.replace(token, "!")
    return filename

1308 

1309 

def _log_suffix(localhostname: str, src_hostname: str, dst_hostname: str) -> str:
    """Builds a log-file suffix embedding the sanitized host names, each preceded by SEP."""
    parts = (_sanitize(localhostname), _sanitize(src_hostname), _sanitize(dst_hostname))
    return SEP + SEP.join(parts)

1313 

1314 

1315def _pretty_print_formatter(dictionary: dict[str, Any]) -> Any: 

1316 """Lazy JSON formatter used to avoid overhead in disabled log levels.""" 

1317 

1318 @final 

1319 class PrettyPrintFormatter: 

1320 """Wrapper returning formatted JSON on ``str`` conversion.""" 

1321 

1322 def __str__(self) -> str: 

1323 import json 

1324 

1325 return json.dumps(dictionary, indent=4, sort_keys=True) 

1326 

1327 return PrettyPrintFormatter() 

1328 

1329 

1330def _detect_loopback_address() -> str: 

1331 """Detects if a loopback connection over IPv4 or IPv6 is possible.""" 

1332 try: 

1333 addr = "127.0.0.1" 

1334 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 

1335 s.bind((addr, 0)) 

1336 return addr 

1337 except OSError: 

1338 pass 

1339 

1340 try: 

1341 addr = "::1" 

1342 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: 

1343 s.bind((addr, 0)) 

1344 return addr 

1345 except OSError: 

1346 pass 

1347 

1348 return "" 

1349 

1350 

def convert_ipv6(hostname: str) -> str:
    """Masks ':' as '|' so IPv6 literals don't collide with the host:dataset separator or with colons
    that are legal inside ZFS dataset names; bzfs.convert_ipv6() performs the reverse mapping."""
    return hostname.translate(str.maketrans(":", "|"))

1355 

1356 

1357############################################################################# 

if __name__ == "__main__":
    # CLI entry point: runs only when executed as a script, not when imported as a module
    main()