From a3e03ffb2cace4484d5f6969b64b279bc3f31dc9 Mon Sep 17 00:00:00 2001 From: Hans Knecht Date: Sat, 13 Dec 2025 14:31:40 -0700 Subject: [PATCH] feat: support other manifest types --- main.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++---- test_main.py | 104 ++++++++++++++++++++++++++++++++++-- 2 files changed, 235 insertions(+), 14 deletions(-) diff --git a/main.py b/main.py index 094cf91..669bd2b 100755 --- a/main.py +++ b/main.py @@ -1,11 +1,14 @@ #!/usr/bin/env python3 import boto3 -import sys -from kubernetes import client, config -import os +import json import logging +import os +import sys +from collections import defaultdict from datetime import datetime, timedelta + import pytz +from kubernetes import client, config logger = logging.getLogger("ecr-image-cleanup") hdlr = logging.StreamHandler() @@ -17,16 +20,117 @@ # Set a standard Global Localization UTC = pytz.UTC +CONTAINER_MANIFEST_MEDIA_TYPES = { + "application/vnd.docker.distribution.manifest.v2+json", + "application/vnd.oci.image.manifest.v1+json", +} + +MANIFEST_LIST_MEDIA_TYPES = { + "application/vnd.docker.distribution.manifest.list.v2+json", + "application/vnd.oci.image.index.v1+json", +} + +# These artifact media types represent standard image config blobs, not ancillary artifacts. +SAFE_ARTIFACT_MEDIA_TYPES = { + None, + "application/vnd.oci.image.config.v1+json", + "application/vnd.docker.container.image.v1+json", +} + + +def is_container_manifest(image: dict) -> bool: + """ + Determine whether the ECR entry is an actual runnable container manifest. + Artifacts such as cosign signatures set artifactMediaType and should be skipped. + """ + media_type = image.get("imageManifestMediaType") + artifact_media_type = image.get("artifactMediaType") + + if media_type in MANIFEST_LIST_MEDIA_TYPES: + return True + + if media_type in CONTAINER_MANIFEST_MEDIA_TYPES: + if artifact_media_type in SAFE_ARTIFACT_MEDIA_TYPES: + return True + logger.debug( + "Skipping artifact %s with unsupported artifact media type %s", + image.get("imageDigest"), + artifact_media_type, + ) + return False + + logger.debug( + "Skipping %s with unsupported manifest media type %s", + image.get("imageDigest"), + media_type, + ) + return False + + +def extract_subject_digest(manifest_body: str) -> str | None: + """ + Pull the OCI subject digest from an artifact manifest body if present. + """ + try: + manifest = json.loads(manifest_body) + except json.JSONDecodeError: + logger.warning("Unable to decode artifact manifest body") + return None + + subject = manifest.get("subject") + if isinstance(subject, dict): + return subject.get("digest") + return None + + +def get_artifact_subject_digest( + client: boto3.client, + registry_id: str, + repository: dict, + image: dict, +) -> str | None: + """ + Retrieve the subject digest for an artifact manifest so we can associate it with its image. + """ + media_type = image.get("imageManifestMediaType") + if not media_type: + return None + + try: + response = client.batch_get_image( + registryId=registry_id, + repositoryName=repository["repository_name"], + imageIds=[{"imageDigest": image["imageDigest"]}], + acceptedMediaTypes=[media_type], + ) + except client.exceptions.ImageNotFoundException: + logger.warning("Artifact %s no longer exists", image.get("imageDigest")) + return None + + images = response.get("images", []) + if not images: + logger.debug("No artifact data returned for %s", image.get("imageDigest")) + return None + + manifest_body = images[0].get("imageManifest") + if not manifest_body: + logger.debug("Artifact manifest missing for %s", image.get("imageDigest")) + return None + + subject_digest = extract_subject_digest(manifest_body) + if not subject_digest: + logger.debug( + "Artifact %s manifest has no subject digest", image.get("imageDigest") + ) + return subject_digest + def build_image_uri(image: dict, repository: dict) -> dict: logger.debug(image) logger.debug( "Only include actual container images not .sig files which are a different mediaType" ) - if ( - image["imageManifestMediaType"] - == "application/vnd.docker.distribution.manifest.v2+json" - ): + if is_container_manifest(image): if "imageTags" in image: image["image_uri"] = ( f"{repository['repository_uri']}:{image['imageTags'][0]}" @@ -157,7 +261,7 @@ def get_ecr_repositories( def get_ecr_images( client: boto3.client, registry_id: str, repositories: list -) -> list: # pragma: no cover +) -> tuple[list, dict]: # pragma: no cover """ :param client boto3.client: :param repositories list: @@ -165,6 +269,7 @@ def get_ecr_images( """ logging.debug("Attempting to retrieve a list of images in ECR") images = [] + artifact_index = defaultdict(list) paginator = client.get_paginator("describe_images") for repository in repositories: for response in paginator.paginate( @@ -189,10 +294,25 @@ def get_ecr_images( f"Image {repository['repository_uri']}@{imageDetails[0]['imageDigest']} is the only image in the repository skipping and hasn't been pulled in 7 days, consider deleting" ) break - images = append_image(images, imageDetails, repository) + for image in imageDetails: + built_image = build_image_uri(image, repository) + if built_image: + images.append(built_image) + else: + subject_digest = get_artifact_subject_digest( + client, registry_id, repository, image + ) + if subject_digest: + artifact_entry = image.copy() + artifact_entry["repository_uri"] = repository["repository_uri"] + artifact_entry["image_uri"] = ( + f"{repository['repository_uri']}@{image['imageDigest']}" + ) + artifact_entry["subjectDigest"] = subject_digest + artifact_index[subject_digest].append(artifact_entry) logger.debug(f"{images=}") - return images + return images, dict(artifact_index) def is_image_pushed_recently(image: dict) -> bool: @@ -368,12 +488,15 @@ def main(): # pragma: no cover ) sys.exit(1) repositories = get_ecr_repositories(client, registry_id) - ecr_images = get_ecr_images(client, registry_id, repositories) + ecr_images, artifact_index = get_ecr_images(client, registry_id, repositories) k8s_images = get_images_from_workloads() for image in ecr_images: logger.debug(f"{image=}") if is_image_deletable(image, k8s_images): deletable_images.append(image) + related_artifacts = artifact_index.get(image["imageDigest"], []) + if related_artifacts: + deletable_images.extend(related_artifacts) else: keepable_images.append(image["image_uri"]) diff --git a/test_main.py b/test_main.py index d9e9e27..f7bbb16 100644 --- a/test_main.py +++ b/test_main.py @@ -1,6 +1,7 @@ +from datetime import datetime, timedelta + import main import pytest -from datetime import datetime, timedelta import pytz UTC = pytz.UTC @@ -33,6 +34,20 @@ "image_uri": "testing@digest", }, ), + ( + { + "imageManifestMediaType": "application/vnd.docker.distribution.manifest.v2+json", + "artifactMediaType": "application/vnd.oci.image.config.v1+json", + "imageTags": ["test"], + }, + {"repository_uri": "testing"}, + { + "imageManifestMediaType": "application/vnd.docker.distribution.manifest.v2+json", + "artifactMediaType": "application/vnd.oci.image.config.v1+json", + "imageTags": ["test"], + "image_uri": "testing:test", + }, + ), ( { "imageManifestMediaType": "test", @@ -43,18 +58,95 @@ ), ( { - "imageManifestMediaType": "application/vnd.oci.image.manifest.v1+json", + "imageManifestMediaType": "application/vnd.docker.distribution.manifest.v2+json", + "artifactMediaType": "application/vnd.dev.cosign.simplesigning.v1+json", "imageTags": ["test"], }, {"repository_uri": "testing"}, None, ), + ( + { + "imageManifestMediaType": "application/vnd.oci.image.manifest.v1+json", + "imageTags": ["test"], + }, + {"repository_uri": "testing"}, + { + "imageManifestMediaType": "application/vnd.oci.image.manifest.v1+json", + "imageTags": ["test"], + "image_uri": "testing:test", + }, + ), ], ) def test_build_image_uri(image, repository, result): assert (main.build_image_uri(image, repository)) == result +@pytest.mark.parametrize( + "image,result", + [ + ( + { + "imageManifestMediaType": "application/vnd.docker.distribution.manifest.v2+json" + }, + True, + ), + ( + { + "imageManifestMediaType": "application/vnd.docker.distribution.manifest.v2+json", + "artifactMediaType": "application/vnd.oci.image.config.v1+json", + }, + True, + ), + ( + { + "imageManifestMediaType": "application/vnd.oci.image.manifest.v1+json", + "artifactMediaType": "application/vnd.docker.container.image.v1+json", + }, + True, + ), + ( + { + "imageManifestMediaType": "application/vnd.oci.image.index.v1+json", + }, + True, + ), + ( + { + "imageManifestMediaType": "application/vnd.docker.distribution.manifest.list.v2+json", + }, + True, + ), + ( + { + "imageManifestMediaType": "application/vnd.docker.distribution.manifest.v2+json", + "artifactMediaType": "application/vnd.dev.cosign.simplesigning.v1+json", + }, + False, + ), + ( + {"imageManifestMediaType": "other"}, + False, + ), + ], +) +def test_is_container_manifest(image, result): + assert main.is_container_manifest(image) == result + + +@pytest.mark.parametrize( + "manifest,result", + [ + ('{"subject": {"digest": "sha256:abc"}}', "sha256:abc"), + ('{"subject": {}}', None), + ("{}", None), + ], +) +def test_extract_subject_digest(manifest, result): + assert main.extract_subject_digest(manifest) == result + + @pytest.mark.parametrize( "image,imageDetails,repository,result", [ @@ -112,7 +204,13 @@ def test_build_image_uri(image, repository, result): } ], {"repository_uri": "testing"}, - [], + [ + { + "imageManifestMediaType": "application/vnd.oci.image.manifest.v1+json", + "imageTags": ["test"], + "image_uri": "testing:test", + } + ], ), ], )