Coverage for bzfs_main/filter.py: 99%
218 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 13:30 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots()."""
17from __future__ import annotations
18import math
19import os
20import re
21import socket
22from datetime import timedelta
23from typing import (
24 TYPE_CHECKING,
25 Iterable,
26 Optional,
27 Tuple,
28 Union,
29)
31from bzfs_main.connection import (
32 try_ssh_command,
33)
34from bzfs_main.utils import (
35 DONT_SKIP_DATASET,
36 LOG_DEBUG,
37 LOG_TRACE,
38 UNIX_TIME_INFINITY_SECS,
39 RegexList,
40 is_descendant,
41 is_included,
42 relativize_dataset,
43)
45if TYPE_CHECKING: # pragma: no cover - for type hints only
46 from bzfs_main.bzfs import Job
47 from bzfs_main.configuration import Params, Remote
49# constants:
50SNAPSHOT_REGEX_FILTER_NAME: str = "snapshot_regex"
51SNAPSHOT_REGEX_FILTER_NAMES: frozenset[str] = frozenset({"include_snapshot_regex", "exclude_snapshot_regex"})
54UnixTimeRange = Optional[Tuple[Union[timedelta, int], Union[timedelta, int]]] # Type alias
55RankRange = Tuple[Tuple[str, int, bool], Tuple[str, int, bool]] # Type alias
def filter_datasets(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Returns all datasets (and their descendants) that match at least one of the include regexes but none of the exclude
    regexes.

    Assumes the list of input datasets is sorted. The list of output datasets will be sorted too.
    """
    p, log = job.params, job.params.log
    included: list[str] = []
    for idx, dataset in enumerate(sorted_datasets):
        if idx == 0 and p.skip_parent:
            continue  # the first (sorted) entry is the root; skip it on request
        rel: str = relativize_dataset(dataset, remote.root_dataset)
        rel = rel[1:] if rel.startswith("/") else rel  # strip leading '/' char if any
        if not is_included(rel, p.include_dataset_regexes, p.exclude_dataset_regexes):
            log.debug("Excluding b/c dataset regex: %s", dataset)
            continue
        included.append(dataset)
        log.debug("Including b/c dataset regex: %s", dataset)
    if p.exclude_dataset_property:
        included = _filter_datasets_by_exclude_property(job, remote, included)
    is_debug: bool = p.log.isEnabledFor(LOG_DEBUG)
    for dataset in included:
        if is_debug:
            log.debug("Finally included %s dataset: %s", remote.location, dataset)
    if job.is_test_mode:
        assert included == sorted(included), "List is not sorted"
        # Asserts the following: If a dataset is excluded its descendants are automatically excluded too, and this
        # decision is never reconsidered even for the descendants because exclude takes precedence over include.
        included_set: set[str] = set(included)
        roots: list[str] = [ds for ds in included if os.path.dirname(ds) not in included_set]  # no parent
        for root in roots:  # each root is not a descendant of another dataset
            assert not any(is_descendant(root, of_root_dataset=ds) for ds in included if ds != root)
        for ds in included:  # each dataset belongs to a subtree rooted at one of the roots
            assert any(is_descendant(ds, of_root_dataset=root) for root in roots)
    return included
