Coverage for bzfs_main / filter.py: 100%

222 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 10:16 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots().""" 

16 

17from __future__ import ( 

18 annotations, 

19) 

20import math 

21import os 

22import re 

23from collections.abc import ( 

24 Iterable, 

25) 

26from datetime import ( 

27 timedelta, 

28) 

29from typing import ( 

30 TYPE_CHECKING, 

31 Final, 

32 Optional, 

33 Union, 

34 cast, 

35) 

36 

37from bzfs_main.util.utils import ( 

38 DONT_SKIP_DATASET, 

39 LOG_DEBUG, 

40 LOG_TRACE, 

41 UNIX_TIME_INFINITY_SECS, 

42 RegexList, 

43 is_descendant, 

44 is_included, 

45 relativize_dataset, 

46) 

47 

48if TYPE_CHECKING: # pragma: no cover - for type hints only 

49 from bzfs_main.bzfs import ( 

50 Job, 

51 ) 

52 from bzfs_main.configuration import ( 

53 Params, 

54 Remote, 

55 ) 

56 

# constants:
# Canonical internal name under which the merged include/exclude snapshot regex filter is stored:
SNAPSHOT_REGEX_FILTER_NAME: Final[str] = "snapshot_regex"
# The two user-facing filter option names that get folded into SNAPSHOT_REGEX_FILTER_NAME:
SNAPSHOT_REGEX_FILTER_NAMES: Final[frozenset[str]] = frozenset({"include_snapshot_regex", "exclude_snapshot_regex"})
# Name of the variable holding snapshot filter groups; consumers live outside this module — see configuration:
SNAPSHOT_FILTERS_VAR: Final[str] = "snapshot_filters_var"


# A (lo, hi) time window; each bound is either an absolute Unix time in seconds (int) or a
# timedelta relative to "now" (resolved by filter_snapshots.resolve_timerange); None means "no window".
UnixTimeRange = Optional[tuple[Union[timedelta, int], Union[timedelta, int]]]  # Type alias
# A (lo, hi) pair of rank specs; each spec is ('latest'|'oldest', value, is_percent):
RankRange = tuple[tuple[str, int, bool], tuple[str, int, bool]]  # Type alias

65 

66 

def filter_datasets(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Returns all datasets (and their descendants) that match at least one of the include regexes but none of the exclude
    regexes.

    Assumes the list of input datasets is sorted. The list of output datasets will be sorted too.
    """
    assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
    p, log = job.params, job.params.log
    results: list[str] = []
    for i, dataset in enumerate(sorted_datasets):
        if i == 0 and p.skip_parent:
            continue  # p.skip_parent: omit the first dataset in sorted order
        rel_dataset: str = relativize_dataset(dataset, remote.root_dataset)
        if rel_dataset.startswith("/"):
            rel_dataset = rel_dataset[1:]  # strip leading '/' char if any
        if is_included(rel_dataset, p.include_dataset_regexes, p.exclude_dataset_regexes):
            results.append(dataset)
            log.debug("Including b/c dataset regex: %s", dataset)
        else:
            log.debug("Excluding b/c dataset regex: %s", dataset)
    if p.exclude_dataset_property:
        results = _filter_datasets_by_exclude_property(job, remote, results)
    # perf fix: hoist the debug-enabled check and the loop-invariant f-string out of the loop; previously
    # every result was iterated (and the f-string rebuilt) even when debug logging was disabled
    if log.isEnabledFor(LOG_DEBUG):
        msg: str = f"Finally included {remote.location} dataset: %s"
        for dataset in results:
            log.debug(msg, dataset)
    if job.is_test_mode:
        assert results == sorted(results), "List is not sorted"
        # Asserts the following: If a dataset is excluded its descendants are automatically excluded too, and this
        # decision is never reconsidered even for the descendants because exclude takes precedence over include.
        resultset: set[str] = set(results)
        root_datasets: list[str] = [dataset for dataset in results if os.path.dirname(dataset) not in resultset]  # no parent
        for root in root_datasets:  # each root is not a descendant of another dataset
            assert not any(is_descendant(root, of_root_dataset=dataset) for dataset in results if dataset != root)
        for dataset in results:  # each dataset belongs to a subtree rooted at one of the roots
            assert any(is_descendant(dataset, of_root_dataset=root) for root in root_datasets)
    return results

104 

105 

def _filter_datasets_by_exclude_property(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Excludes datasets that are marked with a ZFS user property value that, in effect, says 'skip me'.

    The property value may be empty/'-' (sync), 'true'/'false', or a comma-separated list of hostnames in
    which case the dataset is synced only if the local hostname is listed. Excluding a dataset also excludes
    its entire subtree (the input is sorted, so descendants follow their ancestor).
    """
    p, log = job.params, job.params.log
    included: list[str] = []
    cached_hostname: str | None = None  # lazily resolved local hostname, fetched at most once
    subtree_to_skip: str = DONT_SKIP_DATASET
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=subtree_to_skip):
            # skip_dataset shall be ignored or has been deleted by some third party while we're running
            continue  # nothing to do anymore for this dataset subtree (note that datasets is sorted)
        subtree_to_skip = DONT_SKIP_DATASET
        # TODO perf: on zfs >= 2.3 use json via zfs list -j to safely merge all zfs list's into one 'zfs list' call
        cmd = p.split_args(f"{p.zfs_program} list -t filesystem,volume -Hp -o {p.exclude_dataset_property}", dataset)
        job.maybe_inject_delete(remote, dataset=dataset, delete_trigger="zfs_list_exclude_property")
        property_value: str | None = job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd)
        if property_value is None:
            log.warning(f"Third party deleted {remote.location}: %s", dataset)
            subtree_to_skip = dataset
            continue
        reason: str = ""
        value: str = property_value.strip()
        lowered: str = value.lower()
        do_sync: bool
        if not value or value == "-" or lowered == "true":
            do_sync = True  # unset property (or explicit 'true') means: include
        elif lowered == "false":
            do_sync = False
        else:
            import socket  # lazy import for startup perf

            # treat the property value as a comma-separated hostname allowlist
            cached_hostname = cached_hostname or socket.gethostname()
            do_sync = any(cached_hostname == hostname.strip() for hostname in value.split(","))
            reason = f", localhostname: {cached_hostname}, hostnames: {value}"

        if do_sync:
            included.append(dataset)
            log.debug("Including b/c dataset prop: %s%s", dataset, reason)
        else:
            subtree_to_skip = dataset
            log.debug("Excluding b/c dataset prop: %s%s", dataset, reason)
    return included

146 

147 

def filter_snapshots(
    job: Job, basis_snapshots: list[str], all_except: bool = False, filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that pass all include/exclude policies.

    Semantics: Within a single snapshot-filter group, filters are applied sequentially (logical AND). Across groups,
    results are union-ized (logical OR). Set `all_except=True` to invert the final selection (retain-selected vs
    delete-selected modes). Bookmarks: when `filter_bookmarks=False`, bookmark entries (with '#') are always retained to
    assist common-snapshot detection; when `True`, bookmarks are subject to the same filters as snapshots.
    """

    def resolve_timerange(timerange: UnixTimeRange) -> UnixTimeRange:
        """Converts relative timerange values to UTC Unix time in integer seconds."""
        assert timerange is not None
        start, end = timerange
        if isinstance(start, timedelta):  # relative bound -> absolute seconds since epoch
            start = math.ceil(current_unixtime_in_secs - start.total_seconds())
        if isinstance(end, timedelta):
            end = math.ceil(current_unixtime_in_secs - end.total_seconds())
        assert isinstance(start, int)
        assert isinstance(end, int)
        return (start, end) if start <= end else (end, start)  # normalize so lo <= hi

    p, log = job.params, job.params.log
    current_unixtime_in_secs: float = p.create_src_snapshots_config.current_datetime.timestamp()
    union: set[str] = set()
    for filter_group in p.snapshot_filters:
        candidates: list[str] = basis_snapshots
        for snap_filter in filter_group:  # filters within a group are AND-ed by sequential application
            if snap_filter.name == SNAPSHOT_REGEX_FILTER_NAME:
                candidates = _filter_snapshots_by_regex(
                    job,
                    candidates,
                    regexes=cast(tuple[RegexList, RegexList], snap_filter.options),
                    filter_bookmarks=filter_bookmarks,
                )
            elif snap_filter.name == "include_snapshot_times":
                timerange = None if snap_filter.timerange is None else resolve_timerange(snap_filter.timerange)
                candidates = _filter_snapshots_by_creation_time(
                    job, candidates, include_snapshot_times=timerange, filter_bookmarks=filter_bookmarks
                )
            else:
                assert snap_filter.name == "include_snapshot_times_and_ranks"
                timerange = None if snap_filter.timerange is None else resolve_timerange(snap_filter.timerange)
                candidates = _filter_snapshots_by_creation_time_and_rank(
                    job,
                    candidates,
                    include_snapshot_times=timerange,
                    include_snapshot_ranks=cast(list[RankRange], snap_filter.options),
                    filter_bookmarks=filter_bookmarks,
                )
        union.update(candidates)  # groups are OR-ed

    keep_all_bookmarks: bool = not filter_bookmarks
    selection: list[str] = [
        line
        for line in basis_snapshots
        if (keep_all_bookmarks and "#" in line) or ((line in union) != all_except)
    ]
    if log.isEnabledFor(LOG_DEBUG):
        for snapshot in selection:
            log.debug("Finally included snapshot: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return selection

209 

210 

def _filter_snapshots_by_regex(
    job: Job, snapshots: list[str], regexes: tuple[RegexList, RegexList], filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that match at least one of the include regexes but none of the exclude regexes.

    Precondition: Each line is TSV of the form ...guid\tname. Regexes are applied to the snapshot or bookmark tag portion
    of `name` (after '@' or, if `filter_bookmarks=True`, after '#').
    """
    exclude_snapshot_regexes, include_snapshot_regexes = regexes
    log = job.params.log
    debug_enabled: bool = log.isEnabledFor(LOG_DEBUG)
    kept: list[str] = []
    for line in snapshots:
        sep: int = line.find("@")  # snapshot separator
        if sep < 0 and filter_bookmarks:
            sep = line.index("#")  # bookmark separator
        if sep < 0:
            # bookmark while bookmarks are exempt: drop here; the caller retains bookmarks separately
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        if is_included(line[sep + 1 :], include_snapshot_regexes, exclude_snapshot_regexes):
            kept.append(line)
            if debug_enabled:
                log.debug("Including b/c snapshot regex: %s", line[line.rindex("\t") + 1 :])
        elif debug_enabled:
            log.debug("Excluding b/c snapshot regex: %s", line[line.rindex("\t") + 1 :])
    return kept

237 

238 

def _filter_snapshots_by_creation_time(
    job: Job, snapshots: list[str], include_snapshot_times: UnixTimeRange, filter_bookmarks: bool = False
) -> list[str]:
    """Filters snapshots to those created within the specified time window.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """
    log = job.params.log
    debug_enabled: bool = log.isEnabledFor(LOG_DEBUG)
    lo_snaptime, hi_snaptime = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_snaptime, int)
    assert isinstance(hi_snaptime, int)
    kept: list[str] = []
    for line in snapshots:
        if not filter_bookmarks and "@" not in line:
            # bookmark while bookmarks are exempt: drop here; the caller retains bookmarks separately
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        creation: int = int(line[: line.index("\t")])  # first TSV field is the creation time
        if lo_snaptime <= creation < hi_snaptime:
            kept.append(line)
            if debug_enabled:
                log.debug("Including b/c creation time: %s", line[line.rindex("\t") + 1 :])
        elif debug_enabled:
            log.debug("Excluding b/c creation time: %s", line[line.rindex("\t") + 1 :])
    return kept

264 

265 

def _filter_snapshots_by_creation_time_and_rank(
    job: Job,
    snapshots: list[str],
    include_snapshot_times: UnixTimeRange,
    include_snapshot_ranks: list[RankRange],
    filter_bookmarks: bool = False,
) -> list[str]:
    """Filters by creation time and rank within the snapshot list.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`. Each rank range is applied in turn to the survivors
    of the previous one; within a pass, a snapshot is retained if its rank falls in [lo, hi) OR its creation time falls
    in the time window.
    """

    def rank_to_index(rank: tuple[str, int, bool], total: int) -> int:
        """Returns index for rank tuple (kind, value, percent)."""
        kind, amount, is_percent = rank
        count: int = round(total * amount / 100) if is_percent else min(total, amount)
        assert kind in ("latest", "oldest")
        # 'oldest' counts from the front of the (sorted) list, 'latest' from the back
        return count if kind == "oldest" else total - count

    assert isinstance(include_snapshot_ranks, list)
    assert include_snapshot_ranks
    log = job.params.log
    debug_enabled: bool = log.isEnabledFor(LOG_DEBUG)
    lo_time, hi_time = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_time, int)
    assert isinstance(hi_time, int)
    num_snapshots: int = sum(1 for line in snapshots if "@" in line)  # bookmarks don't count toward ranks
    for lo_rank, hi_rank in include_snapshot_ranks:
        lo_idx: int = rank_to_index(lo_rank, num_snapshots)
        hi_idx: int = rank_to_index(hi_rank, num_snapshots)
        if lo_idx > hi_idx:
            lo_idx, hi_idx = hi_idx, lo_idx  # normalize so lo <= hi
        rank: int = 0  # rank of the current snapshot among snapshots seen so far in this pass
        num_included: int = 0  # snapshots (not bookmarks) retained by this pass; feeds the next pass's total
        included: list[str] = []
        for line in snapshots:
            is_snapshot: bool = "@" in line
            if not is_snapshot and not filter_bookmarks:
                # bookmark while bookmarks are exempt: drop here; the caller retains bookmarks separately
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            msg = None
            if is_snapshot and lo_idx <= rank < hi_idx:
                msg = "Including b/c snapshot rank: %s"
            elif lo_time <= int(line[: line.index("\t")]) < hi_time:
                msg = "Including b/c creation time: %s"
            if msg:
                included.append(line)
                if is_snapshot:
                    num_included += 1
            else:
                msg = "Excluding b/c snapshot rank: %s"
            if debug_enabled:
                log.debug(msg, line[line.rindex("\t") + 1 :])
            if is_snapshot:
                rank += 1
        snapshots = included
        num_snapshots = num_included
    return snapshots

323 

324 

def filter_properties(
    p: Params, props: dict[str, str | None], include_regexes: RegexList, exclude_regexes: RegexList
) -> dict[str, str | None]:
    """Returns ZFS props whose name matches at least one of the include regexes but none of the exclude regexes."""
    log = p.log
    debug_enabled: bool = log.isEnabledFor(LOG_DEBUG)
    kept: dict[str, str | None] = {}
    for prop_name, prop_value in props.items():
        if is_included(prop_name, include_regexes, exclude_regexes):
            kept[prop_name] = prop_value
            if debug_enabled:
                log.debug("Including b/c property regex: %s", prop_name)
        elif debug_enabled:
            log.debug("Excluding b/c property regex: %s", prop_name)
    return kept

341 

342 

def filter_lines(input_list: Iterable[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set contains the first column field of that line."""
    if not input_set:
        return []  # nothing can match an empty set
    kept: list[str] = []
    for line in input_list:
        first_field: str = line[: line.index("\t")]  # raises ValueError on non-TSV input, as before
        if first_field in input_set:
            kept.append(line)
    return kept

348 

349 

def filter_lines_except(input_list: list[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set does not contain the first column field of that line."""
    if not input_set:
        return input_list  # empty set excludes nothing; return the original list unchanged
    return [line for line in input_list if line[: line.index("\t")] not in input_set]

355 

356 

def dataset_regexes(src: Remote, dst: Remote, datasets: list[str]) -> list[str]:
    """Converts dataset paths to regex strings relative to src or dst roots."""
    results: list[str] = []
    for dataset in datasets:
        if dataset.startswith("/"):
            # it's an absolute dataset - convert it to a relative dataset
            dataset = dataset[1:]
            if is_descendant(dataset, of_root_dataset=src.root_dataset):
                dataset = relativize_dataset(dataset, src.root_dataset)
            elif is_descendant(dataset, of_root_dataset=dst.root_dataset):
                dataset = relativize_dataset(dataset, dst.root_dataset)
            else:
                continue  # ignore datasets that make no difference
            if dataset.startswith("/"):
                dataset = dataset[1:]
        if dataset.endswith("/"):
            dataset = dataset[:-1]  # drop trailing slash
        # an empty relative path means "the root itself", which matches anything
        results.append(re.escape(dataset) if dataset else ".*")
    return results