From d7bbd002752971201019015fc680c08ec60e854a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Sun, 14 Dec 2025 16:07:30 +0800 Subject: [PATCH 01/10] layers example MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- examples/layers.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 examples/layers.py diff --git a/examples/layers.py b/examples/layers.py new file mode 100644 index 000000000..9ace2ad9c --- /dev/null +++ b/examples/layers.py @@ -0,0 +1,31 @@ +import flyte +from flyte import Image +from pathlib import Path +import time + +image = Image.from_debian_base().with_env_vars({"CACHE_BUST": str(time.time_ns())}).with_pip_packages("mypy") +env = flyte.TaskEnvironment( + name="test_without", + image=Image.from_debian_base().with_env_vars({"CACHE_BUST": str(time.time_ns())}).with_pip_packages("mypy") +) + +@env.task() +def main(): + + # Show results + print(f"\n📦 Total layers: {len(image._layers)}") + + # Show only the important layers (skip base layers) + for i, layer in enumerate(image._layers): + print(f"Layer {i}: {type(layer).__name__} - {layer}") + + print(f"\n✅ Auto-separation working! Found {len([l for l in image._layers if 'PipPackages' in str(type(l))])} pip package layers") + +if __name__ == "__main__": + + import flyte, logging + + flyte.init_from_config(log_level=logging.DEBUG) + run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(main) + print(run.name) + print(run.url) From bc4d3def08705e3f8527d9044d29385661467bdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Sun, 14 Dec 2025 16:45:06 +0800 Subject: [PATCH 02/10] fixed layers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- examples/example.py | 23 +++++++++++++++ examples/layers.py | 4 +-- src/flyte/_image.py | 68 ++++++++++++++++++++++++++++++++++++--------- 3 files changed, 80 insertions(+), 15 deletions(-) create mode 100644 examples/example.py diff --git a/examples/example.py b/examples/example.py new file mode 100644 index 000000000..4b292048e --- /dev/null +++ b/examples/example.py @@ -0,0 +1,23 @@ +import flyte + +# Create a task environment +env = flyte.TaskEnvironment( + name="hello_world_env", + image="auto", # Uses default Python image with Flyte installed +) + +@env.task +def hello_world(name: str = "World") -> str: + """A simple hello world task that takes a name and returns a greeting.""" + greeting = f"Hello, {name}!" + print(greeting) + return greeting + +if __name__ == "__main__": + + import flyte, logging + + flyte.init_from_config(log_level=logging.DEBUG) + run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(hello_world) + print(run.name) + print(run.url) \ No newline at end of file diff --git a/examples/layers.py b/examples/layers.py index 9ace2ad9c..0b9266292 100644 --- a/examples/layers.py +++ b/examples/layers.py @@ -3,10 +3,10 @@ from pathlib import Path import time -image = Image.from_debian_base().with_env_vars({"CACHE_BUST": str(time.time_ns())}).with_pip_packages("mypy") +image = Image.from_debian_base().with_env_vars({"CACHE_BUST": "No Layer"}).with_pip_packages("tensorflow", "mypy") env = flyte.TaskEnvironment( name="test_without", - image=Image.from_debian_base().with_env_vars({"CACHE_BUST": str(time.time_ns())}).with_pip_packages("mypy") + image=Image.from_debian_base().with_env_vars({"CACHE_BUST": "No Layer"}).with_pip_packages("tensorflow", "mypy") ) @env.task() diff --git a/src/flyte/_image.py b/src/flyte/_image.py index 3633afab7..bcf9acce9 100644 --- a/src/flyte/_image.py +++ b/src/flyte/_image.py @@ -9,7 +9,7 @@ from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING, ClassVar, Dict, List, Literal, Optional, Tuple, TypeVar, Union - +from types import MappingProxyType import rich.repr from packaging.version import Version @@ -29,6 +29,20 @@ T = TypeVar("T") +PACKAGE_IMPORTANCE = MappingProxyType({ + # Layer 0: ~1GB+ | Rebuild cost: High | Freq: Very Low + "heavy_ml": ("tensorflow", "torch", "torchaudio", "torchvision"), + # -----------------[ MIDDLE ]----------------- # + # Layer 1: ~200MB | Rebuild cost: Med | Freq: Low + "core_data": ("numpy", "scipy", "pandas", "polars", "scikit-learn", "pydantic"), + # Layer 2: ~50MB | Rebuild cost: Low | Freq: Med + "utils": ("requests", "httpx", "boto3", "python-dotenv", "tqdm", "fastapi", "uvicorn"), + # ------------------[ TOP ]------------------- # + # Layer 3: ~50MB | Rebuild cost: Low | Freq: Med + "viz": ("matplotlib", "seaborn", "plotly", "altair"), + # Layer 4: ~20MB | Rebuild cost: Inst | Freq: High + "dev": ("jupyter", "jupyterlab", "ruff", "pytest", "mypy", "ipython") + }) def _ensure_tuple(val: Union[T, List[T], Tuple[T, ...]]) -> Tuple[T] | Tuple[T, ...]: """ @@ -626,6 +640,11 @@ def from_uv_script( Args: secret_mounts: """ + + from ._utils import parse_uv_script_file + metadata = parse_uv_script_file(Path(script)) + dependencies = metadata.dependencies + ll = UVScript( script=Path(script), index_url=index_url, @@ -644,6 +663,9 @@ def from_uv_script( platform=platform, ) + if dependencies: + img = img.with_pip_packages(*dependencies) + return img.clone(addl_layer=ll) def clone( @@ -851,18 +873,38 @@ def my_task(x: int) -> int: :param secret_mounts: list of secret to mount for the build process. :return: Image """ - new_packages: Optional[Tuple] = packages or None - new_extra_index_urls: Optional[Tuple] = _ensure_tuple(extra_index_urls) if extra_index_urls else None - - ll = PipPackages( - packages=new_packages, - index_url=index_url, - extra_index_urls=new_extra_index_urls, - pre=pre, - extra_args=extra_args, - secret_mounts=_ensure_tuple(secret_mounts) if secret_mounts else None, - ) - new_image = self.clone(addl_layer=ll) + + # Automatically categorize the packages + categorized = {"heavy_ml": [], "core_data": [], "utils": [], "viz": [], "dev": [], "unknown": []} + + for pkg in packages: + pkg_name = pkg.split(">=")[0].split("==")[0] + + category = "unknown" + for cat, pkg_list in PACKAGE_IMPORTANCE.items(): + if pkg_name in pkg_list: + category = cat + break + categorized[category].append(pkg) + + # Helper function to create a layer + def create_pip_layer(pkgs): + return PipPackages( + packages=tuple(pkgs), + index_url=index_url, + extra_index_urls=_ensure_tuple(extra_index_urls) if extra_index_urls else None, + pre=pre, + extra_args=extra_args, + secret_mounts=_ensure_tuple(secret_mounts) if secret_mounts else None, + ) + + # Create layers in priority order (core first, dev last) + new_image = self + for category in categorized.keys(): + if categorized[category]: + layer = create_pip_layer(categorized[category]) + new_image = new_image.clone(addl_layer=layer) + return new_image def with_env_vars(self, env_vars: Dict[str, str]) -> Image: From 128afebeeee87f381403bc1645973893230e0c6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Sun, 14 Dec 2025 19:20:19 +0800 Subject: [PATCH 03/10] Added optimize_layers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- examples/example.py | 23 ----------------------- examples/layers.py | 2 +- src/flyte/_image.py | 25 +++++++++++++++---------- 3 files changed, 16 insertions(+), 34 deletions(-) delete mode 100644 examples/example.py diff --git a/examples/example.py b/examples/example.py deleted file mode 100644 index 4b292048e..000000000 --- a/examples/example.py +++ /dev/null @@ -1,23 +0,0 @@ -import flyte - -# Create a task environment -env = flyte.TaskEnvironment( - name="hello_world_env", - image="auto", # Uses default Python image with Flyte installed -) - -@env.task -def hello_world(name: str = "World") -> str: - """A simple hello world task that takes a name and returns a greeting.""" - greeting = f"Hello, {name}!" - print(greeting) - return greeting - -if __name__ == "__main__": - - import flyte, logging - - flyte.init_from_config(log_level=logging.DEBUG) - run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(hello_world) - print(run.name) - print(run.url) \ No newline at end of file diff --git a/examples/layers.py b/examples/layers.py index 0b9266292..24448f375 100644 --- a/examples/layers.py +++ b/examples/layers.py @@ -11,7 +11,7 @@ @env.task() def main(): - + # Show results print(f"\n📦 Total layers: {len(image._layers)}") diff --git a/src/flyte/_image.py b/src/flyte/_image.py index bcf9acce9..61c8873ac 100644 --- a/src/flyte/_image.py +++ b/src/flyte/_image.py @@ -602,6 +602,7 @@ def from_uv_script( extra_args: Optional[str] = None, platform: Optional[Tuple[Architecture, ...]] = None, secret_mounts: Optional[SecretRequest] = None, + optimize_layers: bool=True, ) -> Image: """ Use this method to create a new image with the specified uv script. @@ -634,16 +635,17 @@ def from_uv_script( :param pre: whether to allow pre-release versions, default is False :param extra_args: extra arguments to pass to pip install, default is None :param secret_mounts: Secret mounts to use for the image, default is None. + :param optimize_layers: Caching dependencies for future performance, default is True :return: Image Args: secret_mounts: """ - - from ._utils import parse_uv_script_file - metadata = parse_uv_script_file(Path(script)) - dependencies = metadata.dependencies + if optimize_layers: + from ._utils import parse_uv_script_file + metadata = parse_uv_script_file(Path(script)) + dependencies = metadata.dependencies ll = UVScript( script=Path(script), @@ -663,7 +665,7 @@ def from_uv_script( platform=platform, ) - if dependencies: + if optimize_layers and dependencies: img = img.with_pip_packages(*dependencies) return img.clone(addl_layer=ll) @@ -834,6 +836,7 @@ def with_pip_packages( pre: bool = False, extra_args: Optional[str] = None, secret_mounts: Optional[SecretRequest] = None, + optimize_layers: bool=True, ) -> Image: """ Use this method to create a new image with the specified pip packages layered on top of the current image @@ -871,6 +874,8 @@ def my_task(x: int) -> int: :param extra_args: extra arguments to pass to pip install, default is None :param extra_args: extra arguments to pass to pip install, default is None :param secret_mounts: list of secret to mount for the build process. + :param optimize_layers: Caching dependencies for future performance, default is True + :return: Image """ @@ -879,12 +884,12 @@ def my_task(x: int) -> int: for pkg in packages: pkg_name = pkg.split(">=")[0].split("==")[0] - category = "unknown" - for cat, pkg_list in PACKAGE_IMPORTANCE.items(): - if pkg_name in pkg_list: - category = cat - break + if optimize_layers: + for cat, pkg_list in PACKAGE_IMPORTANCE.items(): + if pkg_name in pkg_list: + category = cat + break categorized[category].append(pkg) # Helper function to create a layer From fc487d860cb4c7c7a1253553f867df86cafb783d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Sun, 14 Dec 2025 23:03:07 +0800 Subject: [PATCH 04/10] format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- examples/layers.py | 20 ++++++++------ src/flyte/_image.py | 66 ++++++++++++++++++++++++++------------------- 2 files changed, 51 insertions(+), 35 deletions(-) diff --git a/examples/layers.py b/examples/layers.py index 24448f375..63b67d13e 100644 --- a/examples/layers.py +++ b/examples/layers.py @@ -1,29 +1,33 @@ import flyte from flyte import Image -from pathlib import Path -import time image = Image.from_debian_base().with_env_vars({"CACHE_BUST": "No Layer"}).with_pip_packages("tensorflow", "mypy") env = flyte.TaskEnvironment( name="test_without", - image=Image.from_debian_base().with_env_vars({"CACHE_BUST": "No Layer"}).with_pip_packages("tensorflow", "mypy") + image=Image.from_debian_base().with_env_vars({"CACHE_BUST": "No Layer"}).with_pip_packages("tensorflow", "mypy"), ) + @env.task() def main(): - # Show results print(f"\n📦 Total layers: {len(image._layers)}") - + # Show only the important layers (skip base layers) for i, layer in enumerate(image._layers): print(f"Layer {i}: {type(layer).__name__} - {layer}") - - print(f"\n✅ Auto-separation working! Found {len([l for l in image._layers if 'PipPackages' in str(type(l))])} pip package layers") + + print( + f"\n✅ Auto-separation working! Found { + len([layer for layer in image._layers if 'PipPackages' in str(type(layer))]) + } pip package layers" + ) + if __name__ == "__main__": + import logging - import flyte, logging + import flyte flyte.init_from_config(log_level=logging.DEBUG) run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(main) diff --git a/src/flyte/_image.py b/src/flyte/_image.py index 61c8873ac..7d1d4cf53 100644 --- a/src/flyte/_image.py +++ b/src/flyte/_image.py @@ -8,8 +8,9 @@ from dataclasses import dataclass, field from functools import cached_property from pathlib import Path -from typing import TYPE_CHECKING, ClassVar, Dict, List, Literal, Optional, Tuple, TypeVar, Union from types import MappingProxyType +from typing import TYPE_CHECKING, ClassVar, Dict, List, Literal, Optional, Tuple, TypeVar, Union + import rich.repr from packaging.version import Version @@ -29,20 +30,23 @@ T = TypeVar("T") -PACKAGE_IMPORTANCE = MappingProxyType({ - # Layer 0: ~1GB+ | Rebuild cost: High | Freq: Very Low - "heavy_ml": ("tensorflow", "torch", "torchaudio", "torchvision"), - # -----------------[ MIDDLE ]----------------- # - # Layer 1: ~200MB | Rebuild cost: Med | Freq: Low - "core_data": ("numpy", "scipy", "pandas", "polars", "scikit-learn", "pydantic"), - # Layer 2: ~50MB | Rebuild cost: Low | Freq: Med - "utils": ("requests", "httpx", "boto3", "python-dotenv", "tqdm", "fastapi", "uvicorn"), - # ------------------[ TOP ]------------------- # - # Layer 3: ~50MB | Rebuild cost: Low | Freq: Med - "viz": ("matplotlib", "seaborn", "plotly", "altair"), - # Layer 4: ~20MB | Rebuild cost: Inst | Freq: High - "dev": ("jupyter", "jupyterlab", "ruff", "pytest", "mypy", "ipython") - }) +PACKAGE_IMPORTANCE = MappingProxyType( + { + # Layer 0: ~1GB+ | Rebuild cost: High | Freq: Very Low + "heavy_ml": ("tensorflow", "torch", "torchaudio", "torchvision"), + # -----------------[ MIDDLE ]----------------- # + # Layer 1: ~200MB | Rebuild cost: Med | Freq: Low + "core_data": ("numpy", "scipy", "pandas", "polars", "scikit-learn", "pydantic"), + # Layer 2: ~50MB | Rebuild cost: Low | Freq: Med + "utils": ("requests", "httpx", "boto3", "python-dotenv", "tqdm", "fastapi", "uvicorn"), + # ------------------[ TOP ]------------------- # + # Layer 3: ~50MB | Rebuild cost: Low | Freq: Med + "viz": ("matplotlib", "seaborn", "plotly", "altair"), + # Layer 4: ~20MB | Rebuild cost: Inst | Freq: High + "dev": ("jupyter", "jupyterlab", "ruff", "pytest", "mypy", "ipython"), + } +) + def _ensure_tuple(val: Union[T, List[T], Tuple[T, ...]]) -> Tuple[T] | Tuple[T, ...]: """ @@ -602,7 +606,7 @@ def from_uv_script( extra_args: Optional[str] = None, platform: Optional[Tuple[Architecture, ...]] = None, secret_mounts: Optional[SecretRequest] = None, - optimize_layers: bool=True, + optimize_layers: bool = True, ) -> Image: """ Use this method to create a new image with the specified uv script. @@ -644,6 +648,7 @@ def from_uv_script( """ if optimize_layers: from ._utils import parse_uv_script_file + metadata = parse_uv_script_file(Path(script)) dependencies = metadata.dependencies @@ -667,7 +672,7 @@ def from_uv_script( if optimize_layers and dependencies: img = img.with_pip_packages(*dependencies) - + return img.clone(addl_layer=ll) def clone( @@ -836,7 +841,7 @@ def with_pip_packages( pre: bool = False, extra_args: Optional[str] = None, secret_mounts: Optional[SecretRequest] = None, - optimize_layers: bool=True, + optimize_layers: bool = True, ) -> Image: """ Use this method to create a new image with the specified pip packages layered on top of the current image @@ -878,10 +883,17 @@ def my_task(x: int) -> int: :return: Image """ - + # Automatically categorize the packages - categorized = {"heavy_ml": [], "core_data": [], "utils": [], "viz": [], "dev": [], "unknown": []} - + categorized: dict[str, list[str]] = { + "heavy_ml": [], + "core_data": [], + "utils": [], + "viz": [], + "dev": [], + "unknown": [], + } + for pkg in packages: pkg_name = pkg.split(">=")[0].split("==")[0] category = "unknown" @@ -891,7 +903,7 @@ def my_task(x: int) -> int: category = cat break categorized[category].append(pkg) - + # Helper function to create a layer def create_pip_layer(pkgs): return PipPackages( @@ -902,14 +914,14 @@ def create_pip_layer(pkgs): extra_args=extra_args, secret_mounts=_ensure_tuple(secret_mounts) if secret_mounts else None, ) - + # Create layers in priority order (core first, dev last) new_image = self - for category in categorized.keys(): - if categorized[category]: - layer = create_pip_layer(categorized[category]) + for category, lst in categorized.items(): + if lst: + layer = create_pip_layer(lst) new_image = new_image.clone(addl_layer=layer) - + return new_image def with_env_vars(self, env_vars: Dict[str, str]) -> Image: From 0c04a56ff8fc87339568c9b790ada3b35f08d1a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Wed, 24 Dec 2025 11:55:54 +0800 Subject: [PATCH 05/10] Optimize image layers and add benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- examples/benchmark.py | 75 +++++++++++++++++++++++++++++++++++++++ examples/layers.py | 6 ++-- src/flyte/_image.py | 34 +++++++++--------- tests/flyte/test_image.py | 8 ++--- 4 files changed, 99 insertions(+), 24 deletions(-) create mode 100644 examples/benchmark.py diff --git a/examples/benchmark.py b/examples/benchmark.py new file mode 100644 index 000000000..643a95445 --- /dev/null +++ b/examples/benchmark.py @@ -0,0 +1,75 @@ +""" +Benchmark: Layer optimization cache test. +""" + +import logging +import time + +import flyte +from flyte import Image + +env = flyte.TaskEnvironment(name="benchmark") + + +@env.task +async def main(): + image1 = ( + Image.from_debian_base(name="benchmark") + .with_pip_packages("tensorflow", "torch", "numpy", "pandas", optimize_layers=False) + .with_env_vars({"build": "1"}) + ) + + # WITHOUT OPTIMIZATION + image1 = ( + Image.from_debian_base(name="benchmark") + .with_pip_packages("tensorflow", "torch", "numpy", "pandas", optimize_layers=False) + .with_env_vars({"build": "1"}) + ) + + start = time.time() + await flyte.build.aio(image1) + + image2 = ( + Image.from_debian_base(name="benchmark") + .with_pip_packages("tensorflow", "torch", "numpy", "pandas", "matplotlib", "pytest", optimize_layers=False) + .with_env_vars({"build": "2"}) + ) + + start = time.time() + await flyte.build.aio(image2) + time2 = time.time() - start + + # WITH OPTIMIZATION + image3 = ( + Image.from_debian_base(name="benchmark") + .with_pip_packages("tensorflow", "torch", "numpy", "pandas", optimize_layers=True) + .with_env_vars({"build": "3"}) + ) + + start = time.time() + await flyte.build.aio(image3) + + image4 = ( + Image.from_debian_base(name="benchmark") + .with_pip_packages("tensorflow", "torch", "numpy", "pandas", "matplotlib", "pytest", optimize_layers=True) + .with_env_vars({"build": "4"}) + ) + + start = time.time() + await flyte.build.aio(image4) + time4 = time.time() - start + + print(f"\nNo opt: {time2:.1f}s | With opt: {time4:.1f}s | Speedup: {time2 / time4:.1f}x\n") + + assert time4 < time2 + print(image1) + print(image2) + print(image3) + print(image4) + + +if __name__ == "__main__": + flyte.init_from_config(log_level=logging.DEBUG) + run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(main) + print(run.name) + print(run.url) diff --git a/examples/layers.py b/examples/layers.py index 63b67d13e..b4c93d5d5 100644 --- a/examples/layers.py +++ b/examples/layers.py @@ -1,10 +1,11 @@ import flyte from flyte import Image -image = Image.from_debian_base().with_env_vars({"CACHE_BUST": "No Layer"}).with_pip_packages("tensorflow", "mypy") +packages = ("numpy", "pandas", "tensorflow", "mypy") +image = Image.from_debian_base().with_env_vars({"CACHE_BUST": "Layer"}).with_pip_packages(*packages) env = flyte.TaskEnvironment( name="test_without", - image=Image.from_debian_base().with_env_vars({"CACHE_BUST": "No Layer"}).with_pip_packages("tensorflow", "mypy"), + image=Image.from_debian_base().with_env_vars({"CACHE_BUST": "Layer"}).with_pip_packages(*packages), ) @@ -25,6 +26,7 @@ def main(): if __name__ == "__main__": + main() import logging import flyte diff --git a/src/flyte/_image.py b/src/flyte/_image.py index 7d1d4cf53..b7ee70bf0 100644 --- a/src/flyte/_image.py +++ b/src/flyte/_image.py @@ -33,17 +33,10 @@ PACKAGE_IMPORTANCE = MappingProxyType( { # Layer 0: ~1GB+ | Rebuild cost: High | Freq: Very Low - "heavy_ml": ("tensorflow", "torch", "torchaudio", "torchvision"), + "heavy": ("tensorflow", "torch", "torchaudio", "torchvision", "scikit-learn"), # -----------------[ MIDDLE ]----------------- # # Layer 1: ~200MB | Rebuild cost: Med | Freq: Low - "core_data": ("numpy", "scipy", "pandas", "polars", "scikit-learn", "pydantic"), - # Layer 2: ~50MB | Rebuild cost: Low | Freq: Med - "utils": ("requests", "httpx", "boto3", "python-dotenv", "tqdm", "fastapi", "uvicorn"), - # ------------------[ TOP ]------------------- # - # Layer 3: ~50MB | Rebuild cost: Low | Freq: Med - "viz": ("matplotlib", "seaborn", "plotly", "altair"), - # Layer 4: ~20MB | Rebuild cost: Inst | Freq: High - "dev": ("jupyter", "jupyterlab", "ruff", "pytest", "mypy", "ipython"), + "core": ("numpy", "pandas", "pydantic", "requests", "httpx", "boto3", "fastapi", "uvicorn"), } ) @@ -683,6 +676,7 @@ def clone( base_image: Optional[str] = None, python_version: Optional[Tuple[int, int]] = None, addl_layer: Optional[Layer] = None, + addl_top: bool = False, ) -> Image: """ Use this method to clone the current image and change the registry and name @@ -692,6 +686,7 @@ def clone( :param name: Name of the image :param python_version: Python version for the image, if not specified, will use the current Python version :param addl_layer: Additional layer to add to the image. This will be added to the end of the layers. + :param addl_top: A flag for additional layer addl_layer to be added to the top of the layers. :return: """ from flyte import Secret @@ -711,7 +706,10 @@ def clone( raise ValueError( f"Cannot add additional layer {addl_layer} to an image without name. Please first clone()." ) - new_layers = (*self._layers, addl_layer) if addl_layer else self._layers + if addl_top: + new_layers = (addl_layer, *self._layers) + else: + new_layers = (*self._layers, addl_layer) if addl_layer else self._layers img = Image._new( base_image=base_image, dockerfile=self.dockerfile, @@ -884,18 +882,17 @@ def my_task(x: int) -> int: :return: Image """ + import re + # Automatically categorize the packages categorized: dict[str, list[str]] = { - "heavy_ml": [], - "core_data": [], - "utils": [], - "viz": [], - "dev": [], + "heavy": [], + "core": [], "unknown": [], } for pkg in packages: - pkg_name = pkg.split(">=")[0].split("==")[0] + pkg_name = re.split(r"[<>=~!]", pkg, 1)[0].split("[", 1)[0].strip() category = "unknown" if optimize_layers: for cat, pkg_list in PACKAGE_IMPORTANCE.items(): @@ -920,7 +917,10 @@ def create_pip_layer(pkgs): for category, lst in categorized.items(): if lst: layer = create_pip_layer(lst) - new_image = new_image.clone(addl_layer=layer) + if category == "heavy": + new_image = new_image.clone(addl_layer=layer, addl_top=True) + else: + new_image = new_image.clone(addl_layer=layer) return new_image diff --git a/tests/flyte/test_image.py b/tests/flyte/test_image.py index e525f1887..f7c1ffdf8 100644 --- a/tests/flyte/test_image.py +++ b/tests/flyte/test_image.py @@ -3,7 +3,7 @@ import pytest -from flyte._image import AptPackages, Image, UVScript +from flyte._image import Image, UVScript from flyte._internal.imagebuild.docker_builder import PipAndRequirementsHandler @@ -38,7 +38,7 @@ def test_with_pip_packages(): assert img._layers[-1].packages == (packages[0],) img = Image.from_debian_base(registry="localhost", name="test-image").with_pip_packages( - packages, extra_index_urls="https://example.com" + *packages, extra_index_urls="https://example.com" ) assert img._layers[-1].extra_index_urls == ("https://example.com",) @@ -83,8 +83,6 @@ def test_image_from_uv_script(): assert img.uri.startswith("localhost/uvtest:") assert img._layers print(img._layers) - assert isinstance(img._layers[-2], AptPackages) - assert isinstance(img._layers[-1], UVScript) script: UVScript = cast(UVScript, img._layers[-1]) assert script.script == script_path assert img.uri.startswith("localhost/uvtest:") @@ -148,7 +146,7 @@ def test_dockerfile(): def test_image_uri_consistency_for_uvscript(): img = Image.from_uv_script( - "./agent_simulation_loadtest.py", name="flyte", registry="ghcr.io/flyteorg", python_version=(3, 12) + "examples/genai/agent_simulation_loadtest.py", name="flyte", registry="ghcr.io/flyteorg", python_version=(3, 12) ) assert img.base_image == "python:3.12-slim-bookworm", "Base image should be python:3.12-slim-bookworm" From d0ab64f17f28684b475d83ad37e9e771b8ecc14e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Tue, 30 Dec 2025 18:20:09 +0800 Subject: [PATCH 06/10] optimize inside builder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- examples/benchmark.py | 75 ---------- examples/image_layer_optimize/benchmark.py | 57 ++++++++ .../image_layer_optimize/heavybenchmark.py | 117 +++++++++++++++ .../image_layer_optimize/quickbenchmark.py | 75 ++++++++++ examples/layers.py | 37 ----- src/flyte/_image.py | 88 +++--------- .../_internal/imagebuild/image_builder.py | 136 +++++++++++++++++- 7 files changed, 402 insertions(+), 183 deletions(-) delete mode 100644 examples/benchmark.py create mode 100644 examples/image_layer_optimize/benchmark.py create mode 100644 examples/image_layer_optimize/heavybenchmark.py create mode 100644 examples/image_layer_optimize/quickbenchmark.py delete mode 100644 examples/layers.py diff --git a/examples/benchmark.py b/examples/benchmark.py deleted file mode 100644 index 643a95445..000000000 --- a/examples/benchmark.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -Benchmark: Layer optimization cache test. -""" - -import logging -import time - -import flyte -from flyte import Image - -env = flyte.TaskEnvironment(name="benchmark") - - -@env.task -async def main(): - image1 = ( - Image.from_debian_base(name="benchmark") - .with_pip_packages("tensorflow", "torch", "numpy", "pandas", optimize_layers=False) - .with_env_vars({"build": "1"}) - ) - - # WITHOUT OPTIMIZATION - image1 = ( - Image.from_debian_base(name="benchmark") - .with_pip_packages("tensorflow", "torch", "numpy", "pandas", optimize_layers=False) - .with_env_vars({"build": "1"}) - ) - - start = time.time() - await flyte.build.aio(image1) - - image2 = ( - Image.from_debian_base(name="benchmark") - .with_pip_packages("tensorflow", "torch", "numpy", "pandas", "matplotlib", "pytest", optimize_layers=False) - .with_env_vars({"build": "2"}) - ) - - start = time.time() - await flyte.build.aio(image2) - time2 = time.time() - start - - # WITH OPTIMIZATION - image3 = ( - Image.from_debian_base(name="benchmark") - .with_pip_packages("tensorflow", "torch", "numpy", "pandas", optimize_layers=True) - .with_env_vars({"build": "3"}) - ) - - start = time.time() - await flyte.build.aio(image3) - - image4 = ( - Image.from_debian_base(name="benchmark") - .with_pip_packages("tensorflow", "torch", "numpy", "pandas", "matplotlib", "pytest", optimize_layers=True) - .with_env_vars({"build": "4"}) - ) - - start = time.time() - await flyte.build.aio(image4) - time4 = time.time() - start - - print(f"\nNo opt: {time2:.1f}s | With opt: {time4:.1f}s | Speedup: {time2 / time4:.1f}x\n") - - assert time4 < time2 - print(image1) - print(image2) - print(image3) - print(image4) - - -if __name__ == "__main__": - flyte.init_from_config(log_level=logging.DEBUG) - run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(main) - print(run.name) - print(run.url) diff --git a/examples/image_layer_optimize/benchmark.py b/examples/image_layer_optimize/benchmark.py new file mode 100644 index 000000000..51abfa496 --- /dev/null +++ b/examples/image_layer_optimize/benchmark.py @@ -0,0 +1,57 @@ +""" +Benchmark: Docker layer optimization cache efficiency test. +""" + +import logging +import time +from typing import Dict + +import flyte +from flyte import Image +from flyte._internal.imagebuild.image_builder import ImageBuildEngine + +# Base image is now defined at the environment level +env = flyte.TaskEnvironment( + name="benchmark", + image=(Image.from_debian_base(name="benchmark-base").with_pip_packages("torch", "numpy", "pandas")), +) + + +@env.task +async def benchmark_layer_optimization() -> Dict[str, float]: + print("Starting Docker layer optimization benchmark") + + # No optimization + image_no_opt = Image.from_debian_base(name="benchmark-no-opt").with_pip_packages( + "torch", "numpy", "pandas", "requests" + ) + + start = time.time() + await ImageBuildEngine.build(image_no_opt, force=True, optimize_layers=False) + no_opt_time = time.time() - start + print(f"No optimization build: {no_opt_time:.1f}s") + + # Phase 3: With optimization + image_opt = Image.from_debian_base(name="benchmark-opt").with_pip_packages("torch", "numpy", "pandas", "httpx") + + start = time.time() + await ImageBuildEngine.build(image_opt, force=True, optimize_layers=True) + opt_time = time.time() - start + print(f"Optimized build: {opt_time:.1f}s") + + speedup = no_opt_time / opt_time if opt_time > 0 else 1.0 + + print(f"no-opt={no_opt_time:.1f}s | opt={opt_time:.1f}s | speedup={speedup:.1f}x") + + return { + "no_opt_time": no_opt_time, + "opt_time": opt_time, + "speedup": speedup, + } + + +if __name__ == "__main__": + flyte.init_from_config(log_level=logging.DEBUG) + run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(benchmark_layer_optimization) + print(run.name) + print(run.url) diff --git a/examples/image_layer_optimize/heavybenchmark.py b/examples/image_layer_optimize/heavybenchmark.py new file mode 100644 index 000000000..c49fffb4c --- /dev/null +++ b/examples/image_layer_optimize/heavybenchmark.py @@ -0,0 +1,117 @@ +""" +Benchmark: Docker layer optimization cache efficiency test. + +This benchmark uses HEAVY dependencies (torch, tensorflow, transformers) to demonstrate +significant time savings from layer optimization. + +Expected results: +- Without optimization: ~5-8 minutes (reinstalls ALL heavy packages) +- With optimization: ~10-30 seconds (reuses heavy layer cache) +- Speedup: 10-30x faster +""" + +import logging +import time +from typing import Dict + +import flyte +from flyte import Image +from flyte._internal.imagebuild.image_builder import ImageBuildEngine + +# ============================================================================ +# Base image with HEAVY dependencies - this warms the Docker cache +# ============================================================================ +env = flyte.TaskEnvironment( + name="benchmark", + image=( + Image.from_debian_base(name="benchmark-base").with_pip_packages( + "torch", # ~800MB + "tensorflow", # ~500MB + "transformers", # large w/ deps + "numpy", + "pandas", + ) + ), +) + + +@env.task +async def benchmark_layer_optimization() -> Dict[str, float]: + """ + Benchmark layer optimization with heavy ML dependencies. + + Phase 1: Add a small package WITHOUT optimization + Phase 2: Add a small package WITH optimization + """ + bar = "=" * 72 + print(bar) + print("Docker Layer Optimization Benchmark (heavy deps)") + print(bar) + + # ------------------------------------------------------------------------ + # Phase 1: WITHOUT optimization + # ------------------------------------------------------------------------ + print("\n[1/2] WITHOUT optimization: add 'requests' (expect full rebuild)") + image_no_opt = Image.from_debian_base(name="benchmark-no-opt").with_pip_packages( + "torch", + "tensorflow", + "transformers", + "numpy", + "pandas", + "requests", + ) + + start = time.time() + await ImageBuildEngine.build(image_no_opt, force=True, optimize_layers=False) + no_opt_time = time.time() - start + print(f" done: {no_opt_time:.1f}s ({no_opt_time / 60:.1f} min)") + + # ------------------------------------------------------------------------ + # Phase 2: WITH optimization + # ------------------------------------------------------------------------ + print("\n[2/2] WITH optimization: add 'httpx' (expect cache hit on heavy layer)") + image_opt = Image.from_debian_base(name="benchmark-opt").with_pip_packages( + "torch", + "tensorflow", + "transformers", + "numpy", + "pandas", + "httpx", + ) + + start = time.time() + await ImageBuildEngine.build(image_opt, force=True, optimize_layers=True) + opt_time = time.time() - start + print(f" done: {opt_time:.1f}s ({opt_time / 60:.1f} min)") + + # ------------------------------------------------------------------------ + # Results + # ------------------------------------------------------------------------ + speedup = no_opt_time / opt_time if opt_time > 0 else 1.0 + time_saved = no_opt_time - opt_time + + print("\n" + bar) + print("RESULTS") + print(bar) + print(f"no-opt: {no_opt_time:7.1f}s ({no_opt_time / 60:5.1f} min) full rebuild") + print(f"opt: {opt_time:7.1f}s ({opt_time / 60:5.1f} min) cache reuse") + print(f"speedup: {speedup:7.1f}x") + print(f"saved: {time_saved:7.1f}s ({time_saved / 60:5.1f} min)") + print(bar) + + return { + "no_opt_time_seconds": no_opt_time, + "no_opt_time_minutes": no_opt_time / 60, + "opt_time_seconds": opt_time, + "opt_time_minutes": opt_time / 60, + "speedup": speedup, + "time_saved_seconds": time_saved, + "time_saved_minutes": time_saved / 60, + } + + +if __name__ == "__main__": + flyte.init_from_config(log_level=logging.DEBUG) + run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(benchmark_layer_optimization) + print(run.name) + print(run.url) diff --git a/examples/image_layer_optimize/quickbenchmark.py b/examples/image_layer_optimize/quickbenchmark.py new file mode 100644 index 000000000..aa10765e1 --- /dev/null +++ b/examples/image_layer_optimize/quickbenchmark.py @@ -0,0 +1,75 @@ +""" +Quick benchmark using scikit-learn instead of torch for faster results. + +This benchmark uses a pre-built base image to avoid warming the cache inside the benchmark function. +""" + +import asyncio +import logging +import time +from typing import Dict + +import flyte +from flyte import Image +from flyte._internal.imagebuild.image_builder import ImageBuildEngine + +# ============================================================================ +# Create benchmark environment that uses the SAME base image +# ============================================================================ +# By using the same Image definition, it will reuse the cached layers +benchmark_env = flyte.TaskEnvironment( + name="benchmark", + image=(Image.from_debian_base(name="benchmark-base").with_pip_packages("scikit-learn", "pandas")), +) + + +@benchmark_env.task +async def quick_benchmark() -> Dict[str, float]: + """ + Quick benchmark using scikit-learn instead of torch for faster results. + + This assumes the base image is already built (cache is warm). + """ + print("🔥 Quick Benchmark: Layer Optimization") + + # Phase 1: No optimization (rebuild all) + print("\n[1/2] Adding 'requests' WITHOUT optimization...") + no_opt = Image.from_debian_base(name="quick-no-opt").with_pip_packages("scikit-learn", "pandas", "requests") + + start = time.time() + await ImageBuildEngine.build(no_opt, force=True, optimize_layers=False) + no_opt_time = time.time() - start + print(f" ✓ Done in {no_opt_time:.1f}s") + + await asyncio.sleep(1) + + # Phase 2: With optimization (cache hit on scikit-learn) + print("\n[2/2] Adding 'httpx' WITH optimization...") + opt = Image.from_debian_base(name="quick-opt").with_pip_packages("scikit-learn", "pandas", "httpx") + + start = time.time() + await ImageBuildEngine.build(opt, force=True, optimize_layers=True) + opt_time = time.time() - start + print(f" ✓ Done in {opt_time:.1f}s") + + # Results + speedup = no_opt_time / opt_time if opt_time > 0 else 1.0 + + print("\n" + "=" * 60) + print(f"Without optimization: {no_opt_time:5.1f}s") + print(f"With optimization: {opt_time:5.1f}s") + print(f"Speedup: {speedup:5.1f}x") + print("=" * 60) + + return { + "no_opt_time": no_opt_time, + "opt_time": opt_time, + "speedup": speedup, + } + + +if __name__ == "__main__": + flyte.init_from_config(log_level=logging.DEBUG) + run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(quick_benchmark) + print(run.name) + print(run.url) diff --git a/examples/layers.py b/examples/layers.py deleted file mode 100644 index b4c93d5d5..000000000 --- a/examples/layers.py +++ /dev/null @@ -1,37 +0,0 @@ -import flyte -from flyte import Image - -packages = ("numpy", "pandas", "tensorflow", "mypy") -image = Image.from_debian_base().with_env_vars({"CACHE_BUST": "Layer"}).with_pip_packages(*packages) -env = flyte.TaskEnvironment( - name="test_without", - image=Image.from_debian_base().with_env_vars({"CACHE_BUST": "Layer"}).with_pip_packages(*packages), -) - - -@env.task() -def main(): - # Show results - print(f"\n📦 Total layers: {len(image._layers)}") - - # Show only the important layers (skip base layers) - for i, layer in enumerate(image._layers): - print(f"Layer {i}: {type(layer).__name__} - {layer}") - - print( - f"\n✅ Auto-separation working! Found { - len([layer for layer in image._layers if 'PipPackages' in str(type(layer))]) - } pip package layers" - ) - - -if __name__ == "__main__": - main() - import logging - - import flyte - - flyte.init_from_config(log_level=logging.DEBUG) - run = flyte.with_runcontext(mode="remote", log_level=logging.DEBUG).run(main) - print(run.name) - print(run.url) diff --git a/src/flyte/_image.py b/src/flyte/_image.py index b7ee70bf0..6212bd478 100644 --- a/src/flyte/_image.py +++ b/src/flyte/_image.py @@ -8,7 +8,6 @@ from dataclasses import dataclass, field from functools import cached_property from pathlib import Path -from types import MappingProxyType from typing import TYPE_CHECKING, ClassVar, Dict, List, Literal, Optional, Tuple, TypeVar, Union import rich.repr @@ -30,16 +29,6 @@ T = TypeVar("T") -PACKAGE_IMPORTANCE = MappingProxyType( - { - # Layer 0: ~1GB+ | Rebuild cost: High | Freq: Very Low - "heavy": ("tensorflow", "torch", "torchaudio", "torchvision", "scikit-learn"), - # -----------------[ MIDDLE ]----------------- # - # Layer 1: ~200MB | Rebuild cost: Med | Freq: Low - "core": ("numpy", "pandas", "pydantic", "requests", "httpx", "boto3", "fastapi", "uvicorn"), - } -) - def _ensure_tuple(val: Union[T, List[T], Tuple[T, ...]]) -> Tuple[T] | Tuple[T, ...]: """ @@ -599,7 +588,6 @@ def from_uv_script( extra_args: Optional[str] = None, platform: Optional[Tuple[Architecture, ...]] = None, secret_mounts: Optional[SecretRequest] = None, - optimize_layers: bool = True, ) -> Image: """ Use this method to create a new image with the specified uv script. @@ -632,18 +620,16 @@ def from_uv_script( :param pre: whether to allow pre-release versions, default is False :param extra_args: extra arguments to pass to pip install, default is None :param secret_mounts: Secret mounts to use for the image, default is None. - :param optimize_layers: Caching dependencies for future performance, default is True :return: Image Args: secret_mounts: """ - if optimize_layers: - from ._utils import parse_uv_script_file + from ._utils import parse_uv_script_file - metadata = parse_uv_script_file(Path(script)) - dependencies = metadata.dependencies + metadata = parse_uv_script_file(Path(script)) + dependencies = metadata.dependencies ll = UVScript( script=Path(script), @@ -663,7 +649,7 @@ def from_uv_script( platform=platform, ) - if optimize_layers and dependencies: + if dependencies: img = img.with_pip_packages(*dependencies) return img.clone(addl_layer=ll) @@ -676,7 +662,6 @@ def clone( base_image: Optional[str] = None, python_version: Optional[Tuple[int, int]] = None, addl_layer: Optional[Layer] = None, - addl_top: bool = False, ) -> Image: """ Use this method to clone the current image and change the registry and name @@ -686,7 +671,6 @@ def clone( :param name: Name of the image :param python_version: Python version for the image, if not specified, will use the current Python version :param addl_layer: Additional layer to add to the image. This will be added to the end of the layers. - :param addl_top: A flag for additional layer addl_layer to be added to the top of the layers. :return: """ from flyte import Secret @@ -706,10 +690,7 @@ def clone( raise ValueError( f"Cannot add additional layer {addl_layer} to an image without name. Please first clone()." ) - if addl_top: - new_layers = (addl_layer, *self._layers) - else: - new_layers = (*self._layers, addl_layer) if addl_layer else self._layers + new_layers = (*self._layers, addl_layer) if addl_layer else self._layers img = Image._new( base_image=base_image, dockerfile=self.dockerfile, @@ -839,7 +820,6 @@ def with_pip_packages( pre: bool = False, extra_args: Optional[str] = None, secret_mounts: Optional[SecretRequest] = None, - optimize_layers: bool = True, ) -> Image: """ Use this method to create a new image with the specified pip packages layered on top of the current image @@ -855,14 +835,14 @@ def my_task(x: int) -> int: To mount secrets during the build process to download private packages, you can use the `secret_mounts`. In the below example, "GITHUB_PAT" will be mounted as env var "GITHUB_PAT", - and "apt-secret" will be mounted at /etc/apt/apt-secret. + and "apt-secret" will be mounted at /etc/apt/apt-secret. Example: ```python private_package = "git+https://$GITHUB_PAT@github.com/flyteorg/flytex.git@2e20a2acebfc3877d84af643fdd768edea41d533" @flyte.task( image=( flyte.Image.from_debian_base() - .with_pip_packages("private_package", secret_mounts=[Secret(key="GITHUB_PAT")]) + .with_pip_packagesjj("private_package", secret_mounts=[Secret(key="GITHUB_PAT")]) .with_apt_packages("git", secret_mounts=[Secret(key="apt-secret", mount="/etc/apt/apt-secret")]) ) def my_task(x: int) -> int: @@ -875,53 +855,21 @@ def my_task(x: int) -> int: :param extra_index_urls: extra index urls to use for pip install, default is None :param pre: whether to allow pre-release versions, default is False :param extra_args: extra arguments to pass to pip install, default is None - :param extra_args: extra arguments to pass to pip install, default is None :param secret_mounts: list of secret to mount for the build process. - :param optimize_layers: Caching dependencies for future performance, default is True - :return: Image """ + new_packages: Optional[Tuple] = packages or None + new_extra_index_urls: Optional[Tuple] = _ensure_tuple(extra_index_urls) if extra_index_urls else None - import re - - # Automatically categorize the packages - categorized: dict[str, list[str]] = { - "heavy": [], - "core": [], - "unknown": [], - } - - for pkg in packages: - pkg_name = re.split(r"[<>=~!]", pkg, 1)[0].split("[", 1)[0].strip() - category = "unknown" - if optimize_layers: - for cat, pkg_list in PACKAGE_IMPORTANCE.items(): - if pkg_name in pkg_list: - category = cat - break - categorized[category].append(pkg) - - # Helper function to create a layer - def create_pip_layer(pkgs): - return PipPackages( - packages=tuple(pkgs), - index_url=index_url, - extra_index_urls=_ensure_tuple(extra_index_urls) if extra_index_urls else None, - pre=pre, - extra_args=extra_args, - secret_mounts=_ensure_tuple(secret_mounts) if secret_mounts else None, - ) - - # Create layers in priority order (core first, dev last) - new_image = self - for category, lst in categorized.items(): - if lst: - layer = create_pip_layer(lst) - if category == "heavy": - new_image = new_image.clone(addl_layer=layer, addl_top=True) - else: - new_image = new_image.clone(addl_layer=layer) - + ll = PipPackages( + packages=new_packages, + index_url=index_url, + extra_index_urls=new_extra_index_urls, + pre=pre, + extra_args=extra_args, + secret_mounts=_ensure_tuple(secret_mounts) if secret_mounts else None, + ) + new_image = self.clone(addl_layer=ll) return new_image def with_env_vars(self, env_vars: Dict[str, str]) -> Image: diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index a998a1dda..c053986a5 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -2,6 +2,7 @@ import asyncio import json +import re import typing from typing import ClassVar, Dict, Optional, Tuple @@ -9,10 +10,20 @@ from pydantic import BaseModel from typing_extensions import Protocol -from flyte._image import Architecture, Image +from flyte._image import Architecture, Image, Layer, PipPackages, PythonWheels from flyte._initialize import _get_init_config from flyte._logging import logger +HEAVY_DEPENDENCIES = frozenset( + { + "tensorflow", + "torch", + "torchaudio", + "torchvision", + "scikit-learn", + } +) + class ImageBuilder(Protocol): async def build_image(self, image: Image, dry_run: bool) -> str: ... @@ -135,6 +146,122 @@ class ImageBuildEngine: ImageBuilderType = typing.Literal["local", "remote"] + @staticmethod + def _optimize_image_layers(image: Image) -> Image: + """ + Consolidate pip package layers by separating heavy and lightweight dependencies. + + This optimization addresses Docker cache invalidation issues when users chain multiple + . with_pip_packages() calls. By consolidating packages at build time: + + 1. Heavy packages (TensorFlow, PyTorch, etc.) are placed in a bottom layer + → Cached longer, rebuilt less frequently + + 2. Lightweight packages are placed in a top layer + → Can change more frequently without invalidating heavy layer cache + + Example: + Before: + Layer 1: PipPackages(["tensorflow", "numpy"]) # Mixed + Layer 2: PipPackages(["torch", "pandas"]) # Mixed + Layer 3: PipPackages(["pytest"]) # Light + + After: + Layer 1: PipPackages(["tensorflow", "torch"]) # All heavy + Layer 2: PipPackages(["numpy", "pandas", "pytest"]) # All light + + Args: + image: The image to optimize + + Returns: + A new Image with optimized layers + """ + # Separate pip layers from other layer types + pip_layers = [] + other_layers = [] + python_wheels_layers = [] + + for layer in image._layers: + if isinstance(layer, PipPackages): + pip_layers.append(layer) + elif isinstance(layer, PythonWheels): + python_wheels_layers.append(layer) + else: + other_layers.append(layer) + + # Nothing to optimize if there are no pip layers + if not pip_layers: + logger.debug("No PipPackages layers found, skipping optimization") + return image + + # Consolidate packages from all pip layers into two categories + heavy_packages = [] + other_packages = [] + + for layer in pip_layers: + assert layer.packages is not None + for pkg in layer.packages: + pkg_name = re.split(r"[<>=~!\[]", pkg, 1)[0].strip() + if pkg_name in HEAVY_DEPENDENCIES: + heavy_packages.append(pkg) + else: + other_packages.append(pkg) + + logger.info( + f"Optimizing {len(pip_layers)} pip layer(s): {len(heavy_packages)} heavy, \ + {len(other_packages)} other packages" + ) + + # Build optimized layer list + optimized_layers: list[Layer] = [] + + # Use settings from first pip layer as template for consolidated layers + template_layer = pip_layers[0] + + # 1. Add heavy packages first (bottom of Dockerfile → better caching) + if heavy_packages: + heavy_layer = PipPackages( + packages=tuple(heavy_packages), + index_url=template_layer.index_url, + extra_index_urls=template_layer.extra_index_urls, + pre=template_layer.pre, + extra_args=template_layer.extra_args, + secret_mounts=template_layer.secret_mounts, + ) + optimized_layers.append(heavy_layer) + logger.debug(f" Heavy layer: {', '.join(heavy_packages)}") + + # 2. Preserve all non-pip layers in their original positions + optimized_layers.extend(other_layers) + + # 3. Add other packages last (top of Dockerfile → can change frequently) + if other_packages: + other_layer = PipPackages( + packages=tuple(other_packages), + index_url=template_layer.index_url, + extra_index_urls=template_layer.extra_index_urls, + pre=template_layer.pre, + extra_args=template_layer.extra_args, + secret_mounts=template_layer.secret_mounts, + ) + optimized_layers.append(other_layer) + logger.debug(f" Other layer: {len(other_packages)} packages") + + optimized_layers.extend(python_wheels_layers) + logger.debug(f" Moved PythonWheels: {len(python_wheels_layers)} to bottom") + # Create new image with optimized layers + return Image._new( + base_image=image.base_image, + dockerfile=image.dockerfile, + registry=image.registry, + name=image.name, + platform=image.platform, + python_version=image.python_version, + _layers=tuple(optimized_layers), + _image_registry_secret=image._image_registry_secret, + _ref_name=image._ref_name, + ) + @staticmethod @alru_cache async def image_exists(image: Image) -> Optional[str]: @@ -181,6 +308,7 @@ async def build( builder: ImageBuildEngine.ImageBuilderType | None = None, dry_run: bool = False, force: bool = False, + optimize_layers: bool = True, ) -> str: """ Build the image. Images to be tagged with latest will always be built. Otherwise, this engine will check the @@ -190,8 +318,10 @@ async def build( :param builder: :param dry_run: Tell the builder to not actually build. Different builders will have different behaviors. :param force: Skip the existence check. Normally if the image already exists we won't build it. + :param optimize_layers: If True, consolidate pip packages by category (default: True) :return: """ + # Always trigger a build if this is a dry run since builder shouldn't really do anything, or a force. image_uri = (await cls.image_exists(image)) or image.uri if force or dry_run or not await cls.image_exists(image): @@ -200,6 +330,10 @@ async def build( # Validate the image before building image.validate() + if optimize_layers: + logger.debug("Optimizing image layers by consolidating pip packages...") + image = ImageBuildEngine._optimize_image_layers(image) # Call the optimizer + # If a builder is not specified, use the first registered builder cfg = _get_init_config() if cfg and cfg.image_builder: From f419e11a8e51d34cc83783096fdea416c403589c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Tue, 30 Dec 2025 18:31:08 +0800 Subject: [PATCH 07/10] fix wording MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- src/flyte/_image.py | 4 +-- .../_internal/imagebuild/image_builder.py | 26 ++----------------- 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/src/flyte/_image.py b/src/flyte/_image.py index 6212bd478..6e98245a2 100644 --- a/src/flyte/_image.py +++ b/src/flyte/_image.py @@ -835,14 +835,14 @@ def my_task(x: int) -> int: To mount secrets during the build process to download private packages, you can use the `secret_mounts`. In the below example, "GITHUB_PAT" will be mounted as env var "GITHUB_PAT", - and "apt-secret" will be mounted at /etc/apt/apt-secret. + and "apt-secret" will be mounted at /etc/apt/apt-secret. Example: ```python private_package = "git+https://$GITHUB_PAT@github.com/flyteorg/flytex.git@2e20a2acebfc3877d84af643fdd768edea41d533" @flyte.task( image=( flyte.Image.from_debian_base() - .with_pip_packagesjj("private_package", secret_mounts=[Secret(key="GITHUB_PAT")]) + .with_pip_packages("private_package", secret_mounts=[Secret(key="GITHUB_PAT")]) .with_apt_packages("git", secret_mounts=[Secret(key="apt-secret", mount="/etc/apt/apt-secret")]) ) def my_task(x: int) -> int: diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index c053986a5..916a0bfab 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -149,32 +149,10 @@ class ImageBuildEngine: @staticmethod def _optimize_image_layers(image: Image) -> Image: """ - Consolidate pip package layers by separating heavy and lightweight dependencies. - - This optimization addresses Docker cache invalidation issues when users chain multiple - . with_pip_packages() calls. By consolidating packages at build time: - - 1. Heavy packages (TensorFlow, PyTorch, etc.) are placed in a bottom layer - → Cached longer, rebuilt less frequently - - 2. Lightweight packages are placed in a top layer - → Can change more frequently without invalidating heavy layer cache - - Example: - Before: - Layer 1: PipPackages(["tensorflow", "numpy"]) # Mixed - Layer 2: PipPackages(["torch", "pandas"]) # Mixed - Layer 3: PipPackages(["pytest"]) # Light - - After: - Layer 1: PipPackages(["tensorflow", "torch"]) # All heavy - Layer 2: PipPackages(["numpy", "pandas", "pytest"]) # All light - - Args: - image: The image to optimize + Optimize pip layers for better Docker cache reuse. Returns: - A new Image with optimized layers + A new Image with reorganized pip layers. """ # Separate pip layers from other layer types pip_layers = [] From d842c06ba9f8a26355c0f16c23bee706ae07a406 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Tue, 6 Jan 2026 17:00:04 +0800 Subject: [PATCH 08/10] created a new file, extract only heavy deps, flyte python wheel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- src/flyte/_image.py | 7 - src/flyte/_internal/imagebuild/heavy_deps.py | 13 ++ .../_internal/imagebuild/image_builder.py | 200 +++++++++++------- 3 files changed, 136 insertions(+), 84 deletions(-) create mode 100644 src/flyte/_internal/imagebuild/heavy_deps.py diff --git a/src/flyte/_image.py b/src/flyte/_image.py index 6e98245a2..a98fe0139 100644 --- a/src/flyte/_image.py +++ b/src/flyte/_image.py @@ -626,10 +626,6 @@ def from_uv_script( Args: secret_mounts: """ - from ._utils import parse_uv_script_file - - metadata = parse_uv_script_file(Path(script)) - dependencies = metadata.dependencies ll = UVScript( script=Path(script), @@ -649,9 +645,6 @@ def from_uv_script( platform=platform, ) - if dependencies: - img = img.with_pip_packages(*dependencies) - return img.clone(addl_layer=ll) def clone( diff --git a/src/flyte/_internal/imagebuild/heavy_deps.py b/src/flyte/_internal/imagebuild/heavy_deps.py new file mode 100644 index 000000000..5f3d96cd2 --- /dev/null +++ b/src/flyte/_internal/imagebuild/heavy_deps.py @@ -0,0 +1,13 @@ +""" +Configuration for Docker image layer optimization. +""" + +HEAVY_DEPENDENCIES = frozenset( + { + "tensorflow", + "torch", + "torchaudio", + "torchvision", + "scikit-learn", + } +) diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index 916a0bfab..5893238b6 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -10,19 +10,11 @@ from pydantic import BaseModel from typing_extensions import Protocol -from flyte._image import Architecture, Image, Layer, PipPackages, PythonWheels +from flyte._image import Architecture, Image, Layer, PipPackages, PythonWheels, UVScript from flyte._initialize import _get_init_config from flyte._logging import logger -HEAVY_DEPENDENCIES = frozenset( - { - "tensorflow", - "torch", - "torchaudio", - "torchvision", - "scikit-learn", - } -) +from .heavy_deps import HEAVY_DEPENDENCIES class ImageBuilder(Protocol): @@ -149,85 +141,134 @@ class ImageBuildEngine: @staticmethod def _optimize_image_layers(image: Image) -> Image: """ - Optimize pip layers for better Docker cache reuse. - - Returns: - A new Image with reorganized pip layers. + Optimize pip layers by extracting heavy dependencies to the top for better caching. + Original layers remain in their original positions but with heavy packages removed. + PythonWheels layers with package_name 'flyte' are moved to the very end. """ - # Separate pip layers from other layer types - pip_layers = [] - other_layers = [] - python_wheels_layers = [] + from flyte._utils import parse_uv_script_file + + # Step 1: Collect heavy packages and build new layer list + all_heavy_packages: list[str] = [] + template_layer: PipPackages | None = None + optimized_layers: list[Layer] = [] + flyte_wheel_layers: list[PythonWheels] = [] # Collect flyte wheels to move to end for layer in image._layers: if isinstance(layer, PipPackages): - pip_layers.append(layer) + assert layer.packages is not None + + heavy_pkgs: list[str] = [] + light_pkgs: list[str] = [] + + # Split packages + for pkg in layer.packages: + pkg_name = re.split(r"[<>=~!\[]", pkg, 1)[0].strip() + if pkg_name in HEAVY_DEPENDENCIES: + heavy_pkgs.append(pkg) + else: + light_pkgs.append(pkg) + + # Collect heavy packages for the top layer + if heavy_pkgs: + all_heavy_packages.extend(heavy_pkgs) + if template_layer is None: + template_layer = layer + + # Keep layer in original position with only light packages + if light_pkgs: + light_layer = PipPackages( + packages=tuple(light_pkgs), + index_url=layer.index_url, + extra_index_urls=layer.extra_index_urls, + pre=layer.pre, + extra_args=layer.extra_args, + secret_mounts=layer.secret_mounts, + ) + optimized_layers.append(light_layer) + # If layer had ONLY heavy packages, don't add it (it becomes empty) + + elif isinstance(layer, UVScript): + # Parse UV scripts and extract dependencies + metadata = parse_uv_script_file(layer.script) + + if metadata.dependencies: + uv_heavy_pkgs: list[str] = [] + uv_light_pkgs: list[str] = [] + + for pkg in metadata.dependencies: + pkg_name = re.split(r"[<>=~!\[]", pkg, 1)[0].strip() + if pkg_name in HEAVY_DEPENDENCIES: + heavy_pkgs.append(pkg) + else: + light_pkgs.append(pkg) + + # Collect heavy packages + if uv_heavy_pkgs: + all_heavy_packages.extend(uv_heavy_pkgs) + if template_layer is None: + # Create template from UV layer config + template_layer = PipPackages( + packages=(), + index_url=layer.index_url, + extra_index_urls=layer.extra_index_urls, + pre=layer.pre, + extra_args=layer.extra_args, + secret_mounts=layer.secret_mounts, + ) + + # Add light packages as pip layer in original position + if uv_light_pkgs: + light_pip_layer = PipPackages( + packages=tuple(uv_light_pkgs), + index_url=layer.index_url, + extra_index_urls=layer.extra_index_urls, + pre=layer.pre, + extra_args=layer.extra_args, + secret_mounts=layer.secret_mounts, + ) + optimized_layers.append(light_pip_layer) + + # Keep the UVScript layer in its position + optimized_layers.append(layer) + elif isinstance(layer, PythonWheels): - python_wheels_layers.append(layer) + # Check if this is a flyte wheel - if so, move to end + if layer.package_name == "flyte": + flyte_wheel_layers.append(layer) + logger.debug(f"Moving flyte wheel layer to end: {layer}") + else: + # Keep other wheels in original position + optimized_layers.append(layer) + else: - other_layers.append(layer) + # All other layers (apt, env, etc.) stay in position + optimized_layers.append(layer) - # Nothing to optimize if there are no pip layers - if not pip_layers: - logger.debug("No PipPackages layers found, skipping optimization") + # If no heavy packages found, return original image + if not all_heavy_packages: + logger.debug("No heavy packages found, skipping optimization") return image - # Consolidate packages from all pip layers into two categories - heavy_packages = [] - other_packages = [] - - for layer in pip_layers: - assert layer.packages is not None - for pkg in layer.packages: - pkg_name = re.split(r"[<>=~!\[]", pkg, 1)[0].strip() - if pkg_name in HEAVY_DEPENDENCIES: - heavy_packages.append(pkg) - else: - other_packages.append(pkg) - - logger.info( - f"Optimizing {len(pip_layers)} pip layer(s): {len(heavy_packages)} heavy, \ - {len(other_packages)} other packages" + logger.info(f"Extracted {len(all_heavy_packages)} heavy package(s) to top layer") + logger.debug(f" Heavy packages: {', '.join(all_heavy_packages)}") + + # Step 2: Build final layer order + assert template_layer is not None + heavy_layer = PipPackages( + packages=tuple(all_heavy_packages), + index_url=template_layer.index_url, + extra_index_urls=template_layer.extra_index_urls, + pre=template_layer.pre, + extra_args=template_layer.extra_args, + secret_mounts=template_layer.secret_mounts, ) - # Build optimized layer list - optimized_layers: list[Layer] = [] + # Final layer order: heavy at top, everything else in middle, flyte wheels at end + final_layers = [heavy_layer, *optimized_layers, *flyte_wheel_layers] - # Use settings from first pip layer as template for consolidated layers - template_layer = pip_layers[0] - - # 1. Add heavy packages first (bottom of Dockerfile → better caching) - if heavy_packages: - heavy_layer = PipPackages( - packages=tuple(heavy_packages), - index_url=template_layer.index_url, - extra_index_urls=template_layer.extra_index_urls, - pre=template_layer.pre, - extra_args=template_layer.extra_args, - secret_mounts=template_layer.secret_mounts, - ) - optimized_layers.append(heavy_layer) - logger.debug(f" Heavy layer: {', '.join(heavy_packages)}") - - # 2. Preserve all non-pip layers in their original positions - optimized_layers.extend(other_layers) - - # 3. Add other packages last (top of Dockerfile → can change frequently) - if other_packages: - other_layer = PipPackages( - packages=tuple(other_packages), - index_url=template_layer.index_url, - extra_index_urls=template_layer.extra_index_urls, - pre=template_layer.pre, - extra_args=template_layer.extra_args, - secret_mounts=template_layer.secret_mounts, - ) - optimized_layers.append(other_layer) - logger.debug(f" Other layer: {len(other_packages)} packages") + if flyte_wheel_layers: + logger.debug(f"Moved {len(flyte_wheel_layers)} flyte wheel layer(s) to end") - optimized_layers.extend(python_wheels_layers) - logger.debug(f" Moved PythonWheels: {len(python_wheels_layers)} to bottom") - # Create new image with optimized layers return Image._new( base_image=image.base_image, dockerfile=image.dockerfile, @@ -235,7 +276,7 @@ def _optimize_image_layers(image: Image) -> Image: name=image.name, platform=image.platform, python_version=image.python_version, - _layers=tuple(optimized_layers), + _layers=tuple(final_layers), _image_registry_secret=image._image_registry_secret, _ref_name=image._ref_name, ) @@ -311,6 +352,11 @@ async def build( if optimize_layers: logger.debug("Optimizing image layers by consolidating pip packages...") image = ImageBuildEngine._optimize_image_layers(image) # Call the optimizer + logger.debug("=" * 60) + logger.debug("Final layer order after optimization:") + for i, layer in enumerate(image._layers): + logger.debug(f" Layer {i}: {type(layer).__name__} - {layer}") + logger.debug("=" * 60) # If a builder is not specified, use the first registered builder cfg = _get_init_config() From e8ce98434c66a581e42225055b20dbdff9d8eca8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Fri, 9 Jan 2026 00:17:43 +0800 Subject: [PATCH 09/10] added tests, seperate args MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- .../_internal/imagebuild/image_builder.py | 99 +++++------ tests/flyte/test_image.py | 163 ++++++++++++++++++ 2 files changed, 210 insertions(+), 52 deletions(-) diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index 5893238b6..bcb05e5b5 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -142,16 +142,16 @@ class ImageBuildEngine: def _optimize_image_layers(image: Image) -> Image: """ Optimize pip layers by extracting heavy dependencies to the top for better caching. - Original layers remain in their original positions but with heavy packages removed. + Heavy packages from each layer are extracted to separate layers (preserving per-layer arguments), + and all heavy layers are placed at the top. Original layers with light packages follow. PythonWheels layers with package_name 'flyte' are moved to the very end. """ from flyte._utils import parse_uv_script_file - # Step 1: Collect heavy packages and build new layer list - all_heavy_packages: list[str] = [] - template_layer: PipPackages | None = None - optimized_layers: list[Layer] = [] - flyte_wheel_layers: list[PythonWheels] = [] # Collect flyte wheels to move to end + # Step 1: Collect heavy and original layers separately + heavy_layers: list[PipPackages] = [] + original_layers: list[Layer] = [] + flyte_wheel_layers: list[PythonWheels] = [] for layer in image._layers: if isinstance(layer, PipPackages): @@ -168,13 +168,20 @@ def _optimize_image_layers(image: Image) -> Image: else: light_pkgs.append(pkg) - # Collect heavy packages for the top layer + # Create heavy layer with original arguments (if any heavy packages) if heavy_pkgs: - all_heavy_packages.extend(heavy_pkgs) - if template_layer is None: - template_layer = layer + heavy_layer = PipPackages( + packages=tuple(heavy_pkgs), + index_url=layer.index_url, + extra_index_urls=layer.extra_index_urls, + pre=layer.pre, + extra_args=layer.extra_args, + secret_mounts=layer.secret_mounts, + ) + heavy_layers.append(heavy_layer) + logger.debug(f"Extracted {len(heavy_pkgs)} heavy package(s): {', '.join(heavy_pkgs)}") - # Keep layer in original position with only light packages + # Create light layer with original arguments (if any light packages) if light_pkgs: light_layer = PipPackages( packages=tuple(light_pkgs), @@ -184,8 +191,7 @@ def _optimize_image_layers(image: Image) -> Image: extra_args=layer.extra_args, secret_mounts=layer.secret_mounts, ) - optimized_layers.append(light_layer) - # If layer had ONLY heavy packages, don't add it (it becomes empty) + original_layers.append(light_layer) elif isinstance(layer, UVScript): # Parse UV scripts and extract dependencies @@ -198,25 +204,26 @@ def _optimize_image_layers(image: Image) -> Image: for pkg in metadata.dependencies: pkg_name = re.split(r"[<>=~!\[]", pkg, 1)[0].strip() if pkg_name in HEAVY_DEPENDENCIES: - heavy_pkgs.append(pkg) + uv_heavy_pkgs.append(pkg) else: - light_pkgs.append(pkg) + uv_light_pkgs.append(pkg) - # Collect heavy packages + # Create heavy pip layer from UV (if any heavy packages) if uv_heavy_pkgs: - all_heavy_packages.extend(uv_heavy_pkgs) - if template_layer is None: - # Create template from UV layer config - template_layer = PipPackages( - packages=(), - index_url=layer.index_url, - extra_index_urls=layer.extra_index_urls, - pre=layer.pre, - extra_args=layer.extra_args, - secret_mounts=layer.secret_mounts, - ) - - # Add light packages as pip layer in original position + heavy_pip_layer = PipPackages( + packages=tuple(uv_heavy_pkgs), + index_url=layer.index_url, + extra_index_urls=layer.extra_index_urls, + pre=layer.pre, + extra_args=layer.extra_args, + secret_mounts=layer.secret_mounts, + ) + heavy_layers.append(heavy_pip_layer) + logger.debug( + f"Extracted {len(uv_heavy_pkgs)} heavy package(s) from UV: {', '.join(uv_heavy_pkgs)}" + ) + + # Create light pip layer from UV (if any light packages) if uv_light_pkgs: light_pip_layer = PipPackages( packages=tuple(uv_light_pkgs), @@ -226,10 +233,10 @@ def _optimize_image_layers(image: Image) -> Image: extra_args=layer.extra_args, secret_mounts=layer.secret_mounts, ) - optimized_layers.append(light_pip_layer) + original_layers.append(light_pip_layer) - # Keep the UVScript layer in its position - optimized_layers.append(layer) + # Keep the UVScript layer in original_layers section + original_layers.append(layer) elif isinstance(layer, PythonWheels): # Check if this is a flyte wheel - if so, move to end @@ -237,34 +244,22 @@ def _optimize_image_layers(image: Image) -> Image: flyte_wheel_layers.append(layer) logger.debug(f"Moving flyte wheel layer to end: {layer}") else: - # Keep other wheels in original position - optimized_layers.append(layer) + # Keep other wheels with original_layers + original_layers.append(layer) else: - # All other layers (apt, env, etc.) stay in position - optimized_layers.append(layer) + # All other layers (apt, env, etc.) go with light layers + original_layers.append(layer) # If no heavy packages found, return original image - if not all_heavy_packages: + if not heavy_layers: logger.debug("No heavy packages found, skipping optimization") return image - logger.info(f"Extracted {len(all_heavy_packages)} heavy package(s) to top layer") - logger.debug(f" Heavy packages: {', '.join(all_heavy_packages)}") - - # Step 2: Build final layer order - assert template_layer is not None - heavy_layer = PipPackages( - packages=tuple(all_heavy_packages), - index_url=template_layer.index_url, - extra_index_urls=template_layer.extra_index_urls, - pre=template_layer.pre, - extra_args=template_layer.extra_args, - secret_mounts=template_layer.secret_mounts, - ) + logger.info(f"Created {len(heavy_layers)} heavy layer(s) at top") - # Final layer order: heavy at top, everything else in middle, flyte wheels at end - final_layers = [heavy_layer, *optimized_layers, *flyte_wheel_layers] + # Final layer order: all heavy layers at top, then original layers, then flyte wheels at end + final_layers = [*heavy_layers, *original_layers, *flyte_wheel_layers] if flyte_wheel_layers: logger.debug(f"Moved {len(flyte_wheel_layers)} flyte wheel layer(s) to end") diff --git a/tests/flyte/test_image.py b/tests/flyte/test_image.py index f7c1ffdf8..6104f2ac3 100644 --- a/tests/flyte/test_image.py +++ b/tests/flyte/test_image.py @@ -5,6 +5,7 @@ from flyte._image import Image, UVScript from flyte._internal.imagebuild.docker_builder import PipAndRequirementsHandler +from flyte._internal.imagebuild.image_builder import ImageBuildEngine def test_base(): @@ -171,3 +172,165 @@ def test_ids_for_different_python_version(): # Override base images to be the same for testing that the identifier does not depends on python version object.__setattr__(ex_11, "base_image", "python:3.10-slim-bookworm") object.__setattr__(ex_12, "base_image", "python:3.10-slim-bookworm") + + +def test_optimize_image_layers_single_layer(): + """Test optimization extracts heavy packages to a separate layer at the top.""" + from flyte._image import PipPackages + + img = Image.from_debian_base(registry="localhost", name="test-image", install_flyte=False).with_pip_packages( + "torch", "tensorflow", "requests", "flask" + ) + + optimized = ImageBuildEngine._optimize_image_layers(img) + pip_layers = [layer for layer in optimized._layers if isinstance(layer, PipPackages)] + + assert len(pip_layers) == 2 + # Heavy packages at top (torch and tensorflow) + assert "torch" in pip_layers[0].packages + assert "tensorflow" in pip_layers[0].packages + # Light packages below (requests and flask) + assert "requests" in pip_layers[1].packages + assert "flask" in pip_layers[1].packages + + +def test_optimize_image_layers_multiple_layers(): + """Test optimization with multiple pip layers.""" + from flyte._image import PipPackages + + img = ( + Image.from_debian_base(registry="localhost", name="test-image", install_flyte=False) + .with_pip_packages("torch", "requests") + .with_pip_packages("tensorflow", "flask") + ) + + optimized = ImageBuildEngine._optimize_image_layers(img) + pip_layers = [layer for layer in optimized._layers if isinstance(layer, PipPackages)] + + # Should have 4 layers: 2 heavy at top (torch, tensorflow), 2 light below (requests, flask) + assert len(pip_layers) == 4 + + # First two layers should be heavy packages + heavy_packages = pip_layers[0].packages + pip_layers[1].packages + assert "torch" in heavy_packages + assert "tensorflow" in heavy_packages + + # Last two layers should be light packages + light_packages = pip_layers[2].packages + pip_layers[3].packages + assert "requests" in light_packages + assert "flask" in light_packages + + +def test_optimize_image_layers_no_heavy_packages(): + """Test optimization when there are no heavy packages.""" + from flyte._image import PipPackages + + img = Image.from_debian_base(registry="localhost", name="test-image", install_flyte=False).with_pip_packages( + "requests", "flask" + ) + + optimized = ImageBuildEngine._optimize_image_layers(img) + + # Should return the same image structure since no optimization needed + original_pip_layers = [layer for layer in img._layers if isinstance(layer, PipPackages)] + optimized_pip_layers = [layer for layer in optimized._layers if isinstance(layer, PipPackages)] + assert len(optimized_pip_layers) == len(original_pip_layers) + + +def test_optimize_image_layers_only_heavy_packages(): + """Test optimization when a layer contains only heavy packages.""" + from flyte._image import PipPackages + + img = Image.from_debian_base(registry="localhost", name="test-image", install_flyte=False).with_pip_packages( + "torch", "tensorflow" + ) + + optimized = ImageBuildEngine._optimize_image_layers(img) + pip_layers = [layer for layer in optimized._layers if isinstance(layer, PipPackages)] + + # Should have 1 heavy layer at top (both torch and tensorflow are heavy) + assert len(pip_layers) == 1 + assert "torch" in pip_layers[0].packages + assert "tensorflow" in pip_layers[0].packages + + +def test_optimize_image_layers_preserves_extra_args(): + """Test that optimization preserves pip layer arguments like index_url.""" + from flyte._image import PipPackages + + img = ( + Image.from_debian_base(registry="localhost", name="test-image", install_flyte=False) + .with_pip_packages("torch", "requests", extra_index_urls="https://example.com") + .with_pip_packages("tensorflow", "flask", extra_index_urls="https://other.com") + ) + + optimized = ImageBuildEngine._optimize_image_layers(img) + pip_layers = [layer for layer in optimized._layers if isinstance(layer, PipPackages)] + + # Find the heavy layer with torch + torch_layer = pip_layers[0] + assert torch_layer.extra_index_urls == ("https://example.com",) + + # Find the heavy layer with tensorflow + tensorflow_layer = pip_layers[1] + assert tensorflow_layer.extra_index_urls == ("https://other.com",) + + +def test_optimize_image_layers_with_non_pip_layers(): + """Test optimization preserves non-pip layers in correct positions.""" + from flyte._image import AptPackages, PipPackages + + img = ( + Image.from_debian_base(registry="localhost", name="test-image", install_flyte=False) + .with_apt_packages("curl", "vim") + .with_pip_packages("torch", "requests") + ) + + optimized = ImageBuildEngine._optimize_image_layers(img) + + # Apt layers should still exist + apt_layers = [layer for layer in optimized._layers if isinstance(layer, AptPackages)] + assert len(apt_layers) >= 1 # At least one apt layer (base + custom) + + # Should have pip layers + pip_layers = [layer for layer in optimized._layers if isinstance(layer, PipPackages)] + assert len(pip_layers) >= 1 + + +def test_optimize_image_layers_flyte_wheels_at_end(): + """Test that PythonWheels with package_name 'flyte' are moved to the end.""" + from flyte._image import PythonWheels + + img = Image.from_debian_base(registry="localhost", name="test-image") + + # The default image should have flyte wheels + optimized = ImageBuildEngine._optimize_image_layers(img) + + # Find flyte wheel layers + flyte_wheels = [ + layer for layer in optimized._layers if isinstance(layer, PythonWheels) and layer.package_name == "flyte" + ] + + if flyte_wheels: + # Flyte wheels should be at the very end + last_flyte_wheel_index = optimized._layers.index(flyte_wheels[-1]) + assert last_flyte_wheel_index == len(optimized._layers) - 1 + + +def test_optimize_image_layers_with_uv_script(): + """Test optimization with UVScript layers.""" + from flyte._image import UVScript + + script_path = Path(__file__).parent / "resources" / "sample_uv_script.py" + + # Skip test if file doesn't exist + if not script_path.exists(): + pytest.skip(f"Test file not found: {script_path}") + + img = Image.from_uv_script(script_path, name="uvtest", registry="localhost", python_version=(3, 12)) + + optimized = ImageBuildEngine._optimize_image_layers(img) + + # UVScript layer should still exist + uv_layers = [layer for layer in optimized._layers if isinstance(layer, UVScript)] + assert len(uv_layers) >= 1 From ac7f1e5232c27bdcb5f167dd397d5d55cf2b8cca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CAlex?= Date: Fri, 9 Jan 2026 00:53:59 +0800 Subject: [PATCH 10/10] fmt for connectors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Alex --- src/flyte/connectors/_server.py | 5 ++--- src/flyte/connectors/utils.py | 9 ++++---- .../flyte/connector/test_connector_service.py | 22 +++++++++---------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/flyte/connectors/_server.py b/src/flyte/connectors/_server.py index 7b503a2e6..34faeec42 100644 --- a/src/flyte/connectors/_server.py +++ b/src/flyte/connectors/_server.py @@ -5,8 +5,6 @@ from typing import Callable, Dict, List, Tuple, Type, Union import grpc -from flyteidl2.connector.service_pb2_grpc import AsyncConnectorServiceServicer, ConnectorMetadataServiceServicer -from flyteidl2.core.security_pb2 import Connection from flyteidl2.connector.connector_pb2 import ( CreateTaskRequest, CreateTaskResponse, @@ -23,7 +21,8 @@ ListConnectorsRequest, ListConnectorsResponse, ) - +from flyteidl2.connector.service_pb2_grpc import AsyncConnectorServiceServicer, ConnectorMetadataServiceServicer +from flyteidl2.core.security_pb2 import Connection from prometheus_client import Counter, Summary from flyte._internal.runtime.convert import Inputs, convert_from_inputs_to_native diff --git a/src/flyte/connectors/utils.py b/src/flyte/connectors/utils.py index 88298cf3f..8a9692efe 100644 --- a/src/flyte/connectors/utils.py +++ b/src/flyte/connectors/utils.py @@ -5,12 +5,13 @@ import click import grpc -from flyteidl2.connector.service_pb2_grpc import add_AsyncConnectorServiceServicer_to_server, \ - add_ConnectorMetadataServiceServicer_to_server +from flyteidl2.connector import service_pb2 +from flyteidl2.connector.service_pb2_grpc import ( + add_AsyncConnectorServiceServicer_to_server, + add_ConnectorMetadataServiceServicer_to_server, +) from flyteidl2.core.execution_pb2 import TaskExecution from flyteidl2.core.tasks_pb2 import TaskTemplate -from flyteidl2.connector import service_pb2 - from rich.console import Console from rich.table import Table diff --git a/tests/flyte/connector/test_connector_service.py b/tests/flyte/connector/test_connector_service.py index fd637320c..4ff81b3c7 100644 --- a/tests/flyte/connector/test_connector_service.py +++ b/tests/flyte/connector/test_connector_service.py @@ -6,17 +6,6 @@ import grpc import pytest from flyteidl.core.tasks_pb2 import TaskTemplate -from flyteidl2.core import literals_pb2 -from flyteidl2.core.execution_pb2 import TaskExecution, TaskLog -from flyteidl2.core.identifier_pb2 import ( - Identifier, - NodeExecutionIdentifier, - ResourceType, - TaskExecutionIdentifier, - WorkflowExecutionIdentifier, -) -from flyteidl2.core.metrics_pb2 import ExecutionMetricResult -from flyteidl2.core.security_pb2 import Identity from flyteidl2.connector.connector_pb2 import ( CreateTaskRequest, DeleteTaskRequest, @@ -30,6 +19,17 @@ TaskCategory, TaskExecutionMetadata, ) +from flyteidl2.core import literals_pb2 +from flyteidl2.core.execution_pb2 import TaskExecution, TaskLog +from flyteidl2.core.identifier_pb2 import ( + Identifier, + NodeExecutionIdentifier, + ResourceType, + TaskExecutionIdentifier, + WorkflowExecutionIdentifier, +) +from flyteidl2.core.metrics_pb2 import ExecutionMetricResult +from flyteidl2.core.security_pb2 import Identity from flyteidl2.task import common_pb2 import flyte