diff --git a/config/config.go b/config/config.go index ab712301..44428067 100644 --- a/config/config.go +++ b/config/config.go @@ -110,6 +110,7 @@ type ( HealthCheck *HealthCheckConfig `yaml:"healthCheck"` NamespaceNameTranslation NamespaceNameTranslationConfig `yaml:"namespaceNameTranslation"` Metrics *MetricsConfig `yaml:"metrics"` + FilterNsData bool `yaml:"filterNsData"` } NamespaceNameTranslationConfig struct { diff --git a/proxy/adminservice.go b/proxy/adminservice.go index 8e89e883..8550c1f2 100644 --- a/proxy/adminservice.go +++ b/proxy/adminservice.go @@ -4,28 +4,34 @@ import ( "context" "fmt" "io" + "strings" "sync" + "github.com/gogo/status" "github.com/temporalio/s2s-proxy/client" adminclient "github.com/temporalio/s2s-proxy/client/admin" "github.com/temporalio/s2s-proxy/common" "github.com/temporalio/s2s-proxy/config" + "github.com/temporalio/s2s-proxy/transport" "go.temporal.io/api/serviceerror" "go.temporal.io/server/api/adminservice/v1" + repication "go.temporal.io/server/api/replication/v1" "go.temporal.io/server/client/history" "go.temporal.io/server/common/channel" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" ) type ( adminServiceProxyServer struct { adminservice.UnimplementedAdminServiceServer - adminClient adminservice.AdminServiceClient - logger log.Logger + adminClient adminservice.AdminServiceClient + remoteAdminClient adminservice.AdminServiceClient + logger log.Logger proxyOptions } ) @@ -39,10 +45,27 @@ func NewAdminServiceProxyServer( ) adminservice.AdminServiceServer { logger = log.With(logger, common.ServiceTag(serviceName)) clientProvider := client.NewClientProvider(clientConfig, clientFactory, logger) + + var remoteClientProvider client.ClientProvider + if opts.IsInbound { + // not used + remoteClientProvider = client.NewClientProvider(opts.Config.Outbound.Client, clientFactory, logger) + } else { + // HACK: This doesn't work for mux transport. + mgr := &transport.TransportManager{} + tp, err := mgr.OpenClient(opts.Config.Inbound.Client) + if err != nil { + panic(err) + } + remoteClientFactory := client.NewClientFactory(tp, logger) + remoteClientProvider = client.NewClientProvider(opts.Config.Inbound.Client, remoteClientFactory, logger) + } + return &adminServiceProxyServer{ - adminClient: adminclient.NewLazyClient(clientProvider), - logger: logger, - proxyOptions: opts, + adminClient: adminclient.NewLazyClient(clientProvider), + remoteAdminClient: adminclient.NewLazyClient(remoteClientProvider), + logger: logger, + proxyOptions: opts, } } @@ -134,7 +157,76 @@ func (s *adminServiceProxyServer) GetNamespace(ctx context.Context, in0 *adminse } func (s *adminServiceProxyServer) GetNamespaceReplicationMessages(ctx context.Context, in0 *adminservice.GetNamespaceReplicationMessagesRequest) (*adminservice.GetNamespaceReplicationMessagesResponse, error) { - return s.adminClient.GetNamespaceReplicationMessages(ctx, in0) + resp, err := s.adminClient.GetNamespaceReplicationMessages(ctx, in0) + if err != nil { + return resp, err + } + + if !s.Config.FilterNsData { + return resp, err + } + + for _, r := range resp.Messages.ReplicationTasks { + if r == nil { + continue + } + + nsTask, ok := r.Attributes.(*repication.ReplicationTask_NamespaceTaskAttributes) + if !ok || nsTask == nil { + continue + } + + attrs := nsTask.NamespaceTaskAttributes + if attrs == nil || attrs.Info == nil { + continue + } + + data := attrs.Info.Data + if s.IsInbound { + // Inbound request means Outbound response. Drop the __temporal fields. + for k := range data { + if strings.HasPrefix(k, "__temporal") { + delete(data, k) + } + } + } else { + // Outbound request means Inbound response. Add the __temporal back. + // First, find them on the target cluster. + nsResp, err := s.remoteAdminClient.GetNamespace(ctx, &adminservice.GetNamespaceRequest{ + Attributes: &adminservice.GetNamespaceRequest_Id{ + Id: attrs.Id, + }, + }) + if err != nil { + s.logger.Error("GetNamespace from local cluster failed", + tag.NewStringTag("namespace-id", attrs.Id), + tag.NewErrorTag("error", err), + ) + // hopefully, this is retried. + return nil, status.Errorf(codes.Unavailable, "s2s-proxy error") + } + + // TODO: Set ConfigVersion? So that the server ignores this task if the ns + // changes before this replication task arrives. + // + // Example: + // nsResp - Version = 1 + // something ns update - Version = 2 + // attrs - Version = 3 + // + // In this case, we copy fields outdated fields from nsResp (Version 1) to the nsTask (Version 3). + // We could protect against this by setting attrs.ConfigVersion = attrs.ConfigVersion + 1. + // + // Copy the __temporal fields from the target cluster. + s.logger.Info("Adding __temporal to namespace replication task") + for k, v := range nsResp.Info.Data { + if strings.HasPrefix(k, "__temporal") { + data[k] = v + } + } + } + } + return resp, err } func (s *adminServiceProxyServer) GetReplicationMessages(ctx context.Context, in0 *adminservice.GetReplicationMessagesRequest) (*adminservice.GetReplicationMessagesResponse, error) {