mirror of https://github.com/grpc/grpc-go.git
base: update base balancer for new APIs (#6503)
This commit is contained in:
parent
6c0c69efd5
commit
e9a4e942b1
|
|
@ -105,7 +105,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
|||
addrsSet.Set(a, nil)
|
||||
if _, ok := b.subConns.Get(a); !ok {
|
||||
// a is a new address (not existing in b.subConns).
|
||||
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
|
||||
var sc balancer.SubConn
|
||||
opts := balancer.NewSubConnOptions{
|
||||
HealthCheckEnabled: b.config.HealthCheck,
|
||||
StateListener: func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) },
|
||||
}
|
||||
sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts)
|
||||
if err != nil {
|
||||
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
|
||||
continue
|
||||
|
|
@ -124,7 +129,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
|||
sc.Shutdown()
|
||||
b.subConns.Delete(a)
|
||||
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
|
||||
// The entry will be deleted in UpdateSubConnState.
|
||||
// The entry will be deleted in updateSubConnState.
|
||||
}
|
||||
}
|
||||
// If resolver state contains no addresses, return an error so ClientConn
|
||||
|
|
@ -177,7 +182,12 @@ func (b *baseBalancer) regeneratePicker() {
|
|||
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
|
||||
}
|
||||
|
||||
// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn.
|
||||
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
|
||||
logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
|
||||
}
|
||||
|
||||
func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
|
||||
s := state.ConnectivityState
|
||||
if logger.V(2) {
|
||||
logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
|
||||
|
|
|
|||
|
|
@ -19,7 +19,9 @@
|
|||
package base
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/attributes"
|
||||
"google.golang.org/grpc/balancer"
|
||||
|
|
@ -38,7 +40,9 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS
|
|||
|
||||
func (c *testClientConn) UpdateState(balancer.State) {}
|
||||
|
||||
type testSubConn struct{}
|
||||
type testSubConn struct {
|
||||
updateState func(balancer.SubConnState)
|
||||
}
|
||||
|
||||
func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}
|
||||
|
||||
|
|
@ -61,7 +65,11 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker {
|
|||
}
|
||||
|
||||
func TestBaseBalancerReserveAttributes(t *testing.T) {
|
||||
var v = func(info PickerBuildInfo) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
validated := make(chan struct{}, 1)
|
||||
v := func(info PickerBuildInfo) {
|
||||
defer func() { validated <- struct{}{} }()
|
||||
for _, sc := range info.ReadySCs {
|
||||
if sc.Address.Addr == "1.1.1.1" {
|
||||
if sc.Address.Attributes == nil {
|
||||
|
|
@ -80,8 +88,8 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
|
|||
}
|
||||
pickBuilder := &testPickBuilder{validate: v}
|
||||
b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{
|
||||
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
||||
return &testSubConn{}, nil
|
||||
newSubConn: func(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
||||
return &testSubConn{updateState: opts.StateListener}, nil
|
||||
},
|
||||
}, balancer.BuildOptions{}).(*baseBalancer)
|
||||
|
||||
|
|
@ -93,8 +101,18 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
|
|||
},
|
||||
},
|
||||
})
|
||||
select {
|
||||
case <-validated:
|
||||
case <-ctx.Done():
|
||||
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
|
||||
}
|
||||
|
||||
for sc := range b.scStates {
|
||||
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
|
||||
sc.(*testSubConn).updateState(balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
|
||||
select {
|
||||
case <-validated:
|
||||
case <-ctx.Done():
|
||||
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue