Coverage for bzfs_main/connection_lease.py: 100%

123 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15""" 

16Purpose 

17----------------- 

18This module safely reduces SSH startup latency on bzfs process startup. Using this, a fresh bzfs CLI process can attach 

19immediately to an existing OpenSSH ControlMaster connection, and in doing so skip TCP handshake, SSH handshake, key exchange, 

20authentication, and other delays from multiple network round-trips. This way, the very first remote ZFS command starts 

21hundreds of milliseconds earlier as compared to when creating an SSH connection from scratch. 

22 

23The higher-level goal is predictable performance, shorter critical paths, and less noisy-neighbor impact across frequent 

24periodic replication jobs at fleet scale, including at high concurrency. To achieve this, OpenSSH masters remain alive after 

25bzfs process termination (unless masters become idle for a specified time period), and inter-process reuse of ControlPath 

26sockets is coordinated safely so every new bzfs process benefits from an existing connection when present, and can recreate 

27it deterministically when absent. This amortizes startup costs, reduces tail latency, and improves operational reliability 

28without background daemons, external services, or warm-up procedures. 

29 

30How This Is Achieved 

31-------------------- 

32This module provides a small, fast, safe, and reliable mechanism to allocate and reuse unique Unix domain socket files 

33(ControlPaths) in a per-endpoint namespace, even under high concurrency. The full socket path namespace is ~/.ssh/bzfs/ (per 

34local user) plus a hashed subdirectory derived from the remote endpoint identity (user@host, port, ssh_config_file hash). 

35A bzfs process acquires an exclusive advisory file lock via flock(2) on a named empty lock file in the namespace directory 

36tree, then atomically moves it between free/ and used/ subdirectories to speed up later searches for available names in each 

37category of that namespace. The held lease exposes the open file descriptor (which maintains the lock) and the computed 

38ControlPath so callers can safely establish or reuse SSH master connections without races or leaks. Holding a lease's lock 

39for the duration of a `bzfs` process guarantees exclusive ownership of that SSH master while allowing the underlying TCP 

40connection to persist beyond bzfs process exit via OpenSSH ControlPersist. 

41 

42Also see https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing 

43and https://chessman7.substack.com/p/how-ssh-multiplexing-reuses-master 

44 

45Assumptions 

46----------- 

47- The filesystem is POSIX-compliant and supports ``fcntl.flock`` advisory locks, atomic ``os.rename`` file moves, and 

48 permissions enforcement. The process runs on the same host as the SSH client using the ControlPath. 

49- Low latency: scanning the free/ and used/ directories has bounded cost and is O(1) expected time because names are 

50 uniformly random. Directory contents are small and ephemeral. 

51- Crash recovery is acceptable by salvaging an unlocked file from used/; an unlock indicates process termination or orderly 

52 release. The flock itself is the source of truth; directory names merely provide fast classification. 

53 

54Design Rationale 

55---------------- 

56- File locks plus atomic renames form a minimal, portable concurrency primitive that composes well with process lifecycles. 

57 Locks are released automatically on process exit or when the file descriptor is closed, ensuring no cleanup logic is 

58 required after abnormal termination. 

59- The two-directory layout (free/ and used/) makes hot-path acquisition fast. We first probe free/ to reuse previously 

60 released names. If that doesn't produce an acquisition we probe used/ to salvage leftovers from crashed processes that no 

61 longer hold locks. If that doesn't produce an acquisition either we finally generate a new name. This yields a compact, 

62 garbage-free namespace without a janitor. 

63- Name generation mixes in a cryptographically strong random component, encoded in URL-safe base64 to avoid long names and 

64 path-unfriendly characters. This reduces collision probability and helps with human diagnostics. 

65- Security is prioritized: the root, sockets, and lease directories are created with explicit permissions, and symlinks are 

66 rejected to avoid redirection attacks. Open flags include ``O_NOFOLLOW`` and ``O_CLOEXEC`` to remove common foot-guns. 

67 Foreign file ownership and overly permissive file permissions are rejected to prevent sockets being used by third parties. 

68- The public API is intentionally tiny and ergonomic. ``ConnectionLeaseManager.acquire()`` never blocks: it either returns 

69 a lease for an existing name or a new name, ensuring predictable low latency under contention. The ``ConnectionLease`` 

70 is immutable and simple to reason about, with an explicit ``release()`` to return capacity to the pool. 

71- No external services, daemons, or background threads are required. The design favors determinate behavior under failures, 

72 idempotent operations, and ease of testing. This approach integrates cleanly with ``bzfs`` where predictable SSH reuse and 

73 teardown during snapshot replication must remain both fast and safe. 

74""" 

75 

76from __future__ import ( 

77 annotations, 

78) 

79import fcntl 

80import logging 

81import os 

82import pathlib 

83import random 

84import time 

85from typing import ( 

86 Final, 

87 NamedTuple, 

88) 

89 

90from bzfs_main.utils import ( 

91 DIR_PERMISSIONS, 

92 FILE_PERMISSIONS, 

93 LOG_TRACE, 

94 UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH, 

95 close_quietly, 

96 sha256_urlsafe_base64, 

97 urlsafe_base64, 

98 validate_file_permissions, 

99 validate_is_not_a_symlink, 

100) 

101 

102# constants: 

103SOCKETS_DIR: Final[str] = "c" 

104FREE_DIR: Final[str] = "free" 

105USED_DIR: Final[str] = "used" 

106SOCKET_PREFIX: Final[str] = "s" 

107NAMESPACE_DIR_LENGTH: Final[int] = 43 # 43 Base64 chars contain the entire SHA-256 of the SSH endpoint 

108 

109 

110############################################################################# 

111class ConnectionLease(NamedTuple): 

112 """Purpose: Reduce SSH connection startup latency of a fresh bzfs process via safe OpenSSH ControlPath reuse. 

113 

114 Assumptions: Callers hold this object only while the lease is needed. The file descriptor remains valid and keeps an 

115 exclusive advisory lock until ``release()`` or process exit. Paths are absolute and live within the manager's directory 

116 tree. 

117 

118 Design Rationale: A small, immutable class captures just the essential handles needed for correctness and observability: 

119 the open fd, the used/ and free/ lock file paths, and the Unix domain socket ControlPath that SSH uses. Immutability 

120 prevents accidental mutation bugs, keeps equality semantics straightforward, and encourages explicit ownership transfer. 

121 A dedicated ``release()`` method centralizes cleanup and safe transition of the lock file to free/, followed by closing 

122 the fd for deterministic unlock. 

123 """ 

124 

125 # immutable variables: 

126 fd: int 

127 used_path: str 

128 free_path: str 

129 socket_path: str 

130 

131 def release(self) -> None: 

132 """Releases the lease: moves the lock file from used/ dir back to free/ dir, then unlocks it by closing its fd.""" 

133 try: 

134 os.rename(self.used_path, self.free_path) # mv lock file atomically while holding the lock 

135 except FileNotFoundError: 

136 pass # harmless 

137 finally: 

138 close_quietly(self.fd) # release lock 

139 

140 def set_socket_mtime_to_now(self) -> None: 

141 """Sets the mtime of the lease's ControlPath socket file to now; noop if the file is missing.""" 

142 try: 

143 os.utime(self.socket_path, None) 

144 except FileNotFoundError: 

145 pass # harmless 

146 

147 

148############################################################################# 

149class ConnectionLeaseManager: 

150 """Purpose: Reduce SSH connection startup latency of a fresh bzfs process via safe OpenSSH ControlPath reuse. 

151 

152 Assumptions: The manager has exclusive control of its root directory subtree. The process operates on a POSIX-compliant 

153 filesystem and uses advisory locks consistently. Path lengths must respect common Unix domain socket limits. Callers 

154 use the socket path with OpenSSH. 

155 

156 Design Rationale: A compact state machine with free/ and used/ stages, guided by atomically acquired file locks and 

157 renames, yields predictable latency, natural crash recovery, and zero background maintenance. The API is intentionally 

158 minimal to reduce misuse. Defensive checks and secure open flags balance correctness, performance, and security without 

159 external dependencies. 

160 """ 

161 

162 def __init__( 

163 self, 

164 root_dir: str, # the local user is implied by root_dir 

165 namespace: str, # derived from the remote endpoint identity (ssh_user_host, port, ssh_config_file hash) 

166 ssh_control_persist_secs: int = 90, # TTL for garbage collecting stale files while preserving reuse of live masters 

167 *, 

168 log: logging.Logger, 

169 ) -> None: 

170 """Initializes manager with namespaced dirs and security settings for SSH ControlPath reuse.""" 

171 # immutable variables: 

172 assert root_dir 

173 assert namespace 

174 assert ssh_control_persist_secs >= 1 

175 self._ssh_control_persist_secs: Final[int] = ssh_control_persist_secs 

176 self._log: Final[logging.Logger] = log 

177 ns: str = sha256_urlsafe_base64(namespace, padding=False) 

178 assert NAMESPACE_DIR_LENGTH >= 22 # a minimum degree of safety: 22 URL-safe Base64 chars = 132 bits of entropy 

179 ns = ns[0:NAMESPACE_DIR_LENGTH] 

180 namespace_dir: str = os.path.join(root_dir, ns) 

181 self._sockets_dir: Final[str] = os.path.join(namespace_dir, SOCKETS_DIR) 

182 self._free_dir: Final[str] = os.path.join(namespace_dir, FREE_DIR) 

183 self._used_dir: Final[str] = os.path.join(namespace_dir, USED_DIR) 

184 self._open_flags: Final[int] = os.O_WRONLY | os.O_NOFOLLOW | os.O_CLOEXEC 

