Coverage for bzfs_main/compare_snapshot_lists.py: 100%
207 statements
coverage.py v7.10.2, created at 2025-08-06 13:30 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Implementation of bzfs --compare-snapshot-lists."""

from __future__ import annotations
import itertools
import os
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import (
    TYPE_CHECKING,
    Callable,
    Generator,
    Iterable,
    Iterator,
    Sequence,
)

from bzfs_main.argparse_cli import (
    CMP_CHOICES_ITEMS,
)
from bzfs_main.detect import (
    are_bookmarks_enabled,
    is_solaris_zfs,
)
from bzfs_main.filter import (
    filter_snapshots,
)
from bzfs_main.parallel_batch_cmd import (
    zfs_list_snapshots_in_parallel,
)
from bzfs_main.parallel_iterator import (
    run_in_parallel,
)
from bzfs_main.utils import (
    FILE_PERMISSIONS,
    Interner,
    T,
    human_readable_bytes,
    human_readable_duration,
    isotime_from_unixtime,
    open_nofollow,
    relativize_dataset,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import Job
    from bzfs_main.configuration import Remote


@dataclass(order=True, frozen=True)
class ComparableSnapshot:
    """Snapshot entry comparable by rel_dataset and GUID for sorting and merging."""

    key: tuple[str, str]  # rel_dataset, guid
    cols: list[str] = field(compare=False)
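
# Illustrative sketch (not part of the original module): because cols is declared with compare=False,
# equality and ordering consider only key, i.e. (rel_dataset, guid). For example:
#   >>> a = ComparableSnapshot(("/foo", "123"), ["1730878205", "123", "17435050", "24576", "tank1/src/foo@x"])
#   >>> b = ComparableSnapshot(("/foo", "123"), ["1730878300", "123", "17435099", "0", "tank2/dst/foo@x"])
#   >>> a == b  # src and dst entries with the same GUID compare equal, which is what drives the merge below
#   True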


def run_compare_snapshot_lists(job: Job, src_datasets: list[str], dst_datasets: list[str]) -> None:
    """Compares source and destination dataset trees recursively with respect to snapshots, for example to check if all
    recently taken snapshots have been successfully replicated by a periodic job; implements --compare-snapshot-lists.

    Lists snapshots only contained in source (tagged with 'src'), only contained in destination (tagged with 'dst'), and
    contained in both source and destination (tagged with 'all'), in the form of a TSV file, along with other snapshot
    metadata. Implemented with a time and space efficient streaming algorithm; easily scales to millions of datasets and any
    number of snapshots. Assumes that both src_datasets and dst_datasets are sorted.
    """
    p, log = job.params, job.params.log
    src, dst = p.src, p.dst
    task: str = src.root_dataset + " vs. " + dst.root_dataset
    tsv_dir: str = p.log_params.log_file[0 : -len(".log")] + ".cmp"
    os.makedirs(tsv_dir, exist_ok=True)
    tsv_file: str = os.path.join(tsv_dir, (src.root_dataset + "%" + dst.root_dataset).replace("/", "~") + ".tsv")
    tmp_tsv_file: str = tsv_file + ".tmp"
    compare_snapshot_lists: set[str] = set(p.compare_snapshot_lists.split("+"))
    is_src_dst_all: bool = all(choice in compare_snapshot_lists for choice in CMP_CHOICES_ITEMS)
    all_src_dst: list[str] = [loc for loc in ("all", "src", "dst") if loc in compare_snapshot_lists]
    is_first_row: bool = True
    now: int | None = None

    def zfs_list_snapshot_iterator(r: Remote, sorted_datasets: list[str]) -> Generator[str, None, None]:
        """Lists snapshots sorted by dataset name; all snapshots of a given dataset will be adjacent."""
        assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        written_zfs_prop: str = "written"  # https://openzfs.github.io/openzfs-docs/man/master/7/zfsprops.7.html#written
        if is_solaris_zfs(p, r):  # solaris-11.4 zfs does not know the "written" ZFS snapshot property
            written_zfs_prop = "type"  # for simplicity, fill in the non-integer dummy constant type="snapshot"
        props: str = job.creation_prefix + f"creation,guid,createtxg,{written_zfs_prop},name"
        types: str = "snapshot"
        if p.use_bookmark and r.location == "src" and are_bookmarks_enabled(p, r):
            types = "snapshot,bookmark"  # output list ordering: intentionally makes bookmarks appear *after* snapshots
        cmd: list[str] = p.split_args(f"{p.zfs_program} list -t {types} -d 1 -Hp -o {props}")  # sorted by dataset, createtxg
        for lines in zfs_list_snapshots_in_parallel(job, r, cmd, sorted_datasets):
            yield from lines
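
    # Illustrative sketch (assumption, not part of the original module): with bookmarks enabled on the source and
    # assuming an empty job.creation_prefix, the command constructed above resembles
    #   zfs list -t snapshot,bookmark -d 1 -Hp -o creation,guid,createtxg,written,name <dataset>
    # which yields one tab-separated line per snapshot/bookmark in the -o column order, e.g.:
    #   1730878205<TAB>2406491805272097867<TAB>17435050<TAB>24576<TAB>tank1/src/foo@test_2024-10-06_08:30:04_daily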

    def snapshot_iterator(
        root_dataset: str, sorted_itr: Generator[str, None, None]
    ) -> Generator[ComparableSnapshot, None, None]:
        """Splits/groups snapshot stream into distinct datasets, sorts by GUID within a dataset such that any two snapshots
        with the same GUID will lie adjacent to each other during the upcoming phase that merges src snapshots and dst
        snapshots."""
        # streaming group by dataset name (consumes constant memory only)
        for dataset, group in itertools.groupby(
            sorted_itr, key=lambda line: line[line.rindex("\t") + 1 : line.replace("#", "@").index("@")]
        ):
            snapshots: list[str] = list(group)  # fetch all snapshots of current dataset, e.g. dataset=tank1/src/foo
            snapshots = filter_snapshots(job, snapshots, filter_bookmarks=True)  # apply include/exclude policy
            snapshots.sort(key=lambda line: line.split("\t", 2)[1])  # stable sort by GUID (2nd remains createtxg)
            rel_dataset: str = relativize_dataset(dataset, root_dataset)  # rel_dataset=/foo, root_dataset=tank1/src
            last_guid: str = ""
            for line in snapshots:
                cols = line.split("\t")
                creation, guid, createtxg, written, snapshot_name = cols
                if guid == last_guid:
                    assert "#" in snapshot_name
                    continue  # ignore bookmarks whose snapshot still exists. also ignore dupes of bookmarks
                last_guid = guid
                if written == "snapshot":
                    written = "-"  # sanitize solaris-11.4 work-around (solaris-11.4 also has no bookmark feature)
                    cols = [creation, guid, createtxg, written, snapshot_name]
                key = (rel_dataset, guid)  # ensures src snapshots and dst snapshots with the same GUID will be adjacent
                yield ComparableSnapshot(key, cols)
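
    # Illustrative sketch (assumption, not part of the original module): both the src and the dst stream produced
    # above are ordered by key=(rel_dataset, guid), so the merge in _merge_sorted_iterators can pair them up, e.g.:
    #   src keys: ("/foo", "g1"), ("/foo", "g2")   -> ("/foo", "g1") appears only on src and is tagged "src"
    #   dst keys: ("/foo", "g2"), ("/foo", "g3")   -> ("/foo", "g3") appears only on dst and is tagged "dst"
    #   common:   ("/foo", "g2")                   -> appears on both sides and is tagged "all"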

    def print_dataset(rel_dataset: str, entries: Iterable[tuple[str, ComparableSnapshot]]) -> None:
        entries = sorted(  # fetch all snapshots of current dataset and sort them by creation, createtxg, snapshot_tag
            entries,
            key=lambda entry: (
                int((cols := entry[1].cols)[0]),
                int(cols[2]),
                (snapshot_name := cols[-1])[snapshot_name.replace("#", "@").index("@") + 1 :],
            ),
        )

        @dataclass
        class SnapshotStats:
            snapshot_count: int = field(default=0)
            sum_written: int = field(default=0)
            snapshot_count_since: int = field(default=0)
            sum_written_since: int = field(default=0)
            latest_snapshot_idx: int | None = field(default=None)
            latest_snapshot_row_str: str | None = field(default=None)
            latest_snapshot_creation: str | None = field(default=None)
            oldest_snapshot_row_str: str | None = field(default=None)
            oldest_snapshot_creation: str | None = field(default=None)

        # print metadata of snapshots of current dataset to TSV file; custom stats can later be computed from there
        stats: defaultdict[str, SnapshotStats] = defaultdict(SnapshotStats)
        header: str = "location creation_iso createtxg rel_name guid root_dataset rel_dataset name creation written"
        nonlocal is_first_row
        if is_first_row:
            fd.write(header.replace(" ", "\t") + "\n")
            is_first_row = False
        for i, entry in enumerate(entries):  # entry is tuple[location:str, ComparableSnapshot]
            location: str = entry[0]  # "src" or "dst" or "all"
            creation, guid, createtxg, written, name = entry[1].cols
            root_dataset: str = dst.root_dataset if location == CMP_CHOICES_ITEMS[1] else src.root_dataset
            rel_name: str = relativize_dataset(name, root_dataset)
            creation_iso: str = isotime_from_unixtime(int(creation))
            row = (location, creation_iso, createtxg, rel_name, guid, root_dataset, rel_dataset, name, creation, written)
            # Example: src 2024-11-06_08:30:05 17435050 /foo@test_2024-11-06_08:30:05_daily 2406491805272097867 tank1/src /foo tank1/src/foo@test_2024-10-06_08:30:04_daily 1730878205 24576
            row_str = "\t".join(row)
            if not p.dry_run:
                fd.write(row_str + "\n")
            s = stats[location]
            s.snapshot_count += 1
            s.sum_written += int(written) if written != "-" else 0
            s.latest_snapshot_idx = i
            s.latest_snapshot_row_str = row_str
            s.latest_snapshot_creation = creation
            if not s.oldest_snapshot_row_str:
                s.oldest_snapshot_row_str = row_str
                s.oldest_snapshot_creation = creation

        # for convenience, directly log basic summary stats of current dataset
        k = stats["all"].latest_snapshot_idx  # defaults to None
        k = k if k is not None else -1
        for entry in entries[k + 1 :]:  # aggregate basic stats since latest common snapshot
            location = entry[0]
            creation, guid, createtxg, written, name = entry[1].cols
            s = stats[location]
            s.snapshot_count_since += 1
            s.sum_written_since += int(written) if written != "-" else 0
        prefix: str = f"Comparing {rel_dataset}~"
        msgs: list[str] = []
        msgs.append(f"{prefix} of {task}")
        msgs.append(
            f"{prefix} Q: No src snapshots are missing on dst, and no dst snapshots are missing on src, "
            "and there is a common snapshot? A: "
            + (
                "n/a"
                if not is_src_dst_all
                else str(
                    stats["src"].snapshot_count == 0 and stats["dst"].snapshot_count == 0 and stats["all"].snapshot_count > 0
                )
            )
        )
        nonlocal now
        now = now or round(time.time())  # uses the same timestamp across the entire dataset tree
        latcom = "latest common snapshot"
        for loc in all_src_dst:
            s = stats[loc]
            msgs.append(f"{prefix} Latest snapshot only in {loc}: {s.latest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Oldest snapshot only in {loc}: {s.oldest_snapshot_row_str or 'n/a'}")
            msgs.append(f"{prefix} Snapshots only in {loc}: {s.snapshot_count}")
            msgs.append(f"{prefix} Snapshot data written only in {loc}: {human_readable_bytes(s.sum_written)}")
            if loc != "all":
                na = None if k >= 0 else "n/a"
                msgs.append(f"{prefix} Snapshots only in {loc} since {latcom}: {na or s.snapshot_count_since}")
                msgs.append(
                    f"{prefix} Snapshot data written only in {loc} since {latcom}: "
                    f"{na or human_readable_bytes(s.sum_written_since)}"
                )
            all_creation = stats["all"].latest_snapshot_creation
            latest = ("latest", s.latest_snapshot_creation)
            oldest = ("oldest", s.oldest_snapshot_creation)
            for label, s_creation in latest, oldest:
                if loc != "all":
                    hd = "n/a"
                    if s_creation and k >= 0:
                        assert all_creation is not None
                        hd = human_readable_duration(int(all_creation) - int(s_creation), unit="s")
                    msgs.append(f"{prefix} Time diff between {latcom} and {label} snapshot only in {loc}: {hd}")
            for label, s_creation in latest, oldest:
                hd = "n/a" if not s_creation else human_readable_duration(now - int(s_creation), unit="s")
                msgs.append(f"{prefix} Time diff between now and {label} snapshot only in {loc}: {hd}")
        log.info("%s", "\n".join(msgs))

    # set up streaming pipeline
    src_snapshot_itr: Iterator = snapshot_iterator(src.root_dataset, zfs_list_snapshot_iterator(src, src_datasets))
    dst_snapshot_itr: Iterator = snapshot_iterator(dst.root_dataset, zfs_list_snapshot_iterator(dst, dst_datasets))
    merge_itr = _merge_sorted_iterators(CMP_CHOICES_ITEMS, p.compare_snapshot_lists, src_snapshot_itr, dst_snapshot_itr)

    interner: Interner[str] = Interner()  # reduces memory footprint
    rel_datasets: dict[str, set[str]] = defaultdict(set)
    for datasets, remote in (src_datasets, src), (dst_datasets, dst):
        for dataset in datasets:  # rel_dataset=/foo, root_dataset=tank1/src
            rel_datasets[remote.location].add(interner.intern(relativize_dataset(dataset, remote.root_dataset)))
    rel_src_or_dst: list[str] = sorted(rel_datasets["src"].union(rel_datasets["dst"]))

    log.debug("%s", f"Temporary TSV output file comparing {task} is: {tmp_tsv_file}")
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        # streaming group by rel_dataset (consumes constant memory only); entry is a Tuple[str, ComparableSnapshot]
        group = itertools.groupby(merge_itr, key=lambda entry: entry[1].key[0])
        _print_datasets(group, lambda rel_ds, entries: print_dataset(rel_ds, entries), rel_src_or_dst)
    os.rename(tmp_tsv_file, tsv_file)
    log.info("%s", f"Final TSV output file comparing {task} is: {tsv_file}")

    tsv_file = tsv_file[0 : tsv_file.rindex(".")] + ".rel_datasets_tsv"
    tmp_tsv_file = tsv_file + ".tmp"
    with open_nofollow(tmp_tsv_file, "w", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
        header: str = "location rel_dataset src_dataset dst_dataset"
        fd.write(header.replace(" ", "\t") + "\n")
        src_only: set[str] = rel_datasets["src"].difference(rel_datasets["dst"])
        dst_only: set[str] = rel_datasets["dst"].difference(rel_datasets["src"])
        for rel_dataset in rel_src_or_dst:
            loc = "src" if rel_dataset in src_only else "dst" if rel_dataset in dst_only else "all"
            src_dataset = src.root_dataset + rel_dataset if rel_dataset not in dst_only else ""
            dst_dataset = dst.root_dataset + rel_dataset if rel_dataset not in src_only else ""
            row = (loc, rel_dataset, src_dataset, dst_dataset)  # Example: all /foo/bar tank1/src/foo/bar tank2/dst/foo/bar
            if not p.dry_run:
                fd.write("\t".join(row) + "\n")
    os.rename(tmp_tsv_file, tsv_file)


def _print_datasets(group: itertools.groupby, fn: Callable[[str, Iterable], None], rel_datasets: Iterable[str]) -> None:
    """Iterates over grouped datasets and applies fn; for datasets missing from the group, calls fn with an empty entry
    list so that summary stats are printed even for datasets whose snapshot stream is empty."""
    rel_datasets = sorted(rel_datasets)
    n = len(rel_datasets)
    i = 0
    for rel_dataset, entries in group:
        while i < n and rel_datasets[i] < rel_dataset:
            fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
            i += 1
        assert i >= n or rel_datasets[i] == rel_dataset
        i += 1
        fn(rel_dataset, entries)
    while i < n:
        fn(rel_datasets[i], [])  # Also print summary stats for datasets whose snapshot stream is empty
        i += 1
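
# Illustrative sketch (assumption, not part of the original module): given rel_datasets=["/a", "/b", "/c"] and a
# group that only contains "/b", _print_datasets calls fn("/a", []), fn("/b", entries), fn("/c", []), so every known
# dataset shows up in the report even if it currently has no snapshots on either side.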


def _merge_sorted_iterators(
    choices: Sequence[str],  # ["src", "dst", "all"]
    choice: str,  # Example: "src+dst+all"
    src_itr: Iterator[T],
    dst_itr: Iterator[T],
) -> Generator[tuple[str, T] | tuple[str, T, T], None, None]:
    """The typical pipelined merge algorithm of a merge sort, slightly adapted to our specific use case."""
    assert len(choices) == 3
    assert choice
    flags: int = 0
    for i, item in enumerate(choices):
        if item in choice:
            flags |= 1 << i
    src_next, dst_next = run_in_parallel(lambda: next(src_itr, None), lambda: next(dst_itr, None))
    while not (src_next is None and dst_next is None):
        if src_next == dst_next:
            n = 2
            if (flags & (1 << n)) != 0:
                assert src_next is not None
                assert dst_next is not None
                yield choices[n], src_next, dst_next
            src_next = next(src_itr, None)
            dst_next = next(dst_itr, None)
        elif src_next is None or (dst_next is not None and dst_next < src_next):
            n = 1
            if (flags & (1 << n)) != 0:
                assert dst_next is not None
                yield choices[n], dst_next
            dst_next = next(dst_itr, None)
        else:
            n = 0
            if (flags & (1 << n)) != 0:
                yield choices[n], src_next
            src_next = next(src_itr, None)
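
# Illustrative sketch (not part of the original module): with simple sorted integer streams the merge tags each item
# with where it occurs; the same logic applies to the ComparableSnapshot streams built above:
#   >>> list(_merge_sorted_iterators(["src", "dst", "all"], "src+dst+all", iter([1, 3]), iter([2, 3])))
#   [('src', 1), ('dst', 2), ('all', 3, 3)]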