xds: standardize builder type names (bb) and balancer receiver names (b) (#4517)

This commit is contained in:
Doug Fawley 2021-06-04 11:40:23 -07:00 committed by GitHub
parent 7beddeea91
commit 7f9eeeae36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 254 additions and 261 deletions

View File

@ -64,17 +64,16 @@ var (
)
func init() {
balancer.Register(cdsBB{})
balancer.Register(bb{})
}
// cdsBB (short for cdsBalancerBuilder) implements the balancer.Builder
// interface to help build a cdsBalancer.
// bb implements the balancer.Builder interface to help build a cdsBalancer.
// It also implements the balancer.ConfigParser interface to help parse the
// JSON service config, to be passed to the cdsBalancer.
type cdsBB struct{}
type bb struct{}
// Build creates a new CDS balancer with the ClientConn.
func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &cdsBalancer{
bOpts: opts,
updateCh: buffer.NewUnbounded(),
@ -117,7 +116,7 @@ func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
}
// Name returns the name of balancers built by this builder.
func (cdsBB) Name() string {
func (bb) Name() string {
return cdsName
}
@ -130,7 +129,7 @@ type lbConfig struct {
// ParseConfig parses the JSON load balancer config provided into an
// internal form or returns an error if the config is invalid.
func (cdsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg lbConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, fmt.Errorf("xds: unable to unmarshal lbconfig: %s, error: %v", string(c), err)

View File

@ -49,14 +49,14 @@ const (
)
func init() {
balancer.Register(clusterImplBB{})
balancer.Register(bb{})
}
var newXDSClient func() (xdsClient, error)
type clusterImplBB struct{}
type bb struct{}
func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &clusterImplBalancer{
ClientConn: cc,
bOpts: bOpts,
@ -83,11 +83,11 @@ func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions)
return b
}
func (clusterImplBB) Name() string {
func (bb) Name() string {
return Name
}
func (clusterImplBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
@ -139,20 +139,20 @@ type clusterImplBalancer struct {
// updateLoadStore checks the config for load store, and decides whether it
// needs to restart the load reporting stream.
func (cib *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
var updateLoadClusterAndService bool
// ClusterName is different, restart. ClusterName is from ClusterName and
// EDSServiceName.
clusterName := cib.getClusterName()
clusterName := b.getClusterName()
if clusterName != newConfig.Cluster {
updateLoadClusterAndService = true
cib.setClusterName(newConfig.Cluster)
b.setClusterName(newConfig.Cluster)
clusterName = newConfig.Cluster
}
if cib.edsServiceName != newConfig.EDSServiceName {
if b.edsServiceName != newConfig.EDSServiceName {
updateLoadClusterAndService = true
cib.edsServiceName = newConfig.EDSServiceName
b.edsServiceName = newConfig.EDSServiceName
}
if updateLoadClusterAndService {
// This updates the clusterName and serviceName that will be reported
@ -163,7 +163,7 @@ func (cib *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
// On the other hand, this will almost never happen. Each LRS policy
// shouldn't get updated config. The parent should do a graceful switch
// when the clusterName or serviceName is changed.
cib.loadWrapper.UpdateClusterAndService(clusterName, cib.edsServiceName)
b.loadWrapper.UpdateClusterAndService(clusterName, b.edsServiceName)
}
// Check if it's necessary to restart load report.
@ -171,31 +171,31 @@ func (cib *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
if newConfig.LoadReportingServerName != nil {
newLRSServerName = *newConfig.LoadReportingServerName
}
if cib.lrsServerName != newLRSServerName {
if b.lrsServerName != newLRSServerName {
// LoadReportingServerName is different, load should be report to a
// different server, restart.
cib.lrsServerName = newLRSServerName
if cib.cancelLoadReport != nil {
cib.cancelLoadReport()
cib.cancelLoadReport = nil
b.lrsServerName = newLRSServerName
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
var loadStore *load.Store
if cib.xdsC != nil {
loadStore, cib.cancelLoadReport = cib.xdsC.ReportLoad(cib.lrsServerName)
if b.xdsC != nil {
loadStore, b.cancelLoadReport = b.xdsC.ReportLoad(b.lrsServerName)
}
cib.loadWrapper.UpdateLoadStore(loadStore)
b.loadWrapper.UpdateLoadStore(loadStore)
}
return nil
}
func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if cib.closed.HasFired() {
cib.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if b.closed.HasFired() {
b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
return nil
}
cib.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
@ -209,36 +209,36 @@ func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
}
if cib.xdsC == nil {
if b.xdsC == nil {
c := xdsclient.FromResolverState(s.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
cib.xdsC = c
b.xdsC = c
}
// Update load reporting config. This needs to be done before updating the
// child policy because we need the loadStore from the updated client to be
// passed to the ccWrapper, so that the next picker from the child policy
// will pick up the new loadStore.
if err := cib.updateLoadStore(newConfig); err != nil {
if err := b.updateLoadStore(newConfig); err != nil {
return err
}
// Compare new drop config. And update picker if it's changed.
var updatePicker bool
if cib.config == nil || !equalDropCategories(cib.config.DropCategories, newConfig.DropCategories) {
cib.drops = make([]*dropper, 0, len(newConfig.DropCategories))
if b.config == nil || !equalDropCategories(b.config.DropCategories, newConfig.DropCategories) {
b.drops = make([]*dropper, 0, len(newConfig.DropCategories))
for _, c := range newConfig.DropCategories {
cib.drops = append(cib.drops, newDropper(c))
b.drops = append(b.drops, newDropper(c))
}
updatePicker = true
}
// Compare cluster name. And update picker if it's changed, because circuit
// breaking's stream counter will be different.
if cib.config == nil || cib.config.Cluster != newConfig.Cluster {
cib.requestCounter = xdsclient.GetServiceRequestsCounter(newConfig.Cluster)
if b.config == nil || b.config.Cluster != newConfig.Cluster {
b.requestCounter = xdsclient.GetServiceRequestsCounter(newConfig.Cluster)
updatePicker = true
}
// Compare upper bound of stream count. And update picker if it's changed.
@ -247,29 +247,29 @@ func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState
if newConfig.MaxConcurrentRequests != nil {
newRequestCountMax = *newConfig.MaxConcurrentRequests
}
if cib.requestCountMax != newRequestCountMax {
cib.requestCountMax = newRequestCountMax
if b.requestCountMax != newRequestCountMax {
b.requestCountMax = newRequestCountMax
updatePicker = true
}
if updatePicker {
cib.pickerUpdateCh.Put(&dropConfigs{
drops: cib.drops,
requestCounter: cib.requestCounter,
requestCountMax: cib.requestCountMax,
b.pickerUpdateCh.Put(&dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
})
}
// If child policy is a different type, recreate the sub-balancer.
if cib.config == nil || cib.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
if cib.childLB != nil {
cib.childLB.Close()
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
if b.childLB != nil {
b.childLB.Close()
}
cib.childLB = bb.Build(cib, cib.bOpts)
b.childLB = bb.Build(b, b.bOpts)
}
cib.config = newConfig
b.config = newConfig
if cib.childLB == nil {
if b.childLB == nil {
// This is not an expected situation, and should be super rare in
// practice.
//
@ -280,26 +280,26 @@ func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState
}
// Addresses and sub-balancer config are sent to sub-balancer.
return cib.childLB.UpdateClientConnState(balancer.ClientConnState{
return b.childLB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: cib.config.ChildPolicy.Config,
BalancerConfig: b.config.ChildPolicy.Config,
})
}
func (cib *clusterImplBalancer) ResolverError(err error) {
if cib.closed.HasFired() {
cib.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
func (b *clusterImplBalancer) ResolverError(err error) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
return
}
if cib.childLB != nil {
cib.childLB.ResolverError(err)
if b.childLB != nil {
b.childLB.ResolverError(err)
}
}
func (cib *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if cib.closed.HasFired() {
cib.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if b.closed.HasFired() {
b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
return
}
@ -311,65 +311,65 @@ func (cib *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balanc
// knows). The parent priority policy is configured to ignore re-resolution
// signal from the EDS children.
if s.ConnectivityState == connectivity.TransientFailure {
cib.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
b.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
}
if cib.childLB != nil {
cib.childLB.UpdateSubConnState(sc, s)
if b.childLB != nil {
b.childLB.UpdateSubConnState(sc, s)
}
}
func (cib *clusterImplBalancer) Close() {
cib.mu.Lock()
cib.closed.Fire()
cib.mu.Unlock()
func (b *clusterImplBalancer) Close() {
b.mu.Lock()
b.closed.Fire()
b.mu.Unlock()
if cib.childLB != nil {
cib.childLB.Close()
cib.childLB = nil
if b.childLB != nil {
b.childLB.Close()
b.childLB = nil
}
if newXDSClient != nil {
cib.xdsC.Close()
b.xdsC.Close()
}
<-cib.done.Done()
cib.logger.Infof("Shutdown")
<-b.done.Done()
b.logger.Infof("Shutdown")
}
// Override methods to accept updates from the child LB.
func (cib *clusterImplBalancer) UpdateState(state balancer.State) {
func (b *clusterImplBalancer) UpdateState(state balancer.State) {
// Instead of updating parent ClientConn inline, send state to run().
cib.pickerUpdateCh.Put(state)
b.pickerUpdateCh.Put(state)
}
func (cib *clusterImplBalancer) setClusterName(n string) {
cib.clusterNameMu.Lock()
defer cib.clusterNameMu.Unlock()
cib.clusterName = n
func (b *clusterImplBalancer) setClusterName(n string) {
b.clusterNameMu.Lock()
defer b.clusterNameMu.Unlock()
b.clusterName = n
}
func (cib *clusterImplBalancer) getClusterName() string {
cib.clusterNameMu.Lock()
defer cib.clusterNameMu.Unlock()
return cib.clusterName
func (b *clusterImplBalancer) getClusterName() string {
b.clusterNameMu.Lock()
defer b.clusterNameMu.Unlock()
return b.clusterName
}
func (cib *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
clusterName := cib.getClusterName()
func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
clusterName := b.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
for i, addr := range addrs {
newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName)
}
return cib.ClientConn.NewSubConn(newAddrs, opts)
return b.ClientConn.NewSubConn(newAddrs, opts)
}
func (cib *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
clusterName := cib.getClusterName()
func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
clusterName := b.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
for i, addr := range addrs {
newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName)
}
cib.ClientConn.UpdateAddresses(sc, newAddrs)
b.ClientConn.UpdateAddresses(sc, newAddrs)
}
type dropConfigs struct {
@ -378,40 +378,40 @@ type dropConfigs struct {
requestCountMax uint32
}
func (cib *clusterImplBalancer) run() {
defer cib.done.Fire()
func (b *clusterImplBalancer) run() {
defer b.done.Fire()
for {
select {
case update := <-cib.pickerUpdateCh.Get():
cib.pickerUpdateCh.Load()
cib.mu.Lock()
if cib.closed.HasFired() {
cib.mu.Unlock()
case update := <-b.pickerUpdateCh.Get():
b.pickerUpdateCh.Load()
b.mu.Lock()
if b.closed.HasFired() {
b.mu.Unlock()
return
}
switch u := update.(type) {
case balancer.State:
cib.childState = u
cib.ClientConn.UpdateState(balancer.State{
ConnectivityState: cib.childState.ConnectivityState,
Picker: newDropPicker(cib.childState, &dropConfigs{
drops: cib.drops,
requestCounter: cib.requestCounter,
requestCountMax: cib.requestCountMax,
}, cib.loadWrapper),
b.childState = u
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: newDropPicker(b.childState, &dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
}, b.loadWrapper),
})
case *dropConfigs:
cib.drops = u.drops
cib.requestCounter = u.requestCounter
if cib.childState.Picker != nil {
cib.ClientConn.UpdateState(balancer.State{
ConnectivityState: cib.childState.ConnectivityState,
Picker: newDropPicker(cib.childState, u, cib.loadWrapper),
b.drops = u.drops
b.requestCounter = u.requestCounter
if b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: newDropPicker(b.childState, u, b.loadWrapper),
})
}
}
cib.mu.Unlock()
case <-cib.closed.Done():
b.mu.Unlock()
case <-b.closed.Done():
return
}
}

View File

@ -36,12 +36,12 @@ import (
const balancerName = "xds_cluster_manager_experimental"
func init() {
balancer.Register(builder{})
balancer.Register(bb{})
}
type builder struct{}
type bb struct{}
func (builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &bal{}
b.logger = prefixLogger(b)
b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
@ -52,11 +52,11 @@ func (builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balance
return b
}
func (builder) Name() string {
func (bb) Name() string {
return balancerName
}
func (builder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}

View File

@ -57,13 +57,12 @@ var (
)
func init() {
balancer.Register(&edsBalancerBuilder{})
balancer.Register(bb{})
}
type edsBalancerBuilder struct{}
type bb struct{}
// Build helps implement the balancer.Builder interface.
func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
x := &edsBalancer{
cc: cc,
closed: grpcsync.NewEvent(),
@ -92,11 +91,11 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
return x
}
func (b *edsBalancerBuilder) Name() string {
func (bb) Name() string {
return edsName
}
func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg EDSConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, fmt.Errorf("unable to unmarshal balancer config %s into EDSConfig, error: %v", string(c), err)
@ -162,25 +161,25 @@ type edsBalancer struct {
// updates from grpc, xdsClient and load balancer. It synchronizes the
// operations that happen inside edsBalancer. It exits when edsBalancer is
// closed.
func (x *edsBalancer) run() {
func (b *edsBalancer) run() {
for {
select {
case update := <-x.grpcUpdate:
x.handleGRPCUpdate(update)
case update := <-x.xdsClientUpdate:
x.handleXDSClientUpdate(update)
case update := <-x.childPolicyUpdate.Get():
x.childPolicyUpdate.Load()
case update := <-b.grpcUpdate:
b.handleGRPCUpdate(update)
case update := <-b.xdsClientUpdate:
b.handleXDSClientUpdate(update)
case update := <-b.childPolicyUpdate.Get():
b.childPolicyUpdate.Load()
u := update.(*balancerStateWithPriority)
x.edsImpl.updateState(u.priority, u.s)
case <-x.closed.Done():
x.cancelWatch()
b.edsImpl.updateState(u.priority, u.s)
case <-b.closed.Done():
b.cancelWatch()
if newXDSClient != nil {
x.xdsClient.Close()
b.xdsClient.Close()
}
x.edsImpl.close()
x.logger.Infof("Shutdown")
x.done.Fire()
b.edsImpl.close()
b.logger.Infof("Shutdown")
b.done.Fire()
return
}
}
@ -199,68 +198,68 @@ func (x *edsBalancer) run() {
// watcher should keep watching.
// In both cases, the sub-balancers will be closed, and the future picks will
// fail.
func (x *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
x.logger.Warningf("Received error: %v", err)
func (b *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
b.logger.Warningf("Received error: %v", err)
if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
if fromParent {
// This is an error from the parent ClientConn (can be the parent
// CDS balancer), and is a resource-not-found error. This means the
// resource (can be either LDS or CDS) was removed. Stop the EDS
// watch.
x.cancelWatch()
b.cancelWatch()
}
x.edsImpl.handleEDSResponse(xdsclient.EndpointsUpdate{})
b.edsImpl.handleEDSResponse(xdsclient.EndpointsUpdate{})
}
}
func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
func (b *edsBalancer) handleGRPCUpdate(update interface{}) {
switch u := update.(type) {
case *subConnStateUpdate:
x.edsImpl.handleSubConnStateChange(u.sc, u.state.ConnectivityState)
b.edsImpl.handleSubConnStateChange(u.sc, u.state.ConnectivityState)
case *balancer.ClientConnState:
x.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(u.BalancerConfig))
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(u.BalancerConfig))
cfg, _ := u.BalancerConfig.(*EDSConfig)
if cfg == nil {
// service config parsing failed. should never happen.
return
}
if err := x.handleServiceConfigUpdate(cfg); err != nil {
x.logger.Warningf("failed to update xDS client: %v", err)
if err := b.handleServiceConfigUpdate(cfg); err != nil {
b.logger.Warningf("failed to update xDS client: %v", err)
}
x.edsImpl.updateServiceRequestsConfig(cfg.ClusterName, cfg.MaxConcurrentRequests)
b.edsImpl.updateServiceRequestsConfig(cfg.ClusterName, cfg.MaxConcurrentRequests)
// We will update the edsImpl with the new child policy, if we got a
// different one.
if !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy, cmpopts.EquateEmpty()) {
if !cmp.Equal(cfg.ChildPolicy, b.config.ChildPolicy, cmpopts.EquateEmpty()) {
if cfg.ChildPolicy != nil {
x.edsImpl.handleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
b.edsImpl.handleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
} else {
x.edsImpl.handleChildPolicy(roundrobin.Name, nil)
b.edsImpl.handleChildPolicy(roundrobin.Name, nil)
}
}
x.config = cfg
b.config = cfg
case error:
x.handleErrorFromUpdate(u, true)
b.handleErrorFromUpdate(u, true)
default:
// unreachable path
x.logger.Errorf("wrong update type: %T", update)
b.logger.Errorf("wrong update type: %T", update)
}
}
// handleServiceConfigUpdate applies the service config update, watching a new
// EDS service name and restarting LRS stream, as required.
func (x *edsBalancer) handleServiceConfigUpdate(config *EDSConfig) error {
func (b *edsBalancer) handleServiceConfigUpdate(config *EDSConfig) error {
var updateLoadClusterAndService bool
if x.clusterName != config.ClusterName {
if b.clusterName != config.ClusterName {
updateLoadClusterAndService = true
x.clusterName = config.ClusterName
x.edsImpl.updateClusterName(x.clusterName)
b.clusterName = config.ClusterName
b.edsImpl.updateClusterName(b.clusterName)
}
if x.edsServiceName != config.EDSServiceName {
if b.edsServiceName != config.EDSServiceName {
updateLoadClusterAndService = true
x.edsServiceName = config.EDSServiceName
b.edsServiceName = config.EDSServiceName
}
// If EDSServiceName is set, use it to watch EDS. Otherwise, use the cluster
@ -270,14 +269,14 @@ func (x *edsBalancer) handleServiceConfigUpdate(config *EDSConfig) error {
newEDSToWatch = config.ClusterName
}
var restartEDSWatch bool
if x.edsToWatch != newEDSToWatch {
if b.edsToWatch != newEDSToWatch {
restartEDSWatch = true
x.edsToWatch = newEDSToWatch
b.edsToWatch = newEDSToWatch
}
// Restart EDS watch when the eds name has changed.
if restartEDSWatch {
x.startEndpointsWatch()
b.startEndpointsWatch()
}
if updateLoadClusterAndService {
@ -288,13 +287,13 @@ func (x *edsBalancer) handleServiceConfigUpdate(config *EDSConfig) error {
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fix this (a bigger change) will happen later.
x.loadWrapper.UpdateClusterAndService(x.clusterName, x.edsServiceName)
b.loadWrapper.UpdateClusterAndService(b.clusterName, b.edsServiceName)
}
// Restart load reporting when the loadReportServer name has changed.
if !equalStringPointers(x.loadReportServer, config.LrsLoadReportingServerName) {
loadStore := x.startLoadReport(config.LrsLoadReportingServerName)
x.loadWrapper.UpdateLoadStore(loadStore)
if !equalStringPointers(b.loadReportServer, config.LrsLoadReportingServerName) {
loadStore := b.startLoadReport(config.LrsLoadReportingServerName)
b.loadWrapper.UpdateLoadStore(loadStore)
}
return nil
@ -304,32 +303,32 @@ func (x *edsBalancer) handleServiceConfigUpdate(config *EDSConfig) error {
//
// This usually means load report needs to be restarted, but this function does
// NOT do that. Caller needs to call startLoadReport separately.
func (x *edsBalancer) startEndpointsWatch() {
if x.cancelEndpointsWatch != nil {
x.cancelEndpointsWatch()
func (b *edsBalancer) startEndpointsWatch() {
if b.cancelEndpointsWatch != nil {
b.cancelEndpointsWatch()
}
edsToWatch := x.edsToWatch
cancelEDSWatch := x.xdsClient.WatchEndpoints(edsToWatch, func(update xdsclient.EndpointsUpdate, err error) {
x.logger.Infof("Watch update from xds-client %p, content: %+v", x.xdsClient, pretty.ToJSON(update))
x.handleEDSUpdate(update, err)
edsToWatch := b.edsToWatch
cancelEDSWatch := b.xdsClient.WatchEndpoints(edsToWatch, func(update xdsclient.EndpointsUpdate, err error) {
b.logger.Infof("Watch update from xds-client %p, content: %+v", b.xdsClient, pretty.ToJSON(update))
b.handleEDSUpdate(update, err)
})
x.logger.Infof("Watch started on resource name %v with xds-client %p", edsToWatch, x.xdsClient)
x.cancelEndpointsWatch = func() {
b.logger.Infof("Watch started on resource name %v with xds-client %p", edsToWatch, b.xdsClient)
b.cancelEndpointsWatch = func() {
cancelEDSWatch()
x.logger.Infof("Watch cancelled on resource name %v with xds-client %p", edsToWatch, x.xdsClient)
b.logger.Infof("Watch cancelled on resource name %v with xds-client %p", edsToWatch, b.xdsClient)
}
}
func (x *edsBalancer) cancelWatch() {
x.loadReportServer = nil
if x.cancelLoadReport != nil {
x.cancelLoadReport()
x.cancelLoadReport = nil
func (b *edsBalancer) cancelWatch() {
b.loadReportServer = nil
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
if x.cancelEndpointsWatch != nil {
x.edsToWatch = ""
x.cancelEndpointsWatch()
x.cancelEndpointsWatch = nil
if b.cancelEndpointsWatch != nil {
b.edsToWatch = ""
b.cancelEndpointsWatch()
b.cancelEndpointsWatch = nil
}
}
@ -339,26 +338,26 @@ func (x *edsBalancer) cancelWatch() {
// Caller can cal this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
// watch).
func (x *edsBalancer) startLoadReport(loadReportServer *string) *load.Store {
x.loadReportServer = loadReportServer
if x.cancelLoadReport != nil {
x.cancelLoadReport()
x.cancelLoadReport = nil
func (b *edsBalancer) startLoadReport(loadReportServer *string) *load.Store {
b.loadReportServer = loadReportServer
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
if loadReportServer == nil {
return nil
}
ls, cancel := x.xdsClient.ReportLoad(*loadReportServer)
x.cancelLoadReport = cancel
ls, cancel := b.xdsClient.ReportLoad(*loadReportServer)
b.cancelLoadReport = cancel
return ls
}
func (x *edsBalancer) handleXDSClientUpdate(update *edsUpdate) {
func (b *edsBalancer) handleXDSClientUpdate(update *edsUpdate) {
if err := update.err; err != nil {
x.handleErrorFromUpdate(err, false)
b.handleErrorFromUpdate(err, false)
return
}
x.edsImpl.handleEDSResponse(update.resp)
b.edsImpl.handleEDSResponse(update.resp)
}
type subConnStateUpdate struct {
@ -366,36 +365,36 @@ type subConnStateUpdate struct {
state balancer.SubConnState
}
func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
func (b *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
update := &subConnStateUpdate{
sc: sc,
state: state,
}
select {
case x.grpcUpdate <- update:
case <-x.closed.Done():
case b.grpcUpdate <- update:
case <-b.closed.Done():
}
}
func (x *edsBalancer) ResolverError(err error) {
func (b *edsBalancer) ResolverError(err error) {
select {
case x.grpcUpdate <- err:
case <-x.closed.Done():
case b.grpcUpdate <- err:
case <-b.closed.Done():
}
}
func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if x.xdsClient == nil {
func (b *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if b.xdsClient == nil {
c := xdsclient.FromResolverState(s.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
x.xdsClient = c
b.xdsClient = c
}
select {
case x.grpcUpdate <- &s:
case <-x.closed.Done():
case b.grpcUpdate <- &s:
case <-b.closed.Done():
}
return nil
}
@ -405,10 +404,10 @@ type edsUpdate struct {
err error
}
func (x *edsBalancer) handleEDSUpdate(resp xdsclient.EndpointsUpdate, err error) {
func (b *edsBalancer) handleEDSUpdate(resp xdsclient.EndpointsUpdate, err error) {
select {
case x.xdsClientUpdate <- &edsUpdate{resp: resp, err: err}:
case <-x.closed.Done():
case b.xdsClientUpdate <- &edsUpdate{resp: resp, err: err}:
case <-b.closed.Done():
}
}
@ -417,16 +416,16 @@ type balancerStateWithPriority struct {
s balancer.State
}
func (x *edsBalancer) enqueueChildBalancerState(p priorityType, s balancer.State) {
x.childPolicyUpdate.Put(&balancerStateWithPriority{
func (b *edsBalancer) enqueueChildBalancerState(p priorityType, s balancer.State) {
b.childPolicyUpdate.Put(&balancerStateWithPriority{
priority: p,
s: s,
})
}
func (x *edsBalancer) Close() {
x.closed.Fire()
<-x.done.Done()
func (b *edsBalancer) Close() {
b.closed.Fire()
<-b.done.Done()
}
// equalStringPointers returns true if

View File

@ -70,10 +70,6 @@ var (
}
)
func init() {
balancer.Register(&edsBalancerBuilder{})
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
@ -890,8 +886,7 @@ func (s) TestBalancerConfigParsing(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &edsBalancerBuilder{}
got, err := b.ParseConfig(tt.js)
got, err := bb{}.ParseConfig(tt.js)
if (err != nil) != tt.wantErr {
t.Fatalf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
}

View File

@ -32,15 +32,15 @@ import "google.golang.org/grpc/balancer"
const xdsName = "xds_experimental"
func init() {
balancer.Register(&xdsBalancerBuilder{})
balancer.Register(xdsBalancerBuilder{})
}
// xdsBalancerBuilder register edsBalancerBuilder (now with name
// "eds_experimental") under the old name "xds_experimental".
type xdsBalancerBuilder struct {
edsBalancerBuilder
bb
}
func (b *xdsBalancerBuilder) Name() string {
func (xdsBalancerBuilder) Name() string {
return xdsName
}

View File

@ -33,7 +33,7 @@ import (
)
func init() {
balancer.Register(&lrsBB{})
balancer.Register(bb{})
}
var newXDSClient func() (xdsClient, error)
@ -41,9 +41,9 @@ var newXDSClient func() (xdsClient, error)
// Name is the name of the LRS balancer.
const Name = "lrs_experimental"
type lrsBB struct{}
type bb struct{}
func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &lrsBalancer{
cc: cc,
buildOpts: opts,
@ -64,11 +64,11 @@ func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balanc
return b
}
func (l *lrsBB) Name() string {
func (bb) Name() string {
return Name
}
func (l *lrsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}

View File

@ -44,12 +44,12 @@ import (
const Name = "priority_experimental"
func init() {
balancer.Register(priorityBB{})
balancer.Register(bb{})
}
type priorityBB struct{}
type bb struct{}
func (priorityBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &priorityBalancer{
cc: cc,
done: grpcsync.NewEvent(),
@ -66,11 +66,11 @@ func (priorityBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) bal
return b
}
func (b priorityBB) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (b bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(s)
}
func (priorityBB) Name() string {
func (bb) Name() string {
return Name
}

View File

@ -42,12 +42,12 @@ const Name = "weighted_target_experimental"
var NewRandomWRR = wrr.NewRandom
func init() {
balancer.Register(&weightedTargetBB{})
balancer.Register(bb{})
}
type weightedTargetBB struct{}
type bb struct{}
func (wt *weightedTargetBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &weightedTargetBalancer{}
b.logger = prefixLogger(b)
b.stateAggregator = weightedaggregator.New(cc, b.logger, NewRandomWRR)
@ -58,11 +58,11 @@ func (wt *weightedTargetBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOp
return b
}
func (wt *weightedTargetBB) Name() string {
func (bb) Name() string {
return Name
}
func (wt *weightedTargetBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
@ -83,8 +83,8 @@ type weightedTargetBalancer struct {
// UpdateClientConnState takes the new targets in balancer group,
// creates/deletes sub-balancers and sends them update. Addresses are split into
// groups based on hierarchy path.
func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
w.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
@ -94,10 +94,10 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
var rebuildStateAndPicker bool
// Remove sub-pickers and sub-balancers that are not in the new config.
for name := range w.targets {
for name := range b.targets {
if _, ok := newConfig.Targets[name]; !ok {
w.stateAggregator.Remove(name)
w.bg.Remove(name)
b.stateAggregator.Remove(name)
b.bg.Remove(name)
// Trigger a state/picker update, because we don't want `ClientConn`
// to pick this sub-balancer anymore.
rebuildStateAndPicker = true
@ -110,27 +110,27 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
//
// For all sub-balancers, forward the address/balancer config update.
for name, newT := range newConfig.Targets {
oldT, ok := w.targets[name]
oldT, ok := b.targets[name]
if !ok {
// If this is a new sub-balancer, add weights to the picker map.
w.stateAggregator.Add(name, newT.Weight)
b.stateAggregator.Add(name, newT.Weight)
// Then add to the balancer group.
w.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
// Not trigger a state/picker update. Wait for the new sub-balancer
// to send its updates.
} else if newT.ChildPolicy.Name != oldT.ChildPolicy.Name {
// If the child policy name is differet, remove from balancer group
// and re-add.
w.stateAggregator.Remove(name)
w.bg.Remove(name)
w.stateAggregator.Add(name, newT.Weight)
w.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
b.stateAggregator.Remove(name)
b.bg.Remove(name)
b.stateAggregator.Add(name, newT.Weight)
b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
// Trigger a state/picker update, because we don't want `ClientConn`
// to pick this sub-balancer anymore.
rebuildStateAndPicker = true
} else if newT.Weight != oldT.Weight {
// If this is an existing sub-balancer, update weight if necessary.
w.stateAggregator.UpdateWeight(name, newT.Weight)
b.stateAggregator.UpdateWeight(name, newT.Weight)
// Trigger a state/picker update, because we don't want `ClientConn`
// should do picks with the new weights now.
rebuildStateAndPicker = true
@ -142,7 +142,7 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
// - Balancer config comes from the targets map.
//
// TODO: handle error? How to aggregate errors and return?
_ = w.bg.UpdateClientConnState(name, balancer.ClientConnState{
_ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: addressesSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
@ -152,23 +152,23 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
})
}
w.targets = newConfig.Targets
b.targets = newConfig.Targets
if rebuildStateAndPicker {
w.stateAggregator.BuildAndUpdate()
b.stateAggregator.BuildAndUpdate()
}
return nil
}
func (w *weightedTargetBalancer) ResolverError(err error) {
w.bg.ResolverError(err)
func (b *weightedTargetBalancer) ResolverError(err error) {
b.bg.ResolverError(err)
}
func (w *weightedTargetBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
w.bg.UpdateSubConnState(sc, state)
func (b *weightedTargetBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.bg.UpdateSubConnState(sc, state)
}
func (w *weightedTargetBalancer) Close() {
w.stateAggregator.Stop()
w.bg.Close()
func (b *weightedTargetBalancer) Close() {
b.stateAggregator.Stop()
b.bg.Close()
}