Coverage for bzfs_main / util / connection.py: 99%
284 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Efficient thread-safe SSH command client; See run_ssh_command() and refresh_ssh_connection_if_necessary() and class
16ConnectionPool and class ConnectionLease.
18Can be configured to reuse multiplexed SSH connections for low latency, even on fresh process startup, for example leading to
19ballpark 3-5ms total time for running `/bin/echo hello` end-to-end over SSH on LAN, which requires two (sequential) network
20round trips (one for CHANNEL_OPEN, plus a subsequent one for CHANNEL_REQUEST).
21Has zero dependencies beyond the standard OpenSSH client CLI (`ssh`); also works with `hpnssh`. The latter uses larger TCP
22window sizes for best throughput over high speed long distance networks, aka paths with large bandwidth-delay product.
24Example usage:
26import logging
27from subprocess import DEVNULL, PIPE
28from bzfs_main.util.connection import ConnectionPool, create_simple_minijob, create_simple_miniremote
29from bzfs_main.util.retry import Retry, RetryPolicy, call_with_retries
31log = logging.getLogger(__name__)
32remote = create_simple_miniremote(log=log, ssh_user_host="alice@127.0.0.1")
33connection_pool = ConnectionPool(remote, connpool_name="example")
34try:
35 job = create_simple_minijob()
36 retry_policy = RetryPolicy(
37 max_retries=10,
38 min_sleep_secs=0,
39 initial_max_sleep_secs=0.125,
40 max_sleep_secs=10,
41 max_elapsed_secs=60,
42 )
44 def run_cmd(retry: Retry) -> str:
45 with connection_pool.connection() as conn:
46 stdout: str = conn.run_ssh_command(
47 cmd=["echo", "hello"], job=job, check=True, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True
48 ).stdout
49 return stdout
51 stdout = call_with_retries(fn=run_cmd, policy=retry_policy, log=log)
52 print(f"stdout: {stdout}")
53finally:
54 connection_pool.shutdown()
55"""
57from __future__ import (
58 annotations,
59)
60import contextlib
61import copy
62import logging
63import os
64import shlex
65import subprocess
66import threading
67import time
68from collections.abc import (
69 Iterator,
70)
71from dataclasses import (
72 dataclass,
73)
74from subprocess import (
75 DEVNULL,
76 PIPE,
77)
78from typing import (
79 Any,
80 Final,
81 Protocol,
82 final,
83 runtime_checkable,
84)
86from bzfs_main.util.connection_lease import (
87 ConnectionLease,
88 ConnectionLeaseManager,
89)
90from bzfs_main.util.retry import (
91 RetryableError,
92)
93from bzfs_main.util.utils import (
94 LOG_TRACE,
95 SmallPriorityQueue,
96 Subprocesses,
97 die,
98 get_home_directory,
99 list_formatter,
100 sha256_urlsafe_base64,
101 stderr_to_str,
102)
104# constants:
105SHARED: Final[str] = "shared"
106DEDICATED: Final[str] = "dedicated"
109#############################################################################
110@runtime_checkable
111class MiniJob(Protocol):
112 """Minimal Job interface required by the connections module; for loose coupling."""
114 timeout_nanos: int | None # timestamp aka instant in time
115 timeout_duration_nanos: int | None # duration (not a timestamp); for logging only
116 subprocesses: Subprocesses
119#############################################################################
120@runtime_checkable
121class MiniParams(Protocol):
122 """Minimal Params interface used by the connections module; for loose coupling."""
124 log: logging.Logger
125 ssh_program: str # name or path of executable; "hpnssh" is also valid
128#############################################################################
129@runtime_checkable
130class MiniRemote(Protocol):
131 """Minimal Remote interface used by the connections module; for loose coupling."""
133 params: MiniParams
134 location: str # "src" or "dst"
135 ssh_user_host: str # use the empty string to indicate local mode (no ssh)
136 ssh_extra_opts: tuple[str, ...]
137 reuse_ssh_connection: bool
138 ssh_control_persist_secs: int
139 ssh_control_persist_margin_secs: int
140 ssh_exit_on_shutdown: bool
141 ssh_socket_dir: str
143 def is_ssh_available(self) -> bool:
144 """Return True if the ssh client program required for this remote is available on the local host."""
146 def local_ssh_command(self, socket_file: str | None) -> list[str]:
147 """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing)
148 command to run on the remote host, which will be appended later."""
150 def cache_namespace(self) -> str:
151 """Returns cache namespace string which is a stable, unique directory component for caches that distinguishes
152 endpoints by username+host+port+ssh_config_file where applicable, and uses '-' when no user/host is present (local
153 mode)."""
156#############################################################################
157def create_simple_miniremote(
158 log: logging.Logger,
159 ssh_user_host: str = "", # option passed to `ssh` CLI; empty string indicates local mode
160 ssh_port: int | None = None, # option passed to `ssh` CLI
161 ssh_extra_opts: list[str] | None = None, # optional args passed to `ssh` CLI
162 ssh_verbose: bool = False, # option passed to `ssh` CLI
163 ssh_config_file: str = "", # option passed to `ssh` CLI; example: /path/to/homedir/.ssh/config
164 ssh_cipher: str = "^aes256-gcm@openssh.com", # option passed to `ssh` CLI
165 ssh_program: str = "ssh", # name or path of CLI executable; "hpnssh" is also valid
166 reuse_ssh_connection: bool = True,
167 ssh_control_persist_secs: int = 90,
168 ssh_control_persist_margin_secs: int = 2,
169 ssh_socket_dir: str = os.path.join(get_home_directory(), ".ssh", "bzfs"),
170 location: str = "dst",
171) -> MiniRemote:
172 """Factory that returns a simple implementation of the MiniRemote interface."""
174 @dataclass(frozen=True) # aka immutable
175 @final
176 class SimpleMiniParams(MiniParams):
177 log: logging.Logger
178 ssh_program: str
180 @dataclass(frozen=True) # aka immutable
181 @final
182 class SimpleMiniRemote(MiniRemote):
183 params: MiniParams
184 location: str # "src" or "dst"
185 ssh_user_host: str
186 ssh_extra_opts: tuple[str, ...]
187 reuse_ssh_connection: bool
188 ssh_control_persist_secs: int
189 ssh_control_persist_margin_secs: int
190 ssh_exit_on_shutdown: bool
191 ssh_socket_dir: str
192 ssh_port: int | None
193 ssh_config_file: str
194 ssh_config_file_hash: str
196 def is_ssh_available(self) -> bool:
197 return True
199 def local_ssh_command(self, socket_file: str | None) -> list[str]:
200 if not self.ssh_user_host:
201 return [] # local mode
202 ssh_cmd: list[str] = [self.params.ssh_program]
203 ssh_cmd.extend(self.ssh_extra_opts)
204 if self.reuse_ssh_connection and socket_file:
205 ssh_cmd.append("-S")
206 ssh_cmd.append(socket_file)
207 ssh_cmd.append(self.ssh_user_host)
208 return ssh_cmd
210 def cache_namespace(self) -> str:
211 if not self.ssh_user_host:
212 return "-" # local mode
213 return f"{self.ssh_user_host}#{self.ssh_port or ''}#{self.ssh_config_file_hash}"
215 if log is None:
216 raise ValueError("log must not be None")
217 if not ssh_program:
218 raise ValueError("ssh_program must be a non-empty string")
219 if location not in ("src", "dst"):
220 raise ValueError("location must be 'src' or 'dst'")
221 if ssh_control_persist_secs < 1:
222 raise ValueError("ssh_control_persist_secs must be >= 1")
223 params: MiniParams = SimpleMiniParams(log=log, ssh_program=ssh_program)
225 ssh_extra_opts = ( # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation
226 ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"] if ssh_extra_opts is None else list(ssh_extra_opts)
227 )
228 ssh_extra_opts += ["-v"] if ssh_verbose else []
229 ssh_extra_opts += ["-F", ssh_config_file] if ssh_config_file else []
230 ssh_extra_opts += ["-c", ssh_cipher] if ssh_cipher else []
231 ssh_extra_opts += ["-p", str(ssh_port)] if ssh_port is not None else []
232 ssh_config_file_hash = sha256_urlsafe_base64(os.path.abspath(ssh_config_file), padding=False) if ssh_config_file else ""
233 return SimpleMiniRemote(
234 params=params,
235 location=location,
236 ssh_user_host=ssh_user_host,
237 ssh_extra_opts=tuple(ssh_extra_opts),
238 reuse_ssh_connection=reuse_ssh_connection,
239 ssh_control_persist_secs=ssh_control_persist_secs,
240 ssh_control_persist_margin_secs=ssh_control_persist_margin_secs,
241 ssh_exit_on_shutdown=False,
242 ssh_socket_dir=ssh_socket_dir,
243 ssh_port=ssh_port,
244 ssh_config_file=ssh_config_file,
245 ssh_config_file_hash=ssh_config_file_hash,
246 )
249def create_simple_minijob(timeout_duration_secs: float | None = None, subprocesses: Subprocesses | None = None) -> MiniJob:
250 """Factory that returns a simple implementation of the MiniJob interface."""
252 @dataclass(frozen=True) # aka immutable
253 @final
254 class SimpleMiniJob(MiniJob):
255 timeout_nanos: int | None # timestamp aka instant in time
256 timeout_duration_nanos: int | None # duration (not a timestamp); for logging only
257 subprocesses: Subprocesses
259 t_duration_nanos: int | None = None if timeout_duration_secs is None else int(timeout_duration_secs * 1_000_000_000)
260 timeout_nanos: int | None = None if t_duration_nanos is None else time.monotonic_ns() + t_duration_nanos
261 subprocesses = Subprocesses() if subprocesses is None else subprocesses
262 return SimpleMiniJob(timeout_nanos=timeout_nanos, timeout_duration_nanos=t_duration_nanos, subprocesses=subprocesses)
265#############################################################################
266def timeout(job: MiniJob) -> float | None:
267 """Raises TimeoutExpired if necessary, else returns the number of seconds left until timeout is to occur."""
268 timeout_nanos: int | None = job.timeout_nanos
269 if timeout_nanos is None:
270 return None # never raise a timeout
271 assert job.timeout_duration_nanos is not None
272 delta_nanos: int = timeout_nanos - time.monotonic_ns()
273 if delta_nanos <= 0:
274 raise subprocess.TimeoutExpired("_timeout", timeout=job.timeout_duration_nanos / 1_000_000_000)
275 return delta_nanos / 1_000_000_000 # seconds
278def squote(remote: MiniRemote, arg: str) -> str:
279 """Quotes an argument only when running remotely over ssh."""
280 assert arg is not None
281 return shlex.quote(arg) if remote.ssh_user_host else arg
284def dquote(arg: str) -> str:
285 """Shell-escapes backslash and double quotes and dollar and backticks, then surrounds with double quotes; For an example
286 how to safely construct and quote complex shell pipeline commands for use over SSH, see
287 replication.py:_prepare_zfs_send_receive()"""
288 arg = arg.replace("\\", "\\\\").replace('"', '\\"').replace("$", "\\$").replace("`", "\\`")
289 return '"' + arg + '"'
292#############################################################################
293@dataclass(order=True, repr=False)
294@final
295class Connection:
296 """Represents the ability to multiplex N=capacity concurrent SSH sessions over the same TCP connection."""
298 _free: int # sort order evens out the number of concurrent sessions among the TCP connections
299 _last_modified: int # LIFO: tiebreaker favors latest returned conn as that's most alive and hot; also ensures no dupes
301 def __init__(
302 self,
303 remote: MiniRemote,
304 max_concurrent_ssh_sessions_per_tcp_connection: int,
305 lease: ConnectionLease | None = None,
306 ) -> None:
307 assert max_concurrent_ssh_sessions_per_tcp_connection > 0
308 self._remote: Final[MiniRemote] = remote
309 self._capacity: Final[int] = max_concurrent_ssh_sessions_per_tcp_connection
310 self._free: int = max_concurrent_ssh_sessions_per_tcp_connection
311 self._last_modified: int = 0 # monotonically increasing
312 self._last_refresh_time: int = 0
313 self._lock: Final[threading.Lock] = threading.Lock()
314 self._reuse_ssh_connection: Final[bool] = remote.reuse_ssh_connection
315 self._connection_lease: Final[ConnectionLease | None] = lease
316 self._ssh_cmd: Final[list[str]] = remote.local_ssh_command(
317 None if self._connection_lease is None else self._connection_lease.socket_path
318 )
319 self._ssh_cmd_quoted: Final[list[str]] = [shlex.quote(item) for item in self._ssh_cmd]
321 @property
322 def ssh_cmd(self) -> list[str]:
323 return self._ssh_cmd.copy()
325 @property
326 def ssh_cmd_quoted(self) -> list[str]:
327 return self._ssh_cmd_quoted.copy()
329 def __repr__(self) -> str:
330 return str({"free": self._free})
332 def run_ssh_command(
333 self,
334 cmd: list[str],
335 job: MiniJob,
336 loglevel: int = logging.INFO,
337 is_dry: bool = False,
338 **kwargs: Any, # optional low-level keyword args to be forwarded to subprocess.run()
339 ) -> subprocess.CompletedProcess:
340 """Runs the given CLI cmd via ssh on the given remote, and returns CompletedProcess including stdout and stderr.
342 The full command is the concatenation of both the command to run on the localhost in order to talk to the remote host
343 ($remote.local_ssh_command()) and the command to run on the given remote host ($cmd).
345 Note: When executing on a remote host (remote.ssh_user_host is set), cmd arguments are pre-quoted with shlex.quote to
346 safely traverse the ssh "remote shell" boundary, as ssh concatenates argv into a single remote shell string. In local
347 mode (no remote.ssh_user_host) argv is executed directly without an intermediate shell.
348 """
349 if not cmd:
350 raise ValueError("run_ssh_command requires a non-empty cmd list")
351 log: logging.Logger = self._remote.params.log
352 quoted_cmd: list[str] = [shlex.quote(arg) for arg in cmd]
353 ssh_cmd: list[str] = self._ssh_cmd
354 if self._remote.ssh_user_host:
355 self.refresh_ssh_connection_if_necessary(job)
356 cmd = quoted_cmd
357 msg: str = "Would execute: %s" if is_dry else "Executing: %s"
358 log.log(loglevel, msg, list_formatter(self._ssh_cmd_quoted + quoted_cmd, lstrip=True))
359 if is_dry:
360 return subprocess.CompletedProcess(ssh_cmd + cmd, returncode=0, stdout=None, stderr=None)
361 else:
362 sp: Subprocesses = job.subprocesses
363 return sp.subprocess_run(ssh_cmd + cmd, timeout=timeout(job), log=log, **kwargs)
365 def refresh_ssh_connection_if_necessary(self, job: MiniJob) -> None:
366 """Maintain or create an ssh master connection for low latency reuse."""
367 remote: MiniRemote = self._remote
368 p: MiniParams = remote.params
369 log: logging.Logger = p.log
370 if not remote.ssh_user_host:
371 return # we're in local mode; no ssh required
372 if not remote.is_ssh_available():
373 die(f"{p.ssh_program} CLI is not available to talk to remote host. Install {p.ssh_program} first!")
374 if not remote.reuse_ssh_connection:
375 return
377 # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and
378 # 'ssh -S -M -oControlPersist=90s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
379 # and https://chessman7.substack.com/p/how-ssh-multiplexing-reuses-master
380 control_limit_nanos: int = (remote.ssh_control_persist_secs - remote.ssh_control_persist_margin_secs) * 1_000_000_000
381 with self._lock:
382 if time.monotonic_ns() < self._last_refresh_time + control_limit_nanos:
383 return # ssh master is alive, reuse its TCP connection (this is the common case and the ultra-fast path)
384 ssh_cmd: list[str] = self._ssh_cmd
385 ssh_sock_cmd: list[str] = ssh_cmd[0:-1] # omit trailing ssh_user_host
386 ssh_sock_cmd += ["-O", "check", remote.ssh_user_host]
387 # extend lifetime of ssh master by $ssh_control_persist_secs via `ssh -O check` if master is still running.
388 # `ssh -S /path/to/socket -O check` doesn't talk over the network, hence is still a low latency fast path.
389 sp: Subprocesses = job.subprocesses
390 t: float | None = timeout(job)
391 if sp.subprocess_run(ssh_sock_cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, timeout=t, log=log).returncode == 0:
392 log.log(LOG_TRACE, "ssh connection is alive: %s", list_formatter(ssh_sock_cmd))
393 else: # ssh master is not alive; start a new master:
394 log.log(LOG_TRACE, "ssh connection is not yet alive: %s", list_formatter(ssh_sock_cmd))
395 ssh_control_persist_secs: int = max(1, remote.ssh_control_persist_secs)
396 if "-v" in remote.ssh_extra_opts:
397 # Unfortunately, with `ssh -v` (debug mode), the ssh master won't background; instead it stays in the
398 # foreground and blocks until the ControlPersist timer expires (90 secs). To make progress earlier we ...
399 ssh_control_persist_secs = min(1, ssh_control_persist_secs) # tell ssh block as briefly as possible (1s)
400 ssh_sock_cmd = ssh_cmd[0:-1] # omit trailing ssh_user_host
401 ssh_sock_cmd += ["-M", f"-oControlPersist={ssh_control_persist_secs}s", remote.ssh_user_host, "exit"]
402 log.log(LOG_TRACE, "Executing: %s", list_formatter(ssh_sock_cmd))
403 t = timeout(job)
404 try:
405 sp.subprocess_run(ssh_sock_cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, check=True, timeout=t, log=log)
406 except subprocess.CalledProcessError as e:
407 log.error("%s", stderr_to_str(e.stderr).rstrip())
408 raise RetryableError(
409 f"Cannot ssh into remote host via '{' '.join(ssh_sock_cmd)}'. Fix ssh configuration first, "
410 "considering diagnostic log file output from running with -v -v -v.",
411 display_msg="ssh connect",
412 ) from e
413 self._last_refresh_time = time.monotonic_ns()
414 if self._connection_lease is not None:
415 self._connection_lease.set_socket_mtime_to_now()
417 def _increment_free(self, value: int) -> None:
418 """Adjusts the count of available SSH slots."""
419 self._free += value
420 assert self._free >= 0
421 assert self._free <= self._capacity
423 def _is_full(self) -> bool:
424 """Returns True if no more SSH sessions may be opened over this TCP connection."""
425 return self._free <= 0
427 def _update_last_modified(self, last_modified: int) -> None:
428 """Records when the connection was last used."""
429 self._last_modified = last_modified
431 def shutdown(self, msg_prefix: str) -> None:
432 """Closes the underlying SSH master connection and releases the corresponding connection lease."""
433 ssh_cmd: list[str] = self._ssh_cmd
434 if ssh_cmd and self._reuse_ssh_connection:
435 if self._connection_lease is None:
436 ssh_sock_cmd: list[str] = ssh_cmd[0:-1] + ["-O", "exit", ssh_cmd[-1]]
437 log = self._remote.params.log
438 log.log(LOG_TRACE, f"Executing {msg_prefix}: %s", shlex.join(ssh_sock_cmd))
439 try:
440 proc: subprocess.CompletedProcess = subprocess.run(
441 ssh_sock_cmd, stdin=DEVNULL, stderr=PIPE, text=True, timeout=0.1
442 )
443 except subprocess.TimeoutExpired as e: # harmless as master auto-exits after ssh_control_persist_secs anyway
444 log.log(LOG_TRACE, "Harmless ssh master connection shutdown timeout: %s", e)
445 else:
446 if proc.returncode != 0: # harmless for the same reason
447 log.log(LOG_TRACE, "Harmless ssh master connection shutdown issue: %s", proc.stderr.rstrip())
448 else:
449 self._connection_lease.release()
452#############################################################################
453class ConnectionPool:
454 """Fetch a TCP connection for use in an SSH session, use it, finally return it back to the pool for future reuse;
455 Note that max_concurrent_ssh_sessions_per_tcp_connection must not be larger than the server-side sshd_config(5)
456 MaxSessions parameter (which defaults to 10, see https://manpages.ubuntu.com/manpages/man5/sshd_config.5.html)."""
458 def __init__(
459 self, remote: MiniRemote, connpool_name: str, max_concurrent_ssh_sessions_per_tcp_connection: int = 8
460 ) -> None:
461 assert max_concurrent_ssh_sessions_per_tcp_connection > 0
462 self._remote: Final[MiniRemote] = copy.copy(remote) # shallow copy for immutability (Remote is mutable)
463 self._capacity: Final[int] = max_concurrent_ssh_sessions_per_tcp_connection
464 self._connpool_name: Final[str] = connpool_name
465 self._priority_queue: Final[SmallPriorityQueue[Connection]] = SmallPriorityQueue(
466 reverse=True # sorted by #free slots and last_modified
467 )
468 self._last_modified: int = 0 # monotonically increasing sequence number
469 self._lock: Final[threading.Lock] = threading.Lock()
470 lease_mgr: ConnectionLeaseManager | None = None
471 if self._remote.ssh_user_host and self._remote.reuse_ssh_connection and not self._remote.ssh_exit_on_shutdown:
472 lease_mgr = ConnectionLeaseManager(
473 root_dir=self._remote.ssh_socket_dir,
474 namespace=f"{self._remote.location}#{self._remote.cache_namespace()}#{self._connpool_name}",
475 ssh_control_persist_secs=max(90 * 60, 2 * self._remote.ssh_control_persist_secs + 2),
476 log=self._remote.params.log,
477 )
478 self._lease_mgr: Final[ConnectionLeaseManager | None] = lease_mgr
480 @contextlib.contextmanager
481 def connection(self) -> Iterator[Connection]:
482 """Context manager that yields a connection from the pool and automatically returns it on __exit__."""
483 conn: Connection = self.get_connection()
484 try:
485 yield conn
486 finally:
487 self.return_connection(conn)
489 def get_connection(self) -> Connection:
490 """Any Connection object returned on get_connection() also remains intentionally contained in the priority queue
491 while it is "checked out", and that identical Connection object is later, on return_connection(), temporarily removed
492 from the priority queue, updated with an incremented "free" slot count and then immediately reinserted into the
493 priority queue.
495 In effect, any Connection object remains intentionally contained in the priority queue at all times. This design
496 keeps ordering/fairness accurate while avoiding duplicate Connection instances.
497 """
498 with self._lock:
499 conn = self._priority_queue.pop() if len(self._priority_queue) > 0 else None
500 if conn is None or conn._is_full(): # noqa: SLF001 # pylint: disable=protected-access
501 if conn is not None:
502 self._priority_queue.push(conn)
503 conn = self._new_connection() # add a new connection
504 self._last_modified += 1
505 conn._update_last_modified(self._last_modified) # noqa: SLF001 # pylint: disable=protected-access
506 conn._increment_free(-1) # noqa: SLF001 # pylint: disable=protected-access
507 self._priority_queue.push(conn)
508 return conn
510 def _new_connection(self) -> Connection:
511 lease: ConnectionLease | None = None if self._lease_mgr is None else self._lease_mgr.acquire()
512 return Connection(self._remote, self._capacity, lease=lease)
514 def return_connection(self, conn: Connection) -> None:
515 """Returns the given connection to the pool and updates its priority."""
516 assert conn is not None
517 with self._lock:
518 # update priority = remove conn from queue, increment priority, finally reinsert updated conn into queue
519 if self._priority_queue.remove(conn): # conn is not contained only if ConnectionPool.shutdown() was called
520 conn._increment_free(1) # noqa: SLF001 # pylint: disable=protected-access
521 self._last_modified += 1
522 conn._update_last_modified(self._last_modified) # noqa: SLF001 # pylint: disable=protected-access
523 self._priority_queue.push(conn)
525 def shutdown(self, msg_prefix: str = "") -> None:
526 """Closes all SSH connections managed by this pool."""
527 with self._lock:
528 try:
529 if self._remote.reuse_ssh_connection:
530 msg_prefix = msg_prefix + "/" + self._connpool_name
531 for conn in self._priority_queue:
532 conn.shutdown(msg_prefix)
533 finally:
534 self._priority_queue.clear()
536 def __repr__(self) -> str:
537 with self._lock:
538 queue = self._priority_queue
539 return str({"capacity": self._capacity, "queue_len": len(queue), "queue": queue})
542#############################################################################
543@final
544class ConnectionPools:
545 """A bunch of named connection pools with various multiplexing capacities."""
547 def __init__(self, remote: MiniRemote, capacities: dict[str, int]) -> None:
548 """Creates one connection pool per name with the given capacities."""
549 self._pools: Final[dict[str, ConnectionPool]] = {
550 name: ConnectionPool(remote, name, capacity) for name, capacity in capacities.items()
551 }
553 def __repr__(self) -> str:
554 return str(self._pools)
556 def pool(self, name: str) -> ConnectionPool:
557 """Returns the pool associated with the given name."""
558 return self._pools[name]
560 def shutdown(self, msg_prefix: str = "") -> None:
561 """Shuts down every contained pool."""
562 for pool in self._pools.values():
563 pool.shutdown(msg_prefix)