Coverage for bzfs_main / util / parallel_tasktree.py: 100%

208 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:49 +0000

1# Copyright 2024 Wolfgang Hoschek AT mac DOT com 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15"""Fault-tolerant, dependency-aware workflow scheduling and execution of parallel operations, ensuring that ancestor datasets 

16finish before descendants start; The design maximizes throughput while preventing inconsistent dataset states during 

17replication or snapshot deletion. 

18 

19This module contains only generic scheduling and coordination (the "algorithm"). Error handling, retries, and skip policies 

20are customizable and implemented by callers via the CompletionCallback API or wrappers such as ``parallel_tasktree_policy``. 

21 

22Has zero dependencies beyond the Python standard library. 

23 

24Example Usage: 

25-------------- 

26 import logging 

27 from bzfs_main.util.parallel_tasktree import CompletionCallback, CompletionCallbackResult, ParallelTaskTree 

28 

29 datasets = ["a", "a/b", "a/b/c", "a/d", "e"] 

30 

31 def process_dataset(dataset: str, _submit_count: int) -> CompletionCallback: 

32 print(dataset) 

33 

34 def completion_callback(_todo_futures) -> CompletionCallbackResult: 

35 return CompletionCallbackResult(no_skip=True, fail=False) 

36 

37 return completion_callback 

38 

39 log = logging.getLogger(__name__) 

40 tasktree = ParallelTaskTree( 

41 log=log, 

42 datasets=datasets, 

43 process_dataset=process_dataset, 

44 max_workers=2, 

45 ) 

46 tasktree.process_datasets_in_parallel() 

47 

48 # Sample output: 

49 # a 

50 # e 

51 # a/b 

52 # a/d 

53 # a/b/c 

54""" 

55 

56from __future__ import ( 

57 annotations, 

58) 

59import concurrent 

60import heapq 

61import logging 

62import os 

63from concurrent.futures import ( 

64 FIRST_COMPLETED, 

65 Executor, 

66 Future, 

67) 

68from concurrent.futures.thread import ( 

69 ThreadPoolExecutor, 

70) 

71from typing import ( 

72 Callable, 

73 Final, 

74 NamedTuple, 

75 final, 

76) 

77 

78from bzfs_main.util.utils import ( 

79 Comparable, 

80 HashedInterner, 

81 SortedInterner, 

82 SynchronousExecutor, 

83 TaskTiming, 

84 has_duplicates, 

85 has_siblings, 

86) 

87 

# constants:
BARRIER_CHAR: Final[str] = "~"  # default barrier directory name; '~' is not a valid ZFS dataset name component, so it cannot collide with real datasets
COMPONENT_SEPARATOR: Final[str] = "/"  # ZFS dataset component separator

91 

92 

93############################################################################# 

@final
class CompletionCallbackResult(NamedTuple):
    """Outcome of a CompletionCallback invocation; tells the scheduler how to proceed after a task finished."""

    no_skip: bool  # True enqueues children for processing; False skips the entire subtree
    fail: bool  # True marks the overall run as failed

103 

104 

#############################################################################
CompletionCallback = Callable[[set[Future["CompletionCallback"]]], CompletionCallbackResult]  # Type alias
"""Callable that is run by the main coordination thread after a ``process_dataset()`` task finishes.

Purpose:
- Decide follow-up scheduling after a ``process_dataset()`` task finished; the argument is the set of still
  in-flight futures, and the returned ``CompletionCallbackResult`` controls subtree skipping and overall failure.

Assumptions:
- Runs in the single coordination thread.
- May inspect and cancel in-flight futures to implement fail-fast semantics.
- If cancelling in-flight futures for tasks that spawn subprocesses (e.g. via subprocess.run()), callers should also
  consider terminating the corresponding process subtree to avoid child processes lingering longer than desired. Skipping
  termination will not hang the scheduler (workers will complete naturally), but those subprocesses may outlive
  cancellation until they exit or time out.
