Coverage for bzfs_main/parallel_iterator.py: 100%
70 statements
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Parallel execution utilities for I/O-bound operations, with configurable result ordering."""

from __future__ import annotations
import concurrent
import itertools
import os
import sys
from collections import deque
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor
from typing import (
    Callable,
    Generator,
    Iterable,
    Iterator,
    TypeVar,
)

T = TypeVar("T")

def parallel_iterator(
    iterator_builder: Callable[[ThreadPoolExecutor], list[Iterable[Future[T]]]],
    max_workers: int = os.cpu_count() or 1,
    ordered: bool = True,
) -> Generator[T, None, None]:
    """Executes multiple iterators in parallel/concurrently, with configurable result ordering.

    This function provides high-performance parallel execution of iterator-based tasks using a shared thread pool, with
    precise control over result delivery ordering and concurrency management through a sliding window buffer approach.

    Purpose:
    --------
    Enables parallel/concurrent execution of multiple iterator streams while providing either sequential (ordered) or
    optimized-latency (unordered) result delivery. Primarily designed for I/O-bound operations like ZFS/SSH command
    execution where parallelism significantly improves throughput.

    Assumptions:
    ------------
    - Iterator builder creates iterators that yield properly submitted Future objects
    - Tasks are primarily I/O-bound and benefit from parallel execution
    - Caller can handle potential exceptions propagated from Future.result()
    - Iterator builder properly handles ThreadPoolExecutor lifecycle

    Design Rationale:
    -----------------
    The design optimizes for bzfs's primary use case: executing similar ZFS/SSH commands across multiple remote systems
    where I/O or ZFS overhead dominates and parallel execution provides substantial performance improvements over
    sequential processing. The implementation addresses several key design challenges:

    - Sliding Window Buffer: Maintains at most max_workers futures in flight, preventing resource exhaustion and
      bounding memory consumption while maximizing thread utilization. This is crucial when processing the large
      numbers of datasets typical in ZFS operations. New tasks are submitted as completed ones are consumed.

    - Ordered vs Unordered Execution:

      - Ordered mode uses a FIFO queue (deque.popleft()) ensuring sequential delivery
      - Unordered mode uses concurrent.futures.wait(FIRST_COMPLETED) for minimal latency

    - Exception Propagation: Future.result() naturally propagates exceptions from worker threads, maintaining error
      visibility for debugging.

    Parameters:
    -----------
    iterator_builder : Callable[[ThreadPoolExecutor], list[Iterable[Future[T]]]]
        Factory function that receives a ThreadPoolExecutor and returns a list of iterators. Each iterator should yield
        Future objects representing submitted tasks. The builder is called once with the managed thread pool.

    max_workers : int, default=os.cpu_count() or 1
        Maximum number of worker threads in the thread pool. Also determines the buffer size for the sliding window
        execution model. Often higher than the number of available CPU cores for I/O-bound tasks.

    ordered : bool, default=True
        Controls result delivery mode:
        - True: Results yielded in the same order as the input iterators (FIFO)
        - False: Results yielded as soon as available (minimum latency)

    Yields:
    -------
    Results from completed Future objects, either in original order (ordered=True) or completion order (ordered=False).

    Raises:
    -------
    Any exception raised by the submitted tasks is propagated when their results are consumed via Future.result().

    Example:
    --------
    # Parallel SSH command execution with ordered results
    def build_ssh_commands(executor):
        return [
            (executor.submit(run_ssh_cmd, cmd) for cmd in commands)
        ]

    for result in parallel_iterator(build_ssh_commands, max_workers=4, ordered=True):
        process_ssh_result(result)
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        iterators: list[Iterable[Future[T]]] = iterator_builder(executor)
        assert isinstance(iterators, list)
        iterator: Iterator[Future[T]] = itertools.chain(*iterators)
        iterators.clear()  # help gc
        # Materialize the next N futures into a buffer, causing submission + parallel execution of their CLI calls
        fifo_buffer: deque[Future[T]] = deque(itertools.islice(iterator, max_workers))
        next_future: Future[T] | None

        if ordered:
            while fifo_buffer:  # submit the next CLI call whenever the current CLI call returns
                curr_future: Future[T] = fifo_buffer.popleft()
                next_future = next(iterator, None)  # causes the next CLI call to be submitted
                if next_future is not None:
                    fifo_buffer.append(next_future)
                yield curr_future.result()  # blocks until CLI returns
        else:
            todo_futures: set[Future[T]] = set(fifo_buffer)
            fifo_buffer.clear()  # help gc
            done_futures: set[Future[T]]
            while todo_futures:
                done_futures, todo_futures = concurrent.futures.wait(todo_futures, return_when=FIRST_COMPLETED)  # blocks
                for done_future in done_futures:  # submit the next CLI call whenever a CLI call returns
                    next_future = next(iterator, None)  # causes the next CLI call to be submitted
                    if next_future is not None:
                        todo_futures.add(next_future)
                    yield done_future.result()  # does not block as processing has already completed
        assert next(iterator, None) is None
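

# Illustrative sketch (not part of the original module): one way a caller might drive parallel_iterator in unordered
# mode. The `check_host` callable and host names below are hypothetical placeholders for I/O-bound work such as
# running one ZFS/SSH command per remote host.
def _example_parallel_iterator_unordered() -> list[str]:
    """Sketch only: results arrive in completion order rather than submission order when ordered=False."""
    hosts = ["host1", "host2", "host3"]

    def check_host(host: str) -> str:
        return f"{host}: ok"  # stand-in for an I/O-bound CLI call

    def builder(executor: ThreadPoolExecutor) -> list[Iterable[Future[str]]]:
        # One generator per logical stream; each lazily submits a Future to the shared pool as it is consumed,
        # which is what keeps the sliding window at max_workers futures in flight.
        return [(executor.submit(check_host, host) for host in hosts)]

    return list(parallel_iterator(builder, max_workers=2, ordered=False))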


K = TypeVar("K")
V = TypeVar("V")


def run_in_parallel(fn1: Callable[[], K], fn2: Callable[[], V]) -> tuple[K, V]:
    """perf: Runs both I/O functions in parallel/concurrently."""
    with ThreadPoolExecutor(max_workers=1) as executor:
        future: Future[V] = executor.submit(fn2)  # async fn2
        result1: K = fn1()  # blocks until fn1 call returns
        result2: V = future.result()  # blocks until fn2 call returns
        return result1, result2
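

# Illustrative sketch (not part of the original module): run_in_parallel lets two independent blocking calls overlap.
# The two loader functions below are hypothetical stand-ins for I/O such as listing snapshots on source and destination.
def _example_run_in_parallel() -> tuple[list[str], list[str]]:
    def list_src_snapshots() -> list[str]:
        return ["tank@s1", "tank@s2"]  # stand-in for a blocking CLI/SSH call

    def list_dst_snapshots() -> list[str]:
        return ["tank@s1"]  # stand-in for a second, independent blocking call

    return run_in_parallel(list_src_snapshots, list_dst_snapshots)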


def batch_cmd_iterator(
    cmd_args: Iterable[str],  # list of arguments to be split across one or more commands
    fn: Callable[[list[str]], T],  # callback that runs a CLI command with a single batch
    max_batch_items: int = 2**29,  # max number of args per batch
    max_batch_bytes: int = 127 * 1024,  # max number of bytes per batch
    sep: str = " ",  # separator between batch args
) -> Generator[T, None, None]:
    """Returns an iterator that splits cmd_args into sequential batches and runs fn(batch) on each, without creating a
    cmdline that's too big for the OS to handle."""
    assert isinstance(sep, str)
    fsenc: str = sys.getfilesystemencoding()
    seplen: int = len(sep.encode(fsenc))
    batch: list[str]
    batch, total_bytes, max_items = [], 0, max_batch_items

    def flush() -> T | None:
        if len(batch) > 0:
            return fn(batch)
        return None

    for cmd_arg in cmd_args:
        curr_bytes: int = seplen + len(cmd_arg.encode(fsenc))
        if total_bytes + curr_bytes > max_batch_bytes or max_items <= 0:
            results = flush()
            if results is not None:
                yield results
            batch, total_bytes, max_items = [], 0, max_batch_items
        batch.append(cmd_arg)
        total_bytes += curr_bytes
        max_items -= 1
    results = flush()
    if results is not None:
        yield results
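

# Illustrative sketch (not part of the original module): batch_cmd_iterator splits a long argument list into batches
# that each stay under the byte/item limits. `destroy_snapshots` is a hypothetical callback; a real one would prepend
# the actual CLI command (e.g. a zfs invocation) and run it for each batch.
def _example_batch_cmd_iterator() -> list[int]:
    snapshot_names = [f"tank/data@snap{i}" for i in range(10_000)]

    def destroy_snapshots(batch: list[str]) -> int:
        return len(batch)  # here we only report the batch size instead of running a command

    return list(batch_cmd_iterator(snapshot_names, destroy_snapshots, max_batch_bytes=64 * 1024))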


def get_max_command_line_bytes(os_name: str) -> int:
    """Remote flavor of os.sysconf("SC_ARG_MAX") - size(os.environb) - safety margin"""
    arg_max = MAX_CMDLINE_BYTES.get(os_name, 256 * 1024)
    environ_size = 4 * 1024  # typically is 1-4 KB
    safety_margin = (8 * 2 * 4 + 4) * 1024 if arg_max >= 1 * 1024 * 1024 else 8 * 1024
    max_bytes = max(4 * 1024, arg_max - environ_size - safety_margin)
    return max_bytes


# constants:
MAX_CMDLINE_BYTES: dict[str, int] = {
    "Linux": 2 * 1024 * 1024,
    "FreeBSD": 256 * 1024,
    "SunOS": 1 * 1024 * 1024,
    "Darwin": 1 * 1024 * 1024,
    "Windows": 32 * 1024,
}
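

# Illustrative sketch (not part of the original module): get_max_command_line_bytes can size the per-batch byte budget
# for batch_cmd_iterator when targeting a given remote OS. The 1 KiB of headroom reserved for the fixed command prefix
# is an assumed value chosen for illustration, not taken from the original code.
def _example_sized_batches(os_name: str = "Linux") -> int:
    budget = get_max_command_line_bytes(os_name) - 1024  # reserve assumed headroom for the command prefix itself
    args = [f"tank/data@snap{i}" for i in range(100_000)]
    batch_count = 0
    for _ in batch_cmd_iterator(args, lambda batch: len(batch), max_batch_bytes=budget):
        batch_count += 1
    return batch_count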