Coverage for bzfs_main/util/connection.py: 99%

284 statements  

coverage.py v7.13.0, created at 2025-12-22 08:03 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Efficient thread-safe SSH command client; See run_ssh_command() and refresh_ssh_connection_if_necessary() and class 

16ConnectionPool and class ConnectionLease. 

17 

18Can be configured to reuse multiplexed SSH connections for low latency, even on fresh process startup; for example, this

19leads to a ballpark 3-5ms total time for running `/bin/echo hello` end-to-end over SSH on a LAN, which requires two

20sequential network round trips (one for CHANNEL_OPEN, plus a subsequent one for CHANNEL_REQUEST).

21Has zero dependencies beyond the standard OpenSSH client CLI (`ssh`); also works with `hpnssh`, which uses larger TCP

22window sizes for best throughput over high-speed, long-distance networks, i.e. paths with a large bandwidth-delay product.

23 

24Example usage: 

25 

26import logging 

27from subprocess import DEVNULL, PIPE 

28from bzfs_main.util.connection import ConnectionPool, create_simple_minijob, create_simple_miniremote 

29from bzfs_main.util.retry import Retry, RetryPolicy, call_with_retries 

30 

31log = logging.getLogger(__name__) 

32remote = create_simple_miniremote(log=log, ssh_user_host="alice@127.0.0.1") 

33connection_pool = ConnectionPool(remote, connpool_name="example") 

34try: 

35 job = create_simple_minijob() 

36 retry_policy = RetryPolicy( 

37 max_retries=10, 

38 min_sleep_secs=0, 

39 initial_max_sleep_secs=0.125, 

40 max_sleep_secs=10, 

41 max_elapsed_secs=60, 

42 ) 

43 

44 def run_cmd(retry: Retry) -> str: 

45 with connection_pool.connection() as conn: 

46 stdout: str = conn.run_ssh_command( 

47 cmd=["echo", "hello"], job=job, check=True, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True 

48 ).stdout 

49 return stdout 

50 

51 stdout = call_with_retries(fn=run_cmd, policy=retry_policy, log=log) 

52 print(f"stdout: {stdout}") 

53finally: 

54 connection_pool.shutdown() 

55""" 

56 

57from __future__ import ( 

58 annotations, 

59) 

60import contextlib 

61import copy 

62import logging 

63import os 

64import shlex 

65import subprocess 

66import threading 

67import time 

68from collections.abc import ( 

69 Iterator, 

70) 

71from dataclasses import ( 

72 dataclass, 

73) 

74from subprocess import ( 

75 DEVNULL, 

76 PIPE, 

77) 

78from typing import ( 

79 Any, 

80 Final, 

81 Protocol, 

82 final, 

83 runtime_checkable, 

84) 

85 

86from bzfs_main.util.connection_lease import ( 

87 ConnectionLease, 

88 ConnectionLeaseManager, 

89) 

90from bzfs_main.util.retry import ( 

91 RetryableError, 

92) 

93from bzfs_main.util.utils import ( 

94 LOG_TRACE, 

95 SmallPriorityQueue, 

96 Subprocesses, 

97 die, 

98 get_home_directory, 

99 list_formatter, 

100 sha256_urlsafe_base64, 

101 stderr_to_str, 

102) 

103 

104# constants: 

105SHARED: Final[str] = "shared" 

106DEDICATED: Final[str] = "dedicated" 

107 

108 

109############################################################################# 

110@runtime_checkable 

111class MiniJob(Protocol): 

112 """Minimal Job interface required by the connections module; for loose coupling.""" 

113 

114 timeout_nanos: int | None # timestamp aka instant in time 

115 timeout_duration_nanos: int | None # duration (not a timestamp); for logging only 

116 subprocesses: Subprocesses 

117 

118 

119############################################################################# 

120@runtime_checkable 

121class MiniParams(Protocol): 

122 """Minimal Params interface used by the connections module; for loose coupling.""" 

123 

124 log: logging.Logger 

125 ssh_program: str # name or path of executable; "hpnssh" is also valid 

126 

127 

128############################################################################# 

129@runtime_checkable 

130class MiniRemote(Protocol): 

131 """Minimal Remote interface used by the connections module; for loose coupling.""" 

132 

133 params: MiniParams 

134 location: str # "src" or "dst" 

135 ssh_user_host: str # use the empty string to indicate local mode (no ssh) 

136 ssh_extra_opts: tuple[str, ...] 

137 reuse_ssh_connection: bool 

138 ssh_control_persist_secs: int 

139 ssh_control_persist_margin_secs: int 

140 ssh_exit_on_shutdown: bool 

141 ssh_socket_dir: str 

142 

143 def is_ssh_available(self) -> bool: 

144 """Return True if the ssh client program required for this remote is available on the local host.""" 

145 

146 def local_ssh_command(self, socket_file: str | None) -> list[str]: 

147 """Returns the ssh CLI command to run locally in order to talk to the remote host; This excludes the (trailing) 

148 command to run on the remote host, which will be appended later.""" 

149 

150 def cache_namespace(self) -> str: 

151 """Returns cache namespace string which is a stable, unique directory component for caches that distinguishes 

152 endpoints by username+host+port+ssh_config_file where applicable, and uses '-' when no user/host is present (local 

153 mode).""" 

154 

155 

156############################################################################# 

157def create_simple_miniremote( 

158 log: logging.Logger, 

159 ssh_user_host: str = "", # option passed to `ssh` CLI; empty string indicates local mode 

160 ssh_port: int | None = None, # option passed to `ssh` CLI 

161 ssh_extra_opts: list[str] | None = None, # optional args passed to `ssh` CLI 

162 ssh_verbose: bool = False, # option passed to `ssh` CLI 

163 ssh_config_file: str = "", # option passed to `ssh` CLI; example: /path/to/homedir/.ssh/config 

164 ssh_cipher: str = "^aes256-gcm@openssh.com", # option passed to `ssh` CLI 

165 ssh_program: str = "ssh", # name or path of CLI executable; "hpnssh" is also valid 

166 reuse_ssh_connection: bool = True, 

167 ssh_control_persist_secs: int = 90, 

168 ssh_control_persist_margin_secs: int = 2, 

169 ssh_socket_dir: str = os.path.join(get_home_directory(), ".ssh", "bzfs"), 

170 location: str = "dst", 

171) -> MiniRemote: 

172 """Factory that returns a simple implementation of the MiniRemote interface.""" 

173 

174 @dataclass(frozen=True) # aka immutable 

175 @final 

176 class SimpleMiniParams(MiniParams): 

177 log: logging.Logger 

178 ssh_program: str 

179 

180 @dataclass(frozen=True) # aka immutable 

181 @final 

182 class SimpleMiniRemote(MiniRemote): 

183 params: MiniParams 

184 location: str # "src" or "dst" 

185 ssh_user_host: str 

186 ssh_extra_opts: tuple[str, ...] 

187 reuse_ssh_connection: bool 

188 ssh_control_persist_secs: int 

189 ssh_control_persist_margin_secs: int 

190 ssh_exit_on_shutdown: bool 

191 ssh_socket_dir: str 

192 ssh_port: int | None 

193 ssh_config_file: str 

194 ssh_config_file_hash: str 

195 

196 def is_ssh_available(self) -> bool: 

197 return True 

198 

199 def local_ssh_command(self, socket_file: str | None) -> list[str]: 

200 if not self.ssh_user_host: 

201 return [] # local mode 

202 ssh_cmd: list[str] = [self.params.ssh_program] 

203 ssh_cmd.extend(self.ssh_extra_opts) 

204 if self.reuse_ssh_connection and socket_file: 

205 ssh_cmd.append("-S") 

206 ssh_cmd.append(socket_file) 

207 ssh_cmd.append(self.ssh_user_host) 

208 return ssh_cmd 

209 

210 def cache_namespace(self) -> str: 

211 if not self.ssh_user_host: 

212 return "-" # local mode 

213 return f"{self.ssh_user_host}#{self.ssh_port or ''}#{self.ssh_config_file_hash}" 

214 

215 if log is None: 

216 raise ValueError("log must not be None") 

217 if not ssh_program: 

218 raise ValueError("ssh_program must be a non-empty string") 

219 if location not in ("src", "dst"): 

220 raise ValueError("location must be 'src' or 'dst'") 

221 if ssh_control_persist_secs < 1: 

222 raise ValueError("ssh_control_persist_secs must be >= 1") 

223 params: MiniParams = SimpleMiniParams(log=log, ssh_program=ssh_program) 

224 

225 ssh_extra_opts = ( # disable interactive password prompts and X11 forwarding and pseudo-terminal allocation 

226 ["-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T"] if ssh_extra_opts is None else list(ssh_extra_opts) 

227 ) 

228 ssh_extra_opts += ["-v"] if ssh_verbose else [] 

229 ssh_extra_opts += ["-F", ssh_config_file] if ssh_config_file else [] 

230 ssh_extra_opts += ["-c", ssh_cipher] if ssh_cipher else [] 

231 ssh_extra_opts += ["-p", str(ssh_port)] if ssh_port is not None else [] 

232 ssh_config_file_hash = sha256_urlsafe_base64(os.path.abspath(ssh_config_file), padding=False) if ssh_config_file else "" 

233 return SimpleMiniRemote( 

234 params=params, 

235 location=location, 

236 ssh_user_host=ssh_user_host, 

237 ssh_extra_opts=tuple(ssh_extra_opts), 

238 reuse_ssh_connection=reuse_ssh_connection, 

239 ssh_control_persist_secs=ssh_control_persist_secs, 

240 ssh_control_persist_margin_secs=ssh_control_persist_margin_secs, 

241 ssh_exit_on_shutdown=False, 

242 ssh_socket_dir=ssh_socket_dir, 

243 ssh_port=ssh_port, 

244 ssh_config_file=ssh_config_file, 

245 ssh_config_file_hash=ssh_config_file_hash, 

246 ) 

247 

248 

249def create_simple_minijob(timeout_duration_secs: float | None = None, subprocesses: Subprocesses | None = None) -> MiniJob: 

250 """Factory that returns a simple implementation of the MiniJob interface.""" 

251 

252 @dataclass(frozen=True) # aka immutable 

253 @final 

254 class SimpleMiniJob(MiniJob): 

255 timeout_nanos: int | None # timestamp aka instant in time 

256 timeout_duration_nanos: int | None # duration (not a timestamp); for logging only 

257 subprocesses: Subprocesses 

258 

259 t_duration_nanos: int | None = None if timeout_duration_secs is None else int(timeout_duration_secs * 1_000_000_000) 

260 timeout_nanos: int | None = None if t_duration_nanos is None else time.monotonic_ns() + t_duration_nanos 

261 subprocesses = Subprocesses() if subprocesses is None else subprocesses 

262 return SimpleMiniJob(timeout_nanos=timeout_nanos, timeout_duration_nanos=t_duration_nanos, subprocesses=subprocesses) 

263 
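# Illustrative sketch (not part of connection.py): create_simple_minijob() converts an optional wall-clock budget in
# seconds into an absolute monotonic deadline in nanoseconds, while None means "never time out". The 60-second budget
# below is an arbitrary example value:
#
#   job_unbounded = create_simple_minijob()                        # timeout_nanos is None; commands never time out
#   job_bounded = create_simple_minijob(timeout_duration_secs=60)  # deadline = time.monotonic_ns() + 60 * 10**9
#   assert job_unbounded.timeout_nanos is None
#   assert job_bounded.timeout_duration_nanos == 60 * 1_000_000_000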

264 

265############################################################################# 

266def timeout(job: MiniJob) -> float | None: 

267 """Raises TimeoutExpired if necessary, else returns the number of seconds left until timeout is to occur.""" 

268 timeout_nanos: int | None = job.timeout_nanos 

269 if timeout_nanos is None: 

270 return None # never raise a timeout 

271 assert job.timeout_duration_nanos is not None 

272 delta_nanos: int = timeout_nanos - time.monotonic_ns() 

273 if delta_nanos <= 0: 

274 raise subprocess.TimeoutExpired("_timeout", timeout=job.timeout_duration_nanos / 1_000_000_000) 

275 return delta_nanos / 1_000_000_000 # seconds 

276 
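# Illustrative sketch (not part of connection.py): timeout() is called right before each subprocess invocation (as
# run_ssh_command() below does), so the remaining budget shrinks across successive calls that share the same job; once
# the deadline has passed it raises subprocess.TimeoutExpired instead of returning. Hypothetical usage:
#
#   job = create_simple_minijob(timeout_duration_secs=30)
#   remaining: float | None = timeout(job)  # roughly 30.0 on the first call, less on later calls; None if unbounded
#   subprocess.run(["true"], timeout=remaining, check=True)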

277 

278def squote(remote: MiniRemote, arg: str) -> str: 

279 """Quotes an argument only when running remotely over ssh.""" 

280 assert arg is not None 

281 return shlex.quote(arg) if remote.ssh_user_host else arg 

282 

283 

284def dquote(arg: str) -> str: 

285 """Shell-escapes backslash and double quotes and dollar and backticks, then surrounds with double quotes; For an example 

286 how to safely construct and quote complex shell pipeline commands for use over SSH, see 

287 replication.py:_prepare_zfs_send_receive()""" 

288 arg = arg.replace("\\", "\\\\").replace('"', '\\"').replace("$", "\\$").replace("`", "\\`") 

289 return '"' + arg + '"' 

290 
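# Illustrative sketch (not part of connection.py): dquote() protects an argument that must survive an extra level of
# remote-shell interpretation, e.g. when embedding it inside a larger command string sent over ssh. The dataset name
# and the surrounding `zfs list` command below are made-up examples, not taken from replication.py:
#
#   inner = dquote("tank/data with $pecial`chars")  # -> "tank/data with \$pecial\`chars"
#   remote_cmd = f"zfs list {inner}"                # safe to embed in a shell command line executed on the remote host
#   assert dquote('say "hi"') == '"say \\"hi\\""'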

291 

292############################################################################# 

293@dataclass(order=True, repr=False) 

294@final 

295class Connection: 

296 """Represents the ability to multiplex N=capacity concurrent SSH sessions over the same TCP connection.""" 

297 

298 _free: int # sort order evens out the number of concurrent sessions among the TCP connections 

299 _last_modified: int # LIFO: tiebreaker favors latest returned conn as that's most alive and hot; also ensures no dupes 

300 

301 def __init__( 

302 self, 

303 remote: MiniRemote, 

304 max_concurrent_ssh_sessions_per_tcp_connection: int, 

305 lease: ConnectionLease | None = None, 

306 ) -> None: 

307 assert max_concurrent_ssh_sessions_per_tcp_connection > 0 

308 self._remote: Final[MiniRemote] = remote 

309 self._capacity: Final[int] = max_concurrent_ssh_sessions_per_tcp_connection 

310 self._free: int = max_concurrent_ssh_sessions_per_tcp_connection 

311 self._last_modified: int = 0 # monotonically increasing 

312 self._last_refresh_time: int = 0 

313 self._lock: Final[threading.Lock] = threading.Lock() 

314 self._reuse_ssh_connection: Final[bool] = remote.reuse_ssh_connection 

315 self._connection_lease: Final[ConnectionLease | None] = lease 

316 self._ssh_cmd: Final[list[str]] = remote.local_ssh_command( 

317 None if self._connection_lease is None else self._connection_lease.socket_path 

318 ) 

319 self._ssh_cmd_quoted: Final[list[str]] = [shlex.quote(item) for item in self._ssh_cmd] 

320 

321 @property 

322 def ssh_cmd(self) -> list[str]: 

323 return self._ssh_cmd.copy() 

324 

325 @property 

326 def ssh_cmd_quoted(self) -> list[str]: 

327 return self._ssh_cmd_quoted.copy() 

328 

329 def __repr__(self) -> str: 

330 return str({"free": self._free}) 

331 

332 def run_ssh_command( 

333 self, 

334 cmd: list[str], 

335 job: MiniJob, 

336 loglevel: int = logging.INFO, 

337 is_dry: bool = False, 

338 **kwargs: Any, # optional low-level keyword args to be forwarded to subprocess.run() 

339 ) -> subprocess.CompletedProcess: 

340 """Runs the given CLI cmd via ssh on the given remote, and returns CompletedProcess including stdout and stderr. 

341 

342 The full command is the concatenation of the command to run on the local host in order to talk to the remote host

343 ($remote.local_ssh_command()) and the command to run on the given remote host ($cmd). 

344 

345 Note: When executing on a remote host (remote.ssh_user_host is set), cmd arguments are pre-quoted with shlex.quote to 

346 safely traverse the ssh "remote shell" boundary, as ssh concatenates argv into a single remote shell string. In local 

347 mode (no remote.ssh_user_host) argv is executed directly without an intermediate shell. 

348 """ 

349 if not cmd: 

350 raise ValueError("run_ssh_command requires a non-empty cmd list") 

351 log: logging.Logger = self._remote.params.log 

352 quoted_cmd: list[str] = [shlex.quote(arg) for arg in cmd] 

353 ssh_cmd: list[str] = self._ssh_cmd 

354 if self._remote.ssh_user_host: 

355 self.refresh_ssh_connection_if_necessary(job) 

356 cmd = quoted_cmd 

357 msg: str = "Would execute: %s" if is_dry else "Executing: %s" 

358 log.log(loglevel, msg, list_formatter(self._ssh_cmd_quoted + quoted_cmd, lstrip=True)) 

359 if is_dry: 

360 return subprocess.CompletedProcess(ssh_cmd + cmd, returncode=0, stdout=None, stderr=None) 

361 else: 

362 sp: Subprocesses = job.subprocesses 

363 return sp.subprocess_run(ssh_cmd + cmd, timeout=timeout(job), log=log, **kwargs) 

364 
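# Illustrative sketch (not part of connection.py): for a remote such as alice@127.0.0.1 with connection reuse enabled,
# run_ssh_command(cmd=["zfs", "list", "-t", "snapshot"], job=job) ends up running roughly the following argv (the
# socket path is a placeholder; the exact ssh options depend on the MiniRemote configuration):
#
#   ["ssh", "-oBatchMode=yes", "-oServerAliveInterval=0", "-x", "-T", "-S", "/path/to/socket", "alice@127.0.0.1",
#    "zfs", "list", "-t", "snapshot"]
#
# In local mode (ssh_user_host == ""), local_ssh_command() returns [] and the argv is just the unquoted cmd itself:
#
#   ["zfs", "list", "-t", "snapshot"]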

365 def refresh_ssh_connection_if_necessary(self, job: MiniJob) -> None: 

366 """Maintain or create an ssh master connection for low latency reuse.""" 

367 remote: MiniRemote = self._remote 

368 p: MiniParams = remote.params 

369 log: logging.Logger = p.log 

370 if not remote.ssh_user_host: 

371 return # we're in local mode; no ssh required 

372 if not remote.is_ssh_available(): 

373 die(f"{p.ssh_program} CLI is not available to talk to remote host. Install {p.ssh_program} first!") 

374 if not remote.reuse_ssh_connection: 

375 return 

376 

377 # Performance: reuse ssh connection for low latency startup of frequent ssh invocations via the 'ssh -S' and 

378 # 'ssh -S -M -oControlPersist=90s' options. See https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing 

379 # and https://chessman7.substack.com/p/how-ssh-multiplexing-reuses-master 

380 control_limit_nanos: int = (remote.ssh_control_persist_secs - remote.ssh_control_persist_margin_secs) * 1_000_000_000 

381 with self._lock: 

382 if time.monotonic_ns() < self._last_refresh_time + control_limit_nanos: 

383 return # ssh master is alive, reuse its TCP connection (this is the common case and the ultra-fast path) 

384 ssh_cmd: list[str] = self._ssh_cmd 

385 ssh_sock_cmd: list[str] = ssh_cmd[0:-1] # omit trailing ssh_user_host 

386 ssh_sock_cmd += ["-O", "check", remote.ssh_user_host] 

387 # extend lifetime of ssh master by $ssh_control_persist_secs via `ssh -O check` if master is still running. 

388 # `ssh -S /path/to/socket -O check` doesn't talk over the network, hence is still a low latency fast path. 

389 sp: Subprocesses = job.subprocesses 

390 t: float | None = timeout(job) 

391 if sp.subprocess_run(ssh_sock_cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, timeout=t, log=log).returncode == 0: 

392 log.log(LOG_TRACE, "ssh connection is alive: %s", list_formatter(ssh_sock_cmd)) 

393 else: # ssh master is not alive; start a new master: 

394 log.log(LOG_TRACE, "ssh connection is not yet alive: %s", list_formatter(ssh_sock_cmd)) 

395 ssh_control_persist_secs: int = max(1, remote.ssh_control_persist_secs) 

396 if "-v" in remote.ssh_extra_opts: 

397 # Unfortunately, with `ssh -v` (debug mode), the ssh master won't background; instead it stays in the 

398 # foreground and blocks until the ControlPersist timer expires (90 secs). To make progress earlier we ... 

399 ssh_control_persist_secs = min(1, ssh_control_persist_secs) # tell ssh to block as briefly as possible (1s) 

400 ssh_sock_cmd = ssh_cmd[0:-1] # omit trailing ssh_user_host 

401 ssh_sock_cmd += ["-M", f"-oControlPersist={ssh_control_persist_secs}s", remote.ssh_user_host, "exit"] 

402 log.log(LOG_TRACE, "Executing: %s", list_formatter(ssh_sock_cmd)) 

403 t = timeout(job) 

404 try: 

405 sp.subprocess_run(ssh_sock_cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, check=True, timeout=t, log=log) 

406 except subprocess.CalledProcessError as e: 

407 log.error("%s", stderr_to_str(e.stderr).rstrip()) 

408 raise RetryableError( 

409 f"Cannot ssh into remote host via '{' '.join(ssh_sock_cmd)}'. Fix ssh configuration first, " 

410 "considering diagnostic log file output from running with -v -v -v.", 

411 display_msg="ssh connect", 

412 ) from e 

413 self._last_refresh_time = time.monotonic_ns() 

414 if self._connection_lease is not None: 

415 self._connection_lease.set_socket_mtime_to_now() 

416 
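# Illustrative sketch (not part of connection.py): the refresh logic above corresponds roughly to the following raw
# OpenSSH multiplexing commands, shown with a placeholder socket path and host (see the Multiplexing cookbook link
# referenced above):
#
#   ssh -S /path/to/socket -O check alice@127.0.0.1                      # probe the master; per the comment above,
#                                                                        # this also extends its ControlPersist lifetime
#   ssh -S /path/to/socket -M -oControlPersist=90s alice@127.0.0.1 exit  # start a new master if the probe failed
#
# Every later ssh invocation that passes the same `-S /path/to/socket` option then reuses the master's TCP connection.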

417 def _increment_free(self, value: int) -> None: 

418 """Adjusts the count of available SSH slots.""" 

419 self._free += value 

420 assert self._free >= 0 

421 assert self._free <= self._capacity 

422 

423 def _is_full(self) -> bool: 

424 """Returns True if no more SSH sessions may be opened over this TCP connection.""" 

425 return self._free <= 0 

426 

427 def _update_last_modified(self, last_modified: int) -> None: 

428 """Records when the connection was last used.""" 

429 self._last_modified = last_modified 

430 

431 def shutdown(self, msg_prefix: str) -> None: 

432 """Closes the underlying SSH master connection and releases the corresponding connection lease.""" 

433 ssh_cmd: list[str] = self._ssh_cmd 

434 if ssh_cmd and self._reuse_ssh_connection: 

435 if self._connection_lease is None: 

436 ssh_sock_cmd: list[str] = ssh_cmd[0:-1] + ["-O", "exit", ssh_cmd[-1]] 

437 log = self._remote.params.log 

438 log.log(LOG_TRACE, f"Executing {msg_prefix}: %s", shlex.join(ssh_sock_cmd)) 

439 try: 

440 proc: subprocess.CompletedProcess = subprocess.run( 

441 ssh_sock_cmd, stdin=DEVNULL, stderr=PIPE, text=True, timeout=0.1 

442 ) 

443 except subprocess.TimeoutExpired as e: # harmless as master auto-exits after ssh_control_persist_secs anyway 

444 log.log(LOG_TRACE, "Harmless ssh master connection shutdown timeout: %s", e) 

445 else: 

446 if proc.returncode != 0: # harmless for the same reason 

447 log.log(LOG_TRACE, "Harmless ssh master connection shutdown issue: %s", proc.stderr.rstrip()) 

448 else: 

449 self._connection_lease.release() 

450 

451 

452############################################################################# 

453class ConnectionPool: 

454 """Fetch a TCP connection for use in an SSH session, use it, finally return it back to the pool for future reuse; 

455 Note that max_concurrent_ssh_sessions_per_tcp_connection must not be larger than the server-side sshd_config(5) 

456 MaxSessions parameter (which defaults to 10, see https://manpages.ubuntu.com/manpages/man5/sshd_config.5.html).""" 

457 

458 def __init__( 

459 self, remote: MiniRemote, connpool_name: str, max_concurrent_ssh_sessions_per_tcp_connection: int = 8 

460 ) -> None: 

461 assert max_concurrent_ssh_sessions_per_tcp_connection > 0 

462 self._remote: Final[MiniRemote] = copy.copy(remote) # shallow copy for immutability (Remote is mutable) 

463 self._capacity: Final[int] = max_concurrent_ssh_sessions_per_tcp_connection 

464 self._connpool_name: Final[str] = connpool_name 

465 self._priority_queue: Final[SmallPriorityQueue[Connection]] = SmallPriorityQueue( 

466 reverse=True # sorted by #free slots and last_modified 

467 ) 

468 self._last_modified: int = 0 # monotonically increasing sequence number 

469 self._lock: Final[threading.Lock] = threading.Lock() 

470 lease_mgr: ConnectionLeaseManager | None = None 

471 if self._remote.ssh_user_host and self._remote.reuse_ssh_connection and not self._remote.ssh_exit_on_shutdown: 

472 lease_mgr = ConnectionLeaseManager( 

473 root_dir=self._remote.ssh_socket_dir, 

474 namespace=f"{self._remote.location}#{self._remote.cache_namespace()}#{self._connpool_name}", 

475 ssh_control_persist_secs=max(90 * 60, 2 * self._remote.ssh_control_persist_secs + 2), 

476 log=self._remote.params.log, 

477 ) 

478 self._lease_mgr: Final[ConnectionLeaseManager | None] = lease_mgr 

479 

480 @contextlib.contextmanager 

481 def connection(self) -> Iterator[Connection]: 

482 """Context manager that yields a connection from the pool and automatically returns it on __exit__.""" 

483 conn: Connection = self.get_connection() 

484 try: 

485 yield conn 

486 finally: 

487 self.return_connection(conn) 

488 

489 def get_connection(self) -> Connection: 

490 """Any Connection object returned on get_connection() also remains intentionally contained in the priority queue 

491 while it is "checked out", and that identical Connection object is later, on return_connection(), temporarily removed 

492 from the priority queue, updated with an incremented "free" slot count and then immediately reinserted into the 

493 priority queue. 

494 

495 In effect, any Connection object remains intentionally contained in the priority queue at all times. This design 

496 keeps ordering/fairness accurate while avoiding duplicate Connection instances. 

497 """ 

498 with self._lock: 

499 conn = self._priority_queue.pop() if len(self._priority_queue) > 0 else None 

500 if conn is None or conn._is_full(): # noqa: SLF001 # pylint: disable=protected-access 

501 if conn is not None: 

502 self._priority_queue.push(conn) 

503 conn = self._new_connection() # add a new connection 

504 self._last_modified += 1 

505 conn._update_last_modified(self._last_modified) # noqa: SLF001 # pylint: disable=protected-access 

506 conn._increment_free(-1) # noqa: SLF001 # pylint: disable=protected-access 

507 self._priority_queue.push(conn) 

508 return conn 

509 
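# Illustrative sketch (not part of connection.py): with max_concurrent_ssh_sessions_per_tcp_connection=2, two
# back-to-back get_connection() calls hand out the same Connection object (its free-slot count dropping 2 -> 1 -> 0),
# and a third call opens a new Connection because the first one is full. Hypothetical single-threaded usage, assuming
# `remote` was created as in the module docstring example:
#
#   pool = ConnectionPool(remote, connpool_name="demo", max_concurrent_ssh_sessions_per_tcp_connection=2)
#   c1 = pool.get_connection()
#   c2 = pool.get_connection()
#   c3 = pool.get_connection()
#   assert c1 is c2 and c1 is not c3
#   for c in (c1, c2, c3):
#       pool.return_connection(c)
#   pool.shutdown()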

510 def _new_connection(self) -> Connection: 

511 lease: ConnectionLease | None = None if self._lease_mgr is None else self._lease_mgr.acquire() 

512 return Connection(self._remote, self._capacity, lease=lease) 

513 

514 def return_connection(self, conn: Connection) -> None: 

515 """Returns the given connection to the pool and updates its priority.""" 

516 assert conn is not None 

517 with self._lock: 

518 # update priority = remove conn from queue, increment priority, finally reinsert updated conn into queue 

519 if self._priority_queue.remove(conn): # conn is not contained only if ConnectionPool.shutdown() was called 

520 conn._increment_free(1) # noqa: SLF001 # pylint: disable=protected-access 

521 self._last_modified += 1 

522 conn._update_last_modified(self._last_modified) # noqa: SLF001 # pylint: disable=protected-access 

523 self._priority_queue.push(conn) 

524 

525 def shutdown(self, msg_prefix: str = "") -> None: 

526 """Closes all SSH connections managed by this pool.""" 

527 with self._lock: 

528 try: 

529 if self._remote.reuse_ssh_connection: 

530 msg_prefix = msg_prefix + "/" + self._connpool_name 

531 for conn in self._priority_queue: 

532 conn.shutdown(msg_prefix) 

533 finally: 

534 self._priority_queue.clear() 

535 

536 def __repr__(self) -> str: 

537 with self._lock: 

538 queue = self._priority_queue 

539 return str({"capacity": self._capacity, "queue_len": len(queue), "queue": queue}) 

540 

541 

542############################################################################# 

543@final 

544class ConnectionPools: 

545 """A bunch of named connection pools with various multiplexing capacities.""" 

546 

547 def __init__(self, remote: MiniRemote, capacities: dict[str, int]) -> None: 

548 """Creates one connection pool per name with the given capacities.""" 

549 self._pools: Final[dict[str, ConnectionPool]] = { 

550 name: ConnectionPool(remote, name, capacity) for name, capacity in capacities.items() 

551 } 

552 

553 def __repr__(self) -> str: 

554 return str(self._pools) 

555 

556 def pool(self, name: str) -> ConnectionPool: 

557 """Returns the pool associated with the given name.""" 

558 return self._pools[name] 

559 

560 def shutdown(self, msg_prefix: str = "") -> None: 

561 """Shuts down every contained pool.""" 

562 for pool in self._pools.values(): 

563 pool.shutdown(msg_prefix)
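# Illustrative sketch (not part of connection.py): ConnectionPools bundles several named pools with different
# multiplexing capacities; the SHARED and DEDICATED constants defined near the top of this module are natural pool
# names. Hypothetical usage, assuming `remote` and `job` were created as in the module docstring example:
#
#   pools = ConnectionPools(remote, {SHARED: 8, DEDICATED: 1})
#   try:
#       with pools.pool(SHARED).connection() as conn:
#           conn.run_ssh_command(cmd=["uname", "-a"], job=job, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True)
#   finally:
#       pools.shutdown("cleanup")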