185 os.makedirs(root_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

186 validate_is_not_a_symlink("connection lease root_dir ", root_dir) 

187 validate_file_permissions(root_dir, mode=DIR_PERMISSIONS) 

188 os.makedirs(namespace_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

189 validate_is_not_a_symlink("connection lease namespace_dir ", namespace_dir) 

190 validate_file_permissions(namespace_dir, mode=DIR_PERMISSIONS) 

191 

192 def _validate_dirs(self) -> None: 

193 """Ensures sockets/free/used directories exist, are not symlinks, and have strict permissions.""" 

194 for _dir in (self._sockets_dir, self._free_dir, self._used_dir): 

195 os.makedirs(_dir, mode=DIR_PERMISSIONS, exist_ok=True) 

196 validate_is_not_a_symlink("connection lease dir ", _dir) 

197 validate_file_permissions(_dir, mode=DIR_PERMISSIONS) 

198 

199 def acquire(self) -> ConnectionLease: # thread-safe 

200 """Acquires and returns a ConnectionLease with an open, flocked fd and the SSH ControlPath aka socket file path.""" 

201 self._validate_dirs() 

202 lease = self._find_and_acquire(self._free_dir) # fast path: find free aka unlocked socket name in O(1) expected time 

203 if lease is not None: 

204 self._log.log(LOG_TRACE, "_find_and_acquire: %s", self._free_dir) 

205 return lease 

206 lease = self._find_and_acquire(self._used_dir) # salvage from used yet unlocked socket names leftover from crash 

207 if lease is not None: 

208 self._log.log(LOG_TRACE, "_find_and_acquire: %s", self._used_dir) 

209 return lease 

210 lease = self._create_and_acquire() # create new socket name 

211 self._log.log(LOG_TRACE, "_create_and_acquire: %s", self._free_dir) 

212 return lease 

213 

214 def _find_and_acquire(self, scan_dir: str) -> ConnectionLease | None: 

215 """Scans a directory for an unlocked lease, prunes stale entries, and returns a locked lease if found.""" 

216 with os.scandir(scan_dir) as iterator: 

217 for entry in iterator: 

218 if entry.name.startswith(SOCKET_PREFIX) and entry.is_file(follow_symlinks=False): 

219 lease: ConnectionLease | None = self._try_lock(entry.path, open_flags=self._open_flags) 

220 if lease is not None: 

221 # If the control socket does not exist or is too old, then prune this entry to keep directory sizes 

222 # bounded after crash storms, without losing opportunities to reuse a live SSH master connection. 

223 delete_used_path: bool = False 

224 try: 

225 age_secs: float = time.time() - os.stat(lease.socket_path).st_mtime 

226 if age_secs > self._ssh_control_persist_secs: # old garbage left over from crash? 

227 delete_used_path = True 

228 os.unlink(lease.socket_path) # remove control socket garbage while holding the lock 

229 except FileNotFoundError: 

230 delete_used_path = True # control socket does not exist anymore 

231 if not delete_used_path: 

232 return lease # return locked lease; this is the common case 

233 try: # Remove the renamed lock file at its current location under used/ while holding the lock 

234 pathlib.Path(lease.used_path).unlink(missing_ok=True) 

235 finally: 

236 close_quietly(lease.fd) # release lock 

237 # keep scanning for a better candidate 

238 return None 

239 

240 def _create_and_acquire(self) -> ConnectionLease: 

241 """Creates a new unique lease name, enforces socket path length, and returns a locked lease.""" 

242 max_rand: int = 2**64 - 1 

243 rand: random.Random = random.SystemRandom() 

244 while True: 

245 random_prefix: str = urlsafe_base64(rand.randint(0, max_rand), max_value=max_rand, padding=False) 

246 socket_name: str = f"{SOCKET_PREFIX}{random_prefix}" 

247 socket_path: str = os.path.join(self._sockets_dir, socket_name) 

248 

249 # Intentionally error out hard if the max OS Unix domain socket path limit cannot be met reasonably as the home 

250 # directory path is too long, typically because the Unix user name is unreasonably long. Failing fast here avoids 

251 # opaque OS errors later in `ssh`. 

252 socket_path_bytes: bytes = os.fsencode(socket_path) 

253 if len(socket_path_bytes) > UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH: 

254 raise OSError( 

255 "SSH ControlPath exceeds Unix domain socket limit " 

256 f"({len(socket_path_bytes)} > {UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH}); " 

257 f"shorten it by shortening the home directory path: {socket_path}" 

258 ) 

259 

260 free_path: str = os.path.join(self._free_dir, os.path.basename(socket_path)) 

261 lease: ConnectionLease | None = self._try_lock(free_path, open_flags=self._open_flags | os.O_CREAT) 

262 if lease is not None: 

263 return lease 

264 

265 def _try_lock(self, src_path: str, open_flags: int) -> ConnectionLease | None: 

266 """Attempts to open and exclusively flock a lease file, atomically moves it to used/, and builds the lease.""" 

267 fd: int = -1 

268 try: 

269 fd = os.open(src_path, flags=open_flags, mode=FILE_PERMISSIONS) 

270 

271 # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or another 

272 # process. The (advisory) lock is auto-released when the process terminates or the fd is closed. 

273 # Note: All supported operating systems also reject any attempt to acquire an exclusive flock that is already 

274 # held within the same process under an fd obtained from a separate os.open() call, by raising a BlockingIOError. 

275 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # LOCK_NB ... non-blocking 

276 

277 socket_name: str = os.path.basename(src_path) 

278 used_path: str = os.path.join(self._used_dir, socket_name) 

279 if src_path != used_path and os.path.exists(used_path): # extremely rare name collision? 

280 close_quietly(fd) # release lock 

281 return None # harmless; retry with another name. See test_collision_on_create_then_salvage_and_release() 

282 

283 # Rename cannot race: only free/<name> -> used/<name> produces used/ entries, and requires holding an exclusive 

284 # flock on free/<name>. We hold that lock while renaming. With exclusive subtree control, used/<name> cannot 

285 # appear between exists() and rename(). The atomic rename below is safe. 

286 os.rename(src_path, used_path) # mv lock file atomically while holding the lock 

287 

288 except OSError as e: 

289 close_quietly(fd) # release lock 

290 if isinstance(e, BlockingIOError): 

291 return None # lock is already held by this process or another process 

292 elif isinstance(e, FileNotFoundError): 

293 self._validate_dirs() 

294 return None 

295 raise 

296 else: # success 

297 return ConnectionLease( 

298 fd=fd, 

299 used_path=used_path, 

300 free_path=os.path.join(self._free_dir, socket_name), 

301 socket_path=os.path.join(self._sockets_dir, socket_name), 

302 )