mirror of https://github.com/grpc/grpc-go.git
grpclb: propagate the most recent connection error when grpclb enters transient failure (#4605)
This commit is contained in:
parent
8332d5b997
commit
0a8c63739a
|
@ -25,6 +25,7 @@ package grpclb
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -221,6 +222,7 @@ type lbBalancer struct {
|
||||||
// when resolved address updates are received, and read in the goroutine
|
// when resolved address updates are received, and read in the goroutine
|
||||||
// handling fallback.
|
// handling fallback.
|
||||||
resolvedBackendAddrs []resolver.Address
|
resolvedBackendAddrs []resolver.Address
|
||||||
|
connErr error // the last connection error
|
||||||
}
|
}
|
||||||
|
|
||||||
// regeneratePicker takes a snapshot of the balancer, and generates a picker from
|
// regeneratePicker takes a snapshot of the balancer, and generates a picker from
|
||||||
|
@ -230,7 +232,7 @@ type lbBalancer struct {
|
||||||
// Caller must hold lb.mu.
|
// Caller must hold lb.mu.
|
||||||
func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
|
func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
|
||||||
if lb.state == connectivity.TransientFailure {
|
if lb.state == connectivity.TransientFailure {
|
||||||
lb.picker = &errPicker{err: balancer.ErrTransientFailure}
|
lb.picker = &errPicker{err: fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr)}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,6 +338,8 @@ func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubCo
|
||||||
// When an address was removed by resolver, b called RemoveSubConn but
|
// When an address was removed by resolver, b called RemoveSubConn but
|
||||||
// kept the sc's state in scStates. Remove state for this sc here.
|
// kept the sc's state in scStates. Remove state for this sc here.
|
||||||
delete(lb.scStates, sc)
|
delete(lb.scStates, sc)
|
||||||
|
case connectivity.TransientFailure:
|
||||||
|
lb.connErr = scs.ConnectionError
|
||||||
}
|
}
|
||||||
// Force regenerate picker if
|
// Force regenerate picker if
|
||||||
// - this sc became ready from not-ready
|
// - this sc became ready from not-ready
|
||||||
|
|
|
@ -1232,6 +1232,65 @@ func (s) TestGRPCLBPickFirst(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s) TestGRPCLBBackendConnectionErrorPropagation(t *testing.T) {
|
||||||
|
r := manual.NewBuilderWithScheme("whatever")
|
||||||
|
|
||||||
|
// Start up an LB which will tell the client to fall back
|
||||||
|
// right away.
|
||||||
|
tss, cleanup, err := newLoadBalancer(0, "", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create new load balancer: %v", err)
|
||||||
|
}
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Start a standalone backend, to be used during fallback. The creds
|
||||||
|
// are intentionally misconfigured in order to simulate failure of a
|
||||||
|
// security handshake.
|
||||||
|
beLis, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to listen %v", err)
|
||||||
|
}
|
||||||
|
defer beLis.Close()
|
||||||
|
standaloneBEs := startBackends("arbitrary.invalid.name", true, beLis)
|
||||||
|
defer stopBackends(standaloneBEs)
|
||||||
|
|
||||||
|
creds := serverNameCheckCreds{}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
|
||||||
|
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to dial to the backend %v", err)
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
testC := testpb.NewTestServiceClient(cc)
|
||||||
|
|
||||||
|
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
|
||||||
|
Addr: tss.lbAddr,
|
||||||
|
Type: resolver.GRPCLB,
|
||||||
|
ServerName: lbServerName,
|
||||||
|
}, {
|
||||||
|
Addr: beLis.Addr().String(),
|
||||||
|
Type: resolver.Backend,
|
||||||
|
}}})
|
||||||
|
|
||||||
|
// If https://github.com/grpc/grpc-go/blob/65cabd74d8e18d7347fecd414fa8d83a00035f5f/balancer/grpclb/grpclb_test.go#L103
|
||||||
|
// changes, then expectedErrMsg may need to be updated.
|
||||||
|
const expectedErrMsg = "received unexpected server name"
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
tss.ls.fallbackNow()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(err.Error(), expectedErrMsg) {
|
||||||
|
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, rpc error containing substring: %q", testC, err, expectedErrMsg)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
type failPreRPCCred struct{}
|
type failPreRPCCred struct{}
|
||||||
|
|
||||||
func (failPreRPCCred) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
|
func (failPreRPCCred) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
|
||||||
|
|
Loading…
Reference in New Issue