xds: Handle child policy configs properly (#4071)

This commit is contained in:
Easwar Swaminathan 2020-12-03 09:00:33 -08:00 committed by GitHub
parent 4a0125ac58
commit 5b1874ab74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 479 additions and 598 deletions

View File

@ -24,6 +24,7 @@ import (
"fmt"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
@ -33,15 +34,22 @@ import (
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/serviceconfig"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
)
const (
edsName = "eds_experimental"
)
const edsName = "eds_experimental"
// xdsClientInterface contains only the xds_client methods needed by EDS
// balancer. It's defined so we can override xdsclient.New function in tests.
type xdsClientInterface interface {
WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
ReportLoad(server string) (loadStore *load.Store, cancel func())
Close()
}
var (
newEDSBalancer = func(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), xdsClient *xdsClientWrapper, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
return newEDSBalancerImpl(cc, enqueueState, xdsClient, logger)
newEDSBalancer = func(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
return newEDSBalancerImpl(cc, enqueueState, lw, logger)
}
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
)
@ -60,8 +68,10 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptio
grpcUpdate: make(chan interface{}),
xdsClientUpdate: make(chan *edsUpdate),
childPolicyUpdate: buffer.NewUnbounded(),
lsw: &loadStoreWrapper{},
config: &EDSConfig{},
}
x.logger = prefixLogger((x))
x.logger = prefixLogger(x)
client, err := newXDSClient()
if err != nil {
@ -69,8 +79,8 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptio
return nil
}
x.client = newXDSClientWrapper(client, x.handleEDSUpdate, x.logger)
x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, x.client, x.logger)
x.xdsClient = client
x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, x.lsw, x.logger)
x.logger.Infof("Created")
go x.run()
return x
@ -93,9 +103,11 @@ func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadB
//
// It's implemented by the real eds balancer and a fake testing eds balancer.
type edsBalancerImplInterface interface {
// handleEDSResponse passes the received EDS message from traffic director to eds balancer.
// handleEDSResponse passes the received EDS message from traffic director
// to eds balancer.
handleEDSResponse(edsResp xdsclient.EndpointsUpdate)
// handleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use.
// handleChildPolicy updates the eds balancer the intra-cluster load
// balancing policy to use.
handleChildPolicy(name string, config json.RawMessage)
// handleSubConnStateChange handles state change for SubConn.
handleSubConnStateChange(sc balancer.SubConn, state connectivity.State)
@ -110,23 +122,33 @@ type edsBalancerImplInterface interface {
//
// It currently has only an edsBalancer. Later, we may add fallback.
type edsBalancer struct {
cc balancer.ClientConn // *xdsClientConn
cc balancer.ClientConn
closed *grpcsync.Event
logger *grpclog.PrefixLogger
// edsBalancer continuously monitor the channels below, and will handle events from them in sync.
// edsBalancer continuously monitors the channels below, and will handle
// events from them in sync.
grpcUpdate chan interface{}
xdsClientUpdate chan *edsUpdate
childPolicyUpdate *buffer.Unbounded
client *xdsClientWrapper // may change when passed a different service config
config *EDSConfig // may change when passed a different service config
edsImpl edsBalancerImplInterface
xdsClient xdsClientInterface
lsw *loadStoreWrapper
config *EDSConfig // may change when passed a different service config
edsImpl edsBalancerImplInterface
// edsServiceName is the edsServiceName currently being watched, not
// necessary the edsServiceName from service config.
edsServiceName string
cancelEndpointsWatch func()
loadReportServer *string // LRS is disabled if loadReporterServer is nil.
cancelLoadReport func()
}
// run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc,
// xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It
// exits when edsBalancer is closed.
// run gets executed in a goroutine once edsBalancer is created. It monitors
// updates from grpc, xdsClient and load balancer. It synchronizes the
// operations that happen inside edsBalancer. It exits when edsBalancer is
// closed.
func (x *edsBalancer) run() {
for {
select {
@ -139,7 +161,8 @@ func (x *edsBalancer) run() {
u := update.(*balancerStateWithPriority)
x.edsImpl.updateState(u.priority, u.s)
case <-x.closed.Done():
x.client.close()
x.cancelWatch()
x.xdsClient.Close()
x.edsImpl.close()
return
}
@ -167,7 +190,7 @@ func (x *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
// CDS balancer), and is a resource-not-found error. This means the
// resource (can be either LDS or CDS) was removed. Stop the EDS
// watch.
x.client.cancelWatch()
x.cancelWatch()
}
x.edsImpl.handleEDSResponse(xdsclient.EndpointsUpdate{})
}
@ -185,25 +208,19 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
return
}
if err := x.client.handleUpdate(cfg); err != nil {
x.logger.Warningf("failed to update xds clients: %v", err)
}
if x.config == nil {
x.config = cfg
return
if err := x.handleServiceConfigUpdate(cfg); err != nil {
x.logger.Warningf("failed to update xDS client: %v", err)
}
// We will update the edsImpl with the new child policy, if we got a
// different one.
if !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy) {
if !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy, cmpopts.EquateEmpty()) {
if cfg.ChildPolicy != nil {
x.edsImpl.handleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
} else {
x.edsImpl.handleChildPolicy(roundrobin.Name, nil)
}
}
x.config = cfg
case error:
x.handleErrorFromUpdate(u, true)
@ -213,6 +230,81 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
}
}
// handleServiceConfigUpdate applies the service config update, watching a new
// EDS service name and restarting LRS stream, as required.
func (x *edsBalancer) handleServiceConfigUpdate(config *EDSConfig) error {
// Restart EDS watch when the edsServiceName has changed.
if x.edsServiceName != config.EDSServiceName {
x.edsServiceName = config.EDSServiceName
x.startEndpointsWatch()
// TODO: this update for the LRS service name is too early. It should
// only apply to the new EDS response. But this is applied to the RPCs
// before the new EDS response. To fully fix this, the EDS balancer
// needs to do a graceful switch to another EDS implementation.
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fix this (a bigger change) will happen later.
x.lsw.updateServiceName(x.edsServiceName)
}
// Restart load reporting when the loadReportServer name has changed.
if !equalStringPointers(x.loadReportServer, config.LrsLoadReportingServerName) {
loadStore := x.startLoadReport(config.LrsLoadReportingServerName)
x.lsw.updateLoadStore(loadStore)
}
return nil
}
// startEndpointsWatch starts the EDS watch.
//
// This usually means load report needs to be restarted, but this function does
// NOT do that. Caller needs to call startLoadReport separately.
func (x *edsBalancer) startEndpointsWatch() {
if x.cancelEndpointsWatch != nil {
x.cancelEndpointsWatch()
}
cancelEDSWatch := x.xdsClient.WatchEndpoints(x.edsServiceName, func(update xdsclient.EndpointsUpdate, err error) {
x.logger.Infof("Watch update from xds-client %p, content: %+v", x.xdsClient, update)
x.handleEDSUpdate(update, err)
})
x.logger.Infof("Watch started on resource name %v with xds-client %p", x.edsServiceName, x.xdsClient)
x.cancelEndpointsWatch = func() {
cancelEDSWatch()
x.logger.Infof("Watch cancelled on resource name %v with xds-client %p", x.edsServiceName, x.xdsClient)
}
}
func (x *edsBalancer) cancelWatch() {
x.loadReportServer = nil
if x.cancelLoadReport != nil {
x.cancelLoadReport()
}
x.edsServiceName = ""
if x.cancelEndpointsWatch != nil {
x.cancelEndpointsWatch()
}
}
// startLoadReport starts load reporting. If there's already a load reporting in
// progress, it cancels that.
//
// Caller can cal this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
// watch).
func (x *edsBalancer) startLoadReport(loadReportServer *string) *load.Store {
x.loadReportServer = loadReportServer
if x.cancelLoadReport != nil {
x.cancelLoadReport()
}
if loadReportServer == nil {
return nil
}
ls, cancel := x.xdsClient.ReportLoad(*loadReportServer)
x.cancelLoadReport = cancel
return ls
}
func (x *edsBalancer) handleXDSClientUpdate(update *edsUpdate) {
if err := update.err; err != nil {
x.handleErrorFromUpdate(err, false)
@ -280,3 +372,16 @@ func (x *edsBalancer) Close() {
x.closed.Fire()
x.logger.Infof("Shutdown")
}
// equalStringPointers returns true if
// - a and b are both nil OR
// - *a == *b (and a and b are both non-nil)
func equalStringPointers(a, b *string) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return *a == *b
}

View File

@ -62,9 +62,9 @@ type balancerGroupWithConfig struct {
// The localities are picked as weighted round robin. A configurable child
// policy is used to manage endpoints in each locality.
type edsBalancerImpl struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
xdsClient *xdsClientWrapper // To fetch the load.Store from.
cc balancer.ClientConn
logger *grpclog.PrefixLogger
loadReporter load.PerClusterReporter
enqueueChildBalancerStateUpdate func(priorityType, balancer.State)
@ -99,12 +99,12 @@ type edsBalancerImpl struct {
}
// newEDSBalancerImpl create a new edsBalancerImpl.
func newEDSBalancerImpl(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), xdsClient *xdsClientWrapper, logger *grpclog.PrefixLogger) *edsBalancerImpl {
func newEDSBalancerImpl(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), lr load.PerClusterReporter, logger *grpclog.PrefixLogger) *edsBalancerImpl {
edsImpl := &edsBalancerImpl{
cc: cc,
logger: logger,
subBalancerBuilder: balancer.Get(roundrobin.Name),
xdsClient: xdsClient,
loadReporter: lr,
enqueueChildBalancerStateUpdate: enqueueState,
@ -170,7 +170,7 @@ func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropC
// Update picker with old inner picker, new drops.
edsImpl.cc.UpdateState(balancer.State{
ConnectivityState: edsImpl.innerState.ConnectivityState,
Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.xdsClient.loadStore())},
Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadReporter)},
)
}
edsImpl.pickerMu.Unlock()
@ -240,7 +240,7 @@ func (edsImpl *edsBalancerImpl) handleEDSResponse(edsResp xdsclient.EndpointsUpd
ccPriorityWrapper := edsImpl.ccWrapperWithPriority(priority)
stateAggregator := weightedaggregator.New(ccPriorityWrapper, edsImpl.logger, newRandomWRR)
bgwc = &balancerGroupWithConfig{
bg: balancergroup.New(ccPriorityWrapper, stateAggregator, edsImpl.xdsClient.loadStore(), edsImpl.logger),
bg: balancergroup.New(ccPriorityWrapper, stateAggregator, edsImpl.loadReporter, edsImpl.logger),
stateAggregator: stateAggregator,
configs: make(map[internal.LocalityID]*localityConfig),
}
@ -403,7 +403,7 @@ func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.St
defer edsImpl.pickerMu.Unlock()
edsImpl.innerState = s
// Don't reset drops when it's a state change.
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.xdsClient.loadStore())})
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.loadReporter)})
}
}

