Coverage for bzfs_main/bzfs_jobrunner.py: 99%

588 statements  

coverage.py v7.9.1, created at 2025-06-20 13:09 +0000

1# 

2# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata 

17# /// script 

18# requires-python = ">=3.8" 

19# dependencies = [] 

20# /// 

21 

22""" 

23* README_bzfs_jobrunner.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via 

24update_readme.sh. Simply run that script whenever you change or add ArgumentParser help text. 

25""" 

26 

27import argparse 

28import contextlib 

29import os 

30import pwd 

31import random 

32import socket 

33import subprocess 

34import sys 

35import threading 

36import time 

37import uuid 

38from ast import literal_eval 

39from logging import Logger 

40from subprocess import DEVNULL, PIPE 

41from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union 

42 

43from bzfs_main import bzfs 

44from bzfs_main.bzfs import die_status, log_trace, prog_name as bzfs_prog_name 

45 

46# constants: 

47prog_name = "bzfs_jobrunner" 

48src_magic_substitution_token = "^SRC_HOST" 

49dst_magic_substitution_token = "^DST_HOST" 

50sep = "," 

51 

52 

53def argument_parser() -> argparse.ArgumentParser: 

54 # fmt: off 

55 parser = argparse.ArgumentParser( 

56 prog=prog_name, 

57 allow_abbrev=False, 

58 formatter_class=argparse.RawTextHelpFormatter, 

59 description=f""" 

60This program is a convenience wrapper around [bzfs](README.md) that simplifies periodic ZFS snapshot creation, replication, 

61pruning, and monitoring, across N source hosts and M destination hosts, using a single shared 

62[jobconfig](bzfs_tests/bzfs_job_example.py) script. 

63For example, this simplifies the deployment of an efficient geo-replicated backup service where each of the M destination 

64hosts is located in a separate geographic region and receives replicas from (the same set of) N source hosts. It also 

65simplifies low-latency replication from a primary to a secondary or to M read replicas, or backup to removable drives, etc. 

66 

67This program can be used to efficiently replicate ... 

68 

69a) within a single machine (local mode), or 

70 

71b) from a single source host to one or more destination hosts (pull or push or pull-push mode), or 

72 

73c) from multiple source hosts to a single destination host (pull or push or pull-push mode), or 

74 

75d) from N source hosts to M destination hosts (pull or push or pull-push mode, N and M can be large, M=2 or M=3 are typical 

76geo-replication factors) 

77 

78You can run this program on a single third-party host and have that talk to all source hosts and destination hosts, which is 

79convenient for basic use cases and for testing. 

80However, typically, a cron job on each source host runs `{prog_name}` periodically to create new snapshots (via 

81--create-src-snapshots) and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and 

82--prune-src-bookmarks), whereas another cron job on each destination host runs `{prog_name}` periodically to prune 

83outdated destination snapshots (via --prune-dst-snapshots), and to replicate the recently created snapshots from the source 

84to the destination (via --replicate). 

85Yet another cron job on each source and each destination runs `{prog_name}` periodically to alert the user if the latest or 

86oldest snapshot is somehow too old (via --monitor-src-snapshots and --monitor-dst-snapshots). 

87The frequency of these periodic activities can vary by activity, and is typically every second, minute, hour, day, week, 

88month and/or year (or multiples thereof). 

89 

90Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto all 

91source hosts and all destination hosts, and add crontab entries (or systemd timers or Monit entries or similar), along these 

92lines: 

93 

94* crontab on source hosts: 

95 

96`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks` 

97 

98`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --monitor-src-snapshots` 

99 

100 

101* crontab on destination hosts: 

102 

103`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --replicate --prune-dst-snapshots` 

104 

105`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --monitor-dst-snapshots` 

106 

107### High Frequency Replication (Experimental Feature) 

108 

109Taking snapshots, and/or replicating, from every N milliseconds to every 10 seconds or so is considered high frequency. For 

110such use cases, consider that `zfs list -t snapshot` performance degrades as the number of snapshots within the 

111selected datasets grows, so keep the number of existing snapshots small, and prune them at a frequency that 

112is proportional to the frequency with which snapshots are created. Consider using `--skip-parent` and `--exclude-dataset*` 

113filters to limit the selected datasets only to those that require this level of frequency. 

114 

115In addition, use the `--daemon-*` options to reduce startup overhead, in combination with splitting the crontab entry (or 

116better: a high-frequency systemd timer) into multiple processes, from a single source host to a single destination host, 

117along these lines: 

118 

119* crontab on source hosts: 

120 

121`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --create-src-snapshots` 

122 

123`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-snapshots` 

124 

125`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-bookmarks` 

126 

127`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --monitor-src-snapshots` 

128 

129 

130* crontab on destination hosts: 

131 

132`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --replicate` 

133 

134`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --prune-dst-snapshots` 

135 

136`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --monitor-dst-snapshots` 

137 

138The daemon processes work like non-daemon processes except that they loop, handle time events and sleep between events, and 

139finally exit after, say, 86400 seconds (whatever you specify via `--daemon-lifetime`). The daemons will subsequently be 

140auto-restarted by 'cron', or earlier if they fail. While the daemons are running, 'cron' will attempt to start new 

141(unnecessary) daemons but this is benign as these new processes immediately exit with a message like this: 

142"Exiting as same previous periodic job is still running without completion yet" 

143""") # noqa: E501 

144 

145 # commands: 

146 parser.add_argument( 

147 "--create-src-snapshots", action="store_true", 

148 help="Take snapshots on the selected source hosts as necessary. Typically, this command should be called by a " 

149 "program (or cron job) running on each src host.\n\n") 

150 parser.add_argument( # `choices` are deprecated; use --replicate without argument instead 

151 "--replicate", choices=["pull", "push"], default=None, const="pull", nargs="?", metavar="", 

152 help="Replicate snapshots from the selected source hosts to the selected destinations hosts as necessary. For pull " 

153 "mode (recommended), this command should be called by a program (or cron job) running on each dst " 

154 "host; for push mode, on the src host; for pull-push mode on a third-party host.\n\n") 

155 parser.add_argument( 

156 "--prune-src-snapshots", action="store_true", 

157 help="Prune snapshots on the selected source hosts as necessary. Typically, this command should be called by a " 

158 "program (or cron job) running on each src host.\n\n") 

159 parser.add_argument( 

160 "--prune-src-bookmarks", action="store_true", 

161 help="Prune bookmarks on the selected source hosts as necessary. Typically, this command should be called by a " 

162 "program (or cron job) running on each src host.\n\n") 

163 parser.add_argument( 

164 "--prune-dst-snapshots", action="store_true", 

165 help="Prune snapshots on the selected destination hosts as necessary. Typically, this command should be called by a " 

166 "program (or cron job) running on each dst host.\n\n") 

167 parser.add_argument( 

168 "--monitor-src-snapshots", action="store_true", 

169 help="Alert the user if snapshots on the selected source hosts are too old, using --monitor-snapshot-plan (see " 

170 "below). Typically, this command should be called by a program (or cron job) running on each src host.\n\n") 

171 parser.add_argument( 

172 "--monitor-dst-snapshots", action="store_true", 

173 help="Alert the user if snapshots on the selected destination hosts are too old, using --monitor-snapshot-plan (see " 

174 "below). Typically, this command should be called by a program (or cron job) running on each dst host.\n\n") 

175 

176 # options: 

177 parser.add_argument( 

178 "--localhost", default=None, action=bzfs.NonEmptyStringAction, metavar="STRING", 

179 help="Hostname of localhost. Default is the hostname without the domain name, querying the Operating System.\n\n") 

180 parser.add_argument( 

181 "--src-hosts", default=None, metavar="LIST_STRING", 

182 help="Hostnames of the sources to operate on.\n\n") 

183 parser.add_argument( 

184 "--src-host", default=None, action="append", metavar="STRING", 

185 help="For subsetting --src-hosts; Can be specified multiple times; Indicates to only use the --src-hosts that are " 

186 "contained in the specified --src-host values (optional).\n\n") 

187 dst_hosts_example = {"nas": ["onsite"], "bak-us-west-1": ["us-west-1"], 

188 "bak-eu-west-1": ["eu-west-1"], "archive": ["offsite"]} 

189 parser.add_argument( 

190 "--dst-hosts", default="{}", metavar="DICT_STRING", 

191 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names " 

192 "(the infix portion of snapshot name). " 

193 f"Example: `{format_dict(dst_hosts_example)}`.\n\n" 

194 "With this, given a snapshot name, we can find the destination hostnames to which the snapshot shall be " 

195 "replicated. Also, given a snapshot name and its own name, a destination host can determine if it shall " 

196 "replicate the given snapshot from the source host, or if the snapshot is intended for another destination " 

197 "host, in which case it skips the snapshot. A destination host will receive replicas of snapshots for all " 

198 "targets that map to that destination host.\n\n" 

199 "Removing a mapping can be used to temporarily suspend replication to a given destination host.\n\n") 

200 parser.add_argument( 

201 "--dst-host", default=None, action="append", metavar="STRING", 

202 help="For subsetting --dst-hosts; Can be specified multiple times; Indicates to only use the --dst-hosts keys that " 

203 "are contained in the specified --dst-host values (optional).\n\n") 

204 parser.add_argument( 

205 "--retain-dst-targets", default="{}", metavar="DICT_STRING", 

206 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names " 

207 "(the infix portion of snapshot name). " 

208 f"Example: `{format_dict(dst_hosts_example)}`. Has same format as --dst-hosts.\n\n" 

209 "As part of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has " 

210 "no mapping to that destination host in this dictionary. Do not remove a mapping here unless you are sure it's " 

211 "ok to delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n") 

212 dst_root_datasets_example = { 

213 "nas": "tank2/bak", 

214 "bak-us-west-1": "backups/bak001", 

215 "bak-eu-west-1": "backups/bak999", 

216 "archive": f"archives/zoo/{src_magic_substitution_token}", 

217 "hotspare": "", 

218 } 

219 parser.add_argument( 

220 "--dst-root-datasets", default="{}", metavar="DICT_STRING", 

221 help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root " 

222 "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that " 

223 "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, " 

224 "whereas for cloning, master slave replication, or replication from a primary to a secondary, this can also be " 

225 "the empty string.\n\n" 

226 f"`{src_magic_substitution_token}` and `{dst_magic_substitution_token}` are optional magic substitution tokens " 

227 "that will be auto-replaced at runtime with the actual hostname. This can be used to force the use of a " 

228 "separate destination root dataset per source host or per destination host.\n\n" 

229 f"Example: `{format_dict(dst_root_datasets_example)}`\n\n") 

230 src_snapshot_plan_example = { 

231 "prod": { 

232 "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5}, 

233 "us-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5}, 

234 "eu-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5}, 

235 }, 

236 "test": { 

237 "offsite": {"12hourly": 42, "weekly": 12}, 

238 }, 

239 } 

240 parser.add_argument( 

241 "--src-snapshot-plan", default="{}", metavar="DICT_STRING", 

242 help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. " 

243 "Snapshots that do not match a retention period will be deleted. A zero within a retention period indicates " 

244 "that no snapshots shall be retained (or even be created) for the given period.\n\n" 

245 f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and " 

246 "the intended logical target 'onsite', create and then retain secondly snapshots that were created less " 

247 "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analog for " 

248 "the latest 40 minutely snapshots, 36 hourly snapshots, etc. " 

249 "It will also create and retain snapshots for the targets 'us-west-1' and 'eu-west-1' within the 'prod' " 

250 "organization. " 

251 "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, " 

252 "and name them as being intended for the 'offsite' replication target. " 

253 "The example creates snapshots with names like " 

254 "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, " 

255 "`prod_us-west-1_<timestamp>_hourly`, `prod_us-west-1_<timestamp>_daily`, " 

256 "`prod_eu-west-1_<timestamp>_hourly`, `prod_eu-west-1_<timestamp>_daily`, " 

257 "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n") 

258 parser.add_argument( 

259 "--src-bookmark-plan", default="{}", metavar="DICT_STRING", 

260 help="Retention periods for bookmarks to be used if pruning src. Has same format as --src-snapshot-plan.\n\n") 

261 parser.add_argument( 

262 "--dst-snapshot-plan", default="{}", metavar="DICT_STRING", 

263 help="Retention periods for snapshots to be used if pruning dst. Has same format as --src-snapshot-plan.\n\n") 

264 monitor_snapshot_plan_example = { 

265 "prod": { 

266 "onsite": { 

267 "100millisecondly": {"warning": "650 milliseconds", "critical": "2 seconds"}, 

268 "secondly": {"warning": "2 seconds", "critical": "14 seconds"}, 

269 "minutely": {"warning": "30 seconds", "critical": "300 seconds"}, 

270 "hourly": {"warning": "30 minutes", "critical": "300 minutes"}, 

271 "daily": {"warning": "4 hours", "critical": "8 hours"}, 

272 "weekly": {"warning": "2 days", "critical": "8 days"}, 

273 "monthly": {"warning": "2 days", "critical": "8 days"}, 

274 "yearly": {"warning": "5 days", "critical": "14 days"}, 

275 "10minutely": {"warning": "0 minutes", "critical": "0 minutes"}, 

276 }, 

277 "": { 

278 "daily": {"warning": "4 hours", "critical": "8 hours"}, 

279 }, 

280 }, 

281 } 

282 parser.add_argument( 

283 "--monitor-snapshot-plan", default="{}", metavar="DICT_STRING", 

284 help="Alert the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified " 

285 "snapshot pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to " 

286 "check if snapshots are successfully taken on schedule, successfully replicated on schedule, and successfully " 

287 "pruned on schedule. " 

288 "Process exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively. " 

289 f"Example DICT_STRING: `{format_dict(monitor_snapshot_plan_example)}`. " 

290 "This example alerts the user if the latest src or dst snapshot named `prod_onsite_<timestamp>_hourly` is more " 

291 "than 30 minutes late (i.e. more than 30+60=90 minutes old) [warning] or more than 300 minutes late (i.e. more " 

292 "than 300+60=360 minutes old) [critical]. In addition, the example alerts the user if the oldest src or dst " 

293 "snapshot named `prod_onsite_<timestamp>_hourly` is more than 30 + 60x36 minutes old [warning] or more than " 

294 "300 + 60x36 minutes old [critical], where 36 is the number of period cycles specified in `src_snapshot_plan` " 

295 "or `dst_snapshot_plan`, respectively. " 

296 "Analog for the latest snapshot named `prod_<timestamp>_daily`, and so on.\n\n" 

297 "Note: A duration that is missing or zero (e.g. '0 minutes') indicates that no snapshots shall be checked for " 

298 "the given snapshot name pattern.\n\n") 

299 parser.add_argument( 

300 "--src-user", default="", metavar="STRING", 

301 help="SSH username on src hosts. Used in pull mode and pull-push mode. Example: 'alice'\n\n") 

302 parser.add_argument( 

303 "--dst-user", default="", metavar="STRING", 

304 help="SSH username on dst hosts. Used in push mode and pull-push mode. Example: 'root'\n\n") 

305 parser.add_argument( 

306 "--job-id", required=True, action=bzfs.NonEmptyStringAction, metavar="STRING", 

307 help="The identifier that remains constant across all runs of this particular job; will be included in the log file " 

308 "name infix. Example: mytestjob\n\n") 

309 parser.add_argument( 

310 "--jobid", default=None, action=bzfs.NonEmptyStringAction, metavar="STRING", 

311 help=argparse.SUPPRESS) # deprecated; was renamed to --job-run 

312 parser.add_argument( 

313 "--job-run", default=None, action=bzfs.NonEmptyStringAction, metavar="STRING", 

314 help="The identifier of this particular run of the overall job; will be included in the log file name suffix. " 

315 "Default is a hex UUID. Example: 0badc0f003a011f0a94aef02ac16083c\n\n") 

316 workers_default = 100 # percent 

317 parser.add_argument( 

318 "--workers", min=1, default=(workers_default, True), action=bzfs.CheckPercentRange, metavar="INT[%]", 

319 help="The maximum number of jobs to run in parallel at any time; can be given as a positive integer, " 

320 f"optionally followed by the %% percent character (min: %(min)s, default: {workers_default}%%). Percentages " 

321 "are relative to the number of CPU cores on the machine. Example: 200%% uses twice as many parallel jobs as " 

322 "there are cores on the machine; 75%% uses num_procs = num_cores * 0.75. Examples: 1, 4, 75%%, 150%%\n\n") 

323 parser.add_argument( 

324 "--work-period-seconds", type=float, min=0, default=0, action=bzfs.CheckRange, metavar="FLOAT", 

325 help="Reduces bandwidth spikes by spreading out the start of worker jobs over this much time; " 

326 "0 disables this feature (default: %(default)s). Examples: 0, 60, 86400\n\n") 

327 parser.add_argument( 

328 "--jitter", action="store_true", 

329 help="Randomize job start time and host order to avoid potential thundering herd problems in large distributed " 

330 "systems (optional). Randomizing job start time is only relevant if --work-period-seconds > 0.\n\n") 

331 parser.add_argument( 

332 "--worker-timeout-seconds", type=float, min=0, default=None, action=bzfs.CheckRange, metavar="FLOAT", 

333 help="If this much time has passed after a worker process has started executing, kill the straggling worker " 

334 "(optional). Other workers remain unaffected. Examples: 60, 3600\n\n") 

335 parser.add_argument( 

336 "--spawn_process_per_job", action="store_true", 

337 help=argparse.SUPPRESS) 

338 parser.add_argument( 

339 "--jobrunner-dryrun", action="store_true", 

340 help="Do a dry run (aka 'no-op') to print what operations would happen if the command were to be executed " 

341 "for real (optional). This option treats both the ZFS source and destination as read-only. Can also be used to " 

342 "check if the configuration options are valid.\n\n") 

343 parser.add_argument( 

344 "--jobrunner-log-level", choices=["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"], default="INFO", 

345 help="Only emit jobrunner messages with equal or higher priority than this log level. Default is '%(default)s'.\n\n") 

346 parser.add_argument( 

347 "--version", action="version", version=f"{prog_name}-{bzfs.__version__}, by {bzfs.prog_author}", 

348 help="Display version information and exit.\n\n") 

349 parser.add_argument( 

350 "--help, -h", action="help", 

351 help="Show this help message and exit.\n\n") 

352 parser.add_argument( 

353 "--daemon-replication-frequency", default="minutely", metavar="STRING", 

354 help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n") 

355 parser.add_argument( 

356 "--daemon-prune-src-frequency", default="minutely", metavar="STRING", 

357 help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n") 

358 parser.add_argument( 

359 "--daemon-prune-dst-frequency", default="minutely", metavar="STRING", 

360 help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n") 

361 parser.add_argument( 

362 "--daemon-monitor-snapshots-frequency", default="minutely", metavar="STRING", 

363 help="Specifies how often the bzfs daemon shall monitor snapshot age if --daemon-lifetime is nonzero.\n\n") 

364 parser.add_argument( 

365 "--root-dataset-pairs", required=True, nargs="+", action=bzfs.DatasetPairsAction, metavar="SRC_DATASET DST_DATASET", 

366 help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be " 

367 "auto-appended later).\n\n") 

368 return parser 

369 # fmt: on 

370 

371 

372############################################################################# 

373def main() -> None: 

374 Job().run_main(sys.argv) 

375 

376 

377############################################################################# 

378class Job: 

379 def __init__(self, log: Optional[Logger] = None) -> None: 

380 # immutable variables: 

381 self.jobrunner_dryrun: bool = False 

382 self.spawn_process_per_job: bool = False 

383 self.log: Logger = log if log is not None else bzfs.get_simple_logger(prog_name) 

384 self.bzfs_argument_parser: argparse.ArgumentParser = bzfs.argument_parser() 

385 self.argument_parser: argparse.ArgumentParser = argument_parser() 

386 self.loopback_address: str = convert_ipv6(detect_loopback_address()) 

387 

388 # mutable variables: 

389 self.first_exception: Optional[int] = None 

390 self.stats: Stats = Stats() 

391 self.cache_existing_dst_pools: Set[str] = set() 

392 self.cache_known_dst_pools: Set[str] = set() 

393 

394 self.is_test_mode: bool = False # for testing only 

395 

396 def run_main(self, sys_argv: List[str]) -> None: 

397 self.first_exception = None 

398 log = self.log 

399 log.info("CLI arguments: %s", " ".join(sys_argv)) 

400 args, unknown_args = self.argument_parser.parse_known_args(sys_argv[1:]) # forward all unknown args to `bzfs` 

401 log.setLevel(args.jobrunner_log_level) 

402 self.jobrunner_dryrun = args.jobrunner_dryrun 

403 assert len(args.root_dataset_pairs) > 0 

404 src_snapshot_plan = self.validate_snapshot_plan(literal_eval(args.src_snapshot_plan), "--src-snapshot-plan") 

405 src_bookmark_plan = self.validate_snapshot_plan(literal_eval(args.src_bookmark_plan), "--src-bookmark-plan") 

406 dst_snapshot_plan = self.validate_snapshot_plan(literal_eval(args.dst_snapshot_plan), "--dst-snapshot-plan") 

407 monitor_snapshot_plan = self.validate_monitor_snapshot_plan(literal_eval(args.monitor_snapshot_plan)) 

408 localhostname = args.localhost if args.localhost else socket.gethostname() 

409 self.validate_non_empty_string(localhostname, "--localhost") 

410 log.debug("localhostname: %s", localhostname) 

411 src_hosts = self.validate_src_hosts(literal_eval(args.src_hosts if args.src_hosts is not None else sys.stdin.read())) 

412 basis_src_hosts = src_hosts 

413 nb_src_hosts = len(basis_src_hosts) 

414 log.debug("src_hosts before subsetting: %s", src_hosts) 

415 if args.src_host is not None: # retain only the src hosts that are also contained in args.src_host 

416 assert isinstance(args.src_host, list) 

417 retain_src_hosts = set(args.src_host) 

418 self.validate_is_subset(retain_src_hosts, src_hosts, "--src-host", "--src-hosts") 

419 src_hosts = [host for host in src_hosts if host in retain_src_hosts] 

420 dst_hosts = self.validate_dst_hosts(literal_eval(args.dst_hosts)) 

421 nb_dst_hosts = len(dst_hosts) 

422 if args.dst_host is not None: # retain only the dst hosts that are also contained in args.dst_host 

423 assert isinstance(args.dst_host, list) 

424 retain_dst_hosts = set(args.dst_host) 

425 self.validate_is_subset(retain_dst_hosts, dst_hosts.keys(), "--dst-host", "--dst-hosts.keys") 

426 dst_hosts = {dst_host: lst for dst_host, lst in dst_hosts.items() if dst_host in retain_dst_hosts} 

427 retain_dst_targets = self.validate_dst_hosts(literal_eval(args.retain_dst_targets)) 

428 self.validate_is_subset(dst_hosts.keys(), retain_dst_targets.keys(), "--dst-hosts.keys", "--retain-dst-targets.keys") 

429 dst_root_datasets = self.validate_dst_root_datasets(literal_eval(args.dst_root_datasets)) 

430 self.validate_is_subset( 

431 dst_root_datasets.keys(), retain_dst_targets.keys(), "--dst-root-dataset.keys", "--retain-dst-targets.keys" 

432 ) 

433 self.validate_is_subset(dst_hosts.keys(), dst_root_datasets.keys(), "--dst-hosts.keys", "--dst-root-dataset.keys") 

434 bad_root_datasets = { 

435 dst_host: root_dataset 

436 for dst_host in sorted(dst_hosts.keys()) 

437 if src_magic_substitution_token not in (root_dataset := dst_root_datasets[dst_host]) 

438 } 

439 if len(src_hosts) > 1 and len(bad_root_datasets) > 0: 

440 self.die( 

441 "Cowardly refusing to proceed as multiple source hosts must not be configured to write to the same " 

442 "destination dataset. " 

443 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(src_hosts)}" 

444 ) 

445 bad_root_datasets = { 

446 dst_host: root_dataset 

447 for dst_host, root_dataset in sorted(dst_root_datasets.items()) 

448 if root_dataset and src_magic_substitution_token not in root_dataset 

449 } 

450 if len(basis_src_hosts) > 1 and len(bad_root_datasets) > 0: 

451 self.die( 

452 "Cowardly refusing to proceed as multiple source hosts are defined in the configuration, but " 

453 f"not all non-empty root datasets in --dst-root-datasets contain the '{src_magic_substitution_token}' " 

454 "substitution token to prevent collisions on writing destination datasets. " 

455 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(basis_src_hosts)}" 

456 ) 

457 if args.jitter: # randomize host order to avoid potential thundering herd problems in large distributed systems 

458 random.shuffle(src_hosts) 

459 dst_hosts = shuffle_dict(dst_hosts) 

460 job_id = sanitize(args.job_id) 

461 job_run = args.job_run if args.job_run is not None else args.jobid # --jobid is deprecated; was renamed to --job-run 

462 job_run = sanitize(job_run) if job_run else uuid.uuid1().hex 

463 workers, workers_is_percent = args.workers 

464 max_workers = max(1, round(os.cpu_count() * workers / 100.0) if workers_is_percent else round(workers)) 
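        # Illustration of the line above (hypothetical core count): on an 8-core machine, --workers=75% yields
        # max(1, round(8 * 75 / 100.0)) == 6 parallel jobs, --workers=200% yields 16, and a plain integer such
        # as --workers=4 is used as-is.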

465 worker_timeout_seconds = args.worker_timeout_seconds 

466 self.spawn_process_per_job = args.spawn_process_per_job 

467 username: str = pwd.getpwuid(os.geteuid()).pw_name 

468 assert username 

469 dummy: str = bzfs.dummy_dataset 

470 

471 def zero_pad(number: int, width: int = 6) -> str: 

472 return f"{number:0{width}d}" # pad number with leading '0' chars to the given width 

473 

474 def jpad(jj: int, tag: str) -> str: 

475 return "/" + zero_pad(jj) + tag 

476 

477 def npad() -> str: 

478 return sep + zero_pad(len(subjobs)) 

479 

480 def update_subjob_name(tag: str) -> str: 

481 if j <= 0: 

482 return subjob_name 

483 elif j == 1: 

484 return subjob_name + jpad(j - 1, tag) 

485 else: 

486 return subjob_name + "/" + bzfs.BARRIER_CHAR 
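        # Illustration (hypothetical run): the subjob names built via zero_pad()/jpad() encode dependency order,
        # e.g. "000000src-host/create-src-snapshots" followed by "000000src-host/create-src-snapshots/000000replicate".
        # Once more than one sibling subjob exists (j > 1), subsequent stages depend on a barrier over all
        # siblings rather than on a single one.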

487 

488 def resolve_dataset(hostname: str, dataset: str, is_src: bool = True) -> str: 

489 ssh_user = args.src_user if is_src else args.dst_user 

490 ssh_user = ssh_user if ssh_user else username 

491 lb = self.loopback_address 
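            # The next line picks the connection endpoint: a remote hostname is kept as-is; for the local host,
            # the loopback address is used when connecting via ssh as a different user, and "-" (no ssh at all)
            # when the ssh user equals the current user.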

492 hostname = hostname if hostname != localhostname else (lb if lb else hostname) if username != ssh_user else "-" 

493 return f"{hostname}:{dataset}" 

494 

495 def resolve_dst_dataset(dst_hostname: str, dst_dataset: str) -> str: 

496 root_dataset = dst_root_datasets.get(dst_hostname) 

497 assert root_dataset is not None, dst_hostname # f"Hostname '{dst_hostname}' missing in --dst-root-datasets" 

498 root_dataset = root_dataset.replace(src_magic_substitution_token, src_host) 

499 root_dataset = root_dataset.replace(dst_magic_substitution_token, dst_hostname) 

500 dst_dataset = root_dataset + "/" + dst_dataset if root_dataset else dst_dataset 

501 return resolve_dataset(dst_hostname, dst_dataset, is_src=False) 

502 

503 lhn = localhostname 

504 subjobs: Dict[str, List[str]] = {} 

505 for i, src_host in enumerate(src_hosts): 

506 subjob_name: str = zero_pad(i) + "src-host" 

507 

508 if args.create_src_snapshots: 

509 opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"] 

510 opts += [f"--log-file-prefix={prog_name}{sep}create-src-snapshots{sep}"] 

511 opts += [f"--log-file-infix={sep}{job_id}"] 

512 opts += [f"--log-file-suffix={sep}{job_run}{npad()}{log_suffix(lhn, src_host, '')}{sep}"] 

513 opts += [f"--ssh-src-user={args.src_user}"] if args.src_user else [] 

514 opts += unknown_args + ["--"] 

515 opts += flatten(dedupe([(resolve_dataset(src_host, src), dummy) for src, dst in args.root_dataset_pairs])) 

516 subjob_name += "/create-src-snapshots" 

517 subjobs[subjob_name] = [bzfs_prog_name] + opts 

518 

519 if args.replicate: 

520 j = 0 

521 marker = "replicate" 

522 for dst_hostname, targets in dst_hosts.items(): 

523 opts = self.replication_opts( 

524 dst_snapshot_plan, set(targets), lhn, src_host, dst_hostname, marker, job_id, job_run + npad() 

525 ) 

526 if len(opts) > 0: 

527 opts += [f"--daemon-frequency={args.daemon_replication_frequency}"] 

528 opts += [f"--ssh-src-user={args.src_user}"] if args.src_user else [] 

529 opts += [f"--ssh-dst-user={args.dst_user}"] if args.dst_user else [] 

530 opts += unknown_args + ["--"] 

531 dataset_pairs = [ 

532 (resolve_dataset(src_host, src), resolve_dst_dataset(dst_hostname, dst)) 

533 for src, dst in args.root_dataset_pairs 

534 ] 

535 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs) 

536 if len(dataset_pairs) > 0: 

537 subjobs[subjob_name + jpad(j, marker)] = [bzfs_prog_name] + opts + flatten(dataset_pairs) 

538 j += 1 

539 subjob_name = update_subjob_name(marker) 

540 

541 def prune_src(opts: List[str], retention_plan: Dict, tag: str) -> None: 

542 opts += [ 

543 "--skip-replication", 

544 f"--delete-dst-snapshots-except-plan={retention_plan}", 

545 f"--log-file-prefix={prog_name}{sep}{tag}{sep}", 

546 f"--log-file-infix={sep}{job_id}", 

547 f"--log-file-suffix={sep}{job_run}{npad()}{log_suffix(lhn, src_host, '')}{sep}", # noqa: B023 

548 f"--daemon-frequency={args.daemon_prune_src_frequency}", 

549 ] 

550 opts += [f"--ssh-dst-user={args.src_user}"] if args.src_user else [] 

551 opts += unknown_args + ["--"] 

552 opts += flatten( 

553 dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]) # noqa: B023 

554 ) 

555 nonlocal subjob_name 

556 subjob_name += f"/{tag}" 

557 subjobs[subjob_name] = [bzfs_prog_name] + opts 

558 

559 if args.prune_src_snapshots: 

560 prune_src(["--delete-dst-snapshots"], src_snapshot_plan, "prune-src-snapshots") 

561 

562 if args.prune_src_bookmarks: 

563 prune_src(["--delete-dst-snapshots=bookmarks"], src_bookmark_plan, "prune-src-bookmarks") 

564 

565 if args.prune_dst_snapshots: 

566 self.validate_true( 

567 retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!" 

568 ) 

569 j = 0 

570 marker = "prune-dst-snapshots" 

571 for dst_hostname, _ in dst_hosts.items(): 

572 curr_retain_targets = set(retain_dst_targets[dst_hostname]) 

573 curr_dst_snapshot_plan = { # only retain targets that belong to the host 

574 org: {target: periods for target, periods in target_periods.items() if target in curr_retain_targets} 

575 for org, target_periods in dst_snapshot_plan.items() 

576 } 

577 opts = ["--delete-dst-snapshots", "--skip-replication"] 

578 opts += [f"--delete-dst-snapshots-except-plan={curr_dst_snapshot_plan}"] 

579 opts += [f"--log-file-prefix={prog_name}{sep}{marker}{sep}"] 

580 opts += [f"--log-file-infix={sep}{job_id}"] 

581 opts += [f"--log-file-suffix={sep}{job_run}{npad()}{log_suffix(lhn, src_host, dst_hostname)}{sep}"] 

582 opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"] 

583 opts += [f"--ssh-dst-user={args.dst_user}"] if args.dst_user else [] 

584 opts += unknown_args + ["--"] 

585 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs] 

586 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs) 

587 if len(dataset_pairs) > 0: 

588 subjobs[subjob_name + jpad(j, marker)] = [bzfs_prog_name] + opts + flatten(dataset_pairs) 

589 j += 1 

590 subjob_name = update_subjob_name(marker) 

591 

592 def monitor_snapshots_opts(tag: str, monitor_plan: Dict, logsuffix: str) -> List[str]: 

593 opts = [f"--monitor-snapshots={monitor_plan}", "--skip-replication"] 

594 opts += [f"--log-file-prefix={prog_name}{sep}{tag}{sep}"] 

595 opts += [f"--log-file-infix={sep}{job_id}"] 

596 opts += [f"--log-file-suffix={sep}{job_run}{npad()}{logsuffix}{sep}"] 

597 opts += [f"--daemon-frequency={args.daemon_monitor_snapshots_frequency}"] 

598 return opts 

599 

600 def build_monitor_plan(monitor_plan: Dict, snapshot_plan: Dict, cycles_prefix: str) -> Dict: 

601 

602 def alert_dicts(alertdict: Dict, cycles: int) -> Dict: 

603 latest_dict = alertdict.copy() 

604 for prefix in ("src_snapshot_", "dst_snapshot_", ""): 

605 latest_dict.pop(f"{prefix}cycles", None) 

606 oldest_dict = latest_dict.copy() 

607 oldest_dict["cycles"] = int(alertdict.get(f"{cycles_prefix}cycles", cycles)) 

608 return {"latest": latest_dict, "oldest": oldest_dict} 

609 

610 return { 

611 org: { 

612 target: { 

613 periodunit: alert_dicts(alertdict, snapshot_plan.get(org, {}).get(target, {}).get(periodunit, 1)) 

614 for periodunit, alertdict in periods.items() 

615 } 

616 for target, periods in target_periods.items() 

617 } 

618 for org, target_periods in monitor_plan.items() 

619 } 

620 

621 if args.monitor_src_snapshots: 

622 marker = "monitor-src-snapshots" 

623 monitor_plan = build_monitor_plan(monitor_snapshot_plan, src_snapshot_plan, "src_snapshot_") 

624 opts = monitor_snapshots_opts(marker, monitor_plan, log_suffix(lhn, src_host, "")) 

625 opts += [f"--ssh-dst-user={args.src_user}"] if args.src_user else [] 

626 opts += unknown_args + ["--"] 

627 opts += flatten(dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs])) 

628 subjob_name += "/" + marker 

629 subjobs[subjob_name] = [bzfs_prog_name] + opts 

630 

631 if args.monitor_dst_snapshots: 

632 j = 0 

633 marker = "monitor-dst-snapshots" 

634 for dst_hostname, targets in dst_hosts.items(): 

635 monitor_targets = set(targets).intersection(set(retain_dst_targets[dst_hostname])) 

636 monitor_plan = { # only retain targets that belong to the host 

637 org: {target: periods for target, periods in target_periods.items() if target in monitor_targets} 

638 for org, target_periods in monitor_snapshot_plan.items() 

639 } 

640 monitor_plan = build_monitor_plan(monitor_plan, dst_snapshot_plan, "dst_snapshot_") 

641 opts = monitor_snapshots_opts(marker, monitor_plan, log_suffix(lhn, src_host, dst_hostname)) 

642 opts += [f"--ssh-dst-user={args.dst_user}"] if args.dst_user else [] 

643 opts += unknown_args + ["--"] 

644 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs] 

645 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs) 

646 if len(dataset_pairs) > 0: 

647 subjobs[subjob_name + jpad(j, marker)] = [bzfs_prog_name] + opts + flatten(dataset_pairs) 

648 j += 1 

649 subjob_name = update_subjob_name(marker) 

650 

651 msg = "Ready to run %s subjobs using %s/%s src hosts: %s, %s/%s dst hosts: %s" 

652 log.info( 

653 msg, len(subjobs), len(src_hosts), nb_src_hosts, src_hosts, len(dst_hosts), nb_dst_hosts, list(dst_hosts.keys()) 

654 ) 

655 log.log(log_trace, "subjobs: \n%s", pretty_print_formatter(subjobs)) 

656 self.run_subjobs(subjobs, max_workers, worker_timeout_seconds, args.work_period_seconds, args.jitter) 

657 ex = self.first_exception 

658 if isinstance(ex, int): 

659 assert ex != 0 

660 sys.exit(ex) 

661 assert ex is None, ex 

662 log.info("Succeeded. Bye!") 

663 

664 def replication_opts( 

665 self, 

666 dst_snapshot_plan: Dict[str, Dict[str, Dict[str, int]]], 

667 targets: Set[str], 

668 localhostname: str, 

669 src_hostname: str, 

670 dst_hostname: str, 

671 tag: str, 

672 job_id: str, 

673 job_run: str, 

674 ) -> List[str]: 

675 log = self.log 

676 log.debug("%s", f"Replicating targets {sorted(targets)} from {src_hostname} to {dst_hostname} ...") 

677 include_snapshot_plan = { # only replicate targets that belong to the destination host and are relevant 

678 org: { 

679 target: { 

680 duration_unit: duration_amount 

681 for duration_unit, duration_amount in periods.items() 

682 if duration_amount > 0 

683 } 

684 for target, periods in target_periods.items() 

685 if target in targets 

686 } 

687 for org, target_periods in dst_snapshot_plan.items() 

688 } 

689 include_snapshot_plan = { # only replicate orgs that have at least one relevant target_period 

690 org: target_periods 

691 for org, target_periods in include_snapshot_plan.items() 

692 if any(len(periods) > 0 for target, periods in target_periods.items()) 

693 } 

694 opts = [] 

695 if len(include_snapshot_plan) > 0: 

696 opts += [f"--include-snapshot-plan={include_snapshot_plan}"] 

697 opts += [f"--log-file-prefix={prog_name}{sep}{tag}{sep}"] 

698 opts += [f"--log-file-infix={sep}{job_id}"] 

699 opts += [f"--log-file-suffix={sep}{job_run}{log_suffix(localhostname, src_hostname, dst_hostname)}{sep}"] 

700 return opts 

701 

702 def skip_nonexisting_local_dst_pools(self, root_dataset_pairs: List[Tuple[str, str]]) -> List[Tuple[str, str]]: 

703 """Skip datasets that point to removeable destination drives that are not currently (locally) attached, if any.""" 

704 

705 def zpool(dataset: str) -> str: 

706 return dataset.split("/", 1)[0] 
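        # e.g. zpool("-:tank/backups/host1") == "-:tank"; the "-:" prefix marks a local (non-ssh) dataset.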

707 

708 assert len(root_dataset_pairs) > 0 

709 unknown_dst_pools = {zpool(dst) for src, dst in root_dataset_pairs} 

710 unknown_dst_pools = unknown_dst_pools.difference(self.cache_known_dst_pools) 

711 

712 # Here we treat a zpool as existing if the zpool isn't local, aka if it isn't prefixed with "-:". A remote host 

713 # will raise an appropriate error if it turns out that the remote zpool doesn't actually exist. 

714 unknown_local_dst_pools = {pool for pool in unknown_dst_pools if pool.startswith("-:")} 

715 if len(unknown_local_dst_pools) > 0: # `zfs list` if local 

716 existing_pools = {pool[len("-:") :] for pool in unknown_local_dst_pools} 
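            # At this point existing_pools holds the candidate pool names to check; it is reassigned below to the
            # subset of pools that `zfs list` reports as actually existing.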

717 cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(existing_pools) 

718 sp = subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True) 

719 if sp.returncode not in (0, 1): # 1 means dataset not found

720 self.die(f"Unexpected error {sp.returncode} on checking for existing local dst pools: {sp.stderr.strip()}") 

721 existing_pools = set(sp.stdout.splitlines()) 

722 existing_pools = {"-:" + pool for pool in existing_pools} 

723 self.cache_existing_dst_pools.update(existing_pools) # union 

724 unknown_remote_dst_pools = unknown_dst_pools.difference(unknown_local_dst_pools) 

725 self.cache_existing_dst_pools.update(unknown_remote_dst_pools) # union 

726 self.cache_known_dst_pools.update(unknown_dst_pools) # union 

727 results = [] 

728 for src, dst in root_dataset_pairs: 

729 if zpool(dst) in self.cache_existing_dst_pools: 

730 results.append((src, dst)) 

731 else: 

732 self.log.warning("Skipping dst dataset for which local dst pool does not exist: %s", dst) 

733 return results 

734 

735 def run_subjobs( 

736 self, 

737 subjobs: Dict[str, List[str]], 

738 max_workers: int, 

739 timeout_secs: Optional[float], 

740 work_period_seconds: float, 

741 jitter: bool, 

742 ) -> None: 

743 self.stats = Stats() 

744 self.stats.jobs_all = len(subjobs) 

745 log = self.log 

746 num_intervals = 1 + len(subjobs) if jitter else len(subjobs) 

747 interval_nanos = 0 if len(subjobs) == 0 else round(1_000_000_000 * max(0.0, work_period_seconds) / num_intervals) 

748 assert interval_nanos >= 0 

749 if jitter: # randomize job start time to avoid potential thundering herd problems in large distributed systems 

750 sleep_nanos = random.randint(0, interval_nanos) 

751 log.info("Jitter: Delaying job start time by sleeping for %s ...", bzfs.human_readable_duration(sleep_nanos)) 

752 time.sleep(sleep_nanos / 1_000_000_000) # seconds 

753 sorted_subjobs = sorted(subjobs.keys()) 

754 has_barrier = any(bzfs.BARRIER_CHAR in subjob.split("/") for subjob in sorted_subjobs) 

755 if self.spawn_process_per_job or has_barrier or bzfs.has_siblings(sorted_subjobs): # siblings can run in parallel 

756 log.log(log_trace, "%s", "spawn_process_per_job: True") 

757 helper = bzfs.Job() 

758 helper.params = bzfs.Params(self.bzfs_argument_parser.parse_args(args=["src", "dst", "--retries=0"]), log=log) 

759 helper.is_test_mode = self.is_test_mode 

760 

761 helper.process_datasets_in_parallel_and_fault_tolerant( 

762 sorted_subjobs, 

763 process_dataset=lambda subjob, tid, retry: self.run_subjob( 

764 subjobs[subjob], name=subjob, timeout_secs=timeout_secs, spawn_process_per_job=True 

765 ) 

766 == 0, 

767 skip_tree_on_error=lambda subjob: True, 

768 max_workers=max_workers, 

769 interval_nanos=lambda subjob: interval_nanos, 

770 task_name="Subjob", 

771 ) 

772 else: 

773 log.log(log_trace, "%s", "spawn_process_per_job: False") 

774 next_update_nanos = time.monotonic_ns() 

775 for subjob in sorted_subjobs: 

776 time.sleep(max(0, next_update_nanos - time.monotonic_ns()) / 1_000_000_000) # seconds 

777 next_update_nanos += interval_nanos 

778 s = subjob 

779 if not self.run_subjob(subjobs[subjob], name=s, timeout_secs=timeout_secs, spawn_process_per_job=False) == 0: 

780 break 

781 stats = self.stats 

782 jobs_skipped = stats.jobs_all - stats.jobs_started 

783 msg = f"{stats}, skipped:" + bzfs.percent(jobs_skipped, total=stats.jobs_all) 

784 log.info("Final Progress: %s", msg) 

785 assert stats.jobs_running == 0, msg 

786 assert stats.jobs_completed == stats.jobs_started, msg 

787 skipped_jobs_dict = {subjob: subjobs[subjob] for subjob in sorted_subjobs if subjob not in stats.started_job_names} 

788 if len(skipped_jobs_dict) > 0: 

789 log.debug("Skipped subjobs: \n%s", pretty_print_formatter(skipped_jobs_dict)) 

790 assert jobs_skipped == len(skipped_jobs_dict), msg 

791 

792 def run_subjob( 

793 self, cmd: List[str], name: str, timeout_secs: Optional[float], spawn_process_per_job: bool 

794 ) -> Optional[int]: # thread-safe 

795 start_time_nanos = time.monotonic_ns() 

796 returncode = None 

797 log = self.log 

798 cmd_str = " ".join(cmd) 

799 stats = self.stats 

800 try: 

801 with stats.lock: 

802 stats.jobs_started += 1 

803 stats.jobs_running += 1 

804 stats.started_job_names.add(name) 

805 msg = str(stats) 

806 log.log(log_trace, "Starting worker job: %s", cmd_str) 

807 log.info("Progress: %s", msg) 

808 start_time_nanos = time.monotonic_ns() 

809 if spawn_process_per_job: 

810 returncode = self.run_worker_job_spawn_process_per_job(cmd, timeout_secs) 

811 else: 

812 returncode = self.run_worker_job_in_current_thread(cmd, timeout_secs) 

813 except BaseException as e: 

814 log.error("Worker job failed with unexpected exception: %s for command: %s", e, cmd_str) 

815 raise e 

816 else: 

817 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

818 elapsed_human = bzfs.human_readable_duration(elapsed_nanos) 

819 if returncode != 0: 

820 with stats.lock: 

821 if self.first_exception is None: 

822 self.first_exception = die_status if returncode is None else returncode 

823 log.error("Worker job failed with exit code %s in %s: %s", returncode, elapsed_human, cmd_str) 

824 else: 

825 log.debug("Worker job succeeded in %s: %s", elapsed_human, cmd_str) 

826 return returncode 

827 finally: 

828 elapsed_nanos = time.monotonic_ns() - start_time_nanos 

829 with stats.lock: 

830 stats.jobs_running -= 1 

831 stats.jobs_completed += 1 

832 stats.sum_elapsed_nanos += elapsed_nanos 

833 stats.jobs_failed += 1 if returncode != 0 else 0 

834 msg = str(stats) 

835 assert stats.jobs_all >= 0, msg 

836 assert stats.jobs_started >= 0, msg 

837 assert stats.jobs_completed >= 0, msg 

838 assert stats.jobs_failed >= 0, msg 

839 assert stats.jobs_running >= 0, msg 

840 assert stats.sum_elapsed_nanos >= 0, msg 

841 assert stats.jobs_failed <= stats.jobs_completed, msg 

842 assert stats.jobs_completed <= stats.jobs_started, msg 

843 assert stats.jobs_started <= stats.jobs_all, msg 

844 log.info("Progress: %s", msg) 

845 

846 def run_worker_job_in_current_thread(self, cmd: List[str], timeout_secs: Optional[float]) -> Optional[int]: 

847 log = self.log 

848 if timeout_secs is not None: 

849 cmd = cmd[0:1] + [f"--timeout={round(1000 * timeout_secs)}milliseconds"] + cmd[1:] 
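            # e.g. with timeout_secs=1.5, "--timeout=1500milliseconds" is inserted right after the program name.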

850 try: 

851 if not self.jobrunner_dryrun: 

852 self._bzfs_run_main(cmd) 

853 return 0 

854 except subprocess.CalledProcessError as e: 

855 return e.returncode 

856 except SystemExit as e: 

857 assert e.code is None or isinstance(e.code, int) 

858 return e.code 

859 except BaseException as e: 

860 log.error("Worker job failed with unexpected exception for command: %s", " ".join(cmd), exc_info=e) 

861 return die_status 

862 

863 def _bzfs_run_main(self, cmd: List[str]) -> None: 

864 bzfs_job = bzfs.Job() 

865 bzfs_job.is_test_mode = self.is_test_mode 

866 bzfs_job.run_main(self.bzfs_argument_parser.parse_args(cmd[1:]), cmd) 

867 

868 def run_worker_job_spawn_process_per_job(self, cmd: List[str], timeout_secs: Optional[float]) -> Optional[int]: 

869 log = self.log 

870 if len(cmd) > 0 and cmd[0] == bzfs_prog_name: 

871 cmd = [sys.executable, "-m", "bzfs_main." + cmd[0]] + cmd[1:] 

872 if self.jobrunner_dryrun: 

873 return 0 

874 proc = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, text=True) # run job in a separate subprocess 

875 try: 

876 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit 

877 except subprocess.TimeoutExpired: 

878 cmd_str = " ".join(cmd) 

879 log.error("%s", f"Terminating worker job as it failed to complete within {timeout_secs}s: {cmd_str}") 

880 proc.terminate() # Sends SIGTERM signal to job subprocess 

881 assert isinstance(timeout_secs, float) 

882 timeout_secs = min(1.0, timeout_secs) 

883 try: 

884 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit 

885 except subprocess.TimeoutExpired: 

886 log.error("%s", f"Killing worker job as it failed to terminate within {timeout_secs}s: {cmd_str}") 

887 proc.kill() # Sends SIGKILL signal to job subprocess because SIGTERM wasn't enough 

888 timeout_secs = min(0.025, timeout_secs) 

889 with contextlib.suppress(subprocess.TimeoutExpired): 

890 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit 

891 return proc.returncode 

892 

893 def validate_src_hosts(self, src_hosts: List[str]) -> List[str]: 

894 context = "--src-hosts" 

895 self.validate_type(src_hosts, list, context) 

896 for src_hostname in src_hosts: 

897 self.validate_host_name(src_hostname, context) 

898 return src_hosts 

899 

900 def validate_dst_hosts(self, dst_hosts: Dict[str, List[str]]) -> Dict[str, List[str]]: 

901 context = "--dst-hosts" 

902 self.validate_type(dst_hosts, dict, context) 

903 for dst_hostname, targets in dst_hosts.items(): 

904 self.validate_host_name(dst_hostname, context) 

905 self.validate_type(targets, list, f"{context} targets") 

906 for target in targets: 

907 self.validate_type(target, str, f"{context} target") 

908 return dst_hosts 

909 

910 def validate_dst_root_datasets(self, dst_root_datasets: Dict[str, str]) -> Dict[str, str]: 

911 context = "--dst-root-datasets" 

912 self.validate_type(dst_root_datasets, dict, context) 

913 for dst_hostname, dst_root_dataset in dst_root_datasets.items(): 

914 self.validate_host_name(dst_hostname, context) 

915 self.validate_type(dst_root_dataset, str, f"{context} root dataset") 

916 return dst_root_datasets 

917 

918 def validate_snapshot_plan( 

919 self, snapshot_plan: Dict[str, Dict[str, Dict[str, int]]], context: str 

920 ) -> Dict[str, Dict[str, Dict[str, int]]]: 

921 self.validate_type(snapshot_plan, dict, context) 

922 for org, target_periods in snapshot_plan.items(): 

923 self.validate_type(org, str, f"{context} org") 

924 self.validate_type(target_periods, dict, f"{context} target_periods") 

925 for target, periods in target_periods.items(): 

926 self.validate_type(target, str, f"{context} org/target") 

927 self.validate_type(periods, dict, f"{context} org/periods") 

928 for period_unit, period_amount in periods.items(): 

929 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit") 

930 self.validate_non_negative_int(period_amount, f"{context} org/target/period_amount") 

931 return snapshot_plan 

932 

933 def validate_monitor_snapshot_plan( 

934 self, 

935 monitor_snapshot_plan: Dict[str, Dict[str, Dict[str, Dict[str, Union[str, int]]]]], 

936 ) -> Dict[str, Dict[str, Dict[str, Dict[str, Union[str, int]]]]]: 

937 context = "--monitor-snapshot-plan" 

938 self.validate_type(monitor_snapshot_plan, dict, context) 

939 for org, target_periods in monitor_snapshot_plan.items(): 

940 self.validate_type(org, str, f"{context} org") 

941 self.validate_type(target_periods, dict, f"{context} target_periods") 

942 for target, periods in target_periods.items(): 

943 self.validate_type(target, str, f"{context} org/target") 

944 self.validate_type(periods, dict, f"{context} org/periods") 

945 for period_unit, alert_dict in periods.items(): 

946 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit") 

947 self.validate_type(alert_dict, dict, f"{context} org/target/alert_dict") 

948 for key, value in alert_dict.items(): 

949 self.validate_non_empty_string(key, f"{context} org/target/alert_dict/key") 

950 self.validate_type(value, Union[str, int], f"{context} org/target/alert_dict/value") 

951 return monitor_snapshot_plan 

952 

953 def validate_is_subset(self, x: Iterable[str], y: Iterable[str], x_name: str, y_name: str) -> None: 

954 if isinstance(x, str) or not isinstance(x, Iterable): 

955 self.die(f"{x_name} must be an Iterable") 

956 if isinstance(y, str) or not isinstance(y, Iterable): 

957 self.die(f"{y_name} must be an Iterable") 

958 if not set(x).issubset(set(y)): 

959 diff = sorted(set(x).difference(set(y))) 

960 self.die(f"{x_name} must be a subset of {y_name}. diff: {diff}, {x_name}: {sorted(x)}, {y_name}: {sorted(y)}") 

961 

962 def validate_host_name(self, hostname: str, context: str) -> None: 

963 self.validate_non_empty_string(hostname, f"{context} hostname") 

964 bzfs.validate_host_name(hostname, context, extra_invalid_chars=":") 

965 

966 def validate_non_empty_string(self, value: str, name: str) -> None: 

967 self.validate_type(value, str, name) 

968 if not value: 

969 self.die(f"{name} must not be empty!") 

970 

971 def validate_non_negative_int(self, value: int, name: str) -> None: 

972 self.validate_type(value, int, name) 

973 if value < 0: 

974 self.die(f"{name} must be a non-negative integer: {value}") 

975 

976 def validate_true(self, expr: Any, msg: str) -> None: 

977 if not bool(expr): 

978 self.die(msg) 

979 

980 def validate_type(self, value: Any, expected_type: Any, name: str) -> None: 

981 if hasattr(expected_type, "__origin__") and expected_type.__origin__ is Union: # for compat with python < 3.10 

982 union_types = expected_type.__args__ 

983 for t in union_types: 

984 if isinstance(value, t): 

985 return 

986 type_msg = " or ".join([t.__name__ for t in union_types]) 

987 self.die(f"{name} must be of type {type_msg} but got {type(value).__name__}: {value}") 

988 elif not isinstance(value, expected_type): 

989 self.die(f"{name} must be of type {expected_type.__name__} but got {type(value).__name__}: {value}") 

990 

991 def die(self, msg: str) -> None: 

992 self.log.error("%s", msg) 

993 bzfs.die(msg) 

994 

995 

996############################################################################# 

997class Stats: 

998 def __init__(self) -> None: 

999 self.lock: threading.Lock = threading.Lock() 

1000 self.jobs_all: int = 0 

1001 self.jobs_started: int = 0 

1002 self.jobs_completed: int = 0 

1003 self.jobs_failed: int = 0 

1004 self.jobs_running: int = 0 

1005 self.sum_elapsed_nanos: int = 0 

1006 self.started_job_names: Set[str] = set() 

1007 

1008 def __repr__(self) -> str: 

1009 def pct(number: int) -> str: 

1010 return bzfs.percent(number, total=self.jobs_all) 

1011 

1012 al, started, completed, failed = self.jobs_all, self.jobs_started, self.jobs_completed, self.jobs_failed 

1013 running = self.jobs_running 

1014 t = "avg_completion_time:" + bzfs.human_readable_duration(self.sum_elapsed_nanos / max(1, completed)) 

1015 return f"all:{al}, started:{pct(started)}, completed:{pct(completed)}, failed:{pct(failed)}, running:{running}, {t}" 

1016 

1017 

1018############################################################################# 

1019def dedupe(root_dataset_pairs: List[Tuple[str, str]]) -> List[Tuple[str, str]]: 

1020 return list(dict.fromkeys(root_dataset_pairs).keys()) 
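    # e.g. dedupe([("a", "b"), ("a", "b"), ("c", "d")]) == [("a", "b"), ("c", "d")] (first-seen order is preserved)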

1021 

1022 

1023def flatten(root_dataset_pairs: List[Tuple[str, str]]) -> List[str]: 

1024 return [item for pair in root_dataset_pairs for item in pair] 
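    # e.g. flatten([("a", "b"), ("c", "d")]) == ["a", "b", "c", "d"]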

1025 

1026 

1027K = TypeVar("K") 

1028V = TypeVar("V") 

1029 

1030 

1031def shuffle_dict(dictionary: Dict[K, V]) -> Dict[K, V]: 

1032 items = list(dictionary.items()) 

1033 random.shuffle(items) 

1034 return dict(items) 

1035 

1036 

1037def sorted_dict(dictionary: Dict[K, V]) -> Dict[K, V]: 

1038 return dict(sorted(dictionary.items())) 

1039 

1040 

1041def sanitize(filename: str) -> str: 

1042 for s in (" ", "..", "/", "\\", sep): 

1043 filename = filename.replace(s, "!") 

1044 return filename 
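    # e.g. sanitize("my job,v1/test") == "my!job!v1!test"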

1045 

1046 

1047def log_suffix(localhostname: str, src_hostname: str, dst_hostname: str) -> str: 

1048 sanitized_dst_hostname = sanitize(dst_hostname) if dst_hostname else "" 

1049 return f"{sep}{sanitize(localhostname)}{sep}{sanitize(src_hostname)}{sep}{sanitized_dst_hostname}" 

1050 

1051 

1052def format_dict(dictionary: Dict[str, Any]) -> str: 

1053 return bzfs.format_dict(dictionary) 

1054 

1055 

1056def pretty_print_formatter(dictionary: Dict[str, Any]) -> Any: # For lazy/noop evaluation in disabled log levels 

1057 class PrettyPrintFormatter: 

1058 def __str__(self) -> str: 

1059 import json 

1060 

1061 return json.dumps(dictionary, indent=4, sort_keys=True) 

1062 

1063 return PrettyPrintFormatter() 

1064 

1065 

1066def detect_loopback_address() -> str: 

1067 """Detects if a loopback connection over IPv4 or IPv6 is possible.""" 

1068 try: 

1069 addr = "127.0.0.1" 

1070 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 

1071 s.bind((addr, 0)) 

1072 return addr 

1073 except OSError: 

1074 pass 

1075 

1076 try: 

1077 addr = "::1" 

1078 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: 

1079 s.bind((addr, 0)) 

1080 return addr 

1081 except OSError: 

1082 pass 

1083 

1084 return "" 

1085 

1086 

1087def convert_ipv6(hostname: str) -> str: # support IPv6 without getting confused by host:dataset colon separator ... 

1088 return hostname.replace(":", "|") # ... and any colons that may be part of a (valid) ZFS dataset name 

1089 # Also see bzfs.convert_ipv6() for the reverse conversion 
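    # e.g. convert_ipv6("::1") == "||1" and convert_ipv6("fe80::1") == "fe80||1"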

1090 

1091 

1092############################################################################# 

1093if __name__ == "__main__":

1094 main()