From 8ba6688b6059b824d8e9071865c6f3793eb2a613 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 13 Mar 2026 00:24:12 -0700 Subject: [PATCH 1/9] feat: add persistent disk cache for image existence checks Adds PersistentCacheImageChecker that caches verified image URIs to ~/.flyte/cache/images/. After the first successful remote manifest check (~4s), subsequent runs read from disk cache (~0ms), eliminating repeated network round-trips to the container registry. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Kevin Su --- .../_internal/imagebuild/docker_builder.py | 3 +- .../_internal/imagebuild/image_builder.py | 42 +++++++++++++++++ .../imagebuild/test_image_build_engine.py | 45 ++++++++++++++++--- 3 files changed, 83 insertions(+), 7 deletions(-) diff --git a/src/flyte/_internal/imagebuild/docker_builder.py b/src/flyte/_internal/imagebuild/docker_builder.py index ccf146f44..472769975 100644 --- a/src/flyte/_internal/imagebuild/docker_builder.py +++ b/src/flyte/_internal/imagebuild/docker_builder.py @@ -40,6 +40,7 @@ ImageChecker, LocalDockerCommandImageChecker, LocalPodmanCommandImageChecker, + PersistentCacheImageChecker, ) from flyte._internal.imagebuild.utils import ( copy_files_to_context, @@ -575,7 +576,7 @@ class DockerImageBuilder(ImageBuilder): def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]: # Can get a public token for docker.io but ghcr requires a pat, so harder to get the manifest anonymously - return [LocalDockerCommandImageChecker, LocalPodmanCommandImageChecker, DockerAPIImageChecker] + return [PersistentCacheImageChecker, LocalDockerCommandImageChecker, LocalPodmanCommandImageChecker, DockerAPIImageChecker] async def build_image( self, image: Image, dry_run: bool = False, wait: bool = True, force: bool = False diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index 8eeb9562b..707f415d5 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -1,9 +1,12 @@ from __future__ import annotations import asyncio +import hashlib import json +import os import typing from importlib.metadata import entry_points +from pathlib import Path from typing import TYPE_CHECKING, ClassVar, Dict, Optional, Tuple from async_lru import alru_cache @@ -15,6 +18,8 @@ from flyte._logging import logger from flyte._status import status +_IMAGE_CACHE_DIR = Path.home() / ".flyte" / "cache" / "images" + if TYPE_CHECKING: from flyte._build import ImageBuild @@ -93,6 +98,40 @@ async def image_exists( return None +def _cache_key(repository: str, tag: str, arch: Tuple[str, ...]) -> str: + """Return a filesystem-safe cache key for an image.""" + raw = f"{repository}:{tag}:{','.join(sorted(arch))}" + return hashlib.sha256(raw.encode()).hexdigest() + + +def _write_image_cache(repository: str, tag: str, arch: Tuple[str, ...], image_uri: str) -> None: + """Persist a verified image URI to local disk cache.""" + try: + _IMAGE_CACHE_DIR.mkdir(parents=True, exist_ok=True) + cache_file = _IMAGE_CACHE_DIR / _cache_key(repository, tag, arch) + cache_file.write_text(image_uri) + except OSError as e: + logger.debug(f"Failed to write image cache: {e}") + + +class PersistentCacheImageChecker(ImageChecker): + """Check if image was previously verified and cached on disk (~0ms).""" + + @classmethod + async def image_exists( + cls, repository: str, tag: str, arch: Tuple[Architecture, ...] = ("linux/amd64",) + ) -> Optional[str]: + cache_file = _IMAGE_CACHE_DIR / _cache_key(repository, tag, arch) + try: + if cache_file.is_file(): + uri = cache_file.read_text().strip() + logger.debug(f"Image {uri} found in persistent cache") + return uri + except OSError as e: + logger.debug(f"Failed to read image cache: {e}") + return None + + class LocalDockerCommandImageChecker(ImageChecker): command_name: ClassVar[str] = "docker" @@ -170,6 +209,9 @@ async def image_exists(image: Image) -> Optional[str]: image_uri = await checker.image_exists(repository, tag, tuple(image.platform)) if image_uri: logger.debug(f"Image {image_uri} in registry") + # Persist to disk so future process invocations skip network checks + if checker is not PersistentCacheImageChecker: + _write_image_cache(repository, tag, tuple(image.platform), image_uri) return image_uri except Exception as e: logger.debug(f"Error checking image existence with {checker.__name__}: {e}") diff --git a/tests/flyte/imagebuild/test_image_build_engine.py b/tests/flyte/imagebuild/test_image_build_engine.py index 6c5fc76f3..846e1c441 100644 --- a/tests/flyte/imagebuild/test_image_build_engine.py +++ b/tests/flyte/imagebuild/test_image_build_engine.py @@ -8,26 +8,59 @@ DockerAPIImageChecker, ImageBuildEngine, LocalDockerCommandImageChecker, + PersistentCacheImageChecker, ) @mock.patch("flyte._internal.imagebuild.image_builder.DockerAPIImageChecker.image_exists") @mock.patch("flyte._internal.imagebuild.image_builder.LocalDockerCommandImageChecker.image_exists") +@mock.patch("flyte._internal.imagebuild.image_builder.PersistentCacheImageChecker.image_exists") @pytest.mark.asyncio -async def test_cached(mock_checker_cli, mock_checker_api): - # Simulate that the image exists locally - mock_checker_cli.return_value = True +async def test_cached(mock_checker_cache, mock_checker_cli, mock_checker_api): + # Simulate that the image exists via persistent cache + mock_checker_cache.return_value = True img = Image.from_debian_base() await ImageBuildEngine.image_exists(img) await ImageBuildEngine.image_exists(img) - # The local checker should be called once, and its result cached - mock_checker_cli.assert_called_once() - # The API checker should not be called at all + # The persistent cache checker should be called once, and its result cached by alru_cache + mock_checker_cache.assert_called_once() + # All other checkers should not be called + mock_checker_cli.assert_not_called() mock_checker_api.assert_not_called() +def test_persistent_cache_write_and_read(tmp_path, monkeypatch): + """PersistentCacheImageChecker reads back what _write_image_cache wrote.""" + import flyte._internal.imagebuild.image_builder as ib + + monkeypatch.setattr(ib, "_IMAGE_CACHE_DIR", tmp_path) + + # Initially nothing cached + import asyncio + + result = asyncio.get_event_loop().run_until_complete( + PersistentCacheImageChecker.image_exists("myrepo", "v1.0", ("linux/amd64",)) + ) + assert result is None + + # Write to cache + ib._write_image_cache("myrepo", "v1.0", ("linux/amd64",), "myrepo:v1.0") + + # Now it should be found + result = asyncio.get_event_loop().run_until_complete( + PersistentCacheImageChecker.image_exists("myrepo", "v1.0", ("linux/amd64",)) + ) + assert result == "myrepo:v1.0" + + # Different arch should NOT be found + result = asyncio.get_event_loop().run_until_complete( + PersistentCacheImageChecker.image_exists("myrepo", "v1.0", ("linux/arm64",)) + ) + assert result is None + + @mock.patch("flyte._internal.imagebuild.image_builder.ImageBuildEngine._get_builder") @mock.patch("flyte._internal.imagebuild.image_builder.ImageBuildEngine.image_exists", new_callable=mock.AsyncMock) @pytest.mark.asyncio From 144d85acf90f497629658e2cd4b8950e2868e7ca Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 13 Mar 2026 00:26:59 -0700 Subject: [PATCH 2/9] nit Signed-off-by: Kevin Su --- src/flyte/_internal/imagebuild/image_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index 707f415d5..be71cc6d8 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -212,7 +212,7 @@ async def image_exists(image: Image) -> Optional[str]: # Persist to disk so future process invocations skip network checks if checker is not PersistentCacheImageChecker: _write_image_cache(repository, tag, tuple(image.platform), image_uri) - return image_uri + return image_uri except Exception as e: logger.debug(f"Error checking image existence with {checker.__name__}: {e}") continue From 3a0acff9972984711b433e8e5e697e63501c2180 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 13 Mar 2026 00:30:06 -0700 Subject: [PATCH 3/9] lint Signed-off-by: Kevin Su --- src/flyte/_internal/imagebuild/docker_builder.py | 7 ++++++- src/flyte/_internal/imagebuild/image_builder.py | 1 - 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/flyte/_internal/imagebuild/docker_builder.py b/src/flyte/_internal/imagebuild/docker_builder.py index 472769975..94f2eea66 100644 --- a/src/flyte/_internal/imagebuild/docker_builder.py +++ b/src/flyte/_internal/imagebuild/docker_builder.py @@ -576,7 +576,12 @@ class DockerImageBuilder(ImageBuilder): def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]: # Can get a public token for docker.io but ghcr requires a pat, so harder to get the manifest anonymously - return [PersistentCacheImageChecker, LocalDockerCommandImageChecker, LocalPodmanCommandImageChecker, DockerAPIImageChecker] + return [ + PersistentCacheImageChecker, + LocalDockerCommandImageChecker, + LocalPodmanCommandImageChecker, + DockerAPIImageChecker, + ] async def build_image( self, image: Image, dry_run: bool = False, wait: bool = True, force: bool = False diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index be71cc6d8..42ebbab15 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -3,7 +3,6 @@ import asyncio import hashlib import json -import os import typing from importlib.metadata import entry_points from pathlib import Path From e0b54bacd4520e93412e45918cd843875df4bc07 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 17 Mar 2026 13:15:07 -0700 Subject: [PATCH 4/9] sqlite Signed-off-by: Kevin Su --- src/flyte/_code_bundle/bundle.py | 78 +++++++++++++++++++ .../_internal/imagebuild/image_builder.py | 61 +++++++++++---- .../imagebuild/test_image_build_engine.py | 2 +- 3 files changed, 126 insertions(+), 15 deletions(-) diff --git a/src/flyte/_code_bundle/bundle.py b/src/flyte/_code_bundle/bundle.py index 13fd109d7..c34f939c2 100644 --- a/src/flyte/_code_bundle/bundle.py +++ b/src/flyte/_code_bundle/bundle.py @@ -5,7 +5,10 @@ import logging import os import pathlib +import random +import sqlite3 import tempfile +import time from pathlib import Path from typing import TYPE_CHECKING, ClassVar, Type @@ -28,6 +31,61 @@ _pickled_file_extension = ".pkl.gz" _tar_file_extension = ".tar.gz" +_BUNDLE_CACHE_DB = Path.home() / ".flyte" / "cache" / "bundles.db" +_BUNDLE_CACHE_TTL_DAYS = 30 + + +def _get_bundle_cache_db() -> sqlite3.Connection: + """Open (and lazily initialize) the SQLite bundle cache database.""" + _BUNDLE_CACHE_DB.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(_BUNDLE_CACHE_DB)) + conn.execute( + "CREATE TABLE IF NOT EXISTS bundle_cache " + "(digest TEXT PRIMARY KEY, hash_digest TEXT NOT NULL, remote_path TEXT NOT NULL, " + "created_at REAL NOT NULL)" + ) + return conn + + +def _read_bundle_cache(digest: str) -> tuple[str, str] | None: + """Look up a previously uploaded bundle by its file digest. Returns (hash_digest, remote_path) or None.""" + try: + conn = _get_bundle_cache_db() + try: + cutoff = time.time() - _BUNDLE_CACHE_TTL_DAYS * 86400 + row = conn.execute( + "SELECT hash_digest, remote_path FROM bundle_cache WHERE digest = ? AND created_at > ?", + (digest, cutoff), + ).fetchone() + # Prune expired entries ~5% of the time to avoid doing it on every read + if random.random() < 0.05: + conn.execute("DELETE FROM bundle_cache WHERE created_at <= ?", (cutoff,)) + conn.commit() + if row: + return row[0], row[1] + finally: + conn.close() + except (OSError, sqlite3.Error) as e: + logger.debug(f"Failed to read bundle cache: {e}") + return None + + +def _write_bundle_cache(digest: str, hash_digest: str, remote_path: str) -> None: + """Persist a successfully uploaded bundle to the SQLite cache.""" + try: + conn = _get_bundle_cache_db() + try: + with conn: + conn.execute( + "INSERT OR REPLACE INTO bundle_cache (digest, hash_digest, remote_path, created_at) " + "VALUES (?, ?, ?, ?)", + (digest, hash_digest, remote_path, time.time()), + ) + finally: + conn.close() + except (OSError, sqlite3.Error) as e: + logger.debug(f"Failed to write bundle cache: {e}") + class _PklCache: _pkl_cache: ClassVar[AsyncLRUCache[str, str]] = AsyncLRUCache[str, str](maxsize=100) @@ -163,6 +221,15 @@ async def build_code_bundle( if logger.getEffectiveLevel() <= logging.INFO: print_ls_tree(from_dir, files) + # Check persistent cache before creating the tar bundle to avoid unnecessary work + if not dryrun: + cached = _read_bundle_cache(digest) + if cached: + hash_digest, remote_path = cached + status.success(f"Code bundle found in cache, skipping upload") + logger.debug(f"Code bundle cache hit: {remote_path}") + return CodeBundle(tgz=remote_path, destination=extract_dir, computed_version=hash_digest, files=files) + logger.debug("Building code bundle.") with tempfile.TemporaryDirectory() as tmp_dir: bundle_path, tar_size, archive_size = create_bundle( @@ -173,6 +240,7 @@ async def build_code_bundle( status.step("Uploading code bundle...") hash_digest, remote_path = await upload_file.aio(bundle_path) logger.debug(f"Code bundle uploaded to {remote_path}") + _write_bundle_cache(digest, hash_digest, remote_path) else: if copy_bundle_to: remote_path = str(copy_bundle_to / bundle_path.name) @@ -218,6 +286,15 @@ async def build_code_bundle_from_relative_paths( if logger.getEffectiveLevel() <= logging.INFO: print_ls_tree(from_dir, files) + # Check persistent cache before creating the tar bundle to avoid unnecessary work + if not dryrun: + cached = _read_bundle_cache(digest) + if cached: + hash_digest, remote_path = cached + status.success(f"Code bundle found in cache, skipping upload") + logger.debug(f"Code bundle cache hit: {remote_path}") + return CodeBundle(tgz=remote_path, destination=extract_dir, computed_version=hash_digest, files=files) + logger.debug("Building code bundle.") with tempfile.TemporaryDirectory() as tmp_dir: bundle_path, tar_size, archive_size = create_bundle(from_dir, pathlib.Path(tmp_dir), files, digest) @@ -226,6 +303,7 @@ async def build_code_bundle_from_relative_paths( status.step("Uploading code bundle...") hash_digest, remote_path = await upload_file.aio(bundle_path) logger.debug(f"Code bundle uploaded to {remote_path}") + _write_bundle_cache(digest, hash_digest, remote_path) else: remote_path = "na" if copy_bundle_to: diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index 42ebbab15..ab2cbfb95 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -3,6 +3,9 @@ import asyncio import hashlib import json +import random +import sqlite3 +import time import typing from importlib.metadata import entry_points from pathlib import Path @@ -17,7 +20,8 @@ from flyte._logging import logger from flyte._status import status -_IMAGE_CACHE_DIR = Path.home() / ".flyte" / "cache" / "images" +_IMAGE_CACHE_DB = Path.home() / ".flyte" / "cache" / "images.db" +_IMAGE_CACHE_TTL_DAYS = 30 if TYPE_CHECKING: from flyte._build import ImageBuild @@ -98,35 +102,64 @@ async def image_exists( def _cache_key(repository: str, tag: str, arch: Tuple[str, ...]) -> str: - """Return a filesystem-safe cache key for an image.""" + """Return a stable cache key for an image.""" raw = f"{repository}:{tag}:{','.join(sorted(arch))}" return hashlib.sha256(raw.encode()).hexdigest() +def _get_cache_db() -> sqlite3.Connection: + """Open (and lazily initialize) the SQLite image cache database.""" + _IMAGE_CACHE_DB.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(_IMAGE_CACHE_DB)) + conn.execute( + "CREATE TABLE IF NOT EXISTS image_cache " + "(key TEXT PRIMARY KEY, image_uri TEXT NOT NULL, " + "created_at REAL NOT NULL)" + ) + return conn + + def _write_image_cache(repository: str, tag: str, arch: Tuple[str, ...], image_uri: str) -> None: - """Persist a verified image URI to local disk cache.""" + """Persist a verified image URI to the SQLite cache.""" try: - _IMAGE_CACHE_DIR.mkdir(parents=True, exist_ok=True) - cache_file = _IMAGE_CACHE_DIR / _cache_key(repository, tag, arch) - cache_file.write_text(image_uri) - except OSError as e: + conn = _get_cache_db() + try: + with conn: + conn.execute( + "INSERT OR REPLACE INTO image_cache (key, image_uri, created_at) VALUES (?, ?, ?)", + (_cache_key(repository, tag, arch), image_uri, time.time()), + ) + finally: + conn.close() + except (OSError, sqlite3.Error) as e: logger.debug(f"Failed to write image cache: {e}") class PersistentCacheImageChecker(ImageChecker): - """Check if image was previously verified and cached on disk (~0ms).""" + """Check if image was previously verified and cached in SQLite (~0ms).""" @classmethod async def image_exists( cls, repository: str, tag: str, arch: Tuple[Architecture, ...] = ("linux/amd64",) ) -> Optional[str]: - cache_file = _IMAGE_CACHE_DIR / _cache_key(repository, tag, arch) try: - if cache_file.is_file(): - uri = cache_file.read_text().strip() - logger.debug(f"Image {uri} found in persistent cache") - return uri - except OSError as e: + conn = _get_cache_db() + try: + cutoff = time.time() - _IMAGE_CACHE_TTL_DAYS * 86400 + row = conn.execute( + "SELECT image_uri FROM image_cache WHERE key = ? AND created_at > ?", + (_cache_key(repository, tag, arch), cutoff), + ).fetchone() + # Prune expired entries ~5% of the time to avoid doing it on every read + if random.random() < 0.05: + conn.execute("DELETE FROM image_cache WHERE created_at <= ?", (cutoff,)) + conn.commit() + if row: + logger.debug(f"Image {row[0]} found in persistent cache") + return row[0] + finally: + conn.close() + except (OSError, sqlite3.Error) as e: logger.debug(f"Failed to read image cache: {e}") return None diff --git a/tests/flyte/imagebuild/test_image_build_engine.py b/tests/flyte/imagebuild/test_image_build_engine.py index 846e1c441..d89188686 100644 --- a/tests/flyte/imagebuild/test_image_build_engine.py +++ b/tests/flyte/imagebuild/test_image_build_engine.py @@ -35,7 +35,7 @@ def test_persistent_cache_write_and_read(tmp_path, monkeypatch): """PersistentCacheImageChecker reads back what _write_image_cache wrote.""" import flyte._internal.imagebuild.image_builder as ib - monkeypatch.setattr(ib, "_IMAGE_CACHE_DIR", tmp_path) + monkeypatch.setattr(ib, "_IMAGE_CACHE_DB", tmp_path / "images.db") # Initially nothing cached import asyncio From bd0bfcf3e40c73b2a52671be93f37a7facfc7bb5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 17 Mar 2026 13:21:36 -0700 Subject: [PATCH 5/9] nit Signed-off-by: Kevin Su --- .../_internal/imagebuild/image_builder.py | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index ab2cbfb95..ff212530e 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -119,6 +119,29 @@ def _get_cache_db() -> sqlite3.Connection: return conn +def _read_image_cache(repository: str, tag: str, arch: Tuple[str, ...]) -> Optional[str]: + """Look up a previously verified image URI by repository, tag, and arch. Returns image_uri or None.""" + try: + conn = _get_cache_db() + try: + cutoff = time.time() - _IMAGE_CACHE_TTL_DAYS * 86400 + row = conn.execute( + "SELECT image_uri FROM image_cache WHERE key = ? AND created_at > ?", + (_cache_key(repository, tag, arch), cutoff), + ).fetchone() + # Prune expired entries ~5% of the time to avoid doing it on every read + if random.random() < 0.05: + conn.execute("DELETE FROM image_cache WHERE created_at <= ?", (cutoff,)) + conn.commit() + if row: + return row[0] + finally: + conn.close() + except (OSError, sqlite3.Error) as e: + logger.debug(f"Failed to read image cache: {e}") + return None + + def _write_image_cache(repository: str, tag: str, arch: Tuple[str, ...], image_uri: str) -> None: """Persist a verified image URI to the SQLite cache.""" try: @@ -142,26 +165,10 @@ class PersistentCacheImageChecker(ImageChecker): async def image_exists( cls, repository: str, tag: str, arch: Tuple[Architecture, ...] = ("linux/amd64",) ) -> Optional[str]: - try: - conn = _get_cache_db() - try: - cutoff = time.time() - _IMAGE_CACHE_TTL_DAYS * 86400 - row = conn.execute( - "SELECT image_uri FROM image_cache WHERE key = ? AND created_at > ?", - (_cache_key(repository, tag, arch), cutoff), - ).fetchone() - # Prune expired entries ~5% of the time to avoid doing it on every read - if random.random() < 0.05: - conn.execute("DELETE FROM image_cache WHERE created_at <= ?", (cutoff,)) - conn.commit() - if row: - logger.debug(f"Image {row[0]} found in persistent cache") - return row[0] - finally: - conn.close() - except (OSError, sqlite3.Error) as e: - logger.debug(f"Failed to read image cache: {e}") - return None + uri = _read_image_cache(repository, tag, arch) + if uri: + logger.debug(f"Image {uri} found in persistent cache") + return uri class LocalDockerCommandImageChecker(ImageChecker): From 65f9636a9bc357738c38f58ddbdc49c7758974dd Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 17 Mar 2026 13:22:29 -0700 Subject: [PATCH 6/9] lint Signed-off-by: Kevin Su --- src/flyte/_code_bundle/bundle.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flyte/_code_bundle/bundle.py b/src/flyte/_code_bundle/bundle.py index c34f939c2..6b2d9b7dd 100644 --- a/src/flyte/_code_bundle/bundle.py +++ b/src/flyte/_code_bundle/bundle.py @@ -226,7 +226,7 @@ async def build_code_bundle( cached = _read_bundle_cache(digest) if cached: hash_digest, remote_path = cached - status.success(f"Code bundle found in cache, skipping upload") + status.success("Code bundle found in cache, skipping upload") logger.debug(f"Code bundle cache hit: {remote_path}") return CodeBundle(tgz=remote_path, destination=extract_dir, computed_version=hash_digest, files=files) @@ -291,7 +291,7 @@ async def build_code_bundle_from_relative_paths( cached = _read_bundle_cache(digest) if cached: hash_digest, remote_path = cached - status.success(f"Code bundle found in cache, skipping upload") + status.success("Code bundle found in cache, skipping upload") logger.debug(f"Code bundle cache hit: {remote_path}") return CodeBundle(tgz=remote_path, destination=extract_dir, computed_version=hash_digest, files=files) From 768cd77040299b3db6dcc74cb345a2bea2ee3027 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 17 Mar 2026 13:45:39 -0700 Subject: [PATCH 7/9] nit Signed-off-by: Kevin Su --- src/flyte/_code_bundle/bundle.py | 3 +-- .../_internal/imagebuild/image_builder.py | 19 ++++++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/flyte/_code_bundle/bundle.py b/src/flyte/_code_bundle/bundle.py index 6b2d9b7dd..f866c5bdc 100644 --- a/src/flyte/_code_bundle/bundle.py +++ b/src/flyte/_code_bundle/bundle.py @@ -199,8 +199,6 @@ async def build_code_bundle( if copy_style == "none": raise ValueError("If copy_style is 'none', just don't make a code bundle") - status.step("Bundling code...") - logger.debug("Building code bundle.") from flyte.remote import upload_file if not ignore: @@ -230,6 +228,7 @@ async def build_code_bundle( logger.debug(f"Code bundle cache hit: {remote_path}") return CodeBundle(tgz=remote_path, destination=extract_dir, computed_version=hash_digest, files=files) + status.step("Bundling code...") logger.debug("Building code bundle.") with tempfile.TemporaryDirectory() as tmp_dir: bundle_path, tar_size, archive_size = create_bundle( diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index ff212530e..59d049dc1 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -44,7 +44,15 @@ class ImageChecker(Protocol): @classmethod async def image_exists( cls, repository: str, tag: str, arch: Tuple[Architecture, ...] = ("linux/amd64",) - ) -> Optional[str]: ... + ) -> Optional[str]: + """ + Check whether an image exists in a registry or cache. + + Returns the image URI if found, or None if the image definitively does not exist. + Raise an exception if existence cannot be determined (e.g. cache miss, network failure) + so the next checker in the chain gets a chance. + """ + ... class DockerAPIImageChecker(ImageChecker): @@ -168,7 +176,10 @@ async def image_exists( uri = _read_image_cache(repository, tag, arch) if uri: logger.debug(f"Image {uri} found in persistent cache") - return uri + return uri + # Cache miss — raise so the next checker in the chain gets a chance. + # Returning None would mean "image definitely doesn't exist". + raise LookupError(f"Image {repository}:{tag} not found in persistent cache") class LocalDockerCommandImageChecker(ImageChecker): @@ -252,11 +263,13 @@ async def image_exists(image: Image) -> Optional[str]: if checker is not PersistentCacheImageChecker: _write_image_cache(repository, tag, tuple(image.platform), image_uri) return image_uri + # Checker ran successfully and returned None — image not found + return None except Exception as e: logger.debug(f"Error checking image existence with {checker.__name__}: {e}") continue - # If all checkers fail, then assume the image exists. This is current flytekit behavior + # All checkers raised exceptions (e.g. network failures) — assume image exists status.info(f"All checkers failed to check existence of {image.uri}, assuming it exists") return image.uri From 72cfaf7517519bb5decd65f7a16a924e76f6180b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 20 Mar 2026 03:14:23 -0700 Subject: [PATCH 8/9] Use local persistence Signed-off-by: Kevin Su --- src/flyte/_code_bundle/bundle.py | 59 ++++++++----------- .../_internal/imagebuild/image_builder.py | 55 ++++++----------- src/flyte/_persistence/_db.py | 31 ++++++++-- src/flyte/_persistence/_task_cache.py | 10 ++-- .../imagebuild/test_image_build_engine.py | 58 ++++++++++-------- 5 files changed, 106 insertions(+), 107 deletions(-) diff --git a/src/flyte/_code_bundle/bundle.py b/src/flyte/_code_bundle/bundle.py index f866c5bdc..c09b5fd2c 100644 --- a/src/flyte/_code_bundle/bundle.py +++ b/src/flyte/_code_bundle/bundle.py @@ -31,40 +31,27 @@ _pickled_file_extension = ".pkl.gz" _tar_file_extension = ".tar.gz" -_BUNDLE_CACHE_DB = Path.home() / ".flyte" / "cache" / "bundles.db" _BUNDLE_CACHE_TTL_DAYS = 30 -def _get_bundle_cache_db() -> sqlite3.Connection: - """Open (and lazily initialize) the SQLite bundle cache database.""" - _BUNDLE_CACHE_DB.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(str(_BUNDLE_CACHE_DB)) - conn.execute( - "CREATE TABLE IF NOT EXISTS bundle_cache " - "(digest TEXT PRIMARY KEY, hash_digest TEXT NOT NULL, remote_path TEXT NOT NULL, " - "created_at REAL NOT NULL)" - ) - return conn - - def _read_bundle_cache(digest: str) -> tuple[str, str] | None: """Look up a previously uploaded bundle by its file digest. Returns (hash_digest, remote_path) or None.""" + from flyte._persistence._db import LocalDB + try: - conn = _get_bundle_cache_db() - try: - cutoff = time.time() - _BUNDLE_CACHE_TTL_DAYS * 86400 - row = conn.execute( - "SELECT hash_digest, remote_path FROM bundle_cache WHERE digest = ? AND created_at > ?", - (digest, cutoff), - ).fetchone() - # Prune expired entries ~5% of the time to avoid doing it on every read - if random.random() < 0.05: + conn = LocalDB.get_sync() + cutoff = time.time() - _BUNDLE_CACHE_TTL_DAYS * 86400 + row = conn.execute( + "SELECT hash_digest, remote_path FROM bundle_cache WHERE digest = ? AND created_at > ?", + (digest, cutoff), + ).fetchone() + # Prune expired entries ~5% of the time to avoid doing it on every read + if random.random() < 0.05: + with LocalDB._write_lock: conn.execute("DELETE FROM bundle_cache WHERE created_at <= ?", (cutoff,)) conn.commit() - if row: - return row[0], row[1] - finally: - conn.close() + if row: + return row[0], row[1] except (OSError, sqlite3.Error) as e: logger.debug(f"Failed to read bundle cache: {e}") return None @@ -72,17 +59,17 @@ def _read_bundle_cache(digest: str) -> tuple[str, str] | None: def _write_bundle_cache(digest: str, hash_digest: str, remote_path: str) -> None: """Persist a successfully uploaded bundle to the SQLite cache.""" + from flyte._persistence._db import LocalDB + try: - conn = _get_bundle_cache_db() - try: - with conn: - conn.execute( - "INSERT OR REPLACE INTO bundle_cache (digest, hash_digest, remote_path, created_at) " - "VALUES (?, ?, ?, ?)", - (digest, hash_digest, remote_path, time.time()), - ) - finally: - conn.close() + conn = LocalDB.get_sync() + with LocalDB._write_lock: + conn.execute( + "INSERT OR REPLACE INTO bundle_cache (digest, hash_digest, remote_path, created_at) " + "VALUES (?, ?, ?, ?)", + (digest, hash_digest, remote_path, time.time()), + ) + conn.commit() except (OSError, sqlite3.Error) as e: logger.debug(f"Failed to write bundle cache: {e}") diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index 59d049dc1..e3a6ca01f 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -8,7 +8,6 @@ import time import typing from importlib.metadata import entry_points -from pathlib import Path from typing import TYPE_CHECKING, ClassVar, Dict, Optional, Tuple from async_lru import alru_cache @@ -18,9 +17,9 @@ from flyte._image import Architecture, Image from flyte._initialize import _get_init_config from flyte._logging import logger +from flyte._persistence._db import LocalDB from flyte._status import status -_IMAGE_CACHE_DB = Path.home() / ".flyte" / "cache" / "images.db" _IMAGE_CACHE_TTL_DAYS = 30 if TYPE_CHECKING: @@ -115,36 +114,22 @@ def _cache_key(repository: str, tag: str, arch: Tuple[str, ...]) -> str: return hashlib.sha256(raw.encode()).hexdigest() -def _get_cache_db() -> sqlite3.Connection: - """Open (and lazily initialize) the SQLite image cache database.""" - _IMAGE_CACHE_DB.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(str(_IMAGE_CACHE_DB)) - conn.execute( - "CREATE TABLE IF NOT EXISTS image_cache " - "(key TEXT PRIMARY KEY, image_uri TEXT NOT NULL, " - "created_at REAL NOT NULL)" - ) - return conn - - def _read_image_cache(repository: str, tag: str, arch: Tuple[str, ...]) -> Optional[str]: """Look up a previously verified image URI by repository, tag, and arch. Returns image_uri or None.""" try: - conn = _get_cache_db() - try: - cutoff = time.time() - _IMAGE_CACHE_TTL_DAYS * 86400 - row = conn.execute( - "SELECT image_uri FROM image_cache WHERE key = ? AND created_at > ?", - (_cache_key(repository, tag, arch), cutoff), - ).fetchone() - # Prune expired entries ~5% of the time to avoid doing it on every read - if random.random() < 0.05: + conn = LocalDB.get_sync() + cutoff = time.time() - _IMAGE_CACHE_TTL_DAYS * 86400 + row = conn.execute( + "SELECT image_uri FROM image_cache WHERE key = ? AND created_at > ?", + (_cache_key(repository, tag, arch), cutoff), + ).fetchone() + # Prune expired entries ~5% of the time to avoid doing it on every read + if random.random() < 0.05: + with LocalDB._write_lock: conn.execute("DELETE FROM image_cache WHERE created_at <= ?", (cutoff,)) conn.commit() - if row: - return row[0] - finally: - conn.close() + if row: + return row[0] except (OSError, sqlite3.Error) as e: logger.debug(f"Failed to read image cache: {e}") return None @@ -153,15 +138,13 @@ def _read_image_cache(repository: str, tag: str, arch: Tuple[str, ...]) -> Optio def _write_image_cache(repository: str, tag: str, arch: Tuple[str, ...], image_uri: str) -> None: """Persist a verified image URI to the SQLite cache.""" try: - conn = _get_cache_db() - try: - with conn: - conn.execute( - "INSERT OR REPLACE INTO image_cache (key, image_uri, created_at) VALUES (?, ?, ?)", - (_cache_key(repository, tag, arch), image_uri, time.time()), - ) - finally: - conn.close() + conn = LocalDB.get_sync() + with LocalDB._write_lock: + conn.execute( + "INSERT OR REPLACE INTO image_cache (key, image_uri, created_at) VALUES (?, ?, ?)", + (_cache_key(repository, tag, arch), image_uri, time.time()), + ) + conn.commit() except (OSError, sqlite3.Error) as e: logger.debug(f"Failed to write image cache: {e}") diff --git a/src/flyte/_persistence/_db.py b/src/flyte/_persistence/_db.py index 699aa19e3..7114f459f 100644 --- a/src/flyte/_persistence/_db.py +++ b/src/flyte/_persistence/_db.py @@ -22,6 +22,23 @@ ) """ +_IMAGE_CACHE_DDL = """ +CREATE TABLE IF NOT EXISTS image_cache ( + key TEXT PRIMARY KEY, + image_uri TEXT NOT NULL, + created_at REAL NOT NULL +) +""" + +_BUNDLE_CACHE_DDL = """ +CREATE TABLE IF NOT EXISTS bundle_cache ( + digest TEXT PRIMARY KEY, + hash_digest TEXT NOT NULL, + remote_path TEXT NOT NULL, + created_at REAL NOT NULL +) +""" + _RUNS_DDL = """ CREATE TABLE IF NOT EXISTS runs ( run_name TEXT NOT NULL, @@ -49,6 +66,8 @@ """ +_ALL_TABLE_DDLS = [_TASK_CACHE_DDL, _RUNS_DDL, _IMAGE_CACHE_DDL, _BUNDLE_CACHE_DDL] + _RUNS_INDEXES = [ "CREATE INDEX IF NOT EXISTS idx_runs_action_start ON runs (action_name, start_time)", "CREATE INDEX IF NOT EXISTS idx_runs_status_start ON runs (status, start_time)", @@ -114,16 +133,16 @@ async def initialize(): async def _initialize_async(): db_path = LocalDB._get_db_path() conn = await aiosqlite.connect(db_path) - await conn.execute(_TASK_CACHE_DDL) - await conn.execute(_RUNS_DDL) + for ddl in _ALL_TABLE_DDLS: + await conn.execute(ddl) for idx_stmt in _RUNS_INDEXES: await conn.execute(idx_stmt) await conn.commit() LocalDB._conn = conn # Also open a sync connection for sync callers sync_conn = sqlite3.connect(db_path, check_same_thread=False) - sync_conn.execute(_TASK_CACHE_DDL) - sync_conn.execute(_RUNS_DDL) + for ddl in _ALL_TABLE_DDLS: + sync_conn.execute(ddl) _migrate_sync(sync_conn) LocalDB._conn_sync = sync_conn LocalDB._initialized = True @@ -140,8 +159,8 @@ def initialize_sync(): def _initialize_sync_inner(): db_path = LocalDB._get_db_path() conn = sqlite3.connect(db_path, check_same_thread=False) - conn.execute(_TASK_CACHE_DDL) - conn.execute(_RUNS_DDL) + for ddl in _ALL_TABLE_DDLS: + conn.execute(ddl) _migrate_sync(conn) LocalDB._conn_sync = conn LocalDB._initialized = True diff --git a/src/flyte/_persistence/_task_cache.py b/src/flyte/_persistence/_task_cache.py index 1d1dc981f..94e6ccb5a 100644 --- a/src/flyte/_persistence/_task_cache.py +++ b/src/flyte/_persistence/_task_cache.py @@ -24,8 +24,9 @@ async def clear(): await conn.commit() else: conn = LocalDB.get_sync() - conn.execute("DELETE FROM task_cache") - conn.commit() + with LocalDB._write_lock: + conn.execute("DELETE FROM task_cache") + conn.commit() @staticmethod async def get(cache_key: str) -> convert.Outputs | None: @@ -76,8 +77,9 @@ async def _set_async(cache_key: str, value: convert.Outputs) -> None: def _set_sync(cache_key: str, value: convert.Outputs) -> None: conn = LocalDB.get_sync() output_bytes = value.proto_outputs.SerializeToString() - conn.execute("INSERT OR REPLACE INTO task_cache (key, value) VALUES (?, ?)", (cache_key, output_bytes)) - conn.commit() + with LocalDB._write_lock: + conn.execute("INSERT OR REPLACE INTO task_cache (key, value) VALUES (?, ?)", (cache_key, output_bytes)) + conn.commit() @staticmethod async def close(): diff --git a/tests/flyte/imagebuild/test_image_build_engine.py b/tests/flyte/imagebuild/test_image_build_engine.py index d89188686..b1c5490d7 100644 --- a/tests/flyte/imagebuild/test_image_build_engine.py +++ b/tests/flyte/imagebuild/test_image_build_engine.py @@ -34,31 +34,39 @@ async def test_cached(mock_checker_cache, mock_checker_cli, mock_checker_api): def test_persistent_cache_write_and_read(tmp_path, monkeypatch): """PersistentCacheImageChecker reads back what _write_image_cache wrote.""" import flyte._internal.imagebuild.image_builder as ib - - monkeypatch.setattr(ib, "_IMAGE_CACHE_DB", tmp_path / "images.db") - - # Initially nothing cached - import asyncio - - result = asyncio.get_event_loop().run_until_complete( - PersistentCacheImageChecker.image_exists("myrepo", "v1.0", ("linux/amd64",)) - ) - assert result is None - - # Write to cache - ib._write_image_cache("myrepo", "v1.0", ("linux/amd64",), "myrepo:v1.0") - - # Now it should be found - result = asyncio.get_event_loop().run_until_complete( - PersistentCacheImageChecker.image_exists("myrepo", "v1.0", ("linux/amd64",)) - ) - assert result == "myrepo:v1.0" - - # Different arch should NOT be found - result = asyncio.get_event_loop().run_until_complete( - PersistentCacheImageChecker.image_exists("myrepo", "v1.0", ("linux/arm64",)) - ) - assert result is None + from flyte._persistence._db import LocalDB + + monkeypatch.setattr(LocalDB, "_get_db_path", staticmethod(lambda: str(tmp_path / "cache.db"))) + monkeypatch.setattr(LocalDB, "_initialized", False) + monkeypatch.setattr(LocalDB, "_conn_sync", None) + monkeypatch.setattr(LocalDB, "_conn", None) + LocalDB.initialize_sync() + + try: + # Initially nothing cached — PersistentCacheImageChecker raises LookupError on miss + import asyncio + + with pytest.raises(LookupError): + asyncio.get_event_loop().run_until_complete( + PersistentCacheImageChecker.image_exists("myrepo", "v1.0", ("linux/amd64",)) + ) + + # Write to cache + ib._write_image_cache("myrepo", "v1.0", ("linux/amd64",), "myrepo:v1.0") + + # Now it should be found + result = asyncio.get_event_loop().run_until_complete( + PersistentCacheImageChecker.image_exists("myrepo", "v1.0", ("linux/amd64",)) + ) + assert result == "myrepo:v1.0" + + # Different arch should NOT be found + with pytest.raises(LookupError): + asyncio.get_event_loop().run_until_complete( + PersistentCacheImageChecker.image_exists("myrepo", "v1.0", ("linux/arm64",)) + ) + finally: + LocalDB.close_sync() @mock.patch("flyte._internal.imagebuild.image_builder.ImageBuildEngine._get_builder") From 729ac80f5e1aa6d322e23a4b4b2403a1ec6df4df Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 20 Mar 2026 03:20:23 -0700 Subject: [PATCH 9/9] lint Signed-off-by: Kevin Su --- plugins/hitl/src/flyteplugins/hitl/_event.py | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/hitl/src/flyteplugins/hitl/_event.py b/plugins/hitl/src/flyteplugins/hitl/_event.py index e293cd35a..5a7f97820 100644 --- a/plugins/hitl/src/flyteplugins/hitl/_event.py +++ b/plugins/hitl/src/flyteplugins/hitl/_event.py @@ -346,6 +346,7 @@ async def show_form(form_url: str, api_url: str, curl_body: str, type_name: str) await flyte.report.flush.aio() return html_report + @flyte.trace async def wait_for_input_event( name: str,