mirror of https://github.com/grpc/grpc-go.git
balancergroup: Propagate balancer.BuildOptions to child policies (#4184)
This commit is contained in:
parent
b753f4903c
commit
9280052d36
|
|
@ -60,6 +60,8 @@ type subBalancerWrapper struct {
|
||||||
// The static part of sub-balancer. Keeps balancerBuilders and addresses.
|
// The static part of sub-balancer. Keeps balancerBuilders and addresses.
|
||||||
// To be used when restarting sub-balancer.
|
// To be used when restarting sub-balancer.
|
||||||
builder balancer.Builder
|
builder balancer.Builder
|
||||||
|
// Options to be passed to sub-balancer at the time of creation.
|
||||||
|
buildOpts balancer.BuildOptions
|
||||||
// ccState is a cache of the addresses/balancer config, so when the balancer
|
// ccState is a cache of the addresses/balancer config, so when the balancer
|
||||||
// is restarted after close, it will get the previous update. It's a pointer
|
// is restarted after close, it will get the previous update. It's a pointer
|
||||||
// and is set to nil at init, so when the balancer is built for the first
|
// and is set to nil at init, so when the balancer is built for the first
|
||||||
|
|
@ -94,7 +96,7 @@ func (sbc *subBalancerWrapper) updateBalancerStateWithCachedPicker() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sbc *subBalancerWrapper) startBalancer() {
|
func (sbc *subBalancerWrapper) startBalancer() {
|
||||||
b := sbc.builder.Build(sbc, balancer.BuildOptions{})
|
b := sbc.builder.Build(sbc, sbc.buildOpts)
|
||||||
sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name())
|
sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name())
|
||||||
sbc.balancer = b
|
sbc.balancer = b
|
||||||
if sbc.ccState != nil {
|
if sbc.ccState != nil {
|
||||||
|
|
@ -179,6 +181,7 @@ func (sbc *subBalancerWrapper) stopBalancer() {
|
||||||
// balancer group.
|
// balancer group.
|
||||||
type BalancerGroup struct {
|
type BalancerGroup struct {
|
||||||
cc balancer.ClientConn
|
cc balancer.ClientConn
|
||||||
|
buildOpts balancer.BuildOptions
|
||||||
logger *grpclog.PrefixLogger
|
logger *grpclog.PrefixLogger
|
||||||
loadStore load.PerClusterReporter
|
loadStore load.PerClusterReporter
|
||||||
|
|
||||||
|
|
@ -235,9 +238,12 @@ var DefaultSubBalancerCloseTimeout = 15 * time.Minute
|
||||||
|
|
||||||
// New creates a new BalancerGroup. Note that the BalancerGroup
|
// New creates a new BalancerGroup. Note that the BalancerGroup
|
||||||
// needs to be started to work.
|
// needs to be started to work.
|
||||||
func New(cc balancer.ClientConn, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
|
//
|
||||||
|
// TODO(easwars): Pass an options struct instead of N args.
|
||||||
|
func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
|
||||||
return &BalancerGroup{
|
return &BalancerGroup{
|
||||||
cc: cc,
|
cc: cc,
|
||||||
|
buildOpts: bOpts,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
loadStore: loadStore,
|
loadStore: loadStore,
|
||||||
|
|
||||||
|
|
@ -305,6 +311,7 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
|
||||||
id: id,
|
id: id,
|
||||||
group: bg,
|
group: bg,
|
||||||
builder: builder,
|
builder: builder,
|
||||||
|
buildOpts: bg.buildOpts,
|
||||||
}
|
}
|
||||||
if bg.outgoingStarted {
|
if bg.outgoingStarted {
|
||||||
// Only start the balancer if bg is started. Otherwise, we only keep the
|
// Only start the balancer if bg is started. Otherwise, we only keep the
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,8 @@ import (
|
||||||
"google.golang.org/grpc/balancer"
|
"google.golang.org/grpc/balancer"
|
||||||
"google.golang.org/grpc/balancer/roundrobin"
|
"google.golang.org/grpc/balancer/roundrobin"
|
||||||
"google.golang.org/grpc/connectivity"
|
"google.golang.org/grpc/connectivity"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
"google.golang.org/grpc/internal/balancer/stub"
|
||||||
"google.golang.org/grpc/resolver"
|
"google.golang.org/grpc/resolver"
|
||||||
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
|
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
|
||||||
"google.golang.org/grpc/xds/internal/client/load"
|
"google.golang.org/grpc/xds/internal/client/load"
|
||||||
|
|
@ -74,7 +76,7 @@ func newTestBalancerGroup(t *testing.T, loadStore load.PerClusterReporter) (*tes
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
|
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
|
||||||
gator.Start()
|
gator.Start()
|
||||||
bg := New(cc, gator, loadStore, nil)
|
bg := New(cc, balancer.BuildOptions{}, gator, loadStore, nil)
|
||||||
bg.Start()
|
bg.Start()
|
||||||
return cc, gator, bg
|
return cc, gator, bg
|
||||||
}
|
}
|
||||||
|
|
@ -501,7 +503,7 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
|
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
|
||||||
gator.Start()
|
gator.Start()
|
||||||
bg := New(cc, gator, nil, nil)
|
bg := New(cc, balancer.BuildOptions{}, gator, nil, nil)
|
||||||
|
|
||||||
// Add two balancers to group and send two resolved addresses to both
|
// Add two balancers to group and send two resolved addresses to both
|
||||||
// balancers.
|
// balancers.
|
||||||
|
|
@ -590,16 +592,20 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
|
||||||
// whenever it gets an address update. It's expected that start() doesn't block
|
// whenever it gets an address update. It's expected that start() doesn't block
|
||||||
// because of deadlock.
|
// because of deadlock.
|
||||||
func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
|
func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
|
||||||
|
const balancerName = "stub-TestBalancerGroup_start_close_deadlock"
|
||||||
|
stub.Register(balancerName, stub.BalancerFuncs{})
|
||||||
|
builder := balancer.Get(balancerName)
|
||||||
|
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
|
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
|
||||||
gator.Start()
|
gator.Start()
|
||||||
bg := New(cc, gator, nil, nil)
|
bg := New(cc, balancer.BuildOptions{}, gator, nil, nil)
|
||||||
|
|
||||||
gator.Add(testBalancerIDs[0], 2)
|
gator.Add(testBalancerIDs[0], 2)
|
||||||
bg.Add(testBalancerIDs[0], &testutils.TestConstBalancerBuilder{})
|
bg.Add(testBalancerIDs[0], builder)
|
||||||
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
|
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
|
||||||
gator.Add(testBalancerIDs[1], 1)
|
gator.Add(testBalancerIDs[1], 1)
|
||||||
bg.Add(testBalancerIDs[1], &testutils.TestConstBalancerBuilder{})
|
bg.Add(testBalancerIDs[1], builder)
|
||||||
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
|
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
|
||||||
|
|
||||||
bg.Start()
|
bg.Start()
|
||||||
|
|
@ -695,7 +701,7 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
|
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
|
||||||
gator.Start()
|
gator.Start()
|
||||||
bg := New(cc, gator, nil, nil)
|
bg := New(cc, balancer.BuildOptions{}, gator, nil, nil)
|
||||||
|
|
||||||
// Add two balancers to group and send two resolved addresses to both
|
// Add two balancers to group and send two resolved addresses to both
|
||||||
// balancers.
|
// balancers.
|
||||||
|
|
@ -931,3 +937,43 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test
|
||||||
t.Fatalf("want %v, got %v", want, err)
|
t.Fatalf("want %v, got %v", want, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestBalancerGroupBuildOptions verifies that the balancer.BuildOptions passed
|
||||||
|
// to the balancergroup at creation time is passed to child policies.
|
||||||
|
func (s) TestBalancerGroupBuildOptions(t *testing.T) {
|
||||||
|
const (
|
||||||
|
balancerName = "stubBalancer-TestBalancerGroupBuildOptions"
|
||||||
|
parent = int64(1234)
|
||||||
|
userAgent = "ua"
|
||||||
|
defaultTestTimeout = 1 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// Setup the stub balancer such that we can read the build options passed to
|
||||||
|
// it in the UpdateClientConnState method.
|
||||||
|
bOpts := balancer.BuildOptions{
|
||||||
|
DialCreds: insecure.NewCredentials(),
|
||||||
|
ChannelzParentID: parent,
|
||||||
|
CustomUserAgent: userAgent,
|
||||||
|
}
|
||||||
|
stub.Register(balancerName, stub.BalancerFuncs{
|
||||||
|
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
|
||||||
|
if !cmp.Equal(bd.BuildOptions, bOpts) {
|
||||||
|
return fmt.Errorf("buildOptions in child balancer: %v, want %v", bd, bOpts)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
cc := testutils.NewTestClientConn(t)
|
||||||
|
bg := New(cc, bOpts, nil, nil, nil)
|
||||||
|
bg.Start()
|
||||||
|
|
||||||
|
// Add the stub balancer build above as a child policy.
|
||||||
|
balancerBuilder := balancer.Get(balancerName)
|
||||||
|
bg.Add(testBalancerIDs[0], balancerBuilder)
|
||||||
|
|
||||||
|
// Send an empty clientConn state change. This should trigger the
|
||||||
|
// verification of the buildOptions being passed to the child policy.
|
||||||
|
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,12 +40,12 @@ func init() {
|
||||||
|
|
||||||
type builder struct{}
|
type builder struct{}
|
||||||
|
|
||||||
func (builder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
|
func (builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||||
b := &bal{}
|
b := &bal{}
|
||||||
b.logger = prefixLogger(b)
|
b.logger = prefixLogger(b)
|
||||||
b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
|
b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
|
||||||
b.stateAggregator.start()
|
b.stateAggregator.start()
|
||||||
b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger)
|
b.bg = balancergroup.New(cc, opts, b.stateAggregator, nil, b.logger)
|
||||||
b.bg.Start()
|
b.bg.Start()
|
||||||
b.logger.Infof("Created")
|
b.logger.Infof("Created")
|
||||||
return b
|
return b
|
||||||
|
|
|
||||||
|
|
@ -29,8 +29,11 @@ import (
|
||||||
"google.golang.org/grpc/balancer/roundrobin"
|
"google.golang.org/grpc/balancer/roundrobin"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/connectivity"
|
"google.golang.org/grpc/connectivity"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
"google.golang.org/grpc/internal/balancer/stub"
|
||||||
"google.golang.org/grpc/internal/grpctest"
|
"google.golang.org/grpc/internal/grpctest"
|
||||||
"google.golang.org/grpc/internal/hierarchy"
|
"google.golang.org/grpc/internal/hierarchy"
|
||||||
|
itestutils "google.golang.org/grpc/internal/testutils"
|
||||||
"google.golang.org/grpc/resolver"
|
"google.golang.org/grpc/resolver"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
|
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
|
||||||
|
|
@ -510,3 +513,55 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
|
||||||
testPick(t, p3, tt.pickInfo, tt.wantSC, tt.wantErr)
|
testPick(t, p3, tt.pickInfo, tt.wantSC, tt.wantErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
|
||||||
|
const (
|
||||||
|
balancerName = "stubBalancer-TestClusterManagerForwardsBalancerBuildOptions"
|
||||||
|
parent = int64(1234)
|
||||||
|
userAgent = "ua"
|
||||||
|
defaultTestTimeout = 1 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// Setup the stub balancer such that we can read the build options passed to
|
||||||
|
// it in the UpdateClientConnState method.
|
||||||
|
ccsCh := itestutils.NewChannel()
|
||||||
|
bOpts := balancer.BuildOptions{
|
||||||
|
DialCreds: insecure.NewCredentials(),
|
||||||
|
ChannelzParentID: parent,
|
||||||
|
CustomUserAgent: userAgent,
|
||||||
|
}
|
||||||
|
stub.Register(balancerName, stub.BalancerFuncs{
|
||||||
|
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
|
||||||
|
if !cmp.Equal(bd.BuildOptions, bOpts) {
|
||||||
|
err := fmt.Errorf("buildOptions in child balancer: %v, want %v", bd, bOpts)
|
||||||
|
ccsCh.Send(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ccsCh.Send(nil)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
cc := testutils.NewTestClientConn(t)
|
||||||
|
rtb := rtBuilder.Build(cc, bOpts)
|
||||||
|
|
||||||
|
configJSON1 := fmt.Sprintf(`{
|
||||||
|
"children": {
|
||||||
|
"cds:cluster_1":{ "childPolicy": [{"%s":""}] }
|
||||||
|
}
|
||||||
|
}`, balancerName)
|
||||||
|
config1, err := rtParser.ParseConfig([]byte(configJSON1))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to parse balancer config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rtb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: config1}); err != nil {
|
||||||
|
t.Fatalf("failed to update ClientConn state: %v", err)
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
if v, err := ccsCh.Receive(ctx); err != nil {
|
||||||
|
err2 := v.(error)
|
||||||
|
t.Fatal(err2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -48,8 +48,8 @@ type xdsClientInterface interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
newEDSBalancer = func(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
|
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
|
||||||
return newEDSBalancerImpl(cc, enqueueState, lw, logger)
|
return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger)
|
||||||
}
|
}
|
||||||
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
|
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
|
||||||
)
|
)
|
||||||
|
|
@ -61,7 +61,7 @@ func init() {
|
||||||
type edsBalancerBuilder struct{}
|
type edsBalancerBuilder struct{}
|
||||||
|
|
||||||
// Build helps implement the balancer.Builder interface.
|
// Build helps implement the balancer.Builder interface.
|
||||||
func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
|
func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||||
x := &edsBalancer{
|
x := &edsBalancer{
|
||||||
cc: cc,
|
cc: cc,
|
||||||
closed: grpcsync.NewEvent(),
|
closed: grpcsync.NewEvent(),
|
||||||
|
|
@ -80,7 +80,7 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptio
|
||||||
}
|
}
|
||||||
|
|
||||||
x.xdsClient = client
|
x.xdsClient = client
|
||||||
x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, x.lsw, x.logger)
|
x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.lsw, x.logger)
|
||||||
x.logger.Infof("Created")
|
x.logger.Infof("Created")
|
||||||
go x.run()
|
go x.run()
|
||||||
return x
|
return x
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
|
||||||
"google.golang.org/grpc/balancer"
|
"google.golang.org/grpc/balancer"
|
||||||
"google.golang.org/grpc/balancer/base"
|
"google.golang.org/grpc/balancer/base"
|
||||||
"google.golang.org/grpc/balancer/roundrobin"
|
"google.golang.org/grpc/balancer/roundrobin"
|
||||||
|
|
@ -65,6 +66,7 @@ type balancerGroupWithConfig struct {
|
||||||
// policy is used to manage endpoints in each locality.
|
// policy is used to manage endpoints in each locality.
|
||||||
type edsBalancerImpl struct {
|
type edsBalancerImpl struct {
|
||||||
cc balancer.ClientConn
|
cc balancer.ClientConn
|
||||||
|
buildOpts balancer.BuildOptions
|
||||||
logger *grpclog.PrefixLogger
|
logger *grpclog.PrefixLogger
|
||||||
loadReporter load.PerClusterReporter
|
loadReporter load.PerClusterReporter
|
||||||
|
|
||||||
|
|
@ -102,9 +104,10 @@ type edsBalancerImpl struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newEDSBalancerImpl create a new edsBalancerImpl.
|
// newEDSBalancerImpl create a new edsBalancerImpl.
|
||||||
func newEDSBalancerImpl(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), lr load.PerClusterReporter, logger *grpclog.PrefixLogger) *edsBalancerImpl {
|
func newEDSBalancerImpl(cc balancer.ClientConn, bOpts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lr load.PerClusterReporter, logger *grpclog.PrefixLogger) *edsBalancerImpl {
|
||||||
edsImpl := &edsBalancerImpl{
|
edsImpl := &edsBalancerImpl{
|
||||||
cc: cc,
|
cc: cc,
|
||||||
|
buildOpts: bOpts,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
subBalancerBuilder: balancer.Get(roundrobin.Name),
|
subBalancerBuilder: balancer.Get(roundrobin.Name),
|
||||||
loadReporter: lr,
|
loadReporter: lr,
|
||||||
|
|
@ -248,7 +251,7 @@ func (edsImpl *edsBalancerImpl) handleEDSResponse(edsResp xdsclient.EndpointsUpd
|
||||||
ccPriorityWrapper := edsImpl.ccWrapperWithPriority(priority)
|
ccPriorityWrapper := edsImpl.ccWrapperWithPriority(priority)
|
||||||
stateAggregator := weightedaggregator.New(ccPriorityWrapper, edsImpl.logger, newRandomWRR)
|
stateAggregator := weightedaggregator.New(ccPriorityWrapper, edsImpl.logger, newRandomWRR)
|
||||||
bgwc = &balancerGroupWithConfig{
|
bgwc = &balancerGroupWithConfig{
|
||||||
bg: balancergroup.New(ccPriorityWrapper, stateAggregator, edsImpl.loadReporter, edsImpl.logger),
|
bg: balancergroup.New(ccPriorityWrapper, edsImpl.buildOpts, stateAggregator, edsImpl.loadReporter, edsImpl.logger),
|
||||||
stateAggregator: stateAggregator,
|
stateAggregator: stateAggregator,
|
||||||
configs: make(map[internal.LocalityID]*localityConfig),
|
configs: make(map[internal.LocalityID]*localityConfig),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ import (
|
||||||
// Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0.
|
// Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0.
|
||||||
func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
|
func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, with priorities [0, 1], each with one backend.
|
// Two localities, with priorities [0, 1], each with one backend.
|
||||||
|
|
@ -101,7 +101,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
|
||||||
// down, use 2; remove 2, use 1.
|
// down, use 2; remove 2, use 1.
|
||||||
func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
|
func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, with priorities [0, 1], each with one backend.
|
// Two localities, with priorities [0, 1], each with one backend.
|
||||||
|
|
@ -208,7 +208,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
|
||||||
// Init 0 and 1; 0 and 1 both down; add 2, use 2.
|
// Init 0 and 1; 0 and 1 both down; add 2, use 2.
|
||||||
func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
|
func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, with different priorities, each with one backend.
|
// Two localities, with different priorities, each with one backend.
|
||||||
|
|
@ -271,7 +271,7 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
|
||||||
// Init 0,1,2; 0 and 1 down, use 2; 0 up, close 1 and 2.
|
// Init 0,1,2; 0 and 1 down, use 2; 0 up, close 1 and 2.
|
||||||
func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
|
func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, with priorities [0,1,2], each with one backend.
|
// Two localities, with priorities [0,1,2], each with one backend.
|
||||||
|
|
@ -353,7 +353,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
|
||||||
}()()
|
}()()
|
||||||
|
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, with different priorities, each with one backend.
|
// Two localities, with different priorities, each with one backend.
|
||||||
|
|
@ -403,7 +403,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
|
||||||
// - add localities to existing p0 and p1
|
// - add localities to existing p0 and p1
|
||||||
func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
|
func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, with different priorities, each with one backend.
|
// Two localities, with different priorities, each with one backend.
|
||||||
|
|
@ -514,7 +514,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
|
||||||
}()()
|
}()()
|
||||||
|
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, with different priorities, each with one backend.
|
// Two localities, with different priorities, each with one backend.
|
||||||
|
|
@ -698,7 +698,7 @@ func (s) TestPriorityTypeEqual(t *testing.T) {
|
||||||
// will be used.
|
// will be used.
|
||||||
func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
|
func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, with priorities [0, 1], each with one backend.
|
// Two localities, with priorities [0, 1], each with one backend.
|
||||||
|
|
@ -757,7 +757,7 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
|
||||||
// priority will be used.
|
// priority will be used.
|
||||||
func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
|
func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, with priorities [0, 1], each with one backend.
|
// Two localities, with priorities [0, 1], each with one backend.
|
||||||
|
|
@ -823,7 +823,7 @@ func (s) TestEDSPriority_FirstPriorityUnavailable(t *testing.T) {
|
||||||
defaultPriorityInitTimeout = testPriorityInitTimeout
|
defaultPriorityInitTimeout = testPriorityInitTimeout
|
||||||
|
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// One localities, with priorities [0], each with one backend.
|
// One localities, with priorities [0], each with one backend.
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ import (
|
||||||
"google.golang.org/grpc/balancer"
|
"google.golang.org/grpc/balancer"
|
||||||
"google.golang.org/grpc/balancer/roundrobin"
|
"google.golang.org/grpc/balancer/roundrobin"
|
||||||
"google.golang.org/grpc/connectivity"
|
"google.golang.org/grpc/connectivity"
|
||||||
|
"google.golang.org/grpc/internal/balancer/stub"
|
||||||
"google.golang.org/grpc/xds/internal"
|
"google.golang.org/grpc/xds/internal"
|
||||||
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
|
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
|
||||||
"google.golang.org/grpc/xds/internal/client"
|
"google.golang.org/grpc/xds/internal/client"
|
||||||
|
|
@ -61,7 +62,7 @@ func init() {
|
||||||
// - change drop rate
|
// - change drop rate
|
||||||
func (s) TestEDS_OneLocality(t *testing.T) {
|
func (s) TestEDS_OneLocality(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// One locality with one backend.
|
// One locality with one backend.
|
||||||
|
|
@ -182,7 +183,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
|
||||||
// - update locality weight
|
// - update locality weight
|
||||||
func (s) TestEDS_TwoLocalities(t *testing.T) {
|
func (s) TestEDS_TwoLocalities(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, each with one backend.
|
// Two localities, each with one backend.
|
||||||
|
|
@ -313,7 +314,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
|
||||||
// healthy ones are used.
|
// healthy ones are used.
|
||||||
func (s) TestEDS_EndpointsHealth(t *testing.T) {
|
func (s) TestEDS_EndpointsHealth(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown.
|
// Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown.
|
||||||
|
|
@ -385,7 +386,7 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s) TestClose(t *testing.T) {
|
func (s) TestClose(t *testing.T) {
|
||||||
edsb := newEDSBalancerImpl(nil, nil, nil, nil)
|
edsb := newEDSBalancerImpl(nil, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
// This is what could happen when switching between fallback and eds. This
|
// This is what could happen when switching between fallback and eds. This
|
||||||
// make sure it doesn't panic.
|
// make sure it doesn't panic.
|
||||||
edsb.close()
|
edsb.close()
|
||||||
|
|
@ -396,7 +397,7 @@ func (s) TestClose(t *testing.T) {
|
||||||
// It should send an error picker with transient failure to the parent.
|
// It should send an error picker with transient failure to the parent.
|
||||||
func (s) TestEDS_EmptyUpdate(t *testing.T) {
|
func (s) TestEDS_EmptyUpdate(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// The first update is an empty update.
|
// The first update is an empty update.
|
||||||
|
|
@ -456,15 +457,33 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create XDS balancer, and update sub-balancer before handling eds responses.
|
// Create XDS balancer, and update sub-balancer before handling eds responses.
|
||||||
// Then switch between round-robin and test-const-balancer after handling first
|
// Then switch between round-robin and a test stub-balancer after handling first
|
||||||
// eds response.
|
// eds response.
|
||||||
func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
|
func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
|
||||||
|
const balancerName = "stubBalancer-TestEDS_UpdateSubBalancerName"
|
||||||
|
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
stub.Register(balancerName, stub.BalancerFuncs{
|
||||||
|
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
|
||||||
|
if len(s.ResolverState.Addresses) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
bd.ClientConn.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{})
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
|
||||||
|
bd.ClientConn.UpdateState(balancer.State{
|
||||||
|
ConnectivityState: state.ConnectivityState,
|
||||||
|
Picker: &testutils.TestConstPicker{Err: testutils.ErrTestConstPicker},
|
||||||
|
})
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
t.Logf("update sub-balancer to test-const-balancer")
|
t.Logf("update sub-balancer to stub-balancer")
|
||||||
edsb.handleChildPolicy("test-const-balancer", nil)
|
edsb.handleChildPolicy(balancerName, nil)
|
||||||
|
|
||||||
// Two localities, each with one backend.
|
// Two localities, each with one backend.
|
||||||
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
|
||||||
|
|
@ -506,8 +525,8 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
|
||||||
t.Fatalf("want %v, got %v", want, err)
|
t.Fatalf("want %v, got %v", want, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Logf("update sub-balancer to test-const-balancer")
|
t.Logf("update sub-balancer to stub-balancer")
|
||||||
edsb.handleChildPolicy("test-const-balancer", nil)
|
edsb.handleChildPolicy(balancerName, nil)
|
||||||
|
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
scToRemove := <-cc.RemoveSubConnCh
|
scToRemove := <-cc.RemoveSubConnCh
|
||||||
|
|
@ -558,7 +577,7 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) {
|
||||||
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()
|
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()
|
||||||
|
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
edsb.updateServiceRequestsCounter("test")
|
edsb.updateServiceRequestsCounter("test")
|
||||||
var maxRequests uint32 = 50
|
var maxRequests uint32 = 50
|
||||||
|
|
@ -658,7 +677,7 @@ func (*testInlineUpdateBalancer) Close() {
|
||||||
// by acquiring a locked mutex.
|
// by acquiring a locked mutex.
|
||||||
func (s) TestEDS_ChildPolicyUpdatePickerInline(t *testing.T) {
|
func (s) TestEDS_ChildPolicyUpdatePickerInline(t *testing.T) {
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = func(p priorityType, state balancer.State) {
|
edsb.enqueueChildBalancerStateUpdate = func(p priorityType, state balancer.State) {
|
||||||
// For this test, euqueue needs to happen asynchronously (like in the
|
// For this test, euqueue needs to happen asynchronously (like in the
|
||||||
// real implementation).
|
// real implementation).
|
||||||
|
|
@ -759,7 +778,7 @@ func (s) TestEDS_LoadReport(t *testing.T) {
|
||||||
lsWrapper.updateLoadStore(loadStore)
|
lsWrapper.updateLoadStore(loadStore)
|
||||||
|
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, lsWrapper, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -853,7 +872,7 @@ func (s) TestEDS_LoadReportDisabled(t *testing.T) {
|
||||||
// Not calling lsWrapper.updateLoadStore(loadStore) because LRS is disabled.
|
// Not calling lsWrapper.updateLoadStore(loadStore) because LRS is disabled.
|
||||||
|
|
||||||
cc := testutils.NewTestClientConn(t)
|
cc := testutils.NewTestClientConn(t)
|
||||||
edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil)
|
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, lsWrapper, nil)
|
||||||
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
|
||||||
|
|
||||||
// One localities, with one backend.
|
// One localities, with one backend.
|
||||||
|
|
|
||||||
|
|
@ -221,7 +221,7 @@ func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) {
|
||||||
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
|
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
|
||||||
|
|
||||||
origNewEDSBalancer := newEDSBalancer
|
origNewEDSBalancer := newEDSBalancer
|
||||||
newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), _ load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
|
newEDSBalancer = func(cc balancer.ClientConn, _ balancer.BuildOptions, _ func(priorityType, balancer.State), _ load.PerClusterReporter, _ *grpclog.PrefixLogger) edsBalancerImplInterface {
|
||||||
edsLB := newFakeEDSBalancer(cc)
|
edsLB := newFakeEDSBalancer(cc)
|
||||||
defer func() { edsLBCh.Send(edsLB) }()
|
defer func() { edsLBCh.Send(edsLB) }()
|
||||||
return edsLB
|
return edsLB
|
||||||
|
|
|
||||||
|
|
@ -45,12 +45,12 @@ func init() {
|
||||||
|
|
||||||
type weightedTargetBB struct{}
|
type weightedTargetBB struct{}
|
||||||
|
|
||||||
func (wt *weightedTargetBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
|
func (wt *weightedTargetBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
|
||||||
b := &weightedTargetBalancer{}
|
b := &weightedTargetBalancer{}
|
||||||
b.logger = prefixLogger(b)
|
b.logger = prefixLogger(b)
|
||||||
b.stateAggregator = weightedaggregator.New(cc, b.logger, newRandomWRR)
|
b.stateAggregator = weightedaggregator.New(cc, b.logger, newRandomWRR)
|
||||||
b.stateAggregator.Start()
|
b.stateAggregator.Start()
|
||||||
b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger)
|
b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, nil, b.logger)
|
||||||
b.bg.Start()
|
b.bg.Start()
|
||||||
b.logger.Infof("Created")
|
b.logger.Infof("Created")
|
||||||
return b
|
return b
|
||||||
|
|
|
||||||
|
|
@ -143,7 +143,7 @@ func (v3c *client) RecvResponse(s grpc.ClientStream) (proto.Message, error) {
|
||||||
return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err)
|
return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err)
|
||||||
}
|
}
|
||||||
v3c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
|
v3c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
|
||||||
v3c.logger.Debugf("ADS response received: %v", resp)
|
v3c.logger.Debugf("ADS response received: %+v", resp)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -232,49 +232,9 @@ func (tc *testClosure) next() balancer.SubConn {
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
balancer.Register(&TestConstBalancerBuilder{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrTestConstPicker is error returned by test const picker.
|
// ErrTestConstPicker is error returned by test const picker.
|
||||||
var ErrTestConstPicker = fmt.Errorf("const picker error")
|
var ErrTestConstPicker = fmt.Errorf("const picker error")
|
||||||
|
|
||||||
// TestConstBalancerBuilder is a balancer builder for tests.
|
|
||||||
type TestConstBalancerBuilder struct{}
|
|
||||||
|
|
||||||
// Build builds a test const balancer.
|
|
||||||
func (*TestConstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
|
||||||
return &testConstBalancer{cc: cc}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Name returns test-const-balancer name.
|
|
||||||
func (*TestConstBalancerBuilder) Name() string {
|
|
||||||
return "test-const-balancer"
|
|
||||||
}
|
|
||||||
|
|
||||||
type testConstBalancer struct {
|
|
||||||
cc balancer.ClientConn
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tb *testConstBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
|
|
||||||
tb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &TestConstPicker{Err: ErrTestConstPicker}})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tb *testConstBalancer) ResolverError(error) {
|
|
||||||
panic("not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tb *testConstBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
|
||||||
if len(s.ResolverState.Addresses) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
tb.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{})
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*testConstBalancer) Close() {
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConstPicker is a const picker for tests.
|
// TestConstPicker is a const picker for tests.
|
||||||
type TestConstPicker struct {
|
type TestConstPicker struct {
|
||||||
Err error
|
Err error
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue