Coverage for bzfs_main/parallel_batch_cmd.py: 100%
35 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 13:30 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Helpers for running CLI commands in (sequential or parallel) batches, without exceeding operating system limits.
17The batch size aka max_batch_items splits one CLI command into one or more CLI commands. The resulting commands are executed
18sequentially (via functions *_batched()), or in parallel across max_workers threads (via functions *_parallel()).
20The degree of parallelism (max_workers) is specified by the job (via --threads).
21Batch size is a trade-off between resource consumption, latency, bandwidth and throughput.
23Example:
24--------
26- max_batch_items=1 (seq or par):
27```
28zfs list -t snapshot d1
29zfs list -t snapshot d2
30zfs list -t snapshot d3
31zfs list -t snapshot d4
32```
34- max_batch_items=2 (seq or par):
35```
36zfs list -t snapshot d1 d2
37zfs list -t snapshot d3 d4
```

39- max_batch_items=N (seq or par):
40```
41zfs list -t snapshot d1 d2 d3 d4
42```
43"""
45from __future__ import annotations
46import sys
47from typing import (
48 TYPE_CHECKING,
49 Any,
50 Callable,
51 Generator,
52 Iterable,
53 TypeVar,
54)
56from bzfs_main.connection import (
57 SHARED,
58 ConnectionPool,
59 try_ssh_command,
60)
61from bzfs_main.parallel_iterator import (
62 batch_cmd_iterator,
63 get_max_command_line_bytes,
64 parallel_iterator,
65)
66from bzfs_main.utils import (
67 LOG_TRACE,
68 drain,
69)
71if TYPE_CHECKING: # pragma: no cover - for type hints only
72 from bzfs_main.bzfs import Job
73 from bzfs_main.configuration import Remote
75T = TypeVar("T")
def run_ssh_cmd_batched(
    job: Job,
    r: Remote,
    cmd: list[str],
    cmd_args: Iterable[str],
    fn: Callable[[list[str]], Any],
    max_batch_items: int = 2**29,
    sep: str = " ",
) -> None:
    """Executes fn on each sequential batch of cmd_args, keeping every generated cmdline below the OS size limit."""
    results = itr_ssh_cmd_batched(job, r, cmd, cmd_args, fn, max_batch_items=max_batch_items, sep=sep)
    drain(results)  # exhaust the generator purely for its side effects; individual results are discarded
def itr_ssh_cmd_batched(
    job: Job,
    r: Remote,
    cmd: list[str],
    cmd_args: Iterable[str],
    fn: Callable[[list[str]], T],
    max_batch_items: int = 2**29,
    sep: str = " ",
) -> Generator[T, None, None]:
    """Yields fn(batch) for sequential batches of cmd_args, sized so that cmd plus each batch stays within the
    maximum cmdline byte budget the OS can handle."""
    byte_budget: int = _max_batch_bytes(job, r, cmd, sep)
    return batch_cmd_iterator(
        cmd_args=cmd_args,
        fn=fn,
        max_batch_items=max_batch_items,
        max_batch_bytes=byte_budget,
        sep=sep,
    )
def run_ssh_cmd_parallel(
    job: Job,
    r: Remote,
    cmd_args_list: list[tuple[list[str], Iterable[str]]],
    fn: Callable[[list[str], list[str]], Any],
    max_batch_items: int = 2**29,
) -> None:
    """Runs multiple ssh commands in parallel, splitting each command's args into batches; results are discarded."""
    unordered_results = itr_ssh_cmd_parallel(
        job, r, cmd_args_list, fn=fn, max_batch_items=max_batch_items, ordered=False
    )
    drain(unordered_results)  # only the side effects matter here, so completion order is irrelevant
def itr_ssh_cmd_parallel(
    job: Job,
    r: Remote,
    cmd_args_list: list[tuple[list[str], Iterable[str]]],
    fn: Callable[[list[str], list[str]], T],
    max_batch_items: int = 2**29,
    ordered: bool = True,
) -> Generator[T, None, None]:
    """Streams results from multiple parallel (batched) SSH commands; if ordered is True, output is yielded in
    the same order as the input datasets rather than in completion order."""

    def build_iterators(executr: Any) -> list:
        # Bind cmd via an explicit closure factory so each batched iterator submits work with its own cmd,
        # avoiding the classic late-binding pitfall.
        def make_submitter(bound_cmd: list[str]) -> Callable[[list[str]], Any]:
            return lambda batch: executr.submit(fn, bound_cmd, batch)

        return [
            itr_ssh_cmd_batched(job, r, cmd, cmd_args, make_submitter(cmd), max_batch_items=max_batch_items)
            for cmd, cmd_args in cmd_args_list
        ]

    return parallel_iterator(
        iterator_builder=build_iterators,
        max_workers=job.max_workers[r.location],
        ordered=ordered,
    )
def zfs_list_snapshots_in_parallel(
    job: Job, r: Remote, cmd: list[str], datasets: list[str], ordered: bool = True
) -> Generator[list[str], None, None]:
    """Runs 'zfs list -t snapshot' on multiple datasets at the same time; implemented with a time and space
    efficient streaming algorithm; easily scales to millions of datasets and any number of snapshots."""
    workers: int = job.max_workers[r.location]
    # Minibatch sizing heuristic: with an ssh_user_host the datasets are divided by `workers` with a floor of
    # `workers`; without one they are divided by `workers * 8` with a floor of 1. The configured per-location
    # cap always wins.
    if r.ssh_user_host:
        divisor, floor = workers, workers
    else:
        divisor, floor = workers * 8, 1
    cap: int = job.max_datasets_per_minibatch_on_list_snaps[r.location]
    minibatch_items: int = min(cap, max(len(datasets) // divisor, floor))

    def list_snapshots(zfs_cmd: list[str], batch: list[str]) -> list[str]:
        # try_ssh_command may return None; treat that as empty output.
        output = try_ssh_command(job, r, LOG_TRACE, cmd=zfs_cmd + batch) or ""
        return output.splitlines()

    return itr_ssh_cmd_parallel(
        job,
        r,
        [(cmd, datasets)],
        fn=list_snapshots,
        max_batch_items=minibatch_items,
        ordered=ordered,
    )
def _max_batch_bytes(job: Job, r: Remote, cmd: list[str], sep: str) -> int:
    """Returns the byte budget left for batched args once the ssh + cmd prefix is subtracted, so that the final
    cmdline is never too big for the OS to handle."""
    assert isinstance(sep, str)
    local_limit: int = _get_max_command_line_bytes(job, "local")
    remote_limit: int = _get_max_command_line_bytes(job, r.location)
    limit: int = min(local_limit, remote_limit)
    if sep != " ":
        # Max size of a single argument is 128KB on Linux - https://lists.gnu.org/archive/html/bug-bash/2020-09/msg00095.html
        limit = min(limit, 131071)  # e.g. 'zfs destroy foo@s1,s2,...,sN' joins the batch into one giant argument
    conn_pool: ConnectionPool = job.params.connection_pools[r.location].pool(SHARED)
    with conn_pool.connection() as conn:
        full_cmd = conn.ssh_cmd + cmd
        prefix_bytes: int = len(" ".join(full_cmd).encode(sys.getfilesystemencoding()))
    return limit - prefix_bytes
def _get_max_command_line_bytes(job: Job, location: str, os_name: str | None = None) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin"""
    effective_os: str = os_name or job.params.available_programs[location].get("os") or "n/a"
    limit: int = get_max_command_line_bytes(effective_os)
    override = job.max_command_line_bytes
    if override is not None:
        return override  # for testing only
    return limit