From 0f8e3ebbbbe66c754e0a25215db77a794714d061 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 2 Dec 2025 16:21:18 -0800 Subject: [PATCH 1/2] [WIP] Settings api Signed-off-by: Ketan Umare --- settings.proto | 149 +++++++++++++++ src/flyte/cli/_edit.py | 113 +++++++++++ src/flyte/cli/main.py | 6 + src/flyte/remote/__init__.py | 2 + src/flyte/remote/_client/_protocols.py | 12 ++ src/flyte/remote/_client/controlplane.py | 7 + src/flyte/remote/_settings.py | 232 +++++++++++++++++++++++ 7 files changed, 521 insertions(+) create mode 100644 settings.proto create mode 100644 src/flyte/cli/_edit.py create mode 100644 src/flyte/remote/_settings.py diff --git a/settings.proto b/settings.proto new file mode 100644 index 000000000..9e6ca0940 --- /dev/null +++ b/settings.proto @@ -0,0 +1,149 @@ +syntax = "proto3"; + +package flyte.settings; + +// SettingsService provides hierarchical configuration management across ROOT, DOMAIN, and PROJECT scopes. +// +// The service supports: +// - Hierarchical settings inheritance (ROOT -> DOMAIN -> PROJECT) +// - Local overrides at each scope level +// - Origin tracking for each effective setting +// - Simple two-method API: GetSettings and UpdateSettings +service SettingsService { + // GetSettings retrieves settings for a given scope, returning both effective + // (inherited + local) settings with origin metadata, and local overrides only. + // + // Scope is determined by the presence of domain and project fields: + // - Neither set: ROOT scope + // - Only domain set: DOMAIN scope + // - Both set: PROJECT scope + rpc GetSettings(GetSettingsRequest) returns (GetSettingsResponse); + + // UpdateSettings replaces the complete set of local overrides for a given scope. + // + // Any setting not included in local_settings will be removed as an override + // and will inherit from parent scope. This is a full replacement operation, + // not a merge. + rpc UpdateSettings(UpdateSettingsRequest) returns (UpdateSettingsResponse); +} + +// SettingValue represents a configuration value with support for common types. +message SettingValue { + oneof kind { + string string_value = 1; + int64 int_value = 2; + double double_value = 3; + bool bool_value = 4; + } +} + +// SettingOrigin identifies where a setting value originates from in the hierarchy. +message SettingOrigin { + // scope_type is one of: "ROOT", "DOMAIN", or "PROJECT" + string scope_type = 1; + + // domain is populated for DOMAIN and PROJECT scopes, empty for ROOT + string domain = 2; + + // project is populated only for PROJECT scope, empty for ROOT and DOMAIN + string project = 3; +} + +// EffectiveSetting represents a resolved setting value with its origin. +// This shows what value is actually in effect, considering inheritance. +message EffectiveSetting { + // key is the setting name (e.g., "default_queue", "run_concurrency") + string key = 1; + + // value is the resolved setting value + SettingValue value = 2; + + // origin indicates which scope this value comes from + SettingOrigin origin = 3; +} + +// LocalSetting represents an explicit override at a specific scope. +message LocalSetting { + // key is the setting name + string key = 1; + + // value is the override value + SettingValue value = 2; +} + +// GetSettingsRequest specifies the scope to retrieve settings for. +message GetSettingsRequest { + // domain is optional. If empty, ROOT scope is queried. + string domain = 1; + + // project is optional. Requires domain to be set. If both are set, PROJECT scope is queried. + string project = 2; +} + +// GetSettingsResponse contains both effective settings (with inheritance) and local overrides. +message GetSettingsResponse { + // effective_settings contains all resolved settings with their origin metadata. + // This includes both inherited settings and local overrides. + repeated EffectiveSetting effective_settings = 1; + + // local_settings contains only the explicit overrides set at this scope. + // Settings not present here are inherited from parent scopes. + repeated LocalSetting local_settings = 2; +} + +// UpdateSettingsRequest replaces the complete set of local overrides for a scope. +message UpdateSettingsRequest { + // domain is optional. If empty, ROOT scope is updated. + string domain = 1; + + // project is optional. Requires domain to be set. If both are set, PROJECT scope is updated. + string project = 2; + + // local_settings is the complete list of local overrides for this scope. + // Any previous override not included here will be removed. + repeated LocalSetting local_settings = 3; +} + +// UpdateSettingsResponse is currently empty but reserved for future extensions. +message UpdateSettingsResponse {} + +// ============================================================================ +// Available Settings Reference +// ============================================================================ +// +// The following settings are supported in the Flyte settings hierarchy: +// +// Queue Configuration: +// - default_queue (string): The default queue for task execution +// - accessible_queues (string): Comma-separated list of accessible queues +// - queue_priority (int): Priority level for queue processing +// +// Concurrency Controls: +// - run_concurrency (int): Maximum concurrent runs +// - action_concurrency (int): Maximum concurrent actions +// - child_action_concurrency (int): Maximum concurrent child actions +// +// Cluster and Namespace: +// - cluster_mappings (string): JSON mapping of cluster assignments +// - namespace_mappings (string): JSON mapping of namespace assignments +// - namespace_auto_creation (bool): Enable automatic namespace creation on project creation +// +// Resource Defaults: +// - task_resource_defaults (string): JSON of default task resource requests/limits +// - service_account_defaults (string): Default service account name +// - metadata_bucket_defaults (string): Default metadata storage bucket +// - data_storage_location_defaults (string): Default data storage location +// +// Labels and Annotations: +// - labels (string): JSON map of Kubernetes labels to apply +// - annotations (string): JSON map of Kubernetes annotations to apply +// +// Runtime Configuration: +// - environment_variables (string): JSON map of environment variables +// - resource_quotas (string): JSON of resource quota definitions +// - interruptible (bool): Enable interruptible task execution +// - overwrite_cache (bool): Force cache overwrite +// +// Registry and Secrets: +// - container_registry (string): Container image registry URL +// - secrets (string): JSON map of secret references diff --git a/src/flyte/cli/_edit.py b/src/flyte/cli/_edit.py new file mode 100644 index 000000000..42b2d7d2a --- /dev/null +++ b/src/flyte/cli/_edit.py @@ -0,0 +1,113 @@ +import os +import subprocess +import tempfile +from pathlib import Path + +import rich_click as click + +from flyte.cli import _common as common + + +@click.command(cls=common.CommandBase) +@click.pass_obj +def edit(cfg: common.CLIConfig, project: str | None, domain: str | None): + """Edit hierarchical settings interactively. + + Opens settings in your $EDITOR, showing: + - Local overrides (uncommented) + - Inherited settings (commented with origin) + + To create an override: uncomment a line and/or edit its value + To remove an override: comment out the line + """ + import flyte.remote as remote + + # Determine scope + scope_desc = "ROOT" + if project and domain: + scope_desc = f"PROJECT({domain}/{project})" + elif domain: + scope_desc = f"DOMAIN({domain})" + + console = common.Console() + console.print(f"[bold]Editing settings for scope:[/bold] {scope_desc}") + + # Get current settings + try: + settings = remote.Settings.get(project=project, domain=domain) + except Exception as e: + console.print(f"[red]Error fetching settings:[/red] {e}") + raise click.Abort + + # Generate YAML for editing + yaml_content = settings.to_yaml() + + # Get editor from environment + editor = os.environ.get("EDITOR", os.environ.get("VISUAL", "vi")) + + # Create a temporary file for editing + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as tmp_file: + tmp_path = Path(tmp_file.name) + tmp_file.write(yaml_content) + + try: + # Open the editor + result = subprocess.run([editor, str(tmp_path)], check=False) + if result.returncode != 0: + console.print(f"[yellow]Editor exited with code {result.returncode}[/yellow]") + + # Read the edited content + with open(tmp_path, "r") as f: + edited_content = f.read() + + # Check if content changed + if edited_content.strip() == yaml_content.strip(): + console.print("[dim]No changes detected.[/dim]") + return + + # Parse the edited YAML to extract overrides + try: + overrides = remote.Settings.parse_yaml(edited_content) + except Exception as e: + console.print(f"[red]Error parsing edited YAML:[/red] {e}") + raise click.Abort + + # Show changes + original_local = {s.key: s.value for s in settings.local_settings} + added = {k: v for k, v in overrides.items() if k not in original_local} + removed = {k: v for k, v in original_local.items() if k not in overrides} + modified = { + k: (original_local[k], v) for k, v in overrides.items() if k in original_local and original_local[k] != v + } + + if added or removed or modified: + console.print("\n[bold]Changes to apply:[/bold]") + if added: + console.print("[green]Added overrides:[/green]") + for k, v in added.items(): + console.print(f" + {k}: {v}") + if modified: + console.print("[yellow]Modified overrides:[/yellow]") + for k, (old, new) in modified.items(): + console.print(f" ~ {k}: {old} → {new}") + if removed: + console.print("[red]Removed overrides (will inherit):[/red]") + for k, v in removed.items(): + console.print(f" - {k}: {v}") + + # Confirm and apply + if click.confirm("\nApply these changes?", default=True): + try: + settings.update(overrides) + console.print("[green]✓ Settings updated successfully[/green]") + except Exception as e: + console.print(f"[red]Error updating settings:[/red] {e}") + raise click.Abort + else: + console.print("[dim]Changes discarded.[/dim]") + else: + console.print("[dim]No changes detected.[/dim]") + + finally: + # Clean up temporary file + tmp_path.unlink(missing_ok=True) diff --git a/src/flyte/cli/main.py b/src/flyte/cli/main.py index 99fb4214f..57f4b74b9 100644 --- a/src/flyte/cli/main.py +++ b/src/flyte/cli/main.py @@ -11,6 +11,7 @@ from ._create import create from ._delete import delete from ._deploy import deploy +from ._edit import edit from ._gen import gen from ._get import get from ._plugins import discover_and_register_plugins @@ -36,6 +37,10 @@ "name": "Management of various objects.", "commands": ["create", "get", "delete", "update"], }, + { + "name": "Settings management.", + "commands": ["edit"], + }, { "name": "Build and deploy environments, tasks and images.", "commands": ["build", "deploy"], @@ -224,6 +229,7 @@ def main( main.add_command(whoami) # type: ignore main.add_command(update) # type: ignore main.add_command(serve) # type: ignore +main.add_command(edit) # type: ignore # Discover and register CLI plugins from installed packages discover_and_register_plugins(main) diff --git a/src/flyte/remote/__init__.py b/src/flyte/remote/__init__.py index a46ae52d6..762457ba7 100644 --- a/src/flyte/remote/__init__.py +++ b/src/flyte/remote/__init__.py @@ -14,6 +14,7 @@ "RunDetails", "Secret", "SecretTypes", + "Settings", "Task", "TaskDetails", "Trigger", @@ -30,6 +31,7 @@ from ._project import Project from ._run import Phase, Run, RunDetails from ._secret import Secret, SecretTypes +from ._settings import Settings from ._task import Task, TaskDetails from ._trigger import Trigger from ._user import User diff --git a/src/flyte/remote/_client/_protocols.py b/src/flyte/remote/_client/_protocols.py index a07f68f5f..6a7a78c8b 100644 --- a/src/flyte/remote/_client/_protocols.py +++ b/src/flyte/remote/_client/_protocols.py @@ -187,3 +187,15 @@ async def UpdateTriggers( async def DeleteTriggers( self, request: trigger_service_pb2.DeleteTriggersRequest ) -> trigger_service_pb2.DeleteTriggersResponse: ... + + +class SettingsService(Protocol): + """Settings service protocol for hierarchical configuration management. + + Note: This assumes the protobuf messages will be available from flyteidl2.settings + once the settings.proto file is compiled. + """ + + async def GetSettings(self, request) -> any: ... + + async def UpdateSettings(self, request) -> any: ... diff --git a/src/flyte/remote/_client/controlplane.py b/src/flyte/remote/_client/controlplane.py index 5c6fa05ae..f4e614238 100644 --- a/src/flyte/remote/_client/controlplane.py +++ b/src/flyte/remote/_client/controlplane.py @@ -31,6 +31,7 @@ RunLogsService, RunService, SecretService, + SettingsService, TaskService, TriggerService, ) @@ -57,6 +58,8 @@ def __init__( self._secrets_service = secret_pb2_grpc.SecretServiceStub(channel=channel) self._identity_service = identity_pb2_grpc.IdentityServiceStub(channel=channel) self._trigger_service = trigger_service_pb2_grpc.TriggerServiceStub(channel=channel) + # Note: Settings service stub will be initialized once settings_pb2_grpc is available + self._settings_service = None # settings_pb2_grpc.SettingsServiceStub(channel=channel) @classmethod async def for_endpoint(cls, endpoint: str, *, insecure: bool = False, **kwargs) -> ClientSet: @@ -124,5 +127,9 @@ def identity_service(self) -> IdentityService: def trigger_service(self) -> TriggerService: return self._trigger_service + @property + def settings_service(self) -> SettingsService: + return self._settings_service + async def close(self, grace: float | None = None): return await self._channel.close(grace=grace) diff --git a/src/flyte/remote/_settings.py b/src/flyte/remote/_settings.py new file mode 100644 index 000000000..c0ca296bc --- /dev/null +++ b/src/flyte/remote/_settings.py @@ -0,0 +1,232 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from flyte.remote._common import ToJSONMixin +from flyte.syncify import syncify + + +@dataclass +class SettingOrigin: + """Represents where a setting value comes from in the hierarchy.""" + + scope_type: str # "ROOT", "DOMAIN", or "PROJECT" + domain: str | None = None + project: str | None = None + + def __str__(self) -> str: + if self.scope_type == "ROOT": + return "ROOT" + elif self.scope_type == "DOMAIN": + return f"DOMAIN({self.domain})" + else: # PROJECT + return f"PROJECT({self.domain}/{self.project})" + + +@dataclass +class EffectiveSetting: + """Represents a resolved setting value with its origin.""" + + key: str + value: Any + origin: SettingOrigin + + +@dataclass +class LocalSetting: + """Represents an explicit override at a specific scope.""" + + key: str + value: Any + + +@dataclass +class Settings(ToJSONMixin): + """Hierarchical configuration settings with inheritance support. + + This class manages settings across ROOT, DOMAIN, and PROJECT scopes, + supporting local overrides and inheritance from parent scopes. + """ + + effective_settings: list[EffectiveSetting] + local_settings: list[LocalSetting] + domain: str | None = None + project: str | None = None + + @staticmethod + def _parse_value(raw_value: str) -> Any: + """Parse a YAML value string into appropriate Python type.""" + raw_value = raw_value.strip() + + # Remove quotes if present + if (raw_value.startswith('"') and raw_value.endswith('"')) or ( + raw_value.startswith("'") and raw_value.endswith("'") + ): + return raw_value[1:-1] + + # Parse booleans + if raw_value.lower() == "true": + return True + if raw_value.lower() == "false": + return False + + # Parse numbers + try: + if "." in raw_value: + return float(raw_value) + return int(raw_value) + except ValueError: + pass + + # Return as string + return raw_value + + @staticmethod + def _format_value(value: Any) -> str: + """Format a Python value for YAML output.""" + if isinstance(value, bool): + return "true" if value else "false" + elif isinstance(value, str): + # Quote strings that contain special characters or are numeric-looking + if ( + any(c in value for c in [":", "#", "\n", '"', "'"]) + or value.strip() != value + or (value and (value[0].isdigit() or value in ["true", "false", "null"])) + ): + return f'"{value}"' + return value + elif isinstance(value, (int, float)): + return str(value) + else: + return str(value) + + def to_yaml(self) -> str: + """Generate YAML representation showing local and inherited settings. + + Local settings are uncommented, inherited settings are commented with origin info. + """ + lines = [] + + # Create a set of local setting keys for quick lookup + local_keys = {s.key for s in self.local_settings} + + # Add local overrides section + if self.local_settings: + lines.append("# Local overrides") + for setting in self.local_settings: + value_str = self._format_value(setting.value) + lines.append(f"{setting.key}: {value_str}") + lines.append("") + + # Add inherited settings section + inherited = [s for s in self.effective_settings if s.key not in local_keys] + if inherited: + lines.append("# Inherited settings (uncomment to override)") + for setting in inherited: + value_str = self._format_value(setting.value) + lines.append(f"# {setting.key}: {value_str} # inherited from {setting.origin}") + + return "\n".join(lines) + + @staticmethod + def parse_yaml(yaml_content: str) -> dict[str, Any]: + """Parse edited YAML content into a dictionary of overrides. + + Only uncommented lines are considered as local overrides. + Commented lines are treated as removed/inherited. + """ + overrides = {} + for line in yaml_content.split("\n"): + stripped = line.strip() + # Skip empty lines and comments + if not stripped or stripped.startswith("#"): + continue + + # Parse key-value pair + if ":" not in stripped: + continue + + key, raw_value = stripped.split(":", 1) + # Remove inline comments + if "#" in raw_value: + raw_value = raw_value.split("#")[0] + + overrides[key.strip()] = Settings._parse_value(raw_value) + + return overrides + + @syncify + @classmethod + async def get(cls, project: str | None = None, domain: str | None = None) -> Settings: + """Retrieve settings for a given scope. + + Args: + project: Project name (requires domain to be set) + domain: Domain name + + Returns: + Settings object with effective and local settings + """ + # TODO: Once the gRPC client is implemented, replace this with actual API call + # from flyte.remote._client.controlplane import ClientSet + # client = await ClientSet.from_env() + # request = GetSettingsRequest(domain=domain or "", project=project or "") + # response = await client.settings_service.GetSettings(request) + + # For now, return dummy data + effective_settings = [ + EffectiveSetting(key="default_queue", value="default", origin=SettingOrigin(scope_type="ROOT")), + EffectiveSetting(key="run_concurrency", value=10, origin=SettingOrigin(scope_type="ROOT")), + EffectiveSetting(key="interruptible", value=False, origin=SettingOrigin(scope_type="ROOT")), + ] + + local_settings = [] + if domain: + # Add some domain-level overrides for demo + local_settings.append(LocalSetting(key="run_concurrency", value=20)) + effective_settings[1] = EffectiveSetting( + key="run_concurrency", value=20, origin=SettingOrigin(scope_type="DOMAIN", domain=domain) + ) + + return cls(effective_settings=effective_settings, local_settings=local_settings, domain=domain, project=project) + + @syncify + async def update(self, overrides: dict[str, Any]) -> None: + """Update settings with new local overrides. + + This replaces the complete set of local overrides for the current scope. + Settings not included will be removed and inherit from parent scope. + + Args: + overrides: Dictionary of setting key-value pairs to set as local overrides + """ + # TODO: Once the gRPC client is implemented, replace this with actual API call + # from flyte.remote._client.controlplane import ClientSet + # client = await ClientSet.from_env() + # + # local_settings = [ + # LocalSetting(key=k, value=v) for k, v in overrides.items() + # ] + # + # request = UpdateSettingsRequest( + # domain=self.domain or "", + # project=self.project or "", + # local_settings=local_settings + # ) + # await client.settings_service.UpdateSettings(request) + + # For now, just update the local state + self.local_settings = [LocalSetting(key=k, value=v) for k, v in overrides.items()] + + # Update effective settings to reflect the changes + effective_dict = {s.key: s for s in self.effective_settings} + for key, value in overrides.items(): + origin = SettingOrigin( + scope_type="PROJECT" if self.project else ("DOMAIN" if self.domain else "ROOT"), + domain=self.domain, + project=self.project, + ) + effective_dict[key] = EffectiveSetting(key=key, value=value, origin=origin) + + self.effective_settings = list(effective_dict.values()) From ba6575068a44f9d059f5890341ee1e5e417a4469 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 2 Dec 2025 21:23:14 -0800 Subject: [PATCH 2/2] Updated Signed-off-by: Ketan Umare --- .../connectors/databricks/connector.py | 1 - src/flyte/cli/_edit.py | 11 +- src/flyte/remote/_client/_protocols.py | 70 ++----- src/flyte/remote/_client/controlplane.py | 10 +- src/flyte/remote/_settings.py | 83 +++++--- src/flyte/remote/_settings_store.py | 198 ++++++++++++++++++ 6 files changed, 293 insertions(+), 80 deletions(-) create mode 100644 src/flyte/remote/_settings_store.py diff --git a/plugins/connectors/src/flyteplugins/connectors/databricks/connector.py b/plugins/connectors/src/flyteplugins/connectors/databricks/connector.py index e9a79bb68..a14aad8e2 100644 --- a/plugins/connectors/src/flyteplugins/connectors/databricks/connector.py +++ b/plugins/connectors/src/flyteplugins/connectors/databricks/connector.py @@ -6,7 +6,6 @@ from typing import Optional import aiohttp -import flyte from flyte import logger from flyte.connectors import AsyncConnector, ConnectorRegistry, Resource, ResourceMeta from flyte.connectors.utils import convert_to_flyte_phase diff --git a/src/flyte/cli/_edit.py b/src/flyte/cli/_edit.py index 42b2d7d2a..6cdf3af99 100644 --- a/src/flyte/cli/_edit.py +++ b/src/flyte/cli/_edit.py @@ -8,9 +8,14 @@ from flyte.cli import _common as common -@click.command(cls=common.CommandBase) +@click.group() +def edit(): + pass + + +@edit.command(cls=common.CommandBase) @click.pass_obj -def edit(cfg: common.CLIConfig, project: str | None, domain: str | None): +def settings(cfg: common.CLIConfig, project: str | None, domain: str | None): """Edit hierarchical settings interactively. Opens settings in your $EDITOR, showing: @@ -28,6 +33,8 @@ def edit(cfg: common.CLIConfig, project: str | None, domain: str | None): scope_desc = f"PROJECT({domain}/{project})" elif domain: scope_desc = f"DOMAIN({domain})" + elif project: + raise click.BadOptionUsage("project", "to set project settings, domain is required") console = common.Console() console.print(f"[bold]Editing settings for scope:[/bold] {scope_desc}") diff --git a/src/flyte/remote/_client/_protocols.py b/src/flyte/remote/_client/_protocols.py index 6a7a78c8b..fe2f931fc 100644 --- a/src/flyte/remote/_client/_protocols.py +++ b/src/flyte/remote/_client/_protocols.py @@ -17,7 +17,7 @@ async def GetVersion(self, request: version_pb2.GetVersionRequest) -> version_pb class ProjectDomainService(Protocol): async def RegisterProject( - self, request: project_pb2.ProjectRegisterRequest + self, request: project_pb2.ProjectRegisterRequest ) -> project_pb2.ProjectRegisterResponse: ... async def UpdateProject(self, request: project_pb2.Project) -> project_pb2.ProjectUpdateResponse: ... @@ -28,36 +28,12 @@ async def ListProjects(self, request: project_pb2.ProjectListRequest) -> project async def GetDomains(self, request: project_pb2.GetDomainRequest) -> project_pb2.GetDomainsResponse: ... - async def UpdateProjectDomainAttributes( - self, request: project_attributes_pb2.ProjectAttributesUpdateRequest - ) -> project_pb2.ProjectUpdateResponse: ... - - async def GetProjectDomainAttributes( - self, request: project_attributes_pb2.ProjectAttributesGetRequest - ) -> project_attributes_pb2.ProjectAttributes: ... - - async def DeleteProjectDomainAttributes( - self, request: project_attributes_pb2.ProjectAttributesDeleteRequest - ) -> project_attributes_pb2.ProjectAttributesDeleteResponse: ... - - async def UpdateProjectAttributes( - self, request: project_attributes_pb2.ProjectAttributesUpdateRequest - ) -> project_attributes_pb2.ProjectAttributesUpdateResponse: ... - - async def GetProjectAttributes( - self, request: project_attributes_pb2.ProjectAttributesGetRequest - ) -> project_attributes_pb2.ProjectAttributes: ... - - async def DeleteProjectAttributes( - self, request: project_attributes_pb2.ProjectAttributesDeleteRequest - ) -> project_attributes_pb2.ProjectAttributesDeleteResponse: ... - class TaskService(Protocol): async def DeployTask(self, request: task_service_pb2.DeployTaskRequest) -> task_service_pb2.DeployTaskResponse: ... async def GetTaskDetails( - self, request: task_service_pb2.GetTaskDetailsRequest + self, request: task_service_pb2.GetTaskDetailsRequest ) -> task_service_pb2.GetTaskDetailsResponse: ... async def ListTasks(self, request: task_service_pb2.ListTasksRequest) -> task_service_pb2.ListTasksResponse: ... @@ -71,7 +47,7 @@ async def Get(self, request: app_payload_pb2.GetRequest) -> app_payload_pb2.GetR async def Update(self, request: app_payload_pb2.UpdateRequest) -> app_payload_pb2.UpdateResponse: ... async def UpdateStatus( - self, request: app_payload_pb2.UpdateStatusRequest + self, request: app_payload_pb2.UpdateStatusRequest ) -> app_payload_pb2.UpdateStatusResponse: ... async def Delete(self, request: app_payload_pb2.DeleteRequest) -> app_payload_pb2.DeleteResponse: ... @@ -89,49 +65,49 @@ async def CreateRun(self, request: run_service_pb2.CreateRunRequest) -> run_serv async def AbortRun(self, request: run_service_pb2.AbortRunRequest) -> run_service_pb2.AbortRunResponse: ... async def GetRunDetails( - self, request: run_service_pb2.GetRunDetailsRequest + self, request: run_service_pb2.GetRunDetailsRequest ) -> run_service_pb2.GetRunDetailsResponse: ... async def WatchRunDetails( - self, request: run_service_pb2.WatchRunDetailsRequest + self, request: run_service_pb2.WatchRunDetailsRequest ) -> AsyncIterator[run_service_pb2.WatchRunDetailsResponse]: ... async def GetActionDetails( - self, request: run_service_pb2.GetActionDetailsRequest + self, request: run_service_pb2.GetActionDetailsRequest ) -> run_service_pb2.GetActionDetailsResponse: ... async def WatchActionDetails( - self, request: run_service_pb2.WatchActionDetailsRequest + self, request: run_service_pb2.WatchActionDetailsRequest ) -> AsyncIterator[run_service_pb2.WatchActionDetailsResponse]: ... async def GetActionData( - self, request: run_service_pb2.GetActionDataRequest + self, request: run_service_pb2.GetActionDataRequest ) -> run_service_pb2.GetActionDataResponse: ... async def ListRuns(self, request: run_service_pb2.ListRunsRequest) -> run_service_pb2.ListRunsResponse: ... async def WatchRuns( - self, request: run_service_pb2.WatchRunsRequest + self, request: run_service_pb2.WatchRunsRequest ) -> AsyncIterator[run_service_pb2.WatchRunsResponse]: ... async def ListActions(self, request: run_service_pb2.ListActionsRequest) -> run_service_pb2.ListActionsResponse: ... async def WatchActions( - self, request: run_service_pb2.WatchActionsRequest + self, request: run_service_pb2.WatchActionsRequest ) -> AsyncIterator[run_service_pb2.WatchActionsResponse]: ... class DataProxyService(Protocol): async def CreateUploadLocation( - self, request: dataproxy_pb2.CreateUploadLocationRequest + self, request: dataproxy_pb2.CreateUploadLocationRequest ) -> dataproxy_pb2.CreateUploadLocationResponse: ... async def CreateDownloadLocation( - self, request: dataproxy_pb2.CreateDownloadLocationRequest + self, request: dataproxy_pb2.CreateDownloadLocationRequest ) -> dataproxy_pb2.CreateDownloadLocationResponse: ... async def CreateDownloadLink( - self, request: dataproxy_pb2.CreateDownloadLinkRequest + self, request: dataproxy_pb2.CreateDownloadLinkRequest ) -> dataproxy_pb2.CreateDownloadLinkResponse: ... async def GetData(self, request: dataproxy_pb2.GetDataRequest) -> dataproxy_pb2.GetDataResponse: ... @@ -139,7 +115,7 @@ async def GetData(self, request: dataproxy_pb2.GetDataRequest) -> dataproxy_pb2. class RunLogsService(Protocol): def TailLogs( - self, request: run_logs_service_pb2.TailLogsRequest + self, request: run_logs_service_pb2.TailLogsRequest ) -> UnaryStreamCall[RequestType, run_logs_service_pb2.TailLogsResponse]: ... @@ -161,31 +137,31 @@ async def UserInfo(self, request: identity_pb2.UserInfoRequest) -> identity_pb2. class TriggerService(Protocol): async def DeployTrigger( - self, request: trigger_service_pb2.DeployTriggerRequest + self, request: trigger_service_pb2.DeployTriggerRequest ) -> trigger_service_pb2.DeployTriggerResponse: ... async def GetTriggerDetails( - self, request: trigger_service_pb2.GetTriggerDetailsRequest + self, request: trigger_service_pb2.GetTriggerDetailsRequest ) -> trigger_service_pb2.GetTriggerDetailsResponse: ... async def GetTriggerRevisionDetails( - self, request: trigger_service_pb2.GetTriggerRevisionDetailsRequest + self, request: trigger_service_pb2.GetTriggerRevisionDetailsRequest ) -> trigger_service_pb2.GetTriggerRevisionDetailsResponse: ... async def ListTriggers( - self, request: trigger_service_pb2.ListTriggersRequest + self, request: trigger_service_pb2.ListTriggersRequest ) -> trigger_service_pb2.ListTriggersResponse: ... async def GetTriggerRevisionHistory( - self, request: trigger_service_pb2.GetTriggerRevisionHistoryRequest + self, request: trigger_service_pb2.GetTriggerRevisionHistoryRequest ) -> trigger_service_pb2.GetTriggerRevisionHistoryResponse: ... async def UpdateTriggers( - self, request: trigger_service_pb2.UpdateTriggersRequest + self, request: trigger_service_pb2.UpdateTriggersRequest ) -> trigger_service_pb2.UpdateTriggersResponse: ... async def DeleteTriggers( - self, request: trigger_service_pb2.DeleteTriggersRequest + self, request: trigger_service_pb2.DeleteTriggersRequest ) -> trigger_service_pb2.DeleteTriggersResponse: ... @@ -196,6 +172,6 @@ class SettingsService(Protocol): once the settings.proto file is compiled. """ - async def GetSettings(self, request) -> any: ... + async def GetSettings(self, request) -> ...: ... - async def UpdateSettings(self, request) -> any: ... + async def UpdateSettings(self, request) -> ...: ... diff --git a/src/flyte/remote/_client/controlplane.py b/src/flyte/remote/_client/controlplane.py index f4e614238..9818545ab 100644 --- a/src/flyte/remote/_client/controlplane.py +++ b/src/flyte/remote/_client/controlplane.py @@ -38,6 +38,14 @@ from .auth import create_channel +class DummySettingService(): + async def GetSettings(self, request): + return None + + async def UpdateSettings(self, request): + return None + + class ClientSet: def __init__( self, @@ -59,7 +67,7 @@ def __init__( self._identity_service = identity_pb2_grpc.IdentityServiceStub(channel=channel) self._trigger_service = trigger_service_pb2_grpc.TriggerServiceStub(channel=channel) # Note: Settings service stub will be initialized once settings_pb2_grpc is available - self._settings_service = None # settings_pb2_grpc.SettingsServiceStub(channel=channel) + self._settings_service = DummySettingService() @classmethod async def for_endpoint(cls, endpoint: str, *, insecure: bool = False, **kwargs) -> ClientSet: diff --git a/src/flyte/remote/_settings.py b/src/flyte/remote/_settings.py index c0ca296bc..ce39ea707 100644 --- a/src/flyte/remote/_settings.py +++ b/src/flyte/remote/_settings.py @@ -1,9 +1,11 @@ from __future__ import annotations from dataclasses import dataclass +from pathlib import Path from typing import Any from flyte.remote._common import ToJSONMixin +from flyte.remote._settings_store import SettingsStore from flyte.syncify import syncify @@ -53,6 +55,12 @@ class Settings(ToJSONMixin): local_settings: list[LocalSetting] domain: str | None = None project: str | None = None + _store: SettingsStore | None = None + + def __post_init__(self): + """Initialize the settings store if not provided.""" + if self._store is None: + object.__setattr__(self, "_store", SettingsStore()) @staticmethod def _parse_value(raw_value: str) -> Any: @@ -114,9 +122,9 @@ def to_yaml(self) -> str: # Add local overrides section if self.local_settings: lines.append("# Local overrides") - for setting in self.local_settings: - value_str = self._format_value(setting.value) - lines.append(f"{setting.key}: {value_str}") + for l_setting in self.local_settings: + value_str = self._format_value(l_setting.value) + lines.append(f"{l_setting.key}: {value_str}") lines.append("") # Add inherited settings section @@ -158,38 +166,50 @@ def parse_yaml(yaml_content: str) -> dict[str, Any]: @syncify @classmethod - async def get(cls, project: str | None = None, domain: str | None = None) -> Settings: + async def get( + cls, project: str | None = None, domain: str | None = None, store_path: Path | None = None + ) -> Settings: """Retrieve settings for a given scope. Args: project: Project name (requires domain to be set) domain: Domain name + store_path: Optional custom path to settings store file Returns: Settings object with effective and local settings """ - # TODO: Once the gRPC client is implemented, replace this with actual API call + # TODO: Once the gRPC client is implemented, replace local store with API call # from flyte.remote._client.controlplane import ClientSet # client = await ClientSet.from_env() # request = GetSettingsRequest(domain=domain or "", project=project or "") # response = await client.settings_service.GetSettings(request) - # For now, return dummy data + # Use local file-based store + store = SettingsStore(store_path) + + # Get effective settings with inheritance + effective_dict = store.get_effective_settings(domain=domain, project=project) effective_settings = [ - EffectiveSetting(key="default_queue", value="default", origin=SettingOrigin(scope_type="ROOT")), - EffectiveSetting(key="run_concurrency", value=10, origin=SettingOrigin(scope_type="ROOT")), - EffectiveSetting(key="interruptible", value=False, origin=SettingOrigin(scope_type="ROOT")), + EffectiveSetting( + key=key, + value=value, + origin=SettingOrigin(scope_type=scope_type, domain=origin_domain, project=origin_project), + ) + for key, (value, scope_type, origin_domain, origin_project) in effective_dict.items() ] - local_settings = [] - if domain: - # Add some domain-level overrides for demo - local_settings.append(LocalSetting(key="run_concurrency", value=20)) - effective_settings[1] = EffectiveSetting( - key="run_concurrency", value=20, origin=SettingOrigin(scope_type="DOMAIN", domain=domain) - ) + # Get local settings for this scope only + local_dict = store.get_local_settings(domain=domain, project=project) + local_settings = [LocalSetting(key=k, value=v) for k, v in local_dict.items()] - return cls(effective_settings=effective_settings, local_settings=local_settings, domain=domain, project=project) + return cls( + effective_settings=effective_settings, + local_settings=local_settings, + domain=domain, + project=project, + _store=store, + ) @syncify async def update(self, overrides: dict[str, Any]) -> None: @@ -201,7 +221,7 @@ async def update(self, overrides: dict[str, Any]) -> None: Args: overrides: Dictionary of setting key-value pairs to set as local overrides """ - # TODO: Once the gRPC client is implemented, replace this with actual API call + # TODO: Once the gRPC client is implemented, replace local store with API call # from flyte.remote._client.controlplane import ClientSet # client = await ClientSet.from_env() # @@ -216,17 +236,22 @@ async def update(self, overrides: dict[str, Any]) -> None: # ) # await client.settings_service.UpdateSettings(request) - # For now, just update the local state + # Update local file-based store + if self._store is None: + self._store = SettingsStore() + + self._store.set_local_settings(overrides, domain=self.domain, project=self.project) + + # Update local state to reflect the changes self.local_settings = [LocalSetting(key=k, value=v) for k, v in overrides.items()] - # Update effective settings to reflect the changes - effective_dict = {s.key: s for s in self.effective_settings} - for key, value in overrides.items(): - origin = SettingOrigin( - scope_type="PROJECT" if self.project else ("DOMAIN" if self.domain else "ROOT"), - domain=self.domain, - project=self.project, + # Recalculate effective settings with inheritance + effective_dict = self._store.get_effective_settings(domain=self.domain, project=self.project) + self.effective_settings = [ + EffectiveSetting( + key=key, + value=value, + origin=SettingOrigin(scope_type=scope_type, domain=origin_domain, project=origin_project), ) - effective_dict[key] = EffectiveSetting(key=key, value=value, origin=origin) - - self.effective_settings = list(effective_dict.values()) + for key, (value, scope_type, origin_domain, origin_project) in effective_dict.items() + ] diff --git a/src/flyte/remote/_settings_store.py b/src/flyte/remote/_settings_store.py new file mode 100644 index 000000000..e6997d405 --- /dev/null +++ b/src/flyte/remote/_settings_store.py @@ -0,0 +1,198 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + + +class SettingsStore: + """File-based datastore for hierarchical settings. + + Stores settings in a JSON file with the following structure: + { + "ROOT": {"key": value, ...}, + "DOMAIN": { + "domain_name": {"key": value, ...} + }, + "PROJECT": { + "domain_name": { + "project_name": {"key": value, ...} + } + } + } + """ + + def __init__(self, store_path: Path | None = None): + """Initialize the settings store. + + Args: + store_path: Path to the JSON file. If None, uses ~/.flyte/settings.json + """ + if store_path is None: + store_path = Path.home() / ".flyte" / "settings.json" + + self.store_path = Path(store_path) + self._ensure_store_exists() + + def _ensure_store_exists(self) -> None: + """Create the store file and directory if they don't exist.""" + self.store_path.parent.mkdir(parents=True, exist_ok=True) + if not self.store_path.exists(): + self._write_store({"ROOT": {}, "DOMAIN": {}, "PROJECT": {}}) + + def _read_store(self) -> dict[str, Any]: + """Read the entire store from disk.""" + try: + with open(self.store_path, "r") as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError): + # If file is corrupted or missing, reset it + default_store: dict = {"ROOT": {}, "DOMAIN": {}, "PROJECT": {}} + self._write_store(default_store) + return default_store + + def _write_store(self, data: dict[str, Any]) -> None: + """Write the entire store to disk.""" + with open(self.store_path, "w") as f: + json.dump(data, f, indent=2) + + def get_local_settings(self, domain: str | None = None, project: str | None = None) -> dict[str, Any]: + """Get local settings for a specific scope. + + Args: + domain: Domain name (optional) + project: Project name (requires domain) + + Returns: + Dictionary of local settings for the specified scope + """ + store = self._read_store() + + if project and domain: + # PROJECT scope + return store.get("PROJECT", {}).get(domain, {}).get(project, {}) + elif domain: + # DOMAIN scope + return store.get("DOMAIN", {}).get(domain, {}) + else: + # ROOT scope + return store.get("ROOT", {}) + + def get_effective_settings( + self, domain: str | None = None, project: str | None = None + ) -> dict[str, tuple[Any, str, str | None, str | None]]: + """Get effective settings with inheritance and origin tracking. + + Returns a dictionary where each value is a tuple of: + (value, scope_type, origin_domain, origin_project) + + Args: + domain: Domain name (optional) + project: Project name (requires domain) + + Returns: + Dictionary mapping setting keys to (value, scope_type, domain, project) tuples + """ + store = self._read_store() + effective = {} + + # Start with ROOT settings + for key, value in store.get("ROOT", {}).items(): + effective[key] = (value, "ROOT", None, None) + + # Override with DOMAIN settings if applicable + if domain: + domain_settings = store.get("DOMAIN", {}).get(domain, {}) + for key, value in domain_settings.items(): + effective[key] = (value, "DOMAIN", domain, None) + + # Override with PROJECT settings if applicable + if project and domain: + project_settings = store.get("PROJECT", {}).get(domain, {}).get(project, {}) + for key, value in project_settings.items(): + effective[key] = (value, "PROJECT", domain, project) + + return effective + + def set_local_settings( + self, settings: dict[str, Any], domain: str | None = None, project: str | None = None + ) -> None: + """Set local settings for a specific scope, replacing all existing settings. + + Args: + settings: Dictionary of settings to store + domain: Domain name (optional) + project: Project name (requires domain) + """ + store = self._read_store() + + if project and domain: + # PROJECT scope + if "PROJECT" not in store: + store["PROJECT"] = {} + if domain not in store["PROJECT"]: + store["PROJECT"][domain] = {} + store["PROJECT"][domain][project] = settings + elif domain: + # DOMAIN scope + if "DOMAIN" not in store: + store["DOMAIN"] = {} + store["DOMAIN"][domain] = settings + else: + # ROOT scope + store["ROOT"] = settings + + self._write_store(store) + + def delete_scope(self, domain: str | None = None, project: str | None = None) -> None: + """Delete all settings for a specific scope. + + Args: + domain: Domain name (optional) + project: Project name (requires domain) + """ + store = self._read_store() + + if project and domain: + # Delete PROJECT scope + if domain in store.get("PROJECT", {}) and project in store["PROJECT"][domain]: + del store["PROJECT"][domain][project] + # Clean up empty domain + if not store["PROJECT"][domain]: + del store["PROJECT"][domain] + elif domain: + # Delete DOMAIN scope + if domain in store.get("DOMAIN", {}): + del store["DOMAIN"][domain] + # Also delete all projects in this domain + if domain in store.get("PROJECT", {}): + del store["PROJECT"][domain] + else: + # Delete ROOT scope (reset to empty) + store["ROOT"] = {} + + self._write_store(store) + + def list_domains(self) -> list[str]: + """List all domains that have settings defined. + + Returns: + List of domain names + """ + store = self._read_store() + domains = set() + domains.update(store.get("DOMAIN", {}).keys()) + domains.update(store.get("PROJECT", {}).keys()) + return sorted(domains) + + def list_projects(self, domain: str) -> list[str]: + """List all projects in a domain that have settings defined. + + Args: + domain: Domain name + + Returns: + List of project names + """ + store = self._read_store() + return sorted(store.get("PROJECT", {}).get(domain, {}).keys())