xds/eds: rewrite EDS policy using child policies (#4457)

Menghan Li 2021-06-15 14:03:10 -07:00 committed by GitHub
parent cd9f53ac49
commit 549c53a90c
18 changed files with 936 additions and 2756 deletions

View File

@@ -21,7 +21,9 @@ package balancer
import (
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/clusterimpl" // Register the xds_cluster_impl balancer
_ "google.golang.org/grpc/xds/internal/balancer/clustermanager" // Register the xds_cluster_manager balancer
_ "google.golang.org/grpc/xds/internal/balancer/edsbalancer" // Register the EDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/priority" // Register the priority balancer
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
)
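
Each blank import above exists only for its side effect: the imported package's init() registers its builder with the gRPC balancer registry, which is what lets other policies later retrieve it by name with balancer.Get. A minimal sketch of that registration pattern, using a hypothetical "example_lb" name that is not part of this commit:

package examplelb

import "google.golang.org/grpc/balancer"

// Name is a hypothetical policy name, used here only for illustration.
const Name = "example_lb"

func init() {
	// Registering in init() is what makes a blank import of this package
	// sufficient to enable the policy.
	balancer.Register(bb{})
}

type bb struct{}

func (bb) Name() string { return Name }

func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	// A real policy would construct and return its balancer here; nil is a
	// placeholder for this sketch.
	return nil
}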

View File

@@ -183,7 +183,7 @@ type BalancerGroup struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
loadStore load.PerClusterReporter
loadStore load.PerClusterReporter // TODO: delete this, no longer needed. It was used by EDS.
// stateAggregator is where the state/picker updates will be sent to. It's
// provided by the parent balancer, to build a picker with all the

View File

@@ -27,9 +27,6 @@ import (
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/xdsclient"
_ "google.golang.org/grpc/xds/internal/balancer/clustermanager" // Register the xds_cluster_manager balancer
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
)
const (

View File

@@ -21,29 +21,29 @@ package edsbalancer
import (
"encoding/json"
"errors"
"fmt"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/xds/internal/balancer/loadstore"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)
const edsName = "eds_experimental"
var (
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger)
errBalancerClosed = errors.New("edsBalancer is closed")
newChildBalancer = func(bb balancer.Builder, cc balancer.ClientConn, o balancer.BuildOptions) balancer.Balancer {
return bb.Build(cc, o)
}
)
@@ -53,22 +53,38 @@ func init() {
type bb struct{}
// Build helps implement the balancer.Builder interface.
func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
x := &edsBalancer{
cc: cc,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
grpcUpdate: make(chan interface{}),
xdsClientUpdate: make(chan *edsUpdate),
childPolicyUpdate: buffer.NewUnbounded(),
loadWrapper: loadstore.NewWrapper(),
config: &EDSConfig{},
priorityBuilder := balancer.Get(priority.Name)
if priorityBuilder == nil {
logger.Errorf("priority balancer is needed but not registered")
return nil
}
x.logger = prefixLogger(x)
x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.loadWrapper, x.logger)
x.logger.Infof("Created")
go x.run()
return x
priorityConfigParser, ok := priorityBuilder.(balancer.ConfigParser)
if !ok {
logger.Errorf("priority balancer builder is not a config parser")
return nil
}
b := &edsBalancer{
cc: cc,
bOpts: opts,
updateCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
priorityBuilder: priorityBuilder,
priorityConfigParser: priorityConfigParser,
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
b.edsWatcher = &edsWatcher{
parent: b,
updateChannel: make(chan *watchUpdate, 1),
}
go b.run()
return b
}
func (bb) Name() string {
@@ -83,29 +99,18 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
return &cfg, nil
}
// edsBalancerImplInterface defines the interface that edsBalancerImpl must
// implement to communicate with edsBalancer.
//
// It's implemented by the real eds balancer and a fake testing eds balancer.
type edsBalancerImplInterface interface {
// handleEDSResponse passes the received EDS message from traffic director
// to eds balancer.
handleEDSResponse(edsResp xdsclient.EndpointsUpdate)
// handleChildPolicy updates the eds balancer the intra-cluster load
// balancing policy to use.
handleChildPolicy(name string, config json.RawMessage)
// handleSubConnStateChange handles state change for SubConn.
handleSubConnStateChange(sc balancer.SubConn, state connectivity.State)
// updateState handle a balancer state update from the priority.
updateState(priority priorityType, s balancer.State)
// updateServiceRequestsConfig updates the service requests counter to the
// one for the given service name.
updateServiceRequestsConfig(serviceName string, max *uint32)
// updateClusterName updates the cluster name that will be attached to the
// address attributes.
updateClusterName(name string)
// close closes the eds balancer.
close()
// ccUpdate wraps a clientConn update received from gRPC (pushed from the
// xdsResolver).
type ccUpdate struct {
state balancer.ClientConnState
err error
}
// scUpdate wraps a subConn update received from gRPC. This is directly passed
// on to the child balancer.
type scUpdate struct {
subConn balancer.SubConn
state balancer.SubConnState
}
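
ccUpdate and scUpdate exist so that every callback from gRPC can be turned into a value and pushed onto the single updateCh; the run goroutine then consumes and dispatches them one at a time, which is how the balancer serializes its state changes without locks. A minimal, self-contained sketch of that dispatch-over-a-channel pattern (the update types and channel here are illustrative stand-ins, not the commit's):

package main

import (
	"fmt"
	"sync"
)

type ccUpdate struct{ config string }
type scUpdate struct{ state string }

// run drains the updates channel in a single goroutine; because every update
// is handled here, no additional locking is needed.
func run(updates <-chan interface{}) {
	for u := range updates {
		switch update := u.(type) {
		case *ccUpdate:
			fmt.Println("config update:", update.config)
		case *scUpdate:
			fmt.Println("subConn update:", update.state)
		}
	}
}

func main() {
	updates := make(chan interface{}, 10)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); run(updates) }()
	updates <- &ccUpdate{config: "new balancer config"}
	updates <- &scUpdate{state: "READY"}
	close(updates)
	wg.Wait()
}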
// edsBalancer manages xdsClient and the actual EDS balancer implementation that
@@ -113,53 +118,111 @@ type edsBalancerImplInterface interface {
//
// It currently has only an edsBalancer. Later, we may add fallback.
type edsBalancer struct {
cc balancer.ClientConn
closed *grpcsync.Event
done *grpcsync.Event
logger *grpclog.PrefixLogger
cc balancer.ClientConn
bOpts balancer.BuildOptions
updateCh *buffer.Unbounded // Channel for updates from gRPC.
edsWatcher *edsWatcher
logger *grpclog.PrefixLogger
closed *grpcsync.Event
done *grpcsync.Event
// edsBalancer continuously monitors the channels below, and will handle
// events from them in sync.
grpcUpdate chan interface{}
xdsClientUpdate chan *edsUpdate
childPolicyUpdate *buffer.Unbounded
priorityBuilder balancer.Builder
priorityConfigParser balancer.ConfigParser
xdsClient xdsclient.XDSClient
loadWrapper *loadstore.Wrapper
config *EDSConfig // may change when passed a different service config
edsImpl edsBalancerImplInterface
config *EDSConfig
configRaw *serviceconfig.ParseResult
xdsClient xdsclient.XDSClient // xDS client to watch EDS resource.
attrsWithClient *attributes.Attributes // Attributes with xdsClient attached to be passed to the child policies.
clusterName string
edsServiceName string
edsToWatch string // this is edsServiceName if it's set, otherwise, it's clusterName.
cancelEndpointsWatch func()
loadReportServer *string // LRS is disabled if loadReporterServer is nil.
cancelLoadReport func()
child balancer.Balancer
edsResp xdsclient.EndpointsUpdate
edsRespReceived bool
}
// run gets executed in a goroutine once edsBalancer is created. It monitors
// updates from grpc, xdsClient and load balancer. It synchronizes the
// operations that happen inside edsBalancer. It exits when edsBalancer is
// closed.
func (b *edsBalancer) run() {
for {
select {
case update := <-b.grpcUpdate:
b.handleGRPCUpdate(update)
case update := <-b.xdsClientUpdate:
b.handleXDSClientUpdate(update)
case update := <-b.childPolicyUpdate.Get():
b.childPolicyUpdate.Load()
u := update.(*balancerStateWithPriority)
b.edsImpl.updateState(u.priority, u.s)
case <-b.closed.Done():
b.cancelWatch()
b.edsImpl.close()
b.logger.Infof("Shutdown")
b.done.Fire()
return
}
// handleClientConnUpdate handles a ClientConnUpdate received from gRPC. Good
// updates lead to registration of an EDS watch. Updates with error lead to
// cancellation of existing watch and propagation of the same error to the
// child balancer.
func (b *edsBalancer) handleClientConnUpdate(update *ccUpdate) {
// We first handle errors, if any, and then proceed with handling the
// update, only if the status quo has changed.
if err := update.err; err != nil {
b.handleErrorFromUpdate(err, true)
}
b.logger.Infof("Receive update from resolver, balancer config: %+v", update.state.BalancerConfig)
cfg, _ := update.state.BalancerConfig.(*EDSConfig)
if cfg == nil {
b.logger.Warningf("xds: unexpected LoadBalancingConfig type: %T", update.state.BalancerConfig)
return
}
b.config = cfg
b.configRaw = update.state.ResolverState.ServiceConfig
b.edsWatcher.updateConfig(cfg)
if !b.edsRespReceived {
// If eds resp was not received, wait for it.
return
}
// If eds resp was received before this, the child policy was created. We
// need to generate a new balancer config and send it to the child, because
// certain fields (unrelated to EDS watch) might have changed.
if err := b.updateChildConfig(); err != nil {
b.logger.Warningf("failed to update child policy config: %v", err)
}
}
// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying child balancer.
func (b *edsBalancer) handleWatchUpdate(update *watchUpdate) {
if err := update.err; err != nil {
b.logger.Warningf("Watch error from xds-client %p: %v", b.xdsClient, err)
b.handleErrorFromUpdate(err, false)
return
}
b.logger.Infof("Watch update from xds-client %p, content: %+v", b.xdsClient, pretty.ToJSON(update.eds))
b.edsRespReceived = true
b.edsResp = update.eds
// A new EDS update triggers new child configs (e.g. different priorities
// for the priority balancer), and new addresses (the endpoints come from
// the EDS response).
if err := b.updateChildConfig(); err != nil {
b.logger.Warningf("failed to update child policy's balancer config: %v", err)
}
}
// updateChildConfig builds a balancer config from eb's cached eds resp and
// service config, and sends that to the child balancer. Note that it also
// generates the addresses, because the endpoints come from the EDS resp.
//
// If child balancer doesn't already exist, one will be created.
func (b *edsBalancer) updateChildConfig() error {
// Child was built when the first EDS resp was received, so we just build
// the config and addresses.
if b.child == nil {
b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts)
}
childCfgBytes, addrs, err := buildPriorityConfigJSON(b.edsResp, b.config)
if err != nil {
return fmt.Errorf("failed to build priority balancer config: %v", err)
}
childCfg, err := b.priorityConfigParser.ParseConfig(childCfgBytes)
if err != nil {
return fmt.Errorf("failed to parse generated priority balancer config, this should never happen because the config is generated: %v", err)
}
b.logger.Infof("build balancer config: %v", pretty.ToJSON(childCfg))
return b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: addrs,
ServiceConfig: b.configRaw,
Attributes: b.attrsWithClient,
},
BalancerConfig: childCfg,
})
}
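
The flow above is: serialize a child config to JSON, hand it to the child builder's own ConfigParser for validation, then push the parsed config plus the EDS-derived addresses down in a single UpdateClientConnState call. A rough sketch of that parse-then-forward flow against an arbitrary registered policy (the helper name and parameters below are hypothetical, not part of this commit):

package sketch

import (
	"encoding/json"
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/resolver"
)

// updateChild looks up the child's builder, lets its ConfigParser validate
// the generated JSON, then pushes config and addresses down in one update.
func updateChild(child balancer.Balancer, childName string, rawCfg json.RawMessage, addrs []resolver.Address) error {
	bb := balancer.Get(childName)
	if bb == nil {
		return fmt.Errorf("balancer %q not registered", childName)
	}
	parser, ok := bb.(balancer.ConfigParser)
	if !ok {
		return fmt.Errorf("balancer %q does not implement ConfigParser", childName)
	}
	cfg, err := parser.ParseConfig(rawCfg)
	if err != nil {
		return fmt.Errorf("failed to parse config: %v", err)
	}
	return child.UpdateClientConnState(balancer.ClientConnState{
		ResolverState:  resolver.State{Addresses: addrs},
		BalancerConfig: cfg,
	})
}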
// handleErrorFromUpdate handles both the error from parent ClientConn (from CDS
@@ -173,247 +236,111 @@ func (b *edsBalancer) run() {
// resources were removed. The EDS watch should be canceled.
// - If it's from xds client, it means EDS resource were removed. The EDS
// watcher should keep watching.
// In both cases, the sub-balancers will be closed, and the future picks will
// fail.
// In both cases, the sub-balancers will receive the error.
func (b *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
b.logger.Warningf("Received error: %v", err)
if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
if fromParent {
// This is an error from the parent ClientConn (can be the parent
// CDS balancer), and is a resource-not-found error. This means the
// resource (can be either LDS or CDS) was removed. Stop the EDS
// watch.
b.cancelWatch()
}
b.edsImpl.handleEDSResponse(xdsclient.EndpointsUpdate{})
if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
// This is an error from the parent ClientConn (can be the parent CDS
// balancer), and is a resource-not-found error. This means the resource
// (can be either LDS or CDS) was removed. Stop the EDS watch.
b.edsWatcher.stopWatch()
}
if b.child != nil {
b.child.ResolverError(err)
} else {
// If the child balancer was never created, fail the RPCs with errors.
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
})
}
}
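
When no child policy exists yet, the balancer installs an error picker so RPCs fail immediately instead of hanging. base.NewErrPicker returns a picker that rejects every pick with the supplied error; a tiny standalone sketch of that behavior (not part of this commit):

package sketch

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
)

func demoErrPicker() {
	p := base.NewErrPicker(errors.New("no healthy endpoints"))
	// Every Pick fails with the same error until a real picker replaces it.
	_, err := p.Pick(balancer.PickInfo{})
	fmt.Println(err) // no healthy endpoints
}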
func (b *edsBalancer) handleGRPCUpdate(update interface{}) {
switch u := update.(type) {
case *subConnStateUpdate:
b.edsImpl.handleSubConnStateChange(u.sc, u.state.ConnectivityState)
case *balancer.ClientConnState:
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(u.BalancerConfig))
cfg, _ := u.BalancerConfig.(*EDSConfig)
if cfg == nil {
// service config parsing failed. should never happen.
// run is a long-running goroutine which handles all updates from gRPC and
// xdsClient. All methods which are invoked directly by gRPC or xdsClient simply
// push an update onto a channel which is read and acted upon right here.
func (b *edsBalancer) run() {
for {
select {
case u := <-b.updateCh.Get():
b.updateCh.Load()
switch update := u.(type) {
case *ccUpdate:
b.handleClientConnUpdate(update)
case *scUpdate:
// SubConn updates are simply handed over to the underlying
// child balancer.
if b.child == nil {
b.logger.Errorf("xds: received scUpdate {%+v} with no child balancer", update)
break
}
b.child.UpdateSubConnState(update.subConn, update.state)
}
case u := <-b.edsWatcher.updateChannel:
b.handleWatchUpdate(u)
// Close results in cancellation of the EDS watch and closing of the
// underlying child policy and is the only way to exit this goroutine.
case <-b.closed.Done():
b.edsWatcher.stopWatch()
if b.child != nil {
b.child.Close()
b.child = nil
}
// This is the *ONLY* point of return from this function.
b.logger.Infof("Shutdown")
b.done.Fire()
return
}
if err := b.handleServiceConfigUpdate(cfg); err != nil {
b.logger.Warningf("failed to update xDS client: %v", err)
}
b.edsImpl.updateServiceRequestsConfig(cfg.ClusterName, cfg.MaxConcurrentRequests)
// We will update the edsImpl with the new child policy, if we got a
// different one.
if !cmp.Equal(cfg.ChildPolicy, b.config.ChildPolicy, cmpopts.EquateEmpty()) {
if cfg.ChildPolicy != nil {
b.edsImpl.handleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
} else {
b.edsImpl.handleChildPolicy(roundrobin.Name, nil)
}
}
b.config = cfg
case error:
b.handleErrorFromUpdate(u, true)
default:
// unreachable path
b.logger.Errorf("wrong update type: %T", update)
}
}
// handleServiceConfigUpdate applies the service config update, watching a new
// EDS service name and restarting LRS stream, as required.
func (b *edsBalancer) handleServiceConfigUpdate(config *EDSConfig) error {
var updateLoadClusterAndService bool
if b.clusterName != config.ClusterName {
updateLoadClusterAndService = true
b.clusterName = config.ClusterName
b.edsImpl.updateClusterName(b.clusterName)
}
if b.edsServiceName != config.EDSServiceName {
updateLoadClusterAndService = true
b.edsServiceName = config.EDSServiceName
// Following are methods to implement the balancer interface.
// UpdateClientConnState receives the serviceConfig (which contains the
// cluster name and EDS service name to watch) and the xdsClient object from
// the xdsResolver.
func (b *edsBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
if b.closed.HasFired() {
b.logger.Warningf("xds: received ClientConnState {%+v} after edsBalancer was closed", state)
return errBalancerClosed
}
// If EDSServiceName is set, use it to watch EDS. Otherwise, use the cluster
// name.
newEDSToWatch := config.EDSServiceName
if newEDSToWatch == "" {
newEDSToWatch = config.ClusterName
}
var restartEDSWatch bool
if b.edsToWatch != newEDSToWatch {
restartEDSWatch = true
b.edsToWatch = newEDSToWatch
}
// Restart EDS watch when the eds name has changed.
if restartEDSWatch {
b.startEndpointsWatch()
}
if updateLoadClusterAndService {
// TODO: this update for the LRS service name is too early. It should
// only apply to the new EDS response. But this is applied to the RPCs
// before the new EDS response. To fully fix this, the EDS balancer
// needs to do a graceful switch to another EDS implementation.
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fix this (a bigger change) will happen later.
b.loadWrapper.UpdateClusterAndService(b.clusterName, b.edsServiceName)
}
// Restart load reporting when the loadReportServer name has changed.
if !equalStringPointers(b.loadReportServer, config.LrsLoadReportingServerName) {
loadStore := b.startLoadReport(config.LrsLoadReportingServerName)
b.loadWrapper.UpdateLoadStore(loadStore)
}
return nil
}
// startEndpointsWatch starts the EDS watch.
//
// This usually means load report needs to be restarted, but this function does
// NOT do that. Caller needs to call startLoadReport separately.
func (b *edsBalancer) startEndpointsWatch() {
if b.cancelEndpointsWatch != nil {
b.cancelEndpointsWatch()
}
edsToWatch := b.edsToWatch
cancelEDSWatch := b.xdsClient.WatchEndpoints(edsToWatch, func(update xdsclient.EndpointsUpdate, err error) {
b.logger.Infof("Watch update from xds-client %p, content: %+v", b.xdsClient, pretty.ToJSON(update))
b.handleEDSUpdate(update, err)
})
b.logger.Infof("Watch started on resource name %v with xds-client %p", edsToWatch, b.xdsClient)
b.cancelEndpointsWatch = func() {
cancelEDSWatch()
b.logger.Infof("Watch cancelled on resource name %v with xds-client %p", edsToWatch, b.xdsClient)
}
}
func (b *edsBalancer) cancelWatch() {
b.loadReportServer = nil
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
if b.cancelEndpointsWatch != nil {
b.edsToWatch = ""
b.cancelEndpointsWatch()
b.cancelEndpointsWatch = nil
}
}
// startLoadReport starts load reporting. If load reporting is already in
// progress, it cancels that first.
//
// Caller can call this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not the
// EDS watch).
func (b *edsBalancer) startLoadReport(loadReportServer *string) *load.Store {
b.loadReportServer = loadReportServer
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
if loadReportServer == nil {
return nil
}
ls, cancel := b.xdsClient.ReportLoad(*loadReportServer)
b.cancelLoadReport = cancel
return ls
}
func (b *edsBalancer) handleXDSClientUpdate(update *edsUpdate) {
if err := update.err; err != nil {
b.handleErrorFromUpdate(err, false)
return
}
b.edsImpl.handleEDSResponse(update.resp)
}
type subConnStateUpdate struct {
sc balancer.SubConn
state balancer.SubConnState
}
func (b *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
update := &subConnStateUpdate{
sc: sc,
state: state,
}
select {
case b.grpcUpdate <- update:
case <-b.closed.Done():
}
}
func (b *edsBalancer) ResolverError(err error) {
select {
case b.grpcUpdate <- err:
case <-b.closed.Done():
}
}
func (b *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if b.xdsClient == nil {
c := xdsclient.FromResolverState(s.ResolverState)
c := xdsclient.FromResolverState(state.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
b.xdsClient = c
b.attrsWithClient = state.ResolverState.Attributes
}
select {
case b.grpcUpdate <- &s:
case <-b.closed.Done():
}
b.updateCh.Put(&ccUpdate{state: state})
return nil
}
type edsUpdate struct {
resp xdsclient.EndpointsUpdate
err error
}
func (b *edsBalancer) handleEDSUpdate(resp xdsclient.EndpointsUpdate, err error) {
select {
case b.xdsClientUpdate <- &edsUpdate{resp: resp, err: err}:
case <-b.closed.Done():
// ResolverError handles errors reported by the xdsResolver.
func (b *edsBalancer) ResolverError(err error) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received resolver error {%v} after edsBalancer was closed", err)
return
}
b.updateCh.Put(&ccUpdate{err: err})
}
type balancerStateWithPriority struct {
priority priorityType
s balancer.State
}
func (b *edsBalancer) enqueueChildBalancerState(p priorityType, s balancer.State) {
b.childPolicyUpdate.Put(&balancerStateWithPriority{
priority: p,
s: s,
})
// UpdateSubConnState handles subConn updates from gRPC.
func (b *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received subConn update {%v, %v} after edsBalancer was closed", sc, state)
return
}
b.updateCh.Put(&scUpdate{subConn: sc, state: state})
}
// Close closes the edsBalancer and the underlying child balancer.
func (b *edsBalancer) Close() {
b.closed.Fire()
<-b.done.Done()
}
// equalStringPointers returns true if
// - a and b are both nil OR
// - *a == *b (and a and b are both non-nil)
func equalStringPointers(a, b *string) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return *a == *b
}
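
equalStringPointers treats two nil pointers as equal and otherwise compares the pointed-to values, which is what the LRS-server comparison above relies on. A small usage sketch, assuming it sits in the same package (the server name is a placeholder):

package edsbalancer

import "fmt"

func demoEqualStringPointers() {
	a, b := "lrs.example.com", "lrs.example.com"
	fmt.Println(equalStringPointers(nil, nil)) // true: both nil
	fmt.Println(equalStringPointers(&a, nil))  // false: only one is set
	fmt.Println(equalStringPointers(&a, &b))   // true: equal values behind different pointers
}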

View File

@@ -1,596 +0,0 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"encoding/json"
"reflect"
"sync"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
xdsi "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)
// TODO: make this an environment variable?
var defaultPriorityInitTimeout = 10 * time.Second
const defaultServiceRequestCountMax = 1024
type localityConfig struct {
weight uint32
addrs []resolver.Address
}
// balancerGroupWithConfig contains the localities with the same priority. It
// manages all localities using a balancerGroup.
type balancerGroupWithConfig struct {
bg *balancergroup.BalancerGroup
stateAggregator *weightedaggregator.Aggregator
configs map[xdsi.LocalityID]*localityConfig
}
// edsBalancerImpl does load balancing based on the EDS responses. Note that it
// doesn't implement the balancer interface. It's intended to be used by a high
// level balancer implementation.
//
// The localities are picked as weighted round robin. A configurable child
// policy is used to manage endpoints in each locality.
type edsBalancerImpl struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
loadReporter load.PerClusterReporter
enqueueChildBalancerStateUpdate func(priorityType, balancer.State)
subBalancerBuilder balancer.Builder
priorityToLocalities map[priorityType]*balancerGroupWithConfig
respReceived bool
// There's no need to hold any mutexes at the same time. The order to take
// mutex should be: priorityMu > subConnMu, but this is implicit via
// balancers (starting balancer with next priority while holding priorityMu,
// and the balancer may create new SubConn).
priorityMu sync.Mutex
// priorities are pointers, and will be nil when EDS returns empty result.
priorityInUse priorityType
priorityLowest priorityType
priorityToState map[priorityType]*balancer.State
// The timer to give a priority 10 seconds to connect. And if the priority
// doesn't go into Ready/Failure, start the next priority.
//
// One timer is enough because there can be at most one priority in init
// state.
priorityInitTimer *time.Timer
subConnMu sync.Mutex
subConnToPriority map[balancer.SubConn]priorityType
pickerMu sync.Mutex
dropConfig []xdsclient.OverloadDropConfig
drops []*dropper
innerState balancer.State // The state of the picker without drop support.
serviceRequestsCounter *xdsclient.ServiceRequestsCounter
serviceRequestCountMax uint32
clusterNameMu sync.Mutex
clusterName string
}
// newEDSBalancerImpl creates a new edsBalancerImpl.
func newEDSBalancerImpl(cc balancer.ClientConn, bOpts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lr load.PerClusterReporter, logger *grpclog.PrefixLogger) *edsBalancerImpl {
edsImpl := &edsBalancerImpl{
cc: cc,
buildOpts: bOpts,
logger: logger,
subBalancerBuilder: balancer.Get(roundrobin.Name),
loadReporter: lr,
enqueueChildBalancerStateUpdate: enqueueState,
priorityToLocalities: make(map[priorityType]*balancerGroupWithConfig),
priorityToState: make(map[priorityType]*balancer.State),
subConnToPriority: make(map[balancer.SubConn]priorityType),
serviceRequestCountMax: defaultServiceRequestCountMax,
}
// Don't start balancer group here. Start it when handling the first EDS
// response. Otherwise the balancer group will be started with round-robin,
// and if users specify a different sub-balancer, all balancers in balancer
// group will be closed and recreated when sub-balancer update happens.
return edsImpl
}
// handleChildPolicy updates the child balancers handling endpoints. Child
// policy is roundrobin by default. If the specified balancer is not installed,
// the old child balancer will be used.
//
// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine.
func (edsImpl *edsBalancerImpl) handleChildPolicy(name string, config json.RawMessage) {
if edsImpl.subBalancerBuilder.Name() == name {
return
}
newSubBalancerBuilder := balancer.Get(name)
if newSubBalancerBuilder == nil {
edsImpl.logger.Infof("edsBalancerImpl: failed to find balancer with name %q, keep using %q", name, edsImpl.subBalancerBuilder.Name())
return
}
edsImpl.subBalancerBuilder = newSubBalancerBuilder
for _, bgwc := range edsImpl.priorityToLocalities {
if bgwc == nil {
continue
}
for lid, config := range bgwc.configs {
lidJSON, err := lid.ToString()
if err != nil {
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
continue
}
// TODO: (eds) add support to balancer group to support smoothly
// switching sub-balancers (keep old balancer around until new
// balancer becomes ready).
bgwc.bg.Remove(lidJSON)
bgwc.bg.Add(lidJSON, edsImpl.subBalancerBuilder)
bgwc.bg.UpdateClientConnState(lidJSON, balancer.ClientConnState{
ResolverState: resolver.State{Addresses: config.addrs},
})
// There's no need to manually update the picker here, because the new
// sub-balancer will send its picker later.
}
}
}
// updateDrops compares new drop policies with the old. If they are different,
// it updates the drop policies and sends the ClientConn an updated picker.
func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropConfig) {
if cmp.Equal(dropConfig, edsImpl.dropConfig) {
return
}
edsImpl.pickerMu.Lock()
edsImpl.dropConfig = dropConfig
var newDrops []*dropper
for _, c := range edsImpl.dropConfig {
newDrops = append(newDrops, newDropper(c))
}
edsImpl.drops = newDrops
if edsImpl.innerState.Picker != nil {
// Update picker with old inner picker, new drops.
edsImpl.cc.UpdateState(balancer.State{
ConnectivityState: edsImpl.innerState.ConnectivityState,
Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadReporter, edsImpl.serviceRequestsCounter, edsImpl.serviceRequestCountMax)},
)
}
edsImpl.pickerMu.Unlock()
}
// handleEDSResponse handles the EDS response and creates/deletes localities and
// SubConns. It also handles drops.
//
// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine.
func (edsImpl *edsBalancerImpl) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {
// TODO: Unhandled fields from EDS response:
// - edsResp.GetPolicy().GetOverprovisioningFactor()
// - locality.GetPriority()
// - lbEndpoint.GetMetadata(): contains BNS name, send to sub-balancers
// - as service config or as resolved address
// - if socketAddress is not ip:port
// - socketAddress.GetNamedPort(), socketAddress.GetResolverName()
// - resolve endpoint's name with another resolver
// If the first EDS update is an empty update, nothing is changing from the
// previous update (which is the default empty value). We need to explicitly
// handle first update being empty, and send a transient failure picker.
//
// TODO: define Equal() on type EndpointUpdate to avoid DeepEqual. And do
// the same for the other types.
if !edsImpl.respReceived && reflect.DeepEqual(edsResp, xdsclient.EndpointsUpdate{}) {
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)})
}
edsImpl.respReceived = true
edsImpl.updateDrops(edsResp.Drops)
// Filter out all localities with weight 0.
//
// Locality weighted load balancer can be enabled by setting an option in
// CDS, and the weight of each locality. Currently, without the guarantee
// that CDS is always sent, we assume locality weighted load balance is
// always enabled, and ignore all weight 0 localities.
//
// In the future, we should look at the config in CDS response and decide
// whether locality weight matters.
newLocalitiesWithPriority := make(map[priorityType][]xdsclient.Locality)
for _, locality := range edsResp.Localities {
if locality.Weight == 0 {
continue
}
priority := newPriorityType(locality.Priority)
newLocalitiesWithPriority[priority] = append(newLocalitiesWithPriority[priority], locality)
}
var (
priorityLowest priorityType
priorityChanged bool
)
for priority, newLocalities := range newLocalitiesWithPriority {
if !priorityLowest.isSet() || priorityLowest.higherThan(priority) {
priorityLowest = priority
}
bgwc, ok := edsImpl.priorityToLocalities[priority]
if !ok {
// Create balancer group if it's never created (this is the first
// time this priority is received). We don't start it here. It may
// be started when necessary (e.g. when higher is down, or if it's a
// new lowest priority).
ccPriorityWrapper := edsImpl.ccWrapperWithPriority(priority)
stateAggregator := weightedaggregator.New(ccPriorityWrapper, edsImpl.logger, newRandomWRR)
bgwc = &balancerGroupWithConfig{
bg: balancergroup.New(ccPriorityWrapper, edsImpl.buildOpts, stateAggregator, edsImpl.loadReporter, edsImpl.logger),
stateAggregator: stateAggregator,
configs: make(map[xdsi.LocalityID]*localityConfig),
}
edsImpl.priorityToLocalities[priority] = bgwc
priorityChanged = true
edsImpl.logger.Infof("New priority %v added", priority)
}
edsImpl.handleEDSResponsePerPriority(bgwc, newLocalities)
}
edsImpl.priorityLowest = priorityLowest
// Delete priorities that are removed in the latest response, and also close
// the balancer group.
for p, bgwc := range edsImpl.priorityToLocalities {
if _, ok := newLocalitiesWithPriority[p]; !ok {
delete(edsImpl.priorityToLocalities, p)
bgwc.bg.Close()
delete(edsImpl.priorityToState, p)
priorityChanged = true
edsImpl.logger.Infof("Priority %v deleted", p)
}
}
// If priority was added/removed, it may affect the balancer group to use.
// E.g. priorityInUse was removed, or all priorities are down, and a new
// lower priority was added.
if priorityChanged {
edsImpl.handlePriorityChange()
}
}
func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroupWithConfig, newLocalities []xdsclient.Locality) {
// newLocalitiesSet contains all names of localities in the new EDS response
// for the same priority. It's used to delete localities that are removed in
// the new EDS response.
newLocalitiesSet := make(map[xdsi.LocalityID]struct{})
var rebuildStateAndPicker bool
for _, locality := range newLocalities {
// One balancer for each locality.
lid := locality.ID
lidJSON, err := lid.ToString()
if err != nil {
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
continue
}
newLocalitiesSet[lid] = struct{}{}
newWeight := locality.Weight
var newAddrs []resolver.Address
for _, lbEndpoint := range locality.Endpoints {
// Filter out all "unhealthy" endpoints (unknown and
// healthy are both considered to be healthy:
// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
if lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusHealthy &&
lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusUnknown {
continue
}
address := resolver.Address{
Addr: lbEndpoint.Address,
}
if edsImpl.subBalancerBuilder.Name() == weightedroundrobin.Name && lbEndpoint.Weight != 0 {
ai := weightedroundrobin.AddrInfo{Weight: lbEndpoint.Weight}
address = weightedroundrobin.SetAddrInfo(address, ai)
// Metadata field in resolver.Address is deprecated. The
// attributes field should be used to specify arbitrary
// attributes about the address. We still need to populate the
// Metadata field here to allow users of this field to migrate
// to the new one.
// TODO(easwars): Remove this once all users have migrated.
// See https://github.com/grpc/grpc-go/issues/3563.
address.Metadata = &ai
}
newAddrs = append(newAddrs, address)
}
var weightChanged, addrsChanged bool
config, ok := bgwc.configs[lid]
if !ok {
// A new balancer, add it to balancer group and balancer map.
bgwc.stateAggregator.Add(lidJSON, newWeight)
bgwc.bg.Add(lidJSON, edsImpl.subBalancerBuilder)
config = &localityConfig{
weight: newWeight,
}
bgwc.configs[lid] = config
// weightChanged is false for new locality, because there's no need
// to update weight in bg.
addrsChanged = true
edsImpl.logger.Infof("New locality %v added", lid)
} else {
// Compare weight and addrs.
if config.weight != newWeight {
weightChanged = true
}
if !cmp.Equal(config.addrs, newAddrs) {
addrsChanged = true
}
edsImpl.logger.Infof("Locality %v updated, weightedChanged: %v, addrsChanged: %v", lid, weightChanged, addrsChanged)
}
if weightChanged {
config.weight = newWeight
bgwc.stateAggregator.UpdateWeight(lidJSON, newWeight)
rebuildStateAndPicker = true
}
if addrsChanged {
config.addrs = newAddrs
bgwc.bg.UpdateClientConnState(lidJSON, balancer.ClientConnState{
ResolverState: resolver.State{Addresses: newAddrs},
})
}
}
// Delete localities that are removed in the latest response.
for lid := range bgwc.configs {
lidJSON, err := lid.ToString()
if err != nil {
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
continue
}
if _, ok := newLocalitiesSet[lid]; !ok {
bgwc.stateAggregator.Remove(lidJSON)
bgwc.bg.Remove(lidJSON)
delete(bgwc.configs, lid)
edsImpl.logger.Infof("Locality %v deleted", lid)
rebuildStateAndPicker = true
}
}
if rebuildStateAndPicker {
bgwc.stateAggregator.BuildAndUpdate()
}
}
// handleSubConnStateChange handles the state change and update pickers accordingly.
func (edsImpl *edsBalancerImpl) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
edsImpl.subConnMu.Lock()
var bgwc *balancerGroupWithConfig
if p, ok := edsImpl.subConnToPriority[sc]; ok {
if s == connectivity.Shutdown {
// Only delete sc from the map when state changed to Shutdown.
delete(edsImpl.subConnToPriority, sc)
}
bgwc = edsImpl.priorityToLocalities[p]
}
edsImpl.subConnMu.Unlock()
if bgwc == nil {
edsImpl.logger.Infof("edsBalancerImpl: priority not found for sc state change")
return
}
if bg := bgwc.bg; bg != nil {
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s})
}
}
// updateServiceRequestsConfig handles changes to the circuit breaking configuration.
func (edsImpl *edsBalancerImpl) updateServiceRequestsConfig(serviceName string, max *uint32) {
edsImpl.pickerMu.Lock()
var updatePicker bool
if edsImpl.serviceRequestsCounter == nil || edsImpl.serviceRequestsCounter.ServiceName != serviceName {
edsImpl.serviceRequestsCounter = xdsclient.GetServiceRequestsCounter(serviceName)
updatePicker = true
}
var newMax uint32 = defaultServiceRequestCountMax
if max != nil {
newMax = *max
}
if edsImpl.serviceRequestCountMax != newMax {
edsImpl.serviceRequestCountMax = newMax
updatePicker = true
}
if updatePicker && edsImpl.innerState.Picker != nil {
// Update picker with old inner picker, new counter and counterMax.
edsImpl.cc.UpdateState(balancer.State{
ConnectivityState: edsImpl.innerState.ConnectivityState,
Picker: newDropPicker(edsImpl.innerState.Picker, edsImpl.drops, edsImpl.loadReporter, edsImpl.serviceRequestsCounter, edsImpl.serviceRequestCountMax)},
)
}
edsImpl.pickerMu.Unlock()
}
func (edsImpl *edsBalancerImpl) updateClusterName(name string) {
edsImpl.clusterNameMu.Lock()
defer edsImpl.clusterNameMu.Unlock()
edsImpl.clusterName = name
}
func (edsImpl *edsBalancerImpl) getClusterName() string {
edsImpl.clusterNameMu.Lock()
defer edsImpl.clusterNameMu.Unlock()
return edsImpl.clusterName
}
// updateState first handles priority, and then wraps picker in a drop picker
// before forwarding the update.
func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.State) {
_, ok := edsImpl.priorityToLocalities[priority]
if !ok {
edsImpl.logger.Infof("eds: received picker update from unknown priority")
return
}
if edsImpl.handlePriorityWithNewState(priority, s) {
edsImpl.pickerMu.Lock()
defer edsImpl.pickerMu.Unlock()
edsImpl.innerState = s
// Don't reset drops when it's a state change.
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.loadReporter, edsImpl.serviceRequestsCounter, edsImpl.serviceRequestCountMax)})
}
}
func (edsImpl *edsBalancerImpl) ccWrapperWithPriority(priority priorityType) *edsBalancerWrapperCC {
return &edsBalancerWrapperCC{
ClientConn: edsImpl.cc,
priority: priority,
parent: edsImpl,
}
}
// edsBalancerWrapperCC implements the balancer.ClientConn API and gets passed to
// each balancer group. It contains the locality priority.
type edsBalancerWrapperCC struct {
balancer.ClientConn
priority priorityType
parent *edsBalancerImpl
}
func (ebwcc *edsBalancerWrapperCC) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
clusterName := ebwcc.parent.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
for i, addr := range addrs {
newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName)
}
return ebwcc.parent.newSubConn(ebwcc.priority, newAddrs, opts)
}
func (ebwcc *edsBalancerWrapperCC) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
clusterName := ebwcc.parent.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
for i, addr := range addrs {
newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName)
}
ebwcc.ClientConn.UpdateAddresses(sc, newAddrs)
}
func (ebwcc *edsBalancerWrapperCC) UpdateState(state balancer.State) {
ebwcc.parent.enqueueChildBalancerStateUpdate(ebwcc.priority, state)
}
func (edsImpl *edsBalancerImpl) newSubConn(priority priorityType, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc, err := edsImpl.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
edsImpl.subConnMu.Lock()
edsImpl.subConnToPriority[sc] = priority
edsImpl.subConnMu.Unlock()
return sc, nil
}
// close closes the balancer.
func (edsImpl *edsBalancerImpl) close() {
for _, bgwc := range edsImpl.priorityToLocalities {
if bg := bgwc.bg; bg != nil {
bgwc.stateAggregator.Stop()
bg.Close()
}
}
}
type dropPicker struct {
drops []*dropper
p balancer.Picker
loadStore load.PerClusterReporter
counter *xdsclient.ServiceRequestsCounter
countMax uint32
}
func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter, counter *xdsclient.ServiceRequestsCounter, countMax uint32) *dropPicker {
return &dropPicker{
drops: drops,
p: p,
loadStore: loadStore,
counter: counter,
countMax: countMax,
}
}
func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
var (
drop bool
category string
)
for _, dp := range d.drops {
if dp.drop() {
drop = true
category = dp.c.Category
break
}
}
if drop {
if d.loadStore != nil {
d.loadStore.CallDropped(category)
}
return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
}
if d.counter != nil {
if err := d.counter.StartRequest(d.countMax); err != nil {
// Drops by circuit breaking are reported with empty category. They
// will be reported only in total drops, but not in per category.
if d.loadStore != nil {
d.loadStore.CallDropped("")
}
return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
}
pr, err := d.p.Pick(info)
if err != nil {
d.counter.EndRequest()
return pr, err
}
oldDone := pr.Done
pr.Done = func(doneInfo balancer.DoneInfo) {
d.counter.EndRequest()
if oldDone != nil {
oldDone(doneInfo)
}
}
return pr, err
}
// TODO: (eds) don't drop unless the inner picker is READY. Similar to
// https://github.com/grpc/grpc-go/issues/2622.
return d.p.Pick(info)
}
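
The circuit-breaking path above starts a request on the counter before delegating to the inner picker, and wraps the returned Done callback so the counter is decremented exactly once when the RPC finishes. A stripped-down sketch of that wrap-the-Done-callback pattern (the plain atomic counter stands in for xdsclient.ServiceRequestsCounter; not part of this commit):

package sketch

import (
	"sync/atomic"

	"google.golang.org/grpc/balancer"
)

// wrapDone decrements inFlight when the RPC completes, preserving any Done
// callback the inner picker already attached.
func wrapDone(pr balancer.PickResult, inFlight *int64) balancer.PickResult {
	oldDone := pr.Done
	pr.Done = func(di balancer.DoneInfo) {
		atomic.AddInt64(inFlight, -1)
		if oldDone != nil {
			oldDone(di)
		}
	}
	return pr
}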

View File

@@ -1,358 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"errors"
"fmt"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
)
var errAllPrioritiesRemoved = errors.New("eds: no locality is provided, all priorities are removed")
// handlePriorityChange handles priority changes after EDS adds/removes a
// priority.
//
// - If all priorities were deleted, unset priorityInUse, and set parent
// ClientConn to TransientFailure
// - If priorityInUse wasn't set, this is either the first EDS resp, or the
// previous EDS resp deleted everything. Set priorityInUse to 0, and start 0.
// - If priorityInUse was deleted, send the picker from the new lowest priority
// to parent ClientConn, and set priorityInUse to the new lowest.
// - If priorityInUse has a non-Ready state, and also there's a priority lower
// than priorityInUse (which means a lower priority was added), set the next
// priority as new priorityInUse, and start the bg.
func (edsImpl *edsBalancerImpl) handlePriorityChange() {
edsImpl.priorityMu.Lock()
defer edsImpl.priorityMu.Unlock()
// Everything was removed by EDS.
if !edsImpl.priorityLowest.isSet() {
edsImpl.priorityInUse = newPriorityTypeUnset()
// Stop the init timer. This can happen if the only priority is removed
// shortly after it's added.
if timer := edsImpl.priorityInitTimer; timer != nil {
timer.Stop()
edsImpl.priorityInitTimer = nil
}
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)})
return
}
// priorityInUse wasn't set, use 0.
if !edsImpl.priorityInUse.isSet() {
edsImpl.logger.Infof("Switching priority from unset to %v", 0)
edsImpl.startPriority(newPriorityType(0))
return
}
// priorityInUse was deleted, use the new lowest.
if _, ok := edsImpl.priorityToLocalities[edsImpl.priorityInUse]; !ok {
oldP := edsImpl.priorityInUse
edsImpl.priorityInUse = edsImpl.priorityLowest
edsImpl.logger.Infof("Switching priority from %v to %v, because former was deleted", oldP, edsImpl.priorityInUse)
if s, ok := edsImpl.priorityToState[edsImpl.priorityLowest]; ok {
edsImpl.cc.UpdateState(*s)
} else {
// If state for priorityLowest is not found, this means priorityLowest was
// started, but never sent any update. The init timer fired and
// triggered the next priority. The old priorityInUse (that was just
// deleted by EDS) was picked later.
//
// We don't have an old state to send to parent, but we also don't
// want parent to keep using picker from old_priorityInUse. Send an
// update to trigger block picks until a new picker is ready.
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
}
return
}
// priorityInUse is not ready, look for next priority, and use if found.
if s, ok := edsImpl.priorityToState[edsImpl.priorityInUse]; ok && s.ConnectivityState != connectivity.Ready {
pNext := edsImpl.priorityInUse.nextLower()
if _, ok := edsImpl.priorityToLocalities[pNext]; ok {
edsImpl.logger.Infof("Switching priority from %v to %v, because latter was added, and former wasn't Ready")
edsImpl.startPriority(pNext)
}
}
}
// startPriority sets priorityInUse to p, and starts the balancer group for p.
// It also starts a timer to fall to next priority after timeout.
//
// Caller must hold priorityMu, priority must exist, and edsImpl.priorityInUse
// must be non-nil.
func (edsImpl *edsBalancerImpl) startPriority(priority priorityType) {
edsImpl.priorityInUse = priority
p := edsImpl.priorityToLocalities[priority]
// NOTE: this will eventually send addresses to sub-balancers. If the
// sub-balancer tries to update picker, it will result in a deadlock on
// priorityMu if the update is handled synchronously. The deadlock is
// currently avoided by handling balancer updates in a goroutine (the run
// goroutine in the parent eds balancer). When the priority balancer is split
// into its own policy, this asynchronous state handling needs to be copied.
p.stateAggregator.Start()
p.bg.Start()
// startPriority can be called when
// 1. first EDS resp, start p0
// 2. a high priority goes Failure, start next
// 3. a high priority init timeout, start next
//
// In all these cases, the existing init timer is either stopped or has already
// expired. There's no need to stop the old timer.
edsImpl.priorityInitTimer = time.AfterFunc(defaultPriorityInitTimeout, func() {
edsImpl.priorityMu.Lock()
defer edsImpl.priorityMu.Unlock()
if !edsImpl.priorityInUse.isSet() || !edsImpl.priorityInUse.equal(priority) {
return
}
edsImpl.priorityInitTimer = nil
pNext := priority.nextLower()
if _, ok := edsImpl.priorityToLocalities[pNext]; ok {
edsImpl.startPriority(pNext)
}
})
}
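
startPriority arms a timer so that if the newly started priority does not become ready within defaultPriorityInitTimeout, the balancer falls over to the next lower priority. A minimal sketch of that time.AfterFunc failover pattern, with a hypothetical startNext callback and none of the locking from the real code (not part of this commit):

package sketch

import (
	"sync"
	"time"
)

// armFailoverTimer starts a failover timer for a priority: if the returned
// markReady function is not called within timeout, startNext runs.
func armFailoverTimer(timeout time.Duration, startNext func()) (markReady func()) {
	var once sync.Once
	t := time.AfterFunc(timeout, func() {
		// The priority never became ready in time: start the next one.
		startNext()
	})
	return func() {
		once.Do(func() { t.Stop() })
	}
}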
// handlePriorityWithNewState starts/closes priorities based on the connectivity
// state. It returns whether the state should be forwarded to parent ClientConn.
func (edsImpl *edsBalancerImpl) handlePriorityWithNewState(priority priorityType, s balancer.State) bool {
edsImpl.priorityMu.Lock()
defer edsImpl.priorityMu.Unlock()
if !edsImpl.priorityInUse.isSet() {
edsImpl.logger.Infof("eds: received picker update when no priority is in use (EDS returned an empty list)")
return false
}
if edsImpl.priorityInUse.higherThan(priority) {
// Lower priorities should all be closed, this is an unexpected update.
edsImpl.logger.Infof("eds: received picker update from priority lower then priorityInUse")
return false
}
bState, ok := edsImpl.priorityToState[priority]
if !ok {
bState = &balancer.State{}
edsImpl.priorityToState[priority] = bState
}
oldState := bState.ConnectivityState
*bState = s
switch s.ConnectivityState {
case connectivity.Ready:
return edsImpl.handlePriorityWithNewStateReady(priority)
case connectivity.TransientFailure:
return edsImpl.handlePriorityWithNewStateTransientFailure(priority)
case connectivity.Connecting:
return edsImpl.handlePriorityWithNewStateConnecting(priority, oldState)
default:
// New state is Idle, should never happen. Don't forward.
return false
}
}
// handlePriorityWithNewStateReady handles state Ready and decides whether to
// forward update or not.
//
// An update with state Ready:
// - If it's from higher priority:
// - Forward the update
// - Set the priority as priorityInUse
// - Close all priorities lower than this one
// - If it's from priorityInUse:
// - Forward and do nothing else
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold priorityMu.
func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateReady(priority priorityType) bool {
// If one priority higher or equal to priorityInUse goes Ready, stop the
// init timer. If update is from higher than priorityInUse,
// priorityInUse will be closed, and the init timer will become useless.
if timer := edsImpl.priorityInitTimer; timer != nil {
timer.Stop()
edsImpl.priorityInitTimer = nil
}
if edsImpl.priorityInUse.lowerThan(priority) {
edsImpl.logger.Infof("Switching priority from %v to %v, because latter became Ready", edsImpl.priorityInUse, priority)
edsImpl.priorityInUse = priority
for i := priority.nextLower(); !i.lowerThan(edsImpl.priorityLowest); i = i.nextLower() {
bgwc := edsImpl.priorityToLocalities[i]
bgwc.stateAggregator.Stop()
bgwc.bg.Close()
}
return true
}
return true
}
// handlePriorityWithNewStateTransientFailure handles state TransientFailure and
// decides whether to forward update or not.
//
// An update with state Failure:
// - If it's from a higher priority:
// - Do not forward, and do nothing
// - If it's from priorityInUse:
// - If there's no lower:
// - Forward and do nothing else
// - If there's a lower priority:
// - Forward
// - Set lower as priorityInUse
// - Start lower
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold priorityMu.
func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateTransientFailure(priority priorityType) bool {
if edsImpl.priorityInUse.lowerThan(priority) {
return false
}
// priorityInUse sends a failure. Stop its init timer.
if timer := edsImpl.priorityInitTimer; timer != nil {
timer.Stop()
edsImpl.priorityInitTimer = nil
}
pNext := priority.nextLower()
if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext {
return true
}
edsImpl.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, pNext)
edsImpl.startPriority(pNext)
return true
}
// handlePriorityWithNewStateConnecting handles state Connecting and decides
// whether to forward update or not.
//
// An update with state Connecting:
// - If it's from a higher priority
// - Do nothing
// - If it's from priorityInUse, the behavior depends on previous state.
//
// When new state is Connecting, the behavior depends on previous state. If the
// previous state was Ready, this is a transition out from Ready to Connecting.
// Assuming there are multiple backends in the same priority, this means we are
// in a bad situation and we should fail over to the next priority (Side note:
// the current connectivity state aggregating algorithm (e.g. round-robin) is
// not handling this right, because if many backends all go from Ready to
// Connecting, the overall situation is more like TransientFailure, not
// Connecting).
//
// If the previous state was Idle, we don't do anything special with failure,
// and simply forward the update. The init timer should be in progress, and will
// handle failover if it times out. If the previous state was TransientFailure,
// we do not forward, because the lower priority is in use.
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold priorityMu.
func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateConnecting(priority priorityType, oldState connectivity.State) bool {
if edsImpl.priorityInUse.lowerThan(priority) {
return false
}
switch oldState {
case connectivity.Ready:
pNext := priority.nextLower()
if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext {
return true
}
edsImpl.logger.Infof("Switching priority from %v to %v, because former became Connecting from Ready", priority, pNext)
edsImpl.startPriority(pNext)
return true
case connectivity.Idle:
return true
case connectivity.TransientFailure:
return false
default:
// Old state is Connecting or Shutdown. Don't forward.
return false
}
}
// priorityType represents the priority from EDS response.
//
// 0 is the highest priority. The bigger the number, the lower the priority.
type priorityType struct {
set bool
p uint32
}
func newPriorityType(p uint32) priorityType {
return priorityType{
set: true,
p: p,
}
}
func newPriorityTypeUnset() priorityType {
return priorityType{}
}
func (p priorityType) isSet() bool {
return p.set
}
func (p priorityType) equal(p2 priorityType) bool {
if !p.isSet() && !p2.isSet() {
return true
}
if !p.isSet() || !p2.isSet() {
return false
}
return p == p2
}
func (p priorityType) higherThan(p2 priorityType) bool {
if !p.isSet() || !p2.isSet() {
// TODO(menghanl): return an appropriate value instead of panic.
panic("priority unset")
}
return p.p < p2.p
}
func (p priorityType) lowerThan(p2 priorityType) bool {
if !p.isSet() || !p2.isSet() {
// TODO(menghanl): return an appropriate value instead of panic.
panic("priority unset")
}
return p.p > p2.p
}
func (p priorityType) nextLower() priorityType {
if !p.isSet() {
panic("priority unset")
}
return priorityType{
set: true,
p: p.p + 1,
}
}
func (p priorityType) String() string {
if !p.set {
return "Nil"
}
return fmt.Sprint(p.p)
}
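
priorityType encodes the EDS convention that 0 is the highest priority and larger numbers are lower. A couple of usage lines for these removed helpers, assuming the same (pre-commit) package:

package edsbalancer

import "fmt"

func demoPriorityType() {
	p0, p1 := newPriorityType(0), newPriorityType(1)
	fmt.Println(p0.higherThan(p1))              // true: 0 is the highest priority
	fmt.Println(p1.lowerThan(p0))               // true
	fmt.Println(p0.nextLower())                 // 1
	fmt.Println(newPriorityTypeUnset().isSet()) // false
}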

View File

@@ -28,6 +28,7 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/testutils"
)
@@ -36,15 +37,14 @@ import (
//
// Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0.
func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// Two localities, with priorities [0, 1], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
@@ -53,22 +53,20 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
sc1 := <-cc.NewSubConnCh
// p0 is ready.
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p0 subconns.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
t.Fatal(err)
}
// Add p2, it shouldn't cause any udpates.
// Add p2, it shouldn't cause any updates.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil)
select {
case <-cc.NewPickerCh:
@@ -84,7 +82,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab3.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil)
select {
case <-cc.NewPickerCh:
@@ -102,15 +100,14 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
// Init 0 and 1; 0 is up, use 0; 0 is down, 1 is up, use 1; add 2, use 1; 1 is
// down, use 2; remove 2, use 1.
func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// Two localities, with priorities [0, 1], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
@ -119,41 +116,35 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
sc0 := <-cc.NewSubConnCh
// p0 is ready.
edsb.handleSubConnStateChange(sc0, connectivity.Connecting)
edsb.handleSubConnStateChange(sc0, connectivity.Ready)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p0 subconns.
p0 := <-cc.NewPickerCh
want := []balancer.SubConn{sc0}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p0)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc0}); err != nil {
t.Fatal(err)
}
// Turn down 0, 1 is used.
edsb.handleSubConnStateChange(sc0, connectivity.TransientFailure)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with 1.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
t.Fatal(err)
}
// Add p2, it shouldn't cause any udpates.
// Add p2, it shouldn't cause any updates.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil)
select {
case <-cc.NewPickerCh:
@ -166,29 +157,25 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
}
// Turn down 1, use 2
edsb.handleSubConnStateChange(sc1, connectivity.TransientFailure)
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with 2.
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2}); err != nil {
t.Fatal(err)
}
// Remove 2, use 1.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab3.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil)
// p2 SubConns are removed.
scToRemove := <-cc.RemoveSubConnCh
@ -197,28 +184,23 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
}
// Should get an update with 1's old picker, to override 2's old picker.
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p3.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
}
if err := testErrPickerFromCh(cc.NewPickerCh, balancer.ErrTransientFailure); err != nil {
t.Fatal(err)
}
}
// Add a lower priority while the higher priority is down.
//
// Init 0 and 1; 0 and 1 both down; add 2, use 2.
func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// Two localities, with different priorities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
@ -226,21 +208,18 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
sc0 := <-cc.NewSubConnCh
// Turn down 0, 1 is used.
edsb.handleSubConnStateChange(sc0, connectivity.TransientFailure)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.NewSubConnCh
// Turn down 1, pick should error.
edsb.handleSubConnStateChange(sc1, connectivity.TransientFailure)
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// Test pick failure.
pFail := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := pFail.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
}
if err := testErrPickerFromCh(cc.NewPickerCh, balancer.ErrTransientFailure); err != nil {
t.Fatal(err)
}
// Add p2, it should create a new SubConn.
@ -248,41 +227,34 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil)
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with 2.
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2}); err != nil {
t.Fatal(err)
}
}
// When a higher priority becomes available, all lower priorities are closed.
//
// Init 0,1,2; 0 and 1 down, use 2; 0 up, close 1 and 2.
func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// Two localities, with priorities [0,1,2], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab1.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
@ -290,39 +262,55 @@ func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
sc0 := <-cc.NewSubConnCh
// Turn down 0, 1 is used.
edsb.handleSubConnStateChange(sc0, connectivity.TransientFailure)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.NewSubConnCh
// Turn down 1, 2 is used.
edsb.handleSubConnStateChange(sc1, connectivity.TransientFailure)
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with 2.
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2}); err != nil {
t.Fatal(err)
}
// When 0 becomes ready, 0 should be used, 1 and 2 should all be closed.
edsb.handleSubConnStateChange(sc0, connectivity.Ready)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
var (
scToRemove []balancer.SubConn
scToRemoveMap = make(map[balancer.SubConn]struct{})
)
// Each subconn is removed twice. This is OK in production, but it makes
// testing harder.
//
// The sub-balancer to be closed is priority's child, clusterimpl, who has
// weightedtarget as children.
//
// - When clusterimpl is removed from priority's balancergroup, all its
// subconns are removed once.
// - When clusterimpl is closed, it closes weightedtarget, and this
// weightedtarget's balancer removes all the same subconns again.
for i := 0; i < 4; i++ {
// We expect 2 subconns, so we recv from channel 4 times.
scToRemoveMap[<-cc.RemoveSubConnCh] = struct{}{}
}
for sc := range scToRemoveMap {
scToRemove = append(scToRemove, sc)
}
// sc1 and sc2 should be removed.
//
// With localities caching, the lower priorities are closed after a timeout,
// in goroutines. The order is no longer guaranteed.
scToRemove := []balancer.SubConn{<-cc.RemoveSubConnCh, <-cc.RemoveSubConnCh}
if !(cmp.Equal(scToRemove[0], sc1, cmp.AllowUnexported(testutils.TestSubConn{})) &&
cmp.Equal(scToRemove[1], sc2, cmp.AllowUnexported(testutils.TestSubConn{}))) &&
!(cmp.Equal(scToRemove[0], sc2, cmp.AllowUnexported(testutils.TestSubConn{})) &&
@ -331,12 +319,8 @@ func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
}
// Test pick with 0.
p0 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p0.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc0}); err != nil {
t.Fatal(err)
}
}
@ -347,23 +331,20 @@ func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
func (s) TestEDSPriority_InitTimeout(t *testing.T) {
const testPriorityInitTimeout = time.Second
defer func() func() {
old := defaultPriorityInitTimeout
defaultPriorityInitTimeout = testPriorityInitTimeout
old := priority.DefaultPriorityInitTimeout
priority.DefaultPriorityInitTimeout = testPriorityInitTimeout
return func() {
defaultPriorityInitTimeout = old
priority.DefaultPriorityInitTimeout = old
}
}()()
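// The closure above is a save/override/restore idiom: it records the
// package-level default, installs the shorter test timeout, and returns a
// function that restores the original value; deferring the result of the
// immediate call guarantees the default is put back when the test returns.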
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// Two localities, with different priorities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
@ -371,7 +352,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
sc0 := <-cc.NewSubConnCh
// Keep 0 in connecting, 1 will be used after init timeout.
edsb.handleSubConnStateChange(sc0, connectivity.Connecting)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// Make sure new SubConn is created before timeout.
select {
@ -386,16 +367,12 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
}
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with 1.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
t.Fatal(err)
}
}
@ -404,51 +381,44 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
// - start with 2 locality with p0 and p1
// - add localities to existing p0 and p1
func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// Two localities, with different priorities, each with one backend.
clab0 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab0.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab0.Build()), nil)
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc0, connectivity.Connecting)
edsb.handleSubConnStateChange(sc0, connectivity.Ready)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p0 subconns.
p0 := <-cc.NewPickerCh
want := []balancer.SubConn{sc0}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p0)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc0}); err != nil {
t.Fatal(err)
}
// Turn down p0 subconns, p1 subconns will be created.
edsb.handleSubConnStateChange(sc0, connectivity.TransientFailure)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p1 subconns.
p1 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
t.Fatal(err)
}
// Reconnect p0 subconns, p1 subconn will be closed.
edsb.handleSubConnStateChange(sc0, connectivity.Ready)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
@ -456,10 +426,8 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
}
// Test roundrobin with only p0 subconns.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc0}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc0}); err != nil {
t.Fatal(err)
}
// Add two localities, with two priorities, with one backend.
@ -468,39 +436,34 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab1.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
clab1.AddLocality(testSubZones[3], 1, 1, testEndpointAddrs[3:4], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only two p0 subconns.
p3 := <-cc.NewPickerCh
want = []balancer.SubConn{sc0, sc2}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc0, sc2}); err != nil {
t.Fatal(err)
}
// Turn down p0 subconns, p1 subconns will be created.
edsb.handleSubConnStateChange(sc0, connectivity.TransientFailure)
edsb.handleSubConnStateChange(sc2, connectivity.TransientFailure)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc3 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc3, connectivity.Connecting)
edsb.handleSubConnStateChange(sc3, connectivity.Ready)
edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc4 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc4, connectivity.Connecting)
edsb.handleSubConnStateChange(sc4, connectivity.Ready)
edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p1 subconns.
p4 := <-cc.NewPickerCh
want = []balancer.SubConn{sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p4)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3, sc4}); err != nil {
t.Fatal(err)
}
}
@ -508,62 +471,55 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
const testPriorityInitTimeout = time.Second
defer func() func() {
old := defaultPriorityInitTimeout
defaultPriorityInitTimeout = testPriorityInitTimeout
old := priority.DefaultPriorityInitTimeout
priority.DefaultPriorityInitTimeout = testPriorityInitTimeout
return func() {
defaultPriorityInitTimeout = old
priority.DefaultPriorityInitTimeout = old
}
}()()
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// Two localities, with different priorities, each with one backend.
clab0 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab0.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab0.Build()), nil)
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc0, connectivity.Connecting)
edsb.handleSubConnStateChange(sc0, connectivity.Ready)
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p0 subconns.
p0 := <-cc.NewPickerCh
want := []balancer.SubConn{sc0}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p0)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc0}); err != nil {
t.Fatal(err)
}
// Remove all priorities.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
// p0 subconn should be removed.
scToRemove := <-cc.RemoveSubConnCh
<-cc.RemoveSubConnCh // Drain the duplicate subconn removed.
if !cmp.Equal(scToRemove, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove)
}
// time.Sleep(time.Second)
// Test pick return TransientFailure.
pFail := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := pFail.Pick(balancer.PickInfo{}); err != errAllPrioritiesRemoved {
t.Fatalf("want pick error %v, got %v", errAllPrioritiesRemoved, err)
}
if err := testErrPickerFromCh(cc.NewPickerCh, priority.ErrAllPrioritiesRemoved); err != nil {
t.Fatal(err)
}
// Re-add two localities, with previous priorities, but different backends.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[3:4], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil)
addrs01 := <-cc.NewSubConnAddrsCh
if got, want := addrs01[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
@ -580,45 +536,39 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc11 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc11, connectivity.Connecting)
edsb.handleSubConnStateChange(sc11, connectivity.Ready)
edsb.UpdateSubConnState(sc11, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc11, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p1 subconns.
p1 := <-cc.NewPickerCh
want = []balancer.SubConn{sc11}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc11}); err != nil {
t.Fatal(err)
}
// Remove p1 from EDS, to fallback to p0.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab3.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab3.Build()), nil)
// p1 subconn should be removed.
scToRemove1 := <-cc.RemoveSubConnCh
<-cc.RemoveSubConnCh // Drain the duplicate subconn removed.
if !cmp.Equal(scToRemove1, sc11, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc11, scToRemove1)
}
// Test pick return TransientFailure.
pFail1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if scst, err := pFail1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("want pick error _, %v, got %v, _ ,%v", balancer.ErrTransientFailure, scst, err)
}
if err := testErrPickerFromCh(cc.NewPickerCh, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err)
}
// Send a ready update for the p0 sc that was received when re-adding
// localities to EDS.
edsb.handleSubConnStateChange(sc01, connectivity.Connecting)
edsb.handleSubConnStateChange(sc01, connectivity.Ready)
edsb.UpdateSubConnState(sc01, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc01, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p0 subconns.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc01}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc01}); err != nil {
t.Fatal(err)
}
select {
@ -632,83 +582,16 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
}
}
func (s) TestPriorityType(t *testing.T) {
p0 := newPriorityType(0)
p1 := newPriorityType(1)
p2 := newPriorityType(2)
if !p0.higherThan(p1) || !p0.higherThan(p2) {
t.Errorf("want p0 to be higher than p1 and p2, got p0>p1: %v, p0>p2: %v", !p0.higherThan(p1), !p0.higherThan(p2))
}
if !p1.lowerThan(p0) || !p1.higherThan(p2) {
t.Errorf("want p1 to be between p0 and p2, got p1<p0: %v, p1>p2: %v", !p1.lowerThan(p0), !p1.higherThan(p2))
}
if !p2.lowerThan(p0) || !p2.lowerThan(p1) {
t.Errorf("want p2 to be lower than p0 and p1, got p2<p0: %v, p2<p1: %v", !p2.lowerThan(p0), !p2.lowerThan(p1))
}
if got := p1.equal(p0.nextLower()); !got {
t.Errorf("want p1 to be equal to p0's next lower, got p1==p0.nextLower: %v", got)
}
if got := p1.equal(newPriorityType(1)); !got {
t.Errorf("want p1 to be equal to priority with value 1, got p1==1: %v", got)
}
}
func (s) TestPriorityTypeEqual(t *testing.T) {
tests := []struct {
name string
p1, p2 priorityType
want bool
}{
{
name: "equal",
p1: newPriorityType(12),
p2: newPriorityType(12),
want: true,
},
{
name: "not equal",
p1: newPriorityType(12),
p2: newPriorityType(34),
want: false,
},
{
name: "one not set",
p1: newPriorityType(1),
p2: newPriorityTypeUnset(),
want: false,
},
{
name: "both not set",
p1: newPriorityTypeUnset(),
p2: newPriorityTypeUnset(),
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.p1.equal(tt.p2); got != tt.want {
t.Errorf("equal() = %v, want %v", got, tt.want)
}
})
}
}
// Test the case where the high priority contains no backends. The low priority
// will be used.
func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// Two localities, with priorities [0, 1], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
@ -716,26 +599,23 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
sc1 := <-cc.NewSubConnCh
// p0 is ready.
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p0 subconns.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
t.Fatal(err)
}
// Remove addresses from priority 0, should use p1.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, nil, nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil)
// p0 will remove the subconn, and ClientConn will send a sc update to
// shutdown.
scToRemove := <-cc.RemoveSubConnCh
edsb.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want {
@ -744,30 +624,25 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
sc2 := <-cc.NewSubConnCh
// p1 is ready.
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p1 subconns.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc2}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2}); err != nil {
t.Fatal(err)
}
}
// Test the case where the high priority contains no healthy backends. The low
// priority will be used.
func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
edsb, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// Two localities, with priorities [0, 1], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
@ -775,14 +650,12 @@ func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
sc1 := <-cc.NewSubConnCh
// p0 is ready.
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p0 subconns.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
t.Fatal(err)
}
// Set priority 0 endpoints to all unhealthy, should use p1.
@ -791,12 +664,11 @@ func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY},
})
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil)
// p0 will remove the subconn, and ClientConn will send a sc update to
// transient failure.
scToRemove := <-cc.RemoveSubConnCh
edsb.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want {
@ -805,38 +677,35 @@ func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
sc2 := <-cc.NewSubConnCh
// p1 is ready.
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p1 subconns.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc2}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2}); err != nil {
t.Fatal(err)
}
}
// Test the case where the first and only priority is removed.
func (s) TestEDSPriority_FirstPriorityUnavailable(t *testing.T) {
func (s) TestEDSPriority_FirstPriorityRemoved(t *testing.T) {
const testPriorityInitTimeout = time.Second
defer func(t time.Duration) {
defaultPriorityInitTimeout = t
}(defaultPriorityInitTimeout)
defaultPriorityInitTimeout = testPriorityInitTimeout
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, balancer.BuildOptions{}, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
defer func() func() {
old := priority.DefaultPriorityInitTimeout
priority.DefaultPriorityInitTimeout = testPriorityInitTimeout
return func() {
priority.DefaultPriorityInitTimeout = old
}
}()()
_, cc, xdsC, cleanup := setupTestEDS(t)
defer cleanup()
// One locality, with priority [0], with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab1.Build()), nil)
// Remove the only locality.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
xdsC.InvokeWatchEDSCallback(parseEDSRespProtoForTesting(clab2.Build()), nil)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := cc.WaitForErrPicker(ctx); err != nil {

File diff suppressed because it is too large

View File

@ -25,7 +25,6 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"testing"
"time"
@ -34,7 +33,6 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpctest"
scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
"google.golang.org/grpc/internal/testutils"
@ -43,7 +41,6 @@ import (
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/load"
_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // V2 client registration.
)
@ -52,7 +49,7 @@ const (
defaultTestTimeout = 1 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
testServiceName = "test/foo"
testEDSClusterName = "test/service/eds"
testClusterName = "test/cluster"
)
var (
@ -70,15 +67,22 @@ var (
}
)
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn
}
func init() {
balancer.Register(bb{})
}
type s struct {
grpctest.Tester
cleanup func()
}
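// Teardown clears the xDS client's global request counters and runs any
// registered cleanup, so state created by one test cannot leak into the next.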
func (ss s) Teardown(t *testing.T) {
xdsclient.ClearAllCountersForTesting()
ss.Tester.Teardown(t)
if ss.cleanup != nil {
ss.cleanup()
}
}
func Test(t *testing.T) {
@ -105,58 +109,49 @@ func (noopTestClientConn) Target() string { return testServiceName }
type scStateChange struct {
sc balancer.SubConn
state connectivity.State
state balancer.SubConnState
}
type fakeEDSBalancer struct {
cc balancer.ClientConn
childPolicy *testutils.Channel
subconnStateChange *testutils.Channel
edsUpdate *testutils.Channel
serviceName *testutils.Channel
serviceRequestMax *testutils.Channel
clusterName *testutils.Channel
type fakeChildBalancer struct {
cc balancer.ClientConn
subConnState *testutils.Channel
clientConnState *testutils.Channel
resolverError *testutils.Channel
}
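// fakeChildBalancer stands in for the real child policy built by the EDS
// balancer. Every callback it receives from its parent (client conn state,
// resolver errors, subconn state changes) is recorded on a buffered channel
// so the tests can assert that the parent forwarded the event.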
func (f *fakeEDSBalancer) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
f.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
func (f *fakeChildBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
f.clientConnState.Send(state)
return nil
}
func (f *fakeEDSBalancer) handleChildPolicy(name string, config json.RawMessage) {
f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
func (f *fakeChildBalancer) ResolverError(err error) {
f.resolverError.Send(err)
}
func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {
f.edsUpdate.Send(edsResp)
func (f *fakeChildBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
f.subConnState.Send(&scStateChange{sc: sc, state: state})
}
func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}
func (f *fakeChildBalancer) Close() {}
func (f *fakeEDSBalancer) updateServiceRequestsConfig(serviceName string, max *uint32) {
f.serviceName.Send(serviceName)
f.serviceRequestMax.Send(max)
}
func (f *fakeEDSBalancer) updateClusterName(name string) {
f.clusterName.Send(name)
}
func (f *fakeEDSBalancer) close() {}
func (f *fakeEDSBalancer) waitForChildPolicy(ctx context.Context, wantPolicy *loadBalancingConfig) error {
val, err := f.childPolicy.Receive(ctx)
func (f *fakeChildBalancer) waitForClientConnStateChange(ctx context.Context) error {
_, err := f.clientConnState.Receive(ctx)
if err != nil {
return err
}
gotPolicy := val.(*loadBalancingConfig)
if !cmp.Equal(gotPolicy, wantPolicy) {
return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy)
}
return nil
}
func (f *fakeEDSBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error {
val, err := f.subconnStateChange.Receive(ctx)
func (f *fakeChildBalancer) waitForResolverError(ctx context.Context) error {
_, err := f.resolverError.Receive(ctx)
if err != nil {
return err
}
return nil
}
func (f *fakeChildBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error {
val, err := f.subConnState.Receive(ctx)
if err != nil {
return err
}
@ -167,70 +162,12 @@ func (f *fakeEDSBalancer) waitForSubConnStateChange(ctx context.Context, wantSta
return nil
}
func (f *fakeEDSBalancer) waitForEDSResponse(ctx context.Context, wantUpdate xdsclient.EndpointsUpdate) error {
val, err := f.edsUpdate.Receive(ctx)
if err != nil {
return err
}
gotUpdate := val.(xdsclient.EndpointsUpdate)
if !reflect.DeepEqual(gotUpdate, wantUpdate) {
return fmt.Errorf("got edsUpdate %+v, want %+v", gotUpdate, wantUpdate)
}
return nil
}
func (f *fakeEDSBalancer) waitForCounterUpdate(ctx context.Context, wantServiceName string) error {
val, err := f.serviceName.Receive(ctx)
if err != nil {
return err
}
gotServiceName := val.(string)
if gotServiceName != wantServiceName {
return fmt.Errorf("got serviceName %v, want %v", gotServiceName, wantServiceName)
}
return nil
}
func (f *fakeEDSBalancer) waitForCountMaxUpdate(ctx context.Context, want *uint32) error {
val, err := f.serviceRequestMax.Receive(ctx)
if err != nil {
return err
}
got := val.(*uint32)
if got == nil && want == nil {
return nil
}
if got != nil && want != nil {
if *got != *want {
return fmt.Errorf("got countMax %v, want %v", *got, *want)
}
return nil
}
return fmt.Errorf("got countMax %+v, want %+v", got, want)
}
func (f *fakeEDSBalancer) waitForClusterNameUpdate(ctx context.Context, wantClusterName string) error {
val, err := f.clusterName.Receive(ctx)
if err != nil {
return err
}
gotServiceName := val.(string)
if gotServiceName != wantClusterName {
return fmt.Errorf("got clusterName %v, want %v", gotServiceName, wantClusterName)
}
return nil
}
func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerImplInterface {
return &fakeEDSBalancer{
cc: cc,
childPolicy: testutils.NewChannelWithSize(10),
subconnStateChange: testutils.NewChannelWithSize(10),
edsUpdate: testutils.NewChannelWithSize(10),
serviceName: testutils.NewChannelWithSize(10),
serviceRequestMax: testutils.NewChannelWithSize(10),
clusterName: testutils.NewChannelWithSize(10),
func newFakeChildBalancer(cc balancer.ClientConn) balancer.Balancer {
return &fakeChildBalancer{
cc: cc,
subConnState: testutils.NewChannelWithSize(10),
clientConnState: testutils.NewChannelWithSize(10),
resolverError: testutils.NewChannelWithSize(10),
}
}
@ -239,168 +176,37 @@ type fakeSubConn struct{}
func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
func (*fakeSubConn) Connect() { panic("implement me") }
// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
// waitForNewChildLB makes sure that a new child LB is created by the top-level
// edsBalancer.
func waitForNewEDSLB(ctx context.Context, ch *testutils.Channel) (*fakeEDSBalancer, error) {
func waitForNewChildLB(ctx context.Context, ch *testutils.Channel) (*fakeChildBalancer, error) {
val, err := ch.Receive(ctx)
if err != nil {
return nil, fmt.Errorf("error when waiting for a new edsLB: %v", err)
}
return val.(*fakeEDSBalancer), nil
return val.(*fakeChildBalancer), nil
}
// setup overrides the functions which are used to create the xdsClient and the
// edsLB, creates fake version of them and makes them available on the provided
// channels. The returned cancel function should be called by the test for
// cleanup.
func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) {
func setup(childLBCh *testutils.Channel) (*fakeclient.Client, func()) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
origNewEDSBalancer := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, _ balancer.BuildOptions, _ func(priorityType, balancer.State), _ load.PerClusterReporter, _ *grpclog.PrefixLogger) edsBalancerImplInterface {
edsLB := newFakeEDSBalancer(cc)
defer func() { edsLBCh.Send(edsLB) }()
return edsLB
origNewChildBalancer := newChildBalancer
newChildBalancer = func(_ balancer.Builder, cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
childLB := newFakeChildBalancer(cc)
defer func() { childLBCh.Send(childLB) }()
return childLB
}
return xdsC, func() {
newEDSBalancer = origNewEDSBalancer
newChildBalancer = origNewChildBalancer
xdsC.Close()
}
}
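// exampleEDSTestFlow is an illustrative sketch, not a line from this commit:
// it consolidates the flow the tests below follow against the fakes returned
// by setup. Identifiers such as defaultEndpointsUpdate, defaultTestTimeout and
// newNoopTestClientConn are assumed from this test package.
func exampleEDSTestFlow(t *testing.T) {
	edsLBCh := testutils.NewChannel()
	xdsC, cleanup := setup(edsLBCh) // fake xDS client + fake child balancer builder
	defer cleanup()
	// Build the top-level EDS balancer against a no-op ClientConn.
	edsB := balancer.Get(edsName).Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
	if edsB == nil {
		t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
	}
	defer edsB.Close()
	// Pushing a ClientConnState makes the balancer register an EDS watch on
	// the fake xDS client.
	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
		ResolverState:  xdsclient.SetClient(resolver.State{}, xdsC),
		BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
	}); err != nil {
		t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
	}
	// Delivering an endpoints update makes the balancer build its first child
	// policy, which the fake builder publishes on edsLBCh.
	xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil)
	if _, err := waitForNewChildLB(ctx, edsLBCh); err != nil {
		t.Fatal(err)
	}
}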
const (
fakeBalancerA = "fake_balancer_A"
fakeBalancerB = "fake_balancer_B"
)
// Install two fake balancers for service config update tests.
//
// ParseConfig only accepts the json if the balancer specified is registered.
func init() {
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA})
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB})
}
type fakeBalancerBuilder struct {
name string
}
func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &fakeBalancer{cc: cc}
}
func (b *fakeBalancerBuilder) Name() string {
return b.name
}
type fakeBalancer struct {
cc balancer.ClientConn
}
func (b *fakeBalancer) ResolverError(error) {
panic("implement me")
}
func (b *fakeBalancer) UpdateClientConnState(balancer.ClientConnState) error {
panic("implement me")
}
func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
panic("implement me")
}
func (b *fakeBalancer) Close() {}
// TestConfigChildPolicyUpdate verifies scenarios where the childPolicy
// section of the lbConfig is updated.
//
// The test does the following:
// * Builds a new EDS balancer.
// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA.
// Verifies that an EDS watch is registered. It then pushes a new edsUpdate
// through the fakexds client. Verifies that a new edsLB is created and it
// receives the expected childPolicy.
// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB.
// Verifies that the existing edsLB receives the new child policy.
func (s) TestConfigChildPolicyUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
builder := balancer.Get(edsName)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
lbCfgA := &loadBalancingConfig{
Name: fakeBalancerA,
Config: json.RawMessage("{}"),
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ChildPolicy: lbCfgA,
ClusterName: testEDSClusterName,
EDSServiceName: testServiceName,
},
}); err != nil {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
}
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil)
if err := edsLB.waitForChildPolicy(ctx, lbCfgA); err != nil {
t.Fatal(err)
}
if err := edsLB.waitForCounterUpdate(ctx, testEDSClusterName); err != nil {
t.Fatal(err)
}
if err := edsLB.waitForCountMaxUpdate(ctx, nil); err != nil {
t.Fatal(err)
}
var testCountMax uint32 = 100
lbCfgB := &loadBalancingConfig{
Name: fakeBalancerB,
Config: json.RawMessage("{}"),
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ChildPolicy: lbCfgB,
ClusterName: testEDSClusterName,
EDSServiceName: testServiceName,
MaxConcurrentRequests: &testCountMax,
},
}); err != nil {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
}
if err := edsLB.waitForChildPolicy(ctx, lbCfgB); err != nil {
t.Fatal(err)
}
if err := edsLB.waitForCounterUpdate(ctx, testEDSClusterName); err != nil {
// Counter is updated even though the service name didn't change. The
// eds_impl will compare the service names, and skip if it didn't change.
t.Fatal(err)
}
if err := edsLB.waitForCountMaxUpdate(ctx, &testCountMax); err != nil {
t.Fatal(err)
}
}
// TestSubConnStateChange verifies if the top-level edsBalancer passes on
// the subConnStateChange to appropriate child balancer.
// the subConnState to appropriate child balancer.
func (s) TestSubConnStateChange(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
@ -413,13 +219,6 @@ func (s) TestSubConnStateChange(t *testing.T) {
}
defer edsB.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
@ -427,14 +226,20 @@ func (s) TestSubConnStateChange(t *testing.T) {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
fsc := &fakeSubConn{}
state := connectivity.Ready
edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
state := balancer.SubConnState{ConnectivityState: connectivity.Ready}
edsB.UpdateSubConnState(fsc, state)
if err := edsLB.waitForSubConnStateChange(ctx, &scStateChange{sc: fsc, state: state}); err != nil {
t.Fatal(err)
}
@ -462,24 +267,22 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
}); err != nil {
t.Fatal(err)
}
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsLB.waitForClientConnStateChange(ctx); err != nil {
t.Fatalf("EDS impl got unexpected update: %v", err)
}
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
@ -493,9 +296,12 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := edsLB.waitForEDSResponse(sCtx, xdsclient.EndpointsUpdate{}); err != context.DeadlineExceeded {
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
t.Fatal(err)
}
if err := edsLB.waitForResolverError(ctx); err != nil {
t.Fatalf("want resolver error, got %v", err)
}
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, resourceErr)
@ -507,8 +313,11 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("eds impl expecting empty update, got %v", err)
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
t.Fatal(err)
}
if err := edsLB.waitForResolverError(ctx); err != nil {
t.Fatalf("want resolver error, got %v", err)
}
// An update with the same service name should not trigger a new watch.
@ -546,11 +355,6 @@ func (s) TestErrorFromResolver(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
@ -562,8 +366,12 @@ func (s) TestErrorFromResolver(t *testing.T) {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
edsLB, err := waitForNewChildLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsLB.waitForClientConnStateChange(ctx); err != nil {
t.Fatalf("EDS impl got unexpected update: %v", err)
}
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
@ -577,17 +385,23 @@ func (s) TestErrorFromResolver(t *testing.T) {
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := edsLB.waitForEDSResponse(sCtx, xdsclient.EndpointsUpdate{}); err != context.DeadlineExceeded {
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
t.Fatal("eds impl got EDS resp, want timeout error")
}
if err := edsLB.waitForResolverError(ctx); err != nil {
t.Fatalf("want resolver error, got %v", err)
}
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
edsB.ResolverError(resourceErr)
if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
}
if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
t.Fatal(err)
}
if err := edsLB.waitForResolverError(ctx); err != nil {
t.Fatalf("want resolver error, got %v", err)
}
// An update with the same service name should trigger a new watch, because
@ -681,97 +495,49 @@ func (s) TestClientWatchEDS(t *testing.T) {
}
}
// TestCounterUpdate verifies that the counter update is triggered with the
// service name from an update's config.
func (s) TestCounterUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
const (
fakeBalancerA = "fake_balancer_A"
fakeBalancerB = "fake_balancer_B"
)
builder := balancer.Get(edsName)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()
var testCountMax uint32 = 100
// Update should trigger counter update with provided service name.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ClusterName: "foobar-1",
MaxConcurrentRequests: &testCountMax,
},
}); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsI := edsB.(*edsBalancer).edsImpl.(*fakeEDSBalancer)
if err := edsI.waitForCounterUpdate(ctx, "foobar-1"); err != nil {
t.Fatal(err)
}
if err := edsI.waitForCountMaxUpdate(ctx, &testCountMax); err != nil {
t.Fatal(err)
}
// Install two fake balancers for service config update tests.
//
// ParseConfig only accepts the JSON if the specified balancer is registered.
func init() {
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA})
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB})
}
// TestClusterNameUpdateInAddressAttributes verifies that the cluster name
// update in edsImpl is triggered by the update from a new service config.
func (s) TestClusterNameUpdateInAddressAttributes(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
builder := balancer.Get(edsName)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()
// Update should trigger a cluster name update with the provided cluster name.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ClusterName: "foobar-1",
},
}); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotCluster, err := xdsC.WaitForWatchEDS(ctx)
if err != nil || gotCluster != "foobar-1" {
t.Fatalf("unexpected EDS watch: %v, %v", gotCluster, err)
}
edsI := edsB.(*edsBalancer).edsImpl.(*fakeEDSBalancer)
if err := edsI.waitForClusterNameUpdate(ctx, "foobar-1"); err != nil {
t.Fatal(err)
}
// An update with a new cluster name should restart the EDS watch and trigger
// another cluster name update.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ClusterName: "foobar-2",
},
}); err != nil {
t.Fatal(err)
}
if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
t.Fatalf("failed to wait for EDS cancel: %v", err)
}
gotCluster2, err := xdsC.WaitForWatchEDS(ctx)
if err != nil || gotCluster2 != "foobar-2" {
t.Fatalf("unexpected EDS watch: %v, %v", gotCluster2, err)
}
if err := edsI.waitForClusterNameUpdate(ctx, "foobar-2"); err != nil {
t.Fatal(err)
}
type fakeBalancerBuilder struct {
name string
}
func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &fakeBalancer{cc: cc}
}
func (b *fakeBalancerBuilder) Name() string {
return b.name
}
type fakeBalancer struct {
cc balancer.ClientConn
}
func (b *fakeBalancer) ResolverError(error) {
panic("implement me")
}
func (b *fakeBalancer) UpdateClientConnState(balancer.ClientConnState) error {
panic("implement me")
}
func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
panic("implement me")
}
func (b *fakeBalancer) Close() {}
func (s) TestBalancerConfigParsing(t *testing.T) {
const testEDSName = "eds.service"
var testLRSName = "lrs.server"
@ -910,30 +676,3 @@ func (s) TestBalancerConfigParsing(t *testing.T) {
})
}
}
func (s) TestEqualStringPointers(t *testing.T) {
var (
ta1 = "test-a"
ta2 = "test-a"
tb = "test-b"
)
tests := []struct {
name string
a *string
b *string
want bool
}{
{"both-nil", nil, nil, true},
{"a-non-nil", &ta1, nil, false},
{"b-non-nil", nil, &tb, false},
{"equal", &ta1, &ta2, true},
{"different", &ta1, &tb, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := equalStringPointers(tt.a, tt.b); got != tt.want {
t.Errorf("equalStringPointers() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -1,3 +1,5 @@
// +build go1.12
/*
* Copyright 2020 gRPC authors.
*
@ -19,13 +21,17 @@ package edsbalancer
import (
"fmt"
"net"
"reflect"
"strconv"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
)
@ -111,3 +117,70 @@ func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []xdsclient.Endpoint {
}
return endpoints
}
// testPickerFromCh receives pickers from the channel, and checks whether each
// behaves as expected (i.e. the given function returns a nil error).
//
// It returns nil as soon as one picker has the expected behavior.
//
// It returns an error if no picker with the expected behavior is received
// within defaultTestTimeout; the returned error includes the mismatch error
// from the last picker checked.
func testPickerFromCh(ch chan balancer.Picker, f func(balancer.Picker) error) error {
var (
p balancer.Picker
err error
)
for {
select {
case p = <-ch:
case <-time.After(defaultTestTimeout):
// TODO: this function should take a context, and use the context
// here, instead of making a new timer.
return fmt.Errorf("timeout waiting for picker with expected behavior, error from previous picker: %v", err)
}
err = f(p)
if err == nil {
return nil
}
}
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn
}
}
// testRoundRobinPickerFromCh receives pickers from the channel, and checks
// whether each round-robins across the wanted SubConns.
//
// It returns nil as soon as one picker has the expected behavior.
//
// It returns an error if no such picker is received within defaultTestTimeout;
// the returned error includes the mismatch error from the last picker checked.
func testRoundRobinPickerFromCh(ch chan balancer.Picker, want []balancer.SubConn) error {
return testPickerFromCh(ch, func(p balancer.Picker) error {
return testutils.IsRoundRobin(want, subConnFromPicker(p))
})
}
// testErrPickerFromCh receives pickers from the channel, and checks whether
// each consistently returns the wanted error.
//
// It returns nil as soon as one picker has the expected behavior.
//
// It returns an error if no such picker is received within defaultTestTimeout;
// the returned error includes the mismatch error from the last picker checked.
func testErrPickerFromCh(ch chan balancer.Picker, want error) error {
return testPickerFromCh(ch, func(p balancer.Picker) error {
for i := 0; i < 5; i++ {
_, err := p.Pick(balancer.PickInfo{})
if !reflect.DeepEqual(err, want) {
return fmt.Errorf("picker.Pick, got err %q, want err %q", err, want)
}
}
return nil
})
}
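// Hypothetical usage sketch (not part of this commit): a test that has created
// two SubConns could use the helpers above to assert that the latest picker
// round-robins across them, and that a later picker consistently fails with a
// specific error. The function name and parameters here are illustrative
// assumptions only.
func exampleAssertPickers(pickerCh chan balancer.Picker, sc1, sc2 balancer.SubConn, wantErr error) error {
	// Wait for a picker that round-robins across sc1 and sc2.
	if err := testRoundRobinPickerFromCh(pickerCh, []balancer.SubConn{sc1, sc2}); err != nil {
		return err
	}
	// Then wait for a picker that consistently returns wantErr.
	return testErrPickerFromCh(pickerCh, wantErr)
}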

View File

@ -0,0 +1,87 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package edsbalancer
import (
"google.golang.org/grpc/xds/internal/xdsclient"
)
// watchUpdate wraps the information received from a registered EDS watcher. A
// non-nil error is propagated to the underlying child balancer. A valid update
// results in creating a new child balancer (a priority balancer) if one
// doesn't already exist, and in pushing the updated balancer config to it.
type watchUpdate struct {
eds xdsclient.EndpointsUpdate
err error
}
// edsWatcher takes an EDS balancer config, and uses the xds_client to watch
// EDS updates. The EDS updates are passed back to the balancer via a channel.
type edsWatcher struct {
parent *edsBalancer
updateChannel chan *watchUpdate
edsToWatch string
edsCancel func()
}
func (ew *edsWatcher) updateConfig(config *EDSConfig) {
// If EDSServiceName is set, use it to watch EDS. Otherwise, use the cluster
// name.
newEDSToWatch := config.EDSServiceName
if newEDSToWatch == "" {
newEDSToWatch = config.ClusterName
}
if ew.edsToWatch == newEDSToWatch {
return
}
// Restart the EDS watch when the resource name to watch has changed.
ew.edsToWatch = newEDSToWatch
if ew.edsCancel != nil {
ew.edsCancel()
}
cancelEDSWatch := ew.parent.xdsClient.WatchEndpoints(newEDSToWatch, func(update xdsclient.EndpointsUpdate, err error) {
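// Remove any not-yet-consumed update so that updateChannel only ever holds
// the most recent watch result and the send below never blocks.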
select {
case <-ew.updateChannel:
default:
}
ew.updateChannel <- &watchUpdate{eds: update, err: err}
})
ew.parent.logger.Infof("Watch started on resource name %v with xds-client %p", newEDSToWatch, ew.parent.xdsClient)
ew.edsCancel = func() {
cancelEDSWatch()
ew.parent.logger.Infof("Watch cancelled on resource name %v with xds-client %p", newEDSToWatch, ew.parent.xdsClient)
}
}
// stopWatch stops the EDS watch.
//
// A subsequent call to updateConfig will restart the watch with the new name.
func (ew *edsWatcher) stopWatch() {
if ew.edsCancel != nil {
ew.edsCancel()
ew.edsCancel = nil
}
ew.edsToWatch = ""
}
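// Hedged sketch (illustrative only, not part of this commit): the parent
// edsBalancer could drive the watcher roughly as follows, restarting the watch
// on config changes and consuming results from updateChannel. The channels,
// function name, and handling shown here are hypothetical.
func exampleDriveWatcher(ew *edsWatcher, configs <-chan *EDSConfig, done <-chan struct{}) {
	for {
		select {
		case cfg := <-configs:
			// Restarts the EDS watch only if the resource name changed.
			ew.updateConfig(cfg)
		case u := <-ew.updateChannel:
			if u.err != nil {
				// A watch error would typically be forwarded to the child
				// policy (for example via its ResolverError method).
				continue
			}
			// A valid EndpointsUpdate would be converted into child balancer
			// config and pushed to the priority balancer.
			_ = u.eds
		case <-done:
			ew.stopWatch()
			return
		}
	}
}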

View File

@ -1,44 +0,0 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/xds/internal/xdsclient"
)
var newRandomWRR = wrr.NewRandom
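// dropper makes randomized drop decisions at a rate of Numerator/Denominator:
// the underlying WRR is given weight Numerator for "drop" and weight
// Denominator-Numerator for "don't drop", so over many calls roughly
// Numerator out of every Denominator requests are dropped.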
type dropper struct {
c xdsclient.OverloadDropConfig
w wrr.WRR
}
func newDropper(c xdsclient.OverloadDropConfig) *dropper {
w := newRandomWRR()
w.Add(true, int64(c.Numerator))
w.Add(false, int64(c.Denominator-c.Numerator))
return &dropper{
c: c,
w: w,
}
}
func (d *dropper) drop() (ret bool) {
return d.w.Next().(bool)
}
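// Illustrative sketch (not part of this commit): measuring the observed drop
// fraction over n calls. With Numerator=2 and Denominator=3, the returned
// fraction should approach 2/3 as n grows. The function name is a hypothetical
// example only.
func exampleDropFraction(n int) float64 {
	d := newDropper(xdsclient.OverloadDropConfig{Numerator: 2, Denominator: 3})
	dropped := 0
	for i := 0; i < n; i++ {
		if d.drop() {
			dropped++
		}
	}
	return float64(dropped) / float64(n)
}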

View File

@ -1,90 +0,0 @@
// +build go1.12
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"testing"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
)
func init() {
newRandomWRR = testutils.NewTestWRR
}
func (s) TestDropper(t *testing.T) {
const repeat = 2
type args struct {
numerator uint32
denominator uint32
}
tests := []struct {
name string
args args
}{
{
name: "2_3",
args: args{
numerator: 2,
denominator: 3,
},
},
{
name: "4_8",
args: args{
numerator: 4,
denominator: 8,
},
},
{
name: "7_20",
args: args{
numerator: 7,
denominator: 20,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := newDropper(xdsclient.OverloadDropConfig{
Category: "",
Numerator: tt.args.numerator,
Denominator: tt.args.denominator,
})
var (
dCount int
wantCount = int(tt.args.numerator) * repeat
loopCount = int(tt.args.denominator) * repeat
)
for i := 0; i < loopCount; i++ {
if d.drop() {
dCount++
}
}
if dCount != (wantCount) {
t.Errorf("with numerator %v, denominator %v repeat %v, got drop count: %v, want %v",
tt.args.numerator, tt.args.denominator, repeat, dCount, wantCount)
}
})
}
}

View File

@ -1,74 +0,0 @@
// +build go1.12
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package edsbalancer
import (
"context"
"testing"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
)
// TestXDSLoadReporting verifies that the edsBalancer starts the loadReport
// stream when the lbConfig passed to it contains a valid value for the LRS
// server (empty string).
func (s) TestXDSLoadReporting(t *testing.T) {
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(edsName)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
EDSServiceName: testEDSClusterName,
LrsLoadReportingServerName: new(string),
},
}); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotCluster, err := xdsC.WaitForWatchEDS(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
if gotCluster != testEDSClusterName {
t.Fatalf("xdsClient.WatchEndpoints() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
}
got, err := xdsC.WaitForReportLoad(ctx)
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != "" {
t.Fatalf("xdsClient.ReportLoad called with {%v}: want {\"\"}", got.Server)
}
}

View File

@ -1,46 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import "google.golang.org/grpc/balancer"
// The old xds balancer implements logic for both CDS and EDS. With the new
// design, CDS is split and moved to a separate balancer, and the xds balancer
// becomes the EDS balancer.
//
// To keep the existing tests working, this file registers the EDS balancer
// under the old xds balancer name.
//
// TODO: delete this file when the migration to the new workflow (LDS, RDS,
// CDS, EDS) is done.
const xdsName = "xds_experimental"
func init() {
balancer.Register(xdsBalancerBuilder{})
}
// xdsBalancerBuilder registers edsBalancerBuilder (now with the name
// "eds_experimental") under the old name "xds_experimental".
type xdsBalancerBuilder struct {
bb
}
func (xdsBalancerBuilder) Name() string {
return xdsName
}

View File

@ -28,7 +28,8 @@ import (
)
var (
errAllPrioritiesRemoved = errors.New("no locality is provided, all priorities are removed")
// ErrAllPrioritiesRemoved is returned by the picker when there's no priority available.
ErrAllPrioritiesRemoved = errors.New("no priority is provided, all priorities are removed")
// DefaultPriorityInitTimeout is the timeout after which if a priority is
// not READY, the next will be started. It's exported to be overridden by
// tests.
@ -73,7 +74,7 @@ func (b *priorityBalancer) syncPriority() {
b.stopPriorityInitTimer()
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(errAllPrioritiesRemoved),
Picker: base.NewErrPicker(ErrAllPrioritiesRemoved),
})
return
}

View File

@ -828,8 +828,8 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
// Test pick return TransientFailure.
pFail := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := pFail.Pick(balancer.PickInfo{}); err != errAllPrioritiesRemoved {
t.Fatalf("want pick error %v, got %v", errAllPrioritiesRemoved, err)
if _, err := pFail.Pick(balancer.PickInfo{}); err != ErrAllPrioritiesRemoved {
t.Fatalf("want pick error %v, got %v", ErrAllPrioritiesRemoved, err)
}
}
@ -1436,8 +1436,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) {
pFail := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := pFail.Pick(balancer.PickInfo{}); err != errAllPrioritiesRemoved {
t.Fatalf("want pick error %v, got %v", errAllPrioritiesRemoved, err)
if _, err := pFail.Pick(balancer.PickInfo{}); err != ErrAllPrioritiesRemoved {
t.Fatalf("want pick error %v, got %v", ErrAllPrioritiesRemoved, err)
}
}

View File

@ -84,3 +84,11 @@ func ClearCounterForTesting(serviceName string) {
}
c.numRequests = 0
}
// ClearAllCountersForTesting clears all the counters. It should only be used
// in tests.
func ClearAllCountersForTesting() {
src.mu.Lock()
defer src.mu.Unlock()
src.services = make(map[string]*ServiceRequestsCounter)
}