mirror of https://github.com/grpc/grpc-go.git
allow stream cancellation when blocked on flow control
This commit is contained in:
parent
0ebea3ebca
commit
c1c5477e98
|
@ -1353,10 +1353,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
|
||||||
// called to interrupt the potential blocking on other goroutines.
|
// called to interrupt the potential blocking on other goroutines.
|
||||||
s.cancel()
|
s.cancel()
|
||||||
|
|
||||||
oldState := s.swapState(streamDone)
|
// We can't return early even if the stream's state is "done," as the state
|
||||||
if oldState == streamDone {
|
// might have been set by the `finishStream` method. Deleting the stream via
|
||||||
return
|
// `finishStream` can get blocked on flow control.
|
||||||
}
|
s.swapState(streamDone)
|
||||||
t.deleteStream(s, eosReceived)
|
t.deleteStream(s, eosReceived)
|
||||||
|
|
||||||
t.controlBuf.put(&cleanupStream{
|
t.controlBuf.put(&cleanupStream{
|
||||||
|
|
|
@ -229,3 +229,76 @@ func (s) TestRSTDuringMessageRead(t *testing.T) {
|
||||||
t.Fatalf("client.EmptyCall() returned %v; want status with code %v", err, codes.Canceled)
|
t.Fatalf("client.EmptyCall() returned %v; want status with code %v", err, codes.Canceled)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test verifies that a client-side cancellation correctly frees up resources on
|
||||||
|
// the server. The test setup is designed to simulate a scenario where a server
|
||||||
|
// is blocked from sending a large message due to a full client-side flow
|
||||||
|
// control window. The client-side cancellation of this blocked RPC then frees
|
||||||
|
// up the max concurrent streams quota on the server, allowing a new RPC to be
|
||||||
|
// created successfully.
|
||||||
|
func (s) TestCancelWhileServerWaitingForFlowControl(t *testing.T) {
|
||||||
|
serverDoneCh := make(chan struct{}, 2)
|
||||||
|
const flowControlWindowSize = 65535
|
||||||
|
ss := &stubserver.StubServer{
|
||||||
|
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
|
||||||
|
// Send a large message to exhaust the client's flow control window.
|
||||||
|
stream.Send(&testpb.StreamingOutputCallResponse{
|
||||||
|
Payload: &testpb.Payload{
|
||||||
|
Body: make([]byte, flowControlWindowSize+1),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
serverDoneCh <- struct{}{}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a server that allows only 1 stream at a time.
|
||||||
|
ss = stubserver.StartTestService(t, ss, grpc.MaxConcurrentStreams(1))
|
||||||
|
defer ss.Stop()
|
||||||
|
// Use a static flow control window.
|
||||||
|
if err := ss.StartClient(grpc.WithStaticStreamWindowSize(flowControlWindowSize)); err != nil {
|
||||||
|
t.Fatalf("Error while start test service client: %v", err)
|
||||||
|
}
|
||||||
|
client := ss.Client
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
streamCtx, streamCancel := context.WithCancel(ctx)
|
||||||
|
defer streamCancel()
|
||||||
|
|
||||||
|
if _, err := client.StreamingOutputCall(streamCtx, &testpb.StreamingOutputCallRequest{}); err != nil {
|
||||||
|
t.Fatalf("Failed to create server streaming RPC: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the server handler to return. This should cause the trailers to
|
||||||
|
// be buffered on the server, waiting for flow control quota to first send
|
||||||
|
// the data frame.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatal("Context timed out waiting for server handler to return.")
|
||||||
|
case <-serverDoneCh:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to create a stream. It should fail since the previous stream is
|
||||||
|
// still blocked.
|
||||||
|
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
|
||||||
|
defer shortCancel()
|
||||||
|
_, err := client.StreamingOutputCall(shortCtx, &testpb.StreamingOutputCallRequest{})
|
||||||
|
if status.Code(err) != codes.DeadlineExceeded {
|
||||||
|
t.Fatalf("Server stream creation returned error with unexpected status code: %v, want code: %v", err, codes.DeadlineExceeded)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cancel the RPC, this should free up concurrent stream quota on the
|
||||||
|
// server.
|
||||||
|
streamCancel()
|
||||||
|
|
||||||
|
// Attempt to create another stream.
|
||||||
|
stream, err := client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create server streaming RPC: %v", err)
|
||||||
|
}
|
||||||
|
_, err = stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to read from the stream: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue