Coverage for bzfs_main/compare_snapshot_lists.py: 100% (202 statements)
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Implementation of bzfs --compare-snapshot-lists."""

from __future__ import (
    annotations,
)
import itertools
import os
import time
from collections import (
    defaultdict,
)
from collections.abc import (
    Iterable,
    Iterator,
    Sequence,
)
from dataclasses import (
    dataclass,
    field,
)
from typing import (
    TYPE_CHECKING,
    Callable,
)

from bzfs_main.argparse_cli import (
    CMP_CHOICES_ITEMS,
)
from bzfs_main.detect import (
    are_bookmarks_enabled,
)
from bzfs_main.filter import (
    filter_snapshots,
)
from bzfs_main.parallel_batch_cmd import (
    zfs_list_snapshots_in_parallel,
)
from bzfs_main.parallel_iterator import (
    run_in_parallel,
)
from bzfs_main.utils import (
    DIR_PERMISSIONS,
    FILE_PERMISSIONS,
    Interner,
    T,
    human_readable_bytes,
    human_readable_duration,
    isotime_from_unixtime,
    open_nofollow,
    relativize_dataset,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import (
        Job,
    )
    from bzfs_main.configuration import (
        Remote,
    )


@dataclass(order=True, frozen=True)
class _ComparableSnapshot:
    """Snapshot entry comparable by rel_dataset and GUID for sorting and merging."""

    key: tuple[str, str]  # rel_dataset, guid
    cols: list[str] = field(compare=False)  # excluded from comparison/equality checks
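    # Illustrative example (hypothetical values): since `cols` is excluded from comparisons, ordering and
    # equality depend only on (rel_dataset, guid):
    #   _ComparableSnapshot(("/bar", "9"), ["x"]) < _ComparableSnapshot(("/foo", "1"), ["y"])   # True ("/bar" < "/foo")
    #   _ComparableSnapshot(("/foo", "1"), ["x"]) == _ComparableSnapshot(("/foo", "1"), ["y"])  # True (cols ignored)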
def run_compare_snapshot_lists(job: Job, src_datasets: list[str], dst_datasets: list[str]) -> None:
    """Compares source and destination dataset trees recursively with respect to snapshots, for example to check if all
    recently taken snapshots have been successfully replicated by a periodic job; implements --compare-snapshot-lists.

    Lists snapshots only contained in source (tagged with 'src'), only contained in destination (tagged with 'dst'), and
    contained in both source and destination (tagged with 'all'), in the form of a TSV file, along with other snapshot
    metadata.

    Implemented with a time and space efficient streaming algorithm; easily scales to millions of datasets and any number of
    snapshots. Time complexity is O(max(N log N, M log M)) where N is the number of datasets and M is the number of snapshots
    per dataset. Space complexity is O(max(N, M)). Assumes that both src_datasets and dst_datasets are sorted.
    """
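    # Output layout (derived from the code below; example paths are hypothetical): given log file /tmp/bzfs.log,
    # src root tank1/src and dst root tank2/dst, the per-snapshot TSV is written to
    # /tmp/bzfs.cmp/tank1~src%tank2~dst.tsv and a per-dataset summary to
    # /tmp/bzfs.cmp/tank1~src%tank2~dst.rel_datasets_tsv, each via a .tmp file that is renamed into place.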
    p, log = job.params, job.params.log
    src, dst = p.src, p.dst
    task: str = src.root_dataset + " vs. " + dst.root_dataset
    tsv_dir: str = p.log_params.log_file[0 : -len(".log")] + ".cmp"
    os.makedirs(tsv_dir, mode=DIR_PERMISSIONS, exist_ok=True)
    tsv_file: str = os.path.join(tsv_dir, (src.root_dataset + "%" + dst.root_dataset).replace("/", "~") + ".tsv")
    tmp_tsv_file: str = tsv_file + ".tmp"
    compare_snapshot_lists: set[str] = set(p.compare_snapshot_lists.split("+"))
    is_src_dst_all: bool = all(choice in compare_snapshot_lists for choice in CMP_CHOICES_ITEMS)
    all_src_dst: list[str] = [loc for loc in ("all", "src", "dst") if loc in compare_snapshot_lists]
    is_first_row: bool = True
    now: int | None = None

    def zfs_list_snapshot_iterator(r: Remote, sorted_datasets: list[str]) -> Iterator[str]:
        """Lists snapshots sorted by dataset name; all snapshots of a given dataset will be adjacent."""
        assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        # Also see https://openzfs.github.io/openzfs-docs/man/master/7/zfsprops.7.html#written
        props: str = job.creation_prefix + "creation,guid,createtxg,written,name"
        types: str = "snapshot"
        if p.use_bookmark and r.location == "src" and are_bookmarks_enabled(p, r):
            types = "snapshot,bookmark"  # output list ordering: intentionally makes bookmarks appear *after* snapshots
        cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {types} -d 1 -Hp -o {props}")  # sorted by dataset, createtxg
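        # Illustrative example (hypothetical, assuming an empty job.creation_prefix): with bookmarks enabled
        # on the source, the command resolves to roughly
        #   zfs list -t snapshot,bookmark -d 1 -Hp -o creation,guid,createtxg,written,name <dataset>
        # which emits one tab-separated line per snapshot/bookmark.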
        for lines in zfs_list_snapshots_in_parallel(job, r, cmd, sorted_datasets):
            yield from lines

    def snapshot_iterator(root_dataset: str, sorted_itr: Iterator[str]) -> Iterator[_ComparableSnapshot]:
        """Splits/groups snapshot stream into distinct datasets, sorts by GUID within a dataset such that any two snapshots
        with the same GUID will lie adjacent to each other during the upcoming phase that merges src snapshots and dst
        snapshots."""
        # streaming group by dataset name (consumes constant memory only)
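        # Hypothetical example of the grouping key: a line ending in "tank1/src/foo@snap1" maps to the key
        # "tank1/src/foo"; a bookmark line ending in "tank1/src/foo#book1" maps to the same key, so snapshots
        # and bookmarks of one dataset land in the same group.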
        for dataset, group in itertools.groupby(
            sorted_itr, key=lambda line: line.rsplit("\t", 1)[1].replace("#", "@", 1).split("@", 1)[0]
        ):
            snapshots: list[str] = list(group)  # fetch all snapshots of current dataset, e.g. dataset=tank1/src/foo
            snapshots = filter_snapshots(job, snapshots, filter_bookmarks=True)  # apply include/exclude policy
            snapshots.sort(key=lambda line: line.split("\t", 2)[1])  # stable sort by GUID (createtxg remains the 2nd key)
            rel_dataset: str = relativize_dataset(dataset, root_dataset)  # rel_dataset=/foo, root_dataset=tank1/src
            last_guid: str = ""
            for line in snapshots:
                cols = line.split("\t")
                _creation, guid, _createtxg, _written, snapshot_name = cols
                if guid == last_guid:
                    assert "#" in snapshot_name
                    continue  # ignore bookmarks whose snapshot still exists. also ignore dupes of bookmarks
                last_guid = guid
                key = (rel_dataset, guid)  # ensures src snapshots and dst snapshots with the same GUID will be adjacent
                yield _ComparableSnapshot(key, cols)

    def print_dataset(rel_dataset: str, entries: Iterable[tuple[str, _ComparableSnapshot]]) -> None:
        entries = sorted(  # fetch all snapshots of current dataset and sort them by creation, createtxg, snapshot_tag
            entries,
            key=lambda entry: (
                int((cols := entry[1].cols)[0]),  # creation
                int(cols[2]),  # createtxg
                cols[-1].replace("#", "@", 1).split("@", 1)[1],  # snapshot_tag
            ),
        )

        @dataclass
        class SnapshotStats:
            snapshot_count: int = field(default=0)
            sum_written: int = field(default=0)
            snapshot_count_since: int = field(default=0)
            sum_written_since: int = field(default=0)
            latest_snapshot_idx: int | None = field(default=None)
            latest_snapshot_row_str: str | None = field(default=None)
            latest_snapshot_creation: str | None = field(default=None)
            oldest_snapshot_row_str: str | None = field(default=None)
            oldest_snapshot_creation: str | None = field(default=None)

        # print metadata of snapshots of current dataset to TSV file; custom stats can later be computed from there
        stats: defaultdict[str, SnapshotStats] = defaultdict(SnapshotStats)
        header: str = "location creation_iso createtxg rel_name guid root_dataset rel_dataset name creation written"
        nonlocal is_first_row
        if is_first_row:
            fd.write(header.replace(" ", "\t") + "\n")
            is_first_row = False
        for i, entry in enumerate(entries):  # entry is tuple[location:str, ComparableSnapshot]
            location: str = entry[0]  # "src" or "dst" or "all"
            creation, guid, createtxg, written, name = entry[1].cols
            root_dataset: str = dst.root_dataset if location == CMP_CHOICES_ITEMS[1] else src.root_dataset
            rel_name: str = relativize_dataset(name, root_dataset)
            creation_iso: str = isotime_from_unixtime(int(creation))
            row = (location, creation_iso, createtxg, rel_name, guid, root_dataset, rel_dataset, name, creation, written)
            # Example: src 2024-11-06_08:30:05 17435050 /foo@test_2024-11-06_08:30:05_daily 2406491805272097867 tank1/src /foo tank1/src/foo@test_2024-11-06_08:30:05_daily 1730878205 24576
            row_str = "\t".join(row)
            if not p.dry_run:
                fd.write(row_str + "\n")
            s = stats[location]
            s.snapshot_count += 1
            s.sum_written += int(written) if written != "-" else 0
            s.latest_snapshot_idx = i
            s.latest_snapshot_row_str = row_str
            s.latest_snapshot_creation = creation
            if not s.oldest_snapshot_row_str:
                s.oldest_snapshot_row_str = row_str
                s.oldest_snapshot_creation = creation

        # for convenience, directly log basic summary stats of current dataset
        k = stats["all"].latest_snapshot_idx  # defaults to None
        k = k if k is not None else -1
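        # Hypothetical example: if entries (sorted by creation) are [all, src, dst, src] and the latest "all"
        # entry sits at index 0, then entries[1:] are the snapshots newer than the latest common snapshot and
        # feed the "*_since" counters below; with no common snapshot, k == -1 and every entry counts.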
        for entry in entries[k + 1 :]:  # aggregate basic stats since latest common snapshot
            location = entry[0]
            creation, guid, createtxg, written, name = entry[1].cols
            s = stats[location]
            s.snapshot_count_since += 1
            s.sum_written_since += int(written) if written != "-" else 0
        prefix: str = f"Comparing {rel_dataset}~"
        msgs: list[str] = []
        msgs.append(f"{prefix} of {task}")
        msgs.append(
            f"{prefix} Q: No src snapshots are missing on dst, and no dst snapshots are missing on src, "
            "and there is a common snapshot? A: "
            + (
                "n/a"
                if not is_src_dst_all
                else str(
                    stats["src"].snapshot_count == 0 and stats["dst"].snapshot_count == 0 and stats["all"].snapshot_count > 0
                )
            )
        )
        nonlocal now
        now = now or round(time.time())  # uses the same timestamp across the entire dataset tree
        latcom = "latest common snapshot"
        for loc in all_src_dst:
            s = stats[loc]
            msgs.append(f"{prefix} Latest snapshot only in {loc}: {s.latest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Oldest snapshot only in {loc}: {s.oldest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Snapshots only in {loc}: {s.snapshot_count}")
            msgs.append(f"{prefix} Snapshot data written only in {loc}: {human_readable_bytes(s.sum_written)}")
            if loc != "all":
                na = None if k >= 0 else "n/a"
                msgs.append(f"{prefix} Snapshots only in {loc} since {latcom}: {na or s.snapshot_count_since}")
                msgs.append(
                    f"{prefix} Snapshot data written only in {loc} since {latcom}: "
                    f"{na or human_readable_bytes(s.sum_written_since)}"
                )
            all_creation = stats["all"].latest_snapshot_creation
            latest = ("latest", s.latest_snapshot_creation)
            oldest = ("oldest", s.oldest_snapshot_creation)
            for label, s_creation in latest, oldest:
                if loc != "all":
                    hd = "n/a"
                    if s_creation and k >= 0:
                        assert all_creation is not None
                        hd = human_readable_duration(int(all_creation) - int(s_creation), unit="s")
                    msgs.append(f"{prefix} Time diff between {latcom} and {label} snapshot only in {loc}: {hd}")
            for label, s_creation in latest, oldest:
                hd = "n/a" if not s_creation else human_readable_duration(now - int(s_creation), unit="s")
                msgs.append(f"{prefix} Time diff between now and {label} snapshot only in {loc}: {hd}")
        log.info("%s", "\n".join(msgs))

    # setup streaming pipeline
    src_snapshot_itr: Iterator = snapshot_iterator(src.root_dataset, zfs_list_snapshot_iterator(src, src_datasets))
    dst_snapshot_itr: Iterator = snapshot_iterator(dst.root_dataset, zfs_list_snapshot_iterator(dst, dst_datasets))
    merge_itr = _merge_sorted_iterators(CMP_CHOICES_ITEMS, p.compare_snapshot_lists, src_snapshot_itr, dst_snapshot_itr)

    interner: Interner[str] = Interner()  # reduces memory footprint
    rel_datasets: dict[str, set[str]] = defaultdict(set)
    for datasets, remote in (src_datasets, src), (dst_datasets, dst):
        for dataset in datasets:  # rel_dataset=/foo, root_dataset=tank1/src
            rel_datasets[remote.location].add(interner.intern(relativize_dataset(dataset, remote.root_dataset)))
    rel_src_or_dst: list[str] = sorted(rel_datasets["src"].union(rel_datasets["dst"]))

    log.debug("%s", f"Temporary TSV output file comparing {task} is: {tmp_tsv_file}")
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        # streaming group by rel_dataset (consumes constant memory only); entry is a Tuple[str, ComparableSnapshot]
        groups = itertools.groupby(merge_itr, key=lambda entry: entry[1].key[0])
        _print_datasets(groups, lambda rel_ds, entries: print_dataset(rel_ds, entries), rel_src_or_dst)
    os.rename(tmp_tsv_file, tsv_file)
    log.info("%s", f"Final TSV output file comparing {task} is: {tsv_file}")

    tsv_file = tsv_file[0 : tsv_file.rindex(".")] + ".rel_datasets_tsv"
    tmp_tsv_file = tsv_file + ".tmp"
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        header: str = "location rel_dataset src_dataset dst_dataset"
        fd.write(header.replace(" ", "\t") + "\n")
        src_only: set[str] = rel_datasets["src"].difference(rel_datasets["dst"])
        dst_only: set[str] = rel_datasets["dst"].difference(rel_datasets["src"])
        for rel_dataset in rel_src_or_dst:
            loc = "src" if rel_dataset in src_only else "dst" if rel_dataset in dst_only else "all"
            src_dataset = src.root_dataset + rel_dataset if rel_dataset not in dst_only else ""
            dst_dataset = dst.root_dataset + rel_dataset if rel_dataset not in src_only else ""
            row = (loc, rel_dataset, src_dataset, dst_dataset)  # Example: all /foo/bar tank1/src/foo/bar tank2/dst/foo/bar
            if not p.dry_run:
                fd.write("\t".join(row) + "\n")
    os.rename(tmp_tsv_file, tsv_file)


def _print_datasets(groups: itertools.groupby, fn: Callable[[str, Iterable], None], rel_datasets: Iterable[str]) -> None:
    """Iterates over grouped datasets and applies fn; datasets missing from the stream get fn called with an empty list."""
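    # Hypothetical example: with rel_datasets == ["/a", "/b", "/c"] and a stream that only contains entries
    # for "/b", fn is called as fn("/a", []), fn("/b", entries), fn("/c", []) so that datasets without any
    # snapshots still get summary output.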
    rel_datasets = sorted(rel_datasets)
    n = len(rel_datasets)
    i = 0
    for rel_dataset, entries in groups:
        while i < n and rel_datasets[i] < rel_dataset:
            fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
            i += 1
        assert i >= n or rel_datasets[i] == rel_dataset
        i += 1
        fn(rel_dataset, entries)
    while i < n:
        fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
        i += 1


def _merge_sorted_iterators(
    choices: Sequence[str],  # ["src", "dst", "all"]
    choice: str,  # Example: "src+dst+all"
    src_itr: Iterator[T],
    dst_itr: Iterator[T],
) -> Iterator[tuple[str, T] | tuple[str, T, T]]:
    """The typical pipelined merge algorithm of a merge sort, slightly adapted to our specific use case."""
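    # Hypothetical example: with choices ["src", "dst", "all"] and choice "src+all", flags becomes 0b101, so
    # dst-only items are consumed but not yielded. Merging src [A, B] with dst [B, C] (both sorted) then
    # yields ("src", A) and ("all", B, B), while the dst-only C is skipped.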
    assert len(choices) == 3
    assert choice
    flags: int = 0
    for i, item in enumerate(choices):
        if item in choice:
            flags |= 1 << i
    src_next, dst_next = run_in_parallel(lambda: next(src_itr, None), lambda: next(dst_itr, None))
    while not (src_next is None and dst_next is None):
        if src_next == dst_next:
            n = 2
            if (flags & (1 << n)) != 0:
                assert src_next is not None
                assert dst_next is not None
                yield choices[n], src_next, dst_next
            src_next = next(src_itr, None)
            dst_next = next(dst_itr, None)
        elif src_next is None or (dst_next is not None and dst_next < src_next):
            n = 1
            if (flags & (1 << n)) != 0:
                assert dst_next is not None
                yield choices[n], dst_next
            dst_next = next(dst_itr, None)
        else:
            n = 0
            if (flags & (1 << n)) != 0:
                yield choices[n], src_next
            src_next = next(src_itr, None)