Coverage for bzfs_main / util / parallel_iterator.py: 100%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:49 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Parallel execution utilities for I/O-bound operations, with configurable result ordering.""" 

16 

17from __future__ import ( 

18 annotations, 

19) 

20import concurrent 

21import itertools 

22import os 

23import sys 

24from collections import ( 

25 deque, 

26) 

27from collections.abc import ( 

28 Iterable, 

29 Iterator, 

30) 

31from concurrent.futures import ( 

32 FIRST_COMPLETED, 

33 Executor, 

34 Future, 

35) 

36from concurrent.futures.thread import ( 

37 ThreadPoolExecutor, 

38) 

39from typing import ( 

40 Callable, 

41 Final, 

42 TypeVar, 

43) 

44 

45from bzfs_main.util.utils import ( 

46 SynchronousExecutor, 

47) 

48 

_T = TypeVar("_T")  # result type carried by the Future objects yielded by the parallel iterators

50 

51 

def parallel_iterator(
    iterator_builder: Callable[[Executor], Iterable[Iterable[Future[_T]]]],
    *,
    max_workers: int = os.cpu_count() or 1,
    ordered: bool = True,
    is_terminated: Callable[[], bool] = lambda: False,  # optional predicate to request early async termination
) -> Iterator[_T]:
    """Runs multiple Future-producing iterators concurrently on a shared executor, with bounded in-flight work and
    configurable result ordering; the entire workload is never pre-submitted.

    ``iterator_builder`` is invoked exactly once with the executor and must return a series of iterables, each lazily
    yielding ``Future[_T]`` objects for tasks it submits to that executor (typically a generator where advancing it
    submits the next task via ``executor.submit(...)``). The iterables are chained in order and driven through a
    sliding window of at most ``max_workers`` in-flight futures, which bounds memory use and provides backpressure
    while keeping all worker threads busy. Primarily intended for I/O-bound work such as ZFS/SSH command execution.

    Parameters:
    -----------
    iterator_builder:
        Factory called once with the executor; returns the iterables of futures described above.
    max_workers:
        Thread pool size and simultaneously the size of the in-flight window; for I/O-bound tasks this is often
        larger than the CPU core count.
    ordered:
        True yields results in the order the builder's iterators produced the futures (FIFO across the chain),
        independent of completion order; False yields each result as soon as it completes, minimizing latency.
    is_terminated:
        Optional predicate polled to request early termination; when it turns true, remaining futures are cancelled.

    Yields:
    -------
    The results of the completed futures, in iterator order (``ordered=True``) or completion order (``ordered=False``).

    Raises:
    -------
    Whatever a submitted task raised, propagated when its result is consumed via ``Future.result()``.

    Example:
    --------
    # Parallel SSH command execution with ordered results
    def build_ssh_commands(executor):
        return [
            (executor.submit(run_ssh_cmd, cmd) for cmd in commands)  # lazy on-demand Python Generator of Future objects
        ]

    for result in parallel_iterator(build_ssh_commands, max_workers=4, ordered=True):
        process_ssh_result(result)
    """
    with SynchronousExecutor.executor_for(max_workers=max_workers) as executor:
        # Chain the builder's iterators into one lazy stream of futures and drive it with bounded concurrency.
        future_stream: Iterator[Future[_T]] = itertools.chain.from_iterable(iterator_builder(executor))
        yield from parallel_iterator_results(
            iterator=future_stream,
            max_workers=max_workers,
            ordered=ordered,
            is_terminated=is_terminated,
        )

146 

147 

def parallel_iterator_results(
    iterator: Iterator[Future[_T]],
    *,
    max_workers: int,
    ordered: bool,
    is_terminated: Callable[[], bool] = lambda: False,  # optional predicate to request early async termination
) -> Iterator[_T]:
    """Yields results from an iterator of Future[T] using bounded concurrency with optional ordered delivery."""
    assert max_workers >= 0
    max_workers = max(1, max_workers)
    if is_terminated():
        return

    # Prime a sliding window of at most N=max_workers in-flight futures; pulling from the iterator submits work.
    window: deque[Future[_T]] = deque(itertools.islice(iterator, max_workers))
    end_marker: Future[_T] = Future()  # sentinel compared by identity; never resolved
    refill: Future[_T]

    if ordered:
        while window:  # each consumed slot is refilled right away, submitting the next CLI call
            if is_terminated():
                for pending in window:
                    pending.cancel()
                return
            head: Future[_T] = window.popleft()
            refill = next(iterator, end_marker)  # keep the window full; advancing the iterator submits the next task
            if refill is not end_marker:
                window.append(refill)
            yield head.result()  # blocks until this future's CLI call returns
    else:
        in_flight: set[Future[_T]] = set(window)
        del window  # help gc
        completed: set[Future[_T]]
        while in_flight:
            completed, in_flight = concurrent.futures.wait(in_flight, return_when=FIRST_COMPLETED)  # blocks
            while completed:  # one refill per delivered result keeps the window full
                if is_terminated():
                    for pending in in_flight:
                        pending.cancel()
                    return
                refill = next(iterator, end_marker)  # advancing the iterator submits the next task
                if refill is not end_marker:
                    in_flight.add(refill)
                yield completed.pop().result()  # does not block as processing has already completed
    assert next(iterator, end_marker) is end_marker

193 

194 

_K = TypeVar("_K")  # result type of fn1 in run_in_parallel()
_V = TypeVar("_V")  # result type of fn2 in run_in_parallel()

197 

198 

def run_in_parallel(fn1: Callable[[], _K], fn2: Callable[[], _V]) -> tuple[_K, _V]:
    """perf: Overlaps two I/O calls: fn2 runs on a helper thread while fn1 runs on the calling thread."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        fn2_future: Future[_V] = pool.submit(fn2)  # kick off fn2 asynchronously
        first: _K = fn1()  # runs inline; blocks until fn1 returns
        second: _V = fn2_future.result()  # joins fn2; re-raises its exception, if any
        return first, second

206 

207 

def batch_cmd_iterator(
    cmd_args: Iterable[str],  # list of arguments to be split across one or more commands
    fn: Callable[[list[str]], _T],  # callback that runs a CLI command with a single batch
    *,
    max_batch_items: int = 2**29,  # max number of args per batch
    max_batch_bytes: int = 127 * 1024,  # max number of bytes per batch
    sep: str = " ",  # separator between batch args
) -> Iterator[_T]:
    """Lazily runs fn(args) over sequential batches of cmd_args, keeping each batch small enough that the resulting
    command line stays within OS limits; a Pythonic xargs -n / -s with an OS-aware safety margin.

    Except for the max_batch_bytes logic, this is essentially the same as:
    >>>
    while batch := itertools.batched(cmd_args, max_batch_items):  # doctest: +SKIP
        yield fn(batch)
    """
    assert isinstance(sep, str)
    encoding: str = sys.getfilesystemencoding()
    sep_cost: int = len(sep.encode(encoding))
    pending: list[str] = []
    pending_bytes: int = 0
    pending_items: int = 0
    for arg in cmd_args:
        # Each arg is charged its encoded size plus one separator (slight overestimate for the first arg; safe).
        cost: int = sep_cost + len(arg.encode(encoding))
        would_overflow: bool = pending_items >= max_batch_items or pending_bytes + cost > max_batch_bytes
        if would_overflow and pending:  # never emit an empty batch; a single oversized arg still gets its own batch
            yield fn(pending)
            pending, pending_bytes, pending_items = [], 0, 0
        pending.append(arg)
        pending_bytes += cost
        pending_items += 1
    if pending:  # flush the final partial batch
        yield fn(pending)

239 

240 

def get_max_command_line_bytes(os_name: str) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin"""
    arg_max: int = _MAX_CMDLINE_BYTES.get(os_name, 256 * 1024)  # conservative fallback for unknown platforms
    environ_bytes: int = 4 * 1024  # typically is 1-4 KB
    # Larger kernels get a larger safety margin; small limits keep a modest 8 KB reserve.
    margin: int = (8 * 2 * 4 + 4) * 1024 if arg_max >= 1 * 1024 * 1024 else 8 * 1024
    return max(4 * 1024, arg_max - environ_bytes - margin)  # never drop below a usable 4 KB floor

248 

249 

# constants:
# Per-OS maximum command line sizes in bytes, consumed by get_max_command_line_bytes(); unknown OS names
# fall back to 256 KB there.
_MAX_CMDLINE_BYTES: Final[dict[str, int]] = {
    "Linux": 2 * 1024 * 1024,
    "FreeBSD": 256 * 1024,
    "Darwin": 1 * 1024 * 1024,
    "Windows": 32 * 1024,
}