Coverage for bzfs_main/bzfs_jobrunner.py: 99%

630 statements  

coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata 

16# /// script 

17# requires-python = ">=3.8" 

18# dependencies = [] 

19# /// 

20# 

21""" 

22* High-level orchestrator that calls `bzfs` as part of complex, periodic workflows to manage backup, replication, and pruning 

23 jobs across a fleet of multiple source and destination hosts; driven by a job config file (e.g., `bzfs_job_example.py`). 

24* Overview of the bzfs_jobrunner.py codebase: 

25* The codebase starts with docs, definition of input data and associated argument parsing of CLI options/parameters. 

26* Control flow starts in main(), far below ..., which kicks off a "Job". 

27* A Job creates zero or more "subjobs" for each local or remote host, via run_main(). 

28* It executes the subjobs, serially or in parallel, via run_subjobs(), which in turn delegates parallel job coordination to 

29 bzfs.process_datasets_in_parallel_and_fault_tolerant(). 

30* README_bzfs_jobrunner.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via 

31update_readme.sh. Simply run that script whenever you change or add ArgumentParser help text. 

32""" 

33 

34from __future__ import annotations 

35import argparse 

36import contextlib 

37import os 

38import pwd 

39import random 

40import socket 

41import subprocess 

42import sys 

43import threading 

44import time 

45import uuid 

46from ast import literal_eval 

47from logging import Logger 

48from subprocess import DEVNULL, PIPE 

49from typing import Any, Iterable, TypeVar, Union 

50 

51import bzfs_main.argparse_actions 

52import bzfs_main.check_range 

53import bzfs_main.utils 

54from bzfs_main import bzfs 

55from bzfs_main.argparse_cli import PROG_AUTHOR, SKIP_ON_ERROR_DEFAULT 

56from bzfs_main.detect import DUMMY_DATASET 

57from bzfs_main.loggers import get_simple_logger 

58from bzfs_main.parallel_engine import ( 

59 BARRIER_CHAR, 

60 process_datasets_in_parallel_and_fault_tolerant, 

61) 

62from bzfs_main.utils import ( 

63 DIE_STATUS, 

64 LOG_TRACE, 

65 format_dict, 

66 human_readable_duration, 

67 percent, 

68 shuffle_dict, 

69) 

70from bzfs_main.utils import PROG_NAME as BZFS_PROG_NAME 

71 

72# constants: 

73PROG_NAME: str = "bzfs_jobrunner" 

74SRC_MAGIC_SUBSTITUTION_TOKEN: str = "^SRC_HOST" # noqa: S105 

75DST_MAGIC_SUBSTITUTION_TOKEN: str = "^DST_HOST" # noqa: S105 

76SEP: str = "," 

77 

78 

79def argument_parser() -> argparse.ArgumentParser: 

80 """Returns the CLI parser used by bzfs_jobrunner.""" 

81 # fmt: off 

82 parser = argparse.ArgumentParser( 

83 prog=PROG_NAME, 

84 allow_abbrev=False, 

85 formatter_class=argparse.RawTextHelpFormatter, 

86 description=f""" 

87This program is a convenience wrapper around [bzfs](README.md) that simplifies periodic ZFS snapshot creation, replication, 

88pruning, and monitoring, across N source hosts and M destination hosts, using a single shared 

89[jobconfig](bzfs_tests/bzfs_job_example.py) script. 

90For example, this simplifies the deployment of an efficient geo-replicated backup service where each of the M destination 

91hosts is located in a separate geographic region and receives replicas from (the same set of) N source hosts. It also 

92simplifies low latency replication from a primary to a secondary or to M read replicas, or backup to removable drives, etc. 

93 

94This program can be used to efficiently replicate ... 

95 

96a) within a single machine (local mode), or 

97 

98b) from a single source host to one or more destination hosts (pull or push or pull-push mode), or 

99 

100c) from multiple source hosts to a single destination host (pull or push or pull-push mode), or 

101 

102d) from N source hosts to M destination hosts (pull or push or pull-push mode, N and M can be large, M=2 or M=3 are typical 

103geo-replication factors) 

104 

105You can run this program on a single third-party host and have that talk to all source hosts and destination hosts, which is 

106convenient for basic use cases and for testing. 

107However, typically, a cron job on each source host runs `{PROG_NAME}` periodically to create new snapshots (via 

108--create-src-snapshots) and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and 

109--prune-src-bookmarks), whereas another cron job on each destination host runs `{PROG_NAME}` periodically to prune 

110outdated destination snapshots (via --prune-dst-snapshots), and to replicate the recently created snapshots from the source 

111to the destination (via --replicate). 

112Yet another cron job on each source and each destination runs `{PROG_NAME}` periodically to alert the user if the latest or 

113oldest snapshot is somehow too old (via --monitor-src-snapshots and --monitor-dst-snapshots). 

114The frequency of these periodic activities can vary by activity, and is typically every second, minute, hour, day, week, 

115month and/or year (or multiples thereof). 

116 

117Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto all 

118source hosts and all destination hosts, and add crontab entries (or systemd timers or Monit entries or similar), along these 

119lines: 

120 

121* crontab on source hosts: 

122 

123`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks` 

124 

125`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --monitor-src-snapshots` 

126 

127 

128* crontab on destination hosts: 

129 

130`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --replicate --prune-dst-snapshots` 

131 

132`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --monitor-dst-snapshots` 

133 

134### High Frequency Replication (Experimental Feature) 

135 

136Taking snapshots, and/or replicating, from every N milliseconds to every 10 seconds or so is considered high frequency. For 

137such use cases, consider that `zfs list -t snapshot` performance degrades as more and more snapshots currently exist within 

138the selected datasets, so try to keep the number of currently existing snapshots small, and prune them at a frequency that 

139is proportional to the frequency with which snapshots are created. Consider using `--skip-parent` and `--exclude-dataset*` 

140filters to limit the selected datasets only to those that require this level of frequency. 

141 

142In addition, use the `--daemon-*` options to reduce startup overhead, in combination with splitting the crontab entry (or 

143better: high frequency systemd timer) into multiple processes, from a single source host to a single destination host, 

144along these lines: 

145 

146* crontab on source hosts: 

147 

148`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --create-src-snapshots` 

149 

150`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-snapshots` 

151 

152`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-bookmarks` 

153 

154`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --monitor-src-snapshots` 

155 

156 

157* crontab on destination hosts: 

158 

159`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --replicate` 

160 

161`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --prune-dst-snapshots` 

162 

163`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --monitor-dst-snapshots` 

164 

165The daemon processes work like non-daemon processes except that they loop, handle time events and sleep between events, and 

166finally exit after, say, 86400 seconds (whatever you specify via `--daemon-lifetime`). The daemons will subsequently be 

167auto-restarted by 'cron', or earlier if they fail. While the daemons are running, 'cron' will attempt to start new 

168(unnecessary) daemons but this is benign as these new processes immediately exit with a message like this: 

169"Exiting as same previous periodic job is still running without completion yet" 

170""") 

171 

172 # commands: 

173 parser.add_argument( 

174 "--create-src-snapshots", action="store_true", 

175 help="Take snapshots on the selected source hosts as necessary. Typically, this command should be called by a " 

176 "program (or cron job) running on each src host.\n\n") 

177 parser.add_argument( # `choices` are deprecated; use --replicate without argument instead 

178 "--replicate", choices=["pull", "push"], default=None, const="pull", nargs="?", metavar="", 

179 help="Replicate snapshots from the selected source hosts to the selected destinations hosts as necessary. For pull " 

180 "mode (recommended), this command should be called by a program (or cron job) running on each dst " 

181 "host; for push mode, on the src host; for pull-push mode on a third-party host.\n\n") 

182 parser.add_argument( 

183 "--prune-src-snapshots", action="store_true", 

184 help="Prune snapshots on the selected source hosts as necessary. Typically, this command should be called by a " 

185 "program (or cron job) running on each src host.\n\n") 

186 parser.add_argument( 

187 "--prune-src-bookmarks", action="store_true", 

188 help="Prune bookmarks on the selected source hosts as necessary. Typically, this command should be called by a " 

189 "program (or cron job) running on each src host.\n\n") 

190 parser.add_argument( 

191 "--prune-dst-snapshots", action="store_true", 

192 help="Prune snapshots on the selected destination hosts as necessary. Typically, this command should be called by a " 

193 "program (or cron job) running on each dst host.\n\n") 

194 parser.add_argument( 

195 "--monitor-src-snapshots", action="store_true", 

196 help="Alert the user if snapshots on the selected source hosts are too old, using --monitor-snapshot-plan (see " 

197 "below). Typically, this command should be called by a program (or cron job) running on each src host.\n\n") 

198 parser.add_argument( 

199 "--monitor-dst-snapshots", action="store_true", 

200 help="Alert the user if snapshots on the selected destination hosts are too old, using --monitor-snapshot-plan (see " 

201 "below). Typically, this command should be called by a program (or cron job) running on each dst host.\n\n") 

202 

203 # options: 

204 parser.add_argument( 

205 "--localhost", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING", 

206 help="Hostname of localhost. Default is the hostname without the domain name, querying the Operating System.\n\n") 

207 parser.add_argument( 

208 "--src-hosts", default=None, metavar="LIST_STRING", 

209 help="Hostnames of the sources to operate on.\n\n") 

210 parser.add_argument( 

211 "--src-host", default=None, action="append", metavar="STRING", 

212 help="For subsetting --src-hosts; Can be specified multiple times; Indicates to only use the --src-hosts that are " 

213 "contained in the specified --src-host values (optional).\n\n") 

214 dst_hosts_example = {"nas": ["onsite"], "bak-us-west-1": ["us-west-1"], 

215 "bak-eu-west-1": ["eu-west-1"], "archive": ["offsite"]} 

216 parser.add_argument( 

217 "--dst-hosts", default="{}", metavar="DICT_STRING", 

218 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names " 

219 "(the infix portion of snapshot name). " 

220 f"Example: `{format_dict(dst_hosts_example)}`.\n\n" 

221 "With this, given a snapshot name, we can find the destination hostnames to which the snapshot shall be " 

222 "replicated. Also, given a snapshot name and its own name, a destination host can determine if it shall " 

223 "replicate the given snapshot from the source host, or if the snapshot is intended for another destination " 

224 "host, in which case it skips the snapshot. A destination host will receive replicas of snapshots for all " 

225 "targets that map to that destination host.\n\n" 

226 "Removing a mapping can be used to temporarily suspend replication to a given destination host.\n\n") 

227 parser.add_argument( 

228 "--dst-host", default=None, action="append", metavar="STRING", 

229 help="For subsetting --dst-hosts; Can be specified multiple times; Indicates to only use the --dst-hosts keys that " 

230 "are contained in the specified --dst-host values (optional).\n\n") 

231 parser.add_argument( 

232 "--retain-dst-targets", default="{}", metavar="DICT_STRING", 

233 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names " 

234 "(the infix portion of snapshot name). " 

235 f"Example: `{format_dict(dst_hosts_example)}`. Has same format as --dst-hosts.\n\n" 

236 "As part of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has " 

237 "no mapping to that destination host in this dictionary. Do not remove a mapping here unless you are sure it's " 

238 "ok to delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n") 

239 dst_root_datasets_example = { 

240 "nas": "tank2/bak", 

241 "bak-us-west-1": "backups/bak001", 

242 "bak-eu-west-1": "backups/bak999", 

243 "archive": f"archives/zoo/{SRC_MAGIC_SUBSTITUTION_TOKEN}", 

244 "hotspare": "", 

245 } 

246 parser.add_argument( 

247 "--dst-root-datasets", default="{}", metavar="DICT_STRING", 

248 help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root " 

249 "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that " 

250 "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, " 

251 "whereas for cloning, master slave replication, or replication from a primary to a secondary, this can also be " 

252 "the empty string.\n\n" 

253 f"`{SRC_MAGIC_SUBSTITUTION_TOKEN}` and `{DST_MAGIC_SUBSTITUTION_TOKEN}` are optional magic substitution tokens " 

254 "that will be auto-replaced at runtime with the actual hostname. This can be used to force the use of a " 

255 "separate destination root dataset per source host or per destination host.\n\n" 

256 f"Example: `{format_dict(dst_root_datasets_example)}`\n\n") 

257 src_snapshot_plan_example = { 

258 "prod": { 

259 "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5}, 

260 "us-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5}, 

261 "eu-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5}, 

262 }, 

263 "test": { 

264 "offsite": {"12hourly": 42, "weekly": 12}, 

265 }, 

266 } 

267 parser.add_argument( 

268 "--src-snapshot-plan", default="{}", metavar="DICT_STRING", 

269 help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. " 

270 "Snapshots that do not match a retention period will be deleted. A zero or missing retention period indicates " 

271 "that no snapshots shall be retained (or even be created) for the given period.\n\n" 

272 f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and " 

273 "the intended logical target 'onsite', create and then retain secondly snapshots that were created less " 

274 "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analog for " 

275 "the latest 40 minutely snapshots, 36 hourly snapshots, etc. " 

276 "It will also create and retain snapshots for the targets 'us-west-1' and 'eu-west-1' within the 'prod' " 

277 "organization. " 

278 "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, " 

279 "and name them as being intended for the 'offsite' replication target. " 

280 "The example creates snapshots with names like " 

281 "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, " 

282 "`prod_us-west-1_<timestamp>_hourly`, `prod_us-west-1_<timestamp>_daily`, " 

283 "`prod_eu-west-1_<timestamp>_hourly`, `prod_eu-west-1_<timestamp>_daily`, " 

284 "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n") 

285 parser.add_argument( 

286 "--src-bookmark-plan", default="{}", metavar="DICT_STRING", 

287 help="Retention periods for bookmarks to be used if pruning src. Has same format as --src-snapshot-plan.\n\n") 

288 parser.add_argument( 

289 "--dst-snapshot-plan", default="{}", metavar="DICT_STRING", 

290 help="Retention periods for snapshots to be used if pruning dst. Has same format as --src-snapshot-plan.\n\n") 

291 monitor_snapshot_plan_example = { 

292 "prod": { 

293 "onsite": { 

294 "100millisecondly": {"warning": "650 milliseconds", "critical": "2 seconds"}, 

295 "secondly": {"warning": "2 seconds", "critical": "14 seconds"}, 

296 "minutely": {"warning": "30 seconds", "critical": "300 seconds"}, 

297 "hourly": {"warning": "30 minutes", "critical": "300 minutes"}, 

298 "daily": {"warning": "4 hours", "critical": "8 hours"}, 

299 "weekly": {"warning": "2 days", "critical": "8 days"}, 

300 "monthly": {"warning": "2 days", "critical": "8 days"}, 

301 "yearly": {"warning": "5 days", "critical": "14 days"}, 

302 "10minutely": {"warning": "0 minutes", "critical": "0 minutes"}, 

303 }, 

304 "": { 

305 "daily": {"warning": "4 hours", "critical": "8 hours"}, 

306 }, 

307 }, 

308 } 

309 parser.add_argument( 

310 "--monitor-snapshot-plan", default="{}", metavar="DICT_STRING", 

311 help="Alert the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified " 

312 "snapshot pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to " 

313 "check if snapshots are successfully taken on schedule, successfully replicated on schedule, and successfully " 

314 "pruned on schedule. " 

315 "Process exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively. " 

316 f"Example DICT_STRING: `{format_dict(monitor_snapshot_plan_example)}`. " 

317 "This example alerts the user if the latest src or dst snapshot named `prod_onsite_<timestamp>_hourly` is more " 

318 "than 30 minutes late (i.e. more than 30+60=90 minutes old) [warning] or more than 300 minutes late (i.e. more " 

319 "than 300+60=360 minutes old) [critical]. In addition, the example alerts the user if the oldest src or dst " 

320 "snapshot named `prod_onsite_<timestamp>_hourly` is more than 30 + 60x36 minutes old [warning] or more than " 

321 "300 + 60x36 minutes old [critical], where 36 is the number of period cycles specified in `src_snapshot_plan` " 

322 "or `dst_snapshot_plan`, respectively. " 

323 "Analog for the latest snapshot named `prod_<timestamp>_daily`, and so on.\n\n" 

324 "Note: A duration that is missing or zero (e.g. '0 minutes') indicates that no snapshots shall be checked for " 

325 "the given snapshot name pattern.\n\n") 

326 locations = ["src", "dst"] 

327 for loc in locations: 

328 parser.add_argument( 

329 f"--ssh-{loc}-user", default="", metavar="STRING", 

330 help=f"Remote SSH username on {loc} hosts to connect to (optional). Examples: 'root', 'alice'.\n\n") 

331 for loc in locations: 

332 parser.add_argument( 

333 f"--ssh-{loc}-port", type=int, metavar="INT", 

334 help=f"Remote SSH port on {loc} host to connect to (optional).\n\n") 

335 for loc in locations: 

336 parser.add_argument( 

337 f"--ssh-{loc}-config-file", type=str, action=bzfs_main.argparse_actions.SSHConfigFileNameAction, metavar="FILE", 

338 help=f"Path to SSH ssh_config(5) file to connect to {loc} (optional); will be passed into ssh -F CLI. " 

339 "The basename must contain the substring 'bzfs_ssh_config'.\n\n") 

340 parser.add_argument( 

341 "--src-user", default="", metavar="STRING", 

342 help=argparse.SUPPRESS) # deprecated; was renamed to --ssh-src-user 

343 parser.add_argument( 

344 "--dst-user", default="", metavar="STRING", 

345 help=argparse.SUPPRESS) # deprecated; was renamed to --ssh-dst-user 

346 parser.add_argument( 

347 "--job-id", required=True, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING", 

348 help="The identifier that remains constant across all runs of this particular job; will be included in the log file " 

349 "name infix. Example: mytestjob\n\n") 

350 parser.add_argument( 

351 "--jobid", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING", 

352 help=argparse.SUPPRESS) # deprecated; was renamed to --job-run 

353 parser.add_argument( 

354 "--job-run", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING", 

355 help="The identifier of this particular run of the overall job; will be included in the log file name suffix. " 

356 "Default is a hex UUID. Example: 0badc0f003a011f0a94aef02ac16083c\n\n") 

357 workers_default = 100 # percent 

358 parser.add_argument( 

359 "--workers", min=1, default=(workers_default, True), action=bzfs_main.argparse_actions.CheckPercentRange, 

360 metavar="INT[%]", 

361 help="The maximum number of jobs to run in parallel at any time; can be given as a positive integer, " 

362 f"optionally followed by the %% percent character (min: %(min)s, default: {workers_default}%%). Percentages " 

363 "are relative to the number of CPU cores on the machine. Example: 200%% uses twice as many parallel jobs as " 

364 "there are cores on the machine; 75%% uses num_procs = num_cores * 0.75. Examples: 1, 4, 75%%, 150%%\n\n") 

365 parser.add_argument( 

366 "--work-period-seconds", type=float, min=0, default=0, action=bzfs_main.check_range.CheckRange, metavar="FLOAT", 

367 help="Reduces bandwidth spikes by spreading out the start of worker jobs over this much time; " 

368 "0 disables this feature (default: %(default)s). Examples: 0, 60, 86400\n\n") 

369 parser.add_argument( 

370 "--jitter", action="store_true", 

371 help="Randomize job start time and host order to avoid potential thundering herd problems in large distributed " 

372 "systems (optional). Randomizing job start time is only relevant if --work-period-seconds > 0.\n\n") 

373 parser.add_argument( 

374 "--worker-timeout-seconds", type=float, min=0, default=None, action=bzfs_main.check_range.CheckRange, 

375 metavar="FLOAT", 

376 help="If this much time has passed after a worker process has started executing, kill the straggling worker " 

377 "(optional). Other workers remain unaffected. Examples: 60, 3600\n\n") 

378 parser.add_argument( 

379 "--spawn_process_per_job", action="store_true", 

380 help=argparse.SUPPRESS) 

381 parser.add_argument( 

382 "--jobrunner-dryrun", action="store_true", 

383 help="Do a dry run (aka 'no-op') to print what operations would happen if the command were to be executed " 

384 "for real (optional). This option treats both the ZFS source and destination as read-only. Can also be used to " 

385 "check if the configuration options are valid.\n\n") 

386 parser.add_argument( 

387 "--jobrunner-log-level", choices=["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"], default="INFO", 

388 help="Only emit jobrunner messages with equal or higher priority than this log level. Default is '%(default)s'.\n\n") 

389 parser.add_argument( 

390 "--version", action="version", version=f"{PROG_NAME}-{bzfs_main.argparse_cli.__version__}, by {PROG_AUTHOR}", 

391 help="Display version information and exit.\n\n") 

392 parser.add_argument( 

393 "--help, -h", action="help", 

394 help="Show this help message and exit.\n\n") 

395 parser.add_argument( 

396 "--daemon-replication-frequency", default="minutely", metavar="STRING", 

397 help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n") 

398 parser.add_argument( 

399 "--daemon-prune-src-frequency", default="minutely", metavar="STRING", 

400 help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n") 

401 parser.add_argument( 

402 "--daemon-prune-dst-frequency", default="minutely", metavar="STRING", 

403 help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n") 

404 parser.add_argument( 

405 "--daemon-monitor-snapshots-frequency", default="minutely", metavar="STRING", 

406 help="Specifies how often the bzfs daemon shall monitor snapshot age if --daemon-lifetime is nonzero.\n\n") 

407 bad_opts = ["--daemon-frequency", "--include-snapshot-plan", "--create-src-snapshots-plan", "--skip-replication", 

408 "--log-file-prefix", "--log-file-infix", "--log-file-suffix", "--log-config-file", "--log-config-var", 

409 "--delete-dst-datasets", "--delete-dst-snapshots", "--delete-dst-snapshots-except", 

410 "--delete-dst-snapshots-except-plan", "--delete-empty-dst-datasets", 

411 "--monitor-snapshots", "--timeout"] 

412 for loc in locations: 

413 bad_opts += [f"--ssh-{loc}-host"] # reject this arg as jobrunner will auto-generate it 

414 for bad_opt in bad_opts: 

415 parser.add_argument(bad_opt, action=RejectArgumentAction, nargs=0, help=argparse.SUPPRESS) 

416 parser.add_argument( 

417 "--root-dataset-pairs", required=True, nargs="+", action=bzfs_main.argparse_actions.DatasetPairsAction, 

418 metavar="SRC_DATASET DST_DATASET", 

419 help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be " 

420 "auto-appended later).\n\n") 

421 return parser 

422 # fmt: on 

423 

424 

425############################################################################# 

426def main() -> None: 

427 """API for command line clients.""" 

428 Job().run_main(sys.argv) 

429 

430 

431############################################################################# 

432class Job: 

433 """Coordinates subjobs per the CLI flags; Each subjob handles one host pair and may run in its own process or thread.""" 

434 

435 def __init__(self, log: Logger | None = None) -> None: 

436 """Initialize caches, parsers and logger shared across subjobs.""" 

437 # immutable variables: 

438 self.jobrunner_dryrun: bool = False 

439 self.spawn_process_per_job: bool = False 

440 self.log: Logger = log if log is not None else get_simple_logger(PROG_NAME) 

441 self.bzfs_argument_parser: argparse.ArgumentParser = bzfs.argument_parser() 

442 self.argument_parser: argparse.ArgumentParser = argument_parser() 

443 self.loopback_address: str = _detect_loopback_address() 

444 

445 # mutable variables: 

446 self.first_exception: int | None = None 

447 self.stats: Stats = Stats() 

448 self.cache_existing_dst_pools: set[str] = set() 

449 self.cache_known_dst_pools: set[str] = set() 

450 

451 self.is_test_mode: bool = False # for testing only 

452 

453 def run_main(self, sys_argv: list[str]) -> None: 

454 """API for Python clients; visible for testing; may become a public API eventually.""" 

455 self.first_exception = None 

456 log: Logger = self.log 

457 log.info("CLI arguments: %s", " ".join(sys_argv)) 

458 nsp = argparse.Namespace(no_argument_file=True) # disable --root-dataset-pairs='+file' option in DatasetPairsAction 

459 args, unknown_args = self.argument_parser.parse_known_args(sys_argv[1:], nsp) # forward all unknown args to `bzfs` 

460 log.setLevel(args.jobrunner_log_level) 

461 self.jobrunner_dryrun = args.jobrunner_dryrun 

462 assert len(args.root_dataset_pairs) > 0 

463 src_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_snapshot_plan), "--src-snapshot-plan") 

464 src_bookmark_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_bookmark_plan), "--src-bookmark-plan") 

465 dst_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.dst_snapshot_plan), "--dst-snapshot-plan") 

466 monitor_snapshot_plan: dict = self.validate_monitor_snapshot_plan(literal_eval(args.monitor_snapshot_plan)) 

467 localhostname: str = args.localhost if args.localhost else socket.gethostname() 

468 self.validate_host_name(localhostname, "--localhost") 

469 log.debug("localhostname: %s", localhostname) 

470 src_hosts: list[str] = self.validate_src_hosts( 

471 literal_eval(args.src_hosts if args.src_hosts is not None else sys.stdin.read()) 

472 ) 

473 basis_src_hosts: list[str] = src_hosts 

474 nb_src_hosts: int = len(basis_src_hosts) 

475 log.debug("src_hosts before subsetting: %s", src_hosts) 

476 if args.src_host is not None: # retain only the src hosts that are also contained in args.src_host 

477 assert isinstance(args.src_host, list) 

478 retain_src_hosts: set[str] = set(args.src_host) 

479 self.validate_is_subset(retain_src_hosts, src_hosts, "--src-host", "--src-hosts") 

480 src_hosts = [host for host in src_hosts if host in retain_src_hosts] 

481 dst_hosts: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.dst_hosts)) 

482 nb_dst_hosts: int = len(dst_hosts) 

483 if args.dst_host is not None: # retain only the dst hosts that are also contained in args.dst_host 

484 assert isinstance(args.dst_host, list) 

485 retain_dst_hosts: set[str] = set(args.dst_host) 

486 self.validate_is_subset(retain_dst_hosts, dst_hosts.keys(), "--dst-host", "--dst-hosts.keys") 

487 dst_hosts = {dst_host: lst for dst_host, lst in dst_hosts.items() if dst_host in retain_dst_hosts} 

488 retain_dst_targets: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.retain_dst_targets)) 

489 self.validate_is_subset(dst_hosts.keys(), retain_dst_targets.keys(), "--dst-hosts.keys", "--retain-dst-targets.keys") 

490 dst_root_datasets: dict[str, str] = self.validate_dst_root_datasets(literal_eval(args.dst_root_datasets)) 

491 self.validate_is_subset( 

492 dst_root_datasets.keys(), retain_dst_targets.keys(), "--dst-root-dataset.keys", "--retain-dst-targets.keys" 

493 ) 

494 self.validate_is_subset(dst_hosts.keys(), dst_root_datasets.keys(), "--dst-hosts.keys", "--dst-root-dataset.keys") 

495 bad_root_datasets: dict[str, str] = { 

496 dst_host: root_dataset 

497 for dst_host in sorted(dst_hosts.keys()) 

498 if SRC_MAGIC_SUBSTITUTION_TOKEN not in (root_dataset := dst_root_datasets[dst_host]) 

499 } 

500 if len(src_hosts) > 1 and len(bad_root_datasets) > 0: 

501 self.die( 

502 "Cowardly refusing to proceed as multiple source hosts must not be configured to write to the same " 

503 "destination dataset. " 

504 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(src_hosts)}" 

505 ) 

506 bad_root_datasets = { 

507 dst_host: root_dataset 

508 for dst_host, root_dataset in sorted(dst_root_datasets.items()) 

509 if root_dataset and SRC_MAGIC_SUBSTITUTION_TOKEN not in root_dataset 

510 } 

511 if len(basis_src_hosts) > 1 and len(bad_root_datasets) > 0: 

512 self.die( 

513 "Cowardly refusing to proceed as multiple source hosts are defined in the configuration, but " 

514 f"not all non-empty root datasets in --dst-root-datasets contain the '{SRC_MAGIC_SUBSTITUTION_TOKEN}' " 

515 "substitution token to prevent collisions on writing destination datasets. " 

516 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(basis_src_hosts)}" 

517 ) 

518 if args.jitter: # randomize host order to avoid potential thundering herd problems in large distributed systems 

519 random.shuffle(src_hosts) 

520 dst_hosts = shuffle_dict(dst_hosts) 

521 ssh_src_user: str = args.ssh_src_user if args.ssh_src_user is not None else args.src_user # --src-user is deprecated 

522 ssh_dst_user: str = args.ssh_dst_user if args.ssh_dst_user is not None else args.dst_user # --dst-user is deprecated 

523 ssh_src_port: int | None = args.ssh_src_port 

524 ssh_dst_port: int | None = args.ssh_dst_port 

525 ssh_src_config_file: str | None = args.ssh_src_config_file 

526 ssh_dst_config_file: str | None = args.ssh_dst_config_file 

527 job_id: str = _sanitize(args.job_id) 

528 job_run: str = args.job_run if args.job_run is not None else args.jobid # --jobid is deprecated; renamed to --job-run 

529 job_run = _sanitize(job_run) if job_run else uuid.uuid1().hex 

530 workers, workers_is_percent = args.workers 

531 max_workers: int = max(1, round((os.cpu_count() or 1) * workers / 100.0) if workers_is_percent else round(workers)) 

532 worker_timeout_seconds: float | None = args.worker_timeout_seconds 

533 self.spawn_process_per_job = args.spawn_process_per_job 

534 username: str = pwd.getpwuid(os.geteuid()).pw_name 

535 assert username 

536 localhost_ids: set[str] = {"localhost", "127.0.0.1", "::1", socket.gethostname()} # ::1 is IPv6 loopback address 

537 localhost_ids.update(self.get_localhost_ips()) # union 

538 localhost_ids.add(localhostname) 

539 

540 def zero_pad(number: int, width: int = 6) -> str: 

541 """Pads number with leading '0' chars to the given width.""" 

542 return f"{number:0{width}d}" 

543 

544 def jpad(jj: int, tag: str) -> str: 

545 """Returns ``tag`` prefixed with slash and zero padded index.""" 

546 return "/" + zero_pad(jj) + tag 

547 

548 def npad() -> str: 

549 """Returns standardized subjob count suffix.""" 

550 return SEP + zero_pad(len(subjobs)) 

551 

552 def update_subjob_name(tag: str) -> str: 

553 """Derives next subjob name based on ``tag`` and index ``j``.""" 

554 if j <= 0: 

555 return subjob_name 

556 elif j == 1: 

557 return subjob_name + jpad(j - 1, tag) 

558 else: 

559 return subjob_name + "/" + BARRIER_CHAR 
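        # Illustrative (hedged) example of the resulting subjob names: with two destination hosts and
        # tag "replicate", the first source host yields sibling subjobs named
        # "000000src-host/000000replicate" and "000000src-host/000001replicate"; because j ends up > 1,
        # update_subjob_name() then appends "/" + BARRIER_CHAR so that this host's subsequent subjobs
        # only start after all of those siblings have completed.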

560 

561 def resolve_dataset(hostname: str, dataset: str, is_src: bool = True) -> str: 

562 """Returns host:dataset string resolving IPv6 and localhost cases.""" 

563 ssh_user = ssh_src_user if is_src else ssh_dst_user 

564 ssh_user = ssh_user if ssh_user else username 

565 lb: str = self.loopback_address 

566 lhi: set[str] = localhost_ids 

567 hostname = hostname if hostname not in lhi else (lb if lb else hostname) if username != ssh_user else "-" 

568 hostname = convert_ipv6(hostname) 

569 return f"{hostname}:{dataset}" 

570 

571 def resolve_dst_dataset(dst_hostname: str, dst_dataset: str) -> str: 

572 """Expands ``dst_dataset`` relative to ``dst_hostname`` roots.""" 

573 root_dataset: str | None = dst_root_datasets.get(dst_hostname) 

574 assert root_dataset is not None, dst_hostname # f"Hostname '{dst_hostname}' missing in --dst-root-datasets" 

575 root_dataset = root_dataset.replace(SRC_MAGIC_SUBSTITUTION_TOKEN, src_host) 

576 root_dataset = root_dataset.replace(DST_MAGIC_SUBSTITUTION_TOKEN, dst_hostname) 

577 resolved_dst_dataset: str = root_dataset + "/" + dst_dataset if root_dataset else dst_dataset 

578 bzfs_main.utils.validate_dataset_name(resolved_dst_dataset, dst_dataset) 

579 return resolve_dataset(dst_hostname, resolved_dst_dataset, is_src=False) 

580 

581 dummy: str = DUMMY_DATASET 

582 lhn: str = localhostname 

583 bzfs_prog_header: list[str] = [BZFS_PROG_NAME, "--no-argument-file"] + unknown_args 

584 subjobs: dict[str, list[str]] = {} 

585 for i, src_host in enumerate(src_hosts): 

586 subjob_name: str = zero_pad(i) + "src-host" 

587 j: int = 0 

588 opts: list[str] 

589 

590 if args.create_src_snapshots: 

591 opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"] 

592 opts += [f"--log-file-prefix={PROG_NAME}{SEP}create-src-snapshots{SEP}"] 

593 opts += [f"--log-file-infix={SEP}{job_id}"] 

594 opts += [f"--log-file-suffix={SEP}{job_run}{npad()}{_log_suffix(lhn, src_host, '')}{SEP}"] 

595 self.add_ssh_opts( 

596 opts, ssh_src_user=ssh_src_user, ssh_src_port=ssh_src_port, ssh_src_config_file=ssh_src_config_file 

597 ) 

598 opts += ["--"] 

599 opts += _flatten(_dedupe([(resolve_dataset(src_host, src), dummy) for src, dst in args.root_dataset_pairs])) 

600 subjob_name += "/create-src-snapshots" 

601 subjobs[subjob_name] = bzfs_prog_header + opts 

602 

603 if args.replicate: 

604 j = 0 

605 marker: str = "replicate" 

606 for dst_hostname, targets in dst_hosts.items(): 

607 opts = self.replication_opts( 

608 dst_snapshot_plan, set(targets), lhn, src_host, dst_hostname, marker, job_id, job_run + npad() 

609 ) 

610 if len(opts) > 0: 

611 opts += [f"--daemon-frequency={args.daemon_replication_frequency}"] 

612 self.add_ssh_opts( 

613 opts, 

614 ssh_src_user=ssh_src_user, 

615 ssh_dst_user=ssh_dst_user, 

616 ssh_src_port=ssh_src_port, 

617 ssh_dst_port=ssh_dst_port, 

618 ssh_src_config_file=ssh_src_config_file, 

619 ssh_dst_config_file=ssh_dst_config_file, 

620 ) 

621 opts += ["--"] 

622 dataset_pairs: list[tuple[str, str]] = [ 

623 (resolve_dataset(src_host, src), resolve_dst_dataset(dst_hostname, dst)) 

624 for src, dst in args.root_dataset_pairs 

625 ] 

626 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs) 

627 if len(dataset_pairs) > 0: 

628 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs) 

629 j += 1 

630 subjob_name = update_subjob_name(marker) 

631 

632 def prune_src(opts: list[str], retention_plan: dict, tag: str, src_host: str = src_host) -> None: 

633 """Creates prune subjob options for ``tag`` using ``retention_plan``.""" 

634 opts += [ 

635 "--skip-replication", 

636 f"--delete-dst-snapshots-except-plan={retention_plan}", 

637 f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}", 

638 f"--log-file-infix={SEP}{job_id}", 

639 f"--log-file-suffix={SEP}{job_run}{npad()}{_log_suffix(lhn, src_host, '')}{SEP}", 

640 f"--daemon-frequency={args.daemon_prune_src_frequency}", 

641 ] 

642 self.add_ssh_opts( # i.e. dst=src, src=dummy 

643 opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file 

644 ) 

645 opts += ["--"] 

646 opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs])) 

647 nonlocal subjob_name 

648 subjob_name += f"/{tag}" 

649 subjobs[subjob_name] = bzfs_prog_header + opts 

650 

651 if args.prune_src_snapshots: 

652 prune_src(["--delete-dst-snapshots"], src_snapshot_plan, "prune-src-snapshots") 

653 

654 if args.prune_src_bookmarks: 

655 prune_src(["--delete-dst-snapshots=bookmarks"], src_bookmark_plan, "prune-src-bookmarks") 

656 

657 if args.prune_dst_snapshots: 

658 self.validate_true( 

659 retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!" 

660 ) 

661 j = 0 

662 marker = "prune-dst-snapshots" 

663 for dst_hostname, _ in dst_hosts.items(): 

664 curr_retain_targets: set[str] = set(retain_dst_targets[dst_hostname]) 

665 curr_dst_snapshot_plan = { # only retain targets that belong to the host 

666 org: {target: periods for target, periods in target_periods.items() if target in curr_retain_targets} 

667 for org, target_periods in dst_snapshot_plan.items() 

668 } 

669 opts = ["--delete-dst-snapshots", "--skip-replication"] 

670 opts += [f"--delete-dst-snapshots-except-plan={curr_dst_snapshot_plan}"] 

671 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{marker}{SEP}"] 

672 opts += [f"--log-file-infix={SEP}{job_id}"] 

673 opts += [f"--log-file-suffix={SEP}{job_run}{npad()}{_log_suffix(lhn, src_host, dst_hostname)}{SEP}"] 

674 opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"] 

675 self.add_ssh_opts( 

676 opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file 

677 ) 

678 opts += ["--"] 

679 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs] 

680 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs) 

681 if len(dataset_pairs) > 0: 

682 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs) 

683 j += 1 

684 subjob_name = update_subjob_name(marker) 

685 

686 def monitor_snapshots_opts(tag: str, monitor_plan: dict, logsuffix: str) -> list[str]: 

687 """Returns monitor subjob options for ``tag`` and ``monitor_plan``.""" 

688 opts = [f"--monitor-snapshots={monitor_plan}", "--skip-replication"] 

689 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}"] 

690 opts += [f"--log-file-infix={SEP}{job_id}"] 

691 opts += [f"--log-file-suffix={SEP}{job_run}{npad()}{logsuffix}{SEP}"] 

692 opts += [f"--daemon-frequency={args.daemon_monitor_snapshots_frequency}"] 

693 return opts 

694 

695 def build_monitor_plan(monitor_plan: dict, snapshot_plan: dict, cycles_prefix: str) -> dict: 

696 """Expands ``monitor_plan`` with cycle defaults from ``snapshot_plan``.""" 

697 

698 def alert_dicts(alertdict: dict, cycles: int) -> dict: 

699 """Returns alert dictionaries with explicit ``cycles`` value.""" 

700 latest_dict = alertdict.copy() 

701 for prefix in ("src_snapshot_", "dst_snapshot_", ""): 

702 latest_dict.pop(f"{prefix}cycles", None) 

703 oldest_dict = latest_dict.copy() 

704 oldest_dict["cycles"] = int(alertdict.get(f"{cycles_prefix}cycles", cycles)) 

705 return {"latest": latest_dict, "oldest": oldest_dict} 

706 

707 return { 

708 org: { 

709 target: { 

710 periodunit: alert_dicts(alertdict, snapshot_plan.get(org, {}).get(target, {}).get(periodunit, 1)) 

711 for periodunit, alertdict in periods.items() 

712 } 

713 for target, periods in target_periods.items() 

714 } 

715 for org, target_periods in monitor_plan.items() 

716 } 
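        # Illustrative (hedged) example of build_monitor_plan(): the monitor entry
        #   {"prod": {"onsite": {"hourly": {"warning": "30 minutes", "critical": "300 minutes"}}}}
        # combined with snapshot_plan {"prod": {"onsite": {"hourly": 36}}} expands to
        #   {"prod": {"onsite": {"hourly": {
        #       "latest": {"warning": "30 minutes", "critical": "300 minutes"},
        #       "oldest": {"warning": "30 minutes", "critical": "300 minutes", "cycles": 36}}}}}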

717 

718 if args.monitor_src_snapshots: 

719 marker = "monitor-src-snapshots" 

720 monitor_plan = build_monitor_plan(monitor_snapshot_plan, src_snapshot_plan, "src_snapshot_") 

721 opts = monitor_snapshots_opts(marker, monitor_plan, _log_suffix(lhn, src_host, "")) 

722 self.add_ssh_opts( # i.e. dst=src, src=dummy 

723 opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file 

724 ) 

725 opts += ["--"] 

726 opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs])) 

727 subjob_name += "/" + marker 

728 subjobs[subjob_name] = bzfs_prog_header + opts 

729 

730 if args.monitor_dst_snapshots: 

731 j = 0 

732 marker = "monitor-dst-snapshots" 

733 for dst_hostname, targets in dst_hosts.items(): 

734 monitor_targets = set(targets).intersection(set(retain_dst_targets[dst_hostname])) 

735 monitor_plan = { # only retain targets that belong to the host 

736 org: {target: periods for target, periods in target_periods.items() if target in monitor_targets} 

737 for org, target_periods in monitor_snapshot_plan.items() 

738 } 

739 monitor_plan = build_monitor_plan(monitor_plan, dst_snapshot_plan, "dst_snapshot_") 

740 opts = monitor_snapshots_opts(marker, monitor_plan, _log_suffix(lhn, src_host, dst_hostname)) 

741 self.add_ssh_opts( 

742 opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file 

743 ) 

744 opts += ["--"] 

745 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs] 

746 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs) 

747 if len(dataset_pairs) > 0: 

748 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs) 

749 j += 1 

750 subjob_name = update_subjob_name(marker) 

751 

752 msg = "Ready to run %s subjobs using %s/%s src hosts: %s, %s/%s dst hosts: %s" 

753 log.info( 

754 msg, len(subjobs), len(src_hosts), nb_src_hosts, src_hosts, len(dst_hosts), nb_dst_hosts, list(dst_hosts.keys()) 

755 ) 

756 log.log(LOG_TRACE, "subjobs: \n%s", _pretty_print_formatter(subjobs)) 

757 self.run_subjobs(subjobs, max_workers, worker_timeout_seconds, args.work_period_seconds, args.jitter) 

758 ex = self.first_exception 

759 if isinstance(ex, int): 

760 assert ex != 0 

761 sys.exit(ex) 

762 assert ex is None, ex 

763 log.info("Succeeded. Bye!") 

764 

765 def replication_opts( 

766 self, 

767 dst_snapshot_plan: dict[str, dict[str, dict[str, int]]], 

768 targets: set[str], 

769 localhostname: str, 

770 src_hostname: str, 

771 dst_hostname: str, 

772 tag: str, 

773 job_id: str, 

774 job_run: str, 

775 ) -> list[str]: 

776 """Returns CLI options for one replication subjob.""" 

777 log = self.log 

778 log.debug("%s", f"Replicating targets {sorted(targets)} from {src_hostname} to {dst_hostname} ...") 

779 include_snapshot_plan = { # only replicate targets that belong to the destination host and are relevant 

780 org: { 

781 target: { 

782 duration_unit: duration_amount 

783 for duration_unit, duration_amount in periods.items() 

784 if duration_amount > 0 

785 } 

786 for target, periods in target_periods.items() 

787 if target in targets 

788 } 

789 for org, target_periods in dst_snapshot_plan.items() 

790 } 

791 include_snapshot_plan = { # only replicate orgs that have at least one relevant target_period 

792 org: target_periods 

793 for org, target_periods in include_snapshot_plan.items() 

794 if any(len(periods) > 0 for target, periods in target_periods.items()) 

795 } 

796 opts: list[str] = [] 

797 if len(include_snapshot_plan) > 0: 

798 opts += [f"--include-snapshot-plan={include_snapshot_plan}"] 

799 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}"] 

800 opts += [f"--log-file-infix={SEP}{job_id}"] 

801 opts += [f"--log-file-suffix={SEP}{job_run}{_log_suffix(localhostname, src_hostname, dst_hostname)}{SEP}"] 

802 return opts 

803 

804 def skip_nonexisting_local_dst_pools(self, root_dataset_pairs: list[tuple[str, str]]) -> list[tuple[str, str]]: 

805 """Skip datasets that point to removable destination drives that are not currently (locally) attached, if any.""" 

806 

807 def zpool(dataset: str) -> str: 

808 """Returns pool name portion of ``dataset``.""" 

809 return dataset.split("/", 1)[0] 

810 

811 assert len(root_dataset_pairs) > 0 

812 unknown_dst_pools = {zpool(dst) for src, dst in root_dataset_pairs} 

813 unknown_dst_pools = unknown_dst_pools.difference(self.cache_known_dst_pools) 

814 

815 # Here we treat a zpool as existing if the zpool isn't local, aka if it isn't prefixed with "-:". A remote host 

816 # will raise an appropriate error if it turns out that the remote zpool doesn't actually exist. 

817 unknown_local_dst_pools = {pool for pool in unknown_dst_pools if pool.startswith("-:")} 

818 if len(unknown_local_dst_pools) > 0: # `zfs list` if local 

819 existing_pools = {pool[len("-:") :] for pool in unknown_local_dst_pools} 

820 cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(existing_pools) 

821 sp = subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True) 

822 if sp.returncode not in (0, 1): # 1 means dataset not found 

823 self.die(f"Unexpected error {sp.returncode} on checking for existing local dst pools: {sp.stderr.strip()}") 

824 existing_pools = {"-:" + pool for pool in sp.stdout.splitlines()} 

825 self.cache_existing_dst_pools.update(existing_pools) # union 

826 unknown_remote_dst_pools = unknown_dst_pools.difference(unknown_local_dst_pools) 

827 self.cache_existing_dst_pools.update(unknown_remote_dst_pools) # union 

828 self.cache_known_dst_pools.update(unknown_dst_pools) # union 

829 results: list[tuple[str, str]] = [] 

830 for src, dst in root_dataset_pairs: 

831 if zpool(dst) in self.cache_existing_dst_pools: 

832 results.append((src, dst)) 

833 else: 

834 self.log.warning("Skipping dst dataset for which local dst pool does not exist: %s", dst) 

835 return results 
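    # Illustrative (hedged) example: given pairs such as [("-:tank1/src", "-:tank9/bak/src")], this method
    # runs a single `zfs list` over the not-yet-cached local pool "tank9" and drops the pair if that pool is
    # not currently imported (e.g. a detached removable drive); pairs whose destination is remote, such as
    # ("hostA:tank1/src", "nas:tank2/bak/src"), are kept, and a nonexistent remote pool is reported by the
    # remote host itself.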

836 

837 @staticmethod 

838 def add_ssh_opts( 

839 opts: list[str], 

840 ssh_src_user: str | None = None, 

841 ssh_dst_user: str | None = None, 

842 ssh_src_port: int | None = None, 

843 ssh_dst_port: int | None = None, 

844 ssh_src_config_file: str | None = None, 

845 ssh_dst_config_file: str | None = None, 

846 ) -> None: 

847 """Appends ssh related options to ``opts`` if specified.""" 

848 assert isinstance(opts, list) 

849 opts += [f"--ssh-src-user={ssh_src_user}"] if ssh_src_user else [] 

850 opts += [f"--ssh-dst-user={ssh_dst_user}"] if ssh_dst_user else [] 

851 opts += [f"--ssh-src-port={ssh_src_port}"] if ssh_src_port is not None else [] 

852 opts += [f"--ssh-dst-port={ssh_dst_port}"] if ssh_dst_port is not None else [] 

853 opts += [f"--ssh-src-config-file={ssh_src_config_file}"] if ssh_src_config_file else [] 

854 opts += [f"--ssh-dst-config-file={ssh_dst_config_file}"] if ssh_dst_config_file else [] 

855 

856 def run_subjobs( 

857 self, 

858 subjobs: dict[str, list[str]], 

859 max_workers: int, 

860 timeout_secs: float | None, 

861 work_period_seconds: float, 

862 jitter: bool, 

863 ) -> None: 

864 """Executes subjobs sequentially or in parallel, respecting barriers.""" 

865 self.stats = Stats() 

866 self.stats.jobs_all = len(subjobs) 

867 log = self.log 

868 num_intervals = 1 + len(subjobs) if jitter else len(subjobs) 

869 interval_nanos = 0 if len(subjobs) == 0 else round(1_000_000_000 * max(0.0, work_period_seconds) / num_intervals) 

870 assert interval_nanos >= 0 

871 if jitter: # randomize job start time to avoid potential thundering herd problems in large distributed systems 

872 sleep_nanos = random.randint(0, interval_nanos) # noqa: S311 

873 log.info("Jitter: Delaying job start time by sleeping for %s ...", human_readable_duration(sleep_nanos)) 

874 time.sleep(sleep_nanos / 1_000_000_000) # seconds 

875 sorted_subjobs = sorted(subjobs.keys()) 

876 has_barrier = any(BARRIER_CHAR in subjob.split("/") for subjob in sorted_subjobs) 

877 if self.spawn_process_per_job or has_barrier or bzfs.has_siblings(sorted_subjobs): # siblings can run in parallel 

878 log.log(LOG_TRACE, "%s", "spawn_process_per_job: True") 

879 process_datasets_in_parallel_and_fault_tolerant( 

880 log=log, 

881 datasets=sorted_subjobs, 

882 process_dataset=lambda subjob, tid, retry: self.run_subjob( 

883 subjobs[subjob], name=subjob, timeout_secs=timeout_secs, spawn_process_per_job=True 

884 ) 

885 == 0, 

886 skip_tree_on_error=lambda subjob: True, 

887 skip_on_error=SKIP_ON_ERROR_DEFAULT, 

888 max_workers=max_workers, 

889 interval_nanos=lambda subjob: interval_nanos, 

890 task_name="Subjob", 

891 retry_policy=None, # no retries 

892 dry_run=False, 

893 is_test_mode=self.is_test_mode, 

894 ) 

895 else: 

896 log.log(LOG_TRACE, "%s", "spawn_process_per_job: False") 

897 next_update_nanos = time.monotonic_ns() 

898 for subjob in sorted_subjobs: 

899 time.sleep(max(0, next_update_nanos - time.monotonic_ns()) / 1_000_000_000) # seconds 

900 next_update_nanos += interval_nanos 

901 s = subjob 

902 if not self.run_subjob(subjobs[subjob], name=s, timeout_secs=timeout_secs, spawn_process_per_job=False) == 0: 

903 break 

904 stats = self.stats 

905 jobs_skipped = stats.jobs_all - stats.jobs_started 

906 msg = f"{stats}, skipped:" + percent(jobs_skipped, total=stats.jobs_all) 

907 log.info("Final Progress: %s", msg) 

908 assert stats.jobs_running == 0, msg 

909 assert stats.jobs_completed == stats.jobs_started, msg 

910 skipped_jobs_dict = {subjob: subjobs[subjob] for subjob in sorted_subjobs if subjob not in stats.started_job_names} 

911 if len(skipped_jobs_dict) > 0: 

912 log.debug("Skipped subjobs: \n%s", _pretty_print_formatter(skipped_jobs_dict)) 

913 assert jobs_skipped == len(skipped_jobs_dict), msg 

914 

915 def run_subjob( 

916 self, cmd: list[str], name: str, timeout_secs: float | None, spawn_process_per_job: bool 

917 ) -> int | None: # thread-safe 

918 """Executes one worker job and updates shared Stats.""" 

919 start_time_nanos = time.monotonic_ns() 

920 returncode = None 

921 log = self.log 

922 cmd_str = " ".join(cmd) 

923 stats = self.stats 

924 try: 

925 with stats.lock: 

926 stats.jobs_started += 1 

927 stats.jobs_running += 1 

928 stats.started_job_names.add(name) 

929 msg = str(stats) 

930 log.log(LOG_TRACE, "Starting worker job: %s", cmd_str) 

931 log.info("Progress: %s", msg) 

932 start_time_nanos = time.monotonic_ns() 

933 if spawn_process_per_job: 

934 returncode = self.run_worker_job_spawn_process_per_job(cmd, timeout_secs) 

935 else: 

936 returncode = self.run_worker_job_in_current_thread(cmd, timeout_secs) 

937 except BaseException as e: 

938 log.error("Worker job failed with unexpected exception: %s for command: %s", e, cmd_str) 

939 raise 

940 else: 

941 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

942 elapsed_human = human_readable_duration(elapsed_nanos) 

943 if returncode != 0: 

944 with stats.lock: 

945 if self.first_exception is None: 

946 self.first_exception = DIE_STATUS if returncode is None else returncode 

947 log.error("Worker job failed with exit code %s in %s: %s", returncode, elapsed_human, cmd_str) 

948 else: 

949 log.debug("Worker job succeeded in %s: %s", elapsed_human, cmd_str) 

950 return returncode 

951 finally: 

952 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

953 with stats.lock: 

954 stats.jobs_running -= 1 

955 stats.jobs_completed += 1 

956 stats.sum_elapsed_nanos += elapsed_nanos 

957 stats.jobs_failed += 1 if returncode != 0 else 0 

958 msg = str(stats) 

959 assert stats.jobs_all >= 0, msg 

960 assert stats.jobs_started >= 0, msg 

961 assert stats.jobs_completed >= 0, msg 

962 assert stats.jobs_failed >= 0, msg 

963 assert stats.jobs_running >= 0, msg 

964 assert stats.sum_elapsed_nanos >= 0, msg 

965 assert stats.jobs_failed <= stats.jobs_completed, msg 

966 assert stats.jobs_completed <= stats.jobs_started, msg 

967 assert stats.jobs_started <= stats.jobs_all, msg 

968 log.info("Progress: %s", msg) 

969 

970 def run_worker_job_in_current_thread(self, cmd: list[str], timeout_secs: float | None) -> int | None: 

971 """Runs ``bzfs`` in-process and return its exit code.""" 

972 log = self.log 

973 if timeout_secs is not None: 

974 cmd = cmd[0:1] + [f"--timeout={round(1000 * timeout_secs)}milliseconds"] + cmd[1:] 

975 try: 

976 if not self.jobrunner_dryrun: 

977 self._bzfs_run_main(cmd) 

978 return 0 

979 except subprocess.CalledProcessError as e: 

980 return e.returncode 

981 except SystemExit as e: 

982 assert e.code is None or isinstance(e.code, int) 

983 return e.code 

984 except BaseException: 

985 log.exception("Worker job failed with unexpected exception for command: %s", " ".join(cmd)) 

986 return DIE_STATUS 

987 

988 def _bzfs_run_main(self, cmd: list[str]) -> None: 

989 """Delegate execution to :mod:`bzfs` using parsed arguments.""" 

990 bzfs_job = bzfs.Job() 

991 bzfs_job.is_test_mode = self.is_test_mode 

992 bzfs_job.run_main(self.bzfs_argument_parser.parse_args(cmd[1:]), cmd) 

993 

994 def run_worker_job_spawn_process_per_job(self, cmd: list[str], timeout_secs: float | None) -> int | None: 

995 """Spawns a subprocess for the worker job and waits for completion.""" 

996 log = self.log 

997 if len(cmd) > 0 and cmd[0] == BZFS_PROG_NAME: 

998 cmd = [sys.executable, "-m", "bzfs_main." + cmd[0]] + cmd[1:] 

999 if self.jobrunner_dryrun: 

1000 return 0 

1001 proc = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, text=True) # run job in a separate subprocess 

1002 try: 

1003 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit 

1004 except subprocess.TimeoutExpired: 

1005 cmd_str = " ".join(cmd) 

1006 log.error("%s", f"Terminating worker job as it failed to complete within {timeout_secs}s: {cmd_str}") 

1007 proc.terminate() # Sends SIGTERM signal to job subprocess 

1008 assert isinstance(timeout_secs, float) 

1009 timeout_secs = min(1.0, timeout_secs) 

1010 try: 

1011 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit 

1012 except subprocess.TimeoutExpired: 

1013 log.error("%s", f"Killing worker job as it failed to terminate within {timeout_secs}s: {cmd_str}") 

1014 proc.kill() # Sends SIGKILL signal to job subprocess because SIGTERM wasn't enough 

1015 timeout_secs = min(0.025, timeout_secs) 

1016 with contextlib.suppress(subprocess.TimeoutExpired): 

1017 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit 

1018 return proc.returncode 

1019 
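The terminate-then-kill escalation above also works in isolation; a minimal sketch of the same pattern, assuming proc is any already started subprocess.Popen:

from __future__ import annotations

import contextlib
import subprocess

def wait_or_escalate(proc: subprocess.Popen, timeout_secs: float) -> int | None:
    """Waits for proc; escalates from SIGTERM to SIGKILL if it does not exit in time."""
    try:
        proc.communicate(timeout=timeout_secs)
    except subprocess.TimeoutExpired:
        proc.terminate()  # polite SIGTERM first
        try:
            proc.communicate(timeout=min(1.0, timeout_secs))
        except subprocess.TimeoutExpired:
            proc.kill()  # SIGKILL because SIGTERM wasn't enough
            with contextlib.suppress(subprocess.TimeoutExpired):
                proc.communicate(timeout=0.025)
    return proc.returncode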

1020 def validate_src_hosts(self, src_hosts: list[str]) -> list[str]: 

1021 """Checks that ``src_hosts`` contains valid hostnames."""

1022 context = "--src-hosts" 

1023 self.validate_type(src_hosts, list, context) 

1024 for src_hostname in src_hosts: 

1025 self.validate_host_name(src_hostname, context) 

1026 return src_hosts 

1027 

1028 def validate_dst_hosts(self, dst_hosts: dict[str, list[str]]) -> dict[str, list[str]]: 

1029 """Checks destination hosts dictionary.""" 

1030 context = "--dst-hosts" 

1031 self.validate_type(dst_hosts, dict, context) 

1032 for dst_hostname, targets in dst_hosts.items(): 

1033 self.validate_host_name(dst_hostname, context) 

1034 self.validate_type(targets, list, f"{context} targets") 

1035 for target in targets: 

1036 self.validate_type(target, str, f"{context} target") 

1037 return dst_hosts 

1038 
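For orientation, a hypothetical value of the shape this validator accepts — destination hostnames mapped to lists of replication target names (all names are made up):

dst_hosts = {
    "backup-host-1": ["onsite"],
    "backup-host-2": ["onsite", "offsite"],
}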

1039 def validate_dst_root_datasets(self, dst_root_datasets: dict[str, str]) -> dict[str, str]: 

1040 """Checks that each destination root dataset string is valid.""" 

1041 context = "--dst-root-datasets" 

1042 self.validate_type(dst_root_datasets, dict, context) 

1043 for dst_hostname, dst_root_dataset in dst_root_datasets.items(): 

1044 self.validate_host_name(dst_hostname, context) 

1045 self.validate_type(dst_root_dataset, str, f"{context} root dataset") 

1046 return dst_root_datasets 

1047 

1048 def validate_snapshot_plan( 

1049 self, snapshot_plan: dict[str, dict[str, dict[str, int]]], context: str 

1050 ) -> dict[str, dict[str, dict[str, int]]]: 

1051 """Checks snapshot plan structure and value types.""" 

1052 self.validate_type(snapshot_plan, dict, context) 

1053 for org, target_periods in snapshot_plan.items(): 

1054 self.validate_type(org, str, f"{context} org") 

1055 self.validate_type(target_periods, dict, f"{context} target_periods") 

1056 for target, periods in target_periods.items(): 

1057 self.validate_type(target, str, f"{context} org/target") 

1058 self.validate_type(periods, dict, f"{context} org/periods") 

1059 for period_unit, period_amount in periods.items(): 

1060 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit") 

1061 self.validate_non_negative_int(period_amount, f"{context} org/target/period_amount") 

1062 return snapshot_plan 

1063 
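A hypothetical plan that would pass these structural checks; the validator only requires the shape {org: {target: {period_unit: period_amount}}} with non-empty period-unit strings and non-negative integer amounts, and all names below are made up:

snapshot_plan = {
    "prod": {
        "onsite": {"hourly": 36, "daily": 31},
        "offsite": {"daily": 7},
    },
}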

1064 def validate_monitor_snapshot_plan( 

1065 self, monitor_snapshot_plan: dict[str, dict[str, dict[str, dict[str, str | int]]]] 

1066 ) -> dict[str, dict[str, dict[str, dict[str, str | int]]]]: 

1067 """Checks snapshot monitoring plan configuration.""" 

1068 context = "--monitor-snapshot-plan" 

1069 self.validate_type(monitor_snapshot_plan, dict, context) 

1070 for org, target_periods in monitor_snapshot_plan.items(): 

1071 self.validate_type(org, str, f"{context} org") 

1072 self.validate_type(target_periods, dict, f"{context} target_periods") 

1073 for target, periods in target_periods.items(): 

1074 self.validate_type(target, str, f"{context} org/target") 

1075 self.validate_type(periods, dict, f"{context} org/periods") 

1076 for period_unit, alert_dict in periods.items(): 

1077 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit") 

1078 self.validate_type(alert_dict, dict, f"{context} org/target/alert_dict") 

1079 for key, value in alert_dict.items(): 

1080 self.validate_non_empty_string(key, f"{context} org/target/alert_dict/key") 

1081 self.validate_type(value, Union[str, int], f"{context} org/target/alert_dict/value") 

1082 return monitor_snapshot_plan 

1083 
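Likewise, a hypothetical value matching the shape {org: {target: {period_unit: {key: str|int}}}} that this validator accepts; the alert keys and values below are placeholders that merely satisfy the type checks, not necessarily a realistic monitoring policy:

monitor_snapshot_plan = {
    "prod": {
        "onsite": {
            "daily": {"warning": "8 hours", "critical": 2},
        },
    },
}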

1084 def validate_is_subset(self, x: Iterable[str], y: Iterable[str], x_name: str, y_name: str) -> None: 

1085 """Raises error if ``x`` contains an item not present in ``y``.""" 

1086 if isinstance(x, str) or not isinstance(x, Iterable): 

1087 self.die(f"{x_name} must be an Iterable") 

1088 if isinstance(y, str) or not isinstance(y, Iterable): 

1089 self.die(f"{y_name} must be an Iterable") 

1090 if not set(x).issubset(set(y)): 

1091 diff = sorted(set(x).difference(set(y))) 

1092 self.die(f"{x_name} must be a subset of {y_name}. diff: {diff}, {x_name}: {sorted(x)}, {y_name}: {sorted(y)}") 

1093 

1094 def validate_host_name(self, hostname: str, context: str) -> None: 

1095 """Checks host name string.""" 

1096 self.validate_non_empty_string(hostname, f"{context} hostname") 

1097 bzfs.validate_host_name(hostname, context) 

1098 

1099 def validate_non_empty_string(self, value: str, name: str) -> None: 

1100 """Checks that ``value`` is a non-empty string.""" 

1101 self.validate_type(value, str, name) 

1102 if not value: 

1103 self.die(f"{name} must not be empty!") 

1104 

1105 def validate_non_negative_int(self, value: int, name: str) -> None: 

1106 """Checks ``value`` is an int >= 0.""" 

1107 self.validate_type(value, int, name) 

1108 if value < 0: 

1109 self.die(f"{name} must be a non-negative integer: {value}") 

1110 

1111 def validate_true(self, expr: Any, msg: str) -> None: 

1112 """Raises error if ``expr`` evaluates to ``False``.""" 

1113 if not bool(expr): 

1114 self.die(msg) 

1115 

1116 def validate_type(self, value: Any, expected_type: Any, name: str) -> None: 

1117 """Checks that ``value`` is an instance of ``expected_type`` or a union thereof."""

1118 if hasattr(expected_type, "__origin__") and expected_type.__origin__ is Union: # for compat with python < 3.10 

1119 union_types = expected_type.__args__ 

1120 for t in union_types: 

1121 if isinstance(value, t): 

1122 return 

1123 type_msg = " or ".join([t.__name__ for t in union_types]) 

1124 self.die(f"{name} must be of type {type_msg} but got {type(value).__name__}: {value}") 

1125 elif not isinstance(value, expected_type): 

1126 self.die(f"{name} must be of type {expected_type.__name__} but got {type(value).__name__}: {value}") 

1127 
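The Union branch above relies on pre-3.10 typing introspection; the same idea in a standalone check:

from typing import Union

expected_type = Union[str, int]
assert getattr(expected_type, "__origin__", None) is Union
assert expected_type.__args__ == (str, int)
assert isinstance(5, expected_type.__args__)       # isinstance accepts the tuple of union members
assert not isinstance(5.0, expected_type.__args__)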

1128 def die(self, msg: str) -> None: 

1129 """Logs ``msg`` and exits the program."""

1130 self.log.error("%s", msg) 

1131 bzfs_main.utils.die(msg) 

1132 

1133 def get_localhost_ips(self) -> set[str]: 

1134 """Returns all network addresses of the local host, i.e. all configured addresses on all network interfaces, without 

1135 depending on name resolution.""" 

1136 if sys.platform == "linux": 

1137 try: 

1138 proc = subprocess.run(["hostname", "-I"], stdin=DEVNULL, stdout=PIPE, text=True, check=True) # noqa: S607 

1139 except Exception as e: 

1140 self.log.warning("Cannot run 'hostname -I' on localhost: %s", e) 

1141 else: 

1142 return {ip for ip in proc.stdout.strip().split() if ip} 

1143 return set() 

1144 
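For illustration, `hostname -I` prints a space-separated address list (addresses below are from documentation ranges, i.e. made up), which the set comprehension above reduces to a set:

sample_stdout = "192.0.2.10 203.0.113.7 fe80::1 \n"
ips = {ip for ip in sample_stdout.strip().split() if ip}
assert ips == {"192.0.2.10", "203.0.113.7", "fe80::1"}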

1145 

1146############################################################################# 

1147class Stats: 

1148 """Thread-safe counters summarizing subjob progress.""" 

1149 

1150 def __init__(self) -> None: 

1151 self.lock: threading.Lock = threading.Lock() 

1152 self.jobs_all: int = 0 

1153 self.jobs_started: int = 0 

1154 self.jobs_completed: int = 0 

1155 self.jobs_failed: int = 0 

1156 self.jobs_running: int = 0 

1157 self.sum_elapsed_nanos: int = 0 

1158 self.started_job_names: set[str] = set() 

1159 

1160 def __repr__(self) -> str: 

1161 

1162 def pct(number: int) -> str: 

1163 """Returns percentage string relative to total jobs.""" 

1164 return percent(number, total=self.jobs_all) 

1165 

1166 al, started, completed, failed = self.jobs_all, self.jobs_started, self.jobs_completed, self.jobs_failed 

1167 running = self.jobs_running 

1168 t = "avg_completion_time:" + human_readable_duration(self.sum_elapsed_nanos / max(1, completed)) 

1169 return f"all:{al}, started:{pct(started)}, completed:{pct(completed)}, failed:{pct(failed)}, running:{running}, {t}" 

1170 
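A quick illustration of the progress string produced by __repr__ (the numbers are made up; the exact percentage formatting comes from percent()):

stats = Stats()
stats.jobs_all = 4
stats.jobs_started = 3
stats.jobs_completed = 2
stats.jobs_failed = 1
stats.jobs_running = 1
stats.sum_elapsed_nanos = 3_000_000_000  # 3 seconds total across the 2 completed jobs
print(stats)  # something like: all:4, started:..., completed:..., failed:..., running:1, avg_completion_time:1.5s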

1171 

1172############################################################################# 

1173class RejectArgumentAction(argparse.Action): 

1174 """An argparse Action that immediately fails if it is ever triggered.""" 

1175 

1176 def __call__( 

1177 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

1178 ) -> None: 

1179 """Abort argument parsing if a protected option is seen.""" 

1180 parser.error(f"Security: Overriding protected argument '{option_string}' is not allowed.") 

1181 

1182 

1183############################################################################# 

1184def _dedupe(root_dataset_pairs: list[tuple[str, str]]) -> list[tuple[str, str]]: 

1185 """Returns a list with duplicate dataset pairs removed while preserving order.""" 

1186 return list(dict.fromkeys(root_dataset_pairs).keys()) 

1187 
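Because dict preserves insertion order (guaranteed since Python 3.7), dict.fromkeys is a stable order-preserving dedupe; dataset names below are made up:

pairs = [("tank/src", "backup/dst"), ("tank/src2", "backup/dst2"), ("tank/src", "backup/dst")]
assert _dedupe(pairs) == [("tank/src", "backup/dst"), ("tank/src2", "backup/dst2")]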

1188 

1189T = TypeVar("T") 

1190 

1191 

1192def _flatten(root_dataset_pairs: Iterable[Iterable[T]]) -> list[T]: 

1193 """Flattens an iterable of pairs into a single list.""" 

1194 return [item for pair in root_dataset_pairs for item in pair] 

1195 

1196 

1197def _sanitize(filename: str) -> str: 

1198 """Replaces potentially problematic characters in ``filename`` with '!'.""" 

1199 for s in (" ", "..", "/", "\\", SEP): 

1200 filename = filename.replace(s, "!") 

1201 return filename 

1202 

1203 

1204def _log_suffix(localhostname: str, src_hostname: str, dst_hostname: str) -> str: 

1205 """Returns a log file suffix in a format that contains the given hostnames.""" 

1206 sanitized_dst_hostname = _sanitize(dst_hostname) if dst_hostname else "" 

1207 return f"{SEP}{_sanitize(localhostname)}{SEP}{_sanitize(src_hostname)}{SEP}{sanitized_dst_hostname}" 

1208 
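For example (hostnames made up), spaces and separators in hostnames are replaced so the suffix can later be split unambiguously on the comma separator:

assert _sanitize("host a") == "host!a"
assert _log_suffix("nas", "host a", "") == ",nas,host!a,"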

1209 

1210def _pretty_print_formatter(dictionary: dict[str, Any]) -> Any: 

1211 """Lazy JSON formatter used to avoid overhead in disabled log levels.""" 

1212 

1213 class PrettyPrintFormatter: 

1214 """Wrapper returning formatted JSON on ``str`` conversion.""" 

1215 

1216 def __str__(self) -> str: 

1217 import json 

1218 

1219 return json.dumps(dictionary, indent=4, sort_keys=True) 

1220 

1221 return PrettyPrintFormatter() 

1222 
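The point of returning a wrapper object is that json.dumps runs only if the log record is actually emitted; a hedged usage sketch with the standard logging module:

import logging

log = logging.getLogger(__name__)
config = {"b": 2, "a": 1}  # arbitrary example dict
# json.dumps is deferred until a handler stringifies the argument,
# i.e. it never runs when the DEBUG level is disabled:
log.debug("Effective config: %s", _pretty_print_formatter(config))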

1223 

1224def _detect_loopback_address() -> str: 

1225 """Detects whether a loopback connection over IPv4 or IPv6 is possible; returns the usable loopback address, or an empty string."""

1226 try: 

1227 addr = "127.0.0.1" 

1228 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 

1229 s.bind((addr, 0)) 

1230 return addr 

1231 except OSError: 

1232 pass 

1233 

1234 try: 

1235 addr = "::1" 

1236 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: 

1237 s.bind((addr, 0)) 

1238 return addr 

1239 except OSError: 

1240 pass 

1241 

1242 return "" 

1243 

1244 

1245def convert_ipv6(hostname: str) -> str: 

1246 """Supports IPv6 without getting confused by host:dataset colon separator and any colons that may be part of a (valid) 

1247 ZFS dataset name.""" 

1248 return hostname.replace(":", "|") # Also see bzfs.convert_ipv6() for the reverse conversion 

1249 
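For example (address made up), the replacement keeps a later host:dataset split unambiguous, and non-IPv6 hostnames pass through unchanged:

assert convert_ipv6("fe80::1") == "fe80||1"
assert convert_ipv6("myhost.example.com") == "myhost.example.com"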

1250 

1251############################################################################# 

1252 if __name__ == "__main__":    1252 ↛ 1253   line 1252 didn't jump to line 1253 because the condition on line 1252 was never true

1253 main()