client: fix race between client-side stream cancellation and compressed server data arriving (#3054)

`transport/Stream.RecvCompress` returns what the header contains, if present,
or an empty string if a context error occurs.  However, it "prefers" the header
data even if there is a context error, to prevent a related race.  What happens
here is:

1. RPC starts.

2. Client cancels RPC.

3. `RecvCompress` tells `ClientStream.Recv` that the compression used is ""
   because of the context error.  `as.decomp` is left nil, because there is no
   compressor to look up in the registry.

4. Server's header and first message hit client.

5. Client sees the header and message and allows grpc's stream to see them.
   (We only provide context errors if we need to block.)

6. Client performs a successful `Read` on the stream, receiving the gzipped
   payload, then checks `as.decomp`.

7. We have no decompressor, but the payload has a bit set indicating the message
   is compressed, so this is an error.  However, when forming the error string,
   `RecvCompress` now returns "gzip" because it doesn't need to block to get
   this from the now-received header.  This leads to the confusing message
   that "gzip" is not installed even though it is (the sketch below makes the
   mismatch concrete).
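
To make the sequence above concrete, here is a minimal, runnable sketch of the
old "prefer the header" logic.  The `stream` type and its `headerChan` and
`recvCompress` fields are simplified stand-ins for the transport's internals,
not grpc-go's actual types:

package main

import (
	"context"
	"fmt"
)

// stream is a toy stand-in for transport.Stream: recvCompress is published
// by closing headerChan once the server's headers arrive.
type stream struct {
	ctx          context.Context
	headerChan   chan struct{}
	recvCompress string
}

// RecvCompress mimics the old behavior: on a context error, prefer the
// header if it already arrived, otherwise give up and report "".  A later
// call can therefore see a different answer once the header lands.
func (s *stream) RecvCompress() string {
	select {
	case <-s.ctx.Done():
		select {
		case <-s.headerChan: // header raced in; prefer it
		default:
			return "" // step 3: context error, header not here yet
		}
	case <-s.headerChan:
	}
	return s.recvCompress
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &stream{ctx: ctx, headerChan: make(chan struct{})}

	cancel()                  // step 2: client cancels the RPC
	first := s.RecvCompress() // step 3: returns "", so no decompressor is set up

	s.recvCompress = "gzip" // step 4: the server's header arrives late
	close(s.headerChan)
	second := s.RecvCompress() // step 7: now "gzip" when forming the error

	fmt.Printf("first=%q, second=%q\n", first, second) // first="", second="gzip"
}

The two calls disagree, which is exactly the mismatch between steps 3 and 7:
the decompressor was chosen using the first answer, but the error message is
built using the second.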

This change makes `waitOnHeader` close the stream when context cancellation
happens.  Then `RecvCompress` uses whatever value is present in the stream at
that time, which can no longer change because the stream is closed.  This also
keeps it in sync with the messages on the stream: if any messages are present,
the headers must have been processed first, and `RecvCompress` will return the
proper value.
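
Assembled from the transport diff below, the fixed `waitOnHeader` (which no
longer returns an error) reads:

func (s *Stream) waitOnHeader() {
	if s.headerChan == nil {
		// On the server headerChan is always nil since a stream originates
		// only after having received headers.
		return
	}

	select {
	case <-s.ctx.Done():
		// Close the stream to prevent headers/trailers from changing after
		// this function returns.
		err := ContextErr(s.ctx.Err())
		s.ct.closeStream(s, err, false, 0, status.Convert(err), nil, false)
	case <-s.headerChan:
	}
}

Closing the stream on the cancellation path is what freezes the header
metadata, so every subsequent caller of `waitOnHeader` observes a single,
stable value.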
Doug Fawley 2019-10-01 10:47:40 -07:00 committed by GitHub
parent 861d8e7992
commit 663e4ce0c9
4 changed files with 24 additions and 36 deletions

internal/transport/http2_client.go

@@ -352,6 +352,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
 func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
 	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
 	s := &Stream{
+		ct:           t,
 		done:         make(chan struct{}),
 		method:       callHdr.Method,
 		sendCompress: callHdr.SendCompress,
@@ -1191,6 +1192,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
 	// If headerChan hasn't been closed yet
 	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
+		s.headerValid = true
 		if !endStream {
 			// HEADERS frame block carries a Response-Headers.
 			isHeader = true

internal/transport/transport.go

@@ -233,6 +233,7 @@ const (
 type Stream struct {
 	id           uint32
 	st           ServerTransport    // nil for client side Stream
+	ct           *http2Client       // nil for server side Stream
 	ctx          context.Context    // the associated context of the stream
 	cancel       context.CancelFunc // always nil for client side Stream
 	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
@@ -251,6 +252,10 @@ type Stream struct {
 	headerChan       chan struct{} // closed to indicate the end of header metadata.
 	headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
+	// headerValid indicates whether a valid header was received. Only
+	// meaningful after headerChan is closed (always call waitOnHeader() before
+	// reading its value).
+	headerValid bool
 	// hdrMu protects header and trailer metadata on the server-side.
 	hdrMu sync.Mutex
@@ -303,34 +308,26 @@ func (s *Stream) getState() streamState {
 	return streamState(atomic.LoadUint32((*uint32)(&s.state)))
 }
 
-func (s *Stream) waitOnHeader() error {
+func (s *Stream) waitOnHeader() {
 	if s.headerChan == nil {
 		// On the server headerChan is always nil since a stream originates
 		// only after having received headers.
-		return nil
+		return
 	}
 
 	select {
 	case <-s.ctx.Done():
-		// We prefer success over failure when reading messages because we delay
-		// context error in stream.Read(). To keep behavior consistent, we also
-		// prefer success here.
-		select {
-		case <-s.headerChan:
-			return nil
-		default:
-		}
-		return ContextErr(s.ctx.Err())
+		// Close the stream to prevent headers/trailers from changing after
+		// this function returns.
+		err := ContextErr(s.ctx.Err())
+		s.ct.closeStream(s, err, false, 0, status.Convert(err), nil, false)
 	case <-s.headerChan:
-		return nil
 	}
 }
 
 // RecvCompress returns the compression algorithm applied to the inbound
 // message. It is empty string if there is no compression applied.
 func (s *Stream) RecvCompress() string {
-	if err := s.waitOnHeader(); err != nil {
-		return ""
-	}
+	s.waitOnHeader()
 	return s.recvCompress
 }
@@ -358,29 +355,19 @@ func (s *Stream) Header() (metadata.MD, error) {
 		// header after t.WriteHeader is called.
 		return s.header.Copy(), nil
 	}
-	err := s.waitOnHeader()
-	// Even if the stream is closed, header is returned if available.
-	select {
-	case <-s.headerChan:
-		if s.header == nil {
-			return nil, nil
-		}
-		return s.header.Copy(), nil
-	default:
+	s.waitOnHeader()
+	if !s.headerValid {
+		return nil, s.status.Err()
 	}
-	return nil, err
+	return s.header.Copy(), nil
 }
 
 // TrailersOnly blocks until a header or trailers-only frame is received and
 // then returns true if the stream was trailers-only. If the stream ends
-// before headers are received, returns true, nil. If a context error happens
-// first, returns it as a status error. Client-side only.
-func (s *Stream) TrailersOnly() (bool, error) {
-	err := s.waitOnHeader()
-	if err != nil {
-		return false, err
-	}
-	return s.noHeaders, nil
+// before headers are received, returns true, nil. Client-side only.
+func (s *Stream) TrailersOnly() bool {
+	s.waitOnHeader()
+	return s.noHeaders
 }
 
 // Trailer returns the cached trailer metedata. Note that if it is not called

stream.go

@@ -488,7 +488,7 @@ func (cs *clientStream) shouldRetry(err error) error {
 	pushback := 0
 	hasPushback := false
 	if cs.attempt.s != nil {
-		if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
+		if !cs.attempt.s.TrailersOnly() {
 			return err
 		}


@@ -139,13 +139,12 @@ func (s) TestCancelWhileRecvingWithCompression(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 		defer cancel()
 		s, err := ss.client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name))
 		if err != nil {
 			t.Fatalf("failed to start bidi streaming RPC: %v", err)
 		}
-		// Cancel the stream while receiving to trigger the internal error.
-		time.AfterFunc(time.Millisecond*10, cancel)
+		time.AfterFunc(time.Millisecond, cancel)
 		for {
 			_, err := s.Recv()
 			if err != nil {