"""

120 

121 

122############################################################################# 

@final
class ParallelTaskTree:
    """Main class for dependency-aware workflow scheduling of dataset jobs with optional barriers and priority ordering."""

    def __init__(
        self,
        *,
        log: logging.Logger,
        datasets: list[str],  # (sorted) list of datasets to process
        process_dataset: Callable[[str, int], CompletionCallback],  # lambda: dataset, tid; must be thread-safe
        priority: Callable[[str], Comparable] = lambda dataset: dataset,  # lexicographical order by default
        max_workers: int = os.cpu_count() or 1,
        executors: Callable[[], Executor] | None = None,  # factory producing Executor; None means 'auto-choose'
        interval_nanos: Callable[
            [int, str, int], int
        ] = lambda last_update_nanos, dataset, submit_count: 0,  # optionally spread tasks out over time; e.g. for jitter
        timing: TaskTiming = TaskTiming(),  # noqa: B008
        enable_barriers: bool | None = None,  # for testing only; None means 'auto-detect'
        barrier_name: str = BARRIER_CHAR,
        is_test_mode: bool = False,
    ) -> None:
        """Prepares to process datasets in parallel with dependency-aware workflow scheduling and fault tolerance.

        This class orchestrates parallel execution of dataset operations while maintaining strict hierarchical
        dependencies. Processing of a dataset only starts after processing of all its ancestor datasets has completed,
        ensuring data consistency during operations like ZFS replication or snapshot deletion.

        Purpose:
        --------
        - Process hierarchical datasets in parallel while respecting parent-child dependencies
        - Provide dependency-aware workflow scheduling; error handling and retries are implemented by callers via
          ``CompletionCallback`` or thin wrappers
        - Maximize throughput by processing independent dataset subtrees in parallel
        - Support complex job scheduling patterns via optional barrier synchronization

        Assumptions:
        ------------
        - Input `datasets` list is sorted in lexicographical order (enforced in test mode)
        - Input `datasets` list contains no duplicate entries (enforced in test mode)
        - Input `datasets` list contains no empty dataset names and none that start with '/'
        - Dataset hierarchy is determined by slash-separated path components
        - The `process_dataset` callable is thread-safe and can be executed in parallel

        Design Rationale:
        -----------------
        - The implementation uses a priority queue-based scheduler that maintains two key invariants:

          - Dependency Ordering: Children are only made available for start of processing after their parent completes,
            preventing inconsistent dataset states.

          - Priority: Among the datasets available for start of processing, the "smallest" is always processed next,
            according to a customizable priority callback function, which by default sorts by lexicographical order (not
            dataset size), ensuring more deterministic execution order.

        Algorithm Selection:
        --------------------
        - Simple Algorithm (default): Used when no barriers ('~') are detected in dataset names. Provides efficient
          scheduling for standard parent-child dependencies via recursive child enqueueing after parent completion.

        - Barrier Algorithm (advanced): Activated when barriers are detected or explicitly enabled. Supports complex
          synchronization scenarios where jobs must wait for completion of entire subtrees before proceeding. Essential
          for advanced job scheduling patterns like "complete all parallel replications before starting pruning phase."

        - Both algorithms are CPU and memory efficient. They require main memory proportional to the number of datasets
          (~400 bytes per dataset), and easily scale to millions of datasets. Time complexity is O(N log N), where
          N is the number of datasets.

        Concurrency Design:
        -------------------
        By default uses ThreadPoolExecutor with configurable worker limits to balance parallelism against resource
        consumption. Optionally, plug in a custom Executor to submit tasks to scale-out clusters via frameworks like
        Ray Core or Dask, etc.
        The single-threaded coordination loop prevents race conditions while worker threads execute dataset operations
        in parallel.

        Params:
        -------
        - datasets: Sorted list of dataset names to process (must not contain duplicates)
        - process_dataset: Thread-safe Callback function to execute on each dataset; returns a CompletionCallback
          determining if to fail or skip subtree on error; CompletionCallback runs in the (single) main thread as part
          of the coordination loop.
        - priority: Callback function to determine dataset processing order; defaults to lexicographical order.
        - interval_nanos: Callback that returns a non-negative delay (ns) to add to ``next_update_nanos`` for
          jitter/back-pressure control; arguments are ``(last_update_nanos, dataset, submit_count)``
        - max_workers: Maximum number of parallel worker threads
        - executors: Factory returning an Executor to submit tasks to; None means 'auto-choose'
        - enable_barriers: Force enable/disable barrier algorithm (None = auto-detect)
        - barrier_name: Directory name that denotes a barrier within dataset/job strings (default '~'); must be
          non-empty and not contain '/'
        - timing: Optionally request early async termination; stops new submissions and cancels in-flight tasks
        """
        assert log is not None
        assert (not is_test_mode) or datasets == sorted(datasets), "List is not sorted"
        assert (not is_test_mode) or not has_duplicates(datasets), "List contains duplicates"
        if COMPONENT_SEPARATOR in barrier_name or not barrier_name:
            raise ValueError(f"Invalid barrier_name: {barrier_name}")
        for dataset in datasets:
            if dataset.startswith(COMPONENT_SEPARATOR) or not dataset:
                raise ValueError(f"Invalid dataset name: {dataset}")
        assert callable(process_dataset)
        assert callable(priority)
        assert max_workers > 0
        assert callable(interval_nanos)
        # a barrier is a whole path component equal to barrier_name, not a mere substring of a component
        has_barrier: Final[bool] = any(barrier_name in dataset.split(COMPONENT_SEPARATOR) for dataset in datasets)
        assert (enable_barriers is not False) or not has_barrier, "Barrier seen in datasets but barriers explicitly disabled"

        self._barriers_enabled: Final[bool] = bool(has_barrier or enable_barriers)
        self._barrier_name: Final[str] = barrier_name
        self._log: Final[logging.Logger] = log
        self._datasets: Final[list[str]] = datasets
        self._process_dataset: Final[Callable[[str, int], CompletionCallback]] = process_dataset
        self._priority: Final[Callable[[str], Comparable]] = priority
        self._max_workers: Final[int] = max_workers
        self._interval_nanos: Final[Callable[[int, str, int], int]] = interval_nanos
        self._timing: Final[TaskTiming] = timing
        self._is_test_mode: Final[bool] = is_test_mode
        self._priority_queue: Final[list[_TreeNode]] = []
        self._tree: Final[_Tree] = _build_dataset_tree(datasets)  # tree consists of nested dictionaries and is immutable
        self._empty_barrier: Final[_TreeNode] = _make_tree_node("empty_barrier", "empty_barrier", {})  # immutable!
        self._datasets_set: Final[SortedInterner[str]] = SortedInterner(datasets)  # reduces memory footprint
        if executors is None:
            # a thread pool only pays off if at least two datasets can actually run concurrently (i.e. siblings exist)
            is_parallel: bool = max_workers > 1 and len(datasets) > 1 and has_siblings(datasets)  # siblings can run in par

            def _default_executor_factory() -> Executor:
                return ThreadPoolExecutor(max_workers) if is_parallel else SynchronousExecutor()

            executors = _default_executor_factory
        self._executors: Final[Callable[[], Executor]] = executors
        assert callable(executors)

    def process_datasets_in_parallel(self) -> bool:
        """Executes the configured tasks and returns True if any dataset processing failed, False if all succeeded."""
        self._build_priority_queue()
        executor: Executor = self._executors()
        with executor:
            todo_futures: set[Future[CompletionCallback]] = set()  # in-flight tasks
            future_to_node: dict[Future[CompletionCallback], _TreeNode] = {}
            submit_count: int = 0
            timing: TaskTiming = self._timing
            next_update_nanos: int = timing.monotonic_ns()
            wait_timeout: float | None = None  # None = block in wait(); 0 = poll without blocking
            failed: bool = False

            def submit_datasets() -> bool:
                """Submits available datasets to worker threads and returns False if all tasks have been completed."""
                nonlocal wait_timeout
                wait_timeout = None  # indicates to use blocking flavor of concurrent.futures.wait()
                while len(self._priority_queue) > 0 and len(todo_futures) < self._max_workers:
                    # pick "smallest" dataset (wrt. sort order) available for start of processing; submit to thread pool
                    nonlocal next_update_nanos
                    sleep_nanos: int = next_update_nanos - timing.monotonic_ns()
                    if sleep_nanos > 0:
                        timing.sleep(sleep_nanos)  # allow early wakeup on async termination
                    if timing.is_terminated():
                        break
                    if sleep_nanos > 0 and len(todo_futures) > 0:
                        wait_timeout = 0  # indicates to use non-blocking flavor of concurrent.futures.wait()
                        # It's possible an even "smaller" dataset (wrt. sort order) has become available while we slept.
                        # If so it's preferable to submit to the thread pool the smaller one first.
                        break  # break out of loop to check if that's the case via non-blocking concurrent.futures.wait()
                    node: _TreeNode = heapq.heappop(self._priority_queue)  # pick "smallest" dataset (wrt. sort order)
                    nonlocal submit_count
                    submit_count += 1
                    next_update_nanos += max(0, self._interval_nanos(next_update_nanos, node.dataset, submit_count))
                    future: Future[CompletionCallback] = executor.submit(self._process_dataset, node.dataset, submit_count)
                    future_to_node[future] = node
                    todo_futures.add(future)
                return len(todo_futures) > 0 and not timing.is_terminated()

            def complete_datasets() -> None:
                """Waits for completed futures, processes results and errors, then enqueues follow-up tasks per policy."""
                nonlocal failed
                nonlocal todo_futures
                done_futures: set[Future[CompletionCallback]]
                done_futures, todo_futures = concurrent.futures.wait(todo_futures, wait_timeout, return_when=FIRST_COMPLETED)
                for done_future in done_futures:
                    done_node: _TreeNode = future_to_node.pop(done_future)
                    c_callback: CompletionCallback = done_future.result()  # does not block as processing already completed
                    c_callback_result: CompletionCallbackResult = c_callback(todo_futures)
                    no_skip: bool = c_callback_result.no_skip
                    fail: bool = c_callback_result.fail
                    failed = failed or fail
                    self._complete_dataset(done_node, no_skip=no_skip)

            # coordination loop; runs in the (single) main thread; submits tasks to worker threads and handles their results
            while submit_datasets():
                complete_datasets()

            if timing.is_terminated():
                # early async termination: cancel whatever is still pending and count leftovers as failure
                for todo_future in todo_futures:
                    todo_future.cancel()
                failed = failed or len(self._priority_queue) > 0 or len(todo_futures) > 0
                self._priority_queue.clear()
                todo_futures.clear()
                future_to_node.clear()
            assert len(self._priority_queue) == 0
            assert len(todo_futures) == 0
            assert len(future_to_node) == 0
            return failed

    def _build_priority_queue(self) -> None:
        """Builds and fills initial priority queue of available root nodes for this task tree, ensuring the scheduler
        starts from a synthetic root node while honoring barriers; the synthetic root simplifies enqueueing logic."""
        self._priority_queue.clear()
        root_node: _TreeNode = _make_tree_node(priority="", dataset="", children=self._tree)
        self._complete_dataset(root_node, no_skip=True)

    def _complete_dataset(self, node: _TreeNode, no_skip: bool) -> None:
        """Enqueues child nodes for start of processing, using the appropriate algorithm."""
        if self._barriers_enabled:  # This barrier-based algorithm is for more general job scheduling, as in bzfs_jobrunner
            self._complete_dataset_with_barriers(node, no_skip=no_skip)
        elif no_skip:  # This simple algorithm is sufficient for most uses
            self._simple_enqueue_children(node)

    def _simple_enqueue_children(self, node: _TreeNode) -> None:
        """Enqueues child nodes for start of processing (using iteration to avoid potentially hitting recursion limits)."""
        stack: list[_TreeNode] = [node]
        while stack:
            current_node: _TreeNode = stack.pop()
            for child, grandchildren in current_node.children.items():  # as processing of parent has now completed
                child_abs_dataset: str = self._join_dataset(current_node.dataset, child)
                child_node: _TreeNode = _make_tree_node(self._priority(child_abs_dataset), child_abs_dataset, grandchildren)
                if child_abs_dataset in self._datasets_set:
                    heapq.heappush(self._priority_queue, child_node)  # make it available for start of processing
                else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                    stack.append(child_node)  # ... recursively down the tree

    def _complete_dataset_with_barriers(self, node: _TreeNode, no_skip: bool) -> None:
        """After successful completion, enqueues children, opens barriers, and propagates completion upwards.

        The (more complex) algorithm below is for more general job scheduling, as in bzfs_jobrunner. Here, a "dataset"
        string is treated as an identifier for any kind of job rather than a reference to a concrete ZFS object. An
        example "dataset" job string is "src_host1/createsnapshots/replicate_to_hostA". Jobs can depend on another job
        via a parent/child relationship formed by '/' directory separators within the dataset string, and multiple
        "datasets" form a job dependency tree by way of common dataset directory prefixes. Jobs that do not depend on
        each other can be executed in parallel, and jobs can be told to first wait for other jobs to complete
        successfully. The algorithm is based on a barrier primitive and is typically disabled. It is only required for
        rare jobrunner configs.

        For example, a job scheduler can specify that all parallel replications jobs to multiple destinations must
        succeed before the jobs of the pruning phase can start. More generally, with this algo, a job scheduler can
        specify that all jobs within a given job subtree (containing any nested combination of sequential and/or
        parallel jobs) must successfully complete before a certain other job within the job tree is started. This is
        specified via the barrier directory named by ``barrier_name`` (default '~'). An example is
        "src_host1/createsnapshots/~/prune".

        Note that the default '~' is unambiguous as it is not a valid ZFS dataset name component per the naming rules
        enforced by the 'zfs create', 'zfs snapshot' and 'zfs bookmark' CLIs. Custom barrier names should avoid
        colliding with real dataset/job components.
        """

        def enqueue_children(node: _TreeNode) -> int:
            """Returns number of jobs that were added to priority_queue for immediate start of processing."""
            n: int = 0
            children: _Tree = node.children
            for child, grandchildren in children.items():
                abs_dataset: str = self._join_dataset(node.dataset, child)
                child_node: _TreeNode = _make_tree_node(self._priority(abs_dataset), abs_dataset, grandchildren, parent=node)
                k: int
                if child != self._barrier_name:
                    if abs_dataset in self._datasets_set:
                        # it's not a barrier; make job available for immediate start of processing
                        heapq.heappush(self._priority_queue, child_node)
                        k = 1
                    else:  # it's an intermediate node that has no job attached; pass the enqueue operation
                        k = enqueue_children(child_node)  # ... recursively down the tree
                elif len(children) == 1:  # if the only child is a barrier then pass the enqueue operation
                    k = enqueue_children(child_node)  # ... recursively down the tree
                else:  # park the barrier node within the (still closed) barrier for the time being
                    assert node.mut.barrier is None
                    node.mut.barrier = child_node
                    k = 0
                node.mut.pending += min(1, k)  # pending counts child subtrees with outstanding work, not individual jobs
                n += k
            assert n >= 0
            return n

        if no_skip:
            enqueue_children(node)  # make child datasets available for start of processing
        else:  # job completed without success
            # ... thus, opening the barrier shall always do nothing in node and its ancestors.
            # perf: Irrevocably mark (exactly once) barriers of this node and all its ancestors as cleared due to subtree
            # skip, via barriers_cleared=True. This enables to avoid redundant re-walking the ancestor chain on
            # subsequent skip.
            tmp: _TreeNode | None = node
            while (tmp is not None) and not tmp.mut.barriers_cleared:
                tmp.mut.barriers_cleared = True
                tmp.mut.barrier = self._empty_barrier
                tmp = tmp.parent
        assert node.mut.pending >= 0
        while node.mut.pending == 0:  # have all jobs in subtree of current node completed?
            if no_skip:  # ... if so open the barrier, if it exists, and enqueue jobs waiting on it
                if not (node.mut.barrier is None or node.mut.barrier is self._empty_barrier):
                    node.mut.pending += min(1, enqueue_children(node.mut.barrier))
                    node.mut.barrier = self._empty_barrier
                if node.mut.pending > 0:  # did opening of barrier cause jobs to be enqueued in subtree?
                    break  # ... if so we have not yet completed the subtree, so don't mark the subtree as completed yet
            if node.parent is None:
                break  # we've reached the root node
            node = node.parent  # recurse up the tree to propagate completion upward
            node.mut.pending -= 1  # mark subtree as completed
            assert node.mut.pending >= 0

    def _join_dataset(self, parent: str, child: str) -> str:
        """Concatenates parent and child dataset names; accommodates synthetic root node; interns for memory footprint."""
        return self._datasets_set.interned(f"{parent}{COMPONENT_SEPARATOR}{child}" if parent else child)

427 

428 

429############################################################################# 

430@final 

431class _TreeNodeMutableAttributes: 

432 """Container for mutable attributes, stored space efficiently.""" 

433 

434 __slots__ = ("barrier", "barriers_cleared", "pending") # uses more compact memory layout than __dict__ 

435 

436 def __init__(self) -> None: 

437 self.barrier: _TreeNode | None = None # zero or one barrier TreeNode waiting for this node to complete 

438 self.pending: int = 0 # number of children added to priority queue that haven't completed their work yet 

439 self.barriers_cleared: bool = False # irrevocably mark barriers of this node and all its ancestors as cleared? 

440 

441 

442############################################################################# 

443@final 

444class _TreeNode(NamedTuple): 

445 """Node in dataset dependency tree used by the scheduler; _TreeNodes are ordered by priority and dataset name within a 

