Coverage for bzfs_main/progress_reporter.py: 99%
257 statements
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Thread-based progress monitor for `pv` output during data transfers.

It tails the log files produced by (parallel) ``pv`` processes and periodically prints a single status line showing
aggregated throughput, ETA, etc. The reporter runs in a separate daemon thread to avoid blocking replication. All methods
are designed for minimal synchronization overhead.
"""

from __future__ import (
    annotations,
)
import argparse
import glob
import os
import re
import selectors
import threading
import time
from collections import (
    deque,
)
from dataclasses import (
    dataclass,
    field,
)
from datetime import (
    datetime,
)
from enum import (
    Enum,
    auto,
)
from logging import (
    Logger,
)
from pathlib import (
    Path,
)
from typing import (
    IO,
    Any,
    Final,
    NamedTuple,
    final,
)

from bzfs_main.util.utils import (
    FILE_PERMISSIONS,
    LOG_STDOUT,
    InterruptibleSleep,
    human_readable_bytes,
    open_nofollow,
)

# constants
PV_FILE_THREAD_SEPARATOR: Final[str] = "_"
_ARABIC_DECIMAL_SEPARATOR: Final[str] = "\u066b"  # "٫"  # noqa: RUF003
_PV_SIZE_TO_BYTES_REGEX: Final[re.Pattern[str]] = re.compile(
    rf"(\d+[.,{_ARABIC_DECIMAL_SEPARATOR}]?\d*)\s*([KMGTPEZYRQ]?)(i?)([Bb])(.*)"
)


#############################################################################
@final
class State(Enum):
    """Progress reporter lifecycle state transitions."""

    IS_PAUSING = auto()
    IS_RESETTING = auto()


#############################################################################
@final
class ProgressReporter:
    """Periodically prints progress updates to the same console status line, which is helpful if the program runs in an
    interactive Unix terminal session.

    Tails the 'pv' output log files that are being written to by (parallel) replication,
    and extracts aggregate progress and throughput metrics from them, such as MB, MB/s, ETA, etc. Periodically prints these
    metrics to the console status line, and in doing so "visually overwrites" the previous status line, via appending a \r
    carriage return control char rather than a \n newline char.
    Example console status line:
    2025-01-17 01:23:04 [I] zfs sent 41.7 GiB 0:00:46 [963 MiB/s] [907 MiB/s] [==========> ] 80% ETA 0:00:04 ETA 01:23:08
    """

    def __init__(
        self,
        log: Logger,
        pv_program_opts: list[str],
        use_select: bool,
        progress_update_intervals: tuple[float, float] | None,
        fail: bool = False,
    ) -> None:
        """Creates a reporter configured for ``pv`` log parsing."""
        # immutable variables:
        self._log: Final[Logger] = log
        self._pv_program_opts: Final[list[str]] = pv_program_opts
        self._use_select: Final[bool] = use_select
        self._progress_update_intervals: Final[tuple[float, float] | None] = progress_update_intervals
        self._inject_error: bool = fail  # for testing only

        # mutable variables:
        self._thread: threading.Thread | None = None
        self._exception: BaseException | None = None
        self._lock: Final[threading.Lock] = threading.Lock()
        self._sleeper: Final[InterruptibleSleep] = InterruptibleSleep(self._lock)  # sleeper shares lock with reporter
        self._file_name_queue: set[str] = set()
        self._file_name_set: Final[set[str]] = set()
        self._is_stopping: bool = False
        self._states: list[State] = [State.IS_RESETTING]
        self._first_status_line_emitted: Final[threading.Event] = threading.Event()  # for testing only

    def start(self) -> None:
        """Starts the monitoring thread and begins asynchronous parsing of ``pv`` log files."""
        with self._lock:
            assert self._thread is None
            self._thread = threading.Thread(target=lambda: self._run(), name="progress_reporter", daemon=True)
            self._thread.start()

    def stop(self) -> None:
        """Blocks until the reporter is stopped, then reraises any exception that happened during log processing."""
        with self._lock:
            self._is_stopping = True
            self._sleeper.interrupt()
        t = self._thread
        if t is not None:
            t.join()
        e = self._exception
        if e is not None:
            raise e  # reraise exception in current thread

    def pause(self) -> None:
        """Temporarily suspends status logging."""
        self._append_state(State.IS_PAUSING)

    def reset(self) -> None:
        """Clears metrics before processing a new batch of logs; the purpose is to discard previous totals to avoid mixing
        unrelated transfers."""
        self._append_state(State.IS_RESETTING)

    def _append_state(self, state: State) -> None:
        with self._lock:
            states: list[State] = self._states
            if len(states) > 0 and states[-1] is state:
                return  # same state twice in a row is a no-op
            states.append(state)
            if len(states) >= 3:
                del states[0]  # cap time and memory consumption by removing redundant state transitions
            self._sleeper.interrupt()
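
    # Illustrative trace: starting from [IS_RESETTING], pause() appends IS_PAUSING (length 2, nothing dropped); a
    # subsequent reset() appends IS_RESETTING, reaching length 3, so the oldest entry is dropped, leaving
    # [IS_PAUSING, IS_RESETTING]. Only the most recent transitions matter, which bounds queue growth under repeated
    # pause()/reset() calls.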

    def wait_for_first_status_line(self) -> None:
        """Blocks until at least the first status line has been emitted; for testing only."""
        if not self._first_status_line_emitted.is_set():
            self._first_status_line_emitted.wait()

    def enqueue_pv_log_file(self, pv_log_file: str) -> None:
        """Tells the progress reporter thread to also monitor and tail the given pv log file."""
        with self._lock:
            if pv_log_file not in self._file_name_set:
                self._file_name_queue.add(pv_log_file)

    def _run(self) -> None:
        """Thread entry point consuming pv logs and updating metrics."""
        log = self._log
        try:
            fds: list[IO[Any]] = []
            try:
                selector = selectors.SelectSelector() if self._use_select else selectors.PollSelector()
                try:
                    self._run_internal(fds, selector)
                finally:
                    selector.close()
            finally:
                for fd in fds:
                    fd.close()
        except BaseException as e:
            if not isinstance(e, BrokenPipeError):
                self._exception = e  # will be reraised in stop()
                log.exception("%s", "ProgressReporter:")

    @dataclass
    @final
    class TransferStat:
        """Tracks per-file transfer state and ETA."""

        @dataclass(order=True)
        @final
        class ETA:
            """Estimated time of arrival."""

            timestamp_nanos: int  # sorted by future time at which current zfs send/recv transfer is estimated to complete
            seq_nr: int  # tiebreaker wrt. sort order
            line_tail: str = field(compare=False)  # trailing pv log line part w/ progress bar, duration ETA, timestamp ETA

        bytes_in_flight: int
        eta: ETA
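
        # Because ETA orders by (timestamp_nanos, seq_nr), max(etas) in the reporter loop selects the transfer that is
        # estimated to finish last; its line_tail supplies the progress bar and ETAs shown on the status line.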

    def _run_internal(self, fds: list[IO[Any]], selector: selectors.BaseSelector) -> None:
        """Tails pv log files and periodically logs aggregated progress."""

        @final
        class Sample(NamedTuple):
            """Sliding window entry for throughput calculation."""

            sent_bytes: int
            timestamp_nanos: int

        log = self._log
        update_interval_secs, sliding_window_secs = (
            self._progress_update_intervals if self._progress_update_intervals is not None else self._get_update_intervals()
        )
        update_interval_nanos: int = round(update_interval_secs * 1_000_000_000)
        sliding_window_nanos: int = round(sliding_window_secs * 1_000_000_000)
        sleep_nanos: int = 0
        etas: list[ProgressReporter.TransferStat.ETA] = []
        while True:
            empty_file_name_queue: set[str] = set()
            empty_states: list[State] = []
            with self._lock:
                if self._is_stopping:
                    return
                # progress reporter thread picks up pv log files that so far aren't being tailed
                n = len(self._file_name_queue)
                m = len(self._file_name_set)
                self._file_name_set.update(self._file_name_queue)  # union
                assert len(self._file_name_set) == n + m  # aka assert (previous) file_name_set.isdisjoint(file_name_queue)
                local_file_name_queue: set[str] = self._file_name_queue
                self._file_name_queue = empty_file_name_queue  # exchange buffers
                states: list[State] = self._states
                self._states = empty_states  # exchange buffers
            for state in states:
                if state is State.IS_PAUSING:
                    next_update_nanos: int = time.monotonic_ns() + 10 * 365 * 86400 * 1_000_000_000  # infinity
                    sleep_nanos = next_update_nanos
                else:
                    assert state is State.IS_RESETTING
                    sent_bytes, last_status_len = 0, 0
                    num_lines, num_readables = 0, 0
                    start_time_nanos = time.monotonic_ns()
                    next_update_nanos = start_time_nanos + update_interval_nanos
                    sleep_nanos = round(update_interval_nanos / 2.5)
                    latest_samples: deque[Sample] = deque([Sample(0, start_time_nanos)])  # sliding window w/ recent measures
            for pv_log_file in local_file_name_queue:
                try:
                    Path(pv_log_file).touch(mode=FILE_PERMISSIONS)
                    fd = open_nofollow(pv_log_file, mode="r", newline="", encoding="utf-8")
                except FileNotFoundError:  # a third party has somehow deleted the log file or directory
                    with self._lock:
                        self._file_name_set.discard(pv_log_file)  # enable re-adding the file later via enqueue_pv_log_file()
                    log.warning("ProgressReporter: pv log file disappeared before initial open, skipping: %s", pv_log_file)
                    continue  # skip to the next file in the queue
                fds.append(fd)
                eta = self.TransferStat.ETA(timestamp_nanos=0, seq_nr=-len(fds), line_tail="")
                selector.register(fd, selectors.EVENT_READ, data=(iter(fd), self.TransferStat(bytes_in_flight=0, eta=eta)))
                etas.append(eta)
            readables: list[tuple[selectors.SelectorKey, int]] = selector.select(timeout=0)  # 0 indicates "don't block"
            has_line: bool = False
            curr_time_nanos: int = time.monotonic_ns()
            for selector_key, _ in readables:  # for each file that's ready for non-blocking read
                num_readables += 1
                iter_fd, transfer_stat = selector_key.data
                for line in iter_fd:  # aka iter(fd)
                    sent_bytes += self._update_transfer_stat(line, transfer_stat, curr_time_nanos)
                    num_lines += 1
                    has_line = True
            if curr_time_nanos >= next_update_nanos:
                elapsed_nanos: int = curr_time_nanos - start_time_nanos
                msg0, msg3 = self._format_sent_bytes(sent_bytes, elapsed_nanos)  # throughput etc since replication start time
                msg1: str = self._format_duration(elapsed_nanos)  # duration since replication start time
                oldest: Sample = latest_samples[0]  # throughput etc, over sliding window
                _, msg2 = self._format_sent_bytes(sent_bytes - oldest.sent_bytes, curr_time_nanos - oldest.timestamp_nanos)
                msg4: str = max(etas).line_tail if len(etas) > 0 else ""  # progress bar, ETAs
                timestamp: str = datetime.now().isoformat(sep=" ", timespec="seconds")  # 2024-09-03 12:26:15
                status_line: str = f"{timestamp} [I] zfs sent {msg0} {msg1} {msg2} {msg3} {msg4}".rstrip()
                status_line = status_line.ljust(last_status_len)  # "overwrite" trailing chars of previous status with spaces

                # The Unix console skips back to the beginning of the console line when it sees this \r control char:
                log.log(LOG_STDOUT, "%s", status_line, extra={"terminator": "\r"})

                # log.log(log_trace, "\nnum_lines: %s, num_readables: %s", num_lines, num_readables)
                last_status_len = len(status_line.rstrip())
                next_update_nanos += update_interval_nanos
                latest_samples.append(Sample(sent_bytes, curr_time_nanos))
                if elapsed_nanos >= sliding_window_nanos:
                    latest_samples.popleft()  # slide the sliding window containing recent measurements
                if not self._first_status_line_emitted.is_set():
                    self._first_status_line_emitted.set()
            elif not has_line:
                # Avoid burning CPU busily spinning on I/O readiness as fds are almost always ready for non-blocking read
                # even if no new pv log line has been written. Yet retain ability to wake up immediately on reporter.stop().
                if self._sleeper.sleep(min(sleep_nanos, next_update_nanos - curr_time_nanos)):
                    self._sleeper.reset()  # sleep was interrupted; ensure we can sleep normally again
            if self._inject_error:
                raise ValueError("Injected ProgressReporter error")  # for testing only

    def _update_transfer_stat(self, line: str, s: TransferStat, curr_time_nanos: int) -> int:
        """Updates ``s`` from one pv status line and returns the bytes delta."""
        num_bytes, s.eta.timestamp_nanos, s.eta.line_tail = self._parse_pv_line(line, curr_time_nanos)
        bytes_in_flight: int = s.bytes_in_flight
        s.bytes_in_flight = num_bytes if line.endswith("\r") else 0  # intermediate vs. final status update of each transfer
        return num_bytes - bytes_in_flight
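
    # Worked example (illustrative byte counts): pv reports cumulative totals, and intermediate lines end in "\r".
    # If three consecutive lines of one transfer report 100, 250, and finally 300 bytes, the returned deltas are
    # 100, 150, and 50, which sum to the true total of 300 rather than 650.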

    NO_RATES_REGEX = re.compile(r".*/s\s*[)\]]?\s*")  # matches until end of last pv rate, e.g. "834MiB/s]" or "834MiB/s)"
    # time remaining --eta "ETA 00:00:39" or "ETA 2+0:00:39" or "ETA 2:0:00:39", followed by trailing --fineta timestamp ETA
    TIME_REMAINING_ETA_REGEX = re.compile(r".*?ETA\s*((\d+)[+:])?(\d\d?):(\d\d):(\d\d).*(ETA|FIN).*")

    @staticmethod
    def _parse_pv_line(line: str, curr_time_nanos: int) -> tuple[int, int, str]:
        """Parses a pv status line into transferred bytes and ETA timestamp."""
        assert isinstance(line, str)
        if ":" in line:
            line = line.split(":", 1)[1].strip()
            sent_bytes, line = _pv_size_to_bytes(line)
            line = ProgressReporter.NO_RATES_REGEX.sub("", line.lstrip(), count=1)  # strip --timer, --rate, --avg-rate
            if match := ProgressReporter.TIME_REMAINING_ETA_REGEX.fullmatch(line):  # extract pv --eta duration
                _, days, hours, minutes, secs, _ = match.groups()
                time_remaining_secs = (86400 * int(days) if days else 0) + int(hours) * 3600 + int(minutes) * 60 + int(secs)
                curr_time_nanos += time_remaining_secs * 1_000_000_000  # ETA timestamp = now + time remaining duration
            return sent_bytes, curr_time_nanos, line
        return 0, curr_time_nanos, ""
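
    # Illustrative parse (hypothetical pv log line; the name prefix before the first ":" stems from the pv invocation):
    #
    #   "zfs_send: 41.7GiB 0:00:46 [ 963MiB/s] [ 907MiB/s] [=====>    ] 80% ETA 0:00:04 ETA 01:23:08"
    #
    # The prefix up to the first ":" is dropped, _pv_size_to_bytes() consumes "41.7GiB" (~44.8 GB), NO_RATES_REGEX
    # strips everything through the last rate "907MiB/s]", and TIME_REMAINING_ETA_REGEX reads "ETA 0:00:04" as a
    # 4 second remainder, so the returned ETA timestamp is curr_time_nanos + 4_000_000_000.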

    @staticmethod
    def _format_sent_bytes(num_bytes: int, duration_nanos: int) -> tuple[str, str]:
        """Returns a human-readable byte count and rate."""
        bytes_per_sec: int = round(1_000_000_000 * num_bytes / max(1, duration_nanos))
        return f"{human_readable_bytes(num_bytes, precision=2)}", f"[{human_readable_bytes(bytes_per_sec, precision=2)}/s]"

    @staticmethod
    def _format_duration(duration_nanos: int) -> str:
        """Formats ``duration_nanos`` as an H:MM:SS string."""
        total_seconds: int = duration_nanos // 1_000_000_000
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours}:{minutes:02d}:{seconds:02d}"
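
    # For example, 3_725_000_000_000 nanoseconds (1 hour, 2 minutes, 5 seconds) formats as "1:02:05".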

    def _get_update_intervals(self) -> tuple[float, float]:
        """Extracts polling intervals from ``pv_program_opts``."""
        parser = argparse.ArgumentParser(allow_abbrev=False)
        parser.add_argument("--interval", "-i", type=float, default=1)
        parser.add_argument("--average-rate-window", "-m", type=float, default=30)
        args, _ = parser.parse_known_args(args=self._pv_program_opts)
        interval: float = min(60 * 60, max(args.interval, 0.1))
        return interval, min(60 * 60, max(args.average_rate_window, interval))
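
    # For example, pv_program_opts=["--interval=2", "-m", "10"] yields (2.0, 10.0); unknown pv flags are ignored, the
    # update interval is clamped to [0.1, 3600] seconds, and the sliding window is never shorter than the interval.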


def _pv_size_to_bytes(
    size: str,
) -> tuple[int, str]:  # example inputs: "800B", "4.12 KiB", "510 MiB", "510 MB", "4Gb", "2TiB"
    """Converts pv size string to bytes and returns remaining text."""
    if match := _PV_SIZE_TO_BYTES_REGEX.fullmatch(size):
        number: float = float(match.group(1).replace(",", ".").replace(_ARABIC_DECIMAL_SEPARATOR, "."))
        i: int = "KMGTPEZYRQ".index(match.group(2)) if match.group(2) else -1
        m: int = 1024 if match.group(3) == "i" else 1000
        b: int = 1 if match.group(4) == "B" else 8
        line_tail: str = match.group(5)
        if line_tail and line_tail.startswith("/s"):
            raise ValueError("Invalid pv_size: " + size)  # stems from 'pv --rate' or 'pv --average-rate'
        size_in_bytes: int = round(number * (m ** (i + 1)) / b)
        return size_in_bytes, line_tail
    else:
        return 0, ""  # skip partial or bad 'pv' log file line (pv process killed while writing?)
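
# For example, "4.12 KiB" yields round(4.12 * 1024) = 4219 bytes, "510 MB" yields 510_000_000 bytes, and "4Gb"
# (lowercase "b" means bits) yields round(4 * 1000**3 / 8) = 500_000_000 bytes; any text following the unit is
# returned unparsed as the line tail.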


def count_num_bytes_transferred_by_zfs_send(basis_pv_log_file: str) -> int:
    """Scrapes the .pv log file(s) and sums up the 'pv --bytes' column."""

    def parse_pv_line(line: str) -> int:
        """Extracts the byte count from a single pv log line."""
        if ":" in line:
            col: str = line.split(":", 1)[1].strip()
            num_bytes, _ = _pv_size_to_bytes(col)
            return num_bytes
        return 0

    total_bytes: int = 0
    files: list[str] = [basis_pv_log_file] + glob.glob(glob.escape(basis_pv_log_file + PV_FILE_THREAD_SEPARATOR) + "[0-9]*")
    for file in files:
        if os.path.isfile(file):
            try:
                with open_nofollow(file, mode="r", newline="", encoding="utf-8") as fd:
                    line: str | None = None
                    for line in fd:
                        assert line is not None
                        if line.endswith("\r"):
                            continue  # skip all but the most recent status update of each transfer
                        total_bytes += parse_pv_line(line)
                        line = None
                    if line is not None:
                        total_bytes += parse_pv_line(line)  # consume last line of file w/ intermediate status update, if any
            except FileNotFoundError:
                pass  # harmless
    return total_bytes
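
# Usage sketch (hypothetical paths): given a basis log file "/tmp/bzfs.pv" plus per-thread siblings such as
# "/tmp/bzfs.pv_1" and "/tmp/bzfs.pv_2" (suffixed via PV_FILE_THREAD_SEPARATOR), calling
# count_num_bytes_transferred_by_zfs_send("/tmp/bzfs.pv") sums the final per-transfer byte counts across all files.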