Coverage for bzfs_main / util / parallel_iterator.py: 100%
76 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 10:16 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 10:16 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Parallel execution utilities for I/O-bound operations, with configurable result ordering."""
17from __future__ import (
18 annotations,
19)
20import concurrent
21import itertools
22import os
23import sys
24from collections import (
25 deque,
26)
27from collections.abc import (
28 Iterable,
29 Iterator,
30)
31from concurrent.futures import (
32 FIRST_COMPLETED,
33 Executor,
34 Future,
35 ThreadPoolExecutor,
36)
37from typing import (
38 Callable,
39 Final,
40 TypeVar,
41)
43from bzfs_main.util.utils import (
44 SynchronousExecutor,
45)
# Generic result type: the value carried by each Future consumed in this module and yielded by its iterators; also the
# return type of the callback passed to batch_cmd_iterator().
_T = TypeVar("_T")
def parallel_iterator(
    iterator_builder: Callable[[Executor], Iterable[Iterable[Future[_T]]]],
    *,
    max_workers: int = os.cpu_count() or 1,
    ordered: bool = True,
    is_terminated: Callable[[], bool] = lambda: False,  # optional predicate to request early async termination
) -> Iterator[_T]:
    """Runs several lazily-submitting iterators of Futures concurrently on one shared executor and yields their results,
    applying backpressure so that the whole workload is never submitted up front.

    How it works:
    -------------
    An executor sized by ``max_workers`` is opened for the duration of the iteration and handed to ``iterator_builder``
    exactly once. The iterables returned by the builder are chained into a single stream of Future[T] objects, which is
    then drained by ``parallel_iterator_results()`` using a sliding window of at most ``max_workers`` in-flight futures:
    a new future is pulled from the stream (which typically submits the next task) each time a result is consumed.

    Intended use:
    -------------
    I/O-bound work such as running similar ZFS/SSH commands against many datasets or hosts, where overlapping the waits
    gives a large throughput gain and where bounding the number of outstanding tasks keeps memory and resource usage
    predictable.

    Contract for the builder:
    -------------------------
    - It must submit its tasks to the executor it is given (e.g. ``executor.submit(...)``) and return an iterable of
      iterables over the resulting Future[T] objects.
    - Each inner iterable is typically a generator, so that advancing it submits one more task and yields its Future;
      such a generator may be arbitrarily long.
    - Any resources it uses must be scoped to the lifetime of the provided executor.

    Parameters:
    -----------
    iterator_builder : Callable[[Executor], Iterable[Iterable[Future[T]]]]
        Factory invoked once with the executor; returns the series of Future iterables to drain.

    max_workers : int, default=os.cpu_count() or 1
        Size requested for the executor and also the size of the in-flight window. For I/O-bound tasks this is often
        chosen larger than the number of CPU cores.

    ordered : bool, default=True
        - True: results are delivered in the order in which the chained iterators yield their Futures, independent of
          which task finishes first.
        - False: results are delivered in completion order, for lowest latency and highest throughput.

    is_terminated : Callable[[], bool], default=lambda: False
        Optional predicate that is polled while iterating; once it returns True, iteration stops early.

    Yields:
    -------
    The result of each Future, in iterator order (``ordered=True``) or completion order (``ordered=False``).

    Raises:
    -------
    Whatever a submitted task raised; the exception surfaces when that task's ``Future.result()`` is consumed.

    Example:
    --------
        # Run SSH commands in parallel but receive the results in submission order
        def build_ssh_commands(executor):
            return [
                (executor.submit(run_ssh_cmd, cmd) for cmd in commands)  # generator: submits lazily, one at a time
            ]

        for result in parallel_iterator(build_ssh_commands, max_workers=4, ordered=True):
            process_ssh_result(result)
    """
    # NOTE(review): executor_for() is defined in bzfs_main.util.utils; it presumably picks a suitable Executor for the
    # given max_workers -- confirm there.
    with SynchronousExecutor.executor_for(max_workers=max_workers) as pool:
        future_streams = iterator_builder(pool)  # the one and only call of the builder
        all_futures = itertools.chain.from_iterable(future_streams)  # lazy; nothing is submitted until advanced
        yield from parallel_iterator_results(
            iterator=all_futures,
            max_workers=max_workers,
            ordered=ordered,
            is_terminated=is_terminated,
        )
def parallel_iterator_results(
    iterator: Iterator[Future[_T]],
    *,
    max_workers: int,
    ordered: bool,
    is_terminated: Callable[[], bool] = lambda: False,  # optional predicate to request early async termination
) -> Iterator[_T]:
    """Drains ``iterator`` of Future[T] objects and yields their results, holding a bounded window of futures at a time.

    Up to ``max(1, max_workers)`` futures are pulled from ``iterator`` up front. Afterwards, one more future is pulled
    right before each result is handed out, so the window stays full until ``iterator`` is exhausted.

    - ``ordered=True``: results are yielded in the order in which ``iterator`` produced the futures; each step blocks on
      the oldest future in the window.
    - ``ordered=False``: results are yielded in completion order via ``concurrent.futures.wait(FIRST_COMPLETED)``.

    ``is_terminated`` is polled once before anything is pulled and again before each result is delivered. When it
    returns True, ``cancel()`` is called on the futures that are still being held/awaited and the generator ends without
    yielding further results.

    Exceptions raised by a task propagate to the caller from ``Future.result()``.
    """
    assert max_workers >= 0
    if is_terminated():
        return
    window_size: int = max(1, max_workers)

    # Pulling the first `window_size` futures is what kicks off their submission and parallel execution
    window: deque[Future[_T]] = deque(itertools.islice(iterator, window_size))
    exhausted: Future[_T] = Future()  # unique marker object returned by next() once `iterator` has run dry
    refill: Future[_T]

    if not ordered:
        pending: set[Future[_T]] = set(window)
        del window  # help gc
        finished: set[Future[_T]]
        while pending:
            finished, pending = concurrent.futures.wait(pending, return_when=FIRST_COMPLETED)  # blocks
            while finished:  # hand out every completed result, topping up the window once per result
                if is_terminated():
                    for unfinished in pending:
                        unfinished.cancel()
                    return
                refill = next(iterator, exhausted)  # pull (and thereby submit) the next task before yielding
                if refill is not exhausted:
                    pending.add(refill)
                yield finished.pop().result()  # returns immediately because this future has already completed
    else:
        while window:
            if is_terminated():
                for buffered in window:
                    buffered.cancel()
                return
            oldest: Future[_T] = window.popleft()
            refill = next(iterator, exhausted)  # pull (and thereby submit) the next task before blocking on `oldest`
            if refill is not exhausted:
                window.append(refill)
            yield oldest.result()  # blocks until the oldest task has finished
    assert next(iterator, exhausted) is exhausted
# Return types of the first and second callable passed to run_in_parallel(), respectively.
_K = TypeVar("_K")
_V = TypeVar("_V")
def run_in_parallel(fn1: Callable[[], _K], fn2: Callable[[], _V]) -> tuple[_K, _V]:
    """perf: Overlaps two I/O functions: ``fn2`` runs on a single helper thread while ``fn1`` runs on the calling thread.

    Returns the pair ``(fn1(), fn2())``. An exception raised by ``fn1`` propagates directly; an exception raised by
    ``fn2`` propagates from ``Future.result()``.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        background: Future[_V] = pool.submit(fn2)  # fn2 starts running asynchronously
        # Tuple elements are evaluated left to right: fn1() completes first, then we wait for fn2's outcome
        return fn1(), background.result()
