grpc-go/balancer/leastrequest/leastrequest.go

182 lines
5.3 KiB
Go

/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package leastrequest implements a least request load balancer.
package leastrequest
import (
"encoding/json"
"fmt"
"sync/atomic"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/serviceconfig"
)
// grpcranduint32 is a global to stub out in tests.
var grpcranduint32 = grpcrand.Uint32
// Name is the name of the least request balancer.
const Name = "least_request_experimental"
var logger = grpclog.Component("least-request")
func init() {
balancer.Register(bb{})
}
// LBConfig is the balancer config for least_request_experimental balancer.
type LBConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`
// ChoiceCount is the number of random SubConns to sample to find the one
// with the fewest outstanding requests. If unset, defaults to 2. If set to
// < 2, the config will be rejected, and if set to > 10, will become 10.
ChoiceCount uint32 `json:"choiceCount,omitempty"`
}
type bb struct{}
func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
lbConfig := &LBConfig{
ChoiceCount: 2,
}
if err := json.Unmarshal(s, lbConfig); err != nil {
return nil, fmt.Errorf("least-request: unable to unmarshal LBConfig: %v", err)
}
// "If `choice_count < 2`, the config will be rejected." - A48
if lbConfig.ChoiceCount < 2 { // sweet
return nil, fmt.Errorf("least-request: lbConfig.choiceCount: %v, must be >= 2", lbConfig.ChoiceCount)
}
// "If a LeastRequestLoadBalancingConfig with a choice_count > 10 is
// received, the least_request_experimental policy will set choice_count =
// 10." - A48
if lbConfig.ChoiceCount > 10 {
lbConfig.ChoiceCount = 10
}
return lbConfig, nil
}
func (bb) Name() string {
return Name
}
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
b.Balancer = baseBuilder.Build(cc, bOpts)
return b
}
type leastRequestBalancer struct {
// Embeds balancer.Balancer because needs to intercept UpdateClientConnState
// to learn about choiceCount.
balancer.Balancer
choiceCount uint32
scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
}
func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
lrCfg, ok := s.BalancerConfig.(*LBConfig)
if !ok {
logger.Errorf("least-request: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
return balancer.ErrBadResolverState
}
lrb.choiceCount = lrCfg.ChoiceCount
return lrb.Balancer.UpdateClientConnState(s)
}
type scWithRPCCount struct {
sc balancer.SubConn
numRPCs *atomic.Int32
}
func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("least-request: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
for sc := range lrb.scRPCCounts {
if _, ok := info.ReadySCs[sc]; !ok { // If no longer ready, no more need for the ref to count active RPCs.
delete(lrb.scRPCCounts, sc)
}
}
// Create new refs if needed.
for sc := range info.ReadySCs {
if _, ok := lrb.scRPCCounts[sc]; !ok {
lrb.scRPCCounts[sc] = new(atomic.Int32)
}
}
// Copy refs to counters into picker.
scs := make([]scWithRPCCount, 0, len(info.ReadySCs))
for sc := range info.ReadySCs {
scs = append(scs, scWithRPCCount{
sc: sc,
numRPCs: lrb.scRPCCounts[sc], // guaranteed to be present due to algorithm
})
}
return &picker{
choiceCount: lrb.choiceCount,
subConns: scs,
}
}
type picker struct {
// choiceCount is the number of random SubConns to find the one with
// the least request.
choiceCount uint32
// Built out when receives list of ready RPCs.
subConns []scWithRPCCount
}
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
var pickedSC *scWithRPCCount
var pickedSCNumRPCs int32
for i := 0; i < int(p.choiceCount); i++ {
index := grpcranduint32() % uint32(len(p.subConns))
sc := p.subConns[index]
n := sc.numRPCs.Load()
if pickedSC == nil || n < pickedSCNumRPCs {
pickedSC = &sc
pickedSCNumRPCs = n
}
}
// "The counter for a subchannel should be atomically incremented by one
// after it has been successfully picked by the picker." - A48
pickedSC.numRPCs.Add(1)
// "the picker should add a callback for atomically decrementing the
// subchannel counter once the RPC finishes (regardless of Status code)." -
// A48.
done := func(balancer.DoneInfo) {
pickedSC.numRPCs.Add(-1)
}
return balancer.PickResult{
SubConn: pickedSC.sc,
Done: done,
}, nil
}