Coverage for bzfs_main/compare_snapshot_lists.py: 100%

202 statements  

coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Implementation of bzfs --compare-snapshot-lists."""

from __future__ import (
    annotations,
)
import itertools
import os
import time
from collections import (
    defaultdict,
)
from collections.abc import (
    Iterable,
    Iterator,
    Sequence,
)
from dataclasses import (
    dataclass,
    field,
)
from typing import (
    TYPE_CHECKING,
    Callable,
)

from bzfs_main.argparse_cli import (
    CMP_CHOICES_ITEMS,
)
from bzfs_main.detect import (
    are_bookmarks_enabled,
)
from bzfs_main.filter import (
    filter_snapshots,
)
from bzfs_main.parallel_batch_cmd import (
    zfs_list_snapshots_in_parallel,
)
from bzfs_main.parallel_iterator import (
    run_in_parallel,
)
from bzfs_main.utils import (
    DIR_PERMISSIONS,
    FILE_PERMISSIONS,
    Interner,
    T,
    human_readable_bytes,
    human_readable_duration,
    isotime_from_unixtime,
    open_nofollow,
    relativize_dataset,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import (
        Job,
    )
    from bzfs_main.configuration import (
        Remote,
    )


@dataclass(order=True, frozen=True)
class _ComparableSnapshot:
    """Snapshot entry comparable by rel_dataset and GUID for sorting and merging."""

    key: tuple[str, str]  # rel_dataset, guid
    cols: list[str] = field(compare=False)  # excluded from comparison/equality checks


def run_compare_snapshot_lists(job: Job, src_datasets: list[str], dst_datasets: list[str]) -> None:
    """Compares source and destination dataset trees recursively with respect to snapshots, for example to check if all
    recently taken snapshots have been successfully replicated by a periodic job; implements --compare-snapshot-lists.

    Lists snapshots only contained in source (tagged with 'src'), only contained in destination (tagged with 'dst'), and
    contained in both source and destination (tagged with 'all'), in the form of a TSV file, along with other snapshot
    metadata.

    Implemented with a time and space efficient streaming algorithm; easily scales to millions of datasets and any number of
    snapshots. Time complexity is O(max(N log N, M log M)) where N is the number of datasets and M is the number of snapshots
    per dataset. Space complexity is O(max(N, M)). Assumes that both src_datasets and dst_datasets are sorted.
    """
    p, log = job.params, job.params.log
    src, dst = p.src, p.dst
    task: str = src.root_dataset + " vs. " + dst.root_dataset
    tsv_dir: str = p.log_params.log_file[0 : -len(".log")] + ".cmp"
    os.makedirs(tsv_dir, mode=DIR_PERMISSIONS, exist_ok=True)
    tsv_file: str = os.path.join(tsv_dir, (src.root_dataset + "%" + dst.root_dataset).replace("/", "~") + ".tsv")
    tmp_tsv_file: str = tsv_file + ".tmp"
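    # Parse the "+"-separated --compare-snapshot-lists value into the set of categories to report ("src", "dst", "all")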

    compare_snapshot_lists: set[str] = set(p.compare_snapshot_lists.split("+"))
    is_src_dst_all: bool = all(choice in compare_snapshot_lists for choice in CMP_CHOICES_ITEMS)
    all_src_dst: list[str] = [loc for loc in ("all", "src", "dst") if loc in compare_snapshot_lists]
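    # Mutable state shared with the nested print_dataset() closure below via `nonlocal`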

    is_first_row: bool = True
    now: int | None = None

    def zfs_list_snapshot_iterator(r: Remote, sorted_datasets: list[str]) -> Iterator[str]:
        """Lists snapshots sorted by dataset name; all snapshots of a given dataset will be adjacent."""
        assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        # Also see https://openzfs.github.io/openzfs-docs/man/master/7/zfsprops.7.html#written
        props: str = job.creation_prefix + "creation,guid,createtxg,written,name"
        types: str = "snapshot"
        if p.use_bookmark and r.location == "src" and are_bookmarks_enabled(p, r):
            types = "snapshot,bookmark"  # output list ordering: intentionally makes bookmarks appear *after* snapshots
        cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {types} -d 1 -Hp -o {props}")  # sorted by dataset, createtxg
        for lines in zfs_list_snapshots_in_parallel(job, r, cmd, sorted_datasets):
            yield from lines

    def snapshot_iterator(root_dataset: str, sorted_itr: Iterator[str]) -> Iterator[_ComparableSnapshot]:
        """Splits/groups snapshot stream into distinct datasets, sorts by GUID within a dataset such that any two snapshots
        with the same GUID will lie adjacent to each other during the upcoming phase that merges src snapshots and dst
        snapshots."""
        # streaming group by dataset name (consumes constant memory only)
        for dataset, group in itertools.groupby(
            sorted_itr, key=lambda line: line.rsplit("\t", 1)[1].replace("#", "@", 1).split("@", 1)[0]
        ):
            snapshots: list[str] = list(group)  # fetch all snapshots of current dataset, e.g. dataset=tank1/src/foo
            snapshots = filter_snapshots(job, snapshots, filter_bookmarks=True)  # apply include/exclude policy
            snapshots.sort(key=lambda line: line.split("\t", 2)[1])  # stable sort by GUID (secondary order remains createtxg)
            rel_dataset: str = relativize_dataset(dataset, root_dataset)  # rel_dataset=/foo, root_dataset=tank1/src
            last_guid: str = ""
            for line in snapshots:
                cols = line.split("\t")
                _creation, guid, _createtxg, _written, snapshot_name = cols
                if guid == last_guid:
                    assert "#" in snapshot_name
                    continue  # ignore bookmarks whose snapshot still exists; also ignore duplicate bookmarks
                last_guid = guid
                key = (rel_dataset, guid)  # ensures src snapshots and dst snapshots with the same GUID will be adjacent
                yield _ComparableSnapshot(key, cols)

    def print_dataset(rel_dataset: str, entries: Iterable[tuple[str, _ComparableSnapshot]]) -> None:
        entries = sorted(  # fetch all snapshots of current dataset and sort them by creation, createtxg, snapshot_tag
            entries,
            key=lambda entry: (
                int((cols := entry[1].cols)[0]),  # creation
                int(cols[2]),  # createtxg
                cols[-1].replace("#", "@", 1).split("@", 1)[1],  # snapshot_tag
            ),
        )

        @dataclass
        class SnapshotStats:
            snapshot_count: int = field(default=0)
            sum_written: int = field(default=0)
            snapshot_count_since: int = field(default=0)
            sum_written_since: int = field(default=0)
            latest_snapshot_idx: int | None = field(default=None)
            latest_snapshot_row_str: str | None = field(default=None)
            latest_snapshot_creation: str | None = field(default=None)
            oldest_snapshot_row_str: str | None = field(default=None)
            oldest_snapshot_creation: str | None = field(default=None)

        # print metadata of snapshots of current dataset to TSV file; custom stats can later be computed from there
        stats: defaultdict[str, SnapshotStats] = defaultdict(SnapshotStats)
        header: str = "location creation_iso createtxg rel_name guid root_dataset rel_dataset name creation written"
        nonlocal is_first_row
        if is_first_row:
            fd.write(header.replace(" ", "\t") + "\n")
            is_first_row = False
        for i, entry in enumerate(entries):  # entry is tuple[location:str, ComparableSnapshot]
            location: str = entry[0]  # "src" or "dst" or "all"
            creation, guid, createtxg, written, name = entry[1].cols
            root_dataset: str = dst.root_dataset if location == CMP_CHOICES_ITEMS[1] else src.root_dataset
            rel_name: str = relativize_dataset(name, root_dataset)
            creation_iso: str = isotime_from_unixtime(int(creation))
            row = (location, creation_iso, createtxg, rel_name, guid, root_dataset, rel_dataset, name, creation, written)
            # Example: src 2024-11-06_08:30:05 17435050 /foo@test_2024-11-06_08:30:05_daily 2406491805272097867 tank1/src /foo tank1/src/foo@test_2024-10-06_08:30:04_daily 1730878205 24576
            row_str = "\t".join(row)
            if not p.dry_run:
                fd.write(row_str + "\n")
            s = stats[location]
            s.snapshot_count += 1
            s.sum_written += int(written) if written != "-" else 0
            s.latest_snapshot_idx = i
            s.latest_snapshot_row_str = row_str
            s.latest_snapshot_creation = creation
            if not s.oldest_snapshot_row_str:
                s.oldest_snapshot_row_str = row_str
                s.oldest_snapshot_creation = creation

        # for convenience, directly log basic summary stats of current dataset
        k = stats["all"].latest_snapshot_idx  # defaults to None
        k = k if k is not None else -1
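        # entries is sorted by (creation, createtxg, tag), so the slice after index k covers snapshots taken since the
        # latest common snapshot; k == -1 means there is no common snapshot, so the slice covers all entries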

        for entry in entries[k + 1 :]:  # aggregate basic stats since latest common snapshot
            location = entry[0]
            creation, guid, createtxg, written, name = entry[1].cols
            s = stats[location]
            s.snapshot_count_since += 1
            s.sum_written_since += int(written) if written != "-" else 0
        prefix: str = f"Comparing {rel_dataset}~"
        msgs: list[str] = []
        msgs.append(f"{prefix} of {task}")
        msgs.append(
            f"{prefix} Q: No src snapshots are missing on dst, and no dst snapshots are missing on src, "
            "and there is a common snapshot? A: "
            + (
                "n/a"
                if not is_src_dst_all
                else str(
                    stats["src"].snapshot_count == 0 and stats["dst"].snapshot_count == 0 and stats["all"].snapshot_count > 0
                )
            )
        )
        nonlocal now
        now = now or round(time.time())  # uses the same timestamp across the entire dataset tree
        latcom = "latest common snapshot"
        for loc in all_src_dst:
            s = stats[loc]
            msgs.append(f"{prefix} Latest snapshot only in {loc}: {s.latest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Oldest snapshot only in {loc}: {s.oldest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Snapshots only in {loc}: {s.snapshot_count}")
            msgs.append(f"{prefix} Snapshot data written only in {loc}: {human_readable_bytes(s.sum_written)}")
            if loc != "all":
                na = None if k >= 0 else "n/a"
                msgs.append(f"{prefix} Snapshots only in {loc} since {latcom}: {na or s.snapshot_count_since}")
                msgs.append(
                    f"{prefix} Snapshot data written only in {loc} since {latcom}: "
                    f"{na or human_readable_bytes(s.sum_written_since)}"
                )
            all_creation = stats["all"].latest_snapshot_creation
            latest = ("latest", s.latest_snapshot_creation)
            oldest = ("oldest", s.oldest_snapshot_creation)
            for label, s_creation in latest, oldest:
                if loc != "all":
                    hd = "n/a"
                    if s_creation and k >= 0:
                        assert all_creation is not None
                        hd = human_readable_duration(int(all_creation) - int(s_creation), unit="s")
                    msgs.append(f"{prefix} Time diff between {latcom} and {label} snapshot only in {loc}: {hd}")
            for label, s_creation in latest, oldest:
                hd = "n/a" if not s_creation else human_readable_duration(now - int(s_creation), unit="s")
                msgs.append(f"{prefix} Time diff between now and {label} snapshot only in {loc}: {hd}")
        log.info("%s", "\n".join(msgs))

    # setup streaming pipeline
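    # Pipeline: 'zfs list' streams per-dataset snapshot lines in parallel -> snapshot_iterator() groups them per dataset,
    # filters them, and re-sorts each group by GUID -> _merge_sorted_iterators() merges the src and dst streams, tagging
    # each item as only-in-src, only-in-dst, or common -> results are grouped by rel_dataset and written/summarized below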

    src_snapshot_itr: Iterator = snapshot_iterator(src.root_dataset, zfs_list_snapshot_iterator(src, src_datasets))
    dst_snapshot_itr: Iterator = snapshot_iterator(dst.root_dataset, zfs_list_snapshot_iterator(dst, dst_datasets))
    merge_itr = _merge_sorted_iterators(CMP_CHOICES_ITEMS, p.compare_snapshot_lists, src_snapshot_itr, dst_snapshot_itr)

    interner: Interner[str] = Interner()  # reduces memory footprint
    rel_datasets: dict[str, set[str]] = defaultdict(set)
    for datasets, remote in (src_datasets, src), (dst_datasets, dst):
        for dataset in datasets:  # rel_dataset=/foo, root_dataset=tank1/src
            rel_datasets[remote.location].add(interner.intern(relativize_dataset(dataset, remote.root_dataset)))
    rel_src_or_dst: list[str] = sorted(rel_datasets["src"].union(rel_datasets["dst"]))

    log.debug("%s", f"Temporary TSV output file comparing {task} is: {tmp_tsv_file}")
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        # streaming group by rel_dataset (consumes constant memory only); entry is a Tuple[str, ComparableSnapshot]
        groups = itertools.groupby(merge_itr, key=lambda entry: entry[1].key[0])
        _print_datasets(groups, lambda rel_ds, entries: print_dataset(rel_ds, entries), rel_src_or_dst)
    os.rename(tmp_tsv_file, tsv_file)
    log.info("%s", f"Final TSV output file comparing {task} is: {tsv_file}")

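    # Also write a second TSV file that maps each relative dataset to its src and dst dataset names, tagged with whether
    # it exists only on src, only on dst, or on both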

    tsv_file = tsv_file[0 : tsv_file.rindex(".")] + ".rel_datasets_tsv"
    tmp_tsv_file = tsv_file + ".tmp"
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        header: str = "location rel_dataset src_dataset dst_dataset"
        fd.write(header.replace(" ", "\t") + "\n")
        src_only: set[str] = rel_datasets["src"].difference(rel_datasets["dst"])
        dst_only: set[str] = rel_datasets["dst"].difference(rel_datasets["src"])
        for rel_dataset in rel_src_or_dst:
            loc = "src" if rel_dataset in src_only else "dst" if rel_dataset in dst_only else "all"
            src_dataset = src.root_dataset + rel_dataset if rel_dataset not in dst_only else ""
            dst_dataset = dst.root_dataset + rel_dataset if rel_dataset not in src_only else ""
            row = (loc, rel_dataset, src_dataset, dst_dataset)  # Example: all /foo/bar tank1/src/foo/bar tank2/dst/foo/bar
            if not p.dry_run:
                fd.write("\t".join(row) + "\n")
    os.rename(tmp_tsv_file, tsv_file)


def _print_datasets(groups: itertools.groupby, fn: Callable[[str, Iterable], None], rel_datasets: Iterable[str]) -> None:
    """Iterates over grouped datasets and applies fn; for expected datasets missing from the stream, fn is invoked with an
    empty entry list so their summary is still printed."""
    rel_datasets = sorted(rel_datasets)
    n = len(rel_datasets)
    i = 0
    for rel_dataset, entries in groups:
        while i < n and rel_datasets[i] < rel_dataset:
            fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
            i += 1
        assert i >= n or rel_datasets[i] == rel_dataset
        i += 1
        fn(rel_dataset, entries)
    while i < n:
        fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
        i += 1


def _merge_sorted_iterators(
    choices: Sequence[str],  # ["src", "dst", "all"]
    choice: str,  # Example: "src+dst+all"
    src_itr: Iterator[T],
    dst_itr: Iterator[T],
) -> Iterator[tuple[str, T] | tuple[str, T, T]]:
    """The typical pipelined merge algorithm of a merge sort, slightly adapted to our specific use case."""
    assert len(choices) == 3
    assert choice
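    # Encode the selected categories as a bitmask: bit i is set iff choices[i] ("src", "dst", "all") occurs in the
    # "+"-separated choice string; the merge loop below consults these bits to decide which categories to yield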

    flags: int = 0
    for i, item in enumerate(choices):
        if item in choice:
            flags |= 1 << i
    src_next, dst_next = run_in_parallel(lambda: next(src_itr, None), lambda: next(dst_itr, None))
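    # Classic two-way merge of two sorted streams: equal heads are yielded together as the common ("all") category, while
    # the smaller head is yielded alone as only-in-src or only-in-dst; for _ComparableSnapshot inputs, "equal" means same
    # (rel_dataset, guid) key because cols is excluded from comparison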

    while not (src_next is None and dst_next is None):
        if src_next == dst_next:
            n = 2
            if (flags & (1 << n)) != 0:
                assert src_next is not None
                assert dst_next is not None
                yield choices[n], src_next, dst_next
            src_next = next(src_itr, None)
            dst_next = next(dst_itr, None)
        elif src_next is None or (dst_next is not None and dst_next < src_next):
            n = 1
            if (flags & (1 << n)) != 0:
                assert dst_next is not None
                yield choices[n], dst_next
            dst_next = next(dst_itr, None)
        else:
            n = 0
            if (flags & (1 << n)) != 0:
                yield choices[n], src_next
            src_next = next(src_itr, None)