Coverage for bzfs_main/progress_reporter.py: 99%

257 statements (coverage.py v7.13.0, created at 2025-12-22 08:03 +0000)

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Thread-based progress monitor for `pv` output during data transfers.

It tails the log files produced by (parallel) ``pv`` processes and periodically prints a single status line showing
aggregated throughput, ETA, etc. The reporter runs in a separate daemon thread to avoid blocking replication. All methods
are designed for minimal synchronization overhead.
"""

from __future__ import (
    annotations,
)
import argparse
import glob
import os
import re
import selectors
import threading
import time
from collections import (
    deque,
)
from dataclasses import (
    dataclass,
    field,
)
from datetime import (
    datetime,
)
from enum import (
    Enum,
    auto,
)
from logging import (
    Logger,
)
from pathlib import (
    Path,
)
from typing import (
    IO,
    Any,
    Final,
    NamedTuple,
    final,
)

from bzfs_main.util.utils import (
    FILE_PERMISSIONS,
    LOG_STDOUT,
    InterruptibleSleep,
    human_readable_bytes,
    open_nofollow,
)

# constants
PV_FILE_THREAD_SEPARATOR: Final[str] = "_"
_ARABIC_DECIMAL_SEPARATOR: Final[str] = "\u066b"  # "٫"  # noqa: RUF003
_PV_SIZE_TO_BYTES_REGEX: Final[re.Pattern[str]] = re.compile(
    rf"(\d+[.,{_ARABIC_DECIMAL_SEPARATOR}]?\d*)\s*([KMGTPEZYRQ]?)(i?)([Bb])(.*)"
)
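
# Illustrative decompositions (not from the original source) of size strings this regex
# accepts; the example inputs mirror those named in _pv_size_to_bytes() further below:
#   "800B"     -> number="800",  prefix="",  iec="",  unit="B", tail=""
#   "4.12 KiB" -> number="4.12", prefix="K", iec="i", unit="B", tail=""
#   "4Gb"      -> number="4",    prefix="G", iec="",  unit="b", tail=""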


#############################################################################
@final
class State(Enum):
    """Progress reporter lifecycle state transitions."""

    IS_PAUSING = auto()
    IS_RESETTING = auto()


#############################################################################
@final
class ProgressReporter:
    """Periodically prints progress updates to the same console status line, which is helpful if the program runs in an
    interactive Unix terminal session.

    Tails the 'pv' output log files that are being written to by (parallel) replication,
    and extracts aggregate progress and throughput metrics from them, such as MB, MB/s, ETA, etc. Periodically prints these
    metrics to the console status line, and in doing so "visually overwrites" the previous status line, via appending a \r
    carriage return control char rather than a \n newline char.
    Example console status line:
    2025-01-17 01:23:04 [I] zfs sent 41.7 GiB 0:00:46 [963 MiB/s] [907 MiB/s] [==========> ] 80% ETA 0:00:04 ETA 01:23:08
    """

    def __init__(
        self,
        log: Logger,
        pv_program_opts: list[str],
        use_select: bool,
        progress_update_intervals: tuple[float, float] | None,
        fail: bool = False,
    ) -> None:
        """Creates a reporter configured for ``pv`` log parsing."""
        # immutable variables:
        self._log: Final[Logger] = log
        self._pv_program_opts: Final[list[str]] = pv_program_opts
        self._use_select: Final[bool] = use_select
        self._progress_update_intervals: Final[tuple[float, float] | None] = progress_update_intervals
        self._inject_error: bool = fail  # for testing only

        # mutable variables:
        self._thread: threading.Thread | None = None
        self._exception: BaseException | None = None
        self._lock: Final[threading.Lock] = threading.Lock()
        self._sleeper: Final[InterruptibleSleep] = InterruptibleSleep(self._lock)  # sleeper shares lock with reporter
        self._file_name_queue: set[str] = set()
        self._file_name_set: Final[set[str]] = set()
        self._is_stopping: bool = False
        self._states: list[State] = [State.IS_RESETTING]
        self._first_status_line_emitted: Final[threading.Event] = threading.Event()  # for testing only

    def start(self) -> None:
        """Starts the monitoring thread and begins asynchronous parsing of ``pv`` log files."""
        with self._lock:
            assert self._thread is None
            self._thread = threading.Thread(target=lambda: self._run(), name="progress_reporter", daemon=True)
            self._thread.start()

    def stop(self) -> None:
        """Blocks until reporter is stopped, then reraises any exception that may have happened during log processing."""
        with self._lock:
            self._is_stopping = True
            self._sleeper.interrupt()
        t = self._thread
        if t is not None:
            t.join()
        e = self._exception
        if e is not None:
            raise e  # reraise exception in current thread

    def pause(self) -> None:
        """Temporarily suspends status logging."""
        self._append_state(State.IS_PAUSING)

    def reset(self) -> None:
        """Clears metrics before processing a new batch of logs; the purpose is to discard previous totals to avoid mixing
        unrelated transfers."""
        self._append_state(State.IS_RESETTING)

    def _append_state(self, state: State) -> None:
        with self._lock:
            states: list[State] = self._states
            if len(states) > 0 and states[-1] is state:
                return  # same state twice in a row is a no-op
            states.append(state)
            if len(states) >= 3:
                del states[0]  # cap time and memory consumption by removing redundant state transitions
            self._sleeper.interrupt()

    def wait_for_first_status_line(self) -> None:
        """Blocks until at least the first status line has been emitted; for testing only."""
        if not self._first_status_line_emitted.is_set():
            self._first_status_line_emitted.wait()

    def enqueue_pv_log_file(self, pv_log_file: str) -> None:
        """Tells progress reporter thread to also monitor and tail the given pv log file."""
        with self._lock:
            if pv_log_file not in self._file_name_set:
                self._file_name_queue.add(pv_log_file)

    def _run(self) -> None:
        """Thread entry point consuming pv logs and updating metrics."""
        log = self._log
        try:
            fds: list[IO[Any]] = []
            try:
                selector = selectors.SelectSelector() if self._use_select else selectors.PollSelector()
                try:
                    self._run_internal(fds, selector)
                finally:
                    selector.close()
            finally:
                for fd in fds:
                    fd.close()
        except BaseException as e:
            if not isinstance(e, BrokenPipeError):
                self._exception = e  # will be reraised in stop()
                log.exception("%s", "ProgressReporter:")

    @dataclass
    @final
    class TransferStat:
        """Tracks per-file transfer state and ETA."""

        @dataclass(order=True)
        @final
        class ETA:
            """Estimated time of arrival."""

            timestamp_nanos: int  # sorted by future time at which current zfs send/recv transfer is estimated to complete
            seq_nr: int  # tiebreaker wrt. sort order
            line_tail: str = field(compare=False)  # trailing pv log line part w/ progress bar, duration ETA, timestamp ETA

        bytes_in_flight: int
        eta: ETA

    def _run_internal(self, fds: list[IO[Any]], selector: selectors.BaseSelector) -> None:
        """Tails pv log files and periodically logs aggregated progress."""

        @final
        class Sample(NamedTuple):
            """Sliding window entry for throughput calculation."""

            sent_bytes: int
            timestamp_nanos: int

        log = self._log
        update_interval_secs, sliding_window_secs = (
            self._progress_update_intervals if self._progress_update_intervals is not None else self._get_update_intervals()
        )
        update_interval_nanos: int = round(update_interval_secs * 1_000_000_000)
        sliding_window_nanos: int = round(sliding_window_secs * 1_000_000_000)
        sleep_nanos: int = 0
        etas: list[ProgressReporter.TransferStat.ETA] = []
        while True:
            empty_file_name_queue: set[str] = set()
            empty_states: list[State] = []
            with self._lock:
                if self._is_stopping:
                    return
                # progress reporter thread picks up pv log files that so far aren't being tailed
                n = len(self._file_name_queue)
                m = len(self._file_name_set)
                self._file_name_set.update(self._file_name_queue)  # union
                assert len(self._file_name_set) == n + m  # aka assert (previous) file_name_set.isdisjoint(file_name_queue)
                local_file_name_queue: set[str] = self._file_name_queue
                self._file_name_queue = empty_file_name_queue  # exchange buffers
                states: list[State] = self._states
                self._states = empty_states  # exchange buffers
            for state in states:
                if state is State.IS_PAUSING:
                    next_update_nanos: int = time.monotonic_ns() + 10 * 365 * 86400 * 1_000_000_000  # infinity
                    sleep_nanos = next_update_nanos
                else:
                    assert state is State.IS_RESETTING
                    sent_bytes, last_status_len = 0, 0
                    num_lines, num_readables = 0, 0
                    start_time_nanos = time.monotonic_ns()
                    next_update_nanos = start_time_nanos + update_interval_nanos
                    sleep_nanos = round(update_interval_nanos / 2.5)
                    latest_samples: deque[Sample] = deque([Sample(0, start_time_nanos)])  # sliding window w/ recent measures
            for pv_log_file in local_file_name_queue:
                try:
                    Path(pv_log_file).touch(mode=FILE_PERMISSIONS)
                    fd = open_nofollow(pv_log_file, mode="r", newline="", encoding="utf-8")
                except FileNotFoundError:  # a third party has somehow deleted the log file or directory
                    with self._lock:
                        self._file_name_set.discard(pv_log_file)  # enable re-adding the file later via enqueue_pv_log_file()
                    log.warning("ProgressReporter: pv log file disappeared before initial open, skipping: %s", pv_log_file)
                    continue  # skip to the next file in the queue
                fds.append(fd)
                eta = self.TransferStat.ETA(timestamp_nanos=0, seq_nr=-len(fds), line_tail="")
                selector.register(fd, selectors.EVENT_READ, data=(iter(fd), self.TransferStat(bytes_in_flight=0, eta=eta)))
                etas.append(eta)
            readables: list[tuple[selectors.SelectorKey, int]] = selector.select(timeout=0)  # 0 indicates "don't block"
            has_line: bool = False
            curr_time_nanos: int = time.monotonic_ns()
            for selector_key, _ in readables:  # for each file that's ready for non-blocking read
                num_readables += 1
                iter_fd, transfer_stat = selector_key.data
                for line in iter_fd:  # aka iter(fd)
                    sent_bytes += self._update_transfer_stat(line, transfer_stat, curr_time_nanos)
                    num_lines += 1
                    has_line = True
            if curr_time_nanos >= next_update_nanos:
                elapsed_nanos: int = curr_time_nanos - start_time_nanos
                msg0, msg3 = self._format_sent_bytes(sent_bytes, elapsed_nanos)  # throughput etc since replication start time
                msg1: str = self._format_duration(elapsed_nanos)  # duration since replication start time
                oldest: Sample = latest_samples[0]  # throughput etc, over sliding window
                _, msg2 = self._format_sent_bytes(sent_bytes - oldest.sent_bytes, curr_time_nanos - oldest.timestamp_nanos)
                msg4: str = max(etas).line_tail if len(etas) > 0 else ""  # progress bar, ETAs
                timestamp: str = datetime.now().isoformat(sep=" ", timespec="seconds")  # 2024-09-03 12:26:15
                status_line: str = f"{timestamp} [I] zfs sent {msg0} {msg1} {msg2} {msg3} {msg4}".rstrip()
                status_line = status_line.ljust(last_status_len)  # "overwrite" trailing chars of previous status with spaces

                # The Unix console skips back to the beginning of the console line when it sees this \r control char:
                log.log(LOG_STDOUT, "%s", status_line, extra={"terminator": "\r"})

                # log.log(log_trace, "\nnum_lines: %s, num_readables: %s", num_lines, num_readables)
                last_status_len = len(status_line.rstrip())
                next_update_nanos += update_interval_nanos
                latest_samples.append(Sample(sent_bytes, curr_time_nanos))
                if elapsed_nanos >= sliding_window_nanos:
                    latest_samples.popleft()  # slide the sliding window containing recent measurements
                if not self._first_status_line_emitted.is_set():
                    self._first_status_line_emitted.set()
            elif not has_line:
                # Avoid burning CPU busily spinning on I/O readiness as fds are almost always ready for non-blocking read
                # even if no new pv log line has been written. Yet retain ability to wake up immediately on reporter.stop().
                if self._sleeper.sleep(min(sleep_nanos, next_update_nanos - curr_time_nanos)):
                    self._sleeper.reset()  # sleep was interrupted; ensure we can sleep normally again
            if self._inject_error:
                raise ValueError("Injected ProgressReporter error")  # for testing only

    def _update_transfer_stat(self, line: str, s: TransferStat, curr_time_nanos: int) -> int:
        """Update ``s`` from one pv status line and return bytes delta."""
        num_bytes, s.eta.timestamp_nanos, s.eta.line_tail = self._parse_pv_line(line, curr_time_nanos)
        bytes_in_flight: int = s.bytes_in_flight
        s.bytes_in_flight = num_bytes if line.endswith("\r") else 0  # intermediate vs. final status update of each transfer
        return num_bytes - bytes_in_flight

    NO_RATES_REGEX = re.compile(r".*/s\s*[)\]]?\s*")  # matches until end of last pv rate, e.g. "834MiB/s]" or "834MiB/s)"
    # time remaining --eta "ETA 00:00:39" or "ETA 2+0:00:39" or "ETA 2:0:00:39", followed by trailing --fineta timestamp ETA
    TIME_REMAINING_ETA_REGEX = re.compile(r".*?ETA\s*((\d+)[+:])?(\d\d?):(\d\d):(\d\d).*(ETA|FIN).*")
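
    # Worked example (the line below is illustrative, not captured from a real run) of how one
    # pv log line flows through _parse_pv_line(); the "zfs_send" prefix is hypothetical and
    # stands for whatever name precedes the first ":" in each log line:
    #   "zfs_send: 126 GiB 0:03:21 [ 641MiB/s] [ 641MiB/s] [=====>     ] 52% ETA 0:03:04 ETA 17:27:49"
    # The prefix is split off, _pv_size_to_bytes() consumes "126 GiB" (135_291_469_824 bytes),
    # NO_RATES_REGEX strips everything up to and including the last "/s]" rate field, and
    # TIME_REMAINING_ETA_REGEX turns the remaining "ETA 0:03:04" into 184 seconds.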

    @staticmethod
    def _parse_pv_line(line: str, curr_time_nanos: int) -> tuple[int, int, str]:
        """Parses a pv status line into transferred bytes and ETA timestamp."""
        assert isinstance(line, str)
        if ":" in line:
            line = line.split(":", 1)[1].strip()
            sent_bytes, line = _pv_size_to_bytes(line)
            line = ProgressReporter.NO_RATES_REGEX.sub("", line.lstrip(), count=1)  # strip --timer, --rate, --avg-rate
            if match := ProgressReporter.TIME_REMAINING_ETA_REGEX.fullmatch(line):  # extract pv --eta duration
                _, days, hours, minutes, secs, _ = match.groups()
                time_remaining_secs = (86400 * int(days) if days else 0) + int(hours) * 3600 + int(minutes) * 60 + int(secs)
                curr_time_nanos += time_remaining_secs * 1_000_000_000  # ETA timestamp = now + time remaining duration
            return sent_bytes, curr_time_nanos, line
        return 0, curr_time_nanos, ""

    @staticmethod
    def _format_sent_bytes(num_bytes: int, duration_nanos: int) -> tuple[str, str]:
        """Returns a human-readable byte count and rate."""
        bytes_per_sec: int = round(1_000_000_000 * num_bytes / max(1, duration_nanos))
        return f"{human_readable_bytes(num_bytes, precision=2)}", f"[{human_readable_bytes(bytes_per_sec, precision=2)}/s]"

    @staticmethod
    def _format_duration(duration_nanos: int) -> str:
        """Formats ``duration_nanos`` as HH:MM:SS string."""
        total_seconds: int = duration_nanos // 1_000_000_000
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours}:{minutes:02d}:{seconds:02d}"
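
    # Worked example (illustrative): 3725 seconds is 1 hour, 2 minutes and 5 seconds,
    # so _format_duration(3_725_000_000_000) returns "1:02:05".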

    def _get_update_intervals(self) -> tuple[float, float]:
        """Extracts polling intervals from ``pv_program_opts``."""
        parser = argparse.ArgumentParser(allow_abbrev=False)
        parser.add_argument("--interval", "-i", type=float, default=1)
        parser.add_argument("--average-rate-window", "-m", type=float, default=30)
        args, _ = parser.parse_known_args(args=self._pv_program_opts)
        interval: float = min(60 * 60, max(args.interval, 0.1))
        return interval, min(60 * 60, max(args.average_rate_window, interval))
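

# Minimal usage sketch (illustrative, not part of the original module): how a caller might
# wire the reporter around a batch of transfers. The logger name and pv log file path are
# hypothetical; in production the pv log files are continuously appended to by pv processes.
def _example_progress_reporter_usage() -> None:
    import logging

    reporter = ProgressReporter(
        log=logging.getLogger("demo"),
        pv_program_opts=["--interval", "0.5", "--average-rate-window", "30"],
        use_select=False,
        progress_update_intervals=None,  # None derives the intervals from pv_program_opts
    )
    reporter.start()
    try:
        reporter.enqueue_pv_log_file("/tmp/demo.pv")  # hypothetical pv log file path
        time.sleep(2)  # replication work happens here; the reporter tails and prints in the background
    finally:
        reporter.stop()  # blocks, then reraises any exception the reporter thread hit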


def _pv_size_to_bytes(
    size: str,
) -> tuple[int, str]:  # example inputs: "800B", "4.12 KiB", "510 MiB", "510 MB", "4Gb", "2TiB"
    """Converts pv size string to bytes and returns remaining text."""
    if match := _PV_SIZE_TO_BYTES_REGEX.fullmatch(size):
        number: float = float(match.group(1).replace(",", ".").replace(_ARABIC_DECIMAL_SEPARATOR, "."))
        i: int = "KMGTPEZYRQ".index(match.group(2)) if match.group(2) else -1
        m: int = 1024 if match.group(3) == "i" else 1000
        b: int = 1 if match.group(4) == "B" else 8
        line_tail: str = match.group(5)
        if line_tail and line_tail.startswith("/s"):
            raise ValueError("Invalid pv_size: " + size)  # stems from 'pv --rate' or 'pv --average-rate'
        size_in_bytes: int = round(number * (m ** (i + 1)) / b)
        return size_in_bytes, line_tail
    else:
        return 0, ""  # skip partial or bad 'pv' log file line (pv process killed while writing?)
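

# A few worked conversions (illustrative; the inputs mirror the example inputs named in the
# signature comment above). "i" selects powers of 1024, its absence powers of 1000, and a
# lowercase "b" means bits rather than bytes:
def _example_pv_size_conversions() -> None:
    assert _pv_size_to_bytes("800B") == (800, "")
    assert _pv_size_to_bytes("4.12 KiB") == (round(4.12 * 1024), "")
    assert _pv_size_to_bytes("510 MB") == (510 * 1000**2, "")
    assert _pv_size_to_bytes("4Gb") == (4 * 1000**3 // 8, "")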


def count_num_bytes_transferred_by_zfs_send(basis_pv_log_file: str) -> int:
    """Scrapes the .pv log file(s) and sums up the 'pv --bytes' column."""

    def parse_pv_line(line: str) -> int:
        """Extracts byte count from a single pv log line."""
        if ":" in line:
            col: str = line.split(":", 1)[1].strip()
            num_bytes, _ = _pv_size_to_bytes(col)
            return num_bytes
        return 0

    total_bytes: int = 0
    files: list[str] = [basis_pv_log_file] + glob.glob(glob.escape(basis_pv_log_file + PV_FILE_THREAD_SEPARATOR) + "[0-9]*")
    for file in files:
        if os.path.isfile(file):
            try:
                with open_nofollow(file, mode="r", newline="", encoding="utf-8") as fd:
                    line: str | None = None
                    for line in fd:
                        assert line is not None
                        if line.endswith("\r"):
                            continue  # skip all but the most recent status update of each transfer
                        total_bytes += parse_pv_line(line)
                        line = None
                    if line is not None:
                        total_bytes += parse_pv_line(line)  # consume last line of file w/ intermediate status update, if any
            except FileNotFoundError:
                pass  # harmless
    return total_bytes
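

# Minimal sketch (illustrative; the file names and sizes are made up) of how totals are
# summed across a main pv log file plus the per-thread sibling files that parallel
# replication writes next to it, named "<base>_1", "<base>_2", etc:
def _example_count_num_bytes_transferred() -> None:
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        base = os.path.join(tmp_dir, "replication.pv")
        with open(base, "w", encoding="utf-8") as f:
            f.write("demo: 1.00 KiB 0:00:01 [1.00KiB/s]\n")  # final status line of one transfer
        with open(base + PV_FILE_THREAD_SEPARATOR + "1", "w", encoding="utf-8") as f:
            f.write("demo: 2.00 KiB 0:00:01 [2.00KiB/s]\n")  # a parallel thread's pv log file
        assert count_num_bytes_transferred_by_zfs_send(base) == 3 * 1024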