internal: server deletes stream after receiving an RST_STREAM frame

* Fixes a leak of established streams in the loopy writer.

RST_STREAM frames used to be ignored by the server transport if a trailer had already been put into the transport's control buffer. If the loopy writer couldn't write anything to the stream because of an error on the client side, that trailer would never be sent. At that point the server would receive an RST_STREAM frame from the client, but the frame would be ignored because a trailer was already in the control buffer. This kept the stream open, and in memory, on the server side.

With this change, a cleanupStream item is put into the transport's control buffer whenever the server receives an RST_STREAM frame, even if a trailer has already been put into the buffer.
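
As a rough illustration (toy types only, not grpc-go's real internals), the sketch below models why a queued trailer cannot be relied on to clean up a stream, while a cleanupStream item can: the trailer is only written (and the stream forgotten) once the stream's pending data drains, which never happens after the client stops reading, whereas the cleanup item removes the stream unconditionally.

package main

import "fmt"

// Toy model of the loopy writer's per-stream bookkeeping; names are
// illustrative, not grpc-go's real types.
type loopy struct {
	estdStreams map[uint32]bool // streams the writer still tracks
	stalled     map[uint32]bool // streams whose data can't be flushed (client stopped reading)
}

// trailer models a queued trailer: it only runs once all pending data for the
// stream is flushed, so on a stalled stream it never removes the stream.
func (l *loopy) trailer(id uint32) {
	if l.stalled[id] {
		return // parked forever: the stream stays in estdStreams
	}
	delete(l.estdStreams, id)
}

// cleanup models a cleanupStream item: it always removes the stream.
func (l *loopy) cleanup(id uint32) {
	delete(l.estdStreams, id)
}

func main() {
	// Stream 1: a trailer is queued, but the client errored and stopped
	// reading, so the data in front of the trailer never drains.
	l := &loopy{estdStreams: map[uint32]bool{1: true}, stalled: map[uint32]bool{1: true}}
	l.trailer(1)
	// Old behavior: the client's RST_STREAM was ignored because a trailer was
	// already queued, so nothing else ever runs for stream 1.
	fmt.Println("before fix, streams still tracked:", len(l.estdStreams)) // 1 (leaked)

	// New behavior: the RST_STREAM always produces a cleanupStream item.
	l.cleanup(1)
	fmt.Println("after fix, streams still tracked:", len(l.estdStreams)) // 0
}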

* When the client sends a HEADERS frame to initiate a new stream right after sending an RST_STREAM, the server receives these frames in that order.
When the server receives the RST_STREAM, it marks the stream as done and defers the deletion of the stream to the loopy writer by putting a cleanupStream item into the control buffer.
The server then receives the HEADERS frame that initiates the new stream. It acts on it immediately and attempts to create the stream, but because the old stream has not been deleted yet, it hits the maximum-concurrent-streams limit and fails.
This commit solves the problem by letting the server delete the stream immediately after receiving the RST_STREAM.
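
A minimal, runnable sketch of that ordering problem (again with toy types, not grpc-go's real ones): when deletion from the active-streams map is deferred to the writer goroutine, a HEADERS frame arriving right behind the RST_STREAM trips the concurrent-streams limit; deleting synchronously in the frame handler does not. Only frame-level work still goes through the control buffer, which is what the real closeStream keeps in its cleanupStream item.

package main

import "fmt"

// Toy model of the server transport's bookkeeping; names are illustrative,
// not grpc-go's real types.
type server struct {
	maxStreams    int
	activeStreams map[uint32]bool
	controlBuf    []func() // items the "loopy writer" will execute later
}

// handleRSTStreamDeferred models the old behavior: deletion happens only when
// the writer goroutine gets around to the cleanup item.
func (s *server) handleRSTStreamDeferred(id uint32) {
	s.controlBuf = append(s.controlBuf, func() { delete(s.activeStreams, id) })
}

// handleRSTStreamImmediate models the fix: delete from activeStreams right
// away; only frame-level cleanup is left to the writer.
func (s *server) handleRSTStreamImmediate(id uint32) {
	delete(s.activeStreams, id)
	s.controlBuf = append(s.controlBuf, func() { /* frame-level cleanup only */ })
}

// handleHeaders models the concurrent-streams check done when a new stream is
// initiated.
func (s *server) handleHeaders(id uint32) error {
	if len(s.activeStreams) >= s.maxStreams {
		return fmt.Errorf("refusing stream %d: too many active streams", id)
	}
	s.activeStreams[id] = true
	return nil
}

func main() {
	// Old behavior: RST_STREAM for stream 1, then HEADERS for stream 3,
	// before the writer goroutine has run the cleanup item.
	old := &server{maxStreams: 1, activeStreams: map[uint32]bool{1: true}}
	old.handleRSTStreamDeferred(1)
	fmt.Println("deferred deletion:", old.handleHeaders(3)) // fails: stream 1 is still counted

	// New behavior: the stream is gone before the HEADERS frame is processed.
	fixed := &server{maxStreams: 1, activeStreams: map[uint32]bool{1: true}}
	fixed.handleRSTStreamImmediate(1)
	fmt.Println("immediate deletion:", fixed.handleHeaders(3)) // succeeds
}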

* Refactors the deleteStream method.

* Moves const declarations into the test function's body.
Can Guler 2019-02-11 17:33:22 -08:00, committed by GitHub
parent a402911c6f
commit 32559e2175
3 changed files with 102 additions and 25 deletions


@@ -1004,45 +1004,60 @@ func (t *http2Server) Close() error {
 	return err
 }
 
+// deleteStream deletes the stream s from transport's active streams.
+func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
+	t.mu.Lock()
+	if _, ok := t.activeStreams[s.id]; !ok {
+		t.mu.Unlock()
+		return
+	}
+	delete(t.activeStreams, s.id)
+	if len(t.activeStreams) == 0 {
+		t.idle = time.Now()
+	}
+	t.mu.Unlock()
+
+	if channelz.IsOn() {
+		if eosReceived {
+			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
+		} else {
+			atomic.AddInt64(&t.czData.streamsFailed, 1)
+		}
+	}
+}
+
 // closeStream clears the footprint of a stream when the stream is not needed
 // any more.
 func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
-	if s.swapState(streamDone) == streamDone {
-		// If the stream was already done, return.
-		return
-	}
 	// In case stream sending and receiving are invoked in separate
 	// goroutines (e.g., bi-directional streaming), cancel needs to be
 	// called to interrupt the potential blocking on other goroutines.
 	s.cancel()
+
+	// Deletes the stream from active streams
+	t.deleteStream(s, eosReceived)
+
 	cleanup := &cleanupStream{
 		streamID: s.id,
 		rst:      rst,
 		rstCode:  rstCode,
-		onWrite: func() {
-			t.mu.Lock()
-			if t.activeStreams != nil {
-				delete(t.activeStreams, s.id)
-				if len(t.activeStreams) == 0 {
-					t.idle = time.Now()
-				}
-			}
-			t.mu.Unlock()
-			if channelz.IsOn() {
-				if eosReceived {
-					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
-				} else {
-					atomic.AddInt64(&t.czData.streamsFailed, 1)
-				}
-			}
-		},
+		onWrite:  func() {},
 	}
-	if hdr != nil {
-		hdr.cleanup = cleanup
-		t.controlBuf.put(hdr)
-	} else {
-		t.controlBuf.put(cleanup)
-	}
+
+	// No trailer. Puts cleanupFrame into transport's control buffer.
+	if hdr == nil {
+		t.controlBuf.put(cleanup)
+		return
+	}
+
+	// If the stream is already done, don't send the trailer.
+	if s.swapState(streamDone) == streamDone {
+		return
+	}
+
+	hdr.cleanup = cleanup
+	t.controlBuf.put(hdr)
 }
 
 func (t *http2Server) RemoteAddr() net.Addr {


@@ -5154,6 +5154,7 @@ type stubServer struct {
// Customizable implementations of server handlers.
emptyCall func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error
// A client connected to this service the test may use. Created in Start().
@@ -5169,6 +5170,10 @@ func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.
return ss.emptyCall(ctx, in)
}
func (ss *stubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return ss.unaryCall(ctx, in)
}
func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
return ss.fullDuplexCall(stream)
}


@@ -0,0 +1,57 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test
import (
"context"
"testing"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)
func (s) TestStreamCleanup(t *testing.T) {
const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise
const bodySize uint = 2 * initialWindowSize // Something that is not going to fit in a single window
const callRecvMsgSize uint = 1 // The maximum message size the client can receive
ss := &stubServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Payload: &testpb.Payload{
Body: make([]byte, bodySize),
}}, nil
},
emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
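// The response body (2*initialWindowSize) cannot be fully flushed to the client and
// exceeds the client's MaxCallRecvMsgSize, so this call fails on the client side,
// which then resets the stream while the server still has data queued for it.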
if _, err := ss.client.UnaryCall(context.Background(), &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted {
t.Fatalf("should fail with ResourceExhausted, message's body size: %v, maximum message size the client can receive: %v", bodySize, callRecvMsgSize)
}
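// The server allows only one concurrent stream, so this call can only succeed if
// the stream of the failed call above was actually cleaned up on the server.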
if _, err := ss.client.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("should succeed, err: %v", err)
}
}