diff --git a/client/client.go b/client/client.go index 4b8215ec..545a1444 100644 --- a/client/client.go +++ b/client/client.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" pb "github.com/dapr/dapr/pkg/proto/runtime/v1" @@ -44,14 +45,16 @@ import ( ) const ( - daprPortDefault = "50001" - daprPortEnvVarName = "DAPR_GRPC_PORT" /* #nosec */ - daprGRPCEndpointEnvVarName = "DAPR_GRPC_ENDPOINT" - traceparentKey = "traceparent" - apiTokenKey = "dapr-api-token" /* #nosec */ - apiTokenEnvVarName = "DAPR_API_TOKEN" /* #nosec */ - clientDefaultTimeoutSeconds = 5 - clientTimeoutSecondsEnvVarName = "DAPR_CLIENT_TIMEOUT_SECONDS" + daprPortDefault = "50001" + daprPortEnvVarName = "DAPR_GRPC_PORT" /* #nosec */ + daprGRPCEndpointEnvVarName = "DAPR_GRPC_ENDPOINT" + traceparentKey = "traceparent" + apiTokenKey = "dapr-api-token" /* #nosec */ + apiTokenEnvVarName = "DAPR_API_TOKEN" /* #nosec */ + clientDefaultTimeoutSeconds = 5 + clientDefaultKeepaliveSeconds = 10 + clientTimeoutSecondsEnvVarName = "DAPR_CLIENT_TIMEOUT_SECONDS" + clientKeepAliveSecondsEnvVarName = "DAPR_CLIENT_KEEP_ALIVE_SECONDS" ) var ( @@ -350,6 +353,11 @@ func NewClientWithAddressContext(ctx context.Context, address string) (client Cl return nil, err } + keepAliveSeconds, err := getClientKeepaliveSeconds() + if err != nil { + return nil, err + } + parsedAddress, err := internal.ParseGRPCEndpoint(address) if err != nil { return nil, fmt.Errorf("error parsing address '%s': %w", address, err) @@ -362,6 +370,13 @@ func NewClientWithAddressContext(ctx context.Context, address string) (client Cl grpc.WithBlock(), //nolint:staticcheck authTokenUnaryInterceptor(at), authTokenStreamInterceptor(at), + grpc.WithKeepaliveParams( + keepalive.ClientParameters{ + Time: time.Duration(keepAliveSeconds), + Timeout: time.Second, + PermitWithoutStream: true, + }, + ), } if parsedAddress.TLS { @@ -399,6 +414,22 @@ func getClientTimeoutSeconds() (int, error) { return timeoutVar, nil } +func getClientKeepaliveSeconds() (int, error) { + tStr := os.Getenv(clientKeepAliveSecondsEnvVarName) + if len(tStr) == 0 { + return clientDefaultKeepaliveSeconds, nil + } + keepAlive, err := strconv.Atoi(tStr) + if err != nil { + return 0, fmt.Errorf("error parsing keep alive value (%s): %w", + tStr, err) + } + if keepAlive <= 0 { + return 0, errors.New("incorrect value") + } + return keepAlive, nil +} + // NewClientWithSocket instantiates Dapr using specific socket. func NewClientWithSocket(socket string) (client Client, err error) { if socket == "" {