Coverage for bzfs_main / bzfs_jobrunner.py: 99%
668 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 10:16 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata
16# /// script
17# requires-python = ">=3.9"
18# dependencies = []
19# ///
20#
21"""
22* High-level orchestrator that calls `bzfs` as part of complex, periodic workflows to manage backup, replication, and pruning
23 jobs across a fleet of multiple source and destination hosts; driven by a fleet-wide job config file
24 (e.g., `bzfs_job_example.py`).
25* Overview of the bzfs_jobrunner.py codebase:
26* The codebase starts with docs, definition of input data and associated argument parsing of CLI options/parameters.
27* Control flow starts in main(), far below ..., which kicks off a "Job".
28* A Job creates zero or more "subjobs" for each local or remote host, via run_main().
29* It executes the subjobs, serially or in parallel, via run_subjobs(), which in turn delegates parallel job coordination to
30 bzfs.process_datasets_in_parallel_and_fault_tolerant().
31* README_bzfs_jobrunner.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via
32 update_readme.sh. Simply run that script whenever you change or add ArgumentParser help text.
33"""
35from __future__ import (
36 annotations,
37)
38import argparse
39import contextlib
40import os
41import platform
42import pwd
43import random
44import socket
45import subprocess
46import sys
47import threading
48import time
49import uuid
50from ast import (
51 literal_eval,
52)
53from collections.abc import (
54 Iterable,
55)
56from logging import (
57 Logger,
58)
59from subprocess import (
60 DEVNULL,
61 PIPE,
62)
63from typing import (
64 Any,
65 Final,
66 NoReturn,
67 TypeVar,
68 Union,
69 final,
70)
72import bzfs_main.argparse_actions
73from bzfs_main import (
74 bzfs,
75)
76from bzfs_main.argparse_cli import (
77 PROG_AUTHOR,
78)
79from bzfs_main.detect import (
80 DUMMY_DATASET,
81)
82from bzfs_main.loggers import (
83 get_simple_logger,
84 reset_logger,
85 set_logging_runtime_defaults,
86)
87from bzfs_main.util import (
88 check_range,
89 utils,
90)
91from bzfs_main.util.parallel_tasktree import (
92 BARRIER_CHAR,
93)
94from bzfs_main.util.parallel_tasktree_policy import (
95 process_datasets_in_parallel_and_fault_tolerant,
96)
97from bzfs_main.util.utils import (
98 DIE_STATUS,
99 LOG_TRACE,
100 UMASK,
101 JobStats,
102 Subprocesses,
103 TaskTiming,
104 dry,
105 format_dict,
106 format_obj,
107 getenv_bool,
108 human_readable_duration,
109 percent,
110 shuffle_dict,
111 terminate_process_subtree,
112 termination_signal_handler,
113 validate_dataset_name,
114)
115from bzfs_main.util.utils import PROG_NAME as BZFS_PROG_NAME
# constants:
PROG_NAME: Final[str] = "bzfs_jobrunner"
# Magic tokens that may appear in --dst-root-datasets values; auto-replaced at runtime with the actual hostname
# (see the --dst-root-datasets help text below):
SRC_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^SRC_HOST"  # noqa: S105
DST_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^DST_HOST"  # noqa: S105
SEP: Final[str] = ","  # presumably a list separator for joining/splitting values — confirm against call sites
POSIX_END_OF_OPTIONS_MARKER: Final[str] = "--"  # args following -- are treated as operands, even if they begin with a hyphen
def argument_parser() -> argparse.ArgumentParser:
    """Returns the CLI parser used by bzfs_jobrunner.

    Note: README_bzfs_jobrunner.md is mostly auto-generated from these ArgumentParser help texts as the source of
    "truth" (via update_readme.sh), so keep the help strings accurate and well-formed Markdown.
    """
    # fmt: off
    parser = argparse.ArgumentParser(
        prog=PROG_NAME,
        allow_abbrev=False,
        formatter_class=argparse.RawTextHelpFormatter,
        description=f"""
This program is a convenience wrapper around [bzfs](README.md) that simplifies periodic ZFS snapshot creation, replication,
pruning, and monitoring, across a fleet of N source hosts and M destination hosts, using a single fleet-wide shared
[jobconfig](bzfs_tests/bzfs_job_example.py) script.
For example, this simplifies the deployment of an efficient geo-replicated backup service where each of the M destination
hosts is located in a separate geographic region and receives replicas from (the same set of) N source hosts. It also
simplifies low latency replication from a primary to a secondary or to M read replicas, or backup to removable drives, etc.

This program can be used to efficiently replicate ...

a) within a single machine (local mode), or

b) from a single source host to one or more destination hosts (pull or push or pull-push mode), or

c) from multiple source hosts to a single destination host (pull or push or pull-push mode), or

d) from N source hosts to M destination hosts (pull or push or pull-push mode, N and M can be large, M=2 or M=3 are typical
geo-replication factors)

You can run this program on a single third-party host and have that talk to all source hosts and destination hosts, which is
convenient for basic use cases and for testing.
However, typically, a cron job on each source host runs `{PROG_NAME}` periodically to create new snapshots (via
--create-src-snapshots) and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and
--prune-src-bookmarks), whereas another cron job on each destination host runs `{PROG_NAME}` periodically to prune
outdated destination snapshots (via --prune-dst-snapshots), and to replicate the recently created snapshots from the source
to the destination (via --replicate).
Yet another cron job on each source and each destination runs `{PROG_NAME}` periodically to alert the user if the latest or
oldest snapshot is somehow too old (via --monitor-src-snapshots and --monitor-dst-snapshots).
The frequency of these periodic activities can vary by activity, and is typically every second, minute, hour, day, week,
month and/or year (or multiples thereof).

Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto all
source hosts and all destination hosts, and add crontab entries (or systemd timers or Monit entries or similar), along these
lines:

* crontab on source hosts:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --monitor-src-snapshots`


* crontab on destination hosts:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --replicate --prune-dst-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --monitor-dst-snapshots`


### Applying Actions to a Subset of Hosts

`--src-host` and `--dst-host` are ways to tell `{PROG_NAME}` to only perform actions for a specified subset of source hosts
and destination hosts, e.g. only replicate from a certain subset of source hosts to a certain subset of destination hosts.
Each `{PROG_NAME}` invocation will do all actions that apply to the final effective value of `--src-hosts` and `--dst-hosts`,
after the subsetting step has been applied using the `--src-host` and `--dst-host` parameters.


### High Frequency Replication (Experimental Feature)

Taking snapshots, and/or replicating, from every N milliseconds to every 10 seconds or so is considered high frequency. For
such use cases, consider that `zfs list -t snapshot` performance degrades as more and more snapshots currently exist within
the selected datasets, so try to keep the number of currently existing snapshots small, and prune them at a frequency that
is proportional to the frequency with which snapshots are created. Consider using `--skip-parent` and `--exclude-dataset*`
filters to limit the selected datasets only to those that require this level of frequency.

In addition, use the `--daemon-*` options to reduce startup overhead, in combination with splitting the crontab entry (or
better: high frequency systemd timer) into multiple processes, from a single source host to a single destination host,
along these lines:

* crontab on source hosts:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --create-src-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-bookmarks`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --monitor-src-snapshots`


* crontab on destination hosts:

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --replicate`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --prune-dst-snapshots`

`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --monitor-dst-snapshots`

The daemon processes work like non-daemon processes except that they loop, handle time events and sleep between events, and
finally exit after, say, 86400 seconds (whatever you specify via `--daemon-lifetime`). The daemons will subsequently be
auto-restarted by 'cron', or earlier if they fail. While the daemons are running, 'cron' will attempt to start new
(unnecessary) daemons but this is benign as these new processes immediately exit with a message like this:
"Exiting as same previous periodic job is still running without completion yet"
""")

    # commands:
    parser.add_argument(
        "--create-src-snapshots", action="store_true",
        help="Take snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--replicate", action="store_true",
        help="Replicate snapshots from the selected source hosts to the selected destinations hosts as necessary. For pull "
             "mode (recommended), this command should be called by a program (or cron job) running on each dst "
             "host; for push mode, on the src host; for pull-push mode on a third-party host.\n\n")
    parser.add_argument(
        "--prune-src-snapshots", action="store_true",
        help="Prune snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--prune-src-bookmarks", action="store_true",
        help="Prune bookmarks on the selected source hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--prune-dst-snapshots", action="store_true",
        help="Prune snapshots on the selected destination hosts as necessary. Typically, this command should be called by a "
             "program (or cron job) running on each dst host.\n\n")
    parser.add_argument(
        "--monitor-src-snapshots", action="store_true",
        help="Alert the user if snapshots on the selected source hosts are too old, using --monitor-snapshot-plan (see "
             "below). Typically, this command should be called by a program (or cron job) running on each src host.\n\n")
    parser.add_argument(
        "--monitor-dst-snapshots", action="store_true",
        help="Alert the user if snapshots on the selected destination hosts are too old, using --monitor-snapshot-plan (see "
             "below). Typically, this command should be called by a program (or cron job) running on each dst host.\n\n")

    # options:
    parser.add_argument(
        "--localhost", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
        help="Hostname of localhost. Default is the hostname without the domain name, querying the Operating System.\n\n")
    parser.add_argument(
        "--src-hosts", default=None, metavar="LIST_STRING",
        help="Hostnames of the sources to operate on.\n\n")
    parser.add_argument(
        "--src-host", default=None, action="append", metavar="STRING",
        help="For subsetting --src-hosts; Can be specified multiple times; Indicates to only use the --src-hosts that are "
             "contained in the specified --src-host values (optional).\n\n"
             "Example: `--src-host=src1 --src-host=src2 --src-hosts=\"['src1', 'src2', 'src3', 'src4']\"` indicates to "
             "effectively only use `--src-hosts=\"['src1', 'src2']\"`.\n\n")
    dst_hosts_example = {"nas": ["onsite"], "bak-us-west": ["us-west"],
                         "bak-eu-west": ["eu-west"], "archive": ["offsite"]}
    parser.add_argument(
        "--dst-hosts", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
             "(the infix portion of snapshot name). As hostname use the real output of the `hostname` CLI. "
             "The target is an arbitrary user-defined name that serves as an abstraction of the destination hostnames for "
             "a group of snapshots, like target 'onsite', 'offsite', 'hotspare', a geographically independent datacenter like "
             "'us-west', or similar. Rather than the snapshot name embedding (i.e. hardcoding) a list of destination "
             "hostnames where it should be sent to, the snapshot name embeds the user-defined target name, which is later "
             "mapped by this jobconfig to a list of destination hostnames.\n\n"
             f"Example: `{format_dict(dst_hosts_example)}`.\n\n"
             "With this, given a snapshot name, we can find the destination hostnames to which the snapshot shall be "
             "replicated. Also, given a snapshot name and its own name, a destination host can determine if it shall "
             "replicate the given snapshot from the source host, or if the snapshot is intended for another destination "
             "host, in which case it skips the snapshot. A destination host will receive replicas of snapshots for all "
             "targets that map to that destination host.\n\n"
             "Removing a mapping can be used to temporarily suspend replication to a given destination host.\n\n")
    parser.add_argument(
        "--dst-host", default=None, action="append", metavar="STRING",
        help="For subsetting --dst-hosts; Can be specified multiple times; Indicates to only use the --dst-hosts keys that "
             "are contained in the specified --dst-host values (optional).\n\n"
             "Example: "
             "`--dst-host=dst1 --dst-host=dst2 --dst-hosts=\"{'dst1': ..., 'dst2': ..., 'dst3': ..., 'dst4': ...}\"` "
             "indicates to effectively only use `--dst-hosts=\"{'dst1': ..., 'dst2': ...}\"`.\n\n")
    parser.add_argument(
        "--retain-dst-targets", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
             "(the infix portion of snapshot name).\n\n"
             f"Example: `{format_dict(dst_hosts_example)}`. Has same format as --dst-hosts.\n\n"
             "As part of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has "
             "no mapping to that destination host in this dictionary. Do not remove a mapping here unless you are sure it's "
             "ok to delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n")
    dst_root_datasets_example = {
        "nas": "tank2/bak",
        "bak-us-west": "backups/bak001",
        "bak-eu-west": "backups/bak999",
        "archive": f"archives/zoo/{SRC_MAGIC_SUBSTITUTION_TOKEN}",
        "hotspare": "",
    }
    parser.add_argument(
        "--dst-root-datasets", default="{}", metavar="DICT_STRING",
        help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root "
             "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that "
             "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, "
             "whereas for cloning, master slave replication, or replication from a primary to a secondary, this can also be "
             "the empty string.\n\n"
             f"`{SRC_MAGIC_SUBSTITUTION_TOKEN}` and `{DST_MAGIC_SUBSTITUTION_TOKEN}` are optional magic substitution tokens "
             "that will be auto-replaced at runtime with the actual hostname. This can be used to force the use of a "
             "separate destination root dataset per source host or per destination host.\n\n"
             f"Example: `{format_dict(dst_root_datasets_example)}`\n\n")
    src_snapshot_plan_example = {
        "prod": {
            "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
            "us-west": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
            "eu-west": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
        },
        "test": {
            "offsite": {"12hourly": 42, "weekly": 12},
        },
    }
    parser.add_argument(
        "--src-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. "
             "Snapshots that do not match a retention period will be deleted. A zero or missing retention period indicates "
             "that no snapshots shall be retained (or even be created) for the given period.\n\n"
             f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and "
             "the intended logical target 'onsite', create and then retain secondly snapshots that were created less "
             "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analog for "
             "the latest 40 minutely snapshots, 36 hourly snapshots, etc. "
             "It will also create and retain snapshots for the targets 'us-west' and 'eu-west' within the 'prod' "
             "organization. "
             "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, "
             "and name them as being intended for the 'offsite' replication target. "
             "The example creates snapshots with names like "
             "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, "
             "`prod_us-west_<timestamp>_hourly`, `prod_us-west_<timestamp>_daily`, "
             "`prod_eu-west_<timestamp>_hourly`, `prod_eu-west_<timestamp>_daily`, "
             "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n")
    parser.add_argument(
        "--src-bookmark-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for bookmarks to be used if pruning src. Has same format as --src-snapshot-plan.\n\n")
    parser.add_argument(
        "--dst-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Retention periods for snapshots to be used if pruning dst. Has same format as --src-snapshot-plan.\n\n")
    monitor_snapshot_plan_example = {
        "prod": {
            "onsite": {
                "100millisecondly": {"warning": "650 milliseconds", "critical": "2 seconds"},
                "secondly": {"warning": "2 seconds", "critical": "14 seconds"},
                "minutely": {"warning": "30 seconds", "critical": "300 seconds"},
                "hourly": {"warning": "30 minutes", "critical": "300 minutes"},
                "daily": {"warning": "4 hours", "critical": "8 hours"},
                "weekly": {"warning": "2 days", "critical": "8 days"},
                "monthly": {"warning": "2 days", "critical": "8 days"},
                "yearly": {"warning": "5 days", "critical": "14 days"},
                "10minutely": {"warning": "0 minutes", "critical": "0 minutes"},
            },
            "": {
                "daily": {"warning": "4 hours", "critical": "8 hours"},
            },
        },
    }
    parser.add_argument(
        "--monitor-snapshot-plan", default="{}", metavar="DICT_STRING",
        help="Alert the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified "
             "snapshot pattern within the selected datasets is too old wrt. the specified age limit. The purpose is to "
             "check if snapshots are successfully taken on schedule, successfully replicated on schedule, and successfully "
             "pruned on schedule. "
             "Process exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively.\n\n"
             f"Example DICT_STRING: `{format_dict(monitor_snapshot_plan_example)}`. "
             "This example alerts the user if the *latest* src or dst snapshot named `prod_onsite_<timestamp>_hourly` is "
             "more than 30 minutes late (i.e. more than 30+60=90 minutes old) [warning] or more than 300 minutes late (i.e. "
             "more than 300+60=360 minutes old) [critical]. In addition, the example alerts the user if the *oldest* src or "
             "dst snapshot named `prod_onsite_<timestamp>_hourly` is more than 30 + 60x36 minutes old [warning] or more "
             "than 300 + 60x36 minutes old [critical], where 36 is the number of period cycles specified in "
             "`src_snapshot_plan` or `dst_snapshot_plan`, respectively. "
             "Analog for the latest snapshot named `prod_<timestamp>_daily`, and so on.\n\n"
             "Note: A duration that is missing or zero (e.g. '0 minutes') indicates that no snapshots shall be checked for "
             "the given snapshot name pattern.\n\n")
    # The SSH connection options below are generated twice, once per location ("src" and "dst"):
    locations = ["src", "dst"]
    for loc in locations:
        parser.add_argument(
            f"--ssh-{loc}-user", default="", metavar="STRING",
            help=f"Remote SSH username on {loc} hosts to connect to (optional). Examples: 'root', 'alice'.\n\n")
    for loc in locations:
        parser.add_argument(
            f"--ssh-{loc}-port", type=int, min=1, max=65535, action=check_range.CheckRange, metavar="INT",
            help=f"Remote SSH port on {loc} host to connect to (optional).\n\n")
    for loc in locations:
        parser.add_argument(
            f"--ssh-{loc}-config-file", type=str, action=bzfs_main.argparse_actions.SSHConfigFileNameAction, metavar="FILE",
            help=f"Path to SSH ssh_config(5) file to connect to {loc} (optional); will be passed into ssh -F CLI. "
                 "The basename must contain the substring 'bzfs_ssh_config'.\n\n")
    parser.add_argument(
        "--job-id", required=True, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
        help="The identifier that remains constant across all runs of this particular job; will be included in the log file "
             "name infix. Example: mytestjob\n\n")
    parser.add_argument(
        "--job-run", default="", action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
        help="The identifier of this particular run of the overall job; will be included in the log file name suffix. "
             "Default is a hex UUID. Example: 0badc0f003a011f0a94aef02ac16083c\n\n")
    workers_default = 100  # percent
    parser.add_argument(
        "--workers", min=1, default=(workers_default, True), action=bzfs_main.argparse_actions.CheckPercentRange,
        metavar="INT[%]",
        help="The maximum number of jobs to run in parallel at any time; can be given as a positive integer, "
             f"optionally followed by the %% percent character (min: %(min)s, default: {workers_default}%%). Percentages "
             "are relative to the number of CPU cores on the machine. Example: 200%% uses twice as many parallel jobs as "
             "there are cores on the machine; 75%% uses num_procs = num_cores * 0.75. Examples: 1, 4, 75%%, 150%%\n\n")
    parser.add_argument(
        "--work-period-seconds", type=float, min=0, default=0, action=check_range.CheckRange, metavar="FLOAT",
        help="Reduces bandwidth spikes by spreading out the start of worker jobs over this much time; "
             "0 disables this feature (default: %(default)s). Examples: 0, 60, 86400\n\n")
    parser.add_argument(
        "--jitter", action="store_true",
        help="Randomize job start time and host order to avoid potential thundering herd problems in large distributed "
             "systems (optional). Randomizing job start time is only relevant if --work-period-seconds > 0.\n\n")
    parser.add_argument(
        "--worker-timeout-seconds", type=float, min=0.001, default=None, action=check_range.CheckRange, metavar="FLOAT",
        help="If this much time has passed after a worker process has started executing, kill the straggling worker "
             "(optional). Other workers remain unaffected. Examples: 60, 3600\n\n")
    parser.add_argument(
        "--spawn-process-per-job", action="store_true",
        help="Spawn a Python process per subjob instead of a Python thread per subjob (optional). The former is only "
             "recommended for a job operating in parallel on a large number of hosts as it helps avoid exceeding "
             "per-process limits such as the default max number of open file descriptors, at the expense of increased "
             "startup latency.\n\n")
    parser.add_argument(
        "--jobrunner-dryrun", action="store_true",
        help="Do a dry run (aka 'no-op') to print what operations would happen if the command were to be executed "
             "for real (optional). This option treats both the ZFS source and destination as read-only. Can also be used to "
             "check if the configuration options are valid.\n\n")
    parser.add_argument(
        "--jobrunner-log-level", choices=["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"], default="INFO",
        help="Only emit jobrunner messages with equal or higher priority than this log level. Default is '%(default)s'.\n\n")
    parser.add_argument(
        "--daemon-replication-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument(
        "--daemon-prune-src-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument(
        "--daemon-prune-dst-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n")
    parser.add_argument(
        "--daemon-monitor-snapshots-frequency", default="minutely", metavar="STRING",
        help="Specifies how often the bzfs daemon shall monitor snapshot age if --daemon-lifetime is nonzero.\n\n")
    # Options that callers must not pass through on the jobrunner CLI; they are rejected early via RejectArgumentAction:
    bad_opts = ["--daemon-frequency", "--include-snapshot-plan", "--create-src-snapshots-plan", "--skip-replication",
                "--include-snapshot-regex", "--exclude-snapshot-regex", "--include-snapshot-times-and-ranks",
                "--log-file-prefix", "--log-file-infix", "--log-file-suffix",
                "--delete-dst-datasets", "--delete-dst-snapshots", "--delete-dst-snapshots-except",
                "--delete-dst-snapshots-except-plan", "--delete-empty-dst-datasets",
                "--monitor-snapshots", "--timeout"]
    for loc in locations:
        bad_opts += [f"--ssh-{loc}-host"]  # reject this arg as jobrunner will auto-generate it
    for bad_opt in bad_opts:
        parser.add_argument(bad_opt, action=RejectArgumentAction, nargs=0, help=argparse.SUPPRESS)
    parser.add_argument(
        "--version", action="version", version=f"{PROG_NAME}-{bzfs_main.argparse_cli.__version__}, by {PROG_AUTHOR}",
        help="Display version information and exit.\n\n")
    parser.add_argument(
        "--help, -h", action="help",  # trick to ensure both --help and -h are shown in the help msg
        help="Show this help message and exit.\n\n")
    parser.add_argument(
        "--root-dataset-pairs", required=True, nargs="+", action=bzfs_main.argparse_actions.DatasetPairsAction,
        metavar="SRC_DATASET DST_DATASET",
        help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be "
             "auto-appended later).\n\n")
    return parser
    # fmt: on
484#############################################################################
485def main() -> None:
486 """API for command line clients."""
487 prev_umask: int = os.umask(UMASK)
488 try:
489 set_logging_runtime_defaults()
490 # On CTRL-C and SIGTERM, send signal to all descendant processes to terminate them
491 termination_event: threading.Event = threading.Event()
492 with termination_signal_handler(termination_events=[termination_event]):
493 Job(log=None, termination_event=termination_event).run_main(sys_argv=sys.argv)
494 finally:
495 os.umask(prev_umask) # restore prior global state
498#############################################################################
499@final
500class Job:
501 """Coordinates subjobs per the CLI flags; Each subjob handles one host pair and may run in its own process or thread."""
503 def __init__(self, log: Logger | None, termination_event: threading.Event) -> None:
504 # immutable variables:
505 self.log_was_None: Final[bool] = log is None
506 self.log: Final[Logger] = get_simple_logger(PROG_NAME) if log is None else log
507 self.termination_event: Final[threading.Event] = termination_event
508 self.timing: Final[TaskTiming] = TaskTiming.make_from(self.termination_event)
509 self.subprocesses: Final[Subprocesses] = Subprocesses(termination_event.is_set)
510 self.jobrunner_dryrun: bool = False
511 self.spawn_process_per_job: bool = False
512 self.loopback_address: Final[str] = _detect_loopback_address()
514 # mutable variables:
515 self.first_exception: int | None = None
516 self.worst_exception: int | None = None
517 self.stats: JobStats = JobStats(jobs_all=0)
518 self.cache_existing_dst_pools: set[str] = set()
519 self.cache_known_dst_pools: set[str] = set()
521 self.is_test_mode: bool = False # for testing only
    def run_main(self, sys_argv: list[str]) -> None:
        """API for Python clients; visible for testing; may become a public API eventually.

        `sys_argv` has the shape of `sys.argv`, i.e. program name at index 0 followed by CLI arguments.
        """
        try:
            self._run_main(sys_argv)
        finally:
            if self.log_was_None:  # reset Logger unless it's a Logger outside of our control
                reset_logger(self.log)
    def _run_main(self, sys_argv: list[str]) -> None:
        """Parses CLI args, validates the fleet configuration, expands it into per-host `bzfs` subjobs, and runs them.

        Exits the process with the worst recorded subjob exit code if any subjob failed; logs success otherwise.
        """
        self.first_exception = None
        self.worst_exception = None
        log: Logger = self.log
        log.info("CLI arguments: %s", " ".join(sys_argv))
        nsp = argparse.Namespace(no_argument_file=True)  # disable --root-dataset-pairs='+file' option in DatasetPairsAction
        args, unknown_args = argument_parser().parse_known_args(sys_argv[1:], nsp)  # forward all unknown args to `bzfs`
        log.setLevel(args.jobrunner_log_level)
        self.jobrunner_dryrun = args.jobrunner_dryrun
        assert len(args.root_dataset_pairs) > 0
        # ---- parse and validate the various plan dictionaries given as Python literals on the CLI ----
        src_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_snapshot_plan), "--src-snapshot-plan")
        src_bookmark_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_bookmark_plan), "--src-bookmark-plan")
        dst_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.dst_snapshot_plan), "--dst-snapshot-plan")
        monitor_snapshot_plan: dict = self.validate_monitor_snapshot_plan(literal_eval(args.monitor_snapshot_plan))
        localhostname: str = args.localhost if args.localhost else socket.gethostname()
        self.validate_host_name(localhostname, "--localhost")
        log.debug("localhostname: %s", localhostname)
        src_hosts: list[str] = self.validate_src_hosts(self.parse_src_hosts_from_cli_or_stdin(args.src_hosts))
        basis_src_hosts: list[str] = src_hosts  # retain the full (pre-subsetting) list for collision checks below
        nb_src_hosts: int = len(basis_src_hosts)
        log.debug("src_hosts before subsetting: %s", src_hosts)
        if args.src_host is not None:  # retain only the src hosts that are also contained in args.src_host
            assert isinstance(args.src_host, list)
            retain_src_hosts: set[str] = set(args.src_host)
            self.validate_is_subset(retain_src_hosts, src_hosts, "--src-host", "--src-hosts")
            src_hosts = [host for host in src_hosts if host in retain_src_hosts]
        dst_hosts: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.dst_hosts))
        nb_dst_hosts: int = len(dst_hosts)
        if args.dst_host is not None:  # retain only the dst hosts that are also contained in args.dst_host
            assert isinstance(args.dst_host, list)
            retain_dst_hosts: set[str] = set(args.dst_host)
            self.validate_is_subset(retain_dst_hosts, dst_hosts.keys(), "--dst-host", "--dst-hosts.keys")
            dst_hosts = {dst_host: lst for dst_host, lst in dst_hosts.items() if dst_host in retain_dst_hosts}
        retain_dst_targets: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.retain_dst_targets))
        self.validate_is_subset(dst_hosts.keys(), retain_dst_targets.keys(), "--dst-hosts.keys", "--retain-dst-targets.keys")
        dst_root_datasets: dict[str, str] = self.validate_dst_root_datasets(literal_eval(args.dst_root_datasets))
        self.validate_is_subset(
            dst_root_datasets.keys(), retain_dst_targets.keys(), "--dst-root-dataset.keys", "--retain-dst-targets.keys"
        )
        self.validate_is_subset(dst_hosts.keys(), dst_root_datasets.keys(), "--dst-hosts.keys", "--dst-root-dataset.keys")
        # ---- safety checks: with multiple src hosts, every dst root dataset must embed the src substitution token,
        # otherwise distinct src hosts would replicate into the same destination dataset and collide ----
        bad_root_datasets: dict[str, str] = {
            dst_host: root_dataset
            for dst_host in sorted(dst_hosts.keys())
            if SRC_MAGIC_SUBSTITUTION_TOKEN not in (root_dataset := dst_root_datasets[dst_host])
        }
        if len(src_hosts) > 1 and len(bad_root_datasets) > 0:
            self.die(
                "Cowardly refusing to proceed as multiple source hosts must not be configured to write to the same "
                "destination dataset. "
                f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(src_hosts)}"
            )
        bad_root_datasets = {
            dst_host: root_dataset
            for dst_host, root_dataset in sorted(dst_root_datasets.items())
            if root_dataset and SRC_MAGIC_SUBSTITUTION_TOKEN not in root_dataset
        }
        if len(basis_src_hosts) > 1 and len(bad_root_datasets) > 0:
            self.die(
                "Cowardly refusing to proceed as multiple source hosts are defined in the configuration, but "
                f"not all non-empty root datasets in --dst-root-datasets contain the '{SRC_MAGIC_SUBSTITUTION_TOKEN}' "
                "substitution token to prevent collisions on writing destination datasets. "
                f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(basis_src_hosts)}"
            )
        if args.jitter:  # randomize host order to avoid potential thundering herd problems in large distributed systems
            random.SystemRandom().shuffle(src_hosts)
            dst_hosts = shuffle_dict(dst_hosts)
        ssh_src_user: str = args.ssh_src_user
        ssh_dst_user: str = args.ssh_dst_user
        ssh_src_port: int | None = args.ssh_src_port
        ssh_dst_port: int | None = args.ssh_dst_port
        ssh_src_config_file: str | None = args.ssh_src_config_file
        ssh_dst_config_file: str | None = args.ssh_dst_config_file
        job_id: str = _sanitize(args.job_id)
        job_run: str = _sanitize(args.job_run) if args.job_run else uuid.uuid1().hex
        workers, workers_is_percent = args.workers  # either an absolute count or a percentage of CPU cores
        max_workers: int = max(1, round((os.cpu_count() or 1) * workers / 100.0) if workers_is_percent else round(workers))
        worker_timeout_seconds: int = args.worker_timeout_seconds
        self.spawn_process_per_job = args.spawn_process_per_job
        username: str = pwd.getpwuid(os.getuid()).pw_name
        assert username
        # identifiers under which the local host may appear in the config; used to detect "connect to self" cases
        loopback_ids: set[str] = {"localhost", "127.0.0.1", "::1", socket.gethostname()}  # ::1 is IPv6 loopback address
        loopback_ids.update(self.get_localhost_ips())  # union
        loopback_ids.add(localhostname)
        loopback_ids = set() if getenv_bool("disable_loopback", False) else loopback_ids
        log.log(LOG_TRACE, "loopback_ids: %s", sorted(loopback_ids))

        def zero_pad(number: int, width: int = 6) -> str:
            """Pads number with leading '0' chars to the given width."""
            return f"{number:0{width}d}"

        def jpad(jj: int, tag: str) -> str:
            """Returns ``tag`` prefixed with slash and zero padded index."""
            return "/" + zero_pad(jj) + tag

        def runpad() -> str:
            """Returns standardized subjob count suffix."""
            return job_run + SEP + zero_pad(len(subjobs))

        def update_subjob_name(tag: str) -> str:
            """Derives next subjob name based on ``tag`` and index ``j`` (a barrier once more than one subjob exists)."""
            if j <= 0:
                return subjob_name
            elif j == 1:
                return subjob_name + jpad(j - 1, tag)
            else:
                return subjob_name + "/" + BARRIER_CHAR

        def resolve_dataset(hostname: str, dataset: str, is_src: bool = True) -> str:
            """Returns host:dataset string resolving IPv6 and localhost cases."""
            assert hostname
            assert dataset
            ssh_user = ssh_src_user if is_src else ssh_dst_user
            ssh_user = ssh_user if ssh_user else username
            lb: str = self.loopback_address
            loopbck_ids: set[str] = loopback_ids
            # "-" marks a purely local connection (same user on a loopback id); otherwise prefer the loopback address
            hostname = hostname if hostname not in loopbck_ids else (lb if lb else hostname) if username != ssh_user else "-"
            hostname = convert_ipv6(hostname)
            return f"{hostname}:{dataset}"

        def resolve_dst_dataset(dst_hostname: str, dst_dataset: str) -> str:
            """Expands ``dst_dataset`` relative to ``dst_hostname`` roots, substituting the src/dst magic tokens."""
            assert dst_hostname
            assert dst_dataset
            root_dataset: str | None = dst_root_datasets.get(dst_hostname)
            assert root_dataset is not None, dst_hostname  # f"Hostname '{dst_hostname}' missing in --dst-root-datasets"
            root_dataset = root_dataset.replace(SRC_MAGIC_SUBSTITUTION_TOKEN, src_host)
            root_dataset = root_dataset.replace(DST_MAGIC_SUBSTITUTION_TOKEN, dst_hostname)
            resolved_dst_dataset: str = f"{root_dataset}/{dst_dataset}" if root_dataset else dst_dataset
            validate_dataset_name(resolved_dst_dataset, dst_dataset)
            return resolve_dataset(dst_hostname, resolved_dst_dataset, is_src=False)

        # sanity check: no empty host names anywhere
        for src_host in src_hosts:
            assert src_host
        for dst_hostname in dst_hosts:
            assert dst_hostname
        dummy: Final[str] = DUMMY_DATASET
        lhn: Final[str] = localhostname
        bzfs_prog_header: Final[list[str]] = [BZFS_PROG_NAME, "--no-argument-file"] + unknown_args
        # ---- build one `bzfs` CLI invocation ("subjob") per requested operation per src/dst host combination; the
        # subjob name encodes execution order via "/" path segments and BARRIER_CHAR synchronization points ----
        subjobs: dict[str, list[str]] = {}
        for i, src_host in enumerate(src_hosts):
            subjob_name: str = zero_pad(i) + "src-host"
            src_log_suffix: str = _log_suffix(localhostname, src_host, "")
            j: int = 0
            opts: list[str]

            if args.create_src_snapshots:
                opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"]
                self.add_log_file_opts(opts, "create-src-snapshots", job_id, runpad(), src_log_suffix)
                self.add_ssh_opts(
                    opts, ssh_src_user=ssh_src_user, ssh_src_port=ssh_src_port, ssh_src_config_file=ssh_src_config_file
                )
                opts += [POSIX_END_OF_OPTIONS_MARKER]
                opts += _flatten(_dedupe([(resolve_dataset(src_host, src), dummy) for src, dst in args.root_dataset_pairs]))
                subjob_name += "/create-src-snapshots"
                subjobs[subjob_name] = bzfs_prog_header + opts

            if args.replicate:
                j = 0
                marker: str = "replicate"
                for dst_hostname, targets in dst_hosts.items():
                    opts = self.replication_opts(
                        dst_snapshot_plan, set(targets), lhn, src_host, dst_hostname, marker, job_id, runpad()
                    )
                    if len(opts) > 0:  # empty opts means nothing is relevant for this dst host
                        opts += [f"--daemon-frequency={args.daemon_replication_frequency}"]
                        self.add_ssh_opts(
                            opts,
                            ssh_src_user=ssh_src_user,
                            ssh_dst_user=ssh_dst_user,
                            ssh_src_port=ssh_src_port,
                            ssh_dst_port=ssh_dst_port,
                            ssh_src_config_file=ssh_src_config_file,
                            ssh_dst_config_file=ssh_dst_config_file,
                        )
                        opts += [POSIX_END_OF_OPTIONS_MARKER]
                        dataset_pairs: list[tuple[str, str]] = [
                            (resolve_dataset(src_host, src), resolve_dst_dataset(dst_hostname, dst))
                            for src, dst in args.root_dataset_pairs
                        ]
                        dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
                        if len(dataset_pairs) > 0:
                            subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
                            j += 1
                subjob_name = update_subjob_name(marker)

            # NOTE: default args bind the current loop values of src_host/src_log_suffix at definition time
            def prune_src(
                opts: list[str], retention_plan: dict, tag: str, src_host: str = src_host, logsuffix: str = src_log_suffix
            ) -> None:
                """Creates prune subjob options for ``tag`` using ``retention_plan``."""
                opts += ["--skip-replication", f"--delete-dst-snapshots-except-plan={retention_plan}"]
                opts += [f"--daemon-frequency={args.daemon_prune_src_frequency}"]
                self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
                self.add_ssh_opts(  # i.e. dst=src, src=dummy
                    opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
                )
                opts += [POSIX_END_OF_OPTIONS_MARKER]
                opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
                nonlocal subjob_name
                subjob_name += f"/{tag}"
                subjobs[subjob_name] = bzfs_prog_header + opts

            if args.prune_src_snapshots:
                prune_src(["--delete-dst-snapshots"], src_snapshot_plan, tag="prune-src-snapshots")

            if args.prune_src_bookmarks:
                prune_src(["--delete-dst-snapshots=bookmarks"], src_bookmark_plan, tag="prune-src-bookmarks")

            if args.prune_dst_snapshots:
                self.validate_true(
                    retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!"
                )
                j = 0
                marker = "prune-dst-snapshots"
                for dst_hostname, _ in dst_hosts.items():
                    curr_retain_targets: set[str] = set(retain_dst_targets[dst_hostname])
                    curr_dst_snapshot_plan = {  # only retain targets that belong to the host
                        org: {target: periods for target, periods in target_periods.items() if target in curr_retain_targets}
                        for org, target_periods in dst_snapshot_plan.items()
                    }
                    opts = ["--delete-dst-snapshots", "--skip-replication"]
                    opts += [f"--delete-dst-snapshots-except-plan={curr_dst_snapshot_plan}"]
                    opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"]
                    self.add_log_file_opts(opts, marker, job_id, runpad(), _log_suffix(lhn, src_host, dst_hostname))
                    self.add_ssh_opts(
                        opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
                    )
                    opts += [POSIX_END_OF_OPTIONS_MARKER]
                    dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
                    dataset_pairs = _dedupe(dataset_pairs)
                    dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
                    if len(dataset_pairs) > 0:
                        subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
                        j += 1
                subjob_name = update_subjob_name(marker)

            def monitor_snapshots_opts(tag: str, monitor_plan: dict, logsuffix: str) -> list[str]:
                """Returns monitor subjob options for ``tag`` and ``monitor_plan``."""
                opts = [f"--monitor-snapshots={monitor_plan}", "--skip-replication"]
                opts += [f"--daemon-frequency={args.daemon_monitor_snapshots_frequency}"]
                self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
                return opts

            def build_monitor_plan(monitor_plan: dict, snapshot_plan: dict, cycles_prefix: str) -> dict:
                """Expands ``monitor_plan`` with cycle defaults from ``snapshot_plan``."""

                def alert_dicts(alertdict: dict, cycles: int) -> dict:
                    """Returns alert dictionaries with explicit ``cycles`` value."""
                    latest_dict = alertdict.copy()
                    for prefix in ("src_snapshot_", "dst_snapshot_", ""):
                        latest_dict.pop(f"{prefix}cycles", None)
                    oldest_dict = latest_dict.copy()
                    oldest_dict["cycles"] = int(alertdict.get(f"{cycles_prefix}cycles", cycles))
                    latest_dict.pop("oldest_skip_holds", None)
                    return {"latest": latest_dict, "oldest": oldest_dict}

                return {
                    org: {
                        target: {
                            periodunit: alert_dicts(alertdict, snapshot_plan.get(org, {}).get(target, {}).get(periodunit, 1))
                            for periodunit, alertdict in periods.items()
                        }
                        for target, periods in target_periods.items()
                    }
                    for org, target_periods in monitor_plan.items()
                }

            if args.monitor_src_snapshots:
                marker = "monitor-src-snapshots"
                monitor_plan = build_monitor_plan(monitor_snapshot_plan, src_snapshot_plan, "src_snapshot_")
                opts = monitor_snapshots_opts(marker, monitor_plan, src_log_suffix)
                self.add_ssh_opts(  # i.e. dst=src, src=dummy
                    opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
                )
                opts += [POSIX_END_OF_OPTIONS_MARKER]
                opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
                subjob_name += "/" + marker
                subjobs[subjob_name] = bzfs_prog_header + opts

            if args.monitor_dst_snapshots:
                j = 0
                marker = "monitor-dst-snapshots"
                for dst_hostname, targets in dst_hosts.items():
                    monitor_targets: set[str] = set(targets).intersection(set(retain_dst_targets[dst_hostname]))
                    monitor_plan = {  # only retain targets that belong to the host
                        org: {target: periods for target, periods in target_periods.items() if target in monitor_targets}
                        for org, target_periods in monitor_snapshot_plan.items()
                    }
                    monitor_plan = build_monitor_plan(monitor_plan, dst_snapshot_plan, "dst_snapshot_")
                    opts = monitor_snapshots_opts(marker, monitor_plan, _log_suffix(lhn, src_host, dst_hostname))
                    self.add_ssh_opts(
                        opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
                    )
                    opts += [POSIX_END_OF_OPTIONS_MARKER]
                    dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
                    dataset_pairs = _dedupe(dataset_pairs)
                    dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
                    if len(dataset_pairs) > 0:
                        subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
                        j += 1
                subjob_name = update_subjob_name(marker)

        msg = f"Ready to run {len(subjobs)} subjobs using {len(src_hosts)}/{nb_src_hosts} src hosts: {src_hosts}, "
        msg += f"{len(dst_hosts)}/{nb_dst_hosts} dst hosts: {list(dst_hosts.keys())}"
        log.info("%s", dry(msg, is_dry_run=self.jobrunner_dryrun))
        log.log(LOG_TRACE, "subjobs: \n%s", _pretty_print_formatter(subjobs))
        self.run_subjobs(subjobs, max_workers, worker_timeout_seconds, args.work_period_seconds, args.jitter)
        ex = self.worst_exception
        if isinstance(ex, int):  # a non-None worst_exception means at least one subjob failed
            assert ex != 0
            sys.exit(ex)
        assert ex is None, ex
        log.info("Succeeded. Bye!")
844 def replication_opts(
845 self,
846 dst_snapshot_plan: dict[str, dict[str, dict[str, int]]],
847 targets: set[str],
848 localhostname: str,
849 src_hostname: str,
850 dst_hostname: str,
851 tag: str,
852 job_id: str,
853 job_run: str,
854 ) -> list[str]:
855 """Returns CLI options for one replication subjob."""
856 log = self.log
857 log.debug("%s", f"Replicating targets {sorted(targets)} from {src_hostname} to {dst_hostname} ...")
858 include_snapshot_plan = { # only replicate targets that belong to the destination host and are relevant
859 org: {
860 target: {
861 duration_unit: duration_amount
862 for duration_unit, duration_amount in periods.items()
863 if duration_amount > 0
864 }
865 for target, periods in target_periods.items()
866 if target in targets
867 }
868 for org, target_periods in dst_snapshot_plan.items()
869 }
870 include_snapshot_plan = { # only replicate orgs that have at least one relevant target_period
871 org: target_periods
872 for org, target_periods in include_snapshot_plan.items()
873 if any(len(periods) > 0 for target, periods in target_periods.items())
874 }
875 opts: list[str] = []
876 if len(include_snapshot_plan) > 0:
877 opts += [f"--include-snapshot-plan={include_snapshot_plan}"]
878 self.add_log_file_opts(opts, tag, job_id, job_run, _log_suffix(localhostname, src_hostname, dst_hostname))
879 return opts
881 def skip_nonexisting_local_dst_pools(
882 self, root_dataset_pairs: list[tuple[str, str]], timeout_secs: float | None = None
883 ) -> list[tuple[str, str]]:
884 """Skip datasets that point to removable destination drives that are not currently (locally) attached, if any."""
886 def zpool(dataset: str) -> str:
887 """Returns pool name portion of ``dataset``."""
888 return dataset.split("/", 1)[0]
890 assert len(root_dataset_pairs) > 0
891 unknown_dst_pools = {zpool(dst) for src, dst in root_dataset_pairs}
892 unknown_dst_pools = unknown_dst_pools.difference(self.cache_known_dst_pools)
894 # Here we treat a zpool as existing if the zpool isn't local, aka if it isn't prefixed with "-:". A remote host
895 # will raise an appropriate error if it turns out that the remote zpool doesn't actually exist.
896 unknown_local_dst_pools = {pool for pool in unknown_dst_pools if pool.startswith("-:")}
897 if len(unknown_local_dst_pools) > 0: # `zfs list` if local
898 existing_pools = {pool[len("-:") :] for pool in unknown_local_dst_pools}
899 cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(existing_pools)
900 sp = subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout_secs)
901 if sp.returncode not in (0, 1): # 1 means dataset not found
902 self.die(f"Unexpected error {sp.returncode} on checking for existing local dst pools: {sp.stderr.strip()}")
903 existing_pools = {"-:" + pool for pool in sp.stdout.splitlines() if pool}
904 self.cache_existing_dst_pools.update(existing_pools) # union
905 unknown_remote_dst_pools = unknown_dst_pools.difference(unknown_local_dst_pools)
906 self.cache_existing_dst_pools.update(unknown_remote_dst_pools) # union
907 self.cache_known_dst_pools.update(unknown_dst_pools) # union
908 results: list[tuple[str, str]] = []
909 for src, dst in root_dataset_pairs:
910 if zpool(dst) in self.cache_existing_dst_pools:
911 results.append((src, dst))
912 else:
913 self.log.warning("Skipping dst dataset for which local dst pool does not exist: %s", dst)
914 return results
916 @staticmethod
917 def add_ssh_opts(
918 opts: list[str],
919 *,
920 ssh_src_user: str | None = None,
921 ssh_dst_user: str | None = None,
922 ssh_src_port: int | None = None,
923 ssh_dst_port: int | None = None,
924 ssh_src_config_file: str | None = None,
925 ssh_dst_config_file: str | None = None,
926 ) -> None:
927 """Appends ssh related options to ``opts`` if specified."""
928 assert isinstance(opts, list)
929 opts += [f"--ssh-src-user={ssh_src_user}"] if ssh_src_user else []
930 opts += [f"--ssh-dst-user={ssh_dst_user}"] if ssh_dst_user else []
931 opts += [f"--ssh-src-port={ssh_src_port}"] if ssh_src_port is not None else []
932 opts += [f"--ssh-dst-port={ssh_dst_port}"] if ssh_dst_port is not None else []
933 opts += [f"--ssh-src-config-file={ssh_src_config_file}"] if ssh_src_config_file else []
934 opts += [f"--ssh-dst-config-file={ssh_dst_config_file}"] if ssh_dst_config_file else []
936 @staticmethod
937 def add_log_file_opts(opts: list[str], tag: str, job_id: str, job_run: str, logsuffix: str) -> None:
938 """Appends standard log-file CLI options to ``opts``."""
939 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}"]
940 opts += [f"--log-file-infix={SEP}{job_id}"]
941 opts += [f"--log-file-suffix={SEP}{job_run}{logsuffix}{SEP}"]
    def run_subjobs(
        self,
        subjobs: dict[str, list[str]],
        max_workers: int,
        timeout_secs: float | None,
        work_period_seconds: float,
        jitter: bool,
    ) -> None:
        """Executes subjobs sequentially or in parallel, respecting '~' barriers.

        Design note on subjob failure, isolation and termination:
        - On subjob failure the subjob's subtree is skipped.
        - Subjob failures are converted to return codes (not exceptions) so the policy does not invoke a termination handler
        on such failures and sibling subjobs continue unaffected. This preserves per-subjob isolation, both within a single
        process (thread-per-subjob mode; default) as well as in process-per-subjob mode (``--spawn-process-per-job``).
        - The first failure is retained for diagnostics.
        - Process exit code precedence is: fatal/non-monitor failure > monitor CRITICAL/WARNING > STILL_RUNNING > success.
        """
        self.stats = JobStats(len(subjobs))
        log = self.log
        # Spread subjob submissions evenly across the work period; with jitter, one extra interval accounts for the
        # random initial delay below.
        num_intervals = 1 + len(subjobs) if jitter else len(subjobs)
        interval_nanos = 0 if len(subjobs) == 0 else round(1_000_000_000 * max(0.0, work_period_seconds) / num_intervals)
        assert interval_nanos >= 0
        if jitter:  # randomize job start time to avoid potential thundering herd problems in large distributed systems
            sleep_nanos = random.SystemRandom().randint(0, interval_nanos)
            log.info("Jitter: Delaying job start time by sleeping for %s ...", human_readable_duration(sleep_nanos))
            self.timing.sleep(sleep_nanos)  # allows early wakeup on async termination
        sorted_subjobs: list[str] = sorted(subjobs.keys())  # sorted names define the execution/skip tree structure
        spawn_process_per_job: bool = self.spawn_process_per_job
        log.log(LOG_TRACE, "%s: %s", "spawn_process_per_job", spawn_process_per_job)
        if process_datasets_in_parallel_and_fault_tolerant(  # truthy return indicates at least one subjob failed
            log=log,
            datasets=sorted_subjobs,
            process_dataset=lambda subjob, tid, retry: self.run_subjob(
                subjobs[subjob], name=subjob, timeout_secs=timeout_secs, spawn_process_per_job=spawn_process_per_job
            )
            == 0,
            skip_tree_on_error=lambda subjob: True,
            skip_on_error="dataset",
            max_workers=max_workers,
            interval_nanos=lambda last_update_nanos, dataset, submit_count: interval_nanos,
            timing=self.timing,
            termination_handler=self.subprocesses.terminate_process_subtrees,
            task_name="Subjob",
            dry_run=False,
            is_test_mode=self.is_test_mode,
        ):
            # fall back to a generic failure status if run_subjob() did not already capture a specific exit code
            self.first_exception = DIE_STATUS if self.first_exception is None else self.first_exception
            self.worst_exception = self.get_worst_exception(self.worst_exception, DIE_STATUS)
        stats = self.stats
        jobs_skipped = stats.jobs_all - stats.jobs_started
        msg = f"{stats}, skipped:" + percent(jobs_skipped, total=stats.jobs_all, print_total=True)
        log.info("Final Progress: %s", msg)
        # invariants: all started jobs have completed, and the skip count matches the jobs never started
        assert stats.jobs_running == 0, msg
        assert stats.jobs_completed == stats.jobs_started, msg
        skipped_jobs_dict = {subjob: subjobs[subjob] for subjob in sorted_subjobs if subjob not in stats.started_job_names}
        if len(skipped_jobs_dict) > 0:
            log.debug("Skipped subjobs: \n%s", _pretty_print_formatter(skipped_jobs_dict))
        assert jobs_skipped == len(skipped_jobs_dict), msg
    def run_subjob(
        self, cmd: list[str], name: str, timeout_secs: float | None, spawn_process_per_job: bool
    ) -> int | None:  # thread-safe
        """Executes one worker job and updates shared Stats.

        Returns the worker's exit code (None if unknown). Unexpected exceptions are logged and re-raised; failures that
        produce an exit code are recorded in first_exception/worst_exception instead of raising.
        """
        start_time_nanos = time.monotonic_ns()
        returncode = None
        log = self.log
        cmd_str = " ".join(cmd)
        stats = self.stats
        try:
            msg: str = stats.submit_job(name)
            log.log(LOG_TRACE, "Starting worker job: %s", cmd_str)
            log.info("Progress: %s", msg)
            start_time_nanos = time.monotonic_ns()  # reset clock so elapsed time excludes submit/log overhead
            if spawn_process_per_job:
                returncode = self.run_worker_job_spawn_process_per_job(cmd, timeout_secs)
            else:
                returncode = self.run_worker_job_in_current_thread(cmd, timeout_secs)
        except BaseException as e:
            log.error("Worker job failed with unexpected exception: %s for command: %s", e, cmd_str)
            raise
        else:
            elapsed_human: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
            if returncode != 0:
                with stats.lock:  # first_exception/worst_exception are shared across worker threads
                    if self.first_exception is None:
                        self.first_exception = DIE_STATUS if returncode is None else returncode
                    self.worst_exception = self.get_worst_exception(self.worst_exception, returncode)
                log.error("Worker job failed with exit code %s in %s: %s", returncode, elapsed_human, cmd_str)
            else:
                log.debug("Worker job succeeded in %s: %s", elapsed_human, cmd_str)
            return returncode
        finally:
            # runs even when the try block raised; returncode is still None then, which counts as failed
            msg = stats.complete_job(failed=returncode != 0, elapsed_nanos=time.monotonic_ns() - start_time_nanos)
            log.info("Progress: %s", msg)
1039 def run_worker_job_in_current_thread(self, cmd: list[str], timeout_secs: float | None) -> int | None:
1040 """Runs ``bzfs`` in-process and return its exit code."""
1041 log = self.log
1042 if timeout_secs is not None:
1043 cmd = cmd[0:1] + [f"--timeout={round(1000 * timeout_secs)}milliseconds"] + cmd[1:]
1044 try:
1045 if not self.jobrunner_dryrun:
1046 self._bzfs_run_main(cmd)
1047 return 0
1048 except subprocess.CalledProcessError as e:
1049 return bzfs.normalize_called_process_error(e)
1050 except SystemExit as e:
1051 assert e.code is None or isinstance(e.code, int)
1052 return e.code
1053 except BaseException:
1054 log.exception("Worker job failed with unexpected exception for command: %s", " ".join(cmd))
1055 return DIE_STATUS
    def _bzfs_run_main(self, cmd: list[str]) -> None:
        """Delegates execution to :mod:`bzfs` using parsed arguments.

        ``cmd[0]`` is the program name; the remaining args are parsed with the bzfs argument parser. The jobrunner's
        termination_event and test-mode flag are propagated so async termination reaches the nested bzfs run.
        """
        bzfs_job = bzfs.Job(termination_event=self.termination_event)
        bzfs_job.is_test_mode = self.is_test_mode
        bzfs_job.run_main(bzfs.argument_parser().parse_args(cmd[1:]), cmd)
    def run_worker_job_spawn_process_per_job(self, cmd: list[str], timeout_secs: float | None) -> int | None:
        """Spawns a subprocess for the worker job and waits for completion.

        On timeout or async termination request, escalates from SIGTERM to SIGKILL with short grace periods.
        Returns the subprocess exit code.
        """
        log = self.log
        if len(cmd) > 0 and cmd[0] == BZFS_PROG_NAME:
            cmd = [sys.executable, "-m", "bzfs_main." + cmd[0]] + cmd[1:]  # run bzfs via the current Python interpreter
        if self.jobrunner_dryrun:
            return 0  # dry-run: pretend success without spawning anything

        with self.subprocesses.popen_and_track(cmd, stdin=subprocess.DEVNULL, text=True) as proc:
            try:
                if self.termination_event.is_set():
                    timeout_secs = 1.0 if timeout_secs is None else timeout_secs
                    raise subprocess.TimeoutExpired(cmd, timeout_secs)  # do not wait for normal completion
                proc.communicate(timeout=timeout_secs)  # Wait for the subprocess to complete and exit normally
            except subprocess.TimeoutExpired:
                cmd_str = " ".join(cmd)
                if self.termination_event.is_set():
                    log.error("%s", f"Terminating worker job due to async termination request: {cmd_str}")
                else:
                    log.error("%s", f"Terminating worker job as it failed to complete within {timeout_secs}s: {cmd_str}")
                proc.terminate()  # Sends SIGTERM signal to job subprocess
                assert timeout_secs is not None
                timeout_secs = min(1.0, timeout_secs)  # grace period before escalating to SIGKILL
                try:
                    proc.communicate(timeout=timeout_secs)  # Wait for the subprocess to exit
                except subprocess.TimeoutExpired:
                    log.error("%s", f"Killing worker job as it failed to terminate within {timeout_secs}s: {cmd_str}")
                    terminate_process_subtree(root_pids=[proc.pid])  # Send SIGTERM to process subtree
                    proc.kill()  # Sends SIGKILL signal to job subprocess because SIGTERM wasn't enough
                    timeout_secs = min(0.025, timeout_secs)
                    with contextlib.suppress(subprocess.TimeoutExpired):
                        proc.communicate(timeout=timeout_secs)  # Wait for the subprocess to exit
            return proc.returncode
1097 @staticmethod
1098 def get_worst_exception(existing_code: int | None, new_code: int | None) -> int:
1099 """Process exit code precedence is: fatal/non-monitor failure > monitor CRITICAL/WARNING > STILL_RUNNING > success."""
1100 new_code = DIE_STATUS if new_code is None else new_code
1101 if existing_code is None:
1102 return new_code
1103 assert existing_code is not None
1104 assert new_code is not None
1106 nonfatal: tuple[int, ...] = (0, bzfs.WARNING_STATUS, bzfs.CRITICAL_STATUS, bzfs.STILL_RUNNING_STATUS)
1107 existing_is_fatal = existing_code not in nonfatal
1108 new_is_fatal = new_code not in nonfatal
1109 if existing_is_fatal and new_is_fatal:
1110 return max(existing_code, new_code)
1111 if existing_is_fatal or new_is_fatal:
1112 return existing_code if existing_is_fatal else new_code
1114 monitor: tuple[int, ...] = (bzfs.WARNING_STATUS, bzfs.CRITICAL_STATUS)
1115 existing_is_monitor = existing_code in monitor
1116 new_is_monitor = new_code in monitor
1117 if existing_is_monitor and new_is_monitor:
1118 return max(existing_code, new_code)
1119 if existing_is_monitor or new_is_monitor:
1120 return existing_code if existing_is_monitor else new_code
1122 assert existing_code in (0, bzfs.STILL_RUNNING_STATUS), existing_code
1123 assert new_code in (0, bzfs.STILL_RUNNING_STATUS), new_code
1124 return max(existing_code, new_code)
1126 def validate_src_hosts(self, src_hosts: list[str]) -> list[str]:
1127 """Checks ``src_hosts`` contains valid hostnames."""
1128 context = "--src-hosts"
1129 self.validate_type(src_hosts, list, context)
1130 for src_hostname in src_hosts:
1131 self.validate_host_name(src_hostname, context)
1132 return src_hosts
1134 def validate_dst_hosts(self, dst_hosts: dict[str, list[str]]) -> dict[str, list[str]]:
1135 """Checks destination hosts dictionary."""
1136 context = "--dst-hosts"
1137 self.validate_type(dst_hosts, dict, context)
1138 for dst_hostname, targets in dst_hosts.items():
1139 self.validate_host_name(dst_hostname, context)
1140 self.validate_type(targets, list, f"{context} targets")
1141 for target in targets:
1142 self.validate_type(target, str, f"{context} target")
1143 return dst_hosts
1145 def validate_dst_root_datasets(self, dst_root_datasets: dict[str, str]) -> dict[str, str]:
1146 """Checks that each destination root dataset string is valid."""
1147 context = "--dst-root-datasets"
1148 self.validate_type(dst_root_datasets, dict, context)
1149 for dst_hostname, dst_root_dataset in dst_root_datasets.items():
1150 self.validate_host_name(dst_hostname, context)
1151 self.validate_type(dst_root_dataset, str, f"{context} root dataset")
1152 return dst_root_datasets
1154 def validate_snapshot_plan(
1155 self, snapshot_plan: dict[str, dict[str, dict[str, int]]], context: str
1156 ) -> dict[str, dict[str, dict[str, int]]]:
1157 """Checks snapshot plan structure and value types."""
1158 self.validate_type(snapshot_plan, dict, context)
1159 for org, target_periods in snapshot_plan.items():
1160 self.validate_type(org, str, f"{context} org")
1161 self.validate_type(target_periods, dict, f"{context} target_periods")
1162 for target, periods in target_periods.items():
1163 self.validate_type(target, str, f"{context} org/target")
1164 self.validate_type(periods, dict, f"{context} org/periods")
1165 for period_unit, period_amount in periods.items():
1166 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit")
1167 self.validate_non_negative_int(period_amount, f"{context} org/target/period_amount")
1168 return snapshot_plan
1170 def validate_monitor_snapshot_plan(
1171 self, monitor_snapshot_plan: dict[str, dict[str, dict[str, dict[str, str | int]]]]
1172 ) -> dict[str, dict[str, dict[str, dict[str, str | int]]]]:
1173 """Checks snapshot monitoring plan configuration."""
1174 context = "--monitor-snapshot-plan"
1175 self.validate_type(monitor_snapshot_plan, dict, context)
1176 for org, target_periods in monitor_snapshot_plan.items():
1177 self.validate_type(org, str, f"{context} org")
1178 self.validate_type(target_periods, dict, f"{context} target_periods")
1179 for target, periods in target_periods.items():
1180 self.validate_type(target, str, f"{context} org/target")
1181 self.validate_type(periods, dict, f"{context} org/periods")
1182 for period_unit, alert_dict in periods.items():
1183 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit")
1184 self.validate_type(alert_dict, dict, f"{context} org/target/alert_dict")
1185 for key, value in alert_dict.items():
1186 self.validate_non_empty_string(key, f"{context} org/target/alert_dict/key")
1187 self.validate_type(value, Union[str, int], f"{context} org/target/alert_dict/value")
1188 return monitor_snapshot_plan
1190 def validate_is_subset(self, x: Iterable[str], y: Iterable[str], x_name: str, y_name: str) -> None:
1191 """Raises error if ``x`` contains an item not present in ``y``."""
1192 if isinstance(x, str) or not isinstance(x, Iterable):
1193 self.die(f"{x_name} must be an Iterable")
1194 if isinstance(y, str) or not isinstance(y, Iterable):
1195 self.die(f"{y_name} must be an Iterable")
1196 if not set(x).issubset(set(y)):
1197 diff = sorted(set(x).difference(set(y)))
1198 self.die(f"{x_name} must be a subset of {y_name}. diff: {diff}, {x_name}: {sorted(x)}, {y_name}: {sorted(y)}")
1200 def validate_host_name(self, hostname: str, context: str) -> None:
1201 """Checks host name string."""
1202 self.validate_non_empty_string(hostname, f"{context} hostname")
1203 bzfs.validate_host_name(hostname, context)
1205 def validate_non_empty_string(self, value: str, name: str) -> None:
1206 """Checks that ``value`` is a non-empty string."""
1207 self.validate_type(value, str, name)
1208 if not value:
1209 self.die(f"{name} must not be empty!")
1211 def validate_non_negative_int(self, value: int, name: str) -> None:
1212 """Checks ``value`` is an int >= 0."""
1213 self.validate_type(value, int, name)
1214 if value < 0:
1215 self.die(f"{name} must be a non-negative integer: {value}")
1217 def validate_true(self, expr: Any, msg: str) -> None:
1218 """Raises error if ``expr`` evaluates to ``False``."""
1219 if not bool(expr):
1220 self.die(msg)
1222 def validate_type(self, value: Any, expected_type: Any, name: str) -> None:
1223 """Checks ``value`` is instance of ``expected_type`` or union thereof."""
1224 if hasattr(expected_type, "__origin__") and expected_type.__origin__ is Union: # for compat with python < 3.10
1225 union_types = expected_type.__args__
1226 for t in union_types:
1227 if isinstance(value, t):
1228 return
1229 type_msg = " or ".join([t.__name__ for t in union_types])
1230 self.die(f"{name} must be of type {type_msg} but got {type(value).__name__}: {value}")
1231 elif not isinstance(value, expected_type):
1232 self.die(f"{name} must be of type {expected_type.__name__} but got {type(value).__name__}: {value}")
1234 def parse_src_hosts_from_cli_or_stdin(self, raw_src_hosts: str | None) -> list:
1235 """Resolve --src-hosts from CLI or stdin with robust TTY/empty handling."""
1236 if raw_src_hosts is None:
1237 # If stdin is an interactive TTY, don't block waiting for input; fail clearly instead
1238 try:
1239 is_tty: bool = getattr(sys.stdin, "isatty", lambda: False)()
1240 except Exception:
1241 is_tty = False
1242 if is_tty:
1243 self.die("Missing --src-hosts and stdin is a TTY. Provide --src-hosts or pipe the list on stdin.")
1244 stdin_text: str = sys.stdin.read()
1245 if not stdin_text.strip(): # avoid literal_eval("") SyntaxError and provide a clear message
1246 self.die("Missing --src-hosts and stdin is empty. Provide --src-hosts or pipe a list on stdin.")
1247 raw_src_hosts = stdin_text
1248 try:
1249 value = literal_eval(raw_src_hosts)
1250 except Exception as e:
1251 self.die(f"Invalid --src-hosts format: {e} for input: {raw_src_hosts}")
1252 if not isinstance(value, list):
1253 example: str = format_obj(["hostname1", "hostname2"])
1254 self.die(f"Invalid --src-hosts: expected a Python list literal, e.g. {example} but got: {format_obj(value)}")
1255 return value
    def die(self, msg: str) -> NoReturn:
        """Log ``msg`` and exit the program.

        The message is written to this job's logger first so the failure reason is captured before
        ``utils.die`` terminates execution.
        """
        self.log.error("%s", msg)
        utils.die(msg)
1262 def get_localhost_ips(self) -> set[str]:
1263 """Returns all network addresses of the local host, i.e. all configured addresses on all network interfaces, without
1264 depending on name resolution."""
1265 ips: set[str] = set()
1266 if platform.system() == "Linux":
1267 try:
1268 proc = subprocess.run(["hostname", "-I"], stdin=DEVNULL, stdout=PIPE, text=True, check=True) # noqa: S607
1269 except Exception as e:
1270 self.log.warning("Cannot run 'hostname -I' on localhost: %s", e)
1271 else:
1272 ips = {ip for ip in proc.stdout.strip().split() if ip}
1273 self.log.log(LOG_TRACE, "localhost_ips: %s", sorted(ips))
1274 return ips
1277#############################################################################
1278@final
1279class RejectArgumentAction(argparse.Action):
1280 """An argparse Action that immediately fails if it is ever triggered."""
1282 def __call__(
1283 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None
1284 ) -> None:
1285 """Abort argument parsing if a protected option is seen."""
1286 parser.error(f"Security: Overriding protected argument '{option_string}' is not allowed.")
1289#############################################################################
def _dedupe(root_dataset_pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Returns a copy of the list with duplicate dataset pairs dropped; first occurrence order is retained."""
    seen: set[tuple[str, str]] = set()
    unique: list[tuple[str, str]] = []
    for pair in root_dataset_pairs:
        if pair not in seen:
            seen.add(pair)
            unique.append(pair)
    return unique
_T = TypeVar("_T")  # generic element type shared by the nested iterables that _flatten() joins
def _flatten(root_dataset_pairs: Iterable[Iterable[_T]]) -> list[_T]:
    """Concatenates an iterable of pairs into one flat list, preserving order."""
    flat: list[_T] = []
    for pair in root_dataset_pairs:
        flat.extend(pair)
    return flat
def _sanitize(filename: str) -> str:
    """Neutralizes problematic characters/sequences in ``filename`` by substituting '!' for each of them."""
    cleaned = filename
    for token in (" ", "..", "/", "\\", SEP):  # order matters: ".." must be handled before single chars like SEP
        cleaned = cleaned.replace(token, "!")
    return cleaned
def _log_suffix(localhostname: str, src_hostname: str, dst_hostname: str) -> str:
    """Builds a log file suffix embedding the three (sanitized) hostnames, each preceded by SEP."""
    hosts = (localhostname, src_hostname, dst_hostname)
    return "".join(SEP + _sanitize(host) for host in hosts)
def _pretty_print_formatter(dictionary: dict[str, Any]) -> Any:
    """Returns a lazy wrapper that renders ``dictionary`` as pretty JSON only when actually stringified.

    This avoids paying the JSON formatting cost for log statements whose level is disabled.
    """

    @final
    class _LazyJson:
        """Formats the captured dictionary on ``str()`` conversion."""

        def __str__(self) -> str:
            import json

            return json.dumps(dictionary, indent=4, sort_keys=True)

    return _LazyJson()
def _detect_loopback_address() -> str:
    """Returns the first loopback address (IPv4 preferred, then IPv6) that can be bound, or "" if neither works."""
    candidates = ((socket.AF_INET, "127.0.0.1"), (socket.AF_INET6, "::1"))
    for family, addr in candidates:
        try:
            with socket.socket(family, socket.SOCK_STREAM) as sock:
                sock.bind((addr, 0))  # port 0: let the OS pick any free port; we only probe bindability
            return addr
        except OSError:
            continue
    return ""
def convert_ipv6(hostname: str) -> str:
    """Masks the colons of an IPv6 address with '|' so they cannot be mistaken for the host:dataset separator or for
    colons that are legal inside ZFS dataset names."""
    table = str.maketrans(":", "|")
    return hostname.translate(table)  # Also see bzfs.convert_ipv6() for the reverse conversion
1357#############################################################################
if __name__ == "__main__":
    # CLI entry point: delegate to main(), defined earlier in this file.
    main()