Coverage for bzfs_main/compare_snapshot_lists.py: 100% (204 statements)
coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Implementation of bzfs --compare-snapshot-lists."""
from __future__ import (
    annotations,
)
import itertools
import os
import time
from collections import (
    defaultdict,
)
from collections.abc import (
    Iterable,
    Iterator,
    Sequence,
)
from dataclasses import (
    dataclass,
    field,
)
from typing import (
    TYPE_CHECKING,
    Callable,
    final,
)
from bzfs_main.argparse_cli import (
    CMP_CHOICES_ITEMS,
)
from bzfs_main.detect import (
    are_bookmarks_enabled,
)
from bzfs_main.filter import (
    filter_snapshots,
)
from bzfs_main.parallel_batch_cmd import (
    zfs_list_snapshots_in_parallel,
)
from bzfs_main.util.parallel_iterator import (
    run_in_parallel,
)
from bzfs_main.util.utils import (
    DIR_PERMISSIONS,
    FILE_PERMISSIONS,
    HashedInterner,
    human_readable_bytes,
    human_readable_duration,
    isotime_from_unixtime,
    open_nofollow,
    relativize_dataset,
)
if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import (
        Job,
    )
    from bzfs_main.configuration import (
        Remote,
    )
@dataclass(order=True, frozen=True)
@final
class _ComparableSnapshot:
    """Snapshot entry comparable by rel_dataset and GUID for sorting and merging."""

    key: tuple[str, str]  # rel_dataset, guid
    cols: list[str] = field(compare=False)  # excluded from comparison/equality checks
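# Note: with order=True and cols excluded via field(compare=False), ordering and equality consider only `key`,
# i.e. (rel_dataset, guid). Illustrative sketch (hypothetical values): an entry built from a src line and one
# built from a dst line that share guid "123" under rel_dataset "/foo" compare equal even though their `cols`
# differ, which is how the merge phase below detects snapshots present on both sides.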
def run_compare_snapshot_lists(job: Job, src_datasets: list[str], dst_datasets: list[str]) -> None:
    """Compares source and destination dataset trees recursively with respect to snapshots, for example to check if all
    recently taken snapshots have been successfully replicated by a periodic job; implements --compare-snapshot-lists.

    Lists snapshots only contained in source (tagged with 'src'), only contained in destination (tagged with 'dst'), and
    contained in both source and destination (tagged with 'all'), in the form of a TSV file, along with other snapshot
    metadata.

    Implemented with a time and space efficient streaming algorithm; easily scales to millions of datasets and any number of
    snapshots. Time complexity is O((N log N) + (N * M log M)) where N is the number of datasets and M is the number of
    snapshots per dataset. Space complexity is O(max(N, M)). Assumes that both src_datasets and dst_datasets are sorted.
    """
    p, log = job.params, job.params.log
    src, dst = p.src, p.dst
    task: str = src.root_dataset + " vs. " + dst.root_dataset
    tsv_dir: str = p.log_params.log_file[0 : -len(".log")] + ".cmp"
    os.makedirs(tsv_dir, mode=DIR_PERMISSIONS, exist_ok=True)
    tsv_file: str = os.path.join(tsv_dir, (src.root_dataset + "%" + dst.root_dataset).replace("/", "~") + ".tsv")
    tmp_tsv_file: str = tsv_file + ".tmp"
    compare_snapshot_lists: set[str] = set(p.compare_snapshot_lists.split("+"))
    is_src_dst_all: bool = all(choice in compare_snapshot_lists for choice in CMP_CHOICES_ITEMS)
    all_src_dst: list[str] = [loc for loc in ("all", "src", "dst") if loc in compare_snapshot_lists]
    is_first_row: bool = True
    now: int | None = None
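    # Illustrative example (hypothetical paths): if the log file were "/tmp/bzfs/run.log" with src root "tank1/src"
    # and dst root "tank2/dst", the TSV output above would land in "/tmp/bzfs/run.cmp/tank1~src%tank2~dst.tsv".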
    def zfs_list_snapshot_iterator(r: Remote, sorted_datasets: list[str]) -> Iterator[str]:
        """Lists snapshots sorted by dataset name; All snapshots of a given dataset will be adjacent."""
        assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        # Also see https://openzfs.github.io/openzfs-docs/man/master/7/zfsprops.7.html#written
        props: str = job.creation_prefix + "creation,guid,createtxg,written,name"
        types: str = "snapshot"
        if p.use_bookmark and r.location == "src" and are_bookmarks_enabled(p, r):
            types = "snapshot,bookmark"  # output list ordering: intentionally makes bookmarks appear *after* snapshots
        cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {types} -d 1 -Hp -o {props}")  # sorted by dataset, createtxg
        for lines in zfs_list_snapshots_in_parallel(job, r, cmd, sorted_datasets):
            yield from lines
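    # Each line yielded above is one tab-separated record in `props` order: creation, guid, createtxg, written, name.
    # Illustrative example (values borrowed from the sample TSV row further below):
    #   1730878205\t2406491805272097867\t17435050\t24576\ttank1/src/foo@test_2024-10-06_08:30:04_daily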
    def snapshot_iterator(root_dataset: str, sorted_itr: Iterator[str]) -> Iterator[_ComparableSnapshot]:
        """Splits/groups snapshot stream into distinct datasets, sorts by GUID within a dataset such that any two snapshots
        with the same GUID will lie adjacent to each other during the upcoming phase that merges src snapshots and dst
        snapshots."""
        # streaming group by dataset name (consumes constant memory only)
        for dataset, group in itertools.groupby(
            sorted_itr, key=lambda line: line.rsplit("\t", 1)[1].replace("#", "@", 1).split("@", 1)[0]
        ):
            snapshots: list[str] = list(group)  # fetch all snapshots of current dataset, e.g. dataset=tank1/src/foo
            snapshots = filter_snapshots(job, snapshots, filter_bookmarks=True)  # apply include/exclude policy
            snapshots.sort(key=lambda line: line.split("\t", 2)[1])  # stable sort by GUID (2nd remains createtxg)
            rel_dataset: str = relativize_dataset(dataset, root_dataset)  # rel_dataset=/foo, root_dataset=tank1/src
            last_guid: str = ""
            for line in snapshots:
                cols = line.split("\t")
                _creation, guid, _createtxg, _written, snapshot_name = cols
                if guid == last_guid:
                    assert "#" in snapshot_name
                    continue  # ignore bookmarks whose snapshot still exists. also ignore dupes of bookmarks
                last_guid = guid
                key = (rel_dataset, guid)  # ensures src snapshots and dst snapshots with the same GUID will be adjacent
                yield _ComparableSnapshot(key, cols)
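    # The groupby key in snapshot_iterator maps each line to its dataset name, e.g. (illustrative)
    # "tank1/src/foo@test_daily" and "tank1/src/foo#test_daily" both map to "tank1/src/foo", so snapshots and
    # bookmarks of the same dataset end up in the same group.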
    def print_dataset(rel_dataset: str, entries: Iterable[tuple[str, _ComparableSnapshot]]) -> None:
        entries = sorted(  # fetch all snapshots of current dataset and sort em by creation, createtxg, snapshot_tag
            entries,
            key=lambda entry: (
                int((cols := entry[1].cols)[0]),  # creation
                int(cols[2]),  # createtxg
                cols[-1].replace("#", "@", 1).split("@", 1)[1],  # snapshot_tag
            ),
        )

        @dataclass
        @final
        class SnapshotStats:
            snapshot_count: int = field(default=0)
            sum_written: int = field(default=0)
            snapshot_count_since: int = field(default=0)
            sum_written_since: int = field(default=0)
            latest_snapshot_idx: int | None = field(default=None)
            latest_snapshot_row_str: str | None = field(default=None)
            latest_snapshot_creation: str | None = field(default=None)
            oldest_snapshot_row_str: str | None = field(default=None)
            oldest_snapshot_creation: str | None = field(default=None)
        # print metadata of snapshots of current dataset to TSV file; custom stats can later be computed from there
        stats: defaultdict[str, SnapshotStats] = defaultdict(SnapshotStats)
        header: str = "location creation_iso createtxg rel_name guid root_dataset rel_dataset name creation written"
        nonlocal is_first_row
        if is_first_row:
            fd.write(header.replace(" ", "\t") + "\n")
            is_first_row = False
        for i, entry in enumerate(entries):  # entry is tuple[location:str, ComparableSnapshot]
            location: str = entry[0]  # "src" or "dst" or "all"
            creation, guid, createtxg, written, name = entry[1].cols
            root_dataset: str = dst.root_dataset if location == CMP_CHOICES_ITEMS[1] else src.root_dataset
            rel_name: str = relativize_dataset(name, root_dataset)
            creation_iso: str = isotime_from_unixtime(int(creation))
            row = (location, creation_iso, createtxg, rel_name, guid, root_dataset, rel_dataset, name, creation, written)
            # Example: src 2024-11-06_08:30:05 17435050 /foo@test_2024-11-06_08:30:05_daily 2406491805272097867 tank1/src /foo tank1/src/foo@test_2024-10-06_08:30:04_daily 1730878205 24576
            row_str = "\t".join(row)
            if not p.dry_run:
                fd.write(row_str + "\n")
            s = stats[location]
            s.snapshot_count += 1
            s.sum_written += int(written) if written != "-" else 0
            s.latest_snapshot_idx = i
            s.latest_snapshot_row_str = row_str
            s.latest_snapshot_creation = creation
            if not s.oldest_snapshot_row_str:
                s.oldest_snapshot_row_str = row_str
                s.oldest_snapshot_creation = creation
        # for convenience, directly log basic summary stats of current dataset
        k = stats["all"].latest_snapshot_idx  # defaults to None
        k = k if k is not None else -1
        for entry in entries[k + 1 :]:  # aggregate basic stats since latest common snapshot
            location = entry[0]
            creation, guid, createtxg, written, name = entry[1].cols
            s = stats[location]
            s.snapshot_count_since += 1
            s.sum_written_since += int(written) if written != "-" else 0
        prefix: str = f"Comparing {rel_dataset}~"
        msgs: list[str] = []
        msgs.append(f"{prefix} of {task}")
        msgs.append(
            f"{prefix} Q: No src snapshots are missing on dst, and no dst snapshots are missing on src, "
            "and there is a common snapshot? A: "
            + (
                "n/a"
                if not is_src_dst_all
                else str(
                    stats["src"].snapshot_count == 0 and stats["dst"].snapshot_count == 0 and stats["all"].snapshot_count > 0
                )
            )
        )
        nonlocal now
        now = now or round(time.time())  # uses the same timestamp across the entire dataset tree
        latcom = "latest common snapshot"
        for loc in all_src_dst:
            s = stats[loc]
            msgs.append(f"{prefix} Latest snapshot only in {loc}: {s.latest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Oldest snapshot only in {loc}: {s.oldest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Snapshots only in {loc}: {s.snapshot_count}")
            msgs.append(f"{prefix} Snapshot data written only in {loc}: {human_readable_bytes(s.sum_written)}")
            if loc != "all":
                na = None if k >= 0 else "n/a"
                msgs.append(f"{prefix} Snapshots only in {loc} since {latcom}: {na or s.snapshot_count_since}")
                msgs.append(
                    f"{prefix} Snapshot data written only in {loc} since {latcom}: "
                    f"{na or human_readable_bytes(s.sum_written_since)}"
                )
            all_creation = stats["all"].latest_snapshot_creation
            latest = ("latest", s.latest_snapshot_creation)
            oldest = ("oldest", s.oldest_snapshot_creation)
            for label, s_creation in latest, oldest:
                if loc != "all":
                    hd = "n/a"
                    if s_creation and k >= 0:
                        assert all_creation is not None
                        hd = human_readable_duration(int(all_creation) - int(s_creation), unit="s")
                    msgs.append(f"{prefix} Time diff between {latcom} and {label} snapshot only in {loc}: {hd}")
            for label, s_creation in latest, oldest:
                hd = "n/a" if not s_creation else human_readable_duration(now - int(s_creation), unit="s")
                msgs.append(f"{prefix} Time diff between now and {label} snapshot only in {loc}: {hd}")
        log.info("%s", "\n".join(msgs))
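    # Illustrative example of the per-dataset summary lines that print_dataset logs (hypothetical rel_dataset "/foo"):
    #   Comparing /foo~ Snapshots only in src: 1
    #   Comparing /foo~ Snapshots only in dst: 0
    #   Comparing /foo~ Snapshots only in all: 7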
    # setup streaming pipeline
    src_snapshot_itr: Iterator = snapshot_iterator(src.root_dataset, zfs_list_snapshot_iterator(src, src_datasets))
    dst_snapshot_itr: Iterator = snapshot_iterator(dst.root_dataset, zfs_list_snapshot_iterator(dst, dst_datasets))
    merge_itr = _merge_sorted_iterators(CMP_CHOICES_ITEMS, p.compare_snapshot_lists, src_snapshot_itr, dst_snapshot_itr)

    interner: HashedInterner[str] = HashedInterner()  # reduces memory footprint
    rel_datasets: dict[str, set[str]] = defaultdict(set)
    for datasets, remote in (src_datasets, src), (dst_datasets, dst):
        for dataset in datasets:  # rel_dataset=/foo, root_dataset=tank1/src
            rel_datasets[remote.location].add(interner.intern(relativize_dataset(dataset, remote.root_dataset)))
    rel_src_or_dst: list[str] = sorted(rel_datasets["src"].union(rel_datasets["dst"]))

    log.debug("%s", f"Temporary TSV output file comparing {task} is: {tmp_tsv_file}")
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        # streaming group by rel_dataset (consumes constant memory only); entry is a Tuple[str, ComparableSnapshot]
        groups = itertools.groupby(merge_itr, key=lambda entry: entry[1].key[0])
        _print_datasets(groups, lambda rel_ds, entries: print_dataset(rel_ds, entries), rel_src_or_dst)
    os.rename(tmp_tsv_file, tsv_file)
    log.info("%s", f"Final TSV output file comparing {task} is: {tsv_file}")
    tsv_file = tsv_file[0 : tsv_file.rindex(".")] + ".rel_datasets_tsv"
    tmp_tsv_file = tsv_file + ".tmp"
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        header: str = "location rel_dataset src_dataset dst_dataset"
        fd.write(header.replace(" ", "\t") + "\n")
        src_only: set[str] = rel_datasets["src"].difference(rel_datasets["dst"])
        dst_only: set[str] = rel_datasets["dst"].difference(rel_datasets["src"])
        for rel_dataset in rel_src_or_dst:
            loc = "src" if rel_dataset in src_only else "dst" if rel_dataset in dst_only else "all"
            src_dataset = src.root_dataset + rel_dataset if rel_dataset not in dst_only else ""
            dst_dataset = dst.root_dataset + rel_dataset if rel_dataset not in src_only else ""
            row = (loc, rel_dataset, src_dataset, dst_dataset)  # Example: all /foo/bar tank1/src/foo/bar tank2/dst/foo/bar
            if not p.dry_run:
                fd.write("\t".join(row) + "\n")
    os.rename(tmp_tsv_file, tsv_file)
def _print_datasets(groups: itertools.groupby, fn: Callable[[str, Iterable], None], rel_datasets: Iterable[str]) -> None:
    """Iterate over grouped datasets and apply fn, adding gaps for missing ones."""
    rel_datasets = sorted(rel_datasets)
    n = len(rel_datasets)
    i = 0
    for rel_dataset, entries in groups:
        while i < n and rel_datasets[i] < rel_dataset:
            fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
            i += 1
        assert i >= n or rel_datasets[i] == rel_dataset
        i += 1
        fn(rel_dataset, entries)
    while i < n:
        fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
        i += 1
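# Illustrative sketch of _print_datasets (hypothetical inputs): with rel_datasets = ["/a", "/b", "/c"] and groups
# containing only "/b", it calls fn("/a", []), then fn("/b", entries_of_b), then fn("/c", []), so datasets without
# any snapshot entries still get an (empty) summary block.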
def _merge_sorted_iterators(
    choices: Sequence[str],  # ["src", "dst", "all"]
    choice: str,  # Example: "src+dst+all"
    src_itr: Iterator[_ComparableSnapshot],
    dst_itr: Iterator[_ComparableSnapshot],
) -> Iterator[tuple[str, _ComparableSnapshot] | tuple[str, _ComparableSnapshot, _ComparableSnapshot]]:
    """The typical pipelined merge algorithm of a merge sort, slightly adapted to our specific use case."""
    assert len(choices) == 3
    assert choice
    flags: int = 0
    for i, item in enumerate(choices):
        if item in choice:
            flags |= 1 << i
    src_next, dst_next = run_in_parallel(lambda: next(src_itr, None), lambda: next(dst_itr, None))
    while not (src_next is None and dst_next is None):
        if src_next == dst_next:
            n = 2
            if (flags & (1 << n)) != 0:
                assert src_next is not None
                assert dst_next is not None
                yield choices[n], src_next, dst_next
            src_next = next(src_itr, None)
            dst_next = next(dst_itr, None)
        elif src_next is None or (dst_next is not None and dst_next < src_next):
            n = 1
            if (flags & (1 << n)) != 0:
                assert dst_next is not None
                yield choices[n], dst_next
            dst_next = next(dst_itr, None)
        else:
            n = 0
            if (flags & (1 << n)) != 0:
                yield choices[n], src_next
            src_next = next(src_itr, None)
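# Illustrative sketch of the merge (hypothetical entries): with choice "src+dst+all", sorted src = [A, B] and
# sorted dst = [B, C], where ordering and equality are by (rel_dataset, guid), the generator yields
#   ("src", A), ("all", B, B), ("dst", C)
# i.e. src-only entries tagged "src", dst-only entries tagged "dst", and common entries tagged "all" with both copies.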