Coverage for bzfs_main/bzfs_jobrunner.py: 99% (630 statements) — coverage.py v7.10.2, created at 2025-08-06 13:30 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata
16# /// script
17# requires-python = ">=3.8"
18# dependencies = []
19# ///
20#
21"""
22* High-level orchestrator that calls `bzfs` as part of complex, periodic workflows to manage backup, replication, and pruning
23 jobs across a fleet of multiple source and destination hosts; driven by a job config file (e.g., `bzfs_job_example.py`).
24* Overview of the bzfs_jobrunner.py codebase:
25* The codebase starts with docs, definition of input data and associated argument parsing of CLI options/parameters.
26* Control flow starts in main(), far below ..., which kicks off a "Job".
27* A Job creates zero or more "subjobs" for each local or remote host, via run_main().
28* It executes the subjobs, serially or in parallel, via run_subjobs(), which in turn delegates parallel job coordination to
29 bzfs.process_datasets_in_parallel_and_fault_tolerant().
30* README_bzfs_jobrunner.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via
31update_readme.sh. Simply run that script whenever you change or add ArgumentParser help text.
32"""
34from __future__ import annotations
35import argparse
36import contextlib
37import os
38import pwd
39import random
40import socket
41import subprocess
42import sys
43import threading
44import time
45import uuid
46from ast import literal_eval
47from logging import Logger
48from subprocess import DEVNULL, PIPE
49from typing import Any, Iterable, TypeVar, Union
51import bzfs_main.argparse_actions
52import bzfs_main.check_range
53import bzfs_main.utils
54from bzfs_main import bzfs
55from bzfs_main.argparse_cli import PROG_AUTHOR, SKIP_ON_ERROR_DEFAULT
56from bzfs_main.detect import DUMMY_DATASET
57from bzfs_main.loggers import get_simple_logger
58from bzfs_main.parallel_engine import (
59 BARRIER_CHAR,
60 process_datasets_in_parallel_and_fault_tolerant,
61)
62from bzfs_main.utils import (
63 DIE_STATUS,
64 LOG_TRACE,
65 format_dict,
66 human_readable_duration,
67 percent,
68 shuffle_dict,
69)
70from bzfs_main.utils import PROG_NAME as BZFS_PROG_NAME
72# constants:
73PROG_NAME: str = "bzfs_jobrunner"
74SRC_MAGIC_SUBSTITUTION_TOKEN: str = "^SRC_HOST" # noqa: S105
75DST_MAGIC_SUBSTITUTION_TOKEN: str = "^DST_HOST" # noqa: S105
76SEP: str = ","
79def argument_parser() -> argparse.ArgumentParser:
80 """Returns the CLI parser used by bzfs_jobrunner."""
81 # fmt: off
82 parser = argparse.ArgumentParser(
83 prog=PROG_NAME,
84 allow_abbrev=False,
85 formatter_class=argparse.RawTextHelpFormatter,
86 description=f"""
87This program is a convenience wrapper around [bzfs](README.md) that simplifies periodic ZFS snapshot creation, replication,
88pruning, and monitoring, across N source hosts and M destination hosts, using a single shared
89[jobconfig](bzfs_tests/bzfs_job_example.py) script.
90For example, this simplifies the deployment of an efficient geo-replicated backup service where each of the M destination
91hosts is located in a separate geographic region and receives replicas from (the same set of) N source hosts. It also
92simplifies low latency replication from a primary to a secondary or to M read replicas, or backup to removable drives, etc.
94This program can be used to efficiently replicate ...
96a) within a single machine (local mode), or
98b) from a single source host to one or more destination hosts (pull or push or pull-push mode), or
100c) from multiple source hosts to a single destination host (pull or push or pull-push mode), or
102d) from N source hosts to M destination hosts (pull or push or pull-push mode, N and M can be large, M=2 or M=3 are typical
103geo-replication factors)
105You can run this program on a single third-party host and have that talk to all source hosts and destination hosts, which is
106convenient for basic use cases and for testing.
107However, typically, a cron job on each source host runs `{PROG_NAME}` periodically to create new snapshots (via
108--create-src-snapshots) and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and
109--prune-src-bookmarks), whereas another cron job on each destination host runs `{PROG_NAME}` periodically to prune
110outdated destination snapshots (via --prune-dst-snapshots), and to replicate the recently created snapshots from the source
111to the destination (via --replicate).
112Yet another cron job on each source and each destination runs `{PROG_NAME}` periodically to alert the user if the latest or
113oldest snapshot is somehow too old (via --monitor-src-snapshots and --monitor-dst-snapshots).
114The frequency of these periodic activities can vary by activity, and is typically every second, minute, hour, day, week,
115month and/or year (or multiples thereof).
117Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto all
118source hosts and all destination hosts, and add crontab entries (or systemd timers or Monit entries or similar), along these
119lines:
121* crontab on source hosts:
123`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks`
125`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --monitor-src-snapshots`
128* crontab on destination hosts:
130`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --replicate --prune-dst-snapshots`
132`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --monitor-dst-snapshots`
134### High Frequency Replication (Experimental Feature)
136Taking snapshots, and/or replicating, from every N milliseconds to every 10 seconds or so is considered high frequency. For
137such use cases, consider that `zfs list -t snapshot` performance degrades as more and more snapshots currently exist within
138the selected datasets, so try to keep the number of currently existing snapshots small, and prune them at a frequency that
139is proportional to the frequency with which snapshots are created. Consider using `--skip-parent` and `--exclude-dataset*`
140filters to limit the selected datasets only to those that require this level of frequency.
142In addition, use the `--daemon-*` options to reduce startup overhead, in combination with splitting the crontab entry (or
143better: high frequency systemd timer) into multiple processes, from a single source host to a single destination host,
144along these lines:
146* crontab on source hosts:
148`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --create-src-snapshots`
150`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-snapshots`
152`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-bookmarks`
154`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --monitor-src-snapshots`
157* crontab on destination hosts:
159`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --replicate`
161`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --prune-dst-snapshots`
163`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --monitor-dst-snapshots`
165The daemon processes work like non-daemon processes except that they loop, handle time events and sleep between events, and
166finally exit after, say, 86400 seconds (whatever you specify via `--daemon-lifetime`). The daemons will subsequently be
167auto-restarted by 'cron', or earlier if they fail. While the daemons are running, 'cron' will attempt to start new
168(unnecessary) daemons but this is benign as these new processes immediately exit with a message like this:
169"Exiting as same previous periodic job is still running without completion yet"
170""")
172 # commands:
173 parser.add_argument(
174 "--create-src-snapshots", action="store_true",
175 help="Take snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
176 "program (or cron job) running on each src host.\n\n")
177 parser.add_argument( # `choices` are deprecated; use --replicate without argument instead
178 "--replicate", choices=["pull", "push"], default=None, const="pull", nargs="?", metavar="",
179 help="Replicate snapshots from the selected source hosts to the selected destinations hosts as necessary. For pull "
180 "mode (recommended), this command should be called by a program (or cron job) running on each dst "
181 "host; for push mode, on the src host; for pull-push mode on a third-party host.\n\n")
182 parser.add_argument(
183 "--prune-src-snapshots", action="store_true",
184 help="Prune snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
185 "program (or cron job) running on each src host.\n\n")
186 parser.add_argument(
187 "--prune-src-bookmarks", action="store_true",
188 help="Prune bookmarks on the selected source hosts as necessary. Typically, this command should be called by a "
189 "program (or cron job) running on each src host.\n\n")
190 parser.add_argument(
191 "--prune-dst-snapshots", action="store_true",
192 help="Prune snapshots on the selected destination hosts as necessary. Typically, this command should be called by a "
193 "program (or cron job) running on each dst host.\n\n")
194 parser.add_argument(
195 "--monitor-src-snapshots", action="store_true",
196 help="Alert the user if snapshots on the selected source hosts are too old, using --monitor-snapshot-plan (see "
197 "below). Typically, this command should be called by a program (or cron job) running on each src host.\n\n")
198 parser.add_argument(
199 "--monitor-dst-snapshots", action="store_true",
200 help="Alert the user if snapshots on the selected destination hosts are too old, using --monitor-snapshot-plan (see "
201 "below). Typically, this command should be called by a program (or cron job) running on each dst host.\n\n")
203 # options:
204 parser.add_argument(
205 "--localhost", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
206 help="Hostname of localhost. Default is the hostname without the domain name, querying the Operating System.\n\n")
207 parser.add_argument(
208 "--src-hosts", default=None, metavar="LIST_STRING",
209 help="Hostnames of the sources to operate on.\n\n")
210 parser.add_argument(
211 "--src-host", default=None, action="append", metavar="STRING",
212 help="For subsetting --src-hosts; Can be specified multiple times; Indicates to only use the --src-hosts that are "
213 "contained in the specified --src-host values (optional).\n\n")
214 dst_hosts_example = {"nas": ["onsite"], "bak-us-west-1": ["us-west-1"],
215 "bak-eu-west-1": ["eu-west-1"], "archive": ["offsite"]}
216 parser.add_argument(
217 "--dst-hosts", default="{}", metavar="DICT_STRING",
218 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
219 "(the infix portion of snapshot name). "
220 f"Example: `{format_dict(dst_hosts_example)}`.\n\n"
221 "With this, given a snapshot name, we can find the destination hostnames to which the snapshot shall be "
222 "replicated. Also, given a snapshot name and its own name, a destination host can determine if it shall "
223 "replicate the given snapshot from the source host, or if the snapshot is intended for another destination "
224 "host, in which case it skips the snapshot. A destination host will receive replicas of snapshots for all "
225 "targets that map to that destination host.\n\n"
226 "Removing a mapping can be used to temporarily suspend replication to a given destination host.\n\n")
227 parser.add_argument(
228 "--dst-host", default=None, action="append", metavar="STRING",
229 help="For subsetting --dst-hosts; Can be specified multiple times; Indicates to only use the --dst-hosts keys that "
230 "are contained in the specified --dst-host values (optional).\n\n")
231 parser.add_argument(
232 "--retain-dst-targets", default="{}", metavar="DICT_STRING",
233 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
234 "(the infix portion of snapshot name). "
235 f"Example: `{format_dict(dst_hosts_example)}`. Has same format as --dst-hosts.\n\n"
236 "As part of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has "
237 "no mapping to that destination host in this dictionary. Do not remove a mapping here unless you are sure it's "
238 "ok to delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n")
239 dst_root_datasets_example = {
240 "nas": "tank2/bak",
241 "bak-us-west-1": "backups/bak001",
242 "bak-eu-west-1": "backups/bak999",
243 "archive": f"archives/zoo/{SRC_MAGIC_SUBSTITUTION_TOKEN}",
244 "hotspare": "",
245 }
246 parser.add_argument(
247 "--dst-root-datasets", default="{}", metavar="DICT_STRING",
248 help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root "
249 "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that "
250 "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, "
251 "whereas for cloning, master slave replication, or replication from a primary to a secondary, this can also be "
252 "the empty string.\n\n"
253 f"`{SRC_MAGIC_SUBSTITUTION_TOKEN}` and `{DST_MAGIC_SUBSTITUTION_TOKEN}` are optional magic substitution tokens "
254 "that will be auto-replaced at runtime with the actual hostname. This can be used to force the use of a "
255 "separate destination root dataset per source host or per destination host.\n\n"
256 f"Example: `{format_dict(dst_root_datasets_example)}`\n\n")
257 src_snapshot_plan_example = {
258 "prod": {
259 "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
260 "us-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
261 "eu-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
262 },
263 "test": {
264 "offsite": {"12hourly": 42, "weekly": 12},
265 },
266 }
267 parser.add_argument(
268 "--src-snapshot-plan", default="{}", metavar="DICT_STRING",
269 help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. "
270 "Snapshots that do not match a retention period will be deleted. A zero or missing retention period indicates "
271 "that no snapshots shall be retained (or even be created) for the given period.\n\n"
272 f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and "
273 "the intended logical target 'onsite', create and then retain secondly snapshots that were created less "
274 "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analog for "
275 "the latest 40 minutely snapshots, 36 hourly snapshots, etc. "
276 "It will also create and retain snapshots for the targets 'us-west-1' and 'eu-west-1' within the 'prod' "
277 "organization. "
278 "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, "
279 "and name them as being intended for the 'offsite' replication target. "
280 "The example creates snapshots with names like "
281 "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, "
282 "`prod_us-west-1_<timestamp>_hourly`, `prod_us-west-1_<timestamp>_daily`, "
283 "`prod_eu-west-1_<timestamp>_hourly`, `prod_eu-west-1_<timestamp>_daily`, "
284 "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n")
285 parser.add_argument(
286 "--src-bookmark-plan", default="{}", metavar="DICT_STRING",
287 help="Retention periods for bookmarks to be used if pruning src. Has same format as --src-snapshot-plan.\n\n")
288 parser.add_argument(
289 "--dst-snapshot-plan", default="{}", metavar="DICT_STRING",
290 help="Retention periods for snapshots to be used if pruning dst. Has same format as --src-snapshot-plan.\n\n")
291 monitor_snapshot_plan_example = {
292 "prod": {
293 "onsite": {
294 "100millisecondly": {"warning": "650 milliseconds", "critical": "2 seconds"},
295 "secondly": {"warning": "2 seconds", "critical": "14 seconds"},
296 "minutely": {"warning": "30 seconds", "critical": "300 seconds"},
297 "hourly": {"warning": "30 minutes", "critical": "300 minutes"},
298 "daily": {"warning": "4 hours", "critical": "8 hours"},
299 "weekly": {"warning": "2 days", "critical": "8 days"},
300 "monthly": {"warning": "2 days", "critical": "8 days"},
301 "yearly": {"warning": "5 days", "critical": "14 days"},
302 "10minutely": {"warning": "0 minutes", "critical": "0 minutes"},
303 },
304 "": {
305 "daily": {"warning": "4 hours", "critical": "8 hours"},
306 },
307 },
308 }
309 parser.add_argument(
310 "--monitor-snapshot-plan", default="{}", metavar="DICT_STRING",
311 help="Alert the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified "
312 "snapshot pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to "
313 "check if snapshots are successfully taken on schedule, successfully replicated on schedule, and successfully "
314 "pruned on schedule. "
315 "Process exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively. "
316 f"Example DICT_STRING: `{format_dict(monitor_snapshot_plan_example)}`. "
317 "This example alerts the user if the latest src or dst snapshot named `prod_onsite_<timestamp>_hourly` is more "
318 "than 30 minutes late (i.e. more than 30+60=90 minutes old) [warning] or more than 300 minutes late (i.e. more "
319 "than 300+60=360 minutes old) [critical]. In addition, the example alerts the user if the oldest src or dst "
320 "snapshot named `prod_onsite_<timestamp>_hourly` is more than 30 + 60x36 minutes old [warning] or more than "
321 "300 + 60x36 minutes old [critical], where 36 is the number of period cycles specified in `src_snapshot_plan` "
322 "or `dst_snapshot_plan`, respectively. "
323 "Analog for the latest snapshot named `prod_<timestamp>_daily`, and so on.\n\n"
324 "Note: A duration that is missing or zero (e.g. '0 minutes') indicates that no snapshots shall be checked for "
325 "the given snapshot name pattern.\n\n")
326 locations = ["src", "dst"]
327 for loc in locations:
328 parser.add_argument(
329 f"--ssh-{loc}-user", default="", metavar="STRING",
330 help=f"Remote SSH username on {loc} hosts to connect to (optional). Examples: 'root', 'alice'.\n\n")
331 for loc in locations:
332 parser.add_argument(
333 f"--ssh-{loc}-port", type=int, metavar="INT",
334 help=f"Remote SSH port on {loc} host to connect to (optional).\n\n")
335 for loc in locations:
336 parser.add_argument(
337 f"--ssh-{loc}-config-file", type=str, action=bzfs_main.argparse_actions.SSHConfigFileNameAction, metavar="FILE",
338 help=f"Path to SSH ssh_config(5) file to connect to {loc} (optional); will be passed into ssh -F CLI. "
339 "The basename must contain the substring 'bzfs_ssh_config'.\n\n")
340 parser.add_argument(
341 "--src-user", default="", metavar="STRING",
342 help=argparse.SUPPRESS) # deprecated; was renamed to --ssh-src-user
343 parser.add_argument(
344 "--dst-user", default="", metavar="STRING",
345 help=argparse.SUPPRESS) # deprecated; was renamed to --ssh-dst-user
346 parser.add_argument(
347 "--job-id", required=True, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
348 help="The identifier that remains constant across all runs of this particular job; will be included in the log file "
349 "name infix. Example: mytestjob\n\n")
350 parser.add_argument(
351 "--jobid", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
352 help=argparse.SUPPRESS) # deprecated; was renamed to --job-run
353 parser.add_argument(
354 "--job-run", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
355 help="The identifier of this particular run of the overall job; will be included in the log file name suffix. "
356 "Default is a hex UUID. Example: 0badc0f003a011f0a94aef02ac16083c\n\n")
357 workers_default = 100 # percent
358 parser.add_argument(
359 "--workers", min=1, default=(workers_default, True), action=bzfs_main.argparse_actions.CheckPercentRange,
360 metavar="INT[%]",
361 help="The maximum number of jobs to run in parallel at any time; can be given as a positive integer, "
362 f"optionally followed by the %% percent character (min: %(min)s, default: {workers_default}%%). Percentages "
363 "are relative to the number of CPU cores on the machine. Example: 200%% uses twice as many parallel jobs as "
364 "there are cores on the machine; 75%% uses num_procs = num_cores * 0.75. Examples: 1, 4, 75%%, 150%%\n\n")
365 parser.add_argument(
366 "--work-period-seconds", type=float, min=0, default=0, action=bzfs_main.check_range.CheckRange, metavar="FLOAT",
367 help="Reduces bandwidth spikes by spreading out the start of worker jobs over this much time; "
368 "0 disables this feature (default: %(default)s). Examples: 0, 60, 86400\n\n")
369 parser.add_argument(
370 "--jitter", action="store_true",
371 help="Randomize job start time and host order to avoid potential thundering herd problems in large distributed "
372 "systems (optional). Randomizing job start time is only relevant if --work-period-seconds > 0.\n\n")
373 parser.add_argument(
374 "--worker-timeout-seconds", type=float, min=0, default=None, action=bzfs_main.check_range.CheckRange,
375 metavar="FLOAT",
376 help="If this much time has passed after a worker process has started executing, kill the straggling worker "
377 "(optional). Other workers remain unaffected. Examples: 60, 3600\n\n")
378 parser.add_argument(
379 "--spawn_process_per_job", action="store_true",
380 help=argparse.SUPPRESS)
381 parser.add_argument(
382 "--jobrunner-dryrun", action="store_true",
383 help="Do a dry run (aka 'no-op') to print what operations would happen if the command were to be executed "
384 "for real (optional). This option treats both the ZFS source and destination as read-only. Can also be used to "
385 "check if the configuration options are valid.\n\n")
386 parser.add_argument(
387 "--jobrunner-log-level", choices=["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"], default="INFO",
388 help="Only emit jobrunner messages with equal or higher priority than this log level. Default is '%(default)s'.\n\n")
389 parser.add_argument(
390 "--version", action="version", version=f"{PROG_NAME}-{bzfs_main.argparse_cli.__version__}, by {PROG_AUTHOR}",
391 help="Display version information and exit.\n\n")
392 parser.add_argument(
393 "--help, -h", action="help",
394 help="Show this help message and exit.\n\n")
395 parser.add_argument(
396 "--daemon-replication-frequency", default="minutely", metavar="STRING",
397 help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n")
398 parser.add_argument(
399 "--daemon-prune-src-frequency", default="minutely", metavar="STRING",
400 help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n")
401 parser.add_argument(
402 "--daemon-prune-dst-frequency", default="minutely", metavar="STRING",
403 help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n")
404 parser.add_argument(
405 "--daemon-monitor-snapshots-frequency", default="minutely", metavar="STRING",
406 help="Specifies how often the bzfs daemon shall monitor snapshot age if --daemon-lifetime is nonzero.\n\n")
407 bad_opts = ["--daemon-frequency", "--include-snapshot-plan", "--create-src-snapshots-plan", "--skip-replication",
408 "--log-file-prefix", "--log-file-infix", "--log-file-suffix", "--log-config-file", "--log-config-var",
409 "--delete-dst-datasets", "--delete-dst-snapshots", "--delete-dst-snapshots-except",
410 "--delete-dst-snapshots-except-plan", "--delete-empty-dst-datasets",
411 "--monitor-snapshots", "--timeout"]
412 for loc in locations:
413 bad_opts += [f"--ssh-{loc}-host"] # reject this arg as jobrunner will auto-generate it
414 for bad_opt in bad_opts:
415 parser.add_argument(bad_opt, action=RejectArgumentAction, nargs=0, help=argparse.SUPPRESS)
416 parser.add_argument(
417 "--root-dataset-pairs", required=True, nargs="+", action=bzfs_main.argparse_actions.DatasetPairsAction,
418 metavar="SRC_DATASET DST_DATASET",
419 help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be "
420 "auto-appended later).\n\n")
421 return parser
422 # fmt: on
425#############################################################################
426def main() -> None:
427 """API for command line clients."""
428 Job().run_main(sys.argv)
431#############################################################################
432class Job:
433 """Coordinates subjobs per the CLI flags; Each subjob handles one host pair and may run in its own process or thread."""
435 def __init__(self, log: Logger | None = None) -> None:
436 """Initialize caches, parsers and logger shared across subjobs."""
437 # immutable variables:
438 self.jobrunner_dryrun: bool = False
439 self.spawn_process_per_job: bool = False
440 self.log: Logger = log if log is not None else get_simple_logger(PROG_NAME)
441 self.bzfs_argument_parser: argparse.ArgumentParser = bzfs.argument_parser()
442 self.argument_parser: argparse.ArgumentParser = argument_parser()
443 self.loopback_address: str = _detect_loopback_address()
445 # mutable variables:
446 self.first_exception: int | None = None
447 self.stats: Stats = Stats()
448 self.cache_existing_dst_pools: set[str] = set()
449 self.cache_known_dst_pools: set[str] = set()
451 self.is_test_mode: bool = False # for testing only
453 def run_main(self, sys_argv: list[str]) -> None:
454 """API for Python clients; visible for testing; may become a public API eventually."""
455 self.first_exception = None
456 log: Logger = self.log
457 log.info("CLI arguments: %s", " ".join(sys_argv))
458 nsp = argparse.Namespace(no_argument_file=True) # disable --root-dataset-pairs='+file' option in DatasetPairsAction
459 args, unknown_args = self.argument_parser.parse_known_args(sys_argv[1:], nsp) # forward all unknown args to `bzfs`
460 log.setLevel(args.jobrunner_log_level)
461 self.jobrunner_dryrun = args.jobrunner_dryrun
462 assert len(args.root_dataset_pairs) > 0
463 src_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_snapshot_plan), "--src-snapshot-plan")
464 src_bookmark_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_bookmark_plan), "--src-bookmark-plan")
465 dst_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.dst_snapshot_plan), "--dst-snapshot-plan")
466 monitor_snapshot_plan: dict = self.validate_monitor_snapshot_plan(literal_eval(args.monitor_snapshot_plan))
467 localhostname: str = args.localhost if args.localhost else socket.gethostname()
468 self.validate_host_name(localhostname, "--localhost")
469 log.debug("localhostname: %s", localhostname)
470 src_hosts: list[str] = self.validate_src_hosts(
471 literal_eval(args.src_hosts if args.src_hosts is not None else sys.stdin.read())
472 )
473 basis_src_hosts: list[str] = src_hosts
474 nb_src_hosts: int = len(basis_src_hosts)
475 log.debug("src_hosts before subsetting: %s", src_hosts)
476 if args.src_host is not None: # retain only the src hosts that are also contained in args.src_host
477 assert isinstance(args.src_host, list)
478 retain_src_hosts: set[str] = set(args.src_host)
479 self.validate_is_subset(retain_src_hosts, src_hosts, "--src-host", "--src-hosts")
480 src_hosts = [host for host in src_hosts if host in retain_src_hosts]
481 dst_hosts: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.dst_hosts))
482 nb_dst_hosts: int = len(dst_hosts)
483 if args.dst_host is not None: # retain only the dst hosts that are also contained in args.dst_host
484 assert isinstance(args.dst_host, list)
485 retain_dst_hosts: set[str] = set(args.dst_host)
486 self.validate_is_subset(retain_dst_hosts, dst_hosts.keys(), "--dst-host", "--dst-hosts.keys")
487 dst_hosts = {dst_host: lst for dst_host, lst in dst_hosts.items() if dst_host in retain_dst_hosts}
488 retain_dst_targets: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.retain_dst_targets))
489 self.validate_is_subset(dst_hosts.keys(), retain_dst_targets.keys(), "--dst-hosts.keys", "--retain-dst-targets.keys")
490 dst_root_datasets: dict[str, str] = self.validate_dst_root_datasets(literal_eval(args.dst_root_datasets))
491 self.validate_is_subset(
492 dst_root_datasets.keys(), retain_dst_targets.keys(), "--dst-root-dataset.keys", "--retain-dst-targets.keys"
493 )
494 self.validate_is_subset(dst_hosts.keys(), dst_root_datasets.keys(), "--dst-hosts.keys", "--dst-root-dataset.keys")
495 bad_root_datasets: dict[str, str] = {
496 dst_host: root_dataset
497 for dst_host in sorted(dst_hosts.keys())
498 if SRC_MAGIC_SUBSTITUTION_TOKEN not in (root_dataset := dst_root_datasets[dst_host])
499 }
500 if len(src_hosts) > 1 and len(bad_root_datasets) > 0:
501 self.die(
502 "Cowardly refusing to proceed as multiple source hosts must not be configured to write to the same "
503 "destination dataset. "
504 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(src_hosts)}"
505 )
506 bad_root_datasets = {
507 dst_host: root_dataset
508 for dst_host, root_dataset in sorted(dst_root_datasets.items())
509 if root_dataset and SRC_MAGIC_SUBSTITUTION_TOKEN not in root_dataset
510 }
511 if len(basis_src_hosts) > 1 and len(bad_root_datasets) > 0:
512 self.die(
513 "Cowardly refusing to proceed as multiple source hosts are defined in the configuration, but "
514 f"not all non-empty root datasets in --dst-root-datasets contain the '{SRC_MAGIC_SUBSTITUTION_TOKEN}' "
515 "substitution token to prevent collisions on writing destination datasets. "
516 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(basis_src_hosts)}"
517 )
518 if args.jitter: # randomize host order to avoid potential thundering herd problems in large distributed systems
519 random.shuffle(src_hosts)
520 dst_hosts = shuffle_dict(dst_hosts)
521 ssh_src_user: str = args.ssh_src_user if args.ssh_src_user is not None else args.src_user # --src-user is deprecated
522 ssh_dst_user: str = args.ssh_dst_user if args.ssh_dst_user is not None else args.dst_user # --dst-user is deprecated
523 ssh_src_port: int | None = args.ssh_src_port
524 ssh_dst_port: int | None = args.ssh_dst_port
525 ssh_src_config_file: str | None = args.ssh_src_config_file
526 ssh_dst_config_file: str | None = args.ssh_dst_config_file
527 job_id: str = _sanitize(args.job_id)
528 job_run: str = args.job_run if args.job_run is not None else args.jobid # --jobid deprecat; was renamed to --job-run
529 job_run = _sanitize(job_run) if job_run else uuid.uuid1().hex
530 workers, workers_is_percent = args.workers
531 max_workers: int = max(1, round((os.cpu_count() or 1) * workers / 100.0) if workers_is_percent else round(workers))
532 worker_timeout_seconds: int = args.worker_timeout_seconds
533 self.spawn_process_per_job = args.spawn_process_per_job
534 username: str = pwd.getpwuid(os.geteuid()).pw_name
535 assert username
536 localhost_ids: set[str] = {"localhost", "127.0.0.1", "::1", socket.gethostname()} # ::1 is IPv6 loopback address
537 localhost_ids.update(self.get_localhost_ips()) # union
538 localhost_ids.add(localhostname)
540 def zero_pad(number: int, width: int = 6) -> str:
541 """Pads number with leading '0' chars to the given width."""
542 return f"{number:0{width}d}"
544 def jpad(jj: int, tag: str) -> str:
545 """Returns ``tag`` prefixed with slash and zero padded index."""
546 return "/" + zero_pad(jj) + tag
548 def npad() -> str:
549 """Returns standardized subjob count suffix."""
550 return SEP + zero_pad(len(subjobs))
552 def update_subjob_name(tag: str) -> str:
553 """Derives next subjob name based on ``tag`` and index ``j``."""
554 if j <= 0:
555 return subjob_name
556 elif j == 1:
557 return subjob_name + jpad(j - 1, tag)
558 else:
559 return subjob_name + "/" + BARRIER_CHAR
561 def resolve_dataset(hostname: str, dataset: str, is_src: bool = True) -> str:
562 """Returns host:dataset string resolving IPv6 and localhost cases."""
563 ssh_user = ssh_src_user if is_src else ssh_dst_user
564 ssh_user = ssh_user if ssh_user else username
565 lb: str = self.loopback_address
566 lhi: set[str] = localhost_ids
567 hostname = hostname if hostname not in lhi else (lb if lb else hostname) if username != ssh_user else "-"
568 hostname = convert_ipv6(hostname)
569 return f"{hostname}:{dataset}"
571 def resolve_dst_dataset(dst_hostname: str, dst_dataset: str) -> str:
572 """Expands ``dst_dataset`` relative to ``dst_hostname`` roots."""
573 root_dataset: str | None = dst_root_datasets.get(dst_hostname)
574 assert root_dataset is not None, dst_hostname # f"Hostname '{dst_hostname}' missing in --dst-root-datasets"
575 root_dataset = root_dataset.replace(SRC_MAGIC_SUBSTITUTION_TOKEN, src_host)
576 root_dataset = root_dataset.replace(DST_MAGIC_SUBSTITUTION_TOKEN, dst_hostname)
577 resolved_dst_dataset: str = root_dataset + "/" + dst_dataset if root_dataset else dst_dataset
578 bzfs_main.utils.validate_dataset_name(resolved_dst_dataset, dst_dataset)
579 return resolve_dataset(dst_hostname, resolved_dst_dataset, is_src=False)
581 dummy: str = DUMMY_DATASET
582 lhn: str = localhostname
583 bzfs_prog_header: list[str] = [BZFS_PROG_NAME, "--no-argument-file"] + unknown_args
584 subjobs: dict[str, list[str]] = {}
585 for i, src_host in enumerate(src_hosts):
586 subjob_name: str = zero_pad(i) + "src-host"
587 j: int = 0
588 opts: list[str]
590 if args.create_src_snapshots:
591 opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"]
592 opts += [f"--log-file-prefix={PROG_NAME}{SEP}create-src-snapshots{SEP}"]
593 opts += [f"--log-file-infix={SEP}{job_id}"]
594 opts += [f"--log-file-suffix={SEP}{job_run}{npad()}{_log_suffix(lhn, src_host, '')}{SEP}"]
595 self.add_ssh_opts(
596 opts, ssh_src_user=ssh_src_user, ssh_src_port=ssh_src_port, ssh_src_config_file=ssh_src_config_file
597 )
598 opts += ["--"]
599 opts += _flatten(_dedupe([(resolve_dataset(src_host, src), dummy) for src, dst in args.root_dataset_pairs]))
600 subjob_name += "/create-src-snapshots"
601 subjobs[subjob_name] = bzfs_prog_header + opts
603 if args.replicate:
604 j = 0
605 marker: str = "replicate"
606 for dst_hostname, targets in dst_hosts.items():
607 opts = self.replication_opts(
608 dst_snapshot_plan, set(targets), lhn, src_host, dst_hostname, marker, job_id, job_run + npad()
609 )
610 if len(opts) > 0:
611 opts += [f"--daemon-frequency={args.daemon_replication_frequency}"]
612 self.add_ssh_opts(
613 opts,
614 ssh_src_user=ssh_src_user,
615 ssh_dst_user=ssh_dst_user,
616 ssh_src_port=ssh_src_port,
617 ssh_dst_port=ssh_dst_port,
618 ssh_src_config_file=ssh_src_config_file,
619 ssh_dst_config_file=ssh_dst_config_file,
620 )
621 opts += ["--"]
622 dataset_pairs: list[tuple[str, str]] = [
623 (resolve_dataset(src_host, src), resolve_dst_dataset(dst_hostname, dst))
624 for src, dst in args.root_dataset_pairs
625 ]
626 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs)
627 if len(dataset_pairs) > 0:
628 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
629 j += 1
630 subjob_name = update_subjob_name(marker)
632 def prune_src(opts: list[str], retention_plan: dict, tag: str, src_host: str = src_host) -> None:
633 """Creates prune subjob options for ``tag`` using ``retention_plan``."""
634 opts += [
635 "--skip-replication",
636 f"--delete-dst-snapshots-except-plan={retention_plan}",
637 f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}",
638 f"--log-file-infix={SEP}{job_id}",
639 f"--log-file-suffix={SEP}{job_run}{npad()}{_log_suffix(lhn, src_host, '')}{SEP}",
640 f"--daemon-frequency={args.daemon_prune_src_frequency}",
641 ]
642 self.add_ssh_opts( # i.e. dst=src, src=dummy
643 opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
644 )
645 opts += ["--"]
646 opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
647 nonlocal subjob_name
648 subjob_name += f"/{tag}"
649 subjobs[subjob_name] = bzfs_prog_header + opts
651 if args.prune_src_snapshots:
652 prune_src(["--delete-dst-snapshots"], src_snapshot_plan, "prune-src-snapshots")
654 if args.prune_src_bookmarks:
655 prune_src(["--delete-dst-snapshots=bookmarks"], src_bookmark_plan, "prune-src-bookmarks")
657 if args.prune_dst_snapshots:
658 self.validate_true(
659 retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!"
660 )
661 j = 0
662 marker = "prune-dst-snapshots"
663 for dst_hostname, _ in dst_hosts.items():
664 curr_retain_targets: set[str] = set(retain_dst_targets[dst_hostname])
665 curr_dst_snapshot_plan = { # only retain targets that belong to the host
666 org: {target: periods for target, periods in target_periods.items() if target in curr_retain_targets}
667 for org, target_periods in dst_snapshot_plan.items()
668 }
669 opts = ["--delete-dst-snapshots", "--skip-replication"]
670 opts += [f"--delete-dst-snapshots-except-plan={curr_dst_snapshot_plan}"]
671 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{marker}{SEP}"]
672 opts += [f"--log-file-infix={SEP}{job_id}"]
673 opts += [f"--log-file-suffix={SEP}{job_run}{npad()}{_log_suffix(lhn, src_host, dst_hostname)}{SEP}"]
674 opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"]
675 self.add_ssh_opts(
676 opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
677 )
678 opts += ["--"]
679 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
680 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs)
681 if len(dataset_pairs) > 0:
682 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
683 j += 1
684 subjob_name = update_subjob_name(marker)
686 def monitor_snapshots_opts(tag: str, monitor_plan: dict, logsuffix: str) -> list[str]:
687 """Returns monitor subjob options for ``tag`` and ``monitor_plan``."""
688 opts = [f"--monitor-snapshots={monitor_plan}", "--skip-replication"]
689 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}"]
690 opts += [f"--log-file-infix={SEP}{job_id}"]
691 opts += [f"--log-file-suffix={SEP}{job_run}{npad()}{logsuffix}{SEP}"]
692 opts += [f"--daemon-frequency={args.daemon_monitor_snapshots_frequency}"]
693 return opts
695 def build_monitor_plan(monitor_plan: dict, snapshot_plan: dict, cycles_prefix: str) -> dict:
696 """Expands ``monitor_plan`` with cycle defaults from ``snapshot_plan``."""
698 def alert_dicts(alertdict: dict, cycles: int) -> dict:
699 """Returns alert dictionaries with explicit ``cycles`` value."""
700 latest_dict = alertdict.copy()
701 for prefix in ("src_snapshot_", "dst_snapshot_", ""):
702 latest_dict.pop(f"{prefix}cycles", None)
703 oldest_dict = latest_dict.copy()
704 oldest_dict["cycles"] = int(alertdict.get(f"{cycles_prefix}cycles", cycles))
705 return {"latest": latest_dict, "oldest": oldest_dict}
707 return {
708 org: {
709 target: {
710 periodunit: alert_dicts(alertdict, snapshot_plan.get(org, {}).get(target, {}).get(periodunit, 1))
711 for periodunit, alertdict in periods.items()
712 }
713 for target, periods in target_periods.items()
714 }
715 for org, target_periods in monitor_plan.items()
716 }
718 if args.monitor_src_snapshots:
719 marker = "monitor-src-snapshots"
720 monitor_plan = build_monitor_plan(monitor_snapshot_plan, src_snapshot_plan, "src_snapshot_")
721 opts = monitor_snapshots_opts(marker, monitor_plan, _log_suffix(lhn, src_host, ""))
722 self.add_ssh_opts( # i.e. dst=src, src=dummy
723 opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
724 )
725 opts += ["--"]
726 opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
727 subjob_name += "/" + marker
728 subjobs[subjob_name] = bzfs_prog_header + opts
730 if args.monitor_dst_snapshots:
731 j = 0
732 marker = "monitor-dst-snapshots"
733 for dst_hostname, targets in dst_hosts.items():
734 monitor_targets = set(targets).intersection(set(retain_dst_targets[dst_hostname]))
735 monitor_plan = { # only retain targets that belong to the host
736 org: {target: periods for target, periods in target_periods.items() if target in monitor_targets}
737 for org, target_periods in monitor_snapshot_plan.items()
738 }
739 monitor_plan = build_monitor_plan(monitor_plan, dst_snapshot_plan, "dst_snapshot_")
740 opts = monitor_snapshots_opts(marker, monitor_plan, _log_suffix(lhn, src_host, dst_hostname))
741 self.add_ssh_opts(
742 opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
743 )
744 opts += ["--"]
745 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
746 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs)
747 if len(dataset_pairs) > 0:
748 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
749 j += 1
750 subjob_name = update_subjob_name(marker)
752 msg = "Ready to run %s subjobs using %s/%s src hosts: %s, %s/%s dst hosts: %s"
753 log.info(
754 msg, len(subjobs), len(src_hosts), nb_src_hosts, src_hosts, len(dst_hosts), nb_dst_hosts, list(dst_hosts.keys())
755 )
756 log.log(LOG_TRACE, "subjobs: \n%s", _pretty_print_formatter(subjobs))
757 self.run_subjobs(subjobs, max_workers, worker_timeout_seconds, args.work_period_seconds, args.jitter)
758 ex = self.first_exception
759 if isinstance(ex, int):
760 assert ex != 0
761 sys.exit(ex)
762 assert ex is None, ex
763 log.info("Succeeded. Bye!")
765 def replication_opts(
766 self,
767 dst_snapshot_plan: dict[str, dict[str, dict[str, int]]],
768 targets: set[str],
769 localhostname: str,
770 src_hostname: str,
771 dst_hostname: str,
772 tag: str,
773 job_id: str,
774 job_run: str,
775 ) -> list[str]:
776 """Returns CLI options for one replication subjob."""
777 log = self.log
778 log.debug("%s", f"Replicating targets {sorted(targets)} from {src_hostname} to {dst_hostname} ...")
779 include_snapshot_plan = { # only replicate targets that belong to the destination host and are relevant
780 org: {
781 target: {
782 duration_unit: duration_amount
783 for duration_unit, duration_amount in periods.items()
784 if duration_amount > 0
785 }
786 for target, periods in target_periods.items()
787 if target in targets
788 }
789 for org, target_periods in dst_snapshot_plan.items()
790 }
791 include_snapshot_plan = { # only replicate orgs that have at least one relevant target_period
792 org: target_periods
793 for org, target_periods in include_snapshot_plan.items()
794 if any(len(periods) > 0 for target, periods in target_periods.items())
795 }
796 opts: list[str] = []
797 if len(include_snapshot_plan) > 0:
798 opts += [f"--include-snapshot-plan={include_snapshot_plan}"]
799 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}"]
800 opts += [f"--log-file-infix={SEP}{job_id}"]
801 opts += [f"--log-file-suffix={SEP}{job_run}{_log_suffix(localhostname, src_hostname, dst_hostname)}{SEP}"]
802 return opts
    def skip_nonexisting_local_dst_pools(self, root_dataset_pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
        """Skip datasets that point to removable destination drives that are not currently (locally) attached, if any.

        Results are memoized across calls: ``self.cache_known_dst_pools`` holds pools already checked, and
        ``self.cache_existing_dst_pools`` holds the subset found to exist, so `zfs list` runs at most once per pool.
        """

        def zpool(dataset: str) -> str:
            """Returns pool name portion of ``dataset``."""
            return dataset.split("/", 1)[0]

        assert len(root_dataset_pairs) > 0
        unknown_dst_pools = {zpool(dst) for src, dst in root_dataset_pairs}
        unknown_dst_pools = unknown_dst_pools.difference(self.cache_known_dst_pools)  # only probe pools not seen before

        # Here we treat a zpool as existing if the zpool isn't local, aka if it isn't prefixed with "-:". A remote host
        # will raise an appropriate error if it turns out that the remote zpool doesn't actually exist.
        unknown_local_dst_pools = {pool for pool in unknown_dst_pools if pool.startswith("-:")}
        if len(unknown_local_dst_pools) > 0:  # `zfs list` if local
            existing_pools = {pool[len("-:") :] for pool in unknown_local_dst_pools}  # strip "-:" prefix for the zfs CLI
            cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(existing_pools)
            sp = subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True)
            if sp.returncode not in (0, 1):  # 1 means dataset not found
                self.die(f"Unexpected error {sp.returncode} on checking for existing local dst pools: {sp.stderr.strip()}")
            existing_pools = {"-:" + pool for pool in sp.stdout.splitlines()}  # re-add prefix to match the cache key format
            self.cache_existing_dst_pools.update(existing_pools)  # union
        unknown_remote_dst_pools = unknown_dst_pools.difference(unknown_local_dst_pools)
        self.cache_existing_dst_pools.update(unknown_remote_dst_pools)  # union; remote pools are treated as existing
        self.cache_known_dst_pools.update(unknown_dst_pools)  # union
        results: list[tuple[str, str]] = []
        for src, dst in root_dataset_pairs:
            if zpool(dst) in self.cache_existing_dst_pools:
                results.append((src, dst))
            else:
                self.log.warning("Skipping dst dataset for which local dst pool does not exist: %s", dst)
        return results
