Coverage for bzfs_main/replication.py: 99%
664 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 04:44 +0000
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""The core replication algorithm is in replicate_dataset(), which performs reliable full and/or incremental 'zfs send' and
16'zfs receive' operations on snapshots, using resumable ZFS sends when possible.
18For replication of multiple datasets, including recursive replication, see bzfs.py/replicate_datasets().
19"""
21from __future__ import (
22 annotations,
23)
24import os
25import re
26import shlex
27import subprocess
28import sys
29import threading
30import time
31from collections.abc import (
32 Iterable,
33 Iterator,
34)
35from concurrent.futures import (
36 Executor,
37 Future,
38)
39from subprocess import (
40 DEVNULL,
41 PIPE,
42)
43from typing import (
44 TYPE_CHECKING,
45 Final,
46)
48from bzfs_main.argparse_actions import (
49 has_timerange_filter,
50)
51from bzfs_main.connection import (
52 DEDICATED,
53 SHARED,
54 ConnectionPool,
55 maybe_inject_error,
56 refresh_ssh_connection_if_necessary,
57 run_ssh_command,
58 timeout,
59 try_ssh_command,
60)
61from bzfs_main.detect import (
62 ZFS_VERSION_IS_AT_LEAST_2_1_0,
63 ZFS_VERSION_IS_AT_LEAST_2_2_0,
64 are_bookmarks_enabled,
65 is_zpool_feature_enabled_or_active,
66)
67from bzfs_main.filter import (
68 filter_properties,
69 filter_snapshots,
70)
71from bzfs_main.incremental_send_steps import (
72 incremental_send_steps,
73)
74from bzfs_main.parallel_batch_cmd import (
75 run_ssh_cmd_batched,
76 run_ssh_cmd_parallel,
77)
78from bzfs_main.parallel_iterator import (
79 parallel_iterator,
80 run_in_parallel,
81)
82from bzfs_main.progress_reporter import (
83 PV_FILE_THREAD_SEPARATOR,
84)
85from bzfs_main.retry import (
86 Retry,
87 RetryableError,
88)
89from bzfs_main.utils import (
90 DONT_SKIP_DATASET,
91 FILE_PERMISSIONS,
92 LOG_DEBUG,
93 LOG_TRACE,
94 Subprocesses,
95 append_if_absent,
96 cut,
97 die,
98 getenv_bool,
99 human_readable_bytes,
100 is_descendant,
101 list_formatter,
102 open_nofollow,
103 replace_prefix,
104 stderr_to_str,
105 xprint,
106)
108if TYPE_CHECKING: # pragma: no cover - for type hints only
109 from bzfs_main.bzfs import (
110 Job,
111 )
112 from bzfs_main.configuration import (
113 Params,
114 Remote,
115 )
118# constants:
119INJECT_DST_PIPE_FAIL_KBYTES: Final[int] = 400 # for testing only
120RIGHT_JUST: Final[int] = 7
123def replicate_dataset(job: Job, src_dataset: str, tid: str, retry: Retry) -> bool:
124 """Replicates src_dataset to dst_dataset (thread-safe); For replication of multiple datasets, including recursive
125 replication, see bzfs.py/replicate_datasets()."""
126 p, log = job.params, job.params.log
127 src, dst = p.src, p.dst
128 retry_count: int = retry.count
129 dst_dataset: str = replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset)
130 log.debug(p.dry(f"{tid} Replicating: %s"), f"{src_dataset} --> {dst_dataset} ...")
132 list_result: bool | tuple[list[str], list[str], list[str], set[str], str, str] = _list_and_filter_src_and_dst_snapshots(
133 job, src_dataset, dst_dataset
134 )
135 if isinstance(list_result, bool):
136 return list_result
137 (
138 basis_src_snapshots_with_guids,
139 _src_snapshots_with_guids,
140 dst_snapshots_with_guids,
141 included_src_guids,
142 latest_src_snapshot,
143 oldest_src_snapshot,
144 ) = list_result
145 log.debug("latest_src_snapshot: %s", latest_src_snapshot)
146 latest_dst_snapshot: str = ""
147 latest_common_src_snapshot: str = ""
148 done_checking: bool = False
150 if job.dst_dataset_exists[dst_dataset]:
151 rollback_result: bool | tuple[str, str, bool] = _rollback_dst_dataset_if_necessary(
152 job,
153 dst_dataset,
154 latest_src_snapshot,
155 basis_src_snapshots_with_guids,
156 dst_snapshots_with_guids,
157 done_checking,
158 tid,
159 )
160 if isinstance(rollback_result, bool):
161 return rollback_result
162 latest_dst_snapshot, latest_common_src_snapshot, done_checking = rollback_result
164 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark
165 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot)
166 props_cache: dict[tuple[str, str, str], dict[str, str | None]] = {} # fresh empty ZFS props cache for each dataset
167 dry_run_no_send: bool = False
168 if not latest_common_src_snapshot:
169 # no common snapshot exists; delete all dst snapshots and perform a full send of the oldest selected src snapshot
170 full_result: tuple[str, bool, bool, int] = _replicate_dataset_fully(
171 job,
172 src_dataset,
173 dst_dataset,
174 oldest_src_snapshot,
175 latest_src_snapshot,
176 latest_dst_snapshot,
177 dst_snapshots_with_guids,
178 props_cache,
179 dry_run_no_send,
180 done_checking,
181 retry_count,
182 tid,
183 )
184 # we have now created a common snapshot
185 latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count = full_result
186 if latest_common_src_snapshot:
187 # finally, incrementally replicate all selected snapshots from latest common snapshot until latest src snapshot
188 _replicate_dataset_incrementally(
189 job,
190 src_dataset,
191 dst_dataset,
192 latest_common_src_snapshot,
193 latest_src_snapshot,
194 basis_src_snapshots_with_guids,
195 included_src_guids,
196 props_cache,
197 dry_run_no_send,
198 done_checking,
199 retry_count,
200 tid,
201 )
202 return True
205def _list_and_filter_src_and_dst_snapshots(
206 job: Job, src_dataset: str, dst_dataset: str
207) -> bool | tuple[list[str], list[str], list[str], set[str], str, str]:
208 """On replication, list and filter src and dst snapshots."""
209 p, log = job.params, job.params.log
210 src, dst = p.src, p.dst
212 # list GUID and name for dst snapshots, sorted ascending by createtxg (more precise than creation time)
213 dst_cmd: list[str] = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -s createtxg -Hp -o guid,name", dst_dataset)
215 # list GUID and name for src snapshots + bookmarks, primarily sort ascending by transaction group (which is more
216 # precise than creation time), secondarily sort such that snapshots appear after bookmarks for the same GUID.
217 # Note: A snapshot and its ZFS bookmarks always have the same GUID, creation time and transaction group. A snapshot
218 # changes its transaction group but retains its creation time and GUID on 'zfs receive' on another pool, i.e.
219 # comparing createtxg is only meaningful within a single pool, not across pools from src to dst. Comparing creation
220 # time remains meaningful across pools from src to dst. Creation time is a UTC Unix time in integer seconds.
221 # Note that 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs enforce that snapshot names must not contain a '#'
222 # char, bookmark names must not contain a '@' char, and dataset names must not contain a '#' or '@' char.
223 # GUID and creation time also do not contain a '#' or '@' char.
224 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters)
225 types: str = "snapshot,bookmark" if p.use_bookmark and are_bookmarks_enabled(p, src) else "snapshot"
226 props: str = job.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name"
227 src_cmd = p.split_args(f"{p.zfs_program} list -t {types} -s createtxg -s type -d 1 -Hp -o {props}", src_dataset)
228 job.maybe_inject_delete(src, dataset=src_dataset, delete_trigger="zfs_list_snapshot_src")
229 src_snapshots_and_bookmarks, dst_snapshots_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel
230 lambda: try_ssh_command(job, src, LOG_TRACE, cmd=src_cmd),
231 lambda: try_ssh_command(job, dst, LOG_TRACE, cmd=dst_cmd, error_trigger="zfs_list_snapshot_dst"),
232 )
233 job.dst_dataset_exists[dst_dataset] = dst_snapshots_with_guids_str is not None
234 dst_snapshots_with_guids: list[str] = (dst_snapshots_with_guids_str or "").splitlines()
235 if src_snapshots_and_bookmarks is None:
236 log.warning("Third party deleted source: %s", src_dataset)
237 return False # src dataset has been deleted by some third party while we're running - nothing to do anymore
238 src_snapshots_with_guids: list[str] = src_snapshots_and_bookmarks.splitlines()
239 src_snapshots_and_bookmarks = None
240 if len(dst_snapshots_with_guids) == 0 and "bookmark" in types:
241 # src bookmarks serve no purpose if the destination dataset has no snapshot; ignore them
242 src_snapshots_with_guids = [snapshot for snapshot in src_snapshots_with_guids if "@" in snapshot]
243 num_src_snapshots_found: int = sum(1 for snapshot in src_snapshots_with_guids if "@" in snapshot)
244 with job.stats_lock:
245 job.num_snapshots_found += num_src_snapshots_found
246 # apply include/exclude regexes to ignore irrelevant src snapshots
247 basis_src_snapshots_with_guids: list[str] = src_snapshots_with_guids
248 src_snapshots_with_guids = filter_snapshots(job, src_snapshots_with_guids)
249 if filter_needs_creation_time:
250 src_snapshots_with_guids = cut(field=2, lines=src_snapshots_with_guids)
251 basis_src_snapshots_with_guids = cut(field=2, lines=basis_src_snapshots_with_guids)
253 # find oldest and latest "true" snapshot, as well as GUIDs of all snapshots and bookmarks.
254 # a snapshot is "true" if it is not a bookmark.
255 oldest_src_snapshot: str = ""
256 latest_src_snapshot: str = ""
257 included_src_guids: set[str] = set()
258 for line in src_snapshots_with_guids:
259 guid, snapshot = line.split("\t", 1)
260 if "@" in snapshot:
261 included_src_guids.add(guid)
262 latest_src_snapshot = snapshot
263 if not oldest_src_snapshot:
264 oldest_src_snapshot = snapshot
265 if len(src_snapshots_with_guids) == 0:
266 if p.skip_missing_snapshots == "fail":
267 die(f"Source dataset includes no snapshot: {src_dataset}. Consider using --skip-missing-snapshots=dataset")
268 elif p.skip_missing_snapshots == "dataset":
269 log.warning("Skipping source dataset because it includes no snapshot: %s", src_dataset)
270 if p.recursive and not job.dst_dataset_exists[dst_dataset]:
271 log.warning("Also skipping descendant datasets as dst dataset does not exist for %s", src_dataset)
272 return job.dst_dataset_exists[dst_dataset]
273 return (
274 basis_src_snapshots_with_guids,
275 src_snapshots_with_guids,
276 dst_snapshots_with_guids,
277 included_src_guids,
278 latest_src_snapshot,
279 oldest_src_snapshot,
280 )
283def _rollback_dst_dataset_if_necessary(
284 job: Job,
285 dst_dataset: str,
286 latest_src_snapshot: str,
287 src_snapshots_with_guids: list[str],
288 dst_snapshots_with_guids: list[str],
289 done_checking: bool,
290 tid: str,
291) -> bool | tuple[str, str, bool]:
292 """On replication, rollback dst if necessary."""
293 p, log = job.params, job.params.log
294 dst = p.dst
295 latest_dst_snapshot: str = ""
296 latest_dst_guid: str = ""
297 if len(dst_snapshots_with_guids) > 0:
298 latest_dst_guid, latest_dst_snapshot = dst_snapshots_with_guids[-1].split("\t", 1)
299 if p.force_rollback_to_latest_snapshot:
300 log.info(p.dry(f"{tid} Rolling back destination to most recent snapshot: %s"), latest_dst_snapshot)
301 # rollback just in case the dst dataset was modified since its most recent snapshot
302 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset)
303 cmd: list[str] = p.split_args(f"{dst.sudo} {p.zfs_program} rollback", latest_dst_snapshot)
304 try_ssh_command(job, dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd, exists=False)
305 elif latest_src_snapshot == "":
306 log.info(f"{tid} Already-up-to-date: %s", dst_dataset)
307 return True
309 # find most recent snapshot (or bookmark) that src and dst have in common - we'll start to replicate
310 # from there up to the most recent src snapshot. any two snapshots are "common" iff their ZFS GUIDs (i.e.
311 # contents) are equal. See https://github.com/openzfs/zfs/commit/305bc4b370b20de81eaf10a1cf724374258b74d1
312 def latest_common_snapshot(snapshots_with_guids: list[str], intersect_guids: set[str]) -> tuple[str | None, str]:
313 """Returns a true snapshot instead of its bookmark with the same GUID, per the sort order previously used for 'zfs
314 list -s ...'."""
315 for _line in reversed(snapshots_with_guids):
316 guid_, snapshot_ = _line.split("\t", 1)
317 if guid_ in intersect_guids:
318 return guid_, snapshot_ # can be a snapshot or bookmark
319 return None, ""
321 latest_common_guid, latest_common_src_snapshot = latest_common_snapshot(
322 src_snapshots_with_guids, set(cut(field=1, lines=dst_snapshots_with_guids))
323 )
324 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark
325 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot)
327 if latest_common_src_snapshot and latest_common_guid != latest_dst_guid:
328 # found latest common snapshot but dst has an even newer snapshot. rollback dst to that common snapshot.
329 assert latest_common_guid
330 _, latest_common_dst_snapshot = latest_common_snapshot(dst_snapshots_with_guids, {latest_common_guid})
331 if not (p.force_rollback_to_latest_common_snapshot or p.force):
332 die(
333 f"Conflict: Most recent destination snapshot {latest_dst_snapshot} is more recent than "
334 f"most recent common snapshot {latest_common_dst_snapshot}. Rollback destination first, "
335 "for example via --force-rollback-to-latest-common-snapshot (or --force) option."
336 )
337 if p.force_once:
338 p.force.value = False
339 p.force_rollback_to_latest_common_snapshot.value = False
340 log.info(p.dry(f"{tid} Rolling back destination to most recent common snapshot: %s"), latest_common_dst_snapshot)
341 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset)
342 cmd = p.split_args(
343 f"{dst.sudo} {p.zfs_program} rollback -r {p.force_unmount} {p.force_hard}", latest_common_dst_snapshot
344 )
345 try:
346 run_ssh_command(job, dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd)
347 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
348 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else ""
349 no_sleep: bool = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr)
350 # op isn't idempotent so retries regather current state from the start of replicate_dataset()
351 raise RetryableError("Subprocess failed", no_sleep=no_sleep) from e
353 if latest_src_snapshot and latest_src_snapshot == latest_common_src_snapshot:
354 log.info(f"{tid} Already up-to-date: %s", dst_dataset)
355 return True
356 return latest_dst_snapshot, latest_common_src_snapshot, done_checking
359def _replicate_dataset_fully(
360 job: Job,
361 src_dataset: str,
362 dst_dataset: str,
363 oldest_src_snapshot: str,
364 latest_src_snapshot: str,
365 latest_dst_snapshot: str,
366 dst_snapshots_with_guids: list[str],
367 props_cache: dict[tuple[str, str, str], dict[str, str | None]],
368 dry_run_no_send: bool,
369 done_checking: bool,
370 retry_count: int,
371 tid: str,
372) -> tuple[str, bool, bool, int]:
373 """On replication, deletes all dst snapshots and performs a full send of the oldest selected src snapshot, which in turn
374 creates a common snapshot."""
375 p, log = job.params, job.params.log
376 src, dst = p.src, p.dst
377 latest_common_src_snapshot: str = ""
378 if latest_dst_snapshot:
379 if not p.force:
380 die(
381 f"Conflict: No common snapshot found between {src_dataset} and {dst_dataset} even though "
382 "destination has at least one snapshot. Aborting. Consider using --force option to first "
383 "delete all existing destination snapshots in order to be able to proceed with replication."
384 )
385 if p.force_once: 385 ↛ 387line 385 didn't jump to line 387 because the condition on line 385 was always true
386 p.force.value = False
387 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset)
388 # extract SNAPSHOT_TAG from GUID<TAB>DATASET@SNAPSHOT_TAG
389 delete_snapshots(job, dst, dst_dataset, snapshot_tags=cut(2, separator="@", lines=dst_snapshots_with_guids))
390 if p.dry_run:
391 # As we're in --dryrun (--force) mode this conflict resolution step (see above) wasn't really executed:
392 # "no common snapshot was found. delete all dst snapshots". In turn, this would cause the subsequent
393 # 'zfs receive -n' to fail with "cannot receive new filesystem stream: destination has snapshots; must
394 # destroy them to overwrite it". So we skip the zfs send/receive step and keep on trucking.
395 dry_run_no_send = True
397 # to start with, fully replicate oldest snapshot, which in turn creates a common snapshot
398 if p.no_stream:
399 oldest_src_snapshot = latest_src_snapshot
400 if oldest_src_snapshot:
401 if not job.dst_dataset_exists[dst_dataset]:
402 # on destination, create parent filesystem and ancestors if they do not yet exist
403 dst_dataset_parent: str = os.path.dirname(dst_dataset)
404 if not job.dst_dataset_exists[dst_dataset_parent]:
405 if p.dry_run:
406 dry_run_no_send = True
407 if dst_dataset_parent: 407 ↛ 410line 407 didn't jump to line 410 because the condition on line 407 was always true
408 _create_zfs_filesystem(job, dst_dataset_parent)
410 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count)
411 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result
412 curr_size: int = _estimate_send_size(job, src, dst_dataset, recv_resume_token, oldest_src_snapshot)
413 humansize: str = _format_size(curr_size)
414 if recv_resume_token:
415 send_opts: list[str] = send_resume_opts # e.g. ["-t", "1-c740b4779-..."]
416 else:
417 send_opts = p.curr_zfs_send_program_opts + [oldest_src_snapshot]
418 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts)
419 recv_opts: list[str] = p.zfs_full_recv_opts.copy() + recv_resume_opts
420 recv_opts, set_opts = _add_recv_property_options(job, True, recv_opts, src_dataset, props_cache)
421 recv_cmd: list[str] = p.split_args(
422 f"{dst.sudo} {p.zfs_program} receive -F", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True
423 )
424 log.info(p.dry(f"{tid} Full send: %s"), f"{oldest_src_snapshot} --> {dst_dataset} ({humansize.strip()}) ...")
425 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset)
426 dry_run_no_send = dry_run_no_send or p.dry_run_no_send
427 job.maybe_inject_params(error_trigger="full_zfs_send_params")
428 humansize = humansize.rjust(RIGHT_JUST * 3 + 2)
429 _run_zfs_send_receive(
430 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "full_zfs_send"
431 )
432 latest_common_src_snapshot = oldest_src_snapshot # we have now created a common snapshot
433 if not dry_run_no_send and not p.dry_run:
434 job.dst_dataset_exists[dst_dataset] = True
435 with job.stats_lock:
436 job.num_snapshots_replicated += 1
437 _create_zfs_bookmarks(job, src, src_dataset, [oldest_src_snapshot])
438 _zfs_set(job, set_opts, dst, dst_dataset)
439 dry_run_no_send = dry_run_no_send or p.dry_run
440 retry_count = 0
442 return latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count
445def _replicate_dataset_incrementally(
446 job: Job,
447 src_dataset: str,
448 dst_dataset: str,
449 latest_common_src_snapshot: str,
450 latest_src_snapshot: str,
451 basis_src_snapshots_with_guids: list[str],
452 included_src_guids: set[str],
453 props_cache: dict[tuple[str, str, str], dict[str, str | None]],
454 dry_run_no_send: bool,
455 done_checking: bool,
456 retry_count: int,
457 tid: str,
458) -> None:
459 """Incrementally replicates all selected snapshots from latest common snapshot until latest src snapshot."""
460 p, log = job.params, job.params.log
461 src, dst = p.src, p.dst
463 def replication_candidates() -> tuple[list[str], list[str]]:
464 assert len(basis_src_snapshots_with_guids) > 0
465 result_snapshots: list[str] = []
466 result_guids: list[str] = []
467 last_appended_guid: str = ""
468 snapshot_itr: Iterator[str] = reversed(basis_src_snapshots_with_guids)
469 while True:
470 guid, snapshot = next(snapshot_itr).split("\t", 1)
471 if "@" in snapshot:
472 result_snapshots.append(snapshot)
473 result_guids.append(guid)
474 last_appended_guid = guid
475 if snapshot == latest_common_src_snapshot: # latest_common_src_snapshot is a snapshot or bookmark
476 if guid != last_appended_guid and "@" not in snapshot:
477 # only appends the src bookmark if it has no snapshot. If the bookmark has a snapshot then that
478 # snapshot has already been appended, per the sort order previously used for 'zfs list -s ...'
479 result_snapshots.append(snapshot)
480 result_guids.append(guid)
481 break
482 result_snapshots.reverse()
483 result_guids.reverse()
484 assert len(result_snapshots) > 0
485 assert len(result_snapshots) == len(result_guids)
486 return result_guids, result_snapshots
488 # collect the most recent common snapshot (which may be a bookmark) followed by all src snapshots
489 # (that are not a bookmark) that are more recent than that.
490 cand_guids, cand_snapshots = replication_candidates()
491 if len(cand_snapshots) == 1:
492 # latest_src_snapshot is a (true) snapshot that is equal to latest_common_src_snapshot or LESS recent
493 # than latest_common_src_snapshot. The latter case can happen if latest_common_src_snapshot is a
494 # bookmark whose snapshot has been deleted on src.
495 return # nothing more tbd
497 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count)
498 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result
499 recv_opts: list[str] = p.zfs_recv_program_opts.copy() + recv_resume_opts
500 recv_opts, set_opts = _add_recv_property_options(job, False, recv_opts, src_dataset, props_cache)
501 if p.no_stream:
502 # skip intermediate snapshots
503 steps_todo: list[tuple[str, str, str, list[str]]] = [
504 ("-i", latest_common_src_snapshot, latest_src_snapshot, [latest_src_snapshot])
505 ]
506 else:
507 # include intermediate src snapshots that pass --{include,exclude}-snapshot-* policy, using
508 # a series of -i/-I send/receive steps that skip excluded src snapshots.
509 steps_todo = _incremental_send_steps_wrapper(
510 p, cand_snapshots, cand_guids, included_src_guids, recv_resume_token is not None
511 )
512 log.log(LOG_TRACE, "steps_todo: %s", list_formatter(steps_todo, "; "))
513 estimate_send_sizes: list[int] = _estimate_send_sizes_in_parallel(job, src, dst_dataset, recv_resume_token, steps_todo)
514 total_size: int = sum(estimate_send_sizes)
515 total_num: int = sum(len(to_snapshots) for incr_flag, from_snap, to_snap, to_snapshots in steps_todo)
516 done_size: int = 0
517 done_num: int = 0
518 for i, (incr_flag, from_snap, to_snap, to_snapshots) in enumerate(steps_todo):
519 curr_num_snapshots: int = len(to_snapshots)
520 curr_size: int = estimate_send_sizes[i]
521 humansize: str = _format_size(total_size) + "/" + _format_size(done_size) + "/" + _format_size(curr_size)
522 human_num: str = f"{total_num}/{done_num}/{curr_num_snapshots} snapshots"
523 if recv_resume_token:
524 send_opts: list[str] = send_resume_opts # e.g. ["-t", "1-c740b4779-..."]
525 else:
526 send_opts = p.curr_zfs_send_program_opts + [incr_flag, from_snap, to_snap]
527 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts)
528 recv_cmd: list[str] = p.split_args(
529 f"{dst.sudo} {p.zfs_program} receive", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True
530 )
531 dense_size: str = p.two_or_more_spaces_regex.sub("", humansize.strip())
532 log.info(
533 p.dry(f"{tid} Incremental send {incr_flag}: %s"),
534 f"{from_snap} .. {to_snap[to_snap.index('@'):]} --> {dst_dataset} ({dense_size}) ({human_num}) ...",
535 )
536 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset, busy_if_send=False)
537 if p.dry_run and not job.dst_dataset_exists[dst_dataset]:
538 dry_run_no_send = True
539 dry_run_no_send = dry_run_no_send or p.dry_run_no_send
540 job.maybe_inject_params(error_trigger="incr_zfs_send_params")
541 _run_zfs_send_receive(
542 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "incr_zfs_send"
543 )
544 done_size += curr_size
545 done_num += curr_num_snapshots
546 recv_resume_token = None
547 with job.stats_lock:
548 job.num_snapshots_replicated += curr_num_snapshots
549 assert p.create_bookmarks
550 if p.create_bookmarks == "all":
551 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots)
552 elif p.create_bookmarks != "none":
553 threshold_millis: int = p.xperiods.label_milliseconds("_" + p.create_bookmarks)
554 to_snapshots = [snap for snap in to_snapshots if p.xperiods.label_milliseconds(snap) >= threshold_millis]
555 if i == len(steps_todo) - 1 and (len(to_snapshots) == 0 or to_snapshots[-1] != to_snap):
556 to_snapshots.append(to_snap) # ensure latest common snapshot is bookmarked
557 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots)
558 _zfs_set(job, set_opts, dst, dst_dataset)
561def _format_size(num_bytes: int) -> str:
562 """Formats a byte count for human-readable logs."""
563 return human_readable_bytes(num_bytes, separator="").rjust(RIGHT_JUST)
566def _prepare_zfs_send_receive(
567 job: Job, src_dataset: str, send_cmd: list[str], recv_cmd: list[str], size_estimate_bytes: int, size_estimate_human: str
568) -> tuple[str, str, str]:
569 """Constructs zfs send/recv pipelines with optional compression and pv."""
570 p = job.params
571 send_cmd_str: str = shlex.join(send_cmd)
572 recv_cmd_str: str = shlex.join(recv_cmd)
574 if (
575 p.is_program_available("zstd", "src")
576 and p.is_program_available("zstd", "dst")
577 and p.is_program_available("sh", "src")
578 and p.is_program_available("sh", "dst")
579 ):
580 compress_cmd_: str = _compress_cmd(p, "src", size_estimate_bytes)
581 decompress_cmd_: str = _decompress_cmd(p, "dst", size_estimate_bytes)
582 else: # no compression is used if source and destination do not both support compression
583 compress_cmd_, decompress_cmd_ = "cat", "cat"
585 recordsize: int = abs(job.src_properties[src_dataset].recordsize)
586 src_buffer: str = _mbuffer_cmd(p, "src", size_estimate_bytes, recordsize)
587 dst_buffer: str = _mbuffer_cmd(p, "dst", size_estimate_bytes, recordsize)
588 local_buffer: str = _mbuffer_cmd(p, "local", size_estimate_bytes, recordsize)
590 pv_src_cmd: str = ""
591 pv_dst_cmd: str = ""
592 pv_loc_cmd: str = ""
593 if not p.src.ssh_user_host:
594 pv_src_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human)
595 elif not p.dst.ssh_user_host:
596 pv_dst_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human)
597 elif compress_cmd_ == "cat":
598 pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human) # compression disabled
599 else:
600 # pull-push mode with compression enabled: reporting "percent complete" isn't straightforward because
601 # localhost observes the compressed data instead of the uncompressed data, so we disable the progress bar.
602 pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human, disable_progress_bar=True)
604 # assemble pipeline running on source leg
605 src_pipe: str = ""
606 if job.inject_params.get("inject_src_pipe_fail", False):
607 # for testing; initially forward some bytes and then fail
608 src_pipe = f"{src_pipe} | dd bs=64 count=1 2>/dev/null && false"
609 if job.inject_params.get("inject_src_pipe_garble", False):
610 src_pipe = f"{src_pipe} | base64" # for testing; forward garbled bytes
611 if pv_src_cmd and pv_src_cmd != "cat":
612 src_pipe = f"{src_pipe} | {pv_src_cmd}"
613 if compress_cmd_ != "cat":
614 src_pipe = f"{src_pipe} | {compress_cmd_}"
615 if src_buffer != "cat":
616 src_pipe = f"{src_pipe} | {src_buffer}"
617 if src_pipe.startswith(" |"):
618 src_pipe = src_pipe[2:] # strip leading ' |' part
619 if job.inject_params.get("inject_src_send_error", False):
620 send_cmd_str = f"{send_cmd_str} --injectedGarbageParameter" # for testing; induce CLI parse error
621 if src_pipe:
622 src_pipe = f"{send_cmd_str} | {src_pipe}"
623 if p.src.ssh_user_host:
624 src_pipe = p.shell_program + " -c " + _dquote(src_pipe)
625 else:
626 src_pipe = send_cmd_str
628 # assemble pipeline running on middle leg between source and destination. only enabled for pull-push mode
629 local_pipe: str = ""
630 if local_buffer != "cat":
631 local_pipe = f"{local_buffer}"
632 if pv_loc_cmd and pv_loc_cmd != "cat":
633 local_pipe = f"{local_pipe} | {pv_loc_cmd}"
634 if local_buffer != "cat":
635 local_pipe = f"{local_pipe} | {local_buffer}"
636 if local_pipe.startswith(" |"):
637 local_pipe = local_pipe[2:] # strip leading ' |' part
638 if local_pipe:
639 local_pipe = f"| {local_pipe}"
641 # assemble pipeline running on destination leg
642 dst_pipe: str = ""
643 if dst_buffer != "cat":
644 dst_pipe = f"{dst_buffer}"
645 if decompress_cmd_ != "cat":
646 dst_pipe = f"{dst_pipe} | {decompress_cmd_}"
647 if pv_dst_cmd and pv_dst_cmd != "cat":
648 dst_pipe = f"{dst_pipe} | {pv_dst_cmd}"
649 if job.inject_params.get("inject_dst_pipe_fail", False):
650 # interrupt zfs receive for testing retry/resume; initially forward some bytes and then stop forwarding
651 dst_pipe = f"{dst_pipe} | dd bs=1024 count={INJECT_DST_PIPE_FAIL_KBYTES} 2>/dev/null"
652 if job.inject_params.get("inject_dst_pipe_garble", False):
653 dst_pipe = f"{dst_pipe} | base64" # for testing; forward garbled bytes
654 if dst_pipe.startswith(" |"):
655 dst_pipe = dst_pipe[2:] # strip leading ' |' part
656 if job.inject_params.get("inject_dst_receive_error", False):
657 recv_cmd_str = f"{recv_cmd_str} --injectedGarbageParameter" # for testing; induce CLI parse error
658 if dst_pipe:
659 dst_pipe = f"{dst_pipe} | {recv_cmd_str}"
660 if p.dst.ssh_user_host:
661 dst_pipe = p.shell_program + " -c " + _dquote(dst_pipe)
662 else:
663 dst_pipe = recv_cmd_str
665 # If there's no support for shell pipelines, we can't do compression, mbuffering, monitoring and rate-limiting,
666 # so we fall back to simple zfs send/receive.
667 if not p.is_program_available("sh", "src"):
668 src_pipe = send_cmd_str
669 if not p.is_program_available("sh", "dst"):
670 dst_pipe = recv_cmd_str
672 src_pipe = _squote(p.src, src_pipe)
673 dst_pipe = _squote(p.dst, dst_pipe)
674 return src_pipe, local_pipe, dst_pipe
677def _run_zfs_send_receive(
678 job: Job,
679 src_dataset: str,
680 dst_dataset: str,
681 send_cmd: list[str],
682 recv_cmd: list[str],
683 size_estimate_bytes: int,
684 size_estimate_human: str,
685 dry_run_no_send: bool,
686 error_trigger: str | None = None,
687) -> None:
688 """Executes a zfs send/receive pipeline between source and destination."""
689 p, log = job.params, job.params.log
690 pipes: tuple[str, str, str] = _prepare_zfs_send_receive(
691 job, src_dataset, send_cmd, recv_cmd, size_estimate_bytes, size_estimate_human
692 )
693 src_pipe, local_pipe, dst_pipe = pipes
694 conn_pool_name: str = DEDICATED if p.dedicated_tcp_connection_per_zfs_send else SHARED
695 src_conn_pool: ConnectionPool = p.connection_pools["src"].pool(conn_pool_name)
696 dst_conn_pool: ConnectionPool = p.connection_pools["dst"].pool(conn_pool_name)
697 with src_conn_pool.connection() as src_conn, dst_conn_pool.connection() as dst_conn:
698 refresh_ssh_connection_if_necessary(job, p.src, src_conn)
699 refresh_ssh_connection_if_necessary(job, p.dst, dst_conn)
700 src_ssh_cmd: str = " ".join(src_conn.ssh_cmd_quoted)
701 dst_ssh_cmd: str = " ".join(dst_conn.ssh_cmd_quoted)
702 cmd: list[str] = [p.shell_program_local, "-c", f"{src_ssh_cmd} {src_pipe} {local_pipe} | {dst_ssh_cmd} {dst_pipe}"]
703 msg: str = "Would execute: %s" if dry_run_no_send else "Executing: %s"
704 log.debug(msg, cmd[2].lstrip())
705 if not dry_run_no_send:
706 try:
707 maybe_inject_error(job, cmd=cmd, error_trigger=error_trigger)
708 sp: Subprocesses = job.subprocesses
709 process = sp.subprocess_run(
710 cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout(job), check=True
711 )
712 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
713 no_sleep: bool = False
714 if not isinstance(e, UnicodeDecodeError):
715 xprint(log, stderr_to_str(e.stdout), file=sys.stdout)
716 log.warning("%s", stderr_to_str(e.stderr).rstrip())
717 if isinstance(e, subprocess.CalledProcessError):
718 no_sleep = _clear_resumable_recv_state_if_necessary(job, dst_dataset, e.stderr)
719 # op isn't idempotent so retries regather current state from the start of replicate_dataset()
720 raise RetryableError("Subprocess failed", no_sleep=no_sleep) from e
721 else:
722 xprint(log, process.stdout, file=sys.stdout)
723 xprint(log, process.stderr, file=sys.stderr)
726def _clear_resumable_recv_state_if_necessary(job: Job, dst_dataset: str, stderr: str) -> bool:
727 """Deletes leftover state when resume tokens fail to apply."""
729 def clear_resumable_recv_state() -> bool:
730 log.warning(p.dry("Aborting an interrupted zfs receive -s, deleting partially received state: %s"), dst_dataset)
731 cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} receive -A", dst_dataset)
732 try_ssh_command(job, p.dst, LOG_TRACE, is_dry=p.dry_run, print_stdout=True, cmd=cmd)
733 log.log(LOG_TRACE, p.dry("Done Aborting an interrupted zfs receive -s: %s"), dst_dataset)
734 return True
736 p, log = job.params, job.params.log
737 # No i18n needed here. OpenZFS ships no translation catalogs, so gettext falls back to English msgids and locale settings
738 # have no effect. If translations ever appear, revisit this or inject LC_ALL=C.
740 # "cannot resume send: 'wb_src/tmp/src@s1' is no longer the same snapshot used in the initial send"
741 # "cannot resume send: 'wb_src/tmp/src@s1' used in the initial send no longer exists"
742 # "cannot resume send: incremental source 0xa000000000000000 no longer exists"
743 if "cannot resume send" in stderr and (
744 "is no longer the same snapshot used in the initial send" in stderr
745 or "used in the initial send no longer exists" in stderr
746 or re.search(r"incremental source [0-9a-fx]+ no longer exists", stderr)
747 ):
748 return clear_resumable_recv_state()
750 # "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive."
751 # see https://github.com/openzfs/zfs/issues/12480
752 # 'cannot receive new filesystem stream: destination xx contains partially-complete state from "zfs receive -s"'
753 # this indicates that --no-resume-recv detects that dst contains a previously interrupted recv -s
754 elif "cannot receive" in stderr and (
755 "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive" in stderr
756 or 'contains partially-complete state from "zfs receive -s"' in stderr
757 ):
758 return clear_resumable_recv_state()
760 elif ( # this signals normal behavior on interrupt of 'zfs receive -s' if running without --no-resume-recv
761 "cannot receive new filesystem stream: checksum mismatch or incomplete stream" in stderr
762 and "Partially received snapshot is saved" in stderr
763 ):
764 return True
766 # "cannot destroy 'wb_dest/tmp/dst@s1': snapshot has dependent clones ... use '-R' to destroy the following
767 # datasets: wb_dest/tmp/dst/%recv" # see https://github.com/openzfs/zfs/issues/10439#issuecomment-642774560
768 # This msg indicates a failed 'zfs destroy' via --delete-dst-snapshots. This "clone" is caused by a previously
769 # interrupted 'zfs receive -s'. The fix used here is to delete the partially received state of said
770 # 'zfs receive -s' via 'zfs receive -A', followed by an automatic retry, which will now succeed to delete the
771 # snapshot without user intervention.
772 elif (
773 "cannot destroy" in stderr
774 and "snapshot has dependent clone" in stderr
775 and "use '-R' to destroy the following dataset" in stderr
776 and f"\n{dst_dataset}/%recv\n" in stderr
777 ):
778 return clear_resumable_recv_state()
780 # Same cause as above, except that this error can occur during 'zfs rollback'
781 # Also see https://github.com/openzfs/zfs/blob/master/cmd/zfs/zfs_main.c
782 elif (
783 "cannot rollback to" in stderr
784 and "clones of previous snapshots exist" in stderr
785 and "use '-R' to force deletion of the following clones and dependents" in stderr
786 and f"\n{dst_dataset}/%recv\n" in stderr
787 ):
788 return clear_resumable_recv_state()
790 return False
793def _recv_resume_token(job: Job, dst_dataset: str, retry_count: int) -> tuple[str | None, list[str], list[str]]:
794 """Gets recv_resume_token ZFS property from dst_dataset and returns corresponding opts to use for send+recv."""
795 p, log = job.params, job.params.log
796 if not p.resume_recv:
797 return None, [], []
798 warning: str | None = None
799 if not is_zpool_feature_enabled_or_active(p, p.dst, "feature@extensible_dataset"):
800 warning = "not available on destination dataset"
801 elif not p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst"):
802 warning = "unreliable as zfs version is too old" # e.g. zfs-0.8.3 "internal error: Unknown error 1040"
803 if warning:
804 log.warning(f"ZFS receive resume feature is {warning}. Falling back to --no-resume-recv: %s", dst_dataset)
805 return None, [], []
806 recv_resume_token: str | None = None
807 send_resume_opts: list[str] = []
808 if job.dst_dataset_exists[dst_dataset]:
809 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o value -s none receive_resume_token", dst_dataset)
810 recv_resume_token = run_ssh_command(job, p.dst, LOG_TRACE, cmd=cmd).rstrip()
811 if recv_resume_token == "-" or not recv_resume_token: # noqa: S105
812 recv_resume_token = None
813 else:
814 send_resume_opts += ["-n"] if p.dry_run else []
815 send_resume_opts += ["-v"] if p.verbose_zfs else []
816 send_resume_opts += ["-t", recv_resume_token]
817 recv_resume_opts = ["-s"]
818 return recv_resume_token, send_resume_opts, recv_resume_opts
821def _mbuffer_cmd(p: Params, loc: str, size_estimate_bytes: int, recordsize: int) -> str:
822 """If mbuffer command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to smooth out
823 the rate of data flow and prevent bottlenecks caused by network latency or speed fluctuation."""
824 if (
825 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size)
826 and (
827 (loc == "src" and (p.src.is_nonlocal or p.dst.is_nonlocal))
828 or (loc == "dst" and (p.src.is_nonlocal or p.dst.is_nonlocal))
829 or (loc == "local" and p.src.is_nonlocal and p.dst.is_nonlocal)
830 )
831 and p.is_program_available("mbuffer", loc)
832 ):
833 recordsize = max(recordsize, 2 * 1024 * 1024)
834 return shlex.join([p.mbuffer_program, "-s", str(recordsize)] + p.mbuffer_program_opts)
835 else:
836 return "cat"
839def _compress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str:
840 """If zstd command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to reduce network
841 bottlenecks by sending compressed data."""
842 if (
843 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size)
844 and (p.src.is_nonlocal or p.dst.is_nonlocal)
845 and p.is_program_available("zstd", loc)
846 ):
847 return shlex.join([p.compression_program, "-c"] + p.compression_program_opts)
848 else:
849 return "cat"
852def _decompress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str:
853 """Returns decompression command for network pipe if remote supports it."""
854 if (
855 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size)
856 and (p.src.is_nonlocal or p.dst.is_nonlocal)
857 and p.is_program_available("zstd", loc)
858 ):
859 return shlex.join([p.compression_program, "-dc"])
860 else:
861 return "cat"
864WORKER_THREAD_NUMBER_REGEX: Final[re.Pattern[str]] = re.compile(r"ThreadPoolExecutor-\d+_(\d+)")
867def _pv_cmd(
868 job: Job, loc: str, size_estimate_bytes: int, size_estimate_human: str, disable_progress_bar: bool = False
869) -> str:
870 """If pv command is on the PATH, monitors the progress of data transfer from 'zfs send' to 'zfs receive'; Progress can be
871 viewed via "tail -f $pv_log_file" aka tail -f ~/bzfs-logs/current.pv or similar."""
872 p = job.params
873 if p.is_program_available("pv", loc):
874 size: str = f"--size={size_estimate_bytes}"
875 if disable_progress_bar or p.no_estimate_send_size:
876 size = ""
877 pv_log_file: str = p.log_params.pv_log_file
878 thread_name: str = threading.current_thread().name
879 if match := WORKER_THREAD_NUMBER_REGEX.fullmatch(thread_name):
880 worker = int(match.group(1))
881 if worker > 0:
882 pv_log_file += PV_FILE_THREAD_SEPARATOR + f"{worker:04}"
883 if job.is_first_replication_task.get_and_set(False):
884 if job.isatty and not p.quiet:
885 job.progress_reporter.start()
886 job.replication_start_time_nanos = time.monotonic_ns()
887 if job.isatty and not p.quiet:
888 with open_nofollow(pv_log_file, mode="a", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
889 fd.write("\n") # mark start of new stream so ProgressReporter can reliably reset bytes_in_flight
890 job.progress_reporter.enqueue_pv_log_file(pv_log_file)
891 pv_program_opts: list[str] = [p.pv_program] + p.pv_program_opts
892 if job.progress_update_intervals is not None: # for testing
893 pv_program_opts += [f"--interval={job.progress_update_intervals[0]}"]
894 pv_program_opts += ["--force", f"--name={size_estimate_human}"]
895 pv_program_opts += [size] if size else []
896 return f"LC_ALL=C {shlex.join(pv_program_opts)} 2>> {shlex.quote(pv_log_file)}"
897 else:
898 return "cat"
901def _squote(remote: Remote, arg: str) -> str:
902 """Quotes an argument only when running remotely over ssh."""
903 assert arg is not None
904 return shlex.quote(arg) if remote.ssh_user_host else arg
907def _dquote(arg: str) -> str:
908 """Shell-escapes double quotes and dollar and backticks, then surrounds with double quotes."""
909 return '"' + arg.replace('"', '\\"').replace("$", "\\$").replace("`", "\\`") + '"'
912def delete_snapshots(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None:
913 """Deletes snapshots in manageable batches on the specified remote."""
914 if len(snapshot_tags) == 0:
915 return
916 p, log = job.params, job.params.log
917 log.info(p.dry(f"Deleting {len(snapshot_tags)} snapshots within %s: %s"), dataset, snapshot_tags)
918 # delete snapshots in batches without creating a command line that's too big for the OS to handle
919 run_ssh_cmd_batched(
920 job,
921 remote,
922 _delete_snapshot_cmd(p, remote, dataset + "@"),
923 snapshot_tags,
924 lambda batch: _delete_snapshot(job, remote, dataset, dataset + "@" + ",".join(batch)),
925 max_batch_items=job.params.max_snapshots_per_minibatch_on_delete_snaps,
926 sep=",",
927 )
930def _delete_snapshot(job: Job, r: Remote, dataset: str, snapshots_to_delete: str) -> None:
931 """Runs zfs destroy for a comma-separated snapshot list."""
932 p = job.params
933 cmd: list[str] = _delete_snapshot_cmd(p, r, snapshots_to_delete)
934 is_dry: bool = False # False is safe because we're using the 'zfs destroy -n' flag
935 try:
936 maybe_inject_error(job, cmd=cmd, error_trigger="zfs_delete_snapshot")
937 run_ssh_command(job, r, LOG_DEBUG, is_dry=is_dry, print_stdout=True, cmd=cmd)
938 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
939 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else ""
940 no_sleep: bool = _clear_resumable_recv_state_if_necessary(job, dataset, stderr)
941 # op isn't idempotent so retries regather current state from the start
942 raise RetryableError("Subprocess failed", no_sleep=no_sleep) from e
945def _delete_snapshot_cmd(p: Params, r: Remote, snapshots_to_delete: str) -> list[str]:
946 """Builds zfs destroy command for given snapshots."""
947 return p.split_args(
948 f"{r.sudo} {p.zfs_program} destroy", p.force_hard, p.verbose_destroy, p.dry_run_destroy, snapshots_to_delete
949 )
952def delete_bookmarks(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None:
953 """Removes bookmarks individually since zfs lacks batch deletion."""
954 if len(snapshot_tags) == 0:
955 return
956 # Unfortunately ZFS has no syntax yet to delete multiple bookmarks in a single CLI invocation
957 p, log = job.params, job.params.log
958 log.info(
959 p.dry(f"Deleting {len(snapshot_tags)} bookmarks within %s: %s"), dataset, dataset + "#" + ",".join(snapshot_tags)
960 )
961 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} destroy")
962 run_ssh_cmd_parallel(
963 job,
964 remote,
965 [(cmd, (f"{dataset}#{snapshot_tag}" for snapshot_tag in snapshot_tags))],
966 lambda _cmd, batch: try_ssh_command(
967 job, remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=_cmd + batch, exists=False
968 ),
969 max_batch_items=1,
970 )
973def delete_datasets(job: Job, remote: Remote, datasets: Iterable[str]) -> None:
974 """Deletes the given datasets via zfs destroy -r on the given remote."""
975 # Impl is batch optimized to minimize CLI + network roundtrips: only need to run zfs destroy if previously
976 # destroyed dataset (within sorted datasets) is not a prefix (aka ancestor) of current dataset
977 p, log = job.params, job.params.log
978 last_deleted_dataset: str = DONT_SKIP_DATASET
979 for dataset in sorted(datasets):
980 if is_descendant(dataset, of_root_dataset=last_deleted_dataset):
981 continue
982 log.info(p.dry("Deleting dataset tree: %s"), f"{dataset} ...")
983 cmd: list[str] = p.split_args(
984 f"{remote.sudo} {p.zfs_program} destroy -r {p.force_unmount} {p.force_hard} {p.verbose_destroy}",
985 p.dry_run_destroy,
986 dataset,
987 )
988 is_dry: bool = False # False is safe because we're using the 'zfs destroy -n' flag
989 run_ssh_command(job, remote, LOG_DEBUG, is_dry=is_dry, print_stdout=True, cmd=cmd)
990 last_deleted_dataset = dataset
993def _create_zfs_filesystem(job: Job, filesystem: str) -> None:
994 """Creates destination filesystem hierarchies without mounting them."""
995 # zfs create -p -u $filesystem
996 # To ensure the filesystems that we create do not get mounted, we apply a separate 'zfs create -p -u'
997 # invocation for each non-existing ancestor. This is because a single 'zfs create -p -u' applies the '-u'
998 # part only to the immediate filesystem, rather than to the not-yet existing ancestors.
999 p = job.params
1000 parent: str = ""
1001 no_mount: str = "-u" if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst") else ""
1002 for component in filesystem.split("/"):
1003 parent += component
1004 if not job.dst_dataset_exists[parent]:
1005 cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} create -p", no_mount, parent)
1006 try:
1007 run_ssh_command(job, p.dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd)
1008 except subprocess.CalledProcessError as e:
1009 # ignore harmless error caused by 'zfs create' without the -u flag, or by dataset already existing
1010 if not ( 1010 ↛ 1016line 1010 didn't jump to line 1016 because the condition on line 1010 was never true
1011 "filesystem successfully created, but it may only be mounted by root" in e.stderr
1012 or "filesystem successfully created, but not mounted" in e.stderr # SolarisZFS
1013 or "dataset already exists" in e.stderr
1014 or "filesystem already exists" in e.stderr # SolarisZFS?
1015 ):
1016 raise
1017 if not p.dry_run:
1018 job.dst_dataset_exists[parent] = True
1019 parent += "/"
1022def _create_zfs_bookmarks(job: Job, remote: Remote, dataset: str, snapshots: list[str]) -> None:
1023 """Creates bookmarks for the given snapshots, using the 'zfs bookmark' CLI."""
1024 # Unfortunately ZFS has no syntax yet to create multiple bookmarks in a single CLI invocation
1025 p = job.params
1027 def create_zfs_bookmark(cmd: list[str]) -> None:
1028 snapshot = cmd[-1]
1029 assert "@" in snapshot
1030 bookmark_cmd: list[str] = cmd + [replace_prefix(snapshot, old_prefix=f"{dataset}@", new_prefix=f"{dataset}#")]
1031 try:
1032 run_ssh_command(job, remote, LOG_DEBUG, is_dry=p.dry_run, print_stderr=False, cmd=bookmark_cmd)
1033 except subprocess.CalledProcessError as e:
1034 # ignore harmless zfs error caused by bookmark with the same name already existing
1035 if ": bookmark exists" not in e.stderr:
1036 print(e.stderr, file=sys.stderr, end="")
1037 raise
1039 if p.create_bookmarks != "none" and are_bookmarks_enabled(p, remote):
1040 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} bookmark")
1041 run_ssh_cmd_parallel(
1042 job, remote, [(cmd, snapshots)], lambda _cmd, batch: create_zfs_bookmark(_cmd + batch), max_batch_items=1
1043 )
1046def _estimate_send_size(job: Job, remote: Remote, dst_dataset: str, recv_resume_token: str | None, *items: str) -> int:
1047 """Estimates num bytes to transfer via 'zfs send -nvP'; Thread-safe."""
1048 p = job.params
1049 if p.no_estimate_send_size: 1049 ↛ 1050line 1049 didn't jump to line 1050 because the condition on line 1049 was never true
1050 return 0
1051 zfs_send_program_opts: list[str] = ["--parsable" if opt == "-P" else opt for opt in p.curr_zfs_send_program_opts]
1052 zfs_send_program_opts = append_if_absent(zfs_send_program_opts, "-v", "-n", "--parsable")
1053 if recv_resume_token:
1054 zfs_send_program_opts = ["-Pnv", "-t", recv_resume_token]
1055 items = ()
1056 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} send", zfs_send_program_opts, items)
1057 try:
1058 lines: str | None = try_ssh_command(job, remote, LOG_TRACE, cmd=cmd)
1059 except RetryableError as retryable_error:
1060 assert retryable_error.__cause__ is not None
1061 if recv_resume_token:
1062 e = retryable_error.__cause__
1063 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else ""
1064 retryable_error.no_sleep = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr)
1065 # op isn't idempotent so retries regather current state from the start of replicate_dataset()
1066 raise
1067 if lines is None: 1067 ↛ 1068line 1067 didn't jump to line 1068 because the condition on line 1067 was never true
1068 return 0 # src dataset or snapshot has been deleted by third party
1069 size: str = lines.splitlines()[-1]
1070 assert size.startswith("size")
1071 return int(size[size.index("\t") + 1 :])
1074def _estimate_send_sizes_in_parallel(
1075 job: Job,
1076 r: Remote,
1077 dst_dataset: str,
1078 recv_resume_token: str | None,
1079 steps_todo: list[tuple[str, str, str, list[str]]],
1080) -> list[int]:
1081 """Estimates num bytes to transfer for multiple send steps; in parallel to reduce latency."""
1082 p = job.params
1083 if p.no_estimate_send_size:
1084 return [0 for _ in steps_todo]
1086 def iterator_builder(executor: Executor) -> Iterable[Iterable[Future[int]]]:
1087 resume_token: str | None = recv_resume_token
1088 return [
1089 (
1090 executor.submit(
1091 _estimate_send_size, job, r, dst_dataset, resume_token if i == 0 else None, incr_flag, from_snap, to_snap
1092 )
1093 for i, (incr_flag, from_snap, to_snap, _to_snapshots) in enumerate(steps_todo)
1094 )
1095 ]
1097 max_workers: int = min(len(steps_todo), job.max_workers[r.location])
1098 return list(
1099 parallel_iterator(iterator_builder, max_workers=max_workers, ordered=True, termination_event=job.termination_event)
1100 )
1103def _zfs_set(job: Job, properties: list[str], remote: Remote, dataset: str) -> None:
1104 """Applies the given property key=value pairs via 'zfs set' CLI to the given dataset on the given remote."""
1105 p = job.params
1106 if len(properties) == 0:
1107 return
1108 # set properties in batches without creating a command line that's too big for the OS to handle
1109 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} set")
1110 run_ssh_cmd_batched(
1111 job,
1112 remote,
1113 cmd,
1114 properties,
1115 lambda batch: run_ssh_command(
1116 job, remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch + [dataset]
1117 ),
1118 max_batch_items=2**29,
1119 )
1122def _zfs_get(
1123 job: Job,
1124 remote: Remote,
1125 dataset: str,
1126 sources: str,
1127 output_columns: str,
1128 propnames: str,
1129 splitlines: bool,
1130 props_cache: dict[tuple[str, str, str], dict[str, str | None]],
1131) -> dict[str, str | None]:
1132 """Returns the results of 'zfs get' CLI on the given dataset on the given remote."""
1133 assert dataset
1134 assert sources
1135 assert output_columns
1136 if not propnames:
1137 return {}
1138 p = job.params
1139 cache_key: tuple[str, str, str] = (sources, output_columns, propnames)
1140 props: dict[str, str | None] | None = props_cache.get(cache_key)
1141 if props is None:
1142 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o {output_columns} -s {sources} {propnames}", dataset)
1143 lines: str = run_ssh_command(job, remote, LOG_TRACE, cmd=cmd)
1144 is_name_value_pair: bool = "," in output_columns
1145 props = {}
1146 # if not splitlines: omit single trailing newline that was appended by 'zfs get' CLI
1147 assert splitlines or len(lines) == 0 or lines[-1] == "\n"
1148 for line in lines.splitlines() if splitlines else [lines[0:-1]]:
1149 if is_name_value_pair:
1150 propname, propvalue = line.split("\t", 1)
1151 props[propname] = propvalue
1152 else:
1153 props[line] = None
1154 props_cache[cache_key] = props
1155 return props
1158def _incremental_send_steps_wrapper(
1159 p: Params, src_snapshots: list[str], src_guids: list[str], included_guids: set[str], is_resume: bool
1160) -> list[tuple[str, str, str, list[str]]]:
1161 """Returns incremental send steps, optionally converting -I to -i."""
1162 force_convert_I_to_i: bool = p.src.use_zfs_delegation and not getenv_bool("no_force_convert_I_to_i", True) # noqa: N806
1163 # force_convert_I_to_i == True implies that:
1164 # If using 'zfs allow' delegation mechanism, force convert 'zfs send -I' to a series of
1165 # 'zfs send -i' as a workaround for zfs issue https://github.com/openzfs/zfs/issues/16394
1166 return incremental_send_steps(src_snapshots, src_guids, included_guids, is_resume, force_convert_I_to_i)
1169def _add_recv_property_options(
1170 job: Job, full_send: bool, recv_opts: list[str], dataset: str, cache: dict[tuple[str, str, str], dict[str, str | None]]
1171) -> tuple[list[str], list[str]]:
1172 """Reads the ZFS properties of the given src dataset; Appends zfs recv -o and -x values to recv_opts according to CLI
1173 params, and returns properties to explicitly set on the dst dataset after 'zfs receive' completes successfully."""
1174 p = job.params
1175 set_opts: list[str] = []
1176 x_names: list[str] = p.zfs_recv_x_names
1177 x_names_set: set[str] = set(x_names)
1178 ox_names: set[str] = p.zfs_recv_ox_names.copy()
1179 if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_2_0, p.dst.location):
1180 # workaround for https://github.com/openzfs/zfs/commit/b0269cd8ced242e66afc4fa856d62be29bb5a4ff
1181 # 'zfs recv -x foo' on zfs < 2.2 errors out if the 'foo' property isn't contained in the send stream
1182 for propname in x_names:
1183 recv_opts.append("-x")
1184 recv_opts.append(propname)
1185 ox_names.update(x_names) # union
1186 for config in [p.zfs_recv_o_config, p.zfs_recv_x_config, p.zfs_set_config]:
1187 if len(config.include_regexes) == 0:
1188 continue # this is the default - it's an instant noop
1189 if (full_send and "full" in config.targets) or (not full_send and "incremental" in config.targets):
1190 # 'zfs get' uses newline as record separator and tab as separator between output columns. A ZFS user property
1191 # may contain newline and tab characters (indeed anything). Together, this means that there is no reliable
1192 # way to determine where a record ends and the next record starts when listing multiple arbitrary records in
1193 # a single 'zfs get' call. Therefore, here we use a separate 'zfs get' call for each ZFS user property.
1194 # TODO: perf: on zfs >= 2.3 use json via zfs get -j to safely merge all zfs gets into one 'zfs get' call
1195 try:
1196 props_any: dict = _zfs_get(job, p.src, dataset, config.sources, "property", "all", True, cache)
1197 props_filtered: dict = filter_properties(p, props_any, config.include_regexes, config.exclude_regexes)
1198 user_propnames: list[str] = [name for name in props_filtered if ":" in name]
1199 sys_propnames: str = ",".join(name for name in props_filtered if ":" not in name)
1200 props: dict = _zfs_get(job, p.src, dataset, config.sources, "property,value", sys_propnames, True, cache)
1201 for propnames in user_propnames:
1202 props.update(_zfs_get(job, p.src, dataset, config.sources, "property,value", propnames, False, cache))
1203 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
1204 raise RetryableError("Subprocess failed") from e
1205 for propname in sorted(props.keys()):
1206 if config is p.zfs_recv_o_config:
1207 if not (propname in ox_names or propname in x_names_set):
1208 recv_opts.append("-o")
1209 recv_opts.append(f"{propname}={props[propname]}")
1210 ox_names.add(propname)
1211 elif config is p.zfs_recv_x_config:
1212 if propname not in ox_names:
1213 recv_opts.append("-x")
1214 recv_opts.append(propname)
1215 ox_names.add(propname)
1216 else:
1217 assert config is p.zfs_set_config
1218 set_opts.append(f"{propname}={props[propname]}")
1219 return recv_opts, set_opts
1222def _check_zfs_dataset_busy(job: Job, remote: Remote, dataset: str, busy_if_send: bool = True) -> bool:
1223 """Decline to start a state changing ZFS operation that is, although harmless, likely to collide with other currently
1224 running processes. Instead, retry the operation later, after some delay. For example, decline to start a 'zfs receive'
1225 into a destination dataset if another process is already running another 'zfs receive' into the same destination dataset,
1226 as ZFS would reject any such attempt. However, it's actually fine to run an incremental 'zfs receive' into a dataset in
1227 parallel with a 'zfs send' out of the very same dataset. This also helps daisy chain use cases where A replicates to B,
1228 and B replicates to C.
1230 _check_zfs_dataset_busy() offers no guarantees, it merely proactively avoids likely collisions. In other words, even if
1231 the process check below passes there is no guarantee that the destination dataset won't be busy by the time we actually
1232 execute the 'zfs send' operation. In such an event ZFS will reject the operation, we'll detect that, and we'll simply
1233 retry, after some delay. _check_zfs_dataset_busy() can be disabled via --ps-program=-.
1235 TLDR: As is common for long-running operations in distributed systems, we use coordination-free optimistic concurrency
1236 control where the parties simply retry on collision detection (rather than coordinate concurrency via a remote lock
1237 server).
1238 """
1239 p, log = job.params, job.params.log
1240 if not p.is_program_available("ps", remote.location):
1241 return True
1242 cmd: list[str] = p.split_args(f"{p.ps_program} -Ao args")
1243 procs: list[str] = (try_ssh_command(job, remote, LOG_TRACE, cmd=cmd) or "").splitlines()
1244 if job.inject_params.get("is_zfs_dataset_busy", False):
1245 procs += ["sudo -n zfs receive -u -o foo:bar=/baz " + dataset] # for unit testing only
1246 if not _is_zfs_dataset_busy(procs, dataset, busy_if_send=busy_if_send):
1247 return True
1248 op: str = "zfs {receive" + ("|send" if busy_if_send else "") + "} operation"
1249 try:
1250 die(f"Cannot continue now: Destination is already busy with {op} from another process: {dataset}")
1251 except SystemExit as e:
1252 log.warning("%s", e)
1253 raise RetryableError("dst currently busy with zfs mutation op") from e
1256ZFS_DATASET_BUSY_PREFIX: Final[str] = r"(([^ ]*?/)?(sudo|doas)( +-n)? +)?([^ ]*?/)?zfs (receive|recv"
1257ZFS_DATASET_BUSY_IF_MODS: Final[re.Pattern[str]] = re.compile((ZFS_DATASET_BUSY_PREFIX + ") .*").replace("(", "(?:"))
1258ZFS_DATASET_BUSY_IF_SEND: Final[re.Pattern[str]] = re.compile((ZFS_DATASET_BUSY_PREFIX + "|send) .*").replace("(", "(?:"))
1261def _is_zfs_dataset_busy(procs: list[str], dataset: str, busy_if_send: bool) -> bool:
1262 """Checks if any process list entry indicates ZFS activity on dataset."""
1263 regex: re.Pattern[str] = ZFS_DATASET_BUSY_IF_SEND if busy_if_send else ZFS_DATASET_BUSY_IF_MODS
1264 suffix: str = " " + dataset
1265 infix: str = " " + dataset + "@"
1266 return any((proc.endswith(suffix) or infix in proc) and regex.fullmatch(proc) for proc in procs)