xds: delete LRS policy and move the functionality to xds_cluster_impl (#4528)

- (cluster_resolver) attach locality ID to addresses
- (cluster_impl) wrap SubConn
- (lrs) delete
This commit is contained in:
Menghan Li 2021-06-24 16:20:11 -07:00 committed by GitHub
parent d9eb12feed
commit e24ede5936
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 383 additions and 867 deletions

View File

@ -22,6 +22,7 @@ package clusterimpl
import (
"context"
"fmt"
"strings"
"testing"
"time"
@ -34,6 +35,7 @@ import (
"google.golang.org/grpc/internal"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
@ -158,6 +160,9 @@ func TestDropByCategory(t *testing.T) {
Service: testServiceName,
TotalDrops: dropCount,
Drops: map[string]uint64{dropReason: dropCount},
LocalityStats: map[string]load.LocalityData{
assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount}},
},
}}
gotStatsData0 := loadStore.Stats([]string{testClusterName})
@ -213,6 +218,9 @@ func TestDropByCategory(t *testing.T) {
Service: testServiceName,
TotalDrops: dropCount2,
Drops: map[string]uint64{dropReason2: dropCount2},
LocalityStats: map[string]load.LocalityData{
assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount2}},
},
}}
gotStatsData1 := loadStore.Stats([]string{testClusterName})
@ -320,6 +328,9 @@ func TestDropCircuitBreaking(t *testing.T) {
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: uint64(maxRequest),
LocalityStats: map[string]load.LocalityData{
assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: uint64(rpcCount - maxRequest + 50)}},
},
}}
gotStatsData0 := loadStore.Stats([]string{testClusterName})
@ -533,3 +544,117 @@ func TestReResolution(t *testing.T) {
t.Fatalf("timeout waiting for ResolveNow()")
}
}
func TestLoadReporting(t *testing.T) {
var testLocality = xdsinternal.LocalityID{
Region: "test-region",
Zone: "test-zone",
SubZone: "test-sub-zone",
}
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
addrs := make([]resolver.Address, len(testBackendAddrs))
for i, a := range testBackendAddrs {
addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: newString(testLRSServerName),
// Locality: testLocality,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
got, err := xdsC.WaitForReportLoad(ctx)
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
}
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
p0 := <-cc.NewPickerCh
for i := 0; i < 10; i++ {
_, err := p0.Pick(balancer.PickInfo{})
if err != balancer.ErrNoSubConnAvailable {
t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
}
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
const successCount = 5
for i := 0; i < successCount; i++ {
gotSCSt, err := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
gotSCSt.Done(balancer.DoneInfo{})
}
const errorCount = 5
for i := 0; i < errorCount; i++ {
gotSCSt, err := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
}
// Dump load data from the store and compare with expected counts.
loadStore := xdsC.LoadStore()
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
sds := loadStore.Stats([]string{testClusterName})
if len(sds) == 0 {
t.Fatalf("loads for cluster %v not found in store", testClusterName)
}
sd := sds[0]
if sd.Cluster != testClusterName || sd.Service != testServiceName {
t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
}
testLocalityJSON, _ := testLocality.ToString()
localityData, ok := sd.LocalityStats[testLocalityJSON]
if !ok {
t.Fatalf("loads for %v not found in store", testLocality)
}
reqStats := localityData.RequestStats
if reqStats.Succeeded != successCount {
t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount)
}
if reqStats.Errored != errorCount {
t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount)
}
if reqStats.InProgress != 0 {
t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0)
}
}
func assertString(f func() (string, error)) string {
s, err := f()
if err != nil {
panic(err.Error())
}
return s
}

View File

