Coverage for bzfs_main / detect.py: 97%

218 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:49 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Detection of ZFS features and system capabilities on local and remote hosts.""" 

16 

17from __future__ import ( 

18 annotations, 

19) 

20import re 

21import subprocess 

22import sys 

23import threading 

24import time 

25from dataclasses import ( 

26 dataclass, 

27 field, 

28) 

29from subprocess import ( 

30 DEVNULL, 

31 PIPE, 

32) 

33from typing import ( 

34 TYPE_CHECKING, 

35 Final, 

36 final, 

37) 

38 

39from bzfs_main.util.connection import ( 

40 DEDICATED, 

41 SHARED, 

42 ConnectionPools, 

43) 

44from bzfs_main.util.utils import ( 

45 LOG_TRACE, 

46 PROG_NAME, 

47 SynchronousExecutor, 

48 die, 

49 drain, 

50 list_formatter, 

51 stderr_to_str, 

52 xprint, 

53) 

54 

55if TYPE_CHECKING: # pragma: no cover - for type hints only 

56 from bzfs_main.bzfs import ( 

57 Job, 

58 ) 

59 from bzfs_main.configuration import ( 

60 Params, 

61 Remote, 

62 ) 

63 

# constants:
# Sentinel value of a ``*_program`` parameter; parameters equal to it cause the corresponding program to be disabled.
DISABLE_PRG: Final[str] = "-"
# A remote whose ``root_dataset`` equals this value is treated as a dummy (see ``is_dummy()``) and is not probed.
DUMMY_DATASET: Final[str] = "dummy"
# Marker keys that are added to a host's available programs if its detected ZFS version is at least the given version.
ZFS_VERSION_IS_AT_LEAST_2_1_0: Final[str] = "zfs>=2.1.0"
ZFS_VERSION_IS_AT_LEAST_2_2_0: Final[str] = "zfs>=2.2.0"

69 

70 

71############################################################################# 

@dataclass(frozen=True)
@final
class RemoteConfCacheItem:
    """Immutable snapshot of what has been detected for one remote: its connection pools, its available programs and the
    features of its zpools, plus the time at which the snapshot was taken."""

    # connection pools that get reused on revisits to the same remote
    connection_pools: ConnectionPools
    # program name --> value (e.g. the version string for "zfs", or "" for plain availability markers)
    available_programs: dict[str, str]
    # pool name --> property name --> property value
    zpool_features: dict[str, dict[str, str]]
    # creation time on the monotonic clock; compared against a TTL to decide whether the item is still fresh
    timestamp_nanos: int = field(default_factory=time.monotonic_ns)

81 

82 

def detect_available_programs(job: Job) -> None:
    """Detects the available programs and zpool features of the local, source and destination hosts and records them,
    together with the connection pools of the remotes, in ``job.params``.

    The local host is probed only if no "local" entry exists yet. For src and dst, an entry of ``job.remote_conf_cache``
    is reused as long as it is younger than ``remote_conf_cache_ttl_nanos`` and covers the pool of the remote; otherwise
    the remote is probed again, with src and dst being probed concurrently. Afterwards, programs that have been disabled
    via ``DISABLE_PRG`` are removed, composite probe results (``uname-*``, ``default_shell-*``, ``getconf_cpu_count-*``)
    are split into separate entries, and ``die()`` is called if a required capability turns out to be missing.
    """
    p = params = job.params
    log = p.log
    available_programs: dict[str, dict[str, str]] = params.available_programs

    if "local" not in available_programs:
        sp = job.subprocesses

        def run_local_shell(script: str):  # type: ignore[no-untyped-def]
            """Runs ``script`` with the local shell, forwards its stderr, and returns the completed process."""
            argv: list[str] = [p.shell_program_local, "-c", script]
            completed = sp.subprocess_run(argv, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, log=log)
            xprint(log=log, value=stderr_to_str(completed.stderr), file=sys.stderr, end="")
            return completed

        probe_output: str = run_local_shell(_find_available_programs(p)).stdout
        available_programs["local"] = dict.fromkeys(probe_output.splitlines(), "")
        if run_local_shell("exit").returncode != 0:  # the local shell itself is not usable
            _disable_program(p, "sh", ["local"])

    pending: list[Remote] = []  # remotes that still need to be probed
    for r in [p.dst, p.src]:
        loc: str = r.location
        cached: RemoteConfCacheItem | None = job.remote_conf_cache.get(r.cache_key())
        if cached is None:
            p.connection_pools[loc] = ConnectionPools(
                remote=r, capacities={SHARED: r.max_concurrent_ssh_sessions_per_tcp_connection, DEDICATED: 1}
            )
        else:
            # startup perf: cache avoids ssh connect setup and feature detection roundtrips on revisits to same site
            p.connection_pools[loc] = cached.connection_pools
            p.available_programs[loc] = cached.available_programs
            p.zpool_features[loc] = cached.zpool_features
            is_fresh: bool = time.monotonic_ns() - cached.timestamp_nanos < p.remote_conf_cache_ttl_nanos
            if not is_fresh:
                p.zpool_features[loc] = {}  # cache miss, invalidate features of zpools before refetching from remote
            elif r.pool in cached.zpool_features:
                continue  # cache hit, skip remote detection
        pending.append(r)

    lock: threading.Lock = threading.Lock()

    def run_detect(r: Remote) -> None:  # thread-safe
        """Probes one remote and publishes the results to the params and the cache while holding ``lock``."""
        loc: str = r.location
        cache_key: tuple = r.cache_key()
        remote_programs: dict[str, str] = _detect_available_programs_remote(job, r, r.ssh_user_host)
        pool_features: dict[str, str] = _detect_zpool_features(job, r, remote_programs)
        with lock:
            r.params.available_programs[loc] = remote_programs
            r.params.zpool_features[loc][r.pool] = pool_features
            job.remote_conf_cache[cache_key] = RemoteConfCacheItem(
                p.connection_pools[loc], remote_programs, r.params.zpool_features[loc]
            )
        if r.use_zfs_delegation and pool_features.get("delegation") == "off":
            die(
                f"Permission denied as ZFS delegation is disabled for {r.location} "
                f"dataset: {r.basis_root_dataset}. Manually enable it via 'sudo zpool set delegation=on {r.pool}'"
            )

    with SynchronousExecutor.executor_for(max_workers=max(1, len(pending))) as executor:
        drain(executor.map(run_detect, pending))  # detect ZFS features + system capabilities on src+dst in parallel

    # name of the params option --> name of the program to remove everywhere if that option is set to DISABLE_PRG
    disable_table: tuple[tuple[str, str], ...] = (
        ("compression_program", "zstd"),
        ("mbuffer_program", "mbuffer"),
        ("ps_program", "ps"),
        ("pv_program", "pv"),
        ("shell_program", "sh"),
        ("sudo_program", "sudo"),
        ("zpool_program", "zpool"),
    )
    locations = ["src", "dst", "local"]
    for option, program_name in disable_table:
        if getattr(params, option) == DISABLE_PRG:
            _disable_program(p, program_name, locations)

    for key, programs in available_programs.items():
        for entry in tuple(programs):  # iterate over a snapshot because entries are replaced within the loop
            if entry.startswith("uname-"):
                # uname-Linux foo 5.15.0-69-generic #76-Ubuntu SMP Fri Mar 17 17:19:29 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
                # uname-FreeBSD freebsd 14.1-RELEASE FreeBSD 14.1-RELEASE releng/14.1-n267679-10e31f0946d8 GENERIC amd64
                # uname-Darwin foo 23.6.0 Darwin Kernel Version 23.6.0: Mon Jul 29 21:13:04 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6020 arm64
                del programs[entry]
                uname: str = entry[len("uname-") :]
                programs["uname"] = uname
                log.log(LOG_TRACE, f"available_programs[{key}][uname]: %s", uname)
                programs["os"] = uname.split(" ", maxsplit=1)[0]  # Linux|FreeBSD|Darwin
                log.log(LOG_TRACE, f"available_programs[{key}][os]: %s", programs["os"])
            elif entry.startswith("default_shell-"):
                del programs[entry]
                default_shell: str = entry[len("default_shell-") :]
                programs["default_shell"] = default_shell
                log.log(LOG_TRACE, f"available_programs[{key}][default_shell]: %s", default_shell)
                if key == "src":
                    ssh_user_host = p.src.ssh_user_host
                elif key == "dst":
                    ssh_user_host = p.dst.ssh_user_host
                else:
                    ssh_user_host = ""
                if ssh_user_host:  # only validate the default shell of hosts that are reached via ssh
                    _validate_default_shell(default_shell, key, ssh_user_host)
            elif entry.startswith("getconf_cpu_count-"):
                del programs[entry]
                getconf_cpu_count: str = entry[len("getconf_cpu_count-") :]
                programs["getconf_cpu_count"] = getconf_cpu_count
                log.log(LOG_TRACE, f"available_programs[{key}][getconf_cpu_count]: %s", getconf_cpu_count)

    for key, programs in available_programs.items():
        log.debug(f"available_programs[{key}]: %s", list_formatter(programs, separator=", "))

    for r in [p.dst, p.src]:
        if not is_dummy(r) and r.sudo and not p.is_program_available("sudo", r.location):
            die(f"{p.sudo_program} CLI is not available on {r.location} host: {r.ssh_user_host or 'localhost'}")

    if len(p.args.preserve_properties) > 0:
        sends_props: bool = any(prop in p.zfs_send_program_opts for prop in ["--props", "-p"])
        if sends_props and not p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_2_0, p.dst.location):
            die(
                "Cowardly refusing to proceed as --preserve-properties is unreliable on destination ZFS < 2.2.0 when using "
                "'zfs send --props'. Either upgrade destination ZFS, or remove '--props' from --zfs-send-program-opt(s)."
            )

204 

205 

206def _disable_program(p: Params, program: str, locations: list[str]) -> None: 

207 """Removes the given program from the available_programs mapping.""" 

208 for location in locations: 

209 p.available_programs[location].pop(program, None) 

210 

211 

212def _find_available_programs(p: Params) -> str: 

213 """POSIX shell script that checks for the existence of various programs; It uses `if` statements instead of `&&` plus 

