client: fix potential panic during RPC retries (#5323)

This commit is contained in:
Doug Fawley 2022-05-04 10:06:12 -07:00 committed by GitHub
parent 78b13f27de
commit 799605c228
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 229 additions and 152 deletions

View File

@ -907,14 +907,10 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
} }
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx, Ctx: ctx,
FullMethodName: method, FullMethodName: method,
}) })
if err != nil {
return nil, nil, toRPCErr(err)
}
return t, done, nil
} }
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) { func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {

View File

@ -631,8 +631,8 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
// the wire. However, there are two notable exceptions: // the wire. However, there are two notable exceptions:
// //
// 1. If the stream headers violate the max header list size allowed by the // 1. If the stream headers violate the max header list size allowed by the
// server. In this case there is no reason to retry at all, as it is // server. It's possible this could succeed on another transport, even if
// assumed the RPC would continue to fail on subsequent attempts. // it's unlikely, but do not transparently retry.
// 2. If the credentials errored when requesting their headers. In this case, // 2. If the credentials errored when requesting their headers. In this case,
// it's possible a retry can fix the problem, but indefinitely transparently // it's possible a retry can fix the problem, but indefinitely transparently
// retrying is not appropriate as it is likely the credentials, if they can // retrying is not appropriate as it is likely the credentials, if they can
@ -640,8 +640,7 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
type NewStreamError struct { type NewStreamError struct {
Err error Err error
DoNotRetry bool AllowTransparentRetry bool
DoNotTransparentRetry bool
} }
func (e NewStreamError) Error() string { func (e NewStreamError) Error() string {
@ -650,11 +649,11 @@ func (e NewStreamError) Error() string {
// NewStream creates a stream and registers it into the transport as "active" // NewStream creates a stream and registers it into the transport as "active"
// streams. All non-nil errors returned will be *NewStreamError. // streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
ctx = peer.NewContext(ctx, t.getPeer()) ctx = peer.NewContext(ctx, t.getPeer())
headerFields, err := t.createHeaderFields(ctx, callHdr) headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil { if err != nil {
return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true} return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
} }
s := t.newStream(ctx, callHdr) s := t.newStream(ctx, callHdr)
cleanup := func(err error) { cleanup := func(err error) {
@ -754,13 +753,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return true return true
}, hdr) }, hdr)
if err != nil { if err != nil {
return nil, &NewStreamError{Err: err} // Connection closed.
return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
} }
if success { if success {
break break
} }
if hdrListSizeErr != nil { if hdrListSizeErr != nil {
return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true} return nil, &NewStreamError{Err: hdrListSizeErr}
} }
firstTry = false firstTry = false
select { select {
@ -768,9 +768,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
case <-ctx.Done(): case <-ctx.Done():
return nil, &NewStreamError{Err: ContextErr(ctx.Err())} return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
case <-t.goAway: case <-t.goAway:
return nil, &NewStreamError{Err: errStreamDrain} return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
case <-t.ctx.Done(): case <-t.ctx.Done():
return nil, &NewStreamError{Err: ErrConnClosing} return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
} }
} }
if t.statsHandler != nil { if t.statsHandler != nil {

View File

@ -131,7 +131,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
} }
if _, ok := status.FromError(err); ok { if _, ok := status.FromError(err); ok {
// Status error: end the RPC unconditionally with this status. // Status error: end the RPC unconditionally with this status.
return nil, nil, err return nil, nil, dropError{error: err}
} }
// For all other errors, wait for ready RPCs should block and other // For all other errors, wait for ready RPCs should block and other
// RPCs should fail with unavailable. // RPCs should fail with unavailable.
@ -175,3 +175,9 @@ func (pw *pickerWrapper) close() {
pw.done = true pw.done = true
close(pw.blockingCh) close(pw.blockingCh)
} }
// dropError is a wrapper error that indicates the LB policy wishes to drop the
// RPC and not retry it.
//
// The picker wrapper produces it when a pick fails with a status error. The
// embedded error is that original status error, so code that receives a
// dropError can type-assert on it and read the embedded field to recover the
// status that should end the RPC.
type dropError struct {
// error is the underlying (status) error being wrapped.
error
}

189
stream.go
View File

@ -303,14 +303,28 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
} }
cs.binlog = binarylog.GetMethodLogger(method) cs.binlog = binarylog.GetMethodLogger(method)
if err := cs.newAttemptLocked(false /* isTransparent */); err != nil { cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */)
if err != nil {
cs.finish(err) cs.finish(err)
return nil, err return nil, err
} }
op := func(a *csAttempt) error { return a.newStream() } // Pick the transport to use and create a new stream on the transport.
// Assign cs.attempt upon success.
op := func(a *csAttempt) error {
if err := a.getTransport(); err != nil {
return err
}
if err := a.newStream(); err != nil {
return err
}
// Because this operation is always called either here (while creating
// the clientStream) or by the retry code while locked when replaying
// the operation, it is safe to access cs.attempt directly.
cs.attempt = a
return nil
}
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil { if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
cs.finish(err)
return nil, err return nil, err
} }
@ -349,9 +363,15 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
return cs, nil return cs, nil
} }
// newAttemptLocked creates a new attempt with a transport. // newAttemptLocked creates a new csAttempt without a transport or stream.
// If it succeeds, then it replaces clientStream's attempt with this new attempt. func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) { if err := cs.ctx.Err(); err != nil {
return nil, toRPCErr(err)
}
if err := cs.cc.ctx.Err(); err != nil {
return nil, ErrClientConnClosing
}
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp) ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method method := cs.callHdr.Method
sh := cs.cc.dopts.copts.StatsHandler sh := cs.cc.dopts.copts.StatsHandler
@ -385,27 +405,6 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
ctx = trace.NewContext(ctx, trInfo.tr) ctx = trace.NewContext(ctx, trInfo.tr)
} }
newAttempt := &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}
defer func() {
if retErr != nil {
// This attempt is not set in the clientStream, so it's finish won't
// be called. Call it here for stats and trace in case they are not
// nil.
newAttempt.finish(retErr)
}
}()
if err := ctx.Err(); err != nil {
return toRPCErr(err)
}
if cs.cc.parsedTarget.Scheme == "xds" { if cs.cc.parsedTarget.Scheme == "xds" {
// Add extra metadata (metadata that will be added by transport) to context // Add extra metadata (metadata that will be added by transport) to context
// so the balancer can see them. // so the balancer can see them.
@ -413,16 +412,32 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype), "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
)) ))
} }
t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
return &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}, nil
}
func (a *csAttempt) getTransport() error {
cs := a.cs
var err error
a.t, a.done, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil { if err != nil {
if de, ok := err.(dropError); ok {
err = de.error
a.drop = true
}
return err return err
} }
if trInfo != nil { if a.trInfo != nil {
trInfo.firstLine.SetRemoteAddr(t.RemoteAddr()) a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
} }
newAttempt.t = t
newAttempt.done = done
cs.attempt = newAttempt
return nil return nil
} }
@ -431,12 +446,21 @@ func (a *csAttempt) newStream() error {
cs.callHdr.PreviousAttempts = cs.numRetries cs.callHdr.PreviousAttempts = cs.numRetries
s, err := a.t.NewStream(a.ctx, cs.callHdr) s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil { if err != nil {
// Return without converting to an RPC error so retry code can nse, ok := err.(*transport.NewStreamError)
// inspect. if !ok {
// Unexpected.
return err return err
} }
cs.attempt.s = s
cs.attempt.p = &parser{r: s} if nse.AllowTransparentRetry {
a.allowTransparentRetry = true
}
// Unwrap and convert error.
return toRPCErr(nse.Err)
}
a.s = s
a.p = &parser{r: s}
return nil return nil
} }
@ -514,6 +538,11 @@ type csAttempt struct {
statsHandler stats.Handler statsHandler stats.Handler
beginTime time.Time beginTime time.Time
// set for newStream errors that may be transparently retried
allowTransparentRetry bool
// set for pick errors that are returned as a status
drop bool
} }
func (cs *clientStream) commitAttemptLocked() { func (cs *clientStream) commitAttemptLocked() {
@ -533,41 +562,21 @@ func (cs *clientStream) commitAttempt() {
// shouldRetry returns nil if the RPC should be retried; otherwise it returns // shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. If the RPC should be // the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently. // retried, the bool indicates whether it is being retried transparently.
func (cs *clientStream) shouldRetry(err error) (bool, error) { func (a *csAttempt) shouldRetry(err error) (bool, error) {
if cs.attempt.s == nil { cs := a.cs
// Error from NewClientStream.
nse, ok := err.(*transport.NewStreamError)
if !ok {
// Unexpected, but assume no I/O was performed and the RPC is not
// fatal, so retry indefinitely.
return true, nil
}
// Unwrap and convert error. if cs.finished || cs.committed || a.drop {
err = toRPCErr(nse.Err) // RPC is finished or committed or was dropped by the picker; cannot retry.
// Never retry DoNotRetry errors, which indicate the RPC should not be
// retried due to max header list size violation, etc.
if nse.DoNotRetry {
return false, err return false, err
} }
if a.s == nil && a.allowTransparentRetry {
// In the event of a non-IO operation error from NewStream, we never
// attempted to write anything to the wire, so we can retry
// indefinitely.
if !nse.DoNotTransparentRetry {
return true, nil return true, nil
} }
}
if cs.finished || cs.committed {
// RPC is finished or committed; cannot retry.
return false, err
}
// Wait for the trailers. // Wait for the trailers.
unprocessed := false unprocessed := false
if cs.attempt.s != nil { if a.s != nil {
<-cs.attempt.s.Done() <-a.s.Done()
unprocessed = cs.attempt.s.Unprocessed() unprocessed = a.s.Unprocessed()
} }
if cs.firstAttempt && unprocessed { if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry. // First attempt, stream unprocessed: transparently retry.
@ -579,14 +588,14 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
pushback := 0 pushback := 0
hasPushback := false hasPushback := false
if cs.attempt.s != nil { if a.s != nil {
if !cs.attempt.s.TrailersOnly() { if !a.s.TrailersOnly() {
return false, err return false, err
} }
// TODO(retry): Move down if the spec changes to not check server pushback // TODO(retry): Move down if the spec changes to not check server pushback
// before considering this a failure for throttling. // before considering this a failure for throttling.
sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"] sps := a.s.Trailer()["grpc-retry-pushback-ms"]
if len(sps) == 1 { if len(sps) == 1 {
var e error var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
@ -603,10 +612,10 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
} }
var code codes.Code var code codes.Code
if cs.attempt.s != nil { if a.s != nil {
code = cs.attempt.s.Status().Code() code = a.s.Status().Code()
} else { } else {
code = status.Convert(err).Code() code = status.Code(err)
} }
rp := cs.methodConfig.RetryPolicy rp := cs.methodConfig.RetryPolicy
@ -651,19 +660,24 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
} }
// Returns nil if a retry was performed and succeeded; error otherwise. // Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error { func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
for { for {
cs.attempt.finish(toRPCErr(lastErr)) attempt.finish(toRPCErr(lastErr))
isTransparent, err := cs.shouldRetry(lastErr) isTransparent, err := attempt.shouldRetry(lastErr)
if err != nil { if err != nil {
cs.commitAttemptLocked() cs.commitAttemptLocked()
return err return err
} }
cs.firstAttempt = false cs.firstAttempt = false
if err := cs.newAttemptLocked(isTransparent); err != nil { attempt, err = cs.newAttemptLocked(isTransparent)
if err != nil {
// Only returns error if the clientconn is closed or the context of
// the stream is canceled.
return err return err
} }
if lastErr = cs.replayBufferLocked(); lastErr == nil { // Note that the first op in the replay buffer always sets cs.attempt
// if it is able to pick a transport and create a stream.
if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
return nil return nil
} }
} }
@ -673,7 +687,10 @@ func (cs *clientStream) Context() context.Context {
cs.commitAttempt() cs.commitAttempt()
// No need to lock before using attempt, since we know it is committed and // No need to lock before using attempt, since we know it is committed and
// cannot change. // cannot change.
if cs.attempt.s != nil {
return cs.attempt.s.Context() return cs.attempt.s.Context()
}
return cs.ctx
} }
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
@ -703,7 +720,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
cs.mu.Unlock() cs.mu.Unlock()
return err return err
} }
if err := cs.retryLocked(err); err != nil { if err := cs.retryLocked(a, err); err != nil {
cs.mu.Unlock() cs.mu.Unlock()
return err return err
} }
@ -734,7 +751,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
cs.binlog.Log(logEntry) cs.binlog.Log(logEntry)
cs.serverHeaderBinlogged = true cs.serverHeaderBinlogged = true
} }
return m, err return m, nil
} }
func (cs *clientStream) Trailer() metadata.MD { func (cs *clientStream) Trailer() metadata.MD {
@ -752,10 +769,9 @@ func (cs *clientStream) Trailer() metadata.MD {
return cs.attempt.s.Trailer() return cs.attempt.s.Trailer()
} }
func (cs *clientStream) replayBufferLocked() error { func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
a := cs.attempt
for _, f := range cs.buffer { for _, f := range cs.buffer {
if err := f(a); err != nil { if err := f(attempt); err != nil {
return err return err
} }
} }
@ -803,22 +819,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if len(payload) > *cs.callInfo.maxSendMessageSize { if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
} }
msgBytes := data // Store the pointer before setting to nil. For binary logging.
op := func(a *csAttempt) error { op := func(a *csAttempt) error {
err := a.sendMsg(m, hdr, payload, data) return a.sendMsg(m, hdr, payload, data)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
return err
} }
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil { if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{ cs.binlog.Log(&binarylog.ClientMessage{
OnClientSide: true, OnClientSide: true,
Message: msgBytes, Message: data,
}) })
} }
return return err
} }
func (cs *clientStream) RecvMsg(m interface{}) error { func (cs *clientStream) RecvMsg(m interface{}) error {

View File

@ -1508,7 +1508,7 @@ func testFailFast(t *testing.T, e env) {
cc := te.clientConn() cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel() defer cancel()
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
@ -1517,9 +1517,10 @@ func testFailFast(t *testing.T, e env) {
te.srv.Stop() te.srv.Stop()
// Loop until the server teardown is propagated to the client. // Loop until the server teardown is propagated to the client.
for { for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) if err := ctx.Err(); err != nil {
t.Fatalf("EmptyCall did not return UNAVAILABLE before timeout")
}
_, err := tc.EmptyCall(ctx, &testpb.Empty{}) _, err := tc.EmptyCall(ctx, &testpb.Empty{})
cancel()
if status.Code(err) == codes.Unavailable { if status.Code(err) == codes.Unavailable {
break break
} }

View File

@ -34,6 +34,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats" "google.golang.org/grpc/stats"
@ -44,7 +45,8 @@ import (
func (s) TestRetryUnary(t *testing.T) { func (s) TestRetryUnary(t *testing.T) {
i := -1 i := -1
ss := &stubserver.StubServer{ ss := &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) {
defer func() { t.Logf("server call %v returning err %v", i, err) }()
i++ i++
switch i { switch i {
case 0, 2, 5: case 0, 2, 5:
@ -55,11 +57,8 @@ func (s) TestRetryUnary(t *testing.T) {
return nil, status.New(codes.AlreadyExists, "retryable error").Err() return nil, status.New(codes.AlreadyExists, "retryable error").Err()
}, },
} }
if err := ss.Start([]grpc.ServerOption{}); err != nil { if err := ss.Start([]grpc.ServerOption{},
t.Fatalf("Error starting endpoint server: %v", err) grpc.WithDefaultServiceConfig(`{
}
defer ss.Stop()
ss.NewServiceConfig(`{
"methodConfig": [{ "methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}], "name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true, "waitForReady": true,
@ -70,18 +69,10 @@ func (s) TestRetryUnary(t *testing.T) {
"BackoffMultiplier": 1.0, "BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "ALREADY_EXISTS" ] "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
} }
}]}`) }]}`)); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) t.Fatalf("Error starting endpoint server: %v", err)
for {
if ctx.Err() != nil {
t.Fatalf("Timed out waiting for service config update")
} }
if ss.CC.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { defer ss.Stop()
break
}
time.Sleep(time.Millisecond)
}
cancel()
testCases := []struct { testCases := []struct {
code codes.Code code codes.Code
@ -95,7 +86,8 @@ func (s) TestRetryUnary(t *testing.T) {
{codes.Internal, 11}, {codes.Internal, 11},
{codes.AlreadyExists, 15}, {codes.AlreadyExists, 15},
} }
for _, tc := range testCases { for num, tc := range testCases {
t.Log("Case", num)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}) _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
cancel() cancel()
@ -120,11 +112,8 @@ func (s) TestRetryThrottling(t *testing.T) {
return nil, status.New(codes.Unavailable, "retryable error").Err() return nil, status.New(codes.Unavailable, "retryable error").Err()
}, },
} }
if err := ss.Start([]grpc.ServerOption{}); err != nil { if err := ss.Start([]grpc.ServerOption{},
t.Fatalf("Error starting endpoint server: %v", err) grpc.WithDefaultServiceConfig(`{
}
defer ss.Stop()
ss.NewServiceConfig(`{
"methodConfig": [{ "methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}], "name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true, "waitForReady": true,
@ -140,18 +129,10 @@ func (s) TestRetryThrottling(t *testing.T) {
"maxTokens": 10, "maxTokens": 10,
"tokenRatio": 0.5 "tokenRatio": 0.5
} }
}`) }`)); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) t.Fatalf("Error starting endpoint server: %v", err)
for {
if ctx.Err() != nil {
t.Fatalf("Timed out waiting for service config update")
} }
if ss.CC.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { defer ss.Stop()
break
}
time.Sleep(time.Millisecond)
}
cancel()
testCases := []struct { testCases := []struct {
code codes.Code code codes.Code
@ -430,11 +411,8 @@ func (s) TestRetryStreaming(t *testing.T) {
return nil return nil
}, },
} }
if err := ss.Start([]grpc.ServerOption{}, grpc.WithDefaultCallOptions(grpc.MaxRetryRPCBufferSize(200))); err != nil { if err := ss.Start([]grpc.ServerOption{}, grpc.WithDefaultCallOptions(grpc.MaxRetryRPCBufferSize(200)),
t.Fatalf("Error starting endpoint server: %v", err) grpc.WithDefaultServiceConfig(`{
}
defer ss.Stop()
ss.NewServiceConfig(`{
"methodConfig": [{ "methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}], "name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true, "waitForReady": true,
@ -445,7 +423,10 @@ func (s) TestRetryStreaming(t *testing.T) {
"BackoffMultiplier": 1.0, "BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ] "RetryableStatusCodes": [ "UNAVAILABLE" ]
} }
}]}`) }]}`)); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
@ -644,3 +625,85 @@ func (s) TestRetryStats(t *testing.T) {
t.Fatalf("pushback time before final attempt = %v; want ~10ms", diff) t.Fatalf("pushback time before final attempt = %v; want ~10ms", diff)
} }
} }
// TestRetryTransparentWhenCommitted checks that operations on a client stream
// do not panic when its retry attempt never obtained a transport stream
// because the RPC was canceled while the retry was still blocked.
func (s) TestRetryTransparentWhenCommitted(t *testing.T) {
// With MaxConcurrentStreams=1:
//
// 1. Create stream 1 that is retriable.
// 2. Stream 1 is created and fails with a retriable code.
// 3. Create dummy stream 2, blocking indefinitely.
// 4. Stream 1 retries (and blocks until stream 2 finishes)
// 5. Stream 1 is canceled manually.
//
// If there is no bug, the stream is done and errors with CANCELED. With a bug:
//
// 6. Stream 1 has a nil stream (attempt.s). Operations like CloseSend will panic.
// first records whether the handler has already been invoked once.
first := grpcsync.NewEvent()
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
// The first invocation fails immediately with a retriable status; every
// later invocation blocks until its stream's context is done.
if !first.HasFired() {
first.Fire()
t.Log("returned first error")
return status.Error(codes.AlreadyExists, "first attempt fails and is retriable")
}
t.Log("blocking")
<-stream.Context().Done()
return stream.Context().Err()
},
}
// Start the server limited to one concurrent stream, with a client service
// config that retries ALREADY_EXISTS (MaxAttempts of 2, 0.1s backoff).
if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)},
grpc.WithDefaultServiceConfig(`{
"methodConfig": [{
"name": [{"service": "grpc.testing.TestService"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 2,
"InitialBackoff": ".1s",
"MaxBackoff": ".1s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "ALREADY_EXISTS" ]
}
}]}`)); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

// Each stream gets its own context so that stream 1 can be canceled on its
// own while stream 2 keeps occupying the server's single stream slot.
ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel1()
ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel2()

stream1, err := ss.Client.FullDuplexCall(ctx1)
if err != nil {
t.Fatalf("Error creating stream 1: %v", err)
}

// Create dummy stream to block indefinitely.
// A failure here is reported with Errorf, so the test keeps running.
_, err = ss.Client.FullDuplexCall(ctx2)
if err != nil {
t.Errorf("Error creating stream 2: %v", err)
}

// stream1Closed fires once the Recv below has returned.
stream1Closed := grpcsync.NewEvent()
go func() {
_, err := stream1.Recv()
// Will trigger a retry when it sees the ALREADY_EXISTS error
if status.Code(err) != codes.Canceled {
t.Errorf("Expected stream1 to be canceled; got error: %v", err)
}
stream1Closed.Fire()
}()

// Wait longer than the retry backoff timer.
time.Sleep(200 * time.Millisecond)
cancel1()

// Operations on the stream should not panic.
// Their results are intentionally ignored; only the absence of a panic is
// being checked here.
<-stream1Closed.Done()
stream1.CloseSend()
stream1.Recv()
stream1.Send(&testpb.StreamingOutputCallRequest{})
}