Coverage for bzfs_main / bzfs_jobrunner.py: 99%

676 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:49 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata 

16# /// script 

17# requires-python = ">=3.9" 

18# dependencies = [] 

19# /// 

20# 

21""" 

22* High-level orchestrator that calls `bzfs` as part of complex, periodic workflows to manage backup, replication, and pruning 

23 jobs across a fleet of multiple source and destination hosts; driven by a fleet-wide job config file 

24 (e.g., `bzfs_job_testbed.py`). 

25* Overview of the bzfs_jobrunner.py codebase: 

26* The codebase starts with docs, definition of input data and associated argument parsing of CLI options/parameters. 

27* Control flow starts in main(), far below ..., which kicks off a "Job". 

28* A Job creates zero or more "subjobs" for each local or remote host, via run_main(). 

29* It executes the subjobs, serially or in parallel, via run_subjobs(), which in turn delegates parallel job coordination to 

30 bzfs.process_datasets_in_parallel_and_fault_tolerant(). 

31* README_bzfs_jobrunner.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via 

32 update_readme.sh. Simply run that script whenever you change or add ArgumentParser help text. 

33""" 

34 

35from __future__ import ( 

36 annotations, 

37) 

38import argparse 

39import contextlib 

40import os 

41import platform 

42import pwd 

43import random 

44import socket 

45import subprocess 

46import sys 

47import threading 

48import time 

49import uuid 

50from ast import ( 

51 literal_eval, 

52) 

53from collections.abc import ( 

54 Iterable, 

55) 

56from logging import ( 

57 Logger, 

58) 

59from subprocess import ( 

60 DEVNULL, 

61 PIPE, 

62) 

63from typing import ( 

64 Any, 

65 Final, 

66 NoReturn, 

67 TypeVar, 

68 Union, 

69 final, 

70) 

71 

72import bzfs_main.argparse_actions 

73from bzfs_main import ( 

74 bzfs, 

75) 

76from bzfs_main.argparse_cli import ( 

77 PROG_AUTHOR, 

78) 

79from bzfs_main.detect import ( 

80 DUMMY_DATASET, 

81) 

82from bzfs_main.loggers import ( 

83 get_simple_logger, 

84 reset_logger, 

85 set_logging_runtime_defaults, 

86) 

87from bzfs_main.util import ( 

88 check_range, 

89 utils, 

90) 

91from bzfs_main.util.parallel_tasktree import ( 

92 BARRIER_CHAR, 

93) 

94from bzfs_main.util.parallel_tasktree_policy import ( 

95 process_datasets_in_parallel_and_fault_tolerant, 

96) 

97from bzfs_main.util.utils import ( 

98 DIE_STATUS, 

99 LOG_TRACE, 

100 UMASK, 

101 UNIX_TIME_INFINITY_SECS, 

102 JobStats, 

103 Subprocesses, 

104 TaskTiming, 

105 dry, 

106 format_dict, 

107 format_obj, 

108 getenv_bool, 

109 human_readable_duration, 

110 percent, 

111 shuffle_dict, 

112 terminate_process_subtree, 

113 termination_signal_handler, 

114 validate_dataset_name, 

115) 

116from bzfs_main.util.utils import PROG_NAME as BZFS_PROG_NAME 

117 

118# constants: 

119PROG_NAME: Final[str] = "bzfs_jobrunner" 

120SRC_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^SRC_HOST" # noqa: S105 

121DST_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^DST_HOST" # noqa: S105 

122SEP: Final[str] = "," 

123POSIX_END_OF_OPTIONS_MARKER: Final[str] = "--" # args following -- are treated as operands, even if they begin with a hyphen 

124 

125 

