Coverage for bzfs_main/progress_reporter.py: 99%
248 statements
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Thread-based progress monitor for ``pv`` output during data transfers.

It tails the log files produced by (parallel) ``pv`` processes and periodically prints a single status line showing
aggregated throughput, ETA, etc. The reporter runs in a separate daemon thread to avoid blocking replication. All methods
are designed for minimal synchronization overhead.
"""

from __future__ import (
    annotations,
)
import argparse
import glob
import os
import re
import selectors
import sys
import threading
import time
from collections import (
    deque,
)
from dataclasses import (
    dataclass,
    field,
)
from datetime import (
    datetime,
)
from enum import (
    Enum,
    auto,
)
from logging import (
    Logger,
)
from pathlib import (
    Path,
)
from typing import (
    IO,
    Any,
    Final,
    NamedTuple,
)

from bzfs_main.utils import (
    FILE_PERMISSIONS,
    InterruptibleSleep,
    human_readable_bytes,
    open_nofollow,
)

# constants
PV_FILE_THREAD_SEPARATOR: Final[str] = "_"
ARABIC_DECIMAL_SEPARATOR: Final[str] = "\u066b"  # "٫"  # noqa: RUF003
PV_SIZE_TO_BYTES_REGEX: Final[re.Pattern[str]] = re.compile(
    rf"(\d+[.,{ARABIC_DECIMAL_SEPARATOR}]?\d*)\s*([KMGTPEZYRQ]?)(i?)([Bb])(.*)"
)
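# Example (illustrative): PV_SIZE_TO_BYTES_REGEX.fullmatch("4.12 KiB") captures the groups
# ("4.12", "K", "i", "B", ""); see _pv_size_to_bytes() below for the numeric conversion.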


#############################################################################
class State(Enum):
    """Progress reporter lifecycle state transitions."""

    IS_PAUSING = auto()
    IS_RESETTING = auto()


#############################################################################
class ProgressReporter:
    """Periodically prints progress updates to the same console status line, which is helpful if the program runs in an
    interactive Unix terminal session.

    Tails the 'pv' output log files that are being written to by (parallel) replication, and extracts aggregate progress
    and throughput metrics from them, such as MB, MB/s, ETA, etc. Periodically prints these metrics to the console status
    line (but not to the log file), and in doing so "visually overwrites" the previous status line, by appending a \r
    carriage return control char rather than a \n newline char. Does not print a status line if the Unix environment
    variable 'bzfs_isatty' is set to 'false', in order not to confuse programs that scrape redirected stdout.
    Example console status line:
    2025-01-17 01:23:04 [I] zfs sent 41.7 GiB 0:00:46 [963 MiB/s] [907 MiB/s] [==========> ] 80% ETA 0:00:04 ETA 01:23:08
    """

    def __init__(
        self,
        log: Logger,
        pv_program_opts: list[str],
        use_select: bool,
        progress_update_intervals: tuple[float, float] | None,
        fail: bool = False,
    ) -> None:
        """Creates a reporter configured for ``pv`` log parsing."""
        # immutable variables:
        self._log: Final[Logger] = log
        self._pv_program_opts: Final[list[str]] = pv_program_opts
        self._use_select: Final[bool] = use_select
        self._progress_update_intervals: Final[tuple[float, float] | None] = progress_update_intervals
        self._inject_error: bool = fail  # for testing only

        # mutable variables:
        self._thread: threading.Thread | None = None
        self._exception: BaseException | None = None
        self._lock: Final[threading.Lock] = threading.Lock()
        self._sleeper: Final[InterruptibleSleep] = InterruptibleSleep(self._lock)  # sleeper shares lock with reporter
        self._file_name_queue: set[str] = set()
        self._file_name_set: Final[set[str]] = set()
        self._is_stopping: bool = False
        self._states: list[State] = [State.IS_RESETTING]

    def start(self) -> None:
        """Starts the monitoring thread and begins asynchronous parsing of ``pv`` log files."""
        with self._lock:
            assert self._thread is None
            self._thread = threading.Thread(target=lambda: self._run(), name="progress_reporter", daemon=True)
            self._thread.start()

    def stop(self) -> None:
        """Blocks until the reporter is stopped, then reraises any exception that happened during log processing."""
        with self._lock:
            self._is_stopping = True
            self._sleeper.interrupt()
            t = self._thread
        if t is not None:
            t.join()
        e = self._exception
        if e is not None:
            raise e  # reraise exception in current thread

    def pause(self) -> None:
        """Temporarily suspends status logging."""
        self._append_state(State.IS_PAUSING)

    def reset(self) -> None:
        """Clears metrics before processing a new batch of logs; the purpose is to discard previous totals to avoid
        mixing unrelated transfers."""
        self._append_state(State.IS_RESETTING)

    def _append_state(self, state: State) -> None:
        with self._lock:
            states: list[State] = self._states
            if len(states) > 0 and states[-1] is state:
                return  # same state twice in a row is a no-op
            states.append(state)
            if len(states) >= 3:
                del states[0]  # cap time and memory consumption by removing redundant state transitions
            self._sleeper.interrupt()

    def enqueue_pv_log_file(self, pv_log_file: str) -> None:
        """Tells the progress reporter thread to also monitor and tail the given pv log file."""
        with self._lock:
            if pv_log_file not in self._file_name_set:
                self._file_name_queue.add(pv_log_file)

    def _run(self) -> None:
        """Thread entry point consuming pv logs and updating metrics."""
        log = self._log
        try:
            fds: list[IO[Any]] = []
            try:
                selector = selectors.SelectSelector() if self._use_select else selectors.PollSelector()
                try:
                    self._run_internal(fds, selector)
                finally:
                    selector.close()
            finally:
                for fd in fds:
                    fd.close()
        except BaseException as e:
            if not isinstance(e, BrokenPipeError):
                self._exception = e  # will be reraised in stop()
                log.exception("%s", "ProgressReporter:")

    @dataclass
    class TransferStat:
        """Tracks per-file transfer state and ETA."""

        @dataclass(order=True)
        class ETA:
            """Estimated time of arrival."""

            timestamp_nanos: int  # sorted by future time at which current zfs send/recv transfer is estimated to complete
            seq_nr: int  # tiebreaker wrt. sort order
            line_tail: str = field(compare=False)  # trailing pv log line part w/ progress bar, duration ETA, timestamp ETA

        bytes_in_flight: int
        eta: ETA

    def _run_internal(self, fds: list[IO[Any]], selector: selectors.BaseSelector) -> None:
        """Tails pv log files and periodically logs aggregated progress."""

        class Sample(NamedTuple):
            """Sliding window entry for throughput calculation."""

            sent_bytes: int
            timestamp_nanos: int

        log = self._log
        update_interval_secs, sliding_window_secs = (
            self._progress_update_intervals if self._progress_update_intervals is not None else self._get_update_intervals()
        )
        update_interval_nanos: int = round(update_interval_secs * 1_000_000_000)
        sliding_window_nanos: int = round(sliding_window_secs * 1_000_000_000)
        sleep_nanos: int = 0
        etas: list[ProgressReporter.TransferStat.ETA] = []
        while True:
            empty_file_name_queue: set[str] = set()
            empty_states: list[State] = []
            with self._lock:
                if self._is_stopping:
                    return
                # progress reporter thread picks up pv log files that so far aren't being tailed
                n = len(self._file_name_queue)
                m = len(self._file_name_set)
                self._file_name_set.update(self._file_name_queue)  # union
                assert len(self._file_name_set) == n + m  # aka assert (previous) file_name_set.isdisjoint(file_name_queue)
                local_file_name_queue: set[str] = self._file_name_queue
                self._file_name_queue = empty_file_name_queue  # exchange buffers
                states: list[State] = self._states
                self._states = empty_states  # exchange buffers
            for state in states:
                if state is State.IS_PAUSING:
                    next_update_nanos: int = time.monotonic_ns() + 10 * 365 * 86400 * 1_000_000_000  # infinity
                    sleep_nanos = next_update_nanos
                else:
                    assert state is State.IS_RESETTING
                    sent_bytes, last_status_len = 0, 0
                    num_lines, num_readables = 0, 0
                    start_time_nanos = time.monotonic_ns()
                    next_update_nanos = start_time_nanos + update_interval_nanos
                    sleep_nanos = round(update_interval_nanos / 2.5)
                    latest_samples: deque[Sample] = deque([Sample(0, start_time_nanos)])  # sliding window w/ recent measures
            for pv_log_file in local_file_name_queue:
                try:
                    Path(pv_log_file).touch(mode=FILE_PERMISSIONS)
                    fd = open_nofollow(pv_log_file, mode="r", newline="", encoding="utf-8")
                except FileNotFoundError:  # a third party has somehow deleted the log file or directory
                    with self._lock:
                        self._file_name_set.discard(pv_log_file)  # enable re-adding the file later via enqueue_pv_log_file()
                    log.warning("ProgressReporter: pv log file disappeared before initial open, skipping: %s", pv_log_file)
                    continue  # skip to the next file in the queue
                fds.append(fd)
                eta = self.TransferStat.ETA(timestamp_nanos=0, seq_nr=-len(fds), line_tail="")
                selector.register(fd, selectors.EVENT_READ, data=(iter(fd), self.TransferStat(bytes_in_flight=0, eta=eta)))
                etas.append(eta)
            readables: list[tuple[selectors.SelectorKey, int]] = selector.select(timeout=0)  # 0 indicates "don't block"
            has_line: bool = False
            curr_time_nanos: int = time.monotonic_ns()
            for selector_key, _ in readables:  # for each file that's ready for non-blocking read
                num_readables += 1
                iter_fd, transfer_stat = selector_key.data
                for line in iter_fd:  # aka iter(fd)
                    sent_bytes += self._update_transfer_stat(line, transfer_stat, curr_time_nanos)
                    num_lines += 1
                    has_line = True
            if curr_time_nanos >= next_update_nanos:
                elapsed_nanos: int = curr_time_nanos - start_time_nanos
                msg0, msg3 = self._format_sent_bytes(sent_bytes, elapsed_nanos)  # throughput etc. since replication start time
                msg1: str = self._format_duration(elapsed_nanos)  # duration since replication start time
                oldest: Sample = latest_samples[0]  # throughput etc. over the sliding window
                _, msg2 = self._format_sent_bytes(sent_bytes - oldest.sent_bytes, curr_time_nanos - oldest.timestamp_nanos)
                msg4: str = max(etas).line_tail if len(etas) > 0 else ""  # progress bar, ETAs
                timestamp: str = datetime.now().isoformat(sep=" ", timespec="seconds")  # 2024-09-03 12:26:15
                status_line: str = f"{timestamp} [I] zfs sent {msg0} {msg1} {msg2} {msg3} {msg4}"
                status_line = status_line.ljust(last_status_len)  # "overwrite" trailing chars of previous status with spaces

                # The Unix console skips back to the beginning of the console line when it sees this \r control char:
                sys.stdout.write(f"{status_line}\r")
                sys.stdout.flush()

                # log.log(log_trace, "\nnum_lines: %s, num_readables: %s", num_lines, num_readables)
                last_status_len = len(status_line.rstrip())
                next_update_nanos += update_interval_nanos
                latest_samples.append(Sample(sent_bytes, curr_time_nanos))
                if elapsed_nanos >= sliding_window_nanos:
                    latest_samples.popleft()  # slide the sliding window containing recent measurements
            elif not has_line:
                # Avoid burning CPU busily spinning on I/O readiness, as fds are almost always ready for non-blocking read
                # even if no new pv log line has been written. Yet retain the ability to wake up immediately on stop().
                if self._sleeper.sleep(min(sleep_nanos, next_update_nanos - curr_time_nanos)):
                    self._sleeper.reset()  # sleep was interrupted; ensure we can sleep normally again
            if self._inject_error:
                raise ValueError("Injected ProgressReporter error")  # for testing only

    def _update_transfer_stat(self, line: str, s: TransferStat, curr_time_nanos: int) -> int:
        """Updates ``s`` from one pv status line and returns the bytes delta."""
        num_bytes, s.eta.timestamp_nanos, s.eta.line_tail = self._parse_pv_line(line, curr_time_nanos)
        bytes_in_flight: int = s.bytes_in_flight
        s.bytes_in_flight = num_bytes if line.endswith("\r") else 0  # intermediate vs. final status update of each transfer
        return num_bytes - bytes_in_flight

    NO_RATES_REGEX = re.compile(r".*/s\s*[)\]]?\s*")  # matches until end of last pv rate, e.g. "834MiB/s]" or "834MiB/s)"
    # time remaining --eta "ETA 00:00:39" or "ETA 2+0:00:39" or "ETA 2:0:00:39", followed by trailing --fineta timestamp ETA
    TIME_REMAINING_ETA_REGEX = re.compile(r".*?ETA\s*((\d+)[+:])?(\d\d?):(\d\d):(\d\d).*(ETA|FIN).*")
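    # Example (illustrative): "[=> ] 80% ETA 0:00:04 ETA 01:23:08" fullmatches TIME_REMAINING_ETA_REGEX
    # with days=None, hours="0", minutes="00", secs="04", i.e. 4 seconds of estimated time remaining.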

    @staticmethod
    def _parse_pv_line(line: str, curr_time_nanos: int) -> tuple[int, int, str]:
        """Parses a pv status line into transferred bytes and ETA timestamp."""
        assert isinstance(line, str)
        if ":" in line:
            line = line.split(":", 1)[1].strip()
            sent_bytes, line = _pv_size_to_bytes(line)
            line = ProgressReporter.NO_RATES_REGEX.sub("", line.lstrip(), count=1)  # strip --timer, --rate, --avg-rate
            if match := ProgressReporter.TIME_REMAINING_ETA_REGEX.fullmatch(line):  # extract pv --eta duration
                _, days, hours, minutes, secs, _ = match.groups()
                time_remaining_secs = (86400 * int(days) if days else 0) + int(hours) * 3600 + int(minutes) * 60 + int(secs)
                curr_time_nanos += time_remaining_secs * 1_000_000_000  # ETA timestamp = now + time remaining duration
            return sent_bytes, curr_time_nanos, line
        return 0, curr_time_nanos, ""
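        # Example (illustrative; the "t1:" prefix is hypothetical): for the pv log line
        #   't1: 41.7GiB 0:00:46 [ 963MiB/s] [ 907MiB/s] [=> ] 80% ETA 0:00:04 ETA 01:23:08'
        # this returns (round(41.7 * 1024**3), curr_time_nanos + 4 * 1_000_000_000, "[=> ] 80% ETA 0:00:04 ETA 01:23:08").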

    @staticmethod
    def _format_sent_bytes(num_bytes: int, duration_nanos: int) -> tuple[str, str]:
        """Returns a human-readable byte count and rate."""
        bytes_per_sec: int = round(1_000_000_000 * num_bytes / max(1, duration_nanos))
        return f"{human_readable_bytes(num_bytes, precision=2)}", f"[{human_readable_bytes(bytes_per_sec, precision=2)}/s]"

    @staticmethod
    def _format_duration(duration_nanos: int) -> str:
        """Formats ``duration_nanos`` as an H:MM:SS string (hours are not zero-padded)."""
        total_seconds: int = duration_nanos // 1_000_000_000
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours}:{minutes:02d}:{seconds:02d}"
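        # Example (illustrative): _format_duration(3_723_000_000_000) == "1:02:03", i.e. 3723 seconds.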

    def _get_update_intervals(self) -> tuple[float, float]:
        """Extracts polling intervals from ``pv_program_opts``."""
        parser = argparse.ArgumentParser(allow_abbrev=False)
        parser.add_argument("--interval", "-i", type=float, default=1)
        parser.add_argument("--average-rate-window", "-m", type=float, default=30)
        args, _ = parser.parse_known_args(args=self._pv_program_opts)
        interval: float = min(60 * 60, max(args.interval, 0.1))
        return interval, min(60 * 60, max(args.average_rate_window, interval))
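        # Example (illustrative): pv_program_opts=["--interval=2", "-m", "10"] yields (2.0, 10.0).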


def _pv_size_to_bytes(
    size: str,
) -> tuple[int, str]:  # example inputs: "800B", "4.12 KiB", "510 MiB", "510 MB", "4Gb", "2TiB"
    """Converts a pv size string to bytes and returns the remaining text."""
    if match := PV_SIZE_TO_BYTES_REGEX.fullmatch(size):
        number: float = float(match.group(1).replace(",", ".").replace(ARABIC_DECIMAL_SEPARATOR, "."))
        i: int = "KMGTPEZYRQ".index(match.group(2)) if match.group(2) else -1
        m: int = 1024 if match.group(3) == "i" else 1000
        b: int = 1 if match.group(4) == "B" else 8
        line_tail: str = match.group(5)
        if line_tail and line_tail.startswith("/s"):
            raise ValueError("Invalid pv_size: " + size)  # stems from 'pv --rate' or 'pv --average-rate'
        size_in_bytes: int = round(number * (m ** (i + 1)) / b)
        return size_in_bytes, line_tail
    else:
        return 0, ""  # skip partial or bad 'pv' log file line (pv process killed while writing?)
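    # Examples (illustrative): _pv_size_to_bytes("4.12 KiB") == (4219, "") and _pv_size_to_bytes("510 MB") ==
    # (510_000_000, ""), while _pv_size_to_bytes("4Gb") == (500_000_000, "") since lowercase "b" denotes bits, not bytes.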


def count_num_bytes_transferred_by_zfs_send(basis_pv_log_file: str) -> int:
    """Scrapes the .pv log file(s) and sums up the 'pv --bytes' column."""

    def parse_pv_line(line: str) -> int:
        """Extracts the byte count from a single pv log line."""
        if ":" in line:
            col: str = line.split(":", 1)[1].strip()
            num_bytes, _ = _pv_size_to_bytes(col)
            return num_bytes
        return 0

    total_bytes: int = 0
    files: list[str] = [basis_pv_log_file] + glob.glob(basis_pv_log_file + PV_FILE_THREAD_SEPARATOR + "[0-9]*")
    for file in files:
        if os.path.isfile(file):
            try:
                with open_nofollow(file, mode="r", newline="", encoding="utf-8") as fd:
                    line: str | None = None
                    for line in fd:
                        assert line is not None
                        if line.endswith("\r"):
                            continue  # skip all but the most recent status update of each transfer
                        total_bytes += parse_pv_line(line)
                        line = None
                    if line is not None:
                        total_bytes += parse_pv_line(line)  # consume last line of file w/ intermediate status update, if any
            except FileNotFoundError:
                pass  # harmless
    return total_bytes
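
# Example (illustrative; the path is hypothetical): count_num_bytes_transferred_by_zfs_send("/tmp/bzfs/current.pv")
# also sums sibling per-thread logs such as "/tmp/bzfs/current.pv_1" and "/tmp/bzfs/current.pv_42".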