Coverage for bzfs_main/replication.py: 99%

644 statements  

coverage.py v7.10.2, created at 2025-08-06 13:30 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""The core replication algorithm is in replicate_dataset(), which performs reliable full and/or incremental 'zfs send' and 

16'zfs receive' operations on snapshots, using resumable ZFS sends when possible. 

17 

18For replication of multiple datasets, including recursive replication, see bzfs.py/replicate_datasets(). 

19""" 

20 

21from __future__ import annotations 

22import os 

23import re 

24import shlex 

25import subprocess 

26import sys 

27import threading 

28import time 

29from subprocess import DEVNULL, PIPE 

30from typing import ( 

31 TYPE_CHECKING, 

32 Iterable, 

33) 

34 

35from bzfs_main.argparse_actions import ( 

36 has_timerange_filter, 

37) 

38from bzfs_main.connection import ( 

39 DEDICATED, 

40 SHARED, 

41 ConnectionPool, 

42 maybe_inject_error, 

43 refresh_ssh_connection_if_necessary, 

44 run_ssh_command, 

45 timeout, 

46 try_ssh_command, 

47) 

48from bzfs_main.detect import ( 

49 ZFS_VERSION_IS_AT_LEAST_2_1_0, 

50 ZFS_VERSION_IS_AT_LEAST_2_2_0, 

51 are_bookmarks_enabled, 

52 is_solaris_zfs, 

53 is_solaris_zfs_location, 

54 is_zpool_feature_enabled_or_active, 

55) 

56from bzfs_main.filter import ( 

57 filter_properties, 

58 filter_snapshots, 

59) 

60from bzfs_main.incremental_send_steps import ( 

61 incremental_send_steps, 

62) 

63from bzfs_main.parallel_batch_cmd import ( 

64 run_ssh_cmd_batched, 

65 run_ssh_cmd_parallel, 

66) 

67from bzfs_main.parallel_iterator import ( 

68 run_in_parallel, 

69) 

70from bzfs_main.progress_reporter import ( 

71 PV_FILE_THREAD_SEPARATOR, 

72) 

73from bzfs_main.retry import ( 

74 Retry, 

75 RetryableError, 

76) 

77from bzfs_main.utils import ( 

78 DONT_SKIP_DATASET, 

79 LOG_DEBUG, 

80 LOG_TRACE, 

81 append_if_absent, 

82 cut, 

83 die, 

84 getenv_bool, 

85 human_readable_bytes, 

86 is_descendant, 

87 list_formatter, 

88 replace_prefix, 

89 stderr_to_str, 

90 subprocess_run, 

91 xprint, 

92) 

93 

94if TYPE_CHECKING: # pragma: no cover - for type hints only 

95 from bzfs_main.bzfs import Job 

96 from bzfs_main.configuration import Params, Remote 

97 

98 

99# constants: 

100INJECT_DST_PIPE_FAIL_KBYTES: int = 400 # for testing only 

101RIGHT_JUST: int = 7 

102 

103 

104def replicate_dataset(job: Job, src_dataset: str, tid: str, retry: Retry) -> bool: 

105 """Replicates src_dataset to dst_dataset (thread-safe); For replication of multiple datasets, including recursive 

106 replication, see bzfs.py/replicate_datasets().""" 

107 p, log = job.params, job.params.log 

108 src, dst = p.src, p.dst 

109 retry_count: int = retry.count 

110 dst_dataset: str = replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset) 

111 log.debug(p.dry(f"{tid} Replicating: %s"), f"{src_dataset} --> {dst_dataset} ...") 

112 

113 list_result: bool | tuple[list[str], list[str], list[str], set[str], str, str] = _list_and_filter_src_and_dst_snapshots( 

114 job, src_dataset, dst_dataset 

115 ) 

116 if isinstance(list_result, bool): 

117 return list_result 

118 ( 

119 basis_src_snapshots_with_guids, 

120 src_snapshots_with_guids, 

121 dst_snapshots_with_guids, 

122 included_src_guids, 

123 latest_src_snapshot, 

124 oldest_src_snapshot, 

125 ) = list_result 

126 log.debug("latest_src_snapshot: %s", latest_src_snapshot) 

127 latest_dst_snapshot: str = "" 

128 latest_common_src_snapshot: str = "" 

129 done_checking: bool = False 

130 

131 if job.dst_dataset_exists[dst_dataset]: 

132 rollback_result: bool | tuple[str, str, bool] = _rollback_dst_dataset_if_necessary( 

133 job, dst_dataset, latest_src_snapshot, src_snapshots_with_guids, dst_snapshots_with_guids, done_checking, tid 

134 ) 

135 if isinstance(rollback_result, bool): 

136 return rollback_result 

137 latest_dst_snapshot, latest_common_src_snapshot, done_checking = rollback_result 

138 

139 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark 

140 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot) 

141 props_cache: dict[tuple[str, str, str], dict[str, str | None]] = {} 

142 dry_run_no_send: bool = False 

143 if not latest_common_src_snapshot: 

144 # no common snapshot exists; delete all dst snapshots and perform a full send of the oldest selected src snapshot 

145 full_result: tuple[str, bool, bool, int] = _replicate_dataset_fully( 

146 job, 

147 src_dataset, 

148 dst_dataset, 

149 oldest_src_snapshot, 

150 latest_src_snapshot, 

151 latest_dst_snapshot, 

152 dst_snapshots_with_guids, 

153 props_cache, 

154 dry_run_no_send, 

155 done_checking, 

156 retry_count, 

157 tid, 

158 ) # we have now created a common snapshot 

159 latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count = full_result 

160 if latest_common_src_snapshot: 

161 # finally, incrementally replicate all selected snapshots from latest common snapshot until latest src snapshot 

162 _replicate_dataset_incrementally( 

163 job, 

164 src_dataset, 

165 dst_dataset, 

166 latest_common_src_snapshot, 

167 latest_src_snapshot, 

168 basis_src_snapshots_with_guids, 

169 included_src_guids, 

170 props_cache, 

171 dry_run_no_send, 

172 done_checking, 

173 retry_count, 

174 tid, 

175 ) 

176 return True 

177 

178 

179def _list_and_filter_src_and_dst_snapshots( 

180 job: Job, src_dataset: str, dst_dataset: str 

181) -> bool | tuple[list[str], list[str], list[str], set[str], str, str]: 

182 """On replication, list and filter src and dst snapshots.""" 

183 p, log = job.params, job.params.log 

184 src, dst = p.src, p.dst 

185 

186 # list GUID and name for dst snapshots, sorted ascending by createtxg (more precise than creation time) 

187 dst_cmd: list[str] = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -s createtxg -Hp -o guid,name", dst_dataset) 

188 

189 # list GUID and name for src snapshots + bookmarks, primarily sort ascending by transaction group (which is more 

190 # precise than creation time), secondarily sort such that snapshots appear after bookmarks for the same GUID. 

191 # Note: A snapshot and its ZFS bookmarks always have the same GUID, creation time and transaction group. A snapshot 

192 # changes its transaction group but retains its creation time and GUID on 'zfs receive' on another pool, i.e. 

193 # comparing createtxg is only meaningful within a single pool, not across pools from src to dst. Comparing creation 

194 # time remains meaningful across pools from src to dst. Creation time is a UTC Unix time in integer seconds. 

195 # Note that 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs enforce that snapshot names must not contain a '#' 

196 # char, bookmark names must not contain a '@' char, and dataset names must not contain a '#' or '@' char. 

197 # GUID and creation time also do not contain a '#' or '@' char. 
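# e.g. with '-Hp -o guid,name' each output line has the form '<guid>\t<name>', where <name> is either
# '<dataset>@<snapshot>' or '<dataset>#<bookmark>'; with the creation property prepended the form is
# '<creation>\t<guid>\t<name>' (illustrative; actual values depend on the pool).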

198 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters) 

199 types: str = "snapshot,bookmark" if p.use_bookmark and are_bookmarks_enabled(p, src) else "snapshot" 

200 props: str = job.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name" 

201 src_cmd = p.split_args(f"{p.zfs_program} list -t {types} -s createtxg -s type -d 1 -Hp -o {props}", src_dataset) 

202 job.maybe_inject_delete(src, dataset=src_dataset, delete_trigger="zfs_list_snapshot_src") 

203 src_snapshots_and_bookmarks, dst_snapshots_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel 

204 lambda: try_ssh_command(job, src, LOG_TRACE, cmd=src_cmd), 

205 lambda: try_ssh_command(job, dst, LOG_TRACE, cmd=dst_cmd, error_trigger="zfs_list_snapshot_dst"), 

206 ) 

207 job.dst_dataset_exists[dst_dataset] = dst_snapshots_with_guids_str is not None 

208 dst_snapshots_with_guids: list[str] = (dst_snapshots_with_guids_str or "").splitlines() 

209 if src_snapshots_and_bookmarks is None: 

210 log.warning("Third party deleted source: %s", src_dataset) 

211 return False # src dataset has been deleted by some third party while we're running - nothing to do anymore 

212 src_snapshots_with_guids: list[str] = src_snapshots_and_bookmarks.splitlines() 

213 src_snapshots_and_bookmarks = None 

214 if len(dst_snapshots_with_guids) == 0 and "bookmark" in types: 

215 # src bookmarks serve no purpose if the destination dataset has no snapshot; ignore them 

216 src_snapshots_with_guids = [snapshot for snapshot in src_snapshots_with_guids if "@" in snapshot] 

217 num_src_snapshots_found: int = sum(1 for snapshot in src_snapshots_with_guids if "@" in snapshot) 

218 with job.stats_lock: 

219 job.num_snapshots_found += num_src_snapshots_found 

220 # apply include/exclude regexes to ignore irrelevant src snapshots 

221 basis_src_snapshots_with_guids: list[str] = src_snapshots_with_guids 

222 src_snapshots_with_guids = filter_snapshots(job, src_snapshots_with_guids) 

223 if filter_needs_creation_time: 

224 src_snapshots_with_guids = cut(field=2, lines=src_snapshots_with_guids) 

225 basis_src_snapshots_with_guids = cut(field=2, lines=basis_src_snapshots_with_guids) 

226 

227 # find oldest and latest "true" snapshot, as well as GUIDs of all snapshots and bookmarks. 

228 # a snapshot is "true" if it is not a bookmark. 

229 oldest_src_snapshot: str = "" 

230 latest_src_snapshot: str = "" 

231 included_src_guids: set[str] = set() 

232 for line in src_snapshots_with_guids: 

233 guid, snapshot = line.split("\t", 1) 

234 included_src_guids.add(guid) 

235 if "@" in snapshot: 

236 latest_src_snapshot = snapshot 

237 if not oldest_src_snapshot: 

238 oldest_src_snapshot = snapshot 

239 if len(src_snapshots_with_guids) == 0: 

240 if p.skip_missing_snapshots == "fail": 

241 die(f"Source dataset includes no snapshot: {src_dataset}. Consider using --skip-missing-snapshots=dataset") 

242 elif p.skip_missing_snapshots == "dataset": 

243 log.warning("Skipping source dataset because it includes no snapshot: %s", src_dataset) 

244 if p.recursive and not job.dst_dataset_exists[dst_dataset]: 

245 log.warning("Also skipping descendant datasets as dst dataset does not exist for %s", src_dataset) 

246 return job.dst_dataset_exists[dst_dataset] 

247 return ( 

248 basis_src_snapshots_with_guids, 

249 src_snapshots_with_guids, 

250 dst_snapshots_with_guids, 

251 included_src_guids, 

252 latest_src_snapshot, 

253 oldest_src_snapshot, 

254 ) 

255 

256 

257def _rollback_dst_dataset_if_necessary( 

258 job: Job, 

259 dst_dataset: str, 

260 latest_src_snapshot: str, 

261 src_snapshots_with_guids: list[str], 

262 dst_snapshots_with_guids: list[str], 

263 done_checking: bool, 

264 tid: str, 

265) -> bool | tuple[str, str, bool]: 

266 """On replication, rollback dst if necessary.""" 

267 p, log = job.params, job.params.log 

268 dst = p.dst 

269 latest_dst_snapshot: str = "" 

270 latest_dst_guid: str = "" 

271 if len(dst_snapshots_with_guids) > 0: 

272 latest_dst_guid, latest_dst_snapshot = dst_snapshots_with_guids[-1].split("\t", 1) 

273 if p.force_rollback_to_latest_snapshot: 

274 log.info(p.dry(f"{tid} Rolling back destination to most recent snapshot: %s"), latest_dst_snapshot) 

275 # rollback just in case the dst dataset was modified since its most recent snapshot 

276 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

277 cmd: list[str] = p.split_args(f"{dst.sudo} {p.zfs_program} rollback", latest_dst_snapshot) 

278 try_ssh_command(job, dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd, exists=False) 

279 elif latest_src_snapshot == "": 

280 log.info(f"{tid} Already-up-to-date: %s", dst_dataset) 

281 return True 

282 

283 # find most recent snapshot (or bookmark) that src and dst have in common - we'll start to replicate 

284 # from there up to the most recent src snapshot. any two snapshots are "common" iff their ZFS GUIDs (i.e. 

285 # contents) are equal. See https://github.com/openzfs/zfs/commit/305bc4b370b20de81eaf10a1cf724374258b74d1 

286 def latest_common_snapshot(snapshots_with_guids: list[str], intersect_guids: set[str]) -> tuple[str | None, str]: 

287 """Returns a true snapshot instead of its bookmark with the same GUID, per the sort order previously used for 'zfs 

288 list -s ...'.""" 

289 for _line in reversed(snapshots_with_guids): 

290 guid_, snapshot_ = _line.split("\t", 1) 

291 if guid_ in intersect_guids: 

292 return guid_, snapshot_ # can be a snapshot or bookmark 

293 return None, "" 

294 

295 latest_common_guid, latest_common_src_snapshot = latest_common_snapshot( 

296 src_snapshots_with_guids, set(cut(field=1, lines=dst_snapshots_with_guids)) 

297 ) 

298 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark 

299 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot) 

300 

301 if latest_common_src_snapshot and latest_common_guid != latest_dst_guid: 

302 # found latest common snapshot but dst has an even newer snapshot. rollback dst to that common snapshot. 

303 assert latest_common_guid 

304 _, latest_common_dst_snapshot = latest_common_snapshot(dst_snapshots_with_guids, {latest_common_guid}) 

305 if not (p.force_rollback_to_latest_common_snapshot or p.force): 

306 die( 

307 f"Conflict: Most recent destination snapshot {latest_dst_snapshot} is more recent than " 

308 f"most recent common snapshot {latest_common_dst_snapshot}. Rollback destination first, " 

309 "for example via --force-rollback-to-latest-common-snapshot (or --force) option." 

310 ) 

311 if p.force_once: 

312 p.force.value = False 

313 p.force_rollback_to_latest_common_snapshot.value = False 

314 log.info(p.dry(f"{tid} Rolling back destination to most recent common snapshot: %s"), latest_common_dst_snapshot) 

315 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

316 cmd = p.split_args( 

317 f"{dst.sudo} {p.zfs_program} rollback -r {p.force_unmount} {p.force_hard}", latest_common_dst_snapshot 

318 ) 

319 try: 

320 run_ssh_command(job, dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd) 

321 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

322 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else "" 

323 no_sleep: bool = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr) 

324 # op isn't idempotent so retries regather current state from the start of replicate_dataset() 

325 raise RetryableError("Subprocess failed", no_sleep=no_sleep) from e 

326 

327 if latest_src_snapshot and latest_src_snapshot == latest_common_src_snapshot: 

328 log.info(f"{tid} Already up-to-date: %s", dst_dataset) 

329 return True 

330 return latest_dst_snapshot, latest_common_src_snapshot, done_checking 

331 

332 

333def _replicate_dataset_fully( 

334 job: Job, 

335 src_dataset: str, 

336 dst_dataset: str, 

337 oldest_src_snapshot: str, 

338 latest_src_snapshot: str, 

339 latest_dst_snapshot: str, 

340 dst_snapshots_with_guids: list[str], 

341 props_cache: dict[tuple[str, str, str], dict[str, str | None]], 

342 dry_run_no_send: bool, 

343 done_checking: bool, 

344 retry_count: int, 

345 tid: str, 

346) -> tuple[str, bool, bool, int]: 

347 """On replication, deletes all dst snapshots and performs a full send of the oldest selected src snapshot, which in turn 

348 creates a common snapshot.""" 

349 p, log = job.params, job.params.log 

350 src, dst = p.src, p.dst 

351 latest_common_src_snapshot: str = "" 

352 if latest_dst_snapshot: 

353 if not p.force: 

354 die( 

355 f"Conflict: No common snapshot found between {src_dataset} and {dst_dataset} even though " 

356 "destination has at least one snapshot. Aborting. Consider using --force option to first " 

357 "delete all existing destination snapshots in order to be able to proceed with replication." 

358 ) 

359 if p.force_once: 

360 p.force.value = False 

361 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

362 delete_snapshots(job, dst, dst_dataset, snapshot_tags=cut(2, separator="@", lines=dst_snapshots_with_guids)) 

363 if p.dry_run: 

364 # As we're in --dryrun (--force) mode this conflict resolution step (see above) wasn't really executed: 

365 # "no common snapshot was found. delete all dst snapshots". In turn, this would cause the subsequent 

366 # 'zfs receive -n' to fail with "cannot receive new filesystem stream: destination has snapshots; must 

367 # destroy them to overwrite it". So we skip the zfs send/receive step and keep on trucking. 

368 dry_run_no_send = True 

369 

370 # to start with, fully replicate oldest snapshot, which in turn creates a common snapshot 

371 if p.no_stream: 

372 oldest_src_snapshot = latest_src_snapshot 

373 if oldest_src_snapshot: 

374 if not job.dst_dataset_exists[dst_dataset]: 

375 # on destination, create parent filesystem and ancestors if they do not yet exist 

376 dst_dataset_parent: str = os.path.dirname(dst_dataset) 

377 if not job.dst_dataset_exists[dst_dataset_parent]: 

378 if p.dry_run: 

379 dry_run_no_send = True 

380 if dst_dataset_parent != "": 380 ↛ 383line 380 didn't jump to line 383 because the condition on line 380 was always true

381 _create_zfs_filesystem(job, dst_dataset_parent) 

382 

383 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count) 

384 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result 

385 curr_size: int = _estimate_send_size(job, src, dst_dataset, recv_resume_token, oldest_src_snapshot) 

386 humansize: str = _format_size(curr_size) 

387 if recv_resume_token: 

388 send_opts: list[str] = send_resume_opts # e.g. ["-t", "1-c740b4779-..."] 

389 else: 

390 send_opts = p.curr_zfs_send_program_opts + [oldest_src_snapshot] 

391 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts) 

392 recv_opts: list[str] = p.zfs_full_recv_opts.copy() + recv_resume_opts 

393 recv_opts, set_opts = _add_recv_property_options(job, True, recv_opts, src_dataset, props_cache) 

394 recv_cmd: list[str] = p.split_args( 

395 f"{dst.sudo} {p.zfs_program} receive -F", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True 

396 ) 

397 log.info(p.dry(f"{tid} Full send: %s"), f"{oldest_src_snapshot} --> {dst_dataset} ({humansize.strip()}) ...") 

398 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

399 dry_run_no_send = dry_run_no_send or p.dry_run_no_send 

400 job.maybe_inject_params(error_trigger="full_zfs_send_params") 

401 humansize = humansize.rjust(RIGHT_JUST * 3 + 2) 

402 _run_zfs_send_receive( 

403 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "full_zfs_send" 

404 ) 

405 latest_common_src_snapshot = oldest_src_snapshot # we have now created a common snapshot 

406 if not dry_run_no_send and not p.dry_run: 

407 job.dst_dataset_exists[dst_dataset] = True 

408 with job.stats_lock: 

409 job.num_snapshots_replicated += 1 

410 _create_zfs_bookmarks(job, src, src_dataset, [oldest_src_snapshot]) 

411 _zfs_set(job, set_opts, dst, dst_dataset) 

412 dry_run_no_send = dry_run_no_send or p.dry_run 

413 retry_count = 0 

414 

415 return latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count 

416 

417 

418def _replicate_dataset_incrementally( 

419 job: Job, 

420 src_dataset: str, 

421 dst_dataset: str, 

422 latest_common_src_snapshot: str, 

423 latest_src_snapshot: str, 

424 basis_src_snapshots_with_guids: list[str], 

425 included_src_guids: set[str], 

426 props_cache: dict[tuple[str, str, str], dict[str, str | None]], 

427 dry_run_no_send: bool, 

428 done_checking: bool, 

429 retry_count: int, 

430 tid: str, 

431) -> None: 

432 """Incrementally replicates all selected snapshots from latest common snapshot until latest src snapshot.""" 

433 p, log = job.params, job.params.log 

434 src, dst = p.src, p.dst 

435 

436 def replication_candidates() -> tuple[list[str], list[str]]: 

437 assert len(basis_src_snapshots_with_guids) > 0 

438 result_snapshots: list[str] = [] 

439 result_guids: list[str] = [] 

440 last_appended_guid: str = "" 

441 snapshot_itr = reversed(basis_src_snapshots_with_guids) 

442 while True: 

443 guid, snapshot = snapshot_itr.__next__().split("\t", 1) 

444 if "@" in snapshot: 

445 result_snapshots.append(snapshot) 

446 result_guids.append(guid) 

447 last_appended_guid = guid 

448 if snapshot == latest_common_src_snapshot: # latest_common_src_snapshot is a snapshot or bookmark 

449 if guid != last_appended_guid and "@" not in snapshot: 

450 # only appends the src bookmark if it has no snapshot. If the bookmark has a snapshot then that 

451 # snapshot has already been appended, per the sort order previously used for 'zfs list -s ...' 

452 result_snapshots.append(snapshot) 

453 result_guids.append(guid) 

454 break 

455 result_snapshots.reverse() 

456 result_guids.reverse() 

457 assert len(result_snapshots) > 0 

458 assert len(result_snapshots) == len(result_guids) 

459 return result_guids, result_snapshots 

460 

461 # collect the most recent common snapshot (which may be a bookmark) followed by all src snapshots 

462 # (that are not a bookmark) that are more recent than that. 

463 cand_guids, cand_snapshots = replication_candidates() 

464 if len(cand_snapshots) == 1: 

465 # latest_src_snapshot is a (true) snapshot that is equal to latest_common_src_snapshot or LESS recent 

466 # than latest_common_src_snapshot. The latter case can happen if latest_common_src_snapshot is a 

467 # bookmark whose snapshot has been deleted on src. 

468 return # nothing more to be done 

469 

470 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count) 

471 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result 

472 recv_opts: list[str] = p.zfs_recv_program_opts.copy() + recv_resume_opts 

473 recv_opts, set_opts = _add_recv_property_options(job, False, recv_opts, src_dataset, props_cache) 

474 if p.no_stream: 

475 # skip intermediate snapshots 

476 steps_todo: list[tuple[str, str, str, list[str]]] = [ 

477 ("-i", latest_common_src_snapshot, latest_src_snapshot, [latest_src_snapshot]) 

478 ] 

479 else: 

480 # include intermediate src snapshots that pass --{include,exclude}-snapshot-* policy, using 

481 # a series of -i/-I send/receive steps that skip excluded src snapshots. 
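# Illustrative example (assumed data, not from this run): for snapshots @s1..@s5 on dataset 'ds' where
# @s4 is excluded by the policy, steps_todo may roughly look like
# [("-I", "ds@s1", "ds@s3", ["ds@s2", "ds@s3"]), ("-i", "ds@s3", "ds@s5", ["ds@s5"])].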

482 steps_todo = _incremental_send_steps_wrapper( 

483 p, cand_snapshots, cand_guids, included_src_guids, recv_resume_token is not None 

484 ) 

485 log.log(LOG_TRACE, "steps_todo: %s", list_formatter(steps_todo, "; ")) 

486 estimate_send_sizes: list[int] = [ 

487 _estimate_send_size(job, src, dst_dataset, recv_resume_token if i == 0 else None, incr_flag, from_snap, to_snap) 

488 for i, (incr_flag, from_snap, to_snap, to_snapshots) in enumerate(steps_todo) 

489 ] 

490 total_size: int = sum(estimate_send_sizes) 

491 total_num: int = sum(len(to_snapshots) for incr_flag, from_snap, to_snap, to_snapshots in steps_todo) 

492 done_size: int = 0 

493 done_num: int = 0 

494 for i, (incr_flag, from_snap, to_snap, to_snapshots) in enumerate(steps_todo): 

495 curr_num_snapshots: int = len(to_snapshots) 

496 curr_size: int = estimate_send_sizes[i] 

497 humansize: str = _format_size(total_size) + "/" + _format_size(done_size) + "/" + _format_size(curr_size) 

498 human_num: str = f"{total_num}/{done_num}/{curr_num_snapshots} snapshots" 

499 if recv_resume_token: 

500 send_opts: list[str] = send_resume_opts # e.g. ["-t", "1-c740b4779-..."] 

501 else: 

502 send_opts = p.curr_zfs_send_program_opts + [incr_flag, from_snap, to_snap] 

503 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts) 

504 recv_cmd: list[str] = p.split_args( 

505 f"{dst.sudo} {p.zfs_program} receive", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True 

506 ) 

507 dense_size: str = p.two_or_more_spaces_regex.sub("", humansize.strip()) 

508 log.info( 

509 p.dry(f"{tid} Incremental send {incr_flag}: %s"), 

510 f"{from_snap} .. {to_snap[to_snap.index('@'):]} --> {dst_dataset} ({dense_size}) ({human_num}) ...", 

511 ) 

512 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset, busy_if_send=False) 

513 if p.dry_run and not job.dst_dataset_exists[dst_dataset]: 

514 dry_run_no_send = True 

515 dry_run_no_send = dry_run_no_send or p.dry_run_no_send 

516 job.maybe_inject_params(error_trigger="incr_zfs_send_params") 

517 _run_zfs_send_receive( 

518 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "incr_zfs_send" 

519 ) 

520 done_size += curr_size 

521 done_num += curr_num_snapshots 

522 recv_resume_token = None 

523 with job.stats_lock: 

524 job.num_snapshots_replicated += curr_num_snapshots 

525 if p.create_bookmarks == "all": 

526 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots) 

527 elif p.create_bookmarks == "many": 

528 to_snapshots = [snap for snap in to_snapshots if p.xperiods.label_milliseconds(snap) >= 60 * 60 * 1000] 

529 if i == len(steps_todo) - 1 and (len(to_snapshots) == 0 or to_snapshots[-1] != to_snap): 

530 to_snapshots.append(to_snap) 

531 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots) 

532 _zfs_set(job, set_opts, dst, dst_dataset) 

533 

534 

535def _format_size(num_bytes: int) -> str: 

536 """Formats a byte count for human-readable logs.""" 

537 return human_readable_bytes(num_bytes, separator="").rjust(RIGHT_JUST) 

538 

539 

540def _prepare_zfs_send_receive( 

541 job: Job, src_dataset: str, send_cmd: list[str], recv_cmd: list[str], size_estimate_bytes: int, size_estimate_human: str 

542) -> tuple[str, str, str]: 

543 """Constructs zfs send/recv pipelines with optional compression and pv.""" 

544 p = job.params 

545 send_cmd_str: str = shlex.join(send_cmd) 

546 recv_cmd_str: str = shlex.join(recv_cmd) 

547 

548 if p.is_program_available("zstd", "src") and p.is_program_available("zstd", "dst"): 

549 compress_cmd_: str = _compress_cmd(p, "src", size_estimate_bytes) 

550 decompress_cmd_: str = _decompress_cmd(p, "dst", size_estimate_bytes) 

551 else: # no compression is used if source and destination do not both support compression 

552 compress_cmd_, decompress_cmd_ = "cat", "cat" 

553 

554 recordsize: int = abs(job.src_properties[src_dataset].recordsize) 

555 src_buffer: str = _mbuffer_cmd(p, "src", size_estimate_bytes, recordsize) 

556 dst_buffer: str = _mbuffer_cmd(p, "dst", size_estimate_bytes, recordsize) 

557 local_buffer: str = _mbuffer_cmd(p, "local", size_estimate_bytes, recordsize) 

558 

559 pv_src_cmd: str = "" 

560 pv_dst_cmd: str = "" 

561 pv_loc_cmd: str = "" 

562 if p.src.ssh_user_host == "": 

563 pv_src_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human) 

564 elif p.dst.ssh_user_host == "": 

565 pv_dst_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human) 

566 elif compress_cmd_ == "cat": 

567 pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human) # compression disabled 

568 else: 

569 # pull-push mode with compression enabled: reporting "percent complete" isn't straightforward because 

570 # localhost observes the compressed data instead of the uncompressed data, so we disable the progress bar. 

571 pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human, disable_progress_bar=True) 

572 

573 # assemble pipeline running on source leg 

574 src_pipe: str = "" 

575 if job.inject_params.get("inject_src_pipe_fail", False): 

576 # for testing; initially forward some bytes and then fail 

577 src_pipe = f"{src_pipe} | dd bs=64 count=1 2>/dev/null && false" 

578 if job.inject_params.get("inject_src_pipe_garble", False): 

579 src_pipe = f"{src_pipe} | base64" # for testing; forward garbled bytes 

580 if pv_src_cmd != "" and pv_src_cmd != "cat": 

581 src_pipe = f"{src_pipe} | {pv_src_cmd}" 

582 if compress_cmd_ != "cat": 

583 src_pipe = f"{src_pipe} | {compress_cmd_}" 

584 if src_buffer != "cat": 

585 src_pipe = f"{src_pipe} | {src_buffer}" 

586 if src_pipe.startswith(" |"): 

587 src_pipe = src_pipe[2:] # strip leading ' |' part 

588 if job.inject_params.get("inject_src_send_error", False): 

589 send_cmd_str = f"{send_cmd_str} --injectedGarbageParameter" # for testing; induce CLI parse error 

590 if src_pipe != "": 

591 src_pipe = f"{send_cmd_str} | {src_pipe}" 

592 if p.src.ssh_user_host != "": 

593 src_pipe = p.shell_program + " -c " + _dquote(src_pipe) 

594 else: 

595 src_pipe = send_cmd_str 

596 

597 # assemble pipeline running on middle leg between source and destination. only enabled for pull-push mode 

598 local_pipe: str = "" 

599 if local_buffer != "cat": 

600 local_pipe = f"{local_buffer}" 

601 if pv_loc_cmd != "" and pv_loc_cmd != "cat": 

602 local_pipe = f"{local_pipe} | {pv_loc_cmd}" 

603 if local_buffer != "cat": 

604 local_pipe = f"{local_pipe} | {local_buffer}" 

605 if local_pipe.startswith(" |"): 

606 local_pipe = local_pipe[2:] # strip leading ' |' part 

607 if local_pipe != "": 

608 local_pipe = f"| {local_pipe}" 

609 

610 # assemble pipeline running on destination leg 

611 dst_pipe: str = "" 

612 if dst_buffer != "cat": 

613 dst_pipe = f"{dst_buffer}" 

614 if decompress_cmd_ != "cat": 

615 dst_pipe = f"{dst_pipe} | {decompress_cmd_}" 

616 if pv_dst_cmd != "" and pv_dst_cmd != "cat": 

617 dst_pipe = f"{dst_pipe} | {pv_dst_cmd}" 

618 if job.inject_params.get("inject_dst_pipe_fail", False): 

619 # interrupt zfs receive for testing retry/resume; initially forward some bytes and then stop forwarding 

620 dst_pipe = f"{dst_pipe} | dd bs=1024 count={INJECT_DST_PIPE_FAIL_KBYTES} 2>/dev/null" 

621 if job.inject_params.get("inject_dst_pipe_garble", False): 

622 dst_pipe = f"{dst_pipe} | base64" # for testing; forward garbled bytes 

623 if dst_pipe.startswith(" |"): 

624 dst_pipe = dst_pipe[2:] # strip leading ' |' part 

625 if job.inject_params.get("inject_dst_receive_error", False): 

626 recv_cmd_str = f"{recv_cmd_str} --injectedGarbageParameter" # for testing; induce CLI parse error 

627 if dst_pipe != "": 

628 dst_pipe = f"{dst_pipe} | {recv_cmd_str}" 

629 if p.dst.ssh_user_host != "": 

630 dst_pipe = p.shell_program + " -c " + _dquote(dst_pipe) 

631 else: 

632 dst_pipe = recv_cmd_str 

633 

634 # If there's no support for shell pipelines, we can't do compression, mbuffering, monitoring and rate-limiting, 

635 # so we fall back to simple zfs send/receive. 

636 if not p.is_program_available("sh", "src"): 

637 src_pipe = send_cmd_str 

638 if not p.is_program_available("sh", "dst"): 

639 dst_pipe = recv_cmd_str 

640 if not p.is_program_available("sh", "local"): 

641 local_pipe = "" 

642 

643 src_pipe = _squote(p.src, src_pipe) 

644 dst_pipe = _squote(p.dst, dst_pipe) 

645 return src_pipe, local_pipe, dst_pipe 

646 

647 

648def _run_zfs_send_receive( 

649 job: Job, 

650 src_dataset: str, 

651 dst_dataset: str, 

652 send_cmd: list[str], 

653 recv_cmd: list[str], 

654 size_estimate_bytes: int, 

655 size_estimate_human: str, 

656 dry_run_no_send: bool, 

657 error_trigger: str | None = None, 

658) -> None: 

659 """Executes a zfs send/receive pipeline between source and destination.""" 

660 p, log = job.params, job.params.log 

661 pipes: tuple[str, str, str] = _prepare_zfs_send_receive( 

662 job, src_dataset, send_cmd, recv_cmd, size_estimate_bytes, size_estimate_human 

663 ) 

664 src_pipe, local_pipe, dst_pipe = pipes 

665 conn_pool_name: str = DEDICATED if job.dedicated_tcp_connection_per_zfs_send else SHARED 

666 src_conn_pool: ConnectionPool = p.connection_pools["src"].pool(conn_pool_name) 

667 dst_conn_pool: ConnectionPool = p.connection_pools["dst"].pool(conn_pool_name) 

668 with src_conn_pool.connection() as src_conn, dst_conn_pool.connection() as dst_conn: 

669 refresh_ssh_connection_if_necessary(job, p.src, src_conn) 

670 refresh_ssh_connection_if_necessary(job, p.dst, dst_conn) 

671 src_ssh_cmd: str = " ".join(src_conn.ssh_cmd_quoted) 

672 dst_ssh_cmd: str = " ".join(dst_conn.ssh_cmd_quoted) 
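# e.g. in pull-push mode with compression and buffering available, the assembled command roughly takes
# this shape (illustrative, many details omitted):
# ssh src 'zfs send ... | zstd -c | mbuffer ...' | mbuffer ... | pv ... | mbuffer ... | ssh dst 'mbuffer ... | zstd -dc | zfs receive ...'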

673 cmd: list[str] = [p.shell_program_local, "-c", f"{src_ssh_cmd} {src_pipe} {local_pipe} | {dst_ssh_cmd} {dst_pipe}"] 

674 msg: str = "Would execute: %s" if dry_run_no_send else "Executing: %s" 

675 log.debug(msg, cmd[2].lstrip()) 

676 if not dry_run_no_send: 

677 try: 

678 maybe_inject_error(job, cmd=cmd, error_trigger=error_trigger) 

679 process = subprocess_run( 

680 cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout(job), check=True 

681 ) 

682 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

683 no_sleep: bool = False 

684 if not isinstance(e, UnicodeDecodeError): 

685 xprint(log, stderr_to_str(e.stdout), file=sys.stdout) 

686 log.warning("%s", stderr_to_str(e.stderr).rstrip()) 

687 if isinstance(e, subprocess.CalledProcessError): 

688 no_sleep = _clear_resumable_recv_state_if_necessary(job, dst_dataset, e.stderr) 

689 # op isn't idempotent so retries regather current state from the start of replicate_dataset() 

690 raise RetryableError("Subprocess failed", no_sleep=no_sleep) from e 

691 else: 

692 xprint(log, process.stdout, file=sys.stdout) 

693 xprint(log, process.stderr, file=sys.stderr) 

694 

695 

696def _clear_resumable_recv_state_if_necessary(job: Job, dst_dataset: str, stderr: str) -> bool: 

697 """Deletes leftover state when resume tokens fail to apply.""" 

698 

699 def clear_resumable_recv_state() -> bool: 

700 log.warning(p.dry("Aborting an interrupted zfs receive -s, deleting partially received state: %s"), dst_dataset) 

701 cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} receive -A", dst_dataset) 

702 try_ssh_command(job, p.dst, LOG_TRACE, is_dry=p.dry_run, print_stdout=True, cmd=cmd) 

703 log.log(LOG_TRACE, p.dry("Done Aborting an interrupted zfs receive -s: %s"), dst_dataset) 

704 return True 

705 

706 p, log = job.params, job.params.log 

707 # "cannot resume send: 'wb_src/tmp/src@s1' is no longer the same snapshot used in the initial send" 

708 # "cannot resume send: 'wb_src/tmp/src@s1' used in the initial send no longer exists" 

709 # "cannot resume send: incremental source 0xa000000000000000 no longer exists" 

710 if "cannot resume send" in stderr and ( 

711 "is no longer the same snapshot used in the initial send" in stderr 

712 or "used in the initial send no longer exists" in stderr 

713 or re.match(r".*incremental source [0-9a-fx]+ no longer exists", stderr) 

714 ): 

715 return clear_resumable_recv_state() 

716 

717 # "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive." 

718 # see https://github.com/openzfs/zfs/issues/12480 

719 # 'cannot receive new filesystem stream: destination xx contains partially-complete state from "zfs receive -s"' 

720 # this indicates that --no-resume-recv detects that dst contains a previously interrupted recv -s 

721 elif "cannot receive" in stderr and ( 

722 "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive" in stderr 

723 or 'contains partially-complete state from "zfs receive -s"' in stderr 

724 ): 

725 return clear_resumable_recv_state() 

726 

727 elif ( # this signals normal behavior on interrupt of 'zfs receive -s' if running without --no-resume-recv 

728 "cannot receive new filesystem stream: checksum mismatch or incomplete stream" in stderr 

729 and "Partially received snapshot is saved" in stderr 

730 ): 

731 return True 

732 

733 # "cannot destroy 'wb_dest/tmp/dst@s1': snapshot has dependent clones ... use '-R' to destroy the following 

734 # datasets: wb_dest/tmp/dst/%recv" # see https://github.com/openzfs/zfs/issues/10439#issuecomment-642774560 

735 # This msg indicates a failed 'zfs destroy' via --delete-dst-snapshots. This "clone" is caused by a previously 

736 # interrupted 'zfs receive -s'. The fix used here is to delete the partially received state of said 

737 # 'zfs receive -s' via 'zfs receive -A', followed by an automatic retry, which will now succeed to delete the 

738 # snapshot without user intervention. 

739 elif ( 

740 "cannot destroy" in stderr 

741 and "snapshot has dependent clone" in stderr 

742 and "use '-R' to destroy the following dataset" in stderr 

743 and f"\n{dst_dataset}/%recv\n" in stderr 

744 ): 

745 return clear_resumable_recv_state() 

746 

747 # Same cause as above, except that this error can occur during 'zfs rollback' 

748 # Also see https://github.com/openzfs/zfs/blob/master/cmd/zfs/zfs_main.c 

749 elif ( 

750 "cannot rollback to" in stderr 

751 and "clones of previous snapshots exist" in stderr 

752 and "use '-R' to force deletion of the following clones and dependents" in stderr 

753 and f"\n{dst_dataset}/%recv\n" in stderr 

754 ): 

755 return clear_resumable_recv_state() 

756 

757 return False 

758 

759 

760def _recv_resume_token(job: Job, dst_dataset: str, retry_count: int) -> tuple[str | None, list[str], list[str]]: 

761 """Gets recv_resume_token ZFS property from dst_dataset and returns corresponding opts to use for send+recv.""" 

762 p, log = job.params, job.params.log 

763 if not p.resume_recv: 

764 return None, [], [] 

765 warning: str | None = None 

766 if not is_zpool_feature_enabled_or_active(p, p.dst, "feature@extensible_dataset"): 

767 warning = "not available on destination dataset" 

768 elif not p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst"): 

769 warning = "unreliable as zfs version is too old" # e.g. zfs-0.8.3 "internal error: Unknown error 1040" 

770 if warning: 

771 log.warning(f"ZFS receive resume feature is {warning}. Falling back to --no-resume-recv: %s", dst_dataset) 

772 return None, [], [] 

773 recv_resume_token: str | None = None 

774 send_resume_opts: list[str] = [] 

775 if job.dst_dataset_exists[dst_dataset]: 

776 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o value -s none receive_resume_token", dst_dataset) 

777 recv_resume_token = run_ssh_command(job, p.dst, LOG_TRACE, cmd=cmd).rstrip() 

778 if recv_resume_token == "-" or not recv_resume_token: # noqa: S105 

779 recv_resume_token = None 

780 else: 

781 send_resume_opts += ["-n"] if p.dry_run else [] 

782 send_resume_opts += ["-v"] if p.verbose_zfs else [] 

783 send_resume_opts += ["-t", recv_resume_token] 

784 recv_resume_opts = ["-s"] 

785 return recv_resume_token, send_resume_opts, recv_resume_opts 

786 

787 

788def _mbuffer_cmd(p: Params, loc: str, size_estimate_bytes: int, recordsize: int) -> str: 

789 """If mbuffer command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to smooth out 

790 the rate of data flow and prevent bottlenecks caused by network latency or speed fluctuation.""" 

791 if ( 

792 size_estimate_bytes >= p.min_pipe_transfer_size 

793 and ( 

794 (loc == "src" and (p.src.is_nonlocal or p.dst.is_nonlocal)) 

795 or (loc == "dst" and (p.src.is_nonlocal or p.dst.is_nonlocal)) 

796 or (loc == "local" and p.src.is_nonlocal and p.dst.is_nonlocal) 

797 ) 

798 and p.is_program_available("mbuffer", loc) 

799 ): 

800 recordsize = max(recordsize, 128 * 1024 if is_solaris_zfs_location(p, loc) else 2 * 1024 * 1024) 

801 return shlex.join([p.mbuffer_program, "-s", str(recordsize)] + p.mbuffer_program_opts) 

802 else: 

803 return "cat" 

804 

805 

806def _compress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str: 

807 """If zstd command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to reduce network 

808 bottlenecks by sending compressed data.""" 

809 if ( 

810 size_estimate_bytes >= p.min_pipe_transfer_size 

811 and (p.src.is_nonlocal or p.dst.is_nonlocal) 

812 and p.is_program_available("zstd", loc) 

813 ): 

814 return shlex.join([p.compression_program, "-c"] + p.compression_program_opts) 

815 else: 

816 return "cat" 

817 

818 

819def _decompress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str: 

820 """Returns decompression command for network pipe if remote supports it.""" 

821 if ( 

822 size_estimate_bytes >= p.min_pipe_transfer_size 

823 and (p.src.is_nonlocal or p.dst.is_nonlocal) 

824 and p.is_program_available("zstd", loc) 

825 ): 

826 return shlex.join([p.compression_program, "-dc"]) 

827 else: 

828 return "cat" 

829 

830 

831WORKER_THREAD_NUMBER_REGEX: re.Pattern = re.compile(r"ThreadPoolExecutor-\d+_(\d+)") 

832 

833 

834def _pv_cmd( 

835 job: Job, loc: str, size_estimate_bytes: int, size_estimate_human: str, disable_progress_bar: bool = False 

836) -> str: 

837 """If pv command is on the PATH, monitors the progress of data transfer from 'zfs send' to 'zfs receive'; Progress can be 

838 viewed via "tail -f $pv_log_file" aka tail -f ~/bzfs-logs/current.pv or similar.""" 

839 p = job.params 

840 if p.is_program_available("pv", loc): 

841 size: str = f"--size={size_estimate_bytes}" 

842 if disable_progress_bar or size_estimate_bytes == 0: 

843 size = "" 

844 pv_log_file: str = p.log_params.pv_log_file 

845 thread_name: str = threading.current_thread().name 

846 if match := WORKER_THREAD_NUMBER_REGEX.fullmatch(thread_name): 

847 worker = int(match.group(1)) 

848 if worker > 0: 

849 pv_log_file += PV_FILE_THREAD_SEPARATOR + f"{worker:04}" 

850 if job.is_first_replication_task.get_and_set(False): 

851 if job.isatty and not p.quiet: 

852 job.progress_reporter.start() 

853 job.replication_start_time_nanos = time.monotonic_ns() 

854 if job.isatty and not p.quiet: 

855 job.progress_reporter.enqueue_pv_log_file(pv_log_file) 

856 pv_program_opts: list[str] = [p.pv_program] + p.pv_program_opts 

857 if job.progress_update_intervals is not None: # for testing 

858 pv_program_opts += [f"--interval={job.progress_update_intervals[0]}"] 

859 pv_program_opts += ["--force", f"--name={size_estimate_human}"] 

860 pv_program_opts += [size] if size else [] 

861 return f"LC_ALL=C {shlex.join(pv_program_opts)} 2>> {shlex.quote(pv_log_file)}" 

862 else: 

863 return "cat" 

864 

865 

866def _squote(remote: Remote, arg: str) -> str: 

867 """Quotes an argument only when running remotely over ssh.""" 

868 return arg if remote.ssh_user_host == "" else shlex.quote(arg) 

869 

870 

871def _dquote(arg: str) -> str: 

872 """Shell-escapes double quotes and dollar and backticks, then surrounds with double quotes.""" 

873 return '"' + arg.replace('"', '\\"').replace("$", "\\$").replace("`", "\\`") + '"' 
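# e.g. _dquote('echo "$PWD"') returns "echo \"\$PWD\"" (including the surrounding double quotes)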

874 

875 

876def delete_snapshots(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None: 

877 """Deletes snapshots in manageable batches on the specified remote.""" 

878 if len(snapshot_tags) == 0: 

879 return 

880 p, log = job.params, job.params.log 

881 log.info(p.dry(f"Deleting {len(snapshot_tags)} snapshots within %s: %s"), dataset, snapshot_tags) 

882 # delete snapshots in batches without creating a command line that's too big for the OS to handle 

883 run_ssh_cmd_batched( 

884 job, 

885 remote, 

886 _delete_snapshot_cmd(p, remote, dataset + "@"), 

887 snapshot_tags, 

888 lambda batch: _delete_snapshot(job, remote, dataset, dataset + "@" + ",".join(batch)), 

889 max_batch_items=1 if is_solaris_zfs(p, remote) else job.params.max_snapshots_per_minibatch_on_delete_snaps, 

890 sep=",", 

891 ) 

892 

893 

894def _delete_snapshot(job: Job, r: Remote, dataset: str, snapshots_to_delete: str) -> None: 

895 """Runs zfs destroy for a comma-separated snapshot list.""" 

896 p = job.params 

897 cmd: list[str] = _delete_snapshot_cmd(p, r, snapshots_to_delete) 

898 is_dry: bool = p.dry_run and is_solaris_zfs(p, r) # solaris-11.4 knows no 'zfs destroy -n' flag 

899 try: 

900 maybe_inject_error(job, cmd=cmd, error_trigger="zfs_delete_snapshot") 

901 run_ssh_command(job, r, LOG_DEBUG, is_dry=is_dry, print_stdout=True, cmd=cmd) 

902 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

903 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else "" 

904 no_sleep: bool = _clear_resumable_recv_state_if_necessary(job, dataset, stderr) 

905 # op isn't idempotent so retries regather current state from the start 

906 raise RetryableError("Subprocess failed", no_sleep=no_sleep) from e 

907 

908 

909def _delete_snapshot_cmd(p: Params, r: Remote, snapshots_to_delete: str) -> list[str]: 

910 """Builds zfs destroy command for given snapshots.""" 

911 return p.split_args( 

912 f"{r.sudo} {p.zfs_program} destroy", p.force_hard, p.verbose_destroy, p.dry_run_destroy, snapshots_to_delete 

913 ) 

914 

915 

916def delete_bookmarks(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None: 

917 """Removes bookmarks individually since zfs lacks batch deletion.""" 

918 if len(snapshot_tags) == 0: 

919 return 

920 # Unfortunately ZFS has no syntax yet to delete multiple bookmarks in a single CLI invocation 

921 p, log = job.params, job.params.log 

922 log.info( 

923 p.dry(f"Deleting {len(snapshot_tags)} bookmarks within %s: %s"), dataset, dataset + "#" + ",".join(snapshot_tags) 

924 ) 

925 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} destroy") 

926 run_ssh_cmd_parallel( 

927 job, 

928 remote, 

929 [(cmd, [f"{dataset}#{snapshot_tag}" for snapshot_tag in snapshot_tags])], 

930 lambda _cmd, batch: try_ssh_command( 

931 job, remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=_cmd + batch, exists=False 

932 ), 

933 max_batch_items=1, 

934 ) 

935 

936 

937def delete_datasets(job: Job, remote: Remote, datasets: Iterable[str]) -> None: 

938 """Deletes the given datasets via zfs destroy -r on the given remote.""" 

939 # Impl is batch optimized to minimize CLI + network roundtrips: only need to run zfs destroy if previously 

940 # destroyed dataset (within sorted datasets) is not a prefix (aka ancestor) of current dataset 

941 p, log = job.params, job.params.log 

942 last_deleted_dataset: str = DONT_SKIP_DATASET 

943 for dataset in sorted(datasets): 

944 if is_descendant(dataset, of_root_dataset=last_deleted_dataset): 

945 continue 

946 log.info(p.dry("Deleting dataset tree: %s"), f"{dataset} ...") 

947 cmd: list[str] = p.split_args( 

948 f"{remote.sudo} {p.zfs_program} destroy -r {p.force_unmount} {p.force_hard} {p.verbose_destroy}", 

949 p.dry_run_destroy, 

950 dataset, 

951 ) 

952 is_dry = p.dry_run and is_solaris_zfs(p, remote) # solaris-11.4 knows no 'zfs destroy -n' flag 

953 run_ssh_command(job, remote, LOG_DEBUG, is_dry=is_dry, print_stdout=True, cmd=cmd) 

954 last_deleted_dataset = dataset 

955 

956 

957def _create_zfs_filesystem(job: Job, filesystem: str) -> None: 

958 """Creates destination filesystem hierarchies without mounting them.""" 

959 # zfs create -p -u $filesystem 

960 # To ensure the filesystems that we create do not get mounted, we apply a separate 'zfs create -p -u' 

961 # invocation for each non-existing ancestor. This is because a single 'zfs create -p -u' applies the '-u' 

962 # part only to the immediate filesystem, rather than to the not-yet existing ancestors. 
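# e.g. for 'tank/a/b/c' this issues 'zfs create -p -u tank/a', then 'zfs create -p -u tank/a/b', then
# 'zfs create -p -u tank/a/b/c', skipping each component that is already known to exist on dst.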

963 p = job.params 

964 parent: str = "" 

965 no_mount: str = "-u" if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst") else "" 

966 for component in filesystem.split("/"): 

967 parent += component 

968 if not job.dst_dataset_exists[parent]: 

969 cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} create -p", no_mount, parent) 

970 try: 

971 run_ssh_command(job, p.dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd) 

972 except subprocess.CalledProcessError as e: 

973 # ignore harmless error caused by 'zfs create' without the -u flag, or by dataset already existing 

974 if ( # coverage: 974 ↛ 980 (line 974 didn't jump to line 980 because the condition on line 974 was never true)

975 "filesystem successfully created, but it may only be mounted by root" not in e.stderr 

976 and "filesystem successfully created, but not mounted" not in e.stderr # SolarisZFS 

977 and "dataset already exists" not in e.stderr 

978 and "filesystem already exists" not in e.stderr # SolarisZFS? 

979 ): 

980 raise 

981 if not p.dry_run: 

982 job.dst_dataset_exists[parent] = True 

983 parent += "/" 

984 

985 

986def _create_zfs_bookmarks(job: Job, remote: Remote, dataset: str, snapshots: list[str]) -> None: 

987 """Creates bookmarks for the given snapshots, using the 'zfs bookmark' CLI.""" 

988 # Unfortunately ZFS has no syntax yet to create multiple bookmarks in a single CLI invocation 

989 p = job.params 

990 

991 def create_zfs_bookmark(cmd: list[str]) -> None: 

992 snapshot = cmd[-1] 

993 assert "@" in snapshot 

994 bookmark_cmd: list[str] = cmd + [replace_prefix(snapshot, old_prefix=f"{dataset}@", new_prefix=f"{dataset}#")] 

995 try: 

996 run_ssh_command(job, remote, LOG_DEBUG, is_dry=p.dry_run, print_stderr=False, cmd=bookmark_cmd) 

997 except subprocess.CalledProcessError as e: 

998 # ignore harmless zfs error caused by bookmark with the same name already existing 

999 if ": bookmark exists" not in e.stderr: 

1000 print(e.stderr, file=sys.stderr, end="") 

1001 raise 

1002 

1003 if p.create_bookmarks != "none" and are_bookmarks_enabled(p, remote): 

1004 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} bookmark") 

1005 run_ssh_cmd_parallel( 

1006 job, remote, [(cmd, snapshots)], lambda _cmd, batch: create_zfs_bookmark(_cmd + batch), max_batch_items=1 

1007 ) 

1008 

1009 

1010def _estimate_send_size(job: Job, remote: Remote, dst_dataset: str, recv_resume_token: str | None, *items: str) -> int: 

1011 """Estimates num bytes to transfer via 'zfs send'.""" 

1012 p = job.params 

1013 if p.no_estimate_send_size or is_solaris_zfs(p, remote): 

1014 return 0 # solaris-11.4 does not have a --parsable equivalent 

1015 zfs_send_program_opts: list[str] = ["--parsable" if opt == "-P" else opt for opt in p.curr_zfs_send_program_opts] 

1016 zfs_send_program_opts = append_if_absent(zfs_send_program_opts, "-v", "-n", "--parsable") 

1017 if recv_resume_token: 

1018 zfs_send_program_opts = ["-Pnv", "-t", recv_resume_token] 

1019 items = () 

1020 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} send", zfs_send_program_opts, items) 

1021 try: 

1022 lines: str | None = try_ssh_command(job, remote, LOG_TRACE, cmd=cmd) 

1023 except RetryableError as retryable_error: 

1024 assert retryable_error.__cause__ is not None 

1025 if recv_resume_token: 

1026 e = retryable_error.__cause__ 

1027 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else "" 

1028 retryable_error.no_sleep = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr) 

1029 # op isn't idempotent so retries regather current state from the start of replicate_dataset() 

1030 raise 

1031 if lines is None: # coverage: 1031 ↛ 1032 (line 1031 didn't jump to line 1032 because the condition on line 1031 was never true)

1032 return 0 # src dataset or snapshot has been deleted by third party 
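# the last line of 'zfs send --parsable -nv' output has the form 'size\t<num_bytes>', e.g. 'size\t1234567890'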

1033 size: str = lines.splitlines()[-1] 

1034 assert size.startswith("size") 

1035 return int(size[size.index("\t") + 1 :]) 

1036 

1037 

1038def _zfs_set(job: Job, properties: list[str], remote: Remote, dataset: str) -> None: 

1039 """Applies the given property key=value pairs via 'zfs set' CLI to the given dataset on the given remote.""" 

1040 p = job.params 

1041 if len(properties) == 0: 

1042 return 

1043 # set properties in batches without creating a command line that's too big for the OS to handle 

1044 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} set") 

1045 run_ssh_cmd_batched( 

1046 job, 

1047 remote, 

1048 cmd, 

1049 properties, 

1050 lambda batch: run_ssh_command( 

1051 job, remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch + [dataset] 

1052 ), 

1053 max_batch_items=1 if is_solaris_zfs(p, remote) else 2**29, # solaris-11.4 CLI doesn't accept multiple props 

1054 ) 

1055 

1056 

1057def _zfs_get( 

1058 job: Job, 

1059 remote: Remote, 

1060 dataset: str, 

1061 sources: str, 

1062 output_columns: str, 

1063 propnames: str, 

1064 splitlines: bool, 

1065 props_cache: dict[tuple[str, str, str], dict[str, str | None]], 

1066) -> dict[str, str | None]: 

1067 """Returns the results of 'zfs get' CLI on the given dataset on the given remote.""" 

1068 if not propnames: 

1069 return {} 

1070 p = job.params 

1071 cache_key = (sources, output_columns, propnames) 

1072 props: dict[str, str | None] | None = props_cache.get(cache_key) 

1073 if props is None: 

1074 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o {output_columns} -s {sources} {propnames}", dataset) 

1075 lines: str = run_ssh_command(job, remote, LOG_TRACE, cmd=cmd) 

1076 is_name_value_pair: bool = "," in output_columns 

1077 props = {} 

1078 # if not splitlines: omit single trailing newline that was appended by 'zfs get' CLI 

1079 for line in lines.splitlines() if splitlines else [lines[0:-1]]: 

1080 if is_name_value_pair: 

1081 propname, propvalue = line.split("\t", 1) 

1082 props[propname] = propvalue 

1083 else: 

1084 props[line] = None 

1085 props_cache[cache_key] = props 

1086 return props 

1087 

1088 

1089def _incremental_send_steps_wrapper( 

1090 p: Params, src_snapshots: list[str], src_guids: list[str], included_guids: set[str], is_resume: bool 

1091) -> list[tuple[str, str, str, list[str]]]: 

1092 """Returns incremental send steps, optionally converting -I to -i.""" 

1093 force_convert_I_to_i: bool = p.src.use_zfs_delegation and not getenv_bool("no_force_convert_I_to_i", True) # noqa: N806 

1094 # force_convert_I_to_i == True implies that: 

1095 # If using 'zfs allow' delegation mechanism, force convert 'zfs send -I' to a series of 

1096 # 'zfs send -i' as a workaround for zfs issue https://github.com/openzfs/zfs/issues/16394 

1097 return incremental_send_steps(src_snapshots, src_guids, included_guids, is_resume, force_convert_I_to_i) 

1098 

1099 

1100def _add_recv_property_options( 

1101 job: Job, full_send: bool, recv_opts: list[str], dataset: str, cache: dict[tuple[str, str, str], dict[str, str | None]] 

1102) -> tuple[list[str], list[str]]: 

1103 """Reads the ZFS properties of the given src dataset; Appends zfs recv -o and -x values to recv_opts according to CLI 

1104 params, and returns properties to explicitly set on the dst dataset after 'zfs receive' completes successfully.""" 

1105 p = job.params 

1106 set_opts: list[str] = [] 

1107 x_names: list[str] = p.zfs_recv_x_names 

1108 x_names_set: set[str] = set(x_names) 

1109 ox_names: set[str] = p.zfs_recv_ox_names.copy() 

1110 if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_2_0, p.dst.location): 

1111 # workaround for https://github.com/openzfs/zfs/commit/b0269cd8ced242e66afc4fa856d62be29bb5a4ff 

1112 # 'zfs recv -x foo' on zfs < 2.2 errors out if the 'foo' property isn't contained in the send stream 

1113 for propname in x_names: 

1114 recv_opts.append("-x") 

1115 recv_opts.append(propname) 

1116 ox_names.update(x_names) # union 

1117 for config in [p.zfs_recv_o_config, p.zfs_recv_x_config, p.zfs_set_config]: 

1118 if len(config.include_regexes) == 0: 

1119 continue # this is the default - it's an instant noop 

1120 if (full_send and "full" in config.targets) or (not full_send and "incremental" in config.targets): 

1121 # 'zfs get' uses newline as record separator and tab as separator between output columns. A ZFS user property 

1122 # may contain newline and tab characters (indeed anything). Together, this means that there is no reliable 

1123 # way to determine where a record ends and the next record starts when listing multiple arbitrary records in 

1124 # a single 'zfs get' call. Therefore, here we use a separate 'zfs get' call for each ZFS user property. 

1125 # TODO: perf: on zfs >= 2.3 use json via zfs get -j to safely merge all zfs gets into one 'zfs get' call 
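# e.g. each 'zfs get -Hp -o property,value' output line has the form '<propname>\t<propvalue>'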

1126 try: 

1127 props_any: dict = _zfs_get(job, p.src, dataset, config.sources, "property", "all", True, cache) 

1128 props_filtered: dict = filter_properties(p, props_any, config.include_regexes, config.exclude_regexes) 

1129 user_propnames: list[str] = [name for name in props_filtered.keys() if ":" in name] 

1130 sys_propnames: str = ",".join([name for name in props_filtered.keys() if ":" not in name]) 

1131 props: dict = _zfs_get(job, p.src, dataset, config.sources, "property,value", sys_propnames, True, cache) 

1132 for propnames in user_propnames: 

1133 props.update(_zfs_get(job, p.src, dataset, config.sources, "property,value", propnames, False, cache)) 

1134 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

1135 raise RetryableError("Subprocess failed") from e 

1136 for propname in sorted(props.keys()): 

1137 if config is p.zfs_recv_o_config: 

1138 if not (propname in ox_names or propname in x_names_set): 

1139 recv_opts.append("-o") 

1140 recv_opts.append(f"{propname}={props[propname]}") 

1141 ox_names.add(propname) 

1142 elif config is p.zfs_recv_x_config: 

1143 if propname not in ox_names: 

1144 recv_opts.append("-x") 

1145 recv_opts.append(propname) 

1146 ox_names.add(propname) 

1147 else: 

1148 set_opts.append(f"{propname}={props[propname]}") 

1149 return recv_opts, set_opts 

1150 

1151 

1152def _check_zfs_dataset_busy(job: Job, remote: Remote, dataset: str, busy_if_send: bool = True) -> bool: 

1153 """Decline to start a state changing ZFS operation that is, although harmless, likely to collide with other currently 

1154 running processes. Instead, retry the operation later, after some delay. For example, decline to start a 'zfs receive' 

1155 into a destination dataset if another process is already running another 'zfs receive' into the same destination dataset, 

1156 as ZFS would reject any such attempt. However, it's actually fine to run an incremental 'zfs receive' into a dataset in 

1157 parallel with a 'zfs send' out of the very same dataset. This also helps daisy chain use cases where A replicates to B, 

1158 and B replicates to C. 

1159 

1160 check_zfs_dataset_busy() offers no guarantees; it merely proactively avoids likely collisions. In other words, even if 

1161 the process check below passes there is no guarantee that the destination dataset won't be busy by the time we actually 

1162 execute the 'zfs send' operation. In such an event ZFS will reject the operation, we'll detect that, and we'll simply 

1163 retry, after some delay. check_zfs_dataset_busy() can be disabled via --ps-program=-. 

1164 

1165 TLDR: As is common for long-running operations in distributed systems, we use coordination-free optimistic concurrency 

1166 control where the parties simply retry on collision detection (rather than coordinate concurrency via a remote lock 

1167 server). 

1168 """ 

1169 p, log = job.params, job.params.log 

1170 if not p.is_program_available("ps", remote.location): 

1171 return True 

1172 cmd: list[str] = p.split_args(f"{p.ps_program} -Ao args") 

1173 procs: list[str] = (try_ssh_command(job, remote, LOG_TRACE, cmd=cmd) or "").splitlines() 

1174 if job.inject_params.get("is_zfs_dataset_busy", False): 

1175 procs += ["sudo -n zfs receive -u -o foo:bar=/baz " + dataset] # for unit testing only 

1176 if not _is_zfs_dataset_busy(procs, dataset, busy_if_send=busy_if_send): 

1177 return True 

1178 op: str = "zfs {receive" + ("|send" if busy_if_send else "") + "} operation" 

1179 try: 

1180 die(f"Cannot continue now: Destination is already busy with {op} from another process: {dataset}") 

1181 except SystemExit as e: 

1182 log.warning("%s", e) 

1183 raise RetryableError("dst currently busy with zfs mutation op") from e 

1184 

1185 

1186ZFS_DATASET_BUSY_PREFIX: str = r"(([^ ]*?/)?(sudo|doas)( +-n)? +)?([^ ]*?/)?zfs (receive|recv" 

1187ZFS_DATASET_BUSY_IF_MODS: re.Pattern[str] = re.compile((ZFS_DATASET_BUSY_PREFIX + ") .*").replace("(", "(?:")) 

1188ZFS_DATASET_BUSY_IF_SEND: re.Pattern[str] = re.compile((ZFS_DATASET_BUSY_PREFIX + "|send) .*").replace("(", "(?:")) 
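# e.g. (illustrative) ZFS_DATASET_BUSY_IF_MODS matches process list entries like 'sudo -n zfs receive -u tank/ds',
# while ZFS_DATASET_BUSY_IF_SEND additionally matches entries like 'zfs send -i tank/ds@s1 tank/ds@s2'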

1189 

1190 

1191def _is_zfs_dataset_busy(procs: list[str], dataset: str, busy_if_send: bool) -> bool: 

1192 """Checks if any process list entry indicates ZFS activity on dataset.""" 

1193 regex: re.Pattern[str] = ZFS_DATASET_BUSY_IF_SEND if busy_if_send else ZFS_DATASET_BUSY_IF_MODS 

1194 suffix: str = " " + dataset 

1195 infix: str = " " + dataset + "@" 

1196 return any((proc.endswith(suffix) or infix in proc) and regex.fullmatch(proc) for proc in procs)