diff --git a/bootstrap/server.go b/bootstrap/server.go index b21eef04a..50aa59994 100644 --- a/bootstrap/server.go +++ b/bootstrap/server.go @@ -40,6 +40,7 @@ import ( "github.com/polarismesh/polaris/common/log" "github.com/polarismesh/polaris/common/metrics" "github.com/polarismesh/polaris/common/model" + "github.com/polarismesh/polaris/common/pluggable" "github.com/polarismesh/polaris/common/utils" "github.com/polarismesh/polaris/common/version" config_center "github.com/polarismesh/polaris/config" @@ -98,6 +99,12 @@ func Start(configFilePath string) { metrics.InitMetrics() eventhub.InitEventHub() + // 加载可插拔插件 + if err = pluggable.Discovery(ctx); err != nil { + fmt.Printf("[ERROR] discover pluggable plugin fail: %+v", err) + return + } + // 设置插件配置 plugin.SetPluginConfig(&cfg.Plugin) diff --git a/common/pluggable/grpc.go b/common/pluggable/grpc.go new file mode 100644 index 000000000..a073de550 --- /dev/null +++ b/common/pluggable/grpc.go @@ -0,0 +1,115 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package pluggable + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/polaris-contrib/polaris-server-remote-plugin-common/api" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// SetupTimeout is the timeout for setting up a connection. +const SetupTimeout = 5 * time.Second + +// GRPCConnectionDialer defines the function to dial a grpc connection. +type GRPCConnectionDialer func(ctx context.Context, name string) (*grpc.ClientConn, error) + +// SocketDialContext dials a gRPC connection using a socket. +func SocketDialContext(ctx context.Context, socket string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + unixSock := "unix://" + socket + + // disable TLS as default when using socket + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + + dialCtx, cancel := context.WithTimeout(ctx, SetupTimeout) + defer cancel() + + grpcConn, err := grpc.DialContext(dialCtx, unixSock, opts...) + if err != nil { + return nil, err + } + + return grpcConn, nil +} + +// GRPCPluginClient defines the interface for a gRPC plugin client, +// polaris server will call the plugin client's Ping method to check if the plugin is alive. +type GRPCPluginClient interface { + // Ping checks if the plugin is alive. + Ping(ctx context.Context, in *api.PingRequest, opts ...grpc.CallOption) (*api.PongResponse, error) +} + +// GRPCConnector defines the connector for a gRPC plugin. +type GRPCConnector struct { + // pluginClient is the client that is used to communicate with the plugin, exposed for plugin logic layer. + pluginClient GRPCPluginClient + // dialer use to dial a grpc connection. + dialer GRPCConnectionDialer + // conn is the grpc client connection. + conn *grpc.ClientConn + // clientFactory is the factory to create a grpc client. + clientFactory func(grpc.ClientConnInterface) GRPCPluginClient +} + +// NewGRPCConnectorWithDialer creates a new grpc connector for the given client factory and dialer. +func NewGRPCConnectorWithDialer( + dialer GRPCConnectionDialer, factory func(grpc.ClientConnInterface) GRPCPluginClient) *GRPCConnector { + return &GRPCConnector{ + dialer: dialer, + clientFactory: factory, + } +} + +// Dial init a grpc connection to the plugin server and create a grpc client. +func (g *GRPCConnector) Dial(ctx context.Context, name string) error { + conn, err := g.dialer(ctx, name) + if err != nil { + return errors.Wrapf(err, "unable to open GRPC connection using the dialer") + } + + g.conn = conn + g.pluginClient = g.clientFactory(conn) + return nil +} + +// PluginClient returns the grpc client. +func (g *GRPCConnector) PluginClient() GRPCPluginClient { + return g.pluginClient +} + +// socketDialer returns a GRPCConnectionDialer that dials a grpc connection using a socket. +func socketDialer(socket string, opts ...grpc.DialOption) GRPCConnectionDialer { + return func(ctx context.Context, name string) (*grpc.ClientConn, error) { + return SocketDialContext(ctx, socket, opts...) + } +} + +// Ping checks if the plugin is alive. +func (g *GRPCConnector) Ping(ctx context.Context) error { + _, err := g.pluginClient.Ping(ctx, &api.PingRequest{}, grpc.WaitForReady(true)) + return err +} + +// Close closes the underlying gRPC connection. +func (g *GRPCConnector) Close() error { + return g.conn.Close() +} diff --git a/common/pluggable/pluggable.go b/common/pluggable/pluggable.go new file mode 100644 index 000000000..75eac87f3 --- /dev/null +++ b/common/pluggable/pluggable.go @@ -0,0 +1,180 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package pluggable + +import ( + "context" + "os" + "path/filepath" + "strings" + + "github.com/jhump/protoreflect/grpcreflect" + "github.com/pkg/errors" + "google.golang.org/grpc" + _ "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" + reflectV1Alpha "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" + + "github.com/polarismesh/polaris/common/log" +) + +const ( + // envVarPolarisPluggableFolder + envVarPolarisPluggableFolder string = "POLARIS_PLUGGABLE_SOCKETS_FOLDER" + // defaultPolarisPluggablePath + defaultPolarisPluggablePath = "/tmp/polaris-pluggable-sockets" +) + +// onFinishedCallback is a callback to be called when a plugin is finished. +type onFinishedCallback func(name string, dialer GRPCConnectionDialer) + +// onFinished +var onFinished = make(map[string]onFinishedCallback) + +// AddOnFinished adds a callback to be called when a plugin is finished. +func AddOnFinished(serviceDesc string, cb onFinishedCallback) { + _, ok := onFinished[serviceDesc] + if ok { + log.Fatalf("onFinished callback for %s already exists", serviceDesc) + } + onFinished[serviceDesc] = cb +} + +// Discovery discovers all the plugins. +func Discovery(ctx context.Context) error { + services, err := discovery(ctx) + if err != nil { + return err + } + finished(services) + return nil +} + +// finished calls the onFinished callback for the given services. +func finished(services []*pluginService) { + for _, svc := range services { + cb, ok := onFinished[svc.protoRef] + if !ok { + continue + } + + cb(svc.name, svc.dialer) + log.Infof("discovered pluggable component service: %s", svc.protoRef) + } +} + +// pluginService is a plugin service. +type pluginService struct { + name string + protoRef string + dialer GRPCConnectionDialer +} + +// discovery discovers all the plugins. +func discovery(ctx context.Context) ([]*pluginService, error) { + sockFolder := socketFolder() + files, err := pluginFiles(sockFolder) + if err != nil { + return nil, err + } + + var services []*pluginService + for _, dirEntry := range files { + if dirEntry.IsDir() { + continue + } + + var discoveredServices []*pluginService + discoveredServices, err = trySingleSocket(ctx, dirEntry, sockFolder) + + // skip non-socket files. + if err == errNotSocket { + continue + } + + // return error if any other error occurs. + if err != nil { + return nil, err + } + + services = append(services, discoveredServices...) + } + return services, nil +} + +// trySingleSocket tries to discover plugins in a single socket. +func trySingleSocket(ctx context.Context, entry os.DirEntry, socketsFolder string) ([]*pluginService, error) { + socket, err := socketName(entry) + if err != nil { + return nil, err + } + + socketFullPath := filepath.Join(socketsFolder, socket) + reflectClient, cleanup, err := dialServerReflection(ctx, socketFullPath) + if err != nil { + return nil, err + } + defer cleanup() + + services, err := reflectClient.ListServices() + if err != nil { + return nil, errors.Wrapf(err, "unable to list plugin: %s's services", socket) + } + + socketNameWithoutExt := strings.Trim(socket, filepath.Ext(socket)) + dialer := socketDialer(socketFullPath, grpc.WithBlock(), grpc.FailOnNonTempDialError(true)) + + var pluginServices []*pluginService + for _, svc := range services { + pluginServices = append(pluginServices, &pluginService{ + protoRef: svc, + dialer: dialer, + name: socketNameWithoutExt, + }) + } + + return pluginServices, nil +} + +// dialServerReflection dials the server reflection service, returning the client and a cleanup function. +func dialServerReflection(ctx context.Context, socket string) (*grpcreflect.Client, func(), error) { + conn, err := SocketDialContext(ctx, socket, grpc.WithBlock()) + if err != nil { + return nil, nil, err + } + + reflectClient := grpcreflect.NewClientV1Alpha(ctx, reflectV1Alpha.NewServerReflectionClient(conn)) + return reflectClient, reflectionConnectionCleanup(conn, reflectClient), nil +} + +// reflectionConnectionCleanup closes the reflection connection. +func reflectionConnectionCleanup(conn *grpc.ClientConn, client *grpcreflect.Client) func() { + return func() { + client.Reset() + if err := conn.Close(); err != nil { + log.Errorf("error closing grpc reflection connection: %v", err) + } + } +} + +// socketFolder returns the socket folder path specified by the environment variable. +func socketFolder() string { + if value, ok := os.LookupEnv(envVarPolarisPluggableFolder); ok { + return value + } + return defaultPolarisPluggablePath +} diff --git a/common/pluggable/utils.go b/common/pluggable/utils.go new file mode 100644 index 000000000..0338093d9 --- /dev/null +++ b/common/pluggable/utils.go @@ -0,0 +1,72 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package pluggable + +import ( + "os" + + "github.com/pkg/errors" + + "github.com/polarismesh/polaris/common/log" +) + +var ( + // errNotSocket is returned when the file is not a socket. + errNotSocket = errors.New("not a socket") +) + +// pluginFiles returns the plugin files in the socket folder. +func pluginFiles(sockFolder string) ([]os.DirEntry, error) { + _, err := os.Stat(sockFolder) + if os.IsNotExist(err) { + log.Infof("socket folder %s does not exist, skip plugin discovery", sockFolder) + return nil, nil + } + + if err != nil { + log.Errorf("failed to stat socket folder %s: %v", sockFolder, err) + return nil, err + } + + var files []os.DirEntry + files, err = os.ReadDir(sockFolder) + if err != nil { + return nil, errors.Wrapf(err, "failed to read socket folder %s", sockFolder) + } + + return files, nil +} + +// socketName returns true if the file is a socket. +func socketName(entry os.DirEntry) (string, error) { + if entry.IsDir() { + return "", errNotSocket + } + + f, err := entry.Info() + if err != nil { + return "", err + } + + // skip non-socket files. + if f.Mode()&os.ModeSocket == 0 { + return "", errNotSocket + } + + return entry.Name(), nil +} diff --git a/go.mod b/go.mod index ab74b504b..675303a77 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/BurntSushi/toml v1.2.0 github.com/boltdb/bolt v1.3.1 github.com/emicklei/go-restful/v3 v3.9.0 - github.com/envoyproxy/go-control-plane v0.11.0 + github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1 github.com/go-openapi/spec v0.20.7 github.com/go-redis/redis/v8 v8.11.5 github.com/go-sql-driver/mysql v1.6.0 @@ -16,11 +16,13 @@ require ( github.com/google/uuid v1.3.0 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru v0.5.4 + github.com/jhump/protoreflect v1.15.1 github.com/json-iterator/go v1.1.12 // indirect github.com/mitchellh/mapstructure v1.4.3 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/nicksnyder/go-i18n/v2 v2.2.0 github.com/pkg/errors v0.9.1 + github.com/polaris-contrib/polaris-server-remote-plugin-common v0.0.1 github.com/polarismesh/go-restful-openapi/v2 v2.0.0-20220928152401-083908d10219 github.com/prometheus/client_golang v1.12.2 github.com/smartystreets/goconvey v1.6.4 @@ -30,12 +32,12 @@ require ( go.uber.org/automaxprocs v1.4.0 go.uber.org/zap v1.23.0 golang.org/x/crypto v0.1.0 - golang.org/x/net v0.4.0 // indirect + golang.org/x/net v0.7.0 // indirect golang.org/x/sync v0.1.0 - golang.org/x/text v0.5.0 + golang.org/x/text v0.7.0 golang.org/x/time v0.1.1-0.20221020023724-80b9fac54d29 google.golang.org/grpc v1.52.0 - google.golang.org/protobuf v1.28.1 + google.golang.org/protobuf v1.28.2-0.20230222093303-bc1253ad3743 gopkg.in/yaml.v2 v2.4.0 ) @@ -49,6 +51,7 @@ require ( // Indirect dependencies group require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/bufbuild/protocompile v0.4.0 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc // indirect @@ -75,7 +78,7 @@ require ( github.com/spf13/pflag v1.0.5 // indirect go.uber.org/goleak v1.1.12 // indirect go.uber.org/multierr v1.8.0 // indirect - golang.org/x/sys v0.3.0 // indirect + golang.org/x/sys v0.5.0 // indirect google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index d685a7dc9..50e6e0ea8 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= +github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= @@ -81,6 +83,7 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc h1:PYXxkRUBGUMa5xgMVMDl62vEklZvKpVaxQeN9ie7Hfk= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -101,8 +104,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= -github.com/envoyproxy/go-control-plane v0.11.0 h1:jtLewhRR2vMRNnq2ZZUoCjUlgut+Y0+sDDWPOfwOi1o= -github.com/envoyproxy/go-control-plane v0.11.0/go.mod h1:VnHyVMpzcLvCFt9yUz1UnCwHLhwx1WguiVDV7pTG/tI= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1 h1:xvqufLtNVwAhN8NMyWklVgxnWohi+wtMGQMhtxexlm0= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= @@ -185,6 +188,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -241,6 +245,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= +github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -316,6 +322,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/polaris-contrib/polaris-server-remote-plugin-common v0.0.1 h1:LpLqifQLh8QhREuWHOUrKgV9/kTl+y0vgSA0KxpP3pw= +github.com/polaris-contrib/polaris-server-remote-plugin-common v0.0.1/go.mod h1:HFAxuvZbOTZyAommWHJv425l66l+HdqW59TN1hBsTQY= github.com/polarismesh/go-restful-openapi/v2 v2.0.0-20220928152401-083908d10219 h1:XnFyNUWnciM6zgXaz6tm+Egs35rhoD0KGMmKh4gCdi0= github.com/polarismesh/go-restful-openapi/v2 v2.0.0-20220928152401-083908d10219/go.mod h1:4WhwBysTom9Eoy0hQ4W69I0FmO+T0EpjEW9/5sgHoUk= github.com/polarismesh/specification v1.3.2-alpha.4 h1:J5Qh1Ef1RKbuuD1UqKr6mfG1gXY4rSLL0DQQph2CJP8= @@ -397,6 +405,7 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -495,8 +504,8 @@ golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLd golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= -golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -575,8 +584,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -587,8 +596,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -757,8 +766,9 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.2-0.20230222093303-bc1253ad3743 h1:yqElulDvOF26oZ2O+2/aoX7mQ8DY/6+p39neytrycd8= +google.golang.org/protobuf v1.28.2-0.20230222093303-bc1253ad3743/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/plugin.go b/plugin.go index 3f3740cb9..ca7ec1b78 100644 --- a/plugin.go +++ b/plugin.go @@ -35,6 +35,7 @@ import ( _ "github.com/polarismesh/polaris/plugin/history/logger" _ "github.com/polarismesh/polaris/plugin/password" _ "github.com/polarismesh/polaris/plugin/ratelimit/lrurate" + _ "github.com/polarismesh/polaris/plugin/ratelimit/pluggable" _ "github.com/polarismesh/polaris/plugin/ratelimit/token" _ "github.com/polarismesh/polaris/plugin/statis/logger" _ "github.com/polarismesh/polaris/plugin/statis/prometheus" diff --git a/plugin/ratelimit/pluggable/pluggable.go b/plugin/ratelimit/pluggable/pluggable.go new file mode 100644 index 000000000..a457fa527 --- /dev/null +++ b/plugin/ratelimit/pluggable/pluggable.go @@ -0,0 +1,128 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package pluggable + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/polaris-contrib/polaris-server-remote-plugin-common/api" + "google.golang.org/grpc" + + "github.com/polarismesh/polaris/common/log" + "github.com/polarismesh/polaris/common/pluggable" + "github.com/polarismesh/polaris/plugin" +) + +const ( + // PluginName pluggable rate limit plugin + PluginName = "pluggable-rate-limit" +) + +// init register plugin +func init() { + pluggable.AddOnFinished( + api.RateLimiter_ServiceDesc.ServiceName, + func(name string, dialer pluggable.GRPCConnectionDialer) { + log.Infof("[Pluggable] registering rate limit plugin %s", name) + plugin.RegisterPlugin(PluginName, newGRPCRateLimiter(dialer)) + }, + ) +} + +// RateLimiter pluggable rate limit plugin +type RateLimiter struct { + *pluggable.GRPCConnector + client api.RateLimiterClient +} + +// Name returns the name of the plugin. +func (r *RateLimiter) Name() string { + return PluginName +} + +// Initialize initializes the plugin. +func (r *RateLimiter) Initialize(c *plugin.ConfigEntry) error { + ctx := context.Background() + if err := r.Dial(ctx, r.Name()); err != nil { + return err + } + + if err := r.Ping(ctx); err != nil { + return errors.Wrapf(err, "plugin %s ping failed", r.Name()) + } + + client, ok := r.PluginClient().(api.RateLimiterClient) + if !ok { + return errors.New("invalid rate limiter client") + } + + r.client = client + + log.Infof("[Pluggable] initialized pluggable rate limit plugin") + return nil +} + +// Destroy destroys the plugin. +func (r *RateLimiter) Destroy() error { + log.Infof("[Pluggable] destroying pluggable rate limit plugin") + return r.Close() +} + +// newGRPCRateLimiter creates a new grpc rate limiter. +func newGRPCRateLimiter(dialer pluggable.GRPCConnectionDialer) *RateLimiter { + return &RateLimiter{ + GRPCConnector: pluggable.NewGRPCConnectorWithDialer( + dialer, + func(connInterface grpc.ClientConnInterface) pluggable.GRPCPluginClient { + client := api.NewRateLimiterClient(connInterface) + return client.(pluggable.GRPCPluginClient) + }, + ), + } +} + +// rateLimitTypes rate limit type map +var rateLimitTypes = map[plugin.RatelimitType]api.RatelimitType{ + plugin.IPRatelimit: api.RatelimitType_IPRatelimit, + plugin.APIRatelimit: api.RatelimitType_APIRatelimit, + plugin.ServiceRatelimit: api.RatelimitType_ServiceRatelimit, + plugin.InstanceRatelimit: api.RatelimitType_InstanceRatelimit, +} + +// Allow 实现是否放行判断逻辑 +func (r *RateLimiter) Allow(typ plugin.RatelimitType, key string) bool { + log.Debugf("[Pluggable] request allow with type: %v and key: %s", typ, key) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + req := &api.RateLimitRequest{ + Type: rateLimitTypes[typ], + Key: key, + } + + rsp, err := r.client.Allow(ctx, req) + if err != nil { + log.Errorf("[Pluggable] fail to request plugin server, get error: %v", err) + return false + } + + return rsp.GetAllow() +}