Coverage for bzfs_main/filter.py: 100%

220 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-07 04:44 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots().""" 

16 

17from __future__ import ( 

18 annotations, 

19) 

20import math 

21import os 

22import re 

23from collections.abc import ( 

24 Iterable, 

25) 

26from datetime import ( 

27 timedelta, 

28) 

29from typing import ( 

30 TYPE_CHECKING, 

31 Final, 

32 Optional, 

33 Union, 

34) 

35 

36from bzfs_main.connection import ( 

37 try_ssh_command, 

38) 

39from bzfs_main.utils import ( 

40 DONT_SKIP_DATASET, 

41 LOG_DEBUG, 

42 LOG_TRACE, 

43 UNIX_TIME_INFINITY_SECS, 

44 RegexList, 

45 is_descendant, 

46 is_included, 

47 relativize_dataset, 

48) 

49 

50if TYPE_CHECKING: # pragma: no cover - for type hints only 

51 from bzfs_main.bzfs import ( 

52 Job, 

53 ) 

54 from bzfs_main.configuration import ( 

55 Params, 

56 Remote, 

57 ) 

58 

# constants:
# Name of the snapshot-regex filter as stored in a snapshot-filter group.
SNAPSHOT_REGEX_FILTER_NAME: Final[str] = "snapshot_regex"
# CLI option names that both feed into the single merged snapshot-regex filter above.
SNAPSHOT_REGEX_FILTER_NAMES: Final[frozenset[str]] = frozenset({"include_snapshot_regex", "exclude_snapshot_regex"})


# (lo, hi) time window; each bound is either an absolute Unix time (int secs) or a timedelta relative to 'now'.
UnixTimeRange = Optional[tuple[Union[timedelta, int], Union[timedelta, int]]]  # Type alias
# (lo_rank, hi_rank) pair; each rank is (kind, value, is_percent) where kind is "latest" or "oldest".
RankRange = tuple[tuple[str, int, bool], tuple[str, int, bool]]  # Type alias

66 

67 

def filter_datasets(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Returns all datasets (and their descendants) that match at least one of the include regexes but none of the exclude
    regexes.

    Assumes the list of input datasets is sorted. The list of output datasets will be sorted too.

    Params:
        job: The currently running job, providing Params and logging.
        remote: The remote (src or dst) whose root dataset the regexes are relative to.
        sorted_datasets: Sorted list of candidate dataset names.
    Returns:
        The sorted subset of datasets that pass the include/exclude regexes and (optionally) the exclude-property check.
    """
    assert (not job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
    p, log = job.params, job.params.log
    results: list[str] = []
    for i, dataset in enumerate(sorted_datasets):
        if i == 0 and p.skip_parent:
            continue  # the first entry is the root dataset itself; optionally skip it
        rel_dataset: str = relativize_dataset(dataset, remote.root_dataset)
        if rel_dataset.startswith("/"):
            rel_dataset = rel_dataset[1:]  # strip leading '/' char if any
        if is_included(rel_dataset, p.include_dataset_regexes, p.exclude_dataset_regexes):
            results.append(dataset)
            log.debug("Including b/c dataset regex: %s", dataset)
        else:
            log.debug("Excluding b/c dataset regex: %s", dataset)
    if p.exclude_dataset_property:
        results = _filter_datasets_by_exclude_property(job, remote, results)
    # Hoist the loop-invariant level check out of the loop (was re-tested per iteration); also use the local `log`
    # consistently instead of `p.log`.
    if log.isEnabledFor(LOG_DEBUG):
        for dataset in results:
            log.debug(f"Finally included {remote.location} dataset: %s", dataset)
    if job.is_test_mode:
        assert results == sorted(results), "List is not sorted"
        # Asserts the following: If a dataset is excluded its descendants are automatically excluded too, and this
        # decision is never reconsidered even for the descendants because exclude takes precedence over include.
        resultset: set[str] = set(results)
        root_datasets: list[str] = [dataset for dataset in results if os.path.dirname(dataset) not in resultset]  # no parent
        for root in root_datasets:  # each root is not a descendant of another dataset
            assert not any(is_descendant(root, of_root_dataset=dataset) for dataset in results if dataset != root)
        for dataset in results:  # each dataset belongs to a subtree rooted at one of the roots
            assert any(is_descendant(dataset, of_root_dataset=root) for root in root_datasets)
    return results

105 

106 

def _filter_datasets_by_exclude_property(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Excludes datasets that are marked with a ZFS user property value that, in effect, says 'skip me'.

    Runs one 'zfs list' per dataset to read the user property named by p.exclude_dataset_property. Property semantics:
    empty, '-', or 'true' (case-insensitive) means include; 'false' means exclude; any other value is treated as a
    comma-separated list of hostnames, and the dataset is included only if the local hostname is among them. Excluding
    a dataset also excludes its entire descendant subtree (the sorted input guarantees descendants follow the parent).
    """
    p, log = job.params, job.params.log
    results: list[str] = []
    localhostname: str | None = None  # lazily resolved and cached across the loop
    skip_dataset: str = DONT_SKIP_DATASET  # root of the subtree currently being skipped; sentinel matches nothing
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=skip_dataset):
            # skip_dataset shall be ignored or has been deleted by some third party while we're running
            continue  # nothing to do anymore for this dataset subtree (note that datasets is sorted)
        skip_dataset = DONT_SKIP_DATASET
        # TODO perf: on zfs >= 2.3 use json via zfs list -j to safely merge all zfs list's into one 'zfs list' call
        cmd = p.split_args(f"{p.zfs_program} list -t filesystem,volume -Hp -o {p.exclude_dataset_property}", dataset)
        job.maybe_inject_delete(remote, dataset=dataset, delete_trigger="zfs_list_exclude_property")
        property_value: str | None = try_ssh_command(job, remote, LOG_TRACE, cmd=cmd)
        if property_value is None:
            # 'zfs list' found nothing; treat the dataset (and thus its subtree) as deleted by a third party
            log.warning(f"Third party deleted {remote.location}: %s", dataset)
            skip_dataset = dataset
        else:
            reason: str = ""
            property_value = property_value.strip()
            sync: bool
            if not property_value or property_value == "-" or property_value.lower() == "true":
                sync = True  # property unset ('-') or explicitly 'true': include the dataset
            elif property_value.lower() == "false":
                sync = False
            else:
                import socket  # lazy import for startup perf

                # property value is a comma-separated hostname list: include only if we are one of those hosts
                localhostname = localhostname or socket.gethostname()
                sync = any(localhostname == hostname.strip() for hostname in property_value.split(","))
                reason = f", localhostname: {localhostname}, hostnames: {property_value}"

            if sync:
                results.append(dataset)
                log.debug("Including b/c dataset prop: %s%s", dataset, reason)
            else:
                skip_dataset = dataset  # also excludes all descendants via the is_descendant() check above
                log.debug("Excluding b/c dataset prop: %s%s", dataset, reason)
    return results

147 

148 

def filter_snapshots(
    job: Job, basis_snapshots: list[str], all_except: bool = False, filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that pass all include/exclude policies.

    Semantics: Within a single snapshot-filter group, filters are applied sequentially (logical AND). Across groups,
    results are union-ized (logical OR). Set `all_except=True` to invert the final selection (retain-selected vs
    delete-selected modes). Bookmarks: when `filter_bookmarks=False`, bookmark entries (with '#') are always retained to
    assist common-snapshot detection; when `True`, bookmarks are subject to the same filters as snapshots.
    """

    def resolve_timerange(timerange: UnixTimeRange) -> UnixTimeRange:
        """Converts relative timerange values to UTC Unix time in integer seconds."""
        assert timerange is not None
        lo, hi = timerange
        if isinstance(lo, timedelta):  # a timedelta is relative to 'now': subtract it from the current time
            lo = math.ceil(current_unixtime_in_secs - lo.total_seconds())
        if isinstance(hi, timedelta):
            hi = math.ceil(current_unixtime_in_secs - hi.total_seconds())
        assert isinstance(lo, int)
        assert isinstance(hi, int)
        return (lo, hi) if lo <= hi else (hi, lo)  # normalize so lo <= hi

    p, log = job.params, job.params.log
    current_unixtime_in_secs: float = p.create_src_snapshots_config.current_datetime.timestamp()
    resultset: set[str] = set()
    for snapshot_filter in p.snapshot_filters:  # logical OR across filter groups
        snapshots: list[str] = basis_snapshots
        for _filter in snapshot_filter:  # logical AND within a group: each filter narrows the previous result
            name: str = _filter.name
            if name == SNAPSHOT_REGEX_FILTER_NAME:
                snapshots = _filter_snapshots_by_regex(
                    job, snapshots, regexes=_filter.options, filter_bookmarks=filter_bookmarks
                )
            elif name == "include_snapshot_times":
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time(
                    job, snapshots, include_snapshot_times=timerange, filter_bookmarks=filter_bookmarks
                )
            else:
                assert name == "include_snapshot_times_and_ranks"
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time_and_rank(
                    job,
                    snapshots,
                    include_snapshot_times=timerange,
                    include_snapshot_ranks=_filter.options,
                    filter_bookmarks=filter_bookmarks,
                )
        resultset.update(snapshots)  # union

    # Rebuild from basis_snapshots to preserve input order; optionally invert selection via all_except, and
    # unconditionally retain bookmark lines when bookmarks were not themselves subject to filtering.
    no_f_bookmarks: bool = not filter_bookmarks
    snapshots = [line for line in basis_snapshots if (no_f_bookmarks and "#" in line) or ((line in resultset) != all_except)]
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    for snapshot in snapshots:
        if is_debug:
            log.debug("Finally included snapshot: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return snapshots

207 

208 

def _filter_snapshots_by_regex(
    job: Job, snapshots: list[str], regexes: tuple[RegexList, RegexList], filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that match at least one of the include regexes but none of the exclude regexes.

    Precondition: Each line is TSV of the form ...guid\tname. Regexes are applied to the snapshot or bookmark tag portion
    of `name` (after '@' or, if `filter_bookmarks=True`, after '#').
    """
    exclude_snapshot_regexes, include_snapshot_regexes = regexes
    logger = job.params.log
    debug_enabled: bool = logger.isEnabledFor(LOG_DEBUG)
    kept: list[str] = []
    for line in snapshots:
        sep = line.find("@")  # snapshot separator
        if sep < 0:
            if not filter_bookmarks:
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            sep = line.index("#")  # bookmark separator
        if is_included(line[sep + 1 :], include_snapshot_regexes, exclude_snapshot_regexes):
            kept.append(line)
            if debug_enabled:
                logger.debug("Including b/c snapshot regex: %s", line[line.rindex("\t") + 1 :])
        elif debug_enabled:
            logger.debug("Excluding b/c snapshot regex: %s", line[line.rindex("\t") + 1 :])
    return kept

235 

236 

def _filter_snapshots_by_creation_time(
    job: Job, snapshots: list[str], include_snapshot_times: UnixTimeRange, filter_bookmarks: bool = False
) -> list[str]:
    """Filters snapshots to those created within the specified time window.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.
    """
    logger = job.params.log
    debug_enabled: bool = logger.isEnabledFor(LOG_DEBUG)
    lo, hi = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)  # None means: unbounded window
    assert isinstance(lo, int)
    assert isinstance(hi, int)
    kept: list[str] = []
    for line in snapshots:
        if "@" not in line and not filter_bookmarks:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        creation = int(line[: line.index("\t")])  # first TSV field is the creation time in Unix seconds
        if lo <= creation < hi:
            kept.append(line)
            if debug_enabled:
                logger.debug("Including b/c creation time: %s", line[line.rindex("\t") + 1 :])
        elif debug_enabled:
            logger.debug("Excluding b/c creation time: %s", line[line.rindex("\t") + 1 :])
    return kept

262 

263 

def _filter_snapshots_by_creation_time_and_rank(
    job: Job,
    snapshots: list[str],
    include_snapshot_times: UnixTimeRange,
    include_snapshot_ranks: list[RankRange],
    filter_bookmarks: bool = False,
) -> list[str]:
    """Filters by creation time and rank within the snapshot list.

    Precondition: Each line is TSV of the form creation\t...\tname. The creation column (first field) is compared against
    [lo, hi). Bookmarks are skipped unless `filter_bookmarks=True`.

    A line is retained if its rank lies within the rank window OR its creation time lies within the time window. Rank
    ranges are applied sequentially (logical AND across ranges), each pass filtering the previous pass's output.
    """

    def get_idx(rank: tuple[str, int, bool], n: int) -> int:
        """Returns index for rank tuple (kind, value, percent)."""
        kind, num, is_percent = rank
        m = round(n * num / 100) if is_percent else min(n, num)  # percent of n, or an absolute count clamped to n
        assert kind == "latest" or kind == "oldest"
        return m if kind == "oldest" else n - m  # 'latest' counts from the end of the (sorted) list

    assert isinstance(include_snapshot_ranks, list)
    assert len(include_snapshot_ranks) > 0
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_time, hi_time = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)  # None means: unbounded window
    assert isinstance(lo_time, int)
    assert isinstance(hi_time, int)
    n = sum(1 for snapshot in snapshots if "@" in snapshot)  # ranks are computed over snapshots only, not bookmarks
    for rank_range in include_snapshot_ranks:
        lo_rank, hi_rank = rank_range
        lo: int = get_idx(lo_rank, n)
        hi: int = get_idx(hi_rank, n)
        lo, hi = (lo, hi) if lo <= hi else (hi, lo)  # normalize so the rank window is [lo, hi)
        i: int = 0  # rank of the current snapshot within this pass
        results: list[str] = []
        for snapshot in snapshots:
            is_snapshot = "@" in snapshot
            if (not filter_bookmarks) and not is_snapshot:
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            else:
                msg = None
                if is_snapshot and lo <= i < hi:
                    msg = "Including b/c snapshot rank: %s"
                elif lo_time <= int(snapshot[0 : snapshot.index("\t")]) < hi_time:
                    msg = "Including b/c creation time: %s"
                if msg:
                    results.append(snapshot)
                else:
                    msg = "Excluding b/c snapshot rank: %s"
                if is_debug:
                    log.debug(msg, snapshot[snapshot.rindex("\t") + 1 :])
            i += 1 if is_snapshot else 0
        snapshots = results
        n = hi - lo  # next pass computes ranks relative to the rank-window size of this pass
    return snapshots

319 

320 

def filter_properties(
    p: Params, props: dict[str, str | None], include_regexes: RegexList, exclude_regexes: RegexList
) -> dict[str, str | None]:
    """Returns ZFS props whose name matches at least one of the include regexes but none of the exclude regexes."""
    logger = p.log
    debug_enabled: bool = logger.isEnabledFor(LOG_DEBUG)
    kept: dict[str, str | None] = {}
    for name, value in props.items():
        if is_included(name, include_regexes, exclude_regexes):
            kept[name] = value  # property name passed the filters; keep its value untouched
            if debug_enabled:
                logger.debug("Including b/c property regex: %s", name)
        elif debug_enabled:
            logger.debug("Excluding b/c property regex: %s", name)
    return kept

337 

338 

def filter_lines(input_list: Iterable[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set contains the first column field of that line."""
    if not input_set:
        return []  # fast path: nothing can possibly match an empty set
    return [line for line in input_list if line[: line.index("\t")] in input_set]

344 

345 

def filter_lines_except(input_list: list[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set does not contain the first column field of that line."""
    if not input_set:
        return input_list  # fast path: an empty set excludes nothing
    return [line for line in input_list if line[: line.index("\t")] not in input_set]

351 

352 

def dataset_regexes(src: Remote, dst: Remote, datasets: list[str]) -> list[str]:
    """Converts dataset paths to regex strings relative to src or dst roots."""
    regexes: list[str] = []
    for path in datasets:
        if path.startswith("/"):
            # it's an absolute dataset - convert it to a relative dataset
            path = path[1:]
            if is_descendant(path, of_root_dataset=src.root_dataset):
                path = relativize_dataset(path, src.root_dataset)
            elif is_descendant(path, of_root_dataset=dst.root_dataset):
                path = relativize_dataset(path, dst.root_dataset)
            else:
                continue  # ignore datasets that make no difference
        path = path.removeprefix("/")
        path = path.removesuffix("/")
        # an empty relative path denotes the root itself, which matches everything
        regexes.append(re.escape(path) if path else ".*")
    return regexes