mirror of https://github.com/grpc/grpc-go.git
grpclb: recover after receiving an empty server list (#4879)
This commit is contained in:
parent
0d5030755c
commit
bd0f88150d
|
|
@ -135,11 +135,19 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
|
|||
}
|
||||
|
||||
if lb.usePickFirst {
|
||||
var sc balancer.SubConn
|
||||
for _, sc = range lb.subConns {
|
||||
var (
|
||||
scKey resolver.Address
|
||||
sc balancer.SubConn
|
||||
)
|
||||
for scKey, sc = range lb.subConns {
|
||||
break
|
||||
}
|
||||
if sc != nil {
|
||||
if len(backendAddrs) == 0 {
|
||||
lb.cc.cc.RemoveSubConn(sc)
|
||||
delete(lb.subConns, scKey)
|
||||
return
|
||||
}
|
||||
lb.cc.cc.UpdateAddresses(sc, backendAddrs)
|
||||
sc.Connect()
|
||||
return
|
||||
|
|
|
|||
|
|
@ -1274,6 +1274,73 @@ func (s) TestGRPCLBBackendConnectionErrorPropagation(t *testing.T) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
|
||||
tss, cleanup, err := startBackendsAndRemoteLoadBalancer(1, "", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new load balancer: %v", err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
beServers := []*lbpb.Server{{
|
||||
IpAddress: tss.beIPs[0],
|
||||
Port: int32(tss.bePorts[0]),
|
||||
LoadBalanceToken: lbToken,
|
||||
}}
|
||||
|
||||
creds := serverNameCheckCreds{}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
||||
grpc.WithResolvers(r),
|
||||
grpc.WithTransportCredentials(&creds),
|
||||
grpc.WithContextDialer(fakeNameDialer))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to dial to the backend %v", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
testC := testpb.NewTestServiceClient(cc)
|
||||
|
||||
tss.ls.sls <- &lbpb.ServerList{Servers: beServers}
|
||||
|
||||
rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(svcfg)},
|
||||
&grpclbstate.State{BalancerAddresses: []resolver.Address{{
|
||||
Addr: tss.lbAddr,
|
||||
ServerName: lbServerName,
|
||||
}}})
|
||||
r.UpdateState(rs)
|
||||
t.Log("Perform an initial RPC and expect it to succeed...")
|
||||
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
|
||||
t.Fatalf("Initial _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
|
||||
}
|
||||
t.Log("Now send an empty server list. Wait until we see an RPC failure to make sure the client got it...")
|
||||
tss.ls.sls <- &lbpb.ServerList{}
|
||||
gotError := false
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
||||
gotError = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !gotError {
|
||||
t.Fatalf("Expected to eventually see an RPC fail after the grpclb sends an empty server list, but none did.")
|
||||
}
|
||||
t.Log("Now send a non-empty server list. A wait-for-ready RPC should now succeed...")
|
||||
tss.ls.sls <- &lbpb.ServerList{Servers: beServers}
|
||||
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
|
||||
t.Fatalf("Final _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s) TestGRPCLBEmptyServerListRoundRobin(t *testing.T) {
|
||||
testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}`)
|
||||
}
|
||||
|
||||
func (s) TestGRPCLBEmptyServerListPickFirst(t *testing.T) {
|
||||
testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)
|
||||
}
|
||||
|
||||
func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue