xds/balancergroup: remove xds dependency from balancergroup (#4955)

This commit is contained in:
Easwar Swaminathan 2021-11-05 09:30:57 -07:00 committed by GitHub
parent 3b94303f37
commit 5841c8c783
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 30 additions and 209 deletions

View File

@ -23,9 +23,6 @@ import (
"sync"
"time"
orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/cache"
@ -178,7 +175,6 @@ func (sbc *subBalancerWrapper) stopBalancer() {
//
// Updates from ClientConn are forwarded to sub-balancers
// - service config update
// - Not implemented
// - address update
// - subConn state change
// - find the corresponding balancer and forward
@ -199,7 +195,6 @@ type BalancerGroup struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
loadStore load.PerClusterReporter // TODO: delete this, no longer needed. It was used by EDS.
// stateAggregator is where the state/picker updates will be sent to. It's
// provided by the parent balancer, to build a picker with all the
@ -254,15 +249,11 @@ var DefaultSubBalancerCloseTimeout = 15 * time.Minute
// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
//
// TODO(easwars): Pass an options struct instead of N args.
func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, logger *grpclog.PrefixLogger) *BalancerGroup {
return &BalancerGroup{
cc: cc,
buildOpts: bOpts,
logger: logger,
loadStore: loadStore,
cc: cc,
buildOpts: bOpts,
logger: logger,
stateAggregator: stateAggregator,
idToBalancerConfig: make(map[string]*subBalancerWrapper),
@ -467,10 +458,6 @@ func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver
// state, then forward to ClientConn.
func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) {
bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state)
if bg.loadStore != nil {
// Only wrap the picker to do load reporting if loadStore was set.
state.Picker = newLoadReportPicker(state.Picker, id, bg.loadStore)
}
// Send new state to the aggregator, without holding the incomingMu.
// incomingMu is to protect all calls to the parent ClientConn, this update
@ -519,53 +506,3 @@ func (bg *BalancerGroup) ExitIdle() {
}
bg.outgoingMu.Unlock()
}
const (
serverLoadCPUName = "cpu_utilization"
serverLoadMemoryName = "mem_utilization"
)
type loadReportPicker struct {
p balancer.Picker
locality string
loadStore load.PerClusterReporter
}
func newLoadReportPicker(p balancer.Picker, id string, loadStore load.PerClusterReporter) *loadReportPicker {
return &loadReportPicker{
p: p,
locality: id,
loadStore: loadStore,
}
}
func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
res, err := lrp.p.Pick(info)
if err != nil {
return res, err
}
lrp.loadStore.CallStarted(lrp.locality)
oldDone := res.Done
res.Done = func(info balancer.DoneInfo) {
if oldDone != nil {
oldDone(info)
}
lrp.loadStore.CallFinished(lrp.locality, info.Err)
load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport)
if !ok {
return
}
lrp.loadStore.CallServerLoad(lrp.locality, serverLoadCPUName, load.CpuUtilization)
lrp.loadStore.CallServerLoad(lrp.locality, serverLoadMemoryName, load.MemUtilization)
for n, d := range load.RequestCost {
lrp.loadStore.CallServerLoad(lrp.locality, n, d)
}
for n, d := range load.Utilization {
lrp.loadStore.CallServerLoad(lrp.locality, n, d)
}
}
return res, err
}

View File