View File

@ -18,6 +18,7 @@
package edsbalancer
import (
"context"
"testing"
"time"
@ -74,7 +75,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
t.Fatalf("got unexpected new SubConn")
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-time.After(time.Millisecond * 100):
case <-time.After(defaultTestShortTimeout):
}
// Remove p2, no updates.
@ -90,7 +91,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
t.Fatalf("got unexpected new SubConn")
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-time.After(time.Millisecond * 100):
case <-time.After(defaultTestShortTimeout):
}
}
@ -159,7 +160,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
t.Fatalf("got unexpected new SubConn")
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-time.After(time.Millisecond * 100):
case <-time.After(defaultTestShortTimeout):
}
// Turn down 1, use 2
@ -269,8 +270,6 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
//
// Init 0,1,2; 0 and 1 down, use 2; 0 up, close 1 and 2.
func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
defer time.Sleep(10 * time.Millisecond)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -627,7 +626,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
t.Fatalf("got unexpected new SubConn")
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-time.After(time.Millisecond * 100):
case <-time.After(defaultTestShortTimeout):
}
}
@ -836,6 +835,9 @@ func (s) TestEDSPriority_FirstPriorityUnavailable(t *testing.T) {
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab2.Build()))
// Wait after double the init timer timeout, to ensure it doesn't fail.
time.Sleep(testPriorityInitTimeout * 2)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := cc.WaitForErrPicker(ctx); err != nil {
t.Fatal(err)
}
}

View File

@ -690,12 +690,9 @@ func (s) TestEDS_LoadReport(t *testing.T) {
lsWrapper := &loadStoreWrapper{}
lsWrapper.updateServiceName(testClusterNames[0])
lsWrapper.updateLoadStore(loadStore)
cw := &xdsClientWrapper{
loadWrapper: lsWrapper,
}
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, cw, nil)
edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID)

View File

@ -38,13 +38,35 @@ import (
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
_ "google.golang.org/grpc/xds/internal/client/v2" // V2 client registration.
)
const defaultTestTimeout = 1 * time.Second
const (
defaultTestTimeout = 1 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
testServiceName = "test/foo"
testEDSClusterName = "test/service/eds"
)
var (
// A non-empty endpoints update which is expected to be accepted by the EDS
// LB policy.
defaultEndpointsUpdate = xdsclient.EndpointsUpdate{
Localities: []xdsclient.Locality{
{
Endpoints: []xdsclient.Endpoint{{Address: "endpoint1"}},
ID: internal.LocalityID{Zone: "zone"},
Priority: 1,
Weight: 100,
},
},
}
)
func init() {
balancer.Register(&edsBalancerBuilder{})
@ -77,7 +99,7 @@ type noopTestClientConn struct {
balancer.ClientConn
}
func (t *noopTestClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
func (t *noopTestClientConn) NewSubConn([]resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error) {
return nil, nil
}
@ -111,13 +133,10 @@ func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {
func (f *fakeEDSBalancer) close() {}
func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
func (f *fakeEDSBalancer) waitForChildPolicy(ctx context.Context, wantPolicy *loadBalancingConfig) error {
val, err := f.childPolicy.Receive(ctx)
if err != nil {
return fmt.Errorf("error waiting for childPolicy: %v", err)
return err
}
gotPolicy := val.(*loadBalancingConfig)
if !cmp.Equal(gotPolicy, wantPolicy) {
@ -126,13 +145,10 @@ func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) er
return nil
}
func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
func (f *fakeEDSBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error {
val, err := f.subconnStateChange.Receive(ctx)
if err != nil {
return fmt.Errorf("error waiting for subconnStateChange: %v", err)
return err
}
gotState := val.(*scStateChange)
if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) {
@ -141,13 +157,10 @@ func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) er
return nil
}
func (f *fakeEDSBalancer) waitForEDSResponse(wantUpdate xdsclient.EndpointsUpdate) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
func (f *fakeEDSBalancer) waitForEDSResponse(ctx context.Context, wantUpdate xdsclient.EndpointsUpdate) error {
val, err := f.edsUpdate.Receive(ctx)
if err != nil {
return fmt.Errorf("error waiting for edsUpdate: %v", err)
return err
}
gotUpdate := val.(xdsclient.EndpointsUpdate)
if !reflect.DeepEqual(gotUpdate, wantUpdate) {
@ -172,17 +185,12 @@ func (*fakeSubConn) Connect() { panic("implement me")
// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
// edsBalancer.
func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
func waitForNewEDSLB(ctx context.Context, ch *testutils.Channel) (*fakeEDSBalancer, error) {
val, err := ch.Receive(ctx)
if err != nil {
t.Fatalf("error when waiting for a new edsLB: %v", err)
return nil
return nil, fmt.Errorf("error when waiting for a new edsLB: %v", err)
}
return val.(*fakeEDSBalancer)
return val.(*fakeEDSBalancer), nil
}
// setup overrides the functions which are used to create the xdsClient and the
@ -195,7 +203,7 @@ func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) {
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
origNewEDSBalancer := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), _ *xdsClientWrapper, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), _ load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
edsLB := newFakeEDSBalancer(cc)
defer func() { edsLBCh.Send(edsLB) }()
return edsLB
@ -214,7 +222,6 @@ const (
// Install two fake balancers for service config update tests.
//
// ParseConfig only accepts the json if the balancer specified is registered.
func init() {
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA})
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB})
@ -250,102 +257,115 @@ func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnStat
func (b *fakeBalancer) Close() {}
// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy
// TestConfigChildPolicyUpdate verifies scenarios where the childPolicy
// section of the lbConfig is updated.
//
// The test does the following:
// * Builds a new xds balancer.
// * Builds a new EDS balancer.
// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA.
// Verifies that a new xdsClient is created. It then pushes a new edsUpdate
// Verifies that an EDS watch is registered. It then pushes a new edsUpdate
// through the fakexds client. Verifies that a new edsLB is created and it
// receives the expected childPolicy.
// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB.
// This time around, we expect no new xdsClient or edsLB to be created.
// Instead, we expect the existing edsLB to receive the new child policy.
func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
// Verifies that the existing edsLB receives the new child policy.
func (s) TestConfigChildPolicyUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cancel := setup(edsLBCh)
defer cancel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()
edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerA,
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSClusterName,
},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
lbCfgA := &loadBalancingConfig{
Name: fakeBalancerA,
Config: json.RawMessage("{}"),
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{
ChildPolicy: lbCfgA,
EDSServiceName: testServiceName,
},
}); err != nil {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
}
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
edsLB.waitForChildPolicy(&loadBalancingConfig{
Name: string(fakeBalancerA),
Config: json.RawMessage(`{}`),
})
xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil)
if err := edsLB.waitForChildPolicy(ctx, lbCfgA); err != nil {
t.Fatal(err)
}
edsB.UpdateClientConnState(balancer.ClientConnState{
lbCfgB := &loadBalancingConfig{
Name: fakeBalancerB,
Config: json.RawMessage("{}"),
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerB,
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSClusterName,
ChildPolicy: lbCfgB,
EDSServiceName: testServiceName,
},
})
edsLB.waitForChildPolicy(&loadBalancingConfig{
Name: string(fakeBalancerA),
Config: json.RawMessage(`{}`),
})
}); err != nil {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
}
if err := edsLB.waitForChildPolicy(ctx, lbCfgB); err != nil {
t.Fatal(err)
}
}
// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on
// the subConnStateChange to appropriate child balancers.
func (s) TestXDSSubConnStateChange(t *testing.T) {
// TestSubConnStateChange verifies if the top-level edsBalancer passes on
// the subConnStateChange to appropriate child balancer.
func (s) TestSubConnStateChange(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cancel := setup(edsLBCh)
defer cancel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()
edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{EDSServiceName: testEDSClusterName},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
}); err != nil {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
}
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil)
fsc := &fakeSubConn{}
state := connectivity.Ready
edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
if err := edsLB.waitForSubConnStateChange(ctx, &scStateChange{sc: fsc, state: state}); err != nil {
t.Fatal(err)
}
}
// TestErrorFromXDSClientUpdate verifies that errros from xdsClient update are
// TestErrorFromXDSClientUpdate verifies that an error from xdsClient update is
// handled correctly.
//
// If it's resource-not-found, watch will NOT be canceled, the EDS impl will
@ -355,42 +375,50 @@ func (s) TestXDSSubConnStateChange(t *testing.T) {
// handle fallback.
func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cancel := setup(edsLBCh)
defer cancel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{EDSServiceName: testEDSClusterName},
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
}); err != nil {
t.Fatal(err)
}
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
}
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr)
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil {
t.Fatal("eds impl got EDS resp, want timeout error")
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := edsLB.waitForEDSResponse(sCtx, xdsclient.EndpointsUpdate{}); err != context.DeadlineExceeded {
t.Fatal(err)
}
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
@ -398,12 +426,12 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
// Even if error is resource not found, watch shouldn't be canceled, because
// this is an EDS resource removed (and xds client actually never sends this
// error, but we still handles it).
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("eds impl expecting empty update, got %v", err)
}
}
@ -417,57 +445,128 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
// handle fallback.
func (s) TestErrorFromResolver(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cancel := setup(edsLBCh)
defer cancel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
if err != nil {
t.Fatal(err)
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{EDSServiceName: testEDSClusterName},
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
}); err != nil {
t.Fatal(err)
}
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
}
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
edsB.ResolverError(connectionErr)
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil {
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := edsLB.waitForEDSResponse(sCtx, xdsclient.EndpointsUpdate{}); err != context.DeadlineExceeded {
t.Fatal("eds impl got EDS resp, want timeout error")
}
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
edsB.ResolverError(resourceErr)
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
}
}
func (s) TestXDSBalancerConfigParsing(t *testing.T) {
// Given a list of resource names, verifies that EDS requests for the same are
// sent by the EDS balancer, through the fake xDS client.
func verifyExpectedRequests(ctx context.Context, fc *fakeclient.Client, resourceNames ...string) error {
for _, name := range resourceNames {
if name == "" {
// ResourceName empty string indicates a cancel.
if err := fc.WaitForCancelEDSWatch(ctx); err != nil {
return fmt.Errorf("timed out when expecting resource %q", name)
}
return nil
}
resName, err := fc.WaitForWatchEDS(ctx)
if err != nil {
return fmt.Errorf("timed out when expecting resource %q, %p", name, fc)
}
if resName != name {
return fmt.Errorf("got EDS request for resource %q, expected: %q", resName, name)
}
}
return nil
}
// TestClientWatchEDS verifies that the xdsClient inside the top-level EDS LB
// policy registers an EDS watch for expected resource upon receiving an update
// from gRPC.
func (s) TestClientWatchEDS(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsC, cleanup := setup(edsLBCh)
defer cleanup()
builder := balancer.Get(edsName)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()
// Update with an non-empty edsServiceName should trigger an EDS watch for
// the same.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"},
}); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := verifyExpectedRequests(ctx, xdsC, "foobar-1"); err != nil {
t.Fatal(err)
}
// Also test the case where the edsServerName changes from one non-empty
// name to another, and make sure a new watch is registered. The previously
// registered watch will be cancelled, which will result in an EDS request
// with no resource names being sent to the server.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{EDSServiceName: "foobar-2"},
}); err != nil {
t.Fatal(err)
}
if err := verifyExpectedRequests(ctx, xdsC, "", "foobar-2"); err != nil {
t.Fatal(err)
}
}
func (s) TestBalancerConfigParsing(t *testing.T) {
const testEDSName = "eds.service"
var testLRSName = "lrs.server"
b := bytes.NewBuffer(nil)
@ -496,6 +595,16 @@ func (s) TestXDSBalancerConfigParsing(t *testing.T) {
want serviceconfig.LoadBalancingConfig
wantErr bool
}{
{
name: "bad json",
js: json.RawMessage(`i am not JSON`),
wantErr: true,
},
{
name: "empty",
js: json.RawMessage(`{}`),
want: &EDSConfig{},
},
{
name: "jsonpb-generated",
js: b.Bytes(),
@ -511,7 +620,6 @@ func (s) TestXDSBalancerConfigParsing(t *testing.T) {
EDSServiceName: testEDSName,
LrsLoadReportingServerName: &testLRSName,
},
wantErr: false,
},
{
// json with random balancers, and the first is not registered.
@ -543,7 +651,6 @@ func (s) TestXDSBalancerConfigParsing(t *testing.T) {
EDSServiceName: testEDSName,
LrsLoadReportingServerName: &testLRSName,
},
wantErr: false,
},
{
// json with no lrs server name, LrsLoadReportingServerName should
@ -557,37 +664,10 @@ func (s) TestXDSBalancerConfigParsing(t *testing.T) {
EDSServiceName: testEDSName,
LrsLoadReportingServerName: nil,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &edsBalancerBuilder{}
got, err := b.ParseConfig(tt.js)
if (err != nil) != tt.wantErr {
t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !cmp.Equal(got, tt.want) {
t.Errorf(cmp.Diff(got, tt.want))
}
})
}
}
func (s) TestLoadbalancingConfigParsing(t *testing.T) {
tests := []struct {
name string
s string
want *EDSConfig
}{
{
name: "empty",
s: "{}",
want: &EDSConfig{},
},
{
name: "success1",
s: `{"childPolicy":[{"pick_first":{}}]}`,
name: "good child policy",
js: json.RawMessage(`{"childPolicy":[{"pick_first":{}}]}`),
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "pick_first",
@ -596,8 +676,8 @@ func (s) TestLoadbalancingConfigParsing(t *testing.T) {
},
},
{
name: "success2",
s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
name: "multiple good child policies",
js: json.RawMessage(`{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`),
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "round_robin",
@ -608,9 +688,16 @@ func (s) TestLoadbalancingConfigParsing(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var cfg EDSConfig
if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !cmp.Equal(&cfg, tt.want) {
t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
b := &edsBalancerBuilder{}
got, err := b.ParseConfig(tt.js)
if (err != nil) != tt.wantErr {
t.Fatalf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.wantErr {
return
}
if !cmp.Equal(got, tt.want) {
t.Errorf(cmp.Diff(got, tt.want))
}
})
}

View File

@ -0,0 +1,80 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package edsbalancer
import (
"sync"
"google.golang.org/grpc/xds/internal/client/load"
)
type loadStoreWrapper struct {
mu sync.RWMutex
service string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name). Note that methods on Store and
// perCluster all handle nil, so there's no need to check nil before calling
// them.
store *load.Store
perCluster load.PerClusterReporter
}
func (lsw *loadStoreWrapper) updateServiceName(service string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if lsw.service == service {
return
}
lsw.service = service
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}
func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store {
return
}
lsw.store = store
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}
func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
lsw.perCluster.CallStarted(locality)
}
func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
lsw.perCluster.CallFinished(locality, err)
}
func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
lsw.perCluster.CallServerLoad(locality, name, val)
}
func (lsw *loadStoreWrapper) CallDropped(category string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
lsw.perCluster.CallDropped(category)
}

