xds: add support for aggregate clusters (#4332)

Add support for aggregate clusters in CDS Balancer
Zach Reyes 2021-05-12 17:28:49 -04:00 committed by GitHub
parent 8bf65c69b9
commit 45e60095da
6 changed files with 992 additions and 20 deletions
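For orientation, the core idea of the change: an aggregate cluster is a cluster whose update carries a prioritized list of child cluster names rather than endpoints, so the CDS balancer now maintains a tree of cluster watches and flattens its leaves (EDS or LogicalDNS clusters), in priority order, into the single list handed down to the child policy. Below is a minimal, self-contained Go sketch of that flattening idea; the types and names are simplified stand-ins, not the commit's xdsclient types.

package main

import "fmt"

// Simplified stand-ins for the cluster types referenced by this commit.
type clusterType int

const (
    clusterEDS clusterType = iota
    clusterLogicalDNS
    clusterAggregate
)

// node mirrors the shape of a cluster in the tree: a leaf carries a service
// name, while an aggregate carries an ordered list of children.
type node struct {
    typ      clusterType
    service  string
    children []*node // only set for aggregates, in priority order
}

// flatten walks the tree depth-first and returns the leaf clusters in the
// order implied by each aggregate's prioritized child list.
func flatten(n *node) []string {
    if n.typ != clusterAggregate {
        return []string{n.service}
    }
    var out []string
    for _, c := range n.children {
        out = append(out, flatten(c)...)
    }
    return out
}

func main() {
    root := &node{typ: clusterAggregate, children: []*node{
        {typ: clusterEDS, service: "eds-service"},
        {typ: clusterLogicalDNS, service: "dns-service"},
    }}
    fmt.Println(flatten(root)) // prints [eds-service dns-service]
}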


@ -151,6 +151,11 @@ type ccUpdate struct {
err error
}
type clusterHandlerUpdate struct {
chu []xdsclient.ClusterUpdate
err error
}
// scUpdate wraps a subConn update received from gRPC. This is directly passed
// on to the edsBalancer.
type scUpdate struct {


@ -640,7 +640,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
// registered watch should not be cancelled.
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("cluster watch cancelled for a non-resource-not-found-error")
}
}


@ -399,7 +399,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// registered watch should not be cancelled.
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("cluster watch cancelled for a non-resource-not-found-error")
}
// The CDS balancer has not yet created an EDS balancer. So, this resolver
@ -438,7 +438,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// Make sure the registered watch is not cancelled.
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("cluster watch cancelled for a non-resource-not-found-error")
}
// Make sure the error is forwarded to the EDS balancer.
@ -453,7 +453,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// request cluster resource is not found. We should continue to watch it.
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("cluster watch cancelled for a resource-not-found-error")
}
// Make sure the error is forwarded to the EDS balancer.
@ -485,7 +485,7 @@ func (s) TestResolverError(t *testing.T) {
// registered watch should not be cancelled.
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("cluster watch cancelled for a non-resource-not-found-error")
}
// The CDS balancer has not yet created an EDS balancer. So, this resolver
@ -523,7 +523,7 @@ func (s) TestResolverError(t *testing.T) {
// Make sure the registered watch is not cancelled.
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded {
t.Fatal("cluster watch cancelled for a non-resource-not-found-error")
}
// Make sure the error is forwarded to the EDS balancer.
@ -535,7 +535,7 @@ func (s) TestResolverError(t *testing.T) {
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "cdsBalancer resource not found error")
cdsB.ResolverError(resourceErr)
// Make sure the registered watch is cancelled.
if err := xdsC.WaitForCancelClusterWatch(ctx); err != nil {
if _, err := xdsC.WaitForCancelClusterWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, watchForCancel failed: %v", err)
}
// Make sure the error is forwarded to the EDS balancer.
@ -642,7 +642,7 @@ func (s) TestClose(t *testing.T) {
// Make sure that the cluster watch registered by the CDS balancer is
// cancelled.
if err := xdsC.WaitForCancelClusterWatch(ctx); err != nil {
if _, err := xdsC.WaitForCancelClusterWatch(ctx); err != nil {
t.Fatal(err)
}


@ -0,0 +1,273 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cdsbalancer
import (
"errors"
"sync"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update")
// clusterHandler will be given a name representing a cluster. It will then
// keep the CDS policy constantly updated with a list of Clusters to pass down
// to XdsClusterResolverLoadBalancingPolicyConfig in a stream-like fashion.
type clusterHandler struct {
// A mutex to protect the entire tree of clusters.
clusterMutex sync.Mutex
root *clusterNode
rootClusterName string
// A way to ping the CDS balancer about any updates or errors for a node in
// the tree. It is written to either by this handler when constructing an
// update or by a child node reporting an error. It has a capacity of one, as
// the only update the CDS balancer cares about is the most recent one.
updateChannel chan clusterHandlerUpdate
xdsClient xdsClientInterface
}
func (ch *clusterHandler) updateRootCluster(rootClusterName string) {
ch.clusterMutex.Lock()
defer ch.clusterMutex.Unlock()
if ch.root == nil {
// Construct a root node on first update.
ch.root = createClusterNode(rootClusterName, ch.xdsClient, ch)
ch.rootClusterName = rootClusterName
return
}
// If the root cluster changed, delete the old one and start a new one;
// otherwise do nothing.
if rootClusterName != ch.rootClusterName {
ch.root.delete()
ch.root = createClusterNode(rootClusterName, ch.xdsClient, ch)
ch.rootClusterName = rootClusterName
}
}
// This function tries to construct a cluster update to send to CDS.
func (ch *clusterHandler) constructClusterUpdate() {
// If an error is returned, no-op, as it simply means one of the children
// hasn't received an update yet.
if clusterUpdate, err := ch.root.constructClusterUpdate(); err == nil {
// For a ClusterUpdate, the only update CDS cares about is the most
// recent one, so opportunistically drain the update channel before
// sending the new update.
select {
case <-ch.updateChannel:
default:
}
ch.updateChannel <- clusterHandlerUpdate{chu: clusterUpdate, err: nil}
}
}
// close() is meant to be called by CDS when the CDS balancer is closed, and it
// cancels the watches for every cluster in the cluster tree.
func (ch *clusterHandler) close() {
ch.clusterMutex.Lock()
defer ch.clusterMutex.Unlock()
ch.root.delete()
ch.root = nil
ch.rootClusterName = ""
}
// clusterNode logically represents a cluster. It handles all the logic for
// starting and stopping a cluster watch, handling any updates, and
// recursively constructing the list of updates for the clusterHandler.
type clusterNode struct {
// A way to cancel the watch for the cluster.
cancelFunc func()
// A list of children, as the Node can be an aggregate Cluster.
children []*clusterNode
// A ClusterUpdate in order to build a list of cluster updates for CDS to
// send down to child XdsClusterResolverLoadBalancingPolicy.
clusterUpdate xdsclient.ClusterUpdate
// This boolean determines whether this Node has received an update or not.
// This isn't the best practice, but this will protect a list of Cluster
// Updates from being constructed if a cluster in the tree has not received
// an update yet.
receivedUpdate bool
clusterHandler *clusterHandler
}
// createClusterNode creates a cluster node from a given clusterName. This
// will also start the watch for that cluster.
func createClusterNode(clusterName string, xdsClient xdsClientInterface, topLevelHandler *clusterHandler) *clusterNode {
c := &clusterNode{
clusterHandler: topLevelHandler,
}
// Communicate with the xds client here.
c.cancelFunc = xdsClient.WatchCluster(clusterName, c.handleResp)
return c
}
// This function cancels the cluster watch on the cluster and all of its
// children.
func (c *clusterNode) delete() {
c.cancelFunc()
for _, child := range c.children {
child.delete()
}
}
// Construct cluster update (potentially a list of ClusterUpdates) for a node.
func (c *clusterNode) constructClusterUpdate() ([]xdsclient.ClusterUpdate, error) {
// If the cluster has not yet received an update, the cluster update is not
// yet ready.
if !c.receivedUpdate {
return nil, errNotReceivedUpdate
}
// Base case - LogicalDNS or EDS. Both of these cluster types will be tied
// to a single ClusterUpdate.
if c.clusterUpdate.ClusterType != xdsclient.ClusterTypeAggregate {
return []xdsclient.ClusterUpdate{c.clusterUpdate}, nil
}
// If the cluster is an aggregate, construct the list by recursively calling
// down to all of its children.
var childrenUpdates []xdsclient.ClusterUpdate
for _, child := range c.children {
childUpdateList, err := child.constructClusterUpdate()
if err != nil {
return nil, err
}
childrenUpdates = append(childrenUpdates, childUpdateList...)
}
return childrenUpdates, nil
}
// handleResp handles an xds response for a particular cluster. This function
// also handles any logic with regard to child state that may have changed.
// At the end of handleResp(), the clusterHandler will be pinged in certain
// situations to try and construct an update to send back to CDS.
func (c *clusterNode) handleResp(clusterUpdate xdsclient.ClusterUpdate, err error) {
c.clusterHandler.clusterMutex.Lock()
defer c.clusterHandler.clusterMutex.Unlock()
if err != nil { // Write this error for run() to pick up in CDS LB policy.
// For a ClusterUpdate, the only update CDS cares about is the most
// recent one, so opportunistically drain the update channel before
// sending the new update.
select {
case <-c.clusterHandler.updateChannel:
default:
}
c.clusterHandler.updateChannel <- clusterHandlerUpdate{chu: nil, err: err}
return
}
// deltaInClusterUpdateFields determines whether there was a delta in the
// clusterUpdate fields (ignoring the children). This will be used to help
// determine whether to ping the clusterHandler at the end of this callback
// or not.
deltaInClusterUpdateFields := clusterUpdate.ServiceName != c.clusterUpdate.ServiceName || clusterUpdate.ClusterType != c.clusterUpdate.ClusterType
c.receivedUpdate = true
c.clusterUpdate = clusterUpdate
// If the cluster is a leaf node and the received cluster update changed, the
// overall cluster update changes as well and there is a possibility for the
// overall update to build, so ping the cluster handler. Also, if there were
// any children previously, delete them, as the cluster type is no longer an
// aggregate cluster.
if clusterUpdate.ClusterType != xdsclient.ClusterTypeAggregate {
for _, child := range c.children {
child.delete()
}
c.children = nil
if deltaInClusterUpdateFields {
c.clusterHandler.constructClusterUpdate()
}
return
}
// Aggregate cluster handling.
newChildren := make(map[string]bool)
for _, childName := range clusterUpdate.PrioritizedClusterNames {
newChildren[childName] = true
}
// These booleans help determine whether this callback will ping the overall
// clusterHandler to try and construct an update to send back to CDS. This
// will be determined by whether there would be a change in the overall
// clusterUpdate for the whole tree (ex. change in clusterUpdate for current
// cluster or a deleted child) and also if there's even a possibility for
// the update to build (ex. if a child is created and a watch is started,
// that child hasn't received an update yet due to the mutex lock on this
// callback).
var createdChild, deletedChild bool
// This map represents the current children of the cluster. New children are
// first added to it, then any children no longer present are deleted from
// it. Finally, it is used, together with the ordering in the received
// cluster update, to construct the new child list.
mapCurrentChildren := make(map[string]*clusterNode)
for _, child := range c.children {
mapCurrentChildren[child.clusterUpdate.ServiceName] = child
}
// Add and construct any new child nodes.
for child := range newChildren {
if _, inChildrenAlready := mapCurrentChildren[child]; !inChildrenAlready {
createdChild = true
mapCurrentChildren[child] = createClusterNode(child, c.clusterHandler.xdsClient, c.clusterHandler)
}
}
// Delete any child nodes no longer in the aggregate cluster's children.
for child := range mapCurrentChildren {
if _, stillAChild := newChildren[child]; !stillAChild {
deletedChild = true
mapCurrentChildren[child].delete()
delete(mapCurrentChildren, child)
}
}
// The order of the children list matters, so use the clusterUpdate from the
// xdsclient as the ordering, and use that logical ordering for the new
// children list. This will be a mixture of child nodes which are all already
// constructed in the mapCurrentChildren map.
var children = make([]*clusterNode, 0, len(clusterUpdate.PrioritizedClusterNames))
for _, orderedChild := range clusterUpdate.PrioritizedClusterNames {
// The clusters already have watches started for them in the xds client, so
// these pointers can be used to construct the new children list; they just
// have to be put in the correct order using the received cluster update.
currentChild := mapCurrentChildren[orderedChild]
children = append(children, currentChild)
}
c.children = children
// For an aggregate cluster, if this callback created any new child cluster
// nodes, then there's no possibility for a full cluster update to
// successfully build, as those created children will not have received an
// update yet. However, if a child was simply deleted, then there is a
// possibility for a full cluster update to build, and the overall cluster
// update will also have changed due to the deleted child.
if deletedChild && !createdChild {
c.clusterHandler.constructClusterUpdate()
}
}
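One detail worth calling out from the handler above: updateChannel has capacity one and is drained before every send, so a slow reader only ever observes the most recent clusterHandlerUpdate. Here is a small, self-contained sketch of that send-latest pattern; the names are illustrative, not from the commit.

package main

import "fmt"

type update struct{ seq int }

// sendLatest drops any stale, unread value sitting in the capacity-one
// channel and then sends the new one, so the receiver only ever sees the
// most recent update. This mirrors how clusterHandler and clusterNode write
// to updateChannel.
func sendLatest(ch chan update, u update) {
    select {
    case <-ch: // drain a stale update that was never read
    default:
    }
    ch <- u
}

func main() {
    ch := make(chan update, 1)
    sendLatest(ch, update{seq: 1})
    sendLatest(ch, update{seq: 2}) // replaces the unread seq-1 update
    fmt.Println(<-ch)              // prints {2}
}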


@ -0,0 +1,676 @@
// +build go1.12
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cdsbalancer
import (
"context"
"errors"
"testing"
"github.com/google/go-cmp/cmp"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
)
const (
edsService = "EDS Service"
logicalDNSService = "Logical DNS Service"
edsService2 = "EDS Service 2"
logicalDNSService2 = "Logical DNS Service 2"
aggregateClusterService = "Aggregate Cluster Service"
)
// setupTests creates a clusterHandler with a fake xds client for control over
// xds client.
func setupTests(t *testing.T) (*clusterHandler, *fakeclient.Client) {
xdsC := fakeclient.NewClient()
ch := &clusterHandler{
xdsClient: xdsC,
// This is how the update channel will be created in cds. It will be a
// separate channel from the buffer.Unbounded. This channel will also be
// read from to test any cluster updates.
updateChannel: make(chan clusterHandlerUpdate, 1),
}
return ch, xdsC
}
// Simplest case: the cluster handler receives a cluster name, the handler
// starts a watch for that cluster, and the xds client returns that it is a
// leaf node (EDS or LogicalDNS), not a tree, so the expectation is that an
// update is written to the buffer which will be read by the CDS LB.
func (s) TestSuccessCaseLeafNode(t *testing.T) {
tests := []struct {
name string
clusterName string
clusterUpdate xdsclient.ClusterUpdate
}{
{name: "test-update-root-cluster-EDS-success",
clusterName: edsService,
clusterUpdate: xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService,
}},
{
name: "test-update-root-cluster-Logical-DNS-success",
clusterName: logicalDNSService,
clusterUpdate: xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ServiceName: logicalDNSService,
}},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ch, fakeClient := setupTests(t)
// When you first update the root cluster, it should hit the code
// path which will start a cluster node for that root. Updating the
// root cluster logically represents a ping from a ClientConn.
ch.updateRootCluster(test.clusterName)
// Starting a cluster node involves communicating with the
// xdsClient, telling it to watch a cluster.
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
gotCluster, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if gotCluster != test.clusterName {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, test.clusterName)
}
// Invoke the callback on the xds client with a certain clusterUpdate. Since
// this cluster update fills out the whole cluster tree, as the root cluster
// is a leaf type (EDS or Logical DNS) and not an aggregate cluster, this
// should trigger the ClusterHandler to write to the update buffer to update
// the CDS policy.
fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel.")
}
// Close the clusterHandler. This is meant to be called when the CDS
// Balancer is closed, and the call should cancel the watch for this
// cluster.
ch.close()
clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelCDS failed with error: %v", err)
}
if clusterNameDeleted != test.clusterName {
t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService)
}
})
}
}
// The cluster handler receives a cluster name, handler starts a watch for that
// cluster, xds client returns that it is a Leaf Node (EDS or LogicalDNS), not a
// tree, so expectation that first update is written to buffer which will be
// read by CDS LB. Then, send a new cluster update that is different, with the
// expectation that it is also written to the update buffer to send back to CDS.
func (s) TestSuccessCaseLeafNodeThenNewUpdate(t *testing.T) {
tests := []struct {
name string
clusterName string
clusterUpdate xdsclient.ClusterUpdate
newClusterUpdate xdsclient.ClusterUpdate
}{
{name: "test-update-root-cluster-then-new-update-EDS-success",
clusterName: edsService,
clusterUpdate: xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService,
},
newClusterUpdate: xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService2,
},
},
{
name: "test-update-root-cluster-then-new-update-Logical-DNS-success",
clusterName: logicalDNSService,
clusterUpdate: xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ServiceName: logicalDNSService,
},
newClusterUpdate: xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ServiceName: logicalDNSService2,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ch, fakeClient := setupTests(t)
ch.updateRootCluster(test.clusterName)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil)
select {
case <-ch.updateChannel:
case <-ctx.Done():
t.Fatal("Timed out waiting for update from updateChannel.")
}
// Check that sending the same cluster update does not induce an update to
// be written to the update buffer.
fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil)
shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer shouldNotHappenCtxCancel()
select {
case <-ch.updateChannel:
t.Fatal("Should not have written an update to update buffer, as cluster update did not change.")
case <-shouldNotHappenCtx.Done():
}
// The above represents the same thing as the simple
// TestSuccessCaseLeafNode, with extra behavior + validation (a clusterNode
// which is a leaf receives a changed clusterUpdate, which should ping the
// clusterHandler, which should then write to the update buffer).
fakeClient.InvokeWatchClusterCallback(test.newClusterUpdate, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.newClusterUpdate}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for update from updateChannel.")
}
})
}
}
// TestUpdateRootClusterAggregateSuccess tests the case where an aggregate
// cluster is a root pointing to two child clusters one of type EDS and the
// other of type LogicalDNS. This test will then send cluster updates for both
// the children, and at the end there should be a successful clusterUpdate
// written to the update buffer to send back to CDS.
func (s) TestUpdateRootClusterAggregateSuccess(t *testing.T) {
ch, fakeClient := setupTests(t)
ch.updateRootCluster(aggregateClusterService)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
gotCluster, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if gotCluster != aggregateClusterService {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, aggregateClusterService)
}
// The xdsClient tells the clusterNode that the cluster type is an aggregate
// cluster, which will cause a lot of downstream behavior. For a cluster type
// that isn't an aggregate, the behavior is simple. The clusterNode will
// simply get a successful update, which will then ping the clusterHandler,
// which will successfully build an update to send to the CDS policy. In the
// aggregate cluster case, the handleResp callback must also start watches
// for the aggregate cluster's children. The ping to the clusterHandler at
// the end of handleResp should be a no-op, as neither the EDS nor the
// LogicalDNS child cluster has received an update yet.
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeAggregate,
ServiceName: aggregateClusterService,
PrioritizedClusterNames: []string{edsService, logicalDNSService},
}, nil)
// xds client should be called to start a watch for one of the child
// clusters of the aggregate. The order of the children in the update
// written to the buffer to send to CDS matters, however there is no
// guarantee on the order it will start the watches of the children.
gotCluster, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if gotCluster != edsService {
if gotCluster != logicalDNSService {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, edsService)
}
}
// xds client should then be called to start a watch for the second child
// cluster.
gotCluster, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if gotCluster != edsService {
if gotCluster != logicalDNSService {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, logicalDNSService)
}
}
// The handleResp() call on the root aggregate cluster should not ping the
// cluster handler to try and construct an update, as the handleResp()
// callback knows that when a child is created, it cannot possibly build a
// successful update yet. Thus, there should be nothing in the update
// channel.
shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer shouldNotHappenCtxCancel()
select {
case <-ch.updateChannel:
t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update")
case <-shouldNotHappenCtx.Done():
}
// Send callback for the EDS child cluster.
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService,
}, nil)
// The EDS child cluster will ping the Cluster Handler to try and construct
// an update, which still won't successfully build as the LogicalDNS child of
// the root aggregate cluster has not yet received and handled an update.
select {
case <-ch.updateChannel:
t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update")
case <-shouldNotHappenCtx.Done():
}
// Invoke callback for Logical DNS child cluster.
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ServiceName: logicalDNSService,
}, nil)
// Will Ping Cluster Handler, which will finally successfully build an
// update as all nodes in the tree of clusters have received an update.
// Since this cluster is an aggregate cluster comprised of two children, the
// returned update should be length 2, as the xds cluster resolver LB policy
// only cares about the full list of LogicalDNS and EDS clusters
// representing the base nodes of the tree of clusters. This list should be
// ordered as per the cluster update.
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService,
}, {
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ServiceName: logicalDNSService,
}}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.")
}
}
// TestUpdateRootClusterAggregateThenChangeChild tests the scenario where you
// have an aggregate cluster with an EDS child and a LogicalDNS child, then you
// change one of the children and send an update for the changed child. This
// should write a new update to the update buffer to send back to CDS.
func (s) TestUpdateRootClusterAggregateThenChangeChild(t *testing.T) {
// This initial code is the same as the test for the aggregate success case,
// except without validations. This will get this test to the point where it
// can change one of the children.
ch, fakeClient := setupTests(t)
ch.updateRootCluster(aggregateClusterService)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeAggregate,
ServiceName: aggregateClusterService,
PrioritizedClusterNames: []string{edsService, logicalDNSService},
}, nil)
fakeClient.WaitForWatchCluster(ctx)
fakeClient.WaitForWatchCluster(ctx)
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService,
}, nil)
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ServiceName: logicalDNSService,
}, nil)
select {
case <-ch.updateChannel:
case <-ctx.Done():
t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.")
}
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeAggregate,
ServiceName: aggregateClusterService,
PrioritizedClusterNames: []string{edsService, logicalDNSService2},
}, nil)
// The cluster update lets the aggregate cluster know that its children are
// now edsService and logicalDNSService2, which implies that the
// aggregateCluster lost its old logicalDNSService child. Thus, the
// logicalDNSService child should be deleted.
clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelCDS failed with error: %v", err)
}
if clusterNameDeleted != logicalDNSService {
t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService)
}
// The handleResp() callback should then start a watch for
// logicalDNSService2.
clusterNameCreated, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if clusterNameCreated != logicalDNSService2 {
t.Fatalf("xdsClient.WatchCDS called for cluster %v, want: %v", clusterNameCreated, logicalDNSService2)
}
// handleResp() should try and send an update here, but it will fail as
// logicalDNSService2 has not yet received an update.
shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer shouldNotHappenCtxCancel()
select {
case <-ch.updateChannel:
t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update")
case <-shouldNotHappenCtx.Done():
}
// Invoke a callback for the new logicalDNSService2 - this will fill out the
// tree with successful updates.
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ServiceName: logicalDNSService2,
}, nil)
// Behavior: after this update, every node in the tree of clusters has
// received an update. Thus, at the end of this callback, when you ping the
// clusterHandler to try and construct an update, the update should now
// successfully be written to the update buffer to send back to CDS. This new
// update should contain the new LogicalDNS2 child.
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService,
}, {
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ServiceName: logicalDNSService2,
}}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.")
}
}
// TestUpdateRootClusterAggregateThenChangeRootToEDS tests the situation where
// you have a fully updated aggregate cluster (where AggregateCluster success
// test gets you) as the root cluster, then you update that root cluster to a
// cluster of type EDS.
func (s) TestUpdateRootClusterAggregateThenChangeRootToEDS(t *testing.T) {
// This initial code is the same as the test for the aggregate success case,
// except without validations. This will get this test to the point where it
// can update the root cluster to one of type EDS.
ch, fakeClient := setupTests(t)
ch.updateRootCluster(aggregateClusterService)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeAggregate,
ServiceName: aggregateClusterService,
PrioritizedClusterNames: []string{edsService, logicalDNSService},
}, nil)
fakeClient.WaitForWatchCluster(ctx)
fakeClient.WaitForWatchCluster(ctx)
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService,
}, nil)
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ServiceName: logicalDNSService,
}, nil)
select {
case <-ch.updateChannel:
case <-ctx.Done():
t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.")
}
// Change the root aggregate cluster to an EDS cluster. This should delete
// the root aggregate cluster and all of its children by successfully
// canceling the watches for them.
ch.updateRootCluster(edsService2)
// Reads from the cancel channel, should first be type Aggregate, then EDS
// then Logical DNS.
clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelCDS failed with error: %v", err)
}
if clusterNameDeleted != aggregateClusterService {
t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService)
}
clusterNameDeleted, err = fakeClient.WaitForCancelClusterWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelCDS failed with error: %v", err)
}
if clusterNameDeleted != edsService {
t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService)
}
clusterNameDeleted, err = fakeClient.WaitForCancelClusterWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelCDS failed with error: %v", err)
}
if clusterNameDeleted != logicalDNSService {
t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService)
}
// After deletion, it should start a watch for the EDS Cluster. The behavior
// for this EDS Cluster receiving an update from xds client and then
// successfully writing an update to send back to CDS is already tested in
// the updateEDS success case.
gotCluster, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if gotCluster != edsService2 {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, edsService2)
}
}
// TestHandleRespInvokedWithError tests that when handleResp is invoked with
// an error, the error is successfully written to the update buffer.
func (s) TestHandleRespInvokedWithError(t *testing.T) {
ch, fakeClient := setupTests(t)
ch.updateRootCluster(edsService)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, errors.New("some error"))
select {
case chu := <-ch.updateChannel:
if chu.err.Error() != "some error" {
t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error())
}
case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel.")
}
}
// TestSwitchClusterNodeBetweenLeafAndAggregated tests having an existing
// cluster node switch between a leaf and an aggregated cluster. When the
// cluster switches from a leaf to an aggregated cluster, it should add
// children, and when it switches back to a leaf, it should delete those new
// children and also successfully write a cluster update to the update buffer.
func (s) TestSwitchClusterNodeBetweenLeafAndAggregated(t *testing.T) {
// Getting the test to the point where there's a root cluster which is an
// EDS leaf.
ch, fakeClient := setupTests(t)
ch.updateRootCluster(edsService2)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService2,
}, nil)
select {
case <-ch.updateChannel:
case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel.")
}
// Switch the cluster to an aggregate cluster; this should cause two new
// child watches to be created.
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeAggregate,
ServiceName: edsService2,
PrioritizedClusterNames: []string{edsService, logicalDNSService},
}, nil)
// xds client should be called to start a watch for one of the child
// clusters of the aggregate. The order of the children in the update
// written to the buffer to send to CDS matters, however there is no
// guarantee on the order it will start the watches of the children.
gotCluster, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if gotCluster != edsService {
if gotCluster != logicalDNSService {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, edsService)
}
}
// xds client should then be called to start a watch for the second child
// cluster.
gotCluster, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if gotCluster != edsService {
if gotCluster != logicalDNSService {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, logicalDNSService)
}
}
// After starting a watch for the second child cluster, there should be no
// more watches started on the xds client.
shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer shouldNotHappenCtxCancel()
gotCluster, err = fakeClient.WaitForWatchCluster(shouldNotHappenCtx)
if err == nil {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, no more watches should be started.", gotCluster)
}
// The handleResp() call on the root aggregate cluster should not ping the
// cluster handler to try and construct an update, as the handleResp()
// callback knows that when a child is created, it cannot possibly build a
// successful update yet. Thus, there should be nothing in the update
// channel.
shouldNotHappenCtx, shouldNotHappenCtxCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer shouldNotHappenCtxCancel()
select {
case <-ch.updateChannel:
t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update")
case <-shouldNotHappenCtx.Done():
}
// Switch the cluster back to an EDS Cluster. This should cause the two
// children to be deleted.
fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService2,
}, nil)
// This should delete the two children (there is no guarantee on the order of
// deletion, which is ok), then successfully write an update to the update
// buffer as the full cluster tree has received updates.
clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelCDS failed with error: %v", err)
}
// No guarantee of ordering, so one of the children should be deleted first.
if clusterNameDeleted != edsService {
if clusterNameDeleted != logicalDNSService {
t.Fatalf("xdsClient.CancelCDS called for cluster %v, want either: %v or: %v", clusterNameDeleted, edsService, logicalDNSService)
}
}
// Then the other child should be deleted.
clusterNameDeleted, err = fakeClient.WaitForCancelClusterWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelCDS failed with error: %v", err)
}
if clusterNameDeleted != edsService {
if clusterNameDeleted != logicalDNSService {
t.Fatalf("xdsClient.CancelCDS called for cluster %v, want either: %v or: %v", clusterNameDeleted, edsService, logicalDNSService)
}
}
// After cancelling a watch for the second child cluster, there should be no
// more watches cancelled on the xds client.
shouldNotHappenCtx, shouldNotHappenCtxCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer shouldNotHappenCtxCancel()
gotCluster, err = fakeClient.WaitForCancelClusterWatch(shouldNotHappenCtx)
if err == nil {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, no more watches should be cancelled.", gotCluster)
}
// Then an update should successfully be written to the update buffer.
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{
ClusterType: xdsclient.ClusterTypeEDS,
ServiceName: edsService2,
}}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel.")
}
}


@ -45,10 +45,10 @@ type Client struct {
loadStore *load.Store
bootstrapCfg *bootstrap.Config
ldsCb func(xdsclient.ListenerUpdate, error)
rdsCb func(xdsclient.RouteConfigUpdate, error)
cdsCb func(xdsclient.ClusterUpdate, error)
edsCb func(xdsclient.EndpointsUpdate, error)
ldsCb func(xdsclient.ListenerUpdate, error)
rdsCb func(xdsclient.RouteConfigUpdate, error)
cdsCbs map[string]func(xdsclient.ClusterUpdate, error)
edsCb func(xdsclient.EndpointsUpdate, error)
}
// WatchListener registers a LDS watch.
@ -121,10 +121,13 @@ func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) error {
// WatchCluster registers a CDS watch.
func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsclient.ClusterUpdate, error)) func() {
xdsC.cdsCb = callback
// Due to the tree-like structure of aggregate clusters, there can be multiple callbacks persisted, one for each cluster
// node. However, the client doesn't care about the parent-child relationship between the nodes, only that it invokes
// the right callback for a particular cluster.
xdsC.cdsCbs[clusterName] = callback
xdsC.cdsWatchCh.Send(clusterName)
return func() {
xdsC.cdsCancelCh.Send(nil)
xdsC.cdsCancelCh.Send(clusterName)
}
}
@ -143,14 +146,28 @@ func (xdsC *Client) WaitForWatchCluster(ctx context.Context) (string, error) {
// Not thread safe with WatchCluster. Only call this after
// WaitForWatchCluster.
func (xdsC *Client) InvokeWatchClusterCallback(update xdsclient.ClusterUpdate, err error) {
xdsC.cdsCb(update, err)
// Keep parity with the previous usage of this method: if there is a single callback registered, invoke that callback.
if len(xdsC.cdsCbs) == 1 {
var clusterName string
for cluster := range xdsC.cdsCbs {
clusterName = cluster
}
xdsC.cdsCbs[clusterName](update, err)
} else {
// Otherwise, the callback to invoke is determined by the service name in the ClusterUpdate. It is left up to the
// caller to make sure the cluster update matches a persisted callback.
xdsC.cdsCbs[update.ServiceName](update, err)
}
}
// WaitForCancelClusterWatch waits for a CDS watch to be cancelled and returns the name of the cluster whose watch
// was cancelled, or an error (context.DeadlineExceeded) if no watch was cancelled before the context expired.
func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) error {
_, err := xdsC.cdsCancelCh.Receive(ctx)
return err
func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) (string, error) {
clusterNameReceived, err := xdsC.cdsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return clusterNameReceived.(string), err
}
// WatchEndpoints registers an EDS watch for provided clusterName.
@ -251,14 +268,15 @@ func NewClientWithName(name string) *Client {
name: name,
ldsWatchCh: testutils.NewChannel(),
rdsWatchCh: testutils.NewChannel(),
cdsWatchCh: testutils.NewChannel(),
cdsWatchCh: testutils.NewChannelWithSize(10),
edsWatchCh: testutils.NewChannel(),
ldsCancelCh: testutils.NewChannel(),
rdsCancelCh: testutils.NewChannel(),
cdsCancelCh: testutils.NewChannel(),
cdsCancelCh: testutils.NewChannelWithSize(10),
edsCancelCh: testutils.NewChannel(),
loadReportCh: testutils.NewChannel(),
closeCh: testutils.NewChannel(),
loadStore: load.NewStore(),
cdsCbs: make(map[string]func(xdsclient.ClusterUpdate, error)),
}
}
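As a closing illustration of the fake-client change above, the watch registry now keys callbacks by cluster name and dispatches on the ServiceName carried in the update, which is what lets a test drive each node of an aggregate tree independently. A self-contained sketch of that dispatch idea follows; the types and names are simplified stand-ins, not the fake client's actual fields.

package main

import "fmt"

type clusterUpdate struct {
    serviceName string
    payload     string
}

// registry mimics the idea behind the fake client's cdsCbs map: one callback
// per watched cluster, dispatched by the service name carried in the update.
type registry struct {
    cbs map[string]func(clusterUpdate)
}

func (r *registry) watch(name string, cb func(clusterUpdate)) func() {
    r.cbs[name] = cb
    return func() { delete(r.cbs, name) } // cancel removes the callback
}

func (r *registry) invoke(u clusterUpdate) {
    if cb, ok := r.cbs[u.serviceName]; ok {
        cb(u)
    }
}

func main() {
    r := &registry{cbs: make(map[string]func(clusterUpdate))}
    cancel := r.watch("eds-service", func(u clusterUpdate) {
        fmt.Println("eds-service got:", u.payload)
    })
    r.watch("dns-service", func(u clusterUpdate) {
        fmt.Println("dns-service got:", u.payload)
    })
    r.invoke(clusterUpdate{serviceName: "dns-service", payload: "addrs"})
    cancel() // eds-service callback is unregistered
    r.invoke(clusterUpdate{serviceName: "eds-service", payload: "ignored"})
}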