This commit is contained in:
Arjan Singh Bal 2025-08-21 22:29:09 +05:30 committed by GitHub
commit a810667d4e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 77 additions and 4 deletions

View File

@ -1353,10 +1353,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
// called to interrupt the potential blocking on other goroutines.
s.cancel()
oldState := s.swapState(streamDone)
if oldState == streamDone {
return
}
// We can't return early even if the stream's state is "done," as the state
// might have been set by the `finishStream` method. Deleting the stream via
// `finishStream` can get blocked on flow control.
s.swapState(streamDone)
t.deleteStream(s, eosReceived)
t.controlBuf.put(&cleanupStream{

View File

@ -229,3 +229,76 @@ func (s) TestRSTDuringMessageRead(t *testing.T) {
t.Fatalf("client.EmptyCall() returned %v; want status with code %v", err, codes.Canceled)
}
}
// Test verifies that a client-side cancellation correctly frees up resources on
// the server. The test setup is designed to simulate a scenario where a server
// is blocked from sending a large message due to a full client-side flow
// control window. The client-side cancellation of this blocked RPC then frees
// up the max concurrent streams quota on the server, allowing a new RPC to be
// created successfully.
func (s) TestCancelWhileServerWaitingForFlowControl(t *testing.T) {
serverDoneCh := make(chan struct{}, 2)
const flowControlWindowSize = 65535
ss := &stubserver.StubServer{
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
// Send a large message to exhaust the client's flow control window.
stream.Send(&testpb.StreamingOutputCallResponse{
Payload: &testpb.Payload{
Body: make([]byte, flowControlWindowSize+1),
},
})
serverDoneCh <- struct{}{}
return nil
},
}
// Create a server that allows only 1 stream at a time.
ss = stubserver.StartTestService(t, ss, grpc.MaxConcurrentStreams(1))
defer ss.Stop()
// Use a static flow control window.
if err := ss.StartClient(grpc.WithStaticStreamWindowSize(flowControlWindowSize)); err != nil {
t.Fatalf("Error while start test service client: %v", err)
}
client := ss.Client
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
streamCtx, streamCancel := context.WithCancel(ctx)
defer streamCancel()
if _, err := client.StreamingOutputCall(streamCtx, &testpb.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("Failed to create server streaming RPC: %v", err)
}
// Wait for the server handler to return. This should cause the trailers to
// be buffered on the server, waiting for flow control quota to first send
// the data frame.
select {
case <-ctx.Done():
t.Fatal("Context timed out waiting for server handler to return.")
case <-serverDoneCh:
}
// Attempt to create a stream. It should fail since the previous stream is
// still blocked.
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer shortCancel()
_, err := client.StreamingOutputCall(shortCtx, &testpb.StreamingOutputCallRequest{})
if status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("Server stream creation returned error with unexpected status code: %v, want code: %v", err, codes.DeadlineExceeded)
}
// Cancel the RPC, this should free up concurrent stream quota on the
// server.
streamCancel()
// Attempt to create another stream.
stream, err := client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
if err != nil {
t.Fatalf("Failed to create server streaming RPC: %v", err)
}
_, err = stream.Recv()
if err != nil {
t.Fatalf("Failed to read from the stream: %v", err)
}
}