mirror of https://github.com/grpc/grpc-go.git
Switch balancer to grpclb when at least one address is grpclb address (#1692)
This commit is contained in:
parent
cce0e436e5
commit
dba60db4f4
|
@ -38,7 +38,7 @@ func checkPickFirst(cc *ClientConn, servers []*server) error {
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
connected := false
|
connected := false
|
||||||
for i := 0; i < 1000; i++ {
|
for i := 0; i < 5000; i++ {
|
||||||
if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == servers[0].port {
|
if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == servers[0].port {
|
||||||
if connected {
|
if connected {
|
||||||
// connected is set to false if peer is not server[0]. So if
|
// connected is set to false if peer is not server[0]. So if
|
||||||
|
@ -53,7 +53,7 @@ func checkPickFirst(cc *ClientConn, servers []*server) error {
|
||||||
time.Sleep(time.Millisecond)
|
time.Sleep(time.Millisecond)
|
||||||
}
|
}
|
||||||
if !connected {
|
if !connected {
|
||||||
return fmt.Errorf("pickfirst is not in effect after 1 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
|
return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
|
||||||
}
|
}
|
||||||
// The following RPCs should all succeed with the first server.
|
// The following RPCs should all succeed with the first server.
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
|
@ -78,7 +78,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error {
|
||||||
// picked by the closing pickfirst balancer, and the test becomes flaky.
|
// picked by the closing pickfirst balancer, and the test becomes flaky.
|
||||||
for _, s := range servers {
|
for _, s := range servers {
|
||||||
var up bool
|
var up bool
|
||||||
for i := 0; i < 1000; i++ {
|
for i := 0; i < 5000; i++ {
|
||||||
if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == s.port {
|
if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == s.port {
|
||||||
up = true
|
up = true
|
||||||
break
|
break
|
||||||
|
@ -86,7 +86,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error {
|
||||||
time.Sleep(time.Millisecond)
|
time.Sleep(time.Millisecond)
|
||||||
}
|
}
|
||||||
if !up {
|
if !up {
|
||||||
return fmt.Errorf("server %v is not up within 1 second", s.port)
|
return fmt.Errorf("server %v is not up within 5 second", s.port)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,3 +131,284 @@ func TestSwitchBalancer(t *testing.T) {
|
||||||
t.Fatalf("check pickfirst returned non-nil error: %v", err)
|
t.Fatalf("check pickfirst returned non-nil error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First addr update contains grpclb.
|
||||||
|
func TestSwitchBalancerGRPCLBFirst(t *testing.T) {
|
||||||
|
defer leakcheck.Check(t)
|
||||||
|
r, rcleanup := manual.GenerateAndRegisterManualResolver()
|
||||||
|
defer rcleanup()
|
||||||
|
|
||||||
|
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to dial: %v", err)
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
|
||||||
|
// ClientConn will switch balancer to grpclb when receives an address of
|
||||||
|
// type GRPCLB.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}})
|
||||||
|
var isGRPCLB bool
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isGRPCLB = cc.curBalancerName == "grpclb"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isGRPCLB {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isGRPCLB {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// New update containing new backend and new grpclb. Should not switch
|
||||||
|
// balancer.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}})
|
||||||
|
for i := 0; i < 200; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isGRPCLB = cc.curBalancerName == "grpclb"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if !isGRPCLB {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isGRPCLB {
|
||||||
|
t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
|
||||||
|
}
|
||||||
|
|
||||||
|
var isPickFirst bool
|
||||||
|
// Switch balancer to pickfirst.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend"}})
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isPickFirst = cc.curBalancerName == pickfirstName
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isPickFirst {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isPickFirst {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// First addr update does not contain grpclb.
|
||||||
|
func TestSwitchBalancerGRPCLBSecond(t *testing.T) {
|
||||||
|
defer leakcheck.Check(t)
|
||||||
|
r, rcleanup := manual.GenerateAndRegisterManualResolver()
|
||||||
|
defer rcleanup()
|
||||||
|
|
||||||
|
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to dial: %v", err)
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend"}})
|
||||||
|
var isPickFirst bool
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isPickFirst = cc.curBalancerName == pickfirstName
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isPickFirst {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isPickFirst {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClientConn will switch balancer to grpclb when receives an address of
|
||||||
|
// type GRPCLB.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}})
|
||||||
|
var isGRPCLB bool
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isGRPCLB = cc.curBalancerName == "grpclb"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isGRPCLB {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isGRPCLB {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// New update containing new backend and new grpclb. Should not switch
|
||||||
|
// balancer.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}})
|
||||||
|
for i := 0; i < 200; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isGRPCLB = cc.curBalancerName == "grpclb"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if !isGRPCLB {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isGRPCLB {
|
||||||
|
t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Switch balancer back.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend"}})
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isPickFirst = cc.curBalancerName == pickfirstName
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isPickFirst {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isPickFirst {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that if the current balancer is roundrobin, after switching to grpclb,
|
||||||
|
// when the resolved address doesn't contain grpclb addresses, balancer will be
|
||||||
|
// switched back to roundrobin.
|
||||||
|
func TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
|
||||||
|
defer leakcheck.Check(t)
|
||||||
|
r, rcleanup := manual.GenerateAndRegisterManualResolver()
|
||||||
|
defer rcleanup()
|
||||||
|
|
||||||
|
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to dial: %v", err)
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
|
||||||
|
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
|
||||||
|
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend"}})
|
||||||
|
var isRoundRobin bool
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isRoundRobin = cc.curBalancerName == "round_robin"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isRoundRobin {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isRoundRobin {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClientConn will switch balancer to grpclb when receives an address of
|
||||||
|
// type GRPCLB.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}})
|
||||||
|
var isGRPCLB bool
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isGRPCLB = cc.curBalancerName == "grpclb"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isGRPCLB {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isGRPCLB {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Switch balancer back.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend"}})
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isRoundRobin = cc.curBalancerName == "round_robin"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isRoundRobin {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isRoundRobin {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that if resolved address list contains grpclb, the balancer option in
|
||||||
|
// service config won't take effect. But when there's no grpclb address in a new
|
||||||
|
// resolved address list, balancer will be switched to the new one.
|
||||||
|
func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
|
||||||
|
defer leakcheck.Check(t)
|
||||||
|
r, rcleanup := manual.GenerateAndRegisterManualResolver()
|
||||||
|
defer rcleanup()
|
||||||
|
|
||||||
|
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to dial: %v", err)
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend"}})
|
||||||
|
var isPickFirst bool
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isPickFirst = cc.curBalancerName == pickfirstName
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isPickFirst {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isPickFirst {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClientConn will switch balancer to grpclb when receives an address of
|
||||||
|
// type GRPCLB.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}})
|
||||||
|
var isGRPCLB bool
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isGRPCLB = cc.curBalancerName == "grpclb"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isGRPCLB {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isGRPCLB {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
|
||||||
|
var isRoundRobin bool
|
||||||
|
for i := 0; i < 200; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isRoundRobin = cc.curBalancerName == "round_robin"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isRoundRobin {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
// Balancer shoult NOT switch to round_robin because resolved list contains
|
||||||
|
// grpclb.
|
||||||
|
if isRoundRobin {
|
||||||
|
t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Switch balancer back.
|
||||||
|
r.NewAddress([]resolver.Address{{Addr: "backend"}})
|
||||||
|
for i := 0; i < 5000; i++ {
|
||||||
|
cc.mu.Lock()
|
||||||
|
isRoundRobin = cc.curBalancerName == "round_robin"
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if isRoundRobin {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if !isRoundRobin {
|
||||||
|
t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -610,6 +610,7 @@ type ClientConn struct {
|
||||||
// Keepalive parameter can be updated if a GoAway is received.
|
// Keepalive parameter can be updated if a GoAway is received.
|
||||||
mkp keepalive.ClientParameters
|
mkp keepalive.ClientParameters
|
||||||
curBalancerName string
|
curBalancerName string
|
||||||
|
preBalancerName string // previous balancer name.
|
||||||
curAddresses []resolver.Address
|
curAddresses []resolver.Address
|
||||||
balancerWrapper *ccBalancerWrapper
|
balancerWrapper *ccBalancerWrapper
|
||||||
}
|
}
|
||||||
|
@ -667,42 +668,65 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(bar switching) when grpclb is submitted, check address type and start grpclb.
|
|
||||||
if cc.balancerWrapper == nil {
|
|
||||||
// First time handling resolved addresses. Build a balancer use either
|
|
||||||
// the builder specified by dial option, or pickfirst.
|
|
||||||
builder := cc.dopts.balancerBuilder
|
|
||||||
if builder == nil {
|
|
||||||
// No customBalancer was specified by DialOption, and this is the first
|
|
||||||
// time handling resolved addresses, create a pickfirst balancer.
|
|
||||||
builder = newPickfirstBuilder()
|
|
||||||
}
|
|
||||||
cc.curBalancerName = builder.Name()
|
|
||||||
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
|
|
||||||
}
|
|
||||||
|
|
||||||
cc.curAddresses = addrs
|
cc.curAddresses = addrs
|
||||||
|
|
||||||
|
if cc.dopts.balancerBuilder != nil && cc.balancerWrapper == nil {
|
||||||
|
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
|
||||||
|
} else {
|
||||||
|
var isGRPCLB bool
|
||||||
|
for _, a := range addrs {
|
||||||
|
if a.Type == resolver.GRPCLB {
|
||||||
|
isGRPCLB = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var newBalancerName string
|
||||||
|
if isGRPCLB {
|
||||||
|
newBalancerName = grpclbName
|
||||||
|
} else {
|
||||||
|
// Address list doesn't contain grpclb address. Try to pick a
|
||||||
|
// non-grpclb balancer.
|
||||||
|
newBalancerName = cc.curBalancerName
|
||||||
|
// If current balancer is grpclb, switch to the previous one.
|
||||||
|
if newBalancerName == grpclbName {
|
||||||
|
newBalancerName = cc.preBalancerName
|
||||||
|
}
|
||||||
|
// The following could be true in two cases:
|
||||||
|
// - the first time handling resolved addresses
|
||||||
|
// (curBalancerName="")
|
||||||
|
// - the first time handling non-grpclb addresses
|
||||||
|
// (curBalancerName="grpclb", preBalancerName="")
|
||||||
|
if newBalancerName == "" {
|
||||||
|
newBalancerName = pickfirstName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cc.switchBalancer(newBalancerName)
|
||||||
|
}
|
||||||
cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
|
cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// switchBalancer starts the switching from current balancer to the balancer with name.
|
// switchBalancer starts the switching from current balancer to the balancer
|
||||||
|
// with the given name.
|
||||||
|
//
|
||||||
|
// It will NOT send the current address list to the new balancer. If needed,
|
||||||
|
// caller of this function should send address list to the new balancer after
|
||||||
|
// this function returns.
|
||||||
//
|
//
|
||||||
// Caller must hold cc.mu.
|
// Caller must hold cc.mu.
|
||||||
func (cc *ClientConn) switchBalancer(name string) {
|
func (cc *ClientConn) switchBalancer(name string) {
|
||||||
if cc.conns == nil {
|
if cc.conns == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
grpclog.Infof("ClientConn switching balancer to %q", name)
|
|
||||||
|
|
||||||
if cc.dopts.balancerBuilder != nil {
|
|
||||||
grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
|
if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
grpclog.Infof("ClientConn switching balancer to %q", name)
|
||||||
|
if cc.dopts.balancerBuilder != nil {
|
||||||
|
grpclog.Infoln("ignoring balancer switching: WithBalancer DialOption used instead")
|
||||||
|
return
|
||||||
|
}
|
||||||
// TODO(bar switching) change this to two steps: drain and close.
|
// TODO(bar switching) change this to two steps: drain and close.
|
||||||
// Keep track of sc in wrapper.
|
// Keep track of sc in wrapper.
|
||||||
if cc.balancerWrapper != nil {
|
if cc.balancerWrapper != nil {
|
||||||
|
@ -711,12 +735,12 @@ func (cc *ClientConn) switchBalancer(name string) {
|
||||||
|
|
||||||
builder := balancer.Get(name)
|
builder := balancer.Get(name)
|
||||||
if builder == nil {
|
if builder == nil {
|
||||||
grpclog.Infof("failed to get balancer builder for: %v (this should never happen...)", name)
|
grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
|
||||||
builder = newPickfirstBuilder()
|
builder = newPickfirstBuilder()
|
||||||
}
|
}
|
||||||
|
cc.preBalancerName = cc.curBalancerName
|
||||||
cc.curBalancerName = builder.Name()
|
cc.curBalancerName = builder.Name()
|
||||||
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
|
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
|
||||||
cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
|
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
|
||||||
|
@ -867,8 +891,18 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
cc.scRaw = js
|
cc.scRaw = js
|
||||||
cc.sc = sc
|
cc.sc = sc
|
||||||
if sc.LB != nil {
|
if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
|
||||||
cc.switchBalancer(*sc.LB)
|
if cc.curBalancerName == grpclbName {
|
||||||
|
// If current balancer is grpclb, there's at least one grpclb
|
||||||
|
// balancer address in the resolved list. Don't switch the balancer,
|
||||||
|
// but change the previous balancer name, so if a new resolved
|
||||||
|
// address list doesn't contain grpclb address, balancer will be
|
||||||
|
// switched to *sc.LB.
|
||||||
|
cc.preBalancerName = *sc.LB
|
||||||
|
} else {
|
||||||
|
cc.switchBalancer(*sc.LB)
|
||||||
|
cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cc.mu.Unlock()
|
cc.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
|
|
12
grpclb.go
12
grpclb.go
|
@ -36,6 +36,7 @@ import (
|
||||||
const (
|
const (
|
||||||
lbTokeyKey = "lb-token"
|
lbTokeyKey = "lb-token"
|
||||||
defaultFallbackTimeout = 10 * time.Second
|
defaultFallbackTimeout = 10 * time.Second
|
||||||
|
grpclbName = "grpclb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func convertDuration(d *lbpb.Duration) time.Duration {
|
func convertDuration(d *lbpb.Duration) time.Duration {
|
||||||
|
@ -82,12 +83,15 @@ func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLBBuilder creates a builder for grpclb. For testing only.
|
// newLBBuilder creates a builder for grpclb.
|
||||||
func NewLBBuilder() balancer.Builder {
|
func newLBBuilder() balancer.Builder {
|
||||||
// TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb.
|
|
||||||
return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
|
return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
balancer.Register(newLBBuilder())
|
||||||
|
}
|
||||||
|
|
||||||
// NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given
|
// NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given
|
||||||
// fallbackTimeout. If no response is received from the remote balancer within
|
// fallbackTimeout. If no response is received from the remote balancer within
|
||||||
// fallbackTimeout, the backend addresses from the resolved address list will be
|
// fallbackTimeout, the backend addresses from the resolved address list will be
|
||||||
|
@ -105,7 +109,7 @@ type lbBuilder struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *lbBuilder) Name() string {
|
func (b *lbBuilder) Name() string {
|
||||||
return "grpclb"
|
return grpclbName
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
|
func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
|
||||||
|
|
|
@ -344,7 +344,6 @@ func TestGRPCLB(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
||||||
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
|
|
||||||
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
|
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to dial to the backend %v", err)
|
t.Fatalf("Failed to dial to the backend %v", err)
|
||||||
|
@ -396,7 +395,6 @@ func TestGRPCLBWeighted(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
||||||
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
|
|
||||||
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
|
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to dial to the backend %v", err)
|
t.Fatalf("Failed to dial to the backend %v", err)
|
||||||
|
@ -462,7 +460,6 @@ func TestDropRequest(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
||||||
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
|
|
||||||
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
|
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to dial to the backend %v", err)
|
t.Fatalf("Failed to dial to the backend %v", err)
|
||||||
|
@ -537,7 +534,6 @@ func TestBalancerDisconnects(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
||||||
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
|
|
||||||
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
|
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to dial to the backend %v", err)
|
t.Fatalf("Failed to dial to the backend %v", err)
|
||||||
|
@ -710,7 +706,6 @@ func runAndGetStats(t *testing.T, dropForLoadBalancing, dropForRateLimiting bool
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
|
||||||
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
|
|
||||||
grpc.WithTransportCredentials(&creds),
|
grpc.WithTransportCredentials(&creds),
|
||||||
grpc.WithPerRPCCredentials(failPreRPCCred{}),
|
grpc.WithPerRPCCredentials(failPreRPCCred{}),
|
||||||
grpc.WithDialer(fakeNameDialer))
|
grpc.WithDialer(fakeNameDialer))
|
||||||
|
|
|
@ -26,6 +26,8 @@ import (
|
||||||
"google.golang.org/grpc/resolver"
|
"google.golang.org/grpc/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const pickfirstName = "pick_first"
|
||||||
|
|
||||||
func newPickfirstBuilder() balancer.Builder {
|
func newPickfirstBuilder() balancer.Builder {
|
||||||
return &pickfirstBuilder{}
|
return &pickfirstBuilder{}
|
||||||
}
|
}
|
||||||
|
@ -37,7 +39,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*pickfirstBuilder) Name() string {
|
func (*pickfirstBuilder) Name() string {
|
||||||
return "pick_first"
|
return pickfirstName
|
||||||
}
|
}
|
||||||
|
|
||||||
type pickfirstBalancer struct {
|
type pickfirstBalancer struct {
|
||||||
|
|
Loading…
Reference in New Issue