Coverage for bzfs/bzfs_jobrunner.py: 99%
189 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-13 10:14 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-13 10:14 +0000
1#!/usr/bin/env python3
2#
3# Copyright 2024 Wolfgang Hoschek AT mac DOT com
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17"""WARNING: For now, `bzfs_jobrunner` is work-in-progress, and as such may still change in incompatible ways."""
19import argparse
20import ast
21import logging
22import socket
23import subprocess
24import sys
25from collections import defaultdict
26from typing import Dict, List, Optional, Set, Tuple
28prog_name = "bzfs_jobrunner"
31def argument_parser() -> argparse.ArgumentParser:
32 # fmt: off
33 parser = argparse.ArgumentParser(
34 prog=prog_name,
35 description=f"""
36WARNING: For now, `bzfs_jobrunner` is work-in-progress, and as such may still change in incompatible ways.
38This program is a convenience wrapper around [bzfs](README.md) that simplifies periodic ZFS snapshot creation, replication,
39and pruning, across source host and multiple destination hosts, using a single shared
40[jobconfig](bzfs_tests/bzfs_job_example.py) script.
42Typically, a cron job on the source host runs `{prog_name}` periodically to create new snapshots (via --create-src-snapshots)
43and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and --prune-src-bookmarks), whereas
44another cron job on the destination host runs `{prog_name}` periodically to prune outdated destination snapshots (via
45--prune-dst-snapshots). Yet another cron job runs `{prog_name}` periodically to replicate the recently created snapshots from
46the source to the destination (via --replicate). The frequency of these periodic activities can vary by activity, and is
47typically every second, minute, hour, day, week, month and/or year (or multiples thereof).
49Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto the
50source host and all destination hosts, and add crontab entries or systemd timers or similar, along these lines:
52* crontab on source host:
54`* * * * * testuser /etc/bzfs/bzfs_job_example.py --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks`
56* crontab on destination host(s):
58`* * * * * testuser /etc/bzfs/bzfs_job_example.py --replicate=pull --prune-dst-snapshots`
60### High Frequency Replication (Experimental Feature)
62Taking snapshots, and/or replicating, from every N milliseconds to every 10 seconds or so is considered high frequency. For
63such use cases, consider that `zfs list -t snapshot` performance degrades as more and more snapshots currently exist within
64the selected datasets, so try to keep the number of currently existing snapshots small, and prune them at a frequency that
65is proportional to the frequency with which snapshots are created. Consider using `--skip-parent` and `--exclude-dataset*`
66filters to limit the selected datasets only to those that require this level of frequency.
68In addition, use the `--daemon-*` options to reduce startup overhead, in combination with splitting the crontab entry (or
69better: high frequency systemd timer) into multiple processes, using pull replication mode, along these lines:
71* crontab on source host:
73`* * * * * testuser /etc/bzfs/bzfs_job_example.py --create-src-snapshots`
75`* * * * * testuser /etc/bzfs/bzfs_job_example.py --prune-src-snapshots`
77`* * * * * testuser /etc/bzfs/bzfs_job_example.py --prune-src-bookmarks`
79* crontab on destination host(s):
81`* * * * * testuser /etc/bzfs/bzfs_job_example.py --replicate=pull`
83`* * * * * testuser /etc/bzfs/bzfs_job_example.py --prune-dst-snapshots`
85The daemon processes work like non-daemon processes except that they loop, handle time events and sleep between events, and
86finally exit after, say, 86400 seconds (whatever you specify via `--daemon-lifetime`). The daemons will subsequently be
87auto-restarted by 'cron', or earlier if they fail. While the daemons are running 'cron' will attempt to start new
88(unnecessary) daemons but this is benign as these new processes immediately exit with a message like this:
89"Exiting as same previous periodic job is still running without completion yet"
90""", formatter_class=argparse.RawTextHelpFormatter)
92 # commands:
93 parser.add_argument("--create-src-snapshots", action="store_true",
94 help="Take snapshots on src as necessary. This command should be called by a program (or cron job) running on the "
95 "src host.\n\n")
96 parser.add_argument("--replicate", choices=["pull", "push"], default=None, const="pull", nargs="?",
97 help="Replicate snapshots from src to dst as necessary, either in pull mode (recommended) or push mode "
98 "(experimental). For pull mode, this command should be called by a program (or cron job) running on the dst "
99 "host; for push mode, on the src host.\n\n")
100 parser.add_argument("--prune-src-snapshots", action="store_true",
101 help="Prune snapshots on src as necessary. This command should be called by a program (or cron job) running on the "
102 "src host.\n\n")
103 parser.add_argument("--prune-src-bookmarks", action="store_true",
104 help="Prune bookmarks on src as necessary. This command should be called by a program (or cron job) running on the "
105 "src host.\n\n")
106 parser.add_argument("--prune-dst-snapshots", action="store_true",
107 help="Prune snapshots on dst as necessary. This command should be called by a program (or cron job) running on the "
108 "dst host.\n\n")
110 # options:
111 parser.add_argument("--src-host", default="-", metavar="STRING",
112 help="Network hostname of src. Used by destination host(s) if replicating in pull mode.\n\n")
113 parser.add_argument("--localhost", default=None, metavar="STRING",
114 help="Hostname of localhost. Default is the hostname without the domain name.\n\n")
115 dst_hosts_example = {"onsite": "nas", "us-west-1": "bak-us-west-1", "eu-west-1": "bak-eu-west-1", "offsite": "archive"}
116 parser.add_argument("--dst-hosts", default="{}", metavar="DICT_STRING",
117 help="Dictionary that maps logical replication target names (the infix portion of a snapshot name) to destination "
118 f"hostnames. Example: `{format_dict(dst_hosts_example)}`. With this, given a snapshot name, we can find the "
119 "destination hostname to which the snapshot shall be replicated. Also, given a snapshot name and its "
120 "--localhost name, a destination host can determine if it shall 'pull' replicate the given snapshot from the "
121 "--src-host, or if the snapshot is intended for another destination host, in which case it skips the snapshot. "
122 f"A destination host running {prog_name} will 'pull' snapshots for all targets that map to its --localhost "
123 "name.\n\n")
124 parser.add_argument("--retain-dst-targets", default="{}", metavar="DICT_STRING",
125 help="Dictionary that maps logical replication target names (the infix portion of a snapshot name) to "
126 f"destination hostnames. Example: `{format_dict(dst_hosts_example)}`. Has same format as --dst-hosts. As part "
127 "of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has no "
128 "mapping to its --localhost name in this dictionary. Do not remove a mapping unless you are sure it's ok to "
129 "delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n")
130 dst_root_datasets_example = {
131 "nas": "tank2/bak",
132 "bak-us-west-1": "backups/bak001",
133 "bak-eu-west-1": "backups/bak999",
134 "archive": "archives/zoo",
135 "hotspare": "",
136 }
137 parser.add_argument("--dst-root-datasets", default="{}", metavar="DICT_STRING",
138 help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root "
139 "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that "
140 "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, "
141 "whereas for cloning, master slave replication, or replication from a primary to a secondary, this can also be "
142 f"the empty string. Example: `{format_dict(dst_root_datasets_example)}`\n\n")
143 src_snapshot_plan_example = {
144 "prod": {
145 "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
146 "us-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
147 "eu-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
148 },
149 "test": {
150 "offsite": {"12hourly": 42, "weekly": 12},
151 },
152 }
153 parser.add_argument("--src-snapshot-plan", default="{}", metavar="DICT_STRING",
154 help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. "
155 "Snapshots that do not match a retention period will be deleted. A zero within a retention period indicates "
156 "that no snapshots shall be retained (or even be created) for the given period.\n\n"
157 f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and "
158 "the intended logical target 'onsite', create and then retain secondly snapshots that were created less "
159 "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analog for "
160 "the latest 40 minutely snapshots, 36 hourly snapshots, etc. "
161 "It will also create and retain snapshots for the targets 'us-west-1' and 'eu-west-1' within the 'prod' "
162 "organization. "
163 "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, "
164 "and name them as being intended for the 'offsite' replication target. "
165 "The example creates snapshots with names like "
166 "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, "
167 "`prod_us-west-1_<timestamp>_hourly`, `prod_us-west-1_<timestamp>_daily`, "
168 "`prod_eu-west-1_<timestamp>_hourly`, `prod_eu-west-1_<timestamp>_daily`, "
169 "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n")
170 parser.add_argument("--src-bookmark-plan", default="{}", metavar="DICT_STRING",
171 help="Retention periods for bookmarks to be used if pruning src. Has same format as --src-snapshot-plan.\n\n")
172 parser.add_argument("--dst-snapshot-plan", default="{}", metavar="DICT_STRING",
173 help="Retention periods for snapshots to be used if pruning dst. Has same format as --src-snapshot-plan.\n\n")
174 parser.add_argument("--src-user", default="", metavar="STRING",
175 help="SSH username on --src-host. Used if replicating in pull mode. Example: 'alice'\n\n")
176 parser.add_argument("--dst-user", default="", metavar="STRING",
177 help="SSH username on dst. Used if replicating in push mode. Example: 'root'\n\n")
178 parser.add_argument("--daemon-replication-frequency", default="minutely", metavar="STRING",
179 help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n")
180 parser.add_argument("--daemon-prune-src-frequency", default="minutely", metavar="STRING",
181 help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n")
182 parser.add_argument("--daemon-prune-dst-frequency", default="minutely", metavar="STRING",
183 help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n")
184 parser.add_argument("root_dataset_pairs", nargs="+", action=DatasetPairsAction, metavar="SRC_DATASET DST_DATASET",
185 help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be "
186 "auto-appended later).\n\n")
187 return parser
188 # fmt: on
191still_running_status = 4
192dummy_dataset = "dummy"
193sep = ","
194DEVNULL = subprocess.DEVNULL
195PIPE = subprocess.PIPE
198def main():
199 Job().run_main(sys.argv[1:])
202#############################################################################
203class Job:
204 def __init__(self, log: Optional[logging.Logger] = None):
205 self.first_exception = None
206 self.log = log if log is not None else get_logger()
208 def run_main(self, sys_argv: List[str]) -> None:
209 self.log.info(
210 "WARNING: For now, `bzfs_jobrunner` is work-in-progress, and as such may still change in incompatible ways."
211 )
212 args, unknown_args = argument_parser().parse_known_args(sys_argv) # forward all unknown args to `bzfs`
213 src_snapshot_plan = ast.literal_eval(args.src_snapshot_plan)
214 src_bookmark_plan = ast.literal_eval(args.src_bookmark_plan)
215 dst_snapshot_plan = ast.literal_eval(args.dst_snapshot_plan)
216 src_host = args.src_host
217 assert src_host, "--src-host must not be empty!"
218 localhostname = args.localhost if args.localhost else socket.gethostname()
219 assert localhostname, "localhostname must not be empty!"
220 dst_hosts = ast.literal_eval(args.dst_hosts)
221 retain_dst_targets = ast.literal_eval(args.retain_dst_targets)
222 dst_root_datasets = ast.literal_eval(args.dst_root_datasets)
224 def resolve_dst_dataset(dst_dataset: str, dst_hostname: str) -> str:
225 root_dataset = dst_root_datasets.get(dst_hostname)
226 assert root_dataset is not None, f"Hostname '{dst_hostname}' missing in --dst-root-datasets: {dst_root_datasets}"
227 return root_dataset + "/" + dst_dataset if root_dataset else dst_dataset
229 if args.create_src_snapshots:
230 opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"]
231 opts += [f"--log-file-prefix={prog_name}{sep}create-src-snapshots{sep}"]
232 opts += [f"--log-file-suffix={sep}"]
233 opts += unknown_args + ["--"]
234 opts += dedupe([(src, dummy_dataset) for src, dst in args.root_dataset_pairs])
235 self.run_cmd(["bzfs"] + opts)
237 if args.replicate == "pull": # pull mode (recommended)
238 pull_targets = {target for target, dst_hostname in dst_hosts.items() if dst_hostname == localhostname}
239 opts = self.replication_filter_opts(dst_snapshot_plan, "pull", pull_targets, src_host, localhostname)
240 if len(opts) > 0:
241 opts += [f"--ssh-src-user={args.src_user}"] if args.src_user else []
242 opts += unknown_args + ["--"]
243 old_len_opts = len(opts)
244 pairs = [
245 (f"{src_host}:{src}", resolve_dst_dataset(dst, localhostname)) for src, dst in args.root_dataset_pairs
246 ]
247 for src, dst in self.skip_datasets_with_nonexisting_dst_pool(pairs):
248 opts += [src, dst]
249 if len(opts) > old_len_opts:
250 daemon_opts = [f"--daemon-frequency={args.daemon_replication_frequency}"]
251 self.run_cmd(["bzfs"] + daemon_opts + opts)
253 elif args.replicate == "push": # push mode (experimental feature)
254 host_targets = defaultdict(set)
255 for org, targetperiods in dst_snapshot_plan.items():
256 for target in targetperiods.keys():
257 dst_hostname = dst_hosts.get(target)
258 if dst_hostname:
259 host_targets[dst_hostname].add(target)
260 for dst_hostname, push_targets in host_targets.items():
261 opts = self.replication_filter_opts(dst_snapshot_plan, "push", push_targets, localhostname, dst_hostname)
262 if len(opts) > 0:
263 opts += [f"--ssh-dst-user={args.dst_user}"] if args.dst_user else []
264 opts += unknown_args + ["--"]
265 for src, dst in args.root_dataset_pairs:
266 opts += [src, f"{dst_hostname}:{resolve_dst_dataset(dst, dst_hostname)}"]
267 daemon_opts = [f"--daemon-frequency={args.daemon_replication_frequency}"]
268 self.run_cmd(["bzfs"] + daemon_opts + opts)
270 def prune_src(opts: List[str]) -> None:
271 opts += [
272 f"--log-file-suffix={sep}",
273 "--skip-replication",
274 f"--daemon-frequency={args.daemon_prune_src_frequency}",
275 ]
276 opts += unknown_args + ["--"]
277 opts += dedupe([(dummy_dataset, src) for src, dst in args.root_dataset_pairs])
278 self.run_cmd(["bzfs"] + opts)
280 if args.prune_src_snapshots:
281 opts = ["--delete-dst-snapshots", f"--delete-dst-snapshots-except-plan={src_snapshot_plan}"]
282 opts += [f"--log-file-prefix={prog_name}{sep}prune-src-snapshots{sep}"]
283 prune_src(opts)
285 if args.prune_src_bookmarks:
286 opts = ["--delete-dst-snapshots=bookmarks", f"--delete-dst-snapshots-except-plan={src_bookmark_plan}"]
287 opts += [f"--log-file-prefix={prog_name}{sep}prune-src-bookmarks{sep}"]
288 prune_src(opts)
290 if args.prune_dst_snapshots:
291 assert retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!"
292 retain_targets = {target for target, dst_hostname in retain_dst_targets.items() if dst_hostname == localhostname}
293 dst_snapshot_plan = { # only retain targets that belong to the host executing bzfs_jobrunner
294 org: {target: periods for target, periods in target_periods.items() if target in retain_targets}
295 for org, target_periods in dst_snapshot_plan.items()
296 }
297 opts = ["--delete-dst-snapshots", "--skip-replication"]
298 opts += [f"--delete-dst-snapshots-except-plan={dst_snapshot_plan}"]
299 opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"]
300 opts += [f"--log-file-prefix={prog_name}{sep}prune-dst-snapshots{sep}"]
301 opts += [f"--log-file-suffix={sep}"]
302 opts += unknown_args + ["--"]
303 old_len_opts = len(opts)
304 pairs = [(dummy_dataset, resolve_dst_dataset(dst, localhostname)) for src, dst in args.root_dataset_pairs]
305 for src, dst in self.skip_datasets_with_nonexisting_dst_pool(pairs):
306 opts += [src, dst]
307 if len(opts) > old_len_opts:
308 self.run_cmd(["bzfs"] + opts)
310 ex = self.first_exception
311 if ex is not None and ((not isinstance(ex, subprocess.CalledProcessError)) or ex.returncode != still_running_status):
312 raise ex
314 def run_cmd(self, *params) -> None:
315 try:
316 subprocess.run(*params, stdin=DEVNULL, text=True, check=True)
317 except (subprocess.CalledProcessError, subprocess.TimeoutExpired, UnicodeDecodeError) as e:
318 if self.first_exception is None:
319 self.first_exception = e
320 self.log.error("%s", str(e)) # log exception and keep on trucking
322 def replication_filter_opts(
323 self, dst_snapshot_plan: Dict, kind: str, targets: Set[str], src_hostname: str, dst_hostname: str
324 ) -> List[str]:
325 log = self.log
326 log.info("%s", f"Replicating targets {sorted(targets)} in {kind} mode from {src_hostname} to {dst_hostname} ...")
327 include_snapshot_plan = { # only replicate targets that belong to the destination host and are relevant
328 org: {
329 target: {
330 duration_unit: duration_amount
331 for duration_unit, duration_amount in periods.items()
332 if duration_amount > 0
333 }
334 for target, periods in target_periods.items()
335 if target in targets
336 }
337 for org, target_periods in dst_snapshot_plan.items()
338 }
339 include_snapshot_plan = { # only replicate orgs that have at least one relevant target_period
340 org: target_periods
341 for org, target_periods in include_snapshot_plan.items()
342 if any(len(periods) > 0 for target, periods in target_periods.items())
343 }
344 opts = []
345 if len(include_snapshot_plan) > 0:
346 opts += [f"--include-snapshot-plan={include_snapshot_plan}"]
347 opts += [f"--log-file-prefix={prog_name}{sep}{kind}{sep}"]
348 opts += [f"--log-file-suffix={sep}{src_hostname}{sep}{dst_hostname}{sep}"]
349 return opts
351 def skip_datasets_with_nonexisting_dst_pool(self, root_dataset_pairs) -> List[Tuple[str, str]]:
352 def zpool(dataset: str) -> str:
353 return dataset.split("/", 1)[0]
355 assert len(root_dataset_pairs) > 0
356 pools = {zpool(dst) for src, dst in root_dataset_pairs}
357 cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(pools)
358 existing_pools = set(subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True).stdout.splitlines())
359 results = []
360 for src, dst in root_dataset_pairs:
361 if zpool(dst) in existing_pools:
362 results.append((src, dst))
363 else:
364 self.log.warning("Skipping dst dataset for which dst pool does not exist: %s", dst)
365 return results
368#############################################################################
369def dedupe(root_dataset_pairs: List[Tuple[str, str]]) -> List[str]:
370 results = []
371 for src, dst in dict.fromkeys(root_dataset_pairs).keys():
372 results += [src, dst]
373 return results
376def format_dict(dictionary) -> str:
377 return f'"{dictionary}"'
380def get_logger() -> logging.Logger:
381 class LevelFormatter(logging.Formatter):
382 def format(self, record):
383 record.level_initial = record.levelname[0] # Use first letter of the level name
384 return super().format(record)
386 log = logging.getLogger(prog_name)
387 log.setLevel(logging.INFO)
388 log.propagate = False
389 if not any(isinstance(h, logging.StreamHandler) for h in log.handlers):
390 handler = logging.StreamHandler()
391 handler.setFormatter(LevelFormatter(fmt="%(asctime)s [%(level_initial)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"))
392 log.addHandler(handler)
393 return log
396#############################################################################
397class DatasetPairsAction(argparse.Action):
398 def __call__(self, parser, namespace, datasets, option_string=None):
399 if len(datasets) % 2 != 0:
400 parser.error(f"Each SRC_DATASET must have a corresponding DST_DATASET: {datasets}")
401 root_dataset_pairs = [(datasets[i], datasets[i + 1]) for i in range(0, len(datasets), 2)]
402 setattr(namespace, self.dest, root_dataset_pairs)
405#############################################################################
406if __name__ == "__main__": 406 ↛ 407line 406 didn't jump to line 407 because the condition on line 406 was never true
407 main()