def argument_parser() -> argparse.ArgumentParser:
    """Returns the CLI parser used by bzfs_jobrunner.

    The parser intentionally rejects a number of `bzfs` options (see `bad_opts` below) because the jobrunner either
    auto-generates them or their use would conflict with jobrunner semantics; unknown options are forwarded to `bzfs`
    by the caller via parse_known_args().
    """
    # fmt: off
    parser = argparse.ArgumentParser(
        prog=PROG_NAME,
        allow_abbrev=False,
        formatter_class=argparse.RawTextHelpFormatter,
        description=f"""
This companion program wraps [bzfs](README.md) for periodic snapshot creation, replication, pruning, and monitoring across
N source hosts and M destination hosts, using one shared fleet-wide [jobconfig](bzfs_testbed/bzfs_job_testbed.py) script.

Typical use cases include geo-replicated backup where each destination host is in a different region and receives replicas
from the same set of source hosts, low-latency replication from a primary to a secondary or to M read replicas, and backups
to removable drives.

This program can be used to efficiently replicate ...

a) within a single machine (local mode), or

b) from a single source host to one or more destination hosts (pull or push or pull-push mode), or

c) from multiple source hosts to a single destination host (pull or push or pull-push mode), or

d) from N source hosts to M destination hosts (pull or push or pull-push mode, N and M can be large, M=2 or M=3 are typical
geo-replication factors)

You can run this program on a single third-party host and have it talk to all source and destination hosts. That setup is
convenient for basic use cases and testing, and efficient with `--r2r=pull` or `--r2r=push`.
In many deployments, a cron job on each source host runs `{PROG_NAME}` periodically to create new snapshots (via
--create-src-snapshots) and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and
--prune-src-bookmarks), whereas another cron job on each destination host runs `{PROG_NAME}` periodically to prune
outdated destination snapshots (via --prune-dst-snapshots), and to replicate the recently created snapshots from the source
to the destination (via --replicate).
A separate cron job on each source host and each destination host runs `{PROG_NAME}` periodically to alert the user if the
latest or oldest snapshot is somehow too old (via --monitor-src-snapshots and --monitor-dst-snapshots).
The frequency of each activity can differ. Typical intervals range from N milliseconds to years.

Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto all
source hosts and all destination hosts, and add crontab entries (or systemd timers or Monit entries or similar), along these
lines:

* crontab on source hosts:

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --src-host="$(hostname)" --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks`

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --src-host="$(hostname)" --monitor-src-snapshots`

* crontab on destination hosts:

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --dst-host="$(hostname)" --replicate --prune-dst-snapshots`

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --dst-host="$(hostname)" --monitor-dst-snapshots`

Some deployments choose to move monitoring to one centralized management host, like so:

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --monitor-src-snapshots --monitor-dst-snapshots`

### Applying Actions to a Subset of Hosts

`--src-host` and `--dst-host` let you run actions on only a subset of source and destination hosts. For example, you can
replicate from selected source hosts to selected destination hosts.
Each `{PROG_NAME}` invocation runs all enabled actions for the final effective values of `--src-hosts` and `--dst-hosts`,
after applying the `--src-host` and `--dst-host` filters.


### High Frequency Replication (Experimental Feature)

Taking snapshots and/or replicating every N milliseconds to roughly every 10 seconds is considered high frequency. Consider
that `zfs list -t snapshot` performance degrades as snapshot counts grow within the selected datasets. Keep the active
snapshot count small, and prune at a cadence that matches your snapshot creation rate. Consider using `--skip-parent` and
`--exclude-dataset*` filters so only datasets that need this frequency are selected.

To reduce startup overhead, forward `--daemon-lifetime` to `bzfs`, use the `--daemon-*` options, and split the
crontab entry (or, preferably, a high-frequency systemd timer) into multiple processes, from one source host to one
destination host, along these lines:

* crontab on source hosts:

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --src-host="$(hostname)" --dst-host="foo" --create-src-snapshots`

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --src-host="$(hostname)" --dst-host="foo" --prune-src-snapshots`

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --src-host="$(hostname)" --dst-host="foo" --prune-src-bookmarks`

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --src-host="$(hostname)" --dst-host="foo" --monitor-src-snapshots`


* crontab on destination hosts:

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --src-host="bar" --dst-host="$(hostname)" --replicate`

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --src-host="bar" --dst-host="$(hostname)" --prune-dst-snapshots`

`* * * * * testuser /bzfs/bzfs_testbed/bzfs_job_testbed.py --src-host="bar" --dst-host="$(hostname)" --monitor-dst-snapshots`

The daemon processes work like non-daemon processes except that they loop, handle time events, and sleep between events.
They exit after the interval specified by `--daemon-lifetime` (for example, 86400 seconds). They are then restarted by
`cron`, or earlier if they fail. While an existing daemon is still running, `cron` may attempt to start another one.
This is harmless because that extra process exits immediately with a message like this:
"Exiting as same previous periodic job is still running without completion yet"
""")

    # commands:
    parser.add_argument(
        "--create-src-snapshots", action="store_true",
        help="Take snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--replicate", action="store_true",
        # typo fix: "destinations hosts" --> "destination hosts"
        help="Replicate snapshots from the selected source hosts to the selected destination hosts as necessary. For pull "
             "mode (recommended), this command should be called by a program (or cron job) running on each dst "
             "host; for push mode, on the src host; for pull-push mode on a third-party host, maybe with `--r2r=pull` "
             "or `--r2r=push`.\n\n")
    parser.add_argument(
        "--prune-src-snapshots", action="store_true",
        help="Prune snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--prune-src-bookmarks", action="store_true",
        help="Prune bookmarks on the selected source hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--prune-dst-snapshots", action="store_true",
        help="Prune snapshots on the selected destination hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each dst host.\n\n")
    parser.add_argument(
        "--monitor-src-snapshots", action="store_true",
        help="Alert the user if snapshots on the selected source hosts are too old, using --monitor-snapshot-plan (see "
             "below). Typically, this command should be called by a program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--monitor-dst-snapshots", action="store_true",
        help="Alert the user if snapshots on the selected destination hosts are too old, using --monitor-snapshot-plan (see "
             "below). Typically, this command should be called by a program (or cron job) running on each dst host.\n\n")

    # options:
    parser.add_argument(
        "--localhost", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
        help="Hostname of localhost. Default is the hostname without the domain name, querying the Operating System.\n\n")
    parser.add_argument(
        "--src-hosts", default=None, metavar="LIST_STRING",
        help="Hostnames of the sources to operate on. Specify a Python list literal such as "
             "`\"['src1', 'src2']\"`. If omitted, reads the same list literal from stdin.\n\n")
    parser.add_argument(
        "--src-host", default=None, action="append", metavar="STRING",
        help="For subsetting --src-hosts; Can be specified multiple times; Indicates to only use the --src-hosts that are "
             "contained in the specified --src-host values (optional).\n\n"
             "Example: `--src-host=src1 --src-host=src2 --src-hosts=\"['src1', 'src2', 'src3', 'src4']\"` indicates to "
             "effectively only use `--src-hosts=\"['src1', 'src2']\"`.\n\n")
    dst_hosts_example = {"nas": ["onsite"], "bak-us-west": ["us-west"],
                         "bak-eu-west": ["eu-west"], "archive": ["offsite"]}
    parser.add_argument(
        "--dst-hosts", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
             "(the infix portion of snapshot name). As hostname use the real output of the `hostname` CLI. "
             "The target is an arbitrary user-defined name that serves as an abstraction of the destination hostnames for "
             "a group of snapshots, like target 'onsite', 'offsite', 'hotspare', a geographically independent datacenter like "
             "'us-west', or similar. Rather than the snapshot name embedding (i.e. hardcoding) a list of destination "
             "hostnames where it should be sent to, the snapshot name embeds the user-defined target name, which is later "
             "mapped by this jobconfig to a list of destination hostnames.\n\n"
             f"Example: `{format_dict(dst_hosts_example)}`.\n\n"
             "With this, given a snapshot name, we can find the destination hostnames to which the snapshot shall be "
             "replicated. Also, given a snapshot name and its own name, a destination host can determine if it shall "
             "replicate the given snapshot from the source host, or if the snapshot is intended for another destination "
             "host, in which case it skips the snapshot. A destination host will receive replicas of snapshots for all "
             "targets that map to that destination host.\n\n"
             "Removing a mapping can be used to temporarily suspend replication to a given destination host.\n\n")
    parser.add_argument(
        "--dst-host", default=None, action="append", metavar="STRING",
        help="For subsetting --dst-hosts; Can be specified multiple times; Indicates to only use the --dst-hosts keys that "
             "are contained in the specified --dst-host values (optional).\n\n"
             "Example: "
             "`--dst-host=dst1 --dst-host=dst2 --dst-hosts=\"{'dst1': ..., 'dst2': ..., 'dst3': ..., 'dst4': ...}\"` "
             "indicates to effectively only use `--dst-hosts=\"{'dst1': ..., 'dst2': ...}\"`.\n\n")
    parser.add_argument(
        "--retain-dst-targets", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
             "(the infix portion of snapshot name).\n\n"
             f"Example: `{format_dict(dst_hosts_example)}`. Has same format as --dst-hosts.\n\n"
             "As part of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has "
             "no mapping to that destination host in this dictionary. Do not remove a mapping here unless you are sure it's "
             "ok to delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n")
    dst_root_datasets_example = {
        "nas": "tank2/bak",
        "bak-us-west": "backups/bak001",
        "bak-eu-west": "backups/bak999",
        "archive": f"archives/zoo/{SRC_MAGIC_SUBSTITUTION_TOKEN}",
        "hotspare": "",
    }
    parser.add_argument(
        "--dst-root-datasets", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root "
             "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that "
             "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, "
             "whereas for cloning, master slave replication, or replication from a primary to a secondary, this can also be "
             "the empty string.\n\n"
             f"`{SRC_MAGIC_SUBSTITUTION_TOKEN}` and `{DST_MAGIC_SUBSTITUTION_TOKEN}` are optional magic substitution tokens "
             "that will be auto-replaced at runtime with the actual hostname. This can be used to force the use of a "
             "separate destination root dataset per source host or per destination host.\n\n"
             f"Example: `{format_dict(dst_root_datasets_example)}`\n\n")
    src_snapshot_plan_example = {
        "prod": {
            "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
            "us-west": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
            "eu-west": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
        },
        "test": {
            "offsite": {"12hourly": 42, "weekly": 12},
        },
    }
    parser.add_argument(
        "--src-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. "
             "Snapshots that do not match a retention period will be deleted. A zero or missing retention period indicates "
             "that no snapshots shall be retained (or even be created) for the given period.\n\n"
             f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and "
             "the intended logical target 'onsite', create and then retain secondly snapshots that were created less "
             "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analog for "
             "the latest 40 minutely snapshots, 36 hourly snapshots, etc. "
             "It will also create and retain snapshots for the targets 'us-west' and 'eu-west' within the 'prod' "
             "organization. "
             "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, "
             "and name them as being intended for the 'offsite' replication target. "
             "The example creates snapshots with names like "
             "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, "
             "`prod_us-west_<timestamp>_hourly`, `prod_us-west_<timestamp>_daily`, "
             "`prod_eu-west_<timestamp>_hourly`, `prod_eu-west_<timestamp>_daily`, "
             "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n")
    parser.add_argument(
        "--src-bookmark-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for bookmarks to be used if pruning src. Has same format as --src-snapshot-plan.\n\n")
    parser.add_argument(
        "--dst-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for snapshots to be used if pruning dst. Has same format as --src-snapshot-plan.\n\n")
    monitor_snapshot_plan_example = {
        "prod": {
            "onsite": {
                "100millisecondly": {"warning": "650 milliseconds", "critical": "2 seconds"},
                "secondly": {"warning": "2 seconds", "critical": "14 seconds"},
                "minutely": {"warning": "30 seconds", "critical": "300 seconds"},
                "hourly": {"warning": "30 minutes", "critical": "300 minutes"},
                "daily": {"warning": "4 hours", "critical": "8 hours"},
                "weekly": {"warning": "2 days", "critical": "8 days"},
                "monthly": {"warning": "2 days", "critical": "8 days"},
                "yearly": {"warning": "5 days", "critical": "14 days"},
                "10minutely": {"warning": "0 minutes", "critical": "0 minutes"},
            },
            "": {
                "daily": {"warning": "4 hours", "critical": "8 hours"},
            },
        },
    }
    parser.add_argument(
        "--monitor-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Alert the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified "
             "snapshot pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to "
             "check if snapshots are successfully taken on schedule, successfully replicated on schedule, and successfully "
             "pruned on schedule. "
             "Process exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.\n\n"
             f"Example DICT_STRING: `{format_dict(monitor_snapshot_plan_example)}`. "
             "This example alerts the user if the *latest* src or dst snapshot named `prod_onsite_<timestamp>_hourly` is "
             "more than 30 minutes late (i.e. more than 30+60=90 minutes old) [warning] or more than 300 minutes late (i.e. "
             "more than 300+60=360 minutes old) [critical]. In addition, the example alerts the user if the *oldest* src or "
             "dst snapshot named `prod_onsite_<timestamp>_hourly` is more than 30 + 60x36 minutes old [warning] or more "
             "than 300 + 60x36 minutes old [critical], where 36 is the number of period cycles specified in "
             "`src_snapshot_plan` or `dst_snapshot_plan`, respectively. "
             "Analog for the latest snapshot named `prod_<timestamp>_daily`, and so on.\n\n"
             "Note: A duration that is missing or zero (e.g. '0 minutes') indicates that no snapshots shall be checked for "
             "the given snapshot name pattern.\n\n")
    locations = ["src", "dst"]
    for loc in locations:
        parser.add_argument(
            f"--ssh-{loc}-user", default="", metavar="STRING",
            help=f"Remote SSH username on {loc} hosts to connect to (optional). Examples: 'root', 'alice'.\n\n")
    for loc in locations:
        parser.add_argument(
            f"--ssh-{loc}-port", type=int, min=1, max=65535, action=check_range.CheckRange, metavar="INT",
            help=f"Remote SSH port on {loc} host to connect to (optional).\n\n")
    for loc in locations:
        parser.add_argument(
            f"--ssh-{loc}-config-file", type=str, action=bzfs_main.argparse_actions.SSHConfigFileNameAction, metavar="FILE",
            help=f"Path to SSH ssh_config(5) file to connect to {loc} (optional); will be passed into ssh -F CLI. "
                 "The basename must contain the substring 'bzfs_ssh_config'.\n\n")
    parser.add_argument(
        "--job-id", required=True, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
        help="The identifier that remains constant across all runs of this particular job; will be included in the log file "
             "name infix. Example: mytestjob\n\n")
    parser.add_argument(
        "--job-run", default="", action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
        help="The identifier of this particular run of the overall job; will be included in the log file name suffix. "
             "Default is a hex UUID. Example: 0badc0f003a011f0a94aef02ac16083c\n\n")
    workers_default = 100  # percent
    parser.add_argument(
        "--workers", min=1, default=(workers_default, True), action=bzfs_main.argparse_actions.CheckPercentRange,
        metavar="INT[%]",
        help="The maximum number of jobs to run in parallel at any time; can be given as a positive integer, "
             f"optionally followed by the %% percent character (min: %(min)s, default: {workers_default}%%). Percentages "
             "are relative to the number of CPU cores on the machine. Example: 200%% uses twice as many parallel jobs as "
             "there are cores on the machine; 75%% uses num_procs = num_cores * 0.75. Examples: 1, 4, 75%%, 150%%\n\n")
    parser.add_argument(
        "--work-period-seconds", type=float, min=0, default=0, action=check_range.CheckRange, metavar="FLOAT",
        help="Reduces bandwidth spikes by spreading out the start of worker jobs over this much time; "
             "0 disables this feature (default: %(default)s). Examples: 0, 60, 86400\n\n")
    parser.add_argument(
        "--jitter", action="store_true",
        help="Randomize job start time and host order to avoid potential thundering herd problems in large distributed "
             "systems (optional). Randomizing job start time is only relevant if --work-period-seconds > 0.\n\n")
    parser.add_argument(
        "--worker-timeout-seconds", type=float, min=0.001, default=None, action=check_range.CheckRange, metavar="FLOAT",
        help="If this much time has passed after a worker process has started executing, kill the straggling worker "
             "(optional). Other workers remain unaffected. Examples: 60, 3600\n\n")
    parser.add_argument(
        "--repeat-if-took-more-than-seconds", type=float, min=0.001, default=UNIX_TIME_INFINITY_SECS,
        action=check_range.CheckRange, metavar="FLOAT",
        help="Repeat the entire workflow if it took longer than this much time and was successful. Use this (with the POSIX "
             "`timeout` CLI) before migrating VM storage to converge replication and reduce cutover downtime. Default is "
             "infinity, i.e. never repeat the workflow. Examples: 1, 0.1\n\n")
    parser.add_argument(
        "--spawn-process-per-job", action="store_true",
        help="Spawn a Python process per subjob instead of a Python thread per subjob (optional). The former is only "
             "recommended for a job operating in parallel on a large number of hosts as it helps avoid exceeding "
             "per-process limits such as the default max number of open file descriptors, at the expense of increased "
             "startup latency.\n\n")
    parser.add_argument(
        "--jobrunner-dryrun", action="store_true",
        help="Do a dry run (aka 'no-op') to print what operations would happen if the command were to be executed "
             "for real (optional). This option treats both the ZFS source and destination as read-only. Can also be used to "
             "check if the configuration options are valid.\n\n")
    parser.add_argument(
        "--jobrunner-log-level", choices=["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"], default="INFO",
        help="Only emit jobrunner messages with equal or higher priority than this log level. Default is '%(default)s'.\n\n")
    parser.add_argument(
        "--daemon-replication-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument(
        "--daemon-prune-src-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument(
        "--daemon-prune-dst-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument(
        "--daemon-monitor-snapshots-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall monitor snapshot age if --daemon-lifetime is nonzero.\n\n")
    # Options that must not be forwarded to bzfs because the jobrunner generates or controls them itself:
    bad_opts = ["--daemon-frequency", "--include-snapshot-plan", "--create-src-snapshots-plan", "--skip-replication",
                "--include-snapshot-regex", "--exclude-snapshot-regex", "--include-snapshot-times-and-ranks",
                "--log-file-prefix", "--log-file-infix", "--log-file-suffix",
                "--delete-dst-datasets", "--delete-dst-snapshots", "--delete-dst-snapshots-except",
                "--delete-dst-snapshots-except-plan", "--delete-empty-dst-datasets",
                "--monitor-snapshots", "--timeout"]
    for loc in locations:
        bad_opts += [f"--ssh-{loc}-host"]  # reject this arg as jobrunner will auto-generate it
    for bad_opt in bad_opts:
        parser.add_argument(bad_opt, action=RejectArgumentAction, nargs=0, help=argparse.SUPPRESS)
    parser.add_argument(
        "--version", action="version", version=f"{PROG_NAME}-{bzfs_main.argparse_cli.__version__}, by {PROG_AUTHOR}",
        help="Display version information and exit.\n\n")
    parser.add_argument(
        "--help, -h", action="help",  # trick to ensure both --help and -h are shown in the help msg
        help="Show this help message and exit.\n\n")
    parser.add_argument(
        "--root-dataset-pairs", required=True, nargs="+", action=bzfs_main.argparse_actions.DatasetPairsAction,
        metavar="SRC_DATASET DST_DATASET",
        help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be "
             "auto-appended later).\n\n")
    return parser
    # fmt: on

