Coverage for bzfs_main/progress_reporter.py: 99%

248 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Thread-based progress monitor for `pv` output during data transfers. 

16 

17It tails the log files produced by (parallel) ``pv`` processes and periodically prints a single status line showing 

18aggregated throughput, ETA, etc. The reporter runs in a separate daemon thread to avoid blocking replication. All methods 

19are designed for minimal synchronization overhead. 

20""" 

21 

22from __future__ import ( 

23 annotations, 

24) 

25import argparse 

26import glob 

27import os 

28import re 

29import selectors 

30import sys 

31import threading 

32import time 

33from collections import ( 

34 deque, 

35) 

36from dataclasses import ( 

37 dataclass, 

38 field, 

39) 

40from datetime import ( 

41 datetime, 

42) 

43from enum import ( 

44 Enum, 

45 auto, 

46) 

47from logging import ( 

48 Logger, 

49) 

50from pathlib import ( 

51 Path, 

52) 

53from typing import ( 

54 IO, 

55 Any, 

56 Final, 

57 NamedTuple, 

58) 

59 

60from bzfs_main.utils import ( 

61 FILE_PERMISSIONS, 

62 InterruptibleSleep, 

63 human_readable_bytes, 

64 open_nofollow, 

65) 

66 

# constants
PV_FILE_THREAD_SEPARATOR: Final[str] = "_"  # joins a base pv log file name with a worker-thread suffix (see glob below)
ARABIC_DECIMAL_SEPARATOR: Final[str] = "\u066b"  # "٫" # noqa: RUF003
# Parses pv size strings such as "800B", "4.12 KiB", "510 MB", "4Gb": group(1) number (decimal separator may be
# '.', ',' or the Arabic separator), group(2) optional SI prefix K..Q, group(3) optional IEC "i" (1024-based),
# group(4) "B" for bytes vs "b" for bits, group(5) any trailing text.
PV_SIZE_TO_BYTES_REGEX: Final[re.Pattern[str]] = re.compile(
    rf"(\d+[.,{ARABIC_DECIMAL_SEPARATOR}]?\d*)\s*([KMGTPEZYRQ]?)(i?)([Bb])(.*)"
)

73 

74 

75############################################################################# 

class State(Enum):
    """Progress reporter lifecycle state transitions.

    ``IS_PAUSING`` suspends status-line output (see ``ProgressReporter.pause``);
    ``IS_RESETTING`` discards accumulated metrics before a new batch of transfers
    (see ``ProgressReporter.reset``).
    """

    IS_PAUSING = 1  # explicit values; identical to what auto() assigns (first member gets 1)
    IS_RESETTING = 2

81 

82 

83############################################################################# 

class ProgressReporter:
    """Periodically prints progress updates to the same console status line, which is helpful if the program runs in an
    interactive Unix terminal session.

    Tails the 'pv' output log files that are being written to by (parallel) replication,
    and extracts aggregate progress and throughput metrics from them, such as MB, MB/s, ETA, etc. Periodically prints these
    metrics to the console status line (but not to the log file), and in doing so "visually overwrites" the previous status
    line, via appending a \r carriage return control char rather than a \n newline char. Does not print a status line if the
    Unix environment var 'bzfs_isatty' is set to 'false', in order not to confuse programs that scrape redirected stdout.
    Example console status line:
    2025-01-17 01:23:04 [I] zfs sent 41.7 GiB 0:00:46 [963 MiB/s] [907 MiB/s] [==========> ] 80% ETA 0:00:04 ETA 01:23:08
    """

    def __init__(
        self,
        log: Logger,
        pv_program_opts: list[str],
        use_select: bool,
        progress_update_intervals: tuple[float, float] | None,
        fail: bool = False,
    ) -> None:
        """Creates a reporter configured for ``pv`` log parsing.

        Params:
            log: destination for warnings and exception reports (status lines go to stdout, not to this logger).
            pv_program_opts: the CLI options passed to 'pv'; used to derive polling intervals if none are given.
            use_select: if True use selectors.SelectSelector, else selectors.PollSelector (see _run).
            progress_update_intervals: (update interval secs, sliding window secs); None means derive from pv opts.
            fail: inject an artificial error; for testing only.
        """
        # immutable variables:
        self._log: Final[Logger] = log
        self._pv_program_opts: Final[list[str]] = pv_program_opts
        self._use_select: Final[bool] = use_select
        self._progress_update_intervals: Final[tuple[float, float] | None] = progress_update_intervals
        self._inject_error: bool = fail  # for testing only

        # mutable variables:
        self._thread: threading.Thread | None = None  # created lazily in start()
        self._exception: BaseException | None = None  # set by _run(); reraised in stop()
        self._lock: Final[threading.Lock] = threading.Lock()
        self._sleeper: Final[InterruptibleSleep] = InterruptibleSleep(self._lock)  # sleeper shares lock with reporter
        self._file_name_queue: set[str] = set()  # pv log files enqueued but not yet picked up by the reporter thread
        self._file_name_set: Final[set[str]] = set()  # pv log files the reporter thread has already picked up
        self._is_stopping: bool = False
        self._states: list[State] = [State.IS_RESETTING]  # pending state transitions, consumed by _run_internal()

    def start(self) -> None:
        """Starts the monitoring thread and begins asynchronous parsing of ``pv`` log files."""
        with self._lock:
            assert self._thread is None  # start() must be called at most once
            self._thread = threading.Thread(target=lambda: self._run(), name="progress_reporter", daemon=True)
            self._thread.start()

    def stop(self) -> None:
        """Blocks until reporter is stopped, then reraises any exception that may have happened during log processing."""
        with self._lock:
            self._is_stopping = True
            self._sleeper.interrupt()  # wake the reporter thread so it can observe _is_stopping promptly
        t = self._thread
        if t is not None:
            t.join()
        e = self._exception
        if e is not None:
            raise e  # reraise exception in current thread

    def pause(self) -> None:
        """Temporarily suspends status logging."""
        self._append_state(State.IS_PAUSING)

    def reset(self) -> None:
        """Clears metrics before processing a new batch of logs; the purpose is to discard previous totals to avoid mixing
        unrelated transfers."""
        self._append_state(State.IS_RESETTING)

    def _append_state(self, state: State) -> None:
        """Records a pending state transition and wakes the reporter thread to apply it."""
        with self._lock:
            states: list[State] = self._states
            if len(states) > 0 and states[-1] is state:
                return  # same state twice in a row is a no-op
            states.append(state)
            if len(states) >= 3:
                del states[0]  # cap time and memory consumption by removing redundant state transitions
            self._sleeper.interrupt()

    def enqueue_pv_log_file(self, pv_log_file: str) -> None:
        """Tells progress reporter thread to also monitor and tail the given pv log file."""
        with self._lock:
            if pv_log_file not in self._file_name_set:
                self._file_name_queue.add(pv_log_file)

    def _run(self) -> None:
        """Thread entry point consuming pv logs and updating metrics.

        Guarantees cleanup of the selector and of all opened log file descriptors; any exception other than
        BrokenPipeError is stored for reraising in stop().
        """
        log = self._log
        try:
            fds: list[IO[Any]] = []
            try:
                selector = selectors.SelectSelector() if self._use_select else selectors.PollSelector()
                try:
                    self._run_internal(fds, selector)
                finally:
                    selector.close()
            finally:
                for fd in fds:
                    fd.close()
        except BaseException as e:
            if not isinstance(e, BrokenPipeError):
                self._exception = e  # will be reraised in stop()
                log.exception("%s", "ProgressReporter:")

    @dataclass
    class TransferStat:
        """Tracks per-file transfer state and ETA."""

        @dataclass(order=True)
        class ETA:
            """Estimated time of arrival."""

            timestamp_nanos: int  # sorted by future time at which current zfs send/recv transfer is estimated to complete
            seq_nr: int  # tiebreaker wrt. sort order
            line_tail: str = field(compare=False)  # trailing pv log line part w/ progress bar, duration ETA, timestamp ETA

        bytes_in_flight: int  # bytes reported by the latest intermediate status update, not yet finalized
        eta: ETA

    def _run_internal(self, fds: list[IO[Any]], selector: selectors.BaseSelector) -> None:
        """Tails pv log files and periodically logs aggregated progress."""

        class Sample(NamedTuple):
            """Sliding window entry for throughput calculation."""

            sent_bytes: int
            timestamp_nanos: int

        log = self._log
        update_interval_secs, sliding_window_secs = (
            self._progress_update_intervals if self._progress_update_intervals is not None else self._get_update_intervals()
        )
        update_interval_nanos: int = round(update_interval_secs * 1_000_000_000)
        sliding_window_nanos: int = round(sliding_window_secs * 1_000_000_000)
        sleep_nanos: int = 0
        etas: list[ProgressReporter.TransferStat.ETA] = []
        while True:
            empty_file_name_queue: set[str] = set()
            empty_states: list[State] = []
            with self._lock:
                if self._is_stopping:
                    return
                # progress reporter thread picks up pv log files that so far aren't being tailed
                n = len(self._file_name_queue)
                m = len(self._file_name_set)
                self._file_name_set.update(self._file_name_queue)  # union
                assert len(self._file_name_set) == n + m  # aka assert (previous) file_name_set.isdisjoint(file_name_queue)
                local_file_name_queue: set[str] = self._file_name_queue
                self._file_name_queue = empty_file_name_queue  # exchange buffers
                states: list[State] = self._states
                self._states = empty_states  # exchange buffers
            # apply pending state transitions; locals were exchanged above so no lock is needed here
            for state in states:
                if state is State.IS_PAUSING:
                    next_update_nanos: int = time.monotonic_ns() + 10 * 365 * 86400 * 1_000_000_000  # infinity
                    sleep_nanos = next_update_nanos
                else:
                    assert state is State.IS_RESETTING
                    sent_bytes, last_status_len = 0, 0
                    num_lines, num_readables = 0, 0
                    start_time_nanos = time.monotonic_ns()
                    next_update_nanos = start_time_nanos + update_interval_nanos
                    sleep_nanos = round(update_interval_nanos / 2.5)
                    latest_samples: deque[Sample] = deque([Sample(0, start_time_nanos)])  # sliding window w/ recent measures
            for pv_log_file in local_file_name_queue:
                try:
                    Path(pv_log_file).touch(mode=FILE_PERMISSIONS)
                    fd = open_nofollow(pv_log_file, mode="r", newline="", encoding="utf-8")
                except FileNotFoundError:  # a third party has somehow deleted the log file or directory
                    with self._lock:
                        self._file_name_set.discard(pv_log_file)  # enable re-adding the file later via enqueue_pv_log_file()
                    log.warning("ProgressReporter: pv log file disappeared before initial open, skipping: %s", pv_log_file)
                    continue  # skip to the next file in the queue
                fds.append(fd)
                eta = self.TransferStat.ETA(timestamp_nanos=0, seq_nr=-len(fds), line_tail="")
                selector.register(fd, selectors.EVENT_READ, data=(iter(fd), self.TransferStat(bytes_in_flight=0, eta=eta)))
                etas.append(eta)
            readables: list[tuple[selectors.SelectorKey, int]] = selector.select(timeout=0)  # 0 indicates "don't block"
            has_line: bool = False
            curr_time_nanos: int = time.monotonic_ns()
            for selector_key, _ in readables:  # for each file that's ready for non-blocking read
                num_readables += 1
                iter_fd, transfer_stat = selector_key.data
                for line in iter_fd:  # aka iter(fd)
                    sent_bytes += self._update_transfer_stat(line, transfer_stat, curr_time_nanos)
                    num_lines += 1
                    has_line = True
            if curr_time_nanos >= next_update_nanos:
                elapsed_nanos: int = curr_time_nanos - start_time_nanos
                msg0, msg3 = self._format_sent_bytes(sent_bytes, elapsed_nanos)  # throughput etc since replication starttime
                msg1: str = self._format_duration(elapsed_nanos)  # duration since replication start time
                oldest: Sample = latest_samples[0]  # throughput etc, over sliding window
                _, msg2 = self._format_sent_bytes(sent_bytes - oldest.sent_bytes, curr_time_nanos - oldest.timestamp_nanos)
                msg4: str = max(etas).line_tail if len(etas) > 0 else ""  # progress bar, ETAs
                timestamp: str = datetime.now().isoformat(sep=" ", timespec="seconds")  # 2024-09-03 12:26:15
                status_line: str = f"{timestamp} [I] zfs sent {msg0} {msg1} {msg2} {msg3} {msg4}"
                status_line = status_line.ljust(last_status_len)  # "overwrite" trailing chars of previous status with spaces

                # The Unix console skips back to the beginning of the console line when it sees this \r control char:
                sys.stdout.write(f"{status_line}\r")
                sys.stdout.flush()

                # log.log(log_trace, "\nnum_lines: %s, num_readables: %s", num_lines, num_readables)
                last_status_len = len(status_line.rstrip())
                next_update_nanos += update_interval_nanos
                latest_samples.append(Sample(sent_bytes, curr_time_nanos))
                if elapsed_nanos >= sliding_window_nanos:
                    latest_samples.popleft()  # slide the sliding window containing recent measurements
            elif not has_line:
                # Avoid burning CPU busily spinning on I/O readiness as fds are almost always ready for non-blocking read
                # even if no new pv log line has been written. Yet retain ability to wake up immediately on reporter.stop().
                if self._sleeper.sleep(min(sleep_nanos, next_update_nanos - curr_time_nanos)):
                    self._sleeper.reset()  # sleep was interrupted; ensure we can sleep normally again
            if self._inject_error:
                raise ValueError("Injected ProgressReporter error")  # for testing only

    def _update_transfer_stat(self, line: str, s: TransferStat, curr_time_nanos: int) -> int:
        """Update ``s`` from one pv status line and return bytes delta since the previous update of this transfer."""
        num_bytes, s.eta.timestamp_nanos, s.eta.line_tail = self._parse_pv_line(line, curr_time_nanos)
        bytes_in_flight: int = s.bytes_in_flight
        s.bytes_in_flight = num_bytes if line.endswith("\r") else 0  # intermediate vs. final status update of each transfer
        return num_bytes - bytes_in_flight

    NO_RATES_REGEX = re.compile(r".*/s\s*[)\]]?\s*")  # matches until end of last pv rate, e.g. "834MiB/s]" or "834MiB/s)"
    # time remaining --eta "ETA 00:00:39" or "ETA 2+0:00:39" or "ETA 2:0:00:39", followed by trailing --fineta timestamp ETA
    TIME_REMAINING_ETA_REGEX = re.compile(r".*?ETA\s*((\d+)[+:])?(\d\d?):(\d\d):(\d\d).*(ETA|FIN).*")

    @staticmethod
    def _parse_pv_line(line: str, curr_time_nanos: int) -> tuple[int, int, str]:
        """Parses a pv status line into (transferred bytes, ETA timestamp nanos, trailing line part)."""
        assert isinstance(line, str)
        if ":" in line:
            line = line.split(":", 1)[1].strip()  # skip the leading log-timestamp prefix up to the first ':'
            sent_bytes, line = _pv_size_to_bytes(line)
            line = ProgressReporter.NO_RATES_REGEX.sub("", line.lstrip(), count=1)  # strip --timer, --rate, --avg-rate
            if match := ProgressReporter.TIME_REMAINING_ETA_REGEX.fullmatch(line):  # extract pv --eta duration
                _, days, hours, minutes, secs, _ = match.groups()
                time_remaining_secs = (86400 * int(days) if days else 0) + int(hours) * 3600 + int(minutes) * 60 + int(secs)
                curr_time_nanos += time_remaining_secs * 1_000_000_000  # ETA timestamp = now + time remaining duration
            return sent_bytes, curr_time_nanos, line
        return 0, curr_time_nanos, ""

    @staticmethod
    def _format_sent_bytes(num_bytes: int, duration_nanos: int) -> tuple[str, str]:
        """Returns a human-readable byte count and rate; duration is clamped to >= 1ns to avoid division by zero."""
        bytes_per_sec: int = round(1_000_000_000 * num_bytes / max(1, duration_nanos))
        return f"{human_readable_bytes(num_bytes, precision=2)}", f"[{human_readable_bytes(bytes_per_sec, precision=2)}/s]"

    @staticmethod
    def _format_duration(duration_nanos: int) -> str:
        """Formats ``duration_nanos`` as HH:MM:SS string."""
        total_seconds: int = duration_nanos // 1_000_000_000
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours}:{minutes:02d}:{seconds:02d}"

    def _get_update_intervals(self) -> tuple[float, float]:
        """Extracts polling intervals from ``pv_program_opts``; both values are clamped to [0.1, 3600] seconds."""
        parser = argparse.ArgumentParser(allow_abbrev=False)
        parser.add_argument("--interval", "-i", type=float, default=1)
        parser.add_argument("--average-rate-window", "-m", type=float, default=30)
        args, _ = parser.parse_known_args(args=self._pv_program_opts)
        interval: float = min(60 * 60, max(args.interval, 0.1))
        return interval, min(60 * 60, max(args.average_rate_window, interval))

345 

346 

def _pv_size_to_bytes(
    size: str,
) -> tuple[int, str]:  # example inputs: "800B", "4.12 KiB", "510 MiB", "510 MB", "4Gb", "2TiB"
    """Converts a pv size string to a byte count and returns the remaining (unparsed) trailing text.

    Accepts '.', ',' or the Arabic decimal separator as decimal point, SI prefixes K..Q, the IEC 'i' infix
    (1024-based instead of 1000-based multiples), and 'B' (bytes) vs 'b' (bits, divided by 8).

    Returns ``(0, "")`` for lines that don't parse, e.g. a partial line written by a pv process that was
    killed mid-write.

    Raises:
        ValueError: if the matched size is actually a rate (trailing text starts with "/s"), which stems
            from 'pv --rate' or 'pv --average-rate' output and must not be summed as a byte count.
    """
    match = PV_SIZE_TO_BYTES_REGEX.fullmatch(size)
    if match is None:
        return 0, ""  # skip partial or bad 'pv' log file line (pv process killed while writing?)
    number: float = float(match.group(1).replace(",", ".").replace(ARABIC_DECIMAL_SEPARATOR, "."))
    i: int = "KMGTPEZYRQ".index(match.group(2)) if match.group(2) else -1  # prefix magnitude; -1 means no prefix
    m: int = 1024 if match.group(3) == "i" else 1000  # IEC binary vs SI decimal multiple
    b: int = 1 if match.group(4) == "B" else 8  # 'b' denotes bits, so divide by 8
    line_tail: str = match.group(5)
    # str.startswith("/s") is False for the empty string, so no separate truthiness check is needed:
    if line_tail.startswith("/s"):
        raise ValueError("Invalid pv_size: " + size)  # stems from 'pv --rate' or 'pv --average-rate'
    size_in_bytes: int = round(number * (m ** (i + 1)) / b)
    return size_in_bytes, line_tail

363 

364 

def count_num_bytes_transferred_by_zfs_send(basis_pv_log_file: str) -> int:
    """Scrapes the .pv log file(s) and sums up the 'pv --bytes' column."""

    def bytes_of(pv_line: str) -> int:
        """Extracts byte count from a single pv log line."""
        if ":" not in pv_line:
            return 0
        col: str = pv_line.split(":", 1)[1].strip()
        num_bytes, _ = _pv_size_to_bytes(col)
        return num_bytes

    total_bytes: int = 0
    candidates: list[str] = [basis_pv_log_file]
    candidates += glob.glob(basis_pv_log_file + PV_FILE_THREAD_SEPARATOR + "[0-9]*")
    for path in candidates:
        if not os.path.isfile(path):
            continue
        try:
            with open_nofollow(path, mode="r", newline="", encoding="utf-8") as fd:
                pending: str | None = None  # most recent intermediate ("\r"-terminated) status line, not yet counted
                for curr_line in fd:
                    if curr_line.endswith("\r"):
                        pending = curr_line  # skip all but the most recent status update of each transfer
                    else:
                        total_bytes += bytes_of(curr_line)  # final status update of a transfer
                        pending = None
                if pending is not None:
                    total_bytes += bytes_of(pending)  # consume last line of file w/ intermediate status update, if any
        except FileNotFoundError:
            pass  # harmless
    return total_bytes