From e24ede593630782a7718aeb27f116446e0284f90 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 24 Jun 2021 16:20:11 -0700 Subject: [PATCH] xds: delete LRS policy and move the functionality to xds_cluster_impl (#4528) - (cluster_resolver) attach locality ID to addresses - (cluster_impl) wrap SubConn - (lrs) delete --- .../balancer/clusterimpl/balancer_test.go | 125 +++++++++ .../balancer/clusterimpl/clusterimpl.go | 93 ++++++- xds/internal/balancer/clusterimpl/picker.go | 78 +++++- .../balancerconfig/configbuilder.go | 27 +- .../balancerconfig/configbuilder_test.go | 215 +++++----------- xds/internal/balancer/lrs/balancer.go | 239 ------------------ xds/internal/balancer/lrs/balancer_test.go | 143 ----------- xds/internal/balancer/lrs/config.go | 53 ---- xds/internal/balancer/lrs/config_test.go | 114 --------- xds/internal/balancer/lrs/logging.go | 34 --- xds/internal/balancer/lrs/picker.go | 85 ------- .../weightedtarget_config_test.go | 26 +- xds/internal/internal.go | 18 ++ 13 files changed, 383 insertions(+), 867 deletions(-) delete mode 100644 xds/internal/balancer/lrs/balancer.go delete mode 100644 xds/internal/balancer/lrs/balancer_test.go delete mode 100644 xds/internal/balancer/lrs/config.go delete mode 100644 xds/internal/balancer/lrs/config_test.go delete mode 100644 xds/internal/balancer/lrs/logging.go delete mode 100644 xds/internal/balancer/lrs/picker.go diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index 3cb34200b..09194a54d 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -22,6 +22,7 @@ package clusterimpl import ( "context" + "fmt" "strings" "testing" "time" @@ -34,6 +35,7 @@ import ( "google.golang.org/grpc/internal" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/resolver" + xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" @@ -158,6 +160,9 @@ func TestDropByCategory(t *testing.T) { Service: testServiceName, TotalDrops: dropCount, Drops: map[string]uint64{dropReason: dropCount}, + LocalityStats: map[string]load.LocalityData{ + assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount}}, + }, }} gotStatsData0 := loadStore.Stats([]string{testClusterName}) @@ -213,6 +218,9 @@ func TestDropByCategory(t *testing.T) { Service: testServiceName, TotalDrops: dropCount2, Drops: map[string]uint64{dropReason2: dropCount2}, + LocalityStats: map[string]load.LocalityData{ + assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount2}}, + }, }} gotStatsData1 := loadStore.Stats([]string{testClusterName}) @@ -320,6 +328,9 @@ func TestDropCircuitBreaking(t *testing.T) { Cluster: testClusterName, Service: testServiceName, TotalDrops: uint64(maxRequest), + LocalityStats: map[string]load.LocalityData{ + assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: uint64(rpcCount - maxRequest + 50)}}, + }, }} gotStatsData0 := loadStore.Stats([]string{testClusterName}) @@ -533,3 +544,117 @@ func TestReResolution(t *testing.T) { t.Fatalf("timeout waiting for ResolveNow()") } } + +func TestLoadReporting(t *testing.T) { + var testLocality = xdsinternal.LocalityID{ + Region: "test-region", + Zone: "test-zone", + SubZone: "test-sub-zone", + } + + xdsC := fakeclient.NewClient() + defer xdsC.Close() + + builder := balancer.Get(Name) + cc := testutils.NewTestClientConn(t) + b := builder.Build(cc, balancer.BuildOptions{}) + defer b.Close() + + addrs := make([]resolver.Address, len(testBackendAddrs)) + for i, a := range testBackendAddrs { + addrs[i] = xdsinternal.SetLocalityID(a, testLocality) + } + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), + BalancerConfig: &LBConfig{ + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServerName: newString(testLRSServerName), + // Locality: testLocality, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, + }, + }); err != nil { + t.Fatalf("unexpected error from UpdateClientConnState: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + got, err := xdsC.WaitForReportLoad(ctx) + if err != nil { + t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) + } + if got.Server != testLRSServerName { + t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) + } + + sc1 := <-cc.NewSubConnCh + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + // This should get the connecting picker. + p0 := <-cc.NewPickerCh + for i := 0; i < 10; i++ { + _, err := p0.Pick(balancer.PickInfo{}) + if err != balancer.ErrNoSubConnAvailable { + t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) + } + } + + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + // Test pick with one backend. + p1 := <-cc.NewPickerCh + const successCount = 5 + for i := 0; i < successCount; i++ { + gotSCSt, err := p1.Pick(balancer.PickInfo{}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) + } + gotSCSt.Done(balancer.DoneInfo{}) + } + const errorCount = 5 + for i := 0; i < errorCount; i++ { + gotSCSt, err := p1.Pick(balancer.PickInfo{}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) + } + gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")}) + } + + // Dump load data from the store and compare with expected counts. + loadStore := xdsC.LoadStore() + if loadStore == nil { + t.Fatal("loadStore is nil in xdsClient") + } + sds := loadStore.Stats([]string{testClusterName}) + if len(sds) == 0 { + t.Fatalf("loads for cluster %v not found in store", testClusterName) + } + sd := sds[0] + if sd.Cluster != testClusterName || sd.Service != testServiceName { + t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName) + } + testLocalityJSON, _ := testLocality.ToString() + localityData, ok := sd.LocalityStats[testLocalityJSON] + if !ok { + t.Fatalf("loads for %v not found in store", testLocality) + } + reqStats := localityData.RequestStats + if reqStats.Succeeded != successCount { + t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount) + } + if reqStats.Errored != errorCount { + t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount) + } + if reqStats.InProgress != 0 { + t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0) + } +} + +func assertString(f func() (string, error)) string { + s, err := f() + if err != nil { + panic(err.Error()) + } + return s +} diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 64b175d3c..32c97757d 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -27,6 +27,7 @@ import ( "encoding/json" "fmt" "sync" + "sync/atomic" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" @@ -37,6 +38,7 @@ import ( "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" + xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/loadstore" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/load" @@ -61,6 +63,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba closed: grpcsync.NewEvent(), done: grpcsync.NewEvent(), loadWrapper: loadstore.NewWrapper(), + scWrappers: make(map[balancer.SubConn]*scWrapper), pickerUpdateCh: buffer.NewUnbounded(), requestCountMax: defaultRequestCountMax, } @@ -107,6 +110,18 @@ type clusterImplBalancer struct { clusterNameMu sync.Mutex clusterName string + scWrappersMu sync.Mutex + // The SubConns passed to the child policy are wrapped in a wrapper, to keep + // locality ID. But when the parent ClientConn sends updates, it's going to + // give the original SubConn, not the wrapper. But the child policies only + // know about the wrapper, so when forwarding SubConn updates, they must be + // sent for the wrappers. + // + // This keeps a map from original SubConn to wrapper, so that when + // forwarding the SubConn state update, the child policy will get the + // wrappers. + scWrappers map[balancer.SubConn]*scWrapper + // childState/drops/requestCounter keeps the state used by the most recently // generated picker. All fields can only be accessed in run(). And run() is // the only goroutine that sends picker to the parent ClientConn. All @@ -267,6 +282,15 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer b.ClientConn.ResolveNow(resolver.ResolveNowOptions{}) } + b.scWrappersMu.Lock() + if scw, ok := b.scWrappers[sc]; ok { + sc = scw + if s.ConnectivityState == connectivity.Shutdown { + // Remove this SubConn from the map on Shutdown. + delete(b.scWrappers, scw.SubConn) + } + } + b.scWrappersMu.Unlock() if b.childLB != nil { b.childLB.UpdateSubConnState(sc, s) } @@ -304,20 +328,83 @@ func (b *clusterImplBalancer) getClusterName() string { return b.clusterName } +// scWrapper is a wrapper of SubConn with locality ID. The locality ID can be +// retrieved from the addresses when creating SubConn. +// +// All SubConns passed to the child policies are wrapped in this, so that the +// picker can get the localityID from the picked SubConn, and do load reporting. +// +// After wrapping, all SubConns to and from the parent ClientConn (e.g. for +// SubConn state update, update/remove SubConn) must be the original SubConns. +// All SubConns to and from the child policy (NewSubConn, forwarding SubConn +// state update) must be the wrapper. The balancer keeps a map from the original +// SubConn to the wrapper for this purpose. +type scWrapper struct { + balancer.SubConn + // locality needs to be atomic because it can be updated while being read by + // the picker. + locality atomic.Value // type xdsinternal.LocalityID +} + +func (scw *scWrapper) updateLocalityID(lID xdsinternal.LocalityID) { + scw.locality.Store(lID) +} + +func (scw *scWrapper) localityID() xdsinternal.LocalityID { + lID, _ := scw.locality.Load().(xdsinternal.LocalityID) + return lID +} + func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { clusterName := b.getClusterName() newAddrs := make([]resolver.Address, len(addrs)) + var lID xdsinternal.LocalityID for i, addr := range addrs { newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName) + lID = xdsinternal.GetLocalityID(newAddrs[i]) } - return b.ClientConn.NewSubConn(newAddrs, opts) + sc, err := b.ClientConn.NewSubConn(newAddrs, opts) + if err != nil { + return nil, err + } + // Wrap this SubConn in a wrapper, and add it to the map. + b.scWrappersMu.Lock() + ret := &scWrapper{SubConn: sc} + ret.updateLocalityID(lID) + b.scWrappers[sc] = ret + b.scWrappersMu.Unlock() + return ret, nil +} + +func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) { + scw, ok := sc.(*scWrapper) + if !ok { + b.ClientConn.RemoveSubConn(sc) + return + } + // Remove the original SubConn from the parent ClientConn. + // + // Note that we don't remove this SubConn from the scWrappers map. We will + // need it to forward the final SubConn state Shutdown to the child policy. + // + // This entry is kept in the map until it's state is changes to Shutdown, + // and will be deleted in UpdateSubConnState(). + b.ClientConn.RemoveSubConn(scw.SubConn) } func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { clusterName := b.getClusterName() newAddrs := make([]resolver.Address, len(addrs)) + var lID xdsinternal.LocalityID for i, addr := range addrs { newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName) + lID = xdsinternal.GetLocalityID(newAddrs[i]) + } + if scw, ok := sc.(*scWrapper); ok { + scw.updateLocalityID(lID) + // Need to get the original SubConn from the wrapper before calling + // parent ClientConn. + sc = scw.SubConn } b.ClientConn.UpdateAddresses(sc, newAddrs) } @@ -388,7 +475,7 @@ func (b *clusterImplBalancer) run() { b.childState = u b.ClientConn.UpdateState(balancer.State{ ConnectivityState: b.childState.ConnectivityState, - Picker: newDropPicker(b.childState, &dropConfigs{ + Picker: newPicker(b.childState, &dropConfigs{ drops: b.drops, requestCounter: b.requestCounter, requestCountMax: b.requestCountMax, @@ -399,7 +486,7 @@ func (b *clusterImplBalancer) run() { if dc != nil && b.childState.Picker != nil { b.ClientConn.UpdateState(balancer.State{ ConnectivityState: b.childState.ConnectivityState, - Picker: newDropPicker(b.childState, dc, b.loadWrapper), + Picker: newPicker(b.childState, dc, b.loadWrapper), }) } } diff --git a/xds/internal/balancer/clusterimpl/picker.go b/xds/internal/balancer/clusterimpl/picker.go index c2693258e..db29c550b 100644 --- a/xds/internal/balancer/clusterimpl/picker.go +++ b/xds/internal/balancer/clusterimpl/picker.go @@ -19,6 +19,7 @@ package clusterimpl import ( + orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" @@ -66,12 +67,21 @@ func (d *dropper) drop() (ret bool) { return d.w.Next().(bool) } +const ( + serverLoadCPUName = "cpu_utilization" + serverLoadMemoryName = "mem_utilization" +) + // loadReporter wraps the methods from the loadStore that are used here. type loadReporter interface { + CallStarted(locality string) + CallFinished(locality string, err error) + CallServerLoad(locality, name string, val float64) CallDropped(locality string) } -type dropPicker struct { +// Picker implements RPC drop, circuit breaking drop and load reporting. +type picker struct { drops []*dropper s balancer.State loadStore loadReporter @@ -79,8 +89,8 @@ type dropPicker struct { countMax uint32 } -func newDropPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *dropPicker { - return &dropPicker{ +func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *picker { + return &picker{ drops: config.drops, s: s, loadStore: loadStore, @@ -89,13 +99,14 @@ func newDropPicker(s balancer.State, config *dropConfigs, loadStore load.PerClus } } -func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { +func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { // Don't drop unless the inner picker is READY. Similar to // https://github.com/grpc/grpc-go/issues/2622. if d.s.ConnectivityState != connectivity.Ready { return d.s.Picker.Pick(info) } + // Check if this RPC should be dropped by category. for _, dp := range d.drops { if dp.drop() { if d.loadStore != nil { @@ -105,6 +116,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { } } + // Check if this RPC should be dropped by circuit breaking. if d.counter != nil { if err := d.counter.StartRequest(d.countMax); err != nil { // Drops by circuit breaking are reported with empty category. They @@ -114,11 +126,58 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { } return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error()) } - pr, err := d.s.Picker.Pick(info) - if err != nil { - d.counter.EndRequest() - return pr, err + } + + var lIDStr string + pr, err := d.s.Picker.Pick(info) + if scw, ok := pr.SubConn.(*scWrapper); ok { + // This OK check also covers the case err!=nil, because SubConn will be + // nil. + pr.SubConn = scw.SubConn + var e error + // If locality ID isn't found in the wrapper, an empty locality ID will + // be used. + lIDStr, e = scw.localityID().ToString() + if e != nil { + logger.Infof("failed to marshal LocalityID: %#v, loads won't be reported", scw.localityID()) } + } + + if err != nil { + if d.counter != nil { + // Release one request count if this pick fails. + d.counter.EndRequest() + } + return pr, err + } + + if d.loadStore != nil { + d.loadStore.CallStarted(lIDStr) + oldDone := pr.Done + pr.Done = func(info balancer.DoneInfo) { + if oldDone != nil { + oldDone(info) + } + d.loadStore.CallFinished(lIDStr, info.Err) + + load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport) + if !ok { + return + } + d.loadStore.CallServerLoad(lIDStr, serverLoadCPUName, load.CpuUtilization) + d.loadStore.CallServerLoad(lIDStr, serverLoadMemoryName, load.MemUtilization) + for n, c := range load.RequestCost { + d.loadStore.CallServerLoad(lIDStr, n, c) + } + for n, c := range load.Utilization { + d.loadStore.CallServerLoad(lIDStr, n, c) + } + } + } + + if d.counter != nil { + // Update Done() so that when the RPC finishes, the request count will + // be released. oldDone := pr.Done pr.Done = func(doneInfo balancer.DoneInfo) { d.counter.EndRequest() @@ -126,8 +185,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { oldDone(doneInfo) } } - return pr, err } - return d.s.Picker.Pick(info) + return pr, err } diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go b/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go index c75ecddd7..4f96cc61f 100644 --- a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go +++ b/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go @@ -38,8 +38,8 @@ import ( "google.golang.org/grpc/internal/hierarchy" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" - "google.golang.org/grpc/xds/internal/balancer/lrs" "google.golang.org/grpc/xds/internal/balancer/priority" "google.golang.org/grpc/xds/internal/balancer/weightedtarget" "google.golang.org/grpc/xds/internal/xdsclient" @@ -181,7 +181,7 @@ func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsclient.Endpoint // Prepend parent priority to the priority names, to avoid duplicates. pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName) retNames = append(retNames, pName) - wtConfig, addrs := localitiesToWeightedTarget(priorityLocalities, pName, endpointPickingPolicy, mechanism.LoadReportingServerName, mechanism.Cluster, mechanism.EDSServiceName) + wtConfig, addrs := localitiesToWeightedTarget(priorityLocalities, pName, endpointPickingPolicy, mechanism.Cluster, mechanism.EDSServiceName) retConfigs[pName] = &clusterimpl.LBConfig{ Cluster: mechanism.Cluster, EDSServiceName: mechanism.EDSServiceName, @@ -249,7 +249,7 @@ func dedupSortedIntSlice(a []int) []int { // // The addresses have path hierarchy set to [priority-name, locality-name], so // priority and weighted target know which child policy they are for. -func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig, lrsServer *string, cluster, edsService string) (*weightedtarget.LBConfig, []resolver.Address) { +func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig, cluster, edsService string) (*weightedtarget.LBConfig, []resolver.Address) { weightedTargets := make(map[string]weightedtarget.Target) var addrs []resolver.Address for _, locality := range localities { @@ -257,25 +257,7 @@ func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName st if err != nil { localityStr = fmt.Sprintf("%+v", locality.ID) } - - child := childPolicy - // If lrsServer is not set, we can skip this extra layer of the LRS - // policy. - if lrsServer != nil { - localityID := locality.ID - child = &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: cluster, - EDSServiceName: edsService, - ChildPolicy: childPolicy, - LoadReportingServerName: *lrsServer, - Locality: &localityID, - }, - } - } - weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: child} - + weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: childPolicy} for _, endpoint := range locality.Endpoints { // Filter out all "unhealthy" endpoints (unknown and healthy are // both considered to be healthy: @@ -290,6 +272,7 @@ func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName st addr = weightedroundrobin.SetAddrInfo(addr, ai) } addr = hierarchy.Set(addr, []string{priorityName, localityStr}) + addr = internal.SetLocalityID(addr, locality.ID) addrs = append(addrs, addr) } } diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go b/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go index 389db349b..95ded6019 100644 --- a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go +++ b/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go @@ -35,7 +35,6 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" - "google.golang.org/grpc/xds/internal/balancer/lrs" "google.golang.org/grpc/xds/internal/balancer/priority" "google.golang.org/grpc/xds/internal/balancer/weightedtarget" "google.golang.org/grpc/xds/internal/xdsclient" @@ -228,30 +227,12 @@ func TestBuildPriorityConfig(t *testing.T) { Config: &weightedtarget.LBConfig{ Targets: map[string]weightedtarget.Target{ assertString(testLocalityIDs[0].ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: testLRSServer, - Locality: &testLocalityIDs[0], - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, assertString(testLocalityIDs[1].ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: testLRSServer, - Locality: &testLocalityIDs[1], - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, }, }, @@ -279,30 +260,12 @@ func TestBuildPriorityConfig(t *testing.T) { Config: &weightedtarget.LBConfig{ Targets: map[string]weightedtarget.Target{ assertString(testLocalityIDs[2].ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: testLRSServer, - Locality: &testLocalityIDs[2], - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, assertString(testLocalityIDs[3].ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: testLRSServer, - Locality: &testLocalityIDs[3], - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, }, }, @@ -324,16 +287,16 @@ func TestBuildPriorityConfig(t *testing.T) { Priorities: []string{"priority-0-0", "priority-0-1", "priority-1"}, } wantAddrs := []resolver.Address{ - hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-0-0", assertString(testLocalityIDs[0].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-0-0", assertString(testLocalityIDs[0].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][0]}, []string{"priority-0-0", assertString(testLocalityIDs[1].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][1]}, []string{"priority-0-0", assertString(testLocalityIDs[1].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][0]}, []string{"priority-0-1", assertString(testLocalityIDs[2].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][1]}, []string{"priority-0-1", assertString(testLocalityIDs[2].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][0]}, []string{"priority-0-1", assertString(testLocalityIDs[3].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][1]}, []string{"priority-0-1", assertString(testLocalityIDs[3].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[4][0]}, []string{"priority-1"}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[4][1]}, []string{"priority-1"}), + testAddrWithAttrs(testAddressStrs[0][0], nil, "priority-0-0", &testLocalityIDs[0]), + testAddrWithAttrs(testAddressStrs[0][1], nil, "priority-0-0", &testLocalityIDs[0]), + testAddrWithAttrs(testAddressStrs[1][0], nil, "priority-0-0", &testLocalityIDs[1]), + testAddrWithAttrs(testAddressStrs[1][1], nil, "priority-0-0", &testLocalityIDs[1]), + testAddrWithAttrs(testAddressStrs[2][0], nil, "priority-0-1", &testLocalityIDs[2]), + testAddrWithAttrs(testAddressStrs[2][1], nil, "priority-0-1", &testLocalityIDs[2]), + testAddrWithAttrs(testAddressStrs[3][0], nil, "priority-0-1", &testLocalityIDs[3]), + testAddrWithAttrs(testAddressStrs[3][1], nil, "priority-0-1", &testLocalityIDs[3]), + testAddrWithAttrs(testAddressStrs[4][0], nil, "priority-1", nil), + testAddrWithAttrs(testAddressStrs[4][1], nil, "priority-1", nil), } if diff := cmp.Diff(gotConfig, wantConfig); diff != "" { @@ -434,30 +397,12 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { Config: &weightedtarget.LBConfig{ Targets: map[string]weightedtarget.Target{ assertString(testLocalityIDs[0].ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: testLRSServer, - Locality: &testLocalityIDs[0], - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, assertString(testLocalityIDs[1].ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: testLRSServer, - Locality: &testLocalityIDs[1], - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, }, }, @@ -479,30 +424,12 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { Config: &weightedtarget.LBConfig{ Targets: map[string]weightedtarget.Target{ assertString(testLocalityIDs[2].ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: testLRSServer, - Locality: &testLocalityIDs[2], - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, assertString(testLocalityIDs[3].ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testEDSServiceName, - LoadReportingServerName: testLRSServer, - Locality: &testLocalityIDs[3], - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, }, }, @@ -510,14 +437,14 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) { }, } wantAddrs := []resolver.Address{ - hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-2-0", assertString(testLocalityIDs[0].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-2-0", assertString(testLocalityIDs[0].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][0]}, []string{"priority-2-0", assertString(testLocalityIDs[1].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][1]}, []string{"priority-2-0", assertString(testLocalityIDs[1].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][0]}, []string{"priority-2-1", assertString(testLocalityIDs[2].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][1]}, []string{"priority-2-1", assertString(testLocalityIDs[2].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][0]}, []string{"priority-2-1", assertString(testLocalityIDs[3].ToString)}), - hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][1]}, []string{"priority-2-1", assertString(testLocalityIDs[3].ToString)}), + testAddrWithAttrs(testAddressStrs[0][0], nil, "priority-2-0", &testLocalityIDs[0]), + testAddrWithAttrs(testAddressStrs[0][1], nil, "priority-2-0", &testLocalityIDs[0]), + testAddrWithAttrs(testAddressStrs[1][0], nil, "priority-2-0", &testLocalityIDs[1]), + testAddrWithAttrs(testAddressStrs[1][1], nil, "priority-2-0", &testLocalityIDs[1]), + testAddrWithAttrs(testAddressStrs[2][0], nil, "priority-2-1", &testLocalityIDs[2]), + testAddrWithAttrs(testAddressStrs[2][1], nil, "priority-2-1", &testLocalityIDs[2]), + testAddrWithAttrs(testAddressStrs[3][0], nil, "priority-2-1", &testLocalityIDs[3]), + testAddrWithAttrs(testAddressStrs[3][1], nil, "priority-2-1", &testLocalityIDs[3]), } if diff := cmp.Diff(gotNames, wantNames); diff != "" { @@ -674,38 +601,20 @@ func TestLocalitiesToWeightedTarget(t *testing.T) { wantConfig: &weightedtarget.LBConfig{ Targets: map[string]weightedtarget.Target{ assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): { - Weight: 20, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: "test-cluster", - EDSServiceName: "test-eds-service", - LoadReportingServerName: "test-lrs-server", - Locality: &internal.LocalityID{Zone: "test-zone-1"}, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 20, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): { - Weight: 80, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrs.Name, - Config: &lrs.LBConfig{ - ClusterName: "test-cluster", - EDSServiceName: "test-eds-service", - LoadReportingServerName: "test-lrs-server", - Locality: &internal.LocalityID{Zone: "test-zone-2"}, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, - }, - }, + Weight: 80, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}, }, }, }, wantAddrs: []resolver.Address{ - hierarchy.Set(resolver.Address{Addr: "addr-1-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), - hierarchy.Set(resolver.Address{Addr: "addr-1-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), - hierarchy.Set(resolver.Address{Addr: "addr-2-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), - hierarchy.Set(resolver.Address{Addr: "addr-2-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), + testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), }, }, { @@ -748,10 +657,10 @@ func TestLocalitiesToWeightedTarget(t *testing.T) { }, }, wantAddrs: []resolver.Address{ - hierarchy.Set(resolver.Address{Addr: "addr-1-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), - hierarchy.Set(resolver.Address{Addr: "addr-1-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), - hierarchy.Set(resolver.Address{Addr: "addr-2-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), - hierarchy.Set(resolver.Address{Addr: "addr-2-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), + testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), }, }, { @@ -794,24 +703,16 @@ func TestLocalitiesToWeightedTarget(t *testing.T) { }, }, wantAddrs: []resolver.Address{ - hierarchy.Set( - weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-1-1"}, weightedroundrobin.AddrInfo{Weight: 90}), - []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), - hierarchy.Set( - weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-1-2"}, weightedroundrobin.AddrInfo{Weight: 10}), - []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}), - hierarchy.Set( - weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-2-1"}, weightedroundrobin.AddrInfo{Weight: 90}), - []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), - hierarchy.Set( - weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-2-2"}, weightedroundrobin.AddrInfo{Weight: 10}), - []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}), + testAddrWithAttrs("addr-1-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-1-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}), + testAddrWithAttrs("addr-2-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), + testAddrWithAttrs("addr-2-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, got1 := localitiesToWeightedTarget(tt.localities, tt.priorityName, tt.childPolicy, tt.lrsServer, tt.cluster, tt.edsService) + got, got1 := localitiesToWeightedTarget(tt.localities, tt.priorityName, tt.childPolicy, tt.cluster, tt.edsService) if diff := cmp.Diff(got, tt.wantConfig); diff != "" { t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff) } @@ -837,3 +738,17 @@ func assertString(f func() (string, error)) string { } return s } + +func testAddrWithAttrs(addrStr string, weight *uint32, priority string, lID *internal.LocalityID) resolver.Address { + addr := resolver.Address{Addr: addrStr} + if weight != nil { + addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: *weight}) + } + path := []string{priority} + if lID != nil { + path = append(path, assertString(lID.ToString)) + addr = internal.SetLocalityID(addr, *lID) + } + addr = hierarchy.Set(addr, path) + return addr +} diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go deleted file mode 100644 index ed7fb38c8..000000000 --- a/xds/internal/balancer/lrs/balancer.go +++ /dev/null @@ -1,239 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package lrs implements load reporting balancer for xds. -package lrs - -import ( - "encoding/json" - "fmt" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/internal/grpclog" - "google.golang.org/grpc/internal/pretty" - "google.golang.org/grpc/serviceconfig" - "google.golang.org/grpc/xds/internal/balancer/loadstore" - "google.golang.org/grpc/xds/internal/xdsclient" - "google.golang.org/grpc/xds/internal/xdsclient/load" -) - -func init() { - balancer.Register(bb{}) -} - -// Name is the name of the LRS balancer. -const Name = "lrs_experimental" - -type bb struct{} - -func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - b := &lrsBalancer{ - cc: cc, - buildOpts: opts, - } - b.logger = prefixLogger(b) - b.logger.Infof("Created") - return b -} - -func (bb) Name() string { - return Name -} - -func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { - return parseConfig(c) -} - -type lrsBalancer struct { - cc balancer.ClientConn - buildOpts balancer.BuildOptions - - logger *grpclog.PrefixLogger - xdsClient *xdsClientWrapper - - config *LBConfig - lb balancer.Balancer // The sub balancer. -} - -func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { - b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig)) - newConfig, ok := s.BalancerConfig.(*LBConfig) - if !ok { - return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) - } - - if b.xdsClient == nil { - c := xdsclient.FromResolverState(s.ResolverState) - if c == nil { - return balancer.ErrBadResolverState - } - b.xdsClient = newXDSClientWrapper(c) - } - - // Update load reporting config or xds client. This needs to be done before - // updating the child policy because we need the loadStore from the updated - // client to be passed to the ccWrapper. - if err := b.xdsClient.update(newConfig); err != nil { - return err - } - - // If child policy is a different type, recreate the sub-balancer. - if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name { - bb := balancer.Get(newConfig.ChildPolicy.Name) - if bb == nil { - return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name) - } - if b.lb != nil { - b.lb.Close() - } - lidJSON, err := newConfig.Locality.ToString() - if err != nil { - return fmt.Errorf("failed to marshal LocalityID: %#v", newConfig.Locality) - } - ccWrapper := newCCWrapper(b.cc, b.xdsClient.loadStore(), lidJSON) - b.lb = bb.Build(ccWrapper, b.buildOpts) - } - b.config = newConfig - - // Addresses and sub-balancer config are sent to sub-balancer. - return b.lb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: s.ResolverState, - BalancerConfig: b.config.ChildPolicy.Config, - }) -} - -func (b *lrsBalancer) ResolverError(err error) { - if b.lb != nil { - b.lb.ResolverError(err) - } -} - -func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { - if b.lb != nil { - b.lb.UpdateSubConnState(sc, s) - } -} - -func (b *lrsBalancer) Close() { - if b.lb != nil { - b.lb.Close() - b.lb = nil - } - b.xdsClient.close() -} - -type ccWrapper struct { - balancer.ClientConn - loadStore load.PerClusterReporter - localityIDJSON string -} - -func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityIDJSON string) *ccWrapper { - return &ccWrapper{ - ClientConn: cc, - loadStore: loadStore, - localityIDJSON: localityIDJSON, - } -} - -func (ccw *ccWrapper) UpdateState(s balancer.State) { - s.Picker = newLoadReportPicker(s.Picker, ccw.localityIDJSON, ccw.loadStore) - ccw.ClientConn.UpdateState(s) -} - -type xdsClientWrapper struct { - c xdsclient.XDSClient - cancelLoadReport func() - clusterName string - edsServiceName string - lrsServerName *string - // loadWrapper is a wrapper with loadOriginal, with clusterName and - // edsServiceName. It's used children to report loads. - loadWrapper *loadstore.Wrapper -} - -func newXDSClientWrapper(c xdsclient.XDSClient) *xdsClientWrapper { - return &xdsClientWrapper{ - c: c, - loadWrapper: loadstore.NewWrapper(), - } -} - -// update checks the config and xdsclient, and decides whether it needs to -// restart the load reporting stream. -func (w *xdsClientWrapper) update(newConfig *LBConfig) error { - var ( - restartLoadReport bool - updateLoadClusterAndService bool - ) - - // ClusterName is different, restart. ClusterName is from ClusterName and - // EDSServiceName. - if w.clusterName != newConfig.ClusterName { - updateLoadClusterAndService = true - w.clusterName = newConfig.ClusterName - } - if w.edsServiceName != newConfig.EDSServiceName { - updateLoadClusterAndService = true - w.edsServiceName = newConfig.EDSServiceName - } - - if updateLoadClusterAndService { - // This updates the clusterName and serviceName that will reported for the - // loads. The update here is too early, the perfect timing is when the - // picker is updated with the new connection. But from this balancer's point - // of view, it's impossible to tell. - // - // On the other hand, this will almost never happen. Each LRS policy - // shouldn't get updated config. The parent should do a graceful switch when - // the clusterName or serviceName is changed. - w.loadWrapper.UpdateClusterAndService(w.clusterName, w.edsServiceName) - } - - if w.lrsServerName == nil || *w.lrsServerName != newConfig.LoadReportingServerName { - // LoadReportingServerName is different, load should be report to a - // different server, restart. - restartLoadReport = true - w.lrsServerName = &newConfig.LoadReportingServerName - } - - if restartLoadReport { - if w.cancelLoadReport != nil { - w.cancelLoadReport() - w.cancelLoadReport = nil - } - var loadStore *load.Store - if w.c != nil { - loadStore, w.cancelLoadReport = w.c.ReportLoad(*w.lrsServerName) - } - w.loadWrapper.UpdateLoadStore(loadStore) - } - - return nil -} - -func (w *xdsClientWrapper) loadStore() load.PerClusterReporter { - return w.loadWrapper -} - -func (w *xdsClientWrapper) close() { - if w.cancelLoadReport != nil { - w.cancelLoadReport() - w.cancelLoadReport = nil - } -} diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go deleted file mode 100644 index c0ec9cc41..000000000 --- a/xds/internal/balancer/lrs/balancer_test.go +++ /dev/null @@ -1,143 +0,0 @@ -// +build go1.12 - -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package lrs - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" - "google.golang.org/grpc/connectivity" - internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" - "google.golang.org/grpc/resolver" - xdsinternal "google.golang.org/grpc/xds/internal" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeclient" - "google.golang.org/grpc/xds/internal/xdsclient" -) - -const defaultTestTimeout = 1 * time.Second - -var ( - testBackendAddrs = []resolver.Address{ - {Addr: "1.1.1.1:1"}, - } - testLocality = &xdsinternal.LocalityID{ - Region: "test-region", - Zone: "test-zone", - SubZone: "test-sub-zone", - } -) - -// TestLoadReporting verifies that the lrs balancer starts the loadReport -// stream when the LBConfig passed to it contains a valid value for the LRS -// server (empty string). -func TestLoadReporting(t *testing.T) { - xdsC := fakeclient.NewClient() - defer xdsC.Close() - - builder := balancer.Get(Name) - cc := testutils.NewTestClientConn(t) - lrsB := builder.Build(cc, balancer.BuildOptions{}) - defer lrsB.Close() - - if err := lrsB.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), - BalancerConfig: &LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testServiceName, - LoadReportingServerName: testLRSServerName, - Locality: testLocality, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: roundrobin.Name, - }, - }, - }); err != nil { - t.Fatalf("unexpected error from UpdateClientConnState: %v", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - got, err := xdsC.WaitForReportLoad(ctx) - if err != nil { - t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) - } - if got.Server != testLRSServerName { - t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) - } - - sc1 := <-cc.NewSubConnCh - lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - - // Test pick with one backend. - p1 := <-cc.NewPickerCh - const successCount = 5 - for i := 0; i < successCount; i++ { - gotSCSt, _ := p1.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) - } - gotSCSt.Done(balancer.DoneInfo{}) - } - const errorCount = 5 - for i := 0; i < errorCount; i++ { - gotSCSt, _ := p1.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) - } - gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")}) - } - - // Dump load data from the store and compare with expected counts. - loadStore := xdsC.LoadStore() - if loadStore == nil { - t.Fatal("loadStore is nil in xdsClient") - } - sds := loadStore.Stats([]string{testClusterName}) - if len(sds) == 0 { - t.Fatalf("loads for cluster %v not found in store", testClusterName) - } - sd := sds[0] - if sd.Cluster != testClusterName || sd.Service != testServiceName { - t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName) - } - testLocalityJSON, _ := testLocality.ToString() - localityData, ok := sd.LocalityStats[testLocalityJSON] - if !ok { - t.Fatalf("loads for %v not found in store", testLocality) - } - reqStats := localityData.RequestStats - if reqStats.Succeeded != successCount { - t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount) - } - if reqStats.Errored != errorCount { - t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount) - } - if reqStats.InProgress != 0 { - t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0) - } -} diff --git a/xds/internal/balancer/lrs/config.go b/xds/internal/balancer/lrs/config.go deleted file mode 100644 index e0e30bbb8..000000000 --- a/xds/internal/balancer/lrs/config.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package lrs - -import ( - "encoding/json" - "fmt" - - internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" - "google.golang.org/grpc/serviceconfig" - "google.golang.org/grpc/xds/internal" -) - -// LBConfig is the balancer config for lrs balancer. -type LBConfig struct { - serviceconfig.LoadBalancingConfig `json:"-"` - - ClusterName string `json:"clusterName,omitempty"` - EDSServiceName string `json:"edsServiceName,omitempty"` - LoadReportingServerName string `json:"lrsLoadReportingServerName,omitempty"` - Locality *internal.LocalityID `json:"locality,omitempty"` - ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"` -} - -func parseConfig(c json.RawMessage) (*LBConfig, error) { - var cfg LBConfig - if err := json.Unmarshal(c, &cfg); err != nil { - return nil, err - } - if cfg.ClusterName == "" { - return nil, fmt.Errorf("required ClusterName is not set in %+v", cfg) - } - if cfg.Locality == nil { - return nil, fmt.Errorf("required Locality is not set in %+v", cfg) - } - return &cfg, nil -} diff --git a/xds/internal/balancer/lrs/config_test.go b/xds/internal/balancer/lrs/config_test.go deleted file mode 100644 index eaf902ac5..000000000 --- a/xds/internal/balancer/lrs/config_test.go +++ /dev/null @@ -1,114 +0,0 @@ -// +build go1.12 - -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package lrs - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/balancer/roundrobin" - internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" - xdsinternal "google.golang.org/grpc/xds/internal" -) - -const ( - testClusterName = "test-cluster" - testServiceName = "test-eds-service" - testLRSServerName = "test-lrs-name" -) - -func TestParseConfig(t *testing.T) { - tests := []struct { - name string - js string - want *LBConfig - wantErr bool - }{ - { - name: "no cluster name", - js: `{ - "edsServiceName": "test-eds-service", - "lrsLoadReportingServerName": "test-lrs-name", - "locality": { - "region": "test-region", - "zone": "test-zone", - "subZone": "test-sub-zone" - }, - "childPolicy":[{"round_robin":{}}] -} - `, - wantErr: true, - }, - { - name: "no locality", - js: `{ - "clusterName": "test-cluster", - "edsServiceName": "test-eds-service", - "lrsLoadReportingServerName": "test-lrs-name", - "childPolicy":[{"round_robin":{}}] -} - `, - wantErr: true, - }, - { - name: "good", - js: `{ - "clusterName": "test-cluster", - "edsServiceName": "test-eds-service", - "lrsLoadReportingServerName": "test-lrs-name", - "locality": { - "region": "test-region", - "zone": "test-zone", - "subZone": "test-sub-zone" - }, - "childPolicy":[{"round_robin":{}}] -} - `, - want: &LBConfig{ - ClusterName: testClusterName, - EDSServiceName: testServiceName, - LoadReportingServerName: testLRSServerName, - Locality: &xdsinternal.LocalityID{ - Region: "test-region", - Zone: "test-zone", - SubZone: "test-sub-zone", - }, - ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: roundrobin.Name, - Config: nil, - }, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := parseConfig([]byte(tt.js)) - if (err != nil) != tt.wantErr { - t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr) - return - } - if diff := cmp.Diff(got, tt.want); diff != "" { - t.Errorf("parseConfig() got = %v, want %v, diff: %s", got, tt.want, diff) - } - }) - } -} diff --git a/xds/internal/balancer/lrs/logging.go b/xds/internal/balancer/lrs/logging.go deleted file mode 100644 index 602dac099..000000000 --- a/xds/internal/balancer/lrs/logging.go +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package lrs - -import ( - "fmt" - - "google.golang.org/grpc/grpclog" - internalgrpclog "google.golang.org/grpc/internal/grpclog" -) - -const prefix = "[lrs-lb %p] " - -var logger = grpclog.Component("xds") - -func prefixLogger(p *lrsBalancer) *internalgrpclog.PrefixLogger { - return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) -} diff --git a/xds/internal/balancer/lrs/picker.go b/xds/internal/balancer/lrs/picker.go deleted file mode 100644 index 1e4ad156e..000000000 --- a/xds/internal/balancer/lrs/picker.go +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package lrs - -import ( - orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1" - "google.golang.org/grpc/balancer" -) - -const ( - serverLoadCPUName = "cpu_utilization" - serverLoadMemoryName = "mem_utilization" -) - -// loadReporter wraps the methods from the loadStore that are used here. -type loadReporter interface { - CallStarted(locality string) - CallFinished(locality string, err error) - CallServerLoad(locality, name string, val float64) -} - -type loadReportPicker struct { - p balancer.Picker - - locality string - loadStore loadReporter -} - -func newLoadReportPicker(p balancer.Picker, id string, loadStore loadReporter) *loadReportPicker { - return &loadReportPicker{ - p: p, - locality: id, - loadStore: loadStore, - } -} - -func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - res, err := lrp.p.Pick(info) - if err != nil { - return res, err - } - - if lrp.loadStore == nil { - return res, err - } - - lrp.loadStore.CallStarted(lrp.locality) - oldDone := res.Done - res.Done = func(info balancer.DoneInfo) { - if oldDone != nil { - oldDone(info) - } - lrp.loadStore.CallFinished(lrp.locality, info.Err) - - load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport) - if !ok { - return - } - lrp.loadStore.CallServerLoad(lrp.locality, serverLoadCPUName, load.CpuUtilization) - lrp.loadStore.CallServerLoad(lrp.locality, serverLoadMemoryName, load.MemUtilization) - for n, d := range load.RequestCost { - lrp.loadStore.CallServerLoad(lrp.locality, n, d) - } - for n, d := range load.Utilization { - lrp.loadStore.CallServerLoad(lrp.locality, n, d) - } - } - return res, err -} diff --git a/xds/internal/balancer/weightedtarget/weightedtarget_config_test.go b/xds/internal/balancer/weightedtarget/weightedtarget_config_test.go index 351a13553..3cd6f74df 100644 --- a/xds/internal/balancer/weightedtarget/weightedtarget_config_test.go +++ b/xds/internal/balancer/weightedtarget/weightedtarget_config_test.go @@ -26,8 +26,7 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" - - _ "google.golang.org/grpc/xds/internal/balancer/lrs" // Register LRS balancer, so we can use it as child policy in the config tests. + "google.golang.org/grpc/xds/internal/balancer/priority" ) const ( @@ -35,23 +34,22 @@ const ( "targets": { "cluster_1" : { "weight":75, - "childPolicy":[{"lrs_experimental":{"clusterName":"cluster_1","lrsLoadReportingServerName":"lrs.server","locality":{"zone":"test-zone-1"}}}] + "childPolicy":[{"priority_experimental":{"priorities": ["child-1"], "children": {"child-1": {"config": [{"round_robin":{}}]}}}}] }, "cluster_2" : { "weight":25, - "childPolicy":[{"lrs_experimental":{"clusterName":"cluster_2","lrsLoadReportingServerName":"lrs.server","locality":{"zone":"test-zone-2"}}}] + "childPolicy":[{"priority_experimental":{"priorities": ["child-2"], "children": {"child-2": {"config": [{"round_robin":{}}]}}}}] } } }` - lrsBalancerName = "lrs_experimental" ) var ( - lrsConfigParser = balancer.Get(lrsBalancerName).(balancer.ConfigParser) - lrsConfigJSON1 = `{"clusterName":"cluster_1","lrsLoadReportingServerName":"lrs.server","locality":{"zone":"test-zone-1"}}` - lrsConfig1, _ = lrsConfigParser.ParseConfig([]byte(lrsConfigJSON1)) - lrsConfigJSON2 = `{"clusterName":"cluster_2","lrsLoadReportingServerName":"lrs.server","locality":{"zone":"test-zone-2"}}` - lrsConfig2, _ = lrsConfigParser.ParseConfig([]byte(lrsConfigJSON2)) + testConfigParser = balancer.Get(priority.Name).(balancer.ConfigParser) + testConfigJSON1 = `{"priorities": ["child-1"], "children": {"child-1": {"config": [{"round_robin":{}}]}}}` + testConfig1, _ = testConfigParser.ParseConfig([]byte(testConfigJSON1)) + testConfigJSON2 = `{"priorities": ["child-2"], "children": {"child-2": {"config": [{"round_robin":{}}]}}}` + testConfig2, _ = testConfigParser.ParseConfig([]byte(testConfigJSON2)) ) func Test_parseConfig(t *testing.T) { @@ -75,15 +73,15 @@ func Test_parseConfig(t *testing.T) { "cluster_1": { Weight: 75, ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrsBalancerName, - Config: lrsConfig1, + Name: priority.Name, + Config: testConfig1, }, }, "cluster_2": { Weight: 25, ChildPolicy: &internalserviceconfig.BalancerConfig{ - Name: lrsBalancerName, - Config: lrsConfig2, + Name: priority.Name, + Config: testConfig2, }, }, }, diff --git a/xds/internal/internal.go b/xds/internal/internal.go index e4284ee02..0cccd3824 100644 --- a/xds/internal/internal.go +++ b/xds/internal/internal.go @@ -22,6 +22,8 @@ package internal import ( "encoding/json" "fmt" + + "google.golang.org/grpc/resolver" ) // LocalityID is xds.Locality without XXX fields, so it can be used as map @@ -53,3 +55,19 @@ func LocalityIDFromString(s string) (ret LocalityID, _ error) { } return ret, nil } + +type localityKeyType string + +const localityKey = localityKeyType("grpc.xds.internal.address.locality") + +// GetLocalityID returns the locality ID of addr. +func GetLocalityID(addr resolver.Address) LocalityID { + path, _ := addr.Attributes.Value(localityKey).(LocalityID) + return path +} + +// SetLocalityID sets locality ID in addr to l. +func SetLocalityID(addr resolver.Address, l LocalityID) resolver.Address { + addr.Attributes = addr.Attributes.WithValues(localityKey, l) + return addr +}