diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index bf1519bb8..9b987c00c 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -151,6 +151,11 @@ type ccUpdate struct { err error } +type clusterHandlerUpdate struct { + chu []xdsclient.ClusterUpdate + err error +} + // scUpdate wraps a subConn update received from gRPC. This is directly passed // on to the edsBalancer. type scUpdate struct { diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index 5c746cfa1..d1074f2a1 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -640,7 +640,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) { // registered watch should not be cancelled. sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() - if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { + if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { t.Fatal("cluster watch cancelled for a non-resource-not-found-error") } } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 4476a1532..e93df5a10 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -399,7 +399,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) { // registered watch should not be cancelled. sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() - if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { + if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { t.Fatal("cluster watch cancelled for a non-resource-not-found-error") } // The CDS balancer has not yet created an EDS balancer. So, this resolver @@ -438,7 +438,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) { // Make sure the registered watch is not cancelled. sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() - if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { + if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { t.Fatal("cluster watch cancelled for a non-resource-not-found-error") } // Make sure the error is forwarded to the EDS balancer. @@ -453,7 +453,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) { // request cluster resource is not found. We should continue to watch it. sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() - if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { + if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { t.Fatal("cluster watch cancelled for a resource-not-found-error") } // Make sure the error is forwarded to the EDS balancer. @@ -485,7 +485,7 @@ func (s) TestResolverError(t *testing.T) { // registered watch should not be cancelled. 
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() - if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { + if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { t.Fatal("cluster watch cancelled for a non-resource-not-found-error") } // The CDS balancer has not yet created an EDS balancer. So, this resolver @@ -523,7 +523,7 @@ func (s) TestResolverError(t *testing.T) { // Make sure the registered watch is not cancelled. sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() - if err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { + if _, err := xdsC.WaitForCancelClusterWatch(sCtx); err != context.DeadlineExceeded { t.Fatal("cluster watch cancelled for a non-resource-not-found-error") } // Make sure the error is forwarded to the EDS balancer. @@ -535,7 +535,7 @@ func (s) TestResolverError(t *testing.T) { resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "cdsBalancer resource not found error") cdsB.ResolverError(resourceErr) // Make sure the registered watch is cancelled. - if err := xdsC.WaitForCancelClusterWatch(ctx); err != nil { + if _, err := xdsC.WaitForCancelClusterWatch(ctx); err != nil { t.Fatalf("want watch to be canceled, watchForCancel failed: %v", err) } // Make sure the error is forwarded to the EDS balancer. @@ -642,7 +642,7 @@ func (s) TestClose(t *testing.T) { // Make sure that the cluster watch registered by the CDS balancer is // cancelled. - if err := xdsC.WaitForCancelClusterWatch(ctx); err != nil { + if _, err := xdsC.WaitForCancelClusterWatch(ctx); err != nil { t.Fatal(err) } diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler.go b/xds/internal/balancer/cdsbalancer/cluster_handler.go new file mode 100644 index 000000000..2dafb212f --- /dev/null +++ b/xds/internal/balancer/cdsbalancer/cluster_handler.go @@ -0,0 +1,273 @@ +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cdsbalancer + +import ( + "errors" + "sync" + + xdsclient "google.golang.org/grpc/xds/internal/client" +) + +var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update") + +// clusterHandler will be given a name representing a cluster. It will then +// update the CDS policy continuously with a list of Clusters to pass down to +// XdsClusterResolverLoadBalancingPolicyConfig in a stream-like fashion. +type clusterHandler struct { + // A mutex to protect the entire tree of clusters. + clusterMutex sync.Mutex + root *clusterNode + rootClusterName string + + // A way to ping the CDS Balancer about any updates or errors to a Node in + // the tree. This will either get called from this handler constructing an + // update or from a child with an error. Capacity of one, as the only update + // the CDS Balancer cares about is the most recent update.
+ updateChannel chan clusterHandlerUpdate + + xdsClient xdsClientInterface +} + +func (ch *clusterHandler) updateRootCluster(rootClusterName string) { + ch.clusterMutex.Lock() + defer ch.clusterMutex.Unlock() + if ch.root == nil { + // Construct a root node on first update. + ch.root = createClusterNode(rootClusterName, ch.xdsClient, ch) + ch.rootClusterName = rootClusterName + return + } + // Check if the root cluster was changed. If it was, delete the old one and + // start a new one; if not, do nothing. + if rootClusterName != ch.rootClusterName { + ch.root.delete() + ch.root = createClusterNode(rootClusterName, ch.xdsClient, ch) + ch.rootClusterName = rootClusterName + } +} + +// This function tries to construct a cluster update to send to CDS. +func (ch *clusterHandler) constructClusterUpdate() { + // If an error is returned, no-op; it simply means one of the children + // hasn't received an update yet. + if clusterUpdate, err := ch.root.constructClusterUpdate(); err == nil { + // For a ClusterUpdate, the only update CDS cares about is the most + // recent one, so opportunistically drain the update channel before + // sending the new update. + select { + case <-ch.updateChannel: + default: + } + ch.updateChannel <- clusterHandlerUpdate{chu: clusterUpdate, err: nil} + } +} + +// close() is meant to be called by CDS when the CDS balancer is closed, and it +// cancels the watches for every cluster in the cluster tree. +func (ch *clusterHandler) close() { + ch.clusterMutex.Lock() + defer ch.clusterMutex.Unlock() + ch.root.delete() + ch.root = nil + ch.rootClusterName = "" +} + +// A clusterNode logically represents a cluster. It handles all the logic for +// starting and stopping a cluster watch, handling any updates, and +// constructing a list recursively for the ClusterHandler. +type clusterNode struct { + // A way to cancel the watch for the cluster. + cancelFunc func() + + // A list of children, as the Node can be an aggregate Cluster. + children []*clusterNode + + // A ClusterUpdate in order to build a list of cluster updates for CDS to + // send down to the child XdsClusterResolverLoadBalancingPolicy. + clusterUpdate xdsclient.ClusterUpdate + + // This boolean determines whether this Node has received an update or not. + // This isn't the best practice, but this will protect a list of Cluster + // Updates from being constructed if a cluster in the tree has not received + // an update yet. + receivedUpdate bool + + clusterHandler *clusterHandler +} + +// createClusterNode creates a cluster node from a given clusterName. This will +// also start the watch for that cluster. +func createClusterNode(clusterName string, xdsClient xdsClientInterface, topLevelHandler *clusterHandler) *clusterNode { + c := &clusterNode{ + clusterHandler: topLevelHandler, + } + // Communicate with the xds client here. + c.cancelFunc = xdsClient.WatchCluster(clusterName, c.handleResp) + return c +} + +// This function cancels the cluster watch on the cluster and all of its +// children. +func (c *clusterNode) delete() { + c.cancelFunc() + for _, child := range c.children { + child.delete() + } +} + +// Construct cluster update (potentially a list of ClusterUpdates) for a node. +func (c *clusterNode) constructClusterUpdate() ([]xdsclient.ClusterUpdate, error) { + // If the cluster has not yet received an update, the cluster update is not + // yet ready. + if !c.receivedUpdate { + return nil, errNotReceivedUpdate + } + + // Base case - LogicalDNS or EDS.
Both of these cluster types will be tied + // to a single ClusterUpdate. + if c.clusterUpdate.ClusterType != xdsclient.ClusterTypeAggregate { + return []xdsclient.ClusterUpdate{c.clusterUpdate}, nil + } + + // If it is an aggregate cluster, construct a list by recursively calling + // down to all of its children. + var childrenUpdates []xdsclient.ClusterUpdate + for _, child := range c.children { + childUpdateList, err := child.constructClusterUpdate() + if err != nil { + return nil, err + } + childrenUpdates = append(childrenUpdates, childUpdateList...) + } + return childrenUpdates, nil +} + +// handleResp handles an xds response for a particular cluster. This function +// also handles any logic with regards to any child state that may have changed. +// At the end of handleResp(), the clusterHandler will be pinged in certain +// situations to try and construct an update to send back to CDS. +func (c *clusterNode) handleResp(clusterUpdate xdsclient.ClusterUpdate, err error) { + c.clusterHandler.clusterMutex.Lock() + defer c.clusterHandler.clusterMutex.Unlock() + if err != nil { // Write this error for run() to pick up in CDS LB policy. + // For a ClusterUpdate, the only update CDS cares about is the most + // recent one, so opportunistically drain the update channel before + // sending the new update. + select { + case <-c.clusterHandler.updateChannel: + default: + } + c.clusterHandler.updateChannel <- clusterHandlerUpdate{chu: nil, err: err} + return + } + + // deltaInClusterUpdateFields determines whether there was a delta in the + // clusterUpdate fields (ignoring the children). This will be used to help + // determine whether to ping the clusterHandler at the end of this callback + // or not. + deltaInClusterUpdateFields := clusterUpdate.ServiceName != c.clusterUpdate.ServiceName || clusterUpdate.ClusterType != c.clusterUpdate.ClusterType + c.receivedUpdate = true + c.clusterUpdate = clusterUpdate + + // If the cluster is a leaf node and the received cluster update changed, + // then the overall cluster update would change as well, and there is a + // possibility for the overall update to build, so ping the cluster handler. + // Also, if there were any children previously, delete them, as the cluster + // type is no longer an aggregate cluster. + if clusterUpdate.ClusterType != xdsclient.ClusterTypeAggregate { + for _, child := range c.children { + child.delete() + } + c.children = nil + if deltaInClusterUpdateFields { + c.clusterHandler.constructClusterUpdate() + } + return + } + + // Aggregate cluster handling. + newChildren := make(map[string]bool) + for _, childName := range clusterUpdate.PrioritizedClusterNames { + newChildren[childName] = true + } + + // These booleans help determine whether this callback will ping the overall + // clusterHandler to try and construct an update to send back to CDS. This + // will be determined by whether there would be a change in the overall + // clusterUpdate for the whole tree (e.g. a change in the clusterUpdate for + // the current cluster or a deleted child) and also whether there's even a + // possibility for the update to build (e.g. if a child is created and a + // watch is started, that child hasn't received an update yet due to the + // mutex lock on this callback). + var createdChild, deletedChild bool + + // This map will represent the current children of the cluster. New children + // will first be added to it, and any children no longer present will then + // be deleted from it.
Finally, the ordering in the received cluster + // update will be used to construct the new child list. + mapCurrentChildren := make(map[string]*clusterNode) + for _, child := range c.children { + mapCurrentChildren[child.clusterUpdate.ServiceName] = child + } + + // Add and construct any new child nodes. + for child := range newChildren { + if _, inChildrenAlready := mapCurrentChildren[child]; !inChildrenAlready { + createdChild = true + mapCurrentChildren[child] = createClusterNode(child, c.clusterHandler.xdsClient, c.clusterHandler) + } + } + + // Delete any child nodes no longer in the aggregate cluster's children. + for child := range mapCurrentChildren { + if _, stillAChild := newChildren[child]; !stillAChild { + deletedChild = true + mapCurrentChildren[child].delete() + delete(mapCurrentChildren, child) + } + } + + // The order of the children list matters, so use the clusterUpdate from + // xdsclient as the ordering, and use that logical ordering for the new + // children list. This will be a mixture of child nodes which are all + // already constructed in the mapCurrentChildren map. + var children = make([]*clusterNode, 0, len(clusterUpdate.PrioritizedClusterNames)) + + for _, orderedChild := range clusterUpdate.PrioritizedClusterNames { + // The clusters already have watches started for them in the xds client, + // so these pointers can be used to construct the new children list; + // they just have to be put in the correct order using the original + // cluster update. + currentChild := mapCurrentChildren[orderedChild] + children = append(children, currentChild) + } + + c.children = children + + // If this callback created any new child cluster nodes, then there's no + // possibility for a full cluster update to successfully build, as those + // created children will not have received an update yet. However, if a + // child was simply deleted, then there is a possibility that a full cluster + // update will build, and the overall cluster update will have changed due + // to the deleted child. + if deletedChild && !createdChild { + c.clusterHandler.constructClusterUpdate() + } +} diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go new file mode 100644 index 000000000..96fca8a26 --- /dev/null +++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go @@ -0,0 +1,676 @@ +// +build go1.12 + +/* + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package cdsbalancer + +import ( + "context" + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + xdsclient "google.golang.org/grpc/xds/internal/client" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" +) + +const ( + edsService = "EDS Service" + logicalDNSService = "Logical DNS Service" + edsService2 = "EDS Service 2" + logicalDNSService2 = "Logical DNS Service 2" + aggregateClusterService = "Aggregate Cluster Service" +) + +// setupTests creates a clusterHandler with a fake xds client for control over +// the xds client. +func setupTests(t *testing.T) (*clusterHandler, *fakeclient.Client) { + xdsC := fakeclient.NewClient() + ch := &clusterHandler{ + xdsClient: xdsC, + // This is how the update channel will be created in cds. It will be + // a separate channel from the buffer.Unbounded. This channel will also + // be read from to test any cluster updates. + updateChannel: make(chan clusterHandlerUpdate, 1), + } + return ch, xdsC +} + +// Simplest case: the cluster handler receives a cluster name, the handler +// starts a watch for that cluster, and the xds client returns that it is a +// Leaf Node (EDS or LogicalDNS), not a tree, so the expectation is that an +// update is written to the buffer which will be read by the CDS LB. +func (s) TestSuccessCaseLeafNode(t *testing.T) { + tests := []struct { + name string + clusterName string + clusterUpdate xdsclient.ClusterUpdate + }{ + {name: "test-update-root-cluster-EDS-success", + clusterName: edsService, + clusterUpdate: xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService, + }}, + { + name: "test-update-root-cluster-Logical-DNS-success", + clusterName: logicalDNSService, + clusterUpdate: xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeLogicalDNS, + ServiceName: logicalDNSService, + }}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ch, fakeClient := setupTests(t) + // When you first update the root cluster, it should hit the code + // path which will start a cluster node for that root. Updating the + // root cluster logically represents a ping from a ClientConn. + ch.updateRootCluster(test.clusterName) + // Starting a cluster node involves communicating with the + // xdsClient, telling it to watch a cluster. + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + gotCluster, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotCluster != test.clusterName { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, test.clusterName) + } + // Invoke the callback on the xds client with a certain + // clusterUpdate. Due to this cluster update filling out the whole + // cluster tree, as the cluster is of a root type (EDS or Logical + // DNS) and not an aggregate cluster, this should trigger the + // ClusterHandler to write to the update buffer to update the CDS + // policy. + fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil) + select { + case chu := <-ch.updateChannel: + if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" { + t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + // Close the clusterHandler. This is meant to be called when the CDS + // Balancer is closed, and the call should cancel the watch for this + // cluster.
+ ch.close() + clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if clusterNameDeleted != test.clusterName { + t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, test.clusterName) + } + }) + } +} + +// The cluster handler receives a cluster name, the handler starts a watch for +// that cluster, and the xds client returns that it is a Leaf Node (EDS or +// LogicalDNS), not a tree, so the expectation is that the first update is +// written to the buffer which will be read by the CDS LB. Then, send a new +// cluster update that is different, with the expectation that it is also +// written to the update buffer to send back to CDS. +func (s) TestSuccessCaseLeafNodeThenNewUpdate(t *testing.T) { + tests := []struct { + name string + clusterName string + clusterUpdate xdsclient.ClusterUpdate + newClusterUpdate xdsclient.ClusterUpdate + }{ + {name: "test-update-root-cluster-then-new-update-EDS-success", + clusterName: edsService, + clusterUpdate: xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService, + }, + newClusterUpdate: xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService2, + }, + }, + { + name: "test-update-root-cluster-then-new-update-Logical-DNS-success", + clusterName: logicalDNSService, + clusterUpdate: xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeLogicalDNS, + ServiceName: logicalDNSService, + }, + newClusterUpdate: xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeLogicalDNS, + ServiceName: logicalDNSService2, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ch, fakeClient := setupTests(t) + ch.updateRootCluster(test.clusterName) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil) + select { + case <-ch.updateChannel: + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from updateChannel.") + } + + // Check that sending the same cluster update does not induce an + // update to be written to the update buffer. + fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil) + shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shouldNotHappenCtxCancel() + select { + case <-ch.updateChannel: + t.Fatal("Should not have written an update to update buffer, as cluster update did not change.") + case <-shouldNotHappenCtx.Done(): + } + + // The above represents the same thing as the simple + // TestSuccessCaseLeafNode, plus extra behavior and validation (a + // clusterNode which is a leaf receives a changed clusterUpdate, + // which should ping the clusterHandler, which should then write to + // the update buffer).
+ fakeClient.InvokeWatchClusterCallback(test.newClusterUpdate, nil) + select { + case chu := <-ch.updateChannel: + if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.newClusterUpdate}); diff != "" { + t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from updateChannel.") + } + }) + } +} + +// TestUpdateRootClusterAggregateSuccess tests the case where an aggregate +// cluster is a root pointing to two child clusters, one of type EDS and the +// other of type LogicalDNS. This test will then send cluster updates for both +// the children, and at the end there should be a successful clusterUpdate +// written to the update buffer to send back to CDS. +func (s) TestUpdateRootClusterAggregateSuccess(t *testing.T) { + ch, fakeClient := setupTests(t) + ch.updateRootCluster(aggregateClusterService) + + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + gotCluster, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotCluster != aggregateClusterService { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, aggregateClusterService) + } + + // The xdsClient tells the clusterNode that the cluster type is an + // aggregate cluster, which will cause a lot of downstream behavior. For a + // cluster type that isn't an aggregate, the behavior is simple. The + // clusterNode will simply get a successful update, which will then ping the + // clusterHandler which will successfully build an update to send to the CDS + // policy. In the aggregate cluster case, the handleResp callback must also + // start watches for the aggregate cluster's children. The ping to the + // clusterHandler at the end of handleResp should be a no-op, as neither the + // EDS nor LogicalDNS child clusters have received an update yet. + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeAggregate, + ServiceName: aggregateClusterService, + PrioritizedClusterNames: []string{edsService, logicalDNSService}, + }, nil) + + // The xds client should be called to start a watch for one of the child + // clusters of the aggregate. The order of the children in the update + // written to the buffer to send to CDS matters; however, there is no + // guarantee on the order in which it will start the watches of the + // children. + gotCluster, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotCluster != edsService { + if gotCluster != logicalDNSService { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, edsService) + } + } + + // The xds client should then be called to start a watch for the second + // child cluster. + gotCluster, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotCluster != edsService { + if gotCluster != logicalDNSService { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, logicalDNSService) + } + } + + // The handleResp() call on the root aggregate cluster should not ping the + // cluster handler to try and construct an update, as the handleResp() + // callback knows that when a child is created, it cannot possibly build a + // successful update yet. Thus, there should be nothing in the update + // channel.
+ + shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shouldNotHappenCtxCancel() + + select { + case <-ch.updateChannel: + t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update") + case <-shouldNotHappenCtx.Done(): + } + + // Send callback for the EDS child cluster. + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService, + }, nil) + + // The EDS child cluster will ping the Cluster Handler to try and construct + // an update, which still won't successfully build, as the LogicalDNS child + // of the root aggregate cluster has not yet received and handled an update. + select { + case <-ch.updateChannel: + t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update") + case <-shouldNotHappenCtx.Done(): + } + + // Invoke callback for the Logical DNS child cluster. + + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeLogicalDNS, + ServiceName: logicalDNSService, + }, nil) + + // This will ping the Cluster Handler, which will finally successfully build + // an update, as all nodes in the tree of clusters have received an update. + // Since this cluster is an aggregate cluster composed of two children, the + // returned update should be length 2, as the xds cluster resolver LB policy + // only cares about the full list of LogicalDNS and EDS clusters + // representing the base nodes of the tree of clusters. This list should be + // ordered as per the cluster update. + select { + case chu := <-ch.updateChannel: + if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService, + }, { + ClusterType: xdsclient.ClusterTypeLogicalDNS, + ServiceName: logicalDNSService, + }}); diff != "" { + t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + } +} + +// TestUpdateRootClusterAggregateThenChangeChild tests the scenario where you +// have an aggregate cluster with an EDS child and a LogicalDNS child, then you +// change one of the children and send an update for the changed child. This +// should write a new update to the update buffer to send back to CDS. +func (s) TestUpdateRootClusterAggregateThenChangeChild(t *testing.T) { + // This initial code is the same as the test for the aggregate success case, + // except without validations. This will get this test to the point where it + // can change one of the children.
+ ch, fakeClient := setupTests(t) + ch.updateRootCluster(aggregateClusterService) + + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeAggregate, + ServiceName: aggregateClusterService, + PrioritizedClusterNames: []string{edsService, logicalDNSService}, + }, nil) + fakeClient.WaitForWatchCluster(ctx) + fakeClient.WaitForWatchCluster(ctx) + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService, + }, nil) + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeLogicalDNS, + ServiceName: logicalDNSService, + }, nil) + + select { + case <-ch.updateChannel: + case <-ctx.Done(): + t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + } + + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeAggregate, + ServiceName: aggregateClusterService, + PrioritizedClusterNames: []string{edsService, logicalDNSService2}, + }, nil) + + // The cluster update lets the aggregate cluster know that its children + // are now edsService and logicalDNSService2, which implies that the + // aggregateCluster lost its old logicalDNSService child. Thus, the + // logicalDNSService child should be deleted. + clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if clusterNameDeleted != logicalDNSService { + t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService) + } + + // The handleResp() callback should then start a watch for + // logicalDNSService2. + clusterNameCreated, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if clusterNameCreated != logicalDNSService2 { + t.Fatalf("xdsClient.WatchCDS called for cluster %v, want: %v", clusterNameCreated, logicalDNSService2) + } + + // handleResp() should try and send an update here, but it will fail as + // logicalDNSService2 has not yet received an update. + shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shouldNotHappenCtxCancel() + select { + case <-ch.updateChannel: + t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update") + case <-shouldNotHappenCtx.Done(): + } + + // Invoke a callback for the new logicalDNSService2 - this will fill out the + // tree with successful updates. + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeLogicalDNS, + ServiceName: logicalDNSService2, + }, nil) + + // Behavior: This update makes every node in the tree of clusters have + // received an update. Thus, at the end of this callback, when you ping the + // clusterHandler to try and construct an update, the update should now + // successfully be written to the update buffer to send back to CDS. This + // new update should contain the new LogicalDNS2 child.
+ + select { + case chu := <-ch.updateChannel: + if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService, + }, { + ClusterType: xdsclient.ClusterTypeLogicalDNS, + ServiceName: logicalDNSService2, + }}); diff != "" { + t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + } +} + +// TestUpdateRootClusterAggregateThenChangeRootToEDS tests the situation where +// you have a fully updated aggregate cluster (where the AggregateCluster +// success test gets you) as the root cluster, then you update that root +// cluster to a cluster of type EDS. +func (s) TestUpdateRootClusterAggregateThenChangeRootToEDS(t *testing.T) { + // This initial code is the same as the test for the aggregate success case, + // except without validations. This will get this test to the point where it + // can update the root cluster to one of type EDS. + ch, fakeClient := setupTests(t) + ch.updateRootCluster(aggregateClusterService) + + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeAggregate, + ServiceName: aggregateClusterService, + PrioritizedClusterNames: []string{edsService, logicalDNSService}, + }, nil) + fakeClient.WaitForWatchCluster(ctx) + fakeClient.WaitForWatchCluster(ctx) + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService, + }, nil) + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeLogicalDNS, + ServiceName: logicalDNSService, + }, nil) + + select { + case <-ch.updateChannel: + case <-ctx.Done(): + t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + } + + // Change the root aggregate cluster to an EDS cluster. This should delete + // the root aggregate cluster and all of its children by successfully + // canceling the watches for them. + ch.updateRootCluster(edsService2) + + // Reads from the cancel channel; the watch for the Aggregate cluster should + // be cancelled first, then EDS, then Logical DNS. + clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if clusterNameDeleted != aggregateClusterService { + t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, aggregateClusterService) + } + + clusterNameDeleted, err = fakeClient.WaitForCancelClusterWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if clusterNameDeleted != edsService { + t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, edsService) + } + + clusterNameDeleted, err = fakeClient.WaitForCancelClusterWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if clusterNameDeleted != logicalDNSService { + t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService) + } + + // After deletion, it should start a watch for the EDS Cluster.
The behavior + // for this EDS Cluster receiving an update from the xds client and then + // successfully writing an update to send back to CDS is already tested in + // the updateEDS success case. + gotCluster, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotCluster != edsService2 { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, edsService2) + } +} + +// TestHandleRespInvokedWithError tests that when handleResp is invoked with an +// error, the error is successfully written to the update buffer. +func (s) TestHandleRespInvokedWithError(t *testing.T) { + ch, fakeClient := setupTests(t) + ch.updateRootCluster(edsService) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, errors.New("some error")) + select { + case chu := <-ch.updateChannel: + if chu.err.Error() != "some error" { + t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error()) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } +} + +// TestSwitchClusterNodeBetweenLeafAndAggregated tests having an existing +// cluster node switch between a leaf and an aggregated cluster. When the +// cluster switches from a leaf to an aggregated cluster, it should add +// children, and when it switches back to a leaf, it should delete those new +// children and also successfully write a cluster update to the update buffer. +func (s) TestSwitchClusterNodeBetweenLeafAndAggregated(t *testing.T) { + // Getting the test to the point where there's a root cluster which is an + // EDS leaf. + ch, fakeClient := setupTests(t) + ch.updateRootCluster(edsService2) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService2, + }, nil) + select { + case <-ch.updateChannel: + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } + // Switch the cluster to an aggregate cluster; this should cause two new + // child watches to be created. + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeAggregate, + ServiceName: edsService2, + PrioritizedClusterNames: []string{edsService, logicalDNSService}, + }, nil) + + // The xds client should be called to start a watch for one of the child + // clusters of the aggregate. The order of the children in the update + // written to the buffer to send to CDS matters; however, there is no + // guarantee on the order in which it will start the watches of the + // children. + gotCluster, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotCluster != edsService { + if gotCluster != logicalDNSService { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, edsService) + } + } + + // The xds client should then be called to start a watch for the second + // child cluster.
+ gotCluster, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotCluster != edsService { + if gotCluster != logicalDNSService { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, logicalDNSService) + } + } + + // After starting a watch for the second child cluster, there should be no + // more watches started on the xds client. + shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shouldNotHappenCtxCancel() + gotCluster, err = fakeClient.WaitForWatchCluster(shouldNotHappenCtx) + if err == nil { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, no more watches should be started.", gotCluster) + } + + // The handleResp() call on the root aggregate cluster should not ping the + // cluster handler to try and construct an update, as the handleResp() + // callback knows that when a child is created, it cannot possibly build a + // successful update yet. Thus, there should be nothing in the update + // channel. + + shouldNotHappenCtx, shouldNotHappenCtxCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shouldNotHappenCtxCancel() + + select { + case <-ch.updateChannel: + t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update") + case <-shouldNotHappenCtx.Done(): + } + + // Switch the cluster back to an EDS Cluster. This should cause the two + // children to be deleted. + fakeClient.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService2, + }, nil) + + // This should delete the two children (with no guarantee on the order of + // deletion, which is ok), then successfully write an update to the update + // buffer, as the full cluster tree has received updates. + clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + // There is no guarantee of ordering, so either one of the children could be + // deleted first. + if clusterNameDeleted != edsService { + if clusterNameDeleted != logicalDNSService { + t.Fatalf("xdsClient.CancelCDS called for cluster %v, want either: %v or: %v", clusterNameDeleted, edsService, logicalDNSService) + } + } + // Then the other child should be deleted. + clusterNameDeleted, err = fakeClient.WaitForCancelClusterWatch(ctx) + if err != nil { + t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + } + if clusterNameDeleted != edsService { + if clusterNameDeleted != logicalDNSService { + t.Fatalf("xdsClient.CancelCDS called for cluster %v, want either: %v or: %v", clusterNameDeleted, edsService, logicalDNSService) + } + } + + // After cancelling a watch for the second child cluster, there should be no + // more watches cancelled on the xds client. + shouldNotHappenCtx, shouldNotHappenCtxCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer shouldNotHappenCtxCancel() + gotCluster, err = fakeClient.WaitForCancelClusterWatch(shouldNotHappenCtx) + if err == nil { + t.Fatalf("xdsClient.CancelCDS called for cluster: %v, no more watches should be cancelled.", gotCluster) + } + + // Then an update should successfully be written to the update buffer.
+ select { + case chu := <-ch.updateChannel: + if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{ + ClusterType: xdsclient.ClusterTypeEDS, + ServiceName: edsService2, + }}); diff != "" { + t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for update from update channel.") + } +} diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index 0978125b8..eb4c659e5 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -45,10 +45,10 @@ type Client struct { loadStore *load.Store bootstrapCfg *bootstrap.Config - ldsCb func(xdsclient.ListenerUpdate, error) - rdsCb func(xdsclient.RouteConfigUpdate, error) - cdsCb func(xdsclient.ClusterUpdate, error) - edsCb func(xdsclient.EndpointsUpdate, error) + ldsCb func(xdsclient.ListenerUpdate, error) + rdsCb func(xdsclient.RouteConfigUpdate, error) + cdsCbs map[string]func(xdsclient.ClusterUpdate, error) + edsCb func(xdsclient.EndpointsUpdate, error) } // WatchListener registers a LDS watch. @@ -121,10 +121,13 @@ func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) error { // WatchCluster registers a CDS watch. func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsclient.ClusterUpdate, error)) func() { - xdsC.cdsCb = callback + // Due to the tree-like structure of aggregate clusters, there can be multiple callbacks persisted, one for each + // cluster node. However, the client doesn't care about the parent-child relationship between the nodes, only that + // it invokes the right callback for a particular cluster. + xdsC.cdsCbs[clusterName] = callback xdsC.cdsWatchCh.Send(clusterName) return func() { - xdsC.cdsCancelCh.Send(nil) + xdsC.cdsCancelCh.Send(clusterName) } } @@ -143,14 +146,28 @@ func (xdsC *Client) WaitForWatchCluster(ctx context.Context) (string, error) { // Not thread safe with WatchCluster. Only call this after // WaitForWatchCluster. func (xdsC *Client) InvokeWatchClusterCallback(update xdsclient.ClusterUpdate, err error) { - xdsC.cdsCb(update, err) + // This keeps the previous behavior: if there is only a single persisted callback, invoke it. + if len(xdsC.cdsCbs) == 1 { + var clusterName string + for cluster := range xdsC.cdsCbs { + clusterName = cluster + } + xdsC.cdsCbs[clusterName](update, err) + } else { + // Otherwise, the callback to invoke is determined by the service name in the ClusterUpdate. It is left up to + // the caller to make sure the cluster update matches a persisted callback. + xdsC.cdsCbs[update.ServiceName](update, err) + } } // WaitForCancelClusterWatch waits for a CDS watch to be cancelled, and returns the name of the cluster whose watch +// was cancelled, or context.DeadlineExceeded if no watch is cancelled in time. -func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) error { - _, err := xdsC.cdsCancelCh.Receive(ctx) - return err +func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) (string, error) { + clusterNameReceived, err := xdsC.cdsCancelCh.Receive(ctx) + if err != nil { + return "", err + } + return clusterNameReceived.(string), err } // WatchEndpoints registers an EDS watch for provided clusterName.
@@ -251,14 +268,15 @@ func NewClientWithName(name string) *Client { name: name, ldsWatchCh: testutils.NewChannel(), rdsWatchCh: testutils.NewChannel(), - cdsWatchCh: testutils.NewChannel(), + cdsWatchCh: testutils.NewChannelWithSize(10), edsWatchCh: testutils.NewChannel(), ldsCancelCh: testutils.NewChannel(), rdsCancelCh: testutils.NewChannel(), - cdsCancelCh: testutils.NewChannel(), + cdsCancelCh: testutils.NewChannelWithSize(10), edsCancelCh: testutils.NewChannel(), loadReportCh: testutils.NewChannel(), closeCh: testutils.NewChannel(), loadStore: load.NewStore(), + cdsCbs: make(map[string]func(xdsclient.ClusterUpdate, error)), } }
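Note: the patch relies in two places (constructClusterUpdate and handleResp) on a capacity-one channel where a stale update is drained before the new one is sent, so the CDS balancer always observes only the most recent update. The following is a minimal, self-contained sketch of that idiom, not code from this patch; the names (update, send) are illustrative only.

package main

import "fmt"

type update struct{ seq int }

// send drains any stale update before writing, so a slow reader always
// observes the most recent value and the writer never blocks.
func send(ch chan update, u update) {
	select {
	case <-ch: // drop the stale update, if any
	default:
	}
	ch <- u
}

func main() {
	ch := make(chan update, 1)
	for i := 1; i <= 3; i++ {
		send(ch, update{seq: i}) // writer never blocks
	}
	fmt.Println(<-ch) // prints {3}: only the latest update survives
}

This is why the fake client's cdsWatchCh and cdsCancelCh above are sized at 10 while updateChannel stays at capacity 1: the watch/cancel channels record every event for the test to assert on, whereas the update channel deliberately keeps only the newest value.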
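For reference, a hedged sketch of the recursion that clusterNode.constructClusterUpdate performs over the cluster tree: leaves (EDS/LogicalDNS) contribute one update, aggregates concatenate their children in priority order, and a single node that has not yet received an update aborts the whole build. The node/flatten types and names below are illustrative stand-ins, not the gRPC types.

package main

import (
	"errors"
	"fmt"
)

type node struct {
	name     string
	ready    bool    // whether this node has received an update
	children []*node // nil for leaf (EDS/LogicalDNS) clusters
}

var errNotReady = errors.New("node has not received an update yet")

func flatten(n *node) ([]string, error) {
	if !n.ready {
		return nil, errNotReady
	}
	if len(n.children) == 0 {
		return []string{n.name}, nil // base case: leaf cluster
	}
	var out []string
	for _, c := range n.children {
		part, err := flatten(c)
		if err != nil {
			return nil, err // one unready node means no update is sent
		}
		out = append(out, part...) // preserve the prioritized ordering
	}
	return out, nil
}

func main() {
	root := &node{name: "agg", ready: true, children: []*node{
		{name: "eds", ready: true},
		{name: "dns", ready: true},
	}}
	fmt.Println(flatten(root)) // [eds dns] <nil>
}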