Coverage for bzfs_main/parallel_iterator.py: 100%

74 statements  

coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Parallel execution utilities for I/O-bound operations, with configurable result ordering."""

from __future__ import (
    annotations,
)
import concurrent
import itertools
import os
import sys
import threading
from collections import (
    deque,
)
from collections.abc import (
    Iterable,
    Iterator,
)
from concurrent.futures import (
    FIRST_COMPLETED,
    Executor,
    Future,
    ThreadPoolExecutor,
)
from typing import (
    Callable,
    Final,
    TypeVar,
)

from bzfs_main.utils import (
    SynchronousExecutor,
)

T = TypeVar("T")



def parallel_iterator(
    iterator_builder: Callable[[Executor], Iterable[Iterable[Future[T]]]],
    max_workers: int = os.cpu_count() or 1,
    ordered: bool = True,
    termination_event: threading.Event | None = None,  # optional event to request early async termination
) -> Iterator[T]:

    """Executes multiple iterators in parallel/concurrently, with explicit backpressure and configurable result ordering;
    avoids pre-submitting the entire workload.

    This function provides high-performance parallel execution of iterator-based tasks using a shared thread pool, with
    precise control over result delivery ordering and concurrency management through a sliding window buffer approach.

    Purpose:
    --------
    Enables parallel/concurrent execution of multiple iterator streams while providing either sequential (ordered) or
    performance-optimized (unordered) result delivery. Primarily designed for I/O-bound operations like ZFS/SSH command
    execution where parallelism significantly improves throughput.

    Assumptions:
    ------------
    - The builder must submit tasks to the provided executor (e.g., via ``executor.submit(...)``) and yield an Iterator
      over the corresponding Future[T] objects.
    - Tasks are primarily I/O-bound and benefit from parallel execution.
    - The caller can handle potential exceptions propagated from ``Future.result()``.
    - The builder properly scopes any resources to the lifecycle of the provided executor.

    Design Rationale:
    -----------------
    The design optimizes for bzfs's primary use case: executing similar ZFS/SSH commands across multiple remote systems
    where I/O or ZFS overhead dominates and parallel execution provides substantial performance improvements over
    sequential processing. The implementation addresses several key design challenges:

    - Sliding Window Buffer: Maintains at most ``max_workers`` futures in flight, preventing resource exhaustion and
      bounding memory consumption while maximizing thread utilization. New tasks are submitted as completed ones are
      consumed. This is crucial when processing the large numbers of datasets typical in ZFS operations.

    - Ordered vs Unordered Execution:

      - Ordered mode uses a FIFO queue (``deque.popleft()``) ensuring sequential delivery that preserves the order in
        which the builder's iterators yield Futures (i.e., the chain order), regardless of completion order.
      - Unordered mode uses ``concurrent.futures.wait(FIRST_COMPLETED)`` to yield results as soon as they complete, for
        minimum end-to-end latency and maximum throughput.

    - Exception Propagation: ``Future.result()`` naturally propagates exceptions from worker threads, maintaining error
      visibility for debugging.

    Parameters:
    -----------
    iterator_builder : Callable[[Executor], Iterable[Iterable[Future[T]]]]
        Factory function that receives an Executor and returns a series of iterators. Each iterator must yield Future[T]
        objects representing tasks that have already been submitted to the executor. The builder is called once with the
        managed thread pool.

    max_workers : int, default=os.cpu_count() or 1
        Maximum number of worker threads in the thread pool. Also determines the buffer size for the sliding window
        execution model. Often higher than the number of available CPU cores for I/O-bound tasks.

    ordered : bool, default=True
        Controls result delivery mode:
        - True: Results are yielded in the same order as produced by the builder's iterators (FIFO across the chained
          iterators), not by task completion order.
        - False: Results are yielded as soon as available (completion order) for minimum latency and maximum throughput.

    Yields:
    -------
    Results from completed Future objects, either in iterator order (``ordered=True``) or completion order
    (``ordered=False``).

    Raises:
    -------
    Any exception raised by a submitted task is propagated when its result is consumed via ``Future.result()``.

    Example:
    --------
        # Parallel SSH command execution with ordered results
        def build_ssh_commands(executor):
            return [
                (executor.submit(run_ssh_cmd, cmd) for cmd in commands)
            ]

        for result in parallel_iterator(build_ssh_commands, max_workers=4, ordered=True):
            process_ssh_result(result)
    """

    with SynchronousExecutor.executor_for(max_workers=max_workers) as executor:
        yield from parallel_iterator_results(
            iterator=itertools.chain.from_iterable(iterator_builder(executor)),
            max_workers=max_workers,
            ordered=ordered,
            termination_event=termination_event,
        )
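

# Hedged usage sketch (not part of the original module): complements the ordered
# example in the docstring above by using unordered delivery, which yields each
# result as soon as its task completes. `probe` is a hypothetical stand-in for a
# real SSH/ZFS command.
def _example_parallel_iterator_unordered(hosts: list[str]) -> list[str]:
    def probe(host: str) -> str:
        return f"{host}: ok"  # stand-in for an I/O-bound CLI call

    def build(executor: Executor) -> Iterable[Iterable[Future[str]]]:
        return [(executor.submit(probe, host) for host in hosts)]  # generator submits lazily as it is consumed

    return list(parallel_iterator(build, max_workers=4, ordered=False))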



def parallel_iterator_results(
    iterator: Iterator[Future[T]],
    max_workers: int,
    ordered: bool,
    termination_event: threading.Event | None = None,  # optional event to request early async termination
) -> Iterator[T]:

    """Yield results from an iterator of Future[T] using sliding-window parallelism with optional ordered delivery."""
    assert max_workers >= 0
    max_workers = max(1, max_workers)
    termination_event = threading.Event() if termination_event is None else termination_event
    if termination_event.is_set():
        return
    # Materialize the next N=max_workers futures into a buffer, causing submission + parallel execution of their CLI calls
    fifo_buffer: deque[Future[T]] = deque(itertools.islice(iterator, max_workers))
    sentinel: Future[T] = Future()
    next_future: Future[T]

    if ordered:
        while fifo_buffer:  # submit the next CLI call whenever the current CLI call returns
            if termination_event.is_set():
                return
            curr_future: Future[T] = fifo_buffer.popleft()
            next_future = next(iterator, sentinel)  # keep the buffer full; causes the next CLI call to be submitted
            if next_future is not sentinel:
                fifo_buffer.append(next_future)
            yield curr_future.result()  # blocks until CLI returns
    else:
        todo_futures: set[Future[T]] = set(fifo_buffer)
        fifo_buffer.clear()  # help gc
        done_futures: set[Future[T]]
        while todo_futures:
            done_futures, todo_futures = concurrent.futures.wait(todo_futures, return_when=FIRST_COMPLETED)  # blocks
            while done_futures:  # submit the next CLI call whenever a CLI call returns
                if termination_event.is_set():
                    return
                next_future = next(iterator, sentinel)  # keep the buffer full; causes the next CLI call to be submitted
                if next_future is not sentinel:
                    todo_futures.add(next_future)
                yield done_futures.pop().result()  # does not block as processing has already completed
    assert next(iterator, sentinel) is sentinel
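

# Hedged sketch (not part of the original module): feeds parallel_iterator_results
# from a lazily submitting generator expression, so each new task is submitted only
# when the sliding window frees a slot; at most max_workers tasks are in flight.
def _example_parallel_iterator_results(items: list[int]) -> list[int]:
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = (executor.submit(lambda x=x: x * x) for x in items)  # submits lazily
        return list(parallel_iterator_results(futures, max_workers=4, ordered=True))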



K = TypeVar("K")
V = TypeVar("V")



def run_in_parallel(fn1: Callable[[], K], fn2: Callable[[], V]) -> tuple[K, V]:
    """perf: Runs both I/O functions in parallel/concurrently."""
    with ThreadPoolExecutor(max_workers=1) as executor:
        future: Future[V] = executor.submit(fn2)  # async fn2
        result1: K = fn1()  # blocks until fn1 call returns
        result2: V = future.result()  # blocks until fn2 call returns
        return result1, result2
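

# Hedged usage sketch (not part of the original module): overlaps two independent
# I/O-bound calls; these toy callables merely stand in for real network/disk I/O.
def _example_run_in_parallel() -> tuple[list[str], float]:
    def list_cwd() -> list[str]:
        return sorted(os.listdir("."))  # fn1 runs on the calling thread

    def stat_cwd() -> float:
        return os.stat(".").st_mtime  # fn2 runs on the worker thread

    return run_in_parallel(list_cwd, stat_cwd)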



def batch_cmd_iterator(
    cmd_args: Iterable[str],  # list of arguments to be split across one or more commands
    fn: Callable[[list[str]], T],  # callback that runs a CLI command with a single batch
    max_batch_items: int = 2**29,  # max number of args per batch
    max_batch_bytes: int = 127 * 1024,  # max number of bytes per batch
    sep: str = " ",  # separator between batch args
) -> Iterator[T]:

    """Returns an iterator that runs fn on successive batches of cmd_args, without creating a cmdline that's too big for
    the OS to handle; can be seen as a Pythonic ``xargs -n`` / ``-s`` with an OS-aware safety margin.

    Except for the max_batch_bytes logic, this is essentially the same as:
    >>> for batch in itertools.batched(cmd_args, max_batch_items):  # doctest: +SKIP
    ...     yield fn(list(batch))
    """

    assert isinstance(sep, str)
    fsenc: str = sys.getfilesystemencoding()
    seplen: int = len(sep.encode(fsenc))
    batch: list[str]
    batch, total_bytes, total_items = [], 0, 0
    for cmd_arg in cmd_args:
        arg_bytes: int = seplen + len(cmd_arg.encode(fsenc))
        if (total_items >= max_batch_items or total_bytes + arg_bytes > max_batch_bytes) and len(batch) > 0:
            yield fn(batch)
            batch, total_bytes, total_items = [], 0, 0
        batch.append(cmd_arg)
        total_bytes += arg_bytes
        total_items += 1
    if len(batch) > 0:
        yield fn(batch)
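

# Hedged usage sketch (not part of the original module): in real use, `fn` would run
# one CLI command per batch (e.g. a ZFS command over a joined argument list); here
# fn=len merely reports how many args landed in each OS-safe batch.
def _example_batch_cmd_iterator(args: list[str]) -> list[int]:
    return list(batch_cmd_iterator(args, fn=len, max_batch_bytes=64 * 1024))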



def get_max_command_line_bytes(os_name: str) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin."""
    arg_max = MAX_CMDLINE_BYTES.get(os_name, 256 * 1024)
    environ_size = 4 * 1024  # typically is 1-4 KB
    safety_margin = (8 * 2 * 4 + 4) * 1024 if arg_max >= 1 * 1024 * 1024 else 8 * 1024
    max_bytes = max(4 * 1024, arg_max - environ_size - safety_margin)
    return max_bytes



# constants:
MAX_CMDLINE_BYTES: Final[dict[str, int]] = {
    "Linux": 2 * 1024 * 1024,
    "FreeBSD": 256 * 1024,
    "Darwin": 1 * 1024 * 1024,
    "Windows": 32 * 1024,
}
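

# Worked example (derived from the constants and formula above): for "Linux",
# arg_max is 2 * 1024 * 1024 bytes, so the large safety margin applies:
#   2*1024*1024 - 4*1024 - (8*2*4 + 4)*1024 = 2_097_152 - 4_096 - 69_632 = 2_023_424
# i.e. get_max_command_line_bytes("Linux") returns 2_023_424 bytes.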