From d255119699b5c44ebd6bb3b152c4ddb77b1eff44 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 15:57:12 -0700 Subject: [PATCH 01/19] Reanme external plugin Signed-off-by: Kevin Su --- .github/workflows/pythonpublish.yml | 10 ++--- ...xternal-plugin-service => Dockerfile.agent | 0 flytekit/clis/sdk_in_container/serve.py | 6 +-- ...nal_plugin_service.py => agent_service.py} | 14 +++---- flytekit/extend/backend/base_plugin.py | 38 +++++++++---------- .../flytekitplugins/bigquery/__init__.py | 2 +- .../bigquery/{backend_plugin.py => agent.py} | 11 ++---- .../{test_backend_plugin.py => test_agent.py} | 8 ++-- setup.py | 2 +- .../{test_backend_plugin.py => test_agent.py} | 12 +++--- 10 files changed, 49 insertions(+), 54 deletions(-) rename Dockerfile.external-plugin-service => Dockerfile.agent (100%) rename flytekit/extend/backend/{external_plugin_service.py => agent_service.py} (79%) rename plugins/flytekit-bigquery/flytekitplugins/bigquery/{backend_plugin.py => agent.py} (91%) rename plugins/flytekit-bigquery/tests/{test_backend_plugin.py => test_agent.py} (91%) rename tests/flytekit/unit/extend/{test_backend_plugin.py => test_agent.py} (88%) diff --git a/.github/workflows/pythonpublish.yml b/.github/workflows/pythonpublish.yml index 120fd56cb8..e9a07710ea 100644 --- a/.github/workflows/pythonpublish.yml +++ b/.github/workflows/pythonpublish.yml @@ -142,7 +142,7 @@ jobs: cache-from: type=gha cache-to: type=gha,mode=max - build-and-push-external-plugin-service-images: + build-and-push-flyteagent-images: runs-on: ubuntu-latest needs: deploy steps: @@ -162,11 +162,11 @@ jobs: username: "${{ secrets.FLYTE_BOT_USERNAME }}" password: "${{ secrets.FLYTE_BOT_PAT }}" - name: Prepare External Plugin Service Image Names - id: external-plugin-service-names + id: flyteagent-names uses: docker/metadata-action@v3 with: images: | - ghcr.io/${{ github.repository_owner }}/external-plugin-service + ghcr.io/${{ github.repository_owner }}/flyteagent tags: | latest ${{ github.sha }} @@ -177,10 +177,10 @@ jobs: context: "." platforms: linux/arm64, linux/amd64 push: ${{ github.event_name == 'release' }} - tags: ${{ steps.external-plugin-service-names.outputs.tags }} + tags: ${{ steps.flyteagent-names.outputs.tags }} build-args: | VERSION=${{ needs.deploy.outputs.version }} - file: ./Dockerfile.external-plugin-service + file: ./Dockerfile.agent cache-from: type=gha cache-to: type=gha,mode=max diff --git a/Dockerfile.external-plugin-service b/Dockerfile.agent similarity index 100% rename from Dockerfile.external-plugin-service rename to Dockerfile.agent diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index 71b539d36c..4007b6cc16 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -2,9 +2,9 @@ import click import grpc -from flyteidl.service.external_plugin_service_pb2_grpc import add_ExternalPluginServiceServicer_to_server +from flyteidl.service.agent_service_pb2_grpc import add_AgentServiceServicer_to_server -from flytekit.extend.backend.external_plugin_service import BackendPluginServer +from flytekit.extend.backend.agent_service import BackendPluginServer _serve_help = """Start a grpc server for the external plugin service.""" @@ -39,7 +39,7 @@ def serve(_: click.Context, port, worker, timeout): """ click.secho("Starting the external plugin service...", fg="blue") server = grpc.server(futures.ThreadPoolExecutor(max_workers=worker)) - add_ExternalPluginServiceServicer_to_server(BackendPluginServer(), server) + add_AgentServiceServicer_to_server(BackendPluginServer(), server) server.add_insecure_port(f"[::]:{port}") server.start() diff --git a/flytekit/extend/backend/external_plugin_service.py b/flytekit/extend/backend/agent_service.py similarity index 79% rename from flytekit/extend/backend/external_plugin_service.py rename to flytekit/extend/backend/agent_service.py index e820a320b1..abfb4c74df 100644 --- a/flytekit/extend/backend/external_plugin_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -1,5 +1,5 @@ import grpc -from flyteidl.service.external_plugin_service_pb2 import ( +from flyteidl.service.agent_service_pb2 import ( PERMANENT_FAILURE, TaskCreateRequest, TaskCreateResponse, @@ -8,20 +8,20 @@ TaskGetRequest, TaskGetResponse, ) -from flyteidl.service.external_plugin_service_pb2_grpc import ExternalPluginServiceServicer +from flyteidl.service.agent_service_pb2_grpc import AgentServiceServicer from flytekit import logger -from flytekit.extend.backend.base_plugin import BackendPluginRegistry +from flytekit.extend.backend.base_plugin import AgentRegistry from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate -class BackendPluginServer(ExternalPluginServiceServicer): +class BackendPluginServer(AgentServiceServicer): def CreateTask(self, request: TaskCreateRequest, context: grpc.ServicerContext) -> TaskCreateResponse: try: tmp = TaskTemplate.from_flyte_idl(request.template) inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None - plugin = BackendPluginRegistry.get_plugin(context, tmp.type) + plugin = AgentRegistry.get_plugin(context, tmp.type) if plugin is None: return TaskCreateResponse() return plugin.create(context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp) @@ -32,7 +32,7 @@ def CreateTask(self, request: TaskCreateRequest, context: grpc.ServicerContext) def GetTask(self, request: TaskGetRequest, context: grpc.ServicerContext) -> TaskGetResponse: try: - plugin = BackendPluginRegistry.get_plugin(context, request.task_type) + plugin = AgentRegistry.get_plugin(context, request.task_type) if plugin is None: return TaskGetResponse(state=PERMANENT_FAILURE) return plugin.get(context=context, job_id=request.job_id) @@ -43,7 +43,7 @@ def GetTask(self, request: TaskGetRequest, context: grpc.ServicerContext) -> Tas def DeleteTask(self, request: TaskDeleteRequest, context: grpc.ServicerContext) -> TaskDeleteResponse: try: - plugin = BackendPluginRegistry.get_plugin(context, request.task_type) + plugin = AgentRegistry.get_plugin(context, request.task_type) if plugin is None: return TaskDeleteResponse() return plugin.delete(context=context, job_id=request.job_id) diff --git a/flytekit/extend/backend/base_plugin.py b/flytekit/extend/backend/base_plugin.py index 9fc1bc206b..4b0e96931f 100644 --- a/flytekit/extend/backend/base_plugin.py +++ b/flytekit/extend/backend/base_plugin.py @@ -3,7 +3,7 @@ import grpc from flyteidl.core.tasks_pb2 import TaskTemplate -from flyteidl.service.external_plugin_service_pb2 import ( +from flyteidl.service.agent_service_pb2 import ( RETRYABLE_FAILURE, RUNNING, SUCCEEDED, @@ -17,14 +17,14 @@ from flytekit.models.literals import LiteralMap -class BackendPluginBase(ABC): +class AgentBase(ABC): """ This is the base class for all backend plugins. It defines the interface that all plugins must implement. - The external plugins service will be run either locally or in a pod, and will be responsible for - invoking backend plugins. The propeller will communicate with the external plugins service + The agent service will be run either locally or in a pod, and will be responsible for + invoking agents. The propeller will communicate with the agent service to create tasks, get the status of tasks, and delete tasks. - All the backend plugins should be registered in the BackendPluginRegistry. External plugins service + All the agents should be registered in the AgentRegistry. Agent Service will look up the plugin based on the task type. Every task type can only have one plugin. """ @@ -68,34 +68,34 @@ def delete(self, context: grpc.ServicerContext, job_id: str) -> TaskDeleteRespon pass -class BackendPluginRegistry(object): +class AgentRegistry(object): """ - This is the registry for all backend plugins. The external plugins service will look up the plugin + This is the registry for all agents. The agent service will look up the agent based on the task type. """ - _REGISTRY: typing.Dict[str, BackendPluginBase] = {} + _REGISTRY: typing.Dict[str, AgentBase] = {} @staticmethod - def register(plugin: BackendPluginBase): - if plugin.task_type in BackendPluginRegistry._REGISTRY: - raise ValueError(f"Duplicate plugin for task type {plugin.task_type}") - BackendPluginRegistry._REGISTRY[plugin.task_type] = plugin - logger.info(f"Registering backend plugin for task type {plugin.task_type}") + def register(plugin: AgentBase): + if plugin.task_type in AgentRegistry._REGISTRY: + raise ValueError(f"Duplicate agent for task type {plugin.task_type}") + AgentRegistry._REGISTRY[plugin.task_type] = plugin + logger.info(f"Registering an agent for task type {plugin.task_type}") @staticmethod - def get_plugin(context: grpc.ServicerContext, task_type: str) -> typing.Optional[BackendPluginBase]: - if task_type not in BackendPluginRegistry._REGISTRY: - logger.error(f"Cannot find backend plugin for task type [{task_type}]") + def get_plugin(context: grpc.ServicerContext, task_type: str) -> typing.Optional[AgentBase]: + if task_type not in AgentRegistry._REGISTRY: + logger.error(f"Cannot find agent for task type [{task_type}]") context.set_code(grpc.StatusCode.NOT_FOUND) - context.set_details(f"Cannot find backend plugin for task type [{task_type}]") + context.set_details(f"Cannot find the agent for task type [{task_type}]") return None - return BackendPluginRegistry._REGISTRY[task_type] + return AgentRegistry._REGISTRY[task_type] def convert_to_flyte_state(state: str) -> State: """ - Convert the state from the backend plugin to the state in flyte. + Convert the state from the agent to the state in flyte. """ state = state.lower() if state in ["failed"]: diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py index 416a021516..6639a00416 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py @@ -11,5 +11,5 @@ BigQueryTask """ -from .backend_plugin import BigQueryPlugin +from .agent import BigQueryPlugin from .task import BigQueryConfig, BigQueryTask diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/backend_plugin.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py similarity index 91% rename from plugins/flytekit-bigquery/flytekitplugins/bigquery/backend_plugin.py rename to plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py index acd5ece430..19a2deee76 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/backend_plugin.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py @@ -2,17 +2,12 @@ from typing import Dict, Optional import grpc -from flyteidl.service.external_plugin_service_pb2 import ( - SUCCEEDED, - TaskCreateResponse, - TaskDeleteResponse, - TaskGetResponse, -) +from flyteidl.service.agent_service_pb2 import SUCCEEDED, TaskCreateResponse, TaskDeleteResponse, TaskGetResponse from google.cloud import bigquery from flytekit import FlyteContextManager, StructuredDataset, logger from flytekit.core.type_engine import TypeEngine -from flytekit.extend.backend.base_plugin import BackendPluginBase, BackendPluginRegistry, convert_to_flyte_state +from flytekit.extend.backend.base_plugin import AgentBase, BackendPluginRegistry, convert_to_flyte_state from flytekit.models import literals from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate @@ -30,7 +25,7 @@ } -class BigQueryPlugin(BackendPluginBase): +class BigQueryPlugin(AgentBase): def __init__(self): super().__init__(task_type="bigquery_query_job_task") diff --git a/plugins/flytekit-bigquery/tests/test_backend_plugin.py b/plugins/flytekit-bigquery/tests/test_agent.py similarity index 91% rename from plugins/flytekit-bigquery/tests/test_backend_plugin.py rename to plugins/flytekit-bigquery/tests/test_agent.py index c95cf308a7..55582d5c8c 100644 --- a/plugins/flytekit-bigquery/tests/test_backend_plugin.py +++ b/plugins/flytekit-bigquery/tests/test_agent.py @@ -3,10 +3,10 @@ from unittest.mock import MagicMock import grpc -from flyteidl.service.external_plugin_service_pb2 import SUCCEEDED +from flyteidl.service.agent_service_pb2 import SUCCEEDED import flytekit.models.interface as interface_models -from flytekit.extend.backend.base_plugin import BackendPluginRegistry +from flytekit.extend.backend.base_plugin import AgentRegistry from flytekit.interfaces.cli_identifiers import Identifier from flytekit.models import literals, task, types from flytekit.models.core.identifier import ResourceType @@ -15,7 +15,7 @@ @mock.patch("google.cloud.bigquery.job.QueryJob") @mock.patch("google.cloud.bigquery.Client") -def test_bigquery_plugin(mock_client, mock_query_job): +def test_bigquery_agent(mock_client, mock_query_job): job_id = "dummy_id" mock_instance = mock_client.return_value mock_query_job_instance = mock_query_job.return_value @@ -39,7 +39,7 @@ def __init__(self): mock_instance.cancel_job.return_value = MockJob() ctx = MagicMock(spec=grpc.ServicerContext) - p = BackendPluginRegistry.get_plugin(ctx, "bigquery_query_job_task") + p = AgentRegistry.get_plugin(ctx, "bigquery_query_job_task") task_id = Identifier( resource_type=ResourceType.TASK, project="project", domain="domain", name="name", version="version" diff --git a/setup.py b/setup.py index 5273590d9f..dff7f34579 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ }, install_requires=[ "googleapis-common-protos>=1.57", - "flyteidl>=1.5.6", + "flyteidl>=1.5.9", "wheel>=0.30.0,<1.0.0", "pandas>=1.0.0,<2.0.0", "pyarrow>=4.0.0,<11.0.0", diff --git a/tests/flytekit/unit/extend/test_backend_plugin.py b/tests/flytekit/unit/extend/test_agent.py similarity index 88% rename from tests/flytekit/unit/extend/test_backend_plugin.py rename to tests/flytekit/unit/extend/test_agent.py index 9dfd20d99e..fe6106c7fc 100644 --- a/tests/flytekit/unit/extend/test_backend_plugin.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock import grpc -from flyteidl.service.external_plugin_service_pb2 import ( +from flyteidl.service.agent_service_pb2 import ( PERMANENT_FAILURE, SUCCEEDED, TaskCreateRequest, @@ -15,8 +15,8 @@ ) import flytekit.models.interface as interface_models -from flytekit.extend.backend.base_plugin import BackendPluginBase, BackendPluginRegistry -from flytekit.extend.backend.external_plugin_service import BackendPluginServer +from flytekit.extend.backend.agent_service import BackendPluginServer +from flytekit.extend.backend.base_plugin import AgentBase, AgentRegistry from flytekit.models import literals, task, types from flytekit.models.core.identifier import Identifier, ResourceType from flytekit.models.literals import LiteralMap @@ -25,7 +25,7 @@ dummy_id = "dummy_id" -class DummyPlugin(BackendPluginBase): +class DummyPlugin(AgentBase): def __init__(self): super().__init__(task_type="dummy") @@ -45,7 +45,7 @@ def delete(self, context: grpc.ServicerContext, job_id) -> TaskDeleteResponse: return TaskDeleteResponse() -BackendPluginRegistry.register(DummyPlugin()) +AgentRegistry.register(DummyPlugin()) task_id = Identifier(resource_type=ResourceType.TASK, project="project", domain="domain", name="t1", version="version") task_metadata = task.TaskMetadata( @@ -84,7 +84,7 @@ def delete(self, context: grpc.ServicerContext, job_id) -> TaskDeleteResponse: def test_dummy_plugin(): ctx = MagicMock(spec=grpc.ServicerContext) - p = BackendPluginRegistry.get_plugin(ctx, "dummy") + p = AgentRegistry.get_plugin(ctx, "dummy") assert p.create(ctx, "/tmp", dummy_template, task_inputs).job_id == dummy_id assert p.get(ctx, dummy_id).state == SUCCEEDED assert p.delete(ctx, dummy_id) == TaskDeleteResponse() From 22202aff47f8992e93596bd87a5fd81951bc27b1 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 15:58:53 -0700 Subject: [PATCH 02/19] nit Signed-off-by: Kevin Su --- .github/workflows/pythonpublish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pythonpublish.yml b/.github/workflows/pythonpublish.yml index e9a07710ea..cd0229efd8 100644 --- a/.github/workflows/pythonpublish.yml +++ b/.github/workflows/pythonpublish.yml @@ -161,7 +161,7 @@ jobs: registry: ghcr.io username: "${{ secrets.FLYTE_BOT_USERNAME }}" password: "${{ secrets.FLYTE_BOT_PAT }}" - - name: Prepare External Plugin Service Image Names + - name: Prepare Fylte Agent Image Names id: flyteagent-names uses: docker/metadata-action@v3 with: From 3eb25a1e34df43b5fe0fac22dcffe304052a99d6 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 16:07:33 -0700 Subject: [PATCH 03/19] nit Signed-off-by: Kevin Su --- plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py index 19a2deee76..ed1fc8d014 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py @@ -7,7 +7,7 @@ from flytekit import FlyteContextManager, StructuredDataset, logger from flytekit.core.type_engine import TypeEngine -from flytekit.extend.backend.base_plugin import AgentBase, BackendPluginRegistry, convert_to_flyte_state +from flytekit.extend.backend.base_plugin import AgentBase, AgentRegistry, convert_to_flyte_state from flytekit.models import literals from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate @@ -86,4 +86,4 @@ def delete(self, context: grpc.ServicerContext, job_id: str) -> TaskDeleteRespon return TaskDeleteResponse() -BackendPluginRegistry.register(BigQueryPlugin()) +AgentRegistry.register(BigQueryPlugin()) From 6462625364cd7d1796c65314081a06fc9a40b9bc Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 16:15:20 -0700 Subject: [PATCH 04/19] nit Signed-off-by: Kevin Su --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index dff7f34579..d8e2ac6a0d 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ }, install_requires=[ "googleapis-common-protos>=1.57", - "flyteidl>=1.5.9", + "flyteidl==1.5.9", "wheel>=0.30.0,<1.0.0", "pandas>=1.0.0,<2.0.0", "pyarrow>=4.0.0,<11.0.0", From ee3def00fc2160f8918acd7b840926251dd55f85 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 16:24:29 -0700 Subject: [PATCH 05/19] nit Signed-off-by: Kevin Su --- flytekit/extend/backend/agent_service.py | 22 +++++++++---------- .../backend/{base_plugin.py => base_agent.py} | 18 +++++++-------- .../flytekitplugins/bigquery/agent.py | 6 ++--- plugins/flytekit-bigquery/tests/test_agent.py | 4 ++-- tests/flytekit/unit/extend/test_agent.py | 22 +++++++++---------- 5 files changed, 36 insertions(+), 36 deletions(-) rename flytekit/extend/backend/{base_plugin.py => base_agent.py} (80%) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index abfb4c74df..dae7591d41 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -11,20 +11,20 @@ from flyteidl.service.agent_service_pb2_grpc import AgentServiceServicer from flytekit import logger -from flytekit.extend.backend.base_plugin import AgentRegistry +from flytekit.extend.backend.base_agent import AgentRegistry from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate -class BackendPluginServer(AgentServiceServicer): +class AgentService(AgentServiceServicer): def CreateTask(self, request: TaskCreateRequest, context: grpc.ServicerContext) -> TaskCreateResponse: try: tmp = TaskTemplate.from_flyte_idl(request.template) inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None - plugin = AgentRegistry.get_plugin(context, tmp.type) - if plugin is None: + agent = AgentRegistry.get_agent(context, tmp.type) + if agent is None: return TaskCreateResponse() - return plugin.create(context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp) + return agent.create(context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp) except Exception as e: logger.error(f"failed to create task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) @@ -32,10 +32,10 @@ def CreateTask(self, request: TaskCreateRequest, context: grpc.ServicerContext) def GetTask(self, request: TaskGetRequest, context: grpc.ServicerContext) -> TaskGetResponse: try: - plugin = AgentRegistry.get_plugin(context, request.task_type) - if plugin is None: + agent = AgentRegistry.get_agent(context, request.task_type) + if agent is None: return TaskGetResponse(state=PERMANENT_FAILURE) - return plugin.get(context=context, job_id=request.job_id) + return agent.get(context=context, job_id=request.job_id) except Exception as e: logger.error(f"failed to get task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) @@ -43,10 +43,10 @@ def GetTask(self, request: TaskGetRequest, context: grpc.ServicerContext) -> Tas def DeleteTask(self, request: TaskDeleteRequest, context: grpc.ServicerContext) -> TaskDeleteResponse: try: - plugin = AgentRegistry.get_plugin(context, request.task_type) - if plugin is None: + agent = AgentRegistry.get_agent(context, request.task_type) + if agent is None: return TaskDeleteResponse() - return plugin.delete(context=context, job_id=request.job_id) + return agent.delete(context=context, job_id=request.job_id) except Exception as e: logger.error(f"failed to delete task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) diff --git a/flytekit/extend/backend/base_plugin.py b/flytekit/extend/backend/base_agent.py similarity index 80% rename from flytekit/extend/backend/base_plugin.py rename to flytekit/extend/backend/base_agent.py index 4b0e96931f..4dddd2998b 100644 --- a/flytekit/extend/backend/base_plugin.py +++ b/flytekit/extend/backend/base_agent.py @@ -19,13 +19,13 @@ class AgentBase(ABC): """ - This is the base class for all backend plugins. It defines the interface that all plugins must implement. + This is the base class for all agents. It defines the interface that all agents must implement. The agent service will be run either locally or in a pod, and will be responsible for invoking agents. The propeller will communicate with the agent service to create tasks, get the status of tasks, and delete tasks. All the agents should be registered in the AgentRegistry. Agent Service - will look up the plugin based on the task type. Every task type can only have one plugin. + will look up the agent based on the task type. Every task type can only have one agent. """ def __init__(self, task_type: str): @@ -34,7 +34,7 @@ def __init__(self, task_type: str): @property def task_type(self) -> str: """ - task_type is the name of the task type that this plugin supports. + task_type is the name of the task type that this agent supports. """ return self._task_type @@ -77,14 +77,14 @@ class AgentRegistry(object): _REGISTRY: typing.Dict[str, AgentBase] = {} @staticmethod - def register(plugin: AgentBase): - if plugin.task_type in AgentRegistry._REGISTRY: - raise ValueError(f"Duplicate agent for task type {plugin.task_type}") - AgentRegistry._REGISTRY[plugin.task_type] = plugin - logger.info(f"Registering an agent for task type {plugin.task_type}") + def register(agent: AgentBase): + if agent.task_type in AgentRegistry._REGISTRY: + raise ValueError(f"Duplicate agent for task type {agent.task_type}") + AgentRegistry._REGISTRY[agent.task_type] = agent + logger.info(f"Registering an agent for task type {agent.task_type}") @staticmethod - def get_plugin(context: grpc.ServicerContext, task_type: str) -> typing.Optional[AgentBase]: + def get_agent(context: grpc.ServicerContext, task_type: str) -> typing.Optional[AgentBase]: if task_type not in AgentRegistry._REGISTRY: logger.error(f"Cannot find agent for task type [{task_type}]") context.set_code(grpc.StatusCode.NOT_FOUND) diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py index ed1fc8d014..a4256db133 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py @@ -7,7 +7,7 @@ from flytekit import FlyteContextManager, StructuredDataset, logger from flytekit.core.type_engine import TypeEngine -from flytekit.extend.backend.base_plugin import AgentBase, AgentRegistry, convert_to_flyte_state +from flytekit.extend.backend.base_agent import AgentBase, AgentRegistry, convert_to_flyte_state from flytekit.models import literals from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate @@ -25,7 +25,7 @@ } -class BigQueryPlugin(AgentBase): +class BigQueryAgent(AgentBase): def __init__(self): super().__init__(task_type="bigquery_query_job_task") @@ -86,4 +86,4 @@ def delete(self, context: grpc.ServicerContext, job_id: str) -> TaskDeleteRespon return TaskDeleteResponse() -AgentRegistry.register(BigQueryPlugin()) +AgentRegistry.register(BigQueryAgent()) diff --git a/plugins/flytekit-bigquery/tests/test_agent.py b/plugins/flytekit-bigquery/tests/test_agent.py index 55582d5c8c..e9b86a6af7 100644 --- a/plugins/flytekit-bigquery/tests/test_agent.py +++ b/plugins/flytekit-bigquery/tests/test_agent.py @@ -6,7 +6,7 @@ from flyteidl.service.agent_service_pb2 import SUCCEEDED import flytekit.models.interface as interface_models -from flytekit.extend.backend.base_plugin import AgentRegistry +from flytekit.extend.backend.base_agent import AgentRegistry from flytekit.interfaces.cli_identifiers import Identifier from flytekit.models import literals, task, types from flytekit.models.core.identifier import ResourceType @@ -39,7 +39,7 @@ def __init__(self): mock_instance.cancel_job.return_value = MockJob() ctx = MagicMock(spec=grpc.ServicerContext) - p = AgentRegistry.get_plugin(ctx, "bigquery_query_job_task") + p = AgentRegistry.get_agent(ctx, "bigquery_query_job_task") task_id = Identifier( resource_type=ResourceType.TASK, project="project", domain="domain", name="name", version="version" diff --git a/tests/flytekit/unit/extend/test_agent.py b/tests/flytekit/unit/extend/test_agent.py index fe6106c7fc..d4b27494a9 100644 --- a/tests/flytekit/unit/extend/test_agent.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -15,8 +15,8 @@ ) import flytekit.models.interface as interface_models -from flytekit.extend.backend.agent_service import BackendPluginServer -from flytekit.extend.backend.base_plugin import AgentBase, AgentRegistry +from flytekit.extend.backend.agent_service import AgentService +from flytekit.extend.backend.base_agent import AgentBase, AgentRegistry from flytekit.models import literals, task, types from flytekit.models.core.identifier import Identifier, ResourceType from flytekit.models.literals import LiteralMap @@ -25,7 +25,7 @@ dummy_id = "dummy_id" -class DummyPlugin(AgentBase): +class DummyAgent(AgentBase): def __init__(self): super().__init__(task_type="dummy") @@ -45,7 +45,7 @@ def delete(self, context: grpc.ServicerContext, job_id) -> TaskDeleteResponse: return TaskDeleteResponse() -AgentRegistry.register(DummyPlugin()) +AgentRegistry.register(DummyAgent()) task_id = Identifier(resource_type=ResourceType.TASK, project="project", domain="domain", name="t1", version="version") task_metadata = task.TaskMetadata( @@ -84,22 +84,22 @@ def delete(self, context: grpc.ServicerContext, job_id) -> TaskDeleteResponse: def test_dummy_plugin(): ctx = MagicMock(spec=grpc.ServicerContext) - p = AgentRegistry.get_plugin(ctx, "dummy") + p = AgentRegistry.get_agent(ctx, "dummy") assert p.create(ctx, "/tmp", dummy_template, task_inputs).job_id == dummy_id assert p.get(ctx, dummy_id).state == SUCCEEDED assert p.delete(ctx, dummy_id) == TaskDeleteResponse() -def test_backend_plugin_server(): - server = BackendPluginServer() +def test_agent_server(): + service = AgentService() ctx = MagicMock(spec=grpc.ServicerContext) request = TaskCreateRequest( inputs=task_inputs.to_flyte_idl(), output_prefix="/tmp", template=dummy_template.to_flyte_idl() ) - assert server.CreateTask(request, ctx).job_id == dummy_id - assert server.GetTask(TaskGetRequest(task_type="dummy", job_id=dummy_id), ctx).state == SUCCEEDED - assert server.DeleteTask(TaskDeleteRequest(task_type="dummy", job_id=dummy_id), ctx) == TaskDeleteResponse() + assert service.CreateTask(request, ctx).job_id == dummy_id + assert service.GetTask(TaskGetRequest(task_type="dummy", job_id=dummy_id), ctx).state == SUCCEEDED + assert service.DeleteTask(TaskDeleteRequest(task_type="dummy", job_id=dummy_id), ctx) == TaskDeleteResponse() - res = server.GetTask(TaskGetRequest(task_type="fake", job_id=dummy_id), ctx) + res = service.GetTask(TaskGetRequest(task_type="fake", job_id=dummy_id), ctx) assert res.state == PERMANENT_FAILURE From 499f2a2ee3e4079991bc261bbafecbd50aa31205 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 16:27:19 -0700 Subject: [PATCH 06/19] fix tests Signed-off-by: Kevin Su --- flytekit/clis/sdk_in_container/serve.py | 4 ++-- .../flytekit-bigquery/flytekitplugins/bigquery/__init__.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index 4007b6cc16..957e0ba8f2 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -4,7 +4,7 @@ import grpc from flyteidl.service.agent_service_pb2_grpc import add_AgentServiceServicer_to_server -from flytekit.extend.backend.agent_service import BackendPluginServer +from flytekit.extend.backend.agent_service import AgentServiceServicer _serve_help = """Start a grpc server for the external plugin service.""" @@ -39,7 +39,7 @@ def serve(_: click.Context, port, worker, timeout): """ click.secho("Starting the external plugin service...", fg="blue") server = grpc.server(futures.ThreadPoolExecutor(max_workers=worker)) - add_AgentServiceServicer_to_server(BackendPluginServer(), server) + add_AgentServiceServicer_to_server(AgentServiceServicer(), server) server.add_insecure_port(f"[::]:{port}") server.start() diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py index 6639a00416..0e0fe80bc7 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py @@ -9,7 +9,8 @@ BigQueryConfig BigQueryTask + BigQueryAgent """ -from .agent import BigQueryPlugin +from .agent import BigQueryAgent from .task import BigQueryConfig, BigQueryTask From 2262489e125d9d21b4dff6b562a43778461e1935 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 16:31:53 -0700 Subject: [PATCH 07/19] fix tests Signed-off-by: Kevin Su --- flytekit/clis/sdk_in_container/serve.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index 957e0ba8f2..ca03737036 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -4,7 +4,7 @@ import grpc from flyteidl.service.agent_service_pb2_grpc import add_AgentServiceServicer_to_server -from flytekit.extend.backend.agent_service import AgentServiceServicer +from flytekit.extend.backend.agent_service import AgentService _serve_help = """Start a grpc server for the external plugin service.""" @@ -39,7 +39,7 @@ def serve(_: click.Context, port, worker, timeout): """ click.secho("Starting the external plugin service...", fg="blue") server = grpc.server(futures.ThreadPoolExecutor(max_workers=worker)) - add_AgentServiceServicer_to_server(AgentServiceServicer(), server) + add_AgentServiceServicer_to_server(AgentService(), server) server.add_insecure_port(f"[::]:{port}") server.start() From 3ac9f05aacc92fc18889df42d82f9d354c338ef6 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 16:32:47 -0700 Subject: [PATCH 08/19] nit Signed-off-by: Kevin Su --- tests/flytekit/unit/extend/test_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/unit/extend/test_agent.py b/tests/flytekit/unit/extend/test_agent.py index d4b27494a9..3b4d8cf3f6 100644 --- a/tests/flytekit/unit/extend/test_agent.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -82,7 +82,7 @@ def delete(self, context: grpc.ServicerContext, job_id) -> TaskDeleteResponse: ) -def test_dummy_plugin(): +def test_dummy_agent(): ctx = MagicMock(spec=grpc.ServicerContext) p = AgentRegistry.get_agent(ctx, "dummy") assert p.create(ctx, "/tmp", dummy_template, task_inputs).job_id == dummy_id From 7cf586ac0ea79386f30e9fb74649d5332117eb38 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 16:38:12 -0700 Subject: [PATCH 09/19] bump id Signed-off-by: Kevin Su --- doc-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc-requirements.txt b/doc-requirements.txt index 5264673f4f..6998063023 100644 --- a/doc-requirements.txt +++ b/doc-requirements.txt @@ -244,7 +244,7 @@ flask==2.2.3 # via mlflow flatbuffers==23.1.21 # via tensorflow -flyteidl==1.5.6 +flyteidl==1.5.9 # via flytekit fonttools==4.38.0 # via matplotlib From a4a1e8a31fcacd3c4c192276a280b75af2b1e3cc Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 31 May 2023 07:52:48 +0800 Subject: [PATCH 10/19] Update setup.py Co-authored-by: Yee Hing Tong --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index d8e2ac6a0d..96d0bd8637 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ }, install_requires=[ "googleapis-common-protos>=1.57", - "flyteidl==1.5.9", + "flyteidl>=1.5.9,<1.6.0", "wheel>=0.30.0,<1.0.0", "pandas>=1.0.0,<2.0.0", "pyarrow>=4.0.0,<11.0.0", From 3e51f4345456914ad59b571e4cdfcc1a1f3a2edf Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 17:28:54 -0700 Subject: [PATCH 11/19] pin idl to 1.5.9 Signed-off-by: Kevin Su --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 96d0bd8637..d8e2ac6a0d 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ }, install_requires=[ "googleapis-common-protos>=1.57", - "flyteidl>=1.5.9,<1.6.0", + "flyteidl==1.5.9", "wheel>=0.30.0,<1.0.0", "pandas>=1.0.0,<2.0.0", "pyarrow>=4.0.0,<11.0.0", From 4bfae818534ec66393c2222e260c8bd749ee7429 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 31 May 2023 10:07:55 -0700 Subject: [PATCH 12/19] update idl Signed-off-by: Kevin Su --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index d8e2ac6a0d..65e689f248 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ }, install_requires=[ "googleapis-common-protos>=1.57", - "flyteidl==1.5.9", + "flyteidl>1.5.8,<1.6.0", "wheel>=0.30.0,<1.0.0", "pandas>=1.0.0,<2.0.0", "pyarrow>=4.0.0,<11.0.0", From 0be33f2bbff8a6faf083355ce8ce5ba89b644128 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 31 May 2023 13:34:46 -0700 Subject: [PATCH 13/19] bump idl Signed-off-by: Kevin Su --- .github/workflows/pythonbuild.yml | 4 +++ flytekit/extend/backend/agent_service.py | 28 +++++++-------- flytekit/extend/backend/base_agent.py | 16 ++++----- .../flytekitplugins/bigquery/agent.py | 14 ++++---- plugins/flytekit-bigquery/tests/test_agent.py | 2 +- tests/flytekit/unit/extend/test_agent.py | 36 +++++++++---------- 6 files changed, 52 insertions(+), 48 deletions(-) diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 210605e493..b8d84d1757 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -39,6 +39,7 @@ jobs: - name: Install dependencies run: | make setup + pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e pip freeze - name: Test with coverage run: | @@ -152,6 +153,7 @@ jobs: pip install -r requirements.txt if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi pip install -U https://github.com/flyteorg/flytekit/archive/${{ github.sha }}.zip#egg=flytekit + pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e pip freeze - name: Test with coverage run: | @@ -182,6 +184,7 @@ jobs: run: | python -m pip install --upgrade pip pip install -r dev-requirements.in + pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e - name: Lint run: | make lint @@ -203,6 +206,7 @@ jobs: run: | python -m pip install --upgrade pip==21.2.4 setuptools wheel pip install -r doc-requirements.txt + pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e - name: Build the documentation run: | # TODO: Remove after buf migration is done and packages updated diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index dae7591d41..2baf205c35 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -1,14 +1,14 @@ import grpc -from flyteidl.service.agent_service_pb2 import ( +from flyteidl.admin.agent_pb2 import ( PERMANENT_FAILURE, - TaskCreateRequest, - TaskCreateResponse, - TaskDeleteRequest, - TaskDeleteResponse, - TaskGetRequest, - TaskGetResponse, + CreateTaskRequest, + CreateTaskResponse, + DeleteTaskRequest, + DeleteTaskResponse, + GetTaskRequest, + GetTaskResponse, ) -from flyteidl.service.agent_service_pb2_grpc import AgentServiceServicer +from flyteidl.service.agent_pb2_grpc import AgentServiceServicer from flytekit import logger from flytekit.extend.backend.base_agent import AgentRegistry @@ -17,35 +17,35 @@ class AgentService(AgentServiceServicer): - def CreateTask(self, request: TaskCreateRequest, context: grpc.ServicerContext) -> TaskCreateResponse: + def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse: try: tmp = TaskTemplate.from_flyte_idl(request.template) inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None agent = AgentRegistry.get_agent(context, tmp.type) if agent is None: - return TaskCreateResponse() + return CreateTaskResponse() return agent.create(context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp) except Exception as e: logger.error(f"failed to create task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to create task with error {e}") - def GetTask(self, request: TaskGetRequest, context: grpc.ServicerContext) -> TaskGetResponse: + def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse: try: agent = AgentRegistry.get_agent(context, request.task_type) if agent is None: - return TaskGetResponse(state=PERMANENT_FAILURE) + return GetTaskResponse(state=PERMANENT_FAILURE) return agent.get(context=context, job_id=request.job_id) except Exception as e: logger.error(f"failed to get task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to get task with error {e}") - def DeleteTask(self, request: TaskDeleteRequest, context: grpc.ServicerContext) -> TaskDeleteResponse: + def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse: try: agent = AgentRegistry.get_agent(context, request.task_type) if agent is None: - return TaskDeleteResponse() + return DeleteTaskResponse() return agent.delete(context=context, job_id=request.job_id) except Exception as e: logger.error(f"failed to delete task with error {e}") diff --git a/flytekit/extend/backend/base_agent.py b/flytekit/extend/backend/base_agent.py index 4dddd2998b..62d8bb9d45 100644 --- a/flytekit/extend/backend/base_agent.py +++ b/flytekit/extend/backend/base_agent.py @@ -2,16 +2,16 @@ from abc import ABC, abstractmethod import grpc -from flyteidl.core.tasks_pb2 import TaskTemplate -from flyteidl.service.agent_service_pb2 import ( +from flyteidl.admin.agent_pb2 import ( RETRYABLE_FAILURE, RUNNING, SUCCEEDED, + CreateTaskResponse, + DeleteTaskResponse, + GetTaskResponse, State, - TaskCreateResponse, - TaskDeleteResponse, - TaskGetResponse, ) +from flyteidl.core.tasks_pb2 import TaskTemplate from flytekit import logger from flytekit.models.literals import LiteralMap @@ -45,14 +45,14 @@ def create( output_prefix: str, task_template: TaskTemplate, inputs: typing.Optional[LiteralMap] = None, - ) -> TaskCreateResponse: + ) -> CreateTaskResponse: """ Return a Unique ID for the task that was created. It should return error code if the task creation failed. """ pass @abstractmethod - def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse: + def get(self, context: grpc.ServicerContext, job_id: str) -> GetTaskResponse: """ Return the status of the task, and return the outputs in some cases. For example, bigquery job can't write the structured dataset to the output location, so it returns the output literals to the propeller, @@ -61,7 +61,7 @@ def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse: pass @abstractmethod - def delete(self, context: grpc.ServicerContext, job_id: str) -> TaskDeleteResponse: + def delete(self, context: grpc.ServicerContext, job_id: str) -> DeleteTaskResponse: """ Delete the task. This call should be idempotent. """ diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py index a4256db133..920c5c976e 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py @@ -2,7 +2,7 @@ from typing import Dict, Optional import grpc -from flyteidl.service.agent_service_pb2 import SUCCEEDED, TaskCreateResponse, TaskDeleteResponse, TaskGetResponse +from flyteidl.admin.agent_pb2 import SUCCEEDED, CreateTaskResponse, DeleteTaskResponse, GetTaskResponse from google.cloud import bigquery from flytekit import FlyteContextManager, StructuredDataset, logger @@ -35,7 +35,7 @@ def create( output_prefix: str, task_template: TaskTemplate, inputs: Optional[LiteralMap] = None, - ) -> TaskCreateResponse: + ) -> CreateTaskResponse: job_config = None if inputs: ctx = FlyteContextManager.current_context() @@ -56,9 +56,9 @@ def create( client = bigquery.Client(project=custom["ProjectID"], location=custom["Location"]) query_job = client.query(task_template.sql.statement, job_config=job_config) - return TaskCreateResponse(job_id=str(query_job.job_id)) + return CreateTaskResponse(job_id=str(query_job.job_id)) - def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse: + def get(self, context: grpc.ServicerContext, job_id: str) -> GetTaskResponse: client = bigquery.Client() job = client.get_job(job_id) cur_state = convert_to_flyte_state(str(job.state)) @@ -78,12 +78,12 @@ def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse: } ) - return TaskGetResponse(state=cur_state, outputs=res.to_flyte_idl()) + return GetTaskResponse(state=cur_state, outputs=res.to_flyte_idl()) - def delete(self, context: grpc.ServicerContext, job_id: str) -> TaskDeleteResponse: + def delete(self, context: grpc.ServicerContext, job_id: str) -> DeleteTaskResponse: client = bigquery.Client() client.cancel_job(job_id) - return TaskDeleteResponse() + return DeleteTaskResponse() AgentRegistry.register(BigQueryAgent()) diff --git a/plugins/flytekit-bigquery/tests/test_agent.py b/plugins/flytekit-bigquery/tests/test_agent.py index e9b86a6af7..0b49383130 100644 --- a/plugins/flytekit-bigquery/tests/test_agent.py +++ b/plugins/flytekit-bigquery/tests/test_agent.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock import grpc -from flyteidl.service.agent_service_pb2 import SUCCEEDED +from flyteidl.admin.agent_pb2 import SUCCEEDED import flytekit.models.interface as interface_models from flytekit.extend.backend.base_agent import AgentRegistry diff --git a/tests/flytekit/unit/extend/test_agent.py b/tests/flytekit/unit/extend/test_agent.py index 3b4d8cf3f6..af8426e2e3 100644 --- a/tests/flytekit/unit/extend/test_agent.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -3,15 +3,15 @@ from unittest.mock import MagicMock import grpc -from flyteidl.service.agent_service_pb2 import ( +from flyteidl.admin.agent_pb2 import ( PERMANENT_FAILURE, SUCCEEDED, - TaskCreateRequest, - TaskCreateResponse, - TaskDeleteRequest, - TaskDeleteResponse, - TaskGetRequest, - TaskGetResponse, + CreateTaskRequest, + CreateTaskResponse, + DeleteTaskRequest, + DeleteTaskResponse, + GetTaskRequest, + GetTaskResponse, ) import flytekit.models.interface as interface_models @@ -35,14 +35,14 @@ def create( output_prefix: str, task_template: TaskTemplate, inputs: typing.Optional[LiteralMap] = None, - ) -> TaskCreateResponse: - return TaskCreateResponse(job_id=dummy_id) + ) -> CreateTaskResponse: + return CreateTaskResponse(job_id=dummy_id) - def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse: - return TaskGetResponse(state=SUCCEEDED) + def get(self, context: grpc.ServicerContext, job_id: str) -> GetTaskResponse: + return GetTaskResponse(state=SUCCEEDED) - def delete(self, context: grpc.ServicerContext, job_id) -> TaskDeleteResponse: - return TaskDeleteResponse() + def delete(self, context: grpc.ServicerContext, job_id) -> DeleteTaskResponse: + return DeleteTaskResponse() AgentRegistry.register(DummyAgent()) @@ -87,19 +87,19 @@ def test_dummy_agent(): p = AgentRegistry.get_agent(ctx, "dummy") assert p.create(ctx, "/tmp", dummy_template, task_inputs).job_id == dummy_id assert p.get(ctx, dummy_id).state == SUCCEEDED - assert p.delete(ctx, dummy_id) == TaskDeleteResponse() + assert p.delete(ctx, dummy_id) == DeleteTaskResponse() def test_agent_server(): service = AgentService() ctx = MagicMock(spec=grpc.ServicerContext) - request = TaskCreateRequest( + request = CreateTaskRequest( inputs=task_inputs.to_flyte_idl(), output_prefix="/tmp", template=dummy_template.to_flyte_idl() ) assert service.CreateTask(request, ctx).job_id == dummy_id - assert service.GetTask(TaskGetRequest(task_type="dummy", job_id=dummy_id), ctx).state == SUCCEEDED - assert service.DeleteTask(TaskDeleteRequest(task_type="dummy", job_id=dummy_id), ctx) == TaskDeleteResponse() + assert service.GetTask(GetTaskRequest(task_type="dummy", job_id=dummy_id), ctx).state == SUCCEEDED + assert service.DeleteTask(DeleteTaskRequest(task_type="dummy", job_id=dummy_id), ctx) == DeleteTaskResponse() - res = service.GetTask(TaskGetRequest(task_type="fake", job_id=dummy_id), ctx) + res = service.GetTask(GetTaskRequest(task_type="fake", job_id=dummy_id), ctx) assert res.state == PERMANENT_FAILURE From d1b077686264bcd6087a0ff191c2f93fd651e223 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 31 May 2023 13:37:55 -0700 Subject: [PATCH 14/19] nit Signed-off-by: Kevin Su --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 65e689f248..2df0a05e3f 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ }, install_requires=[ "googleapis-common-protos>=1.57", - "flyteidl>1.5.8,<1.6.0", + "flyteidl>1.5.6", "wheel>=0.30.0,<1.0.0", "pandas>=1.0.0,<2.0.0", "pyarrow>=4.0.0,<11.0.0", From 1c98782ca0fd38491b2d83b2acd7de6aae7283d2 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 31 May 2023 13:51:11 -0700 Subject: [PATCH 15/19] fix tests Signed-off-by: Kevin Su --- flytekit/clis/sdk_in_container/serve.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index ca03737036..c38fafa2f9 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -2,7 +2,7 @@ import click import grpc -from flyteidl.service.agent_service_pb2_grpc import add_AgentServiceServicer_to_server +from flyteidl.service.agent_pb2_grpc import add_AgentServiceServicer_to_server from flytekit.extend.backend.agent_service import AgentService From 0ed21abeefcdce153e3abeef849cf297415c6109 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 31 May 2023 17:18:41 -0700 Subject: [PATCH 16/19] update idl Signed-off-by: Kevin Su --- .github/workflows/pythonbuild.yml | 8 ++-- flytekit/extend/backend/agent_service.py | 7 ++-- flytekit/extend/backend/base_agent.py | 4 +- .../flytekitplugins/bigquery/agent.py | 25 +++++++---- plugins/flytekit-bigquery/tests/test_agent.py | 17 +++++--- tests/flytekit/unit/extend/test_agent.py | 42 +++++++++++++------ 6 files changed, 68 insertions(+), 35 deletions(-) diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index b8d84d1757..253deb240f 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -39,7 +39,7 @@ jobs: - name: Install dependencies run: | make setup - pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e + pip install git+https://github.com/flyteorg/flyteidl@5ce84169e8377077923d6c25fe25d10796c1f973 pip freeze - name: Test with coverage run: | @@ -153,7 +153,7 @@ jobs: pip install -r requirements.txt if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi pip install -U https://github.com/flyteorg/flytekit/archive/${{ github.sha }}.zip#egg=flytekit - pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e + pip install git+https://github.com/flyteorg/flyteidl@5ce84169e8377077923d6c25fe25d10796c1f973 pip freeze - name: Test with coverage run: | @@ -184,7 +184,7 @@ jobs: run: | python -m pip install --upgrade pip pip install -r dev-requirements.in - pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e + pip install git+https://github.com/flyteorg/flyteidl@5ce84169e8377077923d6c25fe25d10796c1f973 - name: Lint run: | make lint @@ -206,7 +206,7 @@ jobs: run: | python -m pip install --upgrade pip==21.2.4 setuptools wheel pip install -r doc-requirements.txt - pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e + pip install git+https://github.com/flyteorg/flyteidl@5ce84169e8377077923d6c25fe25d10796c1f973 - name: Build the documentation run: | # TODO: Remove after buf migration is done and packages updated diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index 2baf205c35..b3b13c4daf 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -7,6 +7,7 @@ DeleteTaskResponse, GetTaskRequest, GetTaskResponse, + resource, ) from flyteidl.service.agent_pb2_grpc import AgentServiceServicer @@ -34,8 +35,8 @@ def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> Get try: agent = AgentRegistry.get_agent(context, request.task_type) if agent is None: - return GetTaskResponse(state=PERMANENT_FAILURE) - return agent.get(context=context, job_id=request.job_id) + return GetTaskResponse(resource=resource(state=PERMANENT_FAILURE)) + return agent.get(context=context, resource_meta=request.resource_meta) except Exception as e: logger.error(f"failed to get task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) @@ -46,7 +47,7 @@ def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) agent = AgentRegistry.get_agent(context, request.task_type) if agent is None: return DeleteTaskResponse() - return agent.delete(context=context, job_id=request.job_id) + return agent.delete(context=context, resource_meta=request.resource_meta) except Exception as e: logger.error(f"failed to delete task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) diff --git a/flytekit/extend/backend/base_agent.py b/flytekit/extend/backend/base_agent.py index 62d8bb9d45..0c93d2f60f 100644 --- a/flytekit/extend/backend/base_agent.py +++ b/flytekit/extend/backend/base_agent.py @@ -52,7 +52,7 @@ def create( pass @abstractmethod - def get(self, context: grpc.ServicerContext, job_id: str) -> GetTaskResponse: + def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse: """ Return the status of the task, and return the outputs in some cases. For example, bigquery job can't write the structured dataset to the output location, so it returns the output literals to the propeller, @@ -61,7 +61,7 @@ def get(self, context: grpc.ServicerContext, job_id: str) -> GetTaskResponse: pass @abstractmethod - def delete(self, context: grpc.ServicerContext, job_id: str) -> DeleteTaskResponse: + def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse: """ Delete the task. This call should be idempotent. """ diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py index 920c5c976e..45106c7a49 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py @@ -1,8 +1,10 @@ import datetime +import json +from dataclasses import asdict, dataclass from typing import Dict, Optional import grpc -from flyteidl.admin.agent_pb2 import SUCCEEDED, CreateTaskResponse, DeleteTaskResponse, GetTaskResponse +from flyteidl.admin.agent_pb2 import SUCCEEDED, CreateTaskResponse, DeleteTaskResponse, GetTaskResponse, resource from google.cloud import bigquery from flytekit import FlyteContextManager, StructuredDataset, logger @@ -25,6 +27,11 @@ } +@dataclass +class Metadata: + job_id: str + + class BigQueryAgent(AgentBase): def __init__(self): super().__init__(task_type="bigquery_query_job_task") @@ -56,11 +63,14 @@ def create( client = bigquery.Client(project=custom["ProjectID"], location=custom["Location"]) query_job = client.query(task_template.sql.statement, job_config=job_config) - return CreateTaskResponse(job_id=str(query_job.job_id)) + return CreateTaskResponse( + resource_meta=json.dumps(asdict(Metadata(job_id=str(query_job.job_id)))).encode("utf-8") + ) - def get(self, context: grpc.ServicerContext, job_id: str) -> GetTaskResponse: + def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse: client = bigquery.Client() - job = client.get_job(job_id) + metadata = Metadata(**json.loads(resource_meta.decode("utf-8"))) + job = client.get_job(metadata.job_id) cur_state = convert_to_flyte_state(str(job.state)) res = None @@ -78,11 +88,12 @@ def get(self, context: grpc.ServicerContext, job_id: str) -> GetTaskResponse: } ) - return GetTaskResponse(state=cur_state, outputs=res.to_flyte_idl()) + return GetTaskResponse(resource=resource(state=cur_state, outputs=res.to_flyte_idl())) - def delete(self, context: grpc.ServicerContext, job_id: str) -> DeleteTaskResponse: + def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse: client = bigquery.Client() - client.cancel_job(job_id) + metadata = Metadata(**json.loads(resource_meta.decode("utf-8"))) + client.cancel_job(metadata.job_id) return DeleteTaskResponse() diff --git a/plugins/flytekit-bigquery/tests/test_agent.py b/plugins/flytekit-bigquery/tests/test_agent.py index 0b49383130..237c5c0718 100644 --- a/plugins/flytekit-bigquery/tests/test_agent.py +++ b/plugins/flytekit-bigquery/tests/test_agent.py @@ -1,9 +1,12 @@ +import json +from dataclasses import asdict from datetime import timedelta from unittest import mock from unittest.mock import MagicMock import grpc from flyteidl.admin.agent_pb2 import SUCCEEDED +from flytekitplugins.bigquery.agent import Metadata import flytekit.models.interface as interface_models from flytekit.extend.backend.base_agent import AgentRegistry @@ -39,7 +42,7 @@ def __init__(self): mock_instance.cancel_job.return_value = MockJob() ctx = MagicMock(spec=grpc.ServicerContext) - p = AgentRegistry.get_agent(ctx, "bigquery_query_job_task") + agent = AgentRegistry.get_agent(ctx, "bigquery_query_job_task") task_id = Identifier( resource_type=ResourceType.TASK, project="project", domain="domain", name="name", version="version" @@ -84,11 +87,13 @@ def __init__(self): sql=Sql("SELECT 1"), ) - assert p.create(ctx, "/tmp", dummy_template, task_inputs).job_id == job_id - res = p.get(ctx, job_id) - assert res.state == SUCCEEDED + metadata_bytes = json.dumps(asdict(Metadata(job_id="dummy_id"))).encode("utf-8") + assert agent.create(ctx, "/tmp", dummy_template, task_inputs).resource_meta == metadata_bytes + res = agent.get(ctx, metadata_bytes) + assert res.resource.state == SUCCEEDED assert ( - res.outputs.literals["results"].scalar.structured_dataset.uri == "bq://dummy_project:dummy_dataset.dummy_table" + res.resource.outputs.literals["results"].scalar.structured_dataset.uri + == "bq://dummy_project:dummy_dataset.dummy_table" ) - p.delete(ctx, job_id) + agent.delete(ctx, metadata_bytes) mock_instance.cancel_job.assert_called() diff --git a/tests/flytekit/unit/extend/test_agent.py b/tests/flytekit/unit/extend/test_agent.py index af8426e2e3..9fc1c4a9cd 100644 --- a/tests/flytekit/unit/extend/test_agent.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -1,4 +1,6 @@ +import json import typing +from dataclasses import asdict, dataclass from datetime import timedelta from unittest.mock import MagicMock @@ -12,6 +14,7 @@ DeleteTaskResponse, GetTaskRequest, GetTaskResponse, + resource, ) import flytekit.models.interface as interface_models @@ -25,6 +28,11 @@ dummy_id = "dummy_id" +@dataclass +class Metadata: + job_id: str + + class DummyAgent(AgentBase): def __init__(self): super().__init__(task_type="dummy") @@ -36,12 +44,12 @@ def create( task_template: TaskTemplate, inputs: typing.Optional[LiteralMap] = None, ) -> CreateTaskResponse: - return CreateTaskResponse(job_id=dummy_id) + return CreateTaskResponse(resource_meta=json.dumps(asdict(Metadata(job_id=dummy_id))).encode("utf-8")) - def get(self, context: grpc.ServicerContext, job_id: str) -> GetTaskResponse: - return GetTaskResponse(state=SUCCEEDED) + def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse: + return GetTaskResponse(resource=resource(state=SUCCEEDED)) - def delete(self, context: grpc.ServicerContext, job_id) -> DeleteTaskResponse: + def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse: return DeleteTaskResponse() @@ -84,10 +92,11 @@ def delete(self, context: grpc.ServicerContext, job_id) -> DeleteTaskResponse: def test_dummy_agent(): ctx = MagicMock(spec=grpc.ServicerContext) - p = AgentRegistry.get_agent(ctx, "dummy") - assert p.create(ctx, "/tmp", dummy_template, task_inputs).job_id == dummy_id - assert p.get(ctx, dummy_id).state == SUCCEEDED - assert p.delete(ctx, dummy_id) == DeleteTaskResponse() + agent = AgentRegistry.get_agent(ctx, "dummy") + metadata_bytes = json.dumps(asdict(Metadata(job_id=dummy_id))).encode("utf-8") + assert agent.create(ctx, "/tmp", dummy_template, task_inputs).resource_meta == metadata_bytes + assert agent.get(ctx, metadata_bytes).resource.state == SUCCEEDED + assert agent.delete(ctx, metadata_bytes) == DeleteTaskResponse() def test_agent_server(): @@ -97,9 +106,16 @@ def test_agent_server(): inputs=task_inputs.to_flyte_idl(), output_prefix="/tmp", template=dummy_template.to_flyte_idl() ) - assert service.CreateTask(request, ctx).job_id == dummy_id - assert service.GetTask(GetTaskRequest(task_type="dummy", job_id=dummy_id), ctx).state == SUCCEEDED - assert service.DeleteTask(DeleteTaskRequest(task_type="dummy", job_id=dummy_id), ctx) == DeleteTaskResponse() + metadata_bytes = json.dumps(asdict(Metadata(job_id=dummy_id))).encode("utf-8") + assert service.CreateTask(request, ctx).resource_meta == metadata_bytes + assert ( + service.GetTask(GetTaskRequest(task_type="dummy", resource_meta=metadata_bytes), ctx).resource.state + == SUCCEEDED + ) + assert ( + service.DeleteTask(DeleteTaskRequest(task_type="dummy", resource_meta=metadata_bytes), ctx) + == DeleteTaskResponse() + ) - res = service.GetTask(GetTaskRequest(task_type="fake", job_id=dummy_id), ctx) - assert res.state == PERMANENT_FAILURE + res = service.GetTask(GetTaskRequest(task_type="fake", resource_meta=metadata_bytes), ctx) + assert res.resource.state == PERMANENT_FAILURE From 9355af38903a5706e66f091d3c4b9399d7d81580 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 31 May 2023 21:04:13 -0700 Subject: [PATCH 17/19] nit Signed-off-by: Kevin Su --- flytekit/clis/sdk_in_container/serve.py | 8 ++++---- flytekit/extend/backend/agent_service.py | 4 ++-- .../flytekit-bigquery/flytekitplugins/bigquery/agent.py | 4 ++-- tests/flytekit/unit/extend/test_agent.py | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index c38fafa2f9..e2d12cd994 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -6,7 +6,7 @@ from flytekit.extend.backend.agent_service import AgentService -_serve_help = """Start a grpc server for the external plugin service.""" +_serve_help = """Start a grpc server for the agent service.""" @click.command("serve", help=_serve_help) @@ -15,7 +15,7 @@ default="8000", is_flag=False, type=int, - help="Grpc port for the external plugin service", + help="Grpc port for the agent service", ) @click.option( "--worker", @@ -35,9 +35,9 @@ @click.pass_context def serve(_: click.Context, port, worker, timeout): """ - Start a grpc server for the external plugin service. + Start a grpc server for the agent service. """ - click.secho("Starting the external plugin service...", fg="blue") + click.secho("Starting the agent service...", fg="blue") server = grpc.server(futures.ThreadPoolExecutor(max_workers=worker)) add_AgentServiceServicer_to_server(AgentService(), server) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index b3b13c4daf..f45b4cbd78 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -7,7 +7,7 @@ DeleteTaskResponse, GetTaskRequest, GetTaskResponse, - resource, + Resource, ) from flyteidl.service.agent_pb2_grpc import AgentServiceServicer @@ -35,7 +35,7 @@ def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> Get try: agent = AgentRegistry.get_agent(context, request.task_type) if agent is None: - return GetTaskResponse(resource=resource(state=PERMANENT_FAILURE)) + return GetTaskResponse(resource=Resource(state=PERMANENT_FAILURE)) return agent.get(context=context, resource_meta=request.resource_meta) except Exception as e: logger.error(f"failed to get task with error {e}") diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py index 45106c7a49..0a9a22923e 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py @@ -4,7 +4,7 @@ from typing import Dict, Optional import grpc -from flyteidl.admin.agent_pb2 import SUCCEEDED, CreateTaskResponse, DeleteTaskResponse, GetTaskResponse, resource +from flyteidl.admin.agent_pb2 import SUCCEEDED, CreateTaskResponse, DeleteTaskResponse, GetTaskResponse, Resource from google.cloud import bigquery from flytekit import FlyteContextManager, StructuredDataset, logger @@ -88,7 +88,7 @@ def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskRes } ) - return GetTaskResponse(resource=resource(state=cur_state, outputs=res.to_flyte_idl())) + return GetTaskResponse(resource=Resource(state=cur_state, outputs=res.to_flyte_idl())) def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse: client = bigquery.Client() diff --git a/tests/flytekit/unit/extend/test_agent.py b/tests/flytekit/unit/extend/test_agent.py index 9fc1c4a9cd..4fbba075af 100644 --- a/tests/flytekit/unit/extend/test_agent.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -14,7 +14,7 @@ DeleteTaskResponse, GetTaskRequest, GetTaskResponse, - resource, + Resource, ) import flytekit.models.interface as interface_models @@ -47,7 +47,7 @@ def create( return CreateTaskResponse(resource_meta=json.dumps(asdict(Metadata(job_id=dummy_id))).encode("utf-8")) def get(self, context: grpc.ServicerContext, resource_meta: bytes) -> GetTaskResponse: - return GetTaskResponse(resource=resource(state=SUCCEEDED)) + return GetTaskResponse(resource=Resource(state=SUCCEEDED)) def delete(self, context: grpc.ServicerContext, resource_meta: bytes) -> DeleteTaskResponse: return DeleteTaskResponse() From 7138d6764740e3242b8d21deb74646adae7148fd Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 2 Jun 2023 15:40:01 -0700 Subject: [PATCH 18/19] update idl Signed-off-by: Kevin Su --- .github/workflows/pythonbuild.yml | 4 ---- doc-requirements.txt | 2 +- flytekit/clis/sdk_in_container/serve.py | 4 ++-- flytekit/extend/backend/agent_service.py | 4 ++-- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 253deb240f..210605e493 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -39,7 +39,6 @@ jobs: - name: Install dependencies run: | make setup - pip install git+https://github.com/flyteorg/flyteidl@5ce84169e8377077923d6c25fe25d10796c1f973 pip freeze - name: Test with coverage run: | @@ -153,7 +152,6 @@ jobs: pip install -r requirements.txt if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi pip install -U https://github.com/flyteorg/flytekit/archive/${{ github.sha }}.zip#egg=flytekit - pip install git+https://github.com/flyteorg/flyteidl@5ce84169e8377077923d6c25fe25d10796c1f973 pip freeze - name: Test with coverage run: | @@ -184,7 +182,6 @@ jobs: run: | python -m pip install --upgrade pip pip install -r dev-requirements.in - pip install git+https://github.com/flyteorg/flyteidl@5ce84169e8377077923d6c25fe25d10796c1f973 - name: Lint run: | make lint @@ -206,7 +203,6 @@ jobs: run: | python -m pip install --upgrade pip==21.2.4 setuptools wheel pip install -r doc-requirements.txt - pip install git+https://github.com/flyteorg/flyteidl@5ce84169e8377077923d6c25fe25d10796c1f973 - name: Build the documentation run: | # TODO: Remove after buf migration is done and packages updated diff --git a/doc-requirements.txt b/doc-requirements.txt index 6998063023..c1e17439f9 100644 --- a/doc-requirements.txt +++ b/doc-requirements.txt @@ -244,7 +244,7 @@ flask==2.2.3 # via mlflow flatbuffers==23.1.21 # via tensorflow -flyteidl==1.5.9 +flyteidl==1.5.10 # via flytekit fonttools==4.38.0 # via matplotlib diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index e2d12cd994..c95754e6c6 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -2,7 +2,7 @@ import click import grpc -from flyteidl.service.agent_pb2_grpc import add_AgentServiceServicer_to_server +from flyteidl.service.agent_pb2_grpc import add_AsyncAgentServiceServicer_to_server from flytekit.extend.backend.agent_service import AgentService @@ -39,7 +39,7 @@ def serve(_: click.Context, port, worker, timeout): """ click.secho("Starting the agent service...", fg="blue") server = grpc.server(futures.ThreadPoolExecutor(max_workers=worker)) - add_AgentServiceServicer_to_server(AgentService(), server) + add_AsyncAgentServiceServicer_to_server(AgentService(), server) server.add_insecure_port(f"[::]:{port}") server.start() diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index f45b4cbd78..55f71959fe 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -9,7 +9,7 @@ GetTaskResponse, Resource, ) -from flyteidl.service.agent_pb2_grpc import AgentServiceServicer +from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceServicer from flytekit import logger from flytekit.extend.backend.base_agent import AgentRegistry @@ -17,7 +17,7 @@ from flytekit.models.task import TaskTemplate -class AgentService(AgentServiceServicer): +class AgentService(AsyncAgentServiceServicer): def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse: try: tmp = TaskTemplate.from_flyte_idl(request.template) From b37042fdc41b4d94017b817035ecc08fa3ea9065 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 7 Jun 2023 13:29:35 -0700 Subject: [PATCH 19/19] nit Signed-off-by: Kevin Su --- .github/workflows/pythonpublish.yml | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pythonpublish.yml b/.github/workflows/pythonpublish.yml index cd0229efd8..e6a147e67e 100644 --- a/.github/workflows/pythonpublish.yml +++ b/.github/workflows/pythonpublish.yml @@ -161,7 +161,7 @@ jobs: registry: ghcr.io username: "${{ secrets.FLYTE_BOT_USERNAME }}" password: "${{ secrets.FLYTE_BOT_PAT }}" - - name: Prepare Fylte Agent Image Names + - name: Prepare Flyte Agent Image Names id: flyteagent-names uses: docker/metadata-action@v3 with: diff --git a/setup.py b/setup.py index 2df0a05e3f..4bae776373 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ }, install_requires=[ "googleapis-common-protos>=1.57", - "flyteidl>1.5.6", + "flyteidl>=1.5.10", "wheel>=0.30.0,<1.0.0", "pandas>=1.0.0,<2.0.0", "pyarrow>=4.0.0,<11.0.0",