grpclb: support explicit fallback signal (#3351)

This commit is contained in:
Doug Fawley 2020-02-05 10:16:25 -08:00 committed by GitHub
parent 13535f71d1
commit 597699c0ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 134 additions and 8 deletions

View File

@ -278,6 +278,12 @@ func (ccw *remoteBalancerCCWrapper) readServerList(s *balanceLoadClientStream) e
if serverList := reply.GetServerList(); serverList != nil {
ccw.lb.processServerList(serverList)
}
if reply.GetFallbackResponse() != nil {
// Eagerly enter fallback
ccw.lb.mu.Lock()
ccw.lb.refreshSubConns(ccw.lb.resolvedBackendAddrs, true, ccw.lb.usePickFirst)
ccw.lb.mu.Unlock()
}
}
}

View File

@ -183,6 +183,7 @@ type remoteBalancer struct {
done chan struct{}
stats *rpcStats
statsChan chan *lbpb.ClientStats
fbChan chan struct{}
}
func newRemoteBalancer(intervals []time.Duration, statsChan chan *lbpb.ClientStats) *remoteBalancer {
@ -191,6 +192,7 @@ func newRemoteBalancer(intervals []time.Duration, statsChan chan *lbpb.ClientSta
done: make(chan struct{}),
stats: newRPCStats(),
statsChan: statsChan,
fbChan: make(chan struct{}),
}
}
@ -199,6 +201,10 @@ func (b *remoteBalancer) stop() {
close(b.done)
}
func (b *remoteBalancer) fallbackNow() {
b.fbChan <- struct{}{}
}
func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error {
req, err := stream.Recv()
if err != nil {
@ -244,6 +250,12 @@ func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServe
ServerList: v,
},
}
case <-b.fbChan:
resp = &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_FallbackResponse{
FallbackResponse: &lbpb.FallbackResponse{},
},
}
case <-stream.Context().Done():
return stream.Context().Err()
}
@ -805,12 +817,17 @@ func TestFallback(t *testing.T) {
// Close backend and remote balancer connections, should use fallback.
tss.beListeners[0].(*restartableListener).stopPreviousConns()
tss.lbListener.(*restartableListener).stopPreviousConns()
time.Sleep(time.Second)
var fallbackUsed bool
for i := 0; i < 1000; i++ {
for i := 0; i < 2000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
// Because we are hard-closing the connection, above, it's possible
// for the first RPC attempt to be sent on the old connection,
// which will lead to an Unavailable error when it is closed.
// Ignore unavailable errors.
if status.Code(err) != codes.Unavailable {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
}
if p.Addr.String() == beLis.Addr().String() {
fallbackUsed = true
@ -819,7 +836,7 @@ func TestFallback(t *testing.T) {
time.Sleep(time.Millisecond)
}
if !fallbackUsed {
t.Fatalf("No RPC sent to fallback after 1 second")
t.Fatalf("No RPC sent to fallback after 2 seconds")
}
// Restart backend and remote balancer, should not use backends.
@ -827,10 +844,8 @@ func TestFallback(t *testing.T) {
tss.lbListener.(*restartableListener).restart()
tss.ls.sls <- sl
time.Sleep(time.Second)
var backendUsed2 bool
for i := 0; i < 1000; i++ {
for i := 0; i < 2000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
@ -841,7 +856,112 @@ func TestFallback(t *testing.T) {
time.Sleep(time.Millisecond)
}
if !backendUsed2 {
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
t.Fatalf("No RPC sent to backend behind remote balancer after 2 seconds")
}
}
func TestExplicitFallback(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
tss, cleanup, err := newLoadBalancer(1, nil)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()
// Start a standalone backend.
beLis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen %v", err)
}
defer beLis.Close()
standaloneBEs := startBackends(beServerName, true, beLis)
defer stopBackends(standaloneBEs)
be := &lbpb.Server{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
}
var bes []*lbpb.Server
bes = append(bes, be)
sl := &lbpb.ServerList{
Servers: bes,
}
tss.ls.sls <- sl
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}, {
Addr: beLis.Addr().String(),
Type: resolver.Backend,
}}})
var p peer.Peer
var backendUsed bool
for i := 0; i < 2000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
backendUsed = true
break
}
time.Sleep(time.Millisecond)
}
if !backendUsed {
t.Fatalf("No RPC sent to backend behind remote balancer after 2 seconds")
}
// Send fallback signal from remote balancer; should use fallback.
tss.ls.fallbackNow()
var fallbackUsed bool
for i := 0; i < 2000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.String() == beLis.Addr().String() {
fallbackUsed = true
break
}
time.Sleep(time.Millisecond)
}
if !fallbackUsed {
t.Fatalf("No RPC sent to fallback after 2 seconds")
}
// Send another server list; should use backends again.
tss.ls.sls <- sl
backendUsed = false
for i := 0; i < 2000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
backendUsed = true
break
}
time.Sleep(time.Millisecond)
}
if !backendUsed {
t.Fatalf("No RPC sent to backend behind remote balancer after 2 seconds")
}
}