# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
15"""Helpers for running CLI commands in (sequential or parallel) batches, without exceeding operating system limits. 

16 

17The batch size aka max_batch_items splits one CLI command into one or more CLI commands. The resulting commands are executed 

18sequentially (via functions *_batched()), or in parallel across max_workers threads (via functions *_parallel()). 

19 

20The degree of parallelism (max_workers) is specified by the job (via --threads). 

21Batch size is a trade-off between resource consumption, latency, bandwidth and throughput. 

22 

23Example: 

24-------- 

25 

26- max_batch_items=1 (seq or par): 

27``` 

28zfs list -t snapshot d1 

29zfs list -t snapshot d2 

30zfs list -t snapshot d3 

31zfs list -t snapshot d4 

32``` 

33 

34- max_batch_items=2 (seq or par): 

35``` 

36zfs list -t snapshot d1 d2 

37zfs list -t snapshot d3 d4 

38``` 

39 

40- max_batch_items=N (seq or par): 

41``` 

42zfs list -t snapshot d1 d2 d3 d4 

43``` 

44""" 

from __future__ import (
    annotations,
)
import sys
from collections.abc import (
    Iterable,
    Iterator,
)
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    TypeVar,
)

from bzfs_main.util.connection import (
    SHARED,
    ConnectionPool,
    MiniRemote,
)
from bzfs_main.util.parallel_iterator import (
    batch_cmd_iterator,
    get_max_command_line_bytes,
    parallel_iterator,
)
from bzfs_main.util.utils import (
    LOG_TRACE,
    drain,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import (
        Job,
    )

_T = TypeVar("_T")


def run_ssh_cmd_batched(
    job: Job,
    r: MiniRemote,
    cmd: list[str],
    cmd_args: Iterable[str],
    fn: Callable[[list[str]], Any],
    max_batch_items: int = 2**29,
    sep: str = " ",
) -> None:
    """Runs the ssh command for each sequential batch of args, without creating a cmdline that's too big for the OS to handle."""
    drain(itr_ssh_cmd_batched(job, r, cmd, cmd_args, fn, max_batch_items=max_batch_items, sep=sep))
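
# Hypothetical usage sketch (not part of this module): list snapshots of many datasets in sequential batches; `job`,
# `remote`, and `datasets` are assumed to be a preconfigured Job, MiniRemote, and list of dataset names:
#
#   cmd = ["zfs", "list", "-t", "snapshot"]
#   run_ssh_cmd_batched(
#       job, remote, cmd, datasets,
#       fn=lambda batch: job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd + batch),
#   )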


def itr_ssh_cmd_batched(
    job: Job,
    r: MiniRemote,
    cmd: list[str],
    cmd_args: Iterable[str],
    fn: Callable[[list[str]], _T],
    max_batch_items: int = 2**29,
    sep: str = " ",
) -> Iterator[_T]:
    """Runs fn(batch) for each sequential batch of cmd_args, without creating a cmdline (cmd + batch) that's too big for the OS to handle."""
    max_bytes: int = _max_batch_bytes(job, r, cmd, sep)
    return batch_cmd_iterator(cmd_args=cmd_args, fn=fn, max_batch_items=max_batch_items, max_batch_bytes=max_bytes, sep=sep)
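
# Hypothetical usage sketch (names as in the sketch above): stream results batch by batch instead of draining them:
#
#   for output in itr_ssh_cmd_batched(
#       job, remote, cmd, datasets, fn=lambda batch: job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd + batch)
#   ):
#       ...  # each `output` is fn's return value for one batched CLI invocation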


def run_ssh_cmd_parallel(
    job: Job,
    r: MiniRemote,
    cmd_args_list: Iterable[tuple[list[str], Iterable[str]]],
    fn: Callable[[list[str], list[str]], Any],
    max_batch_items: int = 2**29,
) -> None:
    """Runs multiple ssh commands in parallel, batching each set of args."""
    drain(itr_ssh_cmd_parallel(job, r, cmd_args_list, fn=fn, max_batch_items=max_batch_items, ordered=False))
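
# Hypothetical usage sketch: run two different commands in parallel, each over its own (batched) arguments; names other
# than this module's functions are assumptions:
#
#   run_ssh_cmd_parallel(
#       job, remote,
#       [(["zfs", "list", "-t", "snapshot"], snapshot_datasets),
#        (["zfs", "list", "-t", "bookmark"], bookmark_datasets)],
#       fn=lambda cmd, batch: job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd + batch),
#   )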


def itr_ssh_cmd_parallel(
    job: Job,
    r: MiniRemote,
    cmd_args_list: Iterable[tuple[list[str], Iterable[str]]],
    fn: Callable[[list[str], list[str]], _T],
    max_batch_items: int = 2**29,
    ordered: bool = True,
) -> Iterator[_T]:
    """Streams results from multiple parallel (batched) SSH commands.

    When ordered=True, preserves the order of the batches as provided by cmd_args_list (i.e. yields results in the same
    order as the input), not in completion order. When ordered=False, yields results as they complete for minimum latency.
    """
    return parallel_iterator(
        iterator_builder=lambda executr: (
            itr_ssh_cmd_batched(
                job, r, cmd, cmd_args, lambda batch, cmd=cmd: executr.submit(fn, cmd, batch), max_batch_items=max_batch_items  # type: ignore[misc]
            )
            for cmd, cmd_args in cmd_args_list
        ),
        max_workers=job.max_workers[r.location],
        ordered=ordered,
        termination_event=job.termination_event,
    )
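
# Hypothetical usage sketch: consume results in input order rather than completion order (names as in the sketches above):
#
#   for result in itr_ssh_cmd_parallel(
#       job, remote, [(cmd, datasets)],
#       fn=lambda cmd, batch: job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd + batch),
#       ordered=True,
#   ):
#       ...  # results arrive in the same order as the input batches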


def zfs_list_snapshots_in_parallel(
    job: Job, r: MiniRemote, cmd: list[str], datasets: list[str], ordered: bool = True
) -> Iterator[list[str]]:
    """Runs 'zfs list -t snapshot' on multiple datasets at the same time.

    Implemented with a time- and space-efficient streaming algorithm; easily scales to millions of datasets and any number
    of snapshots. Attempts to use at least 4 datasets per remote command mini-batch, to amortize the increased
    communication latency of ssh connections.
    """
    max_workers: int = job.max_workers[r.location]
    max_batch_items: int = min(
        job.max_datasets_per_minibatch_on_list_snaps[r.location],
        max(
            len(datasets) // (max_workers * 8),
            4 if r.ssh_user_host else 1,
        ),
    )
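    # Worked sizing example (illustrative numbers, not from a real run): with 10_000 datasets, max_workers=8, a remote
    # target (r.ssh_user_host is set), and max_datasets_per_minibatch_on_list_snaps=1024, this yields
    # min(1024, max(10_000 // 64, 4)) == min(1024, 156) == 156 datasets per 'zfs list -t snapshot' invocation,
    # i.e. enough mini-batches to keep all 8 workers busy while bounding per-command latency.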

    return itr_ssh_cmd_parallel(
        job,
        r,
        [(cmd, datasets)],
        fn=lambda cmd, batch: (job.try_ssh_command_with_retries(r, LOG_TRACE, cmd=cmd + batch) or "").splitlines(),
        max_batch_items=max_batch_items,
        ordered=ordered,
    )
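
# Hypothetical consumption sketch (`process` is a placeholder, not a real function; cmd mirrors the module docstring):
#
#   cmd = ["zfs", "list", "-t", "snapshot", "-H", "-o", "name"]
#   for lines in zfs_list_snapshots_in_parallel(job, remote, cmd, datasets):
#       for line in lines:  # one stdout line per snapshot
#           process(line)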


def _max_batch_bytes(job: Job, r: MiniRemote, cmd: list[str], sep: str) -> int:
    """Avoids creating a cmdline that's too big for the OS to handle.

    The calculation subtracts 'header_bytes', which accounts for the full SSH invocation (including control socket/options)
    plus the fixed subcommand prefix, so that the remaining budget is reserved exclusively for the batched arguments.
    """
    assert isinstance(sep, str)
    max_bytes: int = min(_get_max_command_line_bytes(job, "local"), _get_max_command_line_bytes(job, r.location))
    # Max size of a single argument is 128KB on Linux - https://lists.gnu.org/archive/html/bug-bash/2020-09/msg00095.html
    max_bytes = max_bytes if sep == " " else min(max_bytes, 128 * 1024 - 1)  # e.g. 'zfs destroy foo@s1,s2,...,sN'
    conn_pool: ConnectionPool = job.params.connection_pools[r.location].pool(SHARED)
    with conn_pool.connection() as conn:
        cmd = conn.ssh_cmd + cmd
        header_bytes: int = len(" ".join(cmd).encode(sys.getfilesystemencoding()))
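    # Worked example (illustrative numbers): if both sides permit 2 MiB cmdlines, sep == " ", and the assembled prefix
    # 'ssh -S /path/to/socket user@host zfs list -t snapshot' weighs 53 bytes, then the batched arguments may consume
    # up to 2 * 1024 * 1024 - 53 bytes per invocation.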

    return max_bytes - header_bytes


def _get_max_command_line_bytes(job: Job, location: str, os_name: str | None = None) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin."""
    os_name = os_name if os_name else job.params.available_programs[location].get("os")
    os_name = os_name if os_name else "n/a"
    max_bytes = get_max_command_line_bytes(os_name)
    if job.max_command_line_bytes is not None:
        return job.max_command_line_bytes  # for testing only
    else:
        return max_bytes
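
# A minimal local-only sketch of the same ARG_MAX idea (assumes a POSIX system; the helper above instead delegates to
# get_max_command_line_bytes() so it also covers remote hosts whose OS differs from the local one):
#
#   import os
#   arg_max = os.sysconf("SC_ARG_MAX")  # hard OS limit on cmdline + environment, e.g. 2 MiB on Linux
#   env_bytes = sum(len(k) + len(v) + 2 for k, v in os.environb.items())  # environment also counts toward ARG_MAX
#   budget = arg_max - env_bytes - 4096  # 4096 is an arbitrary safety margin for this sketch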