|
12 | 12 | # See LICENSE.rst for license information. |
13 | 13 | # |
14 | 14 | ############################################################################## |
15 | | - |
| 15 | +import shutil |
16 | 16 | from importlib.resources import as_file |
17 | 17 | from pathlib import Path |
18 | 18 | from typing import List, Union |
@@ -133,6 +133,129 @@ def available_examples(self) -> dict[str, List[tuple[str, Path]]]: |
133 | 133 | ) |
134 | 134 | return examples_dict |
135 | 135 |
|
| 136 | + def copy_examples( |
| 137 | + self, |
| 138 | + examples_to_copy: List[str], |
| 139 | + target_dir: Path = None, |
| 140 | + force: bool = False, |
| 141 | + ) -> None: |
| 142 | + """Copy examples or packs into the target or current working |
| 143 | + directory. |
| 144 | +
|
| 145 | + Parameters |
| 146 | + ---------- |
| 147 | + examples_to_copy : list of str |
| 148 | + User-specified pack(s), example(s), or "all" to copy all. |
| 149 | + target_dir : pathlib.Path, optional |
| 150 | + Target directory to copy examples into. Defaults to current |
| 151 | + working directory. |
| 152 | + force : bool, optional |
| 153 | + Defaults to ``False``. If ``True``, existing files are |
| 154 | + overwritten and directories are merged |
| 155 | + (extra files in the target are preserved). |
| 156 | + """ |
| 157 | + self._target_dir = target_dir.resolve() if target_dir else Path.cwd() |
| 158 | + self._force = force |
| 159 | + |
| 160 | + if "all" in examples_to_copy: |
| 161 | + self._copy_all() |
| 162 | + return |
| 163 | + |
| 164 | + for item in examples_to_copy: |
| 165 | + if item in self.available_examples(): |
| 166 | + self._copy_pack(item) |
| 167 | + elif self._is_example_name(item): |
| 168 | + self._copy_example(item) |
| 169 | + else: |
| 170 | + raise FileNotFoundError( |
| 171 | + f"No examples or packs found for input: '{item}'" |
| 172 | + ) |
| 173 | + del self._target_dir |
| 174 | + del self._force |
| 175 | + return |
| 176 | + |
| 177 | + def _copy_all(self): |
| 178 | + """Copy all packs and examples.""" |
| 179 | + for pack_name in self.available_examples(): |
| 180 | + self._copy_pack(pack_name) |
| 181 | + |
| 182 | + def _copy_pack(self, pack_name): |
| 183 | + """Copy all examples in a single pack.""" |
| 184 | + examples = self.available_examples().get(pack_name, []) |
| 185 | + for ex_name, ex_path in examples: |
| 186 | + self._copy_tree_to_target(pack_name, ex_name, ex_path) |
| 187 | + |
| 188 | + def _copy_example(self, example_name): |
| 189 | + """Copy a single example by its name.""" |
| 190 | + example_found = False |
| 191 | + for pack_name, examples in self.available_examples().items(): |
| 192 | + for ex_name, ex_path in examples: |
| 193 | + if ex_name == example_name: |
| 194 | + self._copy_tree_to_target(pack_name, ex_name, ex_path) |
| 195 | + example_found = True |
| 196 | + if not example_found: |
| 197 | + raise FileNotFoundError( |
| 198 | + f"No examples or packs found for input: '{example_name}'" |
| 199 | + ) |
| 200 | + |
| 201 | + def _is_example_name(self, name): |
| 202 | + """Return True if the given name matches any known example.""" |
| 203 | + for pack_name, examples in self.available_examples().items(): |
| 204 | + for example_name, _ in examples: |
| 205 | + if example_name == name: |
| 206 | + return True |
| 207 | + return False |
| 208 | + |
| 209 | + def _copy_tree_to_target(self, pack_name, example_name, example_origin): |
| 210 | + """Copy an example folder from source to the user's target |
| 211 | + directory.""" |
| 212 | + target_dir = self._target_dir / pack_name / example_name |
| 213 | + target_dir.parent.mkdir(parents=True, exist_ok=True) |
| 214 | + if target_dir.exists() and self._force: |
| 215 | + self._overwrite_example( |
| 216 | + example_origin, target_dir, pack_name, example_name |
| 217 | + ) |
| 218 | + return |
| 219 | + if target_dir.exists(): |
| 220 | + self._copy_missing_files(example_origin, target_dir) |
| 221 | + print( |
| 222 | + f"WARNING: Example '{pack_name}/{example_name}'" |
| 223 | + " already exists at the specified target directory. " |
| 224 | + "Existing files were left unchanged; " |
| 225 | + "new or missing files were copied. To overwrite everything, " |
| 226 | + "rerun with --force." |
| 227 | + ) |
| 228 | + return |
| 229 | + self._copy_new_example( |
| 230 | + example_origin, target_dir, pack_name, example_name |
| 231 | + ) |
| 232 | + |
| 233 | + def _overwrite_example( |
| 234 | + self, example_origin, target, pack_name, example_name |
| 235 | + ): |
| 236 | + """Delete target and copy example.""" |
| 237 | + shutil.rmtree(target) |
| 238 | + shutil.copytree(example_origin, target) |
| 239 | + print(f"Overwriting example '{pack_name}/{example_name}'.") |
| 240 | + |
| 241 | + def _copy_missing_files(self, example_origin, target): |
| 242 | + """Copy only files and directories that are missing in the |
| 243 | + target.""" |
| 244 | + for example_item in example_origin.rglob("*"): |
| 245 | + rel_path = example_item.relative_to(example_origin) |
| 246 | + target_item = target / rel_path |
| 247 | + if example_item.is_dir(): |
| 248 | + target_item.mkdir(parents=True, exist_ok=True) |
| 249 | + elif example_item.is_file() and not target_item.exists(): |
| 250 | + target_item.parent.mkdir(parents=True, exist_ok=True) |
| 251 | + shutil.copy2(example_item, target_item) |
| 252 | + |
| 253 | + def _copy_new_example( |
| 254 | + self, example_origin, target, pack_name, example_name |
| 255 | + ): |
| 256 | + shutil.copytree(example_origin, target) |
| 257 | + print(f"Copied example '{pack_name}/{example_name}'.") |
| 258 | + |
136 | 259 | def _resolve_pack_file(self, identifier: Union[str, Path]) -> Path: |
137 | 260 | """Resolve a pack identifier to an absolute .txt path. |
138 | 261 |
|
|
0 commit comments