Coverage for bzfs_main / compare_snapshot_lists.py: 100%

204 statements  

coverage.py v7.13.0, created at 2025-12-22 08:03 +0000

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Implementation of bzfs --compare-snapshot-lists."""

from __future__ import (
    annotations,
)
import itertools
import os
import time
from collections import (
    defaultdict,
)
from collections.abc import (
    Iterable,
    Iterator,
    Sequence,
)
from dataclasses import (
    dataclass,
    field,
)
from typing import (
    TYPE_CHECKING,
    Callable,
    final,
)

from bzfs_main.argparse_cli import (
    CMP_CHOICES_ITEMS,
)
from bzfs_main.detect import (
    are_bookmarks_enabled,
)
from bzfs_main.filter import (
    filter_snapshots,
)
from bzfs_main.parallel_batch_cmd import (
    zfs_list_snapshots_in_parallel,
)
from bzfs_main.util.parallel_iterator import (
    run_in_parallel,
)
from bzfs_main.util.utils import (
    DIR_PERMISSIONS,
    FILE_PERMISSIONS,
    HashedInterner,
    human_readable_bytes,
    human_readable_duration,
    isotime_from_unixtime,
    open_nofollow,
    relativize_dataset,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import (
        Job,
    )
    from bzfs_main.configuration import (
        Remote,
    )


@dataclass(order=True, frozen=True)
@final
class _ComparableSnapshot:
    """Snapshot entry comparable by rel_dataset and GUID for sorting and merging."""

    key: tuple[str, str]  # rel_dataset, guid
    cols: list[str] = field(compare=False)  # excluded from comparison/equality checks
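
# Illustrative sketch (editor's note, not part of the original module): because the dataclass is declared
# with order=True and `cols` uses field(compare=False), ordering and equality are driven by
# `key` == (rel_dataset, guid) alone. With hypothetical values:
#
#   a = _ComparableSnapshot(key=("/foo", "123"), cols=["1730878205", "123", "17", "24576", "tank1/src/foo@x"])
#   b = _ComparableSnapshot(key=("/foo", "123"), cols=["1730878209", "123", "99", "8192", "tank2/dst/foo@x"])
#   assert a == b       # equal: same key; differing cols are ignored
#   assert not (a < b)  # ordering compares keys only
#
# This is the property that the comparisons in _merge_sorted_iterators() below rely on.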


def run_compare_snapshot_lists(job: Job, src_datasets: list[str], dst_datasets: list[str]) -> None:
    """Compares source and destination dataset trees recursively with respect to snapshots, for example to check if all
    recently taken snapshots have been successfully replicated by a periodic job; implements --compare-snapshot-lists.

    Lists snapshots only contained in source (tagged with 'src'), only contained in destination (tagged with 'dst'), and
    contained in both source and destination (tagged with 'all'), in the form of a TSV file, along with other snapshot
    metadata.

    Implemented with a time and space efficient streaming algorithm; easily scales to millions of datasets and any number of
    snapshots. Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of
    snapshots per dataset. Space complexity is O(max(N, M)). Assumes that both src_datasets and dst_datasets are sorted.
    """
    p, log = job.params, job.params.log
    src, dst = p.src, p.dst
    task: str = src.root_dataset + " vs. " + dst.root_dataset
    tsv_dir: str = p.log_params.log_file[0 : -len(".log")] + ".cmp"
    os.makedirs(tsv_dir, mode=DIR_PERMISSIONS, exist_ok=True)
    tsv_file: str = os.path.join(tsv_dir, (src.root_dataset + "%" + dst.root_dataset).replace("/", "~") + ".tsv")
    tmp_tsv_file: str = tsv_file + ".tmp"
    compare_snapshot_lists: set[str] = set(p.compare_snapshot_lists.split("+"))
    is_src_dst_all: bool = all(choice in compare_snapshot_lists for choice in CMP_CHOICES_ITEMS)
    all_src_dst: list[str] = [loc for loc in ("all", "src", "dst") if loc in compare_snapshot_lists]
    is_first_row: bool = True
    now: int | None = None

    def zfs_list_snapshot_iterator(r: Remote, sorted_datasets: list[str]) -> Iterator[str]:
        """Lists snapshots sorted by dataset name; All snapshots of a given dataset will be adjacent."""
        assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        # Also see https://openzfs.github.io/openzfs-docs/man/master/7/zfsprops.7.html#written
        props: str = job.creation_prefix + "creation,guid,createtxg,written,name"
        types: str = "snapshot"
        if p.use_bookmark and r.location == "src" and are_bookmarks_enabled(p, r):
            types = "snapshot,bookmark"  # output list ordering: intentionally makes bookmarks appear *after* snapshots
        cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {types} -d 1 -Hp -o {props}")  # sorted by dataset, createtxg
        for lines in zfs_list_snapshots_in_parallel(job, r, cmd, sorted_datasets):
            yield from lines
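
    # Illustrative sketch (editor's note, not part of the original module): assuming job.creation_prefix
    # is empty, each line yielded above by `zfs list -t snapshot -d 1 -Hp -o creation,guid,createtxg,written,name`
    # is one tab-separated record per snapshot, roughly (hypothetical values):
    #
    #   "1730878205\t2406491805272097867\t17435050\t24576\ttank1/src/foo@test_2024-11-06_08:30:05_daily"
    #
    # i.e. creation, guid, createtxg, written, name; snapshot_iterator() below unpacks exactly these
    # five columns via line.split("\t").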

    def snapshot_iterator(root_dataset: str, sorted_itr: Iterator[str]) -> Iterator[_ComparableSnapshot]:
        """Splits/groups snapshot stream into distinct datasets, sorts by GUID within a dataset such that any two snapshots
        with the same GUID will lie adjacent to each other during the upcoming phase that merges src snapshots and dst
        snapshots."""
        # streaming group by dataset name (consumes constant memory only)
        for dataset, group in itertools.groupby(
            sorted_itr, key=lambda line: line.rsplit("\t", 1)[1].replace("#", "@", 1).split("@", 1)[0]
        ):
            snapshots: list[str] = list(group)  # fetch all snapshots of current dataset, e.g. dataset=tank1/src/foo
            snapshots = filter_snapshots(job, snapshots, filter_bookmarks=True)  # apply include/exclude policy
            snapshots.sort(key=lambda line: line.split("\t", 2)[1])  # stable sort by GUID (2nd remains createtxg)
            rel_dataset: str = relativize_dataset(dataset, root_dataset)  # rel_dataset=/foo, root_dataset=tank1/src
            last_guid: str = ""
            for line in snapshots:
                cols = line.split("\t")
                _creation, guid, _createtxg, _written, snapshot_name = cols
                if guid == last_guid:
                    assert "#" in snapshot_name
                    continue  # ignore bookmarks whose snapshot still exists. also ignore dupes of bookmarks
                last_guid = guid
                key = (rel_dataset, guid)  # ensures src snapshots and dst snapshots with the same GUID will be adjacent
                yield _ComparableSnapshot(key, cols)
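
    # Illustrative sketch (editor's note): the groupby key above reduces each raw line to its dataset name
    # by inspecting the last (name) column; bookmarks are normalized first so that "#" behaves like "@".
    # With hypothetical lines:
    #
    #   key("...\ttank1/src/foo@snap1")  -> "tank1/src/foo"
    #   key("...\ttank1/src/foo#book1")  -> "tank1/src/foo"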

    def print_dataset(rel_dataset: str, entries: Iterable[tuple[str, _ComparableSnapshot]]) -> None:
        entries = sorted(  # fetch all snapshots of current dataset and sort em by creation, createtxg, snapshot_tag
            entries,
            key=lambda entry: (
                int((cols := entry[1].cols)[0]),  # creation
                int(cols[2]),  # createtxg
                cols[-1].replace("#", "@", 1).split("@", 1)[1],  # snapshot_tag
            ),
        )

        @dataclass
        @final
        class SnapshotStats:
            snapshot_count: int = field(default=0)
            sum_written: int = field(default=0)
            snapshot_count_since: int = field(default=0)
            sum_written_since: int = field(default=0)
            latest_snapshot_idx: int | None = field(default=None)
            latest_snapshot_row_str: str | None = field(default=None)
            latest_snapshot_creation: str | None = field(default=None)
            oldest_snapshot_row_str: str | None = field(default=None)
            oldest_snapshot_creation: str | None = field(default=None)

        # print metadata of snapshots of current dataset to TSV file; custom stats can later be computed from there
        stats: defaultdict[str, SnapshotStats] = defaultdict(SnapshotStats)
        header: str = "location creation_iso createtxg rel_name guid root_dataset rel_dataset name creation written"
        nonlocal is_first_row
        if is_first_row:
            fd.write(header.replace(" ", "\t") + "\n")
            is_first_row = False
        for i, entry in enumerate(entries):  # entry is tuple[location:str, ComparableSnapshot]
            location: str = entry[0]  # "src" or "dst" or "all"
            creation, guid, createtxg, written, name = entry[1].cols
            root_dataset: str = dst.root_dataset if location == CMP_CHOICES_ITEMS[1] else src.root_dataset
            rel_name: str = relativize_dataset(name, root_dataset)
            creation_iso: str = isotime_from_unixtime(int(creation))
            row = (location, creation_iso, createtxg, rel_name, guid, root_dataset, rel_dataset, name, creation, written)
            # Example: src 2024-11-06_08:30:05 17435050 /foo@test_2024-11-06_08:30:05_daily 2406491805272097867 tank1/src /foo tank1/src/foo@test_2024-10-06_08:30:04_daily 1730878205 24576
            row_str = "\t".join(row)
            if not p.dry_run:
                fd.write(row_str + "\n")
            s = stats[location]
            s.snapshot_count += 1
            s.sum_written += int(written) if written != "-" else 0
            s.latest_snapshot_idx = i
            s.latest_snapshot_row_str = row_str
            s.latest_snapshot_creation = creation
            if not s.oldest_snapshot_row_str:
                s.oldest_snapshot_row_str = row_str
                s.oldest_snapshot_creation = creation

        # for convenience, directly log basic summary stats of current dataset
        k = stats["all"].latest_snapshot_idx  # defaults to None
        k = k if k is not None else -1
        for entry in entries[k + 1 :]:  # aggregate basic stats since latest common snapshot
            location = entry[0]
            creation, guid, createtxg, written, name = entry[1].cols
            s = stats[location]
            s.snapshot_count_since += 1
            s.sum_written_since += int(written) if written != "-" else 0
        prefix: str = f"Comparing {rel_dataset}~"
        msgs: list[str] = []
        msgs.append(f"{prefix} of {task}")
        msgs.append(
            f"{prefix} Q: No src snapshots are missing on dst, and no dst snapshots are missing on src, "
            "and there is a common snapshot? A: "
            + (
                "n/a"
                if not is_src_dst_all
                else str(
                    stats["src"].snapshot_count == 0 and stats["dst"].snapshot_count == 0 and stats["all"].snapshot_count > 0
                )
            )
        )
        nonlocal now
        now = now or round(time.time())  # uses the same timestamp across the entire dataset tree
        latcom = "latest common snapshot"
        for loc in all_src_dst:
            s = stats[loc]
            msgs.append(f"{prefix} Latest snapshot only in {loc}: {s.latest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Oldest snapshot only in {loc}: {s.oldest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Snapshots only in {loc}: {s.snapshot_count}")
            msgs.append(f"{prefix} Snapshot data written only in {loc}: {human_readable_bytes(s.sum_written)}")
            if loc != "all":
                na = None if k >= 0 else "n/a"
                msgs.append(f"{prefix} Snapshots only in {loc} since {latcom}: {na or s.snapshot_count_since}")
                msgs.append(
                    f"{prefix} Snapshot data written only in {loc} since {latcom}: "
                    f"{na or human_readable_bytes(s.sum_written_since)}"
                )
            all_creation = stats["all"].latest_snapshot_creation
            latest = ("latest", s.latest_snapshot_creation)
            oldest = ("oldest", s.oldest_snapshot_creation)
            for label, s_creation in latest, oldest:
                if loc != "all":
                    hd = "n/a"
                    if s_creation and k >= 0:
                        assert all_creation is not None
                        hd = human_readable_duration(int(all_creation) - int(s_creation), unit="s")
                    msgs.append(f"{prefix} Time diff between {latcom} and {label} snapshot only in {loc}: {hd}")
            for label, s_creation in latest, oldest:
                hd = "n/a" if not s_creation else human_readable_duration(now - int(s_creation), unit="s")
                msgs.append(f"{prefix} Time diff between now and {label} snapshot only in {loc}: {hd}")
        log.info("%s", "\n".join(msgs))
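
    # Illustrative sketch (editor's note): print_dataset() above re-sorts the merged entries of one dataset
    # by (creation, createtxg, snapshot_tag). For a hypothetical
    # cols == ["1730878205", "12345", "17435050", "24576", "tank1/src/foo@test_daily"], the sort key
    # evaluates to (1730878205, 17435050, "test_daily").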

    # setup streaming pipeline
    src_snapshot_itr: Iterator = snapshot_iterator(src.root_dataset, zfs_list_snapshot_iterator(src, src_datasets))
    dst_snapshot_itr: Iterator = snapshot_iterator(dst.root_dataset, zfs_list_snapshot_iterator(dst, dst_datasets))
    merge_itr = _merge_sorted_iterators(CMP_CHOICES_ITEMS, p.compare_snapshot_lists, src_snapshot_itr, dst_snapshot_itr)

    interner: HashedInterner[str] = HashedInterner()  # reduces memory footprint
    rel_datasets: dict[str, set[str]] = defaultdict(set)
    for datasets, remote in (src_datasets, src), (dst_datasets, dst):
        for dataset in datasets:  # rel_dataset=/foo, root_dataset=tank1/src
            rel_datasets[remote.location].add(interner.intern(relativize_dataset(dataset, remote.root_dataset)))
    rel_src_or_dst: list[str] = sorted(rel_datasets["src"].union(rel_datasets["dst"]))

    log.debug("%s", f"Temporary TSV output file comparing {task} is: {tmp_tsv_file}")
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        # streaming group by rel_dataset (consumes constant memory only); entry is a Tuple[str, ComparableSnapshot]
        groups = itertools.groupby(merge_itr, key=lambda entry: entry[1].key[0])
        _print_datasets(groups, lambda rel_ds, entries: print_dataset(rel_ds, entries), rel_src_or_dst)
    os.rename(tmp_tsv_file, tsv_file)
    log.info("%s", f"Final TSV output file comparing {task} is: {tsv_file}")

    tsv_file = tsv_file[0 : tsv_file.rindex(".")] + ".rel_datasets_tsv"
    tmp_tsv_file = tsv_file + ".tmp"
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        header: str = "location rel_dataset src_dataset dst_dataset"
        fd.write(header.replace(" ", "\t") + "\n")
        src_only: set[str] = rel_datasets["src"].difference(rel_datasets["dst"])
        dst_only: set[str] = rel_datasets["dst"].difference(rel_datasets["src"])
        for rel_dataset in rel_src_or_dst:
            loc = "src" if rel_dataset in src_only else "dst" if rel_dataset in dst_only else "all"
            src_dataset = src.root_dataset + rel_dataset if rel_dataset not in dst_only else ""
            dst_dataset = dst.root_dataset + rel_dataset if rel_dataset not in src_only else ""
            row = (loc, rel_dataset, src_dataset, dst_dataset)  # Example: all /foo/bar tank1/src/foo/bar tank2/dst/foo/bar
            if not p.dry_run:
                fd.write("\t".join(row) + "\n")
    os.rename(tmp_tsv_file, tsv_file)
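
# Illustrative sketch (editor's note, not part of this module): one way the per-snapshot TSV emitted by
# run_compare_snapshot_lists() could be post-processed, assuming the header written above; the file path
# below is hypothetical and follows the naming scheme `<logfile>.cmp/<src_root>%<dst_root>.tsv` with "/"
# replaced by "~":
#
#   import csv
#   with open("/var/log/bzfs/job.cmp/tank1~src%tank2~dst.tsv", newline="", encoding="utf-8") as f:
#       for rec in csv.DictReader(f, delimiter="\t"):
#           if rec["location"] == "src":  # snapshots present on src but not replicated to dst
#               print(rec["rel_dataset"], rec["rel_name"])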


def _print_datasets(groups: itertools.groupby, fn: Callable[[str, Iterable], None], rel_datasets: Iterable[str]) -> None:
    """Iterate over grouped datasets and apply fn, adding gaps for missing ones."""
    rel_datasets = sorted(rel_datasets)
    n = len(rel_datasets)
    i = 0
    for rel_dataset, entries in groups:
        while i < n and rel_datasets[i] < rel_dataset:
            fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
            i += 1
        assert i >= n or rel_datasets[i] == rel_dataset
        i += 1
        fn(rel_dataset, entries)
    while i < n:
        fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
        i += 1
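
# Illustrative sketch (editor's note): _print_datasets() also invokes fn for datasets whose snapshot stream
# is empty. With hypothetical rel_datasets == ["/a", "/b", "/c"] and groups yielding only ("/b", entries_b),
# the calls are fn("/a", []), fn("/b", entries_b), fn("/c", []), in that order.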


def _merge_sorted_iterators(
    choices: Sequence[str],  # ["src", "dst", "all"]
    choice: str,  # Example: "src+dst+all"
    src_itr: Iterator[_ComparableSnapshot],
    dst_itr: Iterator[_ComparableSnapshot],
) -> Iterator[tuple[str, _ComparableSnapshot] | tuple[str, _ComparableSnapshot, _ComparableSnapshot]]:
    """The typical pipelined merge algorithm of a merge sort, slightly adapted to our specific use case."""
    assert len(choices) == 3
    assert choice
    flags: int = 0
    for i, item in enumerate(choices):
        if item in choice:
            flags |= 1 << i
    src_next, dst_next = run_in_parallel(lambda: next(src_itr, None), lambda: next(dst_itr, None))
    while not (src_next is None and dst_next is None):
        if src_next == dst_next:
            n = 2
            if (flags & (1 << n)) != 0:
                assert src_next is not None
                assert dst_next is not None
                yield choices[n], src_next, dst_next
            src_next = next(src_itr, None)
            dst_next = next(dst_itr, None)
        elif src_next is None or (dst_next is not None and dst_next < src_next):
            n = 1
            if (flags & (1 << n)) != 0:
                assert dst_next is not None
                yield choices[n], dst_next
            dst_next = next(dst_itr, None)
        else:
            n = 0
            if (flags & (1 << n)) != 0:
                yield choices[n], src_next
            src_next = next(src_itr, None)
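
# Illustrative sketch (editor's note): for choices == ("src", "dst", "all") and choice == "src+dst+all",
# merging a src stream [a, c] with a dst stream [b, c'] -- both sorted, with a < b < c by key and
# c == c' (same (rel_dataset, guid)) -- yields, in order (hypothetical values):
#
#   ("src", a), ("dst", b), ("all", c, c')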