214 `printf` instead of `echo` to ensure maximum compatibility across shells.""" 

215 cmds: list[str] = [] 

216 cmds.append("printf 'default_shell-%s\n' \"$SHELL\"") 

217 cmds.append("if command -v echo > /dev/null; then printf 'echo\n'; fi") 

218 cmds.append(f"if command -v {p.zpool_program} > /dev/null; then printf 'zpool\n'; fi") 

219 cmds.append(f"if command -v {p.ssh_program} > /dev/null; then printf 'ssh\n'; fi") 

220 cmds.append(f"if command -v {p.shell_program} > /dev/null; then printf 'sh\n'; fi") 

221 cmds.append(f"if command -v {p.sudo_program} > /dev/null; then printf 'sudo\n'; fi") 

222 cmds.append(f"if command -v {p.compression_program} > /dev/null; then printf 'zstd\n'; fi") 

223 cmds.append(f"if command -v {p.mbuffer_program} > /dev/null; then printf 'mbuffer\n'; fi") 

224 cmds.append(f"if command -v {p.pv_program} > /dev/null; then printf 'pv\n'; fi") 

225 cmds.append(f"if command -v {p.ps_program} > /dev/null; then printf 'ps\n'; fi") 

226 cmds.append( 

227 f"if command -v {p.getconf_program} > /dev/null; then " 

228 f"printf 'getconf_cpu_count-'; {p.getconf_program} _NPROCESSORS_ONLN; " 

229 "fi" 

230 ) 

231 cmds.append(f"if command -v {p.uname_program} > /dev/null; then printf 'uname-'; {p.uname_program} -a || true; fi") 

232 return "; ".join(cmds) 

233 

234 

def _detect_available_programs_remote(job: Job, remote: Remote, ssh_user_host: str) -> dict[str, str]:
    """Detects the ZFS version and the CLI tools that are available on ``remote`` and returns them as a mapping.

    The result maps "zfs" to the detected ``major.minor.patch`` version, contains the ``zfs>=...`` marker keys that
    apply to that version, and contains one key per line printed by the probe script of ``_find_available_programs()``.
    If the shell program is disabled or the probe script fails, the result falls back to assuming that only "sudo" is
    available, in addition to zfs. An empty mapping is returned for a dummy remote.

    Calls ``die()`` if the zfs CLI is unavailable, the platform is unsupported, ssh itself fails, or the version string
    cannot be parsed. Re-raises a ``FileNotFoundError``/``PermissionError`` that does not refer to the shell program.
    """
    p, log = job.params, job.params.log
    location = remote.location
    available_programs: dict[str, str] = {}
    if is_dummy(remote):
        return available_programs
    host: str = ssh_user_host or "localhost"
    lines: str | None = None
    try:
        # on Linux, 'zfs --version' returns with zero status and prints the correct info
        # on FreeBSD, 'zfs --version' always prints the same (correct) info as Linux, but nonetheless sometimes
        # returns with non-zero status (sometimes = if the zfs kernel module is not loaded)
        lines = job.run_ssh_command_with_retries(remote, LOG_TRACE, print_stderr=False, cmd=[p.zfs_program, "--version"])
        assert lines
    except (FileNotFoundError, PermissionError):  # location is local and program file was not found
        die(f"{p.zfs_program} CLI is not available on {location} host: {host}")
    except subprocess.CalledProcessError as e:
        stderr: str = stderr_to_str(e.stderr)
        stdout: str = stderr_to_str(e.stdout)
        if "unrecognized command '--version'" in stderr and "run: zfs help" in stderr:
            die(f"Unsupported ZFS platform: {stderr}")  # solaris is unsupported
        elif stderr.startswith("ssh: "):
            assert e.returncode == 255, e.returncode  # error within SSH itself (not during the remote command)
            die(f"ssh exit code {e.returncode}: {stderr.rstrip()}")
        elif not stdout.startswith("zfs"):
            die(f"{p.zfs_program} CLI is not available on {location} host: {host}")
        else:
            lines = stdout  # FreeBSD if the zfs kernel module is not loaded
            assert lines

    # Parse unconditionally: the asserts above are stripped under 'python -O', in which case empty output must end up in
    # the "Unparsable" error below rather than in a KeyError on the subsequent lookup of available_programs["zfs"].
    # Examples that should parse: "zfs-2.1.5~rc5-ubuntu3", "zfswin-2.2.3rc5"
    first_line: str = next(iter((lines or "").splitlines()), "")
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", first_line)
    if not match:
        die("Unparsable zfs version string: '" + first_line + "'")
    version = ".".join(match.groups())
    available_programs["zfs"] = version
    if is_version_at_least(version, "2.1.0"):
        available_programs[ZFS_VERSION_IS_AT_LEAST_2_1_0] = ""
    if is_version_at_least(version, "2.2.0"):
        available_programs[ZFS_VERSION_IS_AT_LEAST_2_2_0] = ""
    log.log(LOG_TRACE, f"available_programs[{location}][zfs]: %s", available_programs["zfs"])

    if p.shell_program != DISABLE_PRG:
        try:
            cmd: list[str] = [p.shell_program, "-c", _find_available_programs(p)]
            stdout = job.run_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd)
            available_programs.update(dict.fromkeys(stdout.splitlines(), ""))
            return available_programs
        except (FileNotFoundError, PermissionError) as e:  # location is local and shell program file was not found
            if e.filename != p.shell_program:
                raise
        except subprocess.CalledProcessError:
            pass  # deliberate best effort: fall through to the minimal assumptions below
        log.warning("%s", f"Failed to find {p.shell_program} on {location}. Continuing with minimal assumptions...")
    available_programs.update({"sudo": ""})  # minimal assumption: sudo is available
    return available_programs

293 

294 

def is_dummy(r: Remote) -> bool:
    """Returns True if the root dataset of ``r`` is the synthetic dummy dataset."""
    root_dataset = r.root_dataset
    return root_dataset == DUMMY_DATASET

298 

299 

def _detect_zpool_features(job: Job, remote: Remote, available_programs: dict) -> dict[str, str]:
    """Returns the ``feature@*`` and ``delegation`` properties of the zpool of ``remote`` as a name-to-value mapping.

    The mapping is empty for a dummy remote, and also if the properties cannot be queried via the zpool CLI. If the
    query yields no output, a ``zfs list`` of the pool is used to check that the pool exists, and ``die()`` is called
    if it does not.
    """
    params = job.params
    loc, log = remote.location, params.log
    output_lines: list[str] = []
    detected: dict[str, str] = {}
    if is_dummy(remote):
        return {}

    zpool_is_enabled: bool = params.zpool_program != DISABLE_PRG
    if zpool_is_enabled and (params.shell_program == DISABLE_PRG or "zpool" in available_programs):
        cmd: list[str] = params.split_args(f"{params.zpool_program} get -Hp -o property,value all", remote.pool)
        try:
            output = job.run_ssh_command_with_retries(remote, LOG_TRACE, check=False, cmd=cmd)
            output_lines = output.splitlines()
        except (FileNotFoundError, PermissionError) as e:
            if e.filename != params.zpool_program:
                raise
            log.warning(
                "%s", f"Failed to detect zpool features on {loc}: {remote.pool}. Continuing with minimal assumptions ..."
            )
        else:
            props: dict[str, str] = dict(line.split("\t", 1) for line in output_lines)  # property name --> value
            for name, value in props.items():
                if name.startswith("feature@") or name == "delegation":
                    detected[name] = value

    if not output_lines:  # no properties retrieved; at least make sure that the pool exists
        cmd = params.split_args(f"{params.zfs_program} list -t filesystem -Hp -o name -s name", remote.pool)
        if job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd) is None:
            die(f"Pool does not exist for {loc} dataset: {remote.basis_root_dataset}. Manually create the pool first!")
    return detected

324 

325 

def is_zpool_feature_enabled_or_active(p: Params, remote: Remote, feature: str) -> bool:
    """Returns True if the detected state of ``feature`` on the pool of ``remote`` is "active" or "enabled"; returns
    False for any other state, including a feature that was not detected at all."""
    pool_features = p.zpool_features[remote.location][remote.pool]
    state = pool_features.get(feature)
    return state == "active" or state == "enabled"

329 

330 

def are_bookmarks_enabled(p: Params, remote: Remote) -> bool:
    """Returns True if both zpool features ``bookmark_v2`` and ``bookmark_written`` are enabled or active on ``remote``."""
    required_features = ("feature@bookmark_v2", "feature@bookmark_written")
    return all(is_zpool_feature_enabled_or_active(p, remote, feature) for feature in required_features)

336 

337 

def is_caching_snapshots(p: Params, remote: Remote) -> bool:
    """Returns whether snapshot caching is switched on in ``p`` and the ZFS version at the location of ``remote`` is at
    least 2.2.0."""
    caching_requested = p.is_caching_snapshots
    if not caching_requested:
        return caching_requested
    return p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_2_0, remote.location)

341 

342 

def is_version_at_least(version_str: str, min_version_str: str) -> bool:
    """Returns True if the dotted numeric version ``version_str`` is greater than or equal to ``min_version_str``.

    Versions with a different number of components are compared as if the shorter one were padded with trailing
    zeros, e.g. "2.1" is treated like "2.1.0". Raises ``ValueError`` if a component is not an integer.
    """
    version: list[int] = [int(part) for part in version_str.split(".")]
    min_version: list[int] = [int(part) for part in min_version_str.split(".")]
    # Without padding, (2, 1) would compare as less than (2, 1, 0) even though both denote the same version.
    width: int = max(len(version), len(min_version))
    version += [0] * (width - len(version))
    min_version += [0] * (width - len(min_version))
    return version >= min_version

346 

347 

def _validate_default_shell(path_to_default_shell: str, location: str, ssh_user_host: str) -> None:
    """Calls ``die()`` if the file name of ``path_to_default_shell`` is one of the shells that are known to not honor
    POSIX shell quoting; does nothing otherwise."""
    incompatible_shells = ("csh", "tcsh", "elvish", "fish", "nu", "nushell", "xonsh")
    shell_name: str = path_to_default_shell.rpartition("/")[2]  # last path component
    if shell_name not in incompatible_shells:
        return
    # On some old FreeBSD systems the default shell is still csh. Also see https://www.grymoire.com/unix/CshTop10.txt
    host: str = ssh_user_host or "localhost"
    die(
        f"Cowardly refusing to proceed because {PROG_NAME} requires POSIX shell quoting of special characters, "
        f"but '{path_to_default_shell}' is incompatible. The safe workaround is to first manually set 'sh' "
        f"instead of '{path_to_default_shell}' as the default shell of the Unix user on {location} host: "
        f"{host}, like so: "
        "chsh -s /bin/sh <YOURUSERNAME>"
    )