Coverage for bzfs_main / bzfs_jobrunner.py: 99%
637 statements
coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata
16# /// script
17# requires-python = ">=3.9"
18# dependencies = []
19# ///
20#
21"""
22* High-level orchestrator that calls `bzfs` as part of complex, periodic workflows to manage backup, replication, and pruning
23 jobs across a fleet of multiple source and destination hosts; driven by a fleet-wide job config file
24 (e.g., `bzfs_job_example.py`).
25* Overview of the bzfs_jobrunner.py codebase:
26* The codebase starts with docs, definition of input data and associated argument parsing of CLI options/parameters.
27* Control flow starts in main(), far below ..., which kicks off a "Job".
28* A Job creates zero or more "subjobs" for each local or remote host, via run_main().
29* It executes the subjobs, serially or in parallel, via run_subjobs(), which in turn delegates parallel job coordination to
30 bzfs.process_datasets_in_parallel_and_fault_tolerant().
31* README_bzfs_jobrunner.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via
32update_readme.sh. Simply run that script whenever you change or add ArgumentParser help text.
33"""
35from __future__ import (
36 annotations,
37)
38import argparse
39import contextlib
40import os
41import platform
42import pwd
43import random
44import socket
45import subprocess
46import sys
47import threading
48import time
49import uuid
50from ast import (
51 literal_eval,
52)
53from collections.abc import (
54 Iterable,
55)
56from logging import (
57 Logger,
58)
59from subprocess import (
60 DEVNULL,
61 PIPE,
62)
63from typing import (
64 Any,
65 Final,
66 NoReturn,
67 TypeVar,
68 Union,
69 final,
70)
72import bzfs_main.argparse_actions
73from bzfs_main import (
74 bzfs,
75)
76from bzfs_main.argparse_cli import (
77 PROG_AUTHOR,
78)
79from bzfs_main.detect import (
80 DUMMY_DATASET,
81)
82from bzfs_main.loggers import (
83 get_simple_logger,
84 reset_logger,
85 set_logging_runtime_defaults,
86)
87from bzfs_main.util import (
88 check_range,
89 utils,
90)
91from bzfs_main.util.parallel_tasktree import (
92 BARRIER_CHAR,
93)
94from bzfs_main.util.parallel_tasktree_policy import (
95 process_datasets_in_parallel_and_fault_tolerant,
96)
97from bzfs_main.util.utils import (
98 DIE_STATUS,
99 LOG_TRACE,
100 UMASK,
101 JobStats,
102 Subprocesses,
103 dry,
104 format_dict,
105 format_obj,
106 getenv_bool,
107 human_readable_duration,
108 percent,
109 shuffle_dict,
110 terminate_process_subtree,
111 termination_signal_handler,
112 validate_dataset_name,
113)
114from bzfs_main.util.utils import PROG_NAME as BZFS_PROG_NAME
116# constants:
117PROG_NAME: Final[str] = "bzfs_jobrunner"
118SRC_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^SRC_HOST" # noqa: S105
119DST_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^DST_HOST" # noqa: S105
120SEP: Final[str] = ","
121POSIX_END_OF_OPTIONS_MARKER: Final[str] = "--" # args following -- are treated as operands, even if they begin with a hyphen
124def argument_parser() -> argparse.ArgumentParser:
125 """Returns the CLI parser used by bzfs_jobrunner."""
126 # fmt: off
127 parser = argparse.ArgumentParser(
128 prog=PROG_NAME,
129 allow_abbrev=False,
130 formatter_class=argparse.RawTextHelpFormatter,
131 description=f"""
132This program is a convenience wrapper around [bzfs](README.md) that simplifies periodic ZFS snapshot creation, replication,
133pruning, and monitoring, across a fleet of N source hosts and M destination hosts, using a single fleet-wide shared
134[jobconfig](bzfs_tests/bzfs_job_example.py) script.
135For example, this simplifies the deployment of an efficient geo-replicated backup service where each of the M destination
136hosts is located in a separate geographic region and receives replicas from (the same set of) N source hosts. It also
137simplifies low latency replication from a primary to a secondary or to M read replicas, or backup to removable drives, etc.
139This program can be used to efficiently replicate ...
141a) within a single machine (local mode), or
143b) from a single source host to one or more destination hosts (pull or push or pull-push mode), or
145c) from multiple source hosts to a single destination host (pull or push or pull-push mode), or
147d) from N source hosts to M destination hosts (pull or push or pull-push mode, N and M can be large, M=2 or M=3 are typical
148geo-replication factors)
150You can run this program on a single third-party host and have that host talk to all source hosts and destination hosts, which is
151convenient for basic use cases and for testing.
152However, typically, a cron job on each source host runs `{PROG_NAME}` periodically to create new snapshots (via
153--create-src-snapshots) and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and
154--prune-src-bookmarks), whereas another cron job on each destination host runs `{PROG_NAME}` periodically to prune
155outdated destination snapshots (via --prune-dst-snapshots), and to replicate the recently created snapshots from the source
156to the destination (via --replicate).
157Yet another cron job on each source and each destination runs `{PROG_NAME}` periodically to alert the user if the latest or
158oldest snapshot is somehow too old (via --monitor-src-snapshots and --monitor-dst-snapshots).
159The frequency of these periodic activities can vary by activity, and is typically every second, minute, hour, day, week,
160month and/or year (or multiples thereof).
162Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto all
163source hosts and all destination hosts, and add crontab entries (or systemd timers or Monit entries or similar), along these
164lines:
166* crontab on source hosts:
168`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks`
170`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --monitor-src-snapshots`
173* crontab on destination hosts:
175`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --replicate --prune-dst-snapshots`
177`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --monitor-dst-snapshots`
179### High Frequency Replication (Experimental Feature)
181Taking snapshots, and/or replicating, from every N milliseconds to every 10 seconds or so is considered high frequency. For
182such use cases, consider that `zfs list -t snapshot` performance degrades as more and more snapshots currently exist within
183the selected datasets, so try to keep the number of currently existing snapshots small, and prune them at a frequency that
184is proportional to the frequency with which snapshots are created. Consider using `--skip-parent` and `--exclude-dataset*`
185filters to limit the selected datasets only to those that require this level of frequency.
187In addition, use the `--daemon-*` options to reduce startup overhead, in combination with splitting the crontab entry (or
188better: high frequency systemd timer) into multiple processes, from a single source host to a single destination host,
189along these lines:
191* crontab on source hosts:
193`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --create-src-snapshots`
195`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-snapshots`
197`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-bookmarks`
199`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --monitor-src-snapshots`
202* crontab on destination hosts:
204`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --replicate`
206`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --prune-dst-snapshots`
208`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --monitor-dst-snapshots`
210The daemon processes work like non-daemon processes except that they loop, handle time events and sleep between events, and
211finally exit after, say, 86400 seconds (whatever you specify via `--daemon-lifetime`). The daemons will subsequently be
212auto-restarted by 'cron', or earlier if they fail. While the daemons are running, 'cron' will attempt to start new
213(unnecessary) daemons but this is benign as these new processes immediately exit with a message like this:
214"Exiting as same previous periodic job is still running without completion yet"
215""")
217 # commands:
218 parser.add_argument(
219 "--create-src-snapshots", action="store_true",
220 help="Take snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
221 "program (or cron job) running on each src host.\n\n")
222 parser.add_argument(
223 "--replicate", action="store_true",
224 help="Replicate snapshots from the selected source hosts to the selected destinations hosts as necessary. For pull "
225 "mode (recommended), this command should be called by a program (or cron job) running on each dst "
226 "host; for push mode, on the src host; for pull-push mode on a third-party host.\n\n")
227 parser.add_argument(
228 "--prune-src-snapshots", action="store_true",
229 help="Prune snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
230 "program (or cron job) running on each src host.\n\n")
231 parser.add_argument(
232 "--prune-src-bookmarks", action="store_true",
233 help="Prune bookmarks on the selected source hosts as necessary. Typically, this command should be called by a "
234 "program (or cron job) running on each src host.\n\n")
235 parser.add_argument(
236 "--prune-dst-snapshots", action="store_true",
237 help="Prune snapshots on the selected destination hosts as necessary. Typically, this command should be called by a "
238 "program (or cron job) running on each dst host.\n\n")
239 parser.add_argument(
240 "--monitor-src-snapshots", action="store_true",
241 help="Alert the user if snapshots on the selected source hosts are too old, using --monitor-snapshot-plan (see "
242 "below). Typically, this command should be called by a program (or cron job) running on each src host.\n\n")
243 parser.add_argument(
244 "--monitor-dst-snapshots", action="store_true",
245 help="Alert the user if snapshots on the selected destination hosts are too old, using --monitor-snapshot-plan (see "
246 "below). Typically, this command should be called by a program (or cron job) running on each dst host.\n\n")
248 # options:
249 parser.add_argument(
250 "--localhost", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
251 help="Hostname of localhost. Default is the hostname without the domain name, querying the Operating System.\n\n")
252 parser.add_argument(
253 "--src-hosts", default=None, metavar="LIST_STRING",
254 help="Hostnames of the sources to operate on.\n\n")
255 parser.add_argument(
256 "--src-host", default=None, action="append", metavar="STRING",
257 help="For subsetting --src-hosts; Can be specified multiple times; Indicates to only use the --src-hosts that are "
258 "contained in the specified --src-host values (optional).\n\n")
259 dst_hosts_example = {"nas": ["onsite"], "bak-us-west-1": ["us-west-1"],
260 "bak-eu-west-1": ["eu-west-1"], "archive": ["offsite"]}
261 parser.add_argument(
262 "--dst-hosts", default="{}", metavar="DICT_STRING",
263 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
264 "(the infix portion of snapshot name). "
265 f"Example: `{format_dict(dst_hosts_example)}`.\n\n"
266 "With this, given a snapshot name, we can find the destination hostnames to which the snapshot shall be "
267 "replicated. Also, given a snapshot name and its own name, a destination host can determine if it shall "
268 "replicate the given snapshot from the source host, or if the snapshot is intended for another destination "
269 "host, in which case it skips the snapshot. A destination host will receive replicas of snapshots for all "
270 "targets that map to that destination host.\n\n"
271 "Removing a mapping can be used to temporarily suspend replication to a given destination host.\n\n")
272 parser.add_argument(
273 "--dst-host", default=None, action="append", metavar="STRING",
274 help="For subsetting --dst-hosts; Can be specified multiple times; Indicates to only use the --dst-hosts keys that "
275 "are contained in the specified --dst-host values (optional).\n\n")
276 parser.add_argument(
277 "--retain-dst-targets", default="{}", metavar="DICT_STRING",
278 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
279 "(the infix portion of snapshot name). "
280 f"Example: `{format_dict(dst_hosts_example)}`. Has same format as --dst-hosts.\n\n"
281 "As part of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has "
282 "no mapping to that destination host in this dictionary. Do not remove a mapping here unless you are sure it's "
283 "ok to delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n")
284 dst_root_datasets_example = {
285 "nas": "tank2/bak",
286 "bak-us-west-1": "backups/bak001",
287 "bak-eu-west-1": "backups/bak999",
288 "archive": f"archives/zoo/{SRC_MAGIC_SUBSTITUTION_TOKEN}",
289 "hotspare": "",
290 }
291 parser.add_argument(
292 "--dst-root-datasets", default="{}", metavar="DICT_STRING",
293 help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root "
294 "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that "
295 "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, "
296 "whereas for cloning, master slave replication, or replication from a primary to a secondary, this can also be "
297 "the empty string.\n\n"
298 f"`{SRC_MAGIC_SUBSTITUTION_TOKEN}` and `{DST_MAGIC_SUBSTITUTION_TOKEN}` are optional magic substitution tokens "
299 "that will be auto-replaced at runtime with the actual hostname. This can be used to force the use of a "
300 "separate destination root dataset per source host or per destination host.\n\n"
301 f"Example: `{format_dict(dst_root_datasets_example)}`\n\n")
302 src_snapshot_plan_example = {
303 "prod": {
304 "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
305 "us-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
306 "eu-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
307 },
308 "test": {
309 "offsite": {"12hourly": 42, "weekly": 12},
310 },
311 }
312 parser.add_argument(
313 "--src-snapshot-plan", default="{}", metavar="DICT_STRING",
314 help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. "
315 "Snapshots that do not match a retention period will be deleted. A zero or missing retention period indicates "
316 "that no snapshots shall be retained (or even be created) for the given period.\n\n"
317 f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and "
318 "the intended logical target 'onsite', create and then retain secondly snapshots that were created less "
319 "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analog for "
320 "the latest 40 minutely snapshots, 36 hourly snapshots, etc. "
321 "It will also create and retain snapshots for the targets 'us-west-1' and 'eu-west-1' within the 'prod' "
322 "organization. "
323 "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, "
324 "and name them as being intended for the 'offsite' replication target. "
325 "The example creates snapshots with names like "
326 "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, "
327 "`prod_us-west-1_<timestamp>_hourly`, `prod_us-west-1_<timestamp>_daily`, "
328 "`prod_eu-west-1_<timestamp>_hourly`, `prod_eu-west-1_<timestamp>_daily`, "
329 "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n")
330 parser.add_argument(
331 "--src-bookmark-plan", default="{}", metavar="DICT_STRING",
332 help="Retention periods for bookmarks to be used if pruning src. Has same format as --src-snapshot-plan.\n\n")
333 parser.add_argument(
334 "--dst-snapshot-plan", default="{}", metavar="DICT_STRING",
335 help="Retention periods for snapshots to be used if pruning dst. Has same format as --src-snapshot-plan.\n\n")
336 monitor_snapshot_plan_example = {
337 "prod": {
338 "onsite": {
339 "100millisecondly": {"warning": "650 milliseconds", "critical": "2 seconds"},
340 "secondly": {"warning": "2 seconds", "critical": "14 seconds"},
341 "minutely": {"warning": "30 seconds", "critical": "300 seconds"},
342 "hourly": {"warning": "30 minutes", "critical": "300 minutes"},
343 "daily": {"warning": "4 hours", "critical": "8 hours"},
344 "weekly": {"warning": "2 days", "critical": "8 days"},
345 "monthly": {"warning": "2 days", "critical": "8 days"},
346 "yearly": {"warning": "5 days", "critical": "14 days"},
347 "10minutely": {"warning": "0 minutes", "critical": "0 minutes"},
348 },
349 "": {
350 "daily": {"warning": "4 hours", "critical": "8 hours"},
351 },
352 },
353 }
354 parser.add_argument(
355 "--monitor-snapshot-plan", default="{}", metavar="DICT_STRING",
356 help="Alert the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified "
357 "snapshot pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to "
358 "check if snapshots are successfully taken on schedule, successfully replicated on schedule, and successfully "
359 "pruned on schedule. "
360 "Process exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively. "
361 f"Example DICT_STRING: `{format_dict(monitor_snapshot_plan_example)}`. "
362 "This example alerts the user if the latest src or dst snapshot named `prod_onsite_<timestamp>_hourly` is more "
363 "than 30 minutes late (i.e. more than 30+60=90 minutes old) [warning] or more than 300 minutes late (i.e. more "
364 "than 300+60=360 minutes old) [critical]. In addition, the example alerts the user if the oldest src or dst "
365 "snapshot named `prod_onsite_<timestamp>_hourly` is more than 30 + 60x36 minutes old [warning] or more than "
366 "300 + 60x36 minutes old [critical], where 36 is the number of period cycles specified in `src_snapshot_plan` "
367 "or `dst_snapshot_plan`, respectively. "
368 "Analog for the latest snapshot named `prod_<timestamp>_daily`, and so on.\n\n"
369 "Note: A duration that is missing or zero (e.g. '0 minutes') indicates that no snapshots shall be checked for "
370 "the given snapshot name pattern.\n\n")
371 locations = ["src", "dst"]
372 for loc in locations:
373 parser.add_argument(
374 f"--ssh-{loc}-user", default="", metavar="STRING",
375 help=f"Remote SSH username on {loc} hosts to connect to (optional). Examples: 'root', 'alice'.\n\n")
376 for loc in locations:
377 parser.add_argument(
378 f"--ssh-{loc}-port", type=int, min=1, max=65535, action=check_range.CheckRange, metavar="INT",
379 help=f"Remote SSH port on {loc} host to connect to (optional).\n\n")
380 for loc in locations:
381 parser.add_argument(
382 f"--ssh-{loc}-config-file", type=str, action=bzfs_main.argparse_actions.SSHConfigFileNameAction, metavar="FILE",
383 help=f"Path to SSH ssh_config(5) file to connect to {loc} (optional); will be passed into ssh -F CLI. "
384 "The basename must contain the substring 'bzfs_ssh_config'.\n\n")
385 parser.add_argument(
386 "--job-id", required=True, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
387 help="The identifier that remains constant across all runs of this particular job; will be included in the log file "
388 "name infix. Example: mytestjob\n\n")
389 parser.add_argument(
390 "--job-run", default="", action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
391 help="The identifier of this particular run of the overall job; will be included in the log file name suffix. "
392 "Default is a hex UUID. Example: 0badc0f003a011f0a94aef02ac16083c\n\n")
393 workers_default = 100 # percent
394 parser.add_argument(
395 "--workers", min=1, default=(workers_default, True), action=bzfs_main.argparse_actions.CheckPercentRange,
396 metavar="INT[%]",
397 help="The maximum number of jobs to run in parallel at any time; can be given as a positive integer, "
398 f"optionally followed by the %% percent character (min: %(min)s, default: {workers_default}%%). Percentages "
399 "are relative to the number of CPU cores on the machine. Example: 200%% uses twice as many parallel jobs as "
400 "there are cores on the machine; 75%% uses num_procs = num_cores * 0.75. Examples: 1, 4, 75%%, 150%%\n\n")
401 parser.add_argument(
402 "--work-period-seconds", type=float, min=0, default=0, action=check_range.CheckRange, metavar="FLOAT",
403 help="Reduces bandwidth spikes by spreading out the start of worker jobs over this much time; "
404 "0 disables this feature (default: %(default)s). Examples: 0, 60, 86400\n\n")
405 parser.add_argument(
406 "--jitter", action="store_true",
407 help="Randomize job start time and host order to avoid potential thundering herd problems in large distributed "
408 "systems (optional). Randomizing job start time is only relevant if --work-period-seconds > 0.\n\n")
409 parser.add_argument(
410 "--worker-timeout-seconds", type=float, min=0.001, default=None, action=check_range.CheckRange, metavar="FLOAT",
411 help="If this much time has passed after a worker process has started executing, kill the straggling worker "
412 "(optional). Other workers remain unaffected. Examples: 60, 3600\n\n")
413 parser.add_argument(
414 "--spawn-process-per-job", action="store_true",
415 help="Spawn a Python process per subjob instead of a Python thread per subjob (optional). The former is only "
416 "recommended for a job operating in parallel on a large number of hosts as it helps avoid exceeding "
417 "per-process limits such as the default max number of open file descriptors, at the expense of increased "
418 "startup latency.\n\n")
419 parser.add_argument(
420 "--jobrunner-dryrun", action="store_true",
421 help="Do a dry run (aka 'no-op') to print what operations would happen if the command were to be executed "
422 "for real (optional). This option treats both the ZFS source and destination as read-only. Can also be used to "
423 "check if the configuration options are valid.\n\n")
424 parser.add_argument(
425 "--jobrunner-log-level", choices=["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"], default="INFO",
426 help="Only emit jobrunner messages with equal or higher priority than this log level. Default is '%(default)s'.\n\n")
427 parser.add_argument(
428 "--daemon-replication-frequency", default="minutely", metavar="STRING",
429 help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n")
430 parser.add_argument(
431 "--daemon-prune-src-frequency", default="minutely", metavar="STRING",
432 help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n")
433 parser.add_argument(
434 "--daemon-prune-dst-frequency", default="minutely", metavar="STRING",
435 help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n")
436 parser.add_argument(
437 "--daemon-monitor-snapshots-frequency", default="minutely", metavar="STRING",
438 help="Specifies how often the bzfs daemon shall monitor snapshot age if --daemon-lifetime is nonzero.\n\n")
439 bad_opts = ["--daemon-frequency", "--include-snapshot-plan", "--create-src-snapshots-plan", "--skip-replication",
440 "--log-file-prefix", "--log-file-infix", "--log-file-suffix",
441 "--delete-dst-datasets", "--delete-dst-snapshots", "--delete-dst-snapshots-except",
442 "--delete-dst-snapshots-except-plan", "--delete-empty-dst-datasets",
443 "--monitor-snapshots", "--timeout"]
444 for loc in locations:
445 bad_opts += [f"--ssh-{loc}-host"] # reject this arg as jobrunner will auto-generate it
446 for bad_opt in bad_opts:
447 parser.add_argument(bad_opt, action=RejectArgumentAction, nargs=0, help=argparse.SUPPRESS)
448 parser.add_argument(
449 "--version", action="version", version=f"{PROG_NAME}-{bzfs_main.argparse_cli.__version__}, by {PROG_AUTHOR}",
450 help="Display version information and exit.\n\n")
451 parser.add_argument(
452 "--help, -h", action="help", # trick to ensure both --help and -h are shown in the help msg
453 help="Show this help message and exit.\n\n")
454 parser.add_argument(
455 "--root-dataset-pairs", required=True, nargs="+", action=bzfs_main.argparse_actions.DatasetPairsAction,
456 metavar="SRC_DATASET DST_DATASET",
457 help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be "
458 "auto-appended later).\n\n")
459 return parser
460 # fmt: on
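# Hypothetical example invocation accepted by the parser above (host names, datasets and plan values are mere
# placeholders, and the exact LIST_STRING/DICT_STRING syntax is defined by the helpers used in Job._run_main();
# see README_bzfs_jobrunner.md and bzfs_tests/bzfs_job_example.py for authoritative usage):
#
#   bzfs_jobrunner --job-id=nightly --create-src-snapshots \
#       --src-hosts="['web01']" --dst-hosts="{'nas': ['onsite']}" --retain-dst-targets="{'nas': ['onsite']}" \
#       --dst-root-datasets="{'nas': 'tank2/bak'}" --src-snapshot-plan="{'prod': {'onsite': {'daily': 31}}}" \
#       --root-dataset-pairs tank/data tank/data
#
# Options not recognized by this parser are forwarded verbatim to the generated `bzfs` subjob command lines
# (see the parse_known_args() call in Job._run_main() below).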
463#############################################################################
464def main() -> None:
465 """API for command line clients."""
466 prev_umask: int = os.umask(UMASK)
467 try:
468 set_logging_runtime_defaults()
469 # On CTRL-C and SIGTERM, send signal to all descendant processes to terminate them
470 termination_event: threading.Event = threading.Event()
471 with termination_signal_handler(termination_events=[termination_event]):
472 Job(log=None, termination_event=termination_event).run_main(sys_argv=sys.argv)
473 finally:
474 os.umask(prev_umask) # restore prior global state
477#############################################################################
478@final
479class Job:
480 """Coordinates subjobs per the CLI flags; Each subjob handles one host pair and may run in its own process or thread."""
482 def __init__(self, log: Logger | None, termination_event: threading.Event) -> None:
483 # immutable variables:
484 self.log_was_None: Final[bool] = log is None
485 self.log: Final[Logger] = get_simple_logger(PROG_NAME) if log is None else log
486 self.termination_event: Final[threading.Event] = termination_event
487 self.subprocesses: Final[Subprocesses] = Subprocesses(termination_event=self.termination_event)
488 self.jobrunner_dryrun: bool = False
489 self.spawn_process_per_job: bool = False
490 self.loopback_address: Final[str] = _detect_loopback_address()
492 # mutable variables:
493 self.first_exception: int | None = None
494 self.stats: JobStats = JobStats(jobs_all=0)
495 self.cache_existing_dst_pools: set[str] = set()
496 self.cache_known_dst_pools: set[str] = set()
498 self.is_test_mode: bool = False # for testing only
500 def run_main(self, sys_argv: list[str]) -> None:
501 """API for Python clients; visible for testing; may become a public API eventually."""
502 try:
503 self._run_main(sys_argv)
504 finally:
505 if self.log_was_None: # reset Logger unless it's a Logger outside of our control
506 reset_logger(self.log)
508 def _run_main(self, sys_argv: list[str]) -> None:
509 self.first_exception = None
510 log: Logger = self.log
511 log.info("CLI arguments: %s", " ".join(sys_argv))
512 nsp = argparse.Namespace(no_argument_file=True) # disable --root-dataset-pairs='+file' option in DatasetPairsAction
513 args, unknown_args = argument_parser().parse_known_args(sys_argv[1:], nsp) # forward all unknown args to `bzfs`
514 log.setLevel(args.jobrunner_log_level)
515 self.jobrunner_dryrun = args.jobrunner_dryrun
516 assert len(args.root_dataset_pairs) > 0
517 src_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_snapshot_plan), "--src-snapshot-plan")
518 src_bookmark_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_bookmark_plan), "--src-bookmark-plan")
519 dst_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.dst_snapshot_plan), "--dst-snapshot-plan")
520 monitor_snapshot_plan: dict = self.validate_monitor_snapshot_plan(literal_eval(args.monitor_snapshot_plan))
521 localhostname: str = args.localhost if args.localhost else socket.gethostname()
522 self.validate_host_name(localhostname, "--localhost")
523 log.debug("localhostname: %s", localhostname)
524 src_hosts: list[str] = self.validate_src_hosts(self.parse_src_hosts_from_cli_or_stdin(args.src_hosts))
525 basis_src_hosts: list[str] = src_hosts
526 nb_src_hosts: int = len(basis_src_hosts)
527 log.debug("src_hosts before subsetting: %s", src_hosts)
528 if args.src_host is not None: # retain only the src hosts that are also contained in args.src_host
529 assert isinstance(args.src_host, list)
530 retain_src_hosts: set[str] = set(args.src_host)
531 self.validate_is_subset(retain_src_hosts, src_hosts, "--src-host", "--src-hosts")
532 src_hosts = [host for host in src_hosts if host in retain_src_hosts]
533 dst_hosts: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.dst_hosts))
534 nb_dst_hosts: int = len(dst_hosts)
535 if args.dst_host is not None: # retain only the dst hosts that are also contained in args.dst_host
536 assert isinstance(args.dst_host, list)
537 retain_dst_hosts: set[str] = set(args.dst_host)
538 self.validate_is_subset(retain_dst_hosts, dst_hosts.keys(), "--dst-host", "--dst-hosts.keys")
539 dst_hosts = {dst_host: lst for dst_host, lst in dst_hosts.items() if dst_host in retain_dst_hosts}
540 retain_dst_targets: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.retain_dst_targets))
541 self.validate_is_subset(dst_hosts.keys(), retain_dst_targets.keys(), "--dst-hosts.keys", "--retain-dst-targets.keys")
542 dst_root_datasets: dict[str, str] = self.validate_dst_root_datasets(literal_eval(args.dst_root_datasets))
543 self.validate_is_subset(
544 dst_root_datasets.keys(), retain_dst_targets.keys(), "--dst-root-dataset.keys", "--retain-dst-targets.keys"
545 )
546 self.validate_is_subset(dst_hosts.keys(), dst_root_datasets.keys(), "--dst-hosts.keys", "--dst-root-dataset.keys")
547 bad_root_datasets: dict[str, str] = {
548 dst_host: root_dataset
549 for dst_host in sorted(dst_hosts.keys())
550 if SRC_MAGIC_SUBSTITUTION_TOKEN not in (root_dataset := dst_root_datasets[dst_host])
551 }
552 if len(src_hosts) > 1 and len(bad_root_datasets) > 0:
553 self.die(
554 "Cowardly refusing to proceed as multiple source hosts must not be configured to write to the same "
555 "destination dataset. "
556 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(src_hosts)}"
557 )
558 bad_root_datasets = {
559 dst_host: root_dataset
560 for dst_host, root_dataset in sorted(dst_root_datasets.items())
561 if root_dataset and SRC_MAGIC_SUBSTITUTION_TOKEN not in root_dataset
562 }
563 if len(basis_src_hosts) > 1 and len(bad_root_datasets) > 0:
564 self.die(
565 "Cowardly refusing to proceed as multiple source hosts are defined in the configuration, but "
566 f"not all non-empty root datasets in --dst-root-datasets contain the '{SRC_MAGIC_SUBSTITUTION_TOKEN}' "
567 "substitution token to prevent collisions on writing destination datasets. "
568 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(basis_src_hosts)}"
569 )
570 if args.jitter: # randomize host order to avoid potential thundering herd problems in large distributed systems
571 random.SystemRandom().shuffle(src_hosts)
572 dst_hosts = shuffle_dict(dst_hosts)
573 ssh_src_user: str = args.ssh_src_user
574 ssh_dst_user: str = args.ssh_dst_user
575 ssh_src_port: int | None = args.ssh_src_port
576 ssh_dst_port: int | None = args.ssh_dst_port
577 ssh_src_config_file: str | None = args.ssh_src_config_file
578 ssh_dst_config_file: str | None = args.ssh_dst_config_file
579 job_id: str = _sanitize(args.job_id)
580 job_run: str = _sanitize(args.job_run) if args.job_run else uuid.uuid1().hex
581 workers, workers_is_percent = args.workers
582 max_workers: int = max(1, round((os.cpu_count() or 1) * workers / 100.0) if workers_is_percent else round(workers))
583 worker_timeout_seconds: int = args.worker_timeout_seconds
584 self.spawn_process_per_job = args.spawn_process_per_job
585 username: str = pwd.getpwuid(os.getuid()).pw_name
586 assert username
587 loopback_ids: set[str] = {"localhost", "127.0.0.1", "::1", socket.gethostname()} # ::1 is IPv6 loopback address
588 loopback_ids.update(self.get_localhost_ips()) # union
589 loopback_ids.add(localhostname)
590 loopback_ids = set() if getenv_bool("disable_loopback", False) else loopback_ids
591 log.log(LOG_TRACE, "loopback_ids: %s", sorted(loopback_ids))
593 def zero_pad(number: int, width: int = 6) -> str:
594 """Pads number with leading '0' chars to the given width."""
595 return f"{number:0{width}d}"
597 def jpad(jj: int, tag: str) -> str:
598 """Returns ``tag`` prefixed with slash and zero padded index."""
599 return "/" + zero_pad(jj) + tag
601 def runpad() -> str:
602 """Returns standardized subjob count suffix."""
603 return job_run + SEP + zero_pad(len(subjobs))
605 def update_subjob_name(tag: str) -> str:
606 """Derives next subjob name based on ``tag`` and index ``j``."""
607 if j <= 0:
608 return subjob_name
609 elif j == 1:
610 return subjob_name + jpad(j - 1, tag)
611 else:
612 return subjob_name + "/" + BARRIER_CHAR
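# (Placed here for illustration; hypothetical hosts.) For one src host replicating to two dst hosts, the helpers
# above produce a subjob-name tree like:
#   000000src-host/000000replicate   -> replicate src host #0 to dst host #0
#   000000src-host/000001replicate   -> replicate src host #0 to dst host #1 (may run in parallel with the first)
#   000000src-host/~/...             -> e.g. subsequent prune/monitor subjobs, where '~' is BARRIER_CHAR, meaning
#                                       they only start once the preceding sibling subjobs have completed.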
614 def resolve_dataset(hostname: str, dataset: str, is_src: bool = True) -> str:
615 """Returns host:dataset string resolving IPv6 and localhost cases."""
616 assert hostname
617 assert dataset
618 ssh_user = ssh_src_user if is_src else ssh_dst_user
619 ssh_user = ssh_user if ssh_user else username
620 lb: str = self.loopback_address
621 loopbck_ids: set[str] = loopback_ids
622 hostname = hostname if hostname not in loopbck_ids else (lb if lb else hostname) if username != ssh_user else "-"
623 hostname = convert_ipv6(hostname)
624 return f"{hostname}:{dataset}"
626 def resolve_dst_dataset(dst_hostname: str, dst_dataset: str) -> str:
627 """Expands ``dst_dataset`` relative to ``dst_hostname`` roots."""
628 assert dst_hostname
629 assert dst_dataset
630 root_dataset: str | None = dst_root_datasets.get(dst_hostname)
631 assert root_dataset is not None, dst_hostname # f"Hostname '{dst_hostname}' missing in --dst-root-datasets"
632 root_dataset = root_dataset.replace(SRC_MAGIC_SUBSTITUTION_TOKEN, src_host)
633 root_dataset = root_dataset.replace(DST_MAGIC_SUBSTITUTION_TOKEN, dst_hostname)
634 resolved_dst_dataset: str = f"{root_dataset}/{dst_dataset}" if root_dataset else dst_dataset
635 validate_dataset_name(resolved_dst_dataset, dst_dataset)
636 return resolve_dataset(dst_hostname, resolved_dst_dataset, is_src=False)
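# Example (reusing the dst_root_datasets_example shape from the argument parser, with a hypothetical src_host
# 'web01'): given dst_root_datasets={'archive': 'archives/zoo/^SRC_HOST'}, the call
# resolve_dst_dataset('archive', 'tank/data') expands the root to 'archives/zoo/web01' and returns
# 'archive:archives/zoo/web01/tank/data' (or the '-:' / loopback form if 'archive' happens to be this very host).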
638 for src_host in src_hosts:
639 assert src_host
640 for dst_hostname in dst_hosts:
641 assert dst_hostname
642 dummy: Final[str] = DUMMY_DATASET
643 lhn: Final[str] = localhostname
644 bzfs_prog_header: Final[list[str]] = [BZFS_PROG_NAME, "--no-argument-file"] + unknown_args
645 subjobs: dict[str, list[str]] = {}
646 for i, src_host in enumerate(src_hosts):
647 subjob_name: str = zero_pad(i) + "src-host"
648 src_log_suffix: str = _log_suffix(localhostname, src_host, "")
649 j: int = 0
650 opts: list[str]
652 if args.create_src_snapshots:
653 opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"]
654 self.add_log_file_opts(opts, "create-src-snapshots", job_id, runpad(), src_log_suffix)
655 self.add_ssh_opts(
656 opts, ssh_src_user=ssh_src_user, ssh_src_port=ssh_src_port, ssh_src_config_file=ssh_src_config_file
657 )
658 opts += [POSIX_END_OF_OPTIONS_MARKER]
659 opts += _flatten(_dedupe([(resolve_dataset(src_host, src), dummy) for src, dst in args.root_dataset_pairs]))
660 subjob_name += "/create-src-snapshots"
661 subjobs[subjob_name] = bzfs_prog_header + opts
663 if args.replicate:
664 j = 0
665 marker: str = "replicate"
666 for dst_hostname, targets in dst_hosts.items():
667 opts = self.replication_opts(
668 dst_snapshot_plan, set(targets), lhn, src_host, dst_hostname, marker, job_id, runpad()
669 )
670 if len(opts) > 0:
671 opts += [f"--daemon-frequency={args.daemon_replication_frequency}"]
672 self.add_ssh_opts(
673 opts,
674 ssh_src_user=ssh_src_user,
675 ssh_dst_user=ssh_dst_user,
676 ssh_src_port=ssh_src_port,
677 ssh_dst_port=ssh_dst_port,
678 ssh_src_config_file=ssh_src_config_file,
679 ssh_dst_config_file=ssh_dst_config_file,
680 )
681 opts += [POSIX_END_OF_OPTIONS_MARKER]
682 dataset_pairs: list[tuple[str, str]] = [
683 (resolve_dataset(src_host, src), resolve_dst_dataset(dst_hostname, dst))
684 for src, dst in args.root_dataset_pairs
685 ]
686 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
687 if len(dataset_pairs) > 0:
688 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
689 j += 1
690 subjob_name = update_subjob_name(marker)
692 def prune_src(
693 opts: list[str], retention_plan: dict, tag: str, src_host: str = src_host, logsuffix: str = src_log_suffix
694 ) -> None:
695 """Creates prune subjob options for ``tag`` using ``retention_plan``."""
696 opts += ["--skip-replication", f"--delete-dst-snapshots-except-plan={retention_plan}"]
697 opts += [f"--daemon-frequency={args.daemon_prune_src_frequency}"]
698 self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
699 self.add_ssh_opts( # i.e. dst=src, src=dummy
700 opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
701 )
702 opts += [POSIX_END_OF_OPTIONS_MARKER]
703 opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
704 nonlocal subjob_name
705 subjob_name += f"/{tag}"
706 subjobs[subjob_name] = bzfs_prog_header + opts
708 if args.prune_src_snapshots:
709 prune_src(["--delete-dst-snapshots"], src_snapshot_plan, "prune-src-snapshots")
711 if args.prune_src_bookmarks:
712 prune_src(["--delete-dst-snapshots=bookmarks"], src_bookmark_plan, "prune-src-bookmarks")
714 if args.prune_dst_snapshots:
715 self.validate_true(
716 retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!"
717 )
718 j = 0
719 marker = "prune-dst-snapshots"
720 for dst_hostname, _ in dst_hosts.items():
721 curr_retain_targets: set[str] = set(retain_dst_targets[dst_hostname])
722 curr_dst_snapshot_plan = { # only retain targets that belong to the host
723 org: {target: periods for target, periods in target_periods.items() if target in curr_retain_targets}
724 for org, target_periods in dst_snapshot_plan.items()
725 }
726 opts = ["--delete-dst-snapshots", "--skip-replication"]
727 opts += [f"--delete-dst-snapshots-except-plan={curr_dst_snapshot_plan}"]
728 opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"]
729 self.add_log_file_opts(opts, marker, job_id, runpad(), _log_suffix(lhn, src_host, dst_hostname))
730 self.add_ssh_opts(
731 opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
732 )
733 opts += [POSIX_END_OF_OPTIONS_MARKER]
734 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
735 dataset_pairs = _dedupe(dataset_pairs)
736 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
737 if len(dataset_pairs) > 0:
738 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
739 j += 1
740 subjob_name = update_subjob_name(marker)
742 def monitor_snapshots_opts(tag: str, monitor_plan: dict, logsuffix: str) -> list[str]:
743 """Returns monitor subjob options for ``tag`` and ``monitor_plan``."""
744 opts = [f"--monitor-snapshots={monitor_plan}", "--skip-replication"]
745 opts += [f"--daemon-frequency={args.daemon_monitor_snapshots_frequency}"]
746 self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
747 return opts
749 def build_monitor_plan(monitor_plan: dict, snapshot_plan: dict, cycles_prefix: str) -> dict:
750 """Expands ``monitor_plan`` with cycle defaults from ``snapshot_plan``."""
752 def alert_dicts(alertdict: dict, cycles: int) -> dict:
753 """Returns alert dictionaries with explicit ``cycles`` value."""
754 latest_dict = alertdict.copy()
755 for prefix in ("src_snapshot_", "dst_snapshot_", ""):
756 latest_dict.pop(f"{prefix}cycles", None)
757 oldest_dict = latest_dict.copy()
758 oldest_dict["cycles"] = int(alertdict.get(f"{cycles_prefix}cycles", cycles))
759 return {"latest": latest_dict, "oldest": oldest_dict}
761 return {
762 org: {
763 target: {
764 periodunit: alert_dicts(alertdict, snapshot_plan.get(org, {}).get(target, {}).get(periodunit, 1))
765 for periodunit, alertdict in periods.items()
766 }
767 for target, periods in target_periods.items()
768 }
769 for org, target_periods in monitor_plan.items()
770 }
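# Illustration of the expansion above: with cycles_prefix='src_snapshot_', a monitor entry
# {'warning': '30 minutes', 'critical': '300 minutes'} for ('prod', 'onsite', 'hourly'), and a src_snapshot_plan
# that keeps 36 hourly snapshots, alert_dicts() yields
#   {'latest': {'warning': '30 minutes', 'critical': '300 minutes'},
#    'oldest': {'warning': '30 minutes', 'critical': '300 minutes', 'cycles': 36}}
# which matches the "30 + 60x36 minutes" 'oldest' math described in the --monitor-snapshot-plan help text.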
772 if args.monitor_src_snapshots:
773 marker = "monitor-src-snapshots"
774 monitor_plan = build_monitor_plan(monitor_snapshot_plan, src_snapshot_plan, "src_snapshot_")
775 opts = monitor_snapshots_opts(marker, monitor_plan, src_log_suffix)
776 self.add_ssh_opts( # i.e. dst=src, src=dummy
777 opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
778 )
779 opts += [POSIX_END_OF_OPTIONS_MARKER]
780 opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
781 subjob_name += "/" + marker
782 subjobs[subjob_name] = bzfs_prog_header + opts
784 if args.monitor_dst_snapshots:
785 j = 0
786 marker = "monitor-dst-snapshots"
787 for dst_hostname, targets in dst_hosts.items():
788 monitor_targets = set(targets).intersection(set(retain_dst_targets[dst_hostname]))
789 monitor_plan = { # only retain targets that belong to the host
790 org: {target: periods for target, periods in target_periods.items() if target in monitor_targets}
791 for org, target_periods in monitor_snapshot_plan.items()
792 }
793 monitor_plan = build_monitor_plan(monitor_plan, dst_snapshot_plan, "dst_snapshot_")
794 opts = monitor_snapshots_opts(marker, monitor_plan, _log_suffix(lhn, src_host, dst_hostname))
795 self.add_ssh_opts(
796 opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
797 )
798 opts += [POSIX_END_OF_OPTIONS_MARKER]
799 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
800 dataset_pairs = _dedupe(dataset_pairs)
801 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
802 if len(dataset_pairs) > 0:
803 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
804 j += 1
805 subjob_name = update_subjob_name(marker)
807 msg = dry("Ready to run %s subjobs using %s/%s src hosts: %s, %s/%s dst hosts: %s", is_dry_run=self.jobrunner_dryrun)
808 log.info(
809 msg, len(subjobs), len(src_hosts), nb_src_hosts, src_hosts, len(dst_hosts), nb_dst_hosts, list(dst_hosts.keys())
810 )
811 log.log(LOG_TRACE, "subjobs: \n%s", _pretty_print_formatter(subjobs))
812 self.run_subjobs(subjobs, max_workers, worker_timeout_seconds, args.work_period_seconds, args.jitter)
813 ex = self.first_exception
814 if isinstance(ex, int):
815 assert ex != 0
816 sys.exit(ex)
817 assert ex is None, ex
818 log.info("Succeeded. Bye!")
820 def replication_opts(
821 self,
822 dst_snapshot_plan: dict[str, dict[str, dict[str, int]]],
823 targets: set[str],
824 localhostname: str,
825 src_hostname: str,
826 dst_hostname: str,
827 tag: str,
828 job_id: str,
829 job_run: str,
830 ) -> list[str]:
831 """Returns CLI options for one replication subjob."""
832 log = self.log
833 log.debug("%s", f"Replicating targets {sorted(targets)} from {src_hostname} to {dst_hostname} ...")
834 include_snapshot_plan = { # only replicate targets that belong to the destination host and are relevant
835 org: {
836 target: {
837 duration_unit: duration_amount
838 for duration_unit, duration_amount in periods.items()
839 if duration_amount > 0
840 }
841 for target, periods in target_periods.items()
842 if target in targets
843 }
844 for org, target_periods in dst_snapshot_plan.items()
845 }
846 include_snapshot_plan = { # only replicate orgs that have at least one relevant target_period
847 org: target_periods
848 for org, target_periods in include_snapshot_plan.items()
849 if any(len(periods) > 0 for target, periods in target_periods.items())
850 }
851 opts: list[str] = []
852 if len(include_snapshot_plan) > 0:
853 opts += [f"--include-snapshot-plan={include_snapshot_plan}"]
854 self.add_log_file_opts(opts, tag, job_id, job_run, _log_suffix(localhostname, src_hostname, dst_hostname))
855 return opts
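# Illustration of the narrowing above (shapes borrowed from src_snapshot_plan_example): for a dst host whose
# targets are {'onsite'}, a dst_snapshot_plan of
#   {'prod': {'onsite': {'hourly': 36, 'secondly': 0}, 'us-west-1': {'hourly': 36}}}
# is reduced to --include-snapshot-plan={'prod': {'onsite': {'hourly': 36}}}: zero-amount periods and targets
# mapped to other hosts are dropped, and if nothing remains the caller skips the replication subjob entirely.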
857 def skip_nonexisting_local_dst_pools(
858 self, root_dataset_pairs: list[tuple[str, str]], timeout_secs: float | None = None
859 ) -> list[tuple[str, str]]:
860 """Skip datasets that point to removable destination drives that are not currently (locally) attached, if any."""
862 def zpool(dataset: str) -> str:
863 """Returns pool name portion of ``dataset``."""
864 return dataset.split("/", 1)[0]
866 assert len(root_dataset_pairs) > 0
867 unknown_dst_pools = {zpool(dst) for src, dst in root_dataset_pairs}
868 unknown_dst_pools = unknown_dst_pools.difference(self.cache_known_dst_pools)
870 # Here we treat a zpool as existing if the zpool isn't local, aka if it isn't prefixed with "-:". A remote host
871 # will raise an appropriate error if it turns out that the remote zpool doesn't actually exist.
872 unknown_local_dst_pools = {pool for pool in unknown_dst_pools if pool.startswith("-:")}
873 if len(unknown_local_dst_pools) > 0: # `zfs list` if local
874 existing_pools = {pool[len("-:") :] for pool in unknown_local_dst_pools}
875 cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(existing_pools)
876 sp = subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout_secs)
877 if sp.returncode not in (0, 1): # 1 means dataset not found
878 self.die(f"Unexpected error {sp.returncode} on checking for existing local dst pools: {sp.stderr.strip()}")
879 existing_pools = {"-:" + pool for pool in sp.stdout.splitlines() if pool}
880 self.cache_existing_dst_pools.update(existing_pools) # union
881 unknown_remote_dst_pools = unknown_dst_pools.difference(unknown_local_dst_pools)
882 self.cache_existing_dst_pools.update(unknown_remote_dst_pools) # union
883 self.cache_known_dst_pools.update(unknown_dst_pools) # union
884 results: list[tuple[str, str]] = []
885 for src, dst in root_dataset_pairs:
886 if zpool(dst) in self.cache_existing_dst_pools:
887 results.append((src, dst))
888 else:
889 self.log.warning("Skipping dst dataset for which local dst pool does not exist: %s", dst)
890 return results
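# Example of the '-:' convention above: a dst of '-:backups/bak001/tank/data' (the '-' host marker denotes a local,
# non-ssh connection) causes the 'backups' pool to be probed via `zfs list`, whereas a dst of 'nas:tank2/bak/...'
# is assumed to exist and any error is left for the remote bzfs invocation to report. Pool and dataset names here
# are placeholders borrowed from the examples in the argument parser.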
892 @staticmethod
893 def add_ssh_opts(
894 opts: list[str],
895 ssh_src_user: str | None = None,
896 ssh_dst_user: str | None = None,
897 ssh_src_port: int | None = None,
898 ssh_dst_port: int | None = None,
899 ssh_src_config_file: str | None = None,
900 ssh_dst_config_file: str | None = None,
901 ) -> None:
902 """Appends ssh related options to ``opts`` if specified."""
903 assert isinstance(opts, list)
904 opts += [f"--ssh-src-user={ssh_src_user}"] if ssh_src_user else []
905 opts += [f"--ssh-dst-user={ssh_dst_user}"] if ssh_dst_user else []
906 opts += [f"--ssh-src-port={ssh_src_port}"] if ssh_src_port is not None else []
907 opts += [f"--ssh-dst-port={ssh_dst_port}"] if ssh_dst_port is not None else []
908 opts += [f"--ssh-src-config-file={ssh_src_config_file}"] if ssh_src_config_file else []
909 opts += [f"--ssh-dst-config-file={ssh_dst_config_file}"] if ssh_dst_config_file else []
911 @staticmethod
912 def add_log_file_opts(opts: list[str], tag: str, job_id: str, job_run: str, logsuffix: str) -> None:
913 """Appends standard log-file CLI options to ``opts``."""
914 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}"]
915 opts += [f"--log-file-infix={SEP}{job_id}"]
916 opts += [f"--log-file-suffix={SEP}{job_run}{logsuffix}{SEP}"]
918 def run_subjobs(
919 self,
920 subjobs: dict[str, list[str]],
921 max_workers: int,
922 timeout_secs: float | None,
923 work_period_seconds: float,
924 jitter: bool,
925 ) -> None:
926 """Executes subjobs sequentially or in parallel, respecting '~' barriers.
928 Design note on subjob failure, isolation and termination:
929 - On subjob failure the subjob's subtree is skipped.
930 - Subjob failures are converted to return codes (not exceptions) so the policy does not invoke a termination handler
931 on such failures and sibling subjobs continue unaffected. This preserves per-subjob isolation, both within a single
932 process (thread-per-subjob mode; default) as well as in process-per-subjob mode (``--spawn-process-per-job``).
933 """
934 self.stats = JobStats(len(subjobs))
935 log = self.log
936 num_intervals = 1 + len(subjobs) if jitter else len(subjobs)
937 interval_nanos = 0 if len(subjobs) == 0 else round(1_000_000_000 * max(0.0, work_period_seconds) / num_intervals)
938 assert interval_nanos >= 0
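# Worked example of the pacing math above: with work_period_seconds=60, 5 subjobs and --jitter enabled,
# num_intervals = 1 + 5 = 6 and interval_nanos = round(1e9 * 60 / 6) = 10_000_000_000, i.e. one subjob start is
# released roughly every 10 seconds, after an initial random delay of up to one interval (see below).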
939 if jitter: # randomize job start time to avoid potential thundering herd problems in large distributed systems
940 sleep_nanos = random.SystemRandom().randint(0, interval_nanos)
941 log.info("Jitter: Delaying job start time by sleeping for %s ...", human_readable_duration(sleep_nanos))
942 self.termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination
943 sorted_subjobs: list[str] = sorted(subjobs.keys())
944 spawn_process_per_job: bool = self.spawn_process_per_job
945 log.log(LOG_TRACE, "%s: %s", "spawn_process_per_job", spawn_process_per_job)
946 if process_datasets_in_parallel_and_fault_tolerant(
947 log=log,
948 datasets=sorted_subjobs,
949 process_dataset=lambda subjob, tid, retry: self.run_subjob(
950 subjobs[subjob], name=subjob, timeout_secs=timeout_secs, spawn_process_per_job=spawn_process_per_job
951 )
952 == 0,
953 skip_tree_on_error=lambda subjob: True,
954 skip_on_error="dataset",
955 max_workers=max_workers,
956 interval_nanos=lambda last_update_nanos, dataset, submit_count: interval_nanos,
957 termination_event=self.termination_event,
958 termination_handler=self.subprocesses.terminate_process_subtrees,
959 task_name="Subjob",
960 retry_policy=None, # no retries
961 dry_run=False,
962 is_test_mode=self.is_test_mode,
963 ):
964 self.first_exception = DIE_STATUS if self.first_exception is None else self.first_exception
965 stats = self.stats
966 jobs_skipped = stats.jobs_all - stats.jobs_started
967 msg = f"{stats}, skipped:" + percent(jobs_skipped, total=stats.jobs_all, print_total=True)
968 log.info("Final Progress: %s", msg)
969 assert stats.jobs_running == 0, msg
970 assert stats.jobs_completed == stats.jobs_started, msg
971 skipped_jobs_dict = {subjob: subjobs[subjob] for subjob in sorted_subjobs if subjob not in stats.started_job_names}
972 if len(skipped_jobs_dict) > 0:
973 log.debug("Skipped subjobs: \n%s", _pretty_print_formatter(skipped_jobs_dict))
974 assert jobs_skipped == len(skipped_jobs_dict), msg
976 def run_subjob(
977 self, cmd: list[str], name: str, timeout_secs: float | None, spawn_process_per_job: bool
978 ) -> int | None: # thread-safe
979 """Executes one worker job and updates shared Stats."""
980 start_time_nanos = time.monotonic_ns()
981 returncode = None
982 log = self.log
983 cmd_str = " ".join(cmd)
984 stats = self.stats
985 try:
986 msg: str = stats.submit_job(name)
987 log.log(LOG_TRACE, "Starting worker job: %s", cmd_str)
988 log.info("Progress: %s", msg)
989 start_time_nanos = time.monotonic_ns()
990 if spawn_process_per_job:
991 returncode = self.run_worker_job_spawn_process_per_job(cmd, timeout_secs)
992 else:
993 returncode = self.run_worker_job_in_current_thread(cmd, timeout_secs)
994 except BaseException as e:
995 log.error("Worker job failed with unexpected exception: %s for command: %s", e, cmd_str)
996 raise
997 else:
998 elapsed_human: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
999 if returncode != 0:
1000 with stats.lock:
1001 if self.first_exception is None:
1002 self.first_exception = DIE_STATUS if returncode is None else returncode
1003 log.error("Worker job failed with exit code %s in %s: %s", returncode, elapsed_human, cmd_str)
1004 else:
1005 log.debug("Worker job succeeded in %s: %s", elapsed_human, cmd_str)
1006 return returncode
1007 finally:
1008 msg = stats.complete_job(failed=returncode != 0, elapsed_nanos=time.monotonic_ns() - start_time_nanos)
1009 log.info("Progress: %s", msg)
1011 def run_worker_job_in_current_thread(self, cmd: list[str], timeout_secs: float | None) -> int | None:
1012 """Runs ``bzfs`` in-process and return its exit code."""
1013 log = self.log
1014 if timeout_secs is not None:
1015 cmd = cmd[0:1] + [f"--timeout={round(1000 * timeout_secs)}milliseconds"] + cmd[1:]
1016 try:
1017 if not self.jobrunner_dryrun:
1018 self._bzfs_run_main(cmd)
1019 return 0
1020 except subprocess.CalledProcessError as e:
1021 return e.returncode
1022 except SystemExit as e:
1023 assert e.code is None or isinstance(e.code, int)
1024 return e.code
1025 except BaseException:
1026 log.exception("Worker job failed with unexpected exception for command: %s", " ".join(cmd))
1027 return DIE_STATUS
1029 def _bzfs_run_main(self, cmd: list[str]) -> None:
1030 """Delegates execution to :mod:`bzfs` using parsed arguments."""
1031 bzfs_job = bzfs.Job(termination_event=self.termination_event)
1032 bzfs_job.is_test_mode = self.is_test_mode
1033 bzfs_job.run_main(bzfs.argument_parser().parse_args(cmd[1:]), cmd)
1035 def run_worker_job_spawn_process_per_job(self, cmd: list[str], timeout_secs: float | None) -> int | None:
1036 """Spawns a subprocess for the worker job and waits for completion."""
1037 log = self.log
1038 if len(cmd) > 0 and cmd[0] == BZFS_PROG_NAME:
1039 cmd = [sys.executable, "-m", "bzfs_main." + cmd[0]] + cmd[1:]
1040 if self.jobrunner_dryrun:
1041 return 0
1043 with self.subprocesses.popen_and_track(cmd, stdin=subprocess.DEVNULL, text=True) as proc:
1044 try:
1045 if self.termination_event.is_set():
1046 timeout_secs = 1.0 if timeout_secs is None else timeout_secs
1047 raise subprocess.TimeoutExpired(cmd, timeout_secs) # do not wait for normal completion
1048 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to complete and exit normally
1049 except subprocess.TimeoutExpired:
1050 cmd_str = " ".join(cmd)
1051 if self.termination_event.is_set():
1052 log.error("%s", f"Terminating worker job due to async termination request: {cmd_str}")
1053 else:
1054 log.error("%s", f"Terminating worker job as it failed to complete within {timeout_secs}s: {cmd_str}")
1055 proc.terminate() # Sends SIGTERM signal to job subprocess
1056 assert timeout_secs is not None
1057 timeout_secs = min(1.0, timeout_secs)
1058 try:
1059 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit
1060 except subprocess.TimeoutExpired:
1061 log.error("%s", f"Killing worker job as it failed to terminate within {timeout_secs}s: {cmd_str}")
1062 terminate_process_subtree(root_pids=[proc.pid]) # Send SIGTERM to process subtree
1063 proc.kill() # Sends SIGKILL signal to job subprocess because SIGTERM wasn't enough
1064 timeout_secs = min(0.025, timeout_secs)
1065 with contextlib.suppress(subprocess.TimeoutExpired):
1066 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit
1067 return proc.returncode
1069 def validate_src_hosts(self, src_hosts: list[str]) -> list[str]:
1070 """Checks ``src_hosts`` contains valid hostnames."""
1071 context = "--src-hosts"
1072 self.validate_type(src_hosts, list, context)
1073 for src_hostname in src_hosts:
1074 self.validate_host_name(src_hostname, context)
1075 return src_hosts
1077 def validate_dst_hosts(self, dst_hosts: dict[str, list[str]]) -> dict[str, list[str]]:
1078 """Checks destination hosts dictionary."""
1079 context = "--dst-hosts"
1080 self.validate_type(dst_hosts, dict, context)
1081 for dst_hostname, targets in dst_hosts.items():
1082 self.validate_host_name(dst_hostname, context)
1083 self.validate_type(targets, list, f"{context} targets")
1084 for target in targets:
1085 self.validate_type(target, str, f"{context} target")
1086 return dst_hosts
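    # Illustrative example of a mapping this validator accepts (hostnames and targets are hypothetical):
    #     {"nas": ["onsite"], "bak-us-west-1": ["offsite"], "archive": []}
    # Each destination hostname maps to a (possibly empty) list of replication target names.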
1088 def validate_dst_root_datasets(self, dst_root_datasets: dict[str, str]) -> dict[str, str]:
1089 """Checks that each destination root dataset string is valid."""
1090 context = "--dst-root-datasets"
1091 self.validate_type(dst_root_datasets, dict, context)
1092 for dst_hostname, dst_root_dataset in dst_root_datasets.items():
1093 self.validate_host_name(dst_hostname, context)
1094 self.validate_type(dst_root_dataset, str, f"{context} root dataset")
1095 return dst_root_datasets
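    # Illustrative example (hypothetical host and dataset names): {"nas": "tank2/bak", "bak-us-west-1": "backups/zoo"}
    # Each destination hostname maps to a root dataset string; only hostname validity and string types are checked here.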
1097 def validate_snapshot_plan(
1098 self, snapshot_plan: dict[str, dict[str, dict[str, int]]], context: str
1099 ) -> dict[str, dict[str, dict[str, int]]]:
1100 """Checks snapshot plan structure and value types."""
1101 self.validate_type(snapshot_plan, dict, context)
1102 for org, target_periods in snapshot_plan.items():
1103 self.validate_type(org, str, f"{context} org")
1104 self.validate_type(target_periods, dict, f"{context} target_periods")
1105 for target, periods in target_periods.items():
1106 self.validate_type(target, str, f"{context} org/target")
1107 self.validate_type(periods, dict, f"{context} org/periods")
1108 for period_unit, period_amount in periods.items():
1109 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit")
1110 self.validate_non_negative_int(period_amount, f"{context} org/target/period_amount")
1111 return snapshot_plan
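    # Illustrative example of the nesting this validator accepts (org, target and period names are hypothetical):
    #     {"prod": {"onsite": {"hourly": 36, "daily": 31}, "offsite": {"daily": 14}}}
    # i.e. org -> target -> period_unit -> non-negative period_amount.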
1113 def validate_monitor_snapshot_plan(
1114 self, monitor_snapshot_plan: dict[str, dict[str, dict[str, dict[str, str | int]]]]
1115 ) -> dict[str, dict[str, dict[str, dict[str, str | int]]]]:
1116 """Checks snapshot monitoring plan configuration."""
1117 context = "--monitor-snapshot-plan"
1118 self.validate_type(monitor_snapshot_plan, dict, context)
1119 for org, target_periods in monitor_snapshot_plan.items():
1120 self.validate_type(org, str, f"{context} org")
1121 self.validate_type(target_periods, dict, f"{context} target_periods")
1122 for target, periods in target_periods.items():
1123 self.validate_type(target, str, f"{context} org/target")
1124 self.validate_type(periods, dict, f"{context} org/periods")
1125 for period_unit, alert_dict in periods.items():
1126 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit")
1127 self.validate_type(alert_dict, dict, f"{context} org/target/alert_dict")
1128 for key, value in alert_dict.items():
1129 self.validate_non_empty_string(key, f"{context} org/target/alert_dict/key")
1130 self.validate_type(value, Union[str, int], f"{context} org/target/alert_dict/value")
1131 return monitor_snapshot_plan
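    # Illustrative example of the nesting this validator accepts (all names and values are hypothetical):
    #     {"prod": {"onsite": {"daily": {"warning": "2 days", "critical": "5 days"}}}}
    # i.e. org -> target -> period_unit -> alert_dict, where alert_dict values may be str or int.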
1133 def validate_is_subset(self, x: Iterable[str], y: Iterable[str], x_name: str, y_name: str) -> None:
1134 """Raises error if ``x`` contains an item not present in ``y``."""
1135 if isinstance(x, str) or not isinstance(x, Iterable):
1136 self.die(f"{x_name} must be an Iterable")
1137 if isinstance(y, str) or not isinstance(y, Iterable):
1138 self.die(f"{y_name} must be an Iterable")
1139 if not set(x).issubset(set(y)):
1140 diff = sorted(set(x).difference(set(y)))
1141 self.die(f"{x_name} must be a subset of {y_name}. diff: {diff}, {x_name}: {sorted(x)}, {y_name}: {sorted(y)}")
1143 def validate_host_name(self, hostname: str, context: str) -> None:
1144 """Checks host name string."""
1145 self.validate_non_empty_string(hostname, f"{context} hostname")
1146 bzfs.validate_host_name(hostname, context)
1148 def validate_non_empty_string(self, value: str, name: str) -> None:
1149 """Checks that ``value`` is a non-empty string."""
1150 self.validate_type(value, str, name)
1151 if not value:
1152 self.die(f"{name} must not be empty!")
1154 def validate_non_negative_int(self, value: int, name: str) -> None:
1155 """Checks ``value`` is an int >= 0."""
1156 self.validate_type(value, int, name)
1157 if value < 0:
1158 self.die(f"{name} must be a non-negative integer: {value}")
1160 def validate_true(self, expr: Any, msg: str) -> None:
1161 """Raises error if ``expr`` evaluates to ``False``."""
1162 if not bool(expr):
1163 self.die(msg)
1165 def validate_type(self, value: Any, expected_type: Any, name: str) -> None:
1166        """Checks that ``value`` is an instance of ``expected_type``, which may also be a ``Union`` of types."""
1167 if hasattr(expected_type, "__origin__") and expected_type.__origin__ is Union: # for compat with python < 3.10
1168 union_types = expected_type.__args__
1169 for t in union_types:
1170 if isinstance(value, t):
1171 return
1172 type_msg = " or ".join([t.__name__ for t in union_types])
1173 self.die(f"{name} must be of type {type_msg} but got {type(value).__name__}: {value}")
1174 elif not isinstance(value, expected_type):
1175 self.die(f"{name} must be of type {expected_type.__name__} but got {type(value).__name__}: {value}")
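    # Illustrative example (added for clarity): the branch above unpacks ``Union.__args__`` because ``isinstance()``
    # rejects subscripted ``typing.Union`` objects on older Python versions; e.g. validate_type(7, Union[str, int], "x")
    # accepts both 7 and "7", while validate_type(7.5, Union[str, int], "x") dies with
    # "x must be of type str or int but got float: 7.5".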
1177 def parse_src_hosts_from_cli_or_stdin(self, raw_src_hosts: str | None) -> list:
1178        """Resolves ``--src-hosts`` from the CLI or from stdin, with robust handling of TTYs and empty input."""
1179 if raw_src_hosts is None:
1180 # If stdin is an interactive TTY, don't block waiting for input; fail clearly instead
1181 try:
1182 is_tty: bool = getattr(sys.stdin, "isatty", lambda: False)()
1183 except Exception:
1184 is_tty = False
1185 if is_tty:
1186 self.die("Missing --src-hosts and stdin is a TTY. Provide --src-hosts or pipe the list on stdin.")
1187 stdin_text: str = sys.stdin.read()
1188 if not stdin_text.strip(): # avoid literal_eval("") SyntaxError and provide a clear message
1189 self.die("Missing --src-hosts and stdin is empty. Provide --src-hosts or pipe a list on stdin.")
1190 raw_src_hosts = stdin_text
1191 try:
1192 value = literal_eval(raw_src_hosts)
1193 except Exception as e:
1194 self.die(f"Invalid --src-hosts format: {e} for input: {raw_src_hosts}")
1195 if not isinstance(value, list):
1196 example: str = format_obj(["hostname1", "hostname2"])
1197 self.die(f"Invalid --src-hosts: expected a Python list literal, e.g. {example} but got: {format_obj(value)}")
1198 return value
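    # Illustrative example (hypothetical host names, added for clarity): both the CLI flag and piped stdin expect a
    # Python list literal such as '["host1", "host2"]', which ast.literal_eval() turns into ["host1", "host2"];
    # malformed input like 'host1 host2', an empty stdin, or an interactive TTY without the flag all fail fast with a
    # clear error message instead of blocking.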
1200 def die(self, msg: str) -> NoReturn:
1201        """Logs ``msg`` and exits the program."""
1202 self.log.error("%s", msg)
1203 utils.die(msg)
1205 def get_localhost_ips(self) -> set[str]:
1206 """Returns all network addresses of the local host, i.e. all configured addresses on all network interfaces, without
1207 depending on name resolution."""
1208 ips: set[str] = set()
1209 if platform.system() == "Linux":
1210 try:
1211 proc = subprocess.run(["hostname", "-I"], stdin=DEVNULL, stdout=PIPE, text=True, check=True) # noqa: S607
1212 except Exception as e:
1213 self.log.warning("Cannot run 'hostname -I' on localhost: %s", e)
1214 else:
1215 ips = {ip for ip in proc.stdout.strip().split() if ip}
1216 self.log.log(LOG_TRACE, "localhost_ips: %s", sorted(ips))
1217 return ips
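    # Illustrative example (hypothetical addresses): on Linux, 'hostname -I' may print "192.168.1.5 10.0.0.7\n", which
    # the code above turns into {"192.168.1.5", "10.0.0.7"}; on other platforms the result is simply the empty set.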
1220#############################################################################
1221@final
1222class RejectArgumentAction(argparse.Action):
1223 """An argparse Action that immediately fails if it is ever triggered."""
1225 def __call__(
1226 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None
1227 ) -> None:
1228 """Abort argument parsing if a protected option is seen."""
1229 parser.error(f"Security: Overriding protected argument '{option_string}' is not allowed.")
1232#############################################################################
1233def _dedupe(root_dataset_pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
1234 """Returns a list with duplicate dataset pairs removed while preserving order."""
1235 return list(dict.fromkeys(root_dataset_pairs))
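# Illustrative example (hypothetical dataset pairs): _dedupe([("a", "b"), ("c", "d"), ("a", "b")]) returns
# [("a", "b"), ("c", "d")], because dict.fromkeys() keeps only the first occurrence while preserving insertion order.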
1238_T = TypeVar("_T")
1241def _flatten(root_dataset_pairs: Iterable[Iterable[_T]]) -> list[_T]:
1242 """Flattens an iterable of pairs into a single list."""
1243 return [item for pair in root_dataset_pairs for item in pair]
1246def _sanitize(filename: str) -> str:
1247 """Replaces potentially problematic characters in ``filename`` with '!'."""
1248 for s in (" ", "..", "/", "\\", SEP):
1249 filename = filename.replace(s, "!")
1250 return filename
1253def _log_suffix(localhostname: str, src_hostname: str, dst_hostname: str) -> str:
1254 """Returns a log file suffix in a format that contains the given hostnames."""
1255 return f"{SEP}{_sanitize(localhostname)}{SEP}{_sanitize(src_hostname)}{SEP}{_sanitize(dst_hostname)}"
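# Illustrative example (hypothetical hostnames), assuming SEP is the log file separator character defined earlier in
# this module: _log_suffix("local", "src/1", "dst 1") yields f"{SEP}local{SEP}src!1{SEP}dst!1", since _sanitize()
# replaces " ", "..", "/", "\\" and SEP itself with "!".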
1258def _pretty_print_formatter(dictionary: dict[str, Any]) -> Any:
1259 """Lazy JSON formatter used to avoid overhead in disabled log levels."""
1261 @final
1262 class PrettyPrintFormatter:
1263 """Wrapper returning formatted JSON on ``str`` conversion."""
1265 def __str__(self) -> str:
1266 import json
1268 return json.dumps(dictionary, indent=4, sort_keys=True)
1270 return PrettyPrintFormatter()
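# Illustrative usage (hypothetical logger call, added for clarity): log.log(LOG_TRACE, "config: %s",
# _pretty_print_formatter(config_dict)) defers json.dumps() until the record is actually emitted, so the formatting
# cost is skipped entirely whenever the TRACE level is disabled.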
1273def _detect_loopback_address() -> str:
1274    """Returns a loopback address ("127.0.0.1" or "::1") that can be bound on this host, or "" if neither is usable."""
1275 try:
1276 addr = "127.0.0.1"
1277 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1278 s.bind((addr, 0))
1279 return addr
1280 except OSError:
1281 pass
1283 try:
1284 addr = "::1"
1285 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1286 s.bind((addr, 0))
1287 return addr
1288 except OSError:
1289 pass
1291 return ""
1294def convert_ipv6(hostname: str) -> str:
1295    """Replaces ':' with '|' in ``hostname`` so IPv6 addresses don't clash with the host:dataset colon separator or with
1296    colons that may be part of a (valid) ZFS dataset name."""
1297 return hostname.replace(":", "|") # Also see bzfs.convert_ipv6() for the reverse conversion
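# Illustrative example: convert_ipv6("fe80::1") returns "fe80||1", while hostnames without colons pass through unchanged.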
1300#############################################################################
1301if __name__ == "__main__":
1302 main()