Coverage for bzfs_main / detect.py: 97%
216 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Detection of ZFS features and system capabilities on local and remote hosts."""
17from __future__ import (
18 annotations,
19)
20import re
21import subprocess
22import sys
23import threading
24import time
25from dataclasses import (
26 dataclass,
27 field,
28)
29from subprocess import (
30 DEVNULL,
31 PIPE,
32)
33from typing import (
34 TYPE_CHECKING,
35 Final,
36 final,
37)
39from bzfs_main.util.connection import (
40 DEDICATED,
41 SHARED,
42 ConnectionPools,
43)
44from bzfs_main.util.utils import (
45 LOG_TRACE,
46 PROG_NAME,
47 SynchronousExecutor,
48 die,
49 drain,
50 list_formatter,
51 stderr_to_str,
52 xprint,
53)
55if TYPE_CHECKING: # pragma: no cover - for type hints only
56 from bzfs_main.bzfs import (
57 Job,
58 )
59 from bzfs_main.configuration import (
60 Params,
61 Remote,
62 )
# constants:
DISABLE_PRG: Final[str] = "-"  # configured program name value that means "do not use this program"
DUMMY_DATASET: Final[str] = "dummy"  # root dataset name that marks the synthetic placeholder remote (see is_dummy())
# Pseudo-program keys that get added to available_programs if the detected ZFS version is at least the given version:
ZFS_VERSION_IS_AT_LEAST_2_1_0: Final[str] = "zfs>=2.1.0"
ZFS_VERSION_IS_AT_LEAST_2_2_0: Final[str] = "zfs>=2.2.0"
#############################################################################
@dataclass(frozen=True)
@final
class RemoteConfCacheItem:
    """Immutable per-remote cache entry holding the detected programs, zpool features and connection pools.

    ``timestamp_nanos`` records the creation time of the entry (taken from ``time.monotonic_ns()``) so that callers can
    decide whether the entry is still fresh enough to be reused.
    """

    connection_pools: ConnectionPools  # connection pools to reuse for this remote
    available_programs: dict[str, str]  # program name -> detected value ("" for mere presence)
    zpool_features: dict[str, dict[str, str]]  # pool name -> (zpool property/feature name -> value)
    timestamp_nanos: int = field(default_factory=time.monotonic_ns)  # monotonic creation time of this entry
def detect_available_programs(job: Job) -> None:
    """Detects programs, zpool features and connection pools for local and remote hosts.

    Steps, in order:
    1. Probe the local host via the local shell, unless ``params.available_programs`` already has a "local" entry.
    2. For dst and src, reuse a ``job.remote_conf_cache`` entry if it is younger than ``remote_conf_cache_ttl_nanos``
       and already knows the remote's pool; otherwise (re)detect the remote. Remotes are detected concurrently, and
       detection fails via ``die()`` if ZFS delegation is required but disabled on the pool.
    3. Remove programs configured as ``DISABLE_PRG`` from the src, dst and local mappings.
    4. Expand the ``uname-``, ``default_shell-`` and ``getconf_cpu_count-`` marker entries into dedicated keys.
    5. Fail via ``die()`` if sudo is required but missing, or if --preserve-properties would be unreliable.
    """
    params = job.params
    log = params.log
    programs_by_location: dict[str, dict[str, str]] = params.available_programs

    if "local" not in programs_by_location:
        subprocesses = job.subprocesses

        def run_local_shell(script: str):
            """Runs ``script`` via the local shell, echoes its stderr to sys.stderr and returns the finished process."""
            finished = subprocesses.subprocess_run(
                [params.shell_program_local, "-c", script],
                stdin=DEVNULL,
                stdout=PIPE,
                stderr=PIPE,
                text=True,
                log=log,
            )
            xprint(log=log, value=stderr_to_str(finished.stderr), file=sys.stderr, end="")
            return finished

        probe_stdout: str = run_local_shell(_find_available_programs(params)).stdout
        programs_by_location["local"] = dict.fromkeys(probe_stdout.splitlines(), "")
        if run_local_shell("exit").returncode != 0:
            _disable_program(params, "sh", ["local"])

    pending: list[Remote] = []
    for remote in (params.dst, params.src):
        location: str = remote.location
        cached: RemoteConfCacheItem | None = job.remote_conf_cache.get(remote.cache_key())
        if cached is None:
            params.connection_pools[location] = ConnectionPools(
                remote, {SHARED: remote.max_concurrent_ssh_sessions_per_tcp_connection, DEDICATED: 1}
            )
        else:
            # startup perf: cache avoids ssh connect setup and feature detection roundtrips on revisits to same site
            params.connection_pools[location] = cached.connection_pools
            params.available_programs[location] = cached.available_programs
            params.zpool_features[location] = cached.zpool_features
            is_fresh: bool = time.monotonic_ns() - cached.timestamp_nanos < params.remote_conf_cache_ttl_nanos
            if not is_fresh:
                params.zpool_features[location] = {}  # expired: drop stale zpool features before refetching from remote
            elif remote.pool in cached.zpool_features:
                continue  # cache hit, skip remote detection
        pending.append(remote)

    cache_lock: threading.Lock = threading.Lock()

    def probe_remote(remote: Remote) -> None:  # thread-safe
        location: str = remote.location
        cache_key: tuple = remote.cache_key()
        found_programs: dict[str, str] = _detect_available_programs_remote(job, remote, remote.ssh_user_host)
        pool_features: dict[str, str] = _detect_zpool_features(job, remote, found_programs)
        with cache_lock:  # serializes updates of the shared params mappings and of the job-wide cache
            remote.params.available_programs[location] = found_programs
            remote.params.zpool_features[location][remote.pool] = pool_features
            job.remote_conf_cache[cache_key] = RemoteConfCacheItem(
                params.connection_pools[location], found_programs, remote.params.zpool_features[location]
            )
        if remote.use_zfs_delegation and pool_features.get("delegation") == "off":
            die(
                f"Permission denied as ZFS delegation is disabled for {remote.location} "
                f"dataset: {remote.basis_root_dataset}. Manually enable it via 'sudo zpool set delegation=on {remote.pool}'"
            )

    with SynchronousExecutor.executor_for(max_workers=max(1, len(pending))) as executor:
        drain(executor.map(probe_remote, pending))  # detect ZFS features + system capabilities on src+dst in parallel

    # Programs the user explicitly disabled via DISABLE_PRG are treated as unavailable everywhere.
    all_locations = ["src", "dst", "local"]
    disable_candidates = (
        (params.compression_program, "zstd"),
        (params.mbuffer_program, "mbuffer"),
        (params.ps_program, "ps"),
        (params.pv_program, "pv"),
        (params.shell_program, "sh"),
        (params.sudo_program, "sudo"),
        (params.zpool_program, "zpool"),
    )
    for configured_program, program_name in disable_candidates:
        if configured_program == DISABLE_PRG:
            _disable_program(params, program_name, all_locations)

    # Marker entries have the form "<tag>-<value>"; none of the tags contains a '-', so the first '-' separates them.
    for location_key, programs in programs_by_location.items():
        for entry in list(programs):
            tag, dash, value = entry.partition("-")
            if not dash:
                continue  # plain program name (or an already expanded key), nothing to expand
            if tag == "uname":
                # uname-Linux foo 5.15.0-69-generic #76-Ubuntu SMP Fri Mar 17 17:19:29 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
                # uname-FreeBSD freebsd 14.1-RELEASE FreeBSD 14.1-RELEASE releng/14.1-n267679-10e31f0946d8 GENERIC amd64
                # uname-Darwin foo 23.6.0 Darwin Kernel Version 23.6.0: Mon Jul 29 21:13:04 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6020 arm64
                programs.pop(entry)
                programs["uname"] = value
                log.log(LOG_TRACE, f"available_programs[{location_key}][uname]: %s", value)
                programs["os"] = value.split(" ", maxsplit=1)[0]  # Linux|FreeBSD|Darwin
                log.log(LOG_TRACE, f"available_programs[{location_key}][os]: %s", programs["os"])
            elif tag == "default_shell":
                programs.pop(entry)
                programs["default_shell"] = value
                log.log(LOG_TRACE, f"available_programs[{location_key}][default_shell]: %s", value)
                if location_key == "src":
                    user_host = params.src.ssh_user_host
                elif location_key == "dst":
                    user_host = params.dst.ssh_user_host
                else:
                    user_host = ""
                _validate_default_shell(value, location_key, user_host)
            elif tag == "getconf_cpu_count":
                programs.pop(entry)
                programs["getconf_cpu_count"] = value
                log.log(LOG_TRACE, f"available_programs[{location_key}][getconf_cpu_count]: %s", value)

    for location_key, programs in programs_by_location.items():
        log.debug(f"available_programs[{location_key}]: %s", list_formatter(programs, separator=", "))

    for remote in (params.dst, params.src):
        if not is_dummy(remote) and remote.sudo and not params.is_program_available("sudo", remote.location):
            die(f"{params.sudo_program} CLI is not available on {remote.location} host: {remote.ssh_user_host or 'localhost'}")

    wants_props_preservation = len(params.args.preserve_properties) > 0 and any(
        opt in params.zfs_send_program_opts for opt in ["--props", "-p"]
    )
    if wants_props_preservation and not params.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_2_0, params.dst.location):
        die(
            "Cowardly refusing to proceed as --preserve-properties is unreliable on destination ZFS < 2.2.0 when using "
            "'zfs send --props'. Either upgrade destination ZFS, or remove '--props' from --zfs-send-program-opt(s)."
        )
def _disable_program(p: Params, program: str, locations: list[str]) -> None:
    """Drops ``program`` from ``p.available_programs`` for each of the given ``locations``.

    A location whose mapping lacks ``program`` is left untouched; a location without any mapping raises KeyError.
    """
    programs_by_location = p.available_programs
    for loc in locations:
        programs_at_loc = programs_by_location[loc]
        if program in programs_at_loc:
            del programs_at_loc[program]
def _find_available_programs(p: Params) -> str:
    """Returns a POSIX shell script that reports which of the configured programs exist on the host it runs on.

    Each detected program is printed on its own line as a fixed label (e.g. ``zpool``), independent of the configured
    executable name. Three lines carry a value after a ``<tag>-`` prefix: ``default_shell-$SHELL``,
    ``getconf_cpu_count-<n>`` and ``uname-<output of uname -a>``. The script uses `if` statements instead of `&&` plus
    `printf` instead of `echo` to ensure maximum compatibility across shells.
    """

    def probe(executable: str, label: str) -> str:
        # Prints ``label`` on its own line iff ``command -v <executable>`` succeeds.
        return f"if command -v {executable} > /dev/null; then printf '{label}\n'; fi"

    cmds: list[str] = [
        "printf 'default_shell-%s\n' \"$SHELL\"",
        probe("echo", "echo"),
        probe(p.zpool_program, "zpool"),
        probe(p.ssh_program, "ssh"),
        probe(p.shell_program, "sh"),
        probe(p.sudo_program, "sudo"),
        probe(p.compression_program, "zstd"),
        probe(p.mbuffer_program, "mbuffer"),
        probe(p.pv_program, "pv"),
        probe(p.ps_program, "ps"),
        # If getconf fails it prints nothing to stdout. The fallback newline then terminates the "getconf_cpu_count-" line
        # so that the subsequent "uname-..." output is not glued onto it, which would garble the cpu count and lose the
        # uname/os detection.
        f"if command -v {p.getconf_program} > /dev/null; then "
        f"printf 'getconf_cpu_count-'; {p.getconf_program} _NPROCESSORS_ONLN || printf '\n'; "
        "fi",
        # `|| true` keeps the script's overall exit status at zero even if uname fails.
        f"if command -v {p.uname_program} > /dev/null; then printf 'uname-'; {p.uname_program} -a || true; fi",
    ]
    return "; ".join(cmds)
def _detect_available_programs_remote(job: Job, remote: Remote, ssh_user_host: str) -> dict[str, str]:
    """Detects the ZFS version and the CLI tools available on ``remote``; returns them as a new mapping.

    The returned mapping contains the following entries:
    - "zfs" -> "<major>.<minor>.<patch>".
    - ``ZFS_VERSION_IS_AT_LEAST_2_1_0`` / ``ZFS_VERSION_IS_AT_LEAST_2_2_0`` -> "" when applicable.
    - One "" -valued key per line printed by the shell probe script.

    If the shell probe cannot be run, only "sudo" is assumed to be present besides zfs. The dummy remote yields an empty
    mapping. Calls ``die()`` if the zfs CLI is missing or unsupported, if ssh itself fails, or if the zfs version output
    is empty or unparsable.
    """
    p, log = job.params, job.params.log
    location = remote.location
    available_programs_minimum = {"sudo": ""}
    available_programs: dict[str, str] = {}
    if is_dummy(remote):
        return available_programs
    lines: str | None = None
    try:
        # on Linux, 'zfs --version' returns with zero status and prints the correct info
        # on FreeBSD, 'zfs --version' always prints the same (correct) info as Linux, but nonetheless sometimes
        # returns with non-zero status (sometimes = if the zfs kernel module is not loaded)
        lines = job.run_ssh_command_with_retries(remote, LOG_TRACE, print_stderr=False, cmd=[p.zfs_program, "--version"])
    except (FileNotFoundError, PermissionError):  # location is local and program file was not found
        die(f"{p.zfs_program} CLI is not available on {location} host: {ssh_user_host or 'localhost'}")
    except subprocess.CalledProcessError as e:
        stderr: str = stderr_to_str(e.stderr)
        stdout: str = stderr_to_str(e.stdout)
        if "unrecognized command '--version'" in stderr and "run: zfs help" in stderr:
            die(f"Unsupported ZFS platform: {stderr}")  # solaris is unsupported
        elif stderr.startswith("ssh: "):
            assert e.returncode == 255, e.returncode  # error within SSH itself (not during the remote command)
            die(f"ssh exit code {e.returncode}: {stderr.rstrip()}")
        elif not stdout.startswith("zfs"):
            die(f"{p.zfs_program} CLI is not available on {location} host: {ssh_user_host or 'localhost'}")
        else:
            lines = stdout  # FreeBSD if the zfs kernel module is not loaded

    # Examples that should parse: "zfs-2.1.5~rc5-ubuntu3", "zfswin-2.2.3rc5"
    # Empty output is validated via die() instead of `assert`, because asserts are stripped under `python -O`, which
    # would otherwise lead to a KeyError on available_programs["zfs"] further below.
    output_lines: list[str] = (lines or "").splitlines()
    first_line: str = output_lines[0] if output_lines else ""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", first_line)
    if not match:
        die("Unparsable zfs version string: '" + first_line + "'")
    version = ".".join(match.groups())
    available_programs["zfs"] = version
    if is_version_at_least(version, "2.1.0"):
        available_programs[ZFS_VERSION_IS_AT_LEAST_2_1_0] = ""
    if is_version_at_least(version, "2.2.0"):
        available_programs[ZFS_VERSION_IS_AT_LEAST_2_2_0] = ""
    log.log(LOG_TRACE, f"available_programs[{location}][zfs]: %s", available_programs["zfs"])

    if p.shell_program != DISABLE_PRG:
        try:
            cmd: list[str] = [p.shell_program, "-c", _find_available_programs(p)]
            probe_output: str = job.run_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd)
            available_programs.update(dict.fromkeys(probe_output.splitlines(), ""))
            return available_programs
        except (FileNotFoundError, PermissionError) as e:  # location is local and shell program file was not found
            if e.filename != p.shell_program:
                raise
        except subprocess.CalledProcessError:
            pass  # fall through to the minimal assumptions below
    log.warning("%s", f"Failed to find {p.shell_program} on {location}. Continuing with minimal assumptions...")
    available_programs.update(available_programs_minimum)
    return available_programs
def is_dummy(r: Remote) -> bool:
    """Tells whether ``r`` is the synthetic placeholder remote, i.e. whether its root dataset is ``DUMMY_DATASET``."""
    root_dataset = r.root_dataset
    return root_dataset == DUMMY_DATASET
def _detect_zpool_features(job: Job, remote: Remote, available_programs: dict) -> dict[str, str]:
    """Returns the ``feature@*`` and ``delegation`` properties of ``remote.pool`` as reported by ``zpool get``.

    ``zpool`` is only queried if it is not disabled and, unless the shell probe is disabled, if it was detected in
    ``available_programs``. If no zpool property lines were obtained, verifies via ``zfs list`` that the pool exists
    and calls ``die()`` if it does not. Returns an empty mapping for the dummy remote.
    """
    params = job.params
    if is_dummy(remote):
        return {}
    location: str = remote.location
    property_lines: list[str] = []
    features: dict[str, str] = {}
    may_query_zpool = params.zpool_program != DISABLE_PRG and (
        params.shell_program == DISABLE_PRG or "zpool" in available_programs
    )
    if may_query_zpool:
        zpool_cmd: list[str] = params.split_args(f"{params.zpool_program} get -Hp -o property,value all", remote.pool)
        try:
            property_lines = job.run_ssh_command_with_retries(remote, LOG_TRACE, check=False, cmd=zpool_cmd).splitlines()
        except (FileNotFoundError, PermissionError) as exc:
            if exc.filename != params.zpool_program:
                raise  # only a missing/inaccessible zpool program is tolerated here
            params.log.warning(
                "%s",
                f"Failed to detect zpool features on {location}: {remote.pool}. Continuing with minimal assumptions ...",
            )
        else:
            # each output line has the form "<property>\t<value>"
            props: dict[str, str] = dict(line.split("\t", 1) for line in property_lines)
            features = {
                name: value for name, value in props.items() if name == "delegation" or name.startswith("feature@")
            }
    if not property_lines:
        # No zpool output at all: at least make sure that the pool exists.
        zfs_cmd = params.split_args(f"{params.zfs_program} list -t filesystem -Hp -o name -s name", remote.pool)
        if job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=zfs_cmd) is None:
            die(f"Pool does not exist for {location} dataset: {remote.basis_root_dataset}. Manually create the pool first!")
    return features
def is_zpool_feature_enabled_or_active(p: Params, remote: Remote, feature: str) -> bool:
    """Tells whether the zpool ``feature`` of ``remote.pool`` has state "active" or "enabled"; unknown features -> False."""
    pool_features = p.zpool_features[remote.location][remote.pool]
    state = pool_features.get(feature)
    return state == "active" or state == "enabled"
def are_bookmarks_enabled(p: Params, remote: Remote) -> bool:
    """Returns True only if both ``feature@bookmark_v2`` and ``feature@bookmark_written`` are enabled or active."""
    required_features = ("feature@bookmark_v2", "feature@bookmark_written")
    return all(is_zpool_feature_enabled_or_active(p, remote, feature) for feature in required_features)
def is_caching_snapshots(p: Params, remote: Remote) -> bool:
    """Tells whether snapshot caching is requested via ``p.is_caching_snapshots`` and ``remote`` runs ZFS >= 2.2.0."""
    caching_requested = p.is_caching_snapshots
    if not caching_requested:
        return caching_requested
    return p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_2_0, remote.location)
def is_version_at_least(version_str: str, min_version_str: str) -> bool:
    """Returns True if the dotted version ``version_str`` is >= ``min_version_str``, comparing components numerically.

    Missing trailing components count as zero, so "2.1" compares equal to "2.1.0". Plain tuple comparison would
    incorrectly rank the shorter string lower.

    Raises:
        ValueError: if a component is not a decimal integer.
    """
    version = [int(part) for part in version_str.split(".")]
    minimum = [int(part) for part in min_version_str.split(".")]
    width = max(len(version), len(minimum))
    version += [0] * (width - len(version))
    minimum += [0] * (width - len(minimum))
    return version >= minimum
def _validate_default_shell(path_to_default_shell: str, location: str, ssh_user_host: str) -> None:
    """Calls ``die()`` if the default shell (bare name or path) is csh or tcsh; returns None for any other shell."""
    shell_basename = path_to_default_shell.rsplit("/", 1)[-1]
    if shell_basename not in ("csh", "tcsh"):
        return
    # On some old FreeBSD systems the default shell is still csh. Also see https://www.grymoire.com/unix/CshTop10.txt
    die(
        f"Cowardly refusing to proceed because {PROG_NAME} is not compatible with csh-style quoting of special "
        f"characters. The safe workaround is to first manually set 'sh' instead of '{path_to_default_shell}' as "
        f"the default shell of the Unix user on {location} host: {ssh_user_host or 'localhost'}, like so: "
        "chsh -s /bin/sh <YOURUSERNAME>"
    )