837 @staticmethod
838 def add_ssh_opts(
839 opts: list[str],
840 ssh_src_user: str | None = None,
841 ssh_dst_user: str | None = None,
842 ssh_src_port: int | None = None,
843 ssh_dst_port: int | None = None,
844 ssh_src_config_file: str | None = None,
845 ssh_dst_config_file: str | None = None,
846 ) -> None:
847 """Appends ssh related options to ``opts`` if specified."""
848 assert isinstance(opts, list)
849 opts += [f"--ssh-src-user={ssh_src_user}"] if ssh_src_user else []
850 opts += [f"--ssh-dst-user={ssh_dst_user}"] if ssh_dst_user else []
851 opts += [f"--ssh-src-port={ssh_src_port}"] if ssh_src_port is not None else []
852 opts += [f"--ssh-dst-port={ssh_dst_port}"] if ssh_dst_port is not None else []
853 opts += [f"--ssh-src-config-file={ssh_src_config_file}"] if ssh_src_config_file else []
854 opts += [f"--ssh-dst-config-file={ssh_dst_config_file}"] if ssh_dst_config_file else []
    def run_subjobs(
        self,
        subjobs: dict[str, list[str]],
        max_workers: int,
        timeout_secs: float | None,
        work_period_seconds: float,
        jitter: bool,
    ) -> None:
        """Executes subjobs sequentially or in parallel, respecting barriers.

        Params:
            subjobs: maps subjob name (a slash-separated path whose components encode ordering and barriers) to its CLI.
            max_workers: upper bound on concurrently running subjobs in parallel mode.
            timeout_secs: per-subjob timeout, or None for no timeout.
            work_period_seconds: time window over which subjob start times are spread evenly.
            jitter: if True, additionally delays the overall start by a random amount (thundering herd avoidance).
        """
        self.stats = Stats()
        self.stats.jobs_all = len(subjobs)
        log = self.log
        # with jitter there is one extra interval because the random initial delay consumes up to one interval
        num_intervals = 1 + len(subjobs) if jitter else len(subjobs)
        interval_nanos = 0 if len(subjobs) == 0 else round(1_000_000_000 * max(0.0, work_period_seconds) / num_intervals)
        assert interval_nanos >= 0
        if jitter:  # randomize job start time to avoid potential thundering herd problems in large distributed systems
            sleep_nanos = random.randint(0, interval_nanos)  # noqa: S311
            log.info("Jitter: Delaying job start time by sleeping for %s ...", human_readable_duration(sleep_nanos))
            time.sleep(sleep_nanos / 1_000_000_000)  # seconds
        sorted_subjobs = sorted(subjobs.keys())
        has_barrier = any(BARRIER_CHAR in subjob.split("/") for subjob in sorted_subjobs)
        if self.spawn_process_per_job or has_barrier or bzfs.has_siblings(sorted_subjobs):  # siblings can run in parallel
            log.log(LOG_TRACE, "%s", "spawn_process_per_job: True")
            # delegate scheduling (parallelism, barriers, skip-on-error semantics) to the shared bzfs machinery
            process_datasets_in_parallel_and_fault_tolerant(
                log=log,
                datasets=sorted_subjobs,
                process_dataset=lambda subjob, tid, retry: self.run_subjob(
                    subjobs[subjob], name=subjob, timeout_secs=timeout_secs, spawn_process_per_job=True
                )
                == 0,
                skip_tree_on_error=lambda subjob: True,
                skip_on_error=SKIP_ON_ERROR_DEFAULT,
                max_workers=max_workers,
                interval_nanos=lambda subjob: interval_nanos,
                task_name="Subjob",
                retry_policy=None,  # no retries
                dry_run=False,
                is_test_mode=self.is_test_mode,
            )
        else:
            log.log(LOG_TRACE, "%s", "spawn_process_per_job: False")
            next_update_nanos = time.monotonic_ns()
            for subjob in sorted_subjobs:
                time.sleep(max(0, next_update_nanos - time.monotonic_ns()) / 1_000_000_000)  # seconds
                next_update_nanos += interval_nanos
                s = subjob
                # sequential mode stops at the first failing subjob
                if not self.run_subjob(subjobs[subjob], name=s, timeout_secs=timeout_secs, spawn_process_per_job=False) == 0:
                    break
        stats = self.stats
        jobs_skipped = stats.jobs_all - stats.jobs_started
        msg = f"{stats}, skipped:" + percent(jobs_skipped, total=stats.jobs_all)
        log.info("Final Progress: %s", msg)
        # sanity check the final accounting: nothing still running, everything started has completed
        assert stats.jobs_running == 0, msg
        assert stats.jobs_completed == stats.jobs_started, msg
        skipped_jobs_dict = {subjob: subjobs[subjob] for subjob in sorted_subjobs if subjob not in stats.started_job_names}
        if len(skipped_jobs_dict) > 0:
            log.debug("Skipped subjobs: \n%s", _pretty_print_formatter(skipped_jobs_dict))
        assert jobs_skipped == len(skipped_jobs_dict), msg
    def run_subjob(
        self, cmd: list[str], name: str, timeout_secs: float | None, spawn_process_per_job: bool
    ) -> int | None:  # thread-safe
        """Executes one worker job and updates shared Stats.

        Params:
            cmd: full worker CLI command to execute.
            name: unique subjob name; recorded in ``stats.started_job_names``.
            timeout_secs: per-job timeout, or None for no timeout.
            spawn_process_per_job: if True runs the job in a subprocess, else in-process on the current thread.

        Returns the worker's exit code (0 means success), or None if no exit code could be determined; re-raises any
        unexpected exception after logging it. Records the first failing exit code in ``self.first_exception``.
        """
        start_time_nanos = time.monotonic_ns()
        returncode = None
        log = self.log
        cmd_str = " ".join(cmd)
        stats = self.stats
        try:
            with stats.lock:
                stats.jobs_started += 1
                stats.jobs_running += 1
                stats.started_job_names.add(name)
                msg = str(stats)  # snapshot the progress message under the lock; log it outside the lock
            log.log(LOG_TRACE, "Starting worker job: %s", cmd_str)
            log.info("Progress: %s", msg)
            start_time_nanos = time.monotonic_ns()  # reset so the bookkeeping above is excluded from the elapsed time
            if spawn_process_per_job:
                returncode = self.run_worker_job_spawn_process_per_job(cmd, timeout_secs)
            else:
                returncode = self.run_worker_job_in_current_thread(cmd, timeout_secs)
        except BaseException as e:
            log.error("Worker job failed with unexpected exception: %s for command: %s", e, cmd_str)
            raise
        else:
            elapsed_nanos = time.monotonic_ns() - start_time_nanos
            elapsed_human = human_readable_duration(elapsed_nanos)
            if returncode != 0:
                with stats.lock:
                    if self.first_exception is None:  # only the first failure is remembered; later ones don't overwrite
                        self.first_exception = DIE_STATUS if returncode is None else returncode
                log.error("Worker job failed with exit code %s in %s: %s", returncode, elapsed_human, cmd_str)
            else:
                log.debug("Worker job succeeded in %s: %s", elapsed_human, cmd_str)
            return returncode
        finally:
            elapsed_nanos = time.monotonic_ns() - start_time_nanos
            with stats.lock:
                stats.jobs_running -= 1
                stats.jobs_completed += 1
                stats.sum_elapsed_nanos += elapsed_nanos
                stats.jobs_failed += 1 if returncode != 0 else 0
                msg = str(stats)
                # invariants over the shared counters, checked while still holding the lock for a consistent snapshot
                assert stats.jobs_all >= 0, msg
                assert stats.jobs_started >= 0, msg
                assert stats.jobs_completed >= 0, msg
                assert stats.jobs_failed >= 0, msg
                assert stats.jobs_running >= 0, msg
                assert stats.sum_elapsed_nanos >= 0, msg
                assert stats.jobs_failed <= stats.jobs_completed, msg
                assert stats.jobs_completed <= stats.jobs_started, msg
                assert stats.jobs_started <= stats.jobs_all, msg
            log.info("Progress: %s", msg)