491 

492 

493############################################################################# 

def main() -> None:
    """API for command line clients.

    Sets process-wide defaults (umask, logging), installs a termination-signal handler so that CTRL-C and
    SIGTERM propagate to all descendant processes, then runs a single Job over the CLI arguments.
    """
    saved_umask: int = os.umask(UMASK)
    try:
        set_logging_runtime_defaults()
        # On CTRL-C and SIGTERM, signal all descendant processes so they terminate too
        stop_event: threading.Event = threading.Event()
        with termination_signal_handler(termination_events=[stop_event]):
            job = Job(log=None, termination_event=stop_event)
            job.run_main(sys_argv=sys.argv)
    finally:
        os.umask(saved_umask)  # restore prior global state

505 

506 

507############################################################################# 

508@final 

509class Job: 

510 """Coordinates subjobs per the CLI flags; Each subjob handles one host pair and may run in its own process or thread.""" 

511 

    def __init__(self, log: Logger | None, termination_event: threading.Event) -> None:
        """Initializes job state.

        Params:
            log: Logger to use; pass None to have the Job create a simple default logger that it owns (and that
                run_main() resets on completion).
            termination_event: Event that is set (e.g. by a signal handler) to request cooperative shutdown.
        """
        # immutable variables:
        self.log_was_None: Final[bool] = log is None  # remembers ownership so run_main() only resets a Logger we created
        self.log: Final[Logger] = get_simple_logger(PROG_NAME) if log is None else log
        self.termination_event: Final[threading.Event] = termination_event
        self.timing: Final[TaskTiming] = TaskTiming.make_from(self.termination_event)
        self.subprocesses: Final[Subprocesses] = Subprocesses(termination_event.is_set)
        self.jobrunner_dryrun: bool = False  # overwritten from CLI args later (see _run_main)
        self.spawn_process_per_job: bool = False  # overwritten from CLI args later
        self.loopback_address: Final[str] = _detect_loopback_address()

        # mutable variables:
        self.first_exception: int | None = None  # presumably the exit code of the first failing subjob — set elsewhere
        self.worst_exception: int | None = None  # presumably the most severe subjob exit code — set elsewhere
        self.stats: JobStats = JobStats(jobs_all=0)
        self.cache_existing_dst_pools: set[str] = set()  # NOTE(review): cache of dst pools seen to exist — confirm usage
        self.cache_known_dst_pools: set[str] = set()  # NOTE(review): cache of dst pools already probed — confirm usage

        self.is_test_mode: bool = False  # for testing only

531 

532 def run_main(self, sys_argv: list[str]) -> None: 

533 """API for Python clients; visible for testing; may become a public API eventually.""" 

534 try: 

535 self._run_main(sys_argv) 

536 finally: 

537 if self.log_was_None: # reset Logger unless it's a Logger outside of our control 537 ↛ exitline 537 didn't return from function 'run_main' because the condition on line 537 was always true

538 reset_logger(self.log) 

