grpc-go/balancer/weightedroundrobin/balancer.go

/*
 *
 * Copyright 2023 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package weightedroundrobin

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/balancer/weightedroundrobin/internal"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/grpcrand"
	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
	"google.golang.org/grpc/orca"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"

	v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)

// Name is the name of the weighted round robin balancer.
const Name = "weighted_round_robin"

func init() {
	balancer.Register(bb{})
}

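// bb is the balancer builder for the weighted round robin policy. It also
// implements config parsing for the policy's service config.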
type bb struct{}
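
// Build constructs a wrrBalancer with an empty address map and an initial
// aggregated state of Connecting.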
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
	b := &wrrBalancer{
		cc:                cc,
		subConns:          resolver.NewAddressMap(),
		csEvltr:           &balancer.ConnectivityStateEvaluator{},
		scMap:             make(map[balancer.SubConn]*weightedSubConn),
		connectivityState: connectivity.Connecting,
	}
	b.logger = prefixLogger(b)
	b.logger.Infof("Created")
	return b
}

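// ParseConfig unmarshals the JSON LB policy config, applying the defaults
// documented in gRFC A58 and validating errorUtilizationPenalty and
// weightUpdatePeriod.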
func (bb) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
	lbCfg := &lbConfig{
		// Default values as documented in A58.
		OOBReportingPeriod:      iserviceconfig.Duration(10 * time.Second),
		BlackoutPeriod:          iserviceconfig.Duration(10 * time.Second),
		WeightExpirationPeriod:  iserviceconfig.Duration(3 * time.Minute),
		WeightUpdatePeriod:      iserviceconfig.Duration(time.Second),
		ErrorUtilizationPenalty: 1,
	}
	if err := json.Unmarshal(js, lbCfg); err != nil {
		return nil, fmt.Errorf("wrr: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
	}

	if lbCfg.ErrorUtilizationPenalty < 0 {
		return nil, fmt.Errorf("wrr: errorUtilizationPenalty must be non-negative")
	}

	// For easier comparisons later, ensure the OOB reporting period is unset
	// (0s) when OOB reports are disabled.
	if !lbCfg.EnableOOBLoadReport {
		lbCfg.OOBReportingPeriod = 0
	}

	// Impose lower bound of 100ms on weightUpdatePeriod.
	if !internal.AllowAnyWeightUpdatePeriod && lbCfg.WeightUpdatePeriod < iserviceconfig.Duration(100*time.Millisecond) {
		lbCfg.WeightUpdatePeriod = iserviceconfig.Duration(100 * time.Millisecond)
	}

	return lbCfg, nil
}

func (bb) Name() string {
	return Name
}

// wrrBalancer implements the weighted round robin LB policy.
type wrrBalancer struct {
	cc     balancer.ClientConn
	logger *grpclog.PrefixLogger

	// The following fields are only accessed on calls into the LB policy, and
	// do not need a mutex.
	cfg               *lbConfig            // active config
	subConns          *resolver.AddressMap // active weightedSubConns mapped by address
	scMap             map[balancer.SubConn]*weightedSubConn
	connectivityState connectivity.State // aggregate state
	csEvltr           *balancer.ConnectivityStateEvaluator
	resolverErr       error // the last error reported by the resolver; cleared on successful resolution
	connErr           error // the last connection error; cleared upon leaving TransientFailure
	stopPicker        func()
}

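// UpdateClientConnState stores the new config, syncs SubConns with the
// resolver's address list, and regenerates the picker. A resolver update with
// zero addresses is treated as a resolver error.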
func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
	b.logger.Infof("UpdateCCS: %v", ccs)
	b.resolverErr = nil
	cfg, ok := ccs.BalancerConfig.(*lbConfig)
	if !ok {
		return fmt.Errorf("wrr: received nil or illegal BalancerConfig (type %T): %v", ccs.BalancerConfig, ccs.BalancerConfig)
	}

	b.cfg = cfg
	b.updateAddresses(ccs.ResolverState.Addresses)

	if len(ccs.ResolverState.Addresses) == 0 {
		b.ResolverError(errors.New("resolver produced zero addresses")) // will call regeneratePicker
		return balancer.ErrBadResolverState
	}

	b.regeneratePicker()

	return nil
}

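// updateAddresses creates SubConns for newly added addresses, pushes the
// current config to every weightedSubConn, and shuts down SubConns whose
// addresses were removed by the resolver.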
func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
	addrsSet := resolver.NewAddressMap()

	// Loop through new address list and create subconns for any new addresses.
	for _, addr := range addrs {
		if _, ok := addrsSet.Get(addr); ok {
			// Redundant address; skip.
			continue
		}
		addrsSet.Set(addr, nil)

		var wsc *weightedSubConn
		wsci, ok := b.subConns.Get(addr)
		if ok {
			wsc = wsci.(*weightedSubConn)
		} else {
			// addr is a new address (not existing in b.subConns).
			var sc balancer.SubConn
			sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
				StateListener: func(state balancer.SubConnState) {
					b.updateSubConnState(sc, state)
				},
			})
			if err != nil {
				b.logger.Warningf("Failed to create new SubConn for address %v: %v", addr, err)
				continue
			}
			wsc = &weightedSubConn{
				SubConn:           sc,
				logger:            b.logger,
				connectivityState: connectivity.Idle,
				// Initially, we set load reports to off, because they are not
				// running upon initial weightedSubConn creation.
				cfg: &lbConfig{EnableOOBLoadReport: false},
			}
			b.subConns.Set(addr, wsc)
			b.scMap[sc] = wsc
			b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
			sc.Connect()
		}
		// Update config for existing weightedSubConn or send update for first
		// time to new one. Ensures an OOB listener is running if needed
		// (and stops the existing one if applicable).
		wsc.updateConfig(b.cfg)
	}

	// Loop through existing subconns and remove ones that are not in addrs.
	for _, addr := range b.subConns.Keys() {
		if _, ok := addrsSet.Get(addr); ok {
			// Existing address also in new address list; skip.
			continue
		}
		// addr was removed by resolver. Remove.
		wsci, _ := b.subConns.Get(addr)
		wsc := wsci.(*weightedSubConn)
		wsc.SubConn.Shutdown()
		b.subConns.Delete(addr)
	}
}

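// ResolverError records the error so the picker can report it. The picker is
// only regenerated if the balancer is (or just became) TransientFailure.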
func (b *wrrBalancer) ResolverError(err error) {
	b.resolverErr = err
	if b.subConns.Len() == 0 {
		b.connectivityState = connectivity.TransientFailure
	}
	if b.connectivityState != connectivity.TransientFailure {
		// No need to update the picker since no error is being returned.
		return
	}
	b.regeneratePicker()
}

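// UpdateSubConnState is not expected to be called; SubConn state changes are
// delivered through the StateListener registered in updateAddresses.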
func (b *wrrBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

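// updateSubConnState handles a state change for a single SubConn, updates the
// aggregated balancer state, and regenerates the picker when the SubConn
// enters or leaves Ready or when the aggregated state is TransientFailure.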
func (b *wrrBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	wsc := b.scMap[sc]
	if wsc == nil {
		b.logger.Errorf("UpdateSubConnState called with an unknown SubConn: %p, %v", sc, state)
		return
	}
	if b.logger.V(2) {
		logger.Infof("UpdateSubConnState(%+v, %+v)", sc, state)
	}

	cs := state.ConnectivityState

	if cs == connectivity.TransientFailure {
		// Save error to be reported via picker.
		b.connErr = state.ConnectionError
	}

	if cs == connectivity.Shutdown {
		delete(b.scMap, sc)
		// The subconn was removed from b.subConns when the address was removed
		// in updateAddresses.
	}

	oldCS := wsc.updateConnectivityState(cs)
	b.connectivityState = b.csEvltr.RecordTransition(oldCS, cs)

	// Regenerate picker when one of the following happens:
	//  - this sc entered or left ready
	//  - the aggregated state of balancer is TransientFailure
	//    (may need to update error message)
	if (cs == connectivity.Ready) != (oldCS == connectivity.Ready) ||
		b.connectivityState == connectivity.TransientFailure {
		b.regeneratePicker()
	}
}

// Close stops the balancer. It cancels any ongoing scheduler updates and
// stops any ORCA listeners.
func (b *wrrBalancer) Close() {
	if b.stopPicker != nil {
		b.stopPicker()
		b.stopPicker = nil
	}
	for _, wsc := range b.scMap {
		// Ensure any lingering OOB watchers are stopped.
		wsc.updateConnectivityState(connectivity.Shutdown)
	}
}

// ExitIdle is ignored; we always connect to all backends.
func (b *wrrBalancer) ExitIdle() {}
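
// readySubConns returns all the weightedSubConns that are currently in the
// Ready state.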
func (b *wrrBalancer) readySubConns() []*weightedSubConn {
	var ret []*weightedSubConn
	for _, v := range b.subConns.Values() {
		wsc := v.(*weightedSubConn)
		if wsc.connectivityState == connectivity.Ready {
			ret = append(ret, wsc)
		}
	}
	return ret
}

// mergeErrors builds an error from the last connection error and the last
// resolver error. Must only be called if b.connectivityState is
// TransientFailure.
func (b *wrrBalancer) mergeErrors() error {
	// connErr must always be non-nil unless there are no SubConns, in which
	// case resolverErr must be non-nil.
	if b.connErr == nil {
		return fmt.Errorf("last resolver error: %v", b.resolverErr)
	}
	if b.resolverErr == nil {
		return fmt.Errorf("last connection error: %v", b.connErr)
	}
	return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
}

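// regeneratePicker cancels the previous picker's scheduler updates and pushes
// a new picker to the ClientConn that reflects the current aggregated state
// and the set of Ready subconns.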
func (b *wrrBalancer) regeneratePicker() {
	if b.stopPicker != nil {
		b.stopPicker()
		b.stopPicker = nil
	}

	switch b.connectivityState {
	case connectivity.TransientFailure:
		b.cc.UpdateState(balancer.State{
			ConnectivityState: connectivity.TransientFailure,
			Picker:            base.NewErrPicker(b.mergeErrors()),
		})
		return
	case connectivity.Connecting, connectivity.Idle:
		// Idle could happen very briefly if all subconns are Idle and we've
		// asked them to connect but they haven't reported Connecting yet.
		// Report the same as Connecting since this is temporary.
		b.cc.UpdateState(balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		})
		return
	case connectivity.Ready:
		b.connErr = nil
	}

	p := &picker{
		v:        grpcrand.Uint32(), // start the scheduler at a random point
		cfg:      b.cfg,
		subConns: b.readySubConns(),
	}
	var ctx context.Context
	ctx, b.stopPicker = context.WithCancel(context.Background())
	p.start(ctx)
	b.cc.UpdateState(balancer.State{
		ConnectivityState: b.connectivityState,
		Picker:            p,
	})
}

// picker is the WRR policy's picker. It uses live-updating backend weights to
// update the scheduler periodically and ensure picks are routed proportional
// to those weights.
type picker struct {
	scheduler unsafe.Pointer     // *scheduler; accessed atomically
	v         uint32             // incrementing value used by the scheduler; accessed atomically
	cfg       *lbConfig          // active config when picker created
	subConns  []*weightedSubConn // all READY subconns
}

// scWeights returns a slice containing the weights from p.subConns in the same
// order as p.subConns.
func (p *picker) scWeights() []float64 {
	ws := make([]float64, len(p.subConns))
	now := internal.TimeNow()
	for i, wsc := range p.subConns {
		ws[i] = wsc.weight(now, time.Duration(p.cfg.WeightExpirationPeriod), time.Duration(p.cfg.BlackoutPeriod))
	}
	return ws
}

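// inc atomically increments and returns the picker's counter, which the
// scheduler uses to select the next subconn.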
func (p *picker) inc() uint32 {
	return atomic.AddUint32(&p.v, 1)
}

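// regenerateScheduler builds a new scheduler from the current weights and
// atomically swaps it into place for subsequent picks.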
func (p *picker) regenerateScheduler() {
	s := newScheduler(p.scWeights(), p.inc)
	atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s))
}

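// start computes the initial scheduler and, when there is more than one
// backend, starts a goroutine that rebuilds it every weightUpdatePeriod until
// ctx is canceled.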
func (p *picker) start(ctx context.Context) {
	p.regenerateScheduler()
	if len(p.subConns) == 1 {
		// No need to regenerate weights with only one backend.
		return
	}
	go func() {
		ticker := time.NewTicker(time.Duration(p.cfg.WeightUpdatePeriod))
		defer ticker.Stop() // release the ticker's resources once the picker is stopped
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				p.regenerateScheduler()
			}
		}
	}()
}

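// Pick selects the next Ready subconn from the current scheduler. When OOB
// load reporting is disabled, it also installs a Done callback to consume
// per-RPC ORCA load reports.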
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	// Read the scheduler atomically. All scheduler operations are threadsafe,
	// and if the scheduler is replaced during this usage, we want to use the
	// scheduler that was live when the pick started.
	sched := *(*scheduler)(atomic.LoadPointer(&p.scheduler))

	pickedSC := p.subConns[sched.nextIndex()]
	pr := balancer.PickResult{SubConn: pickedSC.SubConn}
	if !p.cfg.EnableOOBLoadReport {
		pr.Done = func(info balancer.DoneInfo) {
			if load, ok := info.ServerLoad.(*v3orcapb.OrcaLoadReport); ok && load != nil {
				pickedSC.OnLoadReport(load)
			}
		}
	}
	return pr, nil
}

// weightedSubConn is the wrapper of a subconn that holds the subconn and its
// weight (and other parameters relevant to computing the effective weight).
// When needed, it also tracks connectivity state, listens for metrics updates
// by implementing the orca.OOBListener interface and manages that listener.
type weightedSubConn struct {
	balancer.SubConn
	logger *grpclog.PrefixLogger

	// The following fields are only accessed on calls into the LB policy, and
	// do not need a mutex.
	connectivityState connectivity.State
	stopORCAListener  func()

	// The following fields are accessed asynchronously and are protected by
	// mu. Note that mu may not be held when calling into the stopORCAListener
	// or when registering a new listener, as those calls require the ORCA
	// producer mu which is held when calling the listener, and the listener
	// holds mu.
	mu            sync.Mutex
	weightVal     float64
	nonEmptySince time.Time
	lastUpdated   time.Time
	cfg           *lbConfig
}

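// OnLoadReport updates the subconn's weight from an ORCA load report as
// qps / (utilization + (eps/qps) * errorUtilizationPenalty), preferring
// application utilization over CPU utilization and ignoring empty reports.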
func (w *weightedSubConn) OnLoadReport(load *v3orcapb.OrcaLoadReport) {
	if w.logger.V(2) {
		w.logger.Infof("Received load report for subchannel %v: %v", w.SubConn, load)
	}
	// Update weights of this subchannel according to the reported load
	utilization := load.ApplicationUtilization
	if utilization == 0 {
		utilization = load.CpuUtilization
	}
	if utilization == 0 || load.RpsFractional == 0 {
		if w.logger.V(2) {
			w.logger.Infof("Ignoring empty load report for subchannel %v", w.SubConn)
		}
		return
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	errorRate := load.Eps / load.RpsFractional
	w.weightVal = load.RpsFractional / (utilization + errorRate*w.cfg.ErrorUtilizationPenalty)
	if w.logger.V(2) {
		w.logger.Infof("New weight for subchannel %v: %v", w.SubConn, w.weightVal)
	}

	w.lastUpdated = internal.TimeNow()
	if w.nonEmptySince == (time.Time{}) {
		w.nonEmptySince = w.lastUpdated
	}
}

// updateConfig updates the parameters of the WRR policy and
// stops/starts/restarts the ORCA OOB listener.
func (w *weightedSubConn) updateConfig(cfg *lbConfig) {
	w.mu.Lock()
	oldCfg := w.cfg
	w.cfg = cfg
	w.mu.Unlock()

	newPeriod := cfg.OOBReportingPeriod
	if cfg.EnableOOBLoadReport == oldCfg.EnableOOBLoadReport &&
		newPeriod == oldCfg.OOBReportingPeriod {
		// Load reporting wasn't enabled before or after, or load reporting was
		// enabled before and after, and had the same period. (Note that with
		// load reporting disabled, OOBReportingPeriod is always 0.)
		return
	}
	// (Optionally stop and) start the listener to use the new config's
	// settings for OOB reporting.
	if w.stopORCAListener != nil {
		w.stopORCAListener()
	}
	if !cfg.EnableOOBLoadReport {
		w.stopORCAListener = nil
		return
	}
	if w.logger.V(2) {
		w.logger.Infof("Registering ORCA listener for %v with interval %v", w.SubConn, newPeriod)
	}
	opts := orca.OOBListenerOptions{ReportInterval: time.Duration(newPeriod)}
	w.stopORCAListener = orca.RegisterOOBListener(w.SubConn, w, opts)
}

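// updateConnectivityState records the new state and returns the previous one.
// It reconnects on Idle, resets the blackout window on Ready, stops the ORCA
// listener on Shutdown, and does not record Idle or Connecting transitions
// while in TransientFailure.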
func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connectivity.State {
	switch cs {
	case connectivity.Idle:
		// Always reconnect when idle.
		w.SubConn.Connect()
	case connectivity.Ready:
		// If we transition back to READY state, reset nonEmptySince so that we
		// apply the blackout period after we start receiving load data. Note
		// that we cannot guarantee that we will never receive lingering
		// callbacks for backend metric reports from the previous connection
		// after the new connection has been established, but they should be
		// masked by new backend metric reports from the new connection by the
		// time the blackout period ends.
		w.mu.Lock()
		w.nonEmptySince = time.Time{}
		w.mu.Unlock()
	case connectivity.Shutdown:
		if w.stopORCAListener != nil {
			w.stopORCAListener()
		}
	}

	oldCS := w.connectivityState

	if oldCS == connectivity.TransientFailure &&
		(cs == connectivity.Connecting || cs == connectivity.Idle) {
		// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent IDLE or
		// CONNECTING transitions to prevent the aggregated state from being
		// always CONNECTING when many backends exist but are all down.
		return oldCS
	}

	w.connectivityState = cs

	return oldCS
}

// weight returns the current effective weight of the subconn, taking into
// account the parameters. Returns 0 for blacked out or expired data, which
// will cause the backend weight to be treated as the mean of the weights of
// the other backends.
func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration) float64 {
	w.mu.Lock()
	defer w.mu.Unlock()
	// If the most recent update was longer ago than the expiration period,
	// reset nonEmptySince so that we apply the blackout period again if we
	// start getting data again in the future, and return 0.
	if now.Sub(w.lastUpdated) >= weightExpirationPeriod {
		w.nonEmptySince = time.Time{}
		return 0
	}
	// If we don't have at least blackoutPeriod worth of data, return 0.
	if blackoutPeriod != 0 && (w.nonEmptySince == (time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod) {
		return 0
	}
	return w.weightVal
}