Coverage for bzfs_main/snapshot_cache.py: 96%
108 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 13:30 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 13:30 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""Stores snapshot metadata in fast disk inodes to avoid repeated 'zfs list' calls, without adding external dependencies or
16complex databases."""
18from __future__ import annotations
19import os
20from collections import defaultdict
21from os import stat as os_stat
22from os import utime as os_utime
23from os.path import exists as os_path_exists
24from os.path import join as os_path_join
25from subprocess import CalledProcessError
26from typing import TYPE_CHECKING
28from bzfs_main.connection import (
29 run_ssh_command,
30)
31from bzfs_main.detect import (
32 is_caching_snapshots,
33)
34from bzfs_main.parallel_batch_cmd import (
35 itr_ssh_cmd_parallel,
36)
37from bzfs_main.utils import (
38 SortedInterner,
39 stderr_to_str,
40)
42if TYPE_CHECKING: # pragma: no cover - for type hints only
43 from bzfs_main.bzfs import Job
44 from bzfs_main.configuration import Remote, SnapshotLabel
class SnapshotCache:
    """Handles last-modified cache operations for snapshot management.

    The cache stores one file per (remote, dataset, label); the file's mtime encodes the dataset's
    'snapshots_changed' ZFS property, so future runs can skip expensive 'zfs list -t snapshot' calls.
    """

    def __init__(self, job: Job) -> None:
        # immutable variables:
        self.job: Job = job

    def get_snapshots_changed(self, path: str) -> int:
        """Returns numeric timestamp (mtime) from cached snapshots-changed file."""
        return self.get_snapshots_changed2(path)[1]

    @staticmethod
    def get_snapshots_changed2(path: str) -> tuple[int, int]:
        """Like zfs_get_snapshots_changed() but reads from local cache.

        Returns the cache file's (atime, mtime) rounded to integer seconds, or (0, 0) if absent.
        """
        try:  # perf: inode metadata reads and writes are fast - ballpark O(200k) ops/sec.
            s = os_stat(path)
            return round(s.st_atime), round(s.st_mtime)
        except FileNotFoundError:
            return 0, 0  # harmless

    def last_modified_cache_file(self, remote: Remote, dataset: str, label: SnapshotLabel | None = None) -> str:
        """Returns the path of the cache file that is tracking last snapshot modification.

        "=" is the file name of the per-dataset aggregate entry; labeled entries use prefix+infix+suffix.
        "-" stands in for an empty ssh_user_host so local and remote cache trees never collide.
        """
        cache_file: str = "=" if label is None else f"{label.prefix}{label.infix}{label.suffix}"
        userhost_dir: str = remote.ssh_user_host if remote.ssh_user_host else "-"
        return os_path_join(self.job.params.log_params.last_modified_cache_dir, userhost_dir, dataset, cache_file)

    def invalidate_last_modified_cache_dataset(self, dataset: str) -> None:
        """Resets the last_modified timestamp of all cache files of the given dataset to zero."""
        p = self.job.params
        cache_file: str = self.last_modified_cache_file(p.src, dataset)
        if not p.dry_run:
            try:
                zero_times = (0, 0)
                # Zero out every entry in the dataset's cache dir (covers all labels) ...
                for entry in os.scandir(os.path.dirname(cache_file)):
                    os_utime(entry.path, times=zero_times)
                # ... and the aggregate "=" file explicitly, in case scandir raced with its creation.
                os_utime(cache_file, times=zero_times)
            except FileNotFoundError:
                pass  # harmless

    def update_last_modified_cache(self, datasets_to_snapshot: dict[SnapshotLabel, list[str]]) -> None:
        """perf: copy lastmodified time of source dataset into local cache to reduce future 'zfs list -t snapshot' calls."""
        p = self.job.params
        src = p.src
        if not is_caching_snapshots(p, src):
            return
        # Build the union of all datasets, plus the reverse mapping dataset -> labels that snapshot it.
        src_datasets_set: set[str] = set()
        dataset_labels: dict[str, list[SnapshotLabel]] = defaultdict(list)
        for label, datasets in datasets_to_snapshot.items():
            src_datasets_set.update(datasets)  # union
            for dataset in datasets:
                dataset_labels[dataset].append(label)

        sorted_datasets: list[str] = sorted(src_datasets_set)
        snapshots_changed_dict: dict[str, int] = self.zfs_get_snapshots_changed(src, sorted_datasets)
        for src_dataset in sorted_datasets:
            snapshots_changed: int = snapshots_changed_dict.get(src_dataset, 0)
            self.job.src_properties[src_dataset].snapshots_changed = snapshots_changed
            if snapshots_changed == 0:
                # Unknown state: zero out the dataset's cache files so future readers re-query ZFS.
                self.invalidate_last_modified_cache_dataset(src_dataset)
            else:
                cache_file: str = self.last_modified_cache_file(src, src_dataset)
                cache_dir: str = os.path.dirname(cache_file)
                if not p.dry_run:
                    try:
                        os.makedirs(cache_dir, exist_ok=True)
                        # if_more_recent=True: never clobber a newer value written by a concurrent bzfs process.
                        set_last_modification_time(cache_file, unixtime_in_secs=snapshots_changed, if_more_recent=True)
                        for label in dataset_labels[src_dataset]:
                            cache_file = self.last_modified_cache_file(src, src_dataset, label)
                            set_last_modification_time(cache_file, unixtime_in_secs=snapshots_changed, if_more_recent=True)
                    except FileNotFoundError:
                        pass  # harmless

    def zfs_get_snapshots_changed(self, remote: Remote, sorted_datasets: list[str]) -> dict[str, int]:
        """Returns the ZFS dataset property "snapshots_changed", which is a UTC Unix time in integer seconds;
        See https://openzfs.github.io/openzfs-docs/man/7/zfsprops.7.html#snapshots_changed"""

        def try_zfs_list_command(_cmd: list[str], batch: list[str]) -> list[str]:
            # Best-effort: on a failing 'zfs list', salvage the partial output it printed before dying.
            try:
                return run_ssh_command(self.job, remote, print_stderr=False, cmd=_cmd + batch).splitlines()
            except CalledProcessError as e:
                return stderr_to_str(e.stdout).splitlines()
            except UnicodeDecodeError:
                return []

        assert (not self.job.is_test_mode) or sorted_datasets == sorted(sorted_datasets), "List is not sorted"
        p = self.job.params
        cmd: list[str] = p.split_args(f"{p.zfs_program} list -t filesystem,volume -s name -Hp -o snapshots_changed,name")
        results: dict[str, int] = {}
        interner: SortedInterner[str] = SortedInterner(sorted_datasets)  # reduces memory footprint
        # Pass try_zfs_list_command directly instead of wrapping it in a redundant same-signature lambda.
        for lines in itr_ssh_cmd_parallel(
            self.job, remote, [(cmd, sorted_datasets)], try_zfs_list_command, ordered=False
        ):
            for line in lines:
                if "\t" not in line:
                    break  # partial output from failing 'zfs list' command
                snapshots_changed, dataset = line.split("\t", 1)
                if not dataset:
                    break  # partial output from failing 'zfs list' command
                dataset = interner.interned(dataset)
                if snapshots_changed == "-" or not snapshots_changed:
                    snapshots_changed = "0"  # property unavailable (e.g. older ZFS) -> treat as unknown
                results[dataset] = int(snapshots_changed)
        return results
def set_last_modification_time_safe(
    path: str,
    unixtime_in_secs: int | tuple[int, int],
    if_more_recent: bool = False,
) -> None:
    """Like set_last_modification_time() but creates the parent directories of `path` if necessary."""
    parent_dir: str = os.path.dirname(path)
    try:
        os.makedirs(parent_dir, exist_ok=True)
        set_last_modification_time(path, unixtime_in_secs=unixtime_in_secs, if_more_recent=if_more_recent)
    except FileNotFoundError:
        pass  # harmless, e.g. a concurrent process may have removed the directory again
def set_last_modification_time(
    path: str,
    unixtime_in_secs: int | tuple[int, int],
    if_more_recent: bool = False,
) -> None:
    """if_more_recent=True is a concurrency control mechanism that prevents us from overwriting a newer (monotonically
    increasing) snapshots_changed value (which is a UTC Unix time in integer seconds) that might have been written to the
    cache file by a different, more up-to-date bzfs process."""
    # Normalize a single int into an (atime, mtime) pair.
    times: tuple[int, int] = (
        (unixtime_in_secs, unixtime_in_secs) if isinstance(unixtime_in_secs, int) else unixtime_in_secs
    )
    if os_path_exists(path):
        if if_more_recent and times[1] <= round(os_stat(path).st_mtime):
            return  # an equal or newer timestamp is already recorded; keep it
    else:
        with open(path, "ab"):  # touch: create an empty placeholder file
            pass
    os_utime(path, times=times)