Coverage for bzfs_main / util / connection_lease.py: 100%

125 statements  

coverage.py v7.13.0, created at 2025-12-22 08:03 +0000

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Purpose
-----------------
This module safely reduces SSH connection startup latency when a bzfs process starts. With it, a fresh bzfs CLI process
can immediately attach to an existing OpenSSH ControlMaster connection. In doing so, it skips the TCP handshake, SSH
handshake, key exchange, authentication, and other delays caused by multiple network round-trips. As a result, the very
first remote ZFS command starts hundreds of milliseconds earlier than when an SSH connection is created from scratch.
Crucially, this module guarantees that a bzfs process can (as long as it is alive) maintain exclusive access to the
OpenSSH connections it has obtained.

The higher-level goal is predictable performance, shorter critical paths, and less noisy-neighbor impact across frequent
periodic replication jobs at fleet scale, including at high concurrency. To achieve this, OpenSSH masters remain alive after
bzfs process termination (unless masters become idle for a specified time period). Inter-process reuse of ControlPath
sockets is also coordinated safely, so that every new bzfs process benefits from an existing connection when present and
can recreate it deterministically when absent. This amortizes startup costs, reduces tail latency, and improves
operational reliability without background daemons, external services, or warm-up procedures.
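
For illustration, the reuse described above rests on standard OpenSSH client options. The following is a minimal sketch.
It assumes ``ControlMaster=auto``, which attaches to a live master at the ControlPath or else starts one. The helper
``build_ssh_argv`` and all example values are hypothetical, and the command line that bzfs actually builds may differ:

```python
from __future__ import annotations


def build_ssh_argv(user_host: str, control_path: str, persist_secs: int, remote_cmd: list[str]) -> list[str]:
    # Hypothetical helper: builds an ssh argv that multiplexes sessions over an OpenSSH ControlMaster socket.
    return [
        "ssh",
        "-o",
        "ControlMaster=auto",  # reuse a live master at ControlPath if present, else become the master
        "-o",
        f"ControlPath={control_path}",  # Unix domain socket shared by the master and its clients
        "-o",
        f"ControlPersist={persist_secs}",  # master exits after staying idle (no clients) for this many seconds
        user_host,
        *remote_cmd,  # e.g. the remote ZFS command
    ]


# Example values are illustrative placeholders, not real bzfs paths.
argv = build_ssh_argv("alice@example.com", "/home/alice/.ssh/bzfs/NS/c/sEXAMPLE", 90, ["zfs", "list", "-H"])
print(" ".join(argv))
```

Here ``ControlPersist=90`` lets the backgrounded master outlive the ``ssh`` client that started it. The master exits once
it has been idle for 90 seconds. This value mirrors the default ``ssh_control_persist_secs`` of ``ConnectionLeaseManager``.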

How This Is Achieved
--------------------
This module provides a small, fast, safe, and reliable mechanism to allocate and reuse unique Unix domain socket files
(ControlPaths) in a per-endpoint namespace, even under high concurrency. The full socket path namespace is ~/.ssh/bzfs/ (per
local user) plus a hashed subdirectory derived from the remote endpoint identity (user@host, port, ssh_config_file hash).
A bzfs process acquires an exclusive advisory file lock via flock(2) on a named empty lock file in the namespace directory
tree. It then atomically moves that file between the free/ and used/ subdirectories, which speeds up later searches for
available names in each category of that namespace. The held lease exposes the open file descriptor (which maintains the
lock) and the computed ControlPath. Callers can therefore safely establish, reuse, and maintain exclusive access to OpenSSH
master connections without races or leaks.
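
The following is a minimal sketch of deriving the hashed per-endpoint directory. It assumes the endpoint identity has
already been serialized into a single string; the ``user@host:port:config_hash`` format below is hypothetical.
Standard-library hashing stands in for this module's ``sha256_urlsafe_base64()`` helper. An unpadded URL-safe Base64
SHA-256 digest is 43 characters long, which matches ``NAMESPACE_DIR_LENGTH``:

```python
from __future__ import annotations

import base64
import hashlib
import os


def namespace_dir(root_dir: str, endpoint_identity: str) -> str:
    # Maps an endpoint identity string to its hashed per-endpoint directory below root_dir.
    digest: bytes = hashlib.sha256(endpoint_identity.encode("utf-8")).digest()  # 32 bytes
    ns: str = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")  # 43 chars, no '+', '/' or padding
    return os.path.join(root_dir, ns)


root = os.path.join(os.path.expanduser("~"), ".ssh", "bzfs")  # per local user
endpoint = "alice@example.com:22:cfghash"  # hypothetical serialization of (user@host, port, ssh_config_file hash)
d = namespace_dir(root, endpoint)
for sub in ("c", "free", "used"):  # ControlPath sockets, free lock files, used lock files
    print(os.path.join(d, sub))
```

Because the mapping is deterministic, every bzfs process for the same endpoint lands in the same directory. Different
endpoints never share a namespace.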

Holding a lease's lock for the duration of a `bzfs` process guarantees exclusive ownership of that SSH master. At the same
time, the underlying TCP connection can persist beyond bzfs process exit via OpenSSH ControlPersist. No other `bzfs` process
can use the TCP connection while the lease's lock is held. This ensures the connection cannot be degraded or overloaded by
an unbounded number of concurrent SSH sessions, i.e., by too many concurrent requests per connection issued from multiple
processes. See the server-side sshd_config(5) MaxSessions parameter, which defaults to 10
(https://manpages.ubuntu.com/manpages/man5/sshd_config.5.html).
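
The exclusivity guarantee relies on flock(2) semantics: a lock belongs to an open file description. A non-blocking
``LOCK_EX`` attempt through a separately opened fd therefore fails, even within the same process, until the holder's fd
is closed. The following is a minimal sketch on a temporary file, assuming Linux or macOS; ``try_flock`` is a
hypothetical helper:

```python
from __future__ import annotations

import fcntl
import os
import tempfile


def try_flock(path: str) -> int | None:
    # Hypothetical helper: opens path and tries a non-blocking exclusive flock.
    # Returns the locked fd, or None if another open file description already holds the lock.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC, 0o600)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        os.close(fd)
        return None
    return fd


with tempfile.TemporaryDirectory() as tmp:
    lock_path = os.path.join(tmp, "sEXAMPLE")
    first = try_flock(lock_path)  # e.g. the bzfs process that holds the lease
    second = try_flock(lock_path)  # separate os.open() in the same process: lock is denied
    first_locked, second_locked = first is not None, second is not None
    if first is not None:
        os.close(first)  # closing the fd (or process exit) releases the lock
    third = try_flock(lock_path)  # now succeeds
    third_locked = third is not None
    if third is not None:
        os.close(third)

print(first_locked, second_locked, third_locked)
```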

Also see https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
and https://chessman7.substack.com/p/how-ssh-multiplexing-reuses-master

Assumptions
-----------
- The filesystem is POSIX-compliant and supports ``fcntl.flock`` advisory locks, atomic ``os.rename`` file moves, and
  permissions enforcement. The process runs on the same host as the SSH client using the ControlPath.
- Low latency: scanning the free/ and used/ directories has bounded cost and is O(1) expected time because names are
  uniformly random. Directory contents are small and ephemeral.
- Crash recovery works by salvaging an unlocked file from used/. The absence of a lock indicates process termination or
  orderly release. The flock itself is the source of truth; directory names merely provide fast classification.
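
The following is a minimal sketch of the salvage rule, assuming the free/ and used/ layout described above. ``salvage``
is a hypothetical helper, not this module's API. The sketch simulates a crashed holder by closing its fd without moving
the lock file back to free/. It then shows that a file still locked by a live holder is skipped, while the abandoned file
is claimed:

```python
from __future__ import annotations

import fcntl
import os
import tempfile


def salvage(used_dir: str) -> tuple[int, str] | None:
    # Hypothetical helper: returns (locked_fd, path) for the first lock file in used_dir that no live process holds.
    for name in sorted(os.listdir(used_dir)):
        path = os.path.join(used_dir, name)
        fd = os.open(path, os.O_WRONLY | os.O_NOFOLLOW | os.O_CLOEXEC)
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # the flock, not the directory, is the source of truth
        except BlockingIOError:
            os.close(fd)  # still held by a live owner; skip it
            continue
        return fd, path
    return None


with tempfile.TemporaryDirectory() as tmp:
    free_dir, used_dir = os.path.join(tmp, "free"), os.path.join(tmp, "used")
    os.makedirs(free_dir)
    os.makedirs(used_dir)
    fds = {}
    for name in ("s1live", "s2crashed"):
        src = os.path.join(free_dir, name)
        fds[name] = os.open(src, os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC, 0o600)
        fcntl.flock(fds[name], fcntl.LOCK_EX | fcntl.LOCK_NB)
        os.rename(src, os.path.join(used_dir, name))  # atomic free/ -> used/ move while holding the lock
    os.close(fds.pop("s2crashed"))  # simulate a crash: lock released, lock file left behind in used/
    result = salvage(used_dir)
    salvaged_name = os.path.basename(result[1]) if result is not None else None
    if result is not None:
        os.close(result[0])
    os.close(fds.pop("s1live"))

print(salvaged_name)
```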

Design Rationale
----------------
- File locks plus atomic renames form a minimal, portable concurrency primitive that composes well with process lifecycles.
  Locks are released automatically on process exit or when the file descriptor is closed, so no cleanup logic is
  required after abnormal termination.
- The two-directory layout (free/ and used/) makes hot-path acquisition fast. We first probe free/ to reuse previously
  released names. If that doesn't produce an acquisition, we probe used/ to salvage leftovers from crashed processes that no
  longer hold locks. If that doesn't produce an acquisition either, we finally generate a new name. This yields a compact,
  garbage-free namespace without a janitor.
- Name generation mixes in a cryptographically strong random component, encoded in URL-safe base64 to avoid long names and
  path-unfriendly characters. This reduces collision probability and helps with human diagnostics.
- Security is prioritized: the root, sockets, and lease directories are created with explicit permissions, and symlinks are
  rejected to avoid redirection attacks. Open flags include ``O_NOFOLLOW`` and ``O_CLOEXEC`` to remove common foot-guns.
  Foreign file ownership and overly permissive file permissions are rejected to prevent third parties from using the
  sockets.
- The public API is intentionally tiny and ergonomic. ``ConnectionLeaseManager.acquire()`` never blocks: it returns a lease
  for either an existing name or a new name, ensuring predictable low latency under contention. The ``ConnectionLease``
  is immutable and simple to reason about, with an explicit ``release()`` to return capacity to the pool.
- No external services, daemons, or background threads are required. The design favors deterministic behavior under
  failures, idempotent operations, and ease of testing. This approach integrates cleanly with ``bzfs``, where predictable
  SSH reuse and teardown during snapshot replication must remain both fast and safe.
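
The following is a minimal sketch of the naming and open-flag bullets. It assumes a 64-bit random value encoded with the
standard library's Base64 codec; this module delegates encoding to ``urlsafe_base64()``, whose exact output is not shown
here. ``new_socket_name`` and ``create_lock_file`` are hypothetical helpers. Opening with ``O_NOFOLLOW`` refuses a planted
symlink instead of following it:

```python
from __future__ import annotations

import base64
import os
import random
import tempfile

SOCKET_PREFIX = "s"  # same prefix as this module's constant of that name


def new_socket_name(rand: random.Random) -> str:
    # Hypothetical helper: "s" plus 64 random bits as unpadded URL-safe Base64 (11 chars, never '/' or '+').
    raw: bytes = rand.getrandbits(64).to_bytes(8, "big")
    return SOCKET_PREFIX + base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def create_lock_file(path: str) -> int:
    # Hypothetical helper: creates or opens a lock file with owner-only permissions; refuses a symlink at path.
    flags = os.O_WRONLY | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC
    return os.open(path, flags, 0o600)


with tempfile.TemporaryDirectory() as tmp:
    name = new_socket_name(random.SystemRandom())  # cryptographically strong randomness
    os.close(create_lock_file(os.path.join(tmp, name)))

    trap = os.path.join(tmp, "sTRAP")
    os.symlink(os.path.join(tmp, "elsewhere"), trap)  # simulated attacker-planted symlink
    try:
        os.close(create_lock_file(trap))
        symlink_refused = False
    except OSError:  # e.g. ELOOP on Linux: the final path component is a symlink
        symlink_refused = True

print(name, symlink_refused)
```

Eleven unpadded Base64 characters hold all 64 random bits (11 * 6 = 66), which keeps names short. Short names matter
because the full ControlPath must fit within the OS limit on Unix domain socket path length.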

"""

from __future__ import (
    annotations,
)
import fcntl
import logging
import os
import pathlib
import random
import time
from typing import (
    Final,
    NamedTuple,
    final,
)

from bzfs_main.util.utils import (
    DIR_PERMISSIONS,
    FILE_PERMISSIONS,
    LOG_TRACE,
    UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH,
    close_quietly,
    sha256_urlsafe_base64,
    urlsafe_base64,
    validate_file_permissions,
    validate_is_not_a_symlink,
)

# constants:
SOCKETS_DIR: Final[str] = "c"
FREE_DIR: Final[str] = "free"
USED_DIR: Final[str] = "used"
SOCKET_PREFIX: Final[str] = "s"
NAMESPACE_DIR_LENGTH: Final[int] = 43  # 43 Base64 chars contain the entire SHA-256 of the SSH endpoint


#############################################################################
@final
class ConnectionLease(NamedTuple):
    """Purpose: Reduce SSH connection startup latency of a fresh bzfs process via safe OpenSSH ControlPath reuse.

    Assumptions: Callers hold this object only while the lease is needed. The file descriptor remains valid and keeps an
    exclusive advisory lock until ``release()`` or process exit. Paths are absolute and live within the manager's directory
    tree.

    Design Rationale: A small, immutable class captures just the essential handles needed for correctness and observability:
    the open fd, the used/ and free/ lock file paths, and the Unix domain socket ControlPath that SSH uses. Immutability
    prevents accidental mutation bugs, keeps equality semantics straightforward, and encourages explicit ownership transfer.
    A dedicated ``release()`` method centralizes cleanup and safe transition of the lock file to free/, followed by closing
    the fd for deterministic unlock.
    """

    # immutable variables:
    fd: int
    used_path: str
    free_path: str
    socket_path: str

    def release(self) -> None:
        """Releases the lease: moves the lock file from used/ dir back to free/ dir, then unlocks it by closing its fd."""
        try:
            os.rename(self.used_path, self.free_path)  # mv lock file atomically while holding the lock
        except FileNotFoundError:
            pass  # harmless
        finally:
            close_quietly(self.fd)  # release lock

    def set_socket_mtime_to_now(self) -> None:
        """Sets the mtime of the lease's ControlPath socket file to now; noop if the file is missing."""
        try:
            os.utime(self.socket_path, None)
        except FileNotFoundError:
            pass  # harmless


#############################################################################
@final
class ConnectionLeaseManager:
    """Purpose: Reduce SSH connection startup latency of a fresh bzfs process via safe OpenSSH ControlPath reuse.

    Assumptions: The manager has exclusive control of its root directory subtree. The process operates on a POSIX-compliant
    filesystem and uses advisory locks consistently. Path lengths must respect common Unix domain socket limits. Callers
    use the socket path with OpenSSH.

    Design Rationale: A compact state machine with free/ and used/ stages, guided by atomically acquired file locks and
    renames, yields predictable latency, natural crash recovery, and zero background maintenance. The API is intentionally
    minimal to reduce misuse. Defensive checks and secure open flags balance correctness, performance, and security without
    external dependencies.
    """

    def __init__(
        self,
        root_dir: str,  # the local user is implied by root_dir
        namespace: str,  # derived from the remote endpoint identity (ssh_user_host, port, ssh_config_file hash)
        ssh_control_persist_secs: int = 90,  # TTL for garbage collecting stale files while preserving reuse of live masters
        *,
        log: logging.Logger,
    ) -> None:
        """Initializes manager with namespaced dirs and security settings for SSH ControlPath reuse."""
        # immutable variables:
        assert root_dir
        assert namespace
        assert ssh_control_persist_secs >= 1
        self._ssh_control_persist_secs: Final[int] = ssh_control_persist_secs
        self._log: Final[logging.Logger] = log
        ns: str = sha256_urlsafe_base64(namespace, padding=False)
        assert NAMESPACE_DIR_LENGTH >= 22  # a minimum degree of safety: 22 URL-safe Base64 chars = 132 bits of entropy
        ns = ns[0:NAMESPACE_DIR_LENGTH]
        namespace_dir: str = os.path.join(root_dir, ns)
        self._sockets_dir: Final[str] = os.path.join(namespace_dir, SOCKETS_DIR)
        self._free_dir: Final[str] = os.path.join(namespace_dir, FREE_DIR)
        self._used_dir: Final[str] = os.path.join(namespace_dir, USED_DIR)
        self._open_flags: Final[int] = os.O_WRONLY | os.O_NOFOLLOW | os.O_CLOEXEC
        os.makedirs(root_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("connection lease root_dir ", root_dir)
        validate_file_permissions(root_dir, mode=DIR_PERMISSIONS)
        os.makedirs(namespace_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("connection lease namespace_dir ", namespace_dir)
        validate_file_permissions(namespace_dir, mode=DIR_PERMISSIONS)

    def _validate_dirs(self) -> None:
        """Ensures sockets/free/used directories exist, are not symlinks, and have strict permissions."""
        for _dir in (self._sockets_dir, self._free_dir, self._used_dir):
            os.makedirs(_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_is_not_a_symlink("connection lease dir ", _dir)
            validate_file_permissions(_dir, mode=DIR_PERMISSIONS)

    def acquire(self) -> ConnectionLease:  # thread-safe
        """Acquires and returns a ConnectionLease with an open, flocked fd and the SSH ControlPath aka socket file path."""
        self._validate_dirs()
        lease = self._find_and_acquire(self._free_dir)  # fast path: find free aka unlocked socket name in O(1) expected time
        if lease is not None:
            self._log.log(LOG_TRACE, "_find_and_acquire: %s", self._free_dir)
            return lease
        lease = self._find_and_acquire(self._used_dir)  # salvage unlocked socket names left over in used/ after a crash
        if lease is not None:
            self._log.log(LOG_TRACE, "_find_and_acquire: %s", self._used_dir)
            return lease
        lease = self._create_and_acquire()  # create new socket name
        self._log.log(LOG_TRACE, "_create_and_acquire: %s", self._free_dir)
        return lease

    def _find_and_acquire(self, scan_dir: str) -> ConnectionLease | None:
        """Scans a directory for an unlocked lease, prunes stale entries, and returns a locked lease if found."""
        with os.scandir(scan_dir) as iterator:
            for entry in iterator:
                if entry.name.startswith(SOCKET_PREFIX) and entry.is_file(follow_symlinks=False):
                    lease: ConnectionLease | None = self._try_lock(entry.path, open_flags=self._open_flags)
                    if lease is not None:
                        # If the control socket does not exist or is too old, then prune this entry to keep directory sizes
                        # bounded after crash storms, without losing opportunities to reuse a live SSH master connection.
                        delete_used_path: bool = False
                        try:
                            age_secs: float = time.time() - os.stat(lease.socket_path).st_mtime
                            if age_secs > self._ssh_control_persist_secs:  # old garbage left over from crash?
                                delete_used_path = True
                                os.unlink(lease.socket_path)  # remove control socket garbage while holding the lock
                        except FileNotFoundError:
                            delete_used_path = True  # control socket does not exist anymore
                        if not delete_used_path:
                            return lease  # return locked lease; this is the common case
                        try:  # Remove the renamed lock file at its current location under used/ while holding the lock
                            pathlib.Path(lease.used_path).unlink(missing_ok=True)
                        finally:
                            close_quietly(lease.fd)  # release lock
                        # keep scanning for a better candidate
        return None

    def _create_and_acquire(self) -> ConnectionLease:
        """Creates a new unique lease name, enforces socket path length, and returns a locked lease."""
        max_rand: int = 2**64 - 1
        rand: random.Random = random.SystemRandom()
        while True:
            random_prefix: str = urlsafe_base64(rand.randint(0, max_rand), max_value=max_rand, padding=False)
            socket_name: str = f"{SOCKET_PREFIX}{random_prefix}"
            socket_path: str = os.path.join(self._sockets_dir, socket_name)

            # Intentionally fail hard if the OS limit on Unix domain socket path length cannot reasonably be met,
            # typically because the home directory path is too long due to an unreasonably long Unix user name.
            # Failing fast here avoids opaque OS errors later in `ssh`.
            socket_path_bytes: bytes = os.fsencode(socket_path)
            if len(socket_path_bytes) > UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH:
                raise OSError(
                    "SSH ControlPath exceeds Unix domain socket limit "
                    f"({len(socket_path_bytes)} > {UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH}); "
                    f"shorten it by shortening the home directory path: {socket_path}"
                )

            free_path: str = os.path.join(self._free_dir, socket_name)
            lease: ConnectionLease | None = self._try_lock(free_path, open_flags=self._open_flags | os.O_CREAT)
            if lease is not None:
                return lease

    def _try_lock(self, src_path: str, open_flags: int) -> ConnectionLease | None:
        """Attempts to open and exclusively flock a lease file, atomically moves it to used/, and builds the lease."""
        fd: int = -1
        try:
            fd = os.open(src_path, flags=open_flags, mode=FILE_PERMISSIONS)

            # Acquire an exclusive lock; raises a BlockingIOError if the lock is already held by this process or another
            # process. The (advisory) lock is auto-released when the process terminates or the fd is closed.
            # Note: All supported operating systems also reject any attempt to acquire an exclusive flock that is already
            # held within the same process under an fd obtained from a separate os.open() call, by raising a BlockingIOError.
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # LOCK_NB ... non-blocking

            socket_name: str = os.path.basename(src_path)
            used_path: str = os.path.join(self._used_dir, socket_name)
            if src_path != used_path and os.path.exists(used_path):  # extremely rare name collision?
                close_quietly(fd)  # release lock
                return None  # harmless; retry with another name. See test_collision_on_create_then_salvage_and_release()

            # Rename cannot race: only free/<name> -> used/<name> produces used/ entries, and requires holding an exclusive
            # flock on free/<name>. We hold that lock while renaming. With exclusive subtree control, used/<name> cannot
            # appear between exists() and rename(). The atomic rename below is safe.
            os.rename(src_path, used_path)  # mv lock file atomically while holding the lock

        except OSError as e:
            close_quietly(fd)  # release lock
            if isinstance(e, BlockingIOError):
                return None  # lock is already held by this process or another process
            elif isinstance(e, FileNotFoundError):
                self._validate_dirs()
                return None
            raise
        else:  # success
            return ConnectionLease(
                fd=fd,
                used_path=used_path,
                free_path=os.path.join(self._free_dir, socket_name),
                socket_path=os.path.join(self._sockets_dir, socket_name),
            )