xds: Handle loops and ignore duplicates in aggregated cluster handling (#5317)

Zach Reyes 2022-05-05 16:08:24 -04:00 committed by GitHub
parent 799605c228
commit ee67b3d8e9
2 changed files with 531 additions and 56 deletions
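At its core, the change replaces the tree of *clusterNode pointers with a flat, ref-counted map keyed by cluster name, so a cluster that appears several times in the aggregate graph shares a single CDS watch. Below is a minimal, runnable sketch of that bookkeeping; the types and names (node, handler, acquire, release) are simplified stand-ins for illustration, not the real cdsbalancer API.

package main

import "fmt"

// Simplified stand-ins for the cdsbalancer types; node, handler,
// acquire, and release are illustrative names only.
type node struct {
	refCount int
	children []string
	cancel   func()
}

type handler struct {
	created map[string]*node
}

// acquire mirrors the createClusterNode change: a duplicate cluster
// name only bumps the ref count instead of starting a second watch.
func (h *handler) acquire(name string, startWatch func() func()) {
	if n, ok := h.created[name]; ok {
		n.refCount++
		return
	}
	h.created[name] = &node{refCount: 1, cancel: startWatch()}
}

// release mirrors clusterNode.delete: the watch is canceled only once
// the last parent drops its reference, then children are released too.
func (h *handler) release(name string) {
	n, ok := h.created[name]
	if !ok {
		return
	}
	n.refCount--
	if n.refCount > 0 {
		return
	}
	n.cancel()
	delete(h.created, name)
	for _, child := range n.children {
		h.release(child)
	}
}

func main() {
	h := &handler{created: make(map[string]*node)}
	start := func() func() { return func() {} }
	h.acquire("C", start) // first reference: a watch starts
	h.acquire("C", start) // duplicate: only the ref count grows
	h.release("C")        // still referenced elsewhere, watch survives
	h.release("C")        // last reference dropped, watch canceled
	fmt.Println(len(h.created)) // 0
}

The same two rules drive the real diff below: createClusterNode bumps a ref count on duplicates, and delete cancels the watch only when the count reaches zero.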


@@ -24,7 +24,12 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update")
const maxDepth = 16
var (
errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update")
errExceedsMaxDepth = errors.New("aggregate cluster graph exceeds max depth")
)
// clusterHandlerUpdate wraps the information received from the registered CDS
// watcher. A non-nil error is propagated to the underlying cluster_resolver
@@ -54,9 +59,10 @@ type clusterHandler struct {
// A mutex to protect entire tree of clusters.
clusterMutex sync.Mutex
root *clusterNode
rootClusterName string
createdClusters map[string]*clusterNode
// A way to ping CDS Balancer about any updates or errors to a Node in the
// tree. This will either get called from this handler constructing an
// update or from a child with an error. Capacity of one as the only update
@@ -68,37 +74,46 @@ func newClusterHandler(parent *cdsBalancer) *clusterHandler {
return &clusterHandler{
parent: parent,
updateChannel: make(chan clusterHandlerUpdate, 1),
createdClusters: make(map[string]*clusterNode),
}
}
func (ch *clusterHandler) updateRootCluster(rootClusterName string) {
ch.clusterMutex.Lock()
defer ch.clusterMutex.Unlock()
if ch.root == nil {
if ch.createdClusters[ch.rootClusterName] == nil {
// Construct a root node on first update.
ch.root = createClusterNode(rootClusterName, ch.parent.xdsClient, ch)
createClusterNode(rootClusterName, ch.parent.xdsClient, ch, 0)
ch.rootClusterName = rootClusterName
return
}
// Check if the root cluster was changed. If it was, delete the old one and
// start a new one; if not, do nothing.
if rootClusterName != ch.rootClusterName {
ch.root.delete()
ch.root = createClusterNode(rootClusterName, ch.parent.xdsClient, ch)
ch.createdClusters[ch.rootClusterName].delete()
createClusterNode(rootClusterName, ch.parent.xdsClient, ch, 0)
ch.rootClusterName = rootClusterName
}
}
// This function tries to construct a cluster update to send to CDS.
func (ch *clusterHandler) constructClusterUpdate() {
if ch.root == nil {
if ch.createdClusters[ch.rootClusterName] == nil {
// If the root cluster node does not exist, this handler is closed; ignore the update.
return
}
clusterUpdate, err := ch.root.constructClusterUpdate()
clusterUpdate, err := ch.createdClusters[ch.rootClusterName].constructClusterUpdate(make(map[string]bool))
if err != nil {
// If an error was received, no-op, as this simply means one of the
// children hasn't received an update yet.
// If an error was received, no-op, as this can mean one of the
// children hasn't received an update yet, or the graph continues to
// stay in an error state. If the graph continues to stay in an error
// state, no new error needs to be written to the update buffer, as
// that would be redundant information.
return
}
if clusterUpdate == nil {
// This means that there was an aggregated cluster with no EDS or DNS as
// leaf nodes. No update to be written.
return
}
// For a ClusterUpdate, the only update CDS cares about is the most
@@ -109,8 +124,8 @@ func (ch *clusterHandler) constructClusterUpdate() {
default:
}
ch.updateChannel <- clusterHandlerUpdate{
securityCfg: ch.root.clusterUpdate.SecurityCfg,
lbPolicy: ch.root.clusterUpdate.LBPolicy,
securityCfg: ch.createdClusters[ch.rootClusterName].clusterUpdate.SecurityCfg,
lbPolicy: ch.createdClusters[ch.rootClusterName].clusterUpdate.LBPolicy,
updates: clusterUpdate,
}
}
@@ -120,11 +135,10 @@ func (ch *clusterHandler) constructClusterUpdate() {
func (ch *clusterHandler) close() {
ch.clusterMutex.Lock()
defer ch.clusterMutex.Unlock()
if ch.root == nil {
if ch.createdClusters[ch.rootClusterName] == nil {
return
}
ch.root.delete()
ch.root = nil
ch.createdClusters[ch.rootClusterName].delete()
ch.rootClusterName = ""
}
@@ -136,7 +150,7 @@ type clusterNode struct {
cancelFunc func()
// A list of children, as the Node can be an aggregate Cluster.
children []*clusterNode
children []string
// A ClusterUpdate in order to build a list of cluster updates for CDS to
// send down to child XdsClusterResolverLoadBalancingPolicy.
@@ -149,13 +163,30 @@ type clusterNode struct {
receivedUpdate bool
clusterHandler *clusterHandler
depth int32
refCount int32
// maxDepthErr is set if this cluster node is an aggregate cluster and has a
// child that causes the graph to exceed the maximum depth allowed. This is
// used to show a cluster graph as being in an error state when it constructs
// a cluster update.
maxDepthErr error
}
// createClusterNode creates a cluster node from a given clusterName. This will
// also start the watch for that cluster.
func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler) *clusterNode {
func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler, depth int32) {
// If the cluster has already been created, bump its ref count and return,
// which ignores duplicates.
if topLevelHandler.createdClusters[clusterName] != nil {
topLevelHandler.createdClusters[clusterName].refCount++
return
}
c := &clusterNode{
clusterHandler: topLevelHandler,
depth: depth,
refCount: 1,
}
// Communicate with the xds client here.
topLevelHandler.parent.logger.Infof("CDS watch started on %v", clusterName)
@@ -164,25 +195,43 @@ func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler) *clusterNode {
topLevelHandler.parent.logger.Infof("CDS watch canceled on %v", clusterName)
cancel()
}
return c
topLevelHandler.createdClusters[clusterName] = c
}
// This function cancels the cluster watch on the cluster and all of its
// children.
func (c *clusterNode) delete() {
c.refCount--
if c.refCount == 0 {
c.cancelFunc()
delete(c.clusterHandler.createdClusters, c.clusterUpdate.ClusterName)
for _, child := range c.children {
child.delete()
if c.clusterHandler.createdClusters[child] != nil {
c.clusterHandler.createdClusters[child].delete()
}
}
}
}
// Construct cluster update (potentially a list of ClusterUpdates) for a node.
func (c *clusterNode) constructClusterUpdate() ([]xdsresource.ClusterUpdate, error) {
func (c *clusterNode) constructClusterUpdate(clustersSeen map[string]bool) ([]xdsresource.ClusterUpdate, error) {
// If the cluster has not yet received an update, the cluster update is not
// yet ready.
if !c.receivedUpdate {
return nil, errNotReceivedUpdate
}
if c.maxDepthErr != nil {
return nil, c.maxDepthErr
}
// Ignore duplicates. It's ok to ignore duplicates because the second
// occurrence of a cluster will never be used. I.e. in [C, D, C], the second
// C will never be used (the only way to fall back to lower priority D is if
// C is down, which means second C will never be chosen). Thus, [C, D, C] is
// logically equivalent to [C, D].
if clustersSeen[c.clusterUpdate.ClusterName] {
return []xdsresource.ClusterUpdate{}, nil
}
clustersSeen[c.clusterUpdate.ClusterName] = true
// Base case - LogicalDNS or EDS. Both of these cluster types will be tied
// to a single ClusterUpdate.
@@ -194,7 +243,7 @@ func (c *clusterNode) constructClusterUpdate() ([]xdsresource.ClusterUpdate, error) {
// its children.
var childrenUpdates []xdsresource.ClusterUpdate
for _, child := range c.children {
childUpdateList, err := child.constructClusterUpdate()
childUpdateList, err := c.clusterHandler.createdClusters[child].constructClusterUpdate(clustersSeen)
if err != nil {
return nil, err
}
@@ -219,6 +268,8 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err error) {
default:
}
c.clusterHandler.updateChannel <- clusterHandlerUpdate{err: err}
c.receivedUpdate = false
c.maxDepthErr = nil
return
}
@@ -233,9 +284,10 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err error) {
// cluster.
if clusterUpdate.ClusterType != xdsresource.ClusterTypeAggregate {
for _, child := range c.children {
child.delete()
c.clusterHandler.createdClusters[child].delete()
}
c.children = nil
c.maxDepthErr = nil
// This is an update to the one leaf node; the handler should try to send
// an update to the parent CDS balancer.
//
@@ -248,6 +300,22 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err error) {
}
// Aggregate cluster handling.
if len(clusterUpdate.PrioritizedClusterNames) >= 1 {
if c.depth == maxDepth-1 {
// For a ClusterUpdate, the only update CDS cares about is the most
// recent one, so opportunistically drain the update channel before
// sending the new update.
select {
case <-c.clusterHandler.updateChannel:
default:
}
c.clusterHandler.updateChannel <- clusterHandlerUpdate{err: errExceedsMaxDepth}
c.children = []string{}
c.maxDepthErr = errExceedsMaxDepth
return
}
}
newChildren := make(map[string]bool)
for _, childName := range clusterUpdate.PrioritizedClusterNames {
newChildren[childName] = true
@@ -261,59 +329,42 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err error) {
// the update to build (ex. if a child is created and a watch is started,
// that child hasn't received an update yet due to the mutex lock on this
// callback).
var createdChild, deletedChild bool
var createdChild bool
// This map will represent the current children of the cluster. It will be
// first added to in order to represent the new children. It will then have
// any children deleted that are no longer present. Then, from the cluster
// update received, will be used to construct the new child list.
mapCurrentChildren := make(map[string]*clusterNode)
// any children deleted that are no longer present.
mapCurrentChildren := make(map[string]bool)
for _, child := range c.children {
mapCurrentChildren[child.clusterUpdate.ClusterName] = child
mapCurrentChildren[child] = true
}
// Add and construct any new child nodes.
for child := range newChildren {
if _, inChildrenAlready := mapCurrentChildren[child]; !inChildrenAlready {
createdChild = true
mapCurrentChildren[child] = createClusterNode(child, c.clusterHandler.parent.xdsClient, c.clusterHandler)
createClusterNode(child, c.clusterHandler.parent.xdsClient, c.clusterHandler, c.depth+1)
}
}
// Delete any child nodes no longer in the aggregate cluster's children.
for child := range mapCurrentChildren {
if _, stillAChild := newChildren[child]; !stillAChild {
deletedChild = true
mapCurrentChildren[child].delete()
c.clusterHandler.createdClusters[child].delete()
delete(mapCurrentChildren, child)
}
}
// The order of the children list matters, so use the clusterUpdate from
// xdsclient as the ordering, and use that logical ordering for the new
// children list. This will be a mixture of child nodes which are all
// already constructed in the mapCurrentChildren map.
var children = make([]*clusterNode, 0, len(clusterUpdate.PrioritizedClusterNames))
for _, orderedChild := range clusterUpdate.PrioritizedClusterNames {
// The clusters already have watches started for them in the xds client, so
// these pointers can be used to construct the new children list; they
// just have to be put in the correct order using the original cluster
// update.
currentChild := mapCurrentChildren[orderedChild]
children = append(children, currentChild)
}
c.children = children
c.children = clusterUpdate.PrioritizedClusterNames
c.maxDepthErr = nil
// For an aggregate cluster, if this callback created any new
// child cluster nodes, then there's no possibility for a full cluster
// update to successfully build, as those created children will not have
// received an update yet. However, if there was simply a child deleted,
// then there is a possibility that it will have a full cluster update to
// build and also will have a changed overall cluster update from the
// deleted child.
if deletedChild && !createdChild {
// received an update yet. Even if this update did not delete a child, there
// is still a possibility for the cluster update to build, as the aggregate
// cluster can ignore duplicated children and thus the update can fill out
// the full cluster update tree.
if !createdChild {
c.clusterHandler.constructClusterUpdate()
}
}
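The dedup and depth-cap logic in constructClusterUpdate and handleResp is easiest to see in isolation. Here is a self-contained sketch of the same depth-first walk; graph, flatten, and errTooDeep are illustrative stand-ins, not the package's real identifiers.

package main

import (
	"errors"
	"fmt"
)

// A toy aggregate-cluster graph: aggregate names map to prioritized
// children; any name absent from the map is treated as a leaf (EDS or
// LogicalDNS). Illustrative only, not the cdsbalancer API.
var graph = map[string][]string{
	"A": {"B", "C"},
	"B": {"D"},
	"C": {"D"},
}

const maxDepth = 16

var errTooDeep = errors.New("aggregate cluster graph exceeds max depth")

// flatten walks the graph depth-first in priority order, skipping any
// cluster already seen, so a diamond (or a loop such as A -> A)
// contributes each leaf at most once, and bails out past maxDepth.
func flatten(name string, depth int, seen map[string]bool) ([]string, error) {
	if depth >= maxDepth {
		return nil, errTooDeep
	}
	if seen[name] {
		return nil, nil // the second occurrence is never used
	}
	seen[name] = true
	children, isAggregate := graph[name]
	if !isAggregate {
		return []string{name}, nil // leaf cluster
	}
	var leaves []string
	for _, child := range children {
		got, err := flatten(child, depth+1, seen)
		if err != nil {
			return nil, err
		}
		leaves = append(leaves, got...)
	}
	return leaves, nil
}

func main() {
	leaves, err := flatten("A", 0, map[string]bool{})
	fmt.Println(leaves, err) // [D] <nil>: the diamond collapses to one D
}

Under these assumptions, the diamond A->[B,C], B->D, C->D (exercised by TestDiamondDependency below) collapses to a single leaf D, and a chain deeper than 16 aggregates surfaces an error instead of recursing without bound.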


@@ -19,6 +19,7 @@ package cdsbalancer
import (
"context"
"errors"
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
@@ -683,3 +684,426 @@ func (s) TestSwitchClusterNodeBetweenLeafAndAggregated(t *testing.T) {
t.Fatal("Timed out waiting for update from update channel.")
}
}
// TestExceedsMaxStackDepth tests the scenario where an aggregate cluster
// exceeds the maximum depth, which is 16. This should cause an error to be
// written to the update buffer.
func (s) TestExceedsMaxStackDepth(t *testing.T) {
ch, fakeClient := setupTests()
ch.updateRootCluster("cluster0")
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
for i := 0; i <= 15; i++ {
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "cluster" + fmt.Sprint(i),
PrioritizedClusterNames: []string{"cluster" + fmt.Sprint(i+1)},
}, nil)
if i == 15 {
// The 16th iteration will try to create a cluster which exceeds the
// max stack depth and will thus error, so no CDS watch will be
// started for the child.
continue
}
_, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
}
select {
case chu := <-ch.updateChannel:
if chu.err.Error() != "aggregate cluster graph exceeds max depth" {
t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error())
}
case <-ctx.Done():
t.Fatal("Timed out waiting for an error to be written to update channel.")
}
}
// TestDiamondDependency tests a diamond-shaped aggregate cluster (A->[B,C];
// B->D; C->D). Since both B and C point to D as their child, the duplicate D
// under C should be ignored. Once all 4 clusters have received a CDS update,
// an update should then be written to the update buffer, specifying a single
// cluster D.
func (s) TestDiamondDependency(t *testing.T) {
ch, fakeClient := setupTests()
ch.updateRootCluster("clusterA")
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "clusterA",
PrioritizedClusterNames: []string{"clusterB", "clusterC"},
}, nil)
// Two watches should be started for both child clusters.
_, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
_, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
// B -> D.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "clusterB",
PrioritizedClusterNames: []string{"clusterD"},
}, nil)
_, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
// This shouldn't cause an update to be written to the update buffer,
// as cluster C has not received a cluster update yet.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "clusterD",
}, nil)
sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-ch.updateChannel:
t.Fatal("an update should not have been written to the update buffer")
case <-sCtx.Done():
}
// This update for C should cause an update to be written to the update
// buffer. At this point, every node in the aggregated cluster graph has
// received an update. The update should contain only one clusterD, as
// clusterC does not add a clusterD child update, the clusterD update
// already having been added as a child of clusterB.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "clusterC",
PrioritizedClusterNames: []string{"clusterD"},
}, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "clusterD",
}}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.")
}
}
// TestIgnoreDups tests the cluster graph (A->[B, C]; B->[C, D]). Only one
// watch should be started for cluster C. The update written to the update
// buffer should contain only one instance of cluster C, correctly at a higher
// priority than D.
func (s) TestIgnoreDups(t *testing.T) {
ch, fakeClient := setupTests()
ch.updateRootCluster("clusterA")
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "clusterA",
PrioritizedClusterNames: []string{"clusterB", "clusterC"},
}, nil)
// Two watches should be started, one for each child cluster.
_, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
_, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
// The child cluster C should not have a watch started for it, as it is
// already part of the aggregate cluster graph as a child of the root
// cluster clusterA, which has already started a watch for it.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "clusterB",
PrioritizedClusterNames: []string{"clusterC", "clusterD"},
}, nil)
// Only one watch should be started, which should be for clusterD.
name, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if name != "clusterD" {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: clusterD", name)
}
sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
if _, err = fakeClient.WaitForWatchCluster(sCtx); err == nil {
t.Fatalf("only one watch should have been started for the children of clusterB")
}
// This update should not cause an update to be written to the update
// buffer, as not every cluster in the tree has received a cluster update
// yet. With cluster B ignoring cluster C, the system should function as
// if cluster C were not a child of cluster B (meaning all 4 clusters
// are required to get an update).
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "clusterC",
}, nil)
sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-ch.updateChannel:
t.Fatal("an update should not have been written to the update buffer")
case <-sCtx.Done():
}
// This update causes all 4 clusters in the aggregated cluster graph to have
// received an update, so an update should be written to the update buffer
// with only a single occurrence of cluster C, at a higher priority than
// cluster D.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "clusterD",
}, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "clusterC",
}, {
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "clusterD",
}}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.")
}
// Delete A's ref to C by updating A with only child B. Since B still has a
// reference to C, C's watch should not be canceled, and an update should
// still be built correctly.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "clusterA",
PrioritizedClusterNames: []string{"clusterB"},
}, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "clusterC",
}, {
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "clusterD",
}}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.")
}
}
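The ordering property this test pins down, first occurrence wins while priority order is preserved, amounts to a stable de-duplication of the prioritized name list. A standalone sketch under that reading (dedupPriorities is a hypothetical helper, not cdsbalancer code):

package main

import "fmt"

// dedupPriorities keeps only the first occurrence of each cluster while
// preserving priority order: [C D C] routes identically to [C D], since
// the lower-priority duplicate of C could only be reached when C is
// already down.
func dedupPriorities(names []string) []string {
	seen := make(map[string]bool)
	out := make([]string, 0, len(names))
	for _, name := range names {
		if seen[name] {
			continue
		}
		seen[name] = true
		out = append(out, name)
	}
	return out
}

func main() {
	fmt.Println(dedupPriorities([]string{"C", "D", "C"})) // [C D]
}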
// TestErrorStateWholeTree tests the scenario where the aggregate cluster graph
// exceeds max depth. An error should be written to the update channel.
// Afterward, if a valid response comes in for another cluster, no update should
// be written to the update channel, as the aggregate cluster graph is still in
// the same error state.
func (s) TestErrorStateWholeTree(t *testing.T) {
ch, fakeClient := setupTests()
ch.updateRootCluster("cluster0")
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
for i := 0; i <= 15; i++ {
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "cluster" + fmt.Sprint(i),
PrioritizedClusterNames: []string{"cluster" + fmt.Sprint(i+1)},
}, nil)
if i == 15 {
// The 16th iteration will try to create a cluster which exceeds the
// max stack depth and will thus error, so no CDS watch will be
// started for the child.
continue
}
_, err = fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
}
select {
case chu := <-ch.updateChannel:
if chu.err.Error() != "aggregate cluster graph exceeds max depth" {
t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error())
}
case <-ctx.Done():
t.Fatal("Timed out waiting for an error to be written to update channel.")
}
// Invoke a cluster callback for a node in the graph that rests within the
// allowed depth. This will cause the system to try to construct a cluster
// update, and it shouldn't write an update, as the aggregate cluster graph
// is still in an error state. Since the graph continues to stay in an error
// state, no new error needs to be written to the update buffer, as that
// would be redundant information.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "cluster3",
PrioritizedClusterNames: []string{"cluster4"},
}, nil)
sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-ch.updateChannel:
t.Fatal("an update should not have been written to the update buffer")
case <-sCtx.Done():
}
// Invoke the same cluster update for cluster15, specifying that it has a
// child cluster16. This should cause an error to be written to the update
// buffer, as the graph still exceeds the max depth.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "cluster15",
PrioritizedClusterNames: []string{"cluster16"},
}, nil)
select {
case chu := <-ch.updateChannel:
if chu.err.Error() != "aggregate cluster graph exceeds max depth" {
t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error())
}
case <-ctx.Done():
t.Fatal("Timed out waiting for an error to be written to update channel.")
}
// Removing the child of cluster15 that caused the graph to be in the error
// state of exceeding max depth should let the update successfully construct
// and be written to the update buffer.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "cluster15",
}, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{
ClusterType: xdsresource.ClusterTypeEDS,
ClusterName: "cluster15",
}}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.")
}
}
// TestNodeChildOfItself tests the scenario where the aggregate cluster graph
// has a node that is a child of itself. The case for this is A -> A; since
// there is no base cluster (EDS or LogicalDNS), no update should be written
// when it tries to build a cluster update.
func (s) TestNodeChildOfItself(t *testing.T) {
ch, fakeClient := setupTests()
ch.updateRootCluster("clusterA")
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
// Invoke the callback informing the cluster handler that clusterA has
// itself as a child. Due to this child cluster being a duplicate, no
// watch should be started. Since there are no leaf nodes (i.e. EDS or
// Logical DNS), no update should be written to the update buffer.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "clusterA",
PrioritizedClusterNames: []string{"clusterA"},
}, nil)
sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
if _, err := fakeClient.WaitForWatchCluster(sCtx); err == nil {
t.Fatal("Watch should not have been started for clusterA")
}
sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-ch.updateChannel:
t.Fatal("update should not have been written to update buffer")
case <-sCtx.Done():
}
// Invoke the callback again informing the cluster handler that clusterA
// has itself as a child. Due to this child cluster being a duplicate,
// no watch should be started. Since there are no leaf nodes (i.e. EDS or
// Logical DNS), no update should be written to the update buffer.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "clusterA",
PrioritizedClusterNames: []string{"clusterA"},
}, nil)
sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
if _, err := fakeClient.WaitForWatchCluster(sCtx); err == nil {
t.Fatal("Watch should not have been started for clusterA")
}
sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-ch.updateChannel:
t.Fatal("update should not have been written to update buffer, as clusterB has not received an update yet")
case <-sCtx.Done():
}
// Inform the cluster handler that clusterA now has clusterB as a child.
// This should not cancel the watch for A (it is still the root cluster and
// still holds a ref), should not write an update to the update buffer
// (cluster B has not received an update yet), and should start a new watch
// for cluster B, as it is not a duplicate.
fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: "clusterA",
PrioritizedClusterNames: []string{"clusterB"},
}, nil)
sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
if _, err := fakeClient.WaitForCancelClusterWatch(sCtx); err == nil {
t.Fatal("clusterA should not have been canceled, as it is still the root cluster")
}
sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-ch.updateChannel:
t.Fatal("update should not have been written to update buffer, as clusterB has not received an update yet")
case <-sCtx.Done():
}
gotCluster, err := fakeClient.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if gotCluster != "clusterB" {
t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, "clusterB")
}
}