diff --git a/xds/internal/client/cds.go b/xds/internal/client/cds.go new file mode 100644 index 000000000..ae40e8eca --- /dev/null +++ b/xds/internal/client/cds.go @@ -0,0 +1,112 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package client + +import ( + "fmt" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/grpclog" +) + +// newCDSRequest generates an CDS request proto for the provided clusterName, +// to be sent out on the wire. +func (v2c *v2Client) newCDSRequest(clusterName []string) *xdspb.DiscoveryRequest { + return &xdspb.DiscoveryRequest{ + Node: v2c.nodeProto, + TypeUrl: clusterURL, + ResourceNames: clusterName, + } +} + +// sendCDS sends an CDS request for provided clusterName on the provided +// stream. +func (v2c *v2Client) sendCDS(stream adsStream, clusterName []string) bool { + if err := stream.Send(v2c.newCDSRequest(clusterName)); err != nil { + grpclog.Warningf("xds: CDS request for resource %v failed: %v", clusterName, err) + return false + } + return true +} + +// handleCDSResponse processes an CDS response received from the xDS server. On +// receipt of a good response, it also invokes the registered watcher callback. +func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { + v2c.mu.Lock() + defer v2c.mu.Unlock() + + wi := v2c.watchMap[cdsResource] + if wi == nil { + return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp) + } + + var returnUpdate cdsUpdate + localCache := make(map[string]cdsUpdate) + for _, r := range resp.GetResources() { + var resource ptypes.DynamicAny + if err := ptypes.UnmarshalAny(r, &resource); err != nil { + return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err) + } + cluster, ok := resource.Message.(*xdspb.Cluster) + if !ok { + return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message) + } + update, err := validateCluster(cluster) + if err != nil { + return err + } + + // If the Cluster message in the CDS response did not contain a + // serviceName, we will just use the clusterName for EDS. 
+ if update.serviceName == "" { + update.serviceName = cluster.GetName() + } + localCache[cluster.GetName()] = update + if cluster.GetName() == wi.target[0] { + returnUpdate = update + } + } + v2c.cdsCache = localCache + + var err error + if returnUpdate.serviceName == "" { + err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp) + } + wi.stopTimer() + wi.callback.(cdsCallback)(returnUpdate, err) + return nil +} + +func validateCluster(cluster *xdspb.Cluster) (cdsUpdate, error) { + emptyUpdate := cdsUpdate{serviceName: "", doLRS: false} + switch { + case cluster.GetType() != xdspb.Cluster_EDS: + return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster) + case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil: + return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster) + case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN: + return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) + } + + return cdsUpdate{ + serviceName: cluster.GetEdsClusterConfig().GetServiceName(), + doLRS: cluster.GetLrsServer().GetSelf() != nil, + }, nil +} diff --git a/xds/internal/client/cds_test.go b/xds/internal/client/cds_test.go new file mode 100644 index 000000000..484b5b211 --- /dev/null +++ b/xds/internal/client/cds_test.go @@ -0,0 +1,539 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package client + +import ( + "errors" + "fmt" + "reflect" + "testing" + "time" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + "github.com/golang/protobuf/proto" + anypb "github.com/golang/protobuf/ptypes/any" + "google.golang.org/grpc/xds/internal/client/fakexds" +) + +const ( + clusterName1 = "foo-cluster" + clusterName2 = "bar-cluster" + serviceName1 = "foo-service" + serviceName2 = "bar-service" +) + +func (v2c *v2Client) cloneCDSCacheForTesting() map[string]cdsUpdate { + v2c.mu.Lock() + defer v2c.mu.Unlock() + + cloneCache := make(map[string]cdsUpdate) + for k, v := range v2c.cdsCache { + cloneCache[k] = v + } + return cloneCache +} + +func TestValidateCluster(t *testing.T) { + emptyUpdate := cdsUpdate{serviceName: "", doLRS: false} + tests := []struct { + name string + cluster *xdspb.Cluster + wantUpdate cdsUpdate + wantErr bool + }{ + { + name: "non-eds-cluster-type", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_STATIC}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + }, + LbPolicy: xdspb.Cluster_LEAST_REQUEST, + }, + wantUpdate: emptyUpdate, + wantErr: true, + }, + { + name: "no-eds-config", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + }, + wantUpdate: emptyUpdate, + wantErr: true, + }, + { + name: "no-ads-config-source", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{}, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + }, + wantUpdate: emptyUpdate, + wantErr: true, + }, + { + name: "non-round-robin-lb-policy", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + }, + LbPolicy: xdspb.Cluster_LEAST_REQUEST, + }, + wantUpdate: emptyUpdate, + wantErr: true, + }, + { + name: "happy-case-no-service-name-no-lrs", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + }, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + }, + wantUpdate: emptyUpdate, + }, + { + name: "happy-case-no-lrs", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: serviceName1, + }, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + }, + wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: false}, + }, + { + name: "happiest-case", + cluster: goodCluster1, + wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: true}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + gotUpdate, gotErr := validateCluster(test.cluster) + if (gotErr != nil) != test.wantErr { + t.Errorf("validateCluster(%+v) returned error: %v, wantErr: %v", 
test.cluster, gotErr, test.wantErr)
+			}
+			if !reflect.DeepEqual(gotUpdate, test.wantUpdate) {
+				t.Errorf("validateCluster(%+v) = %v, want: %v", test.cluster, gotUpdate, test.wantUpdate)
+			}
+		})
+	}
+}
+
+// TestCDSHandleResponse starts a fake xDS server, makes a ClientConn to it,
+// and creates a v2Client using it. Then, it registers a CDS watcher and tests
+// different CDS responses.
+func TestCDSHandleResponse(t *testing.T) {
+	fakeServer, sCleanup := fakexds.StartServer(t)
+	client, cCleanup := fakeServer.GetClientConn(t)
+	defer func() {
+		cCleanup()
+		sCleanup()
+	}()
+	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
+
+	tests := []struct {
+		name          string
+		cdsResponse   *xdspb.DiscoveryResponse
+		wantErr       bool
+		wantUpdate    *cdsUpdate
+		wantUpdateErr bool
+	}{
+		// Badly marshaled CDS response.
+		{
+			name:          "badly-marshaled-response",
+			cdsResponse:   badlyMarshaledCDSResponse,
+			wantErr:       true,
+			wantUpdate:    nil,
+			wantUpdateErr: false,
+		},
+		// Response does not contain Cluster proto.
+		{
+			name:          "no-cluster-proto-in-response",
+			cdsResponse:   badResourceTypeInLDSResponse,
+			wantErr:       true,
+			wantUpdate:    nil,
+			wantUpdateErr: false,
+		},
+		// Response contains no clusters.
+		{
+			name:          "no-cluster",
+			cdsResponse:   &xdspb.DiscoveryResponse{},
+			wantErr:       false,
+			wantUpdate:    &cdsUpdate{},
+			wantUpdateErr: true,
+		},
+		// Response contains one good cluster we are not interested in.
+		{
+			name:          "one-uninteresting-cluster",
+			cdsResponse:   goodCDSResponse2,
+			wantErr:       false,
+			wantUpdate:    &cdsUpdate{},
+			wantUpdateErr: true,
+		},
+		// Response contains one cluster and it is good.
+		{
+			name:          "one-good-cluster",
+			cdsResponse:   goodCDSResponse1,
+			wantErr:       false,
+			wantUpdate:    &cdsUpdate{serviceName: serviceName1, doLRS: true},
+			wantUpdateErr: false,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			gotUpdateCh := make(chan cdsUpdate, 1)
+			gotUpdateErrCh := make(chan error, 1)
+
+			// Register a watcher, to trigger the v2Client to send a CDS request.
+			cancelWatch := v2c.watchCDS(clusterName1, func(u cdsUpdate, err error) {
+				t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err)
+				gotUpdateCh <- u
+				gotUpdateErrCh <- err
+			})
+
+			// Wait till the request makes it to the fakeServer. This ensures that
+			// the watch request has been processed by the v2Client.
+			<-fakeServer.RequestChan
+
+			// Directly push the response through a call to handleCDSResponse,
+			// thereby bypassing the fakeServer.
+			if err := v2c.handleCDSResponse(test.cdsResponse); (err != nil) != test.wantErr {
+				t.Fatalf("v2c.handleCDSResponse() returned err: %v, wantErr: %v", err, test.wantErr)
+			}
+
+			// If the test needs the callback to be invoked, verify the update and
+			// error pushed to the callback.
+			if test.wantUpdate != nil {
+				timer := time.NewTimer(defaultTestTimeout)
+				select {
+				case <-timer.C:
+					t.Fatal("Timeout when expecting CDS update")
+				case gotUpdate := <-gotUpdateCh:
+					timer.Stop()
+					if !reflect.DeepEqual(gotUpdate, *test.wantUpdate) {
+						t.Fatalf("got CDS update: %+v, want %+v", gotUpdate, test.wantUpdate)
+					}
+				}
+				// Since the callback that we registered pushes to both channels at
+				// the same time, this channel read should return immediately.
+				gotUpdateErr := <-gotUpdateErrCh
+				if (gotUpdateErr != nil) != test.wantUpdateErr {
+					t.Fatalf("got CDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
+				}
+			}
+			cancelWatch()
+		})
+	}
+}
+
+// TestCDSHandleResponseWithoutWatch tests the case where the v2Client receives
+// a CDS response without a registered watcher.
+func TestCDSHandleResponseWithoutWatch(t *testing.T) {
+	fakeServer, sCleanup := fakexds.StartServer(t)
+	client, cCleanup := fakeServer.GetClientConn(t)
+	defer func() {
+		cCleanup()
+		sCleanup()
+	}()
+	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
+
+	if v2c.handleCDSResponse(goodCDSResponse1) == nil {
+		t.Fatal("v2c.handleCDSResponse() succeeded, should have failed")
+	}
+}
+
+// cdsTestOp contains all data related to one particular test operation. Not
+// all fields make sense for all tests.
+type cdsTestOp struct {
+	// target is the resource name to watch for.
+	target string
+	// responseToSend is the xDS response sent to the client
+	responseToSend *fakexds.Response
+	// wantOpErr specifies whether the main operation should return an error.
+	wantOpErr bool
+	// wantCDSCache is the expected cdsCache at the end of an operation.
+	wantCDSCache map[string]cdsUpdate
+	// wantWatchCallback specifies if the watch callback should be invoked.
+	wantWatchCallback bool
+}
+
+// testCDSCaching is a helper function which starts a fake xDS server, makes a
+// ClientConn to it, creates a v2Client using it. It then reads a bunch of
+// test operations to be performed from cdsTestOps and returns error, if any,
+// on the provided error channel. This is executed in a separate goroutine.
+func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) {
+	t.Helper()
+
+	fakeServer, sCleanup := fakexds.StartServer(t)
+	client, cCleanup := fakeServer.GetClientConn(t)
+	defer func() {
+		cCleanup()
+		sCleanup()
+	}()
+	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
+	t.Log("Started xds v2Client...")
+
+	callbackCh := make(chan struct{}, 1)
+	for _, cdsTestOp := range cdsTestOps {
+		// Register a watcher if required, and use a channel to signal the
+		// successful invocation of the callback.
+		if cdsTestOp.target != "" {
+			v2c.watchCDS(cdsTestOp.target, func(u cdsUpdate, err error) {
+				t.Logf("Received callback with cdsUpdate {%+v} and error {%v}", u, err)
+				callbackCh <- struct{}{}
+			})
+			t.Logf("Registered a watcher for CDS target: %v...", cdsTestOp.target)
+
+			// Wait till the request makes it to the fakeServer. This ensures that
+			// the watch request has been processed by the v2Client.
+			<-fakeServer.RequestChan
+			t.Log("FakeServer received request...")
+		}
+
+		// Directly push the response through a call to handleCDSResponse,
+		// thereby bypassing the fakeServer.
+		if cdsTestOp.responseToSend != nil {
+			if err := v2c.handleCDSResponse(cdsTestOp.responseToSend.Resp); (err != nil) != cdsTestOp.wantOpErr {
+				errCh <- fmt.Errorf("v2c.handleCDSResponse() returned err: %v", err)
+				return
+			}
+		}
+
+		// If the test needs the callback to be invoked, just verify that
+		// it was invoked. Since we verify the contents of the cache, it's
+		// ok not to verify the contents of the callback.
+		if cdsTestOp.wantWatchCallback {
+			<-callbackCh
+		}
+
+		if !reflect.DeepEqual(v2c.cloneCDSCacheForTesting(), cdsTestOp.wantCDSCache) {
+			errCh <- fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.cloneCDSCacheForTesting(), cdsTestOp.wantCDSCache)
+			return
+		}
+	}
+	t.Log("Completed all test ops successfully...")
+	errCh <- nil
+}
+
+// TestCDSCaching tests some end-to-end CDS flows using a fake xDS server, and
+// verifies the CDS data cached at the v2Client.
+func TestCDSCaching(t *testing.T) {
+	errCh := make(chan error, 1)
+	ops := []cdsTestOp{
+		// Add a CDS watch for a cluster name (clusterName1), which returns one
+		// matching resource in the response.
+		{
+			target:         clusterName1,
+			responseToSend: &fakexds.Response{Resp: goodCDSResponse1},
+			wantCDSCache: map[string]cdsUpdate{
+				clusterName1: {serviceName1, true},
+			},
+			wantWatchCallback: true,
+		},
+		// Push a CDS response which contains a new resource (apart from the
+		// one received in the previous response). This should be cached.
+		{
+			responseToSend: &fakexds.Response{Resp: cdsResponseWithMultipleResources},
+			wantCDSCache: map[string]cdsUpdate{
+				clusterName1: {serviceName1, true},
+				clusterName2: {serviceName2, false},
+			},
+			wantWatchCallback: true,
+		},
+		// Switch the watch target to clusterName2, which was already cached. No
+		// response is received from the server (as expected), but we want the
+		// callback to be invoked with the new serviceName.
+		{
+			target: clusterName2,
+			wantCDSCache: map[string]cdsUpdate{
+				clusterName1: {serviceName1, true},
+				clusterName2: {serviceName2, false},
+			},
+			wantWatchCallback: true,
+		},
+		// Push an empty CDS response. This should clear the cache.
+		{
+			responseToSend:    &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: clusterURL}},
+			wantOpErr:         false,
+			wantCDSCache:      map[string]cdsUpdate{},
+			wantWatchCallback: true,
+		},
+	}
+	go testCDSCaching(t, ops, errCh)
+
+	timer := time.NewTimer(defaultTestTimeout)
+	select {
+	case <-timer.C:
+		t.Fatal("Timeout when expecting CDS update")
+	case err := <-errCh:
+		timer.Stop()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+}
+
+// TestCDSWatchExpiryTimer tests the case where the client does not receive a
+// CDS response for the request that it sends out. We want the watch callback
+// to be invoked with an error once the watchExpiryTimer fires.
+func TestCDSWatchExpiryTimer(t *testing.T) { + oldWatchExpiryTimeout := defaultWatchExpiryTimeout + defaultWatchExpiryTimeout = 1 * time.Second + defer func() { + defaultWatchExpiryTimeout = oldWatchExpiryTimeout + }() + + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() + v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + cdsCallbackCh := make(chan error, 1) + v2c.watchCDS(clusterName1, func(u cdsUpdate, err error) { + t.Logf("Received callback with cdsUpdate {%+v} and error {%v}", u, err) + if u.serviceName != "" { + cdsCallbackCh <- fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.serviceName) + } + if err == nil { + cdsCallbackCh <- errors.New("received nil error in cdsCallback") + } + cdsCallbackCh <- nil + }) + <-fakeServer.RequestChan + + timer := time.NewTimer(2 * time.Second) + select { + case <-timer.C: + t.Fatalf("Timeout expired when expecting CDS update") + case err := <-cdsCallbackCh: + timer.Stop() + if err != nil { + t.Fatal(err) + } + } +} + +var ( + badlyMarshaledCDSResponse = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: clusterURL, + Value: []byte{1, 2, 3, 4}, + }, + }, + TypeUrl: clusterURL, + } + goodCluster1 = &xdspb.Cluster{ + Name: clusterName1, + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: serviceName1, + }, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + LrsServer: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Self{ + Self: &corepb.SelfConfigSource{}, + }, + }, + } + marshaledCluster1, _ = proto.Marshal(goodCluster1) + goodCluster2 = &xdspb.Cluster{ + Name: clusterName2, + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: serviceName2, + }, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + } + marshaledCluster2, _ = proto.Marshal(goodCluster2) + goodCDSResponse1 = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: clusterURL, + Value: marshaledCluster1, + }, + }, + TypeUrl: clusterURL, + } + goodCDSResponse2 = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: clusterURL, + Value: marshaledCluster2, + }, + }, + TypeUrl: clusterURL, + } + cdsResponseWithMultipleResources = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: clusterURL, + Value: marshaledCluster1, + }, + { + TypeUrl: clusterURL, + Value: marshaledCluster2, + }, + }, + TypeUrl: clusterURL, + } +) diff --git a/xds/internal/client/eds.go b/xds/internal/client/eds.go index 9f7f5722e..720b25698 100644 --- a/xds/internal/client/eds.go +++ b/xds/internal/client/eds.go @@ -224,7 +224,7 @@ func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error { } if returnUpdate != nil { - wi.expiryTimer.Stop() + wi.stopTimer() wi.callback.(edsCallback)(returnUpdate, nil) } diff --git a/xds/internal/client/eds_test.go b/xds/internal/client/eds_test.go index 2b46f445f..ae9aac34d 100644 --- a/xds/internal/client/eds_test.go +++ b/xds/internal/client/eds_test.go @@ -32,7 +32,7 
@@ import ( "google.golang.org/grpc/xds/internal/client/fakexds" ) -func TestParseEDSRespProto(t *testing.T) { +func TestEDSParseRespProto(t *testing.T) { tests := []struct { name string m *xdspb.ClusterLoadAssignment @@ -161,7 +161,7 @@ var ( } ) -func TestHandleEDSResponse(t *testing.T) { +func TestEDSHandleResponse(t *testing.T) { fakeServer, sCleanup := fakexds.StartServer(t) client, cCleanup := fakeServer.GetClientConn(t) defer func() { @@ -169,6 +169,7 @@ func TestHandleEDSResponse(t *testing.T) { sCleanup() }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() tests := []struct { name string @@ -272,9 +273,9 @@ func TestHandleEDSResponse(t *testing.T) { } } -// TestHandleEDSResponseWithoutEDSWatch tests the case where the v2Client +// TestEDSHandleResponseWithoutWatch tests the case where the v2Client // receives an EDS response without a registered EDS watcher. -func TestHandleEDSResponseWithoutEDSWatch(t *testing.T) { +func TestEDSHandleResponseWithoutWatch(t *testing.T) { fakeServer, sCleanup := fakexds.StartServer(t) client, cCleanup := fakeServer.GetClientConn(t) defer func() { diff --git a/xds/internal/client/lds.go b/xds/internal/client/lds.go index be92a9a6c..55b622e5d 100644 --- a/xds/internal/client/lds.go +++ b/xds/internal/client/lds.go @@ -87,7 +87,7 @@ func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error { if routeName == "" { err = fmt.Errorf("xds: LDS target %s not found in received response %+v", wi.target, resp) } - wi.expiryTimer.Stop() + wi.stopTimer() wi.callback.(ldsCallback)(ldsUpdate{routeName: routeName}, err) return nil } diff --git a/xds/internal/client/lds_test.go b/xds/internal/client/lds_test.go index e76b6f798..ce045a5af 100644 --- a/xds/internal/client/lds_test.go +++ b/xds/internal/client/lds_test.go @@ -29,7 +29,7 @@ import ( "google.golang.org/grpc/xds/internal/client/fakexds" ) -func TestGetRouteConfigNameFromListener(t *testing.T) { +func TestLDSGetRouteConfig(t *testing.T) { tests := []struct { name string lis *xdspb.Listener @@ -87,10 +87,10 @@ func TestGetRouteConfigNameFromListener(t *testing.T) { } } -// TestHandleLDSResponse starts a fake xDS server, makes a ClientConn to it, +// TestLDSHandleResponse starts a fake xDS server, makes a ClientConn to it, // and creates a v2Client using it. Then, it registers a watchLDS and tests // different LDS responses. -func TestHandleLDSResponse(t *testing.T) { +func TestLDSHandleResponse(t *testing.T) { fakeServer, sCleanup := fakexds.StartServer(t) client, cCleanup := fakeServer.GetClientConn(t) defer func() { @@ -98,6 +98,7 @@ func TestHandleLDSResponse(t *testing.T) { sCleanup() }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() tests := []struct { name string @@ -224,9 +225,9 @@ func TestHandleLDSResponse(t *testing.T) { } } -// TestHandleLDSResponseWithoutWatch tests the case where the v2Client receives +// TestLDSHandleResponseWithoutWatch tests the case where the v2Client receives // an LDS response without a registered watcher. 
-func TestHandleLDSResponseWithoutWatch(t *testing.T) { +func TestLDSHandleResponseWithoutWatch(t *testing.T) { fakeServer, sCleanup := fakexds.StartServer(t) client, cCleanup := fakeServer.GetClientConn(t) defer func() { @@ -234,6 +235,7 @@ func TestHandleLDSResponseWithoutWatch(t *testing.T) { sCleanup() }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() if v2c.handleLDSResponse(goodLDSResponse1) == nil { t.Fatal("v2c.handleLDSResponse() succeeded, should have failed") @@ -257,6 +259,7 @@ func TestLDSWatchExpiryTimer(t *testing.T) { sCleanup() }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() // Wait till the request makes it to the fakeServer. This ensures that // the watch request has been processed by the v2Client. diff --git a/xds/internal/client/rds.go b/xds/internal/client/rds.go index 67c54cf64..0d1dcc96a 100644 --- a/xds/internal/client/rds.go +++ b/xds/internal/client/rds.go @@ -104,7 +104,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error { // incremental protocol, the fact that we did not receive the resource // that we are watching for in this response does not mean that the // server does not know about it. - wi.expiryTimer.Stop() + wi.stopTimer() wi.callback.(rdsCallback)(rdsUpdate{clusterName: returnCluster}, nil) } return nil diff --git a/xds/internal/client/rds_test.go b/xds/internal/client/rds_test.go index 79673d2f8..0c04e3d53 100644 --- a/xds/internal/client/rds_test.go +++ b/xds/internal/client/rds_test.go @@ -41,7 +41,7 @@ func (v2c *v2Client) cloneRDSCacheForTesting() map[string]string { return cloneCache } -func TestGetClusterFromRouteConfiguration(t *testing.T) { +func TestRDSGetClusterFromRouteConfiguration(t *testing.T) { tests := []struct { name string rc *xdspb.RouteConfiguration @@ -140,10 +140,10 @@ func TestGetClusterFromRouteConfiguration(t *testing.T) { } } -// TestHandleRDSResponse starts a fake xDS server, makes a ClientConn to it, +// TestRDSHandleResponse starts a fake xDS server, makes a ClientConn to it, // and creates a v2Client using it. Then, it registers an LDS and RDS watcher // and tests different RDS responses. -func TestHandleRDSResponse(t *testing.T) { +func TestRDSHandleResponse(t *testing.T) { fakeServer, sCleanup := fakexds.StartServer(t) client, cCleanup := fakeServer.GetClientConn(t) defer func() { @@ -151,6 +151,7 @@ func TestHandleRDSResponse(t *testing.T) { sCleanup() }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() // Register an LDS watcher, and wait till the request is sent out, the // response is received and the callback is invoked. @@ -263,9 +264,9 @@ func TestHandleRDSResponse(t *testing.T) { } } -// TestHandleRDSResponseWithoutLDSWatch tests the case where the v2Client +// TestRDSHandleResponseWithoutLDSWatch tests the case where the v2Client // receives an RDS response without a registered LDS watcher. 
-func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) { +func TestRDSHandleResponseWithoutLDSWatch(t *testing.T) { fakeServer, sCleanup := fakexds.StartServer(t) client, cCleanup := fakeServer.GetClientConn(t) defer func() { @@ -273,15 +274,16 @@ func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) { sCleanup() }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() if v2c.handleRDSResponse(goodRDSResponse1) == nil { t.Fatal("v2c.handleRDSResponse() succeeded, should have failed") } } -// TestHandleRDSResponseWithoutRDSWatch tests the case where the v2Client +// TestRDSHandleResponseWithoutRDSWatch tests the case where the v2Client // receives an RDS response without a registered RDS watcher. -func TestHandleRDSResponseWithoutRDSWatch(t *testing.T) { +func TestRDSHandleResponseWithoutRDSWatch(t *testing.T) { fakeServer, sCleanup := fakexds.StartServer(t) client, cCleanup := fakeServer.GetClientConn(t) defer func() { @@ -289,6 +291,7 @@ func TestHandleRDSResponseWithoutRDSWatch(t *testing.T) { sCleanup() }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() // Register an LDS watcher, and wait till the request is sent out, the // response is received and the callback is invoked. @@ -308,9 +311,9 @@ func TestHandleRDSResponseWithoutRDSWatch(t *testing.T) { } } -// testOp contains all data related to one particular test operation. Not all -// fields make sense for all tests. -type testOp struct { +// rdsTestOp contains all data related to one particular test operation. Not +// all fields make sense for all tests. +type rdsTestOp struct { // target is the resource name to watch for. target string // responseToSend is the xDS response sent to the client @@ -326,9 +329,9 @@ type testOp struct { // testRDSCaching is a helper function which starts a fake xDS server, makes a // ClientConn to it, creates a v2Client using it, registers an LDS watcher and // pushes a good LDS response. It then reads a bunch of test operations to be -// performed from testOps and returns error, if any, on the provided error +// performed from rdsTestOps and returns error, if any, on the provided error // channel. This is executed in a separate goroutine. -func testRDSCaching(t *testing.T, testOps []testOp, errCh chan error) { +func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) { t.Helper() fakeServer, sCleanup := fakexds.StartServer(t) @@ -356,15 +359,15 @@ func testRDSCaching(t *testing.T, testOps []testOp, errCh chan error) { } callbackCh := make(chan struct{}, 1) - for _, testOp := range testOps { + for _, rdsTestOp := range rdsTestOps { // Register a watcher if required, and use a channel to signal the // successful invocation of the callback. - if testOp.target != "" { - v2c.watchRDS(testOp.target, func(u rdsUpdate, err error) { + if rdsTestOp.target != "" { + v2c.watchRDS(rdsTestOp.target, func(u rdsUpdate, err error) { t.Logf("Received callback with rdsUpdate {%+v} and error {%v}", u, err) callbackCh <- struct{}{} }) - t.Logf("Registered a watcher for LDS target: %v...", testOp.target) + t.Logf("Registered a watcher for RDS target: %v...", rdsTestOp.target) // Wait till the request makes it to the fakeServer. This ensures that // the watch request has been processed by the v2Client. @@ -374,8 +377,8 @@ func testRDSCaching(t *testing.T, testOps []testOp, errCh chan error) { // Directly push the response through a call to handleRDSResponse, // thereby bypassing the fakeServer. 
-		if testOp.responseToSend != nil {
-			if err := v2c.handleRDSResponse(testOp.responseToSend.Resp); (err != nil) != testOp.wantOpErr {
+		if rdsTestOp.responseToSend != nil {
+			if err := v2c.handleRDSResponse(rdsTestOp.responseToSend.Resp); (err != nil) != rdsTestOp.wantOpErr {
 				errCh <- fmt.Errorf("v2c.handleRDSResponse() returned err: %v", err)
 				return
 			}
@@ -384,12 +387,12 @@
 		// If the test needs the callback to be invoked, just verify that
 		// it was invoked. Since we verify the contents of the cache, it's
 		// ok not to verify the contents of the callback.
-		if testOp.wantWatchCallback {
+		if rdsTestOp.wantWatchCallback {
 			<-callbackCh
 		}

-		if !reflect.DeepEqual(v2c.cloneRDSCacheForTesting(), testOp.wantRDSCache) {
-			errCh <- fmt.Errorf("gotRDSCache: %v, wantRDSCache: %v", v2c.rdsCache, testOp.wantRDSCache)
+		if !reflect.DeepEqual(v2c.cloneRDSCacheForTesting(), rdsTestOp.wantRDSCache) {
+			errCh <- fmt.Errorf("gotRDSCache: %v, wantRDSCache: %v", v2c.rdsCache, rdsTestOp.wantRDSCache)
 			return
 		}
 	}
@@ -401,7 +404,7 @@
 // verifies the RDS data cached at the v2Client.
 func TestRDSCaching(t *testing.T) {
 	errCh := make(chan error, 1)
-	ops := []testOp{
+	ops := []rdsTestOp{
 		// Add an RDS watch for a resource name (goodRouteName1), which returns one
 		// matching resource in the response.
 		{
@@ -458,6 +461,9 @@
 	}
 }

+// TestRDSWatchExpiryTimer tests the case where the client does not receive an
+// RDS response for the request that it sends out. We want the watch callback
+// to be invoked with an error once the watchExpiryTimer fires.
 func TestRDSWatchExpiryTimer(t *testing.T) {
 	oldWatchExpiryTimeout := defaultWatchExpiryTimeout
 	defaultWatchExpiryTimeout = 1 * time.Second
diff --git a/xds/internal/client/types.go b/xds/internal/client/types.go
index d45ddeae7..fed39ded1 100644
--- a/xds/internal/client/types.go
+++ b/xds/internal/client/types.go
@@ -69,16 +69,27 @@ func (wi *watchInfo) cancel() {
 	}
 }

+// stopTimer stops the expiry timer without cancelling the watch.
+func (wi *watchInfo) stopTimer() {
+	if wi.expiryTimer != nil {
+		wi.expiryTimer.Stop()
+	}
+}
+
 type ldsUpdate struct {
 	routeName string
 }
-
 type ldsCallback func(ldsUpdate, error)

 type rdsUpdate struct {
 	clusterName string
 }
-
 type rdsCallback func(rdsUpdate, error)

+type cdsUpdate struct {
+	serviceName string
+	doLRS       bool
+}
+type cdsCallback func(cdsUpdate, error)
+
 type edsCallback func(*EDSUpdate, error)
diff --git a/xds/internal/client/v2client.go b/xds/internal/client/v2client.go
index e03977947..b659db135 100644
--- a/xds/internal/client/v2client.go
+++ b/xds/internal/client/v2client.go
@@ -74,6 +74,16 @@ type v2Client struct {
 	// unrequested resources.
 	// https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints
 	rdsCache map[string]string
+	// cdsCache maintains a mapping of {clusterName --> cdsUpdate} from
+	// validated cluster configurations received in CDS responses. We cache all
+	// valid cluster configurations, whether or not we are interested in them
+	// when we received them (because we could become interested in them in the
+	// future and the server won't send us those resources again). This is only
+	// to support legacy management servers that do not honor the
+	// resource_names field.
As per the latest spec, the server should resend + // the response when the request changes, even if it had sent the same + // resource earlier (when not asked for). Protected by the above mutex. + cdsCache map[string]cdsUpdate } // newV2Client creates a new v2Client initialized with the passed arguments. @@ -85,6 +95,7 @@ func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) watchCh: buffer.NewUnbounded(), watchMap: make(map[resourceType]*watchInfo), rdsCache: make(map[string]string), + cdsCache: make(map[string]cdsUpdate), } v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) @@ -163,6 +174,10 @@ func (v2c *v2Client) sendExisting(stream adsStream) bool { if !v2c.sendRDS(stream, wi.target) { return false } + case cdsResource: + if !v2c.sendCDS(stream, wi.target) { + return false + } case edsResource: if !v2c.sendEDS(stream, wi.target) { return false @@ -206,6 +221,10 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) { if !v2c.sendRDS(stream, target) { return } + case cdsResource: + if !v2c.sendCDS(stream, target) { + return + } case edsResource: if !v2c.sendEDS(stream, target) { return @@ -239,6 +258,11 @@ func (v2c *v2Client) recv(stream adsStream) bool { grpclog.Warningf("xds: RDS response handler failed: %v", err) return success } + case clusterURL: + if err := v2c.handleCDSResponse(resp); err != nil { + grpclog.Warningf("xds: CDS response handler failed: %v", err) + return success + } case endpointURL: if err := v2c.handleEDSResponse(resp); err != nil { grpclog.Warningf("xds: EDS response handler failed: %v", err) @@ -296,6 +320,27 @@ func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func( } } +// watchCDS registers an CDS watcher for the provided clusterName. Updates +// corresponding to received CDS responses will be pushed to the provided +// callback. The caller can cancel the watch by invoking the returned cancel +// function. +// The provided callback should not block or perform any expensive operations +// or call other methods of the v2Client object. +func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) { + wi := &watchInfo{wType: cdsResource, target: []string{clusterName}, callback: cdsCb} + v2c.watchCh.Put(wi) + return func() { + v2c.mu.Lock() + defer v2c.mu.Unlock() + if wi.state == watchEnqueued { + wi.state = watchCancelled + return + } + v2c.watchMap[cdsResource].cancel() + delete(v2c.watchMap, cdsResource) + } +} + // watchEDS registers an EDS watcher for the provided clusterName. Updates // corresponding to received EDS responses will be pushed to the provided // callback. The caller can cancel the watch by invoking the returned cancel @@ -333,19 +378,17 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { v2c.watchMap[wi.wType] = wi switch wi.wType { + // We need to grab the lock inside of the expiryTimer's afterFunc because + // we need to access the watchInfo, which is stored in the watchMap. case ldsResource: wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - // We need to grab the lock here because we are accessing the - // watchInfo (which is now stored in the watchMap) from this - // method which will be called when the timer fires. 
v2c.mu.Lock() - wi.callback.(ldsCallback)(ldsUpdate{routeName: ""}, fmt.Errorf("xds: LDS target %s not found", wi.target)) + wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found", wi.target)) v2c.mu.Unlock() }) case rdsResource: routeName := wi.target[0] if cluster := v2c.rdsCache[routeName]; cluster != "" { - // Invoke the callback now, since we found the entry in the cache. var err error if v2c.watchMap[ldsResource] == nil { cluster = "" @@ -357,18 +400,27 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { // Add the watch expiry timer only for new watches we don't find in // the cache, and return from here. wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - // We need to grab the lock here because we are accessing the - // watchInfo (which is now stored in the watchMap) from this - // method which will be called when the timer fires. v2c.mu.Lock() - wi.callback.(rdsCallback)(rdsUpdate{clusterName: ""}, fmt.Errorf("xds: RDS target %s not found", wi.target)) + wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found", wi.target)) + v2c.mu.Unlock() + }) + case cdsResource: + clusterName := wi.target[0] + if update, ok := v2c.cdsCache[clusterName]; ok { + var err error + if v2c.watchMap[cdsResource] == nil { + err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName) + } + wi.callback.(cdsCallback)(update, err) + return + } + wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { + v2c.mu.Lock() + wi.callback.(cdsCallback)(cdsUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target)) v2c.mu.Unlock() }) case edsResource: wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - // We need to grab the lock here because we are accessing the - // watchInfo (which is now stored in the watchMap) from this - // method which will be called when the timer fires. v2c.mu.Lock() wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target)) v2c.mu.Unlock()
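
Editor's note, not part of the patch: a minimal sketch of how a caller inside this package might exercise the new watchCDS API introduced above. It only uses identifiers added or already present in this diff (newV2Client, watchCDS, cdsUpdate, close); the *grpc.ClientConn `cc`, the Node proto `node`, and the cluster name "example-cluster" are illustrative placeholders, and the zero backoff function simply mirrors what the tests pass in.

// Illustrative only; assumes cc is a *grpc.ClientConn to the xDS management
// server and node is the client's Node proto.
v2c := newV2Client(cc, node, func(int) time.Duration { return 0 })
defer v2c.close()

// Register a CDS watch. The callback must not block; it receives the EDS
// service name to use for the cluster and whether LRS load reporting is enabled.
cancelWatch := v2c.watchCDS("example-cluster", func(u cdsUpdate, err error) {
	if err != nil {
		// Watch expiry, or the cluster was missing from the last CDS response.
		grpclog.Warningf("CDS watch failed: %v", err)
		return
	}
	grpclog.Infof("CDS update: serviceName=%q, doLRS=%v", u.serviceName, u.doLRS)
})
defer cancelWatch()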