Coverage for bzfs/bzfs_jobrunner.py: 99%

#!/usr/bin/env python3
#
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""WARNING: For now, `bzfs_jobrunner` is work-in-progress, and as such may still change in incompatible ways."""

import argparse
import ast
import logging
import socket
import subprocess
import sys
from collections import defaultdict
from typing import Dict, List, Optional, Set, Tuple

prog_name = "bzfs_jobrunner"


def argument_parser() -> argparse.ArgumentParser:
    # fmt: off
    parser = argparse.ArgumentParser(
        prog=prog_name,
        description=f"""
WARNING: For now, `bzfs_jobrunner` is work-in-progress, and as such may still change in incompatible ways.

This program is a convenience wrapper around [bzfs](README.md) that simplifies periodic ZFS snapshot creation, replication,
and pruning across a source host and multiple destination hosts, using a single shared
[jobconfig](bzfs_tests/bzfs_job_example.py) script.

Typically, a cron job on the source host runs `{prog_name}` periodically to create new snapshots (via --create-src-snapshots)
and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and --prune-src-bookmarks), whereas
another cron job on the destination host runs `{prog_name}` periodically to prune outdated destination snapshots (via
--prune-dst-snapshots). Yet another cron job runs `{prog_name}` periodically to replicate the recently created snapshots from
the source to the destination (via --replicate). The frequency of these periodic activities can vary by activity, and is
typically every second, minute, hour, day, week, month and/or year (or multiples thereof).

Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto the
source host and all destination hosts, and add crontab entries or systemd timers or similar, along these lines:

* crontab on source host:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks`

* crontab on destination host(s):

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --replicate=pull --prune-dst-snapshots`

### High Frequency Replication (Experimental Feature)

Taking snapshots, and/or replicating, anywhere from every few milliseconds to every 10 seconds or so is considered high
frequency. For such use cases, consider that `zfs list -t snapshot` performance degrades as the number of snapshots that
currently exist within the selected datasets grows, so keep the number of currently existing snapshots small, and prune them
at a frequency that is proportional to the frequency with which snapshots are created. Consider using `--skip-parent` and
`--exclude-dataset*` filters to limit the selected datasets to only those that require this level of frequency.

In addition, use the `--daemon-*` options to reduce startup overhead, in combination with splitting the crontab entry (or
better: high frequency systemd timer) into multiple processes, using pull replication mode, along these lines:

* crontab on source host:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --create-src-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --prune-src-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --prune-src-bookmarks`

* crontab on destination host(s):

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --replicate=pull`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --prune-dst-snapshots`

The daemon processes work like non-daemon processes except that they loop, handle time events, sleep between events, and
finally exit after, say, 86400 seconds (whatever you specify via `--daemon-lifetime`). The daemons will subsequently be
auto-restarted by 'cron', or earlier if they fail. While the daemons are running, 'cron' will attempt to start new
(unnecessary) daemons, but this is benign as these new processes immediately exit with a message like this:
"Exiting as same previous periodic job is still running without completion yet"
""", formatter_class=argparse.RawTextHelpFormatter)

    # commands:
    parser.add_argument("--create-src-snapshots", action="store_true",
        help="Take snapshots on src as necessary. This command should be called by a program (or cron job) running on the "
             "src host.\n\n")
    parser.add_argument("--replicate", choices=["pull", "push"], default=None, const="pull", nargs="?",
        help="Replicate snapshots from src to dst as necessary, either in pull mode (recommended) or push mode "
             "(experimental). For pull mode, this command should be called by a program (or cron job) running on the dst "
             "host; for push mode, on the src host.\n\n")
    parser.add_argument("--prune-src-snapshots", action="store_true",
        help="Prune snapshots on src as necessary. This command should be called by a program (or cron job) running on the "
             "src host.\n\n")
    parser.add_argument("--prune-src-bookmarks", action="store_true",
        help="Prune bookmarks on src as necessary. This command should be called by a program (or cron job) running on the "
             "src host.\n\n")
    parser.add_argument("--prune-dst-snapshots", action="store_true",
        help="Prune snapshots on dst as necessary. This command should be called by a program (or cron job) running on the "
             "dst host.\n\n")

    # options:
    parser.add_argument("--src-host", default="-", metavar="STRING",
        help="Network hostname of src. Used by destination host(s) if replicating in pull mode.\n\n")
    parser.add_argument("--localhost", default=None, metavar="STRING",
        help="Hostname of localhost. Default is the hostname without the domain name.\n\n")
    dst_hosts_example = {"onsite": "nas", "us-west-1": "bak-us-west-1", "eu-west-1": "bak-eu-west-1", "offsite": "archive"}
    parser.add_argument("--dst-hosts", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps logical replication target names (the infix portion of a snapshot name) to destination "
             f"hostnames. Example: `{format_dict(dst_hosts_example)}`. With this, given a snapshot name, we can find the "
             "destination hostname to which the snapshot shall be replicated. Also, given a snapshot name and its "
             "--localhost name, a destination host can determine if it shall 'pull' replicate the given snapshot from the "
             "--src-host, or if the snapshot is intended for another destination host, in which case it skips the snapshot. "
             f"A destination host running {prog_name} will 'pull' snapshots for all targets that map to its --localhost "
             "name.\n\n")
    parser.add_argument("--retain-dst-targets", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps logical replication target names (the infix portion of a snapshot name) to "
             f"destination hostnames. Example: `{format_dict(dst_hosts_example)}`. Has the same format as --dst-hosts. As "
             "part of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has no "
             "mapping to its --localhost name in this dictionary. Do not remove a mapping unless you are sure it's OK to "
             "delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n")
    dst_root_datasets_example = {
        "nas": "tank2/bak",
        "bak-us-west-1": "backups/bak001",
        "bak-eu-west-1": "backups/bak999",
        "archive": "archives/zoo",
        "hotspare": "",
    }
    parser.add_argument("--dst-root-datasets", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root "
             "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that "
             "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, "
             "whereas for cloning, master-slave replication, or replication from a primary to a secondary, this can also be "
             f"the empty string. Example: `{format_dict(dst_root_datasets_example)}`\n\n")
    src_snapshot_plan_example = {
        "prod": {
            "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
            "us-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
            "eu-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
        },
        "test": {
            "offsite": {"12hourly": 42, "weekly": 12},
        },
    }
    parser.add_argument("--src-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. "
             "Snapshots that do not match a retention period will be deleted. A zero within a retention period indicates "
             "that no snapshots shall be retained (or even be created) for the given period.\n\n"
             f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and "
             "the intended logical target 'onsite', create and then retain secondly snapshots that were created less "
             "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analogously "
             "for the latest 40 minutely snapshots, 36 hourly snapshots, etc. "
             "It will also create and retain snapshots for the targets 'us-west-1' and 'eu-west-1' within the 'prod' "
             "organization. "
             "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, "
             "and name them as being intended for the 'offsite' replication target. "
             "The example creates snapshots with names like "
             "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, "
             "`prod_us-west-1_<timestamp>_hourly`, `prod_us-west-1_<timestamp>_daily`, "
             "`prod_eu-west-1_<timestamp>_hourly`, `prod_eu-west-1_<timestamp>_daily`, "
             "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n")
    parser.add_argument("--src-bookmark-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for bookmarks to be used if pruning src. Has the same format as --src-snapshot-plan.\n\n")
    parser.add_argument("--dst-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for snapshots to be used if pruning dst. Has the same format as --src-snapshot-plan.\n\n")
    parser.add_argument("--src-user", default="", metavar="STRING",
        help="SSH username on --src-host. Used if replicating in pull mode. Example: 'alice'\n\n")
    parser.add_argument("--dst-user", default="", metavar="STRING",
        help="SSH username on dst. Used if replicating in push mode. Example: 'root'\n\n")
    parser.add_argument("--daemon-replication-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument("--daemon-prune-src-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument("--daemon-prune-dst-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument("root_dataset_pairs", nargs="+", action=DatasetPairsAction, metavar="SRC_DATASET DST_DATASET",
        help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be "
             "auto-appended later).\n\n")
    return parser
    # fmt: on


still_running_status = 4
dummy_dataset = "dummy"
sep = ","
DEVNULL = subprocess.DEVNULL
PIPE = subprocess.PIPE


def main():
    Job().run_main(sys.argv[1:])


#############################################################################
class Job:
    def __init__(self, log: Optional[logging.Logger] = None):
        self.first_exception = None
        self.log = log if log is not None else get_logger()

    def run_main(self, sys_argv: List[str]) -> None:
        self.log.info(
            "WARNING: For now, `bzfs_jobrunner` is work-in-progress, and as such may still change in incompatible ways."
        )
        args, unknown_args = argument_parser().parse_known_args(sys_argv)  # forward all unknown args to `bzfs`
        src_snapshot_plan = ast.literal_eval(args.src_snapshot_plan)
        src_bookmark_plan = ast.literal_eval(args.src_bookmark_plan)
        dst_snapshot_plan = ast.literal_eval(args.dst_snapshot_plan)
        src_host = args.src_host
        assert src_host, "--src-host must not be empty!"
        localhostname = args.localhost if args.localhost else socket.gethostname()
        assert localhostname, "localhostname must not be empty!"
        dst_hosts = ast.literal_eval(args.dst_hosts)
        retain_dst_targets = ast.literal_eval(args.retain_dst_targets)
        dst_root_datasets = ast.literal_eval(args.dst_root_datasets)
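        # Note: each DICT_STRING option is parsed with ast.literal_eval(), so its value must be a valid Python
        # literal, e.g. --dst-hosts='{"onsite": "nas"}' (a minimal sketch; exact quoting depends on your shell).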

        def resolve_dst_dataset(dst_dataset: str, dst_hostname: str) -> str:
            root_dataset = dst_root_datasets.get(dst_hostname)
            assert root_dataset is not None, f"Hostname '{dst_hostname}' missing in --dst-root-datasets: {dst_root_datasets}"
            return root_dataset + "/" + dst_dataset if root_dataset else dst_dataset
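        # For example (hypothetical values): with --dst-root-datasets='{"nas": "tank2/bak", "hotspare": ""}',
        # resolve_dst_dataset("tank1/foo", "nas") returns "tank2/bak/tank1/foo", whereas
        # resolve_dst_dataset("tank1/foo", "hotspare") returns "tank1/foo" unchanged.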

        if args.create_src_snapshots:
            opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"]
            opts += [f"--log-file-prefix={prog_name}{sep}create-src-snapshots{sep}"]
            opts += [f"--log-file-suffix={sep}"]
            opts += unknown_args + ["--"]
            opts += dedupe([(src, dummy_dataset) for src, dst in args.root_dataset_pairs])
            self.run_cmd(["bzfs"] + opts)
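            # A sketch of the resulting invocation, assuming a hypothetical dataset pair "tank1/foo bak/foo":
            # bzfs --create-src-snapshots --create-src-snapshots-plan={...} --skip-replication
            #      --log-file-prefix=bzfs_jobrunner,create-src-snapshots, --log-file-suffix=, -- tank1/foo dummy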

        if args.replicate == "pull":  # pull mode (recommended)
            pull_targets = {target for target, dst_hostname in dst_hosts.items() if dst_hostname == localhostname}
            opts = self.replication_filter_opts(dst_snapshot_plan, "pull", pull_targets, src_host, localhostname)
            if len(opts) > 0:
                opts += [f"--ssh-src-user={args.src_user}"] if args.src_user else []
                opts += unknown_args + ["--"]
                old_len_opts = len(opts)
                pairs = [
                    (f"{src_host}:{src}", resolve_dst_dataset(dst, localhostname)) for src, dst in args.root_dataset_pairs
                ]
                for src, dst in self.skip_datasets_with_nonexisting_dst_pool(pairs):
                    opts += [src, dst]
                if len(opts) > old_len_opts:
                    daemon_opts = [f"--daemon-frequency={args.daemon_replication_frequency}"]
                    self.run_cmd(["bzfs"] + daemon_opts + opts)

        elif args.replicate == "push":  # push mode (experimental feature)
            host_targets = defaultdict(set)
            for org, targetperiods in dst_snapshot_plan.items():
                for target in targetperiods.keys():
                    dst_hostname = dst_hosts.get(target)
                    if dst_hostname:
                        host_targets[dst_hostname].add(target)
            for dst_hostname, push_targets in host_targets.items():
                opts = self.replication_filter_opts(dst_snapshot_plan, "push", push_targets, localhostname, dst_hostname)
                if len(opts) > 0:
                    opts += [f"--ssh-dst-user={args.dst_user}"] if args.dst_user else []
                    opts += unknown_args + ["--"]
                    for src, dst in args.root_dataset_pairs:
                        opts += [src, f"{dst_hostname}:{resolve_dst_dataset(dst, dst_hostname)}"]
                    daemon_opts = [f"--daemon-frequency={args.daemon_replication_frequency}"]
                    self.run_cmd(["bzfs"] + daemon_opts + opts)

        def prune_src(opts: List[str]) -> None:
            opts += [
                f"--log-file-suffix={sep}",
                "--skip-replication",
                f"--daemon-frequency={args.daemon_prune_src_frequency}",
            ]
            opts += unknown_args + ["--"]
            opts += dedupe([(dummy_dataset, src) for src, dst in args.root_dataset_pairs])
            self.run_cmd(["bzfs"] + opts)

        if args.prune_src_snapshots:
            opts = ["--delete-dst-snapshots", f"--delete-dst-snapshots-except-plan={src_snapshot_plan}"]
            opts += [f"--log-file-prefix={prog_name}{sep}prune-src-snapshots{sep}"]
            prune_src(opts)

        if args.prune_src_bookmarks:
            opts = ["--delete-dst-snapshots=bookmarks", f"--delete-dst-snapshots-except-plan={src_bookmark_plan}"]
            opts += [f"--log-file-prefix={prog_name}{sep}prune-src-bookmarks{sep}"]
            prune_src(opts)

        if args.prune_dst_snapshots:
            assert retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!"
            retain_targets = {target for target, dst_hostname in retain_dst_targets.items() if dst_hostname == localhostname}
            dst_snapshot_plan = {  # only retain targets that belong to the host executing bzfs_jobrunner
                org: {target: periods for target, periods in target_periods.items() if target in retain_targets}
                for org, target_periods in dst_snapshot_plan.items()
            }
            opts = ["--delete-dst-snapshots", "--skip-replication"]
            opts += [f"--delete-dst-snapshots-except-plan={dst_snapshot_plan}"]
            opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"]
            opts += [f"--log-file-prefix={prog_name}{sep}prune-dst-snapshots{sep}"]
            opts += [f"--log-file-suffix={sep}"]
            opts += unknown_args + ["--"]
            old_len_opts = len(opts)
            pairs = [(dummy_dataset, resolve_dst_dataset(dst, localhostname)) for src, dst in args.root_dataset_pairs]
            for src, dst in self.skip_datasets_with_nonexisting_dst_pool(pairs):
                opts += [src, dst]
            if len(opts) > old_len_opts:
                self.run_cmd(["bzfs"] + opts)

        ex = self.first_exception
        if ex is not None and ((not isinstance(ex, subprocess.CalledProcessError)) or ex.returncode != still_running_status):
            raise ex

    def run_cmd(self, *params) -> None:
        try:
            subprocess.run(*params, stdin=DEVNULL, text=True, check=True)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, UnicodeDecodeError) as e:
            if self.first_exception is None:
                self.first_exception = e
            self.log.error("%s", str(e))  # log exception and keep on trucking

    def replication_filter_opts(
        self, dst_snapshot_plan: Dict, kind: str, targets: Set[str], src_hostname: str, dst_hostname: str
    ) -> List[str]:
        log = self.log
        log.info("%s", f"Replicating targets {sorted(targets)} in {kind} mode from {src_hostname} to {dst_hostname} ...")
        include_snapshot_plan = {  # only replicate targets that belong to the destination host and are relevant
            org: {
                target: {
                    duration_unit: duration_amount
                    for duration_unit, duration_amount in periods.items()
                    if duration_amount > 0
                }
                for target, periods in target_periods.items()
                if target in targets
            }
            for org, target_periods in dst_snapshot_plan.items()
        }
        include_snapshot_plan = {  # only replicate orgs that have at least one relevant target_period
            org: target_periods
            for org, target_periods in include_snapshot_plan.items()
            if any(len(periods) > 0 for target, periods in target_periods.items())
        }
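        # Sketch of the effect of the two comprehensions above, using hypothetical inputs: given
        # targets={"onsite"} and dst_snapshot_plan={"prod": {"onsite": {"hourly": 36, "secondly": 0},
        # "us-west-1": {"hourly": 36}}}, the resulting include_snapshot_plan is
        # {"prod": {"onsite": {"hourly": 36}}} because zero-amount periods, non-matching targets, and
        # orgs without any remaining periods are all dropped.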

        opts = []
        if len(include_snapshot_plan) > 0:
            opts += [f"--include-snapshot-plan={include_snapshot_plan}"]
            opts += [f"--log-file-prefix={prog_name}{sep}{kind}{sep}"]
            opts += [f"--log-file-suffix={sep}{src_hostname}{sep}{dst_hostname}{sep}"]
        return opts

    def skip_datasets_with_nonexisting_dst_pool(self, root_dataset_pairs) -> List[Tuple[str, str]]:
        def zpool(dataset: str) -> str:
            return dataset.split("/", 1)[0]
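            # e.g. zpool("tank2/bak/foo") returns "tank2", and zpool("tank2") returns "tank2"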

        assert len(root_dataset_pairs) > 0
        pools = {zpool(dst) for src, dst in root_dataset_pairs}
        cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(pools)
        existing_pools = set(subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True).stdout.splitlines())
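        # Pool names that do not exist are reported by `zfs list` on stderr and are simply absent from stdout,
        # so they drop out of existing_pools; the command's nonzero exit status is deliberately not checked
        # here (no check=True).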

        results = []
        for src, dst in root_dataset_pairs:
            if zpool(dst) in existing_pools:
                results.append((src, dst))
            else:
                self.log.warning("Skipping dst dataset for which dst pool does not exist: %s", dst)
        return results


#############################################################################
def dedupe(root_dataset_pairs: List[Tuple[str, str]]) -> List[str]:
    results = []
    for src, dst in dict.fromkeys(root_dataset_pairs).keys():
        results += [src, dst]
    return results
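# For example, dedupe([("a", "b"), ("a", "b"), ("c", "d")]) returns ["a", "b", "c", "d"]; dict.fromkeys()
# removes duplicate pairs while preserving first-seen order (dicts preserve insertion order since Python 3.7).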


def format_dict(dictionary) -> str:
    return f'"{dictionary}"'


def get_logger() -> logging.Logger:
    class LevelFormatter(logging.Formatter):
        def format(self, record):
            record.level_initial = record.levelname[0]  # Use first letter of the level name
            return super().format(record)

    log = logging.getLogger(prog_name)
    log.setLevel(logging.INFO)
    log.propagate = False
    if not any(isinstance(h, logging.StreamHandler) for h in log.handlers):
        handler = logging.StreamHandler()
        handler.setFormatter(LevelFormatter(fmt="%(asctime)s [%(level_initial)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"))
        log.addHandler(handler)
    return log
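# With this formatter, emitted lines look like (illustrative timestamp): 2025-03-13 10:14:59 [I] Replicating targets ...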


#############################################################################
class DatasetPairsAction(argparse.Action):
    def __call__(self, parser, namespace, datasets, option_string=None):
        if len(datasets) % 2 != 0:
            parser.error(f"Each SRC_DATASET must have a corresponding DST_DATASET: {datasets}")
        root_dataset_pairs = [(datasets[i], datasets[i + 1]) for i in range(0, len(datasets), 2)]
        setattr(namespace, self.dest, root_dataset_pairs)
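        # e.g. the positional args ["tank1/foo", "bak/foo", "tank1/bar", "bak/bar"] are parsed into the pairs
        # [("tank1/foo", "bak/foo"), ("tank1/bar", "bak/bar")] (hypothetical dataset names)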


#############################################################################
if __name__ == "__main__":
    main()