Coverage for bzfs_main/parallel_iterator.py: 100%

70 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Parallel execution utilities for I/O-bound operations, with configurable result ordering.""" 

16 

17from __future__ import annotations 

18import concurrent 

19import itertools 

20import os 

21import sys 

22from collections import deque 

23from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor 

24from typing import ( 

25 Callable, 

26 Generator, 

27 Iterable, 

28 Iterator, 

29 TypeVar, 

30) 

31 

T = TypeVar("T")  # generic result type yielded by the futures consumed in parallel_iterator()

33 

34 

def parallel_iterator(
    iterator_builder: Callable[[ThreadPoolExecutor], list[Iterable[Future[T]]]],
    max_workers: int = os.cpu_count() or 1,
    ordered: bool = True,
) -> Generator[T, None, None]:
    """Executes multiple iterators in parallel/concurrently, with configurable result ordering.

    This function provides high-performance parallel execution of iterator-based tasks using a shared thread pool, with
    precise control over result delivery ordering and concurrency management through a sliding window buffer approach.

    Purpose:
    --------
    Enables parallel/concurrent execution of multiple iterator streams while providing either sequential (ordered) or
    optimized-latency (unordered) result delivery. Primarily designed for I/O-bound operations like ZFS/SSH command
    execution where parallelism significantly improves throughput.

    Assumptions:
    ------------
    - Iterator builder creates iterators that yield properly submitted Future objects
    - Tasks are primarily I/O-bound and benefit from parallel execution
    - Caller can handle potential exceptions propagated from Future.result()
    - Iterator builder properly handles ThreadPoolExecutor lifecycle

    Design Rationale:
    -----------------
    The design optimizes for bzfs's primary use case: executing similar ZFS/SSH commands across multiple remote systems
    where I/O or ZFS overhead dominates and parallel execution provides substantial performance improvements over
    sequential processing. The implementation addresses several key design challenges:

    - Sliding Window Buffer: Maintains at most max_workers futures in flight, preventing resource exhaustion and bounding
      memory consumption while maximizing thread utilization. This is crucial when processing large numbers of datasets
      typical in ZFS operation. New tasks are submitted as completed ones are consumed. NOTE: advancing the (lazy)
      iterator is what triggers executor.submit() for the next task, so the exact points at which next() is called below
      are load-bearing — they implement the backpressure.

    - Ordered vs Unordered Execution:

      - Ordered mode uses FIFO queue (deque.popleft()) ensuring sequential delivery
      - Unordered mode uses concurrent.futures.wait(FIRST_COMPLETED) for minimal latency

    - Exception Propagation: Future.result() naturally propagates exceptions from worker threads, maintaining error
      visibility for debugging.

    Parameters:
    -----------
    iterator_builder : Callable[[ThreadPoolExecutor], list[Iterable[Future[T]]]]
        Factory function that receives a ThreadPoolExecutor and returns a list of iterators. Each iterator should yield
        Future objects representing submitted tasks. The builder is called once with the managed thread pool.

    max_workers : int, default=os.cpu_count() or 1
        Maximum number of worker threads in the thread pool. Also determines the buffer size for the sliding window
        execution model. Often higher than the number of available CPU cores for I/O-bound tasks.

    ordered : bool, default=True
        Controls result delivery mode:
        - True: Results yielded in same order as input iterators (FIFO)
        - False: Results yielded as soon as available (minimum latency)

    Yields:
    -------
    Results from completed Future objects, either in original order (ordered=True) or completion order (ordered=False).

    Raises:
    -------
    Any exception raised by the submitted tasks will be propagated when their results are consumed via Future.result().

    Example:
    --------
        # Parallel SSH command execution with ordered results
        def build_ssh_commands(executor):
            return [
                (executor.submit(run_ssh_cmd, cmd) for cmd in commands)
            ]

        for result in parallel_iterator(build_ssh_commands, max_workers=4, ordered=True):
            process_ssh_result(result)
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        iterators: list[Iterable[Future[T]]] = iterator_builder(executor)
        assert isinstance(iterators, list)
        # Flatten all per-source iterators into a single lazy stream of futures.
        iterator: Iterator[Future[T]] = itertools.chain(*iterators)
        iterators.clear()  # help gc
        # Materialize the next N futures into a buffer, causing submission + parallel execution of their CLI calls
        fifo_buffer: deque[Future[T]] = deque(itertools.islice(iterator, max_workers))
        next_future: Future[T] | None

        if ordered:
            while fifo_buffer:  # submit the next CLI call whenever the current CLI call returns
                curr_future: Future[T] = fifo_buffer.popleft()
                next_future = next(iterator, None)  # causes the next CLI call to be submitted
                if next_future is not None:
                    fifo_buffer.append(next_future)
                yield curr_future.result()  # blocks until CLI returns
        else:
            # Completion-order delivery: track in-flight futures as a set and hand out whichever finishes first.
            todo_futures: set[Future[T]] = set(fifo_buffer)
            fifo_buffer.clear()  # help gc
            done_futures: set[Future[T]]
            while todo_futures:
                done_futures, todo_futures = concurrent.futures.wait(todo_futures, return_when=FIRST_COMPLETED)  # blocks
                for done_future in done_futures:  # submit the next CLI call whenever a CLI call returns
                    next_future = next(iterator, None)  # causes the next CLI call to be submitted
                    if next_future is not None:
                        todo_futures.add(next_future)
                    yield done_future.result()  # does not block as processing has already completed
        # By now every future the builder produced must have been drained and yielded.
        assert next(iterator, None) is None

