Coverage for bzfs_main/replication.py: 99% (737 statements)
1# Copyright 2024 Wolfgang Hoschek AT mac DOT com
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15"""The core replication algorithm is in replicate_dataset(), which performs reliable full and/or incremental 'zfs send' and
16'zfs receive' operations on snapshots, using resumable ZFS sends when possible.
18For replication of multiple datasets, including recursive replication, see bzfs.py:replicate_datasets().
19"""
21from __future__ import (
22 annotations,
23)
24import os
25import re
26import shlex
27import subprocess
28import sys
29import threading
30import time
31from collections.abc import (
32 Iterable,
33 Iterator,
34)
35from concurrent.futures import (
36 Executor,
37 Future,
38)
39from subprocess import (
40 DEVNULL,
41 PIPE,
42)
43from typing import (
44 TYPE_CHECKING,
45 Final,
46)
48from bzfs_main.argparse_actions import (
49 has_timerange_filter,
50)
51from bzfs_main.configuration import (
52 is_same_remote,
53)
54from bzfs_main.detect import (
55 ZFS_VERSION_IS_AT_LEAST_2_1_0,
56 ZFS_VERSION_IS_AT_LEAST_2_2_0,
57 are_bookmarks_enabled,
58 is_zpool_feature_enabled_or_active,
59)
60from bzfs_main.filter import (
61 filter_properties,
62 filter_snapshots,
63)
64from bzfs_main.incremental_send_steps import (
65 incremental_send_steps,
66)
67from bzfs_main.parallel_batch_cmd import (
68 run_ssh_cmd_batched,
69 run_ssh_cmd_parallel,
70)
71from bzfs_main.progress_reporter import (
72 PV_FILE_THREAD_SEPARATOR,
73)
74from bzfs_main.util.connection import (
75 DEDICATED,
76 SHARED,
77 ConnectionPool,
78 dquote,
79 squote,
80 timeout,
81)
82from bzfs_main.util.parallel_iterator import (
83 parallel_iterator,
84 run_in_parallel,
85)
86from bzfs_main.util.retry import (
87 Retry,
88 RetryableError,
89)
90from bzfs_main.util.utils import (
91 DONT_SKIP_DATASET,
92 FILE_PERMISSIONS,
93 LOG_DEBUG,
94 LOG_TRACE,
95 Subprocesses,
96 append_if_absent,
97 cut,
98 die,
99 getenv_bool,
100 human_readable_bytes,
101 is_descendant,
102 list_formatter,
103 open_nofollow,
104 replace_prefix,
105 stderr_to_str,
106 xprint,
107)
109if TYPE_CHECKING: # pragma: no cover - for type hints only
110 from bzfs_main.bzfs import (
111 Job,
112 )
113 from bzfs_main.configuration import (
114 Params,
115 Remote,
116 )
119# constants:
120INJECT_DST_PIPE_FAIL_KBYTES: Final[int] = 400 # for testing only
121_RIGHT_JUST: Final[int] = 7
124def replicate_dataset(job: Job, src_dataset: str, tid: str, retry: Retry) -> bool:
125 """Replicates src_dataset to dst_dataset (thread-safe); For replication of multiple datasets, including recursive
126 replication, see bzfs.py:replicate_datasets()."""
127 p, log = job.params, job.params.log
128 src, dst = p.src, p.dst
129 retry_count: int = retry.count
130 dst_dataset: str = replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset)
131 log.debug(p.dry(f"{tid} Replicating: %s"), f"{src_dataset} --> {dst_dataset} ...")
133 list_result: bool | tuple[list[str], list[str], list[str], set[str], str, str] = _list_and_filter_src_and_dst_snapshots(
134 job, src_dataset, dst_dataset
135 )
136 if isinstance(list_result, bool):
137 return list_result
138 (
139 basis_src_snapshots_with_guids,
140 _src_snapshots_with_guids,
141 dst_snapshots_with_guids,
142 included_src_guids,
143 latest_src_snapshot,
144 oldest_src_snapshot,
145 ) = list_result
146 log.debug("latest_src_snapshot: %s", latest_src_snapshot)
147 latest_dst_snapshot: str = ""
148 latest_common_src_snapshot: str = ""
149 done_checking: bool = False
151 if job.dst_dataset_exists[dst_dataset]:
152 rollback_result: bool | tuple[str, str, bool] = _rollback_dst_dataset_if_necessary(
153 job,
154 dst_dataset,
155 latest_src_snapshot,
156 basis_src_snapshots_with_guids,
157 dst_snapshots_with_guids,
158 done_checking,
159 tid,
160 )
161 if isinstance(rollback_result, bool):
162 return rollback_result
163 latest_dst_snapshot, latest_common_src_snapshot, done_checking = rollback_result
165 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark
166 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot)
167 props_cache: dict[tuple[str, str, str], dict[str, str | None]] = {} # fresh empty ZFS props cache for each dataset
168 dry_run_no_send: bool = False
169 if not latest_common_src_snapshot:
170 # no common snapshot exists; delete all dst snapshots and perform a full send of the oldest selected src snapshot
171 full_result: tuple[str, bool, bool, int] = _replicate_dataset_fully(
172 job,
173 src_dataset,
174 dst_dataset,
175 oldest_src_snapshot,
176 latest_src_snapshot,
177 latest_dst_snapshot,
178 dst_snapshots_with_guids,
179 props_cache,
180 dry_run_no_send,
181 done_checking,
182 retry_count,
183 tid,
184 )
185 # we have now created a common snapshot
186 latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count = full_result
187 if latest_common_src_snapshot:
188 # finally, incrementally replicate all selected snapshots from latest common snapshot until latest src snapshot
189 _replicate_dataset_incrementally(
190 job,
191 src_dataset,
192 dst_dataset,
193 latest_common_src_snapshot,
194 latest_src_snapshot,
195 basis_src_snapshots_with_guids,
196 included_src_guids,
197 props_cache,
198 dry_run_no_send,
199 done_checking,
200 retry_count,
201 tid,
202 )
203 return True
206def _list_and_filter_src_and_dst_snapshots(
207 job: Job, src_dataset: str, dst_dataset: str
208) -> bool | tuple[list[str], list[str], list[str], set[str], str, str]:
209 """On replication, list and filter src and dst snapshots."""
210 p, log = job.params, job.params.log
211 src, dst = p.src, p.dst
213 # list GUID and name for dst snapshots, sorted ascending by createtxg (more precise than creation time)
214 dst_cmd: list[str] = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -s createtxg -Hp -o guid,name", dst_dataset)
216 # list GUID and name for src snapshots + bookmarks, primarily sort ascending by transaction group (which is more
217 # precise than creation time), secondarily sort such that snapshots appear after bookmarks for the same GUID.
218 # Note: A snapshot and its ZFS bookmarks always have the same GUID, creation time and transaction group. A snapshot
219 # changes its transaction group but retains its creation time and GUID on 'zfs receive' on another pool, i.e.
220 # comparing createtxg is only meaningful within a single pool, not across pools from src to dst. Comparing creation
221 # time remains meaningful across pools from src to dst. Creation time is a UTC Unix time in integer seconds.
222 # Note that 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs enforce that snapshot names must not contain a '#'
223 # char, bookmark names must not contain a '@' char, and dataset names must not contain a '#' or '@' char.
224 # GUID and creation time also do not contain a '#' or '@' char.
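    # Illustrative 'zfs list -Hp' output lines in the default 'guid,name' form as parsed below (GUIDs and
    # snapshot names are hypothetical):
    #   12641344674896312345<TAB>tank/src@bzfs_2024-01-01_00:00:00_hourly    <- snapshot ('@')
    #   12641344674896312345<TAB>tank/src#bzfs_2024-01-01_00:00:00_hourly    <- bookmark ('#'), same GUID
    # Each line is split on the first tab into (guid, name); the '@' vs '#' character distinguishes snapshots
    # from bookmarks, which the filtering and oldest/latest bookkeeping below relies on.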
225 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters)
226 types: str = "snapshot,bookmark" if p.use_bookmark and are_bookmarks_enabled(p, src) else "snapshot"
227 props: str = job.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name"
228 src_cmd = p.split_args(f"{p.zfs_program} list -t {types} -s createtxg -s type -d 1 -Hp -o {props}", src_dataset)
229 job.maybe_inject_delete(src, dataset=src_dataset, delete_trigger="zfs_list_snapshot_src")
230 src_snapshots_and_bookmarks, dst_snapshots_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel
231 lambda: job.try_ssh_command(src, LOG_TRACE, cmd=src_cmd),
232 lambda: job.try_ssh_command(dst, LOG_TRACE, cmd=dst_cmd, error_trigger="zfs_list_snapshot_dst"),
233 )
234 job.dst_dataset_exists[dst_dataset] = dst_snapshots_with_guids_str is not None
235 dst_snapshots_with_guids: list[str] = (dst_snapshots_with_guids_str or "").splitlines()
236 if src_snapshots_and_bookmarks is None:
237 log.warning("Third party deleted source: %s", src_dataset)
238 return False # src dataset has been deleted by some third party while we're running - nothing to do anymore
239 src_snapshots_with_guids: list[str] = src_snapshots_and_bookmarks.splitlines()
240 src_snapshots_and_bookmarks = None
241 if len(dst_snapshots_with_guids) == 0 and "bookmark" in types:
242 # src bookmarks serve no purpose if the destination dataset has no snapshot; ignore them
243 src_snapshots_with_guids = [snapshot for snapshot in src_snapshots_with_guids if "@" in snapshot]
244 num_src_snapshots_found: int = sum(1 for snapshot in src_snapshots_with_guids if "@" in snapshot)
245 with job.stats_lock:
246 job.num_snapshots_found += num_src_snapshots_found
247 # apply include/exclude regexes to ignore irrelevant src snapshots
248 basis_src_snapshots_with_guids: list[str] = src_snapshots_with_guids
249 src_snapshots_with_guids = filter_snapshots(job, src_snapshots_with_guids)
250 if filter_needs_creation_time:
251 src_snapshots_with_guids = cut(field=2, lines=src_snapshots_with_guids)
252 basis_src_snapshots_with_guids = cut(field=2, lines=basis_src_snapshots_with_guids)
254 # find oldest and latest "true" snapshot, as well as GUIDs of all snapshots and bookmarks.
255 # a snapshot is "true" if it is not a bookmark.
256 oldest_src_snapshot: str = ""
257 latest_src_snapshot: str = ""
258 included_src_guids: set[str] = set()
259 for line in src_snapshots_with_guids:
260 guid, snapshot = line.split("\t", 1)
261 if "@" in snapshot:
262 included_src_guids.add(guid)
263 latest_src_snapshot = snapshot
264 if not oldest_src_snapshot:
265 oldest_src_snapshot = snapshot
266 if len(src_snapshots_with_guids) == 0:
267 if p.skip_missing_snapshots == "fail":
268 die(f"Source dataset includes no snapshot: {src_dataset}. Consider using --skip-missing-snapshots=dataset")
269 elif p.skip_missing_snapshots == "dataset":
270 log.warning("Skipping source dataset because it includes no snapshot: %s", src_dataset)
271 if p.recursive and not job.dst_dataset_exists[dst_dataset]:
272 log.warning("Also skipping descendant datasets as dst dataset does not exist for %s", src_dataset)
273 return job.dst_dataset_exists[dst_dataset]
274 return (
275 basis_src_snapshots_with_guids,
276 src_snapshots_with_guids,
277 dst_snapshots_with_guids,
278 included_src_guids,
279 latest_src_snapshot,
280 oldest_src_snapshot,
281 )
284def _rollback_dst_dataset_if_necessary(
285 job: Job,
286 dst_dataset: str,
287 latest_src_snapshot: str,
288 src_snapshots_with_guids: list[str],
289 dst_snapshots_with_guids: list[str],
290 done_checking: bool,
291 tid: str,
292) -> bool | tuple[str, str, bool]:
293 """On replication, rollback dst if necessary; error out if not permitted."""
294 p, log = job.params, job.params.log
295 dst = p.dst
296 latest_dst_snapshot: str = ""
297 latest_dst_guid: str = ""
298 if len(dst_snapshots_with_guids) > 0:
299 latest_dst_guid, latest_dst_snapshot = dst_snapshots_with_guids[-1].split("\t", 1)
300 if p.force_rollback_to_latest_snapshot:
301 log.info(p.dry(f"{tid} Rolling back destination to most recent snapshot: %s"), latest_dst_snapshot)
302 # rollback just in case the dst dataset was modified since its most recent snapshot
303 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset)
304 cmd: list[str] = p.split_args(f"{dst.sudo} {p.zfs_program} rollback", latest_dst_snapshot)
305 job.try_ssh_command(dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd, exists=False)
306 elif latest_src_snapshot == "":
307 log.info(f"{tid} Already-up-to-date: %s", dst_dataset)
308 return True
310 # find most recent snapshot (or bookmark) that src and dst have in common - we'll start to replicate
311 # from there up to the most recent src snapshot. any two snapshots are "common" iff their ZFS GUIDs (i.e.
312 # contents) are equal. See https://github.com/openzfs/zfs/commit/305bc4b370b20de81eaf10a1cf724374258b74d1
313 def latest_common_snapshot(snapshots_with_guids: list[str], intersect_guids: set[str]) -> tuple[str | None, str]:
314 """Returns a true snapshot instead of its bookmark with the same GUID, per the sort order previously used for 'zfs
315 list -s ...'."""
316 for _line in reversed(snapshots_with_guids):
317 guid_, snapshot_ = _line.split("\t", 1)
318 if guid_ in intersect_guids:
319 return guid_, snapshot_ # can be a snapshot or bookmark
320 return None, ""
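    #     Illustrative behavior (hypothetical GUIDs and names): given snapshots_with_guids ==
    #       ["g1<TAB>pool/src#d1", "g1<TAB>pool/src@d1", "g2<TAB>pool/src@d2"]
    #     and intersect_guids == {"g1"}, the reversed scan skips g2 and returns ("g1", "pool/src@d1"): the true
    #     snapshot wins over its bookmark of the same GUID because '-s type' sorts snapshots after bookmarks.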
322 latest_common_guid, latest_common_src_snapshot = latest_common_snapshot(
323 src_snapshots_with_guids, set(cut(field=1, lines=dst_snapshots_with_guids))
324 )
325 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark
326 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot)
328 if latest_common_src_snapshot and latest_common_guid != latest_dst_guid:
329 # found latest common snapshot but dst has an even newer snapshot. rollback dst to that common snapshot.
330 assert latest_common_guid
331 _, latest_common_dst_snapshot = latest_common_snapshot(dst_snapshots_with_guids, {latest_common_guid})
332 if not (p.force_rollback_to_latest_common_snapshot or p.force):
333 die(
334 f"Conflict: Most recent destination snapshot {latest_dst_snapshot} is more recent than "
335 f"most recent common snapshot {latest_common_dst_snapshot}. Rollback destination first, "
336 "for example via --force-rollback-to-latest-common-snapshot (or --force) option."
337 )
338 if p.force_once:
339 p.force.value = False
340 p.force_rollback_to_latest_common_snapshot.value = False
341 log.info(p.dry(f"{tid} Rolling back destination to most recent common snapshot: %s"), latest_common_dst_snapshot)
342 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset)
343 cmd = p.split_args(
344 f"{dst.sudo} {p.zfs_program} rollback -r {p.force_unmount} {p.force_hard}", latest_common_dst_snapshot
345 )
346 try:
347 job.run_ssh_command(dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd)
348 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
349 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else ""
350 retry_immediately_once: bool = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr)
351 # op isn't idempotent so retries regather current state from the start of replicate_dataset()
352 raise RetryableError(display_msg="zfs rollback", retry_immediately_once=retry_immediately_once) from e
354 if latest_src_snapshot and latest_src_snapshot == latest_common_src_snapshot:
355 log.info(f"{tid} Already up-to-date: %s", dst_dataset)
356 return True
357 return latest_dst_snapshot, latest_common_src_snapshot, done_checking
360def _replicate_dataset_fully(
361 job: Job,
362 src_dataset: str,
363 dst_dataset: str,
364 oldest_src_snapshot: str,
365 latest_src_snapshot: str,
366 latest_dst_snapshot: str,
367 dst_snapshots_with_guids: list[str],
368 props_cache: dict[tuple[str, str, str], dict[str, str | None]],
369 dry_run_no_send: bool,
370 done_checking: bool,
371 retry_count: int,
372 tid: str,
373) -> tuple[str, bool, bool, int]:
374 """On replication, deletes all dst snapshots and performs a full send of the oldest selected src snapshot, which in turn
375 creates a common snapshot; error out if not permitted."""
376 p, log = job.params, job.params.log
377 src, dst = p.src, p.dst
378 latest_common_src_snapshot: str = ""
379 if latest_dst_snapshot:
380 if not p.force:
381 die(
382 f"Conflict: No common snapshot found between {src_dataset} and {dst_dataset} even though "
383 "destination has at least one snapshot. Aborting. Consider using --force option to first "
384 "delete all existing destination snapshots in order to be able to proceed with replication."
385 )
386 if p.force_once:
387 p.force.value = False
388 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset)
389 # extract SNAPSHOT_TAG from GUID<TAB>DATASET@SNAPSHOT_TAG
390 delete_snapshots(job, dst, dst_dataset, snapshot_tags=cut(2, separator="@", lines=dst_snapshots_with_guids))
391 if p.dry_run:
392 # As we're in --dryrun (--force) mode this conflict resolution step (see above) wasn't really executed:
393 # "no common snapshot was found. delete all dst snapshots". In turn, this would cause the subsequent
394 # 'zfs receive -n' to fail with "cannot receive new filesystem stream: destination has snapshots; must
395 # destroy them to overwrite it". So we skip the zfs send/receive step and keep on trucking.
396 dry_run_no_send = True
398 # to start with, fully replicate oldest snapshot, which in turn creates a common snapshot
399 if p.no_stream:
400 oldest_src_snapshot = latest_src_snapshot
401 if oldest_src_snapshot:
402 if not job.dst_dataset_exists[dst_dataset]:
403 # on destination, create parent filesystem and ancestors if they do not yet exist
404 dst_dataset_parent: str = os.path.dirname(dst_dataset)
405 if not job.dst_dataset_exists[dst_dataset_parent]:
406 if p.dry_run:
407 dry_run_no_send = True
408 if dst_dataset_parent:
409 _create_zfs_filesystem(job, dst_dataset_parent)
411 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count)
412 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result
413 curr_size: int = _estimate_send_size(job, src, dst_dataset, recv_resume_token, oldest_src_snapshot)
414 humansize: str = _format_size(curr_size)
415 if recv_resume_token:
416 send_opts: list[str] = p.curr_zfs_send_program_opts + send_resume_opts # e.g. curr + ["-t", "1-c740b4779-..."]
417 else:
418 send_opts = p.curr_zfs_send_program_opts + [oldest_src_snapshot]
419 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts)
420 recv_opts: list[str] = p.zfs_full_recv_opts.copy() + recv_resume_opts
421 recv_opts, set_opts = _add_recv_property_options(job, True, recv_opts, src_dataset, props_cache)
422 recv_cmd: list[str] = p.split_args(
423 f"{dst.sudo} {p.zfs_program} receive -F", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True
424 )
425 log.info(p.dry(f"{tid} Full send: %s"), f"{oldest_src_snapshot} --> {dst_dataset} ({humansize.strip()}) ...")
426 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset)
427 dry_run_no_send = dry_run_no_send or p.dry_run_no_send
428 job.maybe_inject_params(error_trigger="full_zfs_send_params")
429 humansize = humansize.rjust(_RIGHT_JUST * 3 + 2)
430 _run_zfs_send_receive( # do the real work
431 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "full_zfs_send"
432 )
433 latest_common_src_snapshot = oldest_src_snapshot # we have now created a common snapshot
434 if not dry_run_no_send and not p.dry_run:
435 job.dst_dataset_exists[dst_dataset] = True
436 with job.stats_lock:
437 job.num_snapshots_replicated += 1
438 _create_zfs_bookmarks(job, src, src_dataset, [oldest_src_snapshot])
439 _zfs_set(job, set_opts, dst, dst_dataset)
440 dry_run_no_send = dry_run_no_send or p.dry_run
441 retry_count = 0
443 return latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count
446def _replicate_dataset_incrementally(
447 job: Job,
448 src_dataset: str,
449 dst_dataset: str,
450 latest_common_src_snapshot: str,
451 latest_src_snapshot: str,
452 basis_src_snapshots_with_guids: list[str],
453 included_src_guids: set[str],
454 props_cache: dict[tuple[str, str, str], dict[str, str | None]],
455 dry_run_no_send: bool,
456 done_checking: bool,
457 retry_count: int,
458 tid: str,
459) -> None:
460 """Incrementally replicates all selected snapshots from latest common snapshot until latest src snapshot."""
461 p, log = job.params, job.params.log
462 src, dst = p.src, p.dst
464 def replication_candidates() -> tuple[list[str], list[str]]:
465 assert len(basis_src_snapshots_with_guids) > 0
466 result_snapshots: list[str] = []
467 result_guids: list[str] = []
468 last_appended_guid: str = ""
469 snapshot_itr: Iterator[str] = reversed(basis_src_snapshots_with_guids)
470 while True:
471 guid, snapshot = next(snapshot_itr).split("\t", 1)
472 if "@" in snapshot:
473 result_snapshots.append(snapshot)
474 result_guids.append(guid)
475 last_appended_guid = guid
476 if snapshot == latest_common_src_snapshot: # latest_common_src_snapshot is a snapshot or bookmark
477 if guid != last_appended_guid and "@" not in snapshot:
478 # only appends the src bookmark if it has no snapshot. If the bookmark has a snapshot then that
479 # snapshot has already been appended, per the sort order previously used for 'zfs list -s ...'
480 result_snapshots.append(snapshot)
481 result_guids.append(guid)
482 break
483 result_snapshots.reverse()
484 result_guids.reverse()
485 assert len(result_snapshots) > 0
486 assert len(result_snapshots) == len(result_guids)
487 return result_guids, result_snapshots
489 # collect the most recent common snapshot (which may be a bookmark) followed by all src snapshots
490 # (that are not a bookmark) that are more recent than that.
491 cand_guids, cand_snapshots = replication_candidates()
492 if len(cand_snapshots) == 1:
493 # latest_src_snapshot is a (true) snapshot that is equal to latest_common_src_snapshot or LESS recent
494 # than latest_common_src_snapshot. The latter case can happen if latest_common_src_snapshot is a
495 # bookmark whose snapshot has been deleted on src.
496 return # nothing more to be done
498 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count)
499 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result
500 recv_opts: list[str] = p.zfs_recv_program_opts.copy() + recv_resume_opts
501 recv_opts, set_opts = _add_recv_property_options(job, False, recv_opts, src_dataset, props_cache)
502 if p.no_stream:
503 # skip intermediate snapshots
504 steps_todo: list[tuple[str, str, str, list[str]]] = [
505 ("-i", latest_common_src_snapshot, latest_src_snapshot, [latest_src_snapshot])
506 ]
507 else:
508 # include intermediate src snapshots that pass --{include,exclude}-snapshot-* policy, using
509 # a series of -i/-I send/receive steps that skip excluded src snapshots.
510 steps_todo = _incremental_send_steps_wrapper(
511 p, cand_snapshots, cand_guids, included_src_guids, recv_resume_token is not None
512 )
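    # Each step is a tuple (incr_flag, from_snap, to_snap, to_snapshots); an illustrative value (hypothetical names):
    #   ("-I", "pool/src@h1", "pool/src@h4", ["pool/src@h2", "pool/src@h3", "pool/src@h4"])
    # i.e. one 'zfs send -I' covering h2..h4; excluded intermediate snapshots instead force a series of '-i' steps.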
513 log.log(LOG_TRACE, "steps_todo: %s", list_formatter(steps_todo, "; "))
514 estimate_send_sizes: list[int] = _estimate_send_sizes_in_parallel(job, src, dst_dataset, recv_resume_token, steps_todo)
515 total_size: int = sum(estimate_send_sizes)
516 total_num: int = sum(len(to_snapshots) for incr_flag, from_snap, to_snap, to_snapshots in steps_todo)
517 done_size: int = 0
518 done_num: int = 0
519 for i, (incr_flag, from_snap, to_snap, to_snapshots) in enumerate(steps_todo):
520 curr_num_snapshots: int = len(to_snapshots)
521 curr_size: int = estimate_send_sizes[i]
522 humansize: str = _format_size(total_size) + "/" + _format_size(done_size) + "/" + _format_size(curr_size)
523 human_num: str = f"{total_num}/{done_num}/{curr_num_snapshots} snapshots"
524 if recv_resume_token:
525 send_opts: list[str] = p.curr_zfs_send_program_opts + send_resume_opts # e.g. curr + ["-t", "1-c740b4779-..."]
526 else:
527 send_opts = p.curr_zfs_send_program_opts + [incr_flag, from_snap, to_snap]
528 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts)
529 recv_cmd: list[str] = p.split_args(
530 f"{dst.sudo} {p.zfs_program} receive", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True
531 )
532 dense_size: str = p.two_or_more_spaces_regex.sub("", humansize.strip())
533 log.info(
534 p.dry(f"{tid} Incremental send {incr_flag}: %s"),
535 f"{from_snap} .. {to_snap[to_snap.index('@'):]} --> {dst_dataset} ({dense_size}) ({human_num}) ...",
536 )
537 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset, busy_if_send=False)
538 if p.dry_run and not job.dst_dataset_exists[dst_dataset]:
539 dry_run_no_send = True
540 dry_run_no_send = dry_run_no_send or p.dry_run_no_send
541 job.maybe_inject_params(error_trigger="incr_zfs_send_params")
542 _run_zfs_send_receive( # do the real work
543 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "incr_zfs_send"
544 )
545 done_size += curr_size
546 done_num += curr_num_snapshots
547 recv_resume_token = None
548 with job.stats_lock:
549 job.num_snapshots_replicated += curr_num_snapshots
550 assert p.create_bookmarks
551 if p.create_bookmarks == "all":
552 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots)
553 elif p.create_bookmarks != "none":
554 threshold_millis: int = p.xperiods.label_milliseconds("_" + p.create_bookmarks)
555 to_snapshots = [snap for snap in to_snapshots if p.xperiods.label_milliseconds(snap) >= threshold_millis]
556 if i == len(steps_todo) - 1 and (len(to_snapshots) == 0 or to_snapshots[-1] != to_snap):
557 to_snapshots.append(to_snap) # ensure latest common snapshot is bookmarked
558 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots)
559 _zfs_set(job, set_opts, dst, dst_dataset)
562def _format_size(num_bytes: int) -> str:
563 """Formats a byte count for human-readable logs."""
564 return human_readable_bytes(num_bytes, separator="").rjust(_RIGHT_JUST)
567def _prepare_zfs_send_receive(
568 job: Job, src_dataset: str, send_cmd: list[str], recv_cmd: list[str], size_estimate_bytes: int, size_estimate_human: str
569) -> tuple[str, str, str]:
570 """Constructs zfs send/recv pipelines with optional compression, mbuffer and pv."""
571 p = job.params
572 src, dst = p.src, p.dst
573 send_cmd_str: str = shlex.join(send_cmd)
574 recv_cmd_str: str = shlex.join(recv_cmd)
575 src_has_shell: bool = p.is_program_available("sh", "src")
576 dst_has_shell: bool = p.is_program_available("sh", "dst")
577 same_remote: bool = is_same_remote(src, dst)
578 r2r_mode: str = p.r2r_mode
579 assert r2r_mode in ("off", "pull", "push"), r2r_mode
581 if (
582 p.is_program_available("zstd", "src")
583 and p.is_program_available("zstd", "dst")
584 and src_has_shell
585 and dst_has_shell
586 and (r2r_mode == "off" or not same_remote)
587 ):
588 compress_cmd_: str = _compress_cmd(p, "src", size_estimate_bytes)
589 decompress_cmd_: str = _decompress_cmd(p, "dst", size_estimate_bytes)
590 else: # no compression is used if source and destination do not both support compression
591 compress_cmd_, decompress_cmd_ = "cat", "cat"
593 recordsize: int = abs(job.src_properties[src_dataset].recordsize)
594 src_buffer: str = "cat"
595 if r2r_mode == "off" or (src_has_shell and not same_remote):
596 src_buffer = _mbuffer_cmd(p, "src", size_estimate_bytes, recordsize)
597 dst_buffer: str = "cat"
598 if r2r_mode == "off" or (dst_has_shell and not same_remote):
599 dst_buffer = _mbuffer_cmd(p, "dst", size_estimate_bytes, recordsize)
600 local_buffer: str = "cat"
601 if r2r_mode == "off":
602 local_buffer = _mbuffer_cmd(p, "local", size_estimate_bytes, recordsize)
604 pv_src_cmd: str = ""
605 pv_dst_cmd: str = ""
606 pv_loc_cmd: str = ""
607 if r2r_mode == "off":
608 if not p.src.ssh_user_host:
609 pv_src_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human)
610 elif not p.dst.ssh_user_host:
611 pv_dst_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human)
612 elif compress_cmd_ == "cat":
613 pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human) # compression disabled
614 else:
615 # pull-push mode with compression enabled: reporting "percent complete" isn't straightforward because
616 # localhost observes the compressed data instead of the uncompressed data, so we disable the progress bar.
617 pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human, disable_progress_bar=True)
619 # assemble pipeline running on source leg
620 src_pipe: str = ""
621 if job.inject_params.get("inject_src_pipe_fail", False):
622 # for testing; initially forward some bytes and then fail
623 src_pipe = f"{src_pipe} | dd bs=64 count=1 2>/dev/null && false"
624 if job.inject_params.get("inject_src_pipe_garble", False):
625 src_pipe = f"{src_pipe} | gzip -1 -c -n" # for testing; forward garbled bytes
626 if pv_src_cmd and pv_src_cmd != "cat":
627 src_pipe = f"{src_pipe} | {pv_src_cmd}"
628 if compress_cmd_ != "cat":
629 src_pipe = f"{src_pipe} | {compress_cmd_}"
630 if src_buffer != "cat":
631 src_pipe = f"{src_pipe} | {src_buffer}"
632 if src_pipe.startswith(" |"):
633 src_pipe = src_pipe[2:] # strip leading ' |' part
634 if job.inject_params.get("inject_src_send_error", False):
635 send_cmd_str = f"{send_cmd_str} --injectedGarbageParameter" # for testing; induce CLI parse error
636 if src_pipe:
637 src_pipe = f"{send_cmd_str} | {src_pipe}"
638 if p.src.ssh_user_host:
639 src_pipe = p.shell_program + " -c " + dquote(src_pipe)
640 else:
641 src_pipe = send_cmd_str
643 # assemble pipeline running on middle leg between source and destination. only enabled for pull-push mode
644 local_pipe: str = ""
645 if r2r_mode == "off":
646 if local_buffer != "cat":
647 local_pipe = f"{local_buffer}"
648 if pv_loc_cmd and pv_loc_cmd != "cat":
649 local_pipe = f"{local_pipe} | {pv_loc_cmd}"
650 if local_buffer != "cat":
651 local_pipe = f"{local_pipe} | {local_buffer}"
652 if local_pipe.startswith(" |"):
653 local_pipe = local_pipe[2:] # strip leading ' |' part
654 if local_pipe:
655 local_pipe = f"| {local_pipe}"
657 # assemble pipeline running on destination leg
658 dst_pipe: str = ""
659 if dst_buffer != "cat":
660 dst_pipe = f"{dst_buffer}"
661 if decompress_cmd_ != "cat":
662 dst_pipe = f"{dst_pipe} | {decompress_cmd_}"
663 if pv_dst_cmd and pv_dst_cmd != "cat":
664 dst_pipe = f"{dst_pipe} | {pv_dst_cmd}"
665 if job.inject_params.get("inject_dst_pipe_fail", False):
666 # interrupt zfs receive for testing retry/resume; initially forward some bytes and then stop forwarding
667 dst_pipe = f"{dst_pipe} | dd bs=1024 count={INJECT_DST_PIPE_FAIL_KBYTES} 2>/dev/null"
668 if job.inject_params.get("inject_dst_pipe_garble", False):
669 dst_pipe = f"{dst_pipe} | gzip -1 -c -n" # for testing; forward garbled bytes
670 if dst_pipe.startswith(" |"):
671 dst_pipe = dst_pipe[2:] # strip leading ' |' part
672 if job.inject_params.get("inject_dst_receive_error", False):
673 recv_cmd_str = f"{recv_cmd_str} --injectedGarbageParameter" # for testing; induce CLI parse error
674 if dst_pipe:
675 dst_pipe = f"{dst_pipe} | {recv_cmd_str}"
676 if p.dst.ssh_user_host:
677 dst_pipe = p.shell_program + " -c " + dquote(dst_pipe)
678 else:
679 dst_pipe = recv_cmd_str
681 if r2r_mode == "off":
682 # If there's no support for shell pipelines, we can't do compression, mbuffering, monitoring and rate-limiting,
683 # so we fall back to simple zfs send/receive.
684 if not src_has_shell:
685 src_pipe = send_cmd_str
686 if not dst_has_shell:
687 dst_pipe = recv_cmd_str
689 src_pipe = squote(p.src, src_pipe)
690 dst_pipe = squote(p.dst, dst_pipe)
691 return src_pipe, local_pipe, dst_pipe
693 def nested_ssh_cmd(remote: Remote) -> str:
694 """Builds nested ssh command text for r2r leg."""
695 if same_remote:
696 # Perf: use local replication if initiator sees src+dst have same user@host, port and ssh configfile.
697 # Example: ssh alice@src.example.com "sh -c 'zfs send <src_dataset> | zfs receive <dst_dataset>'"
698 return ""
699 cmd: list[str] = [p.ssh_program] + list(remote.ssh_extra_opts)
700 if remote.ssh_config_file:
701 cmd += ["-F", remote.ssh_config_file]
702 if remote.ssh_cipher:
703 cmd += ["-c", remote.ssh_cipher]
704 if remote.ssh_port:
705 cmd += ["-p", str(remote.ssh_port)]
706 assert remote.ssh_user_host
707 cmd.append(remote.ssh_user_host)
708 return shlex.join(cmd)
710 if r2r_mode == "push":
711 # Example: ssh alice@src.example.com "sh -c 'zfs send ... | ssh bob@dst.example.com zfs receive ...'"
712 dst_pipe = dst_pipe if same_remote else shlex.quote(dst_pipe)
713 r2r_cmd = f"{src_pipe} | {nested_ssh_cmd(dst)} {dst_pipe}"
714 else:
715 assert r2r_mode == "pull", r2r_mode
716 # Example: ssh bob@dst.example.com "sh -c 'ssh alice@src.example.com zfs send ... | zfs receive ...'"
717 src_pipe = src_pipe if same_remote else shlex.quote(src_pipe)
718 r2r_cmd = f"{nested_ssh_cmd(src)} {src_pipe} | {dst_pipe}"
719 r2r_cmd = shlex.quote(p.shell_program + " -c " + shlex.quote(r2r_cmd))
720 return r2r_cmd, "", ""
723def _run_zfs_send_receive(
724 job: Job,
725 src_dataset: str,
726 dst_dataset: str,
727 send_cmd: list[str],
728 recv_cmd: list[str],
729 size_estimate_bytes: int,
730 size_estimate_human: str,
731 dry_run_no_send: bool,
732 error_trigger: str | None = None,
733) -> None:
734 """Executes a zfs send/receive pipeline between source and destination."""
735 p, log = job.params, job.params.log
736 r2r_mode: str = p.r2r_mode
737 assert r2r_mode in ("off", "pull", "push"), r2r_mode
738 log.log(LOG_TRACE, "r2r_mode: %s", r2r_mode)
739 pipes: tuple[str, str, str] = _prepare_zfs_send_receive(
740 job, src_dataset, send_cmd, recv_cmd, size_estimate_bytes, size_estimate_human
741 )
742 src_pipe, local_pipe, dst_pipe = pipes
743 conn_pool_name: str = DEDICATED if p.dedicated_tcp_connection_per_zfs_send and r2r_mode == "off" else SHARED
744 src_conn_pool: ConnectionPool = p.connection_pools[p.src.location].pool(conn_pool_name)
745 dst_conn_pool: ConnectionPool = p.connection_pools[p.dst.location].pool(conn_pool_name)
746 with src_conn_pool.connection() as src_conn, dst_conn_pool.connection() as dst_conn:
747 src_conn.refresh_ssh_connection_if_necessary(job)
748 dst_conn.refresh_ssh_connection_if_necessary(job)
749 src_ssh_cmd: str = " ".join(src_conn.ssh_cmd_quoted)
750 dst_ssh_cmd: str = " ".join(dst_conn.ssh_cmd_quoted)
751 if r2r_mode == "off":
752 cmd_str = f"{src_ssh_cmd} {src_pipe} {local_pipe} | {dst_ssh_cmd} {dst_pipe}"
753 elif r2r_mode == "push":
754 cmd_str = f"{src_ssh_cmd} {src_pipe}"
755 else:
756 assert r2r_mode == "pull"
757 cmd_str = f"{dst_ssh_cmd} {src_pipe}"
759 cmd = [p.shell_program_local, "-c", cmd_str]
760 msg: str = "Would execute: %s" if dry_run_no_send else "Executing: %s"
761 log.debug(msg, cmd_str.lstrip())
762 if not dry_run_no_send:
763 try:
764 job.maybe_inject_error(cmd=cmd, error_trigger=error_trigger)
765 sp: Subprocesses = job.subprocesses
766 process = sp.subprocess_run(
767 cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout(job), check=True, log=log
768 )
769 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
770 retry_immediately_once: bool = False
771 if not isinstance(e, UnicodeDecodeError):
772 xprint(log, stderr_to_str(e.stdout), file=sys.stdout)
773 log.warning("%s", stderr_to_str(e.stderr).rstrip())
774 if isinstance(e, subprocess.CalledProcessError):
775 retry_immediately_once = _clear_resumable_recv_state_if_necessary(
776 job, dst_dataset, stderr_to_str(e.stderr)
777 )
778 # op isn't idempotent so retries regather current state from the start of replicate_dataset()
779 raise RetryableError(display_msg="zfs send/receive", retry_immediately_once=retry_immediately_once) from e
780 else:
781 xprint(log, process.stdout, file=sys.stdout)
782 xprint(log, process.stderr, file=sys.stderr)
785def _clear_resumable_recv_state_if_necessary(job: Job, dst_dataset: str, stderr: str) -> bool:
786 """Deletes leftover ZFS resume token state on the receiving dataset if necessary to continue operations.
788 To make resumable ZFS receive a reliable feature, we cope with the following ZFS facts:
789 - A failed `zfs receive -s` prohibits the following subsequent operations, until the situation is explicitly resolved
790 via a successful subsequent `zfs receive`, or cleared via `zfs receive -A`:
791 - `zfs receive` without the resumable receive token (`zfs send -t <token>` is now required)
792 - `zfs destroy <snapshot>`
793 - `zfs rollback`
794 - `zfs send -t` does not support sending more than a single snapshot; e.g. https://github.com/openzfs/zfs/issues/16764
795 - A stale receive token prohibits subsequent `zfs send -t` if not handled (meanwhile, state changed on src or dst).
796 - `zfs receive -A` fails if the receiving dataset has no ZFS resume token (anymore).
797 """
799 def clear_resumable_recv_state() -> bool:
800 log.warning(p.dry("Aborting an interrupted zfs receive -s, deleting partially received state: %s"), dst_dataset)
801 cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} receive -A", dst_dataset)
802 job.try_ssh_command(p.dst, LOG_TRACE, is_dry=p.dry_run, print_stdout=True, cmd=cmd)
803 log.log(LOG_TRACE, p.dry("Done Aborting an interrupted zfs receive -s: %s"), dst_dataset)
804 return True
806 p, log = job.params, job.params.log
807 # No i18n needed here. OpenZFS ships no translation catalogs, so gettext falls back to English msgids and locale settings
808 # have no effect. If translations ever appear, revisit this or inject LC_ALL=C.
810 # "cannot resume send: 'wb_src/tmp/src@s1' is no longer the same snapshot used in the initial send"
811 # "cannot resume send: 'wb_src/tmp/src@s1' used in the initial send no longer exists"
812 # "cannot resume send: incremental source 0xa000000000000000 no longer exists"
813 if "cannot resume send" in stderr and (
814 "is no longer the same snapshot used in the initial send" in stderr
815 or "used in the initial send no longer exists" in stderr
816 or re.search(r"incremental source [0-9a-fx]+ no longer exists", stderr)
817 ):
818 return clear_resumable_recv_state()
820 # "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive."
821 # see https://github.com/openzfs/zfs/issues/12480
822 # 'cannot receive new filesystem stream: destination xx contains partially-complete state from "zfs receive -s"'
823 # this indicates that --no-resume-recv detects that dst contains a previously interrupted recv -s
824 elif "cannot receive" in stderr and (
825 "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive" in stderr
826 or 'contains partially-complete state from "zfs receive -s"' in stderr
827 ):
828 return clear_resumable_recv_state()
830 elif ( # this signals normal behavior on interrupt of 'zfs receive -s' if running without --no-resume-recv
831 "cannot receive new filesystem stream: checksum mismatch or incomplete stream" in stderr
832 and "Partially received snapshot is saved" in stderr
833 ):
834 return True
836 # "cannot destroy 'wb_dest/tmp/dst@s1': snapshot has dependent clones ... use '-R' to destroy the following
837 # datasets: wb_dest/tmp/dst/%recv" # see https://github.com/openzfs/zfs/issues/10439#issuecomment-642774560
838 # This msg indicates a failed 'zfs destroy' via --delete-dst-snapshots. This "clone" is caused by a previously
839 # interrupted 'zfs receive -s'. The fix used here is to delete the partially received state of said
840 # 'zfs receive -s' via 'zfs receive -A', followed by an automatic retry, which will now succeed to delete the
841 # snapshot without user intervention.
842 elif (
843 "cannot destroy" in stderr
844 and "snapshot has dependent clone" in stderr
845 and "use '-R' to destroy the following dataset" in stderr
846 and f"\n{dst_dataset}/%recv\n" in stderr
847 ):
848 return clear_resumable_recv_state()
850 # Same cause as above, except that this error can occur during 'zfs rollback'
851 # Also see https://github.com/openzfs/zfs/blob/master/cmd/zfs/zfs_main.c
852 elif (
853 "cannot rollback to" in stderr
854 and "clones of previous snapshots exist" in stderr
855 and "use '-R' to force deletion of the following clones and dependents" in stderr
856 and f"\n{dst_dataset}/%recv\n" in stderr
857 ):
858 return clear_resumable_recv_state()
860 return False
863def _recv_resume_token(job: Job, dst_dataset: str, retry_count: int) -> tuple[str | None, list[str], list[str]]:
864 """Gets recv_resume_token ZFS property from dst_dataset and returns corresponding opts to use for send+recv."""
865 p, log = job.params, job.params.log
866 if not p.resume_recv:
867 return None, [], []
868 warning: str | None = None
869 if not is_zpool_feature_enabled_or_active(p, p.dst, "feature@extensible_dataset"):
870 warning = "not available on destination dataset"
871 elif not p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst"):
872 warning = "unreliable as zfs version is too old" # e.g. zfs-0.8.3 "internal error: Unknown error 1040"
873 if warning:
874 log.warning(f"ZFS receive resume feature is {warning}. Falling back to --no-resume-recv: %s", dst_dataset)
875 return None, [], []
876 recv_resume_token: str | None = None
877 send_resume_opts: list[str] = []
878 if job.dst_dataset_exists[dst_dataset]:
879 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o value -s none receive_resume_token", dst_dataset)
880 recv_resume_token = job.run_ssh_command(p.dst, LOG_TRACE, cmd=cmd).rstrip()
881 if recv_resume_token == "-" or not recv_resume_token: # noqa: S105
882 recv_resume_token = None
883 else:
884 send_resume_opts += ["-n"] if p.dry_run else []
885 send_resume_opts += ["-v"] if p.verbose_zfs else []
886 send_resume_opts += ["-t", recv_resume_token]
887 recv_resume_opts = ["-s"]
888 return recv_resume_token, send_resume_opts, recv_resume_opts
891def _mbuffer_cmd(p: Params, loc: str, size_estimate_bytes: int, recordsize: int) -> str:
892 """If mbuffer command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to smooth out
893 the rate of data flow and prevent bottlenecks caused by network latency or speed fluctuation."""
894 if (
895 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size)
896 and (
897 (loc == "src" and (p.src.is_nonlocal or p.dst.is_nonlocal))
898 or (loc == "dst" and (p.src.is_nonlocal or p.dst.is_nonlocal))
899 or (loc == "local" and p.src.is_nonlocal and p.dst.is_nonlocal)
900 )
901 and p.is_program_available("mbuffer", loc)
902 ):
903 recordsize = max(recordsize, 2 * 1024 * 1024)
904 mbuffer_program_opts: list[str] = [p.mbuffer_program, "-s", str(recordsize)] + p.mbuffer_program_opts
905 if p.bwlimit:
906 mbuffer_program_opts += ["-R" if loc == "src" else "-r", p.bwlimit.upper()]
907 return shlex.join(mbuffer_program_opts)
908 else:
909 return "cat"
912def _compress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str:
913 """If zstd command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to reduce network
914 bottlenecks by sending compressed data."""
915 if (
916 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size)
917 and (p.src.is_nonlocal or p.dst.is_nonlocal)
918 and p.is_program_available("zstd", loc)
919 ):
920 return shlex.join([p.compression_program, "-c"] + p.compression_program_opts)
921 else:
922 return "cat"
925def _decompress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str:
926 """Returns decompression command for network pipe if remote supports it."""
927 if (
928 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size)
929 and (p.src.is_nonlocal or p.dst.is_nonlocal)
930 and p.is_program_available("zstd", loc)
931 ):
932 return shlex.join([p.compression_program, "-dc"])
933 else:
934 return "cat"
937_WORKER_THREAD_NUMBER_REGEX: Final[re.Pattern[str]] = re.compile(r"ThreadPoolExecutor-\d+_(\d+)")
940def _pv_cmd(
941 job: Job, loc: str, size_estimate_bytes: int, size_estimate_human: str, disable_progress_bar: bool = False
942) -> str:
943 """If pv command is on the PATH, monitors the progress of data transfer from 'zfs send' to 'zfs receive'; Progress can be
944 viewed via "tail -f $pv_log_file" aka tail -f ~/bzfs-logs/current/current.pv or similar."""
945 p = job.params
946 if p.is_program_available("pv", loc):
947 size: str = f"--size={size_estimate_bytes}"
948 if disable_progress_bar or p.no_estimate_send_size:
949 size = ""
950 pv_log_file: str = p.log_params.pv_log_file
951 thread_name: str = threading.current_thread().name
952 if match := _WORKER_THREAD_NUMBER_REGEX.fullmatch(thread_name):
953 worker = int(match.group(1))
954 if worker > 0:
955 pv_log_file += PV_FILE_THREAD_SEPARATOR + f"{worker:04}"
956 if job.is_first_replication_task.get_and_set(False):
957 if not p.log_params.quiet:
958 job.progress_reporter.start()
959 job.replication_start_time_nanos = time.monotonic_ns()
960 if not p.log_params.quiet:
961 with open_nofollow(pv_log_file, mode="a", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
962 fd.write("\n") # mark start of new stream so ProgressReporter can reliably reset bytes_in_flight
963 job.progress_reporter.enqueue_pv_log_file(pv_log_file)
964 pv_program_opts: list[str] = [p.pv_program] + p.pv_program_opts
965 if job.progress_update_intervals is not None: # for testing
966 pv_program_opts += [f"--interval={job.progress_update_intervals[0]}"]
967 pv_program_opts += ["--force", f"--name={size_estimate_human}"]
968 pv_program_opts += [size] if size else []
969 return f"LC_ALL=C {shlex.join(pv_program_opts)} 2>> {shlex.quote(pv_log_file)}"
970 else:
971 return "cat"
974def delete_snapshots(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None:
975 """Deletes snapshots in manageable batches on the specified remote."""
976 if len(snapshot_tags) == 0:
977 return
978 p, log = job.params, job.params.log
979 log.info(p.dry(f"Deleting {len(snapshot_tags)} snapshots within %s: %s"), dataset, snapshot_tags)
980 # delete snapshots in batches without creating a command line that's too big for the OS to handle
981 run_ssh_cmd_batched(
982 job,
983 remote,
984 _delete_snapshot_cmd(p, remote, dataset + "@"),
985 snapshot_tags,
986 lambda batch: _delete_snapshot(job, remote, dataset, dataset + "@" + ",".join(batch)),
987 max_batch_items=job.params.max_snapshots_per_minibatch_on_delete_snaps,
988 sep=",",
989 )
992def _delete_snapshot(job: Job, r: Remote, dataset: str, snapshots_to_delete: str) -> None:
993 """Runs zfs destroy for a comma-separated snapshot list."""
994 p = job.params
995 cmd: list[str] = _delete_snapshot_cmd(p, r, snapshots_to_delete)
996 is_dry: bool = False # False is safe because we're using the 'zfs destroy -n' flag
997 try:
998 job.maybe_inject_error(cmd=cmd, error_trigger="zfs_delete_snapshot")
999 job.run_ssh_command(r, LOG_DEBUG, is_dry=is_dry, print_stdout=True, cmd=cmd)
1000 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
1001 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else ""
1002 retry_immediately_once: bool = _clear_resumable_recv_state_if_necessary(job, dataset, stderr)
1003 # op isn't idempotent so retries regather current state from the start of delete_destination_snapshots() or similar
1004 raise RetryableError(display_msg="zfs destroy snapshot", retry_immediately_once=retry_immediately_once) from e
1007def _delete_snapshot_cmd(p: Params, r: Remote, snapshots_to_delete: str) -> list[str]:
1008 """Builds zfs destroy command for given snapshots."""
1009 return p.split_args(
1010 f"{r.sudo} {p.zfs_program} destroy", p.force_hard, p.verbose_destroy, p.dry_run_destroy, snapshots_to_delete
1011 )
1014def delete_bookmarks(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None:
1015 """Removes bookmarks individually since zfs lacks batch deletion."""
1016 if len(snapshot_tags) == 0:
1017 return
1018 # Unfortunately ZFS has no syntax yet to delete multiple bookmarks in a single CLI invocation
1019 p, log = job.params, job.params.log
1020 log.info(
1021 p.dry(f"Deleting {len(snapshot_tags)} bookmarks within %s: %s"), dataset, dataset + "#" + ",".join(snapshot_tags)
1022 )
1023 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} destroy")
1024 run_ssh_cmd_parallel(
1025 job,
1026 remote,
1027 [(cmd, (f"{dataset}#{snapshot_tag}" for snapshot_tag in snapshot_tags))],
1028 lambda _cmd, batch: job.try_ssh_command(
1029 remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=_cmd + batch, exists=False
1030 ),
1031 max_batch_items=1,
1032 )
1035def delete_datasets(job: Job, remote: Remote, datasets: Iterable[str]) -> None:
1036 """Deletes the given datasets via zfs destroy -r on the given remote."""
1037 # Impl is batch optimized to minimize CLI + network roundtrips: only need to run zfs destroy if previously
1038 # destroyed dataset (within sorted datasets) is not a prefix (aka ancestor) of current dataset
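    # Illustrative example (hypothetical names): for sorted input ["tank/a", "tank/a/b", "tank/a/b/c", "tank/d"]
    # only "tank/a" and "tank/d" trigger 'zfs destroy -r'; "tank/a/b" and "tank/a/b/c" are skipped because they are
    # descendants of the previously destroyed "tank/a" and thus already gone.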
1039 p, log = job.params, job.params.log
1040 last_deleted_dataset: str = DONT_SKIP_DATASET
1041 for dataset in sorted(datasets):
1042 if is_descendant(dataset, of_root_dataset=last_deleted_dataset):
1043 continue
1044 log.info(p.dry("Deleting dataset tree: %s"), f"{dataset} ...")
1045 cmd: list[str] = p.split_args(
1046 f"{remote.sudo} {p.zfs_program} destroy -r {p.force_unmount} {p.force_hard} {p.verbose_destroy}",
1047 p.dry_run_destroy,
1048 dataset,
1049 )
1050 is_dry: bool = False # False is safe because we're using the 'zfs destroy -n' flag
1051 job.run_ssh_command(remote, LOG_DEBUG, is_dry=is_dry, print_stdout=True, cmd=cmd)
1052 last_deleted_dataset = dataset
1055def _create_zfs_filesystem(job: Job, filesystem: str) -> None:
1056 """Creates destination filesystem hierarchies without mounting them."""
1057 # zfs create -p -u $filesystem
1058 # To ensure the filesystems that we create do not get mounted, we apply a separate 'zfs create -p -u'
1059 # invocation for each non-existing ancestor. This is because a single 'zfs create -p -u' applies the '-u'
1060 # part only to the immediate filesystem, rather than to the not-yet existing ancestors.
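    # Illustrative sequence (hypothetical names): creating "tank/backups/host1/data" when only "tank" exists on dst
    # issues, in order (with '-u' only if the destination ZFS supports it):
    #   zfs create -p -u tank/backups
    #   zfs create -p -u tank/backups/host1
    #   zfs create -p -u tank/backups/host1/data
    # so the "do not mount" flag is applied to every newly created ancestor, not just the leaf filesystem.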
1061 p = job.params
1062 parent: str = ""
1063 no_mount: str = "-u" if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst") else ""
1064 for component in filesystem.split("/"):
1065 parent += component
1066 if not job.dst_dataset_exists[parent]:
1067 cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} create -p", no_mount, parent)
1068 try:
1069 job.run_ssh_command(p.dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd)
1070 except subprocess.CalledProcessError as e:
1071 # ignore harmless error caused by 'zfs create' without the -u flag, or by dataset already existing
1072 stderr: str = stderr_to_str(e.stderr)
1073 if not (
1074 "filesystem successfully created, but it may only be mounted by root" in stderr
1075 or "filesystem successfully created, but not mounted" in stderr # SolarisZFS
1076 or "dataset already exists" in stderr
1077 or "filesystem already exists" in stderr # SolarisZFS?
1078 ):
1079 raise
1080 if not p.dry_run:
1081 job.dst_dataset_exists[parent] = True
1082 parent += "/"
1085def _create_zfs_bookmarks(job: Job, remote: Remote, dataset: str, snapshots: list[str]) -> None:
1086 """Creates bookmarks for the given snapshots, using the 'zfs bookmark' CLI."""
1087 # Unfortunately ZFS has no syntax yet to create multiple bookmarks in a single CLI invocation
1088 p = job.params
1090 def create_zfs_bookmark(cmd: list[str]) -> None:
1091 snapshot = cmd[-1]
1092 assert "@" in snapshot
1093 bookmark_cmd: list[str] = cmd + [replace_prefix(snapshot, old_prefix=f"{dataset}@", new_prefix=f"{dataset}#")]
1094 try:
1095 job.run_ssh_command(remote, LOG_DEBUG, is_dry=p.dry_run, print_stderr=False, cmd=bookmark_cmd)
1096 except subprocess.CalledProcessError as e:
1097 # ignore harmless zfs error caused by bookmark with the same name already existing
1098 stderr: str = stderr_to_str(e.stderr)
1099 if ": bookmark exists" not in stderr:
1100 xprint(p.log, stderr, file=sys.stderr, end="")
1101 raise
1103 if p.create_bookmarks != "none" and are_bookmarks_enabled(p, remote):
1104 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} bookmark")
1105 run_ssh_cmd_parallel(
1106 job, remote, [(cmd, snapshots)], lambda _cmd, batch: create_zfs_bookmark(_cmd + batch), max_batch_items=1
1107 )
1110def _estimate_send_size(job: Job, remote: Remote, dst_dataset: str, recv_resume_token: str | None, *items: str) -> int:
1111 """Estimates num bytes to transfer via 'zfs send -nvP'; Thread-safe."""
1112 p = job.params
1113 if p.no_estimate_send_size:
1114 return 0
1115 zfs_send_program_opts: list[str] = ["--parsable" if opt == "-P" else opt for opt in p.curr_zfs_send_program_opts]
1116 zfs_send_program_opts = append_if_absent(zfs_send_program_opts, "-v", "-n", "--parsable")
1117 if recv_resume_token:
1118 zfs_send_program_opts += ["-t", recv_resume_token]
1119 items = ()
1120 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} send", zfs_send_program_opts, items)
1121 try:
1122 lines: str | None = job.try_ssh_command(remote, LOG_TRACE, cmd=cmd)
1123 except RetryableError as retryable_error:
1124 assert retryable_error.__cause__ is not None
1125 if recv_resume_token:
1126 e = retryable_error.__cause__
1127 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else ""
1128 retryable_error.retry_immediately_once = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr)
1129 # op isn't idempotent so retries regather current state from the start of replicate_dataset()
1130 raise
1131 if lines is None:
1132 return 0 # src dataset or snapshot has been deleted by third party
1133 size: str = lines.splitlines()[-1]
1134 assert size.startswith("size")
1135 return int(size[size.index("\t") + 1 :])
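# Illustrative 'zfs send -n -v --parsable' output whose trailing line is parsed above (numbers are hypothetical):
#   incremental<TAB>pool/src@h1<TAB>pool/src@h2<TAB>123456
#   size<TAB>123456
# i.e. the estimate is the integer that follows the tab on the final "size" line.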
1138def _estimate_send_sizes_in_parallel(
1139 job: Job,
1140 r: Remote,
1141 dst_dataset: str,
1142 recv_resume_token: str | None,
1143 steps_todo: list[tuple[str, str, str, list[str]]],
1144) -> list[int]:
1145 """Estimates num bytes to transfer for multiple send steps; in parallel to reduce latency."""
1146 p = job.params
1147 if p.no_estimate_send_size:
1148 return [0 for _ in steps_todo]
1150 def iterator_builder(executor: Executor) -> Iterable[Iterable[Future[int]]]:
1151 resume_token: str | None = recv_resume_token
1152 return [
1153 (
1154 executor.submit(
1155 _estimate_send_size, job, r, dst_dataset, resume_token if i == 0 else None, incr_flag, from_snap, to_snap
1156 )
1157 for i, (incr_flag, from_snap, to_snap, _to_snapshots) in enumerate(steps_todo)
1158 )
1159 ]
1161 max_workers: int = min(len(steps_todo), job.max_workers[r.location])
1162 return list(
1163 parallel_iterator(
1164 iterator_builder, max_workers=max_workers, ordered=True, is_terminated=job.termination_event.is_set
1165 )
1166 )
1169def _zfs_set(job: Job, properties: list[str], remote: Remote, dataset: str) -> None:
1170 """Applies the given property key=value pairs via 'zfs set' CLI to the given dataset on the given remote."""
1171 p = job.params
1172 if len(properties) == 0:
1173 return
1174 # set properties in batches without creating a command line that's too big for the OS to handle
1175 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} set")
1176 run_ssh_cmd_batched(
1177 job,
1178 remote,
1179 cmd,
1180 properties,
1181 lambda batch: job.run_ssh_command(
1182 remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch + [dataset]
1183 ),
1184 max_batch_items=2**29,
1185 )
1188def _zfs_get(
1189 job: Job,
1190 remote: Remote,
1191 dataset: str,
1192 sources: str,
1193 output_columns: str,
1194 propnames: str,
1195 splitlines: bool,
1196 props_cache: dict[tuple[str, str, str], dict[str, str | None]],
1197) -> dict[str, str | None]:
1198 """Returns the results of 'zfs get' CLI on the given dataset on the given remote."""
1199 assert dataset
1200 assert sources
1201 assert output_columns
1202 if not propnames:
1203 return {}
1204 p = job.params
1205 cache_key: tuple[str, str, str] = (sources, output_columns, propnames)
1206 props: dict[str, str | None] | None = props_cache.get(cache_key)
1207 if props is None:
1208 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o {output_columns} -s {sources} {propnames}", dataset)
1209 lines: str = job.run_ssh_command(remote, LOG_TRACE, cmd=cmd)
1210 is_name_value_pair: bool = "," in output_columns
1211 props = {}
1212 # if not splitlines: omit single trailing newline that was appended by 'zfs get' CLI
1213 assert splitlines or len(lines) == 0 or lines[-1] == "\n"
1214 for line in lines.splitlines() if splitlines else [lines[0:-1]]:
1215 if is_name_value_pair:
1216 propname, propvalue = line.split("\t", 1)
1217 props[propname] = propvalue
1218 else:
1219 props[line] = None
1220 props_cache[cache_key] = props
1221 return props
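# Illustrative sketch of the parsing performed above: 'zfs get -Hp -o property,value' emits one
# "<name>\t<value>" record per line; the sample output in the example is assumed.
def _parse_zfs_get_sketch(lines: str, is_name_value_pair: bool) -> dict[str, str | None]:
    props: dict[str, str | None] = {}
    for line in lines.splitlines():
        if is_name_value_pair:
            name, value = line.split("\t", 1)
            props[name] = value
        else:
            props[line] = None  # only property names were requested
    return props

# Example: _parse_zfs_get_sketch("compression\tlz4\natime\toff\n", True) == {"compression": "lz4", "atime": "off"}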
1224def _incremental_send_steps_wrapper(
1225 p: Params, src_snapshots: list[str], src_guids: list[str], included_guids: set[str], is_resume: bool
1226) -> list[tuple[str, str, str, list[str]]]:
1227 """Returns incremental send steps, optionally converting -I to -i."""
1228 force_convert_I_to_i: bool = p.src.use_zfs_delegation and not getenv_bool("no_force_convert_I_to_i", True) # noqa: N806
1229 # force_convert_I_to_i == True implies that:
1230 # If using 'zfs allow' delegation mechanism, force convert 'zfs send -I' to a series of
1231 # 'zfs send -i' as a workaround for zfs issue https://github.com/openzfs/zfs/issues/16394
1232 return incremental_send_steps(src_snapshots, src_guids, included_guids, is_resume, force_convert_I_to_i)
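# Conceptual sketch of what converting -I to -i means (simplified; the real logic lives in
# incremental_send_steps and also honors GUID filtering and resume state): a single
# 'zfs send -I @a @d' becomes consecutive 'zfs send -i' hops @a->@b, @b->@c, @c->@d.
def _convert_big_i_to_small_i_sketch(snapshots: list[str]) -> list[tuple[str, str, str]]:
    return [("-i", snapshots[k], snapshots[k + 1]) for k in range(len(snapshots) - 1)]

# Example: _convert_big_i_to_small_i_sketch(["@a", "@b", "@c", "@d"])
#          == [("-i", "@a", "@b"), ("-i", "@b", "@c"), ("-i", "@c", "@d")]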
1235def _add_recv_property_options(
1236 job: Job, full_send: bool, recv_opts: list[str], dataset: str, cache: dict[tuple[str, str, str], dict[str, str | None]]
1237) -> tuple[list[str], list[str]]:
1238 """Reads the ZFS properties of the given src dataset; appends 'zfs recv' -o and -x values to a copy of recv_opts according to
1239 CLI params, and returns properties to explicitly set on the dst dataset after 'zfs receive' completes successfully."""
1240 is_volume: bool = job.src_properties[dataset].recordsize < 0
1241 p = job.params
1242 recv_opts = recv_opts.copy()
1243 set_opts: list[str] = []
1244 x_names: list[str] = p.zfs_recv_x_names
1245 x_names_set: set[str] = set(x_names)
1246 ox_names: set[str] = p.zfs_recv_ox_names.copy()
1247 if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_2_0, p.dst.location):
1248 # workaround for https://github.com/openzfs/zfs/commit/b0269cd8ced242e66afc4fa856d62be29bb5a4ff
1249 # 'zfs recv -x foo' on zfs < 2.2 errors out if the 'foo' property isn't contained in the send stream
1250 for propname in x_names:
1251 recv_opts.append("-x")
1252 recv_opts.append(propname)
1253 ox_names.update(x_names) # union
1254 for config in [p.zfs_recv_o_config, p.zfs_recv_x_config, p.zfs_set_config]:
1255 if len(config.include_regexes) == 0:
1256 continue # this is the default - it's an instant noop
1257 if (full_send and "full" in config.targets) or (not full_send and "incremental" in config.targets):
1258 # 'zfs get' uses newline as record separator and tab as separator between output columns. A ZFS user property
1259 # may contain newline and tab characters (indeed anything). Together, this means that there is no reliable
1260 # way to determine where a record ends and the next record starts when listing multiple arbitrary records in
1261 # a single 'zfs get' call. Therefore, here we use a separate 'zfs get' call for each ZFS user property.
1262 # TODO: perf: on zfs >= 2.3 use json via zfs get -j to safely merge all zfs gets into one 'zfs get' call
1263 try:
1264 props_any: dict = _zfs_get(job, p.src, dataset, config.sources, "property", "all", True, cache)
1265 props_filtered: dict = filter_properties(p, props_any, config.include_regexes, config.exclude_regexes)
1266 user_propnames: list[str] = [name for name in props_filtered if ":" in name]
1267 sys_propnames: str = ",".join(name for name in props_filtered if ":" not in name)
1268 props: dict = _zfs_get(job, p.src, dataset, config.sources, "property,value", sys_propnames, True, cache)
1269 for propnames in user_propnames:
1270 props.update(_zfs_get(job, p.src, dataset, config.sources, "property,value", propnames, False, cache))
1271 except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
1272 raise RetryableError(display_msg="zfs get") from e
1273 for propname in sorted(props.keys()):
1274 if config is p.zfs_recv_o_config:
1275 if not (propname in ox_names or propname in x_names_set):
1276 recv_opts.append("-o")
1277 recv_opts.append(f"{propname}={props[propname]}")
1278 ox_names.add(propname)
1279 elif config is p.zfs_recv_x_config:
1280 if propname not in ox_names:
1281 recv_opts.append("-x")
1282 recv_opts.append(propname)
1283 ox_names.add(propname)
1284 else:
1285 assert config is p.zfs_set_config
1286 set_opts.append(f"{propname}={props[propname]}")
1287 recv_opts = _sanitize_recv_opts_for_dataset_type(recv_opts, is_volume=is_volume)
1288 return recv_opts, set_opts
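# Illustrative sketch of why the code above issues a separate 'zfs get' call per ZFS user property
# (the property name and value are assumed examples): a user property value may itself contain tabs
# and newlines, so a combined multi-property dump cannot be split back into records reliably.
def _naive_record_split_is_ambiguous_sketch() -> bool:
    dump = "foo:note\tline1\nline2\ncompression\tlz4\n"  # value of 'foo:note' spans two lines
    records = dump.splitlines()  # ['foo:note\tline1', 'line2', 'compression\tlz4']
    return "line2" in records  # True: part of foo:note's value is misread as a separate record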
1291_ZFS_RECV_X_PROPS_VOLUME_ONLY: Final[frozenset[str]] = frozenset({"volblocksize", "volsize"})
1292_ZFS_RECV_O_PROPS_VOLUME_ONLY: Final[frozenset[str]] = _ZFS_RECV_X_PROPS_VOLUME_ONLY
1293_ZFS_RECV_X_PROPS_FILESYSTEM_ONLY: Final[frozenset[str]] = frozenset({"casesensitivity", "normalization", "utf8only"})
1294_ZFS_RECV_O_PROPS_FILESYSTEM_ONLY: Final[frozenset[str]] = frozenset(
1295 {
1296 # see https://github.com/openzfs/zfs/blob/master/module/zcommon/zfs_prop.c
1297 "aclinherit",
1298 "aclmode",
1299 "acltype",
1300 "atime",
1301 "canmount",
1302 "casesensitivity",
1303 "devices",
1304 "mountpoint",
1305 "normalization",
1306 "overlay",
1307 "recordsize",
1308 "relatime",
1309 "sharenfs",
1310 "sharesmb",
1311 "snapdir",
1312 "utf8only",
1313 "xattr",
1314 }
1315)
1318def _sanitize_recv_opts_for_dataset_type(recv_opts: list[str], *, is_volume: bool) -> list[str]:
1319 """Drops `zfs receive` `-o/-x` properties that `zfs receive` rejects as an error for this dataset type. Keeps properties
1320 for which `zfs receive` emits only (harmless) warnings.
1322 For example:
1323 drop -o canmount=<value> on zvols
1324 drop -o recordsize=<value> on zvols
1325 drop -x casesensitivity on zvols
1326 drop -o/-x volsize on filesystems
1327 keep -x mountpoint on zvols
1328 keep -o/-x volmode on filesystems
1329 """
1330 if is_volume:
1331 inapplicable_props_dict = {"-o": _ZFS_RECV_O_PROPS_FILESYSTEM_ONLY, "-x": _ZFS_RECV_X_PROPS_FILESYSTEM_ONLY}
1332 else:
1333 inapplicable_props_dict = {"-o": _ZFS_RECV_O_PROPS_VOLUME_ONLY, "-x": _ZFS_RECV_X_PROPS_VOLUME_ONLY}
1335 results: list[str] = []
1336 i = 0
1337 while i < len(recv_opts):
1338 opt: str = recv_opts[i]
1339 i += 1
1340 inapplicable_props: frozenset[str] | None = inapplicable_props_dict.get(opt)
1341 if inapplicable_props is None or i >= len(recv_opts):
1342 results.append(opt)
1343 continue
1344 arg: str = recv_opts[i]
1345 i += 1
1346 propname: str = arg.split("=", 1)[0] if opt == "-o" else arg
1347 if propname not in inapplicable_props: # retain this property on this dataset type
1348 results.append(opt)
1349 results.append(arg)
1350 return results
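# Usage sketch for _sanitize_recv_opts_for_dataset_type (the option list is an assumed example):
# on a zvol, filesystem-only properties such as canmount and recordsize are dropped, while '-u'
# and '-x mountpoint' are retained, yielding ["-u", "-x", "mountpoint"].
def _sanitize_recv_opts_usage_sketch() -> list[str]:
    return _sanitize_recv_opts_for_dataset_type(
        ["-u", "-o", "canmount=off", "-o", "recordsize=128k", "-x", "mountpoint"], is_volume=True
    )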
1353def _check_zfs_dataset_busy(job: Job, remote: Remote, dataset: str, busy_if_send: bool = True) -> bool:
1354 """Declines to start a state-changing ZFS operation that, although harmless, is likely to collide with another currently
1355 running process. Instead, retry the operation later, after some delay. For example, decline to start a 'zfs receive' into
1356 a destination dataset if another process is already running another 'zfs receive' into the same destination dataset, as
1357 ZFS would reject any such attempt. However, it's actually fine to run an incremental 'zfs receive' into a dataset in
1358 parallel with a 'zfs send' out of the very same dataset. This also helps daisy chain use cases where A replicates to B,
1359 and B replicates to C.
1361 _check_zfs_dataset_busy() offers no guarantees; it merely proactively avoids likely collisions. In other words, even if
1362 the process check below passes there is no guarantee that the destination dataset won't be busy by the time we actually
1363 execute the 'zfs send' operation. In such an event ZFS will reject the operation, we'll detect that, and we'll simply
1364 auto-retry, after some delay. _check_zfs_dataset_busy() can be disabled via --ps-program=-.
1366 TLDR: As is common for long-running operations in distributed systems, we use coordination-free optimistic concurrency
1367 control where the parties simply retry on collision detection (rather than coordinate concurrency via a remote lock
1368 server).
1369 """
1370 p, log = job.params, job.params.log
1371 if not p.is_program_available("ps", remote.location):
1372 return True
1373 cmd: list[str] = p.split_args(f"{p.ps_program} -Ao args")
1374 procs: list[str] = (job.try_ssh_command(remote, LOG_TRACE, cmd=cmd) or "").splitlines()
1375 if job.inject_params.get("is_zfs_dataset_busy", False):
1376 procs += ["sudo -n zfs receive -u -o foo:bar=/baz " + dataset] # for unit testing only
1377 if not _is_zfs_dataset_busy(procs, dataset, busy_if_send=busy_if_send):
1378 return True
1379 op: str = "zfs {receive" + ("|send" if busy_if_send else "") + "} operation"
1380 try:
1381 die(f"Cannot continue now: Destination is already busy with {op} from another process: {dataset}")
1382 except SystemExit as e:
1383 log.warning("%s", e)
1384 raise RetryableError("dst currently busy with zfs mutation op", display_msg="replication") from e
1387_ZFS_DATASET_BUSY_PREFIX: Final[str] = r"(([^ ]*?/)?(sudo|doas)( +-n)? +)?([^ ]*?/)?zfs (receive|recv"
1388_ZFS_DATASET_BUSY_IF_MODS: Final[re.Pattern[str]] = re.compile((_ZFS_DATASET_BUSY_PREFIX + ") .*").replace("(", "(?:"))
1389_ZFS_DATASET_BUSY_IF_SEND: Final[re.Pattern[str]] = re.compile((_ZFS_DATASET_BUSY_PREFIX + "|send) .*").replace("(", "(?:"))
1392def _is_zfs_dataset_busy(procs: list[str], dataset: str, busy_if_send: bool) -> bool:
1393 """Checks if any process list entry indicates ZFS activity on dataset."""
1394 regex: re.Pattern[str] = _ZFS_DATASET_BUSY_IF_SEND if busy_if_send else _ZFS_DATASET_BUSY_IF_MODS
1395 suffix: str = " " + dataset
1396 infix: str = " " + dataset + "@"
1397 return any((proc.endswith(suffix) or infix in proc) and regex.fullmatch(proc) for proc in procs)
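# Usage sketch for _is_zfs_dataset_busy (the process list entries are assumed examples): a running
# 'zfs receive' into the dataset counts as busy, whereas an unrelated process that merely mentions
# the dataset name does not match the regex and is ignored.
def _is_zfs_dataset_busy_usage_sketch() -> tuple[bool, bool]:
    procs = ["sudo -n zfs receive -u tank/backup", "python3 some_script.py tank/backup"]
    busy = _is_zfs_dataset_busy(procs, "tank/backup", busy_if_send=True)  # True: matching 'zfs receive'
    idle = _is_zfs_dataset_busy(["python3 some_script.py"], "tank/backup", busy_if_send=True)  # False
    return busy, idle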