From ee7672abeb7c1913a796adc47cf4f1919fd2305c Mon Sep 17 00:00:00 2001 From: Jacob Barzee Date: Fri, 21 Mar 2025 10:43:13 -0700 Subject: [PATCH 1/2] generate operator client and create new routing client --- Makefile | 4 + client/dual/client.go | 384 +++++++++++++++++++++++++++ client/dual/conv.go | 59 ++++ client/operator/lazy_client.go | 20 ++ client/operator/lazy_client_gen.go | 165 ++++++++++++ client/temporal_client.go | 41 ++- cmd/tools/genrpcwrappers/extra.go | 21 +- cmd/tools/genrpcwrappers/main.go | 17 +- mocks/client/temporal_client_mock.go | 39 ++- proxy/adminservice_test.go | 2 +- 10 files changed, 715 insertions(+), 37 deletions(-) create mode 100644 client/dual/client.go create mode 100644 client/dual/conv.go create mode 100644 client/operator/lazy_client.go create mode 100644 client/operator/lazy_client_gen.go diff --git a/Makefile b/Makefile index 706006d3..52c0e4cc 100644 --- a/Makefile +++ b/Makefile @@ -64,6 +64,10 @@ generate-rpcwrappers: cd $(GENRPCWRAPPERS_DIR); go run . -service frontend -license_file ../../../LICENSE cp $(GENRPCWRAPPERS_DIR)/lazy_client_gen.go client/frontend/lazy_client_gen.go + rm -rf $(GENRPCWRAPPERS_DIR)/*_gen.go + cd $(GENRPCWRAPPERS_DIR); go run . -service operator -license_file ../../../LICENSE + cp $(GENRPCWRAPPERS_DIR)/lazy_client_gen.go client/operator/lazy_client_gen.go + rm -rf ./cmd/tools/genrpcwrappers/*_gen.go cd $(GENRPCWRAPPERS_DIR); go run . -service admin -license_file ../../../LICENSE cp ./cmd/tools/genrpcwrappers/lazy_client_gen.go client/admin/lazy_client_gen.go diff --git a/client/dual/client.go b/client/dual/client.go new file mode 100644 index 00000000..da61adb2 --- /dev/null +++ b/client/dual/client.go @@ -0,0 +1,384 @@ +package dual + +import ( + "context" + + "github.com/temporalio/s2s-proxy/client" + adminclient "github.com/temporalio/s2s-proxy/client/admin" + operatorclient "github.com/temporalio/s2s-proxy/client/operator" + "google.golang.org/grpc" + + "go.temporal.io/api/operatorservice/v1" + "go.temporal.io/server/api/adminservice/v1" +) + +type ( + routingClient struct { + adminClient adminservice.AdminServiceClient + operatorClient operatorservice.OperatorServiceClient + } +) + +func NewRoutingClient(clientProvider client.ClientProvider) *routingClient { + return &routingClient{ + adminClient: adminclient.NewLazyClient(clientProvider), + operatorClient: operatorclient.NewLazyClient(clientProvider), + } +} + +func (c *routingClient) AddOrUpdateRemoteCluster( + ctx context.Context, + request *adminservice.AddOrUpdateRemoteClusterRequest, + opts ...grpc.CallOption, +) (*adminservice.AddOrUpdateRemoteClusterResponse, error) { + operatorRequest := convertAddOrUpdateRemoteClusterRequest(request) + operatorResponse, err := c.operatorClient.AddOrUpdateRemoteCluster(ctx, operatorRequest, opts...) + if err != nil { + return nil, err + } + return convertAddOrUpdateRemoteClusterResponse(operatorResponse), nil +} + +func (c *routingClient) AddSearchAttributes( + ctx context.Context, + request *adminservice.AddSearchAttributesRequest, + opts ...grpc.CallOption, +) (*adminservice.AddSearchAttributesResponse, error) { + return c.adminClient.AddSearchAttributes(ctx, request, opts...) +} + +func (c *routingClient) AddTasks( + ctx context.Context, + request *adminservice.AddTasksRequest, + opts ...grpc.CallOption, +) (*adminservice.AddTasksResponse, error) { + return c.adminClient.AddTasks(ctx, request, opts...) +} + +func (c *routingClient) CancelDLQJob( + ctx context.Context, + request *adminservice.CancelDLQJobRequest, + opts ...grpc.CallOption, +) (*adminservice.CancelDLQJobResponse, error) { + return c.adminClient.CancelDLQJob(ctx, request, opts...) +} + +func (c *routingClient) CloseShard( + ctx context.Context, + request *adminservice.CloseShardRequest, + opts ...grpc.CallOption, +) (*adminservice.CloseShardResponse, error) { + return c.adminClient.CloseShard(ctx, request, opts...) +} + +func (c *routingClient) DeepHealthCheck( + ctx context.Context, + request *adminservice.DeepHealthCheckRequest, + opts ...grpc.CallOption, +) (*adminservice.DeepHealthCheckResponse, error) { + return c.adminClient.DeepHealthCheck(ctx, request, opts...) +} + +func (c *routingClient) DeleteWorkflowExecution( + ctx context.Context, + request *adminservice.DeleteWorkflowExecutionRequest, + opts ...grpc.CallOption, +) (*adminservice.DeleteWorkflowExecutionResponse, error) { + return c.adminClient.DeleteWorkflowExecution(ctx, request, opts...) +} + +func (c *routingClient) DescribeCluster( + ctx context.Context, + request *adminservice.DescribeClusterRequest, + opts ...grpc.CallOption, +) (*adminservice.DescribeClusterResponse, error) { + return c.adminClient.DescribeCluster(ctx, request, opts...) +} + +func (c *routingClient) DescribeDLQJob( + ctx context.Context, + request *adminservice.DescribeDLQJobRequest, + opts ...grpc.CallOption, +) (*adminservice.DescribeDLQJobResponse, error) { + return c.adminClient.DescribeDLQJob(ctx, request, opts...) +} + +func (c *routingClient) DescribeHistoryHost( + ctx context.Context, + request *adminservice.DescribeHistoryHostRequest, + opts ...grpc.CallOption, +) (*adminservice.DescribeHistoryHostResponse, error) { + return c.adminClient.DescribeHistoryHost(ctx, request, opts...) +} + +func (c *routingClient) DescribeMutableState( + ctx context.Context, + request *adminservice.DescribeMutableStateRequest, + opts ...grpc.CallOption, +) (*adminservice.DescribeMutableStateResponse, error) { + return c.adminClient.DescribeMutableState(ctx, request, opts...) +} + +func (c *routingClient) DescribeTaskQueuePartition( + ctx context.Context, + request *adminservice.DescribeTaskQueuePartitionRequest, + opts ...grpc.CallOption, +) (*adminservice.DescribeTaskQueuePartitionResponse, error) { + return c.adminClient.DescribeTaskQueuePartition(ctx, request, opts...) +} + +func (c *routingClient) ForceUnloadTaskQueuePartition( + ctx context.Context, + request *adminservice.ForceUnloadTaskQueuePartitionRequest, + opts ...grpc.CallOption, +) (*adminservice.ForceUnloadTaskQueuePartitionResponse, error) { + return c.adminClient.ForceUnloadTaskQueuePartition(ctx, request, opts...) +} + +func (c *routingClient) GenerateLastHistoryReplicationTasks( + ctx context.Context, + request *adminservice.GenerateLastHistoryReplicationTasksRequest, + opts ...grpc.CallOption, +) (*adminservice.GenerateLastHistoryReplicationTasksResponse, error) { + return c.adminClient.GenerateLastHistoryReplicationTasks(ctx, request, opts...) +} + +func (c *routingClient) GetDLQMessages( + ctx context.Context, + request *adminservice.GetDLQMessagesRequest, + opts ...grpc.CallOption, +) (*adminservice.GetDLQMessagesResponse, error) { + return c.adminClient.GetDLQMessages(ctx, request, opts...) +} + +func (c *routingClient) GetDLQReplicationMessages( + ctx context.Context, + request *adminservice.GetDLQReplicationMessagesRequest, + opts ...grpc.CallOption, +) (*adminservice.GetDLQReplicationMessagesResponse, error) { + return c.adminClient.GetDLQReplicationMessages(ctx, request, opts...) +} + +func (c *routingClient) GetDLQTasks( + ctx context.Context, + request *adminservice.GetDLQTasksRequest, + opts ...grpc.CallOption, +) (*adminservice.GetDLQTasksResponse, error) { + return c.adminClient.GetDLQTasks(ctx, request, opts...) +} + +func (c *routingClient) GetNamespace( + ctx context.Context, + request *adminservice.GetNamespaceRequest, + opts ...grpc.CallOption, +) (*adminservice.GetNamespaceResponse, error) { + return c.adminClient.GetNamespace(ctx, request, opts...) +} + +func (c *routingClient) GetNamespaceReplicationMessages( + ctx context.Context, + request *adminservice.GetNamespaceReplicationMessagesRequest, + opts ...grpc.CallOption, +) (*adminservice.GetNamespaceReplicationMessagesResponse, error) { + return c.adminClient.GetNamespaceReplicationMessages(ctx, request, opts...) +} + +func (c *routingClient) GetReplicationMessages( + ctx context.Context, + request *adminservice.GetReplicationMessagesRequest, + opts ...grpc.CallOption, +) (*adminservice.GetReplicationMessagesResponse, error) { + return c.adminClient.GetReplicationMessages(ctx, request, opts...) +} + +func (c *routingClient) GetSearchAttributes( + ctx context.Context, + request *adminservice.GetSearchAttributesRequest, + opts ...grpc.CallOption, +) (*adminservice.GetSearchAttributesResponse, error) { + return c.adminClient.GetSearchAttributes(ctx, request, opts...) +} + +func (c *routingClient) GetShard( + ctx context.Context, + request *adminservice.GetShardRequest, + opts ...grpc.CallOption, +) (*adminservice.GetShardResponse, error) { + return c.adminClient.GetShard(ctx, request, opts...) +} + +func (c *routingClient) GetTaskQueueTasks( + ctx context.Context, + request *adminservice.GetTaskQueueTasksRequest, + opts ...grpc.CallOption, +) (*adminservice.GetTaskQueueTasksResponse, error) { + return c.adminClient.GetTaskQueueTasks(ctx, request, opts...) +} + +func (c *routingClient) GetWorkflowExecutionRawHistory( + ctx context.Context, + request *adminservice.GetWorkflowExecutionRawHistoryRequest, + opts ...grpc.CallOption, +) (*adminservice.GetWorkflowExecutionRawHistoryResponse, error) { + return c.adminClient.GetWorkflowExecutionRawHistory(ctx, request, opts...) +} + +func (c *routingClient) GetWorkflowExecutionRawHistoryV2( + ctx context.Context, + request *adminservice.GetWorkflowExecutionRawHistoryV2Request, + opts ...grpc.CallOption, +) (*adminservice.GetWorkflowExecutionRawHistoryV2Response, error) { + return c.adminClient.GetWorkflowExecutionRawHistoryV2(ctx, request, opts...) +} + +func (c *routingClient) ImportWorkflowExecution( + ctx context.Context, + request *adminservice.ImportWorkflowExecutionRequest, + opts ...grpc.CallOption, +) (*adminservice.ImportWorkflowExecutionResponse, error) { + return c.adminClient.ImportWorkflowExecution(ctx, request, opts...) +} + +func (c *routingClient) ListClusterMembers( + ctx context.Context, + request *adminservice.ListClusterMembersRequest, + opts ...grpc.CallOption, +) (*adminservice.ListClusterMembersResponse, error) { + return c.adminClient.ListClusterMembers(ctx, request, opts...) +} + +func (c *routingClient) ListClusters( + ctx context.Context, + request *adminservice.ListClustersRequest, + opts ...grpc.CallOption, +) (*adminservice.ListClustersResponse, error) { + operatorRequest := convertListClustersRequest(request) + operatorResponse, err := c.operatorClient.ListClusters(ctx, operatorRequest, opts...) + if err != nil { + return nil, err + } + return convertListClustersResponse(operatorResponse), nil +} +func (c *routingClient) ListHistoryTasks( + ctx context.Context, + request *adminservice.ListHistoryTasksRequest, + opts ...grpc.CallOption, +) (*adminservice.ListHistoryTasksResponse, error) { + return c.adminClient.ListHistoryTasks(ctx, request, opts...) +} + +func (c *routingClient) ListQueues( + ctx context.Context, + request *adminservice.ListQueuesRequest, + opts ...grpc.CallOption, +) (*adminservice.ListQueuesResponse, error) { + return c.adminClient.ListQueues(ctx, request, opts...) +} + +func (c *routingClient) MergeDLQMessages( + ctx context.Context, + request *adminservice.MergeDLQMessagesRequest, + opts ...grpc.CallOption, +) (*adminservice.MergeDLQMessagesResponse, error) { + return c.adminClient.MergeDLQMessages(ctx, request, opts...) +} + +func (c *routingClient) MergeDLQTasks( + ctx context.Context, + request *adminservice.MergeDLQTasksRequest, + opts ...grpc.CallOption, +) (*adminservice.MergeDLQTasksResponse, error) { + return c.adminClient.MergeDLQTasks(ctx, request, opts...) +} + +func (c *routingClient) PurgeDLQMessages( + ctx context.Context, + request *adminservice.PurgeDLQMessagesRequest, + opts ...grpc.CallOption, +) (*adminservice.PurgeDLQMessagesResponse, error) { + return c.adminClient.PurgeDLQMessages(ctx, request, opts...) +} + +func (c *routingClient) PurgeDLQTasks( + ctx context.Context, + request *adminservice.PurgeDLQTasksRequest, + opts ...grpc.CallOption, +) (*adminservice.PurgeDLQTasksResponse, error) { + return c.adminClient.PurgeDLQTasks(ctx, request, opts...) +} + +func (c *routingClient) ReapplyEvents( + ctx context.Context, + request *adminservice.ReapplyEventsRequest, + opts ...grpc.CallOption, +) (*adminservice.ReapplyEventsResponse, error) { + return c.adminClient.ReapplyEvents(ctx, request, opts...) +} + +func (c *routingClient) RebuildMutableState( + ctx context.Context, + request *adminservice.RebuildMutableStateRequest, + opts ...grpc.CallOption, +) (*adminservice.RebuildMutableStateResponse, error) { + return c.adminClient.RebuildMutableState(ctx, request, opts...) +} + +func (c *routingClient) RefreshWorkflowTasks( + ctx context.Context, + request *adminservice.RefreshWorkflowTasksRequest, + opts ...grpc.CallOption, +) (*adminservice.RefreshWorkflowTasksResponse, error) { + return c.adminClient.RefreshWorkflowTasks(ctx, request, opts...) +} + +func (c *routingClient) RemoveRemoteCluster( + ctx context.Context, + request *adminservice.RemoveRemoteClusterRequest, + opts ...grpc.CallOption, +) (*adminservice.RemoveRemoteClusterResponse, error) { + operatorRequest := convertRemoveRemoteClusterRequest(request) + operatorResponse, err := c.operatorClient.RemoveRemoteCluster(ctx, operatorRequest, opts...) + if err != nil { + return nil, err + } + return convertRemoveRemoteClusterResponse(operatorResponse), nil +} + +func (c *routingClient) RemoveSearchAttributes( + ctx context.Context, + request *adminservice.RemoveSearchAttributesRequest, + opts ...grpc.CallOption, +) (*adminservice.RemoveSearchAttributesResponse, error) { + return c.adminClient.RemoveSearchAttributes(ctx, request, opts...) +} + +func (c *routingClient) RemoveTask( + ctx context.Context, + request *adminservice.RemoveTaskRequest, + opts ...grpc.CallOption, +) (*adminservice.RemoveTaskResponse, error) { + return c.adminClient.RemoveTask(ctx, request, opts...) +} + +func (c *routingClient) ResendReplicationTasks( + ctx context.Context, + request *adminservice.ResendReplicationTasksRequest, + opts ...grpc.CallOption, +) (*adminservice.ResendReplicationTasksResponse, error) { + return c.adminClient.ResendReplicationTasks(ctx, request, opts...) +} + +func (c *routingClient) SyncWorkflowState( + ctx context.Context, + request *adminservice.SyncWorkflowStateRequest, + opts ...grpc.CallOption, +) (*adminservice.SyncWorkflowStateResponse, error) { + return c.adminClient.SyncWorkflowState(ctx, request, opts...) +} + +func (c *routingClient) StreamWorkflowReplicationMessages( + ctx context.Context, + opts ...grpc.CallOption, +) (adminservice.AdminService_StreamWorkflowReplicationMessagesClient, error) { + return c.adminClient.StreamWorkflowReplicationMessages(ctx, opts...) +} diff --git a/client/dual/conv.go b/client/dual/conv.go new file mode 100644 index 00000000..349d1d41 --- /dev/null +++ b/client/dual/conv.go @@ -0,0 +1,59 @@ +package dual + +import ( + "go.temporal.io/api/operatorservice/v1" + "go.temporal.io/server/api/adminservice/v1" + "go.temporal.io/server/api/persistence/v1" +) + +// ListClusters converter functions +func convertListClustersRequest(request *adminservice.ListClustersRequest) *operatorservice.ListClustersRequest { + return &operatorservice.ListClustersRequest{ + PageSize: request.GetPageSize(), + NextPageToken: request.GetNextPageToken(), + } +} + +func convertListClustersResponse(operatorResponse *operatorservice.ListClustersResponse) *adminservice.ListClustersResponse { + adminResponse := &adminservice.ListClustersResponse{ + NextPageToken: operatorResponse.GetNextPageToken(), + } + for _, cluster := range operatorResponse.GetClusters() { + adminResponse.Clusters = append(adminResponse.Clusters, &persistence.ClusterMetadata{ + ClusterName: cluster.GetClusterName(), + ClusterId: cluster.GetClusterId(), + ClusterAddress: cluster.GetAddress(), + HttpAddress: cluster.GetHttpAddress(), + InitialFailoverVersion: cluster.GetInitialFailoverVersion(), + HistoryShardCount: cluster.GetHistoryShardCount(), + IsConnectionEnabled: cluster.GetIsConnectionEnabled(), + // There are more fields in the response, but we can only support the ones that the operator service offers + }) + } + return adminResponse +} + +// AddOrUpdateRemoteCluster converter functions +func convertAddOrUpdateRemoteClusterRequest(request *adminservice.AddOrUpdateRemoteClusterRequest) *operatorservice.AddOrUpdateRemoteClusterRequest { + return &operatorservice.AddOrUpdateRemoteClusterRequest{ + FrontendAddress: request.GetFrontendAddress(), + FrontendHttpAddress: request.GetFrontendHttpAddress(), // Deprecated, used for backward compatibility + EnableRemoteClusterConnection: request.GetEnableRemoteClusterConnection(), + } +} + +func convertAddOrUpdateRemoteClusterResponse(operatorResponse *operatorservice.AddOrUpdateRemoteClusterResponse) *adminservice.AddOrUpdateRemoteClusterResponse { + return &adminservice.AddOrUpdateRemoteClusterResponse{} +} + +// RemoveRemoteCluster converter functions +func convertRemoveRemoteClusterRequest(request *adminservice.RemoveRemoteClusterRequest) *operatorservice.RemoveRemoteClusterRequest { + return &operatorservice.RemoveRemoteClusterRequest{ + ClusterName: request.GetClusterName(), + } +} + +func convertRemoveRemoteClusterResponse(operatorResponse *operatorservice.RemoveRemoteClusterResponse) *adminservice.RemoveRemoteClusterResponse { + // RemoveRemoteCluster returns an empty response + return &adminservice.RemoveRemoteClusterResponse{} +} diff --git a/client/operator/lazy_client.go b/client/operator/lazy_client.go new file mode 100644 index 00000000..19c9b2ac --- /dev/null +++ b/client/operator/lazy_client.go @@ -0,0 +1,20 @@ +package operator + +import ( + "github.com/temporalio/s2s-proxy/client" + "go.temporal.io/api/operatorservice/v1" +) + +type ( + lazyClient struct { + clientProvider client.ClientProvider + } +) + +func NewLazyClient( + clientProvider client.ClientProvider, +) operatorservice.OperatorServiceClient { + return &lazyClient{ + clientProvider: clientProvider, + } +} diff --git a/client/operator/lazy_client_gen.go b/client/operator/lazy_client_gen.go new file mode 100644 index 00000000..f8823aa2 --- /dev/null +++ b/client/operator/lazy_client_gen.go @@ -0,0 +1,165 @@ +// Code generated by cmd/tools/genrpcwrappers. DO NOT EDIT. + +package operator + +import ( + "context" + "go.temporal.io/api/operatorservice/v1" + "google.golang.org/grpc" +) + +func (c *lazyClient) AddOrUpdateRemoteCluster( + ctx context.Context, + request *operatorservice.AddOrUpdateRemoteClusterRequest, + opts ...grpc.CallOption, +) (*operatorservice.AddOrUpdateRemoteClusterResponse, error) { + var resp *operatorservice.AddOrUpdateRemoteClusterResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.AddOrUpdateRemoteCluster(ctx, request, opts...) +} + +func (c *lazyClient) AddSearchAttributes( + ctx context.Context, + request *operatorservice.AddSearchAttributesRequest, + opts ...grpc.CallOption, +) (*operatorservice.AddSearchAttributesResponse, error) { + var resp *operatorservice.AddSearchAttributesResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.AddSearchAttributes(ctx, request, opts...) +} + +func (c *lazyClient) CreateNexusEndpoint( + ctx context.Context, + request *operatorservice.CreateNexusEndpointRequest, + opts ...grpc.CallOption, +) (*operatorservice.CreateNexusEndpointResponse, error) { + var resp *operatorservice.CreateNexusEndpointResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.CreateNexusEndpoint(ctx, request, opts...) +} + +func (c *lazyClient) DeleteNamespace( + ctx context.Context, + request *operatorservice.DeleteNamespaceRequest, + opts ...grpc.CallOption, +) (*operatorservice.DeleteNamespaceResponse, error) { + var resp *operatorservice.DeleteNamespaceResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.DeleteNamespace(ctx, request, opts...) +} + +func (c *lazyClient) DeleteNexusEndpoint( + ctx context.Context, + request *operatorservice.DeleteNexusEndpointRequest, + opts ...grpc.CallOption, +) (*operatorservice.DeleteNexusEndpointResponse, error) { + var resp *operatorservice.DeleteNexusEndpointResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.DeleteNexusEndpoint(ctx, request, opts...) +} + +func (c *lazyClient) GetNexusEndpoint( + ctx context.Context, + request *operatorservice.GetNexusEndpointRequest, + opts ...grpc.CallOption, +) (*operatorservice.GetNexusEndpointResponse, error) { + var resp *operatorservice.GetNexusEndpointResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.GetNexusEndpoint(ctx, request, opts...) +} + +func (c *lazyClient) ListClusters( + ctx context.Context, + request *operatorservice.ListClustersRequest, + opts ...grpc.CallOption, +) (*operatorservice.ListClustersResponse, error) { + var resp *operatorservice.ListClustersResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.ListClusters(ctx, request, opts...) +} + +func (c *lazyClient) ListNexusEndpoints( + ctx context.Context, + request *operatorservice.ListNexusEndpointsRequest, + opts ...grpc.CallOption, +) (*operatorservice.ListNexusEndpointsResponse, error) { + var resp *operatorservice.ListNexusEndpointsResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.ListNexusEndpoints(ctx, request, opts...) +} + +func (c *lazyClient) ListSearchAttributes( + ctx context.Context, + request *operatorservice.ListSearchAttributesRequest, + opts ...grpc.CallOption, +) (*operatorservice.ListSearchAttributesResponse, error) { + var resp *operatorservice.ListSearchAttributesResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.ListSearchAttributes(ctx, request, opts...) +} + +func (c *lazyClient) RemoveRemoteCluster( + ctx context.Context, + request *operatorservice.RemoveRemoteClusterRequest, + opts ...grpc.CallOption, +) (*operatorservice.RemoveRemoteClusterResponse, error) { + var resp *operatorservice.RemoveRemoteClusterResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.RemoveRemoteCluster(ctx, request, opts...) +} + +func (c *lazyClient) RemoveSearchAttributes( + ctx context.Context, + request *operatorservice.RemoveSearchAttributesRequest, + opts ...grpc.CallOption, +) (*operatorservice.RemoveSearchAttributesResponse, error) { + var resp *operatorservice.RemoveSearchAttributesResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.RemoveSearchAttributes(ctx, request, opts...) +} + +func (c *lazyClient) UpdateNexusEndpoint( + ctx context.Context, + request *operatorservice.UpdateNexusEndpointRequest, + opts ...grpc.CallOption, +) (*operatorservice.UpdateNexusEndpointResponse, error) { + var resp *operatorservice.UpdateNexusEndpointResponse + client, err := c.clientProvider.GetOperatorClient() + if err != nil { + return resp, err + } + return client.UpdateNexusEndpoint(ctx, request, opts...) +} diff --git a/client/temporal_client.go b/client/temporal_client.go index c252038c..748e8f8a 100644 --- a/client/temporal_client.go +++ b/client/temporal_client.go @@ -7,6 +7,7 @@ import ( "github.com/temporalio/s2s-proxy/config" "github.com/temporalio/s2s-proxy/transport" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/common/log" @@ -15,7 +16,8 @@ import ( type ( // ClientFactory can be used to create RPC clients for temporal services ClientFactory interface { - NewRemoteAdminClient(clientConfig config.ProxyClientConfig) (adminservice.AdminServiceClient, error) + NewRemoteAdminClient() (adminservice.AdminServiceClient, error) + NewRemoteOperatorClient() (operatorservice.OperatorServiceClient, error) NewRemoteWorkflowServiceClient(clientConfig config.ProxyClientConfig) (workflowservice.WorkflowServiceClient, error) } @@ -26,6 +28,7 @@ type ( ClientProvider interface { GetAdminClient() (adminservice.AdminServiceClient, error) + GetOperatorClient() (operatorservice.OperatorServiceClient, error) GetWorkflowServiceClient() (workflowservice.WorkflowServiceClient, error) } @@ -37,6 +40,9 @@ type ( adminClientsLock sync.Mutex adminClient adminservice.AdminServiceClient + operatorClientLock sync.Mutex + operatorClient operatorservice.OperatorServiceClient + workflowserviceClientsLock sync.Mutex workflowserviceClient workflowservice.WorkflowServiceClient } @@ -61,7 +67,7 @@ func (c *clientProvider) GetAdminClient() (adminservice.AdminServiceClient, erro defer c.adminClientsLock.Unlock() c.logger.Info(fmt.Sprintf("Create adminclient with config: %v", c.clientConfig)) - adminClient, err := c.clientFactory.NewRemoteAdminClient(c.clientConfig) + adminClient, err := c.clientFactory.NewRemoteAdminClient() if err != nil { return nil, err } @@ -72,6 +78,24 @@ func (c *clientProvider) GetAdminClient() (adminservice.AdminServiceClient, erro return c.adminClient, nil } +func (c *clientProvider) GetOperatorClient() (operatorservice.OperatorServiceClient, error) { + if c.operatorClient == nil { + // Create operator client + c.operatorClientLock.Lock() + defer c.operatorClientLock.Unlock() + + c.logger.Info(fmt.Sprintf("Create operatorclient with config: %v", c.clientConfig)) + operatorClient, err := c.clientFactory.NewRemoteOperatorClient() + if err != nil { + return nil, err + } + + c.operatorClient = operatorClient + } + + return c.operatorClient, nil +} + func (c *clientProvider) GetWorkflowServiceClient() (workflowservice.WorkflowServiceClient, error) { if c.workflowserviceClient == nil { // Create admin client @@ -101,9 +125,7 @@ func NewClientFactory( } } -func (cf *clientFactory) NewRemoteAdminClient( - clientConfig config.ProxyClientConfig, // TODO: not used. remove it. -) (adminservice.AdminServiceClient, error) { +func (cf *clientFactory) NewRemoteAdminClient() (adminservice.AdminServiceClient, error) { connection, err := cf.clientTransport.Connect() if err != nil { return nil, err @@ -112,6 +134,15 @@ func (cf *clientFactory) NewRemoteAdminClient( return adminservice.NewAdminServiceClient(connection), nil } +func (cf *clientFactory) NewRemoteOperatorClient() (operatorservice.OperatorServiceClient, error) { + connection, err := cf.clientTransport.Connect() + if err != nil { + return nil, err + } + + return operatorservice.NewOperatorServiceClient(connection), nil +} + func (cf *clientFactory) NewRemoteWorkflowServiceClient( clientConfig config.ProxyClientConfig, ) (workflowservice.WorkflowServiceClient, error) { diff --git a/cmd/tools/genrpcwrappers/extra.go b/cmd/tools/genrpcwrappers/extra.go index 87e02270..9921e26d 100644 --- a/cmd/tools/genrpcwrappers/extra.go +++ b/cmd/tools/genrpcwrappers/extra.go @@ -5,6 +5,7 @@ import "io" var ( lazyClientMap = map[string]string{ "admin": "GetAdminClient", + "operator": "GetOperatorClient", "frontend": "GetWorkflowServiceClient", } ) @@ -14,26 +15,6 @@ func init() { ignoreMethod["aclServer.admin.StreamWorkflowReplicationMessages"] = true } -func generateACLServer(w io.Writer, service service) { - writeTemplatedCode(w, service, ` -package {{.ServiceName}} -import ( - "context" - "{{.ServicePackagePath}}" - "google.golang.org/grpc" -) -`) - - writeTemplatedMethods(w, service, "aclServer", ` -func (s *adminServiceAuth) {{.Method}}(ctx context.Context, in0 {{.RequestType}}) ({{.ResponseType}}, error) { - if !s.access.IsAllowed("{{.Method}}") { - return nil, status.Errorf(codes.PermissionDenied, "Calling method {{.Method}} is not allowed.") - } - return s.delegate.{{.Method}}(ctx, in0) -} -`) -} - func generateLazyClient(w io.Writer, service service) { writeTemplatedCode(w, service, ` package {{.ServiceName}} diff --git a/cmd/tools/genrpcwrappers/main.go b/cmd/tools/genrpcwrappers/main.go index c81b76a0..1398d457 100644 --- a/cmd/tools/genrpcwrappers/main.go +++ b/cmd/tools/genrpcwrappers/main.go @@ -37,6 +37,7 @@ import ( "strings" "text/template" + "go.temporal.io/api/operatorservice/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" "golang.org/x/text/cases" @@ -72,12 +73,17 @@ var ( { name: "frontend", clientType: reflect.TypeOf((*workflowservice.WorkflowServiceClient)(nil)), - clientGenerator: generateFrontendOrAdminClient, + clientGenerator: generateFrontendOrAdminOrOperatorClient, }, { name: "admin", clientType: reflect.TypeOf((*adminservice.AdminServiceClient)(nil)), - clientGenerator: generateFrontendOrAdminClient, + clientGenerator: generateFrontendOrAdminOrOperatorClient, + }, + { + name: "operator", + clientType: reflect.TypeOf((*operatorservice.OperatorServiceClient)(nil)), + clientGenerator: generateFrontendOrAdminOrOperatorClient, }, { name: "history", @@ -464,7 +470,7 @@ func writeTemplatedMethods(w io.Writer, service service, impl string, text strin } } -func generateFrontendOrAdminClient(w io.Writer, service service) { +func generateFrontendOrAdminOrOperatorClient(w io.Writer, service service) { writeTemplatedCode(w, service, ` package {{.ServiceName}} @@ -669,10 +675,7 @@ func main() { callWithFile(generateRetryableClient, svc, "retryable_client", licenseText) // s2s-proxy customizations - if svc.name == "admin" || svc.name == "frontend" { + if svc.name == "admin" || svc.name == "operator" || svc.name == "frontend" { callWithFile(generateLazyClient, svc, "lazy_client", "") } - if svc.name == "admin" { - callWithFile(generateACLServer, svc, "acl_server", "") - } } diff --git a/mocks/client/temporal_client_mock.go b/mocks/client/temporal_client_mock.go index e4bc4376..135f7d8b 100644 --- a/mocks/client/temporal_client_mock.go +++ b/mocks/client/temporal_client_mock.go @@ -13,6 +13,7 @@ import ( reflect "reflect" config "github.com/temporalio/s2s-proxy/config" + operatorservice "go.temporal.io/api/operatorservice/v1" workflowservice "go.temporal.io/api/workflowservice/v1" adminservice "go.temporal.io/server/api/adminservice/v1" gomock "go.uber.org/mock/gomock" @@ -42,18 +43,33 @@ func (m *MockClientFactory) EXPECT() *MockClientFactoryMockRecorder { } // NewRemoteAdminClient mocks base method. -func (m *MockClientFactory) NewRemoteAdminClient(clientConfig config.ProxyClientConfig) (adminservice.AdminServiceClient, error) { +func (m *MockClientFactory) NewRemoteAdminClient() (adminservice.AdminServiceClient, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewRemoteAdminClient", clientConfig) + ret := m.ctrl.Call(m, "NewRemoteAdminClient") ret0, _ := ret[0].(adminservice.AdminServiceClient) ret1, _ := ret[1].(error) return ret0, ret1 } // NewRemoteAdminClient indicates an expected call of NewRemoteAdminClient. -func (mr *MockClientFactoryMockRecorder) NewRemoteAdminClient(clientConfig any) *gomock.Call { +func (mr *MockClientFactoryMockRecorder) NewRemoteAdminClient() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewRemoteAdminClient", reflect.TypeOf((*MockClientFactory)(nil).NewRemoteAdminClient), clientConfig) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewRemoteAdminClient", reflect.TypeOf((*MockClientFactory)(nil).NewRemoteAdminClient)) +} + +// NewRemoteOperatorClient mocks base method. +func (m *MockClientFactory) NewRemoteOperatorClient() (operatorservice.OperatorServiceClient, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewRemoteOperatorClient") + ret0, _ := ret[0].(operatorservice.OperatorServiceClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewRemoteOperatorClient indicates an expected call of NewRemoteOperatorClient. +func (mr *MockClientFactoryMockRecorder) NewRemoteOperatorClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewRemoteOperatorClient", reflect.TypeOf((*MockClientFactory)(nil).NewRemoteOperatorClient)) } // NewRemoteWorkflowServiceClient mocks base method. @@ -109,6 +125,21 @@ func (mr *MockClientProviderMockRecorder) GetAdminClient() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAdminClient", reflect.TypeOf((*MockClientProvider)(nil).GetAdminClient)) } +// GetOperatorClient mocks base method. +func (m *MockClientProvider) GetOperatorClient() (operatorservice.OperatorServiceClient, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOperatorClient") + ret0, _ := ret[0].(operatorservice.OperatorServiceClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetOperatorClient indicates an expected call of GetOperatorClient. +func (mr *MockClientProviderMockRecorder) GetOperatorClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOperatorClient", reflect.TypeOf((*MockClientProvider)(nil).GetOperatorClient)) +} + // GetWorkflowServiceClient mocks base method. func (m *MockClientProvider) GetWorkflowServiceClient() (workflowservice.WorkflowServiceClient, error) { m.ctrl.T.Helper() diff --git a/proxy/adminservice_test.go b/proxy/adminservice_test.go index 28e93557..10a4542d 100644 --- a/proxy/adminservice_test.go +++ b/proxy/adminservice_test.go @@ -40,7 +40,7 @@ func (s *adminserviceSuite) newAdminServiceProxyServer(opts proxyOptions) admins TLS: encryption.ClientTLSConfig{}, }, } - s.clientFactoryMock.EXPECT().NewRemoteAdminClient(cfg).Return(s.adminClientMock, nil).Times(1) + s.clientFactoryMock.EXPECT().NewRemoteAdminClient().Return(s.adminClientMock, nil).Times(1) return NewAdminServiceProxyServer("test-service-name", cfg, s.clientFactoryMock, opts, log.NewTestLogger()) } From 9818f53d4d7efc8e2ab7ca69a056aac0261600bd Mon Sep 17 00:00:00 2001 From: Jacob Barzee Date: Mon, 24 Mar 2025 14:02:16 -0700 Subject: [PATCH 2/2] point Search attribute routes to operatorService --- client/dual/client.go | 20 ++++++++++-- client/dual/conv.go | 72 ++++++++++++++++++++++++++++++++++--------- proxy/adminservice.go | 4 +-- 3 files changed, 78 insertions(+), 18 deletions(-) diff --git a/client/dual/client.go b/client/dual/client.go index da61adb2..d2c490c4 100644 --- a/client/dual/client.go +++ b/client/dual/client.go @@ -45,6 +45,12 @@ func (c *routingClient) AddSearchAttributes( opts ...grpc.CallOption, ) (*adminservice.AddSearchAttributesResponse, error) { return c.adminClient.AddSearchAttributes(ctx, request, opts...) + operatorRequest := convertAddSearchAttributesRequest(request) + operatorResponse, err := c.operatorClient.AddSearchAttributes(ctx, operatorRequest, opts...) + if err != nil { + return nil, err + } + return convertAddSearchAttributesResponse(operatorResponse), nil } func (c *routingClient) AddTasks( @@ -196,7 +202,12 @@ func (c *routingClient) GetSearchAttributes( request *adminservice.GetSearchAttributesRequest, opts ...grpc.CallOption, ) (*adminservice.GetSearchAttributesResponse, error) { - return c.adminClient.GetSearchAttributes(ctx, request, opts...) + operatorRequest := convertGetSearchAttributesRequest(request) + operatorResponse, err := c.operatorClient.ListSearchAttributes(ctx, operatorRequest, opts...) + if err != nil { + return nil, err + } + return convertGetSearchAttributesResponse(operatorResponse), nil } func (c *routingClient) GetShard( @@ -349,7 +360,12 @@ func (c *routingClient) RemoveSearchAttributes( request *adminservice.RemoveSearchAttributesRequest, opts ...grpc.CallOption, ) (*adminservice.RemoveSearchAttributesResponse, error) { - return c.adminClient.RemoveSearchAttributes(ctx, request, opts...) + operatorRequest := convertRemoveSearchAttributesRequest(request) + operatorResponse, err := c.operatorClient.RemoveSearchAttributes(ctx, operatorRequest, opts...) + if err != nil { + return nil, err + } + return convertRemoveSearchAttributesResponse(operatorResponse), nil } func (c *routingClient) RemoveTask( diff --git a/client/dual/conv.go b/client/dual/conv.go index 349d1d41..cc8b2619 100644 --- a/client/dual/conv.go +++ b/client/dual/conv.go @@ -6,6 +6,63 @@ import ( "go.temporal.io/server/api/persistence/v1" ) +// AddOrUpdateRemoteCluster converter functions +func convertAddOrUpdateRemoteClusterRequest(request *adminservice.AddOrUpdateRemoteClusterRequest) *operatorservice.AddOrUpdateRemoteClusterRequest { + return &operatorservice.AddOrUpdateRemoteClusterRequest{ + FrontendAddress: request.GetFrontendAddress(), + FrontendHttpAddress: request.GetFrontendHttpAddress(), // Deprecated, used for backward compatibility + EnableRemoteClusterConnection: request.GetEnableRemoteClusterConnection(), + } +} + +func convertAddOrUpdateRemoteClusterResponse(_ *operatorservice.AddOrUpdateRemoteClusterResponse) *adminservice.AddOrUpdateRemoteClusterResponse { + return &adminservice.AddOrUpdateRemoteClusterResponse{} +} + +// AddSearchAttributes converter functions +func convertAddSearchAttributesRequest(request *adminservice.AddSearchAttributesRequest) *operatorservice.AddSearchAttributesRequest { + return &operatorservice.AddSearchAttributesRequest{ + Namespace: request.GetNamespace(), + SearchAttributes: request.GetSearchAttributes(), + // operatorService dropped support for IndexName, so we do nothing with it. + // operatorService dropped support for SkipSchemaUpdate, so we do nothing with it also. + } +} + +func convertAddSearchAttributesResponse(_ *operatorservice.AddSearchAttributesResponse) *adminservice.AddSearchAttributesResponse { + return &adminservice.AddSearchAttributesResponse{} +} + +// GetSearchAttributes converter functions +func convertGetSearchAttributesRequest(request *adminservice.GetSearchAttributesRequest) *operatorservice.ListSearchAttributesRequest { + return &operatorservice.ListSearchAttributesRequest{ + Namespace: request.GetNamespace(), + // operatorService dropped support for IndexName, so we do nothing with it. + } +} + +func convertGetSearchAttributesResponse(operatorResponse *operatorservice.ListSearchAttributesResponse) *adminservice.GetSearchAttributesResponse { + adminResponse := &adminservice.GetSearchAttributesResponse{ + SystemAttributes: operatorResponse.GetSystemAttributes(), + CustomAttributes: operatorResponse.GetCustomAttributes(), + Mapping: operatorResponse.GetStorageSchema(), + } + return adminResponse +} + +// RemoveSearchAttributes converter functions +func convertRemoveSearchAttributesRequest(request *adminservice.RemoveSearchAttributesRequest) *operatorservice.RemoveSearchAttributesRequest { + return &operatorservice.RemoveSearchAttributesRequest{ + Namespace: request.GetNamespace(), + SearchAttributes: request.GetSearchAttributes(), + // operatorService dropped support for IndexName, so we do nothing with it. + } +} + +func convertRemoveSearchAttributesResponse(_ *operatorservice.RemoveSearchAttributesResponse) *adminservice.RemoveSearchAttributesResponse { + return &adminservice.RemoveSearchAttributesResponse{} +} + // ListClusters converter functions func convertListClustersRequest(request *adminservice.ListClustersRequest) *operatorservice.ListClustersRequest { return &operatorservice.ListClustersRequest{ @@ -33,19 +90,6 @@ func convertListClustersResponse(operatorResponse *operatorservice.ListClustersR return adminResponse } -// AddOrUpdateRemoteCluster converter functions -func convertAddOrUpdateRemoteClusterRequest(request *adminservice.AddOrUpdateRemoteClusterRequest) *operatorservice.AddOrUpdateRemoteClusterRequest { - return &operatorservice.AddOrUpdateRemoteClusterRequest{ - FrontendAddress: request.GetFrontendAddress(), - FrontendHttpAddress: request.GetFrontendHttpAddress(), // Deprecated, used for backward compatibility - EnableRemoteClusterConnection: request.GetEnableRemoteClusterConnection(), - } -} - -func convertAddOrUpdateRemoteClusterResponse(operatorResponse *operatorservice.AddOrUpdateRemoteClusterResponse) *adminservice.AddOrUpdateRemoteClusterResponse { - return &adminservice.AddOrUpdateRemoteClusterResponse{} -} - // RemoveRemoteCluster converter functions func convertRemoveRemoteClusterRequest(request *adminservice.RemoveRemoteClusterRequest) *operatorservice.RemoveRemoteClusterRequest { return &operatorservice.RemoveRemoteClusterRequest{ @@ -53,7 +97,7 @@ func convertRemoveRemoteClusterRequest(request *adminservice.RemoveRemoteCluster } } -func convertRemoveRemoteClusterResponse(operatorResponse *operatorservice.RemoveRemoteClusterResponse) *adminservice.RemoveRemoteClusterResponse { +func convertRemoveRemoteClusterResponse(_ *operatorservice.RemoveRemoteClusterResponse) *adminservice.RemoveRemoteClusterResponse { // RemoveRemoteCluster returns an empty response return &adminservice.RemoveRemoteClusterResponse{} } diff --git a/proxy/adminservice.go b/proxy/adminservice.go index 132ea646..cb368d56 100644 --- a/proxy/adminservice.go +++ b/proxy/adminservice.go @@ -7,7 +7,7 @@ import ( "sync" "github.com/temporalio/s2s-proxy/client" - adminclient "github.com/temporalio/s2s-proxy/client/admin" + dualclient "github.com/temporalio/s2s-proxy/client/dual" "github.com/temporalio/s2s-proxy/common" "github.com/temporalio/s2s-proxy/config" @@ -40,7 +40,7 @@ func NewAdminServiceProxyServer( logger = log.With(logger, common.ServiceTag(serviceName)) clientProvider := client.NewClientProvider(clientConfig, clientFactory, logger) return &adminServiceProxyServer{ - adminClient: adminclient.NewLazyClient(clientProvider), + adminClient: dualclient.NewRoutingClient(clientProvider), logger: logger, proxyOptions: opts, }