138 

139 

K = TypeVar("K")  # result type of fn1 in run_in_parallel()
V = TypeVar("V")  # result type of fn2 in run_in_parallel()

142 

143 

def run_in_parallel(fn1: Callable[[], K], fn2: Callable[[], V]) -> tuple[K, V]:
    """perf: Runs both I/O functions in parallel/concurrently.

    fn2 is handed to a single worker thread while fn1 runs on the calling thread; returns (fn1_result, fn2_result).
    Exceptions raised by either function propagate to the caller.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        fn2_future: Future[V] = pool.submit(fn2)  # kick off fn2 asynchronously on the worker thread
        fn1_result: K = fn1()  # meanwhile run fn1 right here; blocks until it returns
        fn2_result: V = fn2_future.result()  # blocks until fn2 has returned too
    return fn1_result, fn2_result

151 

152 

def batch_cmd_iterator(
    cmd_args: Iterable[str],  # list of arguments to be split across one or more commands
    fn: Callable[[list[str]], T],  # callback that runs a CLI command with a single batch
    max_batch_items: int = 2**29,  # max number of args per batch
    max_batch_bytes: int = 127 * 1024,  # max number of bytes per batch
    sep: str = " ",  # separator between batch args
) -> Generator[T, None, None]:
    """Returns an iterator that runs fn(cmd_args) in sequential batches, without creating a cmdline that's too big for the OS
    to handle.

    Each arg is accounted for as len(sep) + len(arg) bytes in the filesystem encoding; a batch is cut whenever adding the
    next arg would exceed max_batch_bytes, or the batch already holds max_batch_items args. Batches for which fn returns
    None are not yielded.
    """
    assert isinstance(sep, str)
    encoding: str = sys.getfilesystemencoding()
    sep_bytes: int = len(sep.encode(encoding))
    pending: list[str] = []  # args accumulated for the current batch
    pending_bytes: int = 0  # encoded size of the current batch, incl. separators
    remaining_items: int = max_batch_items  # how many more args the current batch may take

    for arg in cmd_args:
        arg_bytes: int = sep_bytes + len(arg.encode(encoding))
        if pending_bytes + arg_bytes > max_batch_bytes or remaining_items <= 0:
            if pending:  # cut the batch before it would overflow
                result = fn(pending)
                if result is not None:
                    yield result
            pending, pending_bytes, remaining_items = [], 0, max_batch_items
        pending.append(arg)
        pending_bytes += arg_bytes
        remaining_items -= 1

    if pending:  # run whatever is left over
        result = fn(pending)
        if result is not None:
            yield result

186 

187 

def get_max_command_line_bytes(os_name: str) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin"""
    # Unknown OS flavors fall back to a conservative 256 KiB kernel limit.
    kernel_limit: int = MAX_CMDLINE_BYTES.get(os_name, 256 * 1024)
    environ_reserve: int = 4 * 1024  # typically is 1-4 KB
    # Big kernel limits (>= 1 MiB) get a proportionally bigger safety cushion.
    cushion: int = (8 * 2 * 4 + 4) * 1024 if kernel_limit >= 1 * 1024 * 1024 else 8 * 1024
    # Never report less than 4 KiB, even for tiny limits.
    return max(4 * 1024, kernel_limit - environ_reserve - cushion)


# constants:
MAX_CMDLINE_BYTES: dict[str, int] = {  # per-OS kernel ARG_MAX-style command line limits, in bytes
    "Linux": 2 * 1024 * 1024,
    "FreeBSD": 256 * 1024,
    "SunOS": 1 * 1024 * 1024,
    "Darwin": 1 * 1024 * 1024,
    "Windows": 32 * 1024,
}