@ -30,19 +30,17 @@ import (
"testing"
"time"
orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)
var (
@ -64,6 +62,14 @@ func init() {
DefaultSubBalancerCloseTimeout = time.Millisecond
}
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
@ -71,18 +77,18 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
}
}
func newTestBalancerGroup(t *testing.T, loadStore load.PerClusterReporter) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
func newTestBalancerGroup(t *testing.T) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, loadStore, nil)
bg := New(cc, balancer.BuildOptions{}, gator, nil)
bg.Start()
return cc, gator, bg
}
// 1 balancer, 1 backend -> 2 backends -> 1 backend.
func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t, nil)
cc, gator, bg := newTestBalancerGroup(t)
// Add one balancer to group.
gator.Add(testBalancerIDs[0], 1)
@ -138,7 +144,7 @@ func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
// 2 balancers, each with 1 backend.
func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t, nil)
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send one resolved address to both
// balancers.
@ -168,7 +174,7 @@ func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
// 2 balancers, each with more than 1 backends.
func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t, nil)
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send one resolved address to both
// balancers.
@ -253,7 +259,7 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
// 2 balancers with different weights.
func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t, nil)
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send two resolved addresses to both
// balancers.
@ -289,7 +295,7 @@ func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
// totally 3 balancers, add/remove balancer.
func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t, nil)
cc, gator, bg := newTestBalancerGroup(t)
// Add three balancers to group and send one resolved address to both
// balancers.
@ -356,7 +362,7 @@ func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
// 2 balancers, change balancer weight.
func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t, nil)
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send two resolved addresses to both
// balancers.
@ -400,95 +406,6 @@ func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
}
}
func (s) TestBalancerGroup_LoadReport(t *testing.T) {
loadStore := load.NewStore()
const (
testCluster = "test-cluster"
testEDSService = "test-eds-service"
)
cc, gator, bg := newTestBalancerGroup(t, loadStore.PerCluster(testCluster, testEDSService))
backendToBalancerID := make(map[balancer.SubConn]string)
// Add two balancers to group and send two resolved addresses to both
// balancers.
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
backendToBalancerID[sc1] = testBalancerIDs[0]
backendToBalancerID[sc2] = testBalancerIDs[0]
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
backendToBalancerID[sc3] = testBalancerIDs[1]
backendToBalancerID[sc4] = testBalancerIDs[1]
// Send state changes for both subconns.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
// bg1 has a weight of 2, while bg2 has a weight of 1. So, we expect 20 of
// these picks to go to bg1 and 10 of them to bg2. And since there are two
// subConns in each group, we expect the picks to be equally split between
// the subConns. We do not call Done() on picks routed to sc1, so we expect
// these to show up as pending rpcs.
wantStoreData := []*load.Data{{
Cluster: testCluster,
Service: testEDSService,
LocalityStats: map[string]load.LocalityData{
testBalancerIDs[0]: {
RequestStats: load.RequestData{Succeeded: 10, InProgress: 10},
LoadStats: map[string]load.ServerLoadData{
"cpu_utilization": {Count: 10, Sum: 100},
"mem_utilization": {Count: 10, Sum: 50},
"pic": {Count: 10, Sum: 31.4},
"piu": {Count: 10, Sum: 31.4},
},
},
testBalancerIDs[1]: {
RequestStats: load.RequestData{Succeeded: 10},
LoadStats: map[string]load.ServerLoadData{
"cpu_utilization": {Count: 10, Sum: 100},
"mem_utilization": {Count: 10, Sum: 50},
"pic": {Count: 10, Sum: 31.4},
"piu": {Count: 10, Sum: 31.4},
},
},
},
}}
for i := 0; i < 30; i++ {
scst, _ := p1.Pick(balancer.PickInfo{})
if scst.Done != nil && scst.SubConn != sc1 {
scst.Done(balancer.DoneInfo{
ServerLoad: &orcapb.OrcaLoadReport{
CpuUtilization: 10,
MemUtilization: 5,
RequestCost: map[string]float64{"pic": 3.14},
Utilization: map[string]float64{"piu": 3.14},
},
})
}
}
gotStoreData := loadStore.Stats([]string{testCluster})
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.EquateApprox(0, 0.1), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" {
t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
}
}
// Create a new balancer group, add balancer and backends, but not start.
// - b1, weight 2, backends [0,1]
// - b2, weight 1, backends [2,3]
@ -502,7 +419,7 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil, nil)
bg := New(cc, balancer.BuildOptions{}, gator, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
@ -598,7 +515,7 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil, nil)
bg := New(cc, balancer.BuildOptions{}, gator, nil)
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], builder)
@ -614,7 +531,7 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
// transient_failure, the picks won't fail with transient_failure, and should
// instead wait for the other sub-balancer.
func (s) TestBalancerGroup_InitOneSubBalancerTransientFailure(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t, nil)
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send one resolved address to both
// balancers.
@ -645,7 +562,7 @@ func (s) TestBalancerGroup_InitOneSubBalancerTransientFailure(t *testing.T) {
// connecting, the overall state stays in transient_failure, and all picks
// return transient failure error.
func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t, nil)
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send one resolved address to both
// balancers.
@ -700,7 +617,7 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil, nil)
bg := New(cc, balancer.BuildOptions{}, gator, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
@ -993,7 +910,7 @@ func (s) TestBalancerGroupBuildOptions(t *testing.T) {
},
})
cc := testutils.NewTestClientConn(t)
bg := New(cc, bOpts, nil, nil, nil)
bg := New(cc, bOpts, nil, nil)
bg.Start()
// Add the stub balancer build above as a child policy.

View File

@ -1,33 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package balancergroup
import (
"testing"
"google.golang.org/grpc/internal/grpctest"
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

View File

@ -46,7 +46,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
b.logger = prefixLogger(b)
b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
b.stateAggregator.start()
b.bg = balancergroup.New(cc, opts, b.stateAggregator, nil, b.logger)
b.bg = balancergroup.New(cc, opts, b.stateAggregator, b.logger)
b.bg.Start()
b.logger.Infof("Created")
return b

View File

@ -59,7 +59,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
}
b.logger = prefixLogger(b)
b.bg = balancergroup.New(cc, bOpts, b, nil, b.logger)
b.bg = balancergroup.New(cc, bOpts, b, b.logger)
b.bg.Start()
go b.run()
b.logger.Infof("Created")

View File

@ -52,7 +52,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
b.logger = prefixLogger(b)
b.stateAggregator = weightedaggregator.New(cc, b.logger, NewRandomWRR)
b.stateAggregator.Start()
b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, nil, b.logger)
b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, b.logger)
b.bg.Start()
b.logger.Infof("Created")
return b