Coverage for bzfs_main/connection_lease.py: 100%
123 statements
coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Purpose
-----------------
This module safely reduces SSH startup latency on bzfs process startup. Using this, a fresh bzfs CLI process can attach
immediately to an existing OpenSSH ControlMaster connection, and in doing so skip TCP handshake, SSH handshake, key exchange,
authentication, and other delays from multiple network round-trips. This way, the very first remote ZFS command starts
hundreds of milliseconds earlier as compared to when creating an SSH connection from scratch.

The higher-level goal is predictable performance, shorter critical paths, and less noisy-neighbor impact across frequent
periodic replication jobs at fleet scale, including at high concurrency. To achieve this, OpenSSH masters remain alive after
bzfs process termination (unless masters become idle for a specified time period), and inter-process reuse of ControlPath
sockets is coordinated safely so every new bzfs process benefits from an existing connection when present, and can recreate
it deterministically when absent. This amortizes startup costs, reduces tail latency, and improves operational reliability
without background daemons, external services, or warm-up procedures.

How This Is Achieved
--------------------
This module provides a small, fast, safe, and reliable mechanism to allocate and reuse unique Unix domain socket files
(ControlPaths) in a per-endpoint namespace, even under high concurrency. The full socket path namespace is ~/.ssh/bzfs/ (per
local user) plus a hashed subdirectory derived from the remote endpoint identity (user@host, port, ssh_config_file hash).
A bzfs process acquires an exclusive advisory file lock via flock(2) on a named empty lock file in the namespace directory
tree, then atomically moves it between free/ and used/ subdirectories to speed up later searches for available names in each
category of that namespace. The held lease exposes the open file descriptor (which maintains the lock) and the computed
ControlPath so callers can safely establish or reuse SSH master connections without races or leaks. Holding a lease's lock
for the duration of a `bzfs` process guarantees exclusive ownership of that SSH master while allowing the underlying TCP
connection to persist beyond bzfs process exit via OpenSSH ControlPersist.

Also see https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
and https://chessman7.substack.com/p/how-ssh-multiplexing-reuses-master

Assumptions
-----------
- The filesystem is POSIX-compliant and supports ``fcntl.flock`` advisory locks, atomic ``os.rename`` file moves, and
  permissions enforcement. The process runs on the same host as the SSH client using the ControlPath.
- Low latency: scanning the free/ and used/ directories has bounded cost and is O(1) expected time because names are
  uniformly random. Directory contents are small and ephemeral.
- Crash recovery is acceptable by salvaging an unlocked file from used/; an unlock indicates process termination or orderly
  release. The flock itself is the source of truth; directory names merely provide fast classification.

Design Rationale
----------------
- File locks plus atomic renames form a minimal, portable concurrency primitive that composes well with process lifecycles.
  Locks are released automatically on process exit or when the file descriptor is closed, ensuring no cleanup logic is
  required after abnormal termination.
- The two-directory layout (free/ and used/) makes hot-path acquisition fast. We first probe free/ to reuse previously
  released names. If that doesn't produce an acquisition we probe used/ to salvage leftovers from crashed processes that no
  longer hold locks. If that doesn't produce an acquisition either we finally generate a new name. This yields a compact,
  garbage-free namespace without a janitor.
- Name generation mixes in a cryptographically strong random component, encoded in URL-safe base64 to avoid long names and
  path-unfriendly characters. This reduces collision probability and helps with human diagnostics.
- Security is prioritized: the root, sockets, and lease directories are created with explicit permissions, and symlinks are
  rejected to avoid redirection attacks. Open flags include ``O_NOFOLLOW`` and ``O_CLOEXEC`` to remove common foot-guns.
  Foreign file ownership and overly permissive file permissions are rejected to prevent sockets being used by third parties.
- The public API is intentionally tiny and ergonomic. ``ConnectionLeaseManager.acquire()`` never blocks: it either returns
  a lease for an existing name or a new name, ensuring predictable low latency under contention. The ``ConnectionLease``
  is immutable and simple to reason about, with an explicit ``release()`` to return capacity to the pool.
- No external services, daemons, or background threads are required. The design favors determinate behavior under failures,
  idempotent operations, and ease of testing. This approach integrates cleanly with ``bzfs`` where predictable SSH reuse and
  teardown during snapshot replication must remain both fast and safe.
"""
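
# Illustrative aside (not part of bzfs): a minimal, standalone sketch of the flock-plus-rename idea described in the
# module docstring above, assuming Linux or macOS flock(2) semantics. The helper names `try_lock` and `demo` and the lock
# file name `s_demo` are hypothetical; the real logic lives in `ConnectionLeaseManager._try_lock()`.

```python
from __future__ import annotations

import fcntl
import os
import tempfile


def try_lock(path: str) -> int | None:
    """Opens (or creates) a lock file and tries to flock it without blocking; returns the fd, or None if busy."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_NOFOLLOW | os.O_CLOEXEC, 0o600)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # non-blocking exclusive advisory lock
    except BlockingIOError:
        os.close(fd)
        return None
    return fd


def demo() -> list[str]:
    """Walks one lock file through free/ -> used/ -> free/ and records what happens at each step."""
    events: list[str] = []
    with tempfile.TemporaryDirectory() as root:
        free_dir = os.path.join(root, "free")
        used_dir = os.path.join(root, "used")
        os.makedirs(free_dir, mode=0o700)
        os.makedirs(used_dir, mode=0o700)
        free_path = os.path.join(free_dir, "s_demo")
        used_path = os.path.join(used_dir, "s_demo")

        fd = try_lock(free_path)
        assert fd is not None
        os.rename(free_path, used_path)  # the lock belongs to the open file, so it survives the rename
        events.append("acquired")

        if try_lock(used_path) is None:  # a second open() of the same file cannot take the lock
            events.append("busy")

        os.rename(used_path, free_path)  # release: move back to free/ first, then unlock by closing the fd
        os.close(fd)
        fd2 = try_lock(free_path)
        if fd2 is not None:
            events.append("reacquired")
            os.close(fd2)
    return events
```

# The sketch omits the permission checks, symlink validation, and pruning that the module itself performs.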

from __future__ import (
    annotations,
)
import fcntl
import logging
import os
import pathlib
import random
import time
from typing import (
    Final,
    NamedTuple,
)

from bzfs_main.utils import (
    DIR_PERMISSIONS,
    FILE_PERMISSIONS,
    LOG_TRACE,
    UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH,
    close_quietly,
    sha256_urlsafe_base64,
    urlsafe_base64,
    validate_file_permissions,
    validate_is_not_a_symlink,
)

# constants:
SOCKETS_DIR: Final[str] = "c"
FREE_DIR: Final[str] = "free"
USED_DIR: Final[str] = "used"
SOCKET_PREFIX: Final[str] = "s"
NAMESPACE_DIR_LENGTH: Final[int] = 43  # 43 Base64 chars contain the entire SHA-256 of the SSH endpoint
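
# Illustrative aside (not part of bzfs): a rough sketch of how the constants above could compose into a ControlPath, using
# only the standard library. The functions `namespace_dir_name`, `random_socket_name` and `control_path`, the limit
# `ASSUMED_SOCKET_PATH_MAX`, and the example namespace string are all hypothetical stand-ins; the actual helpers
# `sha256_urlsafe_base64` and `urlsafe_base64` in `bzfs_main.utils` may encode values differently.

```python
from __future__ import annotations

import base64
import hashlib
import os
import secrets

# Assumption: a conservative limit; sun_path holds 104 bytes on macOS/BSD and 108 bytes on Linux.
ASSUMED_SOCKET_PATH_MAX = 104


def namespace_dir_name(namespace: str, length: int = 43) -> str:
    """Hashes the endpoint identity with SHA-256 and encodes the 32-byte digest as unpadded URL-safe Base64."""
    digest = hashlib.sha256(namespace.encode("utf-8")).digest()
    return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")[:length]


def random_socket_name(prefix: str = "s") -> str:
    """Returns the prefix plus 64 random bits from the OS CSPRNG, encoded as unpadded URL-safe Base64."""
    token = base64.urlsafe_b64encode(secrets.token_bytes(8)).decode("ascii").rstrip("=")
    return prefix + token


def control_path(root_dir: str, namespace: str) -> str:
    """Joins root dir, hashed namespace dir, sockets dir and socket name; fails fast if the path is too long."""
    path = os.path.join(root_dir, namespace_dir_name(namespace), "c", random_socket_name())
    if len(os.fsencode(path)) > ASSUMED_SOCKET_PATH_MAX:
        raise OSError(f"ControlPath exceeds assumed Unix domain socket limit: {path}")
    return path
```

# A 32-byte digest encodes to 44 Base64 characters including one "=" pad, hence 43 characters once padding is removed.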


#############################################################################
class ConnectionLease(NamedTuple):
    """Purpose: Reduce SSH connection startup latency of a fresh bzfs process via safe OpenSSH ControlPath reuse.

    Assumptions: Callers hold this object only while the lease is needed. The file descriptor remains valid and keeps an
    exclusive advisory lock until ``release()`` or process exit. Paths are absolute and live within the manager's directory
    tree.

    Design Rationale: A small, immutable class captures just the essential handles needed for correctness and observability:
    the open fd, the used/ and free/ lock file paths, and the Unix domain socket ControlPath that SSH uses. Immutability
    prevents accidental mutation bugs, keeps equality semantics straightforward, and encourages explicit ownership transfer.
    A dedicated ``release()`` method centralizes cleanup and safe transition of the lock file to free/, followed by closing
    the fd for deterministic unlock.
    """

    # immutable variables:
    fd: int
    used_path: str
    free_path: str
    socket_path: str

    def release(self) -> None:
        """Releases the lease: moves the lock file from used/ dir back to free/ dir, then unlocks it by closing its fd."""
        try:
            os.rename(self.used_path, self.free_path)  # mv lock file atomically while holding the lock
        except FileNotFoundError:
            pass  # harmless
        finally:
            close_quietly(self.fd)  # release lock

    def set_socket_mtime_to_now(self) -> None:
        """Sets the mtime of the lease's ControlPath socket file to now; noop if the file is missing."""
        try:
            os.utime(self.socket_path, None)
        except FileNotFoundError:
            pass  # harmless
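
# Illustrative aside (not part of bzfs): a small sketch of why refreshing the socket mtime matters. The manager treats a
# control socket whose mtime is older than `ssh_control_persist_secs` (or that is missing) as garbage, so touching the
# socket keeps a live master eligible for reuse. The names `is_stale` and `demo` are hypothetical, and a regular file
# stands in for the Unix domain socket.

```python
from __future__ import annotations

import os
import tempfile
import time


def is_stale(socket_path: str, ttl_secs: float) -> bool:
    """Returns True if the file is missing or its mtime is more than ttl_secs in the past."""
    try:
        age_secs = time.time() - os.stat(socket_path).st_mtime
    except FileNotFoundError:
        return True
    return age_secs > ttl_secs


def demo() -> tuple[bool, bool, bool]:
    """Checks staleness for an old file, the same file after a touch, and a file that does not exist."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "s_demo")
        with open(path, "w"):
            pass  # create an empty stand-in file
        old = time.time() - 1000
        os.utime(path, (old, old))  # pretend the file was last touched 1000 seconds ago
        before = is_stale(path, ttl_secs=90)
        os.utime(path, None)  # same call as set_socket_mtime_to_now(): set atime and mtime to now
        after = is_stale(path, ttl_secs=90)
        missing = is_stale(os.path.join(tmp, "absent"), ttl_secs=90)
    return before, after, missing
```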


#############################################################################
class ConnectionLeaseManager:
    """Purpose: Reduce SSH connection startup latency of a fresh bzfs process via safe OpenSSH ControlPath reuse.

    Assumptions: The manager has exclusive control of its root directory subtree. The process operates on a POSIX-compliant
    filesystem and uses advisory locks consistently. Path lengths must respect common Unix domain socket limits. Callers
    use the socket path with OpenSSH.

    Design Rationale: A compact state machine with free/ and used/ stages, guided by atomically acquired file locks and
    renames, yields predictable latency, natural crash recovery, and zero background maintenance. The API is intentionally
    minimal to reduce misuse. Defensive checks and secure open flags balance correctness, performance, and security without
    external dependencies.
    """

    def __init__(
        self,
        root_dir: str,  # the local user is implied by root_dir
        namespace: str,  # derived from the remote endpoint identity (ssh_user_host, port, ssh_config_file hash)
        ssh_control_persist_secs: int = 90,  # TTL for garbage collecting stale files while preserving reuse of live masters
        *,
        log: logging.Logger,
    ) -> None:
        """Initializes manager with namespaced dirs and security settings for SSH ControlPath reuse."""
        # immutable variables:
        assert root_dir
        assert namespace
        assert ssh_control_persist_secs >= 1
        self._ssh_control_persist_secs: Final[int] = ssh_control_persist_secs
        self._log: Final[logging.Logger] = log
        ns: str = sha256_urlsafe_base64(namespace, padding=False)
        assert NAMESPACE_DIR_LENGTH >= 22  # a minimum degree of safety: 22 URL-safe Base64 chars = 132 bits of entropy
        ns = ns[0:NAMESPACE_DIR_LENGTH]
        namespace_dir: str = os.path.join(root_dir, ns)
        self._sockets_dir: Final[str] = os.path.join(namespace_dir, SOCKETS_DIR)
        self._free_dir: Final[str] = os.path.join(namespace_dir, FREE_DIR)
        self._used_dir: Final[str] = os.path.join(namespace_dir, USED_DIR)
        self._open_flags: Final[int] = os.O_WRONLY | os.O_NOFOLLOW | os.O_CLOEXEC
        os.makedirs(root_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("connection lease root_dir ", root_dir)
        validate_file_permissions(root_dir, mode=DIR_PERMISSIONS)
        os.makedirs(namespace_dir, mode=DIR_PERMISSIONS, exist_ok=True)
        validate_is_not_a_symlink("connection lease namespace_dir ", namespace_dir)
        validate_file_permissions(namespace_dir, mode=DIR_PERMISSIONS)

    def _validate_dirs(self) -> None:
        """Ensures sockets/free/used directories exist, are not symlinks, and have strict permissions."""
        for _dir in (self._sockets_dir, self._free_dir, self._used_dir):
            os.makedirs(_dir, mode=DIR_PERMISSIONS, exist_ok=True)
            validate_is_not_a_symlink("connection lease dir ", _dir)
            validate_file_permissions(_dir, mode=DIR_PERMISSIONS)

    def acquire(self) -> ConnectionLease:  # thread-safe
        """Acquires and returns a ConnectionLease with an open, flocked fd and the SSH ControlPath aka socket file path."""
        self._validate_dirs()
        lease = self._find_and_acquire(self._free_dir)  # fast path: find free aka unlocked socket name in O(1) expected time
        if lease is not None:
            self._log.log(LOG_TRACE, "_find_and_acquire: %s", self._free_dir)
            return lease
        lease = self._find_and_acquire(self._used_dir)  # salvage from used yet unlocked socket names leftover from crash
        if lease is not None:
            self._log.log(LOG_TRACE, "_find_and_acquire: %s", self._used_dir)
            return lease
        lease = self._create_and_acquire()  # create new socket name
        self._log.log(LOG_TRACE, "_create_and_acquire: %s", self._free_dir)
        return lease

    def _find_and_acquire(self, scan_dir: str) -> ConnectionLease | None:
        """Scans a directory for an unlocked lease, prunes stale entries, and returns a locked lease if found."""
        with os.scandir(scan_dir) as iterator:
            for entry in iterator:
                if entry.name.startswith(SOCKET_PREFIX) and entry.is_file(follow_symlinks=False):
                    lease: ConnectionLease | None = self._try_lock(entry.path, open_flags=self._open_flags)
                    if lease is not None:
                        # If the control socket does not exist or is too old, then prune this entry to keep directory sizes
                        # bounded after crash storms, without losing opportunities to reuse a live SSH master connection.
                        delete_used_path: bool = False
                        try:
                            age_secs: float = time.time() - os.stat(lease.socket_path).st_mtime
                            if age_secs > self._ssh_control_persist_secs:  # old garbage left over from crash?
                                delete_used_path = True
                                os.unlink(lease.socket_path)  # remove control socket garbage while holding the lock
                        except FileNotFoundError:
                            delete_used_path = True  # control socket does not exist anymore
                        if not delete_used_path:
                            return lease  # return locked lease; this is the common case
                        try:  # Remove the renamed lock file at its current location under used/ while holding the lock
                            pathlib.Path(lease.used_path).unlink(missing_ok=True)
                        finally:
                            close_quietly(lease.fd)  # release lock
                        # keep scanning for a better candidate
        return None

    def _create_and_acquire(self) -> ConnectionLease:
        """Creates a new unique lease name, enforces socket path length, and returns a locked lease."""
        max_rand: int = 2**64 - 1
        rand: random.Random = random.SystemRandom()
        while True:
            random_prefix: str = urlsafe_base64(rand.randint(0, max_rand), max_value=max_rand, padding=False)
            socket_name: str = f"{SOCKET_PREFIX}{random_prefix}"
            socket_path: str = os.path.join(self._sockets_dir, socket_name)

            # Intentionally error out hard if the max OS Unix domain socket path limit cannot be met reasonably as the home
            # directory path is too long, typically because the Unix user name is unreasonably long. Failing fast here avoids
            # opaque OS errors later in `ssh`.
            socket_path_bytes: bytes = os.fsencode(socket_path)
            if len(socket_path_bytes) > UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH:
                raise OSError(
                    "SSH ControlPath exceeds Unix domain socket limit "
                    f"({len(socket_path_bytes)} > {UNIX_DOMAIN_SOCKET_PATH_MAX_LENGTH}); "
                    f"shorten it by shortening the home directory path: {socket_path}"
                )

            free_path: str = os.path.join(self._free_dir, os.path.basename(socket_path))
            lease: ConnectionLease | None = self._try_lock(free_path, open_flags=self._open_flags | os.O_CREAT)
            if lease is not None:
                return lease

    def _try_lock(self, src_path: str, open_flags: int) -> ConnectionLease | None:
        """Attempts to open and exclusively flock a lease file, atomically moves it to used/, and builds the lease."""
        fd: int = -1
        try:
            fd = os.open(src_path, flags=open_flags, mode=FILE_PERMISSIONS)

            # Acquire an exclusive lock; will raise a BlockingIOError if lock is already held by this process or another
            # process. The (advisory) lock is auto-released when the process terminates or the fd is closed.
            # Note: All supported operating systems also reject any attempt to acquire an exclusive flock that is already
            # held within the same process under an fd obtained from a separate os.open() call, by raising a BlockingIOError.
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # LOCK_NB ... non-blocking

            socket_name: str = os.path.basename(src_path)
            used_path: str = os.path.join(self._used_dir, socket_name)
            if src_path != used_path and os.path.exists(used_path):  # extremely rare name collision?
                close_quietly(fd)  # release lock
                return None  # harmless; retry with another name. See test_collision_on_create_then_salvage_and_release()

            # Rename cannot race: only free/<name> -> used/<name> produces used/ entries, and requires holding an exclusive
            # flock on free/<name>. We hold that lock while renaming. With exclusive subtree control, used/<name> cannot
            # appear between exists() and rename(). The atomic rename below is safe.
            os.rename(src_path, used_path)  # mv lock file atomically while holding the lock

        except OSError as e:
            close_quietly(fd)  # release lock
            if isinstance(e, BlockingIOError):
                return None  # lock is already held by this process or another process
            elif isinstance(e, FileNotFoundError):
                self._validate_dirs()
                return None
            raise
        else:  # success
            return ConnectionLease(
                fd=fd,
                used_path=used_path,
                free_path=os.path.join(self._free_dir, socket_name),
                socket_path=os.path.join(self._sockets_dir, socket_name),
            )
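

# Illustrative aside (not part of bzfs): a hedged sketch of how a caller might hand a lease's `socket_path` to OpenSSH.
# How bzfs itself assembles its ssh command line is not shown in this file, so the function `ssh_multiplex_args` and its
# parameters are assumptions; only the `ControlMaster`, `ControlPath` and `ControlPersist` options are standard OpenSSH
# client settings.

```python
from __future__ import annotations


def ssh_multiplex_args(socket_path: str, persist_secs: int, user_host: str) -> list[str]:
    """Builds an ssh argv that reuses the master at socket_path if one exists, else starts one that lingers when idle."""
    return [
        "ssh",
        "-o", "ControlMaster=auto",  # attach to an existing master, or become the master if none is listening
        "-o", f"ControlPath={socket_path}",  # Unix domain socket used for multiplexing
        "-o", f"ControlPersist={persist_secs}s",  # keep the master in the background for this long while idle
        user_host,
    ]
```

# Possible usage, assuming `manager` is a ConnectionLeaseManager and the host name is a placeholder:
#     lease = manager.acquire()
#     try:
#         argv = ssh_multiplex_args(lease.socket_path, 90, "alice@example.org") + ["zfs", "list"]
#         ...  # run argv, e.g. with subprocess.run(argv)
#     finally:
#         lease.release()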