mirror of https://github.com/grpc/grpc-go.git
transport: fix race causing flow control discrepancy when sending messages over server limit (#1859)
* In case of an error write transport quota back. * Added test.
This commit is contained in:
parent
6c48c7f5c8
commit
484b3ebb4a
|
@ -5996,3 +5996,47 @@ func TestServeExitsWhenListenerClosed(t *testing.T) {
|
|||
t.Fatalf("Serve did not return after %v", timeout)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T) {
|
||||
defer leakcheck.Check(t)
|
||||
for _, e := range listTestEnv() {
|
||||
if e.httpHandler {
|
||||
continue
|
||||
}
|
||||
testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t, e)
|
||||
}
|
||||
}
|
||||
|
||||
func testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T, e env) {
|
||||
te := newTest(t, e)
|
||||
te.userAgent = testAppUA
|
||||
smallSize := 1024
|
||||
te.maxServerReceiveMsgSize = &smallSize
|
||||
te.startServer(&testServer{security: e.security})
|
||||
defer te.tearDown()
|
||||
tc := testpb.NewTestServiceClient(te.clientConn())
|
||||
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1048576)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req := &testpb.SimpleRequest{
|
||||
ResponseType: testpb.PayloadType_COMPRESSABLE,
|
||||
Payload: payload,
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < 100; j++ {
|
||||
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
|
||||
defer cancel()
|
||||
if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.ResourceExhausted {
|
||||
t.Errorf("TestService/UnaryCall(_,_) = _. %v, want code: %s", err, codes.ResourceExhausted)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
|
@ -716,6 +716,8 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
|
|||
}
|
||||
ltq, _, err := t.localSendQuota.get(size, s.waiters)
|
||||
if err != nil {
|
||||
// Add the acquired quota back to transport.
|
||||
t.sendQuotaPool.add(tq)
|
||||
return err
|
||||
}
|
||||
// even if ltq is smaller than size we don't adjust size since
|
||||
|
|
|
@ -888,6 +888,8 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
|
|||
}
|
||||
ltq, _, err := t.localSendQuota.get(size, s.waiters)
|
||||
if err != nil {
|
||||
// Add the acquired quota back to transport.
|
||||
t.sendQuotaPool.add(tq)
|
||||
return err
|
||||
}
|
||||
// even if ltq is smaller than size we don't adjust size since,
|
||||
|
|
Loading…
Reference in New Issue