Coverage for bzfs_main/bzfs_jobrunner.py: 99%
637 statements
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# Inline script metadata conforming to https://packaging.python.org/specifications/inline-script-metadata
16# /// script
17# requires-python = ">=3.9"
18# dependencies = []
19# ///
20#
21"""
22* High-level orchestrator that calls `bzfs` as part of complex, periodic workflows to manage backup, replication, and pruning
23 jobs across a fleet of multiple source and destination hosts; driven by a fleet-wide job config file
24 (e.g., `bzfs_job_example.py`).
25* Overview of the bzfs_jobrunner.py codebase:
26* The codebase starts with docs, definition of input data and associated argument parsing of CLI options/parameters.
27* Control flow starts in main(), far below ..., which kicks off a "Job".
28* A Job creates zero or more "subjobs" for each local or remote host, via run_main().
29* It executes the subjobs, serially or in parallel, via run_subjobs(), which in turn delegates parallel job coordination to
30 bzfs.process_datasets_in_parallel_and_fault_tolerant().
31* README_bzfs_jobrunner.md is mostly auto-generated from the ArgumentParser help texts as the source of "truth", via
32 update_readme.sh. Simply run that script whenever you change or add ArgumentParser help text.
33"""
35from __future__ import (
36 annotations,
37)
38import argparse
39import contextlib
40import os
41import platform
42import pwd
43import random
44import socket
45import subprocess
46import sys
47import threading
48import time
49import uuid
50from ast import (
51 literal_eval,
52)
53from collections.abc import (
54 Iterable,
55)
56from logging import (
57 Logger,
58)
59from subprocess import (
60 DEVNULL,
61 PIPE,
62)
63from typing import (
64 Any,
65 Final,
66 NoReturn,
67 TypeVar,
68 Union,
69)
71import bzfs_main.argparse_actions
72import bzfs_main.check_range
73import bzfs_main.utils
74from bzfs_main import (
75 bzfs,
76)
77from bzfs_main.argparse_cli import (
78 PROG_AUTHOR,
79)
80from bzfs_main.detect import (
81 DUMMY_DATASET,
82)
83from bzfs_main.loggers import (
84 get_simple_logger,
85 reset_logger,
86 set_logging_runtime_defaults,
87)
88from bzfs_main.parallel_tasktree import (
89 BARRIER_CHAR,
90)
91from bzfs_main.parallel_tasktree_policy import (
92 process_datasets_in_parallel_and_fault_tolerant,
93)
94from bzfs_main.utils import (
95 DIE_STATUS,
96 LOG_TRACE,
97 UMASK,
98 JobStats,
99 Subprocesses,
100 format_dict,
101 format_obj,
102 getenv_bool,
103 human_readable_duration,
104 percent,
105 shuffle_dict,
106 terminate_process_subtree,
107 termination_signal_handler,
108)
109from bzfs_main.utils import PROG_NAME as BZFS_PROG_NAME
111# constants:
112PROG_NAME: Final[str] = "bzfs_jobrunner"
113SRC_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^SRC_HOST" # noqa: S105
114DST_MAGIC_SUBSTITUTION_TOKEN: Final[str] = "^DST_HOST" # noqa: S105
115SEP: Final[str] = ","
116POSIX_END_OF_OPTIONS_MARKER: Final[str] = "--" # args following -- are treated as operands, even if they begin with a hyphen
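# Illustrative sketch (not part of the original program): shows how the magic substitution tokens defined
# above are expanded at runtime, mirroring the logic of resolve_dst_dataset() further below; the helper
# name _example_expand_substitution_tokens is hypothetical.
def _example_expand_substitution_tokens(root_dataset: str, src_host: str, dst_host: str) -> str:
    """Expands the ^SRC_HOST and ^DST_HOST tokens in a --dst-root-datasets value; illustration only."""
    root_dataset = root_dataset.replace(SRC_MAGIC_SUBSTITUTION_TOKEN, src_host)
    return root_dataset.replace(DST_MAGIC_SUBSTITUTION_TOKEN, dst_host)
# Example: _example_expand_substitution_tokens("archives/zoo/^SRC_HOST", "nas", "archive") -> "archives/zoo/nas"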
119def argument_parser() -> argparse.ArgumentParser:
120 """Returns the CLI parser used by bzfs_jobrunner."""
121 # fmt: off
122 parser = argparse.ArgumentParser(
123 prog=PROG_NAME,
124 allow_abbrev=False,
125 formatter_class=argparse.RawTextHelpFormatter,
126 description=f"""
127This program is a convenience wrapper around [bzfs](README.md) that simplifies periodic ZFS snapshot creation, replication,
128pruning, and monitoring, across a fleet of N source hosts and M destination hosts, using a single fleet-wide shared
129[jobconfig](bzfs_tests/bzfs_job_example.py) script.
130For example, this simplifies the deployment of an efficient geo-replicated backup service where each of the M destination
131hosts is located in a separate geographic region and receives replicas from (the same set of) N source hosts. It also
132simplifies low latency replication from a primary to a secondary or to M read replicas, or backup to removable drives, etc.
134This program can be used to efficiently replicate ...
136a) within a single machine (local mode), or
138b) from a single source host to one or more destination hosts (pull or push or pull-push mode), or
140c) from multiple source hosts to a single destination host (pull or push or pull-push mode), or
142d) from N source hosts to M destination hosts (pull or push or pull-push mode, N and M can be large, M=2 or M=3 are typical
143geo-replication factors)
145You can run this program on a single third-party host and have that talk to all source hosts and destination hosts, which is
146convenient for basic use cases and for testing.
147However, typically, a cron job on each source host runs `{PROG_NAME}` periodically to create new snapshots (via
148--create-src-snapshots) and prune outdated snapshots and bookmarks on the source (via --prune-src-snapshots and
149--prune-src-bookmarks), whereas another cron job on each destination host runs `{PROG_NAME}` periodically to prune
150outdated destination snapshots (via --prune-dst-snapshots), and to replicate the recently created snapshots from the source
151to the destination (via --replicate).
152Yet another cron job on each source and each destination runs `{PROG_NAME}` periodically to alert the user if the latest or
153oldest snapshot is somehow too old (via --monitor-src-snapshots and --monitor-dst-snapshots).
154The frequency of these periodic activities can vary by activity, and is typically every second, minute, hour, day, week,
155month and/or year (or multiples thereof).
157Edit the jobconfig script in a central place (e.g. versioned in a git repo), then copy the (very same) shared file onto all
158source hosts and all destination hosts, and add crontab entries (or systemd timers or Monit entries or similar), along these
159lines:
161* crontab on source hosts:
163`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --create-src-snapshots --prune-src-snapshots --prune-src-bookmarks`
165`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --monitor-src-snapshots`
168* crontab on destination hosts:
170`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --replicate --prune-dst-snapshots`
172`* * * * * testuser /etc/bzfs/bzfs_job_example.py --dst-host="$(hostname)" --monitor-dst-snapshots`
174### High Frequency Replication (Experimental Feature)
176Taking snapshots, and/or replicating, from every N milliseconds to every 10 seconds or so is considered high frequency. For
177such use cases, consider that `zfs list -t snapshot` performance degrades as more and more snapshots currently exist within
178the selected datasets, so try to keep the number of currently existing snapshots small, and prune them at a frequency that
179is proportional to the frequency with which snapshots are created. Consider using `--skip-parent` and `--exclude-dataset*`
180filters to limit the selected datasets only to those that require this level of frequency.
182In addition, use the `--daemon-*` options to reduce startup overhead, in combination with splitting the crontab entry (or
183better: high frequency systemd timer) into multiple processes, from a single source host to a single destination host,
184along these lines:
186* crontab on source hosts:
188`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --create-src-snapshots`
190`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-snapshots`
192`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --prune-src-bookmarks`
194`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="$(hostname)" --dst-host="foo" --monitor-src-snapshots`
197* crontab on destination hosts:
199`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --replicate`
201`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --prune-dst-snapshots`
203`* * * * * testuser /etc/bzfs/bzfs_job_example.py --src-host="bar" --dst-host="$(hostname)" --monitor-dst-snapshots`
205The daemon processes work like non-daemon processes except that they loop, handle time events and sleep between events, and
206finally exit after, say, 86400 seconds (whatever you specify via `--daemon-lifetime`). The daemons will subsequently be
207auto-restarted by 'cron', or earlier if they fail. While the daemons are running, 'cron' will attempt to start new
208(unnecessary) daemons but this is benign as these new processes immediately exit with a message like this:
209"Exiting as same previous periodic job is still running without completion yet"
210""")
212 # commands:
213 parser.add_argument(
214 "--create-src-snapshots", action="store_true",
215 help="Take snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
216 "program (or cron job) running on each src host.\n\n")
217 parser.add_argument( # `choices` are deprecated; use --replicate without argument instead
218 "--replicate", choices=["pull", "push"], default=None, const="pull", nargs="?", metavar="",
219 help="Replicate snapshots from the selected source hosts to the selected destination hosts as necessary. For pull "
220 "mode (recommended), this command should be called by a program (or cron job) running on each dst "
221 "host; for push mode, on the src host; for pull-push mode on a third-party host.\n\n")
222 parser.add_argument(
223 "--prune-src-snapshots", action="store_true",
224 help="Prune snapshots on the selected source hosts as necessary. Typically, this command should be called by a "
225 "program (or cron job) running on each src host.\n\n")
226 parser.add_argument(
227 "--prune-src-bookmarks", action="store_true",
228 help="Prune bookmarks on the selected source hosts as necessary. Typically, this command should be called by a "
229 "program (or cron job) running on each src host.\n\n")
230 parser.add_argument(
231 "--prune-dst-snapshots", action="store_true",
232 help="Prune snapshots on the selected destination hosts as necessary. Typically, this command should be called by a "
233 "program (or cron job) running on each dst host.\n\n")
234 parser.add_argument(
235 "--monitor-src-snapshots", action="store_true",
236 help="Alert the user if snapshots on the selected source hosts are too old, using --monitor-snapshot-plan (see "
237 "below). Typically, this command should be called by a program (or cron job) running on each src host.\n\n")
238 parser.add_argument(
239 "--monitor-dst-snapshots", action="store_true",
240 help="Alert the user if snapshots on the selected destination hosts are too old, using --monitor-snapshot-plan (see "
241 "below). Typically, this command should be called by a program (or cron job) running on each dst host.\n\n")
243 # options:
244 parser.add_argument(
245 "--localhost", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
246 help="Hostname of localhost. Default is the hostname without the domain name, querying the Operating System.\n\n")
247 parser.add_argument(
248 "--src-hosts", default=None, metavar="LIST_STRING",
249 help="Hostnames of the sources to operate on.\n\n")
250 parser.add_argument(
251 "--src-host", default=None, action="append", metavar="STRING",
252 help="For subsetting --src-hosts; Can be specified multiple times; Indicates to only use the --src-hosts that are "
253 "contained in the specified --src-host values (optional).\n\n")
254 dst_hosts_example = {"nas": ["onsite"], "bak-us-west-1": ["us-west-1"],
255 "bak-eu-west-1": ["eu-west-1"], "archive": ["offsite"]}
256 parser.add_argument(
257 "--dst-hosts", default="{}", metavar="DICT_STRING",
258 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
259 "(the infix portion of snapshot name). "
260 f"Example: `{format_dict(dst_hosts_example)}`.\n\n"
261 "With this, given a snapshot name, we can find the destination hostnames to which the snapshot shall be "
262 "replicated. Also, given a snapshot name and its own hostname, a destination host can determine if it shall "
263 "replicate the given snapshot from the source host, or if the snapshot is intended for another destination "
264 "host, in which case it skips the snapshot. A destination host will receive replicas of snapshots for all "
265 "targets that map to that destination host.\n\n"
266 "Removing a mapping can be used to temporarily suspend replication to a given destination host.\n\n")
267 parser.add_argument(
268 "--dst-host", default=None, action="append", metavar="STRING",
269 help="For subsetting --dst-hosts; Can be specified multiple times; Indicates to only use the --dst-hosts keys that "
270 "are contained in the specified --dst-host values (optional).\n\n")
271 parser.add_argument(
272 "--retain-dst-targets", default="{}", metavar="DICT_STRING",
273 help="Dictionary that maps each destination hostname to a list of zero or more logical replication target names "
274 "(the infix portion of snapshot name). "
275 f"Example: `{format_dict(dst_hosts_example)}`. Has same format as --dst-hosts.\n\n"
276 "As part of --prune-dst-snapshots, a destination host will delete any snapshot it has stored whose target has "
277 "no mapping to that destination host in this dictionary. Do not remove a mapping here unless you are sure it's "
278 "ok to delete all those snapshots on that destination host! If in doubt, use --dryrun mode first.\n\n")
279 dst_root_datasets_example = {
280 "nas": "tank2/bak",
281 "bak-us-west-1": "backups/bak001",
282 "bak-eu-west-1": "backups/bak999",
283 "archive": f"archives/zoo/{SRC_MAGIC_SUBSTITUTION_TOKEN}",
284 "hotspare": "",
285 }
286 parser.add_argument(
287 "--dst-root-datasets", default="{}", metavar="DICT_STRING",
288 help="Dictionary that maps each destination hostname to a root dataset located on that destination host. The root "
289 "dataset name is an (optional) prefix that will be prepended to each dataset that is replicated to that "
290 "destination host. For backup use cases, this is the backup ZFS pool or a ZFS dataset path within that pool, "
291 "whereas for cloning, master slave replication, or replication from a primary to a secondary, this can also be "
292 "the empty string.\n\n"
293 f"`{SRC_MAGIC_SUBSTITUTION_TOKEN}` and `{DST_MAGIC_SUBSTITUTION_TOKEN}` are optional magic substitution tokens "
294 "that will be auto-replaced at runtime with the actual hostname. This can be used to force the use of a "
295 "separate destination root dataset per source host or per destination host.\n\n"
296 f"Example: `{format_dict(dst_root_datasets_example)}`\n\n")
297 src_snapshot_plan_example = {
298 "prod": {
299 "onsite": {"secondly": 40, "minutely": 40, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
300 "us-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
301 "eu-west-1": {"secondly": 0, "minutely": 0, "hourly": 36, "daily": 31, "weekly": 12, "monthly": 18, "yearly": 5},
302 },
303 "test": {
304 "offsite": {"12hourly": 42, "weekly": 12},
305 },
306 }
307 parser.add_argument(
308 "--src-snapshot-plan", default="{}", metavar="DICT_STRING",
309 help="Retention periods for snapshots to be used if pruning src, and when creating new snapshots on src. "
310 "Snapshots that do not match a retention period will be deleted. A zero or missing retention period indicates "
311 "that no snapshots shall be retained (or even be created) for the given period.\n\n"
312 f"Example: `{format_dict(src_snapshot_plan_example)}`. This example will, for the organization 'prod' and "
313 "the intended logical target 'onsite', create and then retain secondly snapshots that were created less "
314 "than 40 seconds ago, yet retain the latest 40 secondly snapshots regardless of creation time. Analogously for "
315 "the latest 40 minutely snapshots, 36 hourly snapshots, etc. "
316 "It will also create and retain snapshots for the targets 'us-west-1' and 'eu-west-1' within the 'prod' "
317 "organization. "
318 "In addition, it will create and retain snapshots every 12 hours and every week for the 'test' organization, "
319 "and name them as being intended for the 'offsite' replication target. "
320 "The example creates snapshots with names like "
321 "`prod_onsite_<timestamp>_secondly`, `prod_onsite_<timestamp>_minutely`, "
322 "`prod_us-west-1_<timestamp>_hourly`, `prod_us-west-1_<timestamp>_daily`, "
323 "`prod_eu-west-1_<timestamp>_hourly`, `prod_eu-west-1_<timestamp>_daily`, "
324 "`test_offsite_<timestamp>_12hourly`, `test_offsite_<timestamp>_weekly`, and so on.\n\n")
325 parser.add_argument(
326 "--src-bookmark-plan", default="{}", metavar="DICT_STRING",
327 help="Retention periods for bookmarks to be used if pruning src. Has same format as --src-snapshot-plan.\n\n")
328 parser.add_argument(
329 "--dst-snapshot-plan", default="{}", metavar="DICT_STRING",
330 help="Retention periods for snapshots to be used if pruning dst. Has same format as --src-snapshot-plan.\n\n")
331 monitor_snapshot_plan_example = {
332 "prod": {
333 "onsite": {
334 "100millisecondly": {"warning": "650 milliseconds", "critical": "2 seconds"},
335 "secondly": {"warning": "2 seconds", "critical": "14 seconds"},
336 "minutely": {"warning": "30 seconds", "critical": "300 seconds"},
337 "hourly": {"warning": "30 minutes", "critical": "300 minutes"},
338 "daily": {"warning": "4 hours", "critical": "8 hours"},
339 "weekly": {"warning": "2 days", "critical": "8 days"},
340 "monthly": {"warning": "2 days", "critical": "8 days"},
341 "yearly": {"warning": "5 days", "critical": "14 days"},
342 "10minutely": {"warning": "0 minutes", "critical": "0 minutes"},
343 },
344 "": {
345 "daily": {"warning": "4 hours", "critical": "8 hours"},
346 },
347 },
348 }
349 parser.add_argument(
350 "--monitor-snapshot-plan", default="{}", metavar="DICT_STRING",
351 help="Alert the user if the ZFS 'creation' time property of the latest or oldest snapshot for any specified "
352 "snapshot pattern within the selected datasets is too old with respect to the specified age limit. The purpose is to "
353 "check if snapshots are successfully taken on schedule, successfully replicated on schedule, and successfully "
354 "pruned on schedule. "
355 "Process exit code is 0, 1, 2 on OK, WARNING, CRITICAL, respectively. "
356 f"Example DICT_STRING: `{format_dict(monitor_snapshot_plan_example)}`. "
357 "This example alerts the user if the latest src or dst snapshot named `prod_onsite_<timestamp>_hourly` is more "
358 "than 30 minutes late (i.e. more than 30+60=90 minutes old) [warning] or more than 300 minutes late (i.e. more "
359 "than 300+60=360 minutes old) [critical]. In addition, the example alerts the user if the oldest src or dst "
360 "snapshot named `prod_onsite_<timestamp>_hourly` is more than 30 + 60x36 minutes old [warning] or more than "
361 "300 + 60x36 minutes old [critical], where 36 is the number of period cycles specified in `src_snapshot_plan` "
362 "or `dst_snapshot_plan`, respectively. "
363 "Analogously for the latest snapshot named `prod_<timestamp>_daily`, and so on.\n\n"
364 "Note: A duration that is missing or zero (e.g. '0 minutes') indicates that no snapshots shall be checked for "
365 "the given snapshot name pattern.\n\n")
366 locations = ["src", "dst"]
367 for loc in locations:
368 parser.add_argument(
369 f"--ssh-{loc}-user", default="", metavar="STRING",
370 help=f"Remote SSH username on {loc} hosts to connect to (optional). Examples: 'root', 'alice'.\n\n")
371 for loc in locations:
372 parser.add_argument(
373 f"--ssh-{loc}-port", type=int, min=1, max=65535, action=bzfs_main.check_range.CheckRange, metavar="INT",
374 help=f"Remote SSH port on {loc} host to connect to (optional).\n\n")
375 for loc in locations:
376 parser.add_argument(
377 f"--ssh-{loc}-config-file", type=str, action=bzfs_main.argparse_actions.SSHConfigFileNameAction, metavar="FILE",
378 help=f"Path to SSH ssh_config(5) file to connect to {loc} (optional); will be passed into ssh -F CLI. "
379 "The basename must contain the substring 'bzfs_ssh_config'.\n\n")
380 parser.add_argument(
381 "--src-user", default="", metavar="STRING",
382 help=argparse.SUPPRESS) # deprecated; was renamed to --ssh-src-user
383 parser.add_argument(
384 "--dst-user", default="", metavar="STRING",
385 help=argparse.SUPPRESS) # deprecated; was renamed to --ssh-dst-user
386 parser.add_argument(
387 "--job-id", required=True, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
388 help="The identifier that remains constant across all runs of this particular job; will be included in the log file "
389 "name infix. Example: mytestjob\n\n")
390 parser.add_argument(
391 "--jobid", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
392 help=argparse.SUPPRESS) # deprecated; was renamed to --job-run
393 parser.add_argument(
394 "--job-run", default=None, action=bzfs_main.argparse_actions.NonEmptyStringAction, metavar="STRING",
395 help="The identifier of this particular run of the overall job; will be included in the log file name suffix. "
396 "Default is a hex UUID. Example: 0badc0f003a011f0a94aef02ac16083c\n\n")
397 workers_default = 100 # percent
398 parser.add_argument(
399 "--workers", min=1, default=(workers_default, True), action=bzfs_main.argparse_actions.CheckPercentRange,
400 metavar="INT[%]",
401 help="The maximum number of jobs to run in parallel at any time; can be given as a positive integer, "
402 f"optionally followed by the %% percent character (min: %(min)s, default: {workers_default}%%). Percentages "
403 "are relative to the number of CPU cores on the machine. Example: 200%% uses twice as many parallel jobs as "
404 "there are cores on the machine; 75%% uses num_procs = num_cores * 0.75. Examples: 1, 4, 75%%, 150%%\n\n")
405 parser.add_argument(
406 "--work-period-seconds", type=float, min=0, default=0, action=bzfs_main.check_range.CheckRange, metavar="FLOAT",
407 help="Reduces bandwidth spikes by spreading out the start of worker jobs over this much time; "
408 "0 disables this feature (default: %(default)s). Examples: 0, 60, 86400\n\n")
409 parser.add_argument(
410 "--jitter", action="store_true",
411 help="Randomize job start time and host order to avoid potential thundering herd problems in large distributed "
412 "systems (optional). Randomizing job start time is only relevant if --work-period-seconds > 0.\n\n")
413 parser.add_argument(
414 "--worker-timeout-seconds", type=float, min=0.001, default=None, action=bzfs_main.check_range.CheckRange,
415 metavar="FLOAT",
416 help="If this much time has passed after a worker process has started executing, kill the straggling worker "
417 "(optional). Other workers remain unaffected. Examples: 60, 3600\n\n")
418 parser.add_argument(
419 "--spawn-process-per-job", action="store_true",
420 help="Spawn a Python process per subjob instead of a Python thread per subjob (optional). The former is only "
421 "recommended for a job operating in parallel on a large number of hosts as it helps avoid exceeding "
422 "per-process limits such as the default max number of open file descriptors, at the expense of increased "
423 "startup latency.\n\n")
424 parser.add_argument(
425 "--jobrunner-dryrun", action="store_true",
426 help="Do a dry run (aka 'no-op') to print what operations would happen if the command were to be executed "
427 "for real (optional). This option treats both the ZFS source and destination as read-only. Can also be used to "
428 "check if the configuration options are valid.\n\n")
429 parser.add_argument(
430 "--jobrunner-log-level", choices=["CRITICAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"], default="INFO",
431 help="Only emit jobrunner messages with equal or higher priority than this log level. Default is '%(default)s'.\n\n")
432 parser.add_argument(
433 "--daemon-replication-frequency", default="minutely", metavar="STRING",
434 help="Specifies how often the bzfs daemon shall replicate from src to dst if --daemon-lifetime is nonzero.\n\n")
435 parser.add_argument(
436 "--daemon-prune-src-frequency", default="minutely", metavar="STRING",
437 help="Specifies how often the bzfs daemon shall prune src if --daemon-lifetime is nonzero.\n\n")
438 parser.add_argument(
439 "--daemon-prune-dst-frequency", default="minutely", metavar="STRING",
440 help="Specifies how often the bzfs daemon shall prune dst if --daemon-lifetime is nonzero.\n\n")
441 parser.add_argument(
442 "--daemon-monitor-snapshots-frequency", default="minutely", metavar="STRING",
443 help="Specifies how often the bzfs daemon shall monitor snapshot age if --daemon-lifetime is nonzero.\n\n")
444 bad_opts = ["--daemon-frequency", "--include-snapshot-plan", "--create-src-snapshots-plan", "--skip-replication",
445 "--log-file-prefix", "--log-file-infix", "--log-file-suffix",
446 "--delete-dst-datasets", "--delete-dst-snapshots", "--delete-dst-snapshots-except",
447 "--delete-dst-snapshots-except-plan", "--delete-empty-dst-datasets",
448 "--monitor-snapshots", "--timeout"]
449 for loc in locations:
450 bad_opts += [f"--ssh-{loc}-host"] # reject this arg as jobrunner will auto-generate it
451 for bad_opt in bad_opts:
452 parser.add_argument(bad_opt, action=RejectArgumentAction, nargs=0, help=argparse.SUPPRESS)
453 parser.add_argument(
454 "--version", action="version", version=f"{PROG_NAME}-{bzfs_main.argparse_cli.__version__}, by {PROG_AUTHOR}",
455 help="Display version information and exit.\n\n")
456 parser.add_argument(
457 "--help, -h", action="help", # trick to ensure both --help and -h are shown in the help msg
458 help="Show this help message and exit.\n\n")
459 parser.add_argument(
460 "--root-dataset-pairs", required=True, nargs="+", action=bzfs_main.argparse_actions.DatasetPairsAction,
461 metavar="SRC_DATASET DST_DATASET",
462 help="Source and destination dataset pairs (excluding usernames and excluding hostnames, which will all be "
463 "auto-appended later).\n\n")
464 return parser
465 # fmt: on
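# Illustrative sketch (not part of the original program): the README is generated from the ArgumentParser
# help texts (see module docstring and update_readme.sh); a hypothetical helper to render that help text
# in-process could look like this.
def _example_render_help_text() -> str:
    """Returns the auto-generated --help output of the parser defined above; illustration only."""
    return argument_parser().format_help()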
468#############################################################################
469def main() -> None:
470 """API for command line clients."""
471 prev_umask: int = os.umask(UMASK)
472 try:
473 set_logging_runtime_defaults()
474 # On CTRL-C and SIGTERM, send signal to all descendant processes to terminate them
475 termination_event: threading.Event = threading.Event()
476 with termination_signal_handler(termination_event=termination_event):
477 Job(log=None, termination_event=termination_event).run_main(sys_argv=sys.argv)
478 finally:
479 os.umask(prev_umask) # restore prior global state
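# Illustrative sketch (not part of the original program): Python clients can drive a Job directly, mirroring
# main() above; the argv shown in the docstring is hypothetical.
def _example_run_job_programmatically(sys_argv: list[str]) -> None:
    """Runs one Job with the given CLI argv (argv[0] is the program name), e.g.
    ["bzfs_jobrunner", "--job-id=myjob", "--src-hosts=['nas']", "--root-dataset-pairs", "tank/data", "tank/bak/data"].
    """
    set_logging_runtime_defaults()
    termination_event = threading.Event()
    with termination_signal_handler(termination_event=termination_event):
        Job(log=None, termination_event=termination_event).run_main(sys_argv=sys_argv)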
482#############################################################################
483class Job:
484 """Coordinates subjobs per the CLI flags; Each subjob handles one host pair and may run in its own process or thread."""
486 def __init__(self, log: Logger | None, termination_event: threading.Event) -> None:
487 # immutable variables:
488 self.log_was_None: Final[bool] = log is None
489 self.log: Final[Logger] = get_simple_logger(PROG_NAME) if log is None else log
490 self.termination_event: Final[threading.Event] = termination_event
491 self.subprocesses: Final[Subprocesses] = Subprocesses()
492 self.jobrunner_dryrun: bool = False
493 self.spawn_process_per_job: bool = False
494 self.loopback_address: Final[str] = _detect_loopback_address()
496 # mutable variables:
497 self.first_exception: int | None = None
498 self.stats: JobStats = JobStats(jobs_all=0)
499 self.cache_existing_dst_pools: set[str] = set()
500 self.cache_known_dst_pools: set[str] = set()
502 self.is_test_mode: bool = False # for testing only
504 def run_main(self, sys_argv: list[str]) -> None:
505 """API for Python clients; visible for testing; may become a public API eventually."""
506 try:
507 self._run_main(sys_argv)
508 finally:
509 if self.log_was_None: # reset Logger unless it's a Logger outside of our control
510 reset_logger(self.log)
512 def _run_main(self, sys_argv: list[str]) -> None:
513 self.first_exception = None
514 log: Logger = self.log
515 log.info("CLI arguments: %s", " ".join(sys_argv))
516 nsp = argparse.Namespace(no_argument_file=True) # disable --root-dataset-pairs='+file' option in DatasetPairsAction
517 args, unknown_args = argument_parser().parse_known_args(sys_argv[1:], nsp) # forward all unknown args to `bzfs`
518 log.setLevel(args.jobrunner_log_level)
519 self.jobrunner_dryrun = args.jobrunner_dryrun
520 assert len(args.root_dataset_pairs) > 0
521 src_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_snapshot_plan), "--src-snapshot-plan")
522 src_bookmark_plan: dict = self.validate_snapshot_plan(literal_eval(args.src_bookmark_plan), "--src-bookmark-plan")
523 dst_snapshot_plan: dict = self.validate_snapshot_plan(literal_eval(args.dst_snapshot_plan), "--dst-snapshot-plan")
524 monitor_snapshot_plan: dict = self.validate_monitor_snapshot_plan(literal_eval(args.monitor_snapshot_plan))
525 localhostname: str = args.localhost if args.localhost else socket.gethostname()
526 self.validate_host_name(localhostname, "--localhost")
527 log.debug("localhostname: %s", localhostname)
528 src_hosts: list[str] = self.validate_src_hosts(self.parse_src_hosts_from_cli_or_stdin(args.src_hosts))
529 basis_src_hosts: list[str] = src_hosts
530 nb_src_hosts: int = len(basis_src_hosts)
531 log.debug("src_hosts before subsetting: %s", src_hosts)
532 if args.src_host is not None: # retain only the src hosts that are also contained in args.src_host
533 assert isinstance(args.src_host, list)
534 retain_src_hosts: set[str] = set(args.src_host)
535 self.validate_is_subset(retain_src_hosts, src_hosts, "--src-host", "--src-hosts")
536 src_hosts = [host for host in src_hosts if host in retain_src_hosts]
537 dst_hosts: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.dst_hosts))
538 nb_dst_hosts: int = len(dst_hosts)
539 if args.dst_host is not None: # retain only the dst hosts that are also contained in args.dst_host
540 assert isinstance(args.dst_host, list)
541 retain_dst_hosts: set[str] = set(args.dst_host)
542 self.validate_is_subset(retain_dst_hosts, dst_hosts.keys(), "--dst-host", "--dst-hosts.keys")
543 dst_hosts = {dst_host: lst for dst_host, lst in dst_hosts.items() if dst_host in retain_dst_hosts}
544 retain_dst_targets: dict[str, list[str]] = self.validate_dst_hosts(literal_eval(args.retain_dst_targets))
545 self.validate_is_subset(dst_hosts.keys(), retain_dst_targets.keys(), "--dst-hosts.keys", "--retain-dst-targets.keys")
546 dst_root_datasets: dict[str, str] = self.validate_dst_root_datasets(literal_eval(args.dst_root_datasets))
547 self.validate_is_subset(
548 dst_root_datasets.keys(), retain_dst_targets.keys(), "--dst-root-dataset.keys", "--retain-dst-targets.keys"
549 )
550 self.validate_is_subset(dst_hosts.keys(), dst_root_datasets.keys(), "--dst-hosts.keys", "--dst-root-dataset.keys")
551 bad_root_datasets: dict[str, str] = {
552 dst_host: root_dataset
553 for dst_host in sorted(dst_hosts.keys())
554 if SRC_MAGIC_SUBSTITUTION_TOKEN not in (root_dataset := dst_root_datasets[dst_host])
555 }
556 if len(src_hosts) > 1 and len(bad_root_datasets) > 0:
557 self.die(
558 "Cowardly refusing to proceed as multiple source hosts must not be configured to write to the same "
559 "destination dataset. "
560 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(src_hosts)}"
561 )
562 bad_root_datasets = {
563 dst_host: root_dataset
564 for dst_host, root_dataset in sorted(dst_root_datasets.items())
565 if root_dataset and SRC_MAGIC_SUBSTITUTION_TOKEN not in root_dataset
566 }
567 if len(basis_src_hosts) > 1 and len(bad_root_datasets) > 0:
568 self.die(
569 "Cowardly refusing to proceed as multiple source hosts are defined in the configuration, but "
570 f"not all non-empty root datasets in --dst-root-datasets contain the '{SRC_MAGIC_SUBSTITUTION_TOKEN}' "
571 "substitution token to prevent collisions on writing destination datasets. "
572 f"Problematic subset of --dst-root-datasets: {bad_root_datasets} for src_hosts: {sorted(basis_src_hosts)}"
573 )
574 if args.jitter: # randomize host order to avoid potential thundering herd problems in large distributed systems
575 random.SystemRandom().shuffle(src_hosts)
576 dst_hosts = shuffle_dict(dst_hosts)
577 ssh_src_user: str = args.ssh_src_user or args.src_user # --src-user is deprecated
578 ssh_dst_user: str = args.ssh_dst_user or args.dst_user # --dst-user is deprecated
579 ssh_src_port: int | None = args.ssh_src_port
580 ssh_dst_port: int | None = args.ssh_dst_port
581 ssh_src_config_file: str | None = args.ssh_src_config_file
582 ssh_dst_config_file: str | None = args.ssh_dst_config_file
583 job_id: str = _sanitize(args.job_id)
584 job_run: str = args.job_run if args.job_run is not None else args.jobid # --jobid deprecated; was renamed to --job-run
585 job_run = _sanitize(job_run) if job_run else uuid.uuid1().hex
586 workers, workers_is_percent = args.workers
587 max_workers: int = max(1, round((os.cpu_count() or 1) * workers / 100.0) if workers_is_percent else round(workers))
588 worker_timeout_seconds: int = args.worker_timeout_seconds
589 self.spawn_process_per_job = args.spawn_process_per_job
590 username: str = pwd.getpwuid(os.getuid()).pw_name
591 assert username
592 loopback_ids: set[str] = {"localhost", "127.0.0.1", "::1", socket.gethostname()} # ::1 is IPv6 loopback address
593 loopback_ids.update(self.get_localhost_ips()) # union
594 loopback_ids.add(localhostname)
595 loopback_ids = set() if getenv_bool("disable_loopback", False) else loopback_ids
596 log.log(LOG_TRACE, "loopback_ids: %s", sorted(loopback_ids))
598 def zero_pad(number: int, width: int = 6) -> str:
599 """Pads number with leading '0' chars to the given width."""
600 return f"{number:0{width}d}"
602 def jpad(jj: int, tag: str) -> str:
603 """Returns ``tag`` prefixed with slash and zero padded index."""
604 return "/" + zero_pad(jj) + tag
606 def runpad() -> str:
607 """Returns standardized subjob count suffix."""
608 return job_run + SEP + zero_pad(len(subjobs))
610 def update_subjob_name(tag: str) -> str:
611 """Derives next subjob name based on ``tag`` and index ``j``."""
612 if j <= 0:
613 return subjob_name
614 elif j == 1:
615 return subjob_name + jpad(j - 1, tag)
616 else:
617 return subjob_name + "/" + BARRIER_CHAR
619 def resolve_dataset(hostname: str, dataset: str, is_src: bool = True) -> str:
620 """Returns host:dataset string resolving IPv6 and localhost cases."""
621 assert hostname
622 assert dataset
623 ssh_user = ssh_src_user if is_src else ssh_dst_user
624 ssh_user = ssh_user if ssh_user else username
625 lb: str = self.loopback_address
626 loopbck_ids: set[str] = loopback_ids
627 hostname = hostname if hostname not in loopbck_ids else (lb if lb else hostname) if username != ssh_user else "-"
628 hostname = convert_ipv6(hostname)
629 return f"{hostname}:{dataset}"
631 def resolve_dst_dataset(dst_hostname: str, dst_dataset: str) -> str:
632 """Expands ``dst_dataset`` relative to ``dst_hostname`` roots."""
633 assert dst_hostname
634 assert dst_dataset
635 root_dataset: str | None = dst_root_datasets.get(dst_hostname)
636 assert root_dataset is not None, dst_hostname # f"Hostname '{dst_hostname}' missing in --dst-root-datasets"
637 root_dataset = root_dataset.replace(SRC_MAGIC_SUBSTITUTION_TOKEN, src_host)
638 root_dataset = root_dataset.replace(DST_MAGIC_SUBSTITUTION_TOKEN, dst_hostname)
639 resolved_dst_dataset: str = f"{root_dataset}/{dst_dataset}" if root_dataset else dst_dataset
640 bzfs_main.utils.validate_dataset_name(resolved_dst_dataset, dst_dataset)
641 return resolve_dataset(dst_hostname, resolved_dst_dataset, is_src=False)
643 for src_host in src_hosts:
644 assert src_host
645 for dst_hostname in dst_hosts:
646 assert dst_hostname
647 dummy: Final[str] = DUMMY_DATASET
648 lhn: Final[str] = localhostname
649 bzfs_prog_header: Final[list[str]] = [BZFS_PROG_NAME, "--no-argument-file"] + unknown_args
650 subjobs: dict[str, list[str]] = {}
651 for i, src_host in enumerate(src_hosts):
652 subjob_name: str = zero_pad(i) + "src-host"
653 src_log_suffix: str = _log_suffix(localhostname, src_host, "")
654 j: int = 0
655 opts: list[str]
657 if args.create_src_snapshots:
658 opts = ["--create-src-snapshots", f"--create-src-snapshots-plan={src_snapshot_plan}", "--skip-replication"]
659 self.add_log_file_opts(opts, "create-src-snapshots", job_id, runpad(), src_log_suffix)
660 self.add_ssh_opts(
661 opts, ssh_src_user=ssh_src_user, ssh_src_port=ssh_src_port, ssh_src_config_file=ssh_src_config_file
662 )
663 opts += [POSIX_END_OF_OPTIONS_MARKER]
664 opts += _flatten(_dedupe([(resolve_dataset(src_host, src), dummy) for src, dst in args.root_dataset_pairs]))
665 subjob_name += "/create-src-snapshots"
666 subjobs[subjob_name] = bzfs_prog_header + opts
668 if args.replicate:
669 j = 0
670 marker: str = "replicate"
671 for dst_hostname, targets in dst_hosts.items():
672 opts = self.replication_opts(
673 dst_snapshot_plan, set(targets), lhn, src_host, dst_hostname, marker, job_id, runpad()
674 )
675 if len(opts) > 0:
676 opts += [f"--daemon-frequency={args.daemon_replication_frequency}"]
677 self.add_ssh_opts(
678 opts,
679 ssh_src_user=ssh_src_user,
680 ssh_dst_user=ssh_dst_user,
681 ssh_src_port=ssh_src_port,
682 ssh_dst_port=ssh_dst_port,
683 ssh_src_config_file=ssh_src_config_file,
684 ssh_dst_config_file=ssh_dst_config_file,
685 )
686 opts += [POSIX_END_OF_OPTIONS_MARKER]
687 dataset_pairs: list[tuple[str, str]] = [
688 (resolve_dataset(src_host, src), resolve_dst_dataset(dst_hostname, dst))
689 for src, dst in args.root_dataset_pairs
690 ]
691 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
692 if len(dataset_pairs) > 0:
693 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
694 j += 1
695 subjob_name = update_subjob_name(marker)
697 def prune_src(
698 opts: list[str], retention_plan: dict, tag: str, src_host: str = src_host, logsuffix: str = src_log_suffix
699 ) -> None:
700 """Creates prune subjob options for ``tag`` using ``retention_plan``."""
701 opts += ["--skip-replication", f"--delete-dst-snapshots-except-plan={retention_plan}"]
702 opts += [f"--daemon-frequency={args.daemon_prune_src_frequency}"]
703 self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
704 self.add_ssh_opts( # i.e. dst=src, src=dummy
705 opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
706 )
707 opts += [POSIX_END_OF_OPTIONS_MARKER]
708 opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
709 nonlocal subjob_name
710 subjob_name += f"/{tag}"
711 subjobs[subjob_name] = bzfs_prog_header + opts
713 if args.prune_src_snapshots:
714 prune_src(["--delete-dst-snapshots"], src_snapshot_plan, "prune-src-snapshots")
716 if args.prune_src_bookmarks:
717 prune_src(["--delete-dst-snapshots=bookmarks"], src_bookmark_plan, "prune-src-bookmarks")
719 if args.prune_dst_snapshots:
720 self.validate_true(
721 retain_dst_targets, "--retain-dst-targets must not be empty. Cowardly refusing to delete all snapshots!"
722 )
723 j = 0
724 marker = "prune-dst-snapshots"
725 for dst_hostname, _ in dst_hosts.items():
726 curr_retain_targets: set[str] = set(retain_dst_targets[dst_hostname])
727 curr_dst_snapshot_plan = { # only retain targets that belong to the host
728 org: {target: periods for target, periods in target_periods.items() if target in curr_retain_targets}
729 for org, target_periods in dst_snapshot_plan.items()
730 }
731 opts = ["--delete-dst-snapshots", "--skip-replication"]
732 opts += [f"--delete-dst-snapshots-except-plan={curr_dst_snapshot_plan}"]
733 opts += [f"--daemon-frequency={args.daemon_prune_dst_frequency}"]
734 self.add_log_file_opts(opts, marker, job_id, runpad(), _log_suffix(lhn, src_host, dst_hostname))
735 self.add_ssh_opts(
736 opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
737 )
738 opts += [POSIX_END_OF_OPTIONS_MARKER]
739 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
740 dataset_pairs = _dedupe(dataset_pairs)
741 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
742 if len(dataset_pairs) > 0:
743 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
744 j += 1
745 subjob_name = update_subjob_name(marker)
747 def monitor_snapshots_opts(tag: str, monitor_plan: dict, logsuffix: str) -> list[str]:
748 """Returns monitor subjob options for ``tag`` and ``monitor_plan``."""
749 opts = [f"--monitor-snapshots={monitor_plan}", "--skip-replication"]
750 opts += [f"--daemon-frequency={args.daemon_monitor_snapshots_frequency}"]
751 self.add_log_file_opts(opts, tag, job_id, runpad(), logsuffix)
752 return opts
754 def build_monitor_plan(monitor_plan: dict, snapshot_plan: dict, cycles_prefix: str) -> dict:
755 """Expands ``monitor_plan`` with cycle defaults from ``snapshot_plan``."""
757 def alert_dicts(alertdict: dict, cycles: int) -> dict:
758 """Returns alert dictionaries with explicit ``cycles`` value."""
759 latest_dict = alertdict.copy()
760 for prefix in ("src_snapshot_", "dst_snapshot_", ""):
761 latest_dict.pop(f"{prefix}cycles", None)
762 oldest_dict = latest_dict.copy()
763 oldest_dict["cycles"] = int(alertdict.get(f"{cycles_prefix}cycles", cycles))
764 return {"latest": latest_dict, "oldest": oldest_dict}
766 return {
767 org: {
768 target: {
769 periodunit: alert_dicts(alertdict, snapshot_plan.get(org, {}).get(target, {}).get(periodunit, 1))
770 for periodunit, alertdict in periods.items()
771 }
772 for target, periods in target_periods.items()
773 }
774 for org, target_periods in monitor_plan.items()
775 }
777 if args.monitor_src_snapshots:
778 marker = "monitor-src-snapshots"
779 monitor_plan = build_monitor_plan(monitor_snapshot_plan, src_snapshot_plan, "src_snapshot_")
780 opts = monitor_snapshots_opts(marker, monitor_plan, src_log_suffix)
781 self.add_ssh_opts( # i.e. dst=src, src=dummy
782 opts, ssh_dst_user=ssh_src_user, ssh_dst_port=ssh_src_port, ssh_dst_config_file=ssh_src_config_file
783 )
784 opts += [POSIX_END_OF_OPTIONS_MARKER]
785 opts += _flatten(_dedupe([(dummy, resolve_dataset(src_host, src)) for src, dst in args.root_dataset_pairs]))
786 subjob_name += "/" + marker
787 subjobs[subjob_name] = bzfs_prog_header + opts
789 if args.monitor_dst_snapshots:
790 j = 0
791 marker = "monitor-dst-snapshots"
792 for dst_hostname, targets in dst_hosts.items():
793 monitor_targets = set(targets).intersection(set(retain_dst_targets[dst_hostname]))
794 monitor_plan = { # only retain targets that belong to the host
795 org: {target: periods for target, periods in target_periods.items() if target in monitor_targets}
796 for org, target_periods in monitor_snapshot_plan.items()
797 }
798 monitor_plan = build_monitor_plan(monitor_plan, dst_snapshot_plan, "dst_snapshot_")
799 opts = monitor_snapshots_opts(marker, monitor_plan, _log_suffix(lhn, src_host, dst_hostname))
800 self.add_ssh_opts(
801 opts, ssh_dst_user=ssh_dst_user, ssh_dst_port=ssh_dst_port, ssh_dst_config_file=ssh_dst_config_file
802 )
803 opts += [POSIX_END_OF_OPTIONS_MARKER]
804 dataset_pairs = [(dummy, resolve_dst_dataset(dst_hostname, dst)) for src, dst in args.root_dataset_pairs]
805 dataset_pairs = _dedupe(dataset_pairs)
806 dataset_pairs = self.skip_nonexisting_local_dst_pools(dataset_pairs, worker_timeout_seconds)
807 if len(dataset_pairs) > 0:
808 subjobs[subjob_name + jpad(j, marker)] = bzfs_prog_header + opts + _flatten(dataset_pairs)
809 j += 1
810 subjob_name = update_subjob_name(marker)
812 msg = "Ready to run %s subjobs using %s/%s src hosts: %s, %s/%s dst hosts: %s"
813 log.info(
814 msg, len(subjobs), len(src_hosts), nb_src_hosts, src_hosts, len(dst_hosts), nb_dst_hosts, list(dst_hosts.keys())
815 )
816 log.log(LOG_TRACE, "subjobs: \n%s", _pretty_print_formatter(subjobs))
817 self.run_subjobs(subjobs, max_workers, worker_timeout_seconds, args.work_period_seconds, args.jitter)
818 ex = self.first_exception
819 if isinstance(ex, int):
820 assert ex != 0
821 sys.exit(ex)
822 assert ex is None, ex
823 log.info("Succeeded. Bye!")
825 def replication_opts(
826 self,
827 dst_snapshot_plan: dict[str, dict[str, dict[str, int]]],
828 targets: set[str],
829 localhostname: str,
830 src_hostname: str,
831 dst_hostname: str,
832 tag: str,
833 job_id: str,
834 job_run: str,
835 ) -> list[str]:
836 """Returns CLI options for one replication subjob."""
837 log = self.log
838 log.debug("%s", f"Replicating targets {sorted(targets)} from {src_hostname} to {dst_hostname} ...")
839 include_snapshot_plan = { # only replicate targets that belong to the destination host and are relevant
840 org: {
841 target: {
842 duration_unit: duration_amount
843 for duration_unit, duration_amount in periods.items()
844 if duration_amount > 0
845 }
846 for target, periods in target_periods.items()
847 if target in targets
848 }
849 for org, target_periods in dst_snapshot_plan.items()
850 }
851 include_snapshot_plan = { # only replicate orgs that have at least one relevant target_period
852 org: target_periods
853 for org, target_periods in include_snapshot_plan.items()
854 if any(len(periods) > 0 for target, periods in target_periods.items())
855 }
856 opts: list[str] = []
857 if len(include_snapshot_plan) > 0:
858 opts += [f"--include-snapshot-plan={include_snapshot_plan}"]
859 self.add_log_file_opts(opts, tag, job_id, job_run, _log_suffix(localhostname, src_hostname, dst_hostname))
860 return opts
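    # Illustrative sketch (not called by the program): shows how replication_opts() above filters a
    # hypothetical dst snapshot plan; all host and job names used here are made up.
    def _example_replication_opts(self) -> list[str]:
        """Returns the options replication_opts() produces for a destination mapped only to 'onsite'."""
        plan = {"prod": {"onsite": {"hourly": 36, "daily": 31}, "us-west-1": {"hourly": 36, "daily": 31}}}
        # For a host mapped only to the "onsite" target, the "us-west-1" periods are filtered out, so the
        # resulting --include-snapshot-plan only contains {"prod": {"onsite": {"hourly": 36, "daily": 31}}}:
        return self.replication_opts(plan, {"onsite"}, "localhost", "nas", "bak-us-west-1", "replicate", "myjob", "run01")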
862 def skip_nonexisting_local_dst_pools(
863 self, root_dataset_pairs: list[tuple[str, str]], timeout_secs: float | None = None
864 ) -> list[tuple[str, str]]:
865 """Skips datasets that point to removable destination drives that are not currently (locally) attached, if any."""
867 def zpool(dataset: str) -> str:
868 """Returns pool name portion of ``dataset``."""
869 return dataset.split("/", 1)[0]
871 assert len(root_dataset_pairs) > 0
872 unknown_dst_pools = {zpool(dst) for src, dst in root_dataset_pairs}
873 unknown_dst_pools = unknown_dst_pools.difference(self.cache_known_dst_pools)
875 # Here we treat a zpool as existing if the zpool isn't local, aka if it isn't prefixed with "-:". A remote host
876 # will raise an appropriate error if it turns out that the remote zpool doesn't actually exist.
877 unknown_local_dst_pools = {pool for pool in unknown_dst_pools if pool.startswith("-:")}
878 if len(unknown_local_dst_pools) > 0: # `zfs list` if local
879 existing_pools = {pool[len("-:") :] for pool in unknown_local_dst_pools}
880 cmd = "zfs list -t filesystem,volume -Hp -o name".split(" ") + sorted(existing_pools)
881 sp = subprocess.run(cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout_secs)
882 if sp.returncode not in (0, 1): # 1 means dataset not found
883 self.die(f"Unexpected error {sp.returncode} on checking for existing local dst pools: {sp.stderr.strip()}")
884 existing_pools = {"-:" + pool for pool in sp.stdout.splitlines() if pool}
885 self.cache_existing_dst_pools.update(existing_pools) # union
886 unknown_remote_dst_pools = unknown_dst_pools.difference(unknown_local_dst_pools)
887 self.cache_existing_dst_pools.update(unknown_remote_dst_pools) # union
888 self.cache_known_dst_pools.update(unknown_dst_pools) # union
889 results: list[tuple[str, str]] = []
890 for src, dst in root_dataset_pairs:
891 if zpool(dst) in self.cache_existing_dst_pools:
892 results.append((src, dst))
893 else:
894 self.log.warning("Skipping dst dataset for which local dst pool does not exist: %s", dst)
895 return results
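    # Illustrative sketch (not called by the program): "-:"-prefixed destinations are local and get checked
    # via `zfs list`, while remote destinations are assumed to exist; all names here are made up.
    def _example_skip_nonexisting_local_dst_pools(self) -> list[tuple[str, str]]:
        """Returns the subset of pairs whose (local) destination pool actually exists; illustration only."""
        pairs = [("-:tank/src", "-:removable_pool/bak/src"), ("-:tank/src", "bak-host:tank2/bak/src")]
        return self.skip_nonexisting_local_dst_pools(pairs)  # drops pairs whose local dst pool is absent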
897 @staticmethod
898 def add_ssh_opts(
899 opts: list[str],
900 ssh_src_user: str | None = None,
901 ssh_dst_user: str | None = None,
902 ssh_src_port: int | None = None,
903 ssh_dst_port: int | None = None,
904 ssh_src_config_file: str | None = None,
905 ssh_dst_config_file: str | None = None,
906 ) -> None:
907 """Appends ssh related options to ``opts`` if specified."""
908 assert isinstance(opts, list)
909 opts += [f"--ssh-src-user={ssh_src_user}"] if ssh_src_user else []
910 opts += [f"--ssh-dst-user={ssh_dst_user}"] if ssh_dst_user else []
911 opts += [f"--ssh-src-port={ssh_src_port}"] if ssh_src_port is not None else []
912 opts += [f"--ssh-dst-port={ssh_dst_port}"] if ssh_dst_port is not None else []
913 opts += [f"--ssh-src-config-file={ssh_src_config_file}"] if ssh_src_config_file else []
914 opts += [f"--ssh-dst-config-file={ssh_dst_config_file}"] if ssh_dst_config_file else []
916 @staticmethod
917 def add_log_file_opts(opts: list[str], tag: str, job_id: str, job_run: str, logsuffix: str) -> None:
918 """Appends standard log-file CLI options to ``opts``."""
919 opts += [f"--log-file-prefix={PROG_NAME}{SEP}{tag}{SEP}"]
920 opts += [f"--log-file-infix={SEP}{job_id}"]
921 opts += [f"--log-file-suffix={SEP}{job_run}{logsuffix}{SEP}"]
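    # Illustrative sketch (not called by the program): shows the options that add_ssh_opts() and
    # add_log_file_opts() above append for a hypothetical subjob; all names/values are made up.
    @staticmethod
    def _example_log_and_ssh_opts() -> list[str]:
        """Returns example ssh and log-file CLI options; illustration only."""
        opts: list[str] = []
        Job.add_ssh_opts(opts, ssh_src_user="alice", ssh_src_port=2222)  # only the provided settings are appended
        Job.add_log_file_opts(opts, tag="replicate", job_id="myjob", job_run="run01,000003", logsuffix=",host1,host2")
        # opts now resembles: ["--ssh-src-user=alice", "--ssh-src-port=2222",
        #                      "--log-file-prefix=bzfs_jobrunner,replicate,", "--log-file-infix=,myjob",
        #                      "--log-file-suffix=,run01,000003,host1,host2,"]
        return opts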
923 def run_subjobs(
924 self,
925 subjobs: dict[str, list[str]],
926 max_workers: int,
927 timeout_secs: float | None,
928 work_period_seconds: float,
929 jitter: bool,
930 ) -> None:
931 """Executes subjobs sequentially or in parallel, respecting '~' barriers.
933 Design note on subjob failure, isolation and termination:
934 - On subjob failure the subjob's subtree is skipped.
935 - Subjob failures are converted to return codes (not exceptions) so the policy does not invoke a termination handler
936 on such failures and sibling subjobs continue unaffected. This preserves per-subjob isolation, both within a single
937 process (thread-per-subjob mode; default) as well as in process-per-subjob mode (``--spawn-process-per-job``).
938 """
939 self.stats = JobStats(len(subjobs))
940 log = self.log
941 num_intervals = 1 + len(subjobs) if jitter else len(subjobs)
942 interval_nanos = 0 if len(subjobs) == 0 else round(1_000_000_000 * max(0.0, work_period_seconds) / num_intervals)
943 assert interval_nanos >= 0
944 if jitter: # randomize job start time to avoid potential thundering herd problems in large distributed systems
945 sleep_nanos = random.SystemRandom().randint(0, interval_nanos)
946 log.info("Jitter: Delaying job start time by sleeping for %s ...", human_readable_duration(sleep_nanos))
947 self.termination_event.wait(sleep_nanos / 1_000_000_000) # allow early wakeup on async termination
948 sorted_subjobs: list[str] = sorted(subjobs.keys())
949 spawn_process_per_job: bool = self.spawn_process_per_job
950 log.log(LOG_TRACE, "%s: %s", "spawn_process_per_job", spawn_process_per_job)
951 if process_datasets_in_parallel_and_fault_tolerant(
952 log=log,
953 datasets=sorted_subjobs,
954 process_dataset=lambda subjob, tid, retry: self.run_subjob(
955 subjobs[subjob], name=subjob, timeout_secs=timeout_secs, spawn_process_per_job=spawn_process_per_job
956 )
957 == 0,
958 skip_tree_on_error=lambda subjob: True,
959 skip_on_error="dataset",
960 max_workers=max_workers,
961 interval_nanos=lambda last_update_nanos, dataset, submitted_count: interval_nanos,
962 termination_event=self.termination_event,
963 termination_handler=self.subprocesses.terminate_process_subtrees,
964 task_name="Subjob",
965 retry_policy=None, # no retries
966 dry_run=False,
967 is_test_mode=self.is_test_mode,
968 ):
969 self.first_exception = DIE_STATUS if self.first_exception is None else self.first_exception
970 stats = self.stats
971 jobs_skipped = stats.jobs_all - stats.jobs_started
972 msg = f"{stats}, skipped:" + percent(jobs_skipped, total=stats.jobs_all, print_total=True)
973 log.info("Final Progress: %s", msg)
974 assert stats.jobs_running == 0, msg
975 assert stats.jobs_completed == stats.jobs_started, msg
976 skipped_jobs_dict = {subjob: subjobs[subjob] for subjob in sorted_subjobs if subjob not in stats.started_job_names}
977 if len(skipped_jobs_dict) > 0:
978 log.debug("Skipped subjobs: \n%s", _pretty_print_formatter(skipped_jobs_dict))
979 assert jobs_skipped == len(skipped_jobs_dict), msg
981 def run_subjob(
982 self, cmd: list[str], name: str, timeout_secs: float | None, spawn_process_per_job: bool
983 ) -> int | None: # thread-safe
984 """Executes one worker job and updates shared Stats."""
985 start_time_nanos = time.monotonic_ns()
986 returncode = None
987 log = self.log
988 cmd_str = " ".join(cmd)
989 stats = self.stats
990 try:
991 msg: str = stats.submit_job(name)
992 log.log(LOG_TRACE, "Starting worker job: %s", cmd_str)
993 log.info("Progress: %s", msg)
994 start_time_nanos = time.monotonic_ns()
995 if spawn_process_per_job:
996 returncode = self.run_worker_job_spawn_process_per_job(cmd, timeout_secs)
997 else:
998 returncode = self.run_worker_job_in_current_thread(cmd, timeout_secs)
999 except BaseException as e:
1000 log.error("Worker job failed with unexpected exception: %s for command: %s", e, cmd_str)
1001 raise
1002 else:
1003 elapsed_human: str = human_readable_duration(time.monotonic_ns() - start_time_nanos)
1004 if returncode != 0:
1005 with stats.lock:
1006 if self.first_exception is None:
1007 self.first_exception = DIE_STATUS if returncode is None else returncode
1008 log.error("Worker job failed with exit code %s in %s: %s", returncode, elapsed_human, cmd_str)
1009 else:
1010 log.debug("Worker job succeeded in %s: %s", elapsed_human, cmd_str)
1011 return returncode
1012 finally:
1013 msg = stats.complete_job(failed=returncode != 0, elapsed_nanos=time.monotonic_ns() - start_time_nanos)
1014 log.info("Progress: %s", msg)
1016 def run_worker_job_in_current_thread(self, cmd: list[str], timeout_secs: float | None) -> int | None:
1017 """Runs ``bzfs`` in-process and returns its exit code."""
1018 log = self.log
1019 if timeout_secs is not None:
1020 cmd = cmd[0:1] + [f"--timeout={round(1000 * timeout_secs)}milliseconds"] + cmd[1:]
1021 try:
1022 if not self.jobrunner_dryrun:
1023 self._bzfs_run_main(cmd)
1024 return 0
1025 except subprocess.CalledProcessError as e:
1026 return e.returncode
1027 except SystemExit as e:
1028 assert e.code is None or isinstance(e.code, int)
1029 return e.code
1030 except BaseException:
1031 log.exception("Worker job failed with unexpected exception for command: %s", " ".join(cmd))
1032 return DIE_STATUS
1034 def _bzfs_run_main(self, cmd: list[str]) -> None:
1035 """Delegates execution to :mod:`bzfs` using parsed arguments."""
1036 bzfs_job = bzfs.Job(termination_event=self.termination_event)
1037 bzfs_job.is_test_mode = self.is_test_mode
1038 bzfs_job.run_main(bzfs.argument_parser().parse_args(cmd[1:]), cmd)
1040 def run_worker_job_spawn_process_per_job(self, cmd: list[str], timeout_secs: float | None) -> int | None:
1041 """Spawns a subprocess for the worker job and waits for completion."""
1042 log = self.log
1043 if len(cmd) > 0 and cmd[0] == BZFS_PROG_NAME:
1044 cmd = [sys.executable, "-m", "bzfs_main." + cmd[0]] + cmd[1:]
1045 if self.jobrunner_dryrun:
1046 return 0
1047 proc = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, text=True) # run job in a separate subprocess
1048 pid: int = proc.pid
1049 self.subprocesses.register_child_pid(pid)
1050 try:
1051 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit
1052 except subprocess.TimeoutExpired:
1053 cmd_str = " ".join(cmd)
1054 log.error("%s", f"Terminating worker job as it failed to complete within {timeout_secs}s: {cmd_str}")
1055 proc.terminate() # Sends SIGTERM signal to job subprocess
1056 assert isinstance(timeout_secs, float)
1057 timeout_secs = min(1.0, timeout_secs)
1058 try:
1059 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit
1060 except subprocess.TimeoutExpired:
1061 log.error("%s", f"Killing worker job as it failed to terminate within {timeout_secs}s: {cmd_str}")
1062 terminate_process_subtree(root_pids=[proc.pid]) # Send SIGTERM to process subtree
1063 proc.kill() # Sends SIGKILL signal to job subprocess because SIGTERM wasn't enough
1064 timeout_secs = min(0.025, timeout_secs)
1065 with contextlib.suppress(subprocess.TimeoutExpired):
1066 proc.communicate(timeout=timeout_secs) # Wait for the subprocess to exit
1067 finally:
1068 self.subprocesses.unregister_child_pid(pid)
1069 return proc.returncode
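# Minimal standalone sketch of the escalation pattern used above, assuming a plain subprocess.Popen handle and
# illustrative grace periods; it omits the jobrunner's child-pid registry and process-subtree handling:
#
#   proc.terminate()                                    # polite SIGTERM first
#   try:
#       proc.communicate(timeout=1.0)                   # bounded wait for graceful exit
#   except subprocess.TimeoutExpired:
#       proc.kill()                                     # escalate to SIGKILL
#       with contextlib.suppress(subprocess.TimeoutExpired):
#           proc.communicate(timeout=0.025)             # final short reap attempt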
1071 def validate_src_hosts(self, src_hosts: list[str]) -> list[str]:
1072 """Checks that ``src_hosts`` contains valid hostnames."""
1073 context = "--src-hosts"
1074 self.validate_type(src_hosts, list, context)
1075 for src_hostname in src_hosts:
1076 self.validate_host_name(src_hostname, context)
1077 return src_hosts
1079 def validate_dst_hosts(self, dst_hosts: dict[str, list[str]]) -> dict[str, list[str]]:
1080 """Checks destination hosts dictionary."""
1081 context = "--dst-hosts"
1082 self.validate_type(dst_hosts, dict, context)
1083 for dst_hostname, targets in dst_hosts.items():
1084 self.validate_host_name(dst_hostname, context)
1085 self.validate_type(targets, list, f"{context} targets")
1086 for target in targets:
1087 self.validate_type(target, str, f"{context} target")
1088 return dst_hosts
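# Shape accepted by validate_dst_hosts(), mapping each destination hostname to a list of target strings; the
# hostnames and targets below are hypothetical, for illustration only:
#
#   dst_hosts = {
#       "nas": ["onsite"],
#       "bak-us-west-1": ["us-west-1"],
#   }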
1090 def validate_dst_root_datasets(self, dst_root_datasets: dict[str, str]) -> dict[str, str]:
1091 """Checks that each destination root dataset string is valid."""
1092 context = "--dst-root-datasets"
1093 self.validate_type(dst_root_datasets, dict, context)
1094 for dst_hostname, dst_root_dataset in dst_root_datasets.items():
1095 self.validate_host_name(dst_hostname, context)
1096 self.validate_type(dst_root_dataset, str, f"{context} root dataset")
1097 return dst_root_datasets
1099 def validate_snapshot_plan(
1100 self, snapshot_plan: dict[str, dict[str, dict[str, int]]], context: str
1101 ) -> dict[str, dict[str, dict[str, int]]]:
1102 """Checks snapshot plan structure and value types."""
1103 self.validate_type(snapshot_plan, dict, context)
1104 for org, target_periods in snapshot_plan.items():
1105 self.validate_type(org, str, f"{context} org")
1106 self.validate_type(target_periods, dict, f"{context} target_periods")
1107 for target, periods in target_periods.items():
1108 self.validate_type(target, str, f"{context} org/target")
1109 self.validate_type(periods, dict, f"{context} org/periods")
1110 for period_unit, period_amount in periods.items():
1111 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit")
1112 self.validate_non_negative_int(period_amount, f"{context} org/target/period_amount")
1113 return snapshot_plan
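# Shape accepted by validate_snapshot_plan(): organization -> target -> period_unit -> non-negative amount. The
# names and numbers below are hypothetical, for illustration only:
#
#   snapshot_plan = {
#       "prod": {
#           "onsite": {"hourly": 36, "daily": 31},
#           "us-west-1": {"daily": 31, "weekly": 12},
#       },
#   }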
1115 def validate_monitor_snapshot_plan(
1116 self, monitor_snapshot_plan: dict[str, dict[str, dict[str, dict[str, str | int]]]]
1117 ) -> dict[str, dict[str, dict[str, dict[str, str | int]]]]:
1118 """Checks snapshot monitoring plan configuration."""
1119 context = "--monitor-snapshot-plan"
1120 self.validate_type(monitor_snapshot_plan, dict, context)
1121 for org, target_periods in monitor_snapshot_plan.items():
1122 self.validate_type(org, str, f"{context} org")
1123 self.validate_type(target_periods, dict, f"{context} target_periods")
1124 for target, periods in target_periods.items():
1125 self.validate_type(target, str, f"{context} org/target")
1126 self.validate_type(periods, dict, f"{context} org/periods")
1127 for period_unit, alert_dict in periods.items():
1128 self.validate_non_empty_string(period_unit, f"{context} org/target/period_unit")
1129 self.validate_type(alert_dict, dict, f"{context} org/target/alert_dict")
1130 for key, value in alert_dict.items():
1131 self.validate_non_empty_string(key, f"{context} org/target/alert_dict/key")
1132 self.validate_type(value, Union[str, int], f"{context} org/target/alert_dict/value")
1133 return monitor_snapshot_plan
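# Shape accepted by validate_monitor_snapshot_plan(): organization -> target -> period_unit -> alert dict with
# non-empty string keys and str or int values. The keys and values below are hypothetical placeholders, not a
# documented schema:
#
#   monitor_snapshot_plan = {
#       "prod": {
#           "onsite": {
#               "daily": {"warning": "some threshold", "critical": 2},
#           },
#       },
#   }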
1135 def validate_is_subset(self, x: Iterable[str], y: Iterable[str], x_name: str, y_name: str) -> None:
1136 """Raises an error if ``x`` contains an item not present in ``y``."""
1137 if isinstance(x, str) or not isinstance(x, Iterable):
1138 self.die(f"{x_name} must be an Iterable")
1139 if isinstance(y, str) or not isinstance(y, Iterable):
1140 self.die(f"{y_name} must be an Iterable")
1141 if not set(x).issubset(set(y)):
1142 diff = sorted(set(x).difference(set(y)))
1143 self.die(f"{x_name} must be a subset of {y_name}. diff: {diff}, {x_name}: {sorted(x)}, {y_name}: {sorted(y)}")
1145 def validate_host_name(self, hostname: str, context: str) -> None:
1146 """Checks that ``hostname`` is a non-empty, valid host name."""
1147 self.validate_non_empty_string(hostname, f"{context} hostname")
1148 bzfs.validate_host_name(hostname, context)
1150 def validate_non_empty_string(self, value: str, name: str) -> None:
1151 """Checks that ``value`` is a non-empty string."""
1152 self.validate_type(value, str, name)
1153 if not value:
1154 self.die(f"{name} must not be empty!")
1156 def validate_non_negative_int(self, value: int, name: str) -> None:
1157 """Checks ``value`` is an int >= 0."""
1158 self.validate_type(value, int, name)
1159 if value < 0:
1160 self.die(f"{name} must be a non-negative integer: {value}")
1162 def validate_true(self, expr: Any, msg: str) -> None:
1163 """Raises an error if ``expr`` evaluates to ``False``."""
1164 if not bool(expr):
1165 self.die(msg)
1167 def validate_type(self, value: Any, expected_type: Any, name: str) -> None:
1168 """Checks that ``value`` is an instance of ``expected_type``, or of one member of a ``Union`` thereof."""
1169 if hasattr(expected_type, "__origin__") and expected_type.__origin__ is Union: # for compat with python < 3.10
1170 union_types = expected_type.__args__
1171 for t in union_types:
1172 if isinstance(value, t):
1173 return
1174 type_msg = " or ".join([t.__name__ for t in union_types])
1175 self.die(f"{name} must be of type {type_msg} but got {type(value).__name__}: {value}")
1176 elif not isinstance(value, expected_type):
1177 self.die(f"{name} must be of type {expected_type.__name__} but got {type(value).__name__}: {value}")
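# Illustrative calls (hypothetical names), showing the Union branch that keeps this compatible with Python < 3.10:
#
#   self.validate_type(2, Union[str, int], "alert value")  # ok: int matches one union member
#   self.validate_type(2, str, "period unit")  # dies: "period unit must be of type str but got int: 2"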
1179 def parse_src_hosts_from_cli_or_stdin(self, raw_src_hosts: str | None) -> list:
1180 """Resolves --src-hosts from the CLI or stdin, with robust TTY/empty-input handling."""
1181 if raw_src_hosts is None:
1182 # If stdin is an interactive TTY, don't block waiting for input; fail clearly instead
1183 try:
1184 is_tty: bool = bool(getattr(sys.stdin, "isatty", lambda: False)())
1185 except Exception:
1186 is_tty = False
1187 if is_tty:
1188 self.die("Missing --src-hosts and stdin is a TTY. Provide --src-hosts or pipe the list on stdin.")
1189 stdin_text: str = sys.stdin.read()
1190 if not stdin_text.strip(): # avoid literal_eval("") SyntaxError and provide a clear message
1191 self.die("Missing --src-hosts and stdin is empty. Provide --src-hosts or pipe a list on stdin.")
1192 raw_src_hosts = stdin_text
1193 try:
1194 value = literal_eval(raw_src_hosts)
1195 except Exception as e:
1196 self.die(f"Invalid --src-hosts format: {e} for input: {raw_src_hosts}")
1197 if not isinstance(value, list):
1198 example: str = format_obj(["hostname1", "hostname2"])
1199 self.die(f"Invalid --src-hosts: expected a Python list literal, e.g. {example} but got: {format_obj(value)}")
1200 return value
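# When --src-hosts is omitted and stdin is not a TTY, stdin is expected to carry a Python list literal, e.g. piped
# in as:
#
#   ["hostname1", "hostname2"]
#
# Empty input, an interactive TTY, or a non-list literal each produce a clear error message above.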
1202 def die(self, msg: str) -> NoReturn:
1203 """Logs ``msg`` and exits the program."""
1204 self.log.error("%s", msg)
1205 bzfs_main.utils.die(msg)
1207 def get_localhost_ips(self) -> set[str]:
1208 """Returns all network addresses of the local host, i.e. all configured addresses on all network interfaces, without
1209 depending on name resolution."""
1210 ips: set[str] = set()
1211 if platform.system() == "Linux":
1212 try:
1213 proc = subprocess.run(["hostname", "-I"], stdin=DEVNULL, stdout=PIPE, text=True, check=True) # noqa: S607
1214 except Exception as e:
1215 self.log.warning("Cannot run 'hostname -I' on localhost: %s", e)
1216 else:
1217 ips = {ip for ip in proc.stdout.strip().split() if ip}
1218 self.log.log(LOG_TRACE, "localhost_ips: %s", sorted(ips))
1219 return ips
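# For illustration (hypothetical addresses): 'hostname -I' prints a whitespace-separated address list, which the set
# comprehension above turns into a set of non-empty strings:
#
#   "192.168.1.5 10.0.0.7 fd00::5 \n"  ->  {"192.168.1.5", "10.0.0.7", "fd00::5"}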
1222#############################################################################
1223class RejectArgumentAction(argparse.Action):
1224 """An argparse Action that immediately fails if it is ever triggered."""
1226 def __call__(
1227 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None
1228 ) -> None:
1229 """Abort argument parsing if a protected option is seen."""
1230 parser.error(f"Security: Overriding protected argument '{option_string}' is not allowed.")
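# Hypothetical usage sketch (the option name is invented for illustration; the actual protected arguments are
# presumably wired up elsewhere in this module):
#
#   parser.add_argument("--some-protected-option", action=RejectArgumentAction, nargs="*")
#   # passing --some-protected-option on the command line now aborts parsing with a security error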
1233#############################################################################
1234def _dedupe(root_dataset_pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
1235 """Returns a list with duplicate dataset pairs removed while preserving order."""
1236 return list(dict.fromkeys(root_dataset_pairs))
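# Example with hypothetical dataset names; dict.fromkeys() keeps the first occurrence of each pair and preserves
# insertion order:
#
#   _dedupe([("tank/a", "backup/a"), ("tank/b", "backup/b"), ("tank/a", "backup/a")])
#   # -> [("tank/a", "backup/a"), ("tank/b", "backup/b")]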
1239T = TypeVar("T")
1242def _flatten(root_dataset_pairs: Iterable[Iterable[T]]) -> list[T]:
1243 """Flattens an iterable of pairs into a single list."""
1244 return [item for pair in root_dataset_pairs for item in pair]
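# Example with hypothetical dataset names:
#
#   _flatten([("tank/a", "backup/a"), ("tank/b", "backup/b")])
#   # -> ["tank/a", "backup/a", "tank/b", "backup/b"]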
1247def _sanitize(filename: str) -> str:
1248 """Replaces potentially problematic characters in ``filename`` with '!'."""
1249 for s in (" ", "..", "/", "\\", SEP):
1250 filename = filename.replace(s, "!")
1251 return filename
1254def _log_suffix(localhostname: str, src_hostname: str, dst_hostname: str) -> str:
1255 """Returns a log file suffix in a format that contains the given hostnames."""
1256 return f"{SEP}{_sanitize(localhostname)}{SEP}{_sanitize(src_hostname)}{SEP}{_sanitize(dst_hostname)}"
1259def _pretty_print_formatter(dictionary: dict[str, Any]) -> Any:
1260 """Lazy JSON formatter used to avoid overhead in disabled log levels."""
1262 class PrettyPrintFormatter:
1263 """Wrapper returning formatted JSON on ``str`` conversion."""
1265 def __str__(self) -> str:
1266 import json
1268 return json.dumps(dictionary, indent=4, sort_keys=True)
1270 return PrettyPrintFormatter()
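# Typical (hypothetical) call site: because logging formats arguments only when a record is actually emitted, the
# JSON string is never built if the log level is disabled:
#
#   log.log(LOG_TRACE, "effective config: %s", _pretty_print_formatter(config_dict))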
1273def _detect_loopback_address() -> str:
1274 """Detects whether a loopback connection over IPv4 or IPv6 is possible, returning the usable address or an empty string."""
1275 try:
1276 addr = "127.0.0.1"
1277 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1278 s.bind((addr, 0))
1279 return addr
1280 except OSError:
1281 pass
1283 try:
1284 addr = "::1"
1285 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1286 s.bind((addr, 0))
1287 return addr
1288 except OSError:
1289 pass
1291 return ""
1294def convert_ipv6(hostname: str) -> str:
1295 """Supports IPv6 without getting confused by host:dataset colon separator and any colons that may be part of a (valid)
1296 ZFS dataset name."""
1297 return hostname.replace(":", "|") # Also see bzfs.convert_ipv6() for the reverse conversion
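# Example: convert_ipv6("fd00::2a") returns "fd00||2a", which can be safely embedded next to the host:dataset colon
# separator; bzfs.convert_ipv6() performs the reverse mapping.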
1300#############################################################################
1301if __name__ == "__main__":  # line 1301 ↛ 1302: didn't jump to line 1302 because the condition on line 1301 was never true
1302 main()