def batch_cmd_iterator(
    cmd_args: Iterable[str],  # list of arguments to be split across one or more commands
    fn: Callable[[list[str]], _T],  # callback that runs a CLI command with a single batch
    *,
    max_batch_items: int = 2**29,  # max number of args per batch
    max_batch_bytes: int = 127 * 1024,  # max number of bytes per batch
    sep: str = " ",  # separator between batch args
) -> Iterator[_T]:
    """Lazily calls ``fn`` on consecutive batches of ``cmd_args`` and yields each return value, keeping every batch small
    enough that the resulting command line does not get too big for the OS; comparable to ``xargs -n / -s``.

    A batch is closed as soon as adding the next argument would exceed ``max_batch_items`` arguments or
    ``max_batch_bytes`` bytes. The size of an argument is the length of its filesystem-encoded form plus the encoded
    length of ``sep``. A batch always holds at least one argument, so an argument that is larger than ``max_batch_bytes``
    on its own is still passed to ``fn``, in a batch by itself.

    Batches run sequentially: the next batch is only assembled and run when the caller advances the returned iterator.

    Ignoring the byte limit, and for ``max_batch_items >= 1``, this is roughly the same as:

        for batch in itertools.batched(cmd_args, max_batch_items):
            yield fn(list(batch))
    """
    assert isinstance(sep, str)
    encoding: str = sys.getfilesystemencoding()
    sep_size: int = len(sep.encode(encoding))
    pending: list[str] = []
    pending_bytes: int = 0
    for arg in cmd_args:
        cost: int = sep_size + len(arg.encode(encoding))
        # Flush before appending if this arg would overflow either limit; never flush an empty batch
        if pending and (len(pending) >= max_batch_items or pending_bytes + cost > max_batch_bytes):
            yield fn(pending)
            pending = []
            pending_bytes = 0
        pending.append(arg)
        pending_bytes += cost
    if pending:  # trailing partial batch
        yield fn(pending)
def get_max_command_line_bytes(os_name: str) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin.

    Looks up the argument size limit for ``os_name`` in ``_MAX_CMDLINE_BYTES`` (256 KiB if the OS name is not listed),
    subtracts a fixed 4 KiB allowance for the environment plus a safety margin (68 KiB if the limit is at least 1 MiB,
    else 8 KiB), and returns the remainder, but never less than 4 KiB.
    """
    kib = 1024
    arg_max = _MAX_CMDLINE_BYTES.get(os_name, 256 * kib)
    if arg_max >= 1 * 1024 * kib:
        safety_margin = (8 * 2 * 4 + 4) * kib
    else:
        safety_margin = 8 * kib
    environ_size = 4 * kib  # typically is 1-4 KB
    return max(4 * kib, arg_max - environ_size - safety_margin)
# constants:
# Lookup table read by get_max_command_line_bytes(): maps an OS name to the value that function uses as ``arg_max``
# (in bytes) before subtracting its environment allowance and safety margin. OS names that are not listed here fall
# back to 256 KiB inside that function.
# NOTE(review): the keys presumably match what ``uname -s`` / platform.system() reports -- confirm against the caller.
_MAX_CMDLINE_BYTES: Final[dict[str, int]] = {
    "Linux": 2 * 1024 * 1024,  # 2 MiB
    "FreeBSD": 256 * 1024,  # 256 KiB
    "Darwin": 1 * 1024 * 1024,  # 1 MiB
    "Windows": 32 * 1024,  # 32 KiB
}