From 424e3e9894f9206fca433fb4ba66f639be56e325 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 2 Feb 2018 10:35:15 -0800 Subject: [PATCH] Stream: do not cancel ctx created with service config timeout (#1838) --- stream.go | 29 ++++++++++++----- test/end2end_test.go | 74 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 7 deletions(-) diff --git a/stream.go b/stream.go index 8189e8327..d76f4dbeb 100644 --- a/stream.go +++ b/stream.go @@ -90,8 +90,9 @@ type ClientStream interface { // Stream.SendMsg() may return a non-nil error when something wrong happens sending // the request. The returned error indicates the status of this sending, not the final // status of the RPC. - // Always call Stream.RecvMsg() to get the final status if you care about the status of - // the RPC. + // + // Always call Stream.RecvMsg() to drain the stream and get the final + // status, otherwise there could be leaked resources. Stream } @@ -126,6 +127,14 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } if mc.Timeout != nil && *mc.Timeout >= 0 { + // The cancel function for this context will only be called when RecvMsg + // returns non-nil error, which means the stream finishes with error or + // io.EOF. https://github.com/grpc/grpc-go/issues/1818. + // + // Possible context leak: + // - If user provided context is Background, and the user doesn't call + // RecvMsg() for the final status, this ctx will be leaked after the + // stream is done, until the service config timeout happens. ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) defer func() { if err != nil { @@ -322,6 +331,8 @@ type clientStream struct { decomp encoding.Compressor decompSet bool + // cancel is only called when RecvMsg() returns non-nil error, which means + // the stream finishes with error or with io.EOF. cancel context.CancelFunc tracing bool // set to EnableTracing when the clientStream is created. @@ -446,6 +457,9 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { // err != nil indicates the termination of the stream. if err != nil { cs.finish(err) + if cs.cancel != nil { + cs.cancel() + } } }() if err == nil { @@ -477,6 +491,9 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { return se } cs.finish(err) + if cs.cancel != nil { + cs.cancel() + } return nil } return toRPCErr(err) @@ -523,17 +540,15 @@ func (cs *clientStream) closeTransportStream(err error) { } func (cs *clientStream) finish(err error) { + // Do not call cs.cancel in this function. Only call it when RecvMag() + // returns non-nil error because of + // https://github.com/grpc/grpc-go/issues/1818. cs.mu.Lock() defer cs.mu.Unlock() if cs.finished { return } cs.finished = true - defer func() { - if cs.cancel != nil { - cs.cancel() - } - }() for _, o := range cs.opts { o.after(cs.c) } diff --git a/test/end2end_test.go b/test/end2end_test.go index 467b1657a..412027d3b 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1808,6 +1808,80 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { } } +// Reading from a streaming RPC may fail with context canceled if timeout was +// set by service config (https://github.com/grpc/grpc-go/issues/1818). This +// test makes sure read from streaming RPC doesn't fail in this case. +func TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) { + te := testServiceConfigSetup(t, tcpClearRREnv) + te.startServer(&testServer{security: tcpClearRREnv.security}) + defer te.tearDown() + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + te.resolverScheme = r.Scheme() + te.nonBlockingDial = true + fmt.Println("1") + cc := te.clientConn() + fmt.Println("10") + tc := testpb.NewTestServiceClient(cc) + + r.NewAddress([]resolver.Address{{Addr: te.srvAddr}}) + r.NewServiceConfig(`{ + "methodConfig": [ + { + "name": [ + { + "service": "grpc.testing.TestService", + "method": "FullDuplexCall" + } + ], + "waitForReady": true, + "timeout": "10s" + } + ] + }`) + // Make sure service config has been processed by grpc. + for { + if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil { + break + } + time.Sleep(time.Millisecond) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)) + if err != nil { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want ", err) + } + + payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0) + if err != nil { + t.Fatalf("failed to newPayload: %v", err) + } + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE, + ResponseParameters: []*testpb.ResponseParameters{{Size: 0}}, + Payload: payload, + } + if err := stream.Send(req); err != nil { + t.Fatalf("stream.Send(%v) = %v, want ", req, err) + } + stream.CloseSend() + time.Sleep(time.Second) + // Sleep 1 second before recv to make sure the final status is received + // before the recv. + if _, err := stream.Recv(); err != nil { + t.Fatalf("stream.Recv = _, %v, want _, ", err) + } + // Keep reading to drain the stream. + for { + if _, err := stream.Recv(); err != nil { + break + } + } +} + func TestMaxMsgSizeClientDefault(t *testing.T) { defer leakcheck.Check(t) for _, e := range listTestEnv() {