Coverage for bzfs_main/filter.py: 100%

220 statements


# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots()."""

from __future__ import (
    annotations,
)
import math
import os
import re
from collections.abc import (
    Iterable,
)
from datetime import (
    timedelta,
)
from typing import (
    TYPE_CHECKING,
    Final,
    Optional,
    Union,
)

from bzfs_main.util.utils import (
    DONT_SKIP_DATASET,
    LOG_DEBUG,
    LOG_TRACE,
    UNIX_TIME_INFINITY_SECS,
    RegexList,
    is_descendant,
    is_included,
    relativize_dataset,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import (
        Job,
    )
    from bzfs_main.configuration import (
        Params,
        Remote,
    )

# constants:
SNAPSHOT_REGEX_FILTER_NAME: Final[str] = "snapshot_regex"
SNAPSHOT_REGEX_FILTER_NAMES: Final[frozenset[str]] = frozenset({"include_snapshot_regex", "exclude_snapshot_regex"})
SNAPSHOT_FILTERS_VAR: Final[str] = "snapshot_filters_var"


UnixTimeRange = Optional[tuple[Union[timedelta, int], Union[timedelta, int]]]  # Type alias
RankRange = tuple[tuple[str, int, bool], tuple[str, int, bool]]  # Type alias


def filter_datasets(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Returns all datasets (and their descendants) that match at least one of the include regexes but none of the exclude
    regexes.

    Assumes the list of input datasets is sorted. The list of output datasets will be sorted too.
    """
    assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
    p, log = job.params, job.params.log
    results: list[str] = []
    for i, dataset in enumerate(sorted_datasets):
        if i == 0 and p.skip_parent:
            continue
        rel_dataset: str = relativize_dataset(dataset, remote.root_dataset)
        if rel_dataset.startswith("/"):
            rel_dataset = rel_dataset[1:]  # strip leading '/' char if any
        if is_included(rel_dataset, p.include_dataset_regexes, p.exclude_dataset_regexes):
            results.append(dataset)
            log.debug("Including b/c dataset regex: %s", dataset)
        else:
            log.debug("Excluding b/c dataset regex: %s", dataset)
    if p.exclude_dataset_property:
        results = _filter_datasets_by_exclude_property(job, remote, results)
    is_debug: bool = p.log.isEnabledFor(LOG_DEBUG)
    for dataset in results:
        if is_debug:
            log.debug(f"Finally included {remote.location} dataset: %s", dataset)
    if job.is_test_mode:
        assert results == sorted(results), "List is not sorted"
        # Asserts the following: If a dataset is excluded its descendants are automatically excluded too, and this
        # decision is never reconsidered even for the descendants because exclude takes precedence over include.
        resultset: set[str] = set(results)
        root_datasets: list[str] = [dataset for dataset in results if os.path.dirname(dataset) not in resultset]  # no parent
        for root in root_datasets:  # each root is not a descendant of another dataset
            assert not any(is_descendant(root, of_root_dataset=dataset) for dataset in results if dataset != root)
        for dataset in results:  # each dataset belongs to a subtree rooted at one of the roots
            assert any(is_descendant(dataset, of_root_dataset=root) for root in root_datasets)
    return results
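

# --- Illustrative sketch, not part of the original module: the include/exclude
# precedence that filter_datasets() relies on. Exclude wins over include, so a
# dataset that matches both regex lists is dropped, and (per the test-mode
# assertion above) so is its whole subtree. The regexes and dataset names are
# hypothetical; that the real is_included() uses fullmatch semantics is an
# assumption of this sketch.
def _example_dataset_regex_precedence() -> None:
    include = [re.compile(r"tank/home.*")]
    exclude = [re.compile(r"tank/home/tmp.*")]

    def included(rel_dataset: str) -> bool:
        if any(r.fullmatch(rel_dataset) for r in exclude):
            return False  # exclude takes precedence over include
        return any(r.fullmatch(rel_dataset) for r in include)

    assert included("tank/home/alice")
    assert not included("tank/home/tmp/scratch")  # excluded despite matching an include regex
    assert not included("tank/var")  # matches no include regex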


def _filter_datasets_by_exclude_property(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Excludes datasets that are marked with a ZFS user property value that, in effect, says 'skip me'."""
    p, log = job.params, job.params.log
    results: list[str] = []
    localhostname: str | None = None
    skip_dataset: str = DONT_SKIP_DATASET
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=skip_dataset):
            # skip_dataset shall be ignored or has been deleted by some third party while we're running
            continue  # nothing to do anymore for this dataset subtree (note that datasets is sorted)
        skip_dataset = DONT_SKIP_DATASET
        # TODO perf: on zfs >= 2.3 use json via zfs list -j to safely merge all zfs list's into one 'zfs list' call
        cmd = p.split_args(f"{p.zfs_program} list -t filesystem,volume -Hp -o {p.exclude_dataset_property}", dataset)
        job.maybe_inject_delete(remote, dataset=dataset, delete_trigger="zfs_list_exclude_property")
        property_value: str | None = job.try_ssh_command_with_retries(remote, LOG_TRACE, cmd=cmd)
        if property_value is None:
            log.warning(f"Third party deleted {remote.location}: %s", dataset)
            skip_dataset = dataset
        else:
            reason: str = ""
            property_value = property_value.strip()
            sync: bool
            if not property_value or property_value == "-" or property_value.lower() == "true":
                sync = True
            elif property_value.lower() == "false":
                sync = False
            else:
                import socket  # lazy import for startup perf

                localhostname = localhostname or socket.gethostname()
                sync = any(localhostname == hostname.strip() for hostname in property_value.split(","))
                reason = f", localhostname: {localhostname}, hostnames: {property_value}"

            if sync:
                results.append(dataset)
                log.debug("Including b/c dataset prop: %s%s", dataset, reason)
            else:
                skip_dataset = dataset
                log.debug("Excluding b/c dataset prop: %s%s", dataset, reason)
    return results
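

# --- Illustrative sketch, not part of the original module: how a value of the
# exclude-dataset user property maps to a sync decision, mirroring the branch
# logic in _filter_datasets_by_exclude_property() above. Hostnames are made up.
def _example_exclude_property() -> None:
    def would_sync(property_value: str, localhostname: str) -> bool:
        value = property_value.strip()
        if not value or value == "-" or value.lower() == "true":
            return True  # property unset or 'true': include the dataset
        if value.lower() == "false":
            return False  # 'false': skip this dataset and its whole subtree
        # otherwise: a comma-separated list of hosts that should sync it
        return any(localhostname == h.strip() for h in value.split(","))

    assert would_sync("-", "nas01")
    assert not would_sync("false", "nas01")
    assert would_sync("nas01, nas02", "nas01")
    assert not would_sync("nas03", "nas01")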


def filter_snapshots(
    job: Job, basis_snapshots: list[str], all_except: bool = False, filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that pass all include/exclude policies.

    Semantics: Within a single snapshot-filter group, filters are applied sequentially (logical AND). Across groups,
    results are union-ized (logical OR). Set `all_except=True` to invert the final selection (retain-selected vs
    delete-selected modes). Bookmarks: when `filter_bookmarks=False`, bookmark entries (with '#') are always retained to
    assist common-snapshot detection; when `True`, bookmarks are subject to the same filters as snapshots.
    """

    def resolve_timerange(timerange: UnixTimeRange) -> UnixTimeRange:
        """Converts relative timerange values to UTC Unix time in integer seconds."""
        assert timerange is not None
        lo, hi = timerange
        if isinstance(lo, timedelta):
            lo = math.ceil(current_unixtime_in_secs - lo.total_seconds())
        if isinstance(hi, timedelta):
            hi = math.ceil(current_unixtime_in_secs - hi.total_seconds())
        assert isinstance(lo, int)
        assert isinstance(hi, int)
        return (lo, hi) if lo <= hi else (hi, lo)

    p, log = job.params, job.params.log
    current_unixtime_in_secs: float = p.create_src_snapshots_config.current_datetime.timestamp()
    resultset: set[str] = set()
    for snapshot_filter in p.snapshot_filters:
        snapshots: list[str] = basis_snapshots
        for _filter in snapshot_filter:
            name: str = _filter.name
            if name == SNAPSHOT_REGEX_FILTER_NAME:
                snapshots = _filter_snapshots_by_regex(
                    job, snapshots, regexes=_filter.options, filter_bookmarks=filter_bookmarks
                )
            elif name == "include_snapshot_times":
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time(
                    job, snapshots, include_snapshot_times=timerange, filter_bookmarks=filter_bookmarks
                )
            else:
                assert name == "include_snapshot_times_and_ranks"
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time_and_rank(
                    job,
                    snapshots,
                    include_snapshot_times=timerange,
                    include_snapshot_ranks=_filter.options,
                    filter_bookmarks=filter_bookmarks,
                )
        resultset.update(snapshots)  # union

    no_f_bookmarks: bool = not filter_bookmarks
    snapshots = [line for line in basis_snapshots if (no_f_bookmarks and "#" in line) or ((line in resultset) != all_except)]
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    for snapshot in snapshots:
        if is_debug:
            log.debug("Finally included snapshot: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return snapshots
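

# --- Illustrative sketch, not part of the original module: the AND-within-group,
# OR-across-groups semantics documented in filter_snapshots(). The predicate
# groups below are hypothetical stand-ins for the real snapshot filters.
def _example_filter_groups() -> None:
    basis = ["d@hourly_1", "d@hourly_2", "d@daily_1"]
    groups = [
        [lambda s: "hourly" in s, lambda s: s.endswith("_1")],  # applied sequentially: AND
        [lambda s: "daily" in s],
    ]
    resultset: set[str] = set()
    for group in groups:
        snapshots = basis
        for predicate in group:
            snapshots = [s for s in snapshots if predicate(s)]
        resultset.update(snapshots)  # union across groups: OR
    assert sorted(resultset) == ["d@daily_1", "d@hourly_1"]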


def _filter_snapshots_by_regex(
    job: Job, snapshots: list[str], regexes: tuple[RegexList, RegexList], filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that match at least one of the include regexes but none of the exclude regexes.

    Precondition: Each line is TSV of the form ...guid\tname. Regexes are applied to the snapshot or bookmark tag portion
    of `name` (after '@' or, if `filter_bookmarks=True`, after '#').
    """
    exclude_snapshot_regexes, include_snapshot_regexes = regexes
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    results: list[str] = []
    for snapshot in snapshots:
        i = snapshot.find("@")  # snapshot separator
        if i < 0 and filter_bookmarks:
            i = snapshot.index("#")  # bookmark separator
        if i < 0:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        elif is_included(snapshot[i + 1 :], include_snapshot_regexes, exclude_snapshot_regexes):
            results.append(snapshot)
            if is_debug:
                log.debug("Including b/c snapshot regex: %s", snapshot[snapshot.rindex("\t") + 1 :])
        else:
            if is_debug:
                log.debug("Excluding b/c snapshot regex: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return results
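

# --- Illustrative sketch, not part of the original module: the substring that
# _filter_snapshots_by_regex() matches against, given the TSV precondition in
# its docstring. The guid, dataset, and tag values are made up.
def _example_regex_target() -> None:
    line = "12345\ttank/data@daily_2024-01-01"  # ...guid\tname
    i = line.find("@")
    assert line[i + 1 :] == "daily_2024-01-01"  # the tag the regexes see
    bookmark = "12345\ttank/data#bookmark_1"
    assert bookmark.find("@") < 0  # retained as-is unless filter_bookmarks=True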


def _filter_snapshots_by_creation_time(
    job: Job, snapshots: list[str], include_snapshot_times: UnixTimeRange, filter_bookmarks: bool = False
) -> list[str]:
    """Filters snapshots to those created within the specified time window.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_snaptime, hi_snaptime = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_snaptime, int)
    assert isinstance(hi_snaptime, int)
    results: list[str] = []
    for snapshot in snapshots:
        if (not filter_bookmarks) and "@" not in snapshot:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        elif lo_snaptime <= int(snapshot[0 : snapshot.index("\t")]) < hi_snaptime:
            results.append(snapshot)
            if is_debug:
                log.debug("Including b/c creation time: %s", snapshot[snapshot.rindex("\t") + 1 :])
        else:
            if is_debug:
                log.debug("Excluding b/c creation time: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return results
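

# --- Illustrative sketch, not part of the original module: the half-open
# [lo, hi) comparison on the creation column (first TSV field) described in the
# docstring above. Timestamps are made up.
def _example_time_window() -> None:
    lo, hi = 1700000000, 1700003600

    def in_window(line: str) -> bool:
        return lo <= int(line[0 : line.index("\t")]) < hi

    assert in_window("1700000000\ttank/data@a")  # lo is inclusive
    assert not in_window("1700003600\ttank/data@b")  # hi is exclusive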


def _filter_snapshots_by_creation_time_and_rank(
    job: Job,
    snapshots: list[str],
    include_snapshot_times: UnixTimeRange,
    include_snapshot_ranks: list[RankRange],
    filter_bookmarks: bool = False,
) -> list[str]:
    """Filters by creation time and rank within the snapshot list.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """

    def get_idx(rank: tuple[str, int, bool], n: int) -> int:
        """Returns index for rank tuple (kind, value, percent)."""
        kind, num, is_percent = rank
        m = round(n * num / 100) if is_percent else min(n, num)
        assert kind == "latest" or kind == "oldest"
        return m if kind == "oldest" else n - m

    assert isinstance(include_snapshot_ranks, list)
    assert len(include_snapshot_ranks) > 0
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_time, hi_time = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_time, int)
    assert isinstance(hi_time, int)
    n = sum(1 for snapshot in snapshots if "@" in snapshot)
    for rank_range in include_snapshot_ranks:
        lo_rank, hi_rank = rank_range
        lo: int = get_idx(lo_rank, n)
        hi: int = get_idx(hi_rank, n)
        lo, hi = (lo, hi) if lo <= hi else (hi, lo)
        i: int = 0
        results: list[str] = []
        for snapshot in snapshots:
            is_snapshot = "@" in snapshot
            if (not filter_bookmarks) and not is_snapshot:
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            else:
                msg = None
                if is_snapshot and lo <= i < hi:
                    msg = "Including b/c snapshot rank: %s"
                elif lo_time <= int(snapshot[0 : snapshot.index("\t")]) < hi_time:
                    msg = "Including b/c creation time: %s"
                if msg:
                    results.append(snapshot)
                else:
                    msg = "Excluding b/c snapshot rank: %s"
                if is_debug:
                    log.debug(msg, snapshot[snapshot.rindex("\t") + 1 :])
                i += 1 if is_snapshot else 0
        snapshots = results
        n = hi - lo
    return snapshots
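

# --- Illustrative sketch, not part of the original module: the rank-to-index
# mapping computed by the nested get_idx() helper above, for n=10 snapshots.
def _example_rank_idx() -> None:
    def get_idx(rank: tuple[str, int, bool], n: int) -> int:
        kind, num, is_percent = rank
        m = round(n * num / 100) if is_percent else min(n, num)
        return m if kind == "oldest" else n - m

    assert get_idx(("oldest", 30, True), 10) == 3  # oldest 30% ends at index 3
    assert get_idx(("latest", 3, False), 10) == 7  # latest 3 starts at index 7
    assert get_idx(("oldest", 99, False), 10) == 10  # absolute counts are capped at n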


def filter_properties(
    p: Params, props: dict[str, str | None], include_regexes: RegexList, exclude_regexes: RegexList
) -> dict[str, str | None]:
    """Returns ZFS props whose name matches at least one of the include regexes but none of the exclude regexes."""
    log = p.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    results: dict[str, str | None] = {}
    for propname, propvalue in props.items():
        if is_included(propname, include_regexes, exclude_regexes):
            results[propname] = propvalue
            if is_debug:
                log.debug("Including b/c property regex: %s", propname)
        else:
            if is_debug:
                log.debug("Excluding b/c property regex: %s", propname)
    return results
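

# --- Illustrative sketch, not part of the original module: filter_properties()
# applies the include/exclude precedence to property *names*; values (including
# None) pass through untouched. Property names and regexes are hypothetical, and
# fullmatch semantics are an assumption of this sketch.
def _example_filter_properties() -> None:
    props: dict[str, str | None] = {"compression": "lz4", "org:exclude_me": "x", "quota": None}
    include = [re.compile(r".*")]
    exclude = [re.compile(r"org:.*")]
    results = {
        k: v
        for k, v in props.items()
        if not any(r.fullmatch(k) for r in exclude) and any(r.fullmatch(k) for r in include)
    }
    assert results == {"compression": "lz4", "quota": None}  # None values survive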


def filter_lines(input_list: Iterable[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set contains the first column field of that line."""
    if len(input_set) == 0:
        return []
    return [line for line in input_list if line[0 : line.index("\t")] in input_set]


def filter_lines_except(input_list: list[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set does not contain the first column field of that line."""
    if len(input_set) == 0:
        return input_list
    return [line for line in input_list if line[0 : line.index("\t")] not in input_set]
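

# --- Illustrative usage sketch, not part of the original module, for
# filter_lines() and filter_lines_except(); the guid-prefixed TSV lines are made up.
def _example_filter_lines() -> None:
    lines = ["g1\ttank/a@s1", "g2\ttank/a@s2", "g3\ttank/a@s3"]
    assert filter_lines(lines, {"g1", "g3"}) == ["g1\ttank/a@s1", "g3\ttank/a@s3"]
    assert filter_lines(lines, set()) == []  # empty set selects nothing
    assert filter_lines_except(lines, {"g2"}) == ["g1\ttank/a@s1", "g3\ttank/a@s3"]
    assert filter_lines_except(lines, set()) == lines  # empty set excludes nothing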


def dataset_regexes(src: Remote, dst: Remote, datasets: list[str]) -> list[str]:
    """Converts dataset paths to regex strings relative to src or dst roots."""
    results: list[str] = []
    for dataset in datasets:
        if dataset.startswith("/"):
            # it's an absolute dataset - convert it to a relative dataset
            dataset = dataset[1:]
            if is_descendant(dataset, of_root_dataset=src.root_dataset):
                dataset = relativize_dataset(dataset, src.root_dataset)
            elif is_descendant(dataset, of_root_dataset=dst.root_dataset):
                dataset = relativize_dataset(dataset, dst.root_dataset)
            else:
                continue  # ignore datasets that make no difference
            if dataset.startswith("/"):
                dataset = dataset[1:]
        if dataset.endswith("/"):
            dataset = dataset[0:-1]
        regex: str
        if dataset:
            regex = re.escape(dataset)
        else:
            regex = ".*"
        results.append(regex)
    return results
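

# --- Illustrative sketch, not part of the original module: how dataset_regexes()
# maps dataset arguments to regex strings. The root dataset and inputs are made
# up, and the manual relativization below stands in for relativize_dataset().
def _example_dataset_regexes() -> None:
    src_root = "tank/src"
    # a relative dataset is regex-escaped verbatim ('/' needs no escaping):
    assert re.escape("foo/bar") == "foo/bar"
    # an absolute dataset under the src root is relativized first:
    absolute = "tank/src/foo"  # given as "/tank/src/foo" minus the leading '/'
    relative = absolute[len(src_root) + 1 :]
    assert re.escape(relative) == "foo"
    # an empty relative path (the root itself) becomes the match-all regex ".*"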