Coverage for bzfs_main/compare_snapshot_lists.py: 100% of 207 statements


# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Implementation of bzfs --compare-snapshot-lists."""

from __future__ import annotations
import itertools
import os
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import (
    TYPE_CHECKING,
    Callable,
    Generator,
    Iterable,
    Iterator,
    Sequence,
)

from bzfs_main.argparse_cli import (
    CMP_CHOICES_ITEMS,
)
from bzfs_main.detect import (
    are_bookmarks_enabled,
    is_solaris_zfs,
)
from bzfs_main.filter import (
    filter_snapshots,
)
from bzfs_main.parallel_batch_cmd import (
    zfs_list_snapshots_in_parallel,
)
from bzfs_main.parallel_iterator import (
    run_in_parallel,
)
from bzfs_main.utils import (
    FILE_PERMISSIONS,
    Interner,
    T,
    human_readable_bytes,
    human_readable_duration,
    isotime_from_unixtime,
    open_nofollow,
    relativize_dataset,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import Job
    from bzfs_main.configuration import Remote


@dataclass(order=True, frozen=True)
class ComparableSnapshot:
    """Snapshot entry comparable by rel_dataset and GUID for sorting and merging."""

    key: tuple[str, str]  # rel_dataset, guid
    cols: list[str] = field(compare=False)
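    # Note: with order=True and cols excluded via compare=False, equality and ordering are determined solely by
    # key=(rel_dataset, guid); cols merely carries the snapshot's metadata columns as payload.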


def run_compare_snapshot_lists(job: Job, src_datasets: list[str], dst_datasets: list[str]) -> None:
    """Compares source and destination dataset trees recursively with respect to snapshots, for example to check if all
    recently taken snapshots have been successfully replicated by a periodic job; implements --compare-snapshot-lists.

    Lists snapshots only contained in source (tagged with 'src'), only contained in destination (tagged with 'dst'), and
    contained in both source and destination (tagged with 'all'), in the form of a TSV file, along with other snapshot
    metadata. Implemented with a time and space efficient streaming algorithm; easily scales to millions of datasets and any
    number of snapshots. Assumes that both src_datasets and dst_datasets are sorted.
    """
    p, log = job.params, job.params.log
    src, dst = p.src, p.dst
    task: str = src.root_dataset + " vs. " + dst.root_dataset
    tsv_dir: str = p.log_params.log_file[0 : -len(".log")] + ".cmp"
    os.makedirs(tsv_dir, exist_ok=True)
    tsv_file: str = os.path.join(tsv_dir, (src.root_dataset + "%" + dst.root_dataset).replace("/", "~") + ".tsv")
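    # Illustrative example: for root datasets tank1/src and tank2/dst, the merged TSV lands at "<tsv_dir>/tank1~src%tank2~dst.tsv".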

    tmp_tsv_file: str = tsv_file + ".tmp"
    compare_snapshot_lists: set[str] = set(p.compare_snapshot_lists.split("+"))
    is_src_dst_all: bool = all(choice in compare_snapshot_lists for choice in CMP_CHOICES_ITEMS)
    all_src_dst: list[str] = [loc for loc in ("all", "src", "dst") if loc in compare_snapshot_lists]
    is_first_row: bool = True
    now: int | None = None

    def zfs_list_snapshot_iterator(r: Remote, sorted_datasets: list[str]) -> Generator[str, None, None]:
        """Lists snapshots sorted by dataset name; All snapshots of a given dataset will be adjacent."""
        assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        written_zfs_prop: str = "written"  # https://openzfs.github.io/openzfs-docs/man/master/7/zfsprops.7.html#written
        if is_solaris_zfs(p, r):  # solaris-11.4 zfs does not know the "written" ZFS snapshot property
            written_zfs_prop = "type"  # for simplicity, fill in the non-integer dummy constant type="snapshot"
        props: str = job.creation_prefix + f"creation,guid,createtxg,{written_zfs_prop},name"
        types: str = "snapshot"
        if p.use_bookmark and r.location == "src" and are_bookmarks_enabled(p, r):
            types = "snapshot,bookmark"  # output list ordering: intentionally makes bookmarks appear *after* snapshots
        cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {types} -d 1 -Hp -o {props}")  # sorted by dataset, createtxg
        for lines in zfs_list_snapshots_in_parallel(job, r, cmd, sorted_datasets):
            yield from lines

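    # Each line yielded above is tab-separated in the requested property order; roughly (illustrative example):
    # "1730878205\t2406491805272097867\t17435050\t24576\ttank1/src/foo@test_2024-11-06_08:30:05_daily"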

    def snapshot_iterator(
        root_dataset: str, sorted_itr: Generator[str, None, None]
    ) -> Generator[ComparableSnapshot, None, None]:
        """Splits/groups snapshot stream into distinct datasets, sorts by GUID within a dataset such that any two snapshots
        with the same GUID will lie adjacent to each other during the upcoming phase that merges src snapshots and dst
        snapshots."""
        # streaming group by dataset name (consumes constant memory only)
        for dataset, group in itertools.groupby(
            sorted_itr, key=lambda line: line[line.rindex("\t") + 1 : line.replace("#", "@").index("@")]
        ):
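            # The groupby key above isolates the dataset name: it slices from just after the last tab (the start of the
            # "name" column) up to the first "@" (snapshot) or "#" (bookmark) delimiter, e.g. yielding "tank1/src/foo".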

            snapshots: list[str] = list(group)  # fetch all snapshots of current dataset, e.g. dataset=tank1/src/foo
            snapshots = filter_snapshots(job, snapshots, filter_bookmarks=True)  # apply include/exclude policy
            snapshots.sort(key=lambda line: line.split("\t", 2)[1])  # stable sort by GUID (secondary order remains createtxg)
            rel_dataset: str = relativize_dataset(dataset, root_dataset)  # rel_dataset=/foo, root_dataset=tank1/src
            last_guid: str = ""
            for line in snapshots:
                cols = line.split("\t")
                creation, guid, createtxg, written, snapshot_name = cols
                if guid == last_guid:
                    assert "#" in snapshot_name
                    continue  # ignore bookmarks whose snapshot still exists. also ignore dupes of bookmarks
                last_guid = guid
                if written == "snapshot":
                    written = "-"  # sanitize solaris-11.4 work-around (solaris-11.4 also has no bookmark feature)
                    cols = [creation, guid, createtxg, written, snapshot_name]
                key = (rel_dataset, guid)  # ensures src snapshots and dst snapshots with the same GUID will be adjacent
                yield ComparableSnapshot(key, cols)

    def print_dataset(rel_dataset: str, entries: Iterable[tuple[str, ComparableSnapshot]]) -> None:
        entries = sorted(  # fetch all snapshots of current dataset and sort them by creation, createtxg, snapshot_tag
            entries,
            key=lambda entry: (
                int((cols := entry[1].cols)[0]),
                int(cols[2]),
                (snapshot_name := cols[-1])[snapshot_name.replace("#", "@").index("@") + 1 :],
            ),
        )

        @dataclass
        class SnapshotStats:
            snapshot_count: int = field(default=0)
            sum_written: int = field(default=0)
            snapshot_count_since: int = field(default=0)
            sum_written_since: int = field(default=0)
            latest_snapshot_idx: int | None = field(default=None)
            latest_snapshot_row_str: str | None = field(default=None)
            latest_snapshot_creation: str | None = field(default=None)
            oldest_snapshot_row_str: str | None = field(default=None)
            oldest_snapshot_creation: str | None = field(default=None)

        # print metadata of snapshots of current dataset to TSV file; custom stats can later be computed from there
        stats: defaultdict[str, SnapshotStats] = defaultdict(SnapshotStats)
        header: str = "location creation_iso createtxg rel_name guid root_dataset rel_dataset name creation written"
        nonlocal is_first_row
        if is_first_row:
            fd.write(header.replace(" ", "\t") + "\n")
            is_first_row = False
        for i, entry in enumerate(entries):  # entry is tuple[location:str, ComparableSnapshot]
            location: str = entry[0]  # "src" or "dst" or "all"
            creation, guid, createtxg, written, name = entry[1].cols
            root_dataset: str = dst.root_dataset if location == CMP_CHOICES_ITEMS[1] else src.root_dataset
            rel_name: str = relativize_dataset(name, root_dataset)
            creation_iso: str = isotime_from_unixtime(int(creation))
            row = (location, creation_iso, createtxg, rel_name, guid, root_dataset, rel_dataset, name, creation, written)
            # Example: src 2024-11-06_08:30:05 17435050 /foo@test_2024-11-06_08:30:05_daily 2406491805272097867 tank1/src /foo tank1/src/foo@test_2024-11-06_08:30:05_daily 1730878205 24576
            row_str = "\t".join(row)
            if not p.dry_run:
                fd.write(row_str + "\n")
            s = stats[location]
            s.snapshot_count += 1
            s.sum_written += int(written) if written != "-" else 0
            s.latest_snapshot_idx = i
            s.latest_snapshot_row_str = row_str
            s.latest_snapshot_creation = creation
            if not s.oldest_snapshot_row_str:
                s.oldest_snapshot_row_str = row_str
                s.oldest_snapshot_creation = creation

        # for convenience, directly log basic summary stats of current dataset
        k = stats["all"].latest_snapshot_idx  # defaults to None
        k = k if k is not None else -1
        for entry in entries[k + 1 :]:  # aggregate basic stats since latest common snapshot
            location = entry[0]
            creation, guid, createtxg, written, name = entry[1].cols
            s = stats[location]
            s.snapshot_count_since += 1
            s.sum_written_since += int(written) if written != "-" else 0
        prefix: str = f"Comparing {rel_dataset}~"
        msgs: list[str] = []
        msgs.append(f"{prefix} of {task}")
        msgs.append(
            f"{prefix} Q: No src snapshots are missing on dst, and no dst snapshots are missing on src, "
            "and there is a common snapshot? A: "
            + (
                "n/a"
                if not is_src_dst_all
                else str(
                    stats["src"].snapshot_count == 0 and stats["dst"].snapshot_count == 0 and stats["all"].snapshot_count > 0
                )
            )
        )
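        # The answer is True only if neither side has snapshots missing from the other and at least one common
        # snapshot exists; it is "n/a" unless all of "src", "dst" and "all" were requested.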

        nonlocal now
        now = now or round(time.time())  # uses the same timestamp across the entire dataset tree
        latcom = "latest common snapshot"
        for loc in all_src_dst:
            s = stats[loc]
            msgs.append(f"{prefix} Latest snapshot only in {loc}: {s.latest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Oldest snapshot only in {loc}: {s.oldest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Snapshots only in {loc}: {s.snapshot_count}")
            msgs.append(f"{prefix} Snapshot data written only in {loc}: {human_readable_bytes(s.sum_written)}")
            if loc != "all":
                na = None if k >= 0 else "n/a"
                msgs.append(f"{prefix} Snapshots only in {loc} since {latcom}: {na or s.snapshot_count_since}")
                msgs.append(
                    f"{prefix} Snapshot data written only in {loc} since {latcom}: "
                    f"{na or human_readable_bytes(s.sum_written_since)}"
                )
            all_creation = stats["all"].latest_snapshot_creation
            latest = ("latest", s.latest_snapshot_creation)
            oldest = ("oldest", s.oldest_snapshot_creation)
            for label, s_creation in latest, oldest:
                if loc != "all":
                    hd = "n/a"
                    if s_creation and k >= 0:
                        assert all_creation is not None
                        hd = human_readable_duration(int(all_creation) - int(s_creation), unit="s")
                    msgs.append(f"{prefix} Time diff between {latcom} and {label} snapshot only in {loc}: {hd}")
            for label, s_creation in latest, oldest:
                hd = "n/a" if not s_creation else human_readable_duration(now - int(s_creation), unit="s")
                msgs.append(f"{prefix} Time diff between now and {label} snapshot only in {loc}: {hd}")
        log.info("%s", "\n".join(msgs))

    # setup streaming pipeline
    src_snapshot_itr: Iterator = snapshot_iterator(src.root_dataset, zfs_list_snapshot_iterator(src, src_datasets))
    dst_snapshot_itr: Iterator = snapshot_iterator(dst.root_dataset, zfs_list_snapshot_iterator(dst, dst_datasets))
    merge_itr = _merge_sorted_iterators(CMP_CHOICES_ITEMS, p.compare_snapshot_lists, src_snapshot_itr, dst_snapshot_itr)

    interner: Interner[str] = Interner()  # reduces memory footprint
    rel_datasets: dict[str, set[str]] = defaultdict(set)
    for datasets, remote in (src_datasets, src), (dst_datasets, dst):
        for dataset in datasets:  # rel_dataset=/foo, root_dataset=tank1/src
            rel_datasets[remote.location].add(interner.intern(relativize_dataset(dataset, remote.root_dataset)))
    rel_src_or_dst: list[str] = sorted(rel_datasets["src"].union(rel_datasets["dst"]))

    log.debug("%s", f"Temporary TSV output file comparing {task} is: {tmp_tsv_file}")
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        # streaming group by rel_dataset (consumes constant memory only); entry is a Tuple[str, ComparableSnapshot]
        group = itertools.groupby(merge_itr, key=lambda entry: entry[1].key[0])
        _print_datasets(group, lambda rel_ds, entries: print_dataset(rel_ds, entries), rel_src_or_dst)
    os.rename(tmp_tsv_file, tsv_file)
    log.info("%s", f"Final TSV output file comparing {task} is: {tsv_file}")

    tsv_file = tsv_file[0 : tsv_file.rindex(".")] + ".rel_datasets_tsv"
    tmp_tsv_file = tsv_file + ".tmp"
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        header: str = "location rel_dataset src_dataset dst_dataset"
        fd.write(header.replace(" ", "\t") + "\n")
        src_only: set[str] = rel_datasets["src"].difference(rel_datasets["dst"])
        dst_only: set[str] = rel_datasets["dst"].difference(rel_datasets["src"])
        for rel_dataset in rel_src_or_dst:
            loc = "src" if rel_dataset in src_only else "dst" if rel_dataset in dst_only else "all"
            src_dataset = src.root_dataset + rel_dataset if rel_dataset not in dst_only else ""
            dst_dataset = dst.root_dataset + rel_dataset if rel_dataset not in src_only else ""
            row = (loc, rel_dataset, src_dataset, dst_dataset)  # Example: all /foo/bar tank1/src/foo/bar tank2/dst/foo/bar
            if not p.dry_run:
                fd.write("\t".join(row) + "\n")
    os.rename(tmp_tsv_file, tsv_file)


def _print_datasets(group: itertools.groupby, fn: Callable[[str, Iterable], None], rel_datasets: Iterable[str]) -> None:
    """Iterate over grouped datasets and apply fn, adding gaps for missing ones."""
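    # Illustrative example: with rel_datasets=["/a", "/b", "/c"] and a group stream containing entries only for "/b",
    # fn is called as fn("/a", []), then fn("/b", entries), then fn("/c", []).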

    rel_datasets = sorted(rel_datasets)
    n = len(rel_datasets)
    i = 0
    for rel_dataset, entries in group:
        while i < n and rel_datasets[i] < rel_dataset:
            fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
            i += 1
        assert i >= n or rel_datasets[i] == rel_dataset
        i += 1
        fn(rel_dataset, entries)
    while i < n:
        fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
        i += 1


def _merge_sorted_iterators(
    choices: Sequence[str],  # ["src", "dst", "all"]
    choice: str,  # Example: "src+dst+all"
    src_itr: Iterator[T],
    dst_itr: Iterator[T],
) -> Generator[tuple[str, T] | tuple[str, T, T], None, None]:
    """The typical pipelined merge algorithm of a merge sort, slightly adapted to our specific use case."""
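    # Sketch of the merge below: equal heads mean the same snapshot exists on both sides, so it is emitted once,
    # tagged "all" (carrying both entries); otherwise the smaller head is exclusive to its side and is emitted
    # tagged "dst" or "src". A tag is only emitted if it was requested via `choice`; the iterators advance regardless.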

    assert len(choices) == 3
    assert choice
    flags: int = 0
    for i, item in enumerate(choices):
        if item in choice:
            flags |= 1 << i
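    # Illustrative example: choice="src+all" with choices=["src", "dst", "all"] yields flags=0b101;
    # bit i of flags gates whether entries tagged choices[i] are emitted below.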

    src_next, dst_next = run_in_parallel(lambda: next(src_itr, None), lambda: next(dst_itr, None))
    while not (src_next is None and dst_next is None):
        if src_next == dst_next:
            n = 2
            if (flags & (1 << n)) != 0:
                assert src_next is not None
                assert dst_next is not None
                yield choices[n], src_next, dst_next
            src_next = next(src_itr, None)
            dst_next = next(dst_itr, None)
        elif src_next is None or (dst_next is not None and dst_next < src_next):
            n = 1
            if (flags & (1 << n)) != 0:
                assert dst_next is not None
                yield choices[n], dst_next
            dst_next = next(dst_itr, None)
        else:
            n = 0
            if (flags & (1 << n)) != 0:
                yield choices[n], src_next
            src_next = next(src_itr, None)