Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion client/callopt/streamcall/call_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func WithTag(key, val string) Option {
}

// WithRecvTimeout add recv timeout for stream.Recv function.
// NOTICE: ONLY effective for ttheader streaming protocol for now.
func WithRecvTimeout(d time.Duration) Option {
return Option{f: func(o *callopt.CallOptions, di *strings.Builder) {
di.WriteString("WithRecvTimeout(")
Expand All @@ -58,3 +57,25 @@ func WithRecvTimeout(d time.Duration) Option {
o.StreamOptions.RecvTimeout = d
}}
}

// WithSendTimeout add send timeout for stream.Send function.
func WithSendTimeout(d time.Duration) Option {
return Option{f: func(o *callopt.CallOptions, di *strings.Builder) {
di.WriteString("WithSendTimeout(")
di.WriteString(d.String())
di.WriteString(")")

o.StreamOptions.SendTimeout = d
}}
}

// WithStreamTimeout add timeout for whole stream.
func WithStreamTimeout(d time.Duration) Option {
return Option{f: func(o *callopt.CallOptions, di *strings.Builder) {
di.WriteString("WithStreamTimeout(")
di.WriteString(d.String())
di.WriteString(")")

o.StreamOptions.StreamTimeout = d
}}
}
32 changes: 25 additions & 7 deletions client/callopt/streamcall/call_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,29 @@ import (
"github.com/cloudwego/kitex/internal/test"
)

func TestWithRecvTimeout(t *testing.T) {
var sb strings.Builder
callOpts := callopt.CallOptions{}
testTimeout := 1 * time.Second
WithRecvTimeout(testTimeout).f(&callOpts, &sb)
test.Assert(t, callOpts.StreamOptions.RecvTimeout == testTimeout)
test.Assert(t, sb.String() == "WithRecvTimeout(1s)")
func Test_streamCallTimeoutCallOptions(t *testing.T) {
t.Run("WithRecvTimeout", func(t *testing.T) {
var sb strings.Builder
callOpts := callopt.CallOptions{}
testTimeout := 1 * time.Second
WithRecvTimeout(testTimeout).f(&callOpts, &sb)
test.Assert(t, callOpts.StreamOptions.RecvTimeout == testTimeout)
test.Assert(t, sb.String() == "WithRecvTimeout(1s)")
})
t.Run("WithSendTimeout", func(t *testing.T) {
var sb strings.Builder
callOpts := callopt.CallOptions{}
testTimeout := 1 * time.Second
WithSendTimeout(testTimeout).f(&callOpts, &sb)
test.Assert(t, callOpts.StreamOptions.SendTimeout == testTimeout)
test.Assert(t, sb.String() == "WithSendTimeout(1s)")
})
t.Run("WithStreamTimeout", func(t *testing.T) {
var sb strings.Builder
callOpts := callopt.CallOptions{}
testTimeout := 1 * time.Second
WithStreamTimeout(testTimeout).f(&callOpts, &sb)
test.Assert(t, callOpts.StreamOptions.StreamTimeout == testTimeout)
test.Assert(t, sb.String() == "WithStreamTimeout(1s)")
})
}
12 changes: 12 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,12 @@ func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInf
if sopt.RecvTimeout > 0 {
cfg.SetStreamRecvTimeout(sopt.RecvTimeout)
}
if sopt.SendTimeout > 0 {
cfg.SetStreamSendTimeout(sopt.SendTimeout)
}
if sopt.StreamTimeout > 0 {
cfg.SetStreamTimeout(sopt.StreamTimeout)
}

ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)

Expand All @@ -838,6 +844,12 @@ func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInf
if callOpts.StreamOptions.RecvTimeout != 0 {
cfg.SetStreamRecvTimeout(callOpts.StreamOptions.RecvTimeout)
}
if callOpts.StreamOptions.SendTimeout != 0 {
cfg.SetStreamSendTimeout(callOpts.StreamOptions.SendTimeout)
}
if callOpts.StreamOptions.StreamTimeout != 0 {
cfg.SetStreamTimeout(callOpts.StreamOptions.StreamTimeout)
}
}

