Coverage for bzfs_main / util / connection_lease.py: 100%
125 statements
coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Purpose
-----------------
This module safely reduces SSH startup latency on bzfs process startup. Using this, a fresh bzfs CLI process can immediately
attach to an existing OpenSSH ControlMaster connection, and in doing so skip TCP handshake, SSH handshake, key exchange,
authentication, and other delays from multiple network round-trips. This way, the very first remote ZFS command starts
hundreds of milliseconds earlier than it would when creating an SSH connection from scratch. Crucially, this module
guarantees that a bzfs process can (as long as it is alive) maintain exclusive access to the OpenSSH connections it has
obtained.

The higher-level goal is predictable performance, shorter critical paths, and less noisy-neighbor impact across frequent
periodic replication jobs at fleet scale, including at high concurrency. To achieve this, OpenSSH masters remain alive after
bzfs process termination (unless masters become idle for a specified time period), and inter-process reuse of ControlPath
sockets is coordinated safely so every new bzfs process benefits from an existing connection when present, and can recreate
it deterministically when absent. This amortizes startup costs, reduces tail latency, and improves operational reliability
without background daemons, external services, or warm-up procedures.

How This Is Achieved
--------------------
This module provides a small, fast, safe, and reliable mechanism to allocate and reuse unique Unix domain socket files
(ControlPaths) in a per-endpoint namespace, even under high concurrency. The full socket path namespace is ~/.ssh/bzfs/ (per
local user) plus a hashed subdirectory derived from the remote endpoint identity (user@host, port, ssh_config_file hash).
A bzfs process acquires an exclusive advisory file lock via flock(2) on a named empty lock file in the namespace directory
tree, then atomically moves it between free/ and used/ subdirectories to speed up later searches for available names in each
category of that namespace. The held lease exposes the open file descriptor (which maintains the lock) and the computed
ControlPath so callers can safely establish, reuse and maintain exclusive access to OpenSSH master connections without races
or leaks.
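
The lock-then-rename step described above can be sketched roughly as follows. This is a simplified illustration, not the
module's actual code: ``try_lock``, the temporary directories, and the name ``s_example`` are hypothetical, and the
collision and permission checks performed by the real implementation are omitted.

```python
import fcntl
import os
import tempfile
from typing import Optional


def try_lock(free_dir: str, used_dir: str, name: str) -> Optional[int]:
    # Hypothetical helper: returns an open, exclusively locked fd on success, else None.
    src = os.path.join(free_dir, name)
    try:
        fd = os.open(src, os.O_WRONLY | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, 0o600)
    except FileNotFoundError:
        return None  # the directory is missing
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # raises BlockingIOError if the lock is already held
        os.rename(src, os.path.join(used_dir, name))  # atomic move while holding the lock
    except OSError:
        os.close(fd)  # closing the fd also drops the lock, if it was acquired
        return None
    return fd


# Assumed layout for illustration only: free/ and used/ under a throwaway temp directory.
root = tempfile.mkdtemp()
free_dir = os.path.join(root, "free")
used_dir = os.path.join(root, "used")
os.makedirs(free_dir, mode=0o700)
os.makedirs(used_dir, mode=0o700)

fd = try_lock(free_dir, used_dir, "s_example")
```

The returned fd must stay open for as long as the lease is held, because closing it releases the lock.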

Holding a lease's lock for the duration of a `bzfs` process guarantees exclusive ownership of that SSH master while allowing
the underlying TCP connection to persist beyond bzfs process exit via OpenSSH ControlPersist. No other `bzfs` process can use
the TCP connection while the lease's lock is held, which ensures the connection cannot be degraded/overloaded with an
unbounded number of concurrent SSH sessions (aka too many concurrent requests per connection, via multiple processes). See
the server-side sshd_config(5) MaxSessions parameter (which defaults to 10, see
https://manpages.ubuntu.com/manpages/man5/sshd_config.5.html).
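
As an illustration of how a caller might hand a lease's ControlPath to the OpenSSH client, consider the sketch below. The
helper ``ssh_multiplex_args`` and the example path are hypothetical and not part of this module. The ``-S`` flag and the
``ControlMaster`` and ``ControlPersist`` options are standard OpenSSH client options, but whether bzfs passes exactly these
is an assumption.

```python
def ssh_multiplex_args(socket_path: str, control_persist_secs: int = 90) -> list[str]:
    # Hypothetical helper: builds OpenSSH client arguments for connection multiplexing.
    return [
        "ssh",
        "-S",
        socket_path,  # ControlPath: the Unix domain socket owned by the lease
        "-o",
        "ControlMaster=auto",  # reuse a live master if present, else become the master
        "-o",
        f"ControlPersist={control_persist_secs}s",  # keep an idle master alive for this long
    ]


# Assumed example path; a real ControlPath comes from the lease's socket_path field.
args = ssh_multiplex_args("/home/alice/.ssh/bzfs/ns/c/s_example")
```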

Also see https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
and https://chessman7.substack.com/p/how-ssh-multiplexing-reuses-master

Assumptions
-----------
- The filesystem is POSIX-compliant and supports ``fcntl.flock`` advisory locks, atomic ``os.rename`` file moves, and
  permissions enforcement. The process runs on the same host as the SSH client using the ControlPath.
- Low latency: scanning the free/ and used/ directories has bounded cost and is O(1) expected time because names are
  uniformly random. Directory contents are small and ephemeral.
- Crash recovery by salvaging an unlocked file from used/ is acceptable; an unlocked file indicates process termination or
  orderly release. The flock itself is the source of truth; directory names merely provide fast classification.
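
The locking assumption can be checked with a small experiment like the one below. It is a sketch that assumes Linux-style
``flock`` semantics on a local filesystem. The temporary path and the variable names are illustrative only.

```python
import fcntl
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "s_example")
fd1 = os.open(path, os.O_WRONLY | os.O_CREAT, 0o600)
fcntl.flock(fd1, fcntl.LOCK_EX | fcntl.LOCK_NB)  # the first holder acquires the exclusive lock

fd2 = os.open(path, os.O_WRONLY)  # a separate open file description for the same file
try:
    fcntl.flock(fd2, fcntl.LOCK_EX | fcntl.LOCK_NB)
    contended = False
except BlockingIOError:
    contended = True  # the lock is still held via fd1, even within the same process

os.close(fd1)  # closing the fd releases the lock, just as process exit would
fcntl.flock(fd2, fcntl.LOCK_EX | fcntl.LOCK_NB)  # succeeds now that the lock is free
reacquired = True
os.close(fd2)
```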

Design Rationale
----------------
- File locks plus atomic renames form a minimal, portable concurrency primitive that composes well with process lifecycles.
  Locks are released automatically on process exit or when the file descriptor is closed, ensuring no cleanup logic is
  required after abnormal termination.
- The two-directory layout (free/ and used/) makes hot-path acquisition fast. We first probe free/ to reuse previously
  released names. If that doesn't produce an acquisition we probe used/ to salvage leftovers from crashed processes that no
  longer hold locks. If that doesn't produce an acquisition either we finally generate a new name. This yields a compact,
  garbage-free namespace without a janitor.
- Name generation mixes in a cryptographically strong random component, encoded in URL-safe base64 to avoid long names and
  path-unfriendly characters. This reduces collision probability and helps with human diagnostics.
- Security is prioritized: the root, sockets, and lease directories are created with explicit permissions, and symlinks are
  rejected to avoid redirection attacks. Open flags include ``O_NOFOLLOW`` and ``O_CLOEXEC`` to remove common foot-guns.
  Foreign file ownership and overly permissive file permissions are rejected to prevent sockets being used by third parties.
- The public API is intentionally tiny and ergonomic. ``ConnectionLeaseManager.acquire()`` never blocks: it returns a lease
  for either an existing name or a new name, ensuring predictable low latency under contention. The ``ConnectionLease``
  is immutable and simple to reason about, with an explicit ``release()`` to return capacity to the pool.
- No external services, daemons, or background threads are required. The design favors deterministic behavior under
  failures, idempotent operations, and ease of testing. This approach integrates cleanly with ``bzfs`` where predictable SSH
  reuse and teardown during snapshot replication must remain both fast and safe.
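
The name generation mentioned above can be approximated as in the sketch below. This is a hypothetical stand-in that uses
only the standard library. The module itself relies on its own ``urlsafe_base64`` helper, whose exact encoding may differ.

```python
import base64
import secrets


def random_socket_name(prefix: str = "s") -> str:
    # Hypothetical stand-in: 64 cryptographically strong random bits, URL-safe Base64, padding stripped.
    raw = secrets.randbits(64).to_bytes(8, "big")
    return prefix + base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


name = random_socket_name()
```

Eight random bytes encode to 11 unpadded Base64 characters, so the resulting name stays short enough to leave room within
the Unix domain socket path limit.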
"""

from __future__ import (
    annotations,
)
import fcntl
import logging
import os
import pathlib
import random
import time
from typing import (
    Final,
    NamedTuple,
    final,
)

from bzfs_main.util.utils import (
    DIR_PERMISSIONS,
    FILE_PERMISSIONS,
    LOG_TRACE,
    UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH,
    close_quietly,
    sha256_urlsafe_base64,
    urlsafe_base64,
    validate_file_permissions,
    validate_is_not_a_symlink,
)

# constants:
SOCKETS_DIR: Final[str] = "c"
FREE_DIR: Final[str] = "free"
USED_DIR: Final[str] = "used"
SOCKET_PREFIX: Final[str] = "s"
NAMESPACE_DIR_LENGTH: Final[int] = 43  # 43 Base64 chars contain the entire SHA-256 of the SSH endpoint


#############################################################################
@final
class ConnectionLease(NamedTuple):
    """Purpose: Reduce SSH connection startup latency of a fresh bzfs process via safe OpenSSH ControlPath reuse.

    Assumptions: Callers hold this object only while the lease is needed. The file descriptor remains valid and keeps an
    exclusive advisory lock until ``release()`` or process exit. Paths are absolute and live within the manager's directory
    tree.

    Design Rationale: A small, immutable class captures just the essential handles needed for correctness and observability:
    the open fd, the used/ and free/ lock file paths, and the Unix domain socket ControlPath that SSH uses. Immutability
    prevents accidental mutation bugs, keeps equality semantics straightforward, and encourages explicit ownership transfer.
    A dedicated ``release()`` method centralizes cleanup and safe transition of the lock file to free/, followed by closing
    the fd for deterministic unlock.
    """

    # immutable variables:
    fd: int
    used_path: str
    free_path: str
    socket_path: str

    def release(self) -> None:
        """Releases the lease: moves the lock file from used/ dir back to free/ dir, then unlocks it by closing its fd."""
        try:
            os.rename(self.used_path, self.free_path)  # mv lock file atomically while holding the lock
        except FileNotFoundError:
            pass  # harmless
        finally:
            close_quietly(self.fd)  # release lock

    def set_socket_mtime_to_now(self) -> None:
        """Sets the mtime of the lease's ControlPath socket file to now; noop if the file is missing."""
        try:
            os.utime(self.socket_path, None)
        except FileNotFoundError:
            pass  # harmless


#############################################################################
@final
class ConnectionLeaseManager:
    """Purpose: Reduce SSH connection startup latency of a fresh bzfs process via safe OpenSSH ControlPath reuse.

    Assumptions: The manager has exclusive control of its root directory subtree. The process operates on a POSIX-compliant
    filesystem and uses advisory locks consistently. Path lengths must respect common Unix domain socket limits. Callers
    use the socket path with OpenSSH.

    Design Rationale: A compact state machine with free/ and used/ stages, guided by atomically acquired file locks and
    renames, yields predictable latency, natural crash recovery, and zero background maintenance. The API is intentionally
    minimal to reduce misuse. Defensive checks and secure open flags balance correctness, performance, and security without
    external dependencies.
    """

    def __init__(
        self,
        root_dir: str,  # the local user is implied by root_dir
        namespace: str,  # derived from the remote endpoint identity (ssh_user_host, port, ssh_config_file hash)
        ssh_control_persist_secs: int = 90,  # TTL for garbage collecting stale files while preserving reuse of live masters
        *,
        log: logging.Logger,
    ) -> None:
        """Initializes manager with namespaced dirs and security settings for SSH ControlPath reuse."""
        # immutable variables:
        assert root_dir
        assert namespace
        assert ssh_control_persist_secs >= 1
        self._ssh_control_persist_secs: Final[int] = ssh_control_persist_secs
        self._log: Final[logging.Logger] = log
        ns: str = sha256_urlsafe_base64(namespace, padding=False)
        assert NAMESPACE_DIR_LENGTH >= 22  # a minimum degree of safety: 22 URL-safe Base64 chars = 132 bits of entropy
        ns = ns[0:NAMESPACE_DIR_LENGTH]
        namespace_dir: str = os.path.join(root_dir, ns)
        self._sockets_dir: Final[str] = os.path.join(namespace_dir, SOCKETS_DIR)
        self._free_dir: Final[str] = os.path.join(namespace_dir, FREE_DIR)
        self._used_dir: Final[str] = os.path.join(namespace_dir, USED_DIR)
        self._open_flags: Final[int] = os.O_WRONLY | os.O_NOFOLLOW | os.O_CLOEXEC
        os.makedirs(root_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("connection lease root_dir ", root_dir)
        validate_file_permissions(root_dir, mode=DIR_PERMISSIONS)
        os.makedirs(namespace_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("connection lease namespace_dir ", namespace_dir)
        validate_file_permissions(namespace_dir, mode=DIR_PERMISSIONS)

    def _validate_dirs(self) -> None:
        """Ensures sockets/free/used directories exist, are not symlinks, and have strict permissions."""
        for _dir in (self._sockets_dir, self._free_dir, self._used_dir):
            os.makedirs(_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_is_not_a_symlink("connection lease dir ", _dir)
            validate_file_permissions(_dir, mode=DIR_PERMISSIONS)

    def acquire(self) -> ConnectionLease:  # thread-safe
        """Acquires and returns a ConnectionLease with an open, flocked fd and the SSH ControlPath aka socket file path."""
        self._validate_dirs()
        lease = self._find_and_acquire(self._free_dir)  # fast path: find free aka unlocked socket name in O(1) expected time
        if lease is not None:
            self._log.log(LOG_TRACE, "_find_and_acquire: %s", self._free_dir)
            return lease
        lease = self._find_and_acquire(self._used_dir)  # salvage from used yet unlocked socket names leftover from crash
        if lease is not None:
            self._log.log(LOG_TRACE, "_find_and_acquire: %s", self._used_dir)
            return lease
        lease = self._create_and_acquire()  # create new socket name
        self._log.log(LOG_TRACE, "_create_and_acquire: %s", self._free_dir)
        return lease

    def _find_and_acquire(self, scan_dir: str) -> ConnectionLease | None:
        """Scans a directory for an unlocked lease, prunes stale entries, and returns a locked lease if found."""
        with os.scandir(scan_dir) as iterator:
            for entry in iterator:
                if entry.name.startswith(SOCKET_PREFIX) and entry.is_file(follow_symlinks=False):
                    lease: ConnectionLease | None = self._try_lock(entry.path, open_flags=self._open_flags)
                    if lease is not None:
                        # If the control socket does not exist or is too old, then prune this entry to keep directory sizes
                        # bounded after crash storms, without losing opportunities to reuse a live SSH master connection.
                        delete_used_path: bool = False
                        try:
                            age_secs: float = time.time() - os.stat(lease.socket_path).st_mtime
                            if age_secs > self._ssh_control_persist_secs:  # old garbage left over from crash?
                                delete_used_path = True
                                os.unlink(lease.socket_path)  # remove control socket garbage while holding the lock
                        except FileNotFoundError:
                            delete_used_path = True  # control socket does not exist anymore
                        if not delete_used_path:
                            return lease  # return locked lease; this is the common case
                        try:  # Remove the renamed lock file at its current location under used/ while holding the lock
                            pathlib.Path(lease.used_path).unlink(missing_ok=True)
                        finally:
                            close_quietly(lease.fd)  # release lock
                        # keep scanning for a better candidate
        return None

    def _create_and_acquire(self) -> ConnectionLease:
        """Creates a new unique lease name, enforces socket path length, and returns a locked lease."""
        max_rand: int = 2**64 - 1
        rand: random.Random = random.SystemRandom()
        while True:
            random_prefix: str = urlsafe_base64(rand.randint(0, max_rand), max_value=max_rand, padding=False)
            socket_name: str = f"{SOCKET_PREFIX}{random_prefix}"
            socket_path: str = os.path.join(self._sockets_dir, socket_name)

            # Intentionally error out hard if the max OS Unix domain socket path limit cannot be met reasonably as the home
            # directory path is too long, typically because the Unix user name is unreasonably long. Failing fast here avoids
            # opaque OS errors later in `ssh`.
            socket_path_bytes: bytes = os.fsencode(socket_path)
            if len(socket_path_bytes) > UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH:
                raise OSError(
                    "SSH ControlPath exceeds Unix domain socket limit "
                    f"({len(socket_path_bytes)} > {UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH}); "
                    f"shorten it by shortening the home directory path: {socket_path}"
                )

            free_path: str = os.path.join(self._free_dir, socket_name)
            lease: ConnectionLease | None = self._try_lock(free_path, open_flags=self._open_flags | os.O_CREAT)
            if lease is not None:
                return lease

    def _try_lock(self, src_path: str, open_flags: int) -> ConnectionLease | None:
        """Attempts to open and exclusively flock a lease file, atomically moves it to used/, and builds the lease."""
        fd: int = -1
        try:
            fd = os.open(src_path, flags=open_flags, mode=FILE_PERMISSIONS)

            # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or another
            # process. The (advisory) lock is auto-released when the process terminates or the fd is closed.
            # Note: All supported operating systems also reject any attempt to acquire an exclusive flock that is already
            # held within the same process under an fd obtained from a separate os.open() call, by raising a BlockingIOError.
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # LOCK_NB ... non-blocking

            socket_name: str = os.path.basename(src_path)
            used_path: str = os.path.join(self._used_dir, socket_name)
            if src_path != used_path and os.path.exists(used_path):  # extremely rare name collision?
                close_quietly(fd)  # release lock
                return None  # harmless; retry with another name. See test_collision_on_create_then_salvage_and_release()

            # Rename cannot race: only free/<name> -> used/<name> produces used/ entries, and requires holding an exclusive
            # flock on free/<name>. We hold that lock while renaming. With exclusive subtree control, used/<name> cannot
            # appear between exists() and rename(). The atomic rename below is safe.
            os.rename(src_path, used_path)  # mv lock file atomically while holding the lock

        except OSError as e:
            close_quietly(fd)  # release lock
            if isinstance(e, BlockingIOError):
                return None  # lock is already held by this process or another process
            elif isinstance(e, FileNotFoundError):
                self._validate_dirs()
                return None
            raise
        else:  # success
            return ConnectionLease(
                fd=fd,
                used_path=used_path,
                free_path=os.path.join(self._free_dir, socket_name),
                socket_path=os.path.join(self._sockets_dir, socket_name),
            )