Coverage for bzfs_main / argparse_actions.py: 100%

330 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 10:16 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Custom argparse actions shared by the 'bzfs' and 'bzfs_jobrunner' CLIs; These helpers validate and expand complex command 

16line syntax such as +file references, dataset pairs, and snapshot filters.""" 

17 

18from __future__ import ( 

19 annotations, 

20) 

21import argparse 

22import ast 

23import os 

24import re 

25from dataclasses import ( 

26 dataclass, 

27 field, 

28) 

29from datetime import ( 

30 timedelta, 

31) 

32from typing import ( 

33 Any, 

34 cast, 

35 final, 

36) 

37 

38from bzfs_main.filter import ( 

39 SNAPSHOT_FILTERS_VAR, 

40 SNAPSHOT_REGEX_FILTER_NAME, 

41 SNAPSHOT_REGEX_FILTER_NAMES, 

42 RankRange, 

43 UnixTimeRange, 

44) 

45from bzfs_main.util.check_range import ( 

46 CheckRange, 

47) 

48from bzfs_main.util.utils import ( 

49 SHELL_CHARS, 

50 UNIX_TIME_INFINITY_SECS, 

51 YEAR_WITH_FOUR_DIGITS_REGEX, 

52 RegexList, 

53 SnapshotPeriods, 

54 die, 

55 ninfix, 

56 nprefix, 

57 nsuffix, 

58 open_nofollow, 

59 parse_duration_to_milliseconds, 

60 unixtime_fromisoformat, 

61) 

62 

63 

64############################################################################# 

65@dataclass(order=True) 

66@final 

67class SnapshotFilter: 

68 """Represents a snapshot filter with matching options and time range.""" 

69 

70 name: str 

71 timerange: UnixTimeRange # defined in bzfs_main.filter 

72 options: None | list[RankRange] | list[str] | tuple[list[str], list[str]] | tuple[RegexList, RegexList] = field( 

73 compare=False, default=None 

74 ) 

75 

76 

77def _add_snapshot_filter(args: argparse.Namespace, _filter: SnapshotFilter) -> None: 

78 """Appends snapshot filter to namespace list, creating the list if absent.""" 

79 if not hasattr(args, SNAPSHOT_FILTERS_VAR): 

80 args.snapshot_filters_var = [[]] 

81 args.snapshot_filters_var[-1].append(_filter) 

82 

83 

84def _add_time_and_rank_snapshot_filter( 

85 args: argparse.Namespace, dst: str, timerange: UnixTimeRange, rankranges: list[RankRange] 

86) -> None: 

87 """Creates and adds a SnapshotFilter using timerange and rank ranges.""" 

88 if timerange is None or len(rankranges) == 0 or any(rankrange[0] == rankrange[1] for rankrange in rankranges): 

89 _add_snapshot_filter(args, SnapshotFilter("include_snapshot_times", timerange, None)) 

90 else: 

91 assert timerange is not None 

92 _add_snapshot_filter(args, SnapshotFilter(dst, timerange, rankranges)) 

93 

94 

95def has_timerange_filter(snapshot_filters: list[list[SnapshotFilter]]) -> bool: 

96 """Interacts with add_time_and_rank_snapshot_filter() and optimize_snapshot_filters().""" 

97 return any(f.timerange is not None for snapshot_filter in snapshot_filters for f in snapshot_filter) 

98 

99 

100def optimize_snapshot_filters(snapshot_filters: list[SnapshotFilter]) -> list[SnapshotFilter]: 

101 """Applies basic optimizations to the snapshot filter execution plan.""" 

102 _merge_adjacent_snapshot_filters(snapshot_filters) 

103 _merge_adjacent_snapshot_regexes(snapshot_filters) 

104 snapshot_filters = [f for f in snapshot_filters if f.timerange or f.options] 

105 _reorder_snapshot_time_filters(snapshot_filters) 

106 return snapshot_filters 

107 

108 

109def _merge_adjacent_snapshot_filters(snapshot_filters: list[SnapshotFilter]) -> None: 

110 """Merge adjacent filters of the same type if possible. 

111 

112 Merge filter operators of the same kind if they are next to each other and carry an option list, for example 

113 --include-snapshot-times-and-ranks and --include-snapshot-regex and --exclude-snapshot-regex. This improves execution 

114 perf and makes handling easier in later stages. 

