Coverage for bzfs_main/filter.py: 99%

218 statements  

coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

# Copyright 2024 Wolfgang Hoschek AT mac DOT com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""The filter algorithms that apply include/exclude policies are in filter_datasets() and filter_snapshots()."""


from __future__ import annotations
import math
import os
import re
import socket
from datetime import timedelta
from typing import (
    TYPE_CHECKING,
    Iterable,
    Optional,
    Tuple,
    Union,
)

from bzfs_main.connection import (
    try_ssh_command,
)
from bzfs_main.utils import (
    DONT_SKIP_DATASET,
    LOG_DEBUG,
    LOG_TRACE,
    UNIX_TIME_INFINITY_SECS,
    RegexList,
    is_descendant,
    is_included,
    relativize_dataset,
)

if TYPE_CHECKING:  # pragma: no cover - for type hints only
    from bzfs_main.bzfs import Job
    from bzfs_main.configuration import Params, Remote


# constants:
SNAPSHOT_REGEX_FILTER_NAME: str = "snapshot_regex"
SNAPSHOT_REGEX_FILTER_NAMES: frozenset[str] = frozenset({"include_snapshot_regex", "exclude_snapshot_regex"})


UnixTimeRange = Optional[Tuple[Union[timedelta, int], Union[timedelta, int]]]  # Type alias
RankRange = Tuple[Tuple[str, int, bool], Tuple[str, int, bool]]  # Type alias
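# Illustrative reading of the aliases above (example values, not from the original source): a UnixTimeRange
# of (timedelta(days=7), timedelta(0)) means "between 7 days ago and now"; timedelta endpoints are resolved
# to absolute integer Unix seconds at runtime by resolve_timerange() in filter_snapshots() below. A RankRange
# such as (("latest", 10, True), ("latest", 0, False)) pairs two rank bounds of the form
# (kind, value, is_percent), here selecting from "the latest 10%" up to "the very latest" snapshot.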


def filter_datasets(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Returns all datasets (and their descendants) that match at least one of the include regexes but none of the exclude
    regexes.

    Assumes the list of input datasets is sorted. The list of output datasets will be sorted too.
    """
    p, log = job.params, job.params.log
    results: list[str] = []
    for i, dataset in enumerate(sorted_datasets):
        if i == 0 and p.skip_parent:
            continue
        rel_dataset: str = relativize_dataset(dataset, remote.root_dataset)
        if rel_dataset.startswith("/"):
            rel_dataset = rel_dataset[1:]  # strip leading '/' char if any
        if is_included(rel_dataset, p.include_dataset_regexes, p.exclude_dataset_regexes):
            results.append(dataset)
            log.debug("Including b/c dataset regex: %s", dataset)
        else:
            log.debug("Excluding b/c dataset regex: %s", dataset)
    if p.exclude_dataset_property:
        results = _filter_datasets_by_exclude_property(job, remote, results)
    is_debug: bool = p.log.isEnabledFor(LOG_DEBUG)
    for dataset in results:
        if is_debug:
            log.debug("Finally included %s dataset: %s", remote.location, dataset)
    if job.is_test_mode:
        assert results == sorted(results), "List is not sorted"
        # Asserts the following: If a dataset is excluded its descendants are automatically excluded too, and this
        # decision is never reconsidered even for the descendants because exclude takes precedence over include.
        resultset: set[str] = set(results)
        root_datasets: list[str] = [dataset for dataset in results if os.path.dirname(dataset) not in resultset]  # no parent
        for root in root_datasets:  # each root is not a descendant of another dataset
            assert not any(is_descendant(root, of_root_dataset=dataset) for dataset in results if dataset != root)
        for dataset in results:  # each dataset belongs to a subtree rooted at one of the roots
            assert any(is_descendant(dataset, of_root_dataset=root) for root in root_datasets)
    return results
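# A minimal usage sketch (hypothetical names and regexes, assuming p.skip_parent is False): given the sorted
# input ["tank/src", "tank/src/foo", "tank/src/tmp"] with remote.root_dataset == "tank/src", an include
# regex of r".*" and an exclude regex matching r"tmp.*" would yield ["tank/src", "tank/src/foo"], still
# sorted; the regexes are applied to paths relativized against the root dataset ("", "foo", "tmp"), and
# exclude takes precedence over include.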


def _filter_datasets_by_exclude_property(job: Job, remote: Remote, sorted_datasets: list[str]) -> list[str]:
    """Excludes datasets that are marked with a ZFS user property value that, in effect, says 'skip me'."""
    p, log = job.params, job.params.log
    results: list[str] = []
    localhostname: str | None = None
    skip_dataset: str = DONT_SKIP_DATASET
    for dataset in sorted_datasets:
        if is_descendant(dataset, of_root_dataset=skip_dataset):
            # skip_dataset shall be ignored or has been deleted by some third party while we're running
            continue  # nothing to do anymore for this dataset subtree (note that datasets is sorted)
        skip_dataset = DONT_SKIP_DATASET
        # TODO perf: on zfs >= 2.3 use json via zfs list -j to safely merge all zfs list's into one 'zfs list' call
        cmd = p.split_args(f"{p.zfs_program} list -t filesystem,volume -Hp -o {p.exclude_dataset_property}", dataset)
        job.maybe_inject_delete(remote, dataset=dataset, delete_trigger="zfs_list_exclude_property")
        property_value: str | None = try_ssh_command(job, remote, LOG_TRACE, cmd=cmd)
        if property_value is None:
            log.warning(f"Third party deleted {remote.location}: %s", dataset)
            skip_dataset = dataset
        else:
            reason: str = ""
            property_value = property_value.strip()
            sync: bool
            if not property_value or property_value == "-" or property_value.lower() == "true":
                sync = True
            elif property_value.lower() == "false":
                sync = False
            else:
                localhostname = localhostname or socket.gethostname()
                sync = any(localhostname == hostname.strip() for hostname in property_value.split(","))
                reason = f", localhostname: {localhostname}, hostnames: {property_value}"

            if sync:
                results.append(dataset)
                log.debug("Including b/c dataset prop: %s%s", dataset, reason)
            else:
                skip_dataset = dataset
                log.debug("Excluding b/c dataset prop: %s%s", dataset, reason)
    return results
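# How the user property value is interpreted above: "" (i.e. unset), "-", and "true" mean "sync this
# dataset"; "false" means "skip this dataset and (because the input is sorted) its entire subtree"; any
# other value is treated as a comma-separated hostname list, syncing only where socket.gethostname() matches
# one of the entries. For example, a value of "alpha, beta" syncs on hosts named alpha or beta and is
# skipped everywhere else.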


def filter_snapshots(
    job: Job, basis_snapshots: list[str], all_except: bool = False, filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that pass all include/exclude policies.

    `all_except=False` returns snapshots *matching* the filters, for example those that should be deleted if we are in
    "delete selected" mode.

    `all_except=True` returns snapshots *not* matching the filters, for example those that should be deleted if we are in
    "retain selected" mode.
    """

    def resolve_timerange(timerange: UnixTimeRange) -> UnixTimeRange:
        """Converts relative timerange values to UTC Unix time in integer seconds."""
        assert timerange is not None
        lo, hi = timerange
        if isinstance(lo, timedelta):
            lo = math.ceil(current_unixtime_in_secs - lo.total_seconds())
        if isinstance(hi, timedelta):
            hi = math.ceil(current_unixtime_in_secs - hi.total_seconds())
        assert isinstance(lo, int)
        assert isinstance(hi, int)
        return (lo, hi) if lo <= hi else (hi, lo)

    p, log = job.params, job.params.log
    current_unixtime_in_secs: float = p.create_src_snapshots_config.current_datetime.timestamp()
    resultset: set[str] = set()
    for snapshot_filter in p.snapshot_filters:
        snapshots: list[str] = basis_snapshots
        for _filter in snapshot_filter:
            name: str = _filter.name
            if name == SNAPSHOT_REGEX_FILTER_NAME:
                snapshots = _filter_snapshots_by_regex(
                    job, snapshots, regexes=_filter.options, filter_bookmarks=filter_bookmarks
                )
            elif name == "include_snapshot_times":
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time(
                    job, snapshots, include_snapshot_times=timerange, filter_bookmarks=filter_bookmarks
                )
            else:
                assert name == "include_snapshot_times_and_ranks"
                timerange = resolve_timerange(_filter.timerange) if _filter.timerange is not None else _filter.timerange
                snapshots = _filter_snapshots_by_creation_time_and_rank(
                    job,
                    snapshots,
                    include_snapshot_times=timerange,
                    include_snapshot_ranks=_filter.options,
                    filter_bookmarks=filter_bookmarks,
                )
        resultset.update(snapshots)  # union

    no_f_bookmarks: bool = not filter_bookmarks
    snapshots = [line for line in basis_snapshots if (no_f_bookmarks and "#" in line) or ((line in resultset) != all_except)]
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    for snapshot in snapshots:
        if is_debug:
            log.debug("Finally included snapshot: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return snapshots
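# Note how `(line in resultset) != all_except` acts as a conditional negation: with all_except=False it
# keeps exactly the lines selected by some filter chain (the union accumulated in resultset), and with
# all_except=True it keeps the complement. Hypothetical sketch: if basis_snapshots contains lines ending in
# "@hourly_1" and "@daily_1" and the filters select only the hourly one, all_except=False returns the hourly
# line while all_except=True returns the daily line; lines containing '#' (bookmarks) always pass when
# filter_bookmarks is False.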


def _filter_snapshots_by_regex(
    job: Job, snapshots: list[str], regexes: tuple[RegexList, RegexList], filter_bookmarks: bool = False
) -> list[str]:
    """Returns all snapshots that match at least one of the include regexes but none of the exclude regexes."""
    exclude_snapshot_regexes, include_snapshot_regexes = regexes
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    results: list[str] = []
    for snapshot in snapshots:
        i = snapshot.find("@")  # snapshot separator
        if i < 0 and filter_bookmarks:
            i = snapshot.index("#")  # bookmark separator
        if i < 0:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        elif is_included(snapshot[i + 1 :], include_snapshot_regexes, exclude_snapshot_regexes):
            results.append(snapshot)
            if is_debug:
                log.debug("Including b/c snapshot regex: %s", snapshot[snapshot.rindex("\t") + 1 :])
        else:
            if is_debug:
                log.debug("Excluding b/c snapshot regex: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return results
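# Each input line is tab-separated metadata ending in dataset@snapshot (or dataset#bookmark); only the name
# after the separator is matched. Hypothetical example: for the line "1723456789\ttank/src@daily_2024-08-06",
# the string "daily_2024-08-06" is tested via is_included(), so an include regex like r"daily_.*" selects it
# unless some exclude regex also matches, since exclude takes precedence over include.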


def _filter_snapshots_by_creation_time(
    job: Job, snapshots: list[str], include_snapshot_times: UnixTimeRange, filter_bookmarks: bool = False
) -> list[str]:
    """Filters snapshots to those created within the specified time window."""
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_snaptime, hi_snaptime = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_snaptime, int)
    assert isinstance(hi_snaptime, int)
    results: list[str] = []
    for snapshot in snapshots:
        if (not filter_bookmarks) and "@" not in snapshot:
            continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
        elif lo_snaptime <= int(snapshot[0 : snapshot.index("\t")]) < hi_snaptime:
            results.append(snapshot)
            if is_debug:
                log.debug("Including b/c creation time: %s", snapshot[snapshot.rindex("\t") + 1 :])
        else:
            if is_debug:
                log.debug("Excluding b/c creation time: %s", snapshot[snapshot.rindex("\t") + 1 :])
    return results
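# The creation time is read from the first tab-separated column and compared against a half-open window:
# lo_snaptime <= creation < hi_snaptime. Hypothetical example: with include_snapshot_times=(1700000000,
# 1800000000), a line starting with "1723456789\t" is retained, while a line starting with "1800000000\t"
# is excluded because the upper bound is exclusive.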


def _filter_snapshots_by_creation_time_and_rank(
    job: Job,
    snapshots: list[str],
    include_snapshot_times: UnixTimeRange,
    include_snapshot_ranks: list[RankRange],
    filter_bookmarks: bool = False,
) -> list[str]:
    """Filters by creation time and rank within the snapshot list."""

    def get_idx(rank: tuple[str, int, bool], n: int) -> int:
        """Returns index for rank tuple (kind, value, percent)."""
        kind, num, is_percent = rank
        m = round(n * num / 100) if is_percent else min(n, num)
        assert kind == "latest" or kind == "oldest"
        return m if kind == "oldest" else n - m

    assert isinstance(include_snapshot_ranks, list)
    assert len(include_snapshot_ranks) > 0
    log = job.params.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    lo_time, hi_time = include_snapshot_times or (0, UNIX_TIME_INFINITY_SECS)
    assert isinstance(lo_time, int)
    assert isinstance(hi_time, int)
    n = sum(1 for snapshot in snapshots if "@" in snapshot)
    for rank_range in include_snapshot_ranks:
        lo_rank, hi_rank = rank_range
        lo: int = get_idx(lo_rank, n)
        hi: int = get_idx(hi_rank, n)
        lo, hi = (lo, hi) if lo <= hi else (hi, lo)
        i: int = 0
        results: list[str] = []
        for snapshot in snapshots:
            is_snapshot = "@" in snapshot
            if (not filter_bookmarks) and not is_snapshot:
                continue  # retain bookmarks to help find common snapshots, apply filter only to snapshots
            else:
                msg = None
                if is_snapshot and lo <= i < hi:
                    msg = "Including b/c snapshot rank: %s"
                elif lo_time <= int(snapshot[0 : snapshot.index("\t")]) < hi_time:
                    msg = "Including b/c creation time: %s"
                if msg:
                    results.append(snapshot)
                else:
                    msg = "Excluding b/c snapshot rank: %s"
                if is_debug:
                    log.debug(msg, snapshot[snapshot.rindex("\t") + 1 :])
                i += 1 if is_snapshot else 0
        snapshots = results
        n = hi - lo
    return snapshots
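# Worked example for get_idx() above, with n=100 snapshots (bookmarks are not counted in n):
# ("oldest", 10, False) -> min(100, 10) = 10; ("latest", 10, True) -> 100 - round(100 * 10 / 100) = 90;
# ("latest", 0, False) -> 100 - 0 = 100. Hence the rank range (("latest", 10, True), ("latest", 0, False))
# selects indexes 90 <= i < 100, i.e. the latest 10% of snapshots; a snapshot outside the rank window is
# still retained if its creation time falls within include_snapshot_times.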


def filter_properties(
    p: Params, props: dict[str, str | None], include_regexes: RegexList, exclude_regexes: RegexList
) -> dict[str, str | None]:
    """Returns ZFS props whose name matches at least one of the include regexes but none of the exclude regexes."""
    log = p.log
    is_debug: bool = log.isEnabledFor(LOG_DEBUG)
    results: dict[str, str | None] = {}
    for propname, propvalue in props.items():
        if is_included(propname, include_regexes, exclude_regexes):
            results[propname] = propvalue
            if is_debug:
                log.debug("Including b/c property regex: %s", propname)
        else:
            if is_debug:
                log.debug("Excluding b/c property regex: %s", propname)
    return results
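# Hypothetical example: filtering {"compression": "on", "mountpoint": "/tank", "org:foo": "x"} with an
# include regex of r".*" and an exclude regex of r"mount.*" returns {"compression": "on", "org:foo": "x"};
# the regexes are applied to property names, never to property values.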


def filter_lines(input_list: Iterable[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set contains the first column field of that line."""
    if len(input_set) == 0:
        return []
    return [line for line in input_list if line[0 : line.index("\t")] in input_set]


def filter_lines_except(input_list: list[str], input_set: set[str]) -> list[str]:
    """For each line in input_list, includes the line if input_set does not contain the first column field of that line."""
    if len(input_set) == 0:
        return input_list
    return [line for line in input_list if line[0 : line.index("\t")] not in input_set]
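# The two helpers above are set-based complements over tab-separated lines. Hypothetical example: with
# input_list = ["a\tx", "b\ty"] and input_set = {"a"}, filter_lines() returns ["a\tx"] and
# filter_lines_except() returns ["b\ty"]. The empty-set fast paths return [] and input_list respectively,
# since the outcome is then known without scanning the input.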


def dataset_regexes(src: Remote, dst: Remote, datasets: list[str]) -> list[str]:
    """Converts dataset paths to regex strings relative to src or dst roots."""
    results: list[str] = []
    for dataset in datasets:
        if dataset.startswith("/"):
            # it's an absolute dataset - convert it to a relative dataset
            dataset = dataset[1:]
            if is_descendant(dataset, of_root_dataset=src.root_dataset):
                dataset = relativize_dataset(dataset, src.root_dataset)
            elif is_descendant(dataset, of_root_dataset=dst.root_dataset):
                dataset = relativize_dataset(dataset, dst.root_dataset)
            else:
                continue  # ignore datasets that make no difference
            if dataset.startswith("/"):
                dataset = dataset[1:]
        if dataset.endswith("/"):
            dataset = dataset[0:-1]
        regex: str
        if dataset:
            regex = re.escape(dataset)
        else:
            regex = ".*"
        results.append(regex)
    return results
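# Hypothetical example, assuming src.root_dataset == "tank/src": the absolute input "/tank/src/foo"
# relativizes to "foo" and yields the escaped literal regex "foo", while "/tank/src" itself relativizes to
# the empty string and yields ".*" (match the entire tree). A relative input like "foo/bar" is escaped
# as-is, and an absolute path under neither root is skipped because it cannot affect the job.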