@ -27,6 +27,7 @@ import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
@ -37,6 +38,7 @@ import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/loadstore"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/load"
@ -61,6 +63,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
loadWrapper: loadstore.NewWrapper(),
scWrappers: make(map[balancer.SubConn]*scWrapper),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
}
@ -107,6 +110,18 @@ type clusterImplBalancer struct {
clusterNameMu sync.Mutex
clusterName string
scWrappersMu sync.Mutex
// The SubConns passed to the child policy are wrapped in a wrapper, to keep
// locality ID. But when the parent ClientConn sends updates, it's going to
// give the original SubConn, not the wrapper. But the child policies only
// know about the wrapper, so when forwarding SubConn updates, they must be
// sent for the wrappers.
//
// This keeps a map from original SubConn to wrapper, so that when
// forwarding the SubConn state update, the child policy will get the
// wrappers.
scWrappers map[balancer.SubConn]*scWrapper
// childState/drops/requestCounter keeps the state used by the most recently
// generated picker. All fields can only be accessed in run(). And run() is
// the only goroutine that sends picker to the parent ClientConn. All
@ -267,6 +282,15 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer
b.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
}
b.scWrappersMu.Lock()
if scw, ok := b.scWrappers[sc]; ok {
sc = scw
if s.ConnectivityState == connectivity.Shutdown {
// Remove this SubConn from the map on Shutdown.
delete(b.scWrappers, scw.SubConn)
}
}
b.scWrappersMu.Unlock()
if b.childLB != nil {
b.childLB.UpdateSubConnState(sc, s)
}
@ -304,20 +328,83 @@ func (b *clusterImplBalancer) getClusterName() string {
return b.clusterName
}
// scWrapper is a wrapper of SubConn with locality ID. The locality ID can be
// retrieved from the addresses when creating SubConn.
//
// All SubConns passed to the child policies are wrapped in this, so that the
// picker can get the localityID from the picked SubConn, and do load reporting.
//
// After wrapping, all SubConns to and from the parent ClientConn (e.g. for
// SubConn state update, update/remove SubConn) must be the original SubConns.
// All SubConns to and from the child policy (NewSubConn, forwarding SubConn
// state update) must be the wrapper. The balancer keeps a map from the original
// SubConn to the wrapper for this purpose.
type scWrapper struct {
balancer.SubConn
// locality needs to be atomic because it can be updated while being read by
// the picker.
locality atomic.Value // type xdsinternal.LocalityID
}
func (scw *scWrapper) updateLocalityID(lID xdsinternal.LocalityID) {
scw.locality.Store(lID)
}
func (scw *scWrapper) localityID() xdsinternal.LocalityID {
lID, _ := scw.locality.Load().(xdsinternal.LocalityID)
return lID
}
func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
clusterName := b.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
var lID xdsinternal.LocalityID
for i, addr := range addrs {
newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName)
lID = xdsinternal.GetLocalityID(newAddrs[i])
}
return b.ClientConn.NewSubConn(newAddrs, opts)
sc, err := b.ClientConn.NewSubConn(newAddrs, opts)
if err != nil {
return nil, err
}
// Wrap this SubConn in a wrapper, and add it to the map.
b.scWrappersMu.Lock()
ret := &scWrapper{SubConn: sc}
ret.updateLocalityID(lID)
b.scWrappers[sc] = ret
b.scWrappersMu.Unlock()
return ret, nil
}
func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) {
scw, ok := sc.(*scWrapper)
if !ok {
b.ClientConn.RemoveSubConn(sc)
return
}
// Remove the original SubConn from the parent ClientConn.
//
// Note that we don't remove this SubConn from the scWrappers map. We will
// need it to forward the final SubConn state Shutdown to the child policy.
//
// This entry is kept in the map until it's state is changes to Shutdown,
// and will be deleted in UpdateSubConnState().
b.ClientConn.RemoveSubConn(scw.SubConn)
}
func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
clusterName := b.getClusterName()
newAddrs := make([]resolver.Address, len(addrs))
var lID xdsinternal.LocalityID
for i, addr := range addrs {
newAddrs[i] = internal.SetXDSHandshakeClusterName(addr, clusterName)
lID = xdsinternal.GetLocalityID(newAddrs[i])
}
if scw, ok := sc.(*scWrapper); ok {
scw.updateLocalityID(lID)
// Need to get the original SubConn from the wrapper before calling
// parent ClientConn.
sc = scw.SubConn
}
b.ClientConn.UpdateAddresses(sc, newAddrs)
}
@ -388,7 +475,7 @@ func (b *clusterImplBalancer) run() {
b.childState = u
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: newDropPicker(b.childState, &dropConfigs{
Picker: newPicker(b.childState, &dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
@ -399,7 +486,7 @@ func (b *clusterImplBalancer) run() {
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: newDropPicker(b.childState, dc, b.loadWrapper),
Picker: newPicker(b.childState, dc, b.loadWrapper),
})
}
}

View File

@ -19,6 +19,7 @@
package clusterimpl
import (
orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
@ -66,12 +67,21 @@ func (d *dropper) drop() (ret bool) {
return d.w.Next().(bool)
}
const (
serverLoadCPUName = "cpu_utilization"
serverLoadMemoryName = "mem_utilization"
)
// loadReporter wraps the methods from the loadStore that are used here.
type loadReporter interface {
CallStarted(locality string)
CallFinished(locality string, err error)
CallServerLoad(locality, name string, val float64)
CallDropped(locality string)
}
type dropPicker struct {
// Picker implements RPC drop, circuit breaking drop and load reporting.
type picker struct {
drops []*dropper
s balancer.State
loadStore loadReporter
@ -79,8 +89,8 @@ type dropPicker struct {
countMax uint32
}
func newDropPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *dropPicker {
return &dropPicker{
func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *picker {
return &picker{
drops: config.drops,
s: s,
loadStore: loadStore,
@ -89,13 +99,14 @@ func newDropPicker(s balancer.State, config *dropConfigs, loadStore load.PerClus
}
}
func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// Don't drop unless the inner picker is READY. Similar to
// https://github.com/grpc/grpc-go/issues/2622.
if d.s.ConnectivityState != connectivity.Ready {
return d.s.Picker.Pick(info)
}
// Check if this RPC should be dropped by category.
for _, dp := range d.drops {
if dp.drop() {
if d.loadStore != nil {
@ -105,6 +116,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}
}
// Check if this RPC should be dropped by circuit breaking.
if d.counter != nil {
if err := d.counter.StartRequest(d.countMax); err != nil {
// Drops by circuit breaking are reported with empty category. They
@ -114,11 +126,58 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}
return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
}
pr, err := d.s.Picker.Pick(info)
if err != nil {
d.counter.EndRequest()
return pr, err
}
var lIDStr string
pr, err := d.s.Picker.Pick(info)
if scw, ok := pr.SubConn.(*scWrapper); ok {
// This OK check also covers the case err!=nil, because SubConn will be
// nil.
pr.SubConn = scw.SubConn
var e error
// If locality ID isn't found in the wrapper, an empty locality ID will
// be used.
lIDStr, e = scw.localityID().ToString()
if e != nil {
logger.Infof("failed to marshal LocalityID: %#v, loads won't be reported", scw.localityID())
}
}
if err != nil {
if d.counter != nil {
// Release one request count if this pick fails.
d.counter.EndRequest()
}
return pr, err
}
if d.loadStore != nil {
d.loadStore.CallStarted(lIDStr)
oldDone := pr.Done
pr.Done = func(info balancer.DoneInfo) {
if oldDone != nil {
oldDone(info)
}
d.loadStore.CallFinished(lIDStr, info.Err)
load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport)
if !ok {
return
}
d.loadStore.CallServerLoad(lIDStr, serverLoadCPUName, load.CpuUtilization)
d.loadStore.CallServerLoad(lIDStr, serverLoadMemoryName, load.MemUtilization)
for n, c := range load.RequestCost {
d.loadStore.CallServerLoad(lIDStr, n, c)
}
for n, c := range load.Utilization {
d.loadStore.CallServerLoad(lIDStr, n, c)
}
}
}
if d.counter != nil {
// Update Done() so that when the RPC finishes, the request count will
// be released.
oldDone := pr.Done
pr.Done = func(doneInfo balancer.DoneInfo) {
d.counter.EndRequest()
@ -126,8 +185,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
oldDone(doneInfo)
}
}
return pr, err
}
return d.s.Picker.Pick(info)
return pr, err
}

View File

@ -38,8 +38,8 @@ import (
"google.golang.org/grpc/internal/hierarchy"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/lrs"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
"google.golang.org/grpc/xds/internal/xdsclient"
@ -181,7 +181,7 @@ func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsclient.Endpoint
// Prepend parent priority to the priority names, to avoid duplicates.
pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName)
retNames = append(retNames, pName)
wtConfig, addrs := localitiesToWeightedTarget(priorityLocalities, pName, endpointPickingPolicy, mechanism.LoadReportingServerName, mechanism.Cluster, mechanism.EDSServiceName)
wtConfig, addrs := localitiesToWeightedTarget(priorityLocalities, pName, endpointPickingPolicy, mechanism.Cluster, mechanism.EDSServiceName)
retConfigs[pName] = &clusterimpl.LBConfig{
Cluster: mechanism.Cluster,
EDSServiceName: mechanism.EDSServiceName,
@ -249,7 +249,7 @@ func dedupSortedIntSlice(a []int) []int {
//
// The addresses have path hierarchy set to [priority-name, locality-name], so
// priority and weighted target know which child policy they are for.
func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig, lrsServer *string, cluster, edsService string) (*weightedtarget.LBConfig, []resolver.Address) {
func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig, cluster, edsService string) (*weightedtarget.LBConfig, []resolver.Address) {
weightedTargets := make(map[string]weightedtarget.Target)
var addrs []resolver.Address
for _, locality := range localities {
@ -257,25 +257,7 @@ func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName st
if err != nil {
localityStr = fmt.Sprintf("%+v", locality.ID)
}
child := childPolicy
// If lrsServer is not set, we can skip this extra layer of the LRS
// policy.
if lrsServer != nil {
localityID := locality.ID
child = &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: cluster,
EDSServiceName: edsService,
ChildPolicy: childPolicy,
LoadReportingServerName: *lrsServer,
Locality: &localityID,
},
}
}
weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: child}
weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: childPolicy}
for _, endpoint := range locality.Endpoints {
// Filter out all "unhealthy" endpoints (unknown and healthy are
// both considered to be healthy:
@ -290,6 +272,7 @@ func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName st
addr = weightedroundrobin.SetAddrInfo(addr, ai)
}
addr = hierarchy.Set(addr, []string{priorityName, localityStr})
addr = internal.SetLocalityID(addr, locality.ID)
addrs = append(addrs, addr)
}
}

View File

@ -35,7 +35,6 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/lrs"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
"google.golang.org/grpc/xds/internal/xdsclient"
@ -228,30 +227,12 @@ func TestBuildPriorityConfig(t *testing.T) {
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(testLocalityIDs[0].ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: testClusterName,
EDSServiceName: testEDSServiceName,
LoadReportingServerName: testLRSServer,
Locality: &testLocalityIDs[0],
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(testLocalityIDs[1].ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: testClusterName,
EDSServiceName: testEDSServiceName,
LoadReportingServerName: testLRSServer,
Locality: &testLocalityIDs[1],
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
@ -279,30 +260,12 @@ func TestBuildPriorityConfig(t *testing.T) {
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(testLocalityIDs[2].ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: testClusterName,
EDSServiceName: testEDSServiceName,
LoadReportingServerName: testLRSServer,
Locality: &testLocalityIDs[2],
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(testLocalityIDs[3].ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: testClusterName,
EDSServiceName: testEDSServiceName,
LoadReportingServerName: testLRSServer,
Locality: &testLocalityIDs[3],
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
@ -324,16 +287,16 @@ func TestBuildPriorityConfig(t *testing.T) {
Priorities: []string{"priority-0-0", "priority-0-1", "priority-1"},
}
wantAddrs := []resolver.Address{
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-0-0", assertString(testLocalityIDs[0].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-0-0", assertString(testLocalityIDs[0].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][0]}, []string{"priority-0-0", assertString(testLocalityIDs[1].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][1]}, []string{"priority-0-0", assertString(testLocalityIDs[1].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][0]}, []string{"priority-0-1", assertString(testLocalityIDs[2].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][1]}, []string{"priority-0-1", assertString(testLocalityIDs[2].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][0]}, []string{"priority-0-1", assertString(testLocalityIDs[3].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][1]}, []string{"priority-0-1", assertString(testLocalityIDs[3].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[4][0]}, []string{"priority-1"}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[4][1]}, []string{"priority-1"}),
testAddrWithAttrs(testAddressStrs[0][0], nil, "priority-0-0", &testLocalityIDs[0]),
testAddrWithAttrs(testAddressStrs[0][1], nil, "priority-0-0", &testLocalityIDs[0]),
testAddrWithAttrs(testAddressStrs[1][0], nil, "priority-0-0", &testLocalityIDs[1]),
testAddrWithAttrs(testAddressStrs[1][1], nil, "priority-0-0", &testLocalityIDs[1]),
testAddrWithAttrs(testAddressStrs[2][0], nil, "priority-0-1", &testLocalityIDs[2]),
testAddrWithAttrs(testAddressStrs[2][1], nil, "priority-0-1", &testLocalityIDs[2]),
testAddrWithAttrs(testAddressStrs[3][0], nil, "priority-0-1", &testLocalityIDs[3]),
testAddrWithAttrs(testAddressStrs[3][1], nil, "priority-0-1", &testLocalityIDs[3]),
testAddrWithAttrs(testAddressStrs[4][0], nil, "priority-1", nil),
testAddrWithAttrs(testAddressStrs[4][1], nil, "priority-1", nil),
}
if diff := cmp.Diff(gotConfig, wantConfig); diff != "" {
@ -434,30 +397,12 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) {
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(testLocalityIDs[0].ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: testClusterName,
EDSServiceName: testEDSServiceName,
LoadReportingServerName: testLRSServer,
Locality: &testLocalityIDs[0],
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(testLocalityIDs[1].ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: testClusterName,
EDSServiceName: testEDSServiceName,
LoadReportingServerName: testLRSServer,
Locality: &testLocalityIDs[1],
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
@ -479,30 +424,12 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) {
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(testLocalityIDs[2].ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: testClusterName,
EDSServiceName: testEDSServiceName,
LoadReportingServerName: testLRSServer,
Locality: &testLocalityIDs[2],
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(testLocalityIDs[3].ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: testClusterName,
EDSServiceName: testEDSServiceName,
LoadReportingServerName: testLRSServer,
Locality: &testLocalityIDs[3],
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
@ -510,14 +437,14 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) {
},
}
wantAddrs := []resolver.Address{
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-2-0", assertString(testLocalityIDs[0].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-2-0", assertString(testLocalityIDs[0].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][0]}, []string{"priority-2-0", assertString(testLocalityIDs[1].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][1]}, []string{"priority-2-0", assertString(testLocalityIDs[1].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][0]}, []string{"priority-2-1", assertString(testLocalityIDs[2].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][1]}, []string{"priority-2-1", assertString(testLocalityIDs[2].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][0]}, []string{"priority-2-1", assertString(testLocalityIDs[3].ToString)}),
hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][1]}, []string{"priority-2-1", assertString(testLocalityIDs[3].ToString)}),
testAddrWithAttrs(testAddressStrs[0][0], nil, "priority-2-0", &testLocalityIDs[0]),
testAddrWithAttrs(testAddressStrs[0][1], nil, "priority-2-0", &testLocalityIDs[0]),
testAddrWithAttrs(testAddressStrs[1][0], nil, "priority-2-0", &testLocalityIDs[1]),
testAddrWithAttrs(testAddressStrs[1][1], nil, "priority-2-0", &testLocalityIDs[1]),
testAddrWithAttrs(testAddressStrs[2][0], nil, "priority-2-1", &testLocalityIDs[2]),
testAddrWithAttrs(testAddressStrs[2][1], nil, "priority-2-1", &testLocalityIDs[2]),
testAddrWithAttrs(testAddressStrs[3][0], nil, "priority-2-1", &testLocalityIDs[3]),
testAddrWithAttrs(testAddressStrs[3][1], nil, "priority-2-1", &testLocalityIDs[3]),
}
if diff := cmp.Diff(gotNames, wantNames); diff != "" {
@ -674,38 +601,20 @@ func TestLocalitiesToWeightedTarget(t *testing.T) {
wantConfig: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: "test-cluster",
EDSServiceName: "test-eds-service",
LoadReportingServerName: "test-lrs-server",
Locality: &internal.LocalityID{Zone: "test-zone-1"},
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrs.Name,
Config: &lrs.LBConfig{
ClusterName: "test-cluster",
EDSServiceName: "test-eds-service",
LoadReportingServerName: "test-lrs-server",
Locality: &internal.LocalityID{Zone: "test-zone-2"},
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
wantAddrs: []resolver.Address{
hierarchy.Set(resolver.Address{Addr: "addr-1-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
hierarchy.Set(resolver.Address{Addr: "addr-1-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
hierarchy.Set(resolver.Address{Addr: "addr-2-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
hierarchy.Set(resolver.Address{Addr: "addr-2-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
{
@ -748,10 +657,10 @@ func TestLocalitiesToWeightedTarget(t *testing.T) {
},
},
wantAddrs: []resolver.Address{
hierarchy.Set(resolver.Address{Addr: "addr-1-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
hierarchy.Set(resolver.Address{Addr: "addr-1-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
hierarchy.Set(resolver.Address{Addr: "addr-2-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
hierarchy.Set(resolver.Address{Addr: "addr-2-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
{
@ -794,24 +703,16 @@ func TestLocalitiesToWeightedTarget(t *testing.T) {
},
},
wantAddrs: []resolver.Address{
hierarchy.Set(
weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-1-1"}, weightedroundrobin.AddrInfo{Weight: 90}),
[]string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
hierarchy.Set(
weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-1-2"}, weightedroundrobin.AddrInfo{Weight: 10}),
[]string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
hierarchy.Set(
weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-2-1"}, weightedroundrobin.AddrInfo{Weight: 90}),
[]string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
hierarchy.Set(
weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-2-2"}, weightedroundrobin.AddrInfo{Weight: 10}),
[]string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
testAddrWithAttrs("addr-1-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1 := localitiesToWeightedTarget(tt.localities, tt.priorityName, tt.childPolicy, tt.lrsServer, tt.cluster, tt.edsService)
got, got1 := localitiesToWeightedTarget(tt.localities, tt.priorityName, tt.childPolicy, tt.cluster, tt.edsService)
if diff := cmp.Diff(got, tt.wantConfig); diff != "" {
t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff)
}
@ -837,3 +738,17 @@ func assertString(f func() (string, error)) string {
}
return s
}
func testAddrWithAttrs(addrStr string, weight *uint32, priority string, lID *internal.LocalityID) resolver.Address {
addr := resolver.Address{Addr: addrStr}
if weight != nil {
addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: *weight})
}
path := []string{priority}
if lID != nil {
path = append(path, assertString(lID.ToString))
addr = internal.SetLocalityID(addr, *lID)
}
addr = hierarchy.Set(addr, path)
return addr
}

View File

@ -1,239 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package lrs implements load reporting balancer for xds.
package lrs
import (
"encoding/json"
"fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/loadstore"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)
func init() {
balancer.Register(bb{})
}
// Name is the name of the LRS balancer.
const Name = "lrs_experimental"
type bb struct{}
func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &lrsBalancer{
cc: cc,
buildOpts: opts,
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
return b
}
func (bb) Name() string {
return Name
}
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
type lrsBalancer struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
xdsClient *xdsClientWrapper
config *LBConfig
lb balancer.Balancer // The sub balancer.
}
func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
if b.xdsClient == nil {
c := xdsclient.FromResolverState(s.ResolverState)
if c == nil {
return balancer.ErrBadResolverState
}
b.xdsClient = newXDSClientWrapper(c)
}
// Update load reporting config or xds client. This needs to be done before
// updating the child policy because we need the loadStore from the updated
// client to be passed to the ccWrapper.
if err := b.xdsClient.update(newConfig); err != nil {
return err
}
// If child policy is a different type, recreate the sub-balancer.
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
bb := balancer.Get(newConfig.ChildPolicy.Name)
if bb == nil {
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
}
if b.lb != nil {
b.lb.Close()
}
lidJSON, err := newConfig.Locality.ToString()
if err != nil {
return fmt.Errorf("failed to marshal LocalityID: %#v", newConfig.Locality)
}
ccWrapper := newCCWrapper(b.cc, b.xdsClient.loadStore(), lidJSON)
b.lb = bb.Build(ccWrapper, b.buildOpts)
}
b.config = newConfig
// Addresses and sub-balancer config are sent to sub-balancer.
return b.lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: b.config.ChildPolicy.Config,
})
}
func (b *lrsBalancer) ResolverError(err error) {
if b.lb != nil {
b.lb.ResolverError(err)
}
}
func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if b.lb != nil {
b.lb.UpdateSubConnState(sc, s)
}
}
func (b *lrsBalancer) Close() {
if b.lb != nil {
b.lb.Close()
b.lb = nil
}
b.xdsClient.close()
}
type ccWrapper struct {
balancer.ClientConn
loadStore load.PerClusterReporter
localityIDJSON string
}
func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityIDJSON string) *ccWrapper {
return &ccWrapper{
ClientConn: cc,
loadStore: loadStore,
localityIDJSON: localityIDJSON,
}
}
func (ccw *ccWrapper) UpdateState(s balancer.State) {
s.Picker = newLoadReportPicker(s.Picker, ccw.localityIDJSON, ccw.loadStore)
ccw.ClientConn.UpdateState(s)
}
type xdsClientWrapper struct {
c xdsclient.XDSClient
cancelLoadReport func()
clusterName string
edsServiceName string
lrsServerName *string
// loadWrapper is a wrapper with loadOriginal, with clusterName and
// edsServiceName. It's used children to report loads.
loadWrapper *loadstore.Wrapper
}
func newXDSClientWrapper(c xdsclient.XDSClient) *xdsClientWrapper {
return &xdsClientWrapper{
c: c,
loadWrapper: loadstore.NewWrapper(),
}
}
// update checks the config and xdsclient, and decides whether it needs to
// restart the load reporting stream.
func (w *xdsClientWrapper) update(newConfig *LBConfig) error {
var (
restartLoadReport bool
updateLoadClusterAndService bool
)
// ClusterName is different, restart. ClusterName is from ClusterName and
// EDSServiceName.
if w.clusterName != newConfig.ClusterName {
updateLoadClusterAndService = true
w.clusterName = newConfig.ClusterName
}
if w.edsServiceName != newConfig.EDSServiceName {
updateLoadClusterAndService = true
w.edsServiceName = newConfig.EDSServiceName
}
if updateLoadClusterAndService {
// This updates the clusterName and serviceName that will reported for the
// loads. The update here is too early, the perfect timing is when the
// picker is updated with the new connection. But from this balancer's point
// of view, it's impossible to tell.
//
// On the other hand, this will almost never happen. Each LRS policy
// shouldn't get updated config. The parent should do a graceful switch when
// the clusterName or serviceName is changed.
w.loadWrapper.UpdateClusterAndService(w.clusterName, w.edsServiceName)
}
if w.lrsServerName == nil || *w.lrsServerName != newConfig.LoadReportingServerName {
// LoadReportingServerName is different, load should be report to a
// different server, restart.
restartLoadReport = true
w.lrsServerName = &newConfig.LoadReportingServerName
}
if restartLoadReport {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
var loadStore *load.Store
if w.c != nil {
loadStore, w.cancelLoadReport = w.c.ReportLoad(*w.lrsServerName)
}
w.loadWrapper.UpdateLoadStore(loadStore)
}
return nil
}
func (w *xdsClientWrapper) loadStore() load.PerClusterReporter {
return w.loadWrapper
}
func (w *xdsClientWrapper) close() {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
}

View File

@ -1,143 +0,0 @@
// +build go1.12
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package lrs
import (
"context"
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
)
const defaultTestTimeout = 1 * time.Second
var (
testBackendAddrs = []resolver.Address{
{Addr: "1.1.1.1:1"},
}
testLocality = &xdsinternal.LocalityID{
Region: "test-region",
Zone: "test-zone",
SubZone: "test-sub-zone",
}
)
// TestLoadReporting verifies that the lrs balancer starts the loadReport
// stream when the LBConfig passed to it contains a valid value for the LRS
// server (empty string).
func TestLoadReporting(t *testing.T) {
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
lrsB := builder.Build(cc, balancer.BuildOptions{})
defer lrsB.Close()
if err := lrsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
ClusterName: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: testLRSServerName,
Locality: testLocality,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
got, err := xdsC.WaitForReportLoad(ctx)
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
}
sc1 := <-cc.NewSubConnCh
lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
const successCount = 5
for i := 0; i < successCount; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
gotSCSt.Done(balancer.DoneInfo{})
}
const errorCount = 5
for i := 0; i < errorCount; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
}
// Dump load data from the store and compare with expected counts.
loadStore := xdsC.LoadStore()
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
sds := loadStore.Stats([]string{testClusterName})
if len(sds) == 0 {
t.Fatalf("loads for cluster %v not found in store", testClusterName)
}
sd := sds[0]
if sd.Cluster != testClusterName || sd.Service != testServiceName {
t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
}
testLocalityJSON, _ := testLocality.ToString()
localityData, ok := sd.LocalityStats[testLocalityJSON]
if !ok {
t.Fatalf("loads for %v not found in store", testLocality)
}
reqStats := localityData.RequestStats
if reqStats.Succeeded != successCount {
t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount)
}
if reqStats.Errored != errorCount {
t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount)
}
if reqStats.InProgress != 0 {
t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0)
}
}

View File

@ -1,53 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package lrs
import (
"encoding/json"
"fmt"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
)
// LBConfig is the balancer config for lrs balancer.
type LBConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`
ClusterName string `json:"clusterName,omitempty"`
EDSServiceName string `json:"edsServiceName,omitempty"`
LoadReportingServerName string `json:"lrsLoadReportingServerName,omitempty"`
Locality *internal.LocalityID `json:"locality,omitempty"`
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
}
func parseConfig(c json.RawMessage) (*LBConfig, error) {
var cfg LBConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, err
}
if cfg.ClusterName == "" {
return nil, fmt.Errorf("required ClusterName is not set in %+v", cfg)
}
if cfg.Locality == nil {
return nil, fmt.Errorf("required Locality is not set in %+v", cfg)
}
return &cfg, nil
}

View File

@ -1,114 +0,0 @@
// +build go1.12
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package lrs
import (
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer/roundrobin"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
)
const (
testClusterName = "test-cluster"
testServiceName = "test-eds-service"
testLRSServerName = "test-lrs-name"
)
func TestParseConfig(t *testing.T) {
tests := []struct {
name string
js string
want *LBConfig
wantErr bool
}{
{
name: "no cluster name",
js: `{
"edsServiceName": "test-eds-service",
"lrsLoadReportingServerName": "test-lrs-name",
"locality": {
"region": "test-region",
"zone": "test-zone",
"subZone": "test-sub-zone"
},
"childPolicy":[{"round_robin":{}}]
}
`,
wantErr: true,
},
{
name: "no locality",
js: `{
"clusterName": "test-cluster",
"edsServiceName": "test-eds-service",
"lrsLoadReportingServerName": "test-lrs-name",
"childPolicy":[{"round_robin":{}}]
}
`,
wantErr: true,
},
{
name: "good",
js: `{
"clusterName": "test-cluster",
"edsServiceName": "test-eds-service",
"lrsLoadReportingServerName": "test-lrs-name",
"locality": {
"region": "test-region",
"zone": "test-zone",
"subZone": "test-sub-zone"
},
"childPolicy":[{"round_robin":{}}]
}
`,
want: &LBConfig{
ClusterName: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServerName: testLRSServerName,
Locality: &xdsinternal.LocalityID{
Region: "test-region",
Zone: "test-zone",
SubZone: "test-sub-zone",
},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
Config: nil,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseConfig([]byte(tt.js))
if (err != nil) != tt.wantErr {
t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if diff := cmp.Diff(got, tt.want); diff != "" {
t.Errorf("parseConfig() got = %v, want %v, diff: %s", got, tt.want, diff)
}
})
}
}

View File

@ -1,34 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package lrs
import (
"fmt"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
)
const prefix = "[lrs-lb %p] "
var logger = grpclog.Component("xds")
func prefixLogger(p *lrsBalancer) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}

View File

@ -1,85 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package lrs
import (
orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
"google.golang.org/grpc/balancer"
)
const (
serverLoadCPUName = "cpu_utilization"
serverLoadMemoryName = "mem_utilization"
)
// loadReporter wraps the methods from the loadStore that are used here.
type loadReporter interface {
CallStarted(locality string)
CallFinished(locality string, err error)
CallServerLoad(locality, name string, val float64)
}
type loadReportPicker struct {
p balancer.Picker
locality string
loadStore loadReporter
}
func newLoadReportPicker(p balancer.Picker, id string, loadStore loadReporter) *loadReportPicker {
return &loadReportPicker{
p: p,
locality: id,
loadStore: loadStore,
}
}
func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
res, err := lrp.p.Pick(info)
if err != nil {
return res, err
}
if lrp.loadStore == nil {
return res, err
}
lrp.loadStore.CallStarted(lrp.locality)
oldDone := res.Done
res.Done = func(info balancer.DoneInfo) {
if oldDone != nil {
oldDone(info)
}
lrp.loadStore.CallFinished(lrp.locality, info.Err)
load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport)
if !ok {
return
}
lrp.loadStore.CallServerLoad(lrp.locality, serverLoadCPUName, load.CpuUtilization)
lrp.loadStore.CallServerLoad(lrp.locality, serverLoadMemoryName, load.MemUtilization)
for n, d := range load.RequestCost {
lrp.loadStore.CallServerLoad(lrp.locality, n, d)
}
for n, d := range load.Utilization {
lrp.loadStore.CallServerLoad(lrp.locality, n, d)
}
}
return res, err
}

View File

@ -26,8 +26,7 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
_ "google.golang.org/grpc/xds/internal/balancer/lrs" // Register LRS balancer, so we can use it as child policy in the config tests.
"google.golang.org/grpc/xds/internal/balancer/priority"
)
const (
@ -35,23 +34,22 @@ const (
"targets": {
"cluster_1" : {
"weight":75,
"childPolicy":[{"lrs_experimental":{"clusterName":"cluster_1","lrsLoadReportingServerName":"lrs.server","locality":{"zone":"test-zone-1"}}}]
"childPolicy":[{"priority_experimental":{"priorities": ["child-1"], "children": {"child-1": {"config": [{"round_robin":{}}]}}}}]
},
"cluster_2" : {
"weight":25,
"childPolicy":[{"lrs_experimental":{"clusterName":"cluster_2","lrsLoadReportingServerName":"lrs.server","locality":{"zone":"test-zone-2"}}}]
"childPolicy":[{"priority_experimental":{"priorities": ["child-2"], "children": {"child-2": {"config": [{"round_robin":{}}]}}}}]
}
}
}`
lrsBalancerName = "lrs_experimental"
)
var (
lrsConfigParser = balancer.Get(lrsBalancerName).(balancer.ConfigParser)
lrsConfigJSON1 = `{"clusterName":"cluster_1","lrsLoadReportingServerName":"lrs.server","locality":{"zone":"test-zone-1"}}`
lrsConfig1, _ = lrsConfigParser.ParseConfig([]byte(lrsConfigJSON1))
lrsConfigJSON2 = `{"clusterName":"cluster_2","lrsLoadReportingServerName":"lrs.server","locality":{"zone":"test-zone-2"}}`
lrsConfig2, _ = lrsConfigParser.ParseConfig([]byte(lrsConfigJSON2))
testConfigParser = balancer.Get(priority.Name).(balancer.ConfigParser)
testConfigJSON1 = `{"priorities": ["child-1"], "children": {"child-1": {"config": [{"round_robin":{}}]}}}`
testConfig1, _ = testConfigParser.ParseConfig([]byte(testConfigJSON1))
testConfigJSON2 = `{"priorities": ["child-2"], "children": {"child-2": {"config": [{"round_robin":{}}]}}}`
testConfig2, _ = testConfigParser.ParseConfig([]byte(testConfigJSON2))
)
func Test_parseConfig(t *testing.T) {
@ -75,15 +73,15 @@ func Test_parseConfig(t *testing.T) {
"cluster_1": {
Weight: 75,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrsBalancerName,
Config: lrsConfig1,
Name: priority.Name,
Config: testConfig1,
},
},
"cluster_2": {
Weight: 25,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: lrsBalancerName,
Config: lrsConfig2,
Name: priority.Name,
Config: testConfig2,
},
},
},

View File

@ -22,6 +22,8 @@ package internal
import (
"encoding/json"
"fmt"
"google.golang.org/grpc/resolver"
)
// LocalityID is xds.Locality without XXX fields, so it can be used as map
@ -53,3 +55,19 @@ func LocalityIDFromString(s string) (ret LocalityID, _ error) {
}
return ret, nil
}
type localityKeyType string
const localityKey = localityKeyType("grpc.xds.internal.address.locality")
// GetLocalityID returns the locality ID of addr.
func GetLocalityID(addr resolver.Address) LocalityID {
path, _ := addr.Attributes.Value(localityKey).(LocalityID)
return path
}
// SetLocalityID sets locality ID in addr to l.
func SetLocalityID(addr resolver.Address, l LocalityID) resolver.Address {
addr.Attributes = addr.Attributes.WithValues(localityKey, l)
return addr
}