970 def run_worker_job_in_current_thread(self, cmd: list[str], timeout_secs: float | None) -> int | None:
971 """Runs ``bzfs`` in-process and return its exit code."""
972 log = self.log
973 if timeout_secs is not None:
974 cmd = cmd[0:1] + [f"--timeout={round(1000 * timeout_secs)}milliseconds"] + cmd[1:]
975 try:
976 if not self.jobrunner_dryrun:
977 self._bzfs_run_main(cmd)
978 return 0
979 except subprocess.CalledProcessError as e:
980 return e.returncode
981 except SystemExit as e:
982 assert e.code is None or isinstance(e.code, int)
983 return e.code
984 except BaseException:
985 log.exception("Worker job failed with unexpected exception for command: %s", " ".join(cmd))
986 return DIE_STATUS
988 def _bzfs_run_main(self, cmd: list[str]) -> None:
989 """Delegate execution to :mod:`bzfs` using parsed arguments."""
990 bzfs_job = bzfs.Job()
991 bzfs_job.is_test_mode = self.is_test_mode
992 bzfs_job.run_main(self.bzfs_argument_parser.parse_args(cmd[1:]), cmd)
994 def run_worker_job_spawn_process_per_job(self, cmd: list[str], timeout_secs: float | None) -> int | None:
995 """Spawns a subprocess for the worker job and waits for completion."""
996 log = self.log
997 if len(cmd) > 0 and cmd[0] == BZFS_PROG_NAME:
998 cmd = [sys.executable, "-m", "bzfs_main." + cmd[0]] + cmd[1:]
999 if self.jobrunner_dryrun:
1000 return 0
1001 proc = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, text=True) # run job in a separate subprocess
1002 try:
1003 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit
1004 except subprocess.TimeoutExpired:
1005 cmd_str = " ".join(cmd)
1006 log.error("%s", f"Terminating worker job as it failed to complete within {timeout_secs}s: {cmd_str}")
1007 proc.terminate() # Sends SIGTERM signal to job subprocess
1008 assert isinstance(timeout_secs, float)
1009 timeout_secs = min(1.0, timeout_secs)
1010 try:
1011 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit
1012 except subprocess.TimeoutExpired:
1013 log.error("%s", f"Killing worker job as it failed to terminate within {timeout_secs}s: {cmd_str}")
1014 proc.kill() # Sends SIGKILL signal to job subprocess because SIGTERM wasn't enough
1015 timeout_secs = min(0.025, timeout_secs)
1016 with contextlib.suppress(subprocess.TimeoutExpired):
1017 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit
1018 return proc.returncode
1020 def validate_src_hosts(self, src_hosts: list[str]) -> list[str]:
1021 """Checks ``src_hosts`` contains valid hostnames."""
1022 context = "--src-hosts"
1023 self.validate_type(src_hosts, list, context)
1024 for src_hostname in src_hosts:
1025 self.validate_host_name(src_hostname, context)
1026 return src_hosts
1028 def validate_dst_hosts(self, dst_hosts: dict[str, list[str]]) -> dict[str, list[str]]:
1029 """Checks destination hosts dictionary."""
1030 context = "--dst-hosts"
1031 self.validate_type(dst_hosts, dict, context)
1032 for dst_hostname, targets in dst_hosts.items():
1033 self.validate_host_name(dst_hostname, context)
1034 self.validate_type(targets, list, f"{context} targets")
1035 for target in targets:
1036 self.validate_type(target, str, f"{context} target")
1037 return dst_hosts
1039 def validate_dst_root_datasets(self, dst_root_datasets: dict[str, str]) -> dict[str, str]:
1040 """Checks that each destination root dataset string is valid."""
1041 context = "--dst-root-datasets"
1042 self.validate_type(dst_root_datasets, dict, context)
1043 for dst_hostname, dst_root_dataset in dst_root_datasets.items():
1044 self.validate_host_name(dst_hostname, context)
1045 self.validate_type(dst_root_dataset, str, f"{context} root dataset")
1046 return dst_root_datasets
1048 def validate_snapshot_plan(
1049 self, snapshot_plan: dict[str, dict[str, dict[str, int]]], context: str
1050 ) -> dict[str, dict[str, dict[str, int]]]:
1051 """Checks snapshot plan structure and value types."""
1052 self.validate_type(snapshot_plan, dict, context)
1053 for org, target_periods in snapshot_plan.items():
1054 self.validate_type(org, str, f"{context} org")
1055 self.validate_type(target_periods, dict, f"{context} target_periods")
1056 for target, periods in target_periods.items():
1057 self.validate_type(target, str, f"{context} org/target")
1058 self.validate_type(periods, dict, f"{context} org/periods")
1059 for period_unit, period_amount in periods.items():
1060 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit")
1061 self.validate_non_negative_int(period_amount, f"{context} org/target/period_amount")
1062 return snapshot_plan
1064 def validate_monitor_snapshot_plan(
1065 self, monitor_snapshot_plan: dict[str, dict[str, dict[str, dict[str, str | int]]]]
1066 ) -> dict[str, dict[str, dict[str, dict[str, str | int]]]]:
1067 """Checks snapshot monitoring plan configuration."""
1068 context = "--monitor-snapshot-plan"
1069 self.validate_type(monitor_snapshot_plan, dict, context)
1070 for org, target_periods in monitor_snapshot_plan.items():
1071 self.validate_type(org, str, f"{context} org")
1072 self.validate_type(target_periods, dict, f"{context} target_periods")
1073 for target, periods in target_periods.items():
1074 self.validate_type(target, str, f"{context} org/target")
1075 self.validate_type(periods, dict, f"{context} org/periods")
1076 for period_unit, alert_dict in periods.items():
1077 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit")
1078 self.validate_type(alert_dict, dict, f"{context} org/target/alert_dict")
1079 for key, value in alert_dict.items():
1080 self.validate_non_empty_string(key, f"{context} org/target/alert_dict/key")
1081 self.validate_type(value, Union[str, int], f"{context} org/target/alert_dict/value")
1082 return monitor_snapshot_plan
1084 def validate_is_subset(self, x: Iterable[str], y: Iterable[str], x_name: str, y_name: str) -> None:
1085 """Raises error if ``x`` contains an item not present in ``y``."""
1086 if isinstance(x, str) or not isinstance(x, Iterable):
1087 self.die(f"{x_name} must be an Iterable")
1088 if isinstance(y, str) or not isinstance(y, Iterable):
1089 self.die(f"{y_name} must be an Iterable")
1090 if not set(x).issubset(set(y)):
1091 diff = sorted(set(x).difference(set(y)))
1092 self.die(f"{x_name} must be a subset of {y_name}. diff: {diff}, {x_name}: {sorted(x)}, {y_name}: {sorted(y)}")
1094 def validate_host_name(self, hostname: str, context: str) -> None:
1095 """Checks host name string."""
1096 self.validate_non_empty_string(hostname, f"{context} hostname")
1097 bzfs.validate_host_name(hostname, context)
1099 def validate_non_empty_string(self, value: str, name: str) -> None:
1100 """Checks that ``value`` is a non-empty string."""
1101 self.validate_type(value, str, name)
1102 if not value:
1103 self.die(f"{name} must not be empty!")
1105 def validate_non_negative_int(self, value: int, name: str) -> None:
1106 """Checks ``value`` is an int >= 0."""
1107 self.validate_type(value, int, name)
1108 if value < 0:
1109 self.die(f"{name} must be a non-negative integer: {value}")
1111 def validate_true(self, expr: Any, msg: str) -> None:
1112 """Raises error if ``expr`` evaluates to ``False``."""
1113 if not bool(expr):
1114 self.die(msg)
1116 def validate_type(self, value: Any, expected_type: Any, name: str) -> None:
1117 """Checks ``value`` is instance of ``expected_type`` or union thereof."""
1118 if hasattr(expected_type, "__origin__") and expected_type.__origin__ is Union: # for compat with python < 3.10
1119 union_types = expected_type.__args__
1120 for t in union_types:
1121 if isinstance(value, t):
1122 return
1123 type_msg = " or ".join([t.__name__ for t in union_types])
1124 self.die(f"{name} must be of type {type_msg} but got {type(value).__name__}: {value}")
1125 elif not isinstance(value, expected_type):
1126 self.die(f"{name} must be of type {expected_type.__name__} but got {type(value).__name__}: {value}")
    def die(self, msg: str) -> None:
        """Logs ``msg`` at error level, then terminates the program via ``bzfs_main.utils.die()``."""
        self.log.error("%s", msg)
        bzfs_main.utils.die(msg)