539 

    def _run_main(self, sys_argv: list[str]) -> None:
        """Parses CLI args, validates the fleet config, expands it into per-host `bzfs` subjobs, then runs them.

        Control flow: parse + validate inputs --> build the ``subjobs`` dict (one `bzfs` CLI command per subjob,
        keyed by a hierarchical name that encodes ordering and '~' barriers) --> run_subjobs(), optionally
        repeating the whole batch per --repeat-if-took-more-than-seconds.
        """
        self.first_exception = None
        self.worst_exception = None
        log: Logger = self.log
        log.info("CLI arguments: %s", " ".join(sys_argv))
        nsp = argparse.Namespace(no_argument_file=True)  # disable --root-dataset-pairs='+file' option in DatasetPairsAction
        args, unknown_args = argument_parser().parse_known_args(sys_argv[1:], nsp)  # forward all unknown args to `bzfs`
        log.setLevel(args.jobrunner_log_level)
        self.jobrunner_dryrun = args.jobrunner_dryrun
        assert len(args.root_dataset_pairs) > 0
        # Plans arrive as Python-literal strings on the CLI; literal_eval is safe (no code execution)
        src_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_snapshot_plan), "--src-snapshot-plan")
        src_bookmark_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_bookmark_plan), "--src-bookmark-plan")
        dst_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.dst_snapshot_plan), "--dst-snapshot-plan")
        monitor_snapshot_plan: dict = self.validate_monitor_snapshot_plan(literal_eval(args.monitor_snapshot_plan))
        localhostname: str = args.localhost if args.localhost else socket.gethostname()
        self.validate_host_name(localhostname, "--localhost")
        log.debug("localhostname: %s", localhostname)
        src_hosts: list[str] = self.validate_src_hosts(self.parse_src_hosts_from_cli_or_stdin(args.src_hosts))
        basis_src_hosts: list[str] = src_hosts  # full (pre-subsetting) host list, kept for safety checks below
        nb_src_hosts: int = len(basis_src_hosts)
        log.debug("src_hosts before subsetting: %s", src_hosts)
        if args.src_host is not None:  # retain only the src hosts that are also contained in args.src_host
            assert isinstance(args.src_host, list)
            retain_src_hosts: set[str] = set(args.src_host)
            self.validate_is_subset(retain_src_hosts, src_hosts, "--src-host", "--src-hosts")
            src_hosts = [host for host in src_hosts if host in retain_src_hosts]
        dst_hosts: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.dst_hosts))
        nb_dst_hosts: int = len(dst_hosts)
        if args.dst_host is not None:  # retain only the dst hosts that are also contained in args.dst_host
            assert isinstance(args.dst_host, list)
            retain_dst_hosts: set[str] = set(args.dst_host)
            self.validate_is_subset(retain_dst_hosts, dst_hosts.keys(), "--dst-host", "--dst-hosts.keys")
            dst_hosts = {dst_host: lst for dst_host, lst in dst_hosts.items() if dst_host in retain_dst_hosts}
        retain_dst_targets: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.retain_dst_targets))
        self.validate_is_subset(dst_hosts.keys(), retain_dst_targets.keys(), "--dst-hosts.keys", "--retain-dst-targets.keys")
        dst_root_datasets: dict[str, str] = self.validate_dst_root_datasets(literal_eval(args.dst_root_datasets))
        self.validate_is_subset(
            dst_root_datasets.keys(), retain_dst_targets.keys(), "--dst-root-dataset.keys", "--retain-dst-targets.keys"
        )
        self.validate_is_subset(dst_hosts.keys(), dst_root_datasets.keys(), "--dst-hosts.keys", "--dst-root-dataset.keys")
        # Safety check: multiple src hosts writing to the same dst dataset would collide, so the
        # SRC_MAGIC_SUBSTITUTION_TOKEN is required in each dst root dataset whenever >1 src host is in play.
        bad_root_datasets: dict[str, str] = {
            dst_host: root_dataset
            for dst_host in sorted(dst_hosts.keys())
            if SRC_MAGIC_SUBSTITUTION_TOKEN not in (root_dataset := dst_root_datasets[dst_host])
        }
        if len(src_hosts) > 1 and len(bad_root_datasets) > 0:
            self.die(
                "Cowardly refusing to proceed as multiple source hosts must not be configured to write to the same "
                "destination dataset. "
                f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(src_hosts)}"
            )
        # Stricter variant of the same check, against the full (pre-subsetting) src host list:
        bad_root_datasets = {
            dst_host: root_dataset
            for dst_host, root_dataset in sorted(dst_root_datasets.items())
            if root_dataset and SRC_MAGIC_SUBSTITUTION_TOKEN not in root_dataset
        }
        if len(basis_src_hosts) > 1 and len(bad_root_datasets) > 0:
            self.die(
                "Cowardly refusing to proceed as multiple source hosts are defined in the configuration, but "
                f"not all non-empty root datasets in --dst-root-datasets contain the '{SRC_MAGIC_SUBSTITUTION_TOKEN}' "
                "substitution token to prevent collisions on writing destination datasets. "
                f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(basis_src_hosts)}"
            )
        if args.jitter:  # randomize host order to avoid potential thundering herd problems in large distributed systems
            random.SystemRandom().shuffle(src_hosts)
            dst_hosts = shuffle_dict(dst_hosts)
        ssh_src_user: str = args.ssh_src_user
        ssh_dst_user: str = args.ssh_dst_user
        ssh_src_port: int | None = args.ssh_src_port
        ssh_dst_port: int | None = args.ssh_dst_port
        ssh_src_config_file: str | None = args.ssh_src_config_file
        ssh_dst_config_file: str | None = args.ssh_dst_config_file
        job_id: str = _sanitize(args.job_id)
        job_run: str = _sanitize(args.job_run) if args.job_run else uuid.uuid1().hex
        workers, workers_is_percent = args.workers  # --workers may be absolute or a percentage of CPU cores
        max_workers: int = max(1, round((os.cpu_count() or 1) * workers / 100.0) if workers_is_percent else round(workers))
        worker_timeout_seconds: int = args.worker_timeout_seconds
        repeat_if_took_more_than_nanos: int = int(args.repeat_if_took_more_than_seconds * 1_000_000_000)
        self.spawn_process_per_job = args.spawn_process_per_job
        username: str = pwd.getpwuid(os.getuid()).pw_name
        assert username
        loopback_ids: set[str] = {"localhost", "127.0.0.1", "::1", socket.gethostname()}  # ::1 is IPv6 loopback address
        loopback_ids.update(self.get_localhost_ips())  # union
        loopback_ids.add(localhostname)
        loopback_ids = set() if getenv_bool("disable_loopback", False) else loopback_ids
        log.log(LOG_TRACE, "loopback_ids: %s", sorted(loopback_ids))

        def zero_pad(number: int, width: int = 6) -> str:
            """Pads number with leading '0' chars to the given width."""
            return f"{number:0{width}d}"

        def jpad(jj: int, tag: str) -> str:
            """Returns ``tag`` prefixed with slash and zero padded index."""
            return "/" + zero_pad(jj) + tag

        def runpad() -> str:
            """Returns standardized subjob count suffix."""
            return job_run + SEP + zero_pad(len(subjobs))

        def update_subjob_name(tag: str) -> str:
            """Derives next subjob name based on ``tag`` and index ``j``.

            j counts the subjobs emitted for ``tag``: 0 --> name unchanged; 1 --> chain directly after the
            single subjob; >1 --> append a barrier so later steps wait for all parallel siblings.
            """
            if j <= 0:
                return subjob_name
            elif j == 1:
                return subjob_name + jpad(j - 1, tag)
            else:
                return subjob_name + "/" + BARRIER_CHAR

        def resolve_dataset(hostname: str, dataset: str, is_src: bool = True) -> str:
            """Returns host:dataset string resolving IPv6 and localhost cases."""
            assert hostname
            assert dataset
            ssh_user = ssh_src_user if is_src else ssh_dst_user
            ssh_user = ssh_user if ssh_user else username
            lb: str = self.loopback_address
            loopbck_ids: set[str] = loopback_ids
            # For loopback hosts: keep hostname as-is when connecting as a different user (ssh required);
            # otherwise "-" marks the host as local (no ssh) -- presumably per bzfs convention; confirm there.
            hostname = hostname if hostname not in loopbck_ids else (lb if lb else hostname) if username != ssh_user else "-"
            hostname = convert_ipv6(hostname)
            return f"{hostname}:{dataset}"

        def resolve_dst_dataset(dst_hostname: str, dst_dataset: str) -> str:
            """Expands ``dst_dataset`` relative to ``dst_hostname`` roots."""
            assert dst_hostname
            assert dst_dataset
            root_dataset: str | None = dst_root_datasets.get(dst_hostname)
            assert root_dataset is not None, dst_hostname  # f"Hostname '{dst_hostname}' missing in --dst-root-datasets"
            root_dataset = root_dataset.replace(SRC_MAGIC_SUBSTITUTION_TOKEN, src_host)  # src_host: current loop var
            root_dataset = root_dataset.replace(DST_MAGIC_SUBSTITUTION_TOKEN, dst_hostname)
            resolved_dst_dataset: str = f"{root_dataset}/{dst_dataset}" if root_dataset else dst_dataset
            validate_dataset_name(resolved_dst_dataset, dst_dataset)
            return resolve_dataset(dst_hostname, resolved_dst_dataset, is_src=False)

        for src_host in src_hosts:  # sanity checks before any real work starts
            assert src_host
        for dst_hostname in dst_hosts:
            assert dst_hostname
        dummy: Final[str] = DUMMY_DATASET
        lhn: Final[str] = localhostname
        bzfs_prog_header: Final[list[str]] = [BZFS_PROG_NAME, "--no-argument-file"] + unknown_args
        subjobs: dict[str, list[str]] = {}
        # ---- build one group of subjobs per src host; name prefixes order them ----
        for i, src_host in enumerate(src_hosts):
            subjob_name: str = zero_pad(i) + "src-host"
            src_log_suffix: str = _log_suffix(localhostname, src_host, "")
            j: int = 0
            opts: list[str]

            if args.create_src_snapshots:
                opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"]
                self.add_log_file_opts(opts, "create-src-snapshots", job_id, runpad(), src_log_suffix)
                self.add_ssh_opts(
                    opts, ssh_src_user=ssh_src_user, ssh_src_port=ssh_src_port, ssh_src_config_file=ssh_src_config_file
                )
                opts += [POSIX_END_OF_OPTIONS_MARKER]
                opts += _flatten(_dedupe([(resolve_dataset(src_host, src), dummy) for src, dst in args.root_dataset_pairs]))
                subjob_name += "/create-src-snapshots"
                subjobs[subjob_name] = bzfs_prog_header + opts

            if args.replicate:
                j = 0
                marker: str = "replicate"
                for dst_hostname, targets in dst_hosts.items():
                    opts = self.replication_opts(
                        dst_snapshot_plan, set(targets), lhn, src_host, dst_hostname, marker, job_id, runpad()
                    )
                    if len(opts) > 0:  # empty opts means no snapshot targets are relevant for this dst host
                        opts += [f"--daemon-frequency={args.daemon_replication_frequency}"]
                        self.add_ssh_opts(
                            opts,
                            ssh_src_user=ssh_src_user,
                            ssh_dst_user=ssh_dst_user,
                            ssh_src_port=ssh_src_port,
                            ssh_dst_port=ssh_dst_port,
                            ssh_src_config_file=ssh_src_config_file,
                            ssh_dst_config_file=ssh_dst_config_file,
                        )
                        opts += [POSIX_END_OF_OPTIONS_MARKER]
                        dataset_pairs: list[tuple[str, str]] = [
                            (resolve_dataset(src_host, src), resolve_dst_dataset(dst_hostname, dst))
                            for src, dst in args.root_dataset_pairs
                        ]
                        dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
                        if len(dataset_pairs) > 0:
                            subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
                            j += 1
                subjob_name = update_subjob_name(marker)

            def prune_src(
                opts: list[str], retention_plan: dict, tag: str, src_host: str = src_host, logsuffix: str = src_log_suffix
            ) -> None:
                """Creates prune subjob options for ``tag`` using ``retention_plan``.

                src_host/logsuffix are bound as default args to capture the current loop iteration's values.
                """
                opts += ["--skip-replication", f"--delete-dst-snapshots-except-plan={retention_plan}"]
                opts += [f"--daemon-frequency={args.daemon_prune_src_frequency}"]
                self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
                self.add_ssh_opts(  # i.e. dst=src, src=dummy
                    opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
                )
                opts += [POSIX_END_OF_OPTIONS_MARKER]
                opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
                nonlocal subjob_name
                subjob_name += f"/{tag}"
                subjobs[subjob_name] = bzfs_prog_header + opts

            if args.prune_src_snapshots:
                prune_src(["--delete-dst-snapshots"], src_snapshot_plan, tag="prune-src-snapshots")

            if args.prune_src_bookmarks:
                prune_src(["--delete-dst-snapshots=bookmarks"], src_bookmark_plan, tag="prune-src-bookmarks")

            if args.prune_dst_snapshots:
                self.validate_true(
                    retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!"
                )
                j = 0
                marker = "prune-dst-snapshots"
                for dst_hostname, _ in dst_hosts.items():
                    curr_retain_targets: set[str] = set(retain_dst_targets[dst_hostname])
                    curr_dst_snapshot_plan = {  # only retain targets that belong to the host
                        org: {target: periods for target, periods in target_periods.items() if target in curr_retain_targets}
                        for org, target_periods in dst_snapshot_plan.items()
                    }
                    opts = ["--delete-dst-snapshots", "--skip-replication"]
                    opts += [f"--delete-dst-snapshots-except-plan={curr_dst_snapshot_plan}"]
                    opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"]
                    self.add_log_file_opts(opts, marker, job_id, runpad(), _log_suffix(lhn, src_host, dst_hostname))
                    self.add_ssh_opts(
                        opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
                    )
                    opts += [POSIX_END_OF_OPTIONS_MARKER]
                    dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
                    dataset_pairs = _dedupe(dataset_pairs)
                    dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
                    if len(dataset_pairs) > 0:
                        subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
                        j += 1
                subjob_name = update_subjob_name(marker)

            def monitor_snapshots_opts(tag: str, monitor_plan: dict, logsuffix: str) -> list[str]:
                """Returns monitor subjob options for ``tag`` and ``monitor_plan``."""
                opts = [f"--monitor-snapshots={monitor_plan}", "--skip-replication"]
                opts += [f"--daemon-frequency={args.daemon_monitor_snapshots_frequency}"]
                self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
                return opts

            def build_monitor_plan(monitor_plan: dict, snapshot_plan: dict, cycles_prefix: str) -> dict:
                """Expands ``monitor_plan`` with cycle defaults from ``snapshot_plan``."""

                def alert_dicts(alertdict: dict, cycles: int) -> dict:
                    """Returns alert dictionaries with explicit ``cycles`` value."""
                    latest_dict = alertdict.copy()
                    for prefix in ("src_snapshot_", "dst_snapshot_", ""):
                        latest_dict.pop(f"{prefix}cycles", None)  # strip all *cycles keys from the 'latest' alert
                    oldest_dict = latest_dict.copy()
                    oldest_dict["cycles"] = int(alertdict.get(f"{cycles_prefix}cycles", cycles))
                    latest_dict.pop("oldest_skip_holds", None)
                    return {"latest": latest_dict, "oldest": oldest_dict}

                return {
                    org: {
                        target: {
                            periodunit: alert_dicts(alertdict, snapshot_plan.get(org, {}).get(target, {}).get(periodunit, 1))
                            for periodunit, alertdict in periods.items()
                        }
                        for target, periods in target_periods.items()
                    }
                    for org, target_periods in monitor_plan.items()
                }

            if args.monitor_src_snapshots:
                marker = "monitor-src-snapshots"
                monitor_plan = build_monitor_plan(monitor_snapshot_plan, src_snapshot_plan, "src_snapshot_")
                opts = monitor_snapshots_opts(marker, monitor_plan, src_log_suffix)
                self.add_ssh_opts(  # i.e. dst=src, src=dummy
                    opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
                )
                opts += [POSIX_END_OF_OPTIONS_MARKER]
                opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
                subjob_name += "/" + marker
                subjobs[subjob_name] = bzfs_prog_header + opts

            if args.monitor_dst_snapshots:
                j = 0
                marker = "monitor-dst-snapshots"
                for dst_hostname, targets in dst_hosts.items():
                    monitor_targets: set[str] = set(targets).intersection(set(retain_dst_targets[dst_hostname]))
                    monitor_plan = {  # only retain targets that belong to the host
                        org: {target: periods for target, periods in target_periods.items() if target in monitor_targets}
                        for org, target_periods in monitor_snapshot_plan.items()
                    }
                    monitor_plan = build_monitor_plan(monitor_plan, dst_snapshot_plan, "dst_snapshot_")
                    opts = monitor_snapshots_opts(marker, monitor_plan, _log_suffix(lhn, src_host, dst_hostname))
                    self.add_ssh_opts(
                        opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
                    )
                    opts += [POSIX_END_OF_OPTIONS_MARKER]
                    dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
                    dataset_pairs = _dedupe(dataset_pairs)
                    dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
                    if len(dataset_pairs) > 0:
                        subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
                        j += 1
                subjob_name = update_subjob_name(marker)

        # ---- execute all subjobs, optionally repeating in daemon-like fashion ----
        msg = f"Ready to run {len(subjobs)} subjobs using {len(src_hosts)}/{nb_src_hosts} src hosts: {src_hosts}, "
        msg += f"{len(dst_hosts)}/{nb_dst_hosts} dst hosts: {list(dst_hosts.keys())}"
        log.info("%s", dry(msg, is_dry_run=self.jobrunner_dryrun))
        log.log(LOG_TRACE, "subjobs: \n%s", _pretty_print_formatter(subjobs))
        while True:
            start_time_nanos = time.monotonic_ns()
            self.first_exception = None
            self.worst_exception = None
            self.run_subjobs(subjobs, max_workers, worker_timeout_seconds, args.work_period_seconds, args.jitter)
            ex = self.worst_exception
            if isinstance(ex, int):
                assert ex != 0
                sys.exit(ex)
            assert ex is None, ex
            # repeat the batch only if it took longer than the configured threshold; otherwise we are done
            if time.monotonic_ns() - start_time_nanos <= repeat_if_took_more_than_nanos:
                break
        log.info("Succeeded. Bye!")