return ctx, ri, callOpts
Expand Down
74 changes: 59 additions & 15 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,22 +1266,66 @@ func Test_initRPCInfoWithStreamClientCallOption(t *testing.T) {
mtd := mocks.MockMethod
svcInfo := mocks.ServiceInfo()
callOptTimeout := 1 * time.Second
cliTimeout := 2 * time.Second
testService := "testService"

// config call option
cliIntf, err := NewClient(svcInfo, WithTransportProtocol(transport.TTHeaderStreaming), WithDestService(testService))
test.Assert(t, err == nil, err)
cli := cliIntf.(*kcFinalizerClient)
ctx := NewCtxWithCallOptions(context.Background(), streamcall.GetCallOptions([]streamcall.Option{streamcall.WithRecvTimeout(callOptTimeout)}))
_, ri, _ := cli.initRPCInfo(ctx, mtd, 0, nil, true)
test.Assert(t, ri.Config().StreamRecvTimeout() == callOptTimeout)
testcases := []struct {
desc string
cliOpt StreamOption
callOpt streamcall.Option
verifyFunc func(t *testing.T, ri rpcinfo.RPCInfo, isPureCli bool)
}{
{
desc: "stream recv timeout",
cliOpt: WithStreamRecvTimeout(cliTimeout),
callOpt: streamcall.WithRecvTimeout(callOptTimeout),
verifyFunc: func(t *testing.T, ri rpcinfo.RPCInfo, isPureCli bool) {
if isPureCli {
test.Assert(t, ri.Config().StreamRecvTimeout() == cliTimeout, ri)
} else {
test.Assert(t, ri.Config().StreamRecvTimeout() == callOptTimeout, ri)
}
},
},
{
desc: "stream send timeout",
cliOpt: WithStreamSendTimeout(cliTimeout),
callOpt: streamcall.WithSendTimeout(callOptTimeout),
verifyFunc: func(t *testing.T, ri rpcinfo.RPCInfo, isPureCli bool) {
if isPureCli {
test.Assert(t, ri.Config().StreamSendTimeout() == cliTimeout, ri)
} else {
test.Assert(t, ri.Config().StreamSendTimeout() == callOptTimeout, ri)
}
},
},
{
desc: "stream timeout",
cliOpt: WithStreamTimeout(cliTimeout),
callOpt: streamcall.WithStreamTimeout(callOptTimeout),
verifyFunc: func(t *testing.T, ri rpcinfo.RPCInfo, isPureCli bool) {
if isPureCli {
test.Assert(t, ri.Config().StreamTimeout() == cliTimeout, ri)
} else {
test.Assert(t, ri.Config().StreamTimeout() == callOptTimeout, ri)
}
},
},
}

// call option has higher priority
cliTimeout := 2 * time.Second
cliIntf, err = NewClient(svcInfo, WithTransportProtocol(transport.TTHeaderStreaming), WithStreamOptions(WithStreamRecvTimeout(cliTimeout)), WithDestService(testService))
test.Assert(t, err == nil, err)
cli = cliIntf.(*kcFinalizerClient)
ctx = NewCtxWithCallOptions(context.Background(), streamcall.GetCallOptions([]streamcall.Option{streamcall.WithRecvTimeout(callOptTimeout)}))
_, ri, _ = cli.initRPCInfo(ctx, mtd, 0, nil, true)
test.Assert(t, ri.Config().StreamRecvTimeout() == callOptTimeout)
for _, tc := range testcases {
t.Run(tc.desc, func(t *testing.T) {
// config client option
cliIntf, err := NewClient(svcInfo, WithTransportProtocol(transport.TTHeaderStreaming), WithDestService(testService), WithStreamOptions(tc.cliOpt))
test.Assert(t, err == nil, err)
cli := cliIntf.(*kcFinalizerClient)
_, ri, _ := cli.initRPCInfo(ctx, mtd, 0, nil, true)
tc.verifyFunc(t, ri, true)

// call option has higher priority
ctx = NewCtxWithCallOptions(context.Background(), streamcall.GetCallOptions([]streamcall.Option{tc.callOpt}))
_, ri, _ = cli.initRPCInfo(ctx, mtd, 0, nil, true)
tc.verifyFunc(t, ri, false)
})
}
}
19 changes: 18 additions & 1 deletion client/option_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func WithStreamOptions(opts ...StreamOption) Option {
}

// WithStreamRecvTimeout add recv timeout for stream.Recv function.
// NOTICE: ONLY effective for ttheader streaming protocol for now.
func WithStreamRecvTimeout(d time.Duration) StreamOption {
return StreamOption{F: func(o *client.StreamOptions, di *utils.Slice) {
di.Push(fmt.Sprintf("WithStreamRecvTimeout(%dms)", d.Milliseconds()))
Expand All @@ -49,6 +48,24 @@ func WithStreamRecvTimeout(d time.Duration) StreamOption {
}}
}

// WithStreamSendTimeout add send timeout for stream.Send function.
func WithStreamSendTimeout(d time.Duration) StreamOption {
return StreamOption{F: func(o *client.StreamOptions, di *utils.Slice) {
di.Push(fmt.Sprintf("WithStreamSendTimeout(%dms)", d.Milliseconds()))

o.SendTimeout = d
}}
}

// WithStreamTimeout add timeout for whole stream.
func WithStreamTimeout(d time.Duration) StreamOption {
return StreamOption{F: func(o *client.StreamOptions, di *utils.Slice) {
di.Push(fmt.Sprintf("WithStreamTimeout(%dms)", d.Milliseconds()))

o.StreamTimeout = d
}}
}

// WithStreamMiddleware add middleware for stream.
func WithStreamMiddleware(mw cep.StreamMiddleware) StreamOption {
return StreamOption{F: func(o *StreamOptions, di *utils.Slice) {
Expand Down
Loading
Loading