Coverage for bzfs_main / filter.py: 100%
222 statements
coverage.py v7.13.4, created at 2026-02-24 10:16 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots()."""
from __future__ import (
    annotations,
)
import math
import os
import re
from collections.abc import (
    Iterable,
)
from datetime import (
    timedelta,
)
from typing import (
    TYPE_CHECKING,
    Final,
    Optional,
    Union,
    cast,
)

from bzfs_main.util.utils import (
    DONT_SKIP_DATASET,
    LOG_DEBUG,
    LOG_TRACE,
    UNIX_TIME_INFINITY_SECS,
    RegexList,
    is_descendant,
    is_included,
    relativize_dataset,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import (
        Job,
    )
    from bzfs_main.configuration import (
        Params,
        Remote,
    )

# constants:
SNAPSHOT_REGEX_FILTER_NAME: Final[str] = "snapshot_regex"
SNAPSHOT_REGEX_FILTER_NAMES: Final[frozenset[str]] = frozenset({"include_snapshot_regex", "exclude_snapshot_regex"})
SNAPSHOT_FILTERS_VAR: Final[str] = "snapshot_filters_var"


UnixTimeRange = Optional[tuple[Union[timedelta, int], Union[timedelta, int]]]  # Type alias
RankRange = tuple[tuple[str, int, bool], tuple[str, int, bool]]  # Type alias
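
# Illustrative sketch (editor's addition, hypothetical values): what instances of the two aliases above can
# look like. A UnixTimeRange bound may be an absolute Unix time in seconds or a timedelta that is later
# resolved relative to "now" by resolve_timerange() in filter_snapshots(); a RankRange bound is a
# (kind, value, is_percent) tuple consumed by get_idx() in _filter_snapshots_by_creation_time_and_rank().
_EXAMPLE_TIME_RANGE: UnixTimeRange = (0, timedelta(days=7))  # hypothetical lower/upper creation-time bounds
_EXAMPLE_RANK_RANGE: RankRange = (("oldest", 0, False), ("latest", 10, True))  # hypothetical rank window
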
def filter_datasets(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Returns all datasets (and their descendants) that match at least one of the include regexes but none of the exclude
    regexes.

    Assumes the list of input datasets is sorted. The list of output datasets will be sorted too.
    """
    assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
    p, log = job.params, job.params.log
    results: list[str] = []
    for i, dataset in enumerate(sorted_datasets):
        if i == 0 and p.skip_parent:
            continue
        rel_dataset: str = relativize_dataset(dataset, remote.root_dataset)
        if rel_dataset.startswith("/"):
            rel_dataset = rel_dataset[1:]  # strip leading '/' char if any
        if is_included(rel_dataset, p.include_dataset_regexes, p.exclude_dataset_regexes):
            results.append(dataset)
            log.debug("Including b/c dataset regex: %s", dataset)
        else:
            log.debug("Excluding b/c dataset regex: %s", dataset)
    if p.exclude_dataset_property:
        results = _filter_datasets_by_exclude_property(job, remote, results)
    is_debug: bool = p.log.isEnabledFor(LOG_DEBUG)
    for dataset in results:
        if is_debug:
            log.debug(f"Finally included {remote.location} dataset: %s", dataset)
    if job.is_test_mode:
        assert results == sorted(results), "List is not sorted"
        # Asserts the following: If a dataset is excluded its descendants are automatically excluded too, and this
        # decision is never reconsidered even for the descendants because exclude takes precedence over include.
        resultset: set[str] = set(results)
        root_datasets: list[str] = [dataset for dataset in results if os.path.dirname(dataset) not in resultset]  # no parent
        for root in root_datasets:  # each root is not a descendant of another dataset
            assert not any(is_descendant(root, of_root_dataset=dataset) for dataset in results if dataset != root)
        for dataset in results:  # each dataset belongs to a subtree rooted at one of the roots
            assert any(is_descendant(dataset, of_root_dataset=root) for root in root_datasets)
    return results
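
# Editor's illustration (hypothetical helper, not part of the original module): the include/exclude
# precedence that filter_datasets() relies on via is_included(), sketched with plain compiled regexes.
# A relative dataset path is kept only if it matches at least one include regex and no exclude regex,
# i.e. exclude always wins over include; the exact matching mode of is_included() is an assumption here.
def _is_included_sketch(rel_dataset: str, includes: list[re.Pattern], excludes: list[re.Pattern]) -> bool:
    """Hypothetical stand-in that mirrors the documented include/exclude semantics, assuming full-match."""
    if any(regex.fullmatch(rel_dataset) for regex in excludes):
        return False  # exclude takes precedence over include
    return any(regex.fullmatch(rel_dataset) for regex in includes)

# e.g. _is_included_sketch("foo/bar", [re.compile(r"foo.*")], [re.compile(r".*/tmp.*")]) returns True
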
def _filter_datasets_by_exclude_property(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Excludes datasets that are marked with a ZFS user property value that, in effect, says 'skip me'."""
    p, log = job.params, job.params.log
    results: list[str] = []
    localhostname: str | None = None
    skip_dataset: str = DONT_SKIP_DATASET
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=skip_dataset):
            # skip_dataset shall be ignored or has been deleted by some third party while we're running
            continue  # nothing to do anymore for this dataset subtree (note that datasets is sorted)
        skip_dataset = DONT_SKIP_DATASET
        # TODO perf: on zfs >= 2.3 use json via zfs list -j to safely merge all zfs list's into one 'zfs list' call
        cmd = p.split_args(f"{p.zfs_program} list -t filesystem,volume -Hp -o {p.exclude_dataset_property}", dataset)
        job.maybe_inject_delete(remote, dataset=dataset, delete_trigger="zfs_list_exclude_property")
        property_value: str | None = job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd)
        if property_value is None:
            log.warning(f"Third party deleted {remote.location}: %s", dataset)
            skip_dataset = dataset
        else:
            reason: str = ""
            property_value = property_value.strip()
            sync: bool
            if not property_value or property_value == "-" or property_value.lower() == "true":
                sync = True
            elif property_value.lower() == "false":
                sync = False
            else:
                import socket  # lazy import for startup perf

                localhostname = localhostname or socket.gethostname()
                sync = any(localhostname == hostname.strip() for hostname in property_value.split(","))
                reason = f", localhostname: {localhostname}, hostnames: {property_value}"

            if sync:
                results.append(dataset)
                log.debug("Including b/c dataset prop: %s%s", dataset, reason)
            else:
                skip_dataset = dataset
                log.debug("Excluding b/c dataset prop: %s%s", dataset, reason)
    return results
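
# Editor's illustration (hypothetical helper): the decision table applied to the fetched
# p.exclude_dataset_property value in _filter_datasets_by_exclude_property() above. Empty, "-" and "true"
# mean sync; "false" means skip the dataset (and its descendants); anything else is treated as a
# comma-separated list of hostnames and the dataset is synced only on a matching host.
def _sync_decision_sketch(property_value: str, localhostname: str) -> bool:
    """Hypothetical stand-in that mirrors the branch logic above for a single property value."""
    value = property_value.strip()
    if not value or value == "-" or value.lower() == "true":
        return True
    if value.lower() == "false":
        return False
    return any(localhostname == hostname.strip() for hostname in value.split(","))
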
def filter_snapshots(
    job: Job, basis_snapshots: list[str], all_except: bool = False, filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that pass all include/exclude policies.

    Semantics: Within a single snapshot-filter group, filters are applied sequentially (logical AND). Across groups,
    results are union-ized (logical OR). Set `all_except=True` to invert the final selection (retain-selected vs
    delete-selected modes). Bookmarks: when `filter_bookmarks=False`, bookmark entries (with '#') are always retained to
    assist common-snapshot detection; when `True`, bookmarks are subject to the same filters as snapshots.
    """

    def resolve_timerange(timerange: UnixTimeRange) -> UnixTimeRange:
        """Converts relative timerange values to UTC Unix time in integer seconds."""
        assert timerange is not None
        lo, hi = timerange
        if isinstance(lo, timedelta):
            lo = math.ceil(current_unixtime_in_secs - lo.total_seconds())
        if isinstance(hi, timedelta):
            hi = math.ceil(current_unixtime_in_secs - hi.total_seconds())
        assert isinstance(lo, int)
        assert isinstance(hi, int)
        return (lo, hi) if lo <= hi else (hi, lo)

    p, log = job.params, job.params.log
    current_unixtime_in_secs: float = p.create_src_snapshots_config.current_datetime.timestamp()
    resultset: set[str] = set()
    for snapshot_filter in p.snapshot_filters:
        snapshots: list[str] = basis_snapshots
        for _filter in snapshot_filter:
            name: str = _filter.name
            if name == SNAPSHOT_REGEX_FILTER_NAME:
                snapshots = _filter_snapshots_by_regex(
                    job,
                    snapshots,
                    regexes=cast(tuple[RegexList, RegexList], _filter.options),
                    filter_bookmarks=filter_bookmarks,
                )
            elif name == "include_snapshot_times":
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time(
                    job, snapshots, include_snapshot_times=timerange, filter_bookmarks=filter_bookmarks
                )
            else:
                assert name == "include_snapshot_times_and_ranks"
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time_and_rank(
                    job,
                    snapshots,
                    include_snapshot_times=timerange,
                    include_snapshot_ranks=cast(list[RankRange], _filter.options),
                    filter_bookmarks=filter_bookmarks,
                )
        resultset.update(snapshots)  # union

    no_f_bookmarks: bool = not filter_bookmarks
    snapshots = [line for line in basis_snapshots if (no_f_bookmarks and "#" in line) or ((line in resultset) != all_except)]
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    for snapshot in snapshots:
        if is_debug:
            log.debug("Finally included snapshot: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return snapshots
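
# Editor's illustration (hypothetical helper): the AND-within-group / OR-across-groups shape of
# filter_snapshots(), sketched with plain predicates instead of the real snapshot filter objects.
def _union_of_filter_groups_sketch(basis: list[str], groups: list[list], all_except: bool = False) -> list[str]:
    """Hypothetical stand-in: each group is a list of predicates applied sequentially (logical AND); the
    per-group results are unioned (logical OR); all_except=True inverts the final selection."""
    selected: set[str] = set()
    for predicates in groups:
        snaps = basis
        for predicate in predicates:
            snaps = [s for s in snaps if predicate(s)]  # sequential filters within one group: logical AND
        selected.update(snaps)  # union across groups: logical OR
    return [s for s in basis if (s in selected) != all_except]
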
def _filter_snapshots_by_regex(
    job: Job, snapshots: list[str], regexes: tuple[RegexList, RegexList], filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that match at least one of the include regexes but none of the exclude regexes.

    Precondition: Each line is TSV of the form ...guid\tname. Regexes are applied to the snapshot or bookmark tag portion
    of `name` (after '@' or, if `filter_bookmarks=True`, after '#').
    """
    exclude_snapshot_regexes, include_snapshot_regexes = regexes
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    results: list[str] = []
    for snapshot in snapshots:
        i = snapshot.find("@")  # snapshot separator
        if i < 0 and filter_bookmarks:
            i = snapshot.index("#")  # bookmark separator
        if i < 0:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        elif is_included(snapshot[i + 1 :], include_snapshot_regexes, exclude_snapshot_regexes):
            results.append(snapshot)
            if is_debug:
                log.debug("Including b/c snapshot regex: %s", snapshot[snapshot.rindex("\t") + 1 :])
        else:
            if is_debug:
                log.debug("Excluding b/c snapshot regex: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return results
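
# Editor's illustration (hypothetical sample data): the TSV line shape consumed by
# _filter_snapshots_by_regex() and the tag portion that the regexes are matched against.
_example_snapshot_line = "12345\tpool/ds@daily_2024-01-01"  # hypothetical "...guid\tname" line
_example_tag = _example_snapshot_line[_example_snapshot_line.find("@") + 1 :]  # -> "daily_2024-01-01"
# A bookmark line such as "12345\tpool/ds#daily_2024-01-01" is passed through untouched unless
# filter_bookmarks=True, in which case the part after '#' is matched instead.
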
def _filter_snapshots_by_creation_time(
    job: Job, snapshots: list[str], include_snapshot_times: UnixTimeRange, filter_bookmarks: bool = False
) -> list[str]:
    """Filters snapshots to those created within the specified time window.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_snaptime, hi_snaptime = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_snaptime, int)
    assert isinstance(hi_snaptime, int)
    results: list[str] = []
    for snapshot in snapshots:
        if (not filter_bookmarks) and "@" not in snapshot:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        elif lo_snaptime <= int(snapshot[: snapshot.index("\t")]) < hi_snaptime:
            results.append(snapshot)
            if is_debug:
                log.debug("Including b/c creation time: %s", snapshot[snapshot.rindex("\t") + 1 :])
        else:
            if is_debug:
                log.debug("Excluding b/c creation time: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return results
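
# Editor's illustration (hypothetical sample data): the creation column is the first TSV field and is
# compared against the half-open interval [lo, hi) in _filter_snapshots_by_creation_time().
_example_creation_line = "1700000000\t12345\tpool/ds@daily_2024-01-01"  # hypothetical creation\tguid\tname
_example_creation_secs = int(_example_creation_line[: _example_creation_line.index("\t")])  # -> 1700000000
_example_in_window = 1690000000 <= _example_creation_secs < 1710000000  # -> True for this hypothetical window
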
def _filter_snapshots_by_creation_time_and_rank(
    job: Job,
    snapshots: list[str],
    include_snapshot_times: UnixTimeRange,
    include_snapshot_ranks: list[RankRange],
    filter_bookmarks: bool = False,
) -> list[str]:
    """Filters by creation time and rank within the snapshot list.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """

    def get_idx(rank: tuple[str, int, bool], n: int) -> int:
        """Returns index for rank tuple (kind, value, percent)."""
        kind, num, is_percent = rank
        m = round(n * num / 100) if is_percent else min(n, num)
        assert kind == "latest" or kind == "oldest"
        return m if kind == "oldest" else n - m

    assert isinstance(include_snapshot_ranks, list)
    assert len(include_snapshot_ranks) > 0
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_time, hi_time = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_time, int)
    assert isinstance(hi_time, int)
    n = sum(1 for snapshot in snapshots if "@" in snapshot)
    for rank_range in include_snapshot_ranks:
        lo_rank, hi_rank = rank_range
        lo: int = get_idx(lo_rank, n)
        hi: int = get_idx(hi_rank, n)
        lo, hi = (lo, hi) if lo <= hi else (hi, lo)
        i: int = 0
        k: int = 0
        results: list[str] = []
        for snapshot in snapshots:
            is_snapshot = "@" in snapshot
            if (not filter_bookmarks) and not is_snapshot:
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            else:
                msg = None
                if is_snapshot and lo <= i < hi:
                    msg = "Including b/c snapshot rank: %s"
                elif lo_time <= int(snapshot[: snapshot.index("\t")]) < hi_time:
                    msg = "Including b/c creation time: %s"
                if msg:
                    results.append(snapshot)
                    k += 1 if is_snapshot else 0
                else:
                    msg = "Excluding b/c snapshot rank: %s"
                if is_debug:
                    log.debug(msg, snapshot[snapshot.rindex("\t") + 1 :])
                i += 1 if is_snapshot else 0
        snapshots = results
        n = k
    return snapshots
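
# Editor's illustration (hypothetical numbers): how get_idx() above maps a rank tuple
# (kind, value, is_percent) onto an index within n snapshots, here for n == 20:
_example_n = 20
_example_oldest_idx = min(_example_n, 5)  # ("oldest", 5, False) -> 5
_example_latest_idx = _example_n - round(_example_n * 25 / 100)  # ("latest", 25, True) -> 20 - 5 = 15
# The rank window [lo, hi) is then built from two such indexes, normalized so that lo <= hi.
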
def filter_properties(
    p: Params, props: dict[str, str | None], include_regexes: RegexList, exclude_regexes: RegexList
) -> dict[str, str | None]:
    """Returns ZFS props whose name matches at least one of the include regexes but none of the exclude regexes."""
    log = p.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    results: dict[str, str | None] = {}
    for propname, propvalue in props.items():
        if is_included(propname, include_regexes, exclude_regexes):
            results[propname] = propvalue
            if is_debug:
                log.debug("Including b/c property regex: %s", propname)
        else:
            if is_debug:
                log.debug("Excluding b/c property regex: %s", propname)
    return results
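
# Editor's illustration (hypothetical sample data): filter_properties() matches the property *names*
# against the include/exclude regexes; values are carried through unchanged.
_example_props: dict[str, str | None] = {"compression": "on", "mountpoint": "/tank", "org.custom:note": None}
# With include regexes matching "compression" and "org.custom:.*" and no exclude match, the result would
# keep {"compression": "on", "org.custom:note": None} and drop "mountpoint".
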
def filter_lines(input_list: Iterable[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set contains the first column field of that line."""
    if len(input_set) == 0:
        return []
    return [line for line in input_list if line[: line.index("\t")] in input_set]


def filter_lines_except(input_list: list[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set does not contain the first column field of that line."""
    if len(input_set) == 0:
        return input_list
    return [line for line in input_list if line[: line.index("\t")] not in input_set]
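
# Editor's illustration (hypothetical sample data): filter_lines() and filter_lines_except() key each
# line by its first TSV column, e.g. a snapshot GUID.
_example_tsv_lines = ["guid1\tpool/ds@s1", "guid2\tpool/ds@s2"]
# filter_lines(_example_tsv_lines, {"guid1"})        == ["guid1\tpool/ds@s1"]
# filter_lines_except(_example_tsv_lines, {"guid1"}) == ["guid2\tpool/ds@s2"]
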
def dataset_regexes(src: Remote, dst: Remote, datasets: list[str]) -> list[str]:
    """Converts dataset paths to regex strings relative to src or dst roots."""
    results: list[str] = []
    for dataset in datasets:
        if dataset.startswith("/"):
            # it's an absolute dataset - convert it to a relative dataset
            dataset = dataset[1:]
            if is_descendant(dataset, of_root_dataset=src.root_dataset):
                dataset = relativize_dataset(dataset, src.root_dataset)
            elif is_descendant(dataset, of_root_dataset=dst.root_dataset):
                dataset = relativize_dataset(dataset, dst.root_dataset)
            else:
                continue  # ignore datasets that make no difference
            if dataset.startswith("/"):
                dataset = dataset[1:]
        if dataset.endswith("/"):
            dataset = dataset[0:-1]
        regex: str
        if dataset:
            regex = re.escape(dataset)
        else:
            regex = ".*"
        results.append(regex)
    return results
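
# Editor's illustration (hypothetical, assuming relativize_dataset() strips the matching root prefix):
# for src.root_dataset == "tank/src", an absolute input "/tank/src/foo" would be relativized to "foo" and
# escaped literally, while an input naming the root itself reduces to the empty string and therefore
# yields the catch-all regex ".*"; inputs under neither the src nor the dst root are dropped.
_example_child_regex = re.escape("foo")  # literal regex emitted for a dataset below the root
_example_root_regex = ".*"  # regex emitted when the relativized dataset is empty (the root itself)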