From 5841c8c78316e5b2ab44d37698034b146cb8c091 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 5 Nov 2021 09:30:57 -0700 Subject: [PATCH] xds/balancergroup: remove xds dependency from balancergroup (#4955) --- .../balancer/balancergroup/balancergroup.go | 71 +--------- .../balancergroup/balancergroup_test.go | 129 ++++-------------- .../balancer/balancergroup/testutils_test.go | 33 ----- .../balancer/clustermanager/clustermanager.go | 2 +- xds/internal/balancer/priority/balancer.go | 2 +- .../balancer/weightedtarget/weightedtarget.go | 2 +- 6 files changed, 30 insertions(+), 209 deletions(-) delete mode 100644 xds/internal/balancer/balancergroup/testutils_test.go diff --git a/xds/internal/balancer/balancergroup/balancergroup.go b/xds/internal/balancer/balancergroup/balancergroup.go index 749c6b36e..d516f2153 100644 --- a/xds/internal/balancer/balancergroup/balancergroup.go +++ b/xds/internal/balancer/balancergroup/balancergroup.go @@ -23,9 +23,6 @@ import ( "sync" "time" - orcapb "github.com/cncf/xds/go/xds/data/orca/v3" - "google.golang.org/grpc/xds/internal/xdsclient/load" - "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/cache" @@ -178,7 +175,6 @@ func (sbc *subBalancerWrapper) stopBalancer() { // // Updates from ClientConn are forwarded to sub-balancers // - service config update -// - Not implemented // - address update // - subConn state change // - find the corresponding balancer and forward @@ -199,7 +195,6 @@ type BalancerGroup struct { cc balancer.ClientConn buildOpts balancer.BuildOptions logger *grpclog.PrefixLogger - loadStore load.PerClusterReporter // TODO: delete this, no longer needed. It was used by EDS. // stateAggregator is where the state/picker updates will be sent to. It's // provided by the parent balancer, to build a picker with all the @@ -254,15 +249,11 @@ var DefaultSubBalancerCloseTimeout = 15 * time.Minute // New creates a new BalancerGroup. Note that the BalancerGroup // needs to be started to work. -// -// TODO(easwars): Pass an options struct instead of N args. -func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup { +func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, logger *grpclog.PrefixLogger) *BalancerGroup { return &BalancerGroup{ - cc: cc, - buildOpts: bOpts, - logger: logger, - loadStore: loadStore, - + cc: cc, + buildOpts: bOpts, + logger: logger, stateAggregator: stateAggregator, idToBalancerConfig: make(map[string]*subBalancerWrapper), @@ -467,10 +458,6 @@ func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver // state, then forward to ClientConn. func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) { bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state) - if bg.loadStore != nil { - // Only wrap the picker to do load reporting if loadStore was set. - state.Picker = newLoadReportPicker(state.Picker, id, bg.loadStore) - } // Send new state to the aggregator, without holding the incomingMu. // incomingMu is to protect all calls to the parent ClientConn, this update @@ -519,53 +506,3 @@ func (bg *BalancerGroup) ExitIdle() { } bg.outgoingMu.Unlock() } - -const ( - serverLoadCPUName = "cpu_utilization" - serverLoadMemoryName = "mem_utilization" -) - -type loadReportPicker struct { - p balancer.Picker - - locality string - loadStore load.PerClusterReporter -} - -func newLoadReportPicker(p balancer.Picker, id string, loadStore load.PerClusterReporter) *loadReportPicker { - return &loadReportPicker{ - p: p, - locality: id, - loadStore: loadStore, - } -} - -func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - res, err := lrp.p.Pick(info) - if err != nil { - return res, err - } - - lrp.loadStore.CallStarted(lrp.locality) - oldDone := res.Done - res.Done = func(info balancer.DoneInfo) { - if oldDone != nil { - oldDone(info) - } - lrp.loadStore.CallFinished(lrp.locality, info.Err) - - load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport) - if !ok { - return - } - lrp.loadStore.CallServerLoad(lrp.locality, serverLoadCPUName, load.CpuUtilization) - lrp.loadStore.CallServerLoad(lrp.locality, serverLoadMemoryName, load.MemUtilization) - for n, d := range load.RequestCost { - lrp.loadStore.CallServerLoad(lrp.locality, n, d) - } - for n, d := range load.Utilization { - lrp.loadStore.CallServerLoad(lrp.locality, n, d) - } - } - return res, err -} diff --git a/xds/internal/balancer/balancergroup/balancergroup_test.go b/xds/internal/balancer/balancergroup/balancergroup_test.go index 82d0c2dfb..60c74a16b 100644 --- a/xds/internal/balancer/balancergroup/balancergroup_test.go +++ b/xds/internal/balancer/balancergroup/balancergroup_test.go @@ -30,19 +30,17 @@ import ( "testing" "time" - orcapb "github.com/cncf/xds/go/xds/data/orca/v3" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator" "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/xdsclient/load" ) var ( @@ -64,6 +62,14 @@ func init() { DefaultSubBalancerCloseTimeout = time.Millisecond } +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { return func() balancer.SubConn { scst, _ := p.Pick(balancer.PickInfo{}) @@ -71,18 +77,18 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { } } -func newTestBalancerGroup(t *testing.T, loadStore load.PerClusterReporter) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) { +func newTestBalancerGroup(t *testing.T) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) { cc := testutils.NewTestClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() - bg := New(cc, balancer.BuildOptions{}, gator, loadStore, nil) + bg := New(cc, balancer.BuildOptions{}, gator, nil) bg.Start() return cc, gator, bg } // 1 balancer, 1 backend -> 2 backends -> 1 backend. func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) { - cc, gator, bg := newTestBalancerGroup(t, nil) + cc, gator, bg := newTestBalancerGroup(t) // Add one balancer to group. gator.Add(testBalancerIDs[0], 1) @@ -138,7 +144,7 @@ func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) { // 2 balancers, each with 1 backend. func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) { - cc, gator, bg := newTestBalancerGroup(t, nil) + cc, gator, bg := newTestBalancerGroup(t) // Add two balancers to group and send one resolved address to both // balancers. @@ -168,7 +174,7 @@ func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) { // 2 balancers, each with more than 1 backends. func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { - cc, gator, bg := newTestBalancerGroup(t, nil) + cc, gator, bg := newTestBalancerGroup(t) // Add two balancers to group and send one resolved address to both // balancers. @@ -253,7 +259,7 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { // 2 balancers with different weights. func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) { - cc, gator, bg := newTestBalancerGroup(t, nil) + cc, gator, bg := newTestBalancerGroup(t) // Add two balancers to group and send two resolved addresses to both // balancers. @@ -289,7 +295,7 @@ func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) { // totally 3 balancers, add/remove balancer. func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) { - cc, gator, bg := newTestBalancerGroup(t, nil) + cc, gator, bg := newTestBalancerGroup(t) // Add three balancers to group and send one resolved address to both // balancers. @@ -356,7 +362,7 @@ func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) { // 2 balancers, change balancer weight. func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) { - cc, gator, bg := newTestBalancerGroup(t, nil) + cc, gator, bg := newTestBalancerGroup(t) // Add two balancers to group and send two resolved addresses to both // balancers. @@ -400,95 +406,6 @@ func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) { } } -func (s) TestBalancerGroup_LoadReport(t *testing.T) { - loadStore := load.NewStore() - const ( - testCluster = "test-cluster" - testEDSService = "test-eds-service" - ) - cc, gator, bg := newTestBalancerGroup(t, loadStore.PerCluster(testCluster, testEDSService)) - - backendToBalancerID := make(map[balancer.SubConn]string) - - // Add two balancers to group and send two resolved addresses to both - // balancers. - gator.Add(testBalancerIDs[0], 2) - bg.Add(testBalancerIDs[0], rrBuilder) - bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}}) - sc1 := <-cc.NewSubConnCh - sc2 := <-cc.NewSubConnCh - backendToBalancerID[sc1] = testBalancerIDs[0] - backendToBalancerID[sc2] = testBalancerIDs[0] - - gator.Add(testBalancerIDs[1], 1) - bg.Add(testBalancerIDs[1], rrBuilder) - bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}) - sc3 := <-cc.NewSubConnCh - sc4 := <-cc.NewSubConnCh - backendToBalancerID[sc3] = testBalancerIDs[1] - backendToBalancerID[sc4] = testBalancerIDs[1] - - // Send state changes for both subconns. - bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - - // Test roundrobin on the last picker. - p1 := <-cc.NewPickerCh - // bg1 has a weight of 2, while bg2 has a weight of 1. So, we expect 20 of - // these picks to go to bg1 and 10 of them to bg2. And since there are two - // subConns in each group, we expect the picks to be equally split between - // the subConns. We do not call Done() on picks routed to sc1, so we expect - // these to show up as pending rpcs. - wantStoreData := []*load.Data{{ - Cluster: testCluster, - Service: testEDSService, - LocalityStats: map[string]load.LocalityData{ - testBalancerIDs[0]: { - RequestStats: load.RequestData{Succeeded: 10, InProgress: 10}, - LoadStats: map[string]load.ServerLoadData{ - "cpu_utilization": {Count: 10, Sum: 100}, - "mem_utilization": {Count: 10, Sum: 50}, - "pic": {Count: 10, Sum: 31.4}, - "piu": {Count: 10, Sum: 31.4}, - }, - }, - testBalancerIDs[1]: { - RequestStats: load.RequestData{Succeeded: 10}, - LoadStats: map[string]load.ServerLoadData{ - "cpu_utilization": {Count: 10, Sum: 100}, - "mem_utilization": {Count: 10, Sum: 50}, - "pic": {Count: 10, Sum: 31.4}, - "piu": {Count: 10, Sum: 31.4}, - }, - }, - }, - }} - for i := 0; i < 30; i++ { - scst, _ := p1.Pick(balancer.PickInfo{}) - if scst.Done != nil && scst.SubConn != sc1 { - scst.Done(balancer.DoneInfo{ - ServerLoad: &orcapb.OrcaLoadReport{ - CpuUtilization: 10, - MemUtilization: 5, - RequestCost: map[string]float64{"pic": 3.14}, - Utilization: map[string]float64{"piu": 3.14}, - }, - }) - } - } - - gotStoreData := loadStore.Stats([]string{testCluster}) - if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.EquateApprox(0, 0.1), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" { - t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) - } -} - // Create a new balancer group, add balancer and backends, but not start. // - b1, weight 2, backends [0,1] // - b2, weight 1, backends [2,3] @@ -502,7 +419,7 @@ func (s) TestBalancerGroup_start_close(t *testing.T) { cc := testutils.NewTestClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() - bg := New(cc, balancer.BuildOptions{}, gator, nil, nil) + bg := New(cc, balancer.BuildOptions{}, gator, nil) // Add two balancers to group and send two resolved addresses to both // balancers. @@ -598,7 +515,7 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) { cc := testutils.NewTestClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() - bg := New(cc, balancer.BuildOptions{}, gator, nil, nil) + bg := New(cc, balancer.BuildOptions{}, gator, nil) gator.Add(testBalancerIDs[0], 2) bg.Add(testBalancerIDs[0], builder) @@ -614,7 +531,7 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) { // transient_failure, the picks won't fail with transient_failure, and should // instead wait for the other sub-balancer. func (s) TestBalancerGroup_InitOneSubBalancerTransientFailure(t *testing.T) { - cc, gator, bg := newTestBalancerGroup(t, nil) + cc, gator, bg := newTestBalancerGroup(t) // Add two balancers to group and send one resolved address to both // balancers. @@ -645,7 +562,7 @@ func (s) TestBalancerGroup_InitOneSubBalancerTransientFailure(t *testing.T) { // connecting, the overall state stays in transient_failure, and all picks // return transient failure error. func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) { - cc, gator, bg := newTestBalancerGroup(t, nil) + cc, gator, bg := newTestBalancerGroup(t) // Add two balancers to group and send one resolved address to both // balancers. @@ -700,7 +617,7 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat cc := testutils.NewTestClientConn(t) gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) gator.Start() - bg := New(cc, balancer.BuildOptions{}, gator, nil, nil) + bg := New(cc, balancer.BuildOptions{}, gator, nil) // Add two balancers to group and send two resolved addresses to both // balancers. @@ -993,7 +910,7 @@ func (s) TestBalancerGroupBuildOptions(t *testing.T) { }, }) cc := testutils.NewTestClientConn(t) - bg := New(cc, bOpts, nil, nil, nil) + bg := New(cc, bOpts, nil, nil) bg.Start() // Add the stub balancer build above as a child policy. diff --git a/xds/internal/balancer/balancergroup/testutils_test.go b/xds/internal/balancer/balancergroup/testutils_test.go deleted file mode 100644 index 1429fa87b..000000000 --- a/xds/internal/balancer/balancergroup/testutils_test.go +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package balancergroup - -import ( - "testing" - - "google.golang.org/grpc/internal/grpctest" -) - -type s struct { - grpctest.Tester -} - -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) -} diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go index 318545d79..188b39d46 100644 --- a/xds/internal/balancer/clustermanager/clustermanager.go +++ b/xds/internal/balancer/clustermanager/clustermanager.go @@ -46,7 +46,7 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal b.logger = prefixLogger(b) b.stateAggregator = newBalancerStateAggregator(cc, b.logger) b.stateAggregator.start() - b.bg = balancergroup.New(cc, opts, b.stateAggregator, nil, b.logger) + b.bg = balancergroup.New(cc, opts, b.stateAggregator, b.logger) b.bg.Start() b.logger.Infof("Created") return b diff --git a/xds/internal/balancer/priority/balancer.go b/xds/internal/balancer/priority/balancer.go index 23e8aa775..39053dbc1 100644 --- a/xds/internal/balancer/priority/balancer.go +++ b/xds/internal/balancer/priority/balancer.go @@ -59,7 +59,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba } b.logger = prefixLogger(b) - b.bg = balancergroup.New(cc, bOpts, b, nil, b.logger) + b.bg = balancergroup.New(cc, bOpts, b, b.logger) b.bg.Start() go b.run() b.logger.Infof("Created") diff --git a/xds/internal/balancer/weightedtarget/weightedtarget.go b/xds/internal/balancer/weightedtarget/weightedtarget.go index f05e0aca1..d036c9b5b 100644 --- a/xds/internal/balancer/weightedtarget/weightedtarget.go +++ b/xds/internal/balancer/weightedtarget/weightedtarget.go @@ -52,7 +52,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba b.logger = prefixLogger(b) b.stateAggregator = weightedaggregator.New(cc, b.logger, NewRandomWRR) b.stateAggregator.Start() - b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, nil, b.logger) + b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, b.logger) b.bg.Start() b.logger.Infof("Created") return b