Coverage for bzfs_main / filter.py: 100%
220 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 08:03 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots()."""
17from __future__ import (
18 annotations,
19)
20import math
21import os
22import re
23from collections.abc import (
24 Iterable,
25)
26from datetime import (
27 timedelta,
28)
29from typing import (
30 TYPE_CHECKING,
31 Final,
32 Optional,
33 Union,
34)
36from bzfs_main.util.utils import (
37 DONT_SKIP_DATASET,
38 LOG_DEBUG,
39 LOG_TRACE,
40 UNIX_TIME_INFINITY_SECS,
41 RegexList,
42 is_descendant,
43 is_included,
44 relativize_dataset,
45)
47if TYPE_CHECKING: # pragma: no cover - for type hints only
48 from bzfs_main.bzfs import (
49 Job,
50 )
51 from bzfs_main.configuration import (
52 Params,
53 Remote,
54 )
56# constants:
57SNAPSHOT_REGEX_FILTER_NAME: Final[str] = "snapshot_regex"
58SNAPSHOT_REGEX_FILTER_NAMES: Final[frozenset[str]] = frozenset({"include_snapshot_regex", "exclude_snapshot_regex"})
59SNAPSHOT_FILTERS_VAR: Final[str] = "snapshot_filters_var"
62UnixTimeRange = Optional[tuple[Union[timedelta, int], Union[timedelta, int]]] # Type alias
63RankRange = tuple[tuple[str, int, bool], tuple[str, int, bool]] # Type alias
def filter_datasets(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Returns all datasets (and their descendants) that match at least one of the include regexes but none of the exclude
    regexes.

    Assumes the list of input datasets is sorted. The list of output datasets will be sorted too.

    Params:
        job: the running Job, providing Params and test-mode flags.
        remote: the src or dst Remote whose root_dataset the regexes are relative to.
        sorted_datasets: sorted list of candidate dataset names.
    """
    assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
    p, log = job.params, job.params.log
    results: list[str] = []
    for i, dataset in enumerate(sorted_datasets):
        if i == 0 and p.skip_parent:
            continue  # the first (sorted) entry is the root dataset itself; skip it on --skip-parent
        rel_dataset: str = relativize_dataset(dataset, remote.root_dataset)
        if rel_dataset.startswith("/"):
            rel_dataset = rel_dataset[1:]  # strip leading '/' char if any
        if is_included(rel_dataset, p.include_dataset_regexes, p.exclude_dataset_regexes):
            results.append(dataset)
            log.debug("Including b/c dataset regex: %s", dataset)
        else:
            log.debug("Excluding b/c dataset regex: %s", dataset)
    if p.exclude_dataset_property:
        results = _filter_datasets_by_exclude_property(job, remote, results)
    # perf: hoist the DEBUG check out of the loop so we don't iterate `results` when DEBUG logging is disabled
    if log.isEnabledFor(LOG_DEBUG):
        for dataset in results:
            log.debug(f"Finally included {remote.location} dataset: %s", dataset)
    if job.is_test_mode:
        assert results == sorted(results), "List is not sorted"
        # Asserts the following: If a dataset is excluded its descendants are automatically excluded too, and this
        # decision is never reconsidered even for the descendants because exclude takes precedence over include.
        resultset: set[str] = set(results)
        root_datasets: list[str] = [dataset for dataset in results if os.path.dirname(dataset) not in resultset]  # no parent
        for root in root_datasets:  # each root is not a descendant of another dataset
            assert not any(is_descendant(root, of_root_dataset=dataset) for dataset in results if dataset != root)
        for dataset in results:  # each dataset belongs to a subtree rooted at one of the roots
            assert any(is_descendant(dataset, of_root_dataset=root) for root in root_datasets)
    return results