1133 def get_localhost_ips(self) -> set[str]:
1134 """Returns all network addresses of the local host, i.e. all configured addresses on all network interfaces, without
1135 depending on name resolution."""
1136 if sys.platform == "linux":
1137 try:
1138 proc = subprocess.run(["hostname", "-I"], stdin=DEVNULL, stdout=PIPE, text=True, check=True) # noqa: S607
1139 except Exception as e:
1140 self.log.warning("Cannot run 'hostname -I' on localhost: %s", e)
1141 else:
1142 return {ip for ip in proc.stdout.strip().split() if ip}
1143 return set()
1146#############################################################################
class Stats:
    """Thread-safe counters summarizing subjob progress; ``lock`` guards mutation of the counters."""

    def __init__(self) -> None:
        self.lock: threading.Lock = threading.Lock()  # protects all counters below
        self.jobs_all: int = 0  # total number of subjobs scheduled
        self.jobs_started: int = 0
        self.jobs_completed: int = 0
        self.jobs_failed: int = 0
        self.jobs_running: int = 0
        self.sum_elapsed_nanos: int = 0  # cumulative wall-clock nanos across completed subjobs
        self.started_job_names: set[str] = set()

    def __repr__(self) -> str:
        def as_pct(number: int) -> str:
            """Formats ``number`` as a percentage of all jobs."""
            return percent(number, total=self.jobs_all)

        avg_nanos = self.sum_elapsed_nanos / max(1, self.jobs_completed)
        tail = "avg_completion_time:" + human_readable_duration(avg_nanos)
        return (
            f"all:{self.jobs_all}, started:{as_pct(self.jobs_started)}, completed:{as_pct(self.jobs_completed)}, "
            f"failed:{as_pct(self.jobs_failed)}, running:{self.jobs_running}, {tail}"
        )
