diff --git a/src/nyl/commands/template.py b/src/nyl/commands/template.py index e48ff980..80a6e23a 100644 --- a/src/nyl/commands/template.py +++ b/src/nyl/commands/template.py @@ -21,7 +21,7 @@ from nyl.profiles import DEFAULT_PROFILE, ProfileManager from nyl.project.config import ProjectConfig from nyl.resources import API_VERSION_INLINE, NylResource -from nyl.resources.applyset import APPLYSET_LABEL_PART_OF, ApplySet +from nyl.resources.applyset import ApplySet, ApplySetContext, ApplySetManager from nyl.resources.postprocessor import PostProcessor from nyl.secrets.config import SecretsConfig from nyl.templating import NylTemplateEngine @@ -173,11 +173,6 @@ def template( ) exit(1) - if apply: - # When running with --apply, we must ensure that the --applyset-part-of option is disabled, as it would cause - # an error when passing the generated manifests to `kubectl apply --applyset=...`. - applyset_part_of = False - if apply and diff: logger.error("The --apply and --diff options cannot be combined.") exit(1) @@ -288,60 +283,47 @@ def worker() -> ResourceList: source.resources, post_processors = PostProcessor.extract_from_list(source.resources) # Find the namespaces that are defined in the file. If we find any resources without a namespace, we will - # inject that namespace name into them. Also find the applyset defined in the file. + # inject that namespace name into them. namespaces: set[str] = set() - applyset: ApplySet | None = None for resource in list(source.resources): if is_namespace_resource(resource): namespaces.add(resource["metadata"]["name"]) - elif ApplySet.matches(resource): - if applyset is not None: - logger.opt(colors=True).error( - "Multiple ApplySet resources defined in {}, there can only be one per source.", - source.file, - ) - exit(1) - applyset = ApplySet.load(resource) - source.resources.remove(resource) - - if not applyset and project.config.settings.generate_applysets: + + # Create an ApplySet if configured to do so + applyset: ApplySet | None = None + if project.config.settings.generate_applysets: if not current_default_namespace: logger.opt(colors=True).error( "No default namespace defined for {}, but it is required for the automatically " - "generated nyl.io/v1/ApplySet resource (the ApplySet is named after the default namespace).", + "generated ApplySet ConfigMap (the ApplySet is named after the default namespace).", source.file, ) exit(1) - applyset_name = current_default_namespace - applyset = ApplySet.new(applyset_name) + applyset_name = f"nyl.applyset.{current_default_namespace}" + applyset = ApplySet.new(applyset_name, current_default_namespace) + + # Build context information for the ApplySet from environment + applyset.context = ApplySetContext.from_environment(files=[str(source.file)]) + logger.opt(colors=True).info( - "Automatically creating ApplySet for {} (name: {}).", source.file, applyset_name + "Automatically creating ApplySet for {} (name: {}, namespace: {}).", + source.file, + applyset_name, + current_default_namespace, ) + # Create the ApplySetManager to handle apply/diff operations + applyset_manager = ApplySetManager( + client=client, applyset=applyset, add_part_of_labels=applyset_part_of + ) + if applyset is not None: applyset.set_group_kinds(source.resources) - # HACK: Kubectl 1.30 can't create the custom resource, so we need to create it. But it will also reject - # using the custom resource unless it has the tooling label set appropriately. For more details, see - # https://github.com/helsing-ai/nyl/issues/5. applyset.tooling = f"kubectl/v{generator.kube_version}" applyset.validate() - if apply: - # We need to ensure that ApplySet parent object exists before invoking `kubectl apply --applyset=...`. - logger.opt(colors=True).info( - "Kubectl-apply ApplySet resource {} from {}.", - applyset.reference, - source.file, - ) - kubectl.apply(ResourceList([applyset.dump()]), force_conflicts=True) - elif diff: - kubectl.diff(ResourceList([applyset.dump()])) - else: - print("---") - print(yaml.dumps(applyset.dump())) - # Validate resources. for resource in source.resources: # Inline resources often don't have metadata and they are not persisted to the cluster, hence @@ -358,32 +340,59 @@ def worker() -> ResourceList: ) exit(1) - # Tag resources as part of the current apply set, if any. - if applyset is not None and applyset_part_of: - for resource in source.resources: - if APPLYSET_LABEL_PART_OF not in (labels := resource["metadata"].setdefault("labels", {})): - labels[APPLYSET_LABEL_PART_OF] = applyset.id - populate_namespace_to_resources(source.resources, current_default_namespace) drop_empty_metadata_labels(source.resources) # Now apply the post-processor. source.resources = PostProcessor.apply_all(source.resources, post_processors, source.file) + # Prepare resources with ApplySet (adds part-of labels and includes ConfigMap) + resources_to_apply = applyset_manager.prepare_resources(source.resources) + + # Find deleted resources (resources that exist in cluster but not in manifest) + deleted_resources = applyset_manager.get_deleted_resources(source.resources) + if apply: - logger.info("Kubectl-apply {} resource(s) from '{}'", len(source.resources), source.file) + logger.info("Kubectl-apply {} resource(s) from '{}'", len(resources_to_apply), source.file) + # Note: We don't use kubectl's --applyset flag because it has limitations with multi-namespace deployments. + # Instead, we manually set the applyset.kubernetes.io/part-of label on all resources and include the + # ApplySet ConfigMap with the rest of the resources. kubectl.apply( - manifests=source.resources, - applyset=applyset.reference if applyset else None, - prune=True if applyset else False, + manifests=resources_to_apply, force_conflicts=True, ) + + # Delete resources that are no longer in the manifest + if deleted_resources: + logger.info("Deleting {} resource(s) that are no longer in the manifest", len(deleted_resources)) + for resource in deleted_resources: + metadata = resource.get("metadata", {}) + resource_name = f"{resource.get('kind', 'unknown')}/{metadata.get('name', 'unknown')}" + if metadata.get("namespace"): + resource_name = f"{metadata['namespace']}/{resource_name}" + logger.opt(colors=True).info("Deleting {}", resource_name) + try: + kubectl.delete(resource) + except Exception as e: + logger.warning("Failed to delete {}: {}", resource_name, e) + elif diff: - logger.info("Kubectl-diff {} resource(s) from '{}'", len(source.resources), source.file) - kubectl.diff(manifests=source.resources, applyset=applyset) + logger.info("Kubectl-diff {} resource(s) from '{}'", len(resources_to_apply), source.file) + kubectl.diff(manifests=resources_to_apply, applyset=applyset) + + # Show deleted resources in diff output + if deleted_resources: + print("\n# Resources to be DELETED (no longer in manifest):") + for resource in deleted_resources: + metadata = resource.get("metadata", {}) + resource_name = f"{resource.get('kind', 'unknown')}/{metadata.get('name', 'unknown')}" + if metadata.get("namespace"): + resource_name = f"{metadata['namespace']}/{resource_name}" + print(f"# - {resource_name}") + else: # If we're not going to be applying the resources immediately via `kubectl`, we print them to stdout. - for resource in source.resources: + for resource in resources_to_apply: print("---") print(yaml.dumps(resource)) diff --git a/src/nyl/resources/applyset.py b/src/nyl/resources/applyset.py index 5718c251..47428b01 100644 --- a/src/nyl/resources/applyset.py +++ b/src/nyl/resources/applyset.py @@ -1,12 +1,16 @@ import base64 import hashlib -from dataclasses import dataclass -from typing import Annotated, ClassVar +import json +import os +from dataclasses import dataclass, field +from typing import Any +from loguru import logger -from databind.core import SerializeDefaults +from kubernetes.client.api_client import ApiClient +from kubernetes.dynamic import DynamicClient +from kubernetes.dynamic.exceptions import NotFoundError -from nyl.resources import API_VERSION_K8S, NylResource, ObjectMetadata -from nyl.tools.types import ResourceList +from nyl.tools.types import Resource, ResourceList APPLYSET_LABEL_PART_OF = "applyset.kubernetes.io/part-of" """ Label key to use to associate objects with an ApplySet resource. """ @@ -20,137 +24,143 @@ APPLYSET_ANNOTATION_CONTAINS_GROUP_KINDS = "applyset.kubernetes.io/contains-group-kinds" """ Annotation key to use on ApplySet resources to specify the kinds of resources that are part of the ApplySet. """ +NYL_ANNOTATION_LAST_APPLIED_CONTEXT = "nyl.io/last-applied-context" +""" +Annotation key to store contextual information about the last applied configuration. +Contains a JSON object with fields like: +- source: "cli" or "argocd" +- revision: Git commit hash (when available via ArgoCD) +- files: List of manifest file names used +""" -@dataclass(kw_only=True) -class ApplySet(NylResource, api_version=API_VERSION_K8S): - """ - An ApplySet functions as a grouping mechanism for a set of objects that are applied together. This is a standard - Kubernetes mechanism that needs to be implemented as a custom resource. To read more about ApplySets, check out the - following article: - - https://kubernetes.io/blog/2023/05/09/introducing-kubectl-applyset-pruning/ - - Nyl's ApplySet resource is not namespaces. - When loading manifests from a file, Nyl looks for an ApplySet resource to determine if the manifests are to be - associated with an ApplySet. +@dataclass +class ApplySetContext: + """ + Contextual information about when/how the ApplySet was last applied. """ - # HACK: Can't set it on the class level, see https://github.com/NiklasRosenstein/python-databind/issues/73. - metadata: Annotated[ObjectMetadata, SerializeDefaults(False)] - - # note: the only purpose of this CRD is to create resources that act as a parent for ApplySets. - # check out this GitHub issue, and specifically this comment for more information: - # https://github.com/kubernetes/enhancements/issues/3659#issuecomment-1753091733 - CRD: ClassVar = { - "apiVersion": "apiextensions.k8s.io/v1", - "kind": "CustomResourceDefinition", - "metadata": { - "name": f"applysets.{API_VERSION_K8S.split('/')[0]}", - "labels": { - "applyset.kubernetes.io/is-parent-type": "true", - }, - }, - "spec": { - "group": API_VERSION_K8S.split("/")[0], - "names": { - "kind": "ApplySet", - "plural": "applysets", - }, - "scope": "Cluster", - "versions": [ - { - "name": "v1", - "served": True, - "storage": True, - "schema": { - "openAPIV3Schema": { - "type": "object", - } - }, - } - ], - }, - } + source: str + """The source of the apply operation: "cli" or "argocd".""" + + files: list[str] = field(default_factory=list) + """List of manifest file names used to generate the resources.""" + + revision: str | None = None + """Git commit hash (when available, e.g., via ArgoCD).""" + + app_name: str | None = None + """ArgoCD application name (when running via ArgoCD).""" + + app_namespace: str | None = None + """ArgoCD application namespace (when running via ArgoCD).""" + + project_name: str | None = None + """ArgoCD project name (when running via ArgoCD).""" + + source_path: str | None = None + """ArgoCD source path (when running via ArgoCD).""" + + source_repo_url: str | None = None + """ArgoCD source repository URL (when running via ArgoCD).""" + + target_revision: str | None = None + """ArgoCD target revision (when running via ArgoCD).""" + + kube_version: str | None = None + """Kubernetes version (from KUBE_VERSION env var).""" + + def to_json(self) -> str: + """Serialize the context to a JSON string.""" + data: dict[str, Any] = {"source": self.source} + if self.files: + data["files"] = self.files + if self.revision: + data["revision"] = self.revision + if self.app_name: + data["app_name"] = self.app_name + if self.app_namespace: + data["app_namespace"] = self.app_namespace + if self.project_name: + data["project_name"] = self.project_name + if self.source_path: + data["source_path"] = self.source_path + if self.source_repo_url: + data["source_repo_url"] = self.source_repo_url + if self.target_revision: + data["target_revision"] = self.target_revision + if self.kube_version: + data["kube_version"] = self.kube_version + return json.dumps(data, separators=(",", ":")) - @property - def reference(self) -> str: - """ - Return the refernce to this ApplySet resource that can be given to the `--applyset` flag of `kubectl apply`. + @staticmethod + def from_environment(files: list[str] | None = None) -> "ApplySetContext": """ + Create an ApplySetContext from the current environment. - return f"applysets.{self.API_VERSION.split('/')[0]}/{self.metadata.name}" - - @property - def id(self) -> str | None: - """ - Returns the ID of the ApplySet as it is configured in the `applyset.kubernetes.io/id` label. - """ + Detects whether running via ArgoCD (by checking ARGOCD_APP_NAME env var) + and populates the context accordingly. - if self.metadata.labels is not None: - return self.metadata.labels.get(APPLYSET_LABEL_ID) - return None + Args: + files: List of manifest file names used to generate the resources. - @id.setter - def id(self, value: str) -> None: - """ - Set the ID of the ApplySet. + Returns: + An ApplySetContext populated from environment variables. """ + argocd_app_name = os.getenv("ARGOCD_APP_NAME") + kube_version = os.getenv("KUBE_VERSION") + + if argocd_app_name: + # Running via ArgoCD + return ApplySetContext( + source="argocd", + files=files or [], + revision=os.getenv("ARGOCD_APP_REVISION"), + app_name=argocd_app_name, + app_namespace=os.getenv("ARGOCD_APP_NAMESPACE"), + project_name=os.getenv("ARGOCD_APP_PROJECT_NAME"), + source_path=os.getenv("ARGOCD_APP_SOURCE_PATH"), + source_repo_url=os.getenv("ARGOCD_APP_SOURCE_REPO_URL"), + target_revision=os.getenv("ARGOCD_APP_SOURCE_TARGET_REVISION"), + kube_version=kube_version, + ) + else: + # Running via CLI + return ApplySetContext( + source="cli", + files=files or [], + kube_version=kube_version, + ) - if self.metadata.labels is None: - self.metadata.labels = {} - self.metadata.labels[APPLYSET_LABEL_ID] = value - - def calculate_id(self) -> str: - """ - Calculate the ID of the ApplySet based on the name and namespace of the ApplySet. - """ - return calculate_applyset_id( - name=self.metadata.name, namespace=self.metadata.namespace or "", group=self.API_VERSION.split("/")[0] - ) +@dataclass +class ApplySet: + """ + An ApplySet functions as a grouping mechanism for a set of objects that are applied together. This uses a + ConfigMap as the parent resource, which is the recommended approach for ApplySets. - @property - def tooling(self) -> str | None: - """ - Returns the tooling that was used to apply the ApplySet. - """ + To read more about ApplySets, check out the following article: + https://kubernetes.io/blog/2023/05/09/introducing-kubectl-applyset-pruning/ - if self.metadata.annotations is not None: - return self.metadata.annotations.get(APPLYSET_ANNOTATION_TOOLING) - return None + The ApplySet is namespace-scoped (using a ConfigMap) and should be placed in the "default" namespace + determined by Nyl's namespace resolution logic. - @tooling.setter - def tooling(self, value: str) -> None: - """ - Set the tooling that was used to apply the ApplySet. - """ + When loading manifests from a file, Nyl looks for an ApplySet definition to determine if the manifests are to be + associated with an ApplySet. + """ - if self.metadata.annotations is None: - self.metadata.annotations = {} - self.metadata.annotations[APPLYSET_ANNOTATION_TOOLING] = value + name: str + namespace: str + tooling: str = "" + contains_group_kinds: list[str] = field(default_factory=list) + context: ApplySetContext | None = None @property - def contains_group_kinds(self) -> list[str] | None: - """ - Returns the kinds of resources that are part of the ApplySet. + def id(self) -> str: """ - - if self.metadata.annotations is not None: - value = self.metadata.annotations.get(APPLYSET_ANNOTATION_CONTAINS_GROUP_KINDS) - if value is not None: - return value.split(",") - return None - - @contains_group_kinds.setter - def contains_group_kinds(self, value: list[str]) -> None: - """ - Set the kinds of resources that are part of the ApplySet. + Returns the ID of the ApplySet. """ - - if self.metadata.annotations is None: - self.metadata.annotations = {} - self.metadata.annotations[APPLYSET_ANNOTATION_CONTAINS_GROUP_KINDS] = ",".join(sorted(value)) + return calculate_applyset_id(name=self.name, namespace=self.namespace) def set_group_kinds(self, manifests: ResourceList) -> None: """ @@ -161,62 +171,79 @@ def set_group_kinds(self, manifests: ResourceList) -> None: for manifest in manifests: if "kind" in manifest: kinds.add(get_canonical_resource_kind_name(manifest["apiVersion"], manifest["kind"])) - self.contains_group_kinds = list(kinds) + self.contains_group_kinds = sorted(kinds) def validate(self) -> None: """ Validate the ApplySet configuration. - Mutations: - - Sets the `applyset.kubernetes.io/id` label on the metadata of the ApplySet resource if it is not set. - Raises: ValueError: - - If the resource is namespaced. - - If the annotations has no `applyset.kubernetes.io/tooling` key. - - If the annotations has no `applyset.kubernetes.io/contains-group-kinds` key. - - If the `applyset.kubernetes.io/id` label has an invalid value. + - If the name is empty. + - If the namespace is empty. + - If the tooling is not set. + - If the contains_group_kinds is empty. """ - if self.metadata.namespace: - raise ValueError("ApplySet resources cannot be namespaced") + if not self.name: + raise ValueError("ApplySet name cannot be empty") + + if not self.namespace: + raise ValueError("ApplySet namespace cannot be empty") - if self.metadata.labels is None: - self.metadata.labels = {} + if not self.tooling: + raise ValueError(f"ApplySet must have a {APPLYSET_ANNOTATION_TOOLING!r} annotation") - if self.id is None: - self.id = self.calculate_id() - elif self.id != self.calculate_id(): - raise ValueError(f"Invalid {APPLYSET_LABEL_ID!r} label value: {self.id!r}") + if not self.contains_group_kinds: + raise ValueError(f"ApplySet must have a {APPLYSET_ANNOTATION_CONTAINS_GROUP_KINDS!r} annotation") - if self.tooling is None: - raise ValueError(f"ApplySet resource must have a {APPLYSET_ANNOTATION_TOOLING!r} annotation") + def dump(self) -> Resource: + """ + Dump the ApplySet as a ConfigMap resource with the appropriate annotations and labels. + """ - if self.contains_group_kinds is None: - raise ValueError(f"ApplySet resource must have a {APPLYSET_ANNOTATION_CONTAINS_GROUP_KINDS!r} annotation") + annotations: dict[str, str] = { + APPLYSET_ANNOTATION_TOOLING: self.tooling, + APPLYSET_ANNOTATION_CONTAINS_GROUP_KINDS: ",".join(self.contains_group_kinds), + } + + # Add the context annotation if available + if self.context is not None: + annotations[NYL_ANNOTATION_LAST_APPLIED_CONTEXT] = self.context.to_json() + + return Resource({ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": { + "name": self.name, + "namespace": self.namespace, + "annotations": annotations, + "labels": { + APPLYSET_LABEL_ID: self.id, + }, + }, + }) @staticmethod - def new(name: str) -> "ApplySet": + def new(name: str, namespace: str) -> "ApplySet": """ - Create a new ApplySet resource with the specified name. + Create a new ApplySet with the specified name and namespace. """ - return ApplySet( - metadata=ObjectMetadata( - name=name, - namespace=None, - ) - ) + return ApplySet(name=name, namespace=namespace) -def calculate_applyset_id(*, name: str, namespace: str = "", group: str) -> str: +def calculate_applyset_id(*, name: str, namespace: str) -> str: """ - Calculate the ID of a Kubernetes ApplySet with the specified name. + Calculate the ID of a Kubernetes ApplySet with the specified name and namespace. + The ID is based on a ConfigMap parent resource. """ # reference: https://kubernetes.io/docs/reference/labels-annotations-taints/#applyset-kubernetes-io-id - hash = hashlib.sha256(f"{name}.{namespace}.ApplySet.{group}".encode()).digest() - uid = base64.b64encode(hash).decode().rstrip("=").replace("/", "_").replace("+", "-") + # Format: applyset-..ConfigMap.))>-v1 + hash_input = f"{name}.{namespace}.ConfigMap." + hash_bytes = hashlib.sha256(hash_input.encode()).digest() + uid = base64.b64encode(hash_bytes).decode().rstrip("=").replace("/", "_").replace("+", "-") return f"applyset-{uid}-v1" @@ -225,15 +252,256 @@ def get_canonical_resource_kind_name(api_version: str, kind: str) -> str: Given the apiVersion and kind of a Kubernetes resource, return the canonical name of the resource. This name can be used to identify the resource in an ApplySet's `applyset.kubernetes.io/contains-group-kinds` annotation. - Note that according to the [reference][1], the resource name should use the plural form, but it appears that the - resource kind name is also accepted. Deriving the plural form will be difficult without querying the Kubernetes - API. + The annotation format is `.` where Kind is the singular, capitalized kind name. + For core v1 resources (no group), the format is just ``. + See: https://kubernetes.io/docs/reference/labels-annotations-taints/#applyset-kubernetes-io-contains-group-kinds + + Args: + api_version: The apiVersion of the resource (e.g., 'v1', 'apps/v1', 'argoproj.io/v1alpha1'). + kind: The kind of the resource (e.g., 'Pod', 'Deployment', 'CronWorkflow'). + + Returns: + The canonical resource name in the format '.' (e.g., 'Deployment.apps', 'Pod'). + """ + + group = api_version.split("/")[0] if "/" in api_version else "" + return (f"{kind}.{group}").rstrip(".") - [1]: https://kubernetes.io/docs/reference/labels-annotations-taints/#applyset-kubernetes-io-contains-group-kinds + +class ApplySetManager: + """ + Helper class to manage applying and diffing resources associated with an ApplySet. + + This class handles: + - Creating and configuring ApplySet resources + - Including the ApplySet ConfigMap with other resources during apply/diff + - Tagging resources with the applyset.kubernetes.io/part-of label + - Looking up existing ApplySet ConfigMaps in the cluster + - Listing all resources that are members of an ApplySet + - Computing which resources have been removed from the manifest + """ + + def __init__( + self, + client: ApiClient, + applyset: ApplySet | None = None, + add_part_of_labels: bool = True, + ) -> None: + """ + Initialize the ApplySetManager. + + Args: + client: The Kubernetes API client to use. + applyset: The ApplySet to manage, or None to skip ApplySet-related logic. + add_part_of_labels: Whether to add the applyset.kubernetes.io/part-of label to resources. + """ + self.client = client + self.applyset = applyset + self.add_part_of_labels = add_part_of_labels + + @property + def enabled(self) -> bool: + """Returns True if ApplySet management is enabled (i.e., an ApplySet is configured).""" + return self.applyset is not None + + def prepare_resources(self, resources: ResourceList) -> ResourceList: + """ + Prepare resources for apply/diff by adding ApplySet labels and including the ApplySet ConfigMap. + + Args: + resources: The list of resources to prepare. + + Returns: + A new ResourceList with the ApplySet ConfigMap included (if enabled) and part-of labels added. + """ + if not self.enabled or self.applyset is None: + return resources + + result = ResourceList(list(resources)) + + # Tag resources as part of the current apply set + if self.add_part_of_labels: + for resource in result: + if "metadata" in resource: + labels = resource["metadata"].setdefault("labels", {}) + if APPLYSET_LABEL_PART_OF not in labels: + labels[APPLYSET_LABEL_PART_OF] = self.applyset.id + + # Include the ApplySet ConfigMap with the resources + result.insert(0, self.applyset.dump()) + + return result + + def get_applyset_resource(self) -> Resource | None: + """ + Get the ApplySet ConfigMap resource. + + Returns: + The ApplySet ConfigMap resource, or None if ApplySet is not enabled. + """ + if not self.enabled or self.applyset is None: + return None + return self.applyset.dump() + + def get_deleted_resources(self, new_resources: ResourceList) -> ResourceList: + """ + Compute which resources have been removed from the manifest. + + This compares the new resources against the existing resources in the cluster + that are part of this ApplySet, and returns the resources that exist in the + cluster but are not in the new manifest. + + Args: + new_resources: The new list of resources from the manifest. + + Returns: + A ResourceList of resources that should be deleted (exist in cluster but not in manifest). + """ + if not self.enabled or self.applyset is None: + return ResourceList([]) + + # Get existing resources from the cluster that belong to this ApplySet + # Try to find the existing ApplySet ConfigMap to know which kinds to look for + existing_applyset_cm = get_existing_applyset( + self.applyset.name, self.applyset.namespace, self.client + ) + kinds: list[str] = [] + + if existing_applyset_cm: + annotations = existing_applyset_cm.get("metadata", {}).get("annotations", {}) + kinds_str = annotations.get(APPLYSET_ANNOTATION_CONTAINS_GROUP_KINDS) + if kinds_str: + kinds = kinds_str.split(",") + + existing_resources = list_applyset_members(self.applyset.id, self.client, kinds) + + if not existing_resources: + return ResourceList([]) + + # Build a set of resource identifiers from the new manifest + new_resource_ids = set() + for resource in new_resources: + resource_id = _get_resource_identifier(resource) + if resource_id: + new_resource_ids.add(resource_id) + + # Also add the ApplySet ConfigMap itself to avoid deleting it + applyset_cm = self.applyset.dump() + applyset_id = _get_resource_identifier(applyset_cm) + if applyset_id: + new_resource_ids.add(applyset_id) + + # Find resources that exist in the cluster but not in the new manifest + deleted_resources: list[Resource] = [] + for resource in existing_resources: + resource_id = _get_resource_identifier(resource) + if resource_id and resource_id not in new_resource_ids: + deleted_resources.append(resource) + + return ResourceList(deleted_resources) + + +def _get_resource_identifier(resource: Resource) -> str | None: + """ + Get a unique identifier for a resource based on apiVersion, kind, namespace, and name. Args: - api_version: The apiVersion of the resource. - kind: The kind of the resource. + resource: The resource to identify. + + Returns: + A string identifier in the format "apiVersion/kind/namespace/name" or None if the resource + is missing required fields. """ + api_version = resource.get("apiVersion") + kind = resource.get("kind") + metadata = resource.get("metadata", {}) + name = metadata.get("name") + namespace = metadata.get("namespace", "") + + if not api_version or not kind or not name: + return None - return (f"{kind}." + (api_version.split("/")[0] if "/" in api_version else "")).rstrip(".") + return f"{api_version}/{kind}/{namespace}/{name}" + + +def get_existing_applyset(name: str, namespace: str, client: ApiClient) -> Resource | None: + """ + Look up an existing ApplySet ConfigMap in the cluster. + + Args: + name: The name of the ApplySet ConfigMap. + namespace: The namespace of the ApplySet ConfigMap. + client: The Kubernetes API client to use. + + Returns: + The ApplySet ConfigMap resource if it exists, or None if not found. + """ + try: + dynamic = DynamicClient(client) + # ConfigMap is v1 + resource_client = dynamic.resources.get(api_version="v1", kind="ConfigMap") + resource = resource_client.get(name=name, namespace=namespace) + + # Verify it's an ApplySet ConfigMap by checking for the id label + labels = resource.metadata.get("labels", {}).to_dict() + if APPLYSET_LABEL_ID in labels: + return Resource(resource.to_dict()) + return None + except NotFoundError: + return None + + +def list_applyset_members( + applyset_id: str, client: ApiClient, kinds: list[str] +) -> ResourceList: + """ + List all resources in the cluster that are members of an ApplySet. + + Args: + applyset_id: The ID of the ApplySet (value of applyset.kubernetes.io/id label). + client: The Kubernetes API client to use. + kinds: List of resource kinds to search for. + + Returns: + A ResourceList of all resources that have the applyset.kubernetes.io/part-of label + matching the given ApplySet ID. + """ + dynamic = DynamicClient(client) + all_resources: list[Resource] = [] + + resources_to_check: list[tuple[str, str]] = [] + + for k in kinds: + parts = k.split(".", 1) + if len(parts) == 2: + resources_to_check.append((parts[0], parts[1])) + else: + resources_to_check.append((parts[0], "")) + + for kind, group in resources_to_check: + try: + api_resources = dynamic.resources.search(kind=kind, group=group) + if not api_resources: + continue + + res_client = api_resources[0] + + items = res_client.get(label_selector=f"{APPLYSET_LABEL_PART_OF}={applyset_id}") + for item in items.items: + all_resources.append(Resource(item.to_dict())) + + except NotFoundError: + # It's possible the resource kind doesn't exist in the cluster (e.g. CRD was removed). + # In this case, we can just skip it. + continue + + # Deduplicate resources by their UID + seen_uids: set[str] = set() + unique_resources: list[Resource] = [] + for resource in all_resources: + uid = resource.get("metadata", {}).get("uid", "") + if uid and uid not in seen_uids: + seen_uids.add(uid) + unique_resources.append(resource) + + return ResourceList(unique_resources) \ No newline at end of file diff --git a/src/nyl/resources/applyset_test.py b/src/nyl/resources/applyset_test.py index 7c07c1c1..1730d70f 100644 --- a/src/nyl/resources/applyset_test.py +++ b/src/nyl/resources/applyset_test.py @@ -1,39 +1,214 @@ -from nyl.resources import ObjectMetadata -from nyl.resources.applyset import ApplySet, calculate_applyset_id, get_canonical_resource_kind_name +import os +from unittest.mock import patch + +from nyl.resources.applyset import ( + APPLYSET_LABEL_PART_OF, + ApplySet, + ApplySetContext, + ApplySetManager, + NYL_ANNOTATION_LAST_APPLIED_CONTEXT, + calculate_applyset_id, + get_canonical_resource_kind_name, +) +from nyl.tools.types import Resource, ResourceList def test__ApplySet__dump() -> None: - resource = ApplySet( - metadata=ObjectMetadata( - name="test-applyset", - namespace=None, - ) - ) - resource.tooling = "kubectl/1.30" - resource.contains_group_kinds = ["Service", "Deployment.apps"] - resource.validate() + applyset = ApplySet.new("test-applyset", "default") + applyset.tooling = "kubectl/1.30" + applyset.contains_group_kinds = ["Deployment.apps", "Service"] + applyset.validate() - assert resource.dump() == { - "apiVersion": "nyl.io/v1", - "kind": "ApplySet", + assert applyset.dump() == { + "apiVersion": "v1", + "kind": "ConfigMap", "metadata": { "name": "test-applyset", + "namespace": "default", "annotations": { "applyset.kubernetes.io/tooling": "kubectl/1.30", - "applyset.kubernetes.io/contains-group-kinds": "Deployment.apps,Service", # sorted + "applyset.kubernetes.io/contains-group-kinds": "Deployment.apps,Service", }, "labels": { "applyset.kubernetes.io/id": calculate_applyset_id( name="test-applyset", - namespace="", - group="nyl.io", + namespace="default", ), }, }, } +def test__ApplySet__dump_with_context() -> None: + applyset = ApplySet.new("test-applyset", "default") + applyset.tooling = "kubectl/1.30" + applyset.contains_group_kinds = ["Deployment.apps", "Service"] + applyset.context = ApplySetContext( + source="cli", + files=["test.yaml"], + ) + applyset.validate() + + result = applyset.dump() + assert result["metadata"]["annotations"][NYL_ANNOTATION_LAST_APPLIED_CONTEXT] == '{"source":"cli","files":["test.yaml"]}' + + +def test__ApplySet__dump_with_argocd_context() -> None: + applyset = ApplySet.new("test-applyset", "default") + applyset.tooling = "kubectl/1.30" + applyset.contains_group_kinds = ["Deployment.apps", "Service"] + applyset.context = ApplySetContext( + source="argocd", + files=["test.yaml"], + revision="abc123", + app_name="my-app", + app_namespace="argocd", + project_name="default", + source_path="/apps/myapp", + source_repo_url="https://github.com/example/repo", + target_revision="main", + kube_version="1.31", + ) + applyset.validate() + + result = applyset.dump() + expected = '{"source":"argocd","files":["test.yaml"],"revision":"abc123","app_name":"my-app","app_namespace":"argocd","project_name":"default","source_path":"/apps/myapp","source_repo_url":"https://github.com/example/repo","target_revision":"main","kube_version":"1.31"}' + assert result["metadata"]["annotations"][NYL_ANNOTATION_LAST_APPLIED_CONTEXT] == expected + + +def test__ApplySetContext__to_json() -> None: + # Test CLI context + context = ApplySetContext(source="cli", files=["a.yaml", "b.yaml"]) + assert context.to_json() == '{"source":"cli","files":["a.yaml","b.yaml"]}' + + # Test ArgoCD context with all fields + context = ApplySetContext( + source="argocd", + files=["test.yaml"], + revision="abc123def", + app_name="my-app", + app_namespace="argocd", + project_name="default", + source_path="/apps/myapp", + source_repo_url="https://github.com/example/repo", + target_revision="main", + kube_version="1.31", + ) + expected = '{"source":"argocd","files":["test.yaml"],"revision":"abc123def","app_name":"my-app","app_namespace":"argocd","project_name":"default","source_path":"/apps/myapp","source_repo_url":"https://github.com/example/repo","target_revision":"main","kube_version":"1.31"}' + assert context.to_json() == expected + + # Test minimal context + context = ApplySetContext(source="cli") + assert context.to_json() == '{"source":"cli"}' + + +def test__ApplySetContext__from_environment_cli() -> None: + # Test CLI context (no ArgoCD env vars) + with patch.dict(os.environ, {}, clear=True): + context = ApplySetContext.from_environment(files=["test.yaml"]) + assert context.source == "cli" + assert context.files == ["test.yaml"] + assert context.app_name is None + assert context.revision is None + assert context.kube_version is None + + # Test CLI context with KUBE_VERSION + with patch.dict(os.environ, {"KUBE_VERSION": "1.31"}, clear=True): + context = ApplySetContext.from_environment(files=["test.yaml"]) + assert context.source == "cli" + assert context.kube_version == "1.31" + + +def test__ApplySetContext__from_environment_argocd() -> None: + # Test ArgoCD context (with ArgoCD env vars) + argocd_env = { + "ARGOCD_APP_NAME": "my-app", + "ARGOCD_APP_REVISION": "abc123", + "ARGOCD_APP_NAMESPACE": "argocd", + "ARGOCD_APP_PROJECT_NAME": "default", + "ARGOCD_APP_SOURCE_PATH": "/apps/myapp", + "ARGOCD_APP_SOURCE_REPO_URL": "https://github.com/example/repo", + "ARGOCD_APP_SOURCE_TARGET_REVISION": "main", + "KUBE_VERSION": "1.31", + } + with patch.dict(os.environ, argocd_env, clear=True): + context = ApplySetContext.from_environment(files=["test.yaml"]) + assert context.source == "argocd" + assert context.files == ["test.yaml"] + assert context.app_name == "my-app" + assert context.revision == "abc123" + assert context.app_namespace == "argocd" + assert context.project_name == "default" + assert context.source_path == "/apps/myapp" + assert context.source_repo_url == "https://github.com/example/repo" + assert context.target_revision == "main" + assert context.kube_version == "1.31" + + +def test__ApplySetManager__disabled() -> None: + # Test that manager passes through resources when disabled + manager = ApplySetManager(applyset=None) + assert not manager.enabled + + resources = ResourceList([ + Resource({"apiVersion": "v1", "kind": "Pod", "metadata": {"name": "test"}}) + ]) + result = manager.prepare_resources(resources) + assert len(result) == 1 + assert result[0]["kind"] == "Pod" + + +def test__ApplySetManager__prepare_resources() -> None: + # Test that manager adds ConfigMap and part-of labels + applyset = ApplySet.new("test-applyset", "default") + applyset.tooling = "kubectl/1.30" + applyset.contains_group_kinds = ["Pod"] + + manager = ApplySetManager(applyset=applyset, add_part_of_labels=True) + assert manager.enabled + + resources = ResourceList([ + Resource({"apiVersion": "v1", "kind": "Pod", "metadata": {"name": "test"}}) + ]) + result = manager.prepare_resources(resources) + + # Should have ConfigMap at the beginning + original resource + assert len(result) == 2 + assert result[0]["kind"] == "ConfigMap" + assert result[0]["metadata"]["name"] == "test-applyset" + assert result[1]["kind"] == "Pod" + assert result[1]["metadata"]["labels"][APPLYSET_LABEL_PART_OF] == applyset.id + + +def test__ApplySetManager__prepare_resources_no_labels() -> None: + # Test that manager doesn't add part-of labels when disabled + applyset = ApplySet.new("test-applyset", "default") + applyset.tooling = "kubectl/1.30" + applyset.contains_group_kinds = ["Pod"] + + manager = ApplySetManager(applyset=applyset, add_part_of_labels=False) + + resources = ResourceList([ + Resource({"apiVersion": "v1", "kind": "Pod", "metadata": {"name": "test"}}) + ]) + result = manager.prepare_resources(resources) + + # Should have ConfigMap but no part-of label on the Pod + assert len(result) == 2 + assert result[0]["kind"] == "ConfigMap" + assert APPLYSET_LABEL_PART_OF not in result[1]["metadata"].get("labels", {}) + + +def test__calculate_applyset_id() -> None: + # Verify the ID format is correct for ConfigMap-based ApplySets + applyset_id = calculate_applyset_id(name="test", namespace="default") + assert applyset_id.startswith("applyset-") + assert applyset_id.endswith("-v1") + + def test__get_canonical_resource_kind_name() -> None: + # The format is . - singular, capitalized Kind name assert get_canonical_resource_kind_name("v1", "Pod") == "Pod" assert get_canonical_resource_kind_name("apps/v1", "Deployment") == "Deployment.apps" assert get_canonical_resource_kind_name("nyl.io/v1", "ApplySet") == "ApplySet.nyl.io" + assert get_canonical_resource_kind_name("argoproj.io/v1alpha1", "CronWorkflow") == "CronWorkflow.argoproj.io" diff --git a/src/nyl/tools/kubectl.py b/src/nyl/tools/kubectl.py index 6147ed1e..11b74dd3 100644 --- a/src/nyl/tools/kubectl.py +++ b/src/nyl/tools/kubectl.py @@ -5,15 +5,18 @@ from dataclasses import dataclass from pathlib import Path from tempfile import TemporaryDirectory -from typing import Any, Literal, TypedDict +from typing import TYPE_CHECKING, Any, Literal, TypedDict import yaml from loguru import logger -from nyl.resources.applyset import APPLYSET_LABEL_PART_OF, ApplySet +from nyl.resources.applyset import APPLYSET_LABEL_PART_OF from nyl.tools.logging import lazy_str from nyl.tools.shell import pretty_cmd -from nyl.tools.types import ResourceList +from nyl.tools.types import Resource, ResourceList + +if TYPE_CHECKING: + from nyl.resources.applyset import ApplySet @dataclass @@ -90,35 +93,66 @@ def apply( manifests: ResourceList, force_conflicts: bool = False, server_side: bool = True, - applyset: str | None = None, - prune: bool = False, ) -> None: """ Apply the given manifests to the cluster. """ - env = self.env command = ["kubectl", "apply", "-f", "-"] if server_side: command.append("--server-side") - if applyset: - env = env.copy() - env["KUBECTL_APPLYSET"] = "true" - command.extend(["--applyset", applyset]) - if prune: - command.append("--prune") if force_conflicts: command.append("--force-conflicts") logger.debug("Applying manifests with command: $ {command}", command=lazy_str(pretty_cmd, command)) - status = subprocess.run(command, input=yaml.safe_dump_all(manifests), text=True, env={**os.environ, **env}) + + status = subprocess.run(command, input=yaml.safe_dump_all(manifests), text=True, env={**os.environ, **self.env}) if status.returncode: raise KubectlError(status.returncode) + def delete(self, resource: Resource) -> bool: + """ + Delete a single resource from the cluster. + + Args: + resource: The resource to delete. + + Returns: + True if the resource was deleted, False if it didn't exist. + """ + api_version = resource.get("apiVersion", "") + kind = resource.get("kind", "") + metadata = resource.get("metadata", {}) + name = metadata.get("name", "") + namespace = metadata.get("namespace") + + # Build the resource identifier + if "/" in api_version: + # For resources like apps/v1 Deployment, the type is "deployment.apps" + group = api_version.split("/")[0] + resource_type = f"{kind.lower()}.{group}" + else: + # For core v1 resources like Pod, the type is just the kind + resource_type = kind.lower() + + command = ["kubectl", "delete", resource_type, name] + if namespace: + command.extend(["-n", namespace]) + + logger.debug("Deleting resource with command: $ {command}", command=lazy_str(pretty_cmd, command)) + status = subprocess.run(command, capture_output=True, text=True, env={**os.environ, **self.env}) + + if status.returncode == 0: + return True + elif "NotFound" in status.stderr: + return False + else: + raise KubectlError(status.returncode, status.stderr) + def diff( self, manifests: ResourceList, - applyset: ApplySet | None = None, + applyset: "ApplySet | None" = None, on_error: Literal["raise", "return"] = "raise", ) -> Literal["no-diff", "diff", "error"]: """