Coverage for bzfs_main/progress_reporter.py: 99%

230 statements

coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Thread-based progress monitor for `pv` output during data transfers.

It tails the log files produced by (parallel) ``pv`` processes and periodically prints a single status line showing
aggregated throughput, ETA, etc. The reporter runs in a separate daemon thread to avoid blocking replication. All methods
are designed for minimal synchronization overhead.
"""

from __future__ import annotations
import argparse
import glob
import os
import re
import selectors
import sys
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import (
    IO,
    Any,
    NamedTuple,
)

from bzfs_main.utils import (
    InterruptibleSleep,
    human_readable_bytes,
    open_nofollow,
)

# constants
PV_FILE_THREAD_SEPARATOR: str = "_"
ARABIC_DECIMAL_SEPARATOR: str = "\u066b"  # "٫"  # noqa: RUF003
PV_SIZE_TO_BYTES_REGEX: re.Pattern = re.compile(rf"(\d+[.,{ARABIC_DECIMAL_SEPARATOR}]?\d*)\s*([KMGTPEZYRQ]?)(i?)([Bb])(.*)")
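
# Illustrative lifecycle sketch (not part of this module; the logger and pv log path below are assumptions):
#
#   reporter = ProgressReporter(log, pv_program_opts=["--interval=1"], use_select=False, progress_update_intervals=None)
#   reporter.start()  # spawns the daemon thread that tails pv log files
#   reporter.enqueue_pv_log_file("/tmp/replication.pv")  # hypothetical pv log file being written by a pv process
#   ...  # replication proceeds; the reporter periodically prints an aggregated status line
#   reporter.reset()  # optionally discard totals between unrelated batches
#   reporter.stop()  # joins the thread and reraises any exception captured in _run()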

#############################################################################
class ProgressReporter:
    """Periodically prints progress updates to the same console status line, which is helpful if the program runs in an
    interactive Unix terminal session.

    Tails the 'pv' output log files that are being written to by (parallel) replication,
    and extracts aggregate progress and throughput metrics from them, such as MB, MB/s, ETA, etc. Periodically prints these
    metrics to the console status line (but not to the log file), and in doing so "visually overwrites" the previous status
    line, by appending a \r carriage return control char rather than a \n newline char. Does not print a status line if the
    Unix environment var 'bzfs_isatty' is set to 'false', in order not to confuse programs that scrape redirected stdout.
    Example console status line:
    2025-01-17 01:23:04 [I] zfs sent 41.7 GiB 0:00:46 [963 MiB/s] [907 MiB/s] [==========>  ] 80% ETA 0:00:04 ETA 01:23:08
    """

    def __init__(
        self,
        log: Logger,
        pv_program_opts: list[str],
        use_select: bool,
        progress_update_intervals: tuple[float, float] | None,
        fail: bool = False,
    ) -> None:
        """Creates a reporter configured for ``pv`` log parsing."""
        # immutable variables:
        self.log: Logger = log
        self.pv_program_opts: list[str] = pv_program_opts
        self.use_select: bool = use_select
        self.progress_update_intervals: tuple[float, float] | None = progress_update_intervals
        self.inject_error: bool = fail  # for testing only

        # mutable variables:
        self.thread: threading.Thread | None = None
        self.exception: BaseException | None = None
        self.lock: threading.Lock = threading.Lock()
        self.sleeper: InterruptibleSleep = InterruptibleSleep(self.lock)  # sleeper shares lock with reporter
        self.file_name_queue: set[str] = set()
        self.file_name_set: set[str] = set()
        self.is_resetting: bool = True
        self.is_pausing: bool = False

    def start(self) -> None:
        """Starts the monitoring thread and begins asynchronous parsing of ``pv`` log files."""
        with self.lock:
            assert self.thread is None
            self.thread = threading.Thread(target=lambda: self._run(), name="progress_reporter", daemon=True)
            self.thread.start()

    def stop(self) -> None:
        """Blocks until the reporter is stopped, then reraises any exception that may have happened during log processing."""
        self.sleeper.interrupt()
        t = self.thread
        if t is not None:
            t.join()
        e = self.exception
        if e is not None:
            raise e  # reraise exception in current thread

    def pause(self) -> None:
        """Temporarily suspends status logging."""
        with self.lock:
            self.is_pausing = True

    def reset(self) -> None:
        """Clears metrics before processing a new batch of logs; the purpose is to discard previous totals to avoid mixing
        unrelated transfers."""
        with self.lock:
            self.is_resetting = True

    def enqueue_pv_log_file(self, pv_log_file: str) -> None:
        """Tells the progress reporter thread to also monitor and tail the given pv log file."""
        with self.lock:
            if pv_log_file not in self.file_name_set:
                self.file_name_queue.add(pv_log_file)

    def _run(self) -> None:
        """Thread entry point consuming pv logs and updating metrics."""
        log = self.log
        try:
            fds: list[IO[Any]] = []
            try:
                selector = selectors.SelectSelector() if self.use_select else selectors.PollSelector()
                try:
                    self._run_internal(fds, selector)
                finally:
                    selector.close()
            finally:
                for fd in fds:
                    fd.close()
        except BaseException as e:
            self.exception = e  # will be reraised in stop()
            log.exception("%s", "ProgressReporter:")

    @dataclass
    class TransferStat:
        """Tracks per-file transfer state and ETA."""

        @dataclass(order=True)
        class ETA:
            """Estimated time of arrival."""

            timestamp_nanos: int  # sorted by future time at which current zfs send/recv transfer is estimated to complete
            seq_nr: int  # tiebreaker wrt. sort order
            line_tail: str = field(compare=False)  # trailing pv log line part w/ progress bar, duration ETA, timestamp ETA

        bytes_in_flight: int
        eta: ETA

    def _run_internal(self, fds: list[IO[Any]], selector: selectors.BaseSelector) -> None:
        """Tails pv log files and periodically logs aggregated progress."""

        class Sample(NamedTuple):
            """Sliding window entry for throughput calculation."""

            sent_bytes: int
            timestamp_nanos: int

        log = self.log
        update_interval_secs, sliding_window_secs = (
            self.progress_update_intervals if self.progress_update_intervals is not None else self._get_update_intervals()
        )
        update_interval_nanos: int = round(update_interval_secs * 1_000_000_000)
        sliding_window_nanos: int = round(sliding_window_secs * 1_000_000_000)
        sleep_nanos: int = round(update_interval_nanos / 2.5)
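        # Sleeping at most ~40% of the update interval between polls (the 2.5 divisor appears to be a tuning
        # heuristic) keeps status updates timely without busy-waiting, while stop() can still interrupt the sleep.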

        etas: list[ProgressReporter.TransferStat.ETA] = []
        while True:
            empty_file_name_queue: set[str] = set()
            with self.lock:
                if self.sleeper.is_stopping:
                    return
                # progress reporter thread picks up pv log files that aren't being tailed yet
                n = len(self.file_name_queue)
                m = len(self.file_name_set)
                self.file_name_set.update(self.file_name_queue)  # union
                assert len(self.file_name_set) == n + m  # aka assert (previous) file_name_set.isdisjoint(file_name_queue)
                local_file_name_queue: set[str] = self.file_name_queue
                self.file_name_queue = empty_file_name_queue  # exchange buffers
                is_pausing: bool = self.is_pausing
                self.is_pausing = False
                is_resetting: bool = self.is_resetting
                self.is_resetting = False
            if is_pausing:
                next_update_nanos: int = time.monotonic_ns() + 10 * 365 * 86400 * 1_000_000_000  # infinity
            if is_resetting:
                sent_bytes, last_status_len = 0, 0
                num_lines, num_readables = 0, 0
                start_time_nanos = time.monotonic_ns()
                next_update_nanos = start_time_nanos + update_interval_nanos
                latest_samples: deque[Sample] = deque([Sample(0, start_time_nanos)])  # sliding window w/ recent measurements
            for pv_log_file in local_file_name_queue:
                try:
                    Path(pv_log_file).touch()
                    fd = open_nofollow(pv_log_file, mode="r", newline="", encoding="utf-8")
                except FileNotFoundError:  # a third party has somehow deleted the log file or directory
                    with self.lock:
                        self.file_name_set.discard(pv_log_file)  # enable re-adding the file later via enqueue_pv_log_file()
                    log.warning("ProgressReporter: pv log file disappeared before initial open, skipping: %s", pv_log_file)
                    continue  # skip to the next file in the queue
                fds.append(fd)
                eta = self.TransferStat.ETA(timestamp_nanos=0, seq_nr=-len(fds), line_tail="")
                selector.register(fd, selectors.EVENT_READ, data=(iter(fd), self.TransferStat(bytes_in_flight=0, eta=eta)))
                etas.append(eta)
            readables: list[tuple[selectors.SelectorKey, int]] = selector.select(timeout=0)  # 0 indicates "don't block"
            has_line: bool = False
            curr_time_nanos: int = time.monotonic_ns()
            selector_key: selectors.SelectorKey
            for selector_key, _ in readables:  # for each file that's ready for non-blocking read
                num_readables += 1
                iter_fd, s = selector_key.data
                for line in iter_fd:  # aka iter(fd)
                    sent_bytes += self._update_transfer_stat(line, s, curr_time_nanos)
                    num_lines += 1
                    has_line = True
            if curr_time_nanos >= next_update_nanos:
                elapsed_nanos: int = curr_time_nanos - start_time_nanos
                msg0, msg3 = self._format_sent_bytes(sent_bytes, elapsed_nanos)  # throughput etc. since replication start time
                msg1: str = self._format_duration(elapsed_nanos)  # duration since replication start time
                oldest: Sample = latest_samples[0]  # throughput etc. over the sliding window
                _, msg2 = self._format_sent_bytes(sent_bytes - oldest.sent_bytes, curr_time_nanos - oldest.timestamp_nanos)
                msg4: str = max(etas).line_tail if len(etas) > 0 else ""  # progress bar, ETAs
                timestamp: str = datetime.now().isoformat(sep=" ", timespec="seconds")  # 2024-09-03 12:26:15
                status_line: str = f"{timestamp} [I] zfs sent {msg0} {msg1} {msg2} {msg3} {msg4}"
                status_line = status_line.ljust(last_status_len)  # "overwrite" trailing chars of previous status with spaces

                # The Unix console skips back to the beginning of the console line when it sees this \r control char:
                sys.stdout.write(f"{status_line}\r")
                sys.stdout.flush()

                # log.log(log_trace, "\nnum_lines: %s, num_readables: %s", num_lines, num_readables)
                last_status_len = len(status_line.rstrip())
                next_update_nanos += update_interval_nanos
                latest_samples.append(Sample(sent_bytes, curr_time_nanos))
                if elapsed_nanos >= sliding_window_nanos:
                    latest_samples.popleft()  # slide the sliding window containing recent measurements
            elif not has_line:
                # Avoid burning CPU busily spinning on I/O readiness as fds are almost always ready for non-blocking read
                # even if no new pv log line has been written. Yet retain ability to wake up immediately on reporter.stop().
                self.sleeper.sleep(min(sleep_nanos, next_update_nanos - curr_time_nanos))
            if self.inject_error:
                raise ValueError("Injected ProgressReporter error")  # for testing only

    def _update_transfer_stat(self, line: str, s: TransferStat, curr_time_nanos: int) -> int:
        """Updates ``s`` from one pv status line and returns the bytes delta."""
        num_bytes, s.eta.timestamp_nanos, s.eta.line_tail = self._parse_pv_line(line, curr_time_nanos)
        bytes_in_flight: int = s.bytes_in_flight
        s.bytes_in_flight = num_bytes if line.endswith("\r") else 0  # intermediate vs. final status update of each transfer
        return num_bytes - bytes_in_flight
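
    # Worked example for the delta accounting above (the cumulative byte counts are assumed pv output): intermediate
    # status updates end with "\r" and report cumulative bytes, e.g. 100 then 250, yielding deltas of 100 and 150
    # while bytes_in_flight tracks the running total; the final "\n"-terminated update reporting 300 yields a delta
    # of 50 and resets bytes_in_flight to 0, so the completed transfer contributes exactly 300 bytes in aggregate.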

    NO_RATES_REGEX = re.compile(r".*/s\s*[)\]]?\s*")  # matches until end of last pv rate, e.g. "834MiB/s]" or "834MiB/s)"
    # time remaining --eta "ETA 00:00:39" or "ETA 2+0:00:39" or "ETA 2:0:00:39", followed by trailing --fineta timestamp ETA
    TIME_REMAINING_ETA_REGEX = re.compile(r".*?ETA\s*((\d+)[+:])?(\d\d?):(\d\d):(\d\d).*(ETA|FIN).*")

    @staticmethod
    def _parse_pv_line(line: str, curr_time_nanos: int) -> tuple[int, int, str]:
        """Parses a pv status line into transferred bytes and ETA timestamp."""
        assert isinstance(line, str)
        if ":" in line:
            line = line.split(":", 1)[1].strip()
            sent_bytes, line = _pv_size_to_bytes(line)
            line = ProgressReporter.NO_RATES_REGEX.sub("", line.lstrip(), 1)  # remove pv --timer, --rate, --average-rate
            if match := ProgressReporter.TIME_REMAINING_ETA_REGEX.fullmatch(line):  # extract pv --eta duration
                _, days, hours, minutes, secs, _ = match.groups()
                time_remaining_secs = (86400 * int(days) if days else 0) + int(hours) * 3600 + int(minutes) * 60 + int(secs)
                curr_time_nanos += time_remaining_secs * 1_000_000_000  # ETA timestamp = now + time remaining duration
            return sent_bytes, curr_time_nanos, line
        return 0, curr_time_nanos, ""
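
    # Parsing sketch (assumes each log line carries a "name:" prefix, as produced by pv --name): given the line
    #   "275GiB: 41.7GiB 0:00:46 [ 963MiB/s] [ 907MiB/s] [=====>    ] 80% ETA 0:00:04 ETA 01:23:08"
    # the text after the first ":" is kept; _pv_size_to_bytes() consumes "41.7GiB" (~ round(41.7 * 1024**3) bytes),
    # NO_RATES_REGEX strips the timer and rate fields, and TIME_REMAINING_ETA_REGEX reads "ETA 0:00:04" so the ETA
    # timestamp becomes curr_time_nanos + 4 * 1_000_000_000.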

    @staticmethod
    def _format_sent_bytes(num_bytes: int, duration_nanos: int) -> tuple[str, str]:
        """Returns a human-readable byte count and rate."""
        bytes_per_sec: int = round(1_000_000_000 * num_bytes / max(1, duration_nanos))
        return f"{human_readable_bytes(num_bytes, precision=2)}", f"[{human_readable_bytes(bytes_per_sec, precision=2)}/s]"

    @staticmethod
    def _format_duration(duration_nanos: int) -> str:
        """Formats ``duration_nanos`` as an H:MM:SS string (hours are not zero-padded)."""
        total_seconds: int = round(duration_nanos / 1_000_000_000)
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours}:{minutes:02d}:{seconds:02d}"
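
    # Illustration: _format_duration(3_725_000_000_000) returns "1:02:05", i.e. 3725 seconds.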

    def _get_update_intervals(self) -> tuple[float, float]:
        """Extracts polling intervals from ``pv_program_opts``."""
        parser = argparse.ArgumentParser(allow_abbrev=False)
        parser.add_argument("--interval", "-i", type=float, default=1)
        parser.add_argument("--average-rate-window", "-m", type=float, default=30)
        args, _ = parser.parse_known_args(args=self.pv_program_opts)
        interval: float = min(60 * 60, max(args.interval, 0.1))
        return interval, min(60 * 60, max(args.average_rate_window, interval))
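
    # Illustration (assumed pv options): pv_program_opts=["--interval=2", "-m", "60"] yields (2.0, 60.0); the interval
    # is clamped to [0.1, 3600] seconds and the sliding window to [interval, 3600] seconds.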


def _pv_size_to_bytes(
    size: str,
) -> tuple[int, str]:  # example inputs: "800B", "4.12 KiB", "510 MiB", "510 MB", "4Gb", "2TiB"
    """Converts pv size string to bytes and returns remaining text."""
    if match := PV_SIZE_TO_BYTES_REGEX.fullmatch(size):
        number: float = float(match.group(1).replace(",", ".").replace(ARABIC_DECIMAL_SEPARATOR, "."))
        i: int = "KMGTPEZYRQ".index(match.group(2)) if match.group(2) else -1
        m: int = 1024 if match.group(3) == "i" else 1000
        b: int = 1 if match.group(4) == "B" else 8
        line_tail: str = match.group(5)
        if line_tail and line_tail.startswith("/s"):
            raise ValueError("Invalid pv_size: " + size)  # stems from 'pv --rate' or 'pv --average-rate'
        size_in_bytes: int = round(number * (m ** (i + 1)) / b)
        return size_in_bytes, line_tail
    else:
        return 0, ""  # skip partial or bad 'pv' log file line (pv process killed while writing?)
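
# Worked examples (derived from PV_SIZE_TO_BYTES_REGEX and the arithmetic above; illustrative only):
#   _pv_size_to_bytes("800B")     -> (800, "")          # no unit prefix, decimal base, bytes
#   _pv_size_to_bytes("4.12 KiB") -> (4219, "")         # round(4.12 * 1024)
#   _pv_size_to_bytes("4Gb")      -> (500_000_000, "")  # 4 * 1000**3 bits, divided by 8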


def count_num_bytes_transferred_by_zfs_send(basis_pv_log_file: str) -> int:
    """Scrapes the .pv log file(s) and sums up the 'pv --bytes' column."""

    def parse_pv_line(line: str) -> int:
        """Extracts byte count from a single pv log line."""
        if ":" in line:
            col: str = line.split(":", 1)[1].strip()
            num_bytes, _ = _pv_size_to_bytes(col)
            return num_bytes
        return 0

    total_bytes: int = 0
    files: list[str] = [basis_pv_log_file] + glob.glob(basis_pv_log_file + PV_FILE_THREAD_SEPARATOR + "[0-9]*")
    for file in files:
        if os.path.isfile(file):
            try:
                with open_nofollow(file, mode="r", newline="", encoding="utf-8") as fd:
                    line: str | None = None
                    for line in fd:
                        if line.endswith("\r"):
                            continue  # skip all but the most recent status update of each transfer
                        total_bytes += parse_pv_line(line)
                        line = None
                    if line is not None:
                        total_bytes += parse_pv_line(line)  # consume last line of file w/ intermediate status update, if any
            except FileNotFoundError:
                pass  # harmless
    return total_bytes
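
# Usage sketch (hypothetical paths): given a basis log file "/tmp/rep.pv" plus parallel-thread siblings
# "/tmp/rep.pv_1" and "/tmp/rep.pv_2", count_num_bytes_transferred_by_zfs_send("/tmp/rep.pv") sums the final
# status line of each transfer across all three files.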