115 Example: merges --include-snapshot-times-and-ranks notime oldest10% --include-snapshot-times-and-ranks notime latest20% 

116 into --include-snapshot-times-and-ranks notime oldest10% latest20% 

117 """ 

118 i = len(snapshot_filters) - 1 

119 while i >= 0: 

120 filter_i: SnapshotFilter = snapshot_filters[i] 

121 if isinstance(filter_i.options, list): 

122 j = i - 1 

123 if j >= 0 and snapshot_filters[j] == filter_i: 

124 lst: list = cast(list, snapshot_filters[j].options) 

125 assert isinstance(lst, list) 

126 assert isinstance(filter_i.options, list) 

127 lst.extend(filter_i.options) 

128 snapshot_filters.pop(i) 

129 i -= 1 

130 

131 

132def _merge_adjacent_snapshot_regexes(snapshot_filters: list[SnapshotFilter]) -> None: 

133 """Combine consecutive regex filters of the same type for efficiency.""" 

134 

135 # Merge regex filter operators of the same kind as long as they are within the same group, aka as long as they are not 

136 # separated by a non-regex filter. This improves execution perf and makes handling easier in later stages. 

137 # Example: --include-snapshot-regex .*daily --exclude-snapshot-regex .*weekly --include-snapshot-regex .*hourly 

138 # --exclude-snapshot-regex .*monthly 

139 # gets merged into the following: --include-snapshot-regex .*daily .*hourly --exclude-snapshot-regex .*weekly .*monthly 

140 i = len(snapshot_filters) - 1 

141 while i >= 0: 

142 filter_i: SnapshotFilter = snapshot_filters[i] 

143 if filter_i.name in SNAPSHOT_REGEX_FILTER_NAMES: 

144 assert isinstance(filter_i.options, list) 

145 j = i - 1 

146 while j >= 0 and snapshot_filters[j].name in SNAPSHOT_REGEX_FILTER_NAMES: 

147 if snapshot_filters[j].name == filter_i.name: 

148 lst: list = cast(list[str], snapshot_filters[j].options) 

149 assert isinstance(lst, list) 

150 assert isinstance(filter_i.options, list) 

151 lst.extend(filter_i.options) 

152 snapshot_filters.pop(i) 

153 break 

154 j -= 1 

155 i -= 1 

156 

157 # Merge --include-snapshot-regex and --exclude-snapshot-regex filters that are part of the same group (i.e. next to each 

158 # other) into a single combined filter operator that contains the info of both, and hence all info for the group, which 

159 # makes handling easier in later stages. 

160 # Example: --include-snapshot-regex .*daily .*hourly --exclude-snapshot-regex .*weekly .*monthly 

161 # gets merged into the following: snapshot-regex(excludes=[.*weekly, .*monthly], includes=[.*daily, .*hourly]) 

162 i = len(snapshot_filters) - 1 

163 while i >= 0: 

164 filter_i = snapshot_filters[i] 

165 name: str = filter_i.name 

166 if name in SNAPSHOT_REGEX_FILTER_NAMES: 

167 j = i - 1 

168 if j >= 0 and snapshot_filters[j].name in SNAPSHOT_REGEX_FILTER_NAMES: 

169 filter_j = snapshot_filters[j] 

170 assert filter_j.name != name 

171 snapshot_filters.pop(i) 

172 i -= 1 

173 else: 

174 name_j: str = next(iter(SNAPSHOT_REGEX_FILTER_NAMES.difference({name}))) 

175 filter_j = SnapshotFilter(name_j, None, []) 

176 sorted_filters: list[SnapshotFilter] = sorted([filter_i, filter_j]) 

177 exclude_regexes = cast(list[str], sorted_filters[0].options) 

178 include_regexes = cast(list[str], sorted_filters[1].options) 

179 snapshot_filters[i] = SnapshotFilter(SNAPSHOT_REGEX_FILTER_NAME, None, (exclude_regexes, include_regexes)) 

180 i -= 1 

181 

182 

183def _reorder_snapshot_time_filters(snapshot_filters: list[SnapshotFilter]) -> None: 

184 """Reorder time filters before regex filters within execution plan sections. 

185 

186 In an execution plan that contains filter operators based on sort order (the --include-snapshot-times-and-ranks operator 

187 with non-empty ranks), filters cannot freely be reordered without violating correctness, but they can still be partially 

188 reordered for better execution performance. 

189 

190 The filter list is partitioned into sections such that sections are separated by --include-snapshot-times-and-ranks 