859 

860 def replication_opts( 

861 self, 

862 dst_snapshot_plan: dict[str, dict[str, dict[str, int]]], 

863 targets: set[str], 

864 localhostname: str, 

865 src_hostname: str, 

866 dst_hostname: str, 

867 tag: str, 

868 job_id: str, 

869 job_run: str, 

870 ) -> list[str]: 

871 """Returns CLI options for one replication subjob.""" 

872 log = self.log 

873 log.debug("%s", f"Replicating targets {sorted(targets)} from {src_hostname} to {dst_hostname} ...") 

874 include_snapshot_plan = { # only replicate targets that belong to the destination host and are relevant 

875 org: { 

876 target: { 

877 duration_unit: duration_amount 

878 for duration_unit, duration_amount in periods.items() 

879 if duration_amount > 0 

880 } 

881 for target, periods in target_periods.items() 

882 if target in targets 

883 } 

884 for org, target_periods in dst_snapshot_plan.items() 

885 } 

886 include_snapshot_plan = { # only replicate orgs that have at least one relevant target_period 

887 org: target_periods 

888 for org, target_periods in include_snapshot_plan.items() 

889 if any(len(periods) > 0 for target, periods in target_periods.items()) 

890 } 

891 opts: list[str] = [] 

892 if len(include_snapshot_plan) > 0: 