def _filter_datasets_by_exclude_property(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Excludes datasets that are marked with a ZFS user property value that, in effect, says 'skip me'.

    The property value is fetched per dataset via 'zfs list'; an empty value, "-", or "true" means 'include',
    "false" means 'exclude', and any other value is treated as a comma-separated list of hostnames -- the dataset
    is included only if the local hostname is among them. Excluding a dataset also skips all its descendants,
    which relies on `sorted_datasets` being sorted.
    """
    p, log = job.params, job.params.log
    results: list[str] = []
    localhostname: str | None = None  # lazily resolved only if a hostname list is ever encountered
    skip_dataset: str = DONT_SKIP_DATASET
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=skip_dataset):
            # skip_dataset shall be ignored or has been deleted by some third party while we're running
            continue  # nothing to do anymore for this dataset subtree (note that datasets is sorted)
        skip_dataset = DONT_SKIP_DATASET
        # TODO perf: on zfs >= 2.3 use json via zfs list -j to safely merge all zfs list's into one 'zfs list' call
        cmd = p.split_args(f"{p.zfs_program} list -t filesystem,volume -Hp -o {p.exclude_dataset_property}", dataset)
        job.maybe_inject_delete(remote, dataset=dataset, delete_trigger="zfs_list_exclude_property")
        property_value: str | None = try_ssh_command(job, remote, LOG_TRACE, cmd=cmd)
        if property_value is None:
            log.warning(f"Third party deleted {remote.location}: %s", dataset)
            skip_dataset = dataset  # also skip descendants of the vanished dataset
        else:
            reason: str = ""
            property_value = property_value.strip()
            sync: bool
            if not property_value or property_value == "-" or property_value.lower() == "true":
                sync = True
            elif property_value.lower() == "false":
                sync = False
            else:
                # value is a comma-separated hostname list; sync iff the local hostname is one of them
                localhostname = localhostname or socket.gethostname()
                sync = any(localhostname == hostname.strip() for hostname in property_value.split(","))
                reason = f", localhostname: {localhostname}, hostnames: {property_value}"

            if sync:
                results.append(dataset)
                log.debug("Including b/c dataset prop: %s%s", dataset, reason)
            else:
                skip_dataset = dataset  # exclude this dataset and its entire subtree
                log.debug("Excluding b/c dataset prop: %s%s", dataset, reason)
    return results
def filter_snapshots(
    job: Job, basis_snapshots: list[str], all_except: bool = False, filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that pass all include/exclude policies.

    `all_except=False` returns snapshots *matching* the filters, for example those that should be deleted if we are in
    "delete selected" mode.

    `all_except=True` returns snapshots *not* matching the filters, for example those that should be deleted if we are in
    "retain selected" mode.

    Each entry of `p.snapshot_filters` is a pipeline applied sequentially (logical AND); the results of all
    pipelines are unioned (logical OR). Unless `filter_bookmarks` is True, bookmark lines (containing '#') are
    always retained in the final result regardless of the filters.
    """

    def resolve_timerange(timerange: UnixTimeRange) -> UnixTimeRange:
        """Converts relative timerange values to UTC Unix time in integer seconds."""
        assert timerange is not None
        lo, hi = timerange
        if isinstance(lo, timedelta):
            lo = math.ceil(current_unixtime_in_secs - lo.total_seconds())
        if isinstance(hi, timedelta):
            hi = math.ceil(current_unixtime_in_secs - hi.total_seconds())
        assert isinstance(lo, int)
        assert isinstance(hi, int)
        return (lo, hi) if lo <= hi else (hi, lo)  # normalize so lo <= hi

    p, log = job.params, job.params.log
    current_unixtime_in_secs: float = p.create_src_snapshots_config.current_datetime.timestamp()
    resultset: set[str] = set()
    for snapshot_filter in p.snapshot_filters:
        snapshots: list[str] = basis_snapshots
        for _filter in snapshot_filter:  # apply each filter of this pipeline in turn (logical AND)
            name: str = _filter.name
            if name == SNAPSHOT_REGEX_FILTER_NAME:
                snapshots = _filter_snapshots_by_regex(
                    job, snapshots, regexes=_filter.options, filter_bookmarks=filter_bookmarks
                )
            elif name == "include_snapshot_times":
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time(
                    job, snapshots, include_snapshot_times=timerange, filter_bookmarks=filter_bookmarks
                )
            else:
                assert name == "include_snapshot_times_and_ranks"
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time_and_rank(
                    job,
                    snapshots,
                    include_snapshot_times=timerange,
                    include_snapshot_ranks=_filter.options,
                    filter_bookmarks=filter_bookmarks,
                )
        resultset.update(snapshots)  # union
    no_f_bookmarks: bool = not filter_bookmarks
    snapshots = [line for line in basis_snapshots if (no_f_bookmarks and "#" in line) or ((line in resultset) != all_except)]
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    for snapshot in snapshots:
        if is_debug:
            log.debug("Finally included snapshot: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return snapshots
def _filter_snapshots_by_regex(
    job: Job, snapshots: list[str], regexes: tuple[RegexList, RegexList], filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that match at least one of the include regexes but none of the exclude regexes."""
    exclude_regexes, include_regexes = regexes
    log = job.params.log
    debug_enabled: bool = log.isEnabledFor(LOG_DEBUG)
    kept: list[str] = []
    for snap in snapshots:
        sep: int = snap.find("@")  # snapshot separator
        if sep < 0:
            if not filter_bookmarks:
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            sep = snap.index("#")  # bookmark separator
        if is_included(snap[sep + 1 :], include_regexes, exclude_regexes):
            kept.append(snap)
            if debug_enabled:
                log.debug("Including b/c snapshot regex: %s", snap[snap.rindex("\t") + 1 :])
        elif debug_enabled:
            log.debug("Excluding b/c snapshot regex: %s", snap[snap.rindex("\t") + 1 :])
    return kept
def _filter_snapshots_by_creation_time(
    job: Job, snapshots: list[str], include_snapshot_times: UnixTimeRange, filter_bookmarks: bool = False
) -> list[str]:
    """Filters snapshots to those created within the specified time window [lo, hi)."""
    log = job.params.log
    debug_enabled: bool = log.isEnabledFor(LOG_DEBUG)
    lo_snaptime, hi_snaptime = include_snapshot_times if include_snapshot_times else (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_snaptime, int)
    assert isinstance(hi_snaptime, int)
    kept: list[str] = []
    for snap in snapshots:
        if "@" not in snap and not filter_bookmarks:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        creation: int = int(snap[0 : snap.index("\t")])  # creation time is the first TAB-separated column
        if lo_snaptime <= creation < hi_snaptime:
            kept.append(snap)
            if debug_enabled:
                log.debug("Including b/c creation time: %s", snap[snap.rindex("\t") + 1 :])
        elif debug_enabled:
            log.debug("Excluding b/c creation time: %s", snap[snap.rindex("\t") + 1 :])
    return kept
def _filter_snapshots_by_creation_time_and_rank(
    job: Job,
    snapshots: list[str],
    include_snapshot_times: UnixTimeRange,
    include_snapshot_ranks: list[RankRange],
    filter_bookmarks: bool = False,
) -> list[str]:
    """Filters by creation time and rank within the snapshot list.

    An entry is retained if its rank falls within the half-open window [lo, hi) OR its creation time falls within
    [lo_time, hi_time). Only snapshots (lines containing '@') carry a rank; bookmarks do not. Rank ranges are
    applied sequentially: each subsequent range operates on the survivors of the previous one, with n shrunk to
    the size of the previous rank window.
    """

    def get_idx(rank: tuple[str, int, bool], n: int) -> int:
        """Returns index for rank tuple (kind, value, percent)."""
        kind, num, is_percent = rank
        m = round(n * num / 100) if is_percent else min(n, num)
        assert kind == "latest" or kind == "oldest"
        return m if kind == "oldest" else n - m  # 'latest' counts backwards from the end of the list

    assert isinstance(include_snapshot_ranks, list)
    assert len(include_snapshot_ranks) > 0
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_time, hi_time = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_time, int)
    assert isinstance(hi_time, int)
    n = sum(1 for snapshot in snapshots if "@" in snapshot)  # only snapshots (not bookmarks) have a rank
    for rank_range in include_snapshot_ranks:
        lo_rank, hi_rank = rank_range
        lo: int = get_idx(lo_rank, n)
        hi: int = get_idx(hi_rank, n)
        lo, hi = (lo, hi) if lo <= hi else (hi, lo)  # normalize so lo <= hi
        i: int = 0  # rank of the current snapshot within this pass
        results: list[str] = []
        for snapshot in snapshots:
            is_snapshot = "@" in snapshot
            if (not filter_bookmarks) and not is_snapshot:
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            else:
                msg = None
                if is_snapshot and lo <= i < hi:
                    msg = "Including b/c snapshot rank: %s"
                elif lo_time <= int(snapshot[0 : snapshot.index("\t")]) < hi_time:
                    msg = "Including b/c creation time: %s"
                if msg:
                    results.append(snapshot)
                else:
                    msg = "Excluding b/c snapshot rank: %s"
                if is_debug:
                    log.debug(msg, snapshot[snapshot.rindex("\t") + 1 :])
            i += 1 if is_snapshot else 0
        snapshots = results
        n = hi - lo  # the next rank range applies to the survivors of this window
    return snapshots
def filter_properties(
    p: Params, props: dict[str, str | None], include_regexes: RegexList, exclude_regexes: RegexList
) -> dict[str, str | None]:
    """Returns ZFS props whose name matches at least one of the include regexes but none of the exclude regexes."""
    log = p.log
    debug_enabled: bool = log.isEnabledFor(LOG_DEBUG)
    filtered: dict[str, str | None] = {}
    for name, value in props.items():
        if is_included(name, include_regexes, exclude_regexes):
            filtered[name] = value
            if debug_enabled:
                log.debug("Including b/c property regex: %s", name)
        elif debug_enabled:
            log.debug("Excluding b/c property regex: %s", name)
    return filtered
def filter_lines(input_list: Iterable[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set contains the first column field of that line."""
    if not input_set:
        return []  # nothing can match an empty set; avoid scanning the input
    return [line for line in input_list if line[: line.index("\t")] in input_set]
def filter_lines_except(input_list: list[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set does not contain the first column field of that line."""
    if not input_set:
        return input_list  # empty set excludes nothing; return the input unchanged
    return [line for line in input_list if line[: line.index("\t")] not in input_set]
def dataset_regexes(src: Remote, dst: Remote, datasets: list[str]) -> list[str]:
    """Converts dataset paths to regex strings relative to src or dst roots."""
    regexes: list[str] = []
    for dataset in datasets:
        if dataset.startswith("/"):
            # it's an absolute dataset - convert it to a relative dataset
            absolute = dataset[1:]
            if is_descendant(absolute, of_root_dataset=src.root_dataset):
                dataset = relativize_dataset(absolute, src.root_dataset)
            elif is_descendant(absolute, of_root_dataset=dst.root_dataset):
                dataset = relativize_dataset(absolute, dst.root_dataset)
            else:
                continue  # ignore datasets that make no difference
            if dataset.startswith("/"):
                dataset = dataset[1:]
        if dataset.endswith("/"):
            dataset = dataset[:-1]
        # an empty relative path denotes the root itself, which matches everything
        regexes.append(re.escape(dataset) if dataset else ".*")
    return regexes