View File

@ -1,238 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package edsbalancer
import (
"sync"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
)
// xdsClientInterface contains only the xds_client methods needed by EDS
// balancer. It's defined so we can override xdsclientNew function in tests.
type xdsClientInterface interface {
WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
ReportLoad(server string) (loadStore *load.Store, cancel func())
Close()
}
type loadStoreWrapper struct {
mu sync.RWMutex
service string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name). Note that methods on Store and
// perCluster all handle nil, so there's no need to check nil before calling
// them.
store *load.Store
perCluster load.PerClusterReporter
}
func (lsw *loadStoreWrapper) updateServiceName(service string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if lsw.service == service {
return
}
lsw.service = service
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}
func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store {
return
}
lsw.store = store
lsw.perCluster = nil
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}
func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
lsw.perCluster.CallStarted(locality)
}
func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
lsw.perCluster.CallFinished(locality, err)
}
func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
lsw.perCluster.CallServerLoad(locality, name, val)
}
func (lsw *loadStoreWrapper) CallDropped(category string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
lsw.perCluster.CallDropped(category)
}
// xdsclientWrapper is responsible for getting the xds client from attributes or
// creating a new xds client, and start watching EDS. The given callbacks will
// be called with EDS updates or errors.
type xdsClientWrapper struct {
logger *grpclog.PrefixLogger
newEDSUpdate func(xdsclient.EndpointsUpdate, error)
// xdsClient could come from attributes, or created with balancerName.
xdsClient xdsClientInterface
// loadWrapper is a wrapper with loadOriginal, with clusterName and
// edsServiceName. It's used children to report loads.
loadWrapper *loadStoreWrapper
// edsServiceName is the edsServiceName currently being watched, not
// necessary the edsServiceName from service config.
//
// If edsServiceName from service config is an empty, this will be user's
// dial target (because that's what we use to watch EDS).
//
// TODO: remove the empty string related behavior, when we switch to always
// do CDS.
edsServiceName string
cancelEndpointsWatch func()
loadReportServer *string // LRS is disabled if loadReporterServer is nil.
cancelLoadReport func()
}
// newXDSClientWrapper creates an empty xds_client wrapper that does nothing. It
// can accept xds_client configs, to new/switch xds_client to use.
//
// The given callbacks won't be called until the underlying xds_client is
// working and sends updates.
func newXDSClientWrapper(xdsClient xdsClientInterface, newEDSUpdate func(xdsclient.EndpointsUpdate, error), logger *grpclog.PrefixLogger) *xdsClientWrapper {
return &xdsClientWrapper{
logger: logger,
newEDSUpdate: newEDSUpdate,
xdsClient: xdsClient,
loadWrapper: &loadStoreWrapper{},
}
}
// startEndpointsWatch starts the EDS watch. Caller can call this when the
// xds_client is updated, or the edsServiceName is updated.
//
// Note that if there's already a watch in progress, it's not explicitly
// canceled. Because for each xds_client, there should be only one EDS watch in
// progress. So a new EDS watch implicitly cancels the previous one.
//
// This usually means load report needs to be restarted, but this function does
// NOT do that. Caller needs to call startLoadReport separately.
func (c *xdsClientWrapper) startEndpointsWatch() {
if c.cancelEndpointsWatch != nil {
c.cancelEndpointsWatch()
}
cancelEDSWatch := c.xdsClient.WatchEndpoints(c.edsServiceName, func(update xdsclient.EndpointsUpdate, err error) {
c.logger.Infof("Watch update from xds-client %p, content: %+v", c.xdsClient, update)
c.newEDSUpdate(update, err)
})
c.logger.Infof("Watch started on resource name %v with xds-client %p", c.edsServiceName, c.xdsClient)
c.cancelEndpointsWatch = func() {
cancelEDSWatch()
c.logger.Infof("Watch cancelled on resource name %v with xds-client %p", c.edsServiceName, c.xdsClient)
}
}
// startLoadReport starts load reporting. If there's already a load reporting in
// progress, it cancels that.
//
// Caller can cal this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
// watch).
func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) *load.Store {
if c.cancelLoadReport != nil {
c.cancelLoadReport()
}
c.loadReportServer = loadReportServer
var loadStore *load.Store
if c.loadReportServer != nil {
loadStore, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
}
return loadStore
}
func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
if c == nil {
return nil
}
return c.loadWrapper
}
// handleUpdate applies the service config and attributes updates to the client,
// including updating the xds_client to use, and updating the EDS name to watch.
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig) error {
// Need to restart EDS watch when the edsServiceName changed
if c.edsServiceName != config.EDSServiceName {
c.edsServiceName = config.EDSServiceName
c.startEndpointsWatch()
// TODO: this update for the LRS service name is too early. It should
// only apply to the new EDS response. But this is applied to the RPCs
// before the new EDS response. To fully fix this, the EDS balancer
// needs to do a graceful switch to another EDS implementation.
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fix this (a bigger change) will happen later.
c.loadWrapper.updateServiceName(c.edsServiceName)
}
// Only need to restart load reporting when:
// - the loadReportServer name changed
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
loadStore := c.startLoadReport(config.LrsLoadReportingServerName)
c.loadWrapper.updateLoadStore(loadStore)
}
return nil
}
func (c *xdsClientWrapper) cancelWatch() {
c.loadReportServer = nil
if c.cancelLoadReport != nil {
c.cancelLoadReport()
}
c.edsServiceName = ""
if c.cancelEndpointsWatch != nil {
c.cancelEndpointsWatch()
}
}
func (c *xdsClientWrapper) close() {
c.cancelWatch()
c.xdsClient.Close()
}
// equalStringPointers returns true if
// - a and b are both nil OR
// - *a == *b (and a and b are both non-nil)
func equalStringPointers(a, b *string) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return *a == *b
}