1172#############################################################################
class RejectArgumentAction(argparse.Action):
    """An argparse Action that rejects any attempt to set its option, aborting argument parsing immediately."""

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: Any,
        option_string: str | None = None,
    ) -> None:
        """Aborts parsing whenever this protected option is encountered on the command line."""
        parser.error(f"Security: Overriding protected argument '{option_string}' is not allowed.")
1183#############################################################################
1184def _dedupe(root_dataset_pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
1185 """Returns a list with duplicate dataset pairs removed while preserving order."""
1186 return list(dict.fromkeys(root_dataset_pairs).keys())
1189T = TypeVar("T")
1192def _flatten(root_dataset_pairs: Iterable[Iterable[T]]) -> list[T]:
1193 """Flattens an iterable of pairs into a single list."""
1194 return [item for pair in root_dataset_pairs for item in pair]
def _sanitize(filename: str) -> str:
    """Replaces potentially problematic substrings in ``filename`` with '!' so it is safe to embed in log file names."""
    for problematic in (" ", "..", "/", "\\", SEP):
        filename = filename.replace(problematic, "!")
    return filename
def _log_suffix(localhostname: str, src_hostname: str, dst_hostname: str) -> str:
    """Returns a log file suffix embedding the sanitized local, source, and (optional) destination hostnames."""
    dst_part = _sanitize(dst_hostname) if dst_hostname else ""
    return SEP + _sanitize(localhostname) + SEP + _sanitize(src_hostname) + SEP + dst_part
1210def _pretty_print_formatter(dictionary: dict[str, Any]) -> Any:
1211 """Lazy JSON formatter used to avoid overhead in disabled log levels."""
1213 class PrettyPrintFormatter:
1214 """Wrapper returning formatted JSON on ``str`` conversion."""
1216 def __str__(self) -> str:
1217 import json
1219 return json.dumps(dictionary, indent=4, sort_keys=True)
1221 return PrettyPrintFormatter()
1224def _detect_loopback_address() -> str:
1225 """Detects if a loopback connection over IPv4 or IPv6 is possible."""
1226 try:
1227 addr = "127.0.0.1"
1228 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1229 s.bind((addr, 0))
1230 return addr
1231 except OSError:
1232 pass
1234 try:
1235 addr = "::1"
1236 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1237 s.bind((addr, 0))
1238 return addr
1239 except OSError:
1240 pass
1242 return ""
def convert_ipv6(hostname: str) -> str:
    """Replaces ':' with '|' so IPv6 addresses don't clash with the host:dataset colon separator or any colons within a
    (valid) ZFS dataset name; bzfs.convert_ipv6() performs the reverse mapping."""
    return "|".join(hostname.split(":"))
1251#############################################################################
1252if __name__ == "__main__": 1252 ↛ 1253line 1252 didn't jump to line 1253 because the condition on line 1252 was never true
1253 main()