446 priority queue, via __lt__ comparisons.""" 

447 

448 priority: Comparable # determines the processing order once this dataset has become available for start of processing 

449 dataset: str # each dataset name is unique; attribs other than `priority` and `dataset` are never used for comparisons 

450 children: _Tree # dataset "directory" tree consists of nested dicts; aka dict[str, dict] 

451 parent: _TreeNode | None 

452 mut: _TreeNodeMutableAttributes 

453 

454 def __repr__(self) -> str: 

455 priority, dataset, pending, barrier = self.priority, self.dataset, self.mut.pending, self.mut.barrier 

456 return str({"priority": priority, "dataset": dataset, "pending": pending, "barrier": barrier is not None}) 

457 

458 

def _make_tree_node(priority: Comparable, dataset: str, children: _Tree, parent: _TreeNode | None = None) -> _TreeNode:
    """Builds a TreeNode together with a fresh container for its mutable scheduling state."""
    state = _TreeNodeMutableAttributes()
    return _TreeNode(priority=priority, dataset=dataset, children=children, parent=parent, mut=state)

462 

463 

#############################################################################
_Tree = dict[str, "_Tree"]  # Type alias: recursive "directory" tree mapping a dataset name component to its subtree

466 

467 

def _build_dataset_tree(sorted_datasets: list[str]) -> _Tree:
    """Converts a sorted list of dataset names into a dependency tree; the result is a (reverse) sorted directory tree
    containing the same dataset names, represented as nested dicts."""
    root: _Tree = {}
    interner: HashedInterner[str] = HashedInterner()  # reduces memory footprint
    shared_empty_leaf: _Tree = {}  # sharing one empty leaf dict yields ~30% lower memory footprint than fresh leaf dicts

    for dataset in reversed(sorted_datasets):
        components: list[str] = dataset.split(COMPONENT_SEPARATOR)
        last_index: int = len(components) - 1
        node: _Tree = root
        for depth, component in enumerate(components):
            subtree: _Tree | None = node.get(component)
            if subtree is None:
                # sharing the empty leaf is safe as the finished tree is treated as immutable henceforth
                subtree = {} if depth < last_index else shared_empty_leaf
                assert node is not shared_empty_leaf
                node[interner.intern(component)] = subtree
            node = subtree
    return root

487 return tree