191 operators with non-empty ranks. Within each section, we move include_snapshot_times operators aka 

192 --include-snapshot-times-and-ranks operators with empty ranks before --include/exclude-snapshot-regex operators because 

193 the former involves fast integer comparisons and the latter involves more expensive regex matching. 

194 

195 Example: reorders --include-snapshot-regex .*daily --include-snapshot-times-and-ranks 2024-01-01..2024-04-01 into 

196 --include-snapshot-times-and-ranks 2024-01-01..2024-04-01 --include-snapshot-regex .*daily 

197 """ 

198 

199 def reorder_time_filters_within_section(i: int, j: int) -> None: 

200 while j > i: 

201 filter_j: SnapshotFilter = snapshot_filters[j] 

202 if filter_j.name == "include_snapshot_times": 

203 snapshot_filters.pop(j) 

204 snapshot_filters.insert(i + 1, filter_j) 

205 j -= 1 

206 

207 i = len(snapshot_filters) - 1 

208 j = i 

209 while i >= 0: 

210 name: str = snapshot_filters[i].name 

211 if name == "include_snapshot_times_and_ranks": 

212 reorder_time_filters_within_section(i, j) 

213 j = i - 1 

214 i -= 1 

215 reorder_time_filters_within_section(i, j) 

216 

217 

218def validate_no_argument_file( 

219 path: str, namespace: argparse.Namespace, err_prefix: str, parser: argparse.ArgumentParser | None = None 

220) -> None: 

221 """Checks that command line options do not include +file when disabled.""" 

222 if getattr(namespace, "no_argument_file", False): 

223 die(f"{err_prefix}Argument file inclusion is disabled: {path}", parser=parser) 

224 

225 

226############################################################################# 

227@final 

228class NonEmptyStringAction(argparse.Action): 

229 """Argparse action rejecting empty string values.""" 

230 

231 def __call__( 

232 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

233 ) -> None: 

234 """Strip whitespace and reject empty values.""" 

235 values = values.strip() 

236 if values == "": 

237 parser.error(f"{option_string}: Empty string is not valid") 

238 setattr(namespace, self.dest, values) 

239 

240 

241############################################################################# 

242@final 

243class DatasetPairsAction(argparse.Action): 

244 """Parses alternating source/destination dataset arguments.""" 

245 

246 def __call__( 

247 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

248 ) -> None: 

249 """Validates dataset pair arguments and expand '+file' notation.""" 

250 datasets: list[str] = [] 

251 err_prefix: str = f"{option_string or self.dest}: " 

252 

253 for value in values: 

254 if not value.startswith("+"): 

255 datasets.append(value) 

256 else: 

257 path: str = value[1:] 

258 validate_no_argument_file(path, namespace, err_prefix=err_prefix, parser=parser) 

259 if "bzfs_argument_file" not in os.path.basename(path): 

260 parser.error(f"{err_prefix}basename must contain substring 'bzfs_argument_file': {path}") 

261 try: 

262 with open_nofollow(path, "r", encoding="utf-8") as fd: 

263 for i, line in enumerate(fd.read().splitlines()): 

264 if line.startswith("#") or not line.strip(): 

265 continue 

266 splits: list[str] = line.split("\t", 1) 

267 if len(splits) <= 1: 

268 parser.error(f"{err_prefix}Line must contain tab-separated SRC_DATASET and DST_DATASET: {i}") 

269 src_root_dataset, dst_root_dataset = splits 

270 if not src_root_dataset.strip() or not dst_root_dataset.strip(): 

271 parser.error( 

272 f"{err_prefix}SRC_DATASET and DST_DATASET must not be empty or whitespace-only: {i}" 

273 ) 

274 datasets.append(src_root_dataset) 

275 datasets.append(dst_root_dataset) 

276 except OSError as e: 

277 parser.error(f"{err_prefix}{e}") 

278 

279 if len(datasets) % 2 != 0: 

280 parser.error(f"{err_prefix}Each SRC_DATASET must have a corresponding DST_DATASET: {datasets}") 

281 root_dataset_pairs: list[tuple[str, str]] = [(datasets[i], datasets[i + 1]) for i in range(0, len(datasets), 2)] 

282 setattr(namespace, self.dest, root_dataset_pairs) 

283 

284 

285############################################################################# 

286@final 

287class SSHConfigFileNameAction(argparse.Action): 

288 """Validates SSH config file argument contains no whitespace or shell chars.""" 

289 

290 def __call__( 

291 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

292 ) -> None: 

293 """Reject invalid file names with spaces or shell metacharacters.""" 

294 values = values.strip() 

295 if values == "": 

296 parser.error(f"{option_string}: Empty string is not valid") 

297 if any(char in SHELL_CHARS or char.isspace() for char in values): 

298 parser.error(f"{option_string}: Invalid file name '{values}': must not contain whitespace or special chars.") 

299 setattr(namespace, self.dest, values) 

300 

301 

302############################################################################# 

303@final 

304class SafeFileNameAction(argparse.Action): 

305 """Ensures filenames lack path separators and weird whitespace.""" 

306 

307 def __call__( 

308 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

309 ) -> None: 

310 """Rejects filenames containing path traversal or unusual whitespace.""" 

311 if ".." in values or "/" in values or "\\" in values: 

312 parser.error(f"{option_string}: Invalid file name '{values}': must not contain '..' or '/' or '\\'.") 

313 if any(char.isspace() and char != " " for char in values): 

314 parser.error(f"{option_string}: Invalid file name '{values}': must not contain whitespace other than space.") 

315 setattr(namespace, self.dest, values) 

316 

317 

318############################################################################# 

319@final 

320class SafeDirectoryNameAction(argparse.Action): 

321 """Validates directory name argument, allowing only simple spaces.""" 

322 

323 def __call__( 

324 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

325 ) -> None: 

326 """Rejects directory names with weird whitespace or emptiness.""" 

327 values = values.strip() 

328 if values == "": 

329 parser.error(f"{option_string}: Empty string is not valid") 

330 if any(char.isspace() and char != " " for char in values): 

331 parser.error(f"{option_string}: Invalid dir name '{values}': must not contain whitespace other than space.") 

332 setattr(namespace, self.dest, values) 

333 

334 

335############################################################################# 

336@final 

337class NewSnapshotFilterGroupAction(argparse.Action): 

338 """Starts a new filter group when seen in command line arguments.""" 

339 

340 def __call__( 

341 self, parser: argparse.ArgumentParser, args: argparse.Namespace, values: Any, option_string: str | None = None 

342 ) -> None: 

343 """Insert an empty group before adding new snapshot filters.""" 

344 if not hasattr(args, SNAPSHOT_FILTERS_VAR): 

345 args.snapshot_filters_var = [[]] 

346 elif len(args.snapshot_filters_var[-1]) > 0: 

347 args.snapshot_filters_var.append([]) 

348 

349 

350############################################################################# 

351@final 

352class FileOrLiteralAction(argparse.Action): 

353 """Allows '@file' style argument expansion with '+' prefix.""" 

354 

355 def __call__( 

356 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

357 ) -> None: 

358 """Expands file arguments and appends them to the namespace.""" 

359 

360 current_values: list[str] | None = getattr(namespace, self.dest, None) 

361 if current_values is None: 

362 current_values = [] 

363 extra_values: list[str] = [] 

364 err_prefix: str = f"{option_string or self.dest}: " 

365 for value in values: 

366 if not value.startswith("+"): 

367 extra_values.append(value) 

368 else: 

369 path: str = value[1:] 

370 validate_no_argument_file(path, namespace, err_prefix=err_prefix, parser=parser) 

371 if "bzfs_argument_file" not in os.path.basename(path): 

372 parser.error(f"{err_prefix}basename must contain substring 'bzfs_argument_file': {path}") 

373 try: 

374 with open_nofollow(path, "r", encoding="utf-8") as fd: 

375 for line in fd.read().splitlines(): 

376 if line.startswith("#") or not line.strip(): 

377 continue 

378 extra_values.append(line) 

379 except OSError as e: 

380 parser.error(f"{err_prefix}{e}") 

381 current_values += extra_values 

382 setattr(namespace, self.dest, current_values) 

383 if self.dest in SNAPSHOT_REGEX_FILTER_NAMES: 

384 _add_snapshot_filter(namespace, SnapshotFilter(self.dest, None, extra_values)) 

385 

386 

387############################################################################# 

388class IncludeSnapshotPlanAction(argparse.Action): 

389 """Parses include plan dictionaries from the command line.""" 

390 

391 def __call__( 

392 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

393 ) -> None: 

394 """Builds a list of snapshot filters from a serialized plan.""" 

395 opts: list[str] | None = getattr(namespace, self.dest, None) 

396 opts = [] if opts is None else opts 

397 if not self._add_opts(opts, parser, values, option_string=option_string): 

398 opts += ["--new-snapshot-filter-group", "--include-snapshot-regex=!.*"] 

399 setattr(namespace, self.dest, opts) 

400 

401 def _add_opts( 

402 self, 

403 opts: list[str], 

404 parser: argparse.ArgumentParser, 

405 values: str, 

406 option_string: str | None = None, 

407 ) -> bool: 

408 xperiods: SnapshotPeriods = SnapshotPeriods() 

409 has_at_least_one_filter_clause: bool = False 

410 for org, target_periods in ast.literal_eval(values).items(): 

411 prefix: str = re.escape(nprefix(org)) 

412 for target, periods in target_periods.items(): 

413 infix: str = re.escape(ninfix(target)) if target else YEAR_WITH_FOUR_DIGITS_REGEX.pattern 

414 for period_unit, period_amount in periods.items(): 

415 if not isinstance(period_amount, int) or period_amount < 0: 

416 parser.error(f"{option_string}: Period amount must be a non-negative integer: {period_amount}") 

417 suffix: str = re.escape(nsuffix(period_unit)) 

418 regex: str = f"{prefix}{infix}.*{suffix}" 

419 opts += ["--new-snapshot-filter-group", f"--include-snapshot-regex={regex}"] 

420 duration_amount, duration_unit = xperiods.suffix_to_duration0(period_unit) 

421 duration_unit_label: str | None = xperiods.period_labels.get(duration_unit) 

422 opts += [ 

423 "--include-snapshot-times-and-ranks", 

424 ( 

425 "notime" 

426 if duration_unit_label is None or duration_amount * period_amount == 0 

427 else f"{duration_amount * period_amount}{duration_unit_label}ago..anytime" 

428 ), 

429 f"latest{period_amount}", 

430 ] 

431 has_at_least_one_filter_clause = True 

432 return has_at_least_one_filter_clause 

433 

434 

435############################################################################# 

436@final 

437class DeleteDstSnapshotsExceptPlanAction(IncludeSnapshotPlanAction): 

438 """Specialized include plan used to decide which dst snapshots to keep.""" 

439 

440 def __call__( 

441 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

442 ) -> None: 

443 """Parses plan while preventing disasters.""" 

444 opts: list[str] | None = getattr(namespace, self.dest, None) 

445 opts = [] if opts is None else opts 

446 opts += ["--delete-dst-snapshots-except"] 

447 if not self._add_opts(opts, parser, values, option_string=option_string): 

448 parser.error( 

449 f"{option_string}: Cowardly refusing to delete all snapshots on" 

450 f"--delete-dst-snapshots-except-plan='{values}' (which means 'retain no snapshots' aka " 

451 "'delete all snapshots'). Assuming this is an unintended pilot error rather than intended carnage. " 

452 "Aborting. If this is really what is intended, use `--delete-dst-snapshots --include-snapshot-regex=.*` " 

453 "instead to force the deletion." 

454 ) 

455 setattr(namespace, self.dest, opts) 

456 

457 

458############################################################################# 

459@final 

460class TimeRangeAndRankRangeAction(argparse.Action): 

461 """Parses --include-snapshot-times-and-ranks option values.""" 

462 

463 def __call__( 

464 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

465 ) -> None: 

466 """Converts user-supplied time and rank ranges into snapshot filters.""" 

467 

468 def parse_time(time_spec: str) -> int | timedelta | None: 

469 time_spec = time_spec.strip() 

470 if time_spec == "*" or time_spec == "anytime": 

471 return None 

472 if time_spec.isdigit(): 

473 return int(time_spec) 

474 try: 

475 return timedelta(milliseconds=parse_duration_to_milliseconds(time_spec, regex_suffix=r"\s*ago")) 

476 except ValueError: 

477 try: 

478 return unixtime_fromisoformat(time_spec) 

479 except ValueError: 

480 parser.error(f"{option_string}: Invalid duration, Unix time, or ISO 8601 datetime: {time_spec}") 

481 

482 assert isinstance(values, list) 

483 assert len(values) > 0 

484 value: str = values[0].strip() 

485 if value == "notime": 

486 value = "0..0" 

487 if ".." not in value: 

488 parser.error(f"{option_string}: Invalid time range: Missing '..' separator: {value}") 

489 timerange_specs: list[int | timedelta | None] = [parse_time(time_spec) for time_spec in value.split("..", 1)] 

490 rankranges: list[RankRange] = self._parse_rankranges(parser, values[1:], option_string=option_string) 

491 setattr(namespace, self.dest, [timerange_specs] + rankranges) 

492 timerange: UnixTimeRange = self._get_include_snapshot_times(timerange_specs) 

493 _add_time_and_rank_snapshot_filter(namespace, self.dest, timerange, rankranges) 

494 

495 @staticmethod 

496 def _get_include_snapshot_times(times: list[timedelta | int | None]) -> UnixTimeRange: 

497 """Convert start and end times to ``UnixTimeRange`` for filtering.""" 

498 

499 def utc_unix_time_in_seconds(time_spec: timedelta | int | None, default: int) -> timedelta | int: 

500 if isinstance(time_spec, timedelta): 

501 return time_spec 

502 if isinstance(time_spec, int): 

503 return int(time_spec) 

504 return default 

505 

506 lo, hi = times 

507 if lo is None and hi is None: 

508 return None 

509 lo = utc_unix_time_in_seconds(lo, default=0) 

510 hi = utc_unix_time_in_seconds(hi, default=UNIX_TIME_INFINITY_SECS) 

511 if isinstance(lo, int) and isinstance(hi, int): 

512 return (lo, hi) if lo <= hi else (hi, lo) 

513 return lo, hi 

514 

515 @staticmethod 

516 def _parse_rankranges(parser: argparse.ArgumentParser, values: Any, option_string: str | None = None) -> list[RankRange]: 

517 """Parses rank range strings like 'latest 3..latest 5' into tuples.""" 

518 

519 def parse_rank(spec: str) -> tuple[bool, str, int, bool]: 

520 spec = spec.strip() 

521 if not (match := re.fullmatch(r"(all\s*except\s*)?(oldest|latest)\s*(\d+)%?", spec)): 

522 parser.error(f"{option_string}: Invalid rank format: {spec}") 

523 assert match 

524 is_except: bool = bool(match.group(1)) 

525 kind: str = match.group(2) 

526 num: int = int(match.group(3)) 

527 is_percent: bool = spec.endswith("%") 

528 if is_percent and num > 100: 

529 parser.error(f"{option_string}: Invalid rank: Percent must not be greater than 100: {spec}") 

530 return is_except, kind, num, is_percent 

531 

532 rankranges: list[RankRange] = [] 

533 for value in values: 

534 value = value.strip() 

535 if ".." in value: 

536 lo_split, hi_split = value.split("..", 1) 

537 lo = parse_rank(lo_split) 

538 hi = parse_rank(hi_split) 

539 if lo[0] or hi[0]: 

540 parser.error(f"{option_string}: Invalid rank range: {value}") 

541 if lo[1] != hi[1]: 

542 parser.error(f"{option_string}: Ambiguous rank range: Must not compare oldest with latest: {value}") 

543 else: 

544 hi = parse_rank(value) 

545 is_except, kind, num, is_percent = hi 

546 if is_except: 

547 if is_percent: 

548 negated_kind: str = "oldest" if kind == "latest" else "latest" 

549 lo = parse_rank(f"{negated_kind}0") 

550 hi = parse_rank(f"{negated_kind}{100-num}%") 

551 else: 

552 lo = parse_rank(f"{kind}{num}") 

553 hi = parse_rank(f"{kind}100%") 

554 else: 

555 lo = parse_rank(f"{kind}0") 

556 rankranges.append((lo[1:], hi[1:])) 

557 return rankranges 

558 

559 

560############################################################################# 

561@final 

562class CheckPercentRange(CheckRange): 

563 """Argparse action verifying percentages fall within 0-100.""" 

564 

565 def __call__( 

566 self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str | None = None 

567 ) -> None: 

568 """Normalizes integer or percent values and store them.""" 

569 assert isinstance(values, str) 

570 original = values 

571 values = values.strip() 

572 is_percent: bool = values.endswith("%") 

573 if is_percent: 

574 values = values[0:-1] 

575 try: 

576 values = float(values) 

577 except ValueError: 

578 parser.error(f"{option_string}: Invalid percentage or number: {original}") 

579 super().__call__(parser, namespace, values, option_string=option_string) 

580 setattr(namespace, self.dest, (getattr(namespace, self.dest), is_percent))