893 opts += [f"--include-snapshot-plan={include_snapshot_plan}"] 

894 self.add_log_file_opts(opts, tag, job_id, job_run, _log_suffix(localhostname, src_hostname, dst_hostname)) 

895 return opts 

896 

897 def skip_nonexisting_local_dst_pools( 

898 self, root_dataset_pairs: list[tuple[str, str]], timeout_secs: float | None = None 

899 ) -> list[tuple[str, str]]: 

900 """Skip datasets that point to removable destination drives that are not currently (locally) attached, if any.""" 

901 

902 def zpool(dataset: str) -> str: 

903 """Returns pool name portion of ``dataset``.""" 

904 return dataset.split("/", 1)[0] 

905 

906 assert len(root_dataset_pairs) > 0 

907 unknown_dst_pools = {zpool(dst) for src, dst in root_dataset_pairs} 

908 unknown_dst_pools = unknown_dst_pools.difference(self.cache_known_dst_pools) 

909 

910 # Here we treat a zpool as existing if the zpool isn't local, aka if it isn't prefixed with "-:". A remote host 

911 # will raise an appropriate error if it turns out that the remote zpool doesn't actually exist. 

912 unknown_local_dst_pools = {pool for pool in unknown_dst_pools if pool.startswith("-:")} 

913 if len(unknown_local_dst_pools) > 0: # `zfs list` if local 

914 existing_pools = {pool[len("-:") :] for pool in unknown_local_dst_pools} 

915 cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(existing_pools) 

916 sp = subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout_secs) 

917 if sp.returncode not in (0, 1): # 1 means dataset not found 

918 self.die(f"Unexpected error {sp.returncode} on checking for existing local dst pools: {sp.stderr.strip()}") 

919 existing_pools = {"-:" + pool for pool in sp.stdout.splitlines() if pool} 

920 self.cache_existing_dst_pools.update(existing_pools) # union 

921 unknown_remote_dst_pools = unknown_dst_pools.difference(unknown_local_dst_pools) 

922 self.cache_existing_dst_pools.update(unknown_remote_dst_pools) # union 

923 self.cache_known_dst_pools.update(unknown_dst_pools) # union 

924 results: list[tuple[str, str]] = [] 

925 for src, dst in root_dataset_pairs: 

926 if zpool(dst) in self.cache_existing_dst_pools: 

927 results.append((src, dst)) 

928 else: 

929 self.log.warning("Skipping dst dataset for which local dst pool does not exist: %s", dst) 

930 return results 

931 

932 @staticmethod 

933 def add_ssh_opts( 

934 opts: list[str], 

935 *, 

936 ssh_src_user: str | None = None, 

937 ssh_dst_user: str | None = None, 

938 ssh_src_port: int | None = None, 

939 ssh_dst_port: int | None = None, 

940 ssh_src_config_file: str | None = None, 

941 ssh_dst_config_file: str | None = None, 

942 ) -> None: 

943 """Appends ssh related options to ``opts`` if specified.""" 

944 assert isinstance(opts, list) 

945 opts += [f"--ssh-src-user={ssh_src_user}"] if ssh_src_user else [] 

946 opts += [f"--ssh-dst-user={ssh_dst_user}"] if ssh_dst_user else [] 

947 opts += [f"--ssh-src-port={ssh_src_port}"] if ssh_src_port is not None else [] 

948 opts += [f"--ssh-dst-port={ssh_dst_port}"] if ssh_dst_port is not None else [] 

949 opts += [f"--ssh-src-config-file={ssh_src_config_file}"] if ssh_src_config_file else [] 

950 opts += [f"--ssh-dst-config-file={ssh_dst_config_file}"] if ssh_dst_config_file else [] 

951 

952 @staticmethod 

953 def add_log_file_opts(opts: list[str], tag: str, job_id: str, job_run: str, logsuffix: str) -> None: 

954 """Appends standard log-file CLI options to ``opts``.""" 

955 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}"] 

956 opts += [f"--log-file-infix={SEP}{job_id}"] 

957 opts += [f"--log-file-suffix={SEP}{job_run}{logsuffix}{SEP}"] 

958 

    def run_subjobs(
        self,
        subjobs: dict[str, list[str]],
        max_workers: int,
        timeout_secs: float | None,
        work_period_seconds: float,
        jitter: bool,
    ) -> None:
        """Executes subjobs sequentially or in parallel, respecting '~' barriers.

        Design note on subjob failure, isolation and termination:
        - On subjob failure the subjob's subtree is skipped.
        - Subjob failures are converted to return codes (not exceptions) so the policy does not invoke a termination handler
          on such failures and sibling subjobs continue unaffected. This preserves per-subjob isolation, both within a single
          process (thread-per-subjob mode; default) as well as in process-per-subjob mode (``--spawn-process-per-job``).
        - The first failure is retained for diagnostics.
        - Process exit code precedence is: fatal/non-monitor failure > monitor CRITICAL/WARNING > STILL_RUNNING > success.
        """
        self.stats = JobStats(len(subjobs))  # fresh stats for this batch; also read by run_subjob() worker threads
        log = self.log
        # spread subjob start times evenly across the work period; one extra interval reserves room for jitter sleep
        num_intervals = 1 + len(subjobs) if jitter else len(subjobs)
        interval_nanos = 0 if len(subjobs) == 0 else round(1_000_000_000 * max(0.0, work_period_seconds) / num_intervals)
        assert interval_nanos >= 0
        if jitter:  # randomize job start time to avoid potential thundering herd problems in large distributed systems
            sleep_nanos = random.SystemRandom().randint(0, interval_nanos)
            log.info("Jitter: Delaying job start time by sleeping for %s ...", human_readable_duration(sleep_nanos))
            self.timing.sleep(sleep_nanos)  # allows early wakeup on async termination
        sorted_subjobs: list[str] = sorted(subjobs.keys())  # name order encodes dependency/barrier structure
        spawn_process_per_job: bool = self.spawn_process_per_job
        log.log(LOG_TRACE, "%s: %s", "spawn_process_per_job", spawn_process_per_job)
        # truthy return value indicates overall failure; record DIE_STATUS unless run_subjob() already
        # captured a more specific exit code
        if process_datasets_in_parallel_and_fault_tolerant(
            log=log,
            datasets=sorted_subjobs,
            process_dataset=lambda subjob, tid, retry: self.run_subjob(
                subjobs[subjob], name=subjob, timeout_secs=timeout_secs, spawn_process_per_job=spawn_process_per_job
            )
            == 0,
            skip_tree_on_error=lambda subjob: True,
            skip_on_error="dataset",
            max_workers=max_workers,
            interval_nanos=lambda last_update_nanos, dataset, submit_count: interval_nanos,
            timing=self.timing,
            termination_handler=self.subprocesses.terminate_process_subtrees,
            task_name="Subjob",
            dry_run=False,
            is_test_mode=self.is_test_mode,
        ):
            self.first_exception = DIE_STATUS if self.first_exception is None else self.first_exception
            self.worst_exception = self.get_worst_exception(self.worst_exception, DIE_STATUS)
        stats = self.stats
        jobs_skipped = stats.jobs_all - stats.jobs_started
        msg = f"{stats}, skipped:" + percent(jobs_skipped, total=stats.jobs_all, print_total=True)
        log.info("Final Progress: %s", msg)
        # sanity invariants: nothing still running, and everything started has completed
        assert stats.jobs_running == 0, msg
        assert stats.jobs_completed == stats.jobs_started, msg
        skipped_jobs_dict = {subjob: subjobs[subjob] for subjob in sorted_subjobs if subjob not in stats.started_job_names}
        if len(skipped_jobs_dict) > 0:
            log.debug("Skipped subjobs: \n%s", _pretty_print_formatter(skipped_jobs_dict))
        assert jobs_skipped == len(skipped_jobs_dict), msg

