Coverage for bzfs_main/filter.py: 100% (220 statements)
coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots()."""

from __future__ import (
    annotations,
)
import math
import os
import re
from collections.abc import (
    Iterable,
)
from datetime import (
    timedelta,
)
from typing import (
    TYPE_CHECKING,
    Final,
    Optional,
    Union,
)

from bzfs_main.connection import (
    try_ssh_command,
)
from bzfs_main.utils import (
    DONT_SKIP_DATASET,
    LOG_DEBUG,
    LOG_TRACE,
    UNIX_TIME_INFINITY_SECS,
    RegexList,
    is_descendant,
    is_included,
    relativize_dataset,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import (
        Job,
    )
    from bzfs_main.configuration import (
        Params,
        Remote,
    )

# constants:
SNAPSHOT_REGEX_FILTER_NAME: Final[str] = "snapshot_regex"
SNAPSHOT_REGEX_FILTER_NAMES: Final[frozenset[str]] = frozenset({"include_snapshot_regex", "exclude_snapshot_regex"})


UnixTimeRange = Optional[tuple[Union[timedelta, int], Union[timedelta, int]]]  # Type alias
RankRange = tuple[tuple[str, int, bool], tuple[str, int, bool]]  # Type alias
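
# Example (illustrative, not part of the module): a UnixTimeRange may mix relative
# and absolute bounds, e.g. (timedelta(days=7), 0) or (1_700_000_000, 1_700_003_600);
# relative timedelta values are resolved to absolute Unix seconds later, inside
# filter_snapshots(). A RankRange pairs two (kind, value, is_percent) tuples, e.g.
# (("latest", 0, False), ("latest", 10, True)) to select the latest 10% of snapshots.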


def filter_datasets(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Returns all datasets (and their descendants) that match at least one of the include regexes but none of the exclude
    regexes.

    Assumes the list of input datasets is sorted. The list of output datasets will be sorted too.
    """
    assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
    p, log = job.params, job.params.log
    results: list[str] = []
    for i, dataset in enumerate(sorted_datasets):
        if i == 0 and p.skip_parent:
            continue
        rel_dataset: str = relativize_dataset(dataset, remote.root_dataset)
        if rel_dataset.startswith("/"):
            rel_dataset = rel_dataset[1:]  # strip leading '/' char if any
        if is_included(rel_dataset, p.include_dataset_regexes, p.exclude_dataset_regexes):
            results.append(dataset)
            log.debug("Including b/c dataset regex: %s", dataset)
        else:
            log.debug("Excluding b/c dataset regex: %s", dataset)
    if p.exclude_dataset_property:
        results = _filter_datasets_by_exclude_property(job, remote, results)
    is_debug: bool = p.log.isEnabledFor(LOG_DEBUG)
    for dataset in results:
        if is_debug:
            log.debug(f"Finally included {remote.location} dataset: %s", dataset)
    if job.is_test_mode:
        assert results == sorted(results), "List is not sorted"
        # Asserts the following: If a dataset is excluded its descendants are automatically excluded too, and this
        # decision is never reconsidered even for the descendants because exclude takes precedence over include.
        resultset: set[str] = set(results)
        root_datasets: list[str] = [dataset for dataset in results if os.path.dirname(dataset) not in resultset]  # no parent
        for root in root_datasets:  # each root is not a descendant of another dataset
            assert not any(is_descendant(root, of_root_dataset=dataset) for dataset in results if dataset != root)
        for dataset in results:  # each dataset belongs to a subtree rooted at one of the roots
            assert any(is_descendant(dataset, of_root_dataset=root) for root in root_datasets)
    return results
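

# Illustrative sketch (not part of the module): the include/exclude policy that
# is_included() applies in filter_datasets(), reduced to bare compiled patterns.
# This is a simplification of the real RegexList type in bzfs_main.utils.
#
#     import re
#
#     def _is_included_sketch(name: str, includes: list[re.Pattern], excludes: list[re.Pattern]) -> bool:
#         """Exclude always takes precedence: a name must match some include and no exclude."""
#         if any(rgx.fullmatch(name) for rgx in excludes):
#             return False
#         return any(rgx.fullmatch(name) for rgx in includes)
#
#     _is_included_sketch("home/alice", [re.compile(".*")], [re.compile("tmp.*")])  # True
#     _is_included_sketch("tmp/cache", [re.compile(".*")], [re.compile("tmp.*")])  # False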


def _filter_datasets_by_exclude_property(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Excludes datasets that are marked with a ZFS user property value that, in effect, says 'skip me'."""
    p, log = job.params, job.params.log
    results: list[str] = []
    localhostname: str | None = None
    skip_dataset: str = DONT_SKIP_DATASET
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=skip_dataset):
            # skip_dataset shall be ignored or has been deleted by some third party while we're running
            continue  # nothing to do anymore for this dataset subtree (note that datasets is sorted)
        skip_dataset = DONT_SKIP_DATASET
        # TODO perf: on zfs >= 2.3 use JSON via 'zfs list -j' to safely merge all per-dataset 'zfs list' calls into one
        cmd = p.split_args(f"{p.zfs_program} list -t filesystem,volume -Hp -o {p.exclude_dataset_property}", dataset)
        job.maybe_inject_delete(remote, dataset=dataset, delete_trigger="zfs_list_exclude_property")
        property_value: str | None = try_ssh_command(job, remote, LOG_TRACE, cmd=cmd)
        if property_value is None:
            log.warning(f"Third party deleted {remote.location}: %s", dataset)
            skip_dataset = dataset
        else:
            reason: str = ""
            property_value = property_value.strip()
            sync: bool
            if not property_value or property_value == "-" or property_value.lower() == "true":
                sync = True
            elif property_value.lower() == "false":
                sync = False
            else:
                import socket  # lazy import for startup perf

                localhostname = localhostname or socket.gethostname()
                sync = any(localhostname == hostname.strip() for hostname in property_value.split(","))
                reason = f", localhostname: {localhostname}, hostnames: {property_value}"

            if sync:
                results.append(dataset)
                log.debug("Including b/c dataset prop: %s%s", dataset, reason)
            else:
                skip_dataset = dataset
                log.debug("Excluding b/c dataset prop: %s%s", dataset, reason)
    return results
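

# Illustrative sketch (not part of the module): the decision table applied to the
# ZFS user property value above, extracted into a pure function for clarity.
#
#     def _sync_decision_sketch(property_value: str, localhostname: str) -> bool:
#         value = property_value.strip()
#         if not value or value == "-" or value.lower() == "true":
#             return True  # unset/default property values mean 'replicate me'
#         if value.lower() == "false":
#             return False  # explicit opt-out: skip this dataset and its subtree
#         # otherwise the value is a comma-separated list of hosts allowed to sync
#         return any(localhostname == hostname.strip() for hostname in value.split(","))
#
#     _sync_decision_sketch("-", "host1")            # True
#     _sync_decision_sketch("false", "host1")        # False
#     _sync_decision_sketch("host1,host2", "host1")  # True
#     _sync_decision_sketch("host3", "host1")        # False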


def filter_snapshots(
    job: Job, basis_snapshots: list[str], all_except: bool = False, filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that pass all include/exclude policies.

    Semantics: Within a single snapshot-filter group, filters are applied sequentially (logical AND). Across groups,
    results are unioned (logical OR). Set `all_except=True` to invert the final selection (retain-selected vs
    delete-selected modes). Bookmarks: when `filter_bookmarks=False`, bookmark entries (with '#') are always retained to
    assist common-snapshot detection; when `True`, bookmarks are subject to the same filters as snapshots.
    """

    def resolve_timerange(timerange: UnixTimeRange) -> UnixTimeRange:
        """Converts relative timerange values to UTC Unix time in integer seconds."""
        assert timerange is not None
        lo, hi = timerange
        if isinstance(lo, timedelta):
            lo = math.ceil(current_unixtime_in_secs - lo.total_seconds())
        if isinstance(hi, timedelta):
            hi = math.ceil(current_unixtime_in_secs - hi.total_seconds())
        assert isinstance(lo, int)
        assert isinstance(hi, int)
        return (lo, hi) if lo <= hi else (hi, lo)
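
    # Worked example (illustrative): with current_unixtime_in_secs == 1_700_000_000,
    # the relative range (timedelta(days=1), timedelta(hours=0)) resolves to the
    # absolute window (1_699_913_600, 1_700_000_000): each timedelta is subtracted
    # from 'now', and the bounds are swapped if needed so that lo <= hi.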

    p, log = job.params, job.params.log
    current_unixtime_in_secs: float = p.create_src_snapshots_config.current_datetime.timestamp()
    resultset: set[str] = set()
    for snapshot_filter in p.snapshot_filters:
        snapshots: list[str] = basis_snapshots
        for _filter in snapshot_filter:
            name: str = _filter.name
            if name == SNAPSHOT_REGEX_FILTER_NAME:
                snapshots = _filter_snapshots_by_regex(
                    job, snapshots, regexes=_filter.options, filter_bookmarks=filter_bookmarks
                )
            elif name == "include_snapshot_times":
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time(
                    job, snapshots, include_snapshot_times=timerange, filter_bookmarks=filter_bookmarks
                )
            else:
                assert name == "include_snapshot_times_and_ranks"
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time_and_rank(
                    job,
                    snapshots,
                    include_snapshot_times=timerange,
                    include_snapshot_ranks=_filter.options,
                    filter_bookmarks=filter_bookmarks,
                )
        resultset.update(snapshots)  # union

    no_f_bookmarks: bool = not filter_bookmarks
    snapshots = [line for line in basis_snapshots if (no_f_bookmarks and "#" in line) or ((line in resultset) != all_except)]
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    for snapshot in snapshots:
        if is_debug:
            log.debug("Finally included snapshot: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return snapshots
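

# Illustrative sketch (not part of the module): the AND-within-group / OR-across-groups
# combination that filter_snapshots() applies, reduced to set algebra; the names are
# made up.
#
#     basis = {"s1", "s2", "s3", "s4"}
#     group1 = {"s1", "s2"} & {"s2", "s3"}  # filters within one group compose via AND -> {"s2"}
#     group2 = {"s4"}                       # an independent second filter group
#     selected = group1 | group2            # groups compose via OR -> {"s2", "s4"}
#     inverted = basis - selected           # all_except=True -> {"s1", "s3"}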


def _filter_snapshots_by_regex(
    job: Job, snapshots: list[str], regexes: tuple[RegexList, RegexList], filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that match at least one of the include regexes but none of the exclude regexes.

    Precondition: Each line is TSV of the form ...guid\tname. Regexes are applied to the snapshot or bookmark tag portion
    of `name` (after '@' or, if `filter_bookmarks=True`, after '#').
    """
    exclude_snapshot_regexes, include_snapshot_regexes = regexes
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    results: list[str] = []
    for snapshot in snapshots:
        i = snapshot.find("@")  # snapshot separator
        if i < 0 and filter_bookmarks:
            i = snapshot.index("#")  # bookmark separator
        if i < 0:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        elif is_included(snapshot[i + 1 :], include_snapshot_regexes, exclude_snapshot_regexes):
            results.append(snapshot)
            if is_debug:
                log.debug("Including b/c snapshot regex: %s", snapshot[snapshot.rindex("\t") + 1 :])
        else:
            if is_debug:
                log.debug("Excluding b/c snapshot regex: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return results
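

# Worked example (illustrative): input lines are TSV with the name in the last
# column, e.g. "12345\tpool/ds@hourly_2024-01-01". The regex applies to the tag
# after '@' (here "hourly_2024-01-01"), so an include regex like r"hourly.*"
# selects the line. A bookmark line such as "12345\tpool/ds#book1" is skipped by
# this function (the caller re-adds bookmarks) unless filter_bookmarks=True, in
# which case the tag after '#' is matched instead.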


def _filter_snapshots_by_creation_time(
    job: Job, snapshots: list[str], include_snapshot_times: UnixTimeRange, filter_bookmarks: bool = False
) -> list[str]:
    """Filters snapshots to those created within the specified time window.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_snaptime, hi_snaptime = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_snaptime, int)
    assert isinstance(hi_snaptime, int)
    results: list[str] = []
    for snapshot in snapshots:
        if (not filter_bookmarks) and "@" not in snapshot:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        elif lo_snaptime <= int(snapshot[0 : snapshot.index("\t")]) < hi_snaptime:
            results.append(snapshot)
            if is_debug:
                log.debug("Including b/c creation time: %s", snapshot[snapshot.rindex("\t") + 1 :])
        else:
            if is_debug:
                log.debug("Excluding b/c creation time: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return results
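

# Worked example (illustrative): with include_snapshot_times == (1_700_000_000,
# 1_700_003_600), a line "1700000000\t...\tpool/ds@a" is included (lo is inclusive)
# while "1700003600\t...\tpool/ds@b" is excluded (hi is exclusive), matching the
# half-open interval [lo, hi).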


def _filter_snapshots_by_creation_time_and_rank(
    job: Job,
    snapshots: list[str],
    include_snapshot_times: UnixTimeRange,
    include_snapshot_ranks: list[RankRange],
    filter_bookmarks: bool = False,
) -> list[str]:
    """Filters by creation time and rank within the snapshot list.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """

    def get_idx(rank: tuple[str, int, bool], n: int) -> int:
        """Returns index for rank tuple (kind, value, percent)."""
        kind, num, is_percent = rank
        m = round(n * num / 100) if is_percent else min(n, num)
        assert kind == "latest" or kind == "oldest"
        return m if kind == "oldest" else n - m
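
    # Worked example (illustrative) for get_idx with n == 100 snapshots:
    #   ("oldest", 10, False) -> min(100, 10) = 10               (index 10 from the start)
    #   ("latest", 10, False) -> 100 - min(100, 10) = 90         (10 from the end)
    #   ("latest", 25, True)  -> 100 - round(100 * 25/100) = 75  (latest 25%)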

    assert isinstance(include_snapshot_ranks, list)
    assert len(include_snapshot_ranks) > 0
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_time, hi_time = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_time, int)
    assert isinstance(hi_time, int)
    n = sum(1 for snapshot in snapshots if "@" in snapshot)
    for rank_range in include_snapshot_ranks:
        lo_rank, hi_rank = rank_range
        lo: int = get_idx(lo_rank, n)
        hi: int = get_idx(hi_rank, n)
        lo, hi = (lo, hi) if lo <= hi else (hi, lo)
        i: int = 0
        results: list[str] = []
        for snapshot in snapshots:
            is_snapshot = "@" in snapshot
            if (not filter_bookmarks) and not is_snapshot:
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            else:
                msg = None
                if is_snapshot and lo <= i < hi:
                    msg = "Including b/c snapshot rank: %s"
                elif lo_time <= int(snapshot[0 : snapshot.index("\t")]) < hi_time:
                    msg = "Including b/c creation time: %s"
                if msg:
                    results.append(snapshot)
                else:
                    msg = "Excluding b/c snapshot rank: %s"
                if is_debug:
                    log.debug(msg, snapshot[snapshot.rindex("\t") + 1 :])
                i += 1 if is_snapshot else 0
        snapshots = results
        n = hi - lo
    return snapshots
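

# Illustrative note (not part of the module): rank ranges chain. After each pass,
# `snapshots = results` and `n = hi - lo`, so a subsequent rank range ranks only
# within the previously selected window. E.g. with 100 snapshots and an empty time
# window (so only rank applies), (("latest", 0, False), ("latest", 10, True)) keeps
# indexes [90, 100), and a following (("oldest", 0, False), ("oldest", 5, False))
# then keeps the oldest 5 of those 10.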


def filter_properties(
    p: Params, props: dict[str, str | None], include_regexes: RegexList, exclude_regexes: RegexList
) -> dict[str, str | None]:
    """Returns ZFS props whose name matches at least one of the include regexes but none of the exclude regexes."""
    log = p.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    results: dict[str, str | None] = {}
    for propname, propvalue in props.items():
        if is_included(propname, include_regexes, exclude_regexes):
            results[propname] = propvalue
            if is_debug:
                log.debug("Including b/c property regex: %s", propname)
        else:
            if is_debug:
                log.debug("Excluding b/c property regex: %s", propname)
    return results
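

# Worked example (illustrative, hypothetical property names): with an include regex
# matching r"compression|mount.*" and an exclude regex matching r"mountpoint",
# "compression" is kept while "mountpoint" is dropped; as everywhere in this module,
# exclude takes precedence over include.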


def filter_lines(input_list: Iterable[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set contains the first column field of that line."""
    if len(input_set) == 0:
        return []
    return [line for line in input_list if line[0 : line.index("\t")] in input_set]
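

# Worked example (illustrative): the first TSV field (e.g. a guid) is the key.
#
#     filter_lines(["g1\tpool@s1", "g2\tpool@s2"], {"g1"})  # -> ["g1\tpool@s1"]
#     filter_lines(["g1\tpool@s1"], set())                  # -> [] (empty set selects nothing)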


def filter_lines_except(input_list: list[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set does not contain the first column field of that line."""
    if len(input_set) == 0:
        return input_list
    return [line for line in input_list if line[0 : line.index("\t")] not in input_set]
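

# Worked example (illustrative): the inverse of filter_lines().
#
#     filter_lines_except(["g1\tpool@s1", "g2\tpool@s2"], {"g1"})  # -> ["g2\tpool@s2"]
#     filter_lines_except(["g1\tpool@s1"], set())                  # -> ["g1\tpool@s1"] (nothing excluded)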


def dataset_regexes(src: Remote, dst: Remote, datasets: list[str]) -> list[str]:
    """Converts dataset paths to regex strings relative to src or dst roots."""
    results: list[str] = []
    for dataset in datasets:
        if dataset.startswith("/"):
            # it's an absolute dataset - convert it to a relative dataset
            dataset = dataset[1:]
            if is_descendant(dataset, of_root_dataset=src.root_dataset):
                dataset = relativize_dataset(dataset, src.root_dataset)
            elif is_descendant(dataset, of_root_dataset=dst.root_dataset):
                dataset = relativize_dataset(dataset, dst.root_dataset)
            else:
                continue  # ignore datasets that make no difference
            if dataset.startswith("/"):
                dataset = dataset[1:]
        if dataset.endswith("/"):
            dataset = dataset[0:-1]
        regex: str
        if dataset:
            regex = re.escape(dataset)
        else:
            regex = ".*"
        results.append(regex)
    return results
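

# Worked example (illustrative, with hypothetical root datasets): given
# src.root_dataset == "tank/src" and dst.root_dataset == "tank/dst",
#
#     dataset_regexes(src, dst, ["/tank/src/home", "foo/bar", "/other/pool"])
#
# yields ["home", "foo/bar"]: the absolute path is relativized against the src root
# and escaped via re.escape(); the relative path is escaped as-is; the foreign
# absolute path is dropped. A path that relativizes to the root itself becomes ".*".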