Coverage for bzfs_main/snapshot_cache.py: 96%

108 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Stores snapshot metadata in fast disk inodes to avoid repeated 'zfs list' calls, without adding external dependencies or 

16complex databases.""" 

17 

18from __future__ import annotations 

19import os 

20from collections import defaultdict 

21from os import stat as os_stat 

22from os import utime as os_utime 

23from os.path import exists as os_path_exists 

24from os.path import join as os_path_join 

25from subprocess import CalledProcessError 

26from typing import TYPE_CHECKING 

27 

28from bzfs_main.connection import ( 

29 run_ssh_command, 

30) 

31from bzfs_main.detect import ( 

32 is_caching_snapshots, 

33) 

34from bzfs_main.parallel_batch_cmd import ( 

35 itr_ssh_cmd_parallel, 

36) 

37from bzfs_main.utils import ( 

38 SortedInterner, 

39 stderr_to_str, 

40) 

41 

42if TYPE_CHECKING: # pragma: no cover - for type hints only 

43 from bzfs_main.bzfs import Job 

44 from bzfs_main.configuration import Remote, SnapshotLabel 

45 

46 

class SnapshotCache:
    """Handles last-modified cache operations for snapshot management."""

    def __init__(self, job: Job) -> None:
        # immutable variables:
        self.job: Job = job

    def get_snapshots_changed(self, path: str) -> int:
        """Returns numeric timestamp from cached snapshots-changed file."""
        _, mtime = self.get_snapshots_changed2(path)
        return mtime

    @staticmethod
    def get_snapshots_changed2(path: str) -> tuple[int, int]:
        """Like zfs_get_snapshots_changed() but reads from local cache; returns (atime, mtime) rounded to seconds."""
        # perf: inode metadata reads and writes are fast - ballpark O(200k) ops/sec.
        try:
            stats = os_stat(path)
        except FileNotFoundError:
            return 0, 0  # harmless
        return round(stats.st_atime), round(stats.st_mtime)

    def last_modified_cache_file(self, remote: Remote, dataset: str, label: SnapshotLabel | None = None) -> str:
        """Returns the path of the cache file that is tracking last snapshot modification for `dataset`."""
        if label is None:
            file_name = "="
        else:
            file_name = f"{label.prefix}{label.infix}{label.suffix}"
        userhost_dir = remote.ssh_user_host or "-"
        base_dir = self.job.params.log_params.last_modified_cache_dir
        return os_path_join(base_dir, userhost_dir, dataset, file_name)

    def invalidate_last_modified_cache_dataset(self, dataset: str) -> None:
        """Resets the last_modified timestamp of all cache files of the given dataset to zero."""
        params = self.job.params
        if params.dry_run:
            return
        dataset_cache_file = self.last_modified_cache_file(params.src, dataset)
        epoch_times = (0, 0)
        try:
            # zero out every cache file in the dataset's cache directory, plus the dataset-level file itself
            for entry in os.scandir(os.path.dirname(dataset_cache_file)):
                os_utime(entry.path, times=epoch_times)
            os_utime(dataset_cache_file, times=epoch_times)
        except FileNotFoundError:
            pass  # harmless

    def update_last_modified_cache(self, datasets_to_snapshot: dict[SnapshotLabel, list[str]]) -> None:
        """perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls."""
        params = self.job.params
        src = params.src
        if not is_caching_snapshots(params, src):
            return
        # group labels per dataset; the key set doubles as the set of all datasets to snapshot
        labels_by_dataset: dict[str, list[SnapshotLabel]] = defaultdict(list)
        for label, datasets in datasets_to_snapshot.items():
            for dataset in datasets:
                labels_by_dataset[dataset].append(label)

        sorted_datasets: list[str] = sorted(labels_by_dataset.keys())
        changed_by_dataset: dict[str, int] = self.zfs_get_snapshots_changed(src, sorted_datasets)
        for src_dataset in sorted_datasets:
            snapshots_changed: int = changed_by_dataset.get(src_dataset, 0)
            self.job.src_properties[src_dataset].snapshots_changed = snapshots_changed
            if snapshots_changed == 0:
                self.invalidate_last_modified_cache_dataset(src_dataset)
                continue
            if params.dry_run:
                continue
            try:
                dataset_cache_file = self.last_modified_cache_file(src, src_dataset)
                os.makedirs(os.path.dirname(dataset_cache_file), exist_ok=True)
                set_last_modification_time(dataset_cache_file, unixtime_in_secs=snapshots_changed, if_more_recent=True)
                for label in labels_by_dataset[src_dataset]:
                    label_cache_file = self.last_modified_cache_file(src, src_dataset, label)
                    set_last_modification_time(label_cache_file, unixtime_in_secs=snapshots_changed, if_more_recent=True)
            except FileNotFoundError:
                pass  # harmless

    def zfs_get_snapshots_changed(self, remote: Remote, sorted_datasets: list[str]) -> dict[str, int]:
        """Returns the ZFS dataset property "snapshots_changed", which is a UTC Unix time in integer seconds;
        See https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed"""

        def run_zfs_list(_cmd: list[str], batch: list[str]) -> list[str]:
            # best-effort: on failure keep whatever partial stdout was produced
            try:
                return run_ssh_command(self.job, remote, print_stderr=False, cmd=_cmd + batch).splitlines()
            except CalledProcessError as e:
                return stderr_to_str(e.stdout).splitlines()
            except UnicodeDecodeError:
                return []

        assert (not self.job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        params = self.job.params
        cmd: list[str] = params.split_args(
            f"{params.zfs_program} list -t filesystem,volume -s name -Hp -o snapshots_changed,name"
        )
        results: dict[str, int] = {}
        interner: SortedInterner[str] = SortedInterner(sorted_datasets)  # reduces memory footprint
        for lines in itr_ssh_cmd_parallel(self.job, remote, [(cmd, sorted_datasets)], run_zfs_list, ordered=False):
            for line in lines:
                if "\t" not in line:
                    break  # partial output from failing 'zfs list' command
                changed_str, dataset = line.split("\t", 1)
                if not dataset:
                    break  # partial output from failing 'zfs list' command
                dataset = interner.interned(dataset)
                # "-" or empty means the property is unavailable; treat as 0
                results[dataset] = int(changed_str) if changed_str and changed_str != "-" else 0
        return results

150 

151 

def set_last_modification_time_safe(
    path: str,
    unixtime_in_secs: int | tuple[int, int],
    if_more_recent: bool = False,
) -> None:
    """Like set_last_modification_time() but creates directories if necessary."""
    try:
        parent_dir = os.path.dirname(path)
        os.makedirs(parent_dir, exist_ok=True)
        set_last_modification_time(path, unixtime_in_secs=unixtime_in_secs, if_more_recent=if_more_recent)
    except FileNotFoundError:
        pass  # harmless

163 

164 

def set_last_modification_time(
    path: str,
    unixtime_in_secs: int | tuple[int, int],
    if_more_recent: bool = False,
) -> None:
    """if_more_recent=True is a concurrency control mechanism that prevents us from overwriting a newer (monotonically
    increasing) snapshots_changed value (which is a UTC Unix time in integer seconds) that might have been written to the
    cache file by a different, more up-to-date bzfs process."""
    if isinstance(unixtime_in_secs, int):
        times = (unixtime_in_secs, unixtime_in_secs)  # apply the same value to atime and mtime
    else:
        times = unixtime_in_secs
    if not os_path_exists(path):
        with open(path, "ab"):
            pass  # create an empty cache file; its timestamps are set below
    elif if_more_recent and times[1] <= round(os_stat(path).st_mtime):
        return  # a more recent value is already present; don't clobber it
    os_utime(path, times=times)