mirror of https://github.com/grpc/grpc-go.git
balancer/base: keep address attributes for pickers (#4253)
This commit is contained in:
parent
702608ffae
commit
80e380eff4
|
|
@ -22,6 +22,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/grpc/attributes"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
|
|
@ -41,7 +42,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
|
|||
cc: cc,
|
||||
pickerBuilder: bb.pickerBuilder,
|
||||
|
||||
subConns: make(map[resolver.Address]balancer.SubConn),
|
||||
subConns: make(map[resolver.Address]subConnInfo),
|
||||
scStates: make(map[balancer.SubConn]connectivity.State),
|
||||
csEvltr: &balancer.ConnectivityStateEvaluator{},
|
||||
config: bb.config,
|
||||
|
|
@ -57,6 +58,11 @@ func (bb *baseBuilder) Name() string {
|
|||
return bb.name
|
||||
}
|
||||
|
||||
type subConnInfo struct {
|
||||
subConn balancer.SubConn
|
||||
attrs *attributes.Attributes
|
||||
}
|
||||
|
||||
type baseBalancer struct {
|
||||
cc balancer.ClientConn
|
||||
pickerBuilder PickerBuilder
|
||||
|
|
@ -64,7 +70,7 @@ type baseBalancer struct {
|
|||
csEvltr *balancer.ConnectivityStateEvaluator
|
||||
state connectivity.State
|
||||
|
||||
subConns map[resolver.Address]balancer.SubConn // `attributes` is stripped from the keys of this map (the addresses)
|
||||
subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses)
|
||||
scStates map[balancer.SubConn]connectivity.State
|
||||
picker balancer.Picker
|
||||
config Config
|
||||
|
|
@ -114,7 +120,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
|||
aNoAttrs := a
|
||||
aNoAttrs.Attributes = nil
|
||||
addrsSet[aNoAttrs] = struct{}{}
|
||||
if sc, ok := b.subConns[aNoAttrs]; !ok {
|
||||
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
|
||||
// a is a new address (not existing in b.subConns).
|
||||
//
|
||||
// When creating SubConn, the original address with attributes is
|
||||
|
|
@ -125,7 +131,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
|||
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
|
||||
continue
|
||||
}
|
||||
b.subConns[aNoAttrs] = sc
|
||||
b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
|
||||
b.scStates[sc] = connectivity.Idle
|
||||
sc.Connect()
|
||||
} else {
|
||||
|
|
@ -135,13 +141,15 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
|||
// The SubConn does a reflect.DeepEqual of the new and old
|
||||
// addresses. So this is a noop if the current address is the same
|
||||
// as the old one (including attributes).
|
||||
b.cc.UpdateAddresses(sc, []resolver.Address{a})
|
||||
scInfo.attrs = a.Attributes
|
||||
b.subConns[aNoAttrs] = scInfo
|
||||
b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a})
|
||||
}
|
||||
}
|
||||
for a, sc := range b.subConns {
|
||||
for a, scInfo := range b.subConns {
|
||||
// a was removed by resolver.
|
||||
if _, ok := addrsSet[a]; !ok {
|
||||
b.cc.RemoveSubConn(sc)
|
||||
b.cc.RemoveSubConn(scInfo.subConn)
|
||||
delete(b.subConns, a)
|
||||
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
|
||||
// The entry will be deleted in UpdateSubConnState.
|
||||
|
|
@ -184,9 +192,10 @@ func (b *baseBalancer) regeneratePicker() {
|
|||
readySCs := make(map[balancer.SubConn]SubConnInfo)
|
||||
|
||||
// Filter out all ready SCs from full subConn map.
|
||||
for addr, sc := range b.subConns {
|
||||
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
|
||||
readySCs[sc] = SubConnInfo{Address: addr}
|
||||
for addr, scInfo := range b.subConns {
|
||||
if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready {
|
||||
addr.Attributes = scInfo.attrs
|
||||
readySCs[scInfo.subConn] = SubConnInfo{Address: addr}
|
||||
}
|
||||
}
|
||||
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
|
||||
"google.golang.org/grpc/attributes"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/resolver"
|
||||
)
|
||||
|
||||
|
|
@ -35,12 +36,24 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS
|
|||
return c.newSubConn(addrs, opts)
|
||||
}
|
||||
|
||||
func (c *testClientConn) UpdateState(balancer.State) {}
|
||||
|
||||
type testSubConn struct{}
|
||||
|
||||
func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}
|
||||
|
||||
func (sc *testSubConn) Connect() {}
|
||||
|
||||
// testPickBuilder creates balancer.Picker for test.
|
||||
type testPickBuilder struct {
|
||||
validate func(info PickerBuildInfo)
|
||||
}
|
||||
|
||||
func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker {
|
||||
p.validate(info)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestBaseBalancerStripAttributes(t *testing.T) {
|
||||
b := (&baseBuilder{}).Build(&testClientConn{
|
||||
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
||||
|
|
@ -64,7 +77,46 @@ func TestBaseBalancerStripAttributes(t *testing.T) {
|
|||
|
||||
for addr := range b.subConns {
|
||||
if addr.Attributes != nil {
|
||||
t.Errorf("in b.subConns, got address %+v with nil attributes, want not nil", addr)
|
||||
t.Errorf("in b.subConns, got address %+v with not nil attributes, want nil", addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBaseBalancerReserveAttributes(t *testing.T) {
|
||||
var v = func(info PickerBuildInfo) {
|
||||
for _, sc := range info.ReadySCs {
|
||||
if sc.Address.Addr == "1.1.1.1" {
|
||||
if sc.Address.Attributes == nil {
|
||||
t.Errorf("in picker.validate, got address %+v with nil attributes, want not nil", sc.Address)
|
||||
}
|
||||
foo, ok := sc.Address.Attributes.Value("foo").(string)
|
||||
if !ok || foo != "2233niang" {
|
||||
t.Errorf("in picker.validate, got address[1.1.1.1] with invalid attributes value %v, want 2233niang", sc.Address.Attributes.Value("foo"))
|
||||
}
|
||||
} else if sc.Address.Addr == "2.2.2.2" {
|
||||
if sc.Address.Attributes != nil {
|
||||
t.Error("in b.subConns, got address[2.2.2.2] with not nil attributes, want nil")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pickBuilder := &testPickBuilder{validate: v}
|
||||
b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{
|
||||
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
||||
return &testSubConn{}, nil
|
||||
},
|
||||
}, balancer.BuildOptions{}).(*baseBalancer)
|
||||
|
||||
b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: resolver.State{
|
||||
Addresses: []resolver.Address{
|
||||
{Addr: "1.1.1.1", Attributes: attributes.New("foo", "2233niang")},
|
||||
{Addr: "2.2.2.2", Attributes: nil},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
for sc := range b.scStates {
|
||||
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue