Coverage for bzfs_main / parallel_batch_cmd.py: 100%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 10:16 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Helpers for running CLI commands in (sequential or parallel) batches, without exceeding operating system limits. 

16 

17The batch size aka max_batch_items splits one CLI command into one or more CLI commands. The resulting commands are executed 

18sequentially (via functions *_batched()), or in parallel across max_workers threads (via functions *_parallel()). 

19 

20The degree of parallelism (max_workers) is specified by the job (via --threads). 

21Batch size is a trade-off between resource consumption, latency, bandwidth and throughput. 

22 

23Example: 

24-------- 

25 

26- max_batch_items=1 (seq or par): 

27``` 

28zfs list -t snapshot d1 

29zfs list -t snapshot d2 

30zfs list -t snapshot d3 

31zfs list -t snapshot d4 

32zfs list -t snapshot d5 

33``` 

34 

35- max_batch_items=2 (seq or par): 

36``` 

37zfs list -t snapshot d1 d2 

38zfs list -t snapshot d3 d4 

39zfs list -t snapshot d5 

40``` 

41 

42- max_batch_items=N (seq or par): 

43``` 

44zfs list -t snapshot d1 d2 d3 d4 d5 

45``` 

46""" 

47 

48from __future__ import ( 

49 annotations, 

50) 

51import sys 

52from collections.abc import ( 

53 Iterable, 

54 Iterator, 

55) 

56from typing import ( 

57 TYPE_CHECKING, 

58 Any, 

59 Callable, 

60 TypeVar, 

61) 

62 

63from bzfs_main.util.connection import ( 

64 SHARED, 

65 ConnectionPool, 

66 MiniRemote, 

67) 

68from bzfs_main.util.parallel_iterator import ( 

69 batch_cmd_iterator, 

70 get_max_command_line_bytes, 

71 parallel_iterator, 

72) 

73from bzfs_main.util.utils import ( 

74 LOG_TRACE, 

75 drain, 

76) 

77 

78if TYPE_CHECKING: # pragma: no cover - for type hints only 

79 from bzfs_main.bzfs import ( 

80 Job, 

81 ) 

82 

_T = TypeVar("_T")  # generic result type yielded by the *_batched()/*_parallel() iterator helpers below

84 

85 

def run_ssh_cmd_batched(
    job: Job,
    r: MiniRemote,
    cmd: list[str],
    cmd_args: Iterable[str],
    fn: Callable[[list[str]], Any],
    *,
    max_batch_items: int = 2**29,
    sep: str = " ",
) -> None:
    """Eagerly runs the ssh command over each sequential batch of args, discarding results.

    Batches are sized by itr_ssh_cmd_batched() so the resulting cmdline never exceeds OS limits.
    """
    batches = itr_ssh_cmd_batched(job, r, cmd, cmd_args, fn, max_batch_items=max_batch_items, sep=sep)
    drain(batches)  # exhaust the lazy iterator so every batch actually executes

98 

99 

def itr_ssh_cmd_batched(
    job: Job,
    r: MiniRemote,
    cmd: list[str],
    cmd_args: Iterable[str],
    fn: Callable[[list[str]], _T],
    *,
    max_batch_items: int = 2**29,
    sep: str = " ",
) -> Iterator[_T]:
    """Lazily yields fn(batch) for each sequential batch of cmd_args run with cmd.

    Batch size is capped both by max_batch_items and by the byte budget that keeps the cmdline within OS limits.
    """
    byte_budget: int = _max_batch_bytes(job, r, cmd, sep)  # bytes left for args after the ssh+cmd header
    return batch_cmd_iterator(
        cmd_args=cmd_args,
        fn=fn,
        max_batch_items=max_batch_items,
        max_batch_bytes=byte_budget,
        sep=sep,
    )

113 

114 

def run_ssh_cmd_parallel(
    job: Job,
    r: MiniRemote,
    cmd_args_list: Iterable[tuple[list[str], Iterable[str]]],
    fn: Callable[[list[str], list[str]], Any],
    *,
    max_batch_items: int = 2**29,
) -> None:
    """Eagerly runs multiple (batched) ssh commands across worker threads, discarding results."""
    # Results are thrown away, so completion order is irrelevant; ordered=False minimizes latency.
    results = itr_ssh_cmd_parallel(job, r, cmd_args_list, fn=fn, max_batch_items=max_batch_items, ordered=False)
    drain(results)

125 

126 

def itr_ssh_cmd_parallel(
    job: Job,
    r: MiniRemote,
    cmd_args_list: Iterable[tuple[list[str], Iterable[str]]],
    fn: Callable[[list[str], list[str]], _T],
    *,
    max_batch_items: int = 2**29,
    ordered: bool = True,
) -> Iterator[_T]:
    """Streams results from multiple parallel (batched) SSH commands.

    When ordered=True, preserves the order of the batches as provided by cmd_args_list (i.e. yields results in the same order
    as the input), not in completion order. When ordered=False, yields results as they complete for minimum latency.
    """

    def build_iterators(executr: Any) -> Iterator[Iterator[Any]]:
        # Lazy on-demand generator: advancing each inner Generator submits the next task and yields its Future.
        for cmd, cmd_args in cmd_args_list:
            yield itr_ssh_cmd_batched(
                job,
                r,
                cmd,
                cmd_args,
                # cmd=cmd binds the loop variable early so each lambda captures its own command
                lambda batch, cmd=cmd: executr.submit(fn, cmd, batch),  # type: ignore[misc]
                max_batch_items=max_batch_items,
            )

    return parallel_iterator(
        iterator_builder=build_iterators,
        max_workers=job.max_workers[r.location],
        ordered=ordered,
        is_terminated=job.termination_event.is_set,
    )

152 

153 

def zfs_list_snapshots_in_parallel(
    job: Job,
    r: MiniRemote,
    cmd: list[str],
    datasets: list[str],
    *,
    ordered: bool = True,
) -> Iterator[list[str]]:
    """Runs 'zfs list -t snapshot' on multiple datasets at the same time.

    Implemented with a time and space efficient streaming algorithm; easily scales to millions of datasets and any number of
    snapshots. Attempts to use at least 4 datasets per remote cmd mini batch to reflect increased communication latency.
    """
    workers: int = job.max_workers[r.location]
    divisor: int = workers * 8 if workers > 1 else 1  # aim for roughly 8 mini batches per worker
    floor: int = 4 if r.ssh_user_host else 1  # remote connections amortize latency over >= 4 datasets per batch
    cap: int = job.max_datasets_per_minibatch_on_list_snaps[r.location]
    minibatch_size: int = min(cap, max(len(datasets) // divisor, floor))

    def list_snapshots(cmd: list[str], batch: list[str]) -> list[str]:
        # Empty output (or None from a failed best-effort command) yields an empty list of lines.
        stdout: str = job.try_ssh_command_with_retries(r, LOG_TRACE, cmd=cmd + batch) or ""
        return stdout.splitlines()

    return itr_ssh_cmd_parallel(
        job,
        r,
        [(cmd, datasets)],
        fn=list_snapshots,
        max_batch_items=minibatch_size,
        ordered=ordered,
    )

183 

184 

def _max_batch_bytes(job: Job, r: MiniRemote, cmd: list[str], sep: str) -> int:
    """Returns the byte budget available for batched args, avoiding a cmdline that's too big for the OS to handle.

    Subtracts 'header_bytes', which accounts for the full SSH invocation (including control socket/options) plus the fixed
    subcommand prefix, so that the remaining budget is reserved exclusively for the batched arguments.
    """
    assert isinstance(sep, str)
    limit: int = min(_get_max_command_line_bytes(job, "local"), _get_max_command_line_bytes(job, r.location))
    if sep != " ":  # args are joined into a single argument, e.g. 'zfs destroy foo@s1,s2,...,sN'
        # Max size of a single argument is 128KB on Linux - https://lists.gnu.org/archive/html/bug-bash/2020-09/msg00095.html
        limit = min(limit, 128 * 1024 - 1)
    pool: ConnectionPool = job.params.connection_pools[r.location].pool(SHARED)
    with pool.connection() as conn:
        full_cmd: list[str] = conn.ssh_cmd + cmd
    header_bytes: int = len(" ".join(full_cmd).encode(sys.getfilesystemencoding()))
    return limit - header_bytes

200 

201 

def _get_max_command_line_bytes(job: Job, location: str, os_name: str | None = None) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin.

    Params:
        job: supplies per-location detected OS names and an optional fixed override (testing only).
        location: which side's limit to compute, e.g. "local" or a remote location.
        os_name: optional explicit OS name; falls back to the OS detected for 'location', then "n/a".
    """
    # Guard clause: the testing override wins unconditionally, so skip OS detection and limit lookup entirely
    # (the original computed them first and threw the result away when the override was set).
    if job.max_command_line_bytes is not None:
        return job.max_command_line_bytes  # for testing only
    os_name = os_name if os_name else job.params.available_programs[location].get("os")
    os_name = os_name if os_name else "n/a"
    return get_max_command_line_bytes(os_name)