def _filter_datasets_by_exclude_property(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Excludes datasets that are marked with a ZFS user property value that, in effect, says 'skip me'.

    The property (name taken from p.exclude_dataset_property) is read per dataset via 'zfs list'. Value semantics as
    implemented below: empty, '-' (i.e. unset) or 'true' (case-insensitive) means include; 'false' means exclude; any
    other value is treated as a comma-separated list of hostnames and the dataset is included only if the local
    hostname is among them. Excluding a dataset also prunes its entire subtree (input is sorted), without issuing
    further 'zfs list' calls for the descendants.
    """
    p, log = job.params, job.params.log
    results: list[str] = []
    localhostname: str | None = None  # resolved lazily, only if a hostname-list property value is encountered
    skip_dataset: str = DONT_SKIP_DATASET
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=skip_dataset):
            # skip_dataset shall be ignored or has been deleted by some third party while we're running
            continue  # nothing to do anymore for this dataset subtree (note that datasets is sorted)
        skip_dataset = DONT_SKIP_DATASET
        # TODO perf: on zfs >= 2.3 use json via zfs list -j to safely merge all zfs list's into one 'zfs list' call
        cmd = p.split_args(f"{p.zfs_program} list -t filesystem,volume -Hp -o {p.exclude_dataset_property}", dataset)
        job.maybe_inject_delete(remote, dataset=dataset, delete_trigger="zfs_list_exclude_property")
        property_value: str | None = job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd)
        if property_value is None:
            # dataset vanished between enumeration and now; also prune its (now nonexistent) subtree
            log.warning(f"Third party deleted {remote.location}: %s", dataset)
            skip_dataset = dataset
        else:
            reason: str = ""
            property_value = property_value.strip()
            sync: bool
            if not property_value or property_value == "-" or property_value.lower() == "true":
                sync = True  # unset property ('-'), empty, or explicit 'true' means: include this dataset
            elif property_value.lower() == "false":
                sync = False
            else:
                import socket  # lazy import for startup perf

                # treat the value as a comma-separated hostname list; include only if we are one of those hosts
                localhostname = localhostname or socket.gethostname()
                sync = any(localhostname == hostname.strip() for hostname in property_value.split(","))
                reason = f", localhostname: {localhostname}, hostnames: {property_value}"

            if sync:
                results.append(dataset)
                log.debug("Including b/c dataset prop: %s%s", dataset, reason)
            else:
                skip_dataset = dataset  # exclude, and prune the whole subtree below this dataset
                log.debug("Excluding b/c dataset prop: %s%s", dataset, reason)
    return results
def filter_snapshots(
    job: Job, basis_snapshots: list[str], all_except: bool = False, filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that pass all include/exclude policies.

    Semantics: Within a single snapshot-filter group, filters are applied sequentially (logical AND). Across groups,
    results are union-ized (logical OR). Set `all_except=True` to invert the final selection (retain-selected vs
    delete-selected modes). Bookmarks: when `filter_bookmarks=False`, bookmark entries (with '#') are always retained to
    assist common-snapshot detection; when `True`, bookmarks are subject to the same filters as snapshots.
    """

    def resolve_timerange(timerange: UnixTimeRange) -> UnixTimeRange:
        """Converts relative timerange values to UTC Unix time in integer seconds."""
        assert timerange is not None
        lo, hi = timerange
        if isinstance(lo, timedelta):
            lo = math.ceil(current_unixtime_in_secs - lo.total_seconds())
        if isinstance(hi, timedelta):
            hi = math.ceil(current_unixtime_in_secs - hi.total_seconds())
        assert isinstance(lo, int)
        assert isinstance(hi, int)
        return (lo, hi) if lo <= hi else (hi, lo)  # normalize so callers can rely on lo <= hi

    p, log = job.params, job.params.log
    current_unixtime_in_secs: float = p.create_src_snapshots_config.current_datetime.timestamp()
    resultset: set[str] = set()
    for snapshot_filter in p.snapshot_filters:  # OR across filter groups
        snapshots: list[str] = basis_snapshots
        for _filter in snapshot_filter:  # AND within a filter group: each filter narrows the previous result
            name: str = _filter.name
            if name == SNAPSHOT_REGEX_FILTER_NAME:
                snapshots = _filter_snapshots_by_regex(
                    job, snapshots, regexes=_filter.options, filter_bookmarks=filter_bookmarks
                )
            elif name == "include_snapshot_times":
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time(
                    job, snapshots, include_snapshot_times=timerange, filter_bookmarks=filter_bookmarks
                )
            else:
                assert name == "include_snapshot_times_and_ranks"
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time_and_rank(
                    job,
                    snapshots,
                    include_snapshot_times=timerange,
                    include_snapshot_ranks=_filter.options,
                    filter_bookmarks=filter_bookmarks,
                )
        resultset.update(snapshots)  # union

    no_f_bookmarks: bool = not filter_bookmarks
    snapshots = [line for line in basis_snapshots if (no_f_bookmarks and "#" in line) or ((line in resultset) != all_except)]
    # perf: hoist the DEBUG check out of the loop so we don't iterate `snapshots` when DEBUG logging is disabled
    if log.isEnabledFor(LOG_DEBUG):
        for snapshot in snapshots:
            log.debug("Finally included snapshot: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return snapshots
def _filter_snapshots_by_regex(
    job: Job, snapshots: list[str], regexes: tuple[RegexList, RegexList], filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that match at least one of the include regexes but none of the exclude regexes.

    Precondition: Each line is TSV of the form ...guid\tname. Regexes are applied to the snapshot or bookmark tag portion
    of `name` (after '@' or, if `filter_bookmarks=True`, after '#').
    """
    exclude_snapshot_regexes, include_snapshot_regexes = regexes
    logger = job.params.log
    debug_on: bool = logger.isEnabledFor(LOG_DEBUG)
    kept: list[str] = []
    for line in snapshots:
        sep: int = line.find("@")  # '@' marks a snapshot
        if sep < 0 and filter_bookmarks:
            sep = line.index("#")  # '#' marks a bookmark; must exist if the line is not a snapshot
        if sep < 0:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        if is_included(line[sep + 1 :], include_snapshot_regexes, exclude_snapshot_regexes):
            kept.append(line)
            if debug_on:
                logger.debug("Including b/c snapshot regex: %s", line[line.rindex("\t") + 1 :])
        elif debug_on:
            logger.debug("Excluding b/c snapshot regex: %s", line[line.rindex("\t") + 1 :])
    return kept
def _filter_snapshots_by_creation_time(
    job: Job, snapshots: list[str], include_snapshot_times: UnixTimeRange, filter_bookmarks: bool = False
) -> list[str]:
    """Filters snapshots to those created within the specified time window.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """
    logger = job.params.log
    debug_on: bool = logger.isEnabledFor(LOG_DEBUG)
    # a `None` range means: no time restriction at all
    lo_snaptime, hi_snaptime = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_snaptime, int)
    assert isinstance(hi_snaptime, int)
    kept: list[str] = []
    for line in snapshots:
        if (not filter_bookmarks) and "@" not in line:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        creation: int = int(line[0 : line.index("\t")])  # first TSV column
        if lo_snaptime <= creation < hi_snaptime:
            kept.append(line)
            if debug_on:
                logger.debug("Including b/c creation time: %s", line[line.rindex("\t") + 1 :])
        elif debug_on:
            logger.debug("Excluding b/c creation time: %s", line[line.rindex("\t") + 1 :])
    return kept
def _filter_snapshots_by_creation_time_and_rank(
    job: Job,
    snapshots: list[str],
    include_snapshot_times: UnixTimeRange,
    include_snapshot_ranks: list[RankRange],
    filter_bookmarks: bool = False,
) -> list[str]:
    """Filters by creation time and rank within the snapshot list.

    A snapshot is retained if its rank falls within the current rank window [lo, hi) OR its creation time falls within
    the time window; rank ranges are applied sequentially, each pass consuming the output (and window size) of the
    previous pass. Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is
    compared against [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """

    def get_idx(rank: tuple[str, int, bool], n: int) -> int:
        """Returns index for rank tuple (kind, value, percent); 'oldest' counts from the front, 'latest' from the back."""
        kind, num, is_percent = rank
        # a percentage is taken relative to n; an absolute count is capped at n
        m = round(n * num / 100) if is_percent else min(n, num)
        assert kind == "latest" or kind == "oldest"
        return m if kind == "oldest" else n - m

    assert isinstance(include_snapshot_ranks, list)
    assert len(include_snapshot_ranks) > 0
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_time, hi_time = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)  # None means: unbounded time window
    assert isinstance(lo_time, int)
    assert isinstance(hi_time, int)
    n = sum(1 for snapshot in snapshots if "@" in snapshot)  # ranks count snapshots only, never bookmarks
    for rank_range in include_snapshot_ranks:
        lo_rank, hi_rank = rank_range
        lo: int = get_idx(lo_rank, n)
        hi: int = get_idx(hi_rank, n)
        lo, hi = (lo, hi) if lo <= hi else (hi, lo)  # normalize so lo <= hi
        i: int = 0  # rank (index) of the current snapshot within the list
        results: list[str] = []
        for snapshot in snapshots:
            is_snapshot = "@" in snapshot
            if (not filter_bookmarks) and not is_snapshot:
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            else:
                msg = None
                if is_snapshot and lo <= i < hi:
                    msg = "Including b/c snapshot rank: %s"
                elif lo_time <= int(snapshot[0 : snapshot.index("\t")]) < hi_time:
                    # time window acts as an OR: retained even though outside the rank window
                    msg = "Including b/c creation time: %s"
                if msg:
                    results.append(snapshot)
                else:
                    msg = "Excluding b/c snapshot rank: %s"
                if is_debug:
                    log.debug(msg, snapshot[snapshot.rindex("\t") + 1 :])
                i += 1 if is_snapshot else 0  # bookmarks do not advance the rank
        snapshots = results
        n = hi - lo  # the next rank range operates relative to the size of this pass' rank window
    return snapshots
def filter_properties(
    p: Params, props: dict[str, str | None], include_regexes: RegexList, exclude_regexes: RegexList
) -> dict[str, str | None]:
    """Returns ZFS props whose name matches at least one of the include regexes but none of the exclude regexes."""
    logger = p.log
    debug_on: bool = logger.isEnabledFor(LOG_DEBUG)
    selected: dict[str, str | None] = {}
    for name, value in props.items():
        if is_included(name, include_regexes, exclude_regexes):
            selected[name] = value
            if debug_on:
                logger.debug("Including b/c property regex: %s", name)
        elif debug_on:
            logger.debug("Excluding b/c property regex: %s", name)
    return selected
def filter_lines(input_list: Iterable[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set contains the first column field of that line."""
    if not input_set:
        return []  # nothing can match an empty set; skip scanning the input
    selected: list[str] = []
    for line in input_list:
        if line[0 : line.index("\t")] in input_set:  # first TSV column
            selected.append(line)
    return selected
def filter_lines_except(input_list: list[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set does not contain the first column field of that line."""
    if not input_set:
        return input_list  # nothing to exclude; hand back the input list unchanged
    retained: list[str] = []
    for line in input_list:
        if line[0 : line.index("\t")] not in input_set:  # first TSV column
            retained.append(line)
    return retained
def dataset_regexes(src: Remote, dst: Remote, datasets: list[str]) -> list[str]:
    """Converts dataset paths to regex strings relative to src or dst roots."""
    regexes: list[str] = []
    for dataset in datasets:
        if dataset.startswith("/"):
            # it's an absolute dataset - convert it to a relative dataset
            dataset = dataset[1:]
            if is_descendant(dataset, of_root_dataset=src.root_dataset):
                dataset = relativize_dataset(dataset, src.root_dataset)
            elif is_descendant(dataset, of_root_dataset=dst.root_dataset):
                dataset = relativize_dataset(dataset, dst.root_dataset)
            else:
                continue  # ignore datasets that make no difference
            if dataset.startswith("/"):
                dataset = dataset[1:]
        if dataset.endswith("/"):
            dataset = dataset[0:-1]
        # empty relative path means the root itself: match any descendant
        regexes.append(re.escape(dataset) if dataset else ".*")
    return regexes