Coverage report for bzfs_main/replication.py: 99% of 661 statements covered (generated by coverage.py v7.13.4 at 2026-02-24 10:16 +0000).

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""The core replication algorithm is in replicate_dataset(), which performs reliable full and/or incremental 'zfs send' and 

16'zfs receive' operations on snapshots, using resumable ZFS sends when possible. 

17 

18For replication of multiple datasets, including recursive replication, see bzfs.py:replicate_datasets(). 

19""" 

20 

21from __future__ import ( 

22 annotations, 

23) 

24import os 

25import re 

26import shlex 

27import subprocess 

28import sys 

29import threading 

30import time 

31from collections.abc import ( 

32 Iterable, 

33 Iterator, 

34) 

35from concurrent.futures import ( 

36 Executor, 

37 Future, 

38) 

39from subprocess import ( 

40 DEVNULL, 

41 PIPE, 

42) 

43from typing import ( 

44 TYPE_CHECKING, 

45 Final, 

46) 

47 

48from bzfs_main.argparse_actions import ( 

49 has_timerange_filter, 

50) 

51from bzfs_main.detect import ( 

52 ZFS_VERSION_IS_AT_LEAST_2_1_0, 

53 ZFS_VERSION_IS_AT_LEAST_2_2_0, 

54 are_bookmarks_enabled, 

55 is_zpool_feature_enabled_or_active, 

56) 

57from bzfs_main.filter import ( 

58 filter_properties, 

59 filter_snapshots, 

60) 

61from bzfs_main.incremental_send_steps import ( 

62 incremental_send_steps, 

63) 

64from bzfs_main.parallel_batch_cmd import ( 

65 run_ssh_cmd_batched, 

66 run_ssh_cmd_parallel, 

67) 

68from bzfs_main.progress_reporter import ( 

69 PV_FILE_THREAD_SEPARATOR, 

70) 

71from bzfs_main.util.connection import ( 

72 DEDICATED, 

73 SHARED, 

74 ConnectionPool, 

75 dquote, 

76 squote, 

77 timeout, 

78) 

79from bzfs_main.util.parallel_iterator import ( 

80 parallel_iterator, 

81 run_in_parallel, 

82) 

83from bzfs_main.util.retry import ( 

84 Retry, 

85 RetryableError, 

86) 

87from bzfs_main.util.utils import ( 

88 DONT_SKIP_DATASET, 

89 FILE_PERMISSIONS, 

90 LOG_DEBUG, 

91 LOG_TRACE, 

92 Subprocesses, 

93 append_if_absent, 

94 cut, 

95 die, 

96 getenv_bool, 

97 human_readable_bytes, 

98 is_descendant, 

99 list_formatter, 

100 open_nofollow, 

101 replace_prefix, 

102 stderr_to_str, 

103 xprint, 

104) 

105 

106if TYPE_CHECKING: # pragma: no cover - for type hints only 

107 from bzfs_main.bzfs import ( 

108 Job, 

109 ) 

110 from bzfs_main.configuration import ( 

111 Params, 

112 Remote, 

113 ) 

114 

115 

116# constants: 

117INJECT_DST_PIPE_FAIL_KBYTES: Final[int] = 400 # for testing only 

118_RIGHT_JUST: Final[int] = 7 

119 

120 

121def replicate_dataset(job: Job, src_dataset: str, tid: str, retry: Retry) -> bool: 

122 """Replicates src_dataset to dst_dataset (thread-safe); For replication of multiple datasets, including recursive 

123 replication, see bzfs.py:replicate_datasets().""" 

124 p, log = job.params, job.params.log 

125 src, dst = p.src, p.dst 

126 retry_count: int = retry.count 

127 dst_dataset: str = replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset) 

128 log.debug(p.dry(f"{tid} Replicating: %s"), f"{src_dataset} --> {dst_dataset} ...") 

129 

130 list_result: bool | tuple[list[str], list[str], list[str], set[str], str, str] = _list_and_filter_src_and_dst_snapshots( 

131 job, src_dataset, dst_dataset 

132 ) 

133 if isinstance(list_result, bool): 

134 return list_result 

135 ( 

136 basis_src_snapshots_with_guids, 

137 _src_snapshots_with_guids, 

138 dst_snapshots_with_guids, 

139 included_src_guids, 

140 latest_src_snapshot, 

141 oldest_src_snapshot, 

142 ) = list_result 

143 log.debug("latest_src_snapshot: %s", latest_src_snapshot) 

144 latest_dst_snapshot: str = "" 

145 latest_common_src_snapshot: str = "" 

146 done_checking: bool = False 

147 

148 if job.dst_dataset_exists[dst_dataset]: 

149 rollback_result: bool | tuple[str, str, bool] = _rollback_dst_dataset_if_necessary( 

150 job, 

151 dst_dataset, 

152 latest_src_snapshot, 

153 basis_src_snapshots_with_guids, 

154 dst_snapshots_with_guids, 

155 done_checking, 

156 tid, 

157 ) 

158 if isinstance(rollback_result, bool): 

159 return rollback_result 

160 latest_dst_snapshot, latest_common_src_snapshot, done_checking = rollback_result 

161 

162 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark 

163 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot) 

164 props_cache: dict[tuple[str, str, str], dict[str, str | None]] = {} # fresh empty ZFS props cache for each dataset 

165 dry_run_no_send: bool = False 

166 if not latest_common_src_snapshot: 

167 # no common snapshot exists; delete all dst snapshots and perform a full send of the oldest selected src snapshot 

168 full_result: tuple[str, bool, bool, int] = _replicate_dataset_fully( 

169 job, 

170 src_dataset, 

171 dst_dataset, 

172 oldest_src_snapshot, 

173 latest_src_snapshot, 

174 latest_dst_snapshot, 

175 dst_snapshots_with_guids, 

176 props_cache, 

177 dry_run_no_send, 

178 done_checking, 

179 retry_count, 

180 tid, 

181 ) 

182 # we have now created a common snapshot 

183 latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count = full_result 

184 if latest_common_src_snapshot: 

185 # finally, incrementally replicate all selected snapshots from latest common snapshot until latest src snapshot 

186 _replicate_dataset_incrementally( 

187 job, 

188 src_dataset, 

189 dst_dataset, 

190 latest_common_src_snapshot, 

191 latest_src_snapshot, 

192 basis_src_snapshots_with_guids, 

193 included_src_guids, 

194 props_cache, 

195 dry_run_no_send, 

196 done_checking, 

197 retry_count, 

198 tid, 

199 ) 

200 return True 

201 

202 

203def _list_and_filter_src_and_dst_snapshots( 

204 job: Job, src_dataset: str, dst_dataset: str 

205) -> bool | tuple[list[str], list[str], list[str], set[str], str, str]: 

206 """On replication, list and filter src and dst snapshots.""" 

207 p, log = job.params, job.params.log 

208 src, dst = p.src, p.dst 

209 

210 # list GUID and name for dst snapshots, sorted ascending by createtxg (more precise than creation time) 

211 dst_cmd: list[str] = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -s createtxg -Hp -o guid,name", dst_dataset) 

212 

213 # list GUID and name for src snapshots + bookmarks, primarily sort ascending by transaction group (which is more 

214 # precise than creation time), secondarily sort such that snapshots appear after bookmarks for the same GUID. 

215 # Note: A snapshot and its ZFS bookmarks always have the same GUID, creation time and transaction group. A snapshot 

216 # changes its transaction group but retains its creation time and GUID on 'zfs receive' on another pool, i.e. 

217 # comparing createtxg is only meaningful within a single pool, not across pools from src to dst. Comparing creation 

218 # time remains meaningful across pools from src to dst. Creation time is a UTC Unix time in integer seconds. 

219 # Note that 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs enforce that snapshot names must not contain a '#' 

220 # char, bookmark names must not contain a '@' char, and dataset names must not contain a '#' or '@' char. 

221 # GUID and creation time also do not contain a '#' or '@' char. 

222 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters) 

223 types: str = "snapshot,bookmark" if p.use_bookmark and are_bookmarks_enabled(p, src) else "snapshot" 

224 props: str = job.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name" 

225 src_cmd = p.split_args(f"{p.zfs_program} list -t {types} -s createtxg -s type -d 1 -Hp -o {props}", src_dataset) 

226 job.maybe_inject_delete(src, dataset=src_dataset, delete_trigger="zfs_list_snapshot_src") 

227 src_snapshots_and_bookmarks, dst_snapshots_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel 

228 lambda: job.try_ssh_command(src, LOG_TRACE, cmd=src_cmd), 

229 lambda: job.try_ssh_command(dst, LOG_TRACE, cmd=dst_cmd, error_trigger="zfs_list_snapshot_dst"), 

230 ) 

231 job.dst_dataset_exists[dst_dataset] = dst_snapshots_with_guids_str is not None 

232 dst_snapshots_with_guids: list[str] = (dst_snapshots_with_guids_str or "").splitlines() 

233 if src_snapshots_and_bookmarks is None: 

234 log.warning("Third party deleted source: %s", src_dataset) 

235 return False # src dataset has been deleted by some third party while we're running - nothing to do anymore 

236 src_snapshots_with_guids: list[str] = src_snapshots_and_bookmarks.splitlines() 

237 src_snapshots_and_bookmarks = None 

238 if len(dst_snapshots_with_guids) == 0 and "bookmark" in types: 

239 # src bookmarks serve no purpose if the destination dataset has no snapshot; ignore them 

240 src_snapshots_with_guids = [snapshot for snapshot in src_snapshots_with_guids if "@" in snapshot] 

241 num_src_snapshots_found: int = sum(1 for snapshot in src_snapshots_with_guids if "@" in snapshot) 

242 with job.stats_lock: 

243 job.num_snapshots_found += num_src_snapshots_found 

244 # apply include/exclude regexes to ignore irrelevant src snapshots 

245 basis_src_snapshots_with_guids: list[str] = src_snapshots_with_guids 

246 src_snapshots_with_guids = filter_snapshots(job, src_snapshots_with_guids) 

247 if filter_needs_creation_time: 

248 src_snapshots_with_guids = cut(field=2, lines=src_snapshots_with_guids) 

249 basis_src_snapshots_with_guids = cut(field=2, lines=basis_src_snapshots_with_guids) 

250 

251 # find oldest and latest "true" snapshot, as well as GUIDs of all snapshots and bookmarks. 

252 # a snapshot is "true" if it is not a bookmark. 

253 oldest_src_snapshot: str = "" 

254 latest_src_snapshot: str = "" 

255 included_src_guids: set[str] = set() 

256 for line in src_snapshots_with_guids: 

257 guid, snapshot = line.split("\t", 1) 

258 if "@" in snapshot: 

259 included_src_guids.add(guid) 

260 latest_src_snapshot = snapshot 

261 if not oldest_src_snapshot: 

262 oldest_src_snapshot = snapshot 

263 if len(src_snapshots_with_guids) == 0: 

264 if p.skip_missing_snapshots == "fail": 

265 die(f"Source dataset includes no snapshot: {src_dataset}. Consider using --skip-missing-snapshots=dataset") 

266 elif p.skip_missing_snapshots == "dataset": 

267 log.warning("Skipping source dataset because it includes no snapshot: %s", src_dataset) 

268 if p.recursive and not job.dst_dataset_exists[dst_dataset]: 

269 log.warning("Also skipping descendant datasets as dst dataset does not exist for %s", src_dataset) 

270 return job.dst_dataset_exists[dst_dataset] 

271 return ( 

272 basis_src_snapshots_with_guids, 

273 src_snapshots_with_guids, 

274 dst_snapshots_with_guids, 

275 included_src_guids, 

276 latest_src_snapshot, 

277 oldest_src_snapshot, 

278 ) 

279 

280 

281def _rollback_dst_dataset_if_necessary( 

282 job: Job, 

283 dst_dataset: str, 

284 latest_src_snapshot: str, 

285 src_snapshots_with_guids: list[str], 

286 dst_snapshots_with_guids: list[str], 

287 done_checking: bool, 

288 tid: str, 

289) -> bool | tuple[str, str, bool]: 

290 """On replication, rollback dst if necessary; error out if not permitted.""" 

291 p, log = job.params, job.params.log 

292 dst = p.dst 

293 latest_dst_snapshot: str = "" 

294 latest_dst_guid: str = "" 

295 if len(dst_snapshots_with_guids) > 0: 

296 latest_dst_guid, latest_dst_snapshot = dst_snapshots_with_guids[-1].split("\t", 1) 

297 if p.force_rollback_to_latest_snapshot: 

298 log.info(p.dry(f"{tid} Rolling back destination to most recent snapshot: %s"), latest_dst_snapshot) 

299 # rollback just in case the dst dataset was modified since its most recent snapshot 

300 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

301 cmd: list[str] = p.split_args(f"{dst.sudo} {p.zfs_program} rollback", latest_dst_snapshot) 

302 job.try_ssh_command(dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd, exists=False) 

303 elif latest_src_snapshot == "": 

304 log.info(f"{tid} Already-up-to-date: %s", dst_dataset) 

305 return True 

306 

307 # find most recent snapshot (or bookmark) that src and dst have in common - we'll start to replicate 

308 # from there up to the most recent src snapshot. any two snapshots are "common" iff their ZFS GUIDs (i.e. 

309 # contents) are equal. See https://github.com/openzfs/zfs/commit/305bc4b370b20de81eaf10a1cf724374258b74d1 

310 def latest_common_snapshot(snapshots_with_guids: list[str], intersect_guids: set[str]) -> tuple[str | None, str]: 

311 """Returns a true snapshot instead of its bookmark with the same GUID, per the sort order previously used for 'zfs 

312 list -s ...'.""" 

313 for _line in reversed(snapshots_with_guids): 

314 guid_, snapshot_ = _line.split("\t", 1) 

315 if guid_ in intersect_guids: 

316 return guid_, snapshot_ # can be a snapshot or bookmark 

317 return None, "" 

318 

319 latest_common_guid, latest_common_src_snapshot = latest_common_snapshot( 

320 src_snapshots_with_guids, set(cut(field=1, lines=dst_snapshots_with_guids)) 

321 ) 

322 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark 

323 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot) 

324 

325 if latest_common_src_snapshot and latest_common_guid != latest_dst_guid: 

326 # found latest common snapshot but dst has an even newer snapshot. rollback dst to that common snapshot. 

327 assert latest_common_guid 

328 _, latest_common_dst_snapshot = latest_common_snapshot(dst_snapshots_with_guids, {latest_common_guid}) 

329 if not (p.force_rollback_to_latest_common_snapshot or p.force): 

330 die( 

331 f"Conflict: Most recent destination snapshot {latest_dst_snapshot} is more recent than " 

332 f"most recent common snapshot {latest_common_dst_snapshot}. Rollback destination first, " 

333 "for example via --force-rollback-to-latest-common-snapshot (or --force) option." 

334 ) 

335 if p.force_once: 

336 p.force.value = False 

337 p.force_rollback_to_latest_common_snapshot.value = False 

338 log.info(p.dry(f"{tid} Rolling back destination to most recent common snapshot: %s"), latest_common_dst_snapshot) 

339 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

340 cmd = p.split_args( 

341 f"{dst.sudo} {p.zfs_program} rollback -r {p.force_unmount} {p.force_hard}", latest_common_dst_snapshot 

342 ) 

343 try: 

344 job.run_ssh_command(dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd) 

345 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

346 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else "" 

347 retry_immediately_once: bool = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr) 

348 # op isn't idempotent so retries regather current state from the start of replicate_dataset() 

349 raise RetryableError(display_msg="zfs rollback", retry_immediately_once=retry_immediately_once) from e 

350 

351 if latest_src_snapshot and latest_src_snapshot == latest_common_src_snapshot: 

352 log.info(f"{tid} Already up-to-date: %s", dst_dataset) 

353 return True 

354 return latest_dst_snapshot, latest_common_src_snapshot, done_checking 

355 

356 

357def _replicate_dataset_fully( 

358 job: Job, 

359 src_dataset: str, 

360 dst_dataset: str, 

361 oldest_src_snapshot: str, 

362 latest_src_snapshot: str, 

363 latest_dst_snapshot: str, 

364 dst_snapshots_with_guids: list[str], 

365 props_cache: dict[tuple[str, str, str], dict[str, str | None]], 

366 dry_run_no_send: bool, 

367 done_checking: bool, 

368 retry_count: int, 

369 tid: str, 

370) -> tuple[str, bool, bool, int]: 

371 """On replication, deletes all dst snapshots and performs a full send of the oldest selected src snapshot, which in turn 

372 creates a common snapshot; error out if not permitted.""" 

373 p, log = job.params, job.params.log 

374 src, dst = p.src, p.dst 

375 latest_common_src_snapshot: str = "" 

376 if latest_dst_snapshot: 

377 if not p.force: 

378 die( 

379 f"Conflict: No common snapshot found between {src_dataset} and {dst_dataset} even though " 

380 "destination has at least one snapshot. Aborting. Consider using --force option to first " 

381 "delete all existing destination snapshots in order to be able to proceed with replication." 

382 ) 

383 if p.force_once: 383 ↛ 385line 383 didn't jump to line 385 because the condition on line 383 was always true

384 p.force.value = False 

385 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

386 # extract SNAPSHOT_TAG from GUID<TAB>DATASET@SNAPSHOT_TAG 

387 delete_snapshots(job, dst, dst_dataset, snapshot_tags=cut(2, separator="@", lines=dst_snapshots_with_guids)) 

388 if p.dry_run: 

389 # As we're in --dryrun (--force) mode this conflict resolution step (see above) wasn't really executed: 

390 # "no common snapshot was found. delete all dst snapshots". In turn, this would cause the subsequent 

391 # 'zfs receive -n' to fail with "cannot receive new filesystem stream: destination has snapshots; must 

392 # destroy them to overwrite it". So we skip the zfs send/receive step and keep on trucking. 

393 dry_run_no_send = True 

394 

395 # to start with, fully replicate oldest snapshot, which in turn creates a common snapshot 

396 if p.no_stream: 

397 oldest_src_snapshot = latest_src_snapshot 

398 if oldest_src_snapshot: 

399 if not job.dst_dataset_exists[dst_dataset]: 

400 # on destination, create parent filesystem and ancestors if they do not yet exist 

401 dst_dataset_parent: str = os.path.dirname(dst_dataset) 

402 if not job.dst_dataset_exists[dst_dataset_parent]: 

403 if p.dry_run: 

404 dry_run_no_send = True 

405 if dst_dataset_parent: 405 ↛ 408line 405 didn't jump to line 408 because the condition on line 405 was always true

406 _create_zfs_filesystem(job, dst_dataset_parent) 

407 

408 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count) 

409 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result 

410 curr_size: int = _estimate_send_size(job, src, dst_dataset, recv_resume_token, oldest_src_snapshot) 

411 humansize: str = _format_size(curr_size) 

412 if recv_resume_token: 

413 send_opts: list[str] = p.curr_zfs_send_program_opts + send_resume_opts # e.g. curr + ["-t", "1-c740b4779-..."] 

414 else: 

415 send_opts = p.curr_zfs_send_program_opts + [oldest_src_snapshot] 

416 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts) 

417 recv_opts: list[str] = p.zfs_full_recv_opts.copy() + recv_resume_opts 

418 recv_opts, set_opts = _add_recv_property_options(job, True, recv_opts, src_dataset, props_cache) 

419 recv_cmd: list[str] = p.split_args( 

420 f"{dst.sudo} {p.zfs_program} receive -F", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True 

421 ) 

422 log.info(p.dry(f"{tid} Full send: %s"), f"{oldest_src_snapshot} --> {dst_dataset} ({humansize.strip()}) ...") 

423 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

424 dry_run_no_send = dry_run_no_send or p.dry_run_no_send 

425 job.maybe_inject_params(error_trigger="full_zfs_send_params") 

426 humansize = humansize.rjust(_RIGHT_JUST * 3 + 2) 

427 _run_zfs_send_receive( # do the real work 

428 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "full_zfs_send" 

429 ) 

430 latest_common_src_snapshot = oldest_src_snapshot # we have now created a common snapshot 

431 if not dry_run_no_send and not p.dry_run: 

432 job.dst_dataset_exists[dst_dataset] = True 

433 with job.stats_lock: 

434 job.num_snapshots_replicated += 1 

435 _create_zfs_bookmarks(job, src, src_dataset, [oldest_src_snapshot]) 

436 _zfs_set(job, set_opts, dst, dst_dataset) 

437 dry_run_no_send = dry_run_no_send or p.dry_run 

438 retry_count = 0 

439 

440 return latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count 

441 

442 

443def _replicate_dataset_incrementally( 

444 job: Job, 

445 src_dataset: str, 

446 dst_dataset: str, 

447 latest_common_src_snapshot: str, 

448 latest_src_snapshot: str, 

449 basis_src_snapshots_with_guids: list[str], 

450 included_src_guids: set[str], 

451 props_cache: dict[tuple[str, str, str], dict[str, str | None]], 

452 dry_run_no_send: bool, 

453 done_checking: bool, 

454 retry_count: int, 

455 tid: str, 

456) -> None: 

457 """Incrementally replicates all selected snapshots from latest common snapshot until latest src snapshot.""" 

458 p, log = job.params, job.params.log 

459 src, dst = p.src, p.dst 

460 

461 def replication_candidates() -> tuple[list[str], list[str]]: 

462 assert len(basis_src_snapshots_with_guids) > 0 

463 result_snapshots: list[str] = [] 

464 result_guids: list[str] = [] 

465 last_appended_guid: str = "" 

466 snapshot_itr: Iterator[str] = reversed(basis_src_snapshots_with_guids) 

467 while True: 

468 guid, snapshot = next(snapshot_itr).split("\t", 1) 

469 if "@" in snapshot: 

470 result_snapshots.append(snapshot) 

471 result_guids.append(guid) 

472 last_appended_guid = guid 

473 if snapshot == latest_common_src_snapshot: # latest_common_src_snapshot is a snapshot or bookmark 

474 if guid != last_appended_guid and "@" not in snapshot: 

475 # only appends the src bookmark if it has no snapshot. If the bookmark has a snapshot then that 

476 # snapshot has already been appended, per the sort order previously used for 'zfs list -s ...' 

477 result_snapshots.append(snapshot) 

478 result_guids.append(guid) 

479 break 

480 result_snapshots.reverse() 

481 result_guids.reverse() 

482 assert len(result_snapshots) > 0 

483 assert len(result_snapshots) == len(result_guids) 

484 return result_guids, result_snapshots 

485 

486 # collect the most recent common snapshot (which may be a bookmark) followed by all src snapshots 

487 # (that are not a bookmark) that are more recent than that. 

488 cand_guids, cand_snapshots = replication_candidates() 

489 if len(cand_snapshots) == 1: 

490 # latest_src_snapshot is a (true) snapshot that is equal to latest_common_src_snapshot or LESS recent 

491 # than latest_common_src_snapshot. The latter case can happen if latest_common_src_snapshot is a 

492 # bookmark whose snapshot has been deleted on src. 

493 return # nothing more tbd 

494 

495 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count) 

496 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result 

497 recv_opts: list[str] = p.zfs_recv_program_opts.copy() + recv_resume_opts 

498 recv_opts, set_opts = _add_recv_property_options(job, False, recv_opts, src_dataset, props_cache) 

499 if p.no_stream: 

500 # skip intermediate snapshots 

501 steps_todo: list[tuple[str, str, str, list[str]]] = [ 

502 ("-i", latest_common_src_snapshot, latest_src_snapshot, [latest_src_snapshot]) 

503 ] 

504 else: 

505 # include intermediate src snapshots that pass --{include,exclude}-snapshot-* policy, using 

506 # a series of -i/-I send/receive steps that skip excluded src snapshots. 

507 steps_todo = _incremental_send_steps_wrapper( 

508 p, cand_snapshots, cand_guids, included_src_guids, recv_resume_token is not None 

509 ) 

510 log.log(LOG_TRACE, "steps_todo: %s", list_formatter(steps_todo, "; ")) 

511 estimate_send_sizes: list[int] = _estimate_send_sizes_in_parallel(job, src, dst_dataset, recv_resume_token, steps_todo) 

512 total_size: int = sum(estimate_send_sizes) 

513 total_num: int = sum(len(to_snapshots) for incr_flag, from_snap, to_snap, to_snapshots in steps_todo) 

514 done_size: int = 0 

515 done_num: int = 0 

516 for i, (incr_flag, from_snap, to_snap, to_snapshots) in enumerate(steps_todo): 

517 curr_num_snapshots: int = len(to_snapshots) 

518 curr_size: int = estimate_send_sizes[i] 

519 humansize: str = _format_size(total_size) + "/" + _format_size(done_size) + "/" + _format_size(curr_size) 

520 human_num: str = f"{total_num}/{done_num}/{curr_num_snapshots} snapshots" 

521 if recv_resume_token: 

522 send_opts: list[str] = p.curr_zfs_send_program_opts + send_resume_opts # e.g. curr + ["-t", "1-c740b4779-..."] 

523 else: 

524 send_opts = p.curr_zfs_send_program_opts + [incr_flag, from_snap, to_snap] 

525 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts) 

526 recv_cmd: list[str] = p.split_args( 

527 f"{dst.sudo} {p.zfs_program} receive", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True 

528 ) 

529 dense_size: str = p.two_or_more_spaces_regex.sub("", humansize.strip()) 

530 log.info( 

531 p.dry(f"{tid} Incremental send {incr_flag}: %s"), 

532 f"{from_snap} .. {to_snap[to_snap.index('@'):]} --> {dst_dataset} ({dense_size}) ({human_num}) ...", 

533 ) 

534 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset, busy_if_send=False) 

535 if p.dry_run and not job.dst_dataset_exists[dst_dataset]: 

536 dry_run_no_send = True 

537 dry_run_no_send = dry_run_no_send or p.dry_run_no_send 

538 job.maybe_inject_params(error_trigger="incr_zfs_send_params") 

539 _run_zfs_send_receive( # do the real work 

540 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "incr_zfs_send" 

541 ) 

542 done_size += curr_size 

543 done_num += curr_num_snapshots 

544 recv_resume_token = None 

545 with job.stats_lock: 

546 job.num_snapshots_replicated += curr_num_snapshots 

547 assert p.create_bookmarks 

548 if p.create_bookmarks == "all": 

549 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots) 

550 elif p.create_bookmarks != "none": 

551 threshold_millis: int = p.xperiods.label_milliseconds("_" + p.create_bookmarks) 

552 to_snapshots = [snap for snap in to_snapshots if p.xperiods.label_milliseconds(snap) >= threshold_millis] 

553 if i == len(steps_todo) - 1 and (len(to_snapshots) == 0 or to_snapshots[-1] != to_snap): 

554 to_snapshots.append(to_snap) # ensure latest common snapshot is bookmarked 

555 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots) 

556 _zfs_set(job, set_opts, dst, dst_dataset) 

557 

558 

def _format_size(num_bytes: int) -> str:
    """Renders a byte count as a fixed-width human-readable string, right-justified for log alignment."""
    human: str = human_readable_bytes(num_bytes, separator="")
    return human.rjust(_RIGHT_JUST)

562 

563 

def _prepare_zfs_send_receive(
    job: Job, src_dataset: str, send_cmd: list[str], recv_cmd: list[str], size_estimate_bytes: int, size_estimate_human: str
) -> tuple[str, str, str]:
    """Constructs zfs send/recv pipelines with optional compression, mbuffer and pv.

    Returns (src_pipe, local_pipe, dst_pipe): shell command strings for the source leg, the (possibly empty)
    localhost middle leg, and the destination leg. Remote legs are wrapped in "sh -c '...'" and shell-quoted;
    the various inject_params entries splice in deliberate failures for testing.
    """
    p = job.params
    send_cmd_str: str = shlex.join(send_cmd)
    recv_cmd_str: str = shlex.join(recv_cmd)

    # compression is only enabled when both ends can run zstd inside a shell pipeline
    if (
        p.is_program_available("zstd", "src")
        and p.is_program_available("zstd", "dst")
        and p.is_program_available("sh", "src")
        and p.is_program_available("sh", "dst")
    ):
        compress_cmd_: str = _compress_cmd(p, "src", size_estimate_bytes)
        decompress_cmd_: str = _decompress_cmd(p, "dst", size_estimate_bytes)
    else:  # no compression is used if source and destination do not both support compression
        compress_cmd_, decompress_cmd_ = "cat", "cat"

    recordsize: int = abs(job.src_properties[src_dataset].recordsize)
    src_buffer: str = _mbuffer_cmd(p, "src", size_estimate_bytes, recordsize)
    dst_buffer: str = _mbuffer_cmd(p, "dst", size_estimate_bytes, recordsize)
    local_buffer: str = _mbuffer_cmd(p, "local", size_estimate_bytes, recordsize)

    # place the single pv progress monitor on whichever leg runs locally
    pv_src_cmd: str = ""
    pv_dst_cmd: str = ""
    pv_loc_cmd: str = ""
    if not p.src.ssh_user_host:
        pv_src_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human)
    elif not p.dst.ssh_user_host:
        pv_dst_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human)
    elif compress_cmd_ == "cat":
        pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human)  # compression disabled
    else:
        # pull-push mode with compression enabled: reporting "percent complete" isn't straightforward because
        # localhost observes the compressed data instead of the uncompressed data, so we disable the progress bar.
        pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human, disable_progress_bar=True)

    # assemble pipeline running on source leg
    src_pipe: str = ""
    if job.inject_params.get("inject_src_pipe_fail", False):
        # for testing; initially forward some bytes and then fail
        src_pipe = f"{src_pipe} | dd bs=64 count=1 2>/dev/null && false"
    if job.inject_params.get("inject_src_pipe_garble", False):
        src_pipe = f"{src_pipe} | base64"  # for testing; forward garbled bytes
    if pv_src_cmd and pv_src_cmd != "cat":
        src_pipe = f"{src_pipe} | {pv_src_cmd}"
    if compress_cmd_ != "cat":
        src_pipe = f"{src_pipe} | {compress_cmd_}"
    if src_buffer != "cat":
        src_pipe = f"{src_pipe} | {src_buffer}"
    if src_pipe.startswith(" |"):
        src_pipe = src_pipe[2:]  # strip leading ' |' part
    if job.inject_params.get("inject_src_send_error", False):
        send_cmd_str = f"{send_cmd_str} --injectedGarbageParameter"  # for testing; induce CLI parse error
    if src_pipe:
        src_pipe = f"{send_cmd_str} | {src_pipe}"
        if p.src.ssh_user_host:
            # remote src: run the whole pipeline inside a single shell invocation
            src_pipe = p.shell_program + " -c " + dquote(src_pipe)
    else:
        src_pipe = send_cmd_str

    # assemble pipeline running on middle leg between source and destination. only enabled for pull-push mode
    local_pipe: str = ""
    if local_buffer != "cat":
        local_pipe = f"{local_buffer}"
    if pv_loc_cmd and pv_loc_cmd != "cat":
        local_pipe = f"{local_pipe} | {pv_loc_cmd}"
    if local_buffer != "cat":
        # buffer on both sides of the local progress monitor
        local_pipe = f"{local_pipe} | {local_buffer}"
    if local_pipe.startswith(" |"):
        local_pipe = local_pipe[2:]  # strip leading ' |' part
    if local_pipe:
        local_pipe = f"| {local_pipe}"

    # assemble pipeline running on destination leg
    dst_pipe: str = ""
    if dst_buffer != "cat":
        dst_pipe = f"{dst_buffer}"
    if decompress_cmd_ != "cat":
        dst_pipe = f"{dst_pipe} | {decompress_cmd_}"
    if pv_dst_cmd and pv_dst_cmd != "cat":
        dst_pipe = f"{dst_pipe} | {pv_dst_cmd}"
    if job.inject_params.get("inject_dst_pipe_fail", False):
        # interrupt zfs receive for testing retry/resume; initially forward some bytes and then stop forwarding
        dst_pipe = f"{dst_pipe} | dd bs=1024 count={INJECT_DST_PIPE_FAIL_KBYTES} 2>/dev/null"
    if job.inject_params.get("inject_dst_pipe_garble", False):
        dst_pipe = f"{dst_pipe} | base64"  # for testing; forward garbled bytes
    if dst_pipe.startswith(" |"):
        dst_pipe = dst_pipe[2:]  # strip leading ' |' part
    if job.inject_params.get("inject_dst_receive_error", False):
        recv_cmd_str = f"{recv_cmd_str} --injectedGarbageParameter"  # for testing; induce CLI parse error
    if dst_pipe:
        dst_pipe = f"{dst_pipe} | {recv_cmd_str}"
        if p.dst.ssh_user_host:
            # remote dst: run the whole pipeline inside a single shell invocation
            dst_pipe = p.shell_program + " -c " + dquote(dst_pipe)
    else:
        dst_pipe = recv_cmd_str

    # If there's no support for shell pipelines, we can't do compression, mbuffering, monitoring and rate-limiting,
    # so we fall back to simple zfs send/receive.
    if not p.is_program_available("sh", "src"):
        src_pipe = send_cmd_str
    if not p.is_program_available("sh", "dst"):
        dst_pipe = recv_cmd_str

    src_pipe = squote(p.src, src_pipe)
    dst_pipe = squote(p.dst, dst_pipe)
    return src_pipe, local_pipe, dst_pipe

673 

674 

def _run_zfs_send_receive(
    job: Job,
    src_dataset: str,
    dst_dataset: str,
    send_cmd: list[str],
    recv_cmd: list[str],
    size_estimate_bytes: int,
    size_estimate_human: str,
    dry_run_no_send: bool,
    error_trigger: str | None = None,
) -> None:
    """Executes a zfs send/receive pipeline between source and destination.

    Builds the three pipeline legs (source, optional local middle leg, destination) via _prepare_zfs_send_receive(),
    joins them into a single local shell command, and runs it to completion.

    Params:
        job: the current Job carrying Params, connection pools and subprocess machinery.
        src_dataset: source dataset the send stream originates from.
        dst_dataset: destination dataset; on failure its leftover 'zfs receive -s' state may be cleared.
        send_cmd/recv_cmd: fully assembled 'zfs send' / 'zfs receive' argument lists.
        size_estimate_bytes/size_estimate_human: transfer size estimate used for pv/mbuffer sizing and display.
        dry_run_no_send: if True, only logs what would be executed.
        error_trigger: optional name used by maybe_inject_error() for test-only fault injection.

    Raises:
        RetryableError: on CalledProcessError or UnicodeDecodeError from the pipeline, chaining the cause; the
            operation isn't idempotent so the caller regathers state from the start of replicate_dataset().
    """
    p, log = job.params, job.params.log
    pipes: tuple[str, str, str] = _prepare_zfs_send_receive(
        job, src_dataset, send_cmd, recv_cmd, size_estimate_bytes, size_estimate_human
    )
    src_pipe, local_pipe, dst_pipe = pipes
    # a dedicated TCP connection per send avoids head-of-line blocking; otherwise share connections
    conn_pool_name: str = DEDICATED if p.dedicated_tcp_connection_per_zfs_send else SHARED
    src_conn_pool: ConnectionPool = p.connection_pools[p.src.location].pool(conn_pool_name)
    dst_conn_pool: ConnectionPool = p.connection_pools[p.dst.location].pool(conn_pool_name)
    with src_conn_pool.connection() as src_conn, dst_conn_pool.connection() as dst_conn:
        src_conn.refresh_ssh_connection_if_necessary(job)
        dst_conn.refresh_ssh_connection_if_necessary(job)
        src_ssh_cmd: str = " ".join(src_conn.ssh_cmd_quoted)
        dst_ssh_cmd: str = " ".join(dst_conn.ssh_cmd_quoted)
        # join the legs into one local shell pipeline: <src ssh+send> | <local leg> | <dst ssh+receive>
        cmd: list[str] = [p.shell_program_local, "-c", f"{src_ssh_cmd} {src_pipe} {local_pipe} | {dst_ssh_cmd} {dst_pipe}"]
        msg: str = "Would execute: %s" if dry_run_no_send else "Executing: %s"
        log.debug(msg, cmd[2].lstrip())
        if not dry_run_no_send:
            try:
                job.maybe_inject_error(cmd=cmd, error_trigger=error_trigger)  # for testing only
                sp: Subprocesses = job.subprocesses
                process = sp.subprocess_run(
                    cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout(job), check=True, log=log
                )
            except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
                retry_immediately_once: bool = False
                if not isinstance(e, UnicodeDecodeError):
                    # surface the failed pipeline's stdout/stderr before converting to a retryable failure
                    xprint(log, stderr_to_str(e.stdout), file=sys.stdout)
                    log.warning("%s", stderr_to_str(e.stderr).rstrip())
                if isinstance(e, subprocess.CalledProcessError):
                    # if stderr indicates stale 'zfs receive -s' state, clear it and retry immediately once
                    retry_immediately_once = _clear_resumable_recv_state_if_necessary(
                        job, dst_dataset, stderr_to_str(e.stderr)
                    )
                # op isn't idempotent so retries regather current state from the start of replicate_dataset()
                raise RetryableError(display_msg="zfs send/receive", retry_immediately_once=retry_immediately_once) from e
            else:
                xprint(log, process.stdout, file=sys.stdout)
                xprint(log, process.stderr, file=sys.stderr)

724 

725 

726def _clear_resumable_recv_state_if_necessary(job: Job, dst_dataset: str, stderr: str) -> bool: 

727 """Deletes leftover ZFS resume token state on the receiving dataset if necessary to continue operations. 

728 

729 To make resumable ZFS receive a reliable feature, we cope with the following ZFS facts: 

730 - A failed `zfs receive -s` prohibits the following subsequent operations, until the situation is explicitly resolved 

731 via a successful subsequent `zfs receive`, or cleared via `zfs receive -A`: 

732 - `zfs receive` without the resumable receive token (`zfs send -t <token>` is now required) 

733 - `zfs destroy <snapshot>` 

734 - `zfs rollback` 

735 - `zfs send -t` does not support sending more than a single snapshot; e.g. https://github.com/openzfs/zfs/issues/16764 

736 - A stale receive token prohibits subsequent `zfs send -t` if not handled (meanwhile, state changed on src or dst). 

737 - `zfs receive -A` fails if the receiving dataset has no ZFS resume token (anymore). 

738 """ 

739 

740 def clear_resumable_recv_state() -> bool: 

741 log.warning(p.dry("Aborting an interrupted zfs receive -s, deleting partially received state: %s"), dst_dataset) 

742 cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} receive -A", dst_dataset) 

743 job.try_ssh_command(p.dst, LOG_TRACE, is_dry=p.dry_run, print_stdout=True, cmd=cmd) 

744 log.log(LOG_TRACE, p.dry("Done Aborting an interrupted zfs receive -s: %s"), dst_dataset) 

745 return True 

746 

747 p, log = job.params, job.params.log 

748 # No i18n needed here. OpenZFS ships no translation catalogs, so gettext falls back to English msgids and locale settings 

749 # have no effect. If translations ever appear, revisit this or inject LC_ALL=C. 

750 

751 # "cannot resume send: 'wb_src/tmp/src@s1' is no longer the same snapshot used in the initial send" 

752 # "cannot resume send: 'wb_src/tmp/src@s1' used in the initial send no longer exists" 

753 # "cannot resume send: incremental source 0xa000000000000000 no longer exists" 

754 if "cannot resume send" in stderr and ( 

755 "is no longer the same snapshot used in the initial send" in stderr 

756 or "used in the initial send no longer exists" in stderr 

757 or re.search(r"incremental source [0-9a-fx]+ no longer exists", stderr) 

758 ): 

759 return clear_resumable_recv_state() 

760 

761 # "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive." 

762 # see https://github.com/openzfs/zfs/issues/12480 

763 # 'cannot receive new filesystem stream: destination xx contains partially-complete state from "zfs receive -s"' 

764 # this indicates that --no-resume-recv detects that dst contains a previously interrupted recv -s 

765 elif "cannot receive" in stderr and ( 

766 "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive" in stderr 

767 or 'contains partially-complete state from "zfs receive -s"' in stderr 

768 ): 

769 return clear_resumable_recv_state() 

770 

771 elif ( # this signals normal behavior on interrupt of 'zfs receive -s' if running without --no-resume-recv 

772 "cannot receive new filesystem stream: checksum mismatch or incomplete stream" in stderr 

773 and "Partially received snapshot is saved" in stderr 

774 ): 

775 return True 

776 

777 # "cannot destroy 'wb_dest/tmp/dst@s1': snapshot has dependent clones ... use '-R' to destroy the following 

778 # datasets: wb_dest/tmp/dst/%recv" # see https://github.com/openzfs/zfs/issues/10439#issuecomment-642774560 

779 # This msg indicates a failed 'zfs destroy' via --delete-dst-snapshots. This "clone" is caused by a previously 

780 # interrupted 'zfs receive -s'. The fix used here is to delete the partially received state of said 

781 # 'zfs receive -s' via 'zfs receive -A', followed by an automatic retry, which will now succeed to delete the 

782 # snapshot without user intervention. 

783 elif ( 

784 "cannot destroy" in stderr 

785 and "snapshot has dependent clone" in stderr 

786 and "use '-R' to destroy the following dataset" in stderr 

787 and f"\n{dst_dataset}/%recv\n" in stderr 

788 ): 

789 return clear_resumable_recv_state() 

790 

791 # Same cause as above, except that this error can occur during 'zfs rollback' 

792 # Also see https://github.com/openzfs/zfs/blob/master/cmd/zfs/zfs_main.c 

793 elif ( 

794 "cannot rollback to" in stderr 

795 and "clones of previous snapshots exist" in stderr 

796 and "use '-R' to force deletion of the following clones and dependents" in stderr 

797 and f"\n{dst_dataset}/%recv\n" in stderr 

798 ): 

799 return clear_resumable_recv_state() 

800 

801 return False 

802 

803 

804def _recv_resume_token(job: Job, dst_dataset: str, retry_count: int) -> tuple[str | None, list[str], list[str]]: 

805 """Gets recv_resume_token ZFS property from dst_dataset and returns corresponding opts to use for send+recv.""" 

806 p, log = job.params, job.params.log 

807 if not p.resume_recv: 

808 return None, [], [] 

809 warning: str | None = None 

810 if not is_zpool_feature_enabled_or_active(p, p.dst, "feature@extensible_dataset"): 

811 warning = "not available on destination dataset" 

812 elif not p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst"): 

813 warning = "unreliable as zfs version is too old" # e.g. zfs-0.8.3 "internal error: Unknown error 1040" 

814 if warning: 

815 log.warning(f"ZFS receive resume feature is {warning}. Falling back to --no-resume-recv: %s", dst_dataset) 

816 return None, [], [] 

817 recv_resume_token: str | None = None 

818 send_resume_opts: list[str] = [] 

819 if job.dst_dataset_exists[dst_dataset]: 

820 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o value -s none receive_resume_token", dst_dataset) 

821 recv_resume_token = job.run_ssh_command(p.dst, LOG_TRACE, cmd=cmd).rstrip() 

822 if recv_resume_token == "-" or not recv_resume_token: # noqa: S105 

823 recv_resume_token = None 

824 else: 

825 send_resume_opts += ["-n"] if p.dry_run else [] 

826 send_resume_opts += ["-v"] if p.verbose_zfs else [] 

827 send_resume_opts += ["-t", recv_resume_token] 

828 recv_resume_opts = ["-s"] 

829 return recv_resume_token, send_resume_opts, recv_resume_opts 

830 

831 

832def _mbuffer_cmd(p: Params, loc: str, size_estimate_bytes: int, recordsize: int) -> str: 

833 """If mbuffer command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to smooth out 

834 the rate of data flow and prevent bottlenecks caused by network latency or speed fluctuation.""" 

835 if ( 

836 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size) 

837 and ( 

838 (loc == "src" and (p.src.is_nonlocal or p.dst.is_nonlocal)) 

839 or (loc == "dst" and (p.src.is_nonlocal or p.dst.is_nonlocal)) 

840 or (loc == "local" and p.src.is_nonlocal and p.dst.is_nonlocal) 

841 ) 

842 and p.is_program_available("mbuffer", loc) 

843 ): 

844 recordsize = max(recordsize, 2 * 1024 * 1024) 

845 return shlex.join([p.mbuffer_program, "-s", str(recordsize)] + p.mbuffer_program_opts) 

846 else: 

847 return "cat" 

848 

849 

850def _compress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str: 

851 """If zstd command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to reduce network 

852 bottlenecks by sending compressed data.""" 

853 if ( 

854 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size) 

855 and (p.src.is_nonlocal or p.dst.is_nonlocal) 

856 and p.is_program_available("zstd", loc) 

857 ): 

858 return shlex.join([p.compression_program, "-c"] + p.compression_program_opts) 

859 else: 

860 return "cat" 

861 

862 

863def _decompress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str: 

864 """Returns decompression command for network pipe if remote supports it.""" 

865 if ( 

866 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size) 

867 and (p.src.is_nonlocal or p.dst.is_nonlocal) 

868 and p.is_program_available("zstd", loc) 

869 ): 

870 return shlex.join([p.compression_program, "-dc"]) 

871 else: 

872 return "cat" 

873 

874 

# Extracts the worker number from thread names like "ThreadPoolExecutor-0_3" (group 1 is the worker number within the
# pool); used by _pv_cmd() to give each parallel worker thread its own pv log file.
_WORKER_THREAD_NUMBER_REGEX: Final[re.Pattern[str]] = re.compile(r"ThreadPoolExecutor-\d+_(\d+)")

876 

877 

def _pv_cmd(
    job: Job, loc: str, size_estimate_bytes: int, size_estimate_human: str, disable_progress_bar: bool = False
) -> str:
    """If pv command is on the PATH, monitors the progress of data transfer from 'zfs send' to 'zfs receive'; Progress can be
    viewed via "tail -f $pv_log_file" aka tail -f ~/bzfs-logs/current/current.pv or similar.

    Returns the pv shell command string (with stderr redirected into the pv log file), or "cat" if pv is unavailable.
    Side effects: may start the ProgressReporter on the first replication task, and appends a newline marker to the
    per-worker pv log file so the reporter can reliably reset its bytes_in_flight accounting.
    """
    p = job.params
    if p.is_program_available("pv", loc):
        size: str = f"--size={size_estimate_bytes}"
        if disable_progress_bar or p.no_estimate_send_size:
            size = ""  # without a size estimate pv cannot render a meaningful percentage bar
        pv_log_file: str = p.log_params.pv_log_file
        thread_name: str = threading.current_thread().name
        # each parallel worker thread gets its own pv log file suffix to avoid interleaved writes
        if match := _WORKER_THREAD_NUMBER_REGEX.fullmatch(thread_name):
            worker = int(match.group(1))
            if worker > 0:
                pv_log_file += PV_FILE_THREAD_SEPARATOR + f"{worker:04}"
        # one-time initialization on the very first replication task of this job
        if job.is_first_replication_task.get_and_set(False):
            if not p.log_params.quiet:
                job.progress_reporter.start()
            job.replication_start_time_nanos = time.monotonic_ns()
        if not p.log_params.quiet:
            with open_nofollow(pv_log_file, mode="a", encoding="utf-8", perm=FILE_PERMISSIONS) as fd:
                fd.write("\n")  # mark start of new stream so ProgressReporter can reliably reset bytes_in_flight
            job.progress_reporter.enqueue_pv_log_file(pv_log_file)
        pv_program_opts: list[str] = [p.pv_program] + p.pv_program_opts
        if job.progress_update_intervals is not None:  # for testing
            pv_program_opts += [f"--interval={job.progress_update_intervals[0]}"]
        pv_program_opts += ["--force", f"--name={size_estimate_human}"]
        pv_program_opts += [size] if size else []
        # NOTE(review): LC_ALL=C presumably pins pv's output format for the log parser regardless of locale — confirm
        return f"LC_ALL=C {shlex.join(pv_program_opts)} 2>> {shlex.quote(pv_log_file)}"
    else:
        return "cat"

910 

911 

def delete_snapshots(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None:
    """Deletes the given snapshots of 'dataset' on 'remote', batching the work to keep command lines manageable."""
    if not snapshot_tags:
        return
    p, log = job.params, job.params.log
    count = len(snapshot_tags)
    log.info(p.dry(f"Deleting {count} snapshots within %s: %s"), dataset, snapshot_tags)
    # delete snapshots in batches without creating a command line that's too big for the OS to handle
    run_ssh_cmd_batched(
        job,
        remote,
        _delete_snapshot_cmd(p, remote, dataset + "@"),
        snapshot_tags,
        lambda batch: _delete_snapshot(job, remote, dataset, dataset + "@" + ",".join(batch)),
        max_batch_items=p.max_snapshots_per_minibatch_on_delete_snaps,
        sep=",",
    )

928 

929 

def _delete_snapshot(job: Job, r: Remote, dataset: str, snapshots_to_delete: str) -> None:
    """Runs 'zfs destroy' for a comma-separated snapshot list; converts failures into RetryableError.

    Raises:
        RetryableError: on CalledProcessError or UnicodeDecodeError; the op isn't idempotent so retries regather
            current state from the start of delete_destination_snapshots() or similar.
    """
    p = job.params
    cmd: list[str] = _delete_snapshot_cmd(p, r, snapshots_to_delete)
    try:
        job.maybe_inject_error(cmd=cmd, error_trigger="zfs_delete_snapshot")  # for testing only
        # is_dry=False is safe because dry-run mode is expressed via the 'zfs destroy -n' flag inside cmd itself
        job.run_ssh_command(r, LOG_DEBUG, is_dry=False, print_stdout=True, cmd=cmd)
    except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
        stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else ""
        # a dependent-clone failure indicates stale 'zfs receive -s' state; clearing it makes an immediate retry useful
        retry_now: bool = _clear_resumable_recv_state_if_necessary(job, dataset, stderr)
        raise RetryableError(display_msg="zfs destroy snapshot", retry_immediately_once=retry_now) from e

943 

944 

945def _delete_snapshot_cmd(p: Params, r: Remote, snapshots_to_delete: str) -> list[str]: 

946 """Builds zfs destroy command for given snapshots.""" 

947 return p.split_args( 

948 f"{r.sudo} {p.zfs_program} destroy", p.force_hard, p.verbose_destroy, p.dry_run_destroy, snapshots_to_delete 

949 ) 

950 

951 

def delete_bookmarks(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None:
    """Removes the given bookmarks of 'dataset' one per CLI call, in parallel."""
    if not snapshot_tags:
        return
    p, log = job.params, job.params.log
    count = len(snapshot_tags)
    log.info(p.dry(f"Deleting {count} bookmarks within %s: %s"), dataset, dataset + "#" + ",".join(snapshot_tags))
    destroy_cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} destroy")
    bookmarks = (f"{dataset}#{snapshot_tag}" for snapshot_tag in snapshot_tags)

    def destroy_one(_cmd: list[str], batch: list[str]) -> None:
        # exists=False: a bookmark that has vanished in the meantime is not treated as a hard failure
        job.try_ssh_command(remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=_cmd + batch, exists=False)

    # Unfortunately ZFS has no syntax yet to delete multiple bookmarks in a single CLI invocation,
    # hence max_batch_items=1
    run_ssh_cmd_parallel(job, remote, [(destroy_cmd, bookmarks)], destroy_one, max_batch_items=1)

971 

972 

def delete_datasets(job: Job, remote: Remote, datasets: Iterable[str]) -> None:
    """Deletes the given dataset trees via 'zfs destroy -r' on the given remote.

    Batch optimized to minimize CLI + network roundtrips: a dataset whose ancestor (within the sorted input) was just
    destroyed recursively is skipped because the ancestor's 'zfs destroy -r' already removed it.
    """
    p, log = job.params, job.params.log
    previously_deleted: str = DONT_SKIP_DATASET
    for dataset in sorted(datasets):
        if is_descendant(dataset, of_root_dataset=previously_deleted):
            continue  # already gone via the earlier recursive destroy
        log.info(p.dry("Deleting dataset tree: %s"), f"{dataset} ...")
        cmd: list[str] = p.split_args(
            f"{remote.sudo} {p.zfs_program} destroy -r {p.force_unmount} {p.force_hard} {p.verbose_destroy}",
            p.dry_run_destroy,
            dataset,
        )
        # is_dry=False is safe because dry-run mode is expressed via the 'zfs destroy -n' flag in p.dry_run_destroy
        job.run_ssh_command(remote, LOG_DEBUG, is_dry=False, print_stdout=True, cmd=cmd)
        previously_deleted = dataset

991 

992 

def _create_zfs_filesystem(job: Job, filesystem: str) -> None:
    """Creates destination filesystem hierarchies without mounting them.

    Walks the path components of 'filesystem' top-down and issues one 'zfs create -p [-u]' per ancestor that is not
    yet known to exist, updating job.dst_dataset_exists as it goes (except in dry-run mode).
    """
    # zfs create -p -u $filesystem
    # To ensure the filesystems that we create do not get mounted, we apply a separate 'zfs create -p -u'
    # invocation for each non-existing ancestor. This is because a single 'zfs create -p -u' applies the '-u'
    # part only to the immediate filesystem, rather than to the not-yet existing ancestors.
    p = job.params
    parent: str = ""
    # '-u' (create without mounting) requires zfs >= 2.1; omit it on older destinations
    no_mount: str = "-u" if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst") else ""
    for component in filesystem.split("/"):
        parent += component
        if not job.dst_dataset_exists[parent]:
            cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} create -p", no_mount, parent)
            try:
                job.run_ssh_command(p.dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd)
            except subprocess.CalledProcessError as e:
                # ignore harmless error caused by 'zfs create' without the -u flag, or by dataset already existing
                stderr: str = stderr_to_str(e.stderr)
                if not (
                    "filesystem successfully created, but it may only be mounted by root" in stderr
                    or "filesystem successfully created, but not mounted" in stderr  # SolarisZFS
                    or "dataset already exists" in stderr
                    or "filesystem already exists" in stderr  # SolarisZFS?
                ):
                    raise
            if not p.dry_run:
                # remember the successful creation so later calls skip this ancestor
                job.dst_dataset_exists[parent] = True
        parent += "/"

1021 

1022 

def _create_zfs_bookmarks(job: Job, remote: Remote, dataset: str, snapshots: list[str]) -> None:
    """Creates bookmarks for the given snapshots, using the 'zfs bookmark' CLI.

    Only runs if bookmark creation is enabled (p.create_bookmarks != "none") and the remote supports bookmarks;
    bookmark names are derived from the snapshot names by replacing '{dataset}@' with '{dataset}#'.
    """
    # Unfortunately ZFS has no syntax yet to create multiple bookmarks in a single CLI invocation
    p = job.params

    def create_zfs_bookmark(cmd: list[str]) -> None:
        # cmd is the 'zfs bookmark' prefix plus exactly one snapshot name (max_batch_items=1 below)
        snapshot = cmd[-1]
        assert "@" in snapshot
        bookmark_cmd: list[str] = cmd + [replace_prefix(snapshot, old_prefix=f"{dataset}@", new_prefix=f"{dataset}#")]
        try:
            job.run_ssh_command(remote, LOG_DEBUG, is_dry=p.dry_run, print_stderr=False, cmd=bookmark_cmd)
        except subprocess.CalledProcessError as e:
            # ignore harmless zfs error caused by bookmark with the same name already existing
            stderr: str = stderr_to_str(e.stderr)
            if ": bookmark exists" not in stderr:
                xprint(p.log, stderr, file=sys.stderr, end="")
                raise

    if p.create_bookmarks != "none" and are_bookmarks_enabled(p, remote):
        cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} bookmark")
        run_ssh_cmd_parallel(
            job, remote, [(cmd, snapshots)], lambda _cmd, batch: create_zfs_bookmark(_cmd + batch), max_batch_items=1
        )

1046 

1047 

1048def _estimate_send_size(job: Job, remote: Remote, dst_dataset: str, recv_resume_token: str | None, *items: str) -> int: 

1049 """Estimates num bytes to transfer via 'zfs send -nvP'; Thread-safe.""" 

1050 p = job.params 

1051 if p.no_estimate_send_size: 1051 ↛ 1052line 1051 didn't jump to line 1052 because the condition on line 1051 was never true

1052 return 0 

1053 zfs_send_program_opts: list[str] = ["--parsable" if opt == "-P" else opt for opt in p.curr_zfs_send_program_opts] 

1054 zfs_send_program_opts = append_if_absent(zfs_send_program_opts, "-v", "-n", "--parsable") 

1055 if recv_resume_token: 

1056 zfs_send_program_opts += ["-t", recv_resume_token] 

1057 items = () 

1058 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} send", zfs_send_program_opts, items) 

1059 try: 

1060 lines: str | None = job.try_ssh_command(remote, LOG_TRACE, cmd=cmd) 

1061 except RetryableError as retryable_error: 

1062 assert retryable_error.__cause__ is not None 

1063 if recv_resume_token: 

1064 e = retryable_error.__cause__ 

1065 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else "" 

1066 retryable_error.retry_immediately_once = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr) 

1067 # op isn't idempotent so retries regather current state from the start of replicate_dataset() 

1068 raise 

1069 if lines is None: 1069 ↛ 1070line 1069 didn't jump to line 1070 because the condition on line 1069 was never true

1070 return 0 # src dataset or snapshot has been deleted by third party 

1071 size: str = lines.splitlines()[-1] 

1072 assert size.startswith("size") 

1073 return int(size[size.index("\t") + 1 :]) 

1074 

1075 

1076def _estimate_send_sizes_in_parallel( 

1077 job: Job, 

1078 r: Remote, 

1079 dst_dataset: str, 

1080 recv_resume_token: str | None, 

1081 steps_todo: list[tuple[str, str, str, list[str]]], 

1082) -> list[int]: 

1083 """Estimates num bytes to transfer for multiple send steps; in parallel to reduce latency.""" 

1084 p = job.params 

1085 if p.no_estimate_send_size: 

1086 return [0 for _ in steps_todo] 

1087 

1088 def iterator_builder(executor: Executor) -> Iterable[Iterable[Future[int]]]: 

1089 resume_token: str | None = recv_resume_token 

1090 return [ 

1091 ( 

1092 executor.submit( 

1093 _estimate_send_size, job, r, dst_dataset, resume_token if i == 0 else None, incr_flag, from_snap, to_snap 

1094 ) 

1095 for i, (incr_flag, from_snap, to_snap, _to_snapshots) in enumerate(steps_todo) 

1096 ) 

1097 ] 

1098 

1099 max_workers: int = min(len(steps_todo), job.max_workers[r.location]) 

1100 return list( 

1101 parallel_iterator( 

1102 iterator_builder, max_workers=max_workers, ordered=True, is_terminated=job.termination_event.is_set 

1103 ) 

1104 ) 

1105 

1106 

1107def _zfs_set(job: Job, properties: list[str], remote: Remote, dataset: str) -> None: 

1108 """Applies the given property key=value pairs via 'zfs set' CLI to the given dataset on the given remote.""" 

1109 p = job.params 

1110 if len(properties) == 0: 

1111 return 

1112 # set properties in batches without creating a command line that's too big for the OS to handle 

1113 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} set") 

1114 run_ssh_cmd_batched( 

1115 job, 

1116 remote, 

1117 cmd, 

1118 properties, 

1119 lambda batch: job.run_ssh_command( 

1120 remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch + [dataset] 

1121 ), 

1122 max_batch_items=2**29, 

1123 ) 

1124 

1125 

1126def _zfs_get( 

1127 job: Job, 

1128 remote: Remote, 

1129 dataset: str, 

1130 sources: str, 

1131 output_columns: str, 

1132 propnames: str, 

1133 splitlines: bool, 

1134 props_cache: dict[tuple[str, str, str], dict[str, str | None]], 

1135) -> dict[str, str | None]: 

1136 """Returns the results of 'zfs get' CLI on the given dataset on the given remote.""" 

1137 assert dataset 

1138 assert sources 

1139 assert output_columns 

1140 if not propnames: 

1141 return {} 

1142 p = job.params 

1143 cache_key: tuple[str, str, str] = (sources, output_columns, propnames) 

1144 props: dict[str, str | None] | None = props_cache.get(cache_key) 

1145 if props is None: 

1146 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o {output_columns} -s {sources} {propnames}", dataset) 

1147 lines: str = job.run_ssh_command(remote, LOG_TRACE, cmd=cmd) 

1148 is_name_value_pair: bool = "," in output_columns 

1149 props = {} 

1150 # if not splitlines: omit single trailing newline that was appended by 'zfs get' CLI 

1151 assert splitlines or len(lines) == 0 or lines[-1] == "\n" 

1152 for line in lines.splitlines() if splitlines else [lines[0:-1]]: 

1153 if is_name_value_pair: 

1154 propname, propvalue = line.split("\t", 1) 

1155 props[propname] = propvalue 

1156 else: 

1157 props[line] = None 

1158 props_cache[cache_key] = props 

1159 return props 

1160 

1161 

def _incremental_send_steps_wrapper(
    p: Params, src_snapshots: list[str], src_guids: list[str], included_guids: set[str], is_resume: bool
) -> list[tuple[str, str, str, list[str]]]:
    """Returns incremental send steps, optionally converting -I to -i.

    Params:
        p: job parameters; p.src.use_zfs_delegation decides whether the -I to -i conversion workaround kicks in.
        src_snapshots: snapshot names on src, parallel to src_guids.
        src_guids: snapshot GUIDs, parallel to src_snapshots.
        included_guids: GUIDs of the snapshots that shall be replicated.
        is_resume: True if the first step resumes a previously interrupted transfer.
    """
    force_convert_I_to_i: bool = p.src.use_zfs_delegation and not getenv_bool("no_force_convert_I_to_i", True)  # noqa: N806
    # force_convert_I_to_i == True implies that:
    # If using 'zfs allow' delegation mechanism, force convert 'zfs send -I' to a series of
    # 'zfs send -i' as a workaround for zfs issue https://github.com/openzfs/zfs/issues/16394
    return incremental_send_steps(src_snapshots, src_guids, included_guids, is_resume, force_convert_I_to_i)

1171 

1172 

def _add_recv_property_options(
    job: Job, full_send: bool, recv_opts: list[str], dataset: str, cache: dict[tuple[str, str, str], dict[str, str | None]]
) -> tuple[list[str], list[str]]:
    """Reads the ZFS properties of the given src dataset; Appends zfs recv -o and -x values to recv_opts according to CLI
    params, and returns properties to explicitly set on the dst dataset after 'zfs receive' completes successfully.

    Params:
        job: the current Job carrying Params.
        full_send: True for a full send, False for an incremental send; each config only applies to its targets.
        recv_opts: list of 'zfs receive' options that is mutated in place (and also returned).
        dataset: the src dataset whose properties are read.
        cache: memoization cache shared with _zfs_get() to avoid repeated 'zfs get' calls.

    Returns:
        The (possibly extended) recv_opts, plus the key=value pairs destined for a later 'zfs set' on dst.

    Raises:
        RetryableError: wrapping CalledProcessError/UnicodeDecodeError from the underlying 'zfs get' calls.
    """
    p = job.params
    set_opts: list[str] = []
    x_names: list[str] = p.zfs_recv_x_names
    x_names_set: set[str] = set(x_names)
    # ox_names tracks property names already covered by a '-o' or '-x' option, to avoid emitting duplicates
    ox_names: set[str] = p.zfs_recv_ox_names.copy()
    if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_2_0, p.dst.location):
        # workaround for https://github.com/openzfs/zfs/commit/b0269cd8ced242e66afc4fa856d62be29bb5a4ff
        # 'zfs recv -x foo' on zfs < 2.2 errors out if the 'foo' property isn't contained in the send stream
        for propname in x_names:
            recv_opts.append("-x")
            recv_opts.append(propname)
        ox_names.update(x_names)  # union
    for config in [p.zfs_recv_o_config, p.zfs_recv_x_config, p.zfs_set_config]:
        if len(config.include_regexes) == 0:
            continue  # this is the default - it's an instant noop
        if (full_send and "full" in config.targets) or (not full_send and "incremental" in config.targets):
            # 'zfs get' uses newline as record separator and tab as separator between output columns. A ZFS user property
            # may contain newline and tab characters (indeed anything). Together, this means that there is no reliable
            # way to determine where a record ends and the next record starts when listing multiple arbitrary records in
            # a single 'zfs get' call. Therefore, here we use a separate 'zfs get' call for each ZFS user property.
            # TODO: perf: on zfs >= 2.3 use json via zfs get -j to safely merge all zfs gets into one 'zfs get' call
            try:
                props_any: dict = _zfs_get(job, p.src, dataset, config.sources, "property", "all", True, cache)
                props_filtered: dict = filter_properties(p, props_any, config.include_regexes, config.exclude_regexes)
                user_propnames: list[str] = [name for name in props_filtered if ":" in name]
                sys_propnames: str = ",".join(name for name in props_filtered if ":" not in name)
                # system properties can safely be fetched in one call; user properties one call each (see above)
                props: dict = _zfs_get(job, p.src, dataset, config.sources, "property,value", sys_propnames, True, cache)
                for propnames in user_propnames:
                    props.update(_zfs_get(job, p.src, dataset, config.sources, "property,value", propnames, False, cache))
            except (subprocess.CalledProcessError, UnicodeDecodeError) as e:
                raise RetryableError(display_msg="zfs get") from e
            for propname in sorted(props.keys()):
                if config is p.zfs_recv_o_config:
                    # '-o name=value': set the property as part of the receive, unless already covered
                    if not (propname in ox_names or propname in x_names_set):
                        recv_opts.append("-o")
                        recv_opts.append(f"{propname}={props[propname]}")
                        ox_names.add(propname)
                elif config is p.zfs_recv_x_config:
                    # '-x name': exclude the property from the received stream, unless already covered
                    if propname not in ox_names:
                        recv_opts.append("-x")
                        recv_opts.append(propname)
                        ox_names.add(propname)
                else:
                    assert config is p.zfs_set_config
                    # deferred: applied via 'zfs set' only after 'zfs receive' has completed successfully
                    set_opts.append(f"{propname}={props[propname]}")
    return recv_opts, set_opts

1224 

1225 

1226def _check_zfs_dataset_busy(job: Job, remote: Remote, dataset: str, busy_if_send: bool = True) -> bool: 

1227 """Decline to start a state changing ZFS operation that is, although harmless, likely to collide with another currently 

1228 running process. Instead, retry the operation later, after some delay. For example, decline to start a 'zfs receive' into 

1229 a destination dataset if another process is already running another 'zfs receive' into the same destination dataset, as 

1230 ZFS would reject any such attempt. However, it's actually fine to run an incremental 'zfs receive' into a dataset in 

1231 parallel with a 'zfs send' out of the very same dataset. This also helps daisy chain use cases where A replicates to B, 

1232 and B replicates to C. 

1233 

1234 _check_zfs_dataset_busy() offers no guarantees, it merely proactively avoids likely collisions. In other words, even if 

1235 the process check below passes there is no guarantee that the destination dataset won't be busy by the time we actually 

1236 execute the 'zfs send' operation. In such an event ZFS will reject the operation, we'll detect that, and we'll simply 

1237 auto-retry, after some delay. _check_zfs_dataset_busy() can be disabled via --ps-program=-. 

1238 

1239 TLDR: As is common for long-running operations in distributed systems, we use coordination-free optimistic concurrency 

1240 control where the parties simply retry on collision detection (rather than coordinate concurrency via a remote lock 

1241 server). 

1242 """ 

1243 p, log = job.params, job.params.log 

1244 if not p.is_program_available("ps", remote.location): 

1245 return True 

1246 cmd: list[str] = p.split_args(f"{p.ps_program} -Ao args") 

1247 procs: list[str] = (job.try_ssh_command(remote, LOG_TRACE, cmd=cmd) or "").splitlines() 

1248 if job.inject_params.get("is_zfs_dataset_busy", False): 

1249 procs += ["sudo -n zfs receive -u -o foo:bar=/baz " + dataset] # for unit testing only 

1250 if not _is_zfs_dataset_busy(procs, dataset, busy_if_send=busy_if_send): 

1251 return True 

1252 op: str = "zfs {receive" + ("|send" if busy_if_send else "") + "} operation" 

1253 try: 

1254 die(f"Cannot continue now: Destination is already busy with {op} from another process: {dataset}") 

1255 except SystemExit as e: 

1256 log.warning("%s", e) 

1257 raise RetryableError("dst currently busy with zfs mutation op", display_msg="replication") from e 

1258 

1259 

1260_ZFS_DATASET_BUSY_PREFIX: Final[str] = r"(([^ ]*?/)?(sudo|doas)( +-n)? +)?([^ ]*?/)?zfs (receive|recv" 

1261_ZFS_DATASET_BUSY_IF_MODS: Final[re.Pattern[str]] = re.compile((_ZFS_DATASET_BUSY_PREFIX + ") .*").replace("(", "(?:")) 

1262_ZFS_DATASET_BUSY_IF_SEND: Final[re.Pattern[str]] = re.compile((_ZFS_DATASET_BUSY_PREFIX + "|send) .*").replace("(", "(?:")) 

1263 

1264 

1265def _is_zfs_dataset_busy(procs: list[str], dataset: str, busy_if_send: bool) -> bool: 

1266 """Checks if any process list entry indicates ZFS activity on dataset.""" 

1267 regex: re.Pattern[str] = _ZFS_DATASET_BUSY_IF_SEND if busy_if_send else _ZFS_DATASET_BUSY_IF_MODS 

1268 suffix: str = " " + dataset 

1269 infix: str = " " + dataset + "@" 

1270 return any((proc.endswith(suffix) or infix in proc) and regex.fullmatch(proc) for proc in procs)