1018 

    def run_subjob(
        self, cmd: list[str], name: str, timeout_secs: float | None, spawn_process_per_job: bool
    ) -> int | None:  # thread-safe
        """Executes one worker job and updates shared Stats.

        Returns the worker's exit code (0 on success) or ``None`` if no exit code could be determined;
        both non-zero and ``None`` count as failures. Unexpected exceptions propagate after being logged.
        """
        start_time_nanos = time.monotonic_ns()
        returncode = None  # stays None if the worker raises; the finally block then records a failure
        log = self.log
        cmd_str = " ".join(cmd)
        stats = self.stats
        try:
            msg: str = stats.submit_job(name)
            log.log(LOG_TRACE, "Starting worker job: %s", cmd_str)
            log.info("Progress: %s", msg)
            start_time_nanos = time.monotonic_ns()  # restart the clock so bookkeeping above isn't counted
            if spawn_process_per_job:
                returncode = self.run_worker_job_spawn_process_per_job(cmd, timeout_secs)
            else:
                returncode = self.run_worker_job_in_current_thread(cmd, timeout_secs)
        except BaseException as e:
            log.error("Worker job failed with unexpected exception: %s for command: %s", e, cmd_str)
            raise
        else:
            elapsed_human: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
            if returncode != 0:
                with stats.lock:  # first_exception/worst_exception are shared across worker threads
                    if self.first_exception is None:
                        self.first_exception = DIE_STATUS if returncode is None else returncode
                    self.worst_exception = self.get_worst_exception(self.worst_exception, returncode)
                log.error("Worker job failed with exit code %s in %s: %s", returncode, elapsed_human, cmd_str)
            else:
                log.debug("Worker job succeeded in %s: %s", elapsed_human, cmd_str)
            return returncode
        finally:
            # runs on success, failure AND on propagating exceptions, keeping stats counters consistent
            msg = stats.complete_job(failed=returncode != 0, elapsed_nanos=time.monotonic_ns() - start_time_nanos)
            log.info("Progress: %s", msg)

1054 

1055 def run_worker_job_in_current_thread(self, cmd: list[str], timeout_secs: float | None) -> int | None: 

1056 """Runs ``bzfs`` in-process and return its exit code.""" 

1057 log = self.log 

1058 if timeout_secs is not None: 

1059 cmd = cmd[0:1] + [f"--timeout={round(1000 * timeout_secs)}milliseconds"] + cmd[1:] 

1060 try: 

1061 if not self.jobrunner_dryrun: 

1062 self._bzfs_run_main(cmd) 

1063 return 0 

1064 except subprocess.CalledProcessError as e: 

1065 return bzfs.normalize_called_process_error(e) 

1066 except SystemExit as e: 

1067 assert e.code is None or isinstance(e.code, int) 

1068 return e.code 

1069 except BaseException: 

1070 log.exception("Worker job failed with unexpected exception for command: %s", " ".join(cmd)) 

1071 return DIE_STATUS 

1072 

1073 def _bzfs_run_main(self, cmd: list[str]) -> None: 

1074 """Delegates execution to :mod:`bzfs` using parsed arguments.""" 

1075 bzfs_job = bzfs.Job(termination_event=self.termination_event) 

1076 bzfs_job.is_test_mode = self.is_test_mode 

1077 bzfs_job.run_main(bzfs.argument_parser().parse_args(cmd[1:]), cmd) 

1078 

    def run_worker_job_spawn_process_per_job(self, cmd: list[str], timeout_secs: float | None) -> int | None:
        """Spawns a subprocess for the worker job and waits for completion; returns its exit code.

        On timeout or async termination the subprocess is escalated from SIGTERM to SIGKILL
        (including its process subtree) with short grace periods in between.
        """
        log = self.log
        if len(cmd) > 0 and cmd[0] == BZFS_PROG_NAME:
            cmd = [sys.executable, "-m", "bzfs_main." + cmd[0]] + cmd[1:]  # run bzfs via the current interpreter
        if self.jobrunner_dryrun:
            return 0  # report success without spawning anything

        with self.subprocesses.popen_and_track(cmd, stdin=subprocess.DEVNULL, text=True) as proc:
            try:
                if self.termination_event.is_set():
                    timeout_secs = 1.0 if timeout_secs is None else timeout_secs
                    raise subprocess.TimeoutExpired(cmd, timeout_secs)  # do not wait for normal completion
                proc.communicate(timeout=timeout_secs)  # Wait for the subprocess to complete and exit normally
            except subprocess.TimeoutExpired:
                cmd_str = " ".join(cmd)
                if self.termination_event.is_set():
                    log.error("%s", f"Terminating worker job due to async termination request: {cmd_str}")
                else:
                    log.error("%s", f"Terminating worker job as it failed to complete within {timeout_secs}s: {cmd_str}")
                proc.terminate()  # Sends SIGTERM signal to job subprocess
                assert timeout_secs is not None
                timeout_secs = min(1.0, timeout_secs)  # grace period for SIGTERM before escalating further
                try:
                    proc.communicate(timeout=timeout_secs)  # Wait for the subprocess to exit
                except subprocess.TimeoutExpired:
                    log.error("%s", f"Killing worker job as it failed to terminate within {timeout_secs}s: {cmd_str}")
                    terminate_process_subtree(root_pids=[proc.pid])  # Send SIGTERM to process subtree
                    proc.kill()  # Sends SIGKILL signal to job subprocess because SIGTERM wasn't enough
                    timeout_secs = min(0.025, timeout_secs)  # SIGKILL should be near-instant; wait only briefly
                    with contextlib.suppress(subprocess.TimeoutExpired):
                        proc.communicate(timeout=timeout_secs)  # Wait for the subprocess to exit
            return proc.returncode

1112 

1113 @staticmethod 

1114 def get_worst_exception(existing_code: int | None, new_code: int | None) -> int: 

1115 """Process exit code precedence is: fatal/non-monitor failure > monitor CRITICAL/WARNING > STILL_RUNNING > success.""" 

1116 new_code = DIE_STATUS if new_code is None else new_code 

1117 if existing_code is None: 

1118 return new_code 

1119 assert existing_code is not None 

1120 assert new_code is not None 

1121 

1122 nonfatal: tuple[int, ...] = (0, bzfs.WARNING_STATUS, bzfs.CRITICAL_STATUS, bzfs.STILL_RUNNING_STATUS) 

1123 existing_is_fatal = existing_code not in nonfatal 

1124 new_is_fatal = new_code not in nonfatal 

1125 if existing_is_fatal and new_is_fatal: 

1126 return max(existing_code, new_code) 

1127 if existing_is_fatal or new_is_fatal: 

1128 return existing_code if existing_is_fatal else new_code 

1129 

1130 monitor: tuple[int, ...] = (bzfs.WARNING_STATUS, bzfs.CRITICAL_STATUS) 

1131 existing_is_monitor = existing_code in monitor 

1132 new_is_monitor = new_code in monitor 

1133 if existing_is_monitor and new_is_monitor: 

1134 return max(existing_code, new_code) 

1135 if existing_is_monitor or new_is_monitor: 

1136 return existing_code if existing_is_monitor else new_code 

1137 

1138 assert existing_code in (0, bzfs.STILL_RUNNING_STATUS), existing_code 

1139 assert new_code in (0, bzfs.STILL_RUNNING_STATUS), new_code 

1140 return max(existing_code, new_code) 

1141 

1142 def validate_src_hosts(self, src_hosts: list[str]) -> list[str]: 

1143 """Checks ``src_hosts`` contains valid hostnames.""" 

1144 context = "--src-hosts" 

1145 self.validate_type(src_hosts, list, context) 

1146 for src_hostname in src_hosts: 

1147 self.validate_host_name(src_hostname, context) 

1148 return src_hosts 

1149 

1150 def validate_dst_hosts(self, dst_hosts: dict[str, list[str]]) -> dict[str, list[str]]: 

1151 """Checks destination hosts dictionary.""" 

1152 context = "--dst-hosts" 

1153 self.validate_type(dst_hosts, dict, context) 

1154 for dst_hostname, targets in dst_hosts.items(): 

1155 self.validate_host_name(dst_hostname, context) 

1156 self.validate_type(targets, list, f"{context} targets") 

1157 for target in targets: 

1158 self.validate_type(target, str, f"{context} target") 

1159 return dst_hosts 

1160 

1161 def validate_dst_root_datasets(self, dst_root_datasets: dict[str, str]) -> dict[str, str]: 

1162 """Checks that each destination root dataset string is valid.""" 

1163 context = "--dst-root-datasets" 

1164 self.validate_type(dst_root_datasets, dict, context) 

1165 for dst_hostname, dst_root_dataset in dst_root_datasets.items(): 

1166 self.validate_host_name(dst_hostname, context) 

1167 self.validate_type(dst_root_dataset, str, f"{context} root dataset") 

1168 return dst_root_datasets 

1169 

1170 def validate_snapshot_plan( 

1171 self, snapshot_plan: dict[str, dict[str, dict[str, int]]], context: str 

1172 ) -> dict[str, dict[str, dict[str, int]]]: 

1173 """Checks snapshot plan structure and value types.""" 

1174 self.validate_type(snapshot_plan, dict, context) 

1175 for org, target_periods in snapshot_plan.items(): 

1176 self.validate_type(org, str, f"{context} org") 

1177 self.validate_type(target_periods, dict, f"{context} target_periods") 

1178 for target, periods in target_periods.items(): 

1179 self.validate_type(target, str, f"{context} org/target") 

1180 self.validate_type(periods, dict, f"{context} org/periods") 

1181 for period_unit, period_amount in periods.items(): 

1182 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit") 

1183 self.validate_non_negative_int(period_amount, f"{context} org/target/period_amount") 

1184 return snapshot_plan 

1185 

1186 def validate_monitor_snapshot_plan( 

1187 self, monitor_snapshot_plan: dict[str, dict[str, dict[str, dict[str, str | int]]]] 

1188 ) -> dict[str, dict[str, dict[str, dict[str, str | int]]]]: 

1189 """Checks snapshot monitoring plan configuration.""" 

1190 context = "--monitor-snapshot-plan" 

1191 self.validate_type(monitor_snapshot_plan, dict, context) 

1192 for org, target_periods in monitor_snapshot_plan.items(): 

1193 self.validate_type(org, str, f"{context} org") 

1194 self.validate_type(target_periods, dict, f"{context} target_periods") 

1195 for target, periods in target_periods.items(): 

1196 self.validate_type(target, str, f"{context} org/target") 

1197 self.validate_type(periods, dict, f"{context} org/periods") 

1198 for period_unit, alert_dict in periods.items(): 

1199 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit") 

1200 self.validate_type(alert_dict, dict, f"{context} org/target/alert_dict") 

1201 for key, value in alert_dict.items(): 

1202 self.validate_non_empty_string(key, f"{context} org/target/alert_dict/key") 

1203 self.validate_type(value, Union[str, int], f"{context} org/target/alert_dict/value") 

1204 return monitor_snapshot_plan 

1205 

1206 def validate_is_subset(self, x: Iterable[str], y: Iterable[str], x_name: str, y_name: str) -> None: 

1207 """Raises error if ``x`` contains an item not present in ``y``.""" 

1208 if isinstance(x, str) or not isinstance(x, Iterable): 

1209 self.die(f"{x_name} must be an Iterable") 

1210 if isinstance(y, str) or not isinstance(y, Iterable): 

1211 self.die(f"{y_name} must be an Iterable") 

1212 if not set(x).issubset(set(y)): 

1213 diff = sorted(set(x).difference(set(y))) 

1214 self.die(f"{x_name} must be a subset of {y_name}. diff: {diff}, {x_name}: {sorted(x)}, {y_name}: {sorted(y)}") 

1215 

    def validate_host_name(self, hostname: str, context: str) -> None:
        """Checks that ``hostname`` is a non-empty string, then delegates syntactic validation to
        bzfs.validate_host_name(); dies (does not return) on violation."""
        self.validate_non_empty_string(hostname, f"{context} hostname")
        bzfs.validate_host_name(hostname, context)

1220 

1221 def validate_non_empty_string(self, value: str, name: str) -> None: 

1222 """Checks that ``value`` is a non-empty string.""" 

1223 self.validate_type(value, str, name) 

1224 if not value: 

1225 self.die(f"{name} must not be empty!") 

1226 

1227 def validate_non_negative_int(self, value: int, name: str) -> None: 

1228 """Checks ``value`` is an int >= 0.""" 

1229 self.validate_type(value, int, name) 

1230 if value < 0: 

1231 self.die(f"{name} must be a non-negative integer: {value}") 

1232 

1233 def validate_true(self, expr: Any, msg: str) -> None: 

1234 """Raises error if ``expr`` evaluates to ``False``.""" 

1235 if not bool(expr): 

1236 self.die(msg) 

1237 

1238 def validate_type(self, value: Any, expected_type: Any, name: str) -> None: 

1239 """Checks ``value`` is instance of ``expected_type`` or union thereof.""" 

1240 if hasattr(expected_type, "__origin__") and expected_type.__origin__ is Union: # for compat with python < 3.10 

1241 union_types = expected_type.__args__ 

1242 for t in union_types: 

1243 if isinstance(value, t): 

1244 return 

1245 type_msg = " or ".join([t.__name__ for t in union_types]) 

1246 self.die(f"{name} must be of type {type_msg} but got {type(value).__name__}: {value}") 

1247 elif not isinstance(value, expected_type): 

1248 self.die(f"{name} must be of type {expected_type.__name__} but got {type(value).__name__}: {value}") 

1249 

1250 def parse_src_hosts_from_cli_or_stdin(self, raw_src_hosts: str | None) -> list: 

1251 """Resolve --src-hosts from CLI or stdin with robust TTY/empty handling.""" 

1252 if raw_src_hosts is None: 

1253 # If stdin is an interactive TTY, don't block waiting for input; fail clearly instead 

1254 try: 

1255 is_tty: bool = getattr(sys.stdin, "isatty", lambda: False)() 

1256 except Exception: 

1257 is_tty = False 

1258 if is_tty: 

1259 self.die("Missing --src-hosts and stdin is a TTY. Provide --src-hosts or pipe the list on stdin.") 

1260 stdin_text: str = sys.stdin.read() 

1261 if not stdin_text.strip(): # avoid literal_eval("") SyntaxError and provide a clear message 

1262 self.die("Missing --src-hosts and stdin is empty. Provide --src-hosts or pipe a list on stdin.") 

1263 raw_src_hosts = stdin_text 

1264 try: 

1265 value = literal_eval(raw_src_hosts) 

1266 except Exception as e: 

1267 self.die(f"Invalid --src-hosts format: {e} for input: {raw_src_hosts}") 

1268 if not isinstance(value, list): 

1269 example: str = format_obj(["hostname1", "hostname2"]) 

1270 self.die(f"Invalid --src-hosts: expected a Python list literal, e.g. {example} but got: {format_obj(value)}") 

1271 return value 

1272 

    def die(self, msg: str) -> NoReturn:
        """Logs ``msg`` at ERROR level on this job's logger, then terminates the process via utils.die()."""
        self.log.error("%s", msg)
        utils.die(msg)

1277 

1278 def get_localhost_ips(self) -> set[str]: 

1279 """Returns all network addresses of the local host, i.e. all configured addresses on all network interfaces, without 

1280 depending on name resolution.""" 

1281 ips: set[str] = set() 

1282 if platform.system() == "Linux": 

1283 try: 

1284 proc = subprocess.run(["hostname", "-I"], stdin=DEVNULL, stdout=PIPE, text=True, check=True) # noqa: S607 

1285 except Exception as e: 

1286 self.log.warning("Cannot run 'hostname -I' on localhost: %s", e) 

1287 else: 

1288 ips = {ip for ip in proc.stdout.strip().split() if ip} 

1289 self.log.log(LOG_TRACE, "localhost_ips: %s", sorted(ips)) 

1290 return ips 

1291 

1292 

#############################################################################
@final
class RejectArgumentAction(argparse.Action):
    """An argparse Action that unconditionally rejects any attempt to use the option it guards."""

    def __call__(
        self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None
    ) -> None:
        """Aborts argument parsing whenever a protected option is encountered on the command line."""
        parser.error(f"Security: Overriding protected argument '{option_string}' is not allowed.")

1303 

1304 

1305############################################################################# 

1306def _dedupe(root_dataset_pairs: list[tuple[str, str]]) -> list[tuple[str, str]]: 

1307 """Returns a list with duplicate dataset pairs removed while preserving order.""" 

1308 return list(dict.fromkeys(root_dataset_pairs)) 

1309 

1310 

_T = TypeVar("_T")  # generic element type used by _flatten()

1312 

1313 

1314def _flatten(root_dataset_pairs: Iterable[Iterable[_T]]) -> list[_T]: 

1315 """Flattens an iterable of pairs into a single list.""" 

1316 return [item for pair in root_dataset_pairs for item in pair] 

1317 

1318 

def _sanitize(filename: str) -> str:
    """Substitutes '!' for characters that could break file naming or path handling
    (spaces, '..', slashes, and the SEP delimiter)."""
    cleaned = filename
    for bad in (" ", "..", "/", "\\", SEP):
        cleaned = cleaned.replace(bad, "!")
    return cleaned

1324 

1325 

def _log_suffix(localhostname: str, src_hostname: str, dst_hostname: str) -> str:
    """Builds a log file suffix embedding the sanitized local, source, and destination hostnames,
    each preceded by the SEP delimiter."""
    hosts = (localhostname, src_hostname, dst_hostname)
    return SEP + SEP.join(_sanitize(host) for host in hosts)

1329 

1330 

1331def _pretty_print_formatter(dictionary: dict[str, Any]) -> Any: 

1332 """Lazy JSON formatter used to avoid overhead in disabled log levels.""" 

1333 

1334 @final 

1335 class PrettyPrintFormatter: 

1336 """Wrapper returning formatted JSON on ``str`` conversion.""" 

1337 

1338 def __str__(self) -> str: 

1339 import json 

1340 

1341 return json.dumps(dictionary, indent=4, sort_keys=True) 

1342 

1343 return PrettyPrintFormatter() 

1344 

1345 

1346def _detect_loopback_address() -> str: 

1347 """Detects if a loopback connection over IPv4 or IPv6 is possible.""" 

1348 try: 

1349 addr = "127.0.0.1" 

1350 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 

1351 s.bind((addr, 0)) 

1352 return addr 

1353 except OSError: 

1354 pass 

1355 

1356 try: 

1357 addr = "::1" 

1358 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: 

1359 s.bind((addr, 0)) 

1360 return addr 

1361 except OSError: 

1362 pass 

1363 

1364 return "" 

1365 

1366 

def convert_ipv6(hostname: str) -> str:
    """Encodes IPv6 addresses by substituting '|' for every ':' so the host:dataset colon separator stays
    unambiguous; see bzfs.convert_ipv6() for the reverse mapping."""
    return "|".join(hostname.split(":"))

1371 

1372 

#############################################################################
if __name__ == "__main__":  # CLI entry point when run as a script
    main()