View File

@ -1,145 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package edsbalancer
import (
"context"
"errors"
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/internal/testutils"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
)
var (
testServiceName = "test/foo"
testEDSClusterName = "test/service/eds"
)
// Given a list of resource names, verifies that EDS requests for the same are
// received at the fake server.
func verifyExpectedRequests(fc *fakeclient.Client, resourceNames ...string) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for _, name := range resourceNames {
if name == "" {
// ResourceName empty string indicates a cancel.
if err := fc.WaitForCancelEDSWatch(ctx); err != nil {
return fmt.Errorf("timed out when expecting resource %q", name)
}
return nil
}
resName, err := fc.WaitForWatchEDS(ctx)
if err != nil {
return fmt.Errorf("timed out when expecting resource %q, %p", name, fc)
}
if resName != name {
return fmt.Errorf("got EDS request for resource %q, expected: %q", resName, name)
}
}
return nil
}
// TestClientWrapperWatchEDS verifies that the clientWrapper registers an
// EDS watch for expected resource upon receiving an update from the top-level
// edsBalancer.
//
// The test does the following:
// * Starts a fake xDS server.
// * Creates a clientWrapper.
// * Sends updates with different edsServiceNames and expects new watches to be
// registered.
func (s) TestClientWrapperWatchEDS(t *testing.T) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
cw := newXDSClientWrapper(xdsC, nil, nil)
defer cw.close()
t.Logf("Started xDS client wrapper for endpoint %s...", testServiceName)
// Update with an non-empty edsServiceName should trigger an EDS watch for
// the same.
cw.handleUpdate(&EDSConfig{EDSServiceName: "foobar-1"})
if err := verifyExpectedRequests(xdsC, "foobar-1"); err != nil {
t.Fatal(err)
}
// Also test the case where the edsServerName changes from one non-empty
// name to another, and make sure a new watch is registered. The previously
// registered watch will be cancelled, which will result in an EDS request
// with no resource names being sent to the server.
cw.handleUpdate(&EDSConfig{EDSServiceName: "foobar-2"})
if err := verifyExpectedRequests(xdsC, "", "foobar-2"); err != nil {
t.Fatal(err)
}
}
// TestClientWrapperHandleUpdateError verifies that the clientWrapper handles
// errors from the edsWatch callback appropriately.
//
// The test does the following:
// * Creates a clientWrapper.
// * Creates a fakeclient.Client and passes it to the clientWrapper in attributes.
// * Verifies the clientWrapper registers an EDS watch.
// * Forces the fakeclient.Client to invoke the registered EDS watch callback with
// an error. Verifies that the wrapper does not invoke the top-level
// edsBalancer with the received error.
func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
edsRespChan := testutils.NewChannel()
newEDS := func(update xdsclient.EndpointsUpdate, err error) {
edsRespChan.Send(&edsUpdate{resp: update, err: err})
}
xdsC := fakeclient.NewClient()
cw := newXDSClientWrapper(xdsC, newEDS, nil)
defer cw.close()
cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotCluster, err := xdsC.WaitForWatchEDS(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
if gotCluster != testEDSClusterName {
t.Fatalf("xdsClient.WatchEndpoints() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
}
watchErr := errors.New("EDS watch callback error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, watchErr)
// The callback is called with an error, expect no update from edsRespChan.
//
// TODO: check for loseContact() when errors indicating "lose contact" are
// handled correctly.
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotUpdate, err := edsRespChan.Receive(ctx)
if err != nil {
t.Fatalf("edsBalancer failed to get edsUpdate %v", err)
}
update := gotUpdate.(*edsUpdate)
if !cmp.Equal(update.resp, (xdsclient.EndpointsUpdate{})) || update.err != watchErr {
t.Fatalf("want update {nil, %v}, got %+v", watchErr, update)
}
}

View File

@ -36,10 +36,9 @@ func (s) TestXDSLoadReporting(t *testing.T) {
defer func() { newXDSClient = oldNewXDSClient }()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
}
defer edsB.Close()

View File

@ -123,12 +123,6 @@ func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
}
}
// UpdateBalancerState implements balancer.Balancer API. It will be removed when
// switching to the new balancer interface.
func (tcc *TestClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
panic("not implemented")
}
// UpdateState updates connectivity state and picker.
func (tcc *TestClientConn) UpdateState(bs balancer.State) {
tcc.logger.Logf("testClientConn: UpdateState(%v)", bs)