Coverage for bzfs_main/progress_reporter.py: 99% (230 statements)
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Thread-based progress monitor for `pv` output during data transfers.

It tails the log files produced by (parallel) ``pv`` processes and periodically prints a single status line showing
aggregated throughput, ETA, etc. The reporter runs in a separate daemon thread to avoid blocking replication. All methods
are designed for minimal synchronization overhead.
"""

from __future__ import annotations
import argparse
import glob
import os
import re
import selectors
import sys
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import (
    IO,
    Any,
    NamedTuple,
)

from bzfs_main.utils import (
    InterruptibleSleep,
    human_readable_bytes,
    open_nofollow,
)

# constants
PV_FILE_THREAD_SEPARATOR: str = "_"
ARABIC_DECIMAL_SEPARATOR: str = "\u066b"  # "٫"  # noqa: RUF003
PV_SIZE_TO_BYTES_REGEX: re.Pattern = re.compile(rf"(\d+[.,{ARABIC_DECIMAL_SEPARATOR}]?\d*)\s*([KMGTPEZYRQ]?)(i?)([Bb])(.*)")
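# A minimal sketch of how the regex above decomposes typical pv size strings (groups
# shown follow from the pattern; they are illustrations, not captured program output):
#
#   m = PV_SIZE_TO_BYTES_REGEX.fullmatch("4.12 KiB")
#   assert m is not None and m.groups() == ("4.12", "K", "i", "B", "")
#   m = PV_SIZE_TO_BYTES_REGEX.fullmatch("4Gb")  # lowercase "b" denotes bits, not bytes
#   assert m is not None and m.groups() == ("4", "G", "", "b", "")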


#############################################################################
class ProgressReporter:
    """Periodically prints progress updates to the same console status line, which is helpful if the program runs in an
    interactive Unix terminal session.

    Tails the 'pv' output log files that are being written to by (parallel) replication,
    and extracts aggregate progress and throughput metrics from them, such as MB, MB/s, ETA, etc. Periodically prints these
    metrics to the console status line (but not to the log file), and in doing so "visually overwrites" the previous status
    line, via appending a \r carriage return control char rather than a \n newline char. Does not print a status line if the
    Unix environment var 'bzfs_isatty' is set to 'false', in order not to confuse programs that scrape redirected stdout.
    Example console status line:
    2025-01-17 01:23:04 [I] zfs sent 41.7 GiB 0:00:46 [963 MiB/s] [907 MiB/s] [==========> ] 80% ETA 0:00:04 ETA 01:23:08
    """

    def __init__(
        self,
        log: Logger,
        pv_program_opts: list[str],
        use_select: bool,
        progress_update_intervals: tuple[float, float] | None,
        fail: bool = False,
    ) -> None:
        """Creates a reporter configured for ``pv`` log parsing."""
        # immutable variables:
        self.log: Logger = log
        self.pv_program_opts: list[str] = pv_program_opts
        self.use_select: bool = use_select
        self.progress_update_intervals: tuple[float, float] | None = progress_update_intervals
        self.inject_error: bool = fail  # for testing only

        # mutable variables:
        self.thread: threading.Thread | None = None
        self.exception: BaseException | None = None
        self.lock: threading.Lock = threading.Lock()
        self.sleeper: InterruptibleSleep = InterruptibleSleep(self.lock)  # sleeper shares lock with reporter
        self.file_name_queue: set[str] = set()
        self.file_name_set: set[str] = set()
        self.is_resetting: bool = True
        self.is_pausing: bool = False

    def start(self) -> None:
        """Starts the monitoring thread and begins asynchronous parsing of ``pv`` log files."""
        with self.lock:
            assert self.thread is None
            self.thread = threading.Thread(target=lambda: self._run(), name="progress_reporter", daemon=True)
            self.thread.start()

    def stop(self) -> None:
        """Blocks until reporter is stopped, then reraises any exception that may have happened during log processing."""
        self.sleeper.interrupt()
        t = self.thread
        if t is not None:
            t.join()
        e = self.exception
        if e is not None:
            raise e  # reraise exception in current thread

    def pause(self) -> None:
        """Temporarily suspends status logging."""
        with self.lock:
            self.is_pausing = True

    def reset(self) -> None:
        """Clears metrics before processing a new batch of logs; the purpose is to discard previous totals to avoid mixing
        unrelated transfers."""
        with self.lock:
            self.is_resetting = True

    def enqueue_pv_log_file(self, pv_log_file: str) -> None:
        """Tells progress reporter thread to also monitor and tail the given pv log file."""
        with self.lock:
            if pv_log_file not in self.file_name_set:
                self.file_name_queue.add(pv_log_file)

    def _run(self) -> None:
        """Thread entry point consuming pv logs and updating metrics."""
        log = self.log
        try:
            fds: list[IO[Any]] = []
            try:
                selector = selectors.SelectSelector() if self.use_select else selectors.PollSelector()
                try:
                    self._run_internal(fds, selector)
                finally:
                    selector.close()
            finally:
                for fd in fds:
                    fd.close()
        except BaseException as e:
            self.exception = e  # will be reraised in stop()
            log.exception("%s", "ProgressReporter:")

    @dataclass
    class TransferStat:
        """Tracks per-file transfer state and ETA."""

        @dataclass(order=True)
        class ETA:
            """Estimated time of arrival."""

            timestamp_nanos: int  # sorted by future time at which current zfs send/recv transfer is estimated to complete
            seq_nr: int  # tiebreaker wrt. sort order
            line_tail: str = field(compare=False)  # trailing pv log line part w/ progress bar, duration ETA, timestamp ETA

        bytes_in_flight: int
        eta: ETA
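
    # Small sketch of the ETA ordering (illustrative values): ``dataclass(order=True)``
    # compares fields in declaration order and ``line_tail`` is excluded via compare=False,
    # so ``max(etas)`` selects the transfer estimated to finish last, seq_nr breaking ties:
    #
    #   ETA = ProgressReporter.TransferStat.ETA
    #   etas = [ETA(timestamp_nanos=5, seq_nr=-1, line_tail="a"),
    #           ETA(timestamp_nanos=9, seq_nr=-2, line_tail="b")]
    #   assert max(etas).line_tail == "b"  # largest timestamp_nanos wins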

    def _run_internal(self, fds: list[IO[Any]], selector: selectors.BaseSelector) -> None:
        """Tails pv log files and periodically logs aggregated progress."""

        class Sample(NamedTuple):
            """Sliding window entry for throughput calculation."""

            sent_bytes: int
            timestamp_nanos: int

        log = self.log
        update_interval_secs, sliding_window_secs = (
            self.progress_update_intervals if self.progress_update_intervals is not None else self._get_update_intervals()
        )
        update_interval_nanos: int = round(update_interval_secs * 1_000_000_000)
        sliding_window_nanos: int = round(sliding_window_secs * 1_000_000_000)
        sleep_nanos: int = round(update_interval_nanos / 2.5)
        etas: list[ProgressReporter.TransferStat.ETA] = []
        while True:
            empty_file_name_queue: set[str] = set()
            with self.lock:
                if self.sleeper.is_stopping:
                    return
                # progress reporter thread picks up pv log files that so far aren't being tailed
                n = len(self.file_name_queue)
                m = len(self.file_name_set)
                self.file_name_set.update(self.file_name_queue)  # union
                assert len(self.file_name_set) == n + m  # aka assert (previous) file_name_set.isdisjoint(file_name_queue)
                local_file_name_queue: set[str] = self.file_name_queue
                self.file_name_queue = empty_file_name_queue  # exchange buffers
                is_pausing: bool = self.is_pausing
                self.is_pausing = False
                is_resetting: bool = self.is_resetting
                self.is_resetting = False
            if is_pausing:
                next_update_nanos: int = time.monotonic_ns() + 10 * 365 * 86400 * 1_000_000_000  # infinity
            if is_resetting:
                sent_bytes, last_status_len = 0, 0
                num_lines, num_readables = 0, 0
                start_time_nanos = time.monotonic_ns()
                next_update_nanos = start_time_nanos + update_interval_nanos
                latest_samples: deque[Sample] = deque([Sample(0, start_time_nanos)])  # sliding window w/ recent measurements
            for pv_log_file in local_file_name_queue:
                try:
                    Path(pv_log_file).touch()
                    fd = open_nofollow(pv_log_file, mode="r", newline="", encoding="utf-8")
                except FileNotFoundError:  # a third party has somehow deleted the log file or directory
                    with self.lock:
                        self.file_name_set.discard(pv_log_file)  # enable re-adding the file later via enqueue_pv_log_file()
                    log.warning("ProgressReporter: pv log file disappeared before initial open, skipping: %s", pv_log_file)
                    continue  # skip to the next file in the queue
                fds.append(fd)
                eta = self.TransferStat.ETA(timestamp_nanos=0, seq_nr=-len(fds), line_tail="")
                selector.register(fd, selectors.EVENT_READ, data=(iter(fd), self.TransferStat(bytes_in_flight=0, eta=eta)))
                etas.append(eta)
            readables: list[tuple[selectors.SelectorKey, int]] = selector.select(timeout=0)  # 0 indicates "don't block"
            has_line: bool = False
            curr_time_nanos: int = time.monotonic_ns()
            selector_key: selectors.SelectorKey
            for selector_key, _ in readables:  # for each file that's ready for non-blocking read
                num_readables += 1
                iter_fd, s = selector_key.data
                for line in iter_fd:  # aka iter(fd)
                    sent_bytes += self._update_transfer_stat(line, s, curr_time_nanos)
                    num_lines += 1
                    has_line = True
            if curr_time_nanos >= next_update_nanos:
                elapsed_nanos: int = curr_time_nanos - start_time_nanos
                msg0, msg3 = self._format_sent_bytes(sent_bytes, elapsed_nanos)  # throughput etc since replication start time
                msg1: str = self._format_duration(elapsed_nanos)  # duration since replication start time
                oldest: Sample = latest_samples[0]  # throughput etc, over sliding window
                _, msg2 = self._format_sent_bytes(sent_bytes - oldest.sent_bytes, curr_time_nanos - oldest.timestamp_nanos)
                msg4: str = max(etas).line_tail if len(etas) > 0 else ""  # progress bar, ETAs
                timestamp: str = datetime.now().isoformat(sep=" ", timespec="seconds")  # 2024-09-03 12:26:15
                status_line: str = f"{timestamp} [I] zfs sent {msg0} {msg1} {msg2} {msg3} {msg4}"
                status_line = status_line.ljust(last_status_len)  # "overwrite" trailing chars of previous status with spaces

                # The Unix console skips back to the beginning of the console line when it sees this \r control char:
                sys.stdout.write(f"{status_line}\r")
                sys.stdout.flush()

                # log.log(log_trace, "\nnum_lines: %s, num_readables: %s", num_lines, num_readables)
                last_status_len = len(status_line.rstrip())
                next_update_nanos += update_interval_nanos
                latest_samples.append(Sample(sent_bytes, curr_time_nanos))
                if elapsed_nanos >= sliding_window_nanos:
                    latest_samples.popleft()  # slide the sliding window containing recent measurements
            elif not has_line:
                # Avoid burning CPU busily spinning on I/O readiness as fds are almost always ready for non-blocking read
                # even if no new pv log line has been written. Yet retain ability to wake up immediately on reporter.stop().
                self.sleeper.sleep(min(sleep_nanos, next_update_nanos - curr_time_nanos))
            if self.inject_error:
                raise ValueError("Injected ProgressReporter error")  # for testing only
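
    # Worked example of the sliding-window rate computed above (illustrative numbers):
    # with sliding_window_secs=30 and one Sample appended per update, the deque's oldest
    # retained entry is ~30s old once warmed up. If oldest == Sample(1_000_000_000, t0)
    # and the current totals are sent_bytes == 4_000_000_000 at t0 + 30 * 10**9 nanos,
    # then msg2 reports (4_000_000_000 - 1_000_000_000) bytes / 30 s = 100 MB/s, no
    # matter how fast or slow earlier portions of the transfer ran outside the window.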

    def _update_transfer_stat(self, line: str, s: TransferStat, curr_time_nanos: int) -> int:
        """Update ``s`` from one pv status line and return bytes delta."""
        num_bytes, s.eta.timestamp_nanos, s.eta.line_tail = self._parse_pv_line(line, curr_time_nanos)
        bytes_in_flight: int = s.bytes_in_flight
        s.bytes_in_flight = num_bytes if line.endswith("\r") else 0  # intermediate vs. final status update of each transfer
        return num_bytes - bytes_in_flight
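
    # Worked example of the delta accounting above (made-up numbers): pv terminates
    # intermediate status updates with "\r" and the final update of a transfer with "\n".
    # For intermediate lines reporting 100 B then 250 B, followed by a final line reporting
    # 300 B, the returned deltas are 100, 150, and 50, so summing the deltas always yields
    # the current cumulative total (300) without double counting.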

    NO_RATES_REGEX = re.compile(r".*/s\s*[)\]]?\s*")  # matches until end of last pv rate, e.g. "834MiB/s]" or "834MiB/s)"
    # time remaining --eta "ETA 00:00:39" or "ETA 2+0:00:39" or "ETA 2:0:00:39", followed by trailing --fineta timestamp ETA
    TIME_REMAINING_ETA_REGEX = re.compile(r".*?ETA\s*((\d+)[+:])?(\d\d?):(\d\d):(\d\d).*(ETA|FIN).*")

    @staticmethod
    def _parse_pv_line(line: str, curr_time_nanos: int) -> tuple[int, int, str]:
        """Parses a pv status line into transferred bytes and ETA timestamp."""
        assert isinstance(line, str)
        if ":" in line:
            line = line.split(":", 1)[1].strip()
            sent_bytes, line = _pv_size_to_bytes(line)
            line = ProgressReporter.NO_RATES_REGEX.sub("", line.lstrip(), 1)  # remove pv --timer, --rate, --average-rate
            if match := ProgressReporter.TIME_REMAINING_ETA_REGEX.fullmatch(line):  # extract pv --eta duration
                _, days, hours, minutes, secs, _ = match.groups()
                time_remaining_secs = (86400 * int(days) if days else 0) + int(hours) * 3600 + int(minutes) * 60 + int(secs)
                curr_time_nanos += time_remaining_secs * 1_000_000_000  # ETA timestamp = now + time remaining duration
            return sent_bytes, curr_time_nanos, line
        return 0, curr_time_nanos, ""
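
    # A minimal parse sketch (the "1:" line prefix and the exact column layout are
    # illustrative assumptions, not output captured from a real pv run):
    #
    #   line = "1: 41.7GiB 0:00:46 [ 963MiB/s] [ 907MiB/s] [=====>    ] 80% ETA 0:00:04 ETA 01:23:08"
    #   sent_bytes, eta_nanos, tail = ProgressReporter._parse_pv_line(line, time.monotonic_ns())
    #   # sent_bytes == round(41.7 * 1024**3); eta_nanos == now + 4 seconds;
    #   # tail == "[=====>    ] 80% ETA 0:00:04 ETA 01:23:08" (progress bar plus both ETAs)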

    @staticmethod
    def _format_sent_bytes(num_bytes: int, duration_nanos: int) -> tuple[str, str]:
        """Returns a human-readable byte count and rate."""
        bytes_per_sec: int = round(1_000_000_000 * num_bytes / max(1, duration_nanos))
        return f"{human_readable_bytes(num_bytes, precision=2)}", f"[{human_readable_bytes(bytes_per_sec, precision=2)}/s]"

    @staticmethod
    def _format_duration(duration_nanos: int) -> str:
        """Formats ``duration_nanos`` as HH:MM:SS string."""
        total_seconds: int = round(duration_nanos / 1_000_000_000)
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours}:{minutes:02d}:{seconds:02d}"

    def _get_update_intervals(self) -> tuple[float, float]:
        """Extracts polling intervals from ``pv_program_opts``."""
        parser = argparse.ArgumentParser(allow_abbrev=False)
        parser.add_argument("--interval", "-i", type=float, default=1)
        parser.add_argument("--average-rate-window", "-m", type=float, default=30)
        args, _ = parser.parse_known_args(args=self.pv_program_opts)
        interval: float = min(60 * 60, max(args.interval, 0.1))
        return interval, min(60 * 60, max(args.average_rate_window, interval))
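
    # Example of the extraction above (a sketch; -i/--interval and -m/--average-rate-window
    # are standard pv flags, but this particular opts list is an assumption):
    #
    #   reporter = ProgressReporter(log, ["--interval=2", "-m", "120"], use_select=False,
    #                               progress_update_intervals=None)
    #   reporter._get_update_intervals()  # -> (2.0, 120.0)
    #
    # Without interval flags the defaults yield (1.0, 30.0); the interval is clamped to
    # [0.1, 3600] seconds and the window is never smaller than the update interval.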


def _pv_size_to_bytes(
    size: str,
) -> tuple[int, str]:  # example inputs: "800B", "4.12 KiB", "510 MiB", "510 MB", "4Gb", "2TiB"
    """Converts pv size string to bytes and returns remaining text."""
    if match := PV_SIZE_TO_BYTES_REGEX.fullmatch(size):
        number: float = float(match.group(1).replace(",", ".").replace(ARABIC_DECIMAL_SEPARATOR, "."))
        i: int = "KMGTPEZYRQ".index(match.group(2)) if match.group(2) else -1
        m: int = 1024 if match.group(3) == "i" else 1000
        b: int = 1 if match.group(4) == "B" else 8
        line_tail: str = match.group(5)
        if line_tail and line_tail.startswith("/s"):
            raise ValueError("Invalid pv_size: " + size)  # stems from 'pv --rate' or 'pv --average-rate'
        size_in_bytes: int = round(number * (m ** (i + 1)) / b)
        return size_in_bytes, line_tail
    else:
        return 0, ""  # skip partial or bad 'pv' log file line (pv process killed while writing?)
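
# Worked examples of the conversion above (values follow from the formula; inputs mirror
# the example inputs listed in the signature comment):
#
#   _pv_size_to_bytes("800B")      # -> (800, "")            no prefix: 1000**0 == 1
#   _pv_size_to_bytes("4.12 KiB")  # -> (4219, "")           round(4.12 * 1024)
#   _pv_size_to_bytes("510 MB")    # -> (510000000, "")      510 * 1000**2
#   _pv_size_to_bytes("4Gb")       # -> (500000000, "")      bits: 4 * 1000**3 / 8
#   _pv_size_to_bytes("2TiB")      # -> (2199023255552, "")  2 * 1024**4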


def count_num_bytes_transferred_by_zfs_send(basis_pv_log_file: str) -> int:
    """Scrapes the .pv log file(s) and sums up the 'pv --bytes' column."""

    def parse_pv_line(line: str) -> int:
        """Extracts byte count from a single pv log line."""
        if ":" in line:
            col: str = line.split(":", 1)[1].strip()
            num_bytes, _ = _pv_size_to_bytes(col)
            return num_bytes
        return 0

    total_bytes: int = 0
    files: list[str] = [basis_pv_log_file] + glob.glob(basis_pv_log_file + PV_FILE_THREAD_SEPARATOR + "[0-9]*")
    for file in files:
        if os.path.isfile(file):
            try:
                with open_nofollow(file, mode="r", newline="", encoding="utf-8") as fd:
                    line: str | None = None
                    for line in fd:
                        if line.endswith("\r"):
                            continue  # skip all but the most recent status update of each transfer
                        total_bytes += parse_pv_line(line)
                        line = None
                    if line is not None:
                        total_bytes += parse_pv_line(line)  # consume last line of file w/ intermediate status update, if any
            except FileNotFoundError:
                pass  # harmless
    return total_bytes
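
# A minimal usage sketch (paths are illustrative assumptions). For parallel replication,
# per-thread logs named like "<basis>_1", "<basis>_2" are discovered via the
# PV_FILE_THREAD_SEPARATOR glob above and summed together with the basis file:
#
#   total = count_num_bytes_transferred_by_zfs_send("/var/log/bzfs/replication.pv")
#   print(human_readable_bytes(total, precision=2))  # e.g. "41.7 GiB"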