Coverage for bzfs_main/parallel_batch_cmd.py: 100% (37 statements)
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
15"""Helpers for running CLI commands in (sequential or parallel) batches, without exceeding operating system limits.
17The batch size aka max_batch_items splits one CLI command into one or more CLI commands. The resulting commands are executed
18sequentially (via functions *_batched()), or in parallel across max_workers threads (via functions *_parallel()).
20The degree of parallelism (max_workers) is specified by the job (via --threads).
21Batch size is a trade-off between resource consumption, latency, bandwidth and throughput.

Example:
--------

- max_batch_items=1 (seq or par):
```
zfs list -t snapshot d1
zfs list -t snapshot d2
zfs list -t snapshot d3
zfs list -t snapshot d4
zfs list -t snapshot d5
```

- max_batch_items=2 (seq or par):
```
zfs list -t snapshot d1 d2
zfs list -t snapshot d3 d4
zfs list -t snapshot d5
```

- max_batch_items=N (seq or par):
```
zfs list -t snapshot d1 d2 d3 d4 d5
```
"""

from __future__ import (
    annotations,
)
import sys
from collections.abc import (
    Iterable,
    Iterator,
)
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    TypeVar,
)

from bzfs_main.util.connection import (
    SHARED,
    ConnectionPool,
    MiniRemote,
)
from bzfs_main.util.parallel_iterator import (
    batch_cmd_iterator,
    get_max_command_line_bytes,
    parallel_iterator,
)
from bzfs_main.util.utils import (
    LOG_TRACE,
    drain,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import (
        Job,
    )

_T = TypeVar("_T")


def run_ssh_cmd_batched(
    job: Job,
    r: MiniRemote,
    cmd: list[str],
    cmd_args: Iterable[str],
    fn: Callable[[list[str]], Any],
    *,
    max_batch_items: int = 2**29,
    sep: str = " ",
) -> None:
    """Runs ssh command for each sequential batch of args, without creating a cmdline that's too big for the OS to handle."""
    drain(itr_ssh_cmd_batched(job, r, cmd, cmd_args, fn, max_batch_items=max_batch_items, sep=sep))
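

# Hypothetical usage sketch (job, remote, dataset and tags are assumed inputs, not
# defined in this module): destroy many snapshots of one dataset, comma-separating the
# snapshot tags so each assembled command stays within OS limits; with sep != " " the
# joined batch forms a single argument, hence the 128KB cap in _max_batch_bytes():
#
#     run_ssh_cmd_batched(
#         job,
#         remote,
#         ["zfs", "destroy", f"{dataset}@"],
#         tags,
#         lambda batch: job.try_ssh_command_with_retries(
#             remote, LOG_TRACE, cmd=["zfs", "destroy", f"{dataset}@" + ",".join(batch)]
#         ),
#         sep=",",
#     )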


def itr_ssh_cmd_batched(
    job: Job,
    r: MiniRemote,
    cmd: list[str],
    cmd_args: Iterable[str],
    fn: Callable[[list[str]], _T],
    *,
    max_batch_items: int = 2**29,
    sep: str = " ",
) -> Iterator[_T]:
    """Runs fn(cmd_args) in sequential batches w/ cmd, without creating a cmdline that's too big for the OS to handle."""
    max_bytes: int = _max_batch_bytes(job, r, cmd, sep)
    return batch_cmd_iterator(cmd_args=cmd_args, fn=fn, max_batch_items=max_batch_items, max_batch_bytes=max_bytes, sep=sep)
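

# Hypothetical sketch (job, remote, datasets and process are assumed, not defined
# here): the iterator yields one fn() result per batch, so output can be consumed as
# a lazy stream, mirroring the fn used by zfs_list_snapshots_in_parallel() below:
#
#     for lines in itr_ssh_cmd_batched(
#         job,
#         remote,
#         ["zfs", "list", "-t", "snapshot"],
#         datasets,
#         lambda batch: (job.try_ssh_command_with_retries(
#             remote, LOG_TRACE, cmd=["zfs", "list", "-t", "snapshot"] + batch
#         ) or "").splitlines(),
#     ):
#         process(lines)  # process() is a placeholder for caller-side handling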


def run_ssh_cmd_parallel(
    job: Job,
    r: MiniRemote,
    cmd_args_list: Iterable[tuple[list[str], Iterable[str]]],
    fn: Callable[[list[str], list[str]], Any],
    *,
    max_batch_items: int = 2**29,
) -> None:
    """Runs multiple ssh commands in parallel, batching each set of args."""
    drain(itr_ssh_cmd_parallel(job, r, cmd_args_list, fn=fn, max_batch_items=max_batch_items, ordered=False))


def itr_ssh_cmd_parallel(
    job: Job,
    r: MiniRemote,
    cmd_args_list: Iterable[tuple[list[str], Iterable[str]]],
    fn: Callable[[list[str], list[str]], _T],
    *,
    max_batch_items: int = 2**29,
    ordered: bool = True,
) -> Iterator[_T]:
    """Streams results from multiple parallel (batched) SSH commands.

    When ordered=True, preserves the order of the batches as provided by cmd_args_list (i.e. yields results in the same order
    as the input), not in completion order. When ordered=False, yields results as they complete, for minimum latency.
    """
    return parallel_iterator(
        iterator_builder=lambda executor: (
            itr_ssh_cmd_batched(  # advancing the Generator submits the next task and yields the corresponding Future
                job, r, cmd, cmd_args, lambda batch, cmd=cmd: executor.submit(fn, cmd, batch), max_batch_items=max_batch_items  # type: ignore[misc]
            )
            for cmd, cmd_args in cmd_args_list  # lazy on-demand Python Generator
        ),
        max_workers=job.max_workers[r.location],
        ordered=ordered,
        is_terminated=job.termination_event.is_set,
    )
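

# Hypothetical sketch (job, remote, datasets_a/datasets_b and consume are assumed):
# run two different commands over their own argument lists concurrently, streaming
# results in input order:
#
#     cmd_args_list = [
#         (["zfs", "list", "-t", "snapshot"], datasets_a),
#         (["zfs", "list", "-t", "bookmark"], datasets_b),
#     ]
#     for result in itr_ssh_cmd_parallel(
#         job,
#         remote,
#         cmd_args_list,
#         fn=lambda cmd, batch: (job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd + batch) or "").splitlines(),
#         ordered=True,
#     ):
#         consume(result)  # consume() is a placeholder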


def zfs_list_snapshots_in_parallel(
    job: Job,
    r: MiniRemote,
    cmd: list[str],
    datasets: list[str],
    *,
    ordered: bool = True,
) -> Iterator[list[str]]:
    """Runs 'zfs list -t snapshot' on multiple datasets at the same time.

    Implemented with a time and space efficient streaming algorithm; easily scales to millions of datasets and any number of
    snapshots. Attempts to use at least 4 datasets per remote cmd mini batch to reflect increased communication latency.
    """
    max_workers: int = job.max_workers[r.location]
    max_batch_items: int = min(
        job.max_datasets_per_minibatch_on_list_snaps[r.location],
        max(
            len(datasets) // (max_workers * 8 if max_workers > 1 else 1),
            4 if r.ssh_user_host else 1,
        ),
    )
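    # Worked example (illustrative numbers): with 1000 datasets, max_workers=4, a
    # remote connection (r.ssh_user_host set), and a per-minibatch cap of 1024, this
    # yields min(1024, max(1000 // 32, 4)) = min(1024, max(31, 4)) = 31 datasets per
    # batch, i.e. roughly 8 batches per worker to smooth out load imbalance.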
    return itr_ssh_cmd_parallel(
        job,
        r,
        [(cmd, datasets)],
        fn=lambda cmd, batch: (job.try_ssh_command_with_retries(r, LOG_TRACE, cmd=cmd + batch) or "").splitlines(),
        max_batch_items=max_batch_items,
        ordered=ordered,
    )


def _max_batch_bytes(job: Job, r: MiniRemote, cmd: list[str], sep: str) -> int:
    """Avoids creating a cmdline that's too big for the OS to handle.

    The calculation subtracts 'header_bytes', which accounts for the full SSH invocation (including control socket/options)
    plus the fixed subcommand prefix, so that the remaining budget is reserved exclusively for the batched arguments.
    """
    assert isinstance(sep, str)
    max_bytes: int = min(_get_max_command_line_bytes(job, "local"), _get_max_command_line_bytes(job, r.location))
    # Max size of a single argument is 128KB on Linux - https://lists.gnu.org/archive/html/bug-bash/2020-09/msg00095.html
    max_bytes = max_bytes if sep == " " else min(max_bytes, 128 * 1024 - 1)  # e.g. 'zfs destroy foo@s1,s2,...,sN'
    conn_pool: ConnectionPool = job.params.connection_pools[r.location].pool(SHARED)
    with conn_pool.connection() as conn:
        cmd = conn.ssh_cmd + cmd
    header_bytes: int = len(" ".join(cmd).encode(sys.getfilesystemencoding()))
    return max_bytes - header_bytes
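

# Illustration (hypothetical numbers): if both the local and the remote OS allow
# command lines of at most 1 MiB and the assembled ssh prefix plus subcommand (e.g.
# 'ssh -S /path/to/socket user@host zfs destroy') occupies 200 bytes, the batched
# arguments get a budget of 1048576 - 200 = 1048376 bytes. With sep="," the limit is
# first capped at 128 KiB - 1 = 131071 bytes (the joined batch forms a single
# argument), leaving 131071 - 200 = 130871 bytes for the batched arguments.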


def _get_max_command_line_bytes(job: Job, location: str, os_name: str | None = None) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin."""
    os_name = os_name if os_name else job.params.available_programs[location].get("os")
    os_name = os_name if os_name else "n/a"
    max_bytes = get_max_command_line_bytes(os_name)
    if job.max_command_line_bytes is not None:
        return job.max_command_line_bytes  # for testing only
    else:
        return max_bytes