diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index a02d8e71f..d8936fd80 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -1004,45 +1004,60 @@ func (t *http2Server) Close() error { return err } +// deleteStream deletes the stream s from transport's active streams. +func (t *http2Server) deleteStream(s *Stream, eosReceived bool) { + t.mu.Lock() + if _, ok := t.activeStreams[s.id]; !ok { + t.mu.Unlock() + return + } + + delete(t.activeStreams, s.id) + if len(t.activeStreams) == 0 { + t.idle = time.Now() + } + t.mu.Unlock() + + if channelz.IsOn() { + if eosReceived { + atomic.AddInt64(&t.czData.streamsSucceeded, 1) + } else { + atomic.AddInt64(&t.czData.streamsFailed, 1) + } + } +} + // closeStream clears the footprint of a stream when the stream is not needed // any more. func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { - if s.swapState(streamDone) == streamDone { - // If the stream was already done, return. - return - } // In case stream sending and receiving are invoked in separate // goroutines (e.g., bi-directional streaming), cancel needs to be // called to interrupt the potential blocking on other goroutines. s.cancel() + + // Deletes the stream from active streams + t.deleteStream(s, eosReceived) + cleanup := &cleanupStream{ streamID: s.id, rst: rst, rstCode: rstCode, - onWrite: func() { - t.mu.Lock() - if t.activeStreams != nil { - delete(t.activeStreams, s.id) - if len(t.activeStreams) == 0 { - t.idle = time.Now() - } - } - t.mu.Unlock() - if channelz.IsOn() { - if eosReceived { - atomic.AddInt64(&t.czData.streamsSucceeded, 1) - } else { - atomic.AddInt64(&t.czData.streamsFailed, 1) - } - } - }, + onWrite: func() {}, } - if hdr != nil { - hdr.cleanup = cleanup - t.controlBuf.put(hdr) - } else { + + // No trailer. Puts cleanupFrame into transport's control buffer. + if hdr == nil { t.controlBuf.put(cleanup) + return } + + // If the stream is already done, don't send the trailer. + if s.swapState(streamDone) == streamDone { + return + } + + hdr.cleanup = cleanup + t.controlBuf.put(hdr) } func (t *http2Server) RemoteAddr() net.Addr { diff --git a/test/end2end_test.go b/test/end2end_test.go index 1592fc735..a1ac4d383 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -5154,6 +5154,7 @@ type stubServer struct { // Customizable implementations of server handlers. emptyCall func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error // A client connected to this service the test may use. Created in Start(). @@ -5169,6 +5170,10 @@ func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb. return ss.emptyCall(ctx, in) } +func (ss *stubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return ss.unaryCall(ctx, in) +} + func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { return ss.fullDuplexCall(stream) } diff --git a/test/stream_cleanup_test.go b/test/stream_cleanup_test.go new file mode 100644 index 000000000..daee0ead0 --- /dev/null +++ b/test/stream_cleanup_test.go @@ -0,0 +1,57 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test + +import ( + "context" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +func (s) TestStreamCleanup(t *testing.T) { + const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise + const bodySize uint = 2 * initialWindowSize // Something that is not going to fit in a single window + const callRecvMsgSize uint = 1 // The maximum message size the client can receive + + ss := &stubServer{ + unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, bodySize), + }}, nil + }, + emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + } + if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + if _, err := ss.client.UnaryCall(context.Background(), &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted { + t.Fatalf("should fail with ResourceExhausted, message's body size: %v, maximum message size the client can receive: %v", bodySize, callRecvMsgSize) + } + if _, err := ss.client.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + t.Fatalf("should succeed, err: %v", err) + } +}