Coverage for bzfs_main/replication.py: 99%

661 statements  

coverage.py v7.13.0, created at 2025-12-22 08:03 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""The core replication algorithm is in replicate_dataset(), which performs reliable full and/or incremental 'zfs send' and 

16'zfs receive' operations on snapshots, using resumable ZFS sends when possible. 

17 

18For replication of multiple datasets, including recursive replication, see bzfs.py/replicate_datasets(). 

19""" 

20 

21from __future__ import ( 

22 annotations, 

23) 

24import os 

25import re 

26import shlex 

27import subprocess 

28import sys 

29import threading 

30import time 

31from collections.abc import ( 

32 Iterable, 

33 Iterator, 

34) 

35from concurrent.futures import ( 

36 Executor, 

37 Future, 

38) 

39from subprocess import ( 

40 DEVNULL, 

41 PIPE, 

42) 

43from typing import ( 

44 TYPE_CHECKING, 

45 Final, 

46) 

47 

48from bzfs_main.argparse_actions import ( 

49 has_timerange_filter, 

50) 

51from bzfs_main.detect import ( 

52 ZFS_VERSION_IS_AT_LEAST_2_1_0, 

53 ZFS_VERSION_IS_AT_LEAST_2_2_0, 

54 are_bookmarks_enabled, 

55 is_zpool_feature_enabled_or_active, 

56) 

57from bzfs_main.filter import ( 

58 filter_properties, 

59 filter_snapshots, 

60) 

61from bzfs_main.incremental_send_steps import ( 

62 incremental_send_steps, 

63) 

64from bzfs_main.parallel_batch_cmd import ( 

65 run_ssh_cmd_batched, 

66 run_ssh_cmd_parallel, 

67) 

68from bzfs_main.progress_reporter import ( 

69 PV_FILE_THREAD_SEPARATOR, 

70) 

71from bzfs_main.util.connection import ( 

72 DEDICATED, 

73 SHARED, 

74 ConnectionPool, 

75 dquote, 

76 squote, 

77 timeout, 

78) 

79from bzfs_main.util.parallel_iterator import ( 

80 parallel_iterator, 

81 run_in_parallel, 

82) 

83from bzfs_main.util.retry import ( 

84 Retry, 

85 RetryableError, 

86) 

87from bzfs_main.util.utils import ( 

88 DONT_SKIP_DATASET, 

89 FILE_PERMISSIONS, 

90 LOG_DEBUG, 

91 LOG_TRACE, 

92 Subprocesses, 

93 append_if_absent, 

94 cut, 

95 die, 

96 getenv_bool, 

97 human_readable_bytes, 

98 is_descendant, 

99 list_formatter, 

100 open_nofollow, 

101 replace_prefix, 

102 stderr_to_str, 

103 xprint, 

104) 

105 

106if TYPE_CHECKING: # pragma: no cover - for type hints only 

107 from bzfs_main.bzfs import ( 

108 Job, 

109 ) 

110 from bzfs_main.configuration import ( 

111 Params, 

112 Remote, 

113 ) 

114 

115 

116# constants: 

117INJECT_DST_PIPE_FAIL_KBYTES: Final[int] = 400 # for testing only 

118_RIGHT_JUST: Final[int] = 7 

119 

120 

121def replicate_dataset(job: Job, src_dataset: str, tid: str, retry: Retry) -> bool: 

122 """Replicates src_dataset to dst_dataset (thread-safe); For replication of multiple datasets, including recursive 

123 replication, see bzfs.py/replicate_datasets().""" 

124 p, log = job.params, job.params.log 

125 src, dst = p.src, p.dst 

126 retry_count: int = retry.count 

127 dst_dataset: str = replace_prefix(src_dataset, old_prefix=src.root_dataset, new_prefix=dst.root_dataset) 

128 log.debug(p.dry(f"{tid} Replicating: %s"), f"{src_dataset} --> {dst_dataset} ...") 

129 

130 list_result: bool | tuple[list[str], list[str], list[str], set[str], str, str] = _list_and_filter_src_and_dst_snapshots( 

131 job, src_dataset, dst_dataset 

132 ) 

133 if isinstance(list_result, bool): 

134 return list_result 

135 ( 

136 basis_src_snapshots_with_guids, 

137 _src_snapshots_with_guids, 

138 dst_snapshots_with_guids, 

139 included_src_guids, 

140 latest_src_snapshot, 

141 oldest_src_snapshot, 

142 ) = list_result 

143 log.debug("latest_src_snapshot: %s", latest_src_snapshot) 

144 latest_dst_snapshot: str = "" 

145 latest_common_src_snapshot: str = "" 

146 done_checking: bool = False 

147 

148 if job.dst_dataset_exists[dst_dataset]: 

149 rollback_result: bool | tuple[str, str, bool] = _rollback_dst_dataset_if_necessary( 

150 job, 

151 dst_dataset, 

152 latest_src_snapshot, 

153 basis_src_snapshots_with_guids, 

154 dst_snapshots_with_guids, 

155 done_checking, 

156 tid, 

157 ) 

158 if isinstance(rollback_result, bool): 

159 return rollback_result 

160 latest_dst_snapshot, latest_common_src_snapshot, done_checking = rollback_result 

161 

162 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark 

163 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot) 

164 props_cache: dict[tuple[str, str, str], dict[str, str | None]] = {} # fresh empty ZFS props cache for each dataset 

165 dry_run_no_send: bool = False 

166 if not latest_common_src_snapshot: 

167 # no common snapshot exists; delete all dst snapshots and perform a full send of the oldest selected src snapshot 

168 full_result: tuple[str, bool, bool, int] = _replicate_dataset_fully( 

169 job, 

170 src_dataset, 

171 dst_dataset, 

172 oldest_src_snapshot, 

173 latest_src_snapshot, 

174 latest_dst_snapshot, 

175 dst_snapshots_with_guids, 

176 props_cache, 

177 dry_run_no_send, 

178 done_checking, 

179 retry_count, 

180 tid, 

181 ) 

182 # we have now created a common snapshot 

183 latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count = full_result 

184 if latest_common_src_snapshot: 

185 # finally, incrementally replicate all selected snapshots from latest common snapshot until latest src snapshot 

186 _replicate_dataset_incrementally( 

187 job, 

188 src_dataset, 

189 dst_dataset, 

190 latest_common_src_snapshot, 

191 latest_src_snapshot, 

192 basis_src_snapshots_with_guids, 

193 included_src_guids, 

194 props_cache, 

195 dry_run_no_send, 

196 done_checking, 

197 retry_count, 

198 tid, 

199 ) 

200 return True 

201 

202 

203def _list_and_filter_src_and_dst_snapshots( 

204 job: Job, src_dataset: str, dst_dataset: str 

205) -> bool | tuple[list[str], list[str], list[str], set[str], str, str]: 

206 """On replication, list and filter src and dst snapshots.""" 

207 p, log = job.params, job.params.log 

208 src, dst = p.src, p.dst 

209 

210 # list GUID and name for dst snapshots, sorted ascending by createtxg (more precise than creation time) 

211 dst_cmd: list[str] = p.split_args(f"{p.zfs_program} list -t snapshot -d 1 -s createtxg -Hp -o guid,name", dst_dataset) 

212 

213 # list GUID and name for src snapshots + bookmarks, primarily sort ascending by transaction group (which is more 

214 # precise than creation time), secondarily sort such that snapshots appear after bookmarks for the same GUID. 

215 # Note: A snapshot and its ZFS bookmarks always have the same GUID, creation time and transaction group. A snapshot 

216 # changes its transaction group but retains its creation time and GUID on 'zfs receive' on another pool, i.e. 

217 # comparing createtxg is only meaningful within a single pool, not across pools from src to dst. Comparing creation 

218 # time remains meaningful across pools from src to dst. Creation time is a UTC Unix time in integer seconds. 

219 # Note that 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs enforce that snapshot names must not contain a '#' 

220 # char, bookmark names must not contain a '@' char, and dataset names must not contain a '#' or '@' char. 

221 # GUID and creation time also do not contain a '#' or '@' char. 
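# Illustrative sketch (editorial note, hypothetical values): with '-Hp -o guid,name', 'zfs list' prints one
# tab-separated record per snapshot or bookmark, which is what the parsing below relies on, e.g.
#   12408884070606740267<TAB>tank/data@hourly_2024-01-01       (snapshot)
#   12408884070606740267<TAB>tank/data#hourly_2024-01-01       (bookmark sharing the snapshot's GUID)
# When filter_needs_creation_time is true, a leading creation column (UTC Unix time in integer seconds) is
# requested as well and is stripped again below once the time-range filter has been applied.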

222 filter_needs_creation_time: bool = has_timerange_filter(p.snapshot_filters) 

223 types: str = "snapshot,bookmark" if p.use_bookmark and are_bookmarks_enabled(p, src) else "snapshot" 

224 props: str = job.creation_prefix + "creation,guid,name" if filter_needs_creation_time else "guid,name" 

225 src_cmd = p.split_args(f"{p.zfs_program} list -t {types} -s createtxg -s type -d 1 -Hp -o {props}", src_dataset) 

226 job.maybe_inject_delete(src, dataset=src_dataset, delete_trigger="zfs_list_snapshot_src") 

227 src_snapshots_and_bookmarks, dst_snapshots_with_guids_str = run_in_parallel( # list src+dst snapshots in parallel 

228 lambda: job.try_ssh_command(src, LOG_TRACE, cmd=src_cmd), 

229 lambda: job.try_ssh_command(dst, LOG_TRACE, cmd=dst_cmd, error_trigger="zfs_list_snapshot_dst"), 

230 ) 

231 job.dst_dataset_exists[dst_dataset] = dst_snapshots_with_guids_str is not None 

232 dst_snapshots_with_guids: list[str] = (dst_snapshots_with_guids_str or "").splitlines() 

233 if src_snapshots_and_bookmarks is None: 

234 log.warning("Third party deleted source: %s", src_dataset) 

235 return False # src dataset has been deleted by some third party while we're running - nothing to do anymore 

236 src_snapshots_with_guids: list[str] = src_snapshots_and_bookmarks.splitlines() 

237 src_snapshots_and_bookmarks = None 

238 if len(dst_snapshots_with_guids) == 0 and "bookmark" in types: 

239 # src bookmarks serve no purpose if the destination dataset has no snapshot; ignore them 

240 src_snapshots_with_guids = [snapshot for snapshot in src_snapshots_with_guids if "@" in snapshot] 

241 num_src_snapshots_found: int = sum(1 for snapshot in src_snapshots_with_guids if "@" in snapshot) 

242 with job.stats_lock: 

243 job.num_snapshots_found += num_src_snapshots_found 

244 # apply include/exclude regexes to ignore irrelevant src snapshots 

245 basis_src_snapshots_with_guids: list[str] = src_snapshots_with_guids 

246 src_snapshots_with_guids = filter_snapshots(job, src_snapshots_with_guids) 

247 if filter_needs_creation_time: 

248 src_snapshots_with_guids = cut(field=2, lines=src_snapshots_with_guids) 

249 basis_src_snapshots_with_guids = cut(field=2, lines=basis_src_snapshots_with_guids) 

250 

251 # find oldest and latest "true" snapshot, as well as GUIDs of all snapshots and bookmarks. 

252 # a snapshot is "true" if it is not a bookmark. 

253 oldest_src_snapshot: str = "" 

254 latest_src_snapshot: str = "" 

255 included_src_guids: set[str] = set() 

256 for line in src_snapshots_with_guids: 

257 guid, snapshot = line.split("\t", 1) 

258 if "@" in snapshot: 

259 included_src_guids.add(guid) 

260 latest_src_snapshot = snapshot 

261 if not oldest_src_snapshot: 

262 oldest_src_snapshot = snapshot 

263 if len(src_snapshots_with_guids) == 0: 

264 if p.skip_missing_snapshots == "fail": 

265 die(f"Source dataset includes no snapshot: {src_dataset}. Consider using --skip-missing-snapshots=dataset") 

266 elif p.skip_missing_snapshots == "dataset": 

267 log.warning("Skipping source dataset because it includes no snapshot: %s", src_dataset) 

268 if p.recursive and not job.dst_dataset_exists[dst_dataset]: 

269 log.warning("Also skipping descendant datasets as dst dataset does not exist for %s", src_dataset) 

270 return job.dst_dataset_exists[dst_dataset] 

271 return ( 

272 basis_src_snapshots_with_guids, 

273 src_snapshots_with_guids, 

274 dst_snapshots_with_guids, 

275 included_src_guids, 

276 latest_src_snapshot, 

277 oldest_src_snapshot, 

278 ) 

279 

280 

281def _rollback_dst_dataset_if_necessary( 

282 job: Job, 

283 dst_dataset: str, 

284 latest_src_snapshot: str, 

285 src_snapshots_with_guids: list[str], 

286 dst_snapshots_with_guids: list[str], 

287 done_checking: bool, 

288 tid: str, 

289) -> bool | tuple[str, str, bool]: 

290 """On replication, rollback dst if necessary.""" 

291 p, log = job.params, job.params.log 

292 dst = p.dst 

293 latest_dst_snapshot: str = "" 

294 latest_dst_guid: str = "" 

295 if len(dst_snapshots_with_guids) > 0: 

296 latest_dst_guid, latest_dst_snapshot = dst_snapshots_with_guids[-1].split("\t", 1) 

297 if p.force_rollback_to_latest_snapshot: 

298 log.info(p.dry(f"{tid} Rolling back destination to most recent snapshot: %s"), latest_dst_snapshot) 

299 # rollback just in case the dst dataset was modified since its most recent snapshot 

300 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

301 cmd: list[str] = p.split_args(f"{dst.sudo} {p.zfs_program} rollback", latest_dst_snapshot) 

302 job.try_ssh_command(dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd, exists=False) 

303 elif latest_src_snapshot == "": 

304 log.info(f"{tid} Already-up-to-date: %s", dst_dataset) 

305 return True 

306 

307 # find most recent snapshot (or bookmark) that src and dst have in common - we'll start to replicate 

308 # from there up to the most recent src snapshot. any two snapshots are "common" iff their ZFS GUIDs (i.e. 

309 # contents) are equal. See https://github.com/openzfs/zfs/commit/305bc4b370b20de81eaf10a1cf724374258b74d1 

310 def latest_common_snapshot(snapshots_with_guids: list[str], intersect_guids: set[str]) -> tuple[str | None, str]: 

311 """Returns a true snapshot instead of its bookmark with the same GUID, per the sort order previously used for 'zfs 

312 list -s ...'.""" 

313 for _line in reversed(snapshots_with_guids): 

314 guid_, snapshot_ = _line.split("\t", 1) 

315 if guid_ in intersect_guids: 

316 return guid_, snapshot_ # can be a snapshot or bookmark 

317 return None, "" 

318 

319 latest_common_guid, latest_common_src_snapshot = latest_common_snapshot( 

320 src_snapshots_with_guids, set(cut(field=1, lines=dst_snapshots_with_guids)) 

321 ) 

322 log.debug("latest_common_src_snapshot: %s", latest_common_src_snapshot) # is a snapshot or bookmark 

323 log.log(LOG_TRACE, "latest_dst_snapshot: %s", latest_dst_snapshot) 

324 

325 if latest_common_src_snapshot and latest_common_guid != latest_dst_guid: 

326 # found latest common snapshot but dst has an even newer snapshot. rollback dst to that common snapshot. 

327 assert latest_common_guid 

328 _, latest_common_dst_snapshot = latest_common_snapshot(dst_snapshots_with_guids, {latest_common_guid}) 

329 if not (p.force_rollback_to_latest_common_snapshot or p.force): 

330 die( 

331 f"Conflict: Most recent destination snapshot {latest_dst_snapshot} is more recent than " 

332 f"most recent common snapshot {latest_common_dst_snapshot}. Rollback destination first, " 

333 "for example via --force-rollback-to-latest-common-snapshot (or --force) option." 

334 ) 

335 if p.force_once: 

336 p.force.value = False 

337 p.force_rollback_to_latest_common_snapshot.value = False 

338 log.info(p.dry(f"{tid} Rolling back destination to most recent common snapshot: %s"), latest_common_dst_snapshot) 

339 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

340 cmd = p.split_args( 

341 f"{dst.sudo} {p.zfs_program} rollback -r {p.force_unmount} {p.force_hard}", latest_common_dst_snapshot 

342 ) 

343 try: 

344 job.run_ssh_command(dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd) 

345 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

346 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else "" 

347 retry_immediately_once: bool = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr) 

348 # op isn't idempotent so retries regather current state from the start of replicate_dataset() 

349 raise RetryableError( 

350 "Subprocess failed", display_msg="zfs rollback", retry_immediately_once=retry_immediately_once 

351 ) from e 

352 

353 if latest_src_snapshot and latest_src_snapshot == latest_common_src_snapshot: 

354 log.info(f"{tid} Already up-to-date: %s", dst_dataset) 

355 return True 

356 return latest_dst_snapshot, latest_common_src_snapshot, done_checking 
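# Illustrative sketch (editorial note, hypothetical GUIDs g1..g3): if src lists [(g1, @s1), (g2, @s2), (g3, @s3)]
# and dst lists [(g1, @d1), (g2, @d2)], the GUID intersection is {g1, g2}, so the latest common src snapshot is
# @s2 (GUID g2). Since dst's most recent snapshot also has GUID g2, no rollback is needed; if dst additionally
# held a newer (g9, @d9), dst would first be rolled back to @d2, or the run would abort without the
# --force-rollback-to-latest-common-snapshot (or --force) option.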

357 

358 

359def _replicate_dataset_fully( 

360 job: Job, 

361 src_dataset: str, 

362 dst_dataset: str, 

363 oldest_src_snapshot: str, 

364 latest_src_snapshot: str, 

365 latest_dst_snapshot: str, 

366 dst_snapshots_with_guids: list[str], 

367 props_cache: dict[tuple[str, str, str], dict[str, str | None]], 

368 dry_run_no_send: bool, 

369 done_checking: bool, 

370 retry_count: int, 

371 tid: str, 

372) -> tuple[str, bool, bool, int]: 

373 """On replication, deletes all dst snapshots and performs a full send of the oldest selected src snapshot, which in turn 

374 creates a common snapshot.""" 

375 p, log = job.params, job.params.log 

376 src, dst = p.src, p.dst 

377 latest_common_src_snapshot: str = "" 

378 if latest_dst_snapshot: 

379 if not p.force: 

380 die( 

381 f"Conflict: No common snapshot found between {src_dataset} and {dst_dataset} even though " 

382 "destination has at least one snapshot. Aborting. Consider using --force option to first " 

383 "delete all existing destination snapshots in order to be able to proceed with replication." 

384 ) 

385 if p.force_once:    [385 ↛ 387: line 385 didn't jump to line 387 because the condition on line 385 was always true]

386 p.force.value = False 

387 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

388 # extract SNAPSHOT_TAG from GUID<TAB>DATASET@SNAPSHOT_TAG 

389 delete_snapshots(job, dst, dst_dataset, snapshot_tags=cut(2, separator="@", lines=dst_snapshots_with_guids)) 

390 if p.dry_run: 

391 # As we're in --dryrun (--force) mode this conflict resolution step (see above) wasn't really executed: 

392 # "no common snapshot was found. delete all dst snapshots". In turn, this would cause the subsequent 

393 # 'zfs receive -n' to fail with "cannot receive new filesystem stream: destination has snapshots; must 

394 # destroy them to overwrite it". So we skip the zfs send/receive step and keep on trucking. 

395 dry_run_no_send = True 

396 

397 # to start with, fully replicate oldest snapshot, which in turn creates a common snapshot 

398 if p.no_stream: 

399 oldest_src_snapshot = latest_src_snapshot 

400 if oldest_src_snapshot: 

401 if not job.dst_dataset_exists[dst_dataset]: 

402 # on destination, create parent filesystem and ancestors if they do not yet exist 

403 dst_dataset_parent: str = os.path.dirname(dst_dataset) 

404 if not job.dst_dataset_exists[dst_dataset_parent]: 

405 if p.dry_run: 

406 dry_run_no_send = True 

407 if dst_dataset_parent:    [407 ↛ 410: line 407 didn't jump to line 410 because the condition on line 407 was always true]

408 _create_zfs_filesystem(job, dst_dataset_parent) 

409 

410 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count) 

411 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result 

412 curr_size: int = _estimate_send_size(job, src, dst_dataset, recv_resume_token, oldest_src_snapshot) 

413 humansize: str = _format_size(curr_size) 

414 if recv_resume_token: 

415 send_opts: list[str] = p.curr_zfs_send_program_opts + send_resume_opts # e.g. curr + ["-t", "1-c740b4779-..."] 

416 else: 

417 send_opts = p.curr_zfs_send_program_opts + [oldest_src_snapshot] 

418 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts) 

419 recv_opts: list[str] = p.zfs_full_recv_opts.copy() + recv_resume_opts 

420 recv_opts, set_opts = _add_recv_property_options(job, True, recv_opts, src_dataset, props_cache) 

421 recv_cmd: list[str] = p.split_args( 

422 f"{dst.sudo} {p.zfs_program} receive -F", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True 

423 ) 

424 log.info(p.dry(f"{tid} Full send: %s"), f"{oldest_src_snapshot} --> {dst_dataset} ({humansize.strip()}) ...") 

425 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset) 

426 dry_run_no_send = dry_run_no_send or p.dry_run_no_send 

427 job.maybe_inject_params(error_trigger="full_zfs_send_params") 

428 humansize = humansize.rjust(_RIGHT_JUST * 3 + 2) 

429 _run_zfs_send_receive( 

430 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "full_zfs_send" 

431 ) 

432 latest_common_src_snapshot = oldest_src_snapshot # we have now created a common snapshot 

433 if not dry_run_no_send and not p.dry_run: 

434 job.dst_dataset_exists[dst_dataset] = True 

435 with job.stats_lock: 

436 job.num_snapshots_replicated += 1 

437 _create_zfs_bookmarks(job, src, src_dataset, [oldest_src_snapshot]) 

438 _zfs_set(job, set_opts, dst, dst_dataset) 

439 dry_run_no_send = dry_run_no_send or p.dry_run 

440 retry_count = 0 

441 

442 return latest_common_src_snapshot, dry_run_no_send, done_checking, retry_count 
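# Illustrative sketch (editorial note; option lists depend on configuration): the assembled full-send command
# pair takes roughly one of these shapes, either starting from scratch or resuming an interrupted receive:
#   zfs send <curr_zfs_send_program_opts> tank/src@oldest_snapshot
#   zfs send <curr_zfs_send_program_opts> -t 1-c740b4779-...
# piped (possibly over ssh) into:
#   zfs receive -F <zfs_full_recv_opts> [-s] tank/dst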

443 

444 

445def _replicate_dataset_incrementally( 

446 job: Job, 

447 src_dataset: str, 

448 dst_dataset: str, 

449 latest_common_src_snapshot: str, 

450 latest_src_snapshot: str, 

451 basis_src_snapshots_with_guids: list[str], 

452 included_src_guids: set[str], 

453 props_cache: dict[tuple[str, str, str], dict[str, str | None]], 

454 dry_run_no_send: bool, 

455 done_checking: bool, 

456 retry_count: int, 

457 tid: str, 

458) -> None: 

459 """Incrementally replicates all selected snapshots from latest common snapshot until latest src snapshot.""" 

460 p, log = job.params, job.params.log 

461 src, dst = p.src, p.dst 

462 

463 def replication_candidates() -> tuple[list[str], list[str]]: 

464 assert len(basis_src_snapshots_with_guids) > 0 

465 result_snapshots: list[str] = [] 

466 result_guids: list[str] = [] 

467 last_appended_guid: str = "" 

468 snapshot_itr: Iterator[str] = reversed(basis_src_snapshots_with_guids) 

469 while True: 

470 guid, snapshot = next(snapshot_itr).split("\t", 1) 

471 if "@" in snapshot: 

472 result_snapshots.append(snapshot) 

473 result_guids.append(guid) 

474 last_appended_guid = guid 

475 if snapshot == latest_common_src_snapshot: # latest_common_src_snapshot is a snapshot or bookmark 

476 if guid != last_appended_guid and "@" not in snapshot: 

477 # only appends the src bookmark if it has no snapshot. If the bookmark has a snapshot then that 

478 # snapshot has already been appended, per the sort order previously used for 'zfs list -s ...' 

479 result_snapshots.append(snapshot) 

480 result_guids.append(guid) 

481 break 

482 result_snapshots.reverse() 

483 result_guids.reverse() 

484 assert len(result_snapshots) > 0 

485 assert len(result_snapshots) == len(result_guids) 

486 return result_guids, result_snapshots 

487 

488 # collect the most recent common snapshot (which may be a bookmark) followed by all src snapshots 

489 # (that are not a bookmark) that are more recent than that. 

490 cand_guids, cand_snapshots = replication_candidates() 

491 if len(cand_snapshots) == 1: 

492 # latest_src_snapshot is a (true) snapshot that is equal to latest_common_src_snapshot or LESS recent 

493 # than latest_common_src_snapshot. The latter case can happen if latest_common_src_snapshot is a 

494 # bookmark whose snapshot has been deleted on src. 

495 return # nothing more to be done

496 

497 recv_resume_token_result: tuple[str | None, list[str], list[str]] = _recv_resume_token(job, dst_dataset, retry_count) 

498 recv_resume_token, send_resume_opts, recv_resume_opts = recv_resume_token_result 

499 recv_opts: list[str] = p.zfs_recv_program_opts.copy() + recv_resume_opts 

500 recv_opts, set_opts = _add_recv_property_options(job, False, recv_opts, src_dataset, props_cache) 

501 if p.no_stream: 

502 # skip intermediate snapshots 

503 steps_todo: list[tuple[str, str, str, list[str]]] = [ 

504 ("-i", latest_common_src_snapshot, latest_src_snapshot, [latest_src_snapshot]) 

505 ] 

506 else: 

507 # include intermediate src snapshots that pass --{include,exclude}-snapshot-* policy, using 

508 # a series of -i/-I send/receive steps that skip excluded src snapshots. 

509 steps_todo = _incremental_send_steps_wrapper( 

510 p, cand_snapshots, cand_guids, included_src_guids, recv_resume_token is not None 

511 ) 

512 log.log(LOG_TRACE, "steps_todo: %s", list_formatter(steps_todo, "; ")) 

513 estimate_send_sizes: list[int] = _estimate_send_sizes_in_parallel(job, src, dst_dataset, recv_resume_token, steps_todo) 

514 total_size: int = sum(estimate_send_sizes) 

515 total_num: int = sum(len(to_snapshots) for incr_flag, from_snap, to_snap, to_snapshots in steps_todo) 

516 done_size: int = 0 

517 done_num: int = 0 

518 for i, (incr_flag, from_snap, to_snap, to_snapshots) in enumerate(steps_todo): 

519 curr_num_snapshots: int = len(to_snapshots) 

520 curr_size: int = estimate_send_sizes[i] 

521 humansize: str = _format_size(total_size) + "/" + _format_size(done_size) + "/" + _format_size(curr_size) 

522 human_num: str = f"{total_num}/{done_num}/{curr_num_snapshots} snapshots" 

523 if recv_resume_token: 

524 send_opts: list[str] = p.curr_zfs_send_program_opts + send_resume_opts # e.g. curr + ["-t", "1-c740b4779-..."] 

525 else: 

526 send_opts = p.curr_zfs_send_program_opts + [incr_flag, from_snap, to_snap] 

527 send_cmd: list[str] = p.split_args(f"{src.sudo} {p.zfs_program} send", send_opts) 

528 recv_cmd: list[str] = p.split_args( 

529 f"{dst.sudo} {p.zfs_program} receive", p.dry_run_recv, recv_opts, dst_dataset, allow_all=True 

530 ) 

531 dense_size: str = p.two_or_more_spaces_regex.sub("", humansize.strip()) 

532 log.info( 

533 p.dry(f"{tid} Incremental send {incr_flag}: %s"), 

534 f"{from_snap} .. {to_snap[to_snap.index('@'):]} --> {dst_dataset} ({dense_size}) ({human_num}) ...", 

535 ) 

536 done_checking = done_checking or _check_zfs_dataset_busy(job, dst, dst_dataset, busy_if_send=False) 

537 if p.dry_run and not job.dst_dataset_exists[dst_dataset]: 

538 dry_run_no_send = True 

539 dry_run_no_send = dry_run_no_send or p.dry_run_no_send 

540 job.maybe_inject_params(error_trigger="incr_zfs_send_params") 

541 _run_zfs_send_receive( 

542 job, src_dataset, dst_dataset, send_cmd, recv_cmd, curr_size, humansize, dry_run_no_send, "incr_zfs_send" 

543 ) 

544 done_size += curr_size 

545 done_num += curr_num_snapshots 

546 recv_resume_token = None 

547 with job.stats_lock: 

548 job.num_snapshots_replicated += curr_num_snapshots 

549 assert p.create_bookmarks 

550 if p.create_bookmarks == "all": 

551 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots) 

552 elif p.create_bookmarks != "none": 

553 threshold_millis: int = p.xperiods.label_milliseconds("_" + p.create_bookmarks) 

554 to_snapshots = [snap for snap in to_snapshots if p.xperiods.label_milliseconds(snap) >= threshold_millis] 

555 if i == len(steps_todo) - 1 and (len(to_snapshots) == 0 or to_snapshots[-1] != to_snap): 

556 to_snapshots.append(to_snap) # ensure latest common snapshot is bookmarked 

557 _create_zfs_bookmarks(job, src, src_dataset, to_snapshots) 

558 _zfs_set(job, set_opts, dst, dst_dataset) 
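# Illustrative sketch (editorial note, hypothetical snapshots s1..s4): each steps_todo entry has the shape
# (incr_flag, from_snap, to_snap, to_snapshots), for example
#   ("-I", "tank/src@s1", "tank/src@s4", ["tank/src@s2", "tank/src@s3", "tank/src@s4"])
# which becomes 'zfs send <opts> -I tank/src@s1 tank/src@s4' piped into 'zfs receive <opts> tank/dst';
# with --no-stream a single ("-i", latest_common, latest, [latest]) step is used instead.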

559 

560 

561def _format_size(num_bytes: int) -> str: 

562 """Formats a byte count for human-readable logs.""" 

563 return human_readable_bytes(num_bytes, separator="").rjust(_RIGHT_JUST) 

564 

565 

566def _prepare_zfs_send_receive( 

567 job: Job, src_dataset: str, send_cmd: list[str], recv_cmd: list[str], size_estimate_bytes: int, size_estimate_human: str 

568) -> tuple[str, str, str]: 

569 """Constructs zfs send/recv pipelines with optional compression and pv.""" 

570 p = job.params 

571 send_cmd_str: str = shlex.join(send_cmd) 

572 recv_cmd_str: str = shlex.join(recv_cmd) 

573 

574 if ( 

575 p.is_program_available("zstd", "src") 

576 and p.is_program_available("zstd", "dst") 

577 and p.is_program_available("sh", "src") 

578 and p.is_program_available("sh", "dst") 

579 ): 

580 compress_cmd_: str = _compress_cmd(p, "src", size_estimate_bytes) 

581 decompress_cmd_: str = _decompress_cmd(p, "dst", size_estimate_bytes) 

582 else: # no compression is used if source and destination do not both support compression 

583 compress_cmd_, decompress_cmd_ = "cat", "cat" 

584 

585 recordsize: int = abs(job.src_properties[src_dataset].recordsize) 

586 src_buffer: str = _mbuffer_cmd(p, "src", size_estimate_bytes, recordsize) 

587 dst_buffer: str = _mbuffer_cmd(p, "dst", size_estimate_bytes, recordsize) 

588 local_buffer: str = _mbuffer_cmd(p, "local", size_estimate_bytes, recordsize) 

589 

590 pv_src_cmd: str = "" 

591 pv_dst_cmd: str = "" 

592 pv_loc_cmd: str = "" 

593 if not p.src.ssh_user_host: 

594 pv_src_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human) 

595 elif not p.dst.ssh_user_host: 

596 pv_dst_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human) 

597 elif compress_cmd_ == "cat": 

598 pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human) # compression disabled 

599 else: 

600 # pull-push mode with compression enabled: reporting "percent complete" isn't straightforward because 

601 # localhost observes the compressed data instead of the uncompressed data, so we disable the progress bar. 

602 pv_loc_cmd = _pv_cmd(job, "local", size_estimate_bytes, size_estimate_human, disable_progress_bar=True) 

603 

604 # assemble pipeline running on source leg 

605 src_pipe: str = "" 

606 if job.inject_params.get("inject_src_pipe_fail", False): 

607 # for testing; initially forward some bytes and then fail 

608 src_pipe = f"{src_pipe} | dd bs=64 count=1 2>/dev/null && false" 

609 if job.inject_params.get("inject_src_pipe_garble", False): 

610 src_pipe = f"{src_pipe} | base64" # for testing; forward garbled bytes 

611 if pv_src_cmd and pv_src_cmd != "cat": 

612 src_pipe = f"{src_pipe} | {pv_src_cmd}" 

613 if compress_cmd_ != "cat": 

614 src_pipe = f"{src_pipe} | {compress_cmd_}" 

615 if src_buffer != "cat": 

616 src_pipe = f"{src_pipe} | {src_buffer}" 

617 if src_pipe.startswith(" |"): 

618 src_pipe = src_pipe[2:] # strip leading ' |' part 

619 if job.inject_params.get("inject_src_send_error", False): 

620 send_cmd_str = f"{send_cmd_str} --injectedGarbageParameter" # for testing; induce CLI parse error 

621 if src_pipe: 

622 src_pipe = f"{send_cmd_str} | {src_pipe}" 

623 if p.src.ssh_user_host: 

624 src_pipe = p.shell_program + " -c " + dquote(src_pipe) 

625 else: 

626 src_pipe = send_cmd_str 

627 

628 # assemble pipeline running on middle leg between source and destination. only enabled for pull-push mode 

629 local_pipe: str = "" 

630 if local_buffer != "cat": 

631 local_pipe = f"{local_buffer}" 

632 if pv_loc_cmd and pv_loc_cmd != "cat": 

633 local_pipe = f"{local_pipe} | {pv_loc_cmd}" 

634 if local_buffer != "cat": 

635 local_pipe = f"{local_pipe} | {local_buffer}" 

636 if local_pipe.startswith(" |"): 

637 local_pipe = local_pipe[2:] # strip leading ' |' part 

638 if local_pipe: 

639 local_pipe = f"| {local_pipe}" 

640 

641 # assemble pipeline running on destination leg 

642 dst_pipe: str = "" 

643 if dst_buffer != "cat": 

644 dst_pipe = f"{dst_buffer}" 

645 if decompress_cmd_ != "cat": 

646 dst_pipe = f"{dst_pipe} | {decompress_cmd_}" 

647 if pv_dst_cmd and pv_dst_cmd != "cat": 

648 dst_pipe = f"{dst_pipe} | {pv_dst_cmd}" 

649 if job.inject_params.get("inject_dst_pipe_fail", False): 

650 # interrupt zfs receive for testing retry/resume; initially forward some bytes and then stop forwarding 

651 dst_pipe = f"{dst_pipe} | dd bs=1024 count={INJECT_DST_PIPE_FAIL_KBYTES} 2>/dev/null" 

652 if job.inject_params.get("inject_dst_pipe_garble", False): 

653 dst_pipe = f"{dst_pipe} | base64" # for testing; forward garbled bytes 

654 if dst_pipe.startswith(" |"): 

655 dst_pipe = dst_pipe[2:] # strip leading ' |' part 

656 if job.inject_params.get("inject_dst_receive_error", False): 

657 recv_cmd_str = f"{recv_cmd_str} --injectedGarbageParameter" # for testing; induce CLI parse error 

658 if dst_pipe: 

659 dst_pipe = f"{dst_pipe} | {recv_cmd_str}" 

660 if p.dst.ssh_user_host: 

661 dst_pipe = p.shell_program + " -c " + dquote(dst_pipe) 

662 else: 

663 dst_pipe = recv_cmd_str 

664 

665 # If there's no support for shell pipelines, we can't do compression, mbuffering, monitoring and rate-limiting, 

666 # so we fall back to simple zfs send/receive. 

667 if not p.is_program_available("sh", "src"): 

668 src_pipe = send_cmd_str 

669 if not p.is_program_available("sh", "dst"): 

670 dst_pipe = recv_cmd_str 

671 

672 src_pipe = squote(p.src, src_pipe) 

673 dst_pipe = squote(p.dst, dst_pipe) 

674 return src_pipe, local_pipe, dst_pipe 
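# Illustrative sketch (editorial note; the exact stages depend on which programs are available on each host):
# with compression enabled, the three returned legs might compose into something like
#   src leg:   zfs send ... | pv ... | zstd -c ... | mbuffer -s ...    (wrapped in 'sh -c "..."' if src is remote)
#   local leg: optional extra mbuffer/pv stage, only used in pull-push mode
#   dst leg:   mbuffer -s ... | zstd -dc | zfs receive ...             (wrapped in 'sh -c "..."' if dst is remote)
# _run_zfs_send_receive() below then joins them as: <src_ssh_cmd> <src_pipe> <local_pipe> | <dst_ssh_cmd> <dst_pipe>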

675 

676 

677def _run_zfs_send_receive( 

678 job: Job, 

679 src_dataset: str, 

680 dst_dataset: str, 

681 send_cmd: list[str], 

682 recv_cmd: list[str], 

683 size_estimate_bytes: int, 

684 size_estimate_human: str, 

685 dry_run_no_send: bool, 

686 error_trigger: str | None = None, 

687) -> None: 

688 """Executes a zfs send/receive pipeline between source and destination.""" 

689 p, log = job.params, job.params.log 

690 pipes: tuple[str, str, str] = _prepare_zfs_send_receive( 

691 job, src_dataset, send_cmd, recv_cmd, size_estimate_bytes, size_estimate_human 

692 ) 

693 src_pipe, local_pipe, dst_pipe = pipes 

694 conn_pool_name: str = DEDICATED if p.dedicated_tcp_connection_per_zfs_send else SHARED 

695 src_conn_pool: ConnectionPool = p.connection_pools[p.src.location].pool(conn_pool_name) 

696 dst_conn_pool: ConnectionPool = p.connection_pools[p.dst.location].pool(conn_pool_name) 

697 with src_conn_pool.connection() as src_conn, dst_conn_pool.connection() as dst_conn: 

698 src_conn.refresh_ssh_connection_if_necessary(job) 

699 dst_conn.refresh_ssh_connection_if_necessary(job) 

700 src_ssh_cmd: str = " ".join(src_conn.ssh_cmd_quoted) 

701 dst_ssh_cmd: str = " ".join(dst_conn.ssh_cmd_quoted) 

702 cmd: list[str] = [p.shell_program_local, "-c", f"{src_ssh_cmd} {src_pipe} {local_pipe} | {dst_ssh_cmd} {dst_pipe}"] 

703 msg: str = "Would execute: %s" if dry_run_no_send else "Executing: %s" 

704 log.debug(msg, cmd[2].lstrip()) 

705 if not dry_run_no_send: 

706 try: 

707 job.maybe_inject_error(cmd=cmd, error_trigger=error_trigger) 

708 sp: Subprocesses = job.subprocesses 

709 process = sp.subprocess_run( 

710 cmd, stdin=DEVNULL, stdout=PIPE, stderr=PIPE, text=True, timeout=timeout(job), check=True, log=log 

711 ) 

712 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

713 retry_immediately_once: bool = False 

714 if not isinstance(e, UnicodeDecodeError): 

715 xprint(log, stderr_to_str(e.stdout), file=sys.stdout) 

716 log.warning("%s", stderr_to_str(e.stderr).rstrip()) 

717 if isinstance(e, subprocess.CalledProcessError): 

718 retry_immediately_once = _clear_resumable_recv_state_if_necessary( 

719 job, dst_dataset, stderr_to_str(e.stderr) 

720 ) 

721 # op isn't idempotent so retries regather current state from the start of replicate_dataset() 

722 raise RetryableError( 

723 "Subprocess failed", display_msg="zfs send/receive", retry_immediately_once=retry_immediately_once 

724 ) from e 

725 else: 

726 xprint(log, process.stdout, file=sys.stdout) 

727 xprint(log, process.stderr, file=sys.stderr) 

728 

729 

730def _clear_resumable_recv_state_if_necessary(job: Job, dst_dataset: str, stderr: str) -> bool: 

731 """Deletes leftover ZFS resume token state on the receiving dataset if necessary to continue operations. 

732 

733 To make resumable ZFS receive a reliable feature, we cope with the following ZFS facts: 

734 - A failed `zfs receive -s` prohibits the following subsequent operations, until the situation is explicitly resolved 

735 via a successful subsequent `zfs receive`, or cleared via `zfs receive -A`: 

736 - `zfs receive` without the resumable receive token (`zfs send -t <token>` is now required) 

737 - `zfs destroy <snapshot>` 

738 - `zfs rollback` 

739 - `zfs send -t` does not support sending more than a single snapshot; e.g. https://github.com/openzfs/zfs/issues/16764 

740 - A stale receive token prohibits subsequent `zfs send -t` if not handled (meanwhile, state changed on src or dst). 

741 - `zfs receive -A` fails if the receiving dataset has no ZFS resume token (anymore). 

742 """ 

743 

744 def clear_resumable_recv_state() -> bool: 

745 log.warning(p.dry("Aborting an interrupted zfs receive -s, deleting partially received state: %s"), dst_dataset) 

746 cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} receive -A", dst_dataset) 

747 job.try_ssh_command(p.dst, LOG_TRACE, is_dry=p.dry_run, print_stdout=True, cmd=cmd) 

748 log.log(LOG_TRACE, p.dry("Done Aborting an interrupted zfs receive -s: %s"), dst_dataset) 

749 return True 

750 

751 p, log = job.params, job.params.log 

752 # No i18n needed here. OpenZFS ships no translation catalogs, so gettext falls back to English msgids and locale settings 

753 # have no effect. If translations ever appear, revisit this or inject LC_ALL=C. 

754 

755 # "cannot resume send: 'wb_src/tmp/src@s1' is no longer the same snapshot used in the initial send" 

756 # "cannot resume send: 'wb_src/tmp/src@s1' used in the initial send no longer exists" 

757 # "cannot resume send: incremental source 0xa000000000000000 no longer exists" 

758 if "cannot resume send" in stderr and ( 

759 "is no longer the same snapshot used in the initial send" in stderr 

760 or "used in the initial send no longer exists" in stderr 

761 or re.search(r"incremental source [0-9a-fx]+ no longer exists", stderr) 

762 ): 

763 return clear_resumable_recv_state() 

764 

765 # "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive." 

766 # see https://github.com/openzfs/zfs/issues/12480 

767 # 'cannot receive new filesystem stream: destination xx contains partially-complete state from "zfs receive -s"' 

768 # this indicates that --no-resume-recv detects that dst contains a previously interrupted recv -s 

769 elif "cannot receive" in stderr and ( 

770 "cannot receive resume stream: incompatible embedded data stream feature with encrypted receive" in stderr 

771 or 'contains partially-complete state from "zfs receive -s"' in stderr 

772 ): 

773 return clear_resumable_recv_state() 

774 

775 elif ( # this signals normal behavior on interrupt of 'zfs receive -s' if running without --no-resume-recv 

776 "cannot receive new filesystem stream: checksum mismatch or incomplete stream" in stderr 

777 and "Partially received snapshot is saved" in stderr 

778 ): 

779 return True 

780 

781 # "cannot destroy 'wb_dest/tmp/dst@s1': snapshot has dependent clones ... use '-R' to destroy the following 

782 # datasets: wb_dest/tmp/dst/%recv" # see https://github.com/openzfs/zfs/issues/10439#issuecomment-642774560 

783 # This msg indicates a failed 'zfs destroy' via --delete-dst-snapshots. This "clone" is caused by a previously 

784 # interrupted 'zfs receive -s'. The fix used here is to delete the partially received state of said 

785 # 'zfs receive -s' via 'zfs receive -A', followed by an automatic retry, which will now succeed to delete the 

786 # snapshot without user intervention. 

787 elif ( 

788 "cannot destroy" in stderr 

789 and "snapshot has dependent clone" in stderr 

790 and "use '-R' to destroy the following dataset" in stderr 

791 and f"\n{dst_dataset}/%recv\n" in stderr 

792 ): 

793 return clear_resumable_recv_state() 

794 

795 # Same cause as above, except that this error can occur during 'zfs rollback' 

796 # Also see https://github.com/openzfs/zfs/blob/master/cmd/zfs/zfs_main.c 

797 elif ( 

798 "cannot rollback to" in stderr 

799 and "clones of previous snapshots exist" in stderr 

800 and "use '-R' to force deletion of the following clones and dependents" in stderr 

801 and f"\n{dst_dataset}/%recv\n" in stderr 

802 ): 

803 return clear_resumable_recv_state() 

804 

805 return False 

806 

807 

808def _recv_resume_token(job: Job, dst_dataset: str, retry_count: int) -> tuple[str | None, list[str], list[str]]: 

809 """Gets recv_resume_token ZFS property from dst_dataset and returns corresponding opts to use for send+recv.""" 

810 p, log = job.params, job.params.log 

811 if not p.resume_recv: 

812 return None, [], [] 

813 warning: str | None = None 

814 if not is_zpool_feature_enabled_or_active(p, p.dst, "feature@extensible_dataset"): 

815 warning = "not available on destination dataset" 

816 elif not p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst"): 

817 warning = "unreliable as zfs version is too old" # e.g. zfs-0.8.3 "internal error: Unknown error 1040" 

818 if warning: 

819 log.warning(f"ZFS receive resume feature is {warning}. Falling back to --no-resume-recv: %s", dst_dataset) 

820 return None, [], [] 

821 recv_resume_token: str | None = None 

822 send_resume_opts: list[str] = [] 

823 if job.dst_dataset_exists[dst_dataset]: 

824 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o value -s none receive_resume_token", dst_dataset) 

825 recv_resume_token = job.run_ssh_command(p.dst, LOG_TRACE, cmd=cmd).rstrip() 

826 if recv_resume_token == "-" or not recv_resume_token: # noqa: S105 

827 recv_resume_token = None 

828 else: 

829 send_resume_opts += ["-n"] if p.dry_run else [] 

830 send_resume_opts += ["-v"] if p.verbose_zfs else [] 

831 send_resume_opts += ["-t", recv_resume_token] 

832 recv_resume_opts = ["-s"] 

833 return recv_resume_token, send_resume_opts, recv_resume_opts 
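# Illustrative sketch (editorial note): when the destination holds a receive_resume_token such as
# "1-c740b4779-..." (token shape borrowed from the comments elsewhere in this module), this returns roughly
#   ("1-c740b4779-...", ["-t", "1-c740b4779-..."], ["-s"])
# i.e. the caller resumes via 'zfs send -t <token>' into 'zfs receive -s'; when resumption is unsupported it
# returns (None, [], []) and replication proceeds non-resumably.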

834 

835 

836def _mbuffer_cmd(p: Params, loc: str, size_estimate_bytes: int, recordsize: int) -> str: 

837 """If mbuffer command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to smooth out 

838 the rate of data flow and prevent bottlenecks caused by network latency or speed fluctuation.""" 

839 if ( 

840 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size) 

841 and ( 

842 (loc == "src" and (p.src.is_nonlocal or p.dst.is_nonlocal)) 

843 or (loc == "dst" and (p.src.is_nonlocal or p.dst.is_nonlocal)) 

844 or (loc == "local" and p.src.is_nonlocal and p.dst.is_nonlocal) 

845 ) 

846 and p.is_program_available("mbuffer", loc) 

847 ): 

848 recordsize = max(recordsize, 2 * 1024 * 1024) 

849 return shlex.join([p.mbuffer_program, "-s", str(recordsize)] + p.mbuffer_program_opts) 

850 else: 

851 return "cat" 
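# Illustrative sketch (editorial note; the actual flags come from p.mbuffer_program_opts): the returned command
# has the form 'mbuffer -s <max(recordsize, 2097152)> <configured opts>', e.g. 'mbuffer -s 2097152 -m 128M',
# while the literal string "cat" tells the pipeline assembly above to skip the buffering stage.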

852 

853 

854def _compress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str: 

855 """If zstd command is on the PATH, uses it in the ssh network pipe between 'zfs send' and 'zfs receive' to reduce network 

856 bottlenecks by sending compressed data.""" 

857 if ( 

858 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size) 

859 and (p.src.is_nonlocal or p.dst.is_nonlocal) 

860 and p.is_program_available("zstd", loc) 

861 ): 

862 return shlex.join([p.compression_program, "-c"] + p.compression_program_opts) 

863 else: 

864 return "cat" 

865 

866 

867def _decompress_cmd(p: Params, loc: str, size_estimate_bytes: int) -> str: 

868 """Returns decompression command for network pipe if remote supports it.""" 

869 if ( 

870 (p.no_estimate_send_size or size_estimate_bytes >= p.min_pipe_transfer_size) 

871 and (p.src.is_nonlocal or p.dst.is_nonlocal) 

872 and p.is_program_available("zstd", loc) 

873 ): 

874 return shlex.join([p.compression_program, "-dc"]) 

875 else: 

876 return "cat" 
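# Illustrative sketch (editorial note; p.compression_program is expected to resolve to zstd, and extra flags come
# from p.compression_program_opts): the sending side pipes through something like 'zstd -c -1' while the
# receiving side pipes through 'zstd -dc'; "cat" again means the compression stage is skipped.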

877 

878 

879_WORKER_THREAD_NUMBER_REGEX: Final[re.Pattern[str]] = re.compile(r"ThreadPoolExecutor-\d+_(\d+)") 

880 

881 

882def _pv_cmd( 

883 job: Job, loc: str, size_estimate_bytes: int, size_estimate_human: str, disable_progress_bar: bool = False 

884) -> str: 

885 """If pv command is on the PATH, monitors the progress of data transfer from 'zfs send' to 'zfs receive'; Progress can be 

886 viewed via "tail -f $pv_log_file" aka tail -f ~/bzfs-logs/current/current.pv or similar.""" 

887 p = job.params 

888 if p.is_program_available("pv", loc): 

889 size: str = f"--size={size_estimate_bytes}" 

890 if disable_progress_bar or p.no_estimate_send_size: 

891 size = "" 

892 pv_log_file: str = p.log_params.pv_log_file 

893 thread_name: str = threading.current_thread().name 

894 if match := _WORKER_THREAD_NUMBER_REGEX.fullmatch(thread_name): 

895 worker = int(match.group(1)) 

896 if worker > 0: 

897 pv_log_file += PV_FILE_THREAD_SEPARATOR + f"{worker:04}" 

898 if job.is_first_replication_task.get_and_set(False): 

899 if not p.log_params.quiet: 

900 job.progress_reporter.start() 

901 job.replication_start_time_nanos = time.monotonic_ns() 

902 if not p.log_params.quiet: 

903 with open_nofollow(pv_log_file, mode="a", encoding="utf-8", perm=FILE_PERMISSIONS) as fd: 

904 fd.write("\n") # mark start of new stream so ProgressReporter can reliably reset bytes_in_flight 

905 job.progress_reporter.enqueue_pv_log_file(pv_log_file) 

906 pv_program_opts: list[str] = [p.pv_program] + p.pv_program_opts 

907 if job.progress_update_intervals is not None: # for testing 

908 pv_program_opts += [f"--interval={job.progress_update_intervals[0]}"] 

909 pv_program_opts += ["--force", f"--name={size_estimate_human}"] 

910 pv_program_opts += [size] if size else [] 

911 return f"LC_ALL=C {shlex.join(pv_program_opts)} 2>> {shlex.quote(pv_log_file)}" 

912 else: 

913 return "cat" 
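# Illustrative sketch (editorial note; base flags come from p.pv_program_opts, and the log path is the example
# from the docstring above): the returned shell fragment looks roughly like
#   LC_ALL=C pv <configured opts> --force --name='<human size>' --size=<bytes> 2>> ~/bzfs-logs/current/current.pv
# so the ProgressReporter can tail the per-worker .pv log files; "cat" disables progress monitoring.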

914 

915 

916def delete_snapshots(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None: 

917 """Deletes snapshots in manageable batches on the specified remote.""" 

918 if len(snapshot_tags) == 0: 

919 return 

920 p, log = job.params, job.params.log 

921 log.info(p.dry(f"Deleting {len(snapshot_tags)} snapshots within %s: %s"), dataset, snapshot_tags) 

922 # delete snapshots in batches without creating a command line that's too big for the OS to handle 

923 run_ssh_cmd_batched( 

924 job, 

925 remote, 

926 _delete_snapshot_cmd(p, remote, dataset + "@"), 

927 snapshot_tags, 

928 lambda batch: _delete_snapshot(job, remote, dataset, dataset + "@" + ",".join(batch)), 

929 max_batch_items=job.params.max_snapshots_per_minibatch_on_delete_snaps, 

930 sep=",", 

931 ) 
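# Illustrative sketch (editorial note, hypothetical names): one batch expands to a single CLI call such as
#   zfs destroy tank/dst/data@hourly_1,hourly_2,hourly_3
# and run_ssh_cmd_batched() sizes the batches so that no command line exceeds the OS limit.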

932 

933 

934def _delete_snapshot(job: Job, r: Remote, dataset: str, snapshots_to_delete: str) -> None: 

935 """Runs zfs destroy for a comma-separated snapshot list.""" 

936 p = job.params 

937 cmd: list[str] = _delete_snapshot_cmd(p, r, snapshots_to_delete) 

938 is_dry: bool = False # False is safe because we're using the 'zfs destroy -n' flag 

939 try: 

940 job.maybe_inject_error(cmd=cmd, error_trigger="zfs_delete_snapshot") 

941 job.run_ssh_command(r, LOG_DEBUG, is_dry=is_dry, print_stdout=True, cmd=cmd) 

942 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

943 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else "" 

944 retry_immediately_once: bool = _clear_resumable_recv_state_if_necessary(job, dataset, stderr) 

945 # op isn't idempotent so retries regather current state from the start 

946 raise RetryableError( 

947 "Subprocess failed", display_msg="zfs destroy snapshot", retry_immediately_once=retry_immediately_once 

948 ) from e 

949 

950 

951def _delete_snapshot_cmd(p: Params, r: Remote, snapshots_to_delete: str) -> list[str]: 

952 """Builds zfs destroy command for given snapshots.""" 

953 return p.split_args( 

954 f"{r.sudo} {p.zfs_program} destroy", p.force_hard, p.verbose_destroy, p.dry_run_destroy, snapshots_to_delete 

955 ) 

956 

957 

958def delete_bookmarks(job: Job, remote: Remote, dataset: str, snapshot_tags: list[str]) -> None: 

959 """Removes bookmarks individually since zfs lacks batch deletion.""" 

960 if len(snapshot_tags) == 0: 

961 return 

962 # Unfortunately ZFS has no syntax yet to delete multiple bookmarks in a single CLI invocation 

963 p, log = job.params, job.params.log 

964 log.info( 

965 p.dry(f"Deleting {len(snapshot_tags)} bookmarks within %s: %s"), dataset, dataset + "#" + ",".join(snapshot_tags) 

966 ) 

967 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} destroy") 

968 run_ssh_cmd_parallel( 

969 job, 

970 remote, 

971 [(cmd, (f"{dataset}#{snapshot_tag}" for snapshot_tag in snapshot_tags))], 

972 lambda _cmd, batch: job.try_ssh_command( 

973 remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=_cmd + batch, exists=False 

974 ), 

975 max_batch_items=1, 

976 ) 

977 

978 

979def delete_datasets(job: Job, remote: Remote, datasets: Iterable[str]) -> None: 

980 """Deletes the given datasets via zfs destroy -r on the given remote.""" 

981 # Impl is batch optimized to minimize CLI + network roundtrips: only need to run zfs destroy if previously 

982 # destroyed dataset (within sorted datasets) is not a prefix (aka ancestor) of current dataset 

983 p, log = job.params, job.params.log 

984 last_deleted_dataset: str = DONT_SKIP_DATASET 

985 for dataset in sorted(datasets): 

986 if is_descendant(dataset, of_root_dataset=last_deleted_dataset): 

987 continue 

988 log.info(p.dry("Deleting dataset tree: %s"), f"{dataset} ...") 

989 cmd: list[str] = p.split_args( 

990 f"{remote.sudo} {p.zfs_program} destroy -r {p.force_unmount} {p.force_hard} {p.verbose_destroy}", 

991 p.dry_run_destroy, 

992 dataset, 

993 ) 

994 is_dry: bool = False # False is safe because we're using the 'zfs destroy -n' flag 

995 job.run_ssh_command(remote, LOG_DEBUG, is_dry=is_dry, print_stdout=True, cmd=cmd) 

996 last_deleted_dataset = dataset 

997 

998 

999def _create_zfs_filesystem(job: Job, filesystem: str) -> None: 

1000 """Creates destination filesystem hierarchies without mounting them.""" 

1001 # zfs create -p -u $filesystem 

1002 # To ensure the filesystems that we create do not get mounted, we apply a separate 'zfs create -p -u' 

1003 # invocation for each non-existing ancestor. This is because a single 'zfs create -p -u' applies the '-u' 

1004 # part only to the immediate filesystem, rather than to the not-yet existing ancestors. 

1005 p = job.params 

1006 parent: str = "" 

1007 no_mount: str = "-u" if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_1_0, "dst") else "" 

1008 for component in filesystem.split("/"): 

1009 parent += component 

1010 if not job.dst_dataset_exists[parent]: 

1011 cmd: list[str] = p.split_args(f"{p.dst.sudo} {p.zfs_program} create -p", no_mount, parent) 

1012 try: 

1013 job.run_ssh_command(p.dst, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd) 

1014 except subprocess.CalledProcessError as e: 

1015 # ignore harmless error caused by 'zfs create' without the -u flag, or by dataset already existing 

1016 stderr: str = stderr_to_str(e.stderr) 

1017 if not (    [1017 ↛ 1023: line 1017 didn't jump to line 1023 because the condition on line 1017 was never true]

1018 "filesystem successfully created, but it may only be mounted by root" in stderr 

1019 or "filesystem successfully created, but not mounted" in stderr # SolarisZFS 

1020 or "dataset already exists" in stderr 

1021 or "filesystem already exists" in stderr # SolarisZFS? 

1022 ): 

1023 raise 

1024 if not p.dry_run: 

1025 job.dst_dataset_exists[parent] = True 

1026 parent += "/" 
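# Illustrative sketch (editorial note, hypothetical dataset tank/backups/host1 where only 'tank' exists yet):
# the loop issues one invocation per missing ancestor rather than a single call, e.g.
#   zfs create -p -u tank/backups
#   zfs create -p -u tank/backups/host1
# because a single 'zfs create -p -u tank/backups/host1' would leave the auto-created ancestor mounted.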

1027 

1028 

1029def _create_zfs_bookmarks(job: Job, remote: Remote, dataset: str, snapshots: list[str]) -> None: 

1030 """Creates bookmarks for the given snapshots, using the 'zfs bookmark' CLI.""" 

1031 # Unfortunately ZFS has no syntax yet to create multiple bookmarks in a single CLI invocation 

1032 p = job.params 

1033 

1034 def create_zfs_bookmark(cmd: list[str]) -> None: 

1035 snapshot = cmd[-1] 

1036 assert "@" in snapshot 

1037 bookmark_cmd: list[str] = cmd + [replace_prefix(snapshot, old_prefix=f"{dataset}@", new_prefix=f"{dataset}#")] 

1038 try: 

1039 job.run_ssh_command(remote, LOG_DEBUG, is_dry=p.dry_run, print_stderr=False, cmd=bookmark_cmd) 

1040 except subprocess.CalledProcessError as e: 

1041 # ignore harmless zfs error caused by bookmark with the same name already existing 

1042 stderr: str = stderr_to_str(e.stderr) 

1043 if ": bookmark exists" not in stderr: 

1044 xprint(p.log, stderr, file=sys.stderr, end="") 

1045 raise 

1046 

1047 if p.create_bookmarks != "none" and are_bookmarks_enabled(p, remote): 

1048 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} bookmark") 

1049 run_ssh_cmd_parallel( 

1050 job, remote, [(cmd, snapshots)], lambda _cmd, batch: create_zfs_bookmark(_cmd + batch), max_batch_items=1 

1051 ) 

1052 

1053 

1054def _estimate_send_size(job: Job, remote: Remote, dst_dataset: str, recv_resume_token: str | None, *items: str) -> int: 

1055 """Estimates num bytes to transfer via 'zfs send -nvP'; Thread-safe.""" 

1056 p = job.params 

1057 if p.no_estimate_send_size:    [1057 ↛ 1058: line 1057 didn't jump to line 1058 because the condition on line 1057 was never true]

1058 return 0 

1059 zfs_send_program_opts: list[str] = ["--parsable" if opt == "-P" else opt for opt in p.curr_zfs_send_program_opts] 

1060 zfs_send_program_opts = append_if_absent(zfs_send_program_opts, "-v", "-n", "--parsable") 

1061 if recv_resume_token: 

1062 zfs_send_program_opts += ["-t", recv_resume_token] 

1063 items = () 

1064 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} send", zfs_send_program_opts, items) 

1065 try: 

1066 lines: str | None = job.try_ssh_command(remote, LOG_TRACE, cmd=cmd) 

1067 except RetryableError as retryable_error: 

1068 assert retryable_error.__cause__ is not None 

1069 if recv_resume_token: 

1070 e = retryable_error.__cause__ 

1071 stderr: str = stderr_to_str(e.stderr) if hasattr(e, "stderr") else "" 

1072 retryable_error.retry_immediately_once = _clear_resumable_recv_state_if_necessary(job, dst_dataset, stderr) 

1073 # op isn't idempotent so retries regather current state from the start of replicate_dataset() 

1074 raise 

1075 if lines is None:    [1075 ↛ 1076: line 1075 didn't jump to line 1076 because the condition on line 1075 was never true]

1076 return 0 # src dataset or snapshot has been deleted by third party 

1077 size: str = lines.splitlines()[-1] 

1078 assert size.startswith("size") 

1079 return int(size[size.index("\t") + 1 :]) 
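# Illustrative sketch (editorial note): with '-n -v --parsable', 'zfs send' prints tab-separated estimate lines
# ending in a summary of the form 'size<TAB><bytes>', e.g. 'size   123456'; the parsing above relies on exactly
# that final line (hence the assert) and would return 123456 in this example.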

1080 

1081 

1082def _estimate_send_sizes_in_parallel( 

1083 job: Job, 

1084 r: Remote, 

1085 dst_dataset: str, 

1086 recv_resume_token: str | None, 

1087 steps_todo: list[tuple[str, str, str, list[str]]], 

1088) -> list[int]: 

1089 """Estimates num bytes to transfer for multiple send steps; in parallel to reduce latency.""" 

1090 p = job.params 

1091 if p.no_estimate_send_size: 

1092 return [0 for _ in steps_todo] 

1093 

1094 def iterator_builder(executor: Executor) -> Iterable[Iterable[Future[int]]]: 

1095 resume_token: str | None = recv_resume_token 

1096 return [ 

1097 ( 

1098 executor.submit( 

1099 _estimate_send_size, job, r, dst_dataset, resume_token if i == 0 else None, incr_flag, from_snap, to_snap 

1100 ) 

1101 for i, (incr_flag, from_snap, to_snap, _to_snapshots) in enumerate(steps_todo) 

1102 ) 

1103 ] 

1104 

1105 max_workers: int = min(len(steps_todo), job.max_workers[r.location]) 

1106 return list( 

1107 parallel_iterator(iterator_builder, max_workers=max_workers, ordered=True, termination_event=job.termination_event) 

1108 ) 

1109 

1110 

1111def _zfs_set(job: Job, properties: list[str], remote: Remote, dataset: str) -> None: 

1112 """Applies the given property key=value pairs via 'zfs set' CLI to the given dataset on the given remote.""" 

1113 p = job.params 

1114 if len(properties) == 0: 

1115 return 

1116 # set properties in batches without creating a command line that's too big for the OS to handle 

1117 cmd: list[str] = p.split_args(f"{remote.sudo} {p.zfs_program} set") 

1118 run_ssh_cmd_batched( 

1119 job, 

1120 remote, 

1121 cmd, 

1122 properties, 

1123 lambda batch: job.run_ssh_command( 

1124 remote, LOG_DEBUG, is_dry=p.dry_run, print_stdout=True, cmd=cmd + batch + [dataset] 

1125 ), 

1126 max_batch_items=2**29, 

1127 ) 

1128 

1129 

1130def _zfs_get( 

1131 job: Job, 

1132 remote: Remote, 

1133 dataset: str, 

1134 sources: str, 

1135 output_columns: str, 

1136 propnames: str, 

1137 splitlines: bool, 

1138 props_cache: dict[tuple[str, str, str], dict[str, str | None]], 

1139) -> dict[str, str | None]: 

1140 """Returns the results of 'zfs get' CLI on the given dataset on the given remote.""" 

1141 assert dataset 

1142 assert sources 

1143 assert output_columns 

1144 if not propnames: 

1145 return {} 

1146 p = job.params 

1147 cache_key: tuple[str, str, str] = (sources, output_columns, propnames) 

1148 props: dict[str, str | None] | None = props_cache.get(cache_key) 

1149 if props is None: 

1150 cmd: list[str] = p.split_args(f"{p.zfs_program} get -Hp -o {output_columns} -s {sources} {propnames}", dataset) 

1151 lines: str = job.run_ssh_command(remote, LOG_TRACE, cmd=cmd) 

1152 is_name_value_pair: bool = "," in output_columns 

1153 props = {} 

1154 # if not splitlines: omit single trailing newline that was appended by 'zfs get' CLI 

1155 assert splitlines or len(lines) == 0 or lines[-1] == "\n" 

1156 for line in lines.splitlines() if splitlines else [lines[0:-1]]: 

1157 if is_name_value_pair: 

1158 propname, propvalue = line.split("\t", 1) 

1159 props[propname] = propvalue 

1160 else: 

1161 props[line] = None 

1162 props_cache[cache_key] = props 

1163 return props 
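
A minimal sketch of the parsing performed above, for the common case where 'zfs get -Hp -o property,value' emits one tab-separated record per line; the sample output is synthetic, and the splitlines=False path exists in the real code because user property values may themselves contain newlines:

from typing import Optional

def parse_zfs_get_lines(lines: str, is_name_value_pair: bool) -> dict[str, Optional[str]]:
    """Parses 'zfs get -Hp' output: newline-separated records, tab-separated output columns."""
    props: dict[str, Optional[str]] = {}
    for line in lines.splitlines():
        if is_name_value_pair:
            propname, propvalue = line.split("\t", 1)
            props[propname] = propvalue
        else:
            props[line] = None
    return props

assert parse_zfs_get_lines("compression\tlz4\ncom.example:note\thello world\n", True) == {
    "compression": "lz4",
    "com.example:note": "hello world",
}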

1164 

1165 

1166def _incremental_send_steps_wrapper( 

1167 p: Params, src_snapshots: list[str], src_guids: list[str], included_guids: set[str], is_resume: bool 

1168) -> list[tuple[str, str, str, list[str]]]: 

1169 """Returns incremental send steps, optionally converting -I to -i.""" 

1170 force_convert_I_to_i: bool = p.src.use_zfs_delegation and not getenv_bool("no_force_convert_I_to_i", True) # noqa: N806 

1171 # force_convert_I_to_i == True implies that: 

1172 # If using 'zfs allow' delegation mechanism, force convert 'zfs send -I' to a series of 

1173 # 'zfs send -i' as a workaround for zfs issue https://github.com/openzfs/zfs/issues/16394 

1174 return incremental_send_steps(src_snapshots, src_guids, included_guids, is_resume, force_convert_I_to_i) 
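
Illustration only: what converting a single 'zfs send -I' spanning several snapshots into a series of 'zfs send -i' steps amounts to. The real incremental_send_steps() additionally honors GUID filtering and resume state; the snapshot names below are made up.

def convert_I_to_i(from_snap: str, to_snaps: list[str]) -> list[tuple[str, str, str]]:  # noqa: N802
    """Expands one ('-I', from_snap, to_snaps[-1]) step into a chain of consecutive ('-i', a, b) steps."""
    steps: list[tuple[str, str, str]] = []
    prev = from_snap
    for snap in to_snaps:
        steps.append(("-i", prev, snap))
        prev = snap
    return steps

assert convert_I_to_i("pool/ds@s0", ["pool/ds@s1", "pool/ds@s2", "pool/ds@s3"]) == [
    ("-i", "pool/ds@s0", "pool/ds@s1"),
    ("-i", "pool/ds@s1", "pool/ds@s2"),
    ("-i", "pool/ds@s2", "pool/ds@s3"),
]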

1175 

1176 

1177def _add_recv_property_options( 

1178 job: Job, full_send: bool, recv_opts: list[str], dataset: str, cache: dict[tuple[str, str, str], dict[str, str | None]] 

1179) -> tuple[list[str], list[str]]: 

1180 """Reads the ZFS properties of the given src dataset; Appends zfs recv -o and -x values to recv_opts according to CLI 

1181 params, and returns properties to explicitly set on the dst dataset after 'zfs receive' completes successfully.""" 

1182 p = job.params 

1183 set_opts: list[str] = [] 

1184 x_names: list[str] = p.zfs_recv_x_names 

1185 x_names_set: set[str] = set(x_names) 

1186 ox_names: set[str] = p.zfs_recv_ox_names.copy() 

1187 if p.is_program_available(ZFS_VERSION_IS_AT_LEAST_2_2_0, p.dst.location): 

1188 # workaround for https://github.com/openzfs/zfs/commit/b0269cd8ced242e66afc4fa856d62be29bb5a4ff 

1189 # 'zfs recv -x foo' on zfs < 2.2 errors out if the 'foo' property isn't contained in the send stream 

1190 for propname in x_names: 

1191 recv_opts.append("-x") 

1192 recv_opts.append(propname) 

1193 ox_names.update(x_names) # union 

1194 for config in [p.zfs_recv_o_config, p.zfs_recv_x_config, p.zfs_set_config]: 

1195 if len(config.include_regexes) == 0: 

1196 continue # this is the default - it's an instant noop 

1197 if (full_send and "full" in config.targets) or (not full_send and "incremental" in config.targets): 

1198 # 'zfs get' uses newline as record separator and tab as separator between output columns. A ZFS user property 

1199 # may contain newline and tab characters (indeed anything). Together, this means that there is no reliable 

1200 # way to determine where a record ends and the next record starts when listing multiple arbitrary records in 

1201 # a single 'zfs get' call. Therefore, here we use a separate 'zfs get' call for each ZFS user property. 

1202 # TODO: perf: on zfs >= 2.3 use json via zfs get -j to safely merge all zfs gets into one 'zfs get' call 

1203 try: 

1204 props_any: dict = _zfs_get(job, p.src, dataset, config.sources, "property", "all", True, cache) 

1205 props_filtered: dict = filter_properties(p, props_any, config.include_regexes, config.exclude_regexes) 

1206 user_propnames: list[str] = [name for name in props_filtered if ":" in name] 

1207 sys_propnames: str = ",".join(name for name in props_filtered if ":" not in name) 

1208 props: dict = _zfs_get(job, p.src, dataset, config.sources, "property,value", sys_propnames, True, cache) 

1209 for propnames in user_propnames: 

1210 props.update(_zfs_get(job, p.src, dataset, config.sources, "property,value", propnames, False, cache)) 

1211 except (subprocess.CalledProcessError, UnicodeDecodeError) as e: 

1212 raise RetryableError("Subprocess failed", display_msg="zfs get") from e 

1213 for propname in sorted(props.keys()): 

1214 if config is p.zfs_recv_o_config: 

1215 if not (propname in ox_names or propname in x_names_set): 

1216 recv_opts.append("-o") 

1217 recv_opts.append(f"{propname}={props[propname]}") 

1218 ox_names.add(propname) 

1219 elif config is p.zfs_recv_x_config: 

1220 if propname not in ox_names: 

1221 recv_opts.append("-x") 

1222 recv_opts.append(propname) 

1223 ox_names.add(propname) 

1224 else: 

1225 assert config is p.zfs_set_config 

1226 set_opts.append(f"{propname}={props[propname]}") 

1227 return recv_opts, set_opts 
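
A condensed sketch, with made-up property names, of how '-o name=value' and '-x name' pairs accumulate into recv_opts while duplicates are suppressed; the real code above additionally consults the CLI regex filters and the full vs. incremental targets of each config section:

def append_recv_property_opts(recv_opts: list[str], o_props: dict[str, str], x_propnames: list[str]) -> list[str]:
    """Appends 'zfs receive' -o/-x options, skipping property names that were already handled."""
    handled: set[str] = set()
    for propname in sorted(o_props):
        if propname not in handled:
            recv_opts += ["-o", f"{propname}={o_props[propname]}"]
            handled.add(propname)
    for propname in x_propnames:
        if propname not in handled:
            recv_opts += ["-x", propname]
            handled.add(propname)
    return recv_opts

assert append_recv_property_opts([], {"compression": "lz4"}, ["mountpoint", "mountpoint"]) == [
    "-o", "compression=lz4", "-x", "mountpoint"
]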

1228 

1229 

1230def _check_zfs_dataset_busy(job: Job, remote: Remote, dataset: str, busy_if_send: bool = True) -> bool: 

1231    """Decline to start a state-changing ZFS operation that is, although harmless, likely to collide with other currently 

1232 running processes. Instead, retry the operation later, after some delay. For example, decline to start a 'zfs receive' 

1233 into a destination dataset if another process is already running another 'zfs receive' into the same destination dataset, 

1234 as ZFS would reject any such attempt. However, it's actually fine to run an incremental 'zfs receive' into a dataset in 

1235 parallel with a 'zfs send' out of the very same dataset. This also helps daisy chain use cases where A replicates to B, 

1236 and B replicates to C. 

1237 

1238    _check_zfs_dataset_busy() offers no guarantees; it merely proactively avoids likely collisions. In other words, even if 

1239 the process check below passes there is no guarantee that the destination dataset won't be busy by the time we actually 

1240 execute the 'zfs send' operation. In such an event ZFS will reject the operation, we'll detect that, and we'll simply 

1241 retry, after some delay. _check_zfs_dataset_busy() can be disabled via --ps-program=-. 

1242 

1243 TLDR: As is common for long-running operations in distributed systems, we use coordination-free optimistic concurrency 

1244 control where the parties simply retry on collision detection (rather than coordinate concurrency via a remote lock 

1245 server). 

1246 """ 

1247 p, log = job.params, job.params.log 

1248 if not p.is_program_available("ps", remote.location): 

1249 return True 

1250 cmd: list[str] = p.split_args(f"{p.ps_program} -Ao args") 

1251 procs: list[str] = (job.try_ssh_command(remote, LOG_TRACE, cmd=cmd) or "").splitlines() 

1252 if job.inject_params.get("is_zfs_dataset_busy", False): 

1253 procs += ["sudo -n zfs receive -u -o foo:bar=/baz " + dataset] # for unit testing only 

1254 if not _is_zfs_dataset_busy(procs, dataset, busy_if_send=busy_if_send): 

1255 return True 

1256 op: str = "zfs {receive" + ("|send" if busy_if_send else "") + "} operation" 

1257 try: 

1258 die(f"Cannot continue now: Destination is already busy with {op} from another process: {dataset}") 

1259 except SystemExit as e: 

1260 log.warning("%s", e) 

1261 raise RetryableError("dst currently busy with zfs mutation op", display_msg="replication") from e 
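
A toy sketch of the coordination-free "retry on collision" pattern described in the docstring above; the delay, retry count, and exception type are placeholders rather than the actual bzfs retry policy:

import time

def run_with_optimistic_retries(operation, looks_busy, max_retries: int = 3, delay_secs: float = 0.01) -> None:
    """Runs the operation once the resource no longer looks busy; gives up after max_retries delays."""
    for attempt in range(max_retries + 1):
        if not looks_busy():
            operation()
            return
        if attempt < max_retries:
            time.sleep(delay_secs)
    raise RuntimeError("resource still busy after retries")

run_with_optimistic_retries(lambda: None, lambda: False)  # completes immediately when nothing is busy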

1262 

1263 

1264_ZFS_DATASET_BUSY_PREFIX: Final[str] = r"(([^ ]*?/)?(sudo|doas)( +-n)? +)?([^ ]*?/)?zfs (receive|recv" 

1265_ZFS_DATASET_BUSY_IF_MODS: Final[re.Pattern[str]] = re.compile((_ZFS_DATASET_BUSY_PREFIX + ") .*").replace("(", "(?:")) 

1266_ZFS_DATASET_BUSY_IF_SEND: Final[re.Pattern[str]] = re.compile((_ZFS_DATASET_BUSY_PREFIX + "|send) .*").replace("(", "(?:")) 

1267 

1268 

1269def _is_zfs_dataset_busy(procs: list[str], dataset: str, busy_if_send: bool) -> bool: 

1270 """Checks if any process list entry indicates ZFS activity on dataset.""" 

1271 regex: re.Pattern[str] = _ZFS_DATASET_BUSY_IF_SEND if busy_if_send else _ZFS_DATASET_BUSY_IF_MODS 

1272 suffix: str = " " + dataset 

1273 infix: str = " " + dataset + "@" 

1274 return any((proc.endswith(suffix) or infix in proc) and regex.fullmatch(proc) for proc in procs)
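
Example inputs, with synthetic ps(1) command lines, showing which entries _is_zfs_dataset_busy() treats as busy:

procs = [
    "sudo -n zfs receive -u tank/backup/ds",      # busy: another receive into the same dataset
    "/sbin/zfs send tank/backup/ds@hourly_2024",  # busy only when busy_if_send=True
    "zfs list -t snapshot tank/backup/ds",        # not busy: read-only operation
    "zfs receive -u tank/backup/other",           # not busy: different dataset
]
assert _is_zfs_dataset_busy(procs, "tank/backup/ds", busy_if_send=True) is True
assert _is_zfs_dataset_busy(procs, "tank/backup/ds", busy_if_send=False) is True   # the receive still matches
assert _is_zfs_dataset_busy(procs[2:], "tank/backup/ds", busy_if_send=True) is False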