diff --git a/MODULE.bazel b/MODULE.bazel index 5dfbb32b..594f7ca8 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -57,6 +57,7 @@ use_repo( "com_github_gorilla_mux", "com_github_grpc_ecosystem_go_grpc_middleware", "com_github_grpc_ecosystem_go_grpc_prometheus", + "com_github_jhump_protoreflect_v2", "com_github_jmespath_go_jmespath", "com_github_klauspost_compress", "com_github_lazybeaver_xorshift", diff --git a/go.mod b/go.mod index a56ac164..c2411c17 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 + github.com/jhump/protoreflect/v2 v2.0.0-beta.2 github.com/jmespath/go-jmespath v0.4.0 github.com/klauspost/compress v1.18.1 github.com/lazybeaver/xorshift v0.0.0-20170702203709-ce511d4823dd diff --git a/go.sum b/go.sum index a151f158..e48aaa30 100644 --- a/go.sum +++ b/go.sum @@ -198,6 +198,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4= +github.com/jhump/protoreflect/v2 v2.0.0-beta.2 h1:qZU+rEZUOYTz1Bnhi3xbwn+VxdXkLVeEpAeZzVXLY88= +github.com/jhump/protoreflect/v2 v2.0.0-beta.2/go.mod h1:4tnOYkB/mq7QTyS3YKtVtNrJv4Psqout8HA1U+hZtgM= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= diff --git a/internal/mock/BUILD.bazel b/internal/mock/BUILD.bazel index e5f0a54d..255808e2 100644 --- a/internal/mock/BUILD.bazel +++ b/internal/mock/BUILD.bazel @@ -260,6 +260,7 @@ gomock( "ClientConnInterface", "ClientStream", "ServerStream", + "ServerTransportStream", "StreamHandler", "Streamer", "UnaryHandler", diff --git a/pkg/grpc/BUILD.bazel b/pkg/grpc/BUILD.bazel index 600ea5ec..fe7c95aa 100644 --- a/pkg/grpc/BUILD.bazel +++ b/pkg/grpc/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "client_factory.go", "deduplicating_client_factory.go", "deny_authenticator.go", + "forwarding_stream_handler.go", "jmespath_extractor.go", "lazy_client_dialer.go", "metadata_adding_interceptor.go", @@ -26,8 +27,10 @@ go_library( "peer_transport_credentials_linux.go", "proto_trace_attributes_extractor.go", "proxy_dialer.go", + "reflection_relay.go", "request_headers_authenticator.go", "request_metadata_tracing_interceptor.go", + "routing_stream_handler.go", "server.go", "tls_client_certificate_authenticator.go", ], @@ -49,6 +52,8 @@ go_library( "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@com_github_grpc_ecosystem_go_grpc_middleware//:go-grpc-middleware", "@com_github_grpc_ecosystem_go_grpc_prometheus//:go-grpc-prometheus", + "@com_github_jhump_protoreflect_v2//grpcreflect", + "@com_github_jhump_protoreflect_v2//protoresolve", "@io_opentelemetry_go_contrib_instrumentation_google_golang_org_grpc_otelgrpc//:otelgrpc", "@io_opentelemetry_go_otel//attribute", "@io_opentelemetry_go_otel_trace//:trace", @@ -63,11 +68,14 @@ go_library( "@org_golang_google_grpc//metadata", "@org_golang_google_grpc//peer", "@org_golang_google_grpc//reflection", + "@org_golang_google_grpc//reflection/grpc_reflection_v1", "@org_golang_google_grpc//status", "@org_golang_google_grpc_security_advancedtls//:advancedtls", "@org_golang_google_protobuf//encoding/prototext", "@org_golang_google_protobuf//proto", "@org_golang_google_protobuf//reflect/protoreflect", + "@org_golang_google_protobuf//types/known/emptypb", + "@org_golang_x_sync//errgroup", "@org_golang_x_sync//semaphore", ] + select({ "@rules_go//go/platform:android": [ @@ -98,6 +106,7 @@ go_test( "authenticating_interceptor_test.go", "deduplicating_client_factory_test.go", "deny_authenticator_test.go", + "forwarding_stream_handler_test.go", "jmespath_extractor_test.go", "lazy_client_dialer_test.go", "metadata_adding_interceptor_test.go", @@ -107,6 +116,7 @@ go_test( "proto_trace_attributes_extractor_test.go", "request_headers_authenticator_test.go", "request_metadata_tracing_interceptor_test.go", + "routing_stream_handler_test.go", ] + select({ "@rules_go//go/platform:android": [ "peer_transport_credentials_test.go", diff --git a/pkg/grpc/forwarding_stream_handler.go b/pkg/grpc/forwarding_stream_handler.go new file mode 100644 index 00000000..be95d913 --- /dev/null +++ b/pkg/grpc/forwarding_stream_handler.go @@ -0,0 +1,92 @@ +package grpc + +import ( + "io" + + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" +) + +// NewForwardingStreamHandler creates a grpc.StreamHandler that forwards gRPC +// calls to a grpc.ClientConnInterface backend. +func NewForwardingStreamHandler(client grpc.ClientConnInterface) grpc.StreamHandler { + forwarder := &forwardingStreamHandler{ + backend: client, + } + return forwarder.HandleStream +} + +type forwardingStreamHandler struct { + backend grpc.ClientConnInterface +} + +// HandleStream creates a new stream to the backend. Requests from +// incomingStream are forwarded to the backend stream and responses from the +// backend stream are sent back in the incomingStream. +func (s *forwardingStreamHandler) HandleStream(srv any, incomingStream grpc.ServerStream) error { + // All gRPC invocations has a grpc.ServerTransportStream context. + method, _ := grpc.Method(incomingStream.Context()) + desc := grpc.StreamDesc{ + // According to grpc.StreamDesc documentation, StreamName and Handler + // are only used when registering handlers on a server. + StreamName: "", + Handler: nil, + // Streaming behaviour is wanted, single message is treated the same on + // transport level, the application just closes the stream after the + // first message. + ServerStreams: true, + ClientStreams: true, + } + group, groupCtx := errgroup.WithContext(incomingStream.Context()) + group.Go(func() error { + // groupCtx is guaranteed to be canceled before returning from this method, so outgoingStream will not leak resources. + outgoingStream, err := s.backend.NewStream(groupCtx, &desc, method) + if err != nil { + return err + } + // Avoid group.Go because incomingStream.RecvMsg might block returning + // an error from the outgoingStream and getting the context for + // incomingStream canceled. + go func() { + for { + msg := &emptypb.Empty{} + if err := incomingStream.RecvMsg(msg); err != nil { + if err == io.EOF { + // Let's continue to receive on outgoingStream, so don't + // cancel grouptCtx. + outgoingStream.CloseSend() + return + } + // Cancel groupCtx immediately. + group.Go(func() error { return err }) + return + } + if err := outgoingStream.SendMsg(msg); err != nil { + if err == io.EOF { + // The error will be returned by outgoingStream.RecvMsg(), + // no need to cancel groupCtx now. + return + } + // Cancel groupCtx immediately. + group.Go(func() error { return err }) + return + } + } + }() + + for { + msg := &emptypb.Empty{} + if err := outgoingStream.RecvMsg(msg); err != nil { + if err == io.EOF { + return nil + } + return err + } + if err := incomingStream.SendMsg(msg); err != nil { + return err + } + } + }) + return group.Wait() +} diff --git a/pkg/grpc/forwarding_stream_handler_test.go b/pkg/grpc/forwarding_stream_handler_test.go new file mode 100644 index 00000000..c5eb2096 --- /dev/null +++ b/pkg/grpc/forwarding_stream_handler_test.go @@ -0,0 +1,301 @@ +package grpc_test + +import ( + "context" + "errors" + "io" + "testing" + "testing/synctest" + + "github.com/buildbarn/bb-storage/internal/mock" + bb_grpc "github.com/buildbarn/bb-storage/pkg/grpc" + "github.com/buildbarn/bb-storage/pkg/testutil" + "github.com/stretchr/testify/require" + + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" + + "go.uber.org/mock/gomock" +) + +type eqProtoStringValueMatcher struct { + gomock.Matcher +} + +// newEqProtoStringValueMatcher is a gomock matcher for proto equality after +// converting the proto.Message to structpb.Value. +func newEqProtoStringValueMatcher(t *testing.T, v string) gomock.Matcher { + proto := structpb.NewStringValue(v) + return &eqProtoStringValueMatcher{ + Matcher: testutil.EqProto(t, proto), + } +} + +func (m *eqProtoStringValueMatcher) Matches(other interface{}) bool { + otherProto, ok := other.(proto.Message) + if !ok { + return false + } + bytes, err := proto.Marshal(otherProto) + if err != nil { + return false + } + value := new(structpb.Value) + if proto.Unmarshal(bytes, value) != nil { + return false + } + return m.Matcher.Matches(value) +} + +func newForwardingStreamRecvMsgStub(v string) func(msg any) error { + src := structpb.NewStringValue(v) + bytes, err := proto.Marshal(src) + return func(dst any) error { + if err != nil { + return err + } + return proto.Unmarshal(bytes, dst.(proto.Message)) + } +} + +func TestSimpleStreamForwarder(t *testing.T) { + ctrl, _ := gomock.WithContext(context.Background(), t) + + backend := mock.NewMockClientConnInterface(ctrl) + forwarder := bb_grpc.NewForwardingStreamHandler(backend) + serverTransportStream := mock.NewMockServerTransportStream(ctrl) + serverTransportStream.EXPECT().Method().Return("/serviceA/method1").AnyTimes() + + t.Run("RequestSuccess", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var outgoingStreamCtx context.Context + outgoingRecvBarrier := make(chan struct{}) + incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream) + incomingStream := mock.NewMockServerStream(ctrl) + outgoingStream := mock.NewMockClientStream(ctrl) + + newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn( + func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + outgoingStreamCtx = ctx + return outgoingStream, nil + }, + ) + + incomingStream.EXPECT().Context().Return(incomingStreamCtx).AnyTimes() + gomock.InOrder( + newStreamCall, + incomingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(newForwardingStreamRecvMsgStub("beep")), + outgoingStream.EXPECT().SendMsg(newEqProtoStringValueMatcher(t, "beep")).Return(nil), + incomingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(newForwardingStreamRecvMsgStub("boop")), + outgoingStream.EXPECT().SendMsg(newEqProtoStringValueMatcher(t, "boop")).Return(nil), + incomingStream.EXPECT().RecvMsg(gomock.Any()).Return(io.EOF), + outgoingStream.EXPECT().CloseSend().DoAndReturn(func() error { + close(outgoingRecvBarrier) + return nil + }), + ) + gomock.InOrder( + newStreamCall, + outgoingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(msg any) error { + <-outgoingRecvBarrier + synctest.Wait() + require.NoError(t, outgoingStreamCtx.Err()) + return io.EOF + }), + ) + + require.NoError(t, forwarder(nil, incomingStream)) + <-outgoingStreamCtx.Done() + }) + }) + + t.Run("RequestRecvError", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var outgoingStreamCtx context.Context + incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream) + incomingStream := mock.NewMockServerStream(ctrl) + outgoingStream := mock.NewMockClientStream(ctrl) + + newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn( + func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + outgoingStreamCtx = ctx + return outgoingStream, nil + }, + ) + + incomingStream.EXPECT().Context().Return(incomingStreamCtx).AnyTimes() + gomock.InOrder( + newStreamCall, + incomingStream.EXPECT().RecvMsg(gomock.Any()).Return(errors.New("incoming recv")), + ) + gomock.InOrder( + newStreamCall, + outgoingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(msg any) error { + // When incomingStream.RecvMsg returns, the backend context + // should be canceled due to the error. + <-outgoingStreamCtx.Done() + return context.Canceled + }), + ) + + require.EqualError(t, forwarder(nil, incomingStream), "incoming recv") + }) + }) + + t.Run("RequestSendError", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var outgoingStreamCtx context.Context + incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream) + incomingStream := mock.NewMockServerStream(ctrl) + outgoingStream := mock.NewMockClientStream(ctrl) + + newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn( + func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + outgoingStreamCtx = ctx + return outgoingStream, nil + }, + ) + + incomingStream.EXPECT().Context().Return(incomingStreamCtx).AnyTimes() + gomock.InOrder( + newStreamCall, + incomingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(newForwardingStreamRecvMsgStub("beep")), + outgoingStream.EXPECT().SendMsg(newEqProtoStringValueMatcher(t, "beep")).Return(errors.New("outgoing send")), + ) + gomock.InOrder( + newStreamCall, + outgoingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(msg any) error { + // When outgoingStream.SendMsg returns, the outgoing context + // should be canceled due to the error. + <-outgoingStreamCtx.Done() + return context.Canceled + }), + ) + + require.EqualError(t, forwarder(nil, incomingStream), "outgoing send") + }) + }) + + t.Run("ResponseSuccess", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var outgoingStreamCtx context.Context + incomingRecvBarrier := make(chan struct{}) + incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream) + incomingStream := mock.NewMockServerStream(ctrl) + outgoingStream := mock.NewMockClientStream(ctrl) + + newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn( + func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + outgoingStreamCtx = ctx + return outgoingStream, nil + }, + ) + + incomingStream.EXPECT().Context().Return(incomingStreamCtx).AnyTimes() + gomock.InOrder( + newStreamCall, + outgoingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(newForwardingStreamRecvMsgStub("beep")), + incomingStream.EXPECT().SendMsg(newEqProtoStringValueMatcher(t, "beep")).Return(nil), + outgoingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(newForwardingStreamRecvMsgStub("boop")), + incomingStream.EXPECT().SendMsg(newEqProtoStringValueMatcher(t, "boop")).Return(nil), + outgoingStream.EXPECT().RecvMsg(gomock.Any()).Return(io.EOF), + ) + gomock.InOrder( + newStreamCall, + incomingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(msg any) error { + <-incomingRecvBarrier + return context.Canceled + }), + ) + + require.NoError(t, forwarder(nil, incomingStream)) + <-outgoingStreamCtx.Done() + + // incomingStream.Recv() is still blocking. + close(incomingRecvBarrier) + }) + }) + + t.Run("ResponseRecvError", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var outgoingStreamCtx context.Context + incomingRecvBarrier := make(chan struct{}) + incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream) + incomingStream := mock.NewMockServerStream(ctrl) + outgoingStream := mock.NewMockClientStream(ctrl) + + newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn( + func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + outgoingStreamCtx = ctx + return outgoingStream, nil + }, + ) + + incomingStream.EXPECT().Context().Return(incomingStreamCtx).AnyTimes() + gomock.InOrder( + newStreamCall, + outgoingStream.EXPECT().RecvMsg(gomock.Any()).Return(errors.New("outgoing recv")), + ) + gomock.InOrder( + newStreamCall, + incomingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(msg any) error { + <-incomingRecvBarrier + return context.Canceled + }), + ) + + require.EqualError(t, forwarder(nil, incomingStream), "outgoing recv") + <-outgoingStreamCtx.Done() + + // incomingStream.Recv() is still blocking. + close(incomingRecvBarrier) + }) + }) + + t.Run("ResponseSendError", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var outgoingStreamCtx context.Context + incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream) + incomingStream := mock.NewMockServerStream(ctrl) + outgoingStream := mock.NewMockClientStream(ctrl) + + newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn( + func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + outgoingStreamCtx = ctx + return outgoingStream, nil + }, + ) + + incomingStream.EXPECT().Context().Return(incomingStreamCtx).AnyTimes() + gomock.InOrder( + newStreamCall, + outgoingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(newForwardingStreamRecvMsgStub("beep")), + incomingStream.EXPECT().SendMsg(newEqProtoStringValueMatcher(t, "beep")).Return(errors.New("incoming send")), + ) + gomock.InOrder( + newStreamCall, + incomingStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(msg any) error { + // When incomingStream.SendMsg returns, the outgoing context + // should be canceled due to the error. + <-outgoingStreamCtx.Done() + return context.Canceled + }), + ) + + require.EqualError(t, forwarder(nil, incomingStream), "incoming send") + }) + }) + + t.Run("NewStreamError", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream) + incomingStream := mock.NewMockServerStream(ctrl) + + incomingStream.EXPECT().Context().Return(incomingStreamCtx).AnyTimes() + backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").Return(nil, errors.New("no stream")) + + require.EqualError(t, forwarder(nil, incomingStream), "no stream") + }) + }) +} diff --git a/pkg/grpc/reflection_relay.go b/pkg/grpc/reflection_relay.go new file mode 100644 index 00000000..143f996c --- /dev/null +++ b/pkg/grpc/reflection_relay.go @@ -0,0 +1,64 @@ +package grpc + +import ( + "context" + "maps" + + "github.com/jhump/protoreflect/v2/grpcreflect" + "github.com/jhump/protoreflect/v2/protoresolve" + + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/reflection/grpc_reflection_v1" +) + +type combinedServiceInfoProvider struct { + server reflection.ServiceInfoProvider + extraServices map[string]grpc.ServiceInfo +} + +var _ reflection.ServiceInfoProvider = (*combinedServiceInfoProvider)(nil) + +// GetServiceInfo returns the currently available services, which might have +// changed since the creation of this reflection server. +func (p *combinedServiceInfoProvider) GetServiceInfo() map[string]grpc.ServiceInfo { + serverServiceInfo := p.server.GetServiceInfo() + services := make(map[string]grpc.ServiceInfo, len(p.extraServices)+len(serverServiceInfo)) + maps.Copy(services, p.extraServices) + maps.Copy(services, serverServiceInfo) + return services +} + +// registerReflectionServer registers the google.golang.org/grpc/reflection/ +// service on a grpc.Server and calls remote backends in case for relayed +// services. The connections to the backend will run with the backendCtx. +func registerReflectionServer(backendCtx context.Context, s *grpc.Server, serverRelayConfigurations []serverRelayConfigWithGrpcClient) error { + // Accumulate all the service names. + relayServices := make(map[string]grpc.ServiceInfo) + for _, relay := range serverRelayConfigurations { + for _, service := range relay.config.Services { + // According to ServiceInfoProvider docs for ServerOptions.Services, + // the reflection service is only interested in the service names. + relayServices[service] = grpc.ServiceInfo{} + } + } + + // Make a combined descriptor and extension resolver. + reflectionBackends := []protoresolve.Resolver{} + for _, relay := range serverRelayConfigurations { + resolver := grpcreflect.NewClientAuto(backendCtx, relay.grpcClient).AsResolver() + reflectionBackends = append(reflectionBackends, resolver) + } + combinedRemoteResolver := protoresolve.Combine(reflectionBackends...) + + serverOptions := reflection.ServerOptions{ + Services: &combinedServiceInfoProvider{ + server: s, + extraServices: relayServices, + }, + DescriptorResolver: combinedRemoteResolver, + ExtensionResolver: protoresolve.TypesFromDescriptorPool(combinedRemoteResolver), + } + grpc_reflection_v1.RegisterServerReflectionServer(s, reflection.NewServerV1(serverOptions)) + return nil +} diff --git a/pkg/grpc/routing_stream_handler.go b/pkg/grpc/routing_stream_handler.go new file mode 100644 index 00000000..1b897611 --- /dev/null +++ b/pkg/grpc/routing_stream_handler.go @@ -0,0 +1,34 @@ +package grpc + +import ( + "strings" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// NewRoutingStreamHandler creates a RoutingStreamForwarder which routes gRPC +// streams based on the invoked gRPC method name. The keys in the routeTable map +// are gRPC service names, for example: +// +// build.bazel.remote.execution.v2.Execution +// com.google.devtools.build.v1.PublishBuildEvent +func NewRoutingStreamHandler(routeTable map[string]grpc.StreamHandler) grpc.StreamHandler { + return func(srv any, stream grpc.ServerStream) error { + // All gRPC invocations has a grpc.ServerTransportStream context. + orgServiceMethod, _ := grpc.Method(stream.Context()) + // Service and method name parsing based on grpc.Server.handleStream(). + serviceMethod := strings.TrimPrefix(orgServiceMethod, "/") + endIdx := strings.LastIndex(serviceMethod, "/") + if endIdx == -1 { + return status.Errorf(codes.InvalidArgument, "Malformed method name %v", orgServiceMethod) + } + service := serviceMethod[:endIdx] + + if streamHandler, ok := routeTable[service]; ok { + return streamHandler(srv, stream) + } + return status.Errorf(codes.Unimplemented, "No route for service %v", service) + } +} diff --git a/pkg/grpc/routing_stream_handler_test.go b/pkg/grpc/routing_stream_handler_test.go new file mode 100644 index 00000000..652f0b49 --- /dev/null +++ b/pkg/grpc/routing_stream_handler_test.go @@ -0,0 +1,87 @@ +package grpc_test + +import ( + "context" + "errors" + "testing" + + "github.com/buildbarn/bb-storage/internal/mock" + bb_grpc "github.com/buildbarn/bb-storage/pkg/grpc" + "github.com/buildbarn/bb-storage/pkg/testutil" + "github.com/stretchr/testify/require" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "go.uber.org/mock/gomock" +) + +func TestRoutingStreamForwarder(t *testing.T) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + someSrv := "server" + serverTransportStream := mock.NewMockServerTransportStream(ctrl) + streamCtx := grpc.NewContextWithServerTransportStream(ctx, serverTransportStream) + incomingStream := mock.NewMockServerStream(ctrl) + incomingStream.EXPECT().Context().Return(streamCtx).AnyTimes() + + // The test assumes that the incomingStream is forwarded straight through + // the RoutingStreamForwarder, even if the implementation is allowed to do + // some wrapping. + streamHandler := mock.NewMockStreamHandler(ctrl) + + forwarder := bb_grpc.NewRoutingStreamHandler(map[string]grpc.StreamHandler{ + "serviceA": streamHandler.Call, + "/serviceB": streamHandler.Call, + }) + + serverTransportStream.EXPECT().Method().Return("/serviceA/method1") + streamHandler.EXPECT().Call(someSrv, incomingStream).Return(errors.New("called")) + require.Error(t, forwarder(someSrv, incomingStream), "called") + + serverTransportStream.EXPECT().Method().Return("/serviceB/method2") + testutil.RequireEqualStatus( + t, + status.Error(codes.Unimplemented, "No route for service serviceB"), + forwarder(someSrv, incomingStream), + ) + + serverTransportStream.EXPECT().Method().Return("/non.existing/service/bad-method") + testutil.RequireEqualStatus( + t, + status.Error(codes.Unimplemented, "No route for service non.existing/service"), + forwarder(someSrv, incomingStream), + ) + serverTransportStream.EXPECT().Method().Return("non.existing/service/bad-method") + testutil.RequireEqualStatus( + t, + status.Error(codes.Unimplemented, "No route for service non.existing/service"), + forwarder(someSrv, incomingStream), + ) + + serverTransportStream.EXPECT().Method().Return("/service.only") + testutil.RequireEqualStatus( + t, + status.Error(codes.InvalidArgument, "Malformed method name /service.only"), + forwarder(someSrv, incomingStream), + ) + serverTransportStream.EXPECT().Method().Return("service.only") + testutil.RequireEqualStatus( + t, + status.Error(codes.InvalidArgument, "Malformed method name service.only"), + forwarder(someSrv, incomingStream), + ) + serverTransportStream.EXPECT().Method().Return("/") + testutil.RequireEqualStatus( + t, + status.Error(codes.InvalidArgument, "Malformed method name /"), + forwarder(someSrv, incomingStream), + ) + serverTransportStream.EXPECT().Method().Return("") + testutil.RequireEqualStatus( + t, + status.Error(codes.InvalidArgument, "Malformed method name "), + forwarder(someSrv, incomingStream), + ) +} diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go index 30f97086..e54cbb47 100644 --- a/pkg/grpc/server.go +++ b/pkg/grpc/server.go @@ -9,7 +9,7 @@ import ( configuration "github.com/buildbarn/bb-storage/pkg/proto/configuration/grpc" grpcpb "github.com/buildbarn/bb-storage/pkg/proto/configuration/grpc" "github.com/buildbarn/bb-storage/pkg/util" - "github.com/grpc-ecosystem/go-grpc-prometheus" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -18,7 +18,6 @@ import ( "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -31,6 +30,13 @@ func init() { util.DecimalExponentialBuckets(-3, 6, 2))) } +type serverRelayConfigWithGrpcClient struct { + // config is never nil. + config *grpcpb.ServerRelayConfiguration + // grpcClient is a client created according to config.Endpoint. + grpcClient grpc.ClientConnInterface +} + // NewServersFromConfigurationAndServe creates a series of gRPC servers // based on a configuration stored in a list of Protobuf messages. It // then lets all of these gRPC servers listen on the network addresses @@ -147,6 +153,25 @@ func NewServersFromConfigurationAndServe(configurations []*configuration.ServerC })) } + relayConfigWithGrpcClients := make([]serverRelayConfigWithGrpcClient, len(configuration.Relays)) + for relayIdx, relay := range configuration.Relays { + grpcClient, err := grpcClientFactory.NewClientFromConfiguration(relay.Endpoint, group) + if err != nil { + return util.StatusWrapf(err, "Failed to create relay RPC client %d", relayIdx+1) + } + relayConfigWithGrpcClients[relayIdx] = serverRelayConfigWithGrpcClient{ + config: relay, + grpcClient: grpcClient, + } + } + if len(configuration.Relays) != 0 { + handler, err := newRoutingStreamHandlerFromConfiguration(relayConfigWithGrpcClients) + if err != nil { + return err + } + serverOptions = append(serverOptions, grpc.UnknownServiceHandler(handler)) + } + // Create server. s := grpc.NewServer(serverOptions...) stopFunc := s.Stop @@ -162,7 +187,9 @@ func NewServersFromConfigurationAndServe(configurations []*configuration.ServerC // Enable default services. grpc_prometheus.Register(s) - reflection.Register(s) + if err := registerReflectionServer(context.Background(), s, relayConfigWithGrpcClients); err != nil { + return util.StatusWrap(err, "Failed to create reflection service") + } h := health.NewServer() grpc_health_v1.RegisterHealthServer(s, h) // TODO: Construct an API for the caller to indicate @@ -208,3 +235,17 @@ func NewServersFromConfigurationAndServe(configurations []*configuration.ServerC } return nil } + +func newRoutingStreamHandlerFromConfiguration(serverRelayConfigurations []serverRelayConfigWithGrpcClient) (grpc.StreamHandler, error) { + routeTable := make(map[string]grpc.StreamHandler) + for _, relay := range serverRelayConfigurations { + handler := NewForwardingStreamHandler(relay.grpcClient) + for _, service := range relay.config.Services { + if _, ok := routeTable[service]; ok { + return nil, status.Errorf(codes.InvalidArgument, "Duplicated gRPC relay for %v", service) + } + routeTable[service] = handler + } + } + return NewRoutingStreamHandler(routeTable), nil +} diff --git a/pkg/proto/configuration/grpc/grpc.pb.go b/pkg/proto/configuration/grpc/grpc.pb.go index d21c8a15..b79d1a79 100644 --- a/pkg/proto/configuration/grpc/grpc.pb.go +++ b/pkg/proto/configuration/grpc/grpc.pb.go @@ -233,6 +233,7 @@ type ServerConfiguration struct { // *ServerConfiguration_Tls // *ServerConfiguration_Alts TransportSecurity isServerConfiguration_TransportSecurity `protobuf_oneof:"transport_security"` + Relays []*ServerRelayConfiguration `protobuf:"bytes,14,rep,name=relays,proto3" json:"relays,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -369,6 +370,13 @@ func (x *ServerConfiguration) GetAlts() *emptypb.Empty { return nil } +func (x *ServerConfiguration) GetRelays() []*ServerRelayConfiguration { + if x != nil { + return x.Relays + } + return nil +} + type isServerConfiguration_TransportSecurity interface { isServerConfiguration_TransportSecurity() } @@ -967,6 +975,58 @@ func (x *TracingMethodConfiguration) GetAttributesFromFirstResponseMessage() []s return nil } +type ServerRelayConfiguration struct { + state protoimpl.MessageState `protogen:"open.v1"` + Endpoint *ClientConfiguration `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"` + Services []string `protobuf:"bytes,2,rep,name=services,proto3" json:"services,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ServerRelayConfiguration) Reset() { + *x = ServerRelayConfiguration{} + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ServerRelayConfiguration) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerRelayConfiguration) ProtoMessage() {} + +func (x *ServerRelayConfiguration) ProtoReflect() protoreflect.Message { + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerRelayConfiguration.ProtoReflect.Descriptor instead. +func (*ServerRelayConfiguration) Descriptor() ([]byte, []int) { + return file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_rawDescGZIP(), []int{11} +} + +func (x *ServerRelayConfiguration) GetEndpoint() *ClientConfiguration { + if x != nil { + return x.Endpoint + } + return nil +} + +func (x *ServerRelayConfiguration) GetServices() []string { + if x != nil { + return x.Services + } + return nil +} + type ClientConfiguration_HeaderValues struct { state protoimpl.MessageState `protogen:"open.v1"` Header string `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` @@ -977,7 +1037,7 @@ type ClientConfiguration_HeaderValues struct { func (x *ClientConfiguration_HeaderValues) Reset() { *x = ClientConfiguration_HeaderValues{} - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_msgTypes[11] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -989,7 +1049,7 @@ func (x *ClientConfiguration_HeaderValues) String() string { func (*ClientConfiguration_HeaderValues) ProtoMessage() {} func (x *ClientConfiguration_HeaderValues) ProtoReflect() protoreflect.Message { - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_msgTypes[11] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1046,7 +1106,7 @@ const file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_pro "\x1cClientKeepaliveConfiguration\x12-\n" + "\x04time\x18\x01 \x01(\v2\x19.google.protobuf.DurationR\x04time\x123\n" + "\atimeout\x18\x02 \x01(\v2\x19.google.protobuf.DurationR\atimeout\x122\n" + - "\x15permit_without_stream\x18\x03 \x01(\bR\x13permitWithoutStream\"\xbd\b\n" + + "\x15permit_without_stream\x18\x03 \x01(\bR\x13permitWithoutStream\"\x8d\t\n" + "\x13ServerConfiguration\x12)\n" + "\x10listen_addresses\x18\x01 \x03(\tR\x0flistenAddresses\x12!\n" + "\flisten_paths\x18\x02 \x03(\tR\vlistenPaths\x12g\n" + @@ -1061,7 +1121,8 @@ const file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_pro "\x14keepalive_parameters\x18\v \x01(\v27.buildbarn.configuration.grpc.ServerKeepaliveParametersR\x13keepaliveParameters\x12'\n" + "\x0fstop_gracefully\x18\f \x01(\bR\x0estopGracefully\x12D\n" + "\x03tls\x18\x03 \x01(\v20.buildbarn.configuration.tls.ServerConfigurationH\x00R\x03tls\x12,\n" + - "\x04alts\x18\r \x01(\v2\x16.google.protobuf.EmptyH\x00R\x04alts\x1at\n" + + "\x04alts\x18\r \x01(\v2\x16.google.protobuf.EmptyH\x00R\x04alts\x12N\n" + + "\x06relays\x18\x0e \x03(\v26.buildbarn.configuration.grpc.ServerRelayConfigurationR\x06relays\x1at\n" + "\fTracingEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12N\n" + "\x05value\x18\x02 \x01(\v28.buildbarn.configuration.grpc.TracingMethodConfigurationR\x05value:\x028\x01B\x14\n" + @@ -1101,7 +1162,10 @@ const file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_pro "\x18cache_replacement_policy\x18\x05 \x01(\x0e28.buildbarn.configuration.eviction.CacheReplacementPolicyR\x16cacheReplacementPolicy\"\xc2\x01\n" + "\x1aTracingMethodConfiguration\x12P\n" + "%attributes_from_first_request_message\x18\x01 \x03(\tR!attributesFromFirstRequestMessage\x12R\n" + - "&attributes_from_first_response_message\x18\x02 \x03(\tR\"attributesFromFirstResponseMessageB>ZZ buildbarn.configuration.tls.ClientConfiguration + 15, // 0: buildbarn.configuration.grpc.ClientConfiguration.tls:type_name -> buildbarn.configuration.tls.ClientConfiguration 1, // 1: buildbarn.configuration.grpc.ClientConfiguration.keepalive:type_name -> buildbarn.configuration.grpc.ClientKeepaliveConfiguration - 11, // 2: buildbarn.configuration.grpc.ClientConfiguration.add_metadata:type_name -> buildbarn.configuration.grpc.ClientConfiguration.HeaderValues - 15, // 3: buildbarn.configuration.grpc.ClientConfiguration.add_metadata_jmespath_expression:type_name -> buildbarn.configuration.jmespath.Expression - 16, // 4: buildbarn.configuration.grpc.ClientConfiguration.oauth2:type_name -> buildbarn.configuration.http.client.OAuth2Configuration - 12, // 5: buildbarn.configuration.grpc.ClientConfiguration.tracing:type_name -> buildbarn.configuration.grpc.ClientConfiguration.TracingEntry - 17, // 6: buildbarn.configuration.grpc.ClientConfiguration.default_service_config:type_name -> google.protobuf.Struct - 18, // 7: buildbarn.configuration.grpc.ClientKeepaliveConfiguration.time:type_name -> google.protobuf.Duration - 18, // 8: buildbarn.configuration.grpc.ClientKeepaliveConfiguration.timeout:type_name -> google.protobuf.Duration + 12, // 2: buildbarn.configuration.grpc.ClientConfiguration.add_metadata:type_name -> buildbarn.configuration.grpc.ClientConfiguration.HeaderValues + 16, // 3: buildbarn.configuration.grpc.ClientConfiguration.add_metadata_jmespath_expression:type_name -> buildbarn.configuration.jmespath.Expression + 17, // 4: buildbarn.configuration.grpc.ClientConfiguration.oauth2:type_name -> buildbarn.configuration.http.client.OAuth2Configuration + 13, // 5: buildbarn.configuration.grpc.ClientConfiguration.tracing:type_name -> buildbarn.configuration.grpc.ClientConfiguration.TracingEntry + 18, // 6: buildbarn.configuration.grpc.ClientConfiguration.default_service_config:type_name -> google.protobuf.Struct + 19, // 7: buildbarn.configuration.grpc.ClientKeepaliveConfiguration.time:type_name -> google.protobuf.Duration + 19, // 8: buildbarn.configuration.grpc.ClientKeepaliveConfiguration.timeout:type_name -> google.protobuf.Duration 5, // 9: buildbarn.configuration.grpc.ServerConfiguration.authentication_policy:type_name -> buildbarn.configuration.grpc.AuthenticationPolicy 3, // 10: buildbarn.configuration.grpc.ServerConfiguration.keepalive_enforcement_policy:type_name -> buildbarn.configuration.grpc.ServerKeepaliveEnforcementPolicy - 13, // 11: buildbarn.configuration.grpc.ServerConfiguration.tracing:type_name -> buildbarn.configuration.grpc.ServerConfiguration.TracingEntry + 14, // 11: buildbarn.configuration.grpc.ServerConfiguration.tracing:type_name -> buildbarn.configuration.grpc.ServerConfiguration.TracingEntry 4, // 12: buildbarn.configuration.grpc.ServerConfiguration.keepalive_parameters:type_name -> buildbarn.configuration.grpc.ServerKeepaliveParameters - 19, // 13: buildbarn.configuration.grpc.ServerConfiguration.tls:type_name -> buildbarn.configuration.tls.ServerConfiguration - 20, // 14: buildbarn.configuration.grpc.ServerConfiguration.alts:type_name -> google.protobuf.Empty - 18, // 15: buildbarn.configuration.grpc.ServerKeepaliveEnforcementPolicy.min_time:type_name -> google.protobuf.Duration - 18, // 16: buildbarn.configuration.grpc.ServerKeepaliveParameters.max_connection_idle:type_name -> google.protobuf.Duration - 18, // 17: buildbarn.configuration.grpc.ServerKeepaliveParameters.max_connection_age:type_name -> google.protobuf.Duration - 18, // 18: buildbarn.configuration.grpc.ServerKeepaliveParameters.max_connection_age_grace:type_name -> google.protobuf.Duration - 18, // 19: buildbarn.configuration.grpc.ServerKeepaliveParameters.time:type_name -> google.protobuf.Duration - 18, // 20: buildbarn.configuration.grpc.ServerKeepaliveParameters.timeout:type_name -> google.protobuf.Duration - 21, // 21: buildbarn.configuration.grpc.AuthenticationPolicy.allow:type_name -> buildbarn.auth.AuthenticationMetadata - 6, // 22: buildbarn.configuration.grpc.AuthenticationPolicy.any:type_name -> buildbarn.configuration.grpc.AnyAuthenticationPolicy - 7, // 23: buildbarn.configuration.grpc.AuthenticationPolicy.all:type_name -> buildbarn.configuration.grpc.AllAuthenticationPolicy - 22, // 24: buildbarn.configuration.grpc.AuthenticationPolicy.tls_client_certificate:type_name -> buildbarn.configuration.x509.ClientCertificateVerifierConfiguration - 23, // 25: buildbarn.configuration.grpc.AuthenticationPolicy.jwt:type_name -> buildbarn.configuration.jwt.AuthorizationHeaderParserConfiguration - 15, // 26: buildbarn.configuration.grpc.AuthenticationPolicy.peer_credentials_jmespath_expression:type_name -> buildbarn.configuration.jmespath.Expression - 9, // 27: buildbarn.configuration.grpc.AuthenticationPolicy.remote:type_name -> buildbarn.configuration.grpc.RemoteAuthenticationPolicy - 5, // 28: buildbarn.configuration.grpc.AnyAuthenticationPolicy.policies:type_name -> buildbarn.configuration.grpc.AuthenticationPolicy - 5, // 29: buildbarn.configuration.grpc.AllAuthenticationPolicy.policies:type_name -> buildbarn.configuration.grpc.AuthenticationPolicy - 15, // 30: buildbarn.configuration.grpc.TLSClientCertificateAuthenticationPolicy.validation_jmespath_expression:type_name -> buildbarn.configuration.jmespath.Expression - 15, // 31: buildbarn.configuration.grpc.TLSClientCertificateAuthenticationPolicy.metadata_extraction_jmespath_expression:type_name -> buildbarn.configuration.jmespath.Expression - 0, // 32: buildbarn.configuration.grpc.RemoteAuthenticationPolicy.endpoint:type_name -> buildbarn.configuration.grpc.ClientConfiguration - 24, // 33: buildbarn.configuration.grpc.RemoteAuthenticationPolicy.scope:type_name -> google.protobuf.Value - 25, // 34: buildbarn.configuration.grpc.RemoteAuthenticationPolicy.cache_replacement_policy:type_name -> buildbarn.configuration.eviction.CacheReplacementPolicy - 10, // 35: buildbarn.configuration.grpc.ClientConfiguration.TracingEntry.value:type_name -> buildbarn.configuration.grpc.TracingMethodConfiguration - 10, // 36: buildbarn.configuration.grpc.ServerConfiguration.TracingEntry.value:type_name -> buildbarn.configuration.grpc.TracingMethodConfiguration - 37, // [37:37] is the sub-list for method output_type - 37, // [37:37] is the sub-list for method input_type - 37, // [37:37] is the sub-list for extension type_name - 37, // [37:37] is the sub-list for extension extendee - 0, // [0:37] is the sub-list for field type_name + 20, // 13: buildbarn.configuration.grpc.ServerConfiguration.tls:type_name -> buildbarn.configuration.tls.ServerConfiguration + 21, // 14: buildbarn.configuration.grpc.ServerConfiguration.alts:type_name -> google.protobuf.Empty + 11, // 15: buildbarn.configuration.grpc.ServerConfiguration.relays:type_name -> buildbarn.configuration.grpc.ServerRelayConfiguration + 19, // 16: buildbarn.configuration.grpc.ServerKeepaliveEnforcementPolicy.min_time:type_name -> google.protobuf.Duration + 19, // 17: buildbarn.configuration.grpc.ServerKeepaliveParameters.max_connection_idle:type_name -> google.protobuf.Duration + 19, // 18: buildbarn.configuration.grpc.ServerKeepaliveParameters.max_connection_age:type_name -> google.protobuf.Duration + 19, // 19: buildbarn.configuration.grpc.ServerKeepaliveParameters.max_connection_age_grace:type_name -> google.protobuf.Duration + 19, // 20: buildbarn.configuration.grpc.ServerKeepaliveParameters.time:type_name -> google.protobuf.Duration + 19, // 21: buildbarn.configuration.grpc.ServerKeepaliveParameters.timeout:type_name -> google.protobuf.Duration + 22, // 22: buildbarn.configuration.grpc.AuthenticationPolicy.allow:type_name -> buildbarn.auth.AuthenticationMetadata + 6, // 23: buildbarn.configuration.grpc.AuthenticationPolicy.any:type_name -> buildbarn.configuration.grpc.AnyAuthenticationPolicy + 7, // 24: buildbarn.configuration.grpc.AuthenticationPolicy.all:type_name -> buildbarn.configuration.grpc.AllAuthenticationPolicy + 23, // 25: buildbarn.configuration.grpc.AuthenticationPolicy.tls_client_certificate:type_name -> buildbarn.configuration.x509.ClientCertificateVerifierConfiguration + 24, // 26: buildbarn.configuration.grpc.AuthenticationPolicy.jwt:type_name -> buildbarn.configuration.jwt.AuthorizationHeaderParserConfiguration + 16, // 27: buildbarn.configuration.grpc.AuthenticationPolicy.peer_credentials_jmespath_expression:type_name -> buildbarn.configuration.jmespath.Expression + 9, // 28: buildbarn.configuration.grpc.AuthenticationPolicy.remote:type_name -> buildbarn.configuration.grpc.RemoteAuthenticationPolicy + 5, // 29: buildbarn.configuration.grpc.AnyAuthenticationPolicy.policies:type_name -> buildbarn.configuration.grpc.AuthenticationPolicy + 5, // 30: buildbarn.configuration.grpc.AllAuthenticationPolicy.policies:type_name -> buildbarn.configuration.grpc.AuthenticationPolicy + 16, // 31: buildbarn.configuration.grpc.TLSClientCertificateAuthenticationPolicy.validation_jmespath_expression:type_name -> buildbarn.configuration.jmespath.Expression + 16, // 32: buildbarn.configuration.grpc.TLSClientCertificateAuthenticationPolicy.metadata_extraction_jmespath_expression:type_name -> buildbarn.configuration.jmespath.Expression + 0, // 33: buildbarn.configuration.grpc.RemoteAuthenticationPolicy.endpoint:type_name -> buildbarn.configuration.grpc.ClientConfiguration + 25, // 34: buildbarn.configuration.grpc.RemoteAuthenticationPolicy.scope:type_name -> google.protobuf.Value + 26, // 35: buildbarn.configuration.grpc.RemoteAuthenticationPolicy.cache_replacement_policy:type_name -> buildbarn.configuration.eviction.CacheReplacementPolicy + 0, // 36: buildbarn.configuration.grpc.ServerRelayConfiguration.endpoint:type_name -> buildbarn.configuration.grpc.ClientConfiguration + 10, // 37: buildbarn.configuration.grpc.ClientConfiguration.TracingEntry.value:type_name -> buildbarn.configuration.grpc.TracingMethodConfiguration + 10, // 38: buildbarn.configuration.grpc.ServerConfiguration.TracingEntry.value:type_name -> buildbarn.configuration.grpc.TracingMethodConfiguration + 39, // [39:39] is the sub-list for method output_type + 39, // [39:39] is the sub-list for method input_type + 39, // [39:39] is the sub-list for extension type_name + 39, // [39:39] is the sub-list for extension extendee + 0, // [0:39] is the sub-list for field type_name } func init() { file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_init() } @@ -1214,7 +1281,7 @@ func file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_prot GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_rawDesc), len(file_github_com_buildbarn_bb_storage_pkg_proto_configuration_grpc_grpc_proto_rawDesc)), NumEnums: 0, - NumMessages: 14, + NumMessages: 15, NumExtensions: 0, NumServices: 0, }, diff --git a/pkg/proto/configuration/grpc/grpc.proto b/pkg/proto/configuration/grpc/grpc.proto index 083355bf..d82daa37 100644 --- a/pkg/proto/configuration/grpc/grpc.proto +++ b/pkg/proto/configuration/grpc/grpc.proto @@ -240,6 +240,16 @@ message ServerConfiguration { // https://docs.cloud.google.com/docs/security/encryption-in-transit/application-layer-transport-security google.protobuf.Empty alts = 13; } + + // Forward calls to certain named gRPC services to a different endpoint. + // + // One use case is to let the user connect to the same DNS name for extra + // services without having to use separate DNS names or setup another gRPC + // proxy. build.bazel.remote.execution.v2.Execution is a candidate for this, + // but note that build.bazel.remote.execution.v2.Capabilities might still need + // information from the scheduler. Another use case is Bazel's Build Event + // Streaming. + repeated ServerRelayConfiguration relays = 14; } message ServerKeepaliveEnforcementPolicy { @@ -501,3 +511,15 @@ message TracingMethodConfiguration { // 'attributes_from_first_request_message'. repeated string attributes_from_first_response_message = 2; } + +message ServerRelayConfiguration { + // The remote gRPC server to forward the gRPC calls to. + ClientConfiguration endpoint = 1; + + // The gRPC services to relay. + // Examples of valid names include: + // + // build.bazel.remote.execution.v2.Execution + // com.google.devtools.build.v1.PublishBuildEvent + repeated string services = 2; +}