Coverage for bzfs_main/util/parallel_iterator.py: 100% (78 statements)

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Parallel execution utilities for I/O-bound operations, with configurable result ordering."""

from __future__ import (
    annotations,
)
import concurrent
import itertools
import os
import sys
import threading
from collections import (
    deque,
)
from collections.abc import (
    Iterable,
    Iterator,
)
from concurrent.futures import (
    FIRST_COMPLETED,
    Executor,
    Future,
    ThreadPoolExecutor,
)
from typing import (
    Callable,
    Final,
    TypeVar,
)

from bzfs_main.util.utils import (
    SynchronousExecutor,
)

_T = TypeVar("_T")


def parallel_iterator(
    iterator_builder: Callable[[Executor], Iterable[Iterable[Future[_T]]]],
    max_workers: int = os.cpu_count() or 1,
    ordered: bool = True,
    termination_event: threading.Event | None = None,  # optional event to request early async termination
) -> Iterator[_T]:
    """Executes multiple iterators in parallel/concurrently, with explicit backpressure and configurable result ordering;
    avoids pre-submitting the entire workload.

    This function provides efficient parallel execution of iterator-based tasks using a shared thread pool, with precise
    control over result delivery ordering and concurrency management through a bounded buffer (a sliding window of at most
    ``max_workers`` in-flight futures).

    Purpose:
    --------
    Enables parallel/concurrent execution of multiple iterator streams while providing either sequential (ordered) or
    performance-optimized (unordered) result delivery. Primarily designed for I/O-bound operations like ZFS/SSH command
    execution where parallelism significantly improves throughput.

    Assumptions:
    ------------
    - The builder must submit tasks to the provided executor (e.g., via ``executor.submit(...)``) and yield an Iterator
      over the corresponding Future[T] objects.
    - Tasks are primarily I/O-bound and benefit from parallel execution.
    - The caller can handle potential exceptions propagated from ``Future.result()``.
    - The builder properly scopes any resources to the lifecycle of the provided executor.

    Design Rationale:
    -----------------
    The design optimizes for bzfs's primary use case: executing similar ZFS/SSH commands across multiple remote systems
    where I/O or ZFS overhead dominates and parallel execution provides substantial performance improvements over
    sequential processing. The implementation addresses several key design challenges:

    - Bounded Buffer: Maintains at most ``max_workers`` futures in flight, preventing resource exhaustion and bounding
      memory consumption while maximizing thread utilization. New tasks are submitted as completed ones are consumed.
      This is crucial when processing the large numbers of datasets typical in ZFS operations.

    - Ordered vs Unordered Execution:

      - Ordered mode uses a FIFO queue (``deque.popleft()``) ensuring sequential delivery that preserves the order in
        which the builder's iterators yield Futures (i.e., the chain order), regardless of completion order.
      - Unordered mode uses ``concurrent.futures.wait(FIRST_COMPLETED)`` to yield results as soon as they complete for
        minimum end-to-end latency and maximum throughput.

    - Exception Propagation: ``Future.result()`` naturally propagates exceptions from worker threads, maintaining error
      visibility for debugging.

    Parameters:
    -----------
    iterator_builder : Callable[[Executor], Iterable[Iterable[Future[T]]]]
        Factory function that receives an Executor and returns a series of iterators. Each iterator must yield
        Future[T] objects representing tasks that have already been submitted to the executor. The builder is called
        once with the managed thread pool.

    max_workers : int, default=os.cpu_count() or 1
        Maximum number of worker threads in the thread pool. Also determines the buffer size for the bounded-concurrency
        execution model. Often higher than the number of available CPU cores for I/O-bound tasks.

    ordered : bool, default=True
        Controls result delivery mode:
        - True: Results are yielded in the same order as produced by the builder's iterators (FIFO across the chained
          iterators), not by task completion order.
        - False: Results are yielded as soon as available (completion order) for minimum latency and maximum throughput.

    termination_event : threading.Event | None, default=None
        Optional event to request early asynchronous termination; when set, buffered futures are cancelled where
        possible and iteration stops.

    Yields:
    -------
    Results from completed Future objects, either in iterator order (``ordered=True``) or completion order
    (``ordered=False``).

    Raises:
    -------
    Any exception raised by the submitted tasks will be propagated when their results are consumed via
    ``Future.result()``.

    Example:
    --------
    # Parallel SSH command execution with ordered results
    def build_ssh_commands(executor):
        return [
            (executor.submit(run_ssh_cmd, cmd) for cmd in commands)
        ]

    for result in parallel_iterator(build_ssh_commands, max_workers=4, ordered=True):
        process_ssh_result(result)
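
    A second, hedged sketch: unordered mode interleaving two command streams (``cmds_host_a`` and ``cmds_host_b`` are
    hypothetical placeholders); results are yielded in completion order:

    # Unordered results from two streams, with at most 8 futures in flight
    def build_two_streams(executor):
        return [
            (executor.submit(run_ssh_cmd, cmd) for cmd in cmds_host_a),
            (executor.submit(run_ssh_cmd, cmd) for cmd in cmds_host_b),
        ]

    for result in parallel_iterator(build_two_streams, max_workers=8, ordered=False):
        process_ssh_result(result)  # arrival order reflects completion, not submission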

134 """ 

135 with SynchronousExecutor.executor_for(max_workers=max_workers) as executor: 

136 yield from parallel_iterator_results( 

137 iterator=itertools.chain.from_iterable(iterator_builder(executor)), 

138 max_workers=max_workers, 

139 ordered=ordered, 

140 termination_event=termination_event, 

141 ) 

142 

143 

def parallel_iterator_results(
    iterator: Iterator[Future[_T]],
    max_workers: int,
    ordered: bool,
    termination_event: threading.Event | None = None,  # optional event to request early async termination
) -> Iterator[_T]:
    """Yield results from an iterator of Future[T] using bounded concurrency with optional ordered delivery.

    """
    assert max_workers >= 0
    max_workers = max(1, max_workers)
    termination_event = threading.Event() if termination_event is None else termination_event
    if termination_event.is_set():
        return
    # Materialize the next N=max_workers futures into a buffer, causing submission + parallel execution of their CLI calls
    fifo_buffer: deque[Future[_T]] = deque(itertools.islice(iterator, max_workers))
    sentinel: Future[_T] = Future()
    next_future: Future[_T]

    if ordered:
        while fifo_buffer:  # submit the next CLI call whenever the current CLI call returns
            if termination_event.is_set():
                for future in fifo_buffer:
                    future.cancel()
                return
            curr_future: Future[_T] = fifo_buffer.popleft()
            next_future = next(iterator, sentinel)  # keep the buffer full; causes the next CLI call to be submitted
            if next_future is not sentinel:
                fifo_buffer.append(next_future)
            yield curr_future.result()  # blocks until CLI returns
    else:
        todo_futures: set[Future[_T]] = set(fifo_buffer)
        del fifo_buffer  # help gc
        done_futures: set[Future[_T]]
        while todo_futures:
            done_futures, todo_futures = concurrent.futures.wait(todo_futures, return_when=FIRST_COMPLETED)  # blocks
            while done_futures:  # submit the next CLI call whenever a CLI call returns
                if termination_event.is_set():
                    for future in todo_futures:
                        future.cancel()
                    return
                next_future = next(iterator, sentinel)  # keep the buffer full; causes the next CLI call to be submitted
                if next_future is not sentinel:
                    todo_futures.add(next_future)
                yield done_futures.pop().result()  # does not block as processing has already completed
    assert next(iterator, sentinel) is sentinel


_K = TypeVar("_K")
_V = TypeVar("_V")


def run_in_parallel(fn1: Callable[[], _K], fn2: Callable[[], _V]) -> tuple[_K, _V]:
    """perf: Runs both I/O functions in parallel/concurrently.

    """
    with ThreadPoolExecutor(max_workers=1) as executor:
        future: Future[_V] = executor.submit(fn2)  # async fn2
        result1: _K = fn1()  # blocks until fn1 call returns
        result2: _V = future.result()  # blocks until fn2 call returns
        return result1, result2


def batch_cmd_iterator(
    cmd_args: Iterable[str],  # list of arguments to be split across one or more commands
    fn: Callable[[list[str]], _T],  # callback that runs a CLI command with a single batch
    max_batch_items: int = 2**29,  # max number of args per batch
    max_batch_bytes: int = 127 * 1024,  # max number of bytes per batch
    sep: str = " ",  # separator between batch args
) -> Iterator[_T]:
    """Returns an iterator that runs fn(cmd_args) in sequential batches, without creating a cmdline that's too big for
    the OS to handle; can be seen as a Pythonic ``xargs -n``/``-s`` with an OS-aware safety margin.

    Except for the max_batch_bytes logic, this is essentially the same as:

    >>> for batch in itertools.batched(cmd_args, max_batch_items):  # doctest: +SKIP
    ...     yield fn(list(batch))
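
    A concrete hedged sketch (``file_names`` is a hypothetical iterable of arguments; each batch becomes one command):

    >>> import subprocess
    >>> def run_cmd(batch):  # doctest: +SKIP
    ...     return subprocess.run(["echo"] + batch, check=True)
    >>> results = list(batch_cmd_iterator(file_names, run_cmd))  # doctest: +SKIP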

217 """ 

218 assert isinstance(sep, str) 

219 fsenc: str = sys.getfilesystemencoding() 

220 seplen: int = len(sep.encode(fsenc)) 

221 batch: list[str] 

222 batch, total_bytes, total_items = [], 0, 0 

223 for cmd_arg in cmd_args: 

224 arg_bytes: int = seplen + len(cmd_arg.encode(fsenc)) 

225 if (total_items >= max_batch_items or total_bytes + arg_bytes > max_batch_bytes) and len(batch) > 0: 

226 yield fn(batch) 

227 batch, total_bytes, total_items = [], 0, 0 

228 batch.append(cmd_arg) 

229 total_bytes += arg_bytes 

230 total_items += 1 

231 if len(batch) > 0: 

232 yield fn(batch) 

233 

234 

def get_max_command_line_bytes(os_name: str) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin.

    """
    arg_max = _MAX_CMDLINE_BYTES.get(os_name, 256 * 1024)
    environ_size = 4 * 1024  # typically is 1-4 KB
    safety_margin = (8 * 2 * 4 + 4) * 1024 if arg_max >= 1 * 1024 * 1024 else 8 * 1024
    max_bytes = max(4 * 1024, arg_max - environ_size - safety_margin)
    return max_bytes


# constants:
_MAX_CMDLINE_BYTES: Final[dict[str, int]] = {
    "Linux": 2 * 1024 * 1024,
    "FreeBSD": 256 * 1024,
    "Darwin": 1 * 1024 * 1024,
    "Windows": 32 * 1024,
}