mirror of https://github.com/grpc/grpc-go.git
xds: CDS implementation in v2Client. (#3203)
Parent: e5e980f276 · Commit: 1c4070c2e9
@@ -0,0 +1,112 @@
/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package client

import (
	"fmt"

	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
	"github.com/golang/protobuf/ptypes"
	"google.golang.org/grpc/grpclog"
)

// newCDSRequest generates a CDS request proto for the provided clusterName,
// to be sent out on the wire.
func (v2c *v2Client) newCDSRequest(clusterName []string) *xdspb.DiscoveryRequest {
	return &xdspb.DiscoveryRequest{
		Node:          v2c.nodeProto,
		TypeUrl:       clusterURL,
		ResourceNames: clusterName,
	}
}

// sendCDS sends a CDS request for the provided clusterName on the provided
// stream.
func (v2c *v2Client) sendCDS(stream adsStream, clusterName []string) bool {
	if err := stream.Send(v2c.newCDSRequest(clusterName)); err != nil {
		grpclog.Warningf("xds: CDS request for resource %v failed: %v", clusterName, err)
		return false
	}
	return true
}

// handleCDSResponse processes a CDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
	v2c.mu.Lock()
	defer v2c.mu.Unlock()

	wi := v2c.watchMap[cdsResource]
	if wi == nil {
		return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp)
	}

	var returnUpdate cdsUpdate
	localCache := make(map[string]cdsUpdate)
	for _, r := range resp.GetResources() {
		var resource ptypes.DynamicAny
		if err := ptypes.UnmarshalAny(r, &resource); err != nil {
			return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err)
		}
		cluster, ok := resource.Message.(*xdspb.Cluster)
		if !ok {
			return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message)
		}
		update, err := validateCluster(cluster)
		if err != nil {
			return err
		}

		// If the Cluster message in the CDS response did not contain a
		// serviceName, we will just use the clusterName for EDS.
		if update.serviceName == "" {
			update.serviceName = cluster.GetName()
		}
		localCache[cluster.GetName()] = update
		if cluster.GetName() == wi.target[0] {
			returnUpdate = update
		}
	}
	v2c.cdsCache = localCache

	var err error
	if returnUpdate.serviceName == "" {
		err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp)
	}
	wi.stopTimer()
	wi.callback.(cdsCallback)(returnUpdate, err)
	return nil
}

func validateCluster(cluster *xdspb.Cluster) (cdsUpdate, error) {
	emptyUpdate := cdsUpdate{serviceName: "", doLRS: false}
	switch {
	case cluster.GetType() != xdspb.Cluster_EDS:
		return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster)
	case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil:
		return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster)
	case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN:
		return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
	}

	return cdsUpdate{
		serviceName: cluster.GetEdsClusterConfig().GetServiceName(),
		doLRS:       cluster.GetLrsServer().GetSelf() != nil,
	}, nil
}
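validateCluster accepts only EDS clusters that use ADS as the EDS config source and ROUND_ROBIN as the LB policy. For illustration (not part of the commit), here is a minimal Cluster proto that passes validation; it mirrors the goodCluster fixtures in the test file below, and the corepb alias is the envoy/api/v2/core import used there:

// Hypothetical sketch, assuming the xdspb and corepb imports from the tests.
cluster := &xdspb.Cluster{
	Name:                 "foo-cluster",
	ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, // must be EDS
	EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
		EdsConfig: &corepb.ConfigSource{ // the EDS config must point at ADS
			ConfigSourceSpecifier: &corepb.ConfigSource_Ads{Ads: &corepb.AggregatedConfigSource{}},
		},
		ServiceName: "foo-service", // optional; handleCDSResponse falls back to the cluster name
	},
	LbPolicy: xdspb.Cluster_ROUND_ROBIN, // only ROUND_ROBIN is accepted
}
update, err := validateCluster(cluster)
// update == cdsUpdate{serviceName: "foo-service", doLRS: false}; err == nil.
// Setting LrsServer to a ConfigSource_Self would flip doLRS to true.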
@@ -0,0 +1,539 @@
/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package client

import (
	"errors"
	"fmt"
	"reflect"
	"testing"
	"time"

	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
	"github.com/golang/protobuf/proto"
	anypb "github.com/golang/protobuf/ptypes/any"
	"google.golang.org/grpc/xds/internal/client/fakexds"
)

const (
	clusterName1 = "foo-cluster"
	clusterName2 = "bar-cluster"
	serviceName1 = "foo-service"
	serviceName2 = "bar-service"
)

func (v2c *v2Client) cloneCDSCacheForTesting() map[string]cdsUpdate {
	v2c.mu.Lock()
	defer v2c.mu.Unlock()

	cloneCache := make(map[string]cdsUpdate)
	for k, v := range v2c.cdsCache {
		cloneCache[k] = v
	}
	return cloneCache
}

func TestValidateCluster(t *testing.T) {
	emptyUpdate := cdsUpdate{serviceName: "", doLRS: false}
	tests := []struct {
		name       string
		cluster    *xdspb.Cluster
		wantUpdate cdsUpdate
		wantErr    bool
	}{
		{
			name: "non-eds-cluster-type",
			cluster: &xdspb.Cluster{
				ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_STATIC},
				EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
					EdsConfig: &corepb.ConfigSource{
						ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
							Ads: &corepb.AggregatedConfigSource{},
						},
					},
				},
				LbPolicy: xdspb.Cluster_LEAST_REQUEST,
			},
			wantUpdate: emptyUpdate,
			wantErr:    true,
		},
		{
			name: "no-eds-config",
			cluster: &xdspb.Cluster{
				ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
				LbPolicy:             xdspb.Cluster_ROUND_ROBIN,
			},
			wantUpdate: emptyUpdate,
			wantErr:    true,
		},
		{
			name: "no-ads-config-source",
			cluster: &xdspb.Cluster{
				ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
				EdsClusterConfig:     &xdspb.Cluster_EdsClusterConfig{},
				LbPolicy:             xdspb.Cluster_ROUND_ROBIN,
			},
			wantUpdate: emptyUpdate,
			wantErr:    true,
		},
		{
			name: "non-round-robin-lb-policy",
			cluster: &xdspb.Cluster{
				ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
				EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
					EdsConfig: &corepb.ConfigSource{
						ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
							Ads: &corepb.AggregatedConfigSource{},
						},
					},
				},
				LbPolicy: xdspb.Cluster_LEAST_REQUEST,
			},
			wantUpdate: emptyUpdate,
			wantErr:    true,
		},
		{
			name: "happy-case-no-service-name-no-lrs",
			cluster: &xdspb.Cluster{
				ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
				EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
					EdsConfig: &corepb.ConfigSource{
						ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
							Ads: &corepb.AggregatedConfigSource{},
						},
					},
				},
				LbPolicy: xdspb.Cluster_ROUND_ROBIN,
			},
			wantUpdate: emptyUpdate,
		},
		{
			name: "happy-case-no-lrs",
			cluster: &xdspb.Cluster{
				ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
				EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
					EdsConfig: &corepb.ConfigSource{
						ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
							Ads: &corepb.AggregatedConfigSource{},
						},
					},
					ServiceName: serviceName1,
				},
				LbPolicy: xdspb.Cluster_ROUND_ROBIN,
			},
			wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: false},
		},
		{
			name:       "happiest-case",
			cluster:    goodCluster1,
			wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: true},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			gotUpdate, gotErr := validateCluster(test.cluster)
			if (gotErr != nil) != test.wantErr {
				t.Errorf("validateCluster(%+v) returned error: %v, wantErr: %v", test.cluster, gotErr, test.wantErr)
			}
			if !reflect.DeepEqual(gotUpdate, test.wantUpdate) {
				t.Errorf("validateCluster(%+v) = %v, want: %v", test.cluster, gotUpdate, test.wantUpdate)
			}
		})
	}
}

// TestCDSHandleResponse starts a fake xDS server, makes a ClientConn to it,
// and creates a v2Client using it. Then, it registers a CDS watcher and tests
// different CDS responses.
func TestCDSHandleResponse(t *testing.T) {
	fakeServer, sCleanup := fakexds.StartServer(t)
	client, cCleanup := fakeServer.GetClientConn(t)
	defer func() {
		cCleanup()
		sCleanup()
	}()
	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
	defer v2c.close()

	tests := []struct {
		name          string
		cdsResponse   *xdspb.DiscoveryResponse
		wantErr       bool
		wantUpdate    *cdsUpdate
		wantUpdateErr bool
	}{
		// Badly marshaled CDS response.
		{
			name:          "badly-marshaled-response",
			cdsResponse:   badlyMarshaledCDSResponse,
			wantErr:       true,
			wantUpdate:    nil,
			wantUpdateErr: false,
		},
		// Response does not contain Cluster proto.
		{
			name:          "no-cluster-proto-in-response",
			cdsResponse:   badResourceTypeInLDSResponse,
			wantErr:       true,
			wantUpdate:    nil,
			wantUpdateErr: false,
		},
		// Response contains no clusters.
		{
			name:          "no-cluster",
			cdsResponse:   &xdspb.DiscoveryResponse{},
			wantErr:       false,
			wantUpdate:    &cdsUpdate{},
			wantUpdateErr: true,
		},
		// Response contains one good cluster we are not interested in.
		{
			name:          "one-uninteresting-cluster",
			cdsResponse:   goodCDSResponse2,
			wantErr:       false,
			wantUpdate:    &cdsUpdate{},
			wantUpdateErr: true,
		},
		// Response contains one cluster and it is good.
		{
			name:          "one-good-cluster",
			cdsResponse:   goodCDSResponse1,
			wantErr:       false,
			wantUpdate:    &cdsUpdate{serviceName: serviceName1, doLRS: true},
			wantUpdateErr: false,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			gotUpdateCh := make(chan cdsUpdate, 1)
			gotUpdateErrCh := make(chan error, 1)

			// Register a watcher, to trigger the v2Client to send a CDS request.
			cancelWatch := v2c.watchCDS(clusterName1, func(u cdsUpdate, err error) {
				t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err)
				gotUpdateCh <- u
				gotUpdateErrCh <- err
			})

			// Wait till the request makes it to the fakeServer. This ensures that
			// the watch request has been processed by the v2Client.
			<-fakeServer.RequestChan

			// Directly push the response through a call to handleCDSResponse,
			// thereby bypassing the fakeServer.
			if err := v2c.handleCDSResponse(test.cdsResponse); (err != nil) != test.wantErr {
				t.Fatalf("v2c.handleCDSResponse() returned err: %v, wantErr: %v", err, test.wantErr)
			}

			// If the test needs the callback to be invoked, verify the update and
			// error pushed to the callback.
			if test.wantUpdate != nil {
				timer := time.NewTimer(defaultTestTimeout)
				select {
				case <-timer.C:
					t.Fatal("Timeout when expecting CDS update")
				case gotUpdate := <-gotUpdateCh:
					timer.Stop()
					if !reflect.DeepEqual(gotUpdate, *test.wantUpdate) {
						t.Fatalf("got CDS update: %+v, want %+v", gotUpdate, test.wantUpdate)
					}
				}
				// Since the callback that we registered pushes to both channels at
				// the same time, this channel read should return immediately.
				gotUpdateErr := <-gotUpdateErrCh
				if (gotUpdateErr != nil) != test.wantUpdateErr {
					t.Fatalf("got CDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
				}
			}
			cancelWatch()
		})
	}
}

// TestCDSHandleResponseWithoutWatch tests the case where the v2Client receives
// a CDS response without a registered watcher.
func TestCDSHandleResponseWithoutWatch(t *testing.T) {
	fakeServer, sCleanup := fakexds.StartServer(t)
	client, cCleanup := fakeServer.GetClientConn(t)
	defer func() {
		cCleanup()
		sCleanup()
	}()
	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
	defer v2c.close()

	if v2c.handleCDSResponse(goodCDSResponse1) == nil {
		t.Fatal("v2c.handleCDSResponse() succeeded, should have failed")
	}
}

// cdsTestOp contains all data related to one particular test operation. Not
// all fields make sense for all tests.
type cdsTestOp struct {
	// target is the resource name to watch for.
	target string
	// responseToSend is the xDS response sent to the client.
	responseToSend *fakexds.Response
	// wantOpErr specifies whether the main operation should return an error.
	wantOpErr bool
	// wantCDSCache is the expected cdsCache at the end of an operation.
	wantCDSCache map[string]cdsUpdate
	// wantWatchCallback specifies if the watch callback should be invoked.
	wantWatchCallback bool
}

// testCDSCaching is a helper function which starts a fake xDS server, makes a
// ClientConn to it, and creates a v2Client using it. It then reads a bunch of
// test operations to be performed from cdsTestOps and returns error, if any,
// on the provided error channel. This is executed in a separate goroutine.
func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) {
	t.Helper()

	fakeServer, sCleanup := fakexds.StartServer(t)
	client, cCleanup := fakeServer.GetClientConn(t)
	defer func() {
		cCleanup()
		sCleanup()
	}()
	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
	defer v2c.close()
	t.Log("Started xds v2Client...")

	callbackCh := make(chan struct{}, 1)
	for _, cdsTestOp := range cdsTestOps {
		// Register a watcher if required, and use a channel to signal the
		// successful invocation of the callback.
		if cdsTestOp.target != "" {
			v2c.watchCDS(cdsTestOp.target, func(u cdsUpdate, err error) {
				t.Logf("Received callback with cdsUpdate {%+v} and error {%v}", u, err)
				callbackCh <- struct{}{}
			})
			t.Logf("Registered a watcher for CDS target: %v...", cdsTestOp.target)

			// Wait till the request makes it to the fakeServer. This ensures that
			// the watch request has been processed by the v2Client.
			<-fakeServer.RequestChan
			t.Log("FakeServer received request...")
		}

		// Directly push the response through a call to handleCDSResponse,
		// thereby bypassing the fakeServer.
		if cdsTestOp.responseToSend != nil {
			if err := v2c.handleCDSResponse(cdsTestOp.responseToSend.Resp); (err != nil) != cdsTestOp.wantOpErr {
				errCh <- fmt.Errorf("v2c.handleCDSResponse() returned err: %v", err)
				return
			}
		}

		// If the test needs the callback to be invoked, just verify that
		// it was invoked. Since we verify the contents of the cache, it's
		// ok not to verify the contents of the callback.
		if cdsTestOp.wantWatchCallback {
			<-callbackCh
		}

		if !reflect.DeepEqual(v2c.cloneCDSCacheForTesting(), cdsTestOp.wantCDSCache) {
			errCh <- fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.cdsCache, cdsTestOp.wantCDSCache)
			return
		}
	}
	t.Log("Completed all test ops successfully...")
	errCh <- nil
}

// TestCDSCaching tests some end-to-end CDS flows using a fake xDS server, and
// verifies the CDS data cached at the v2Client.
func TestCDSCaching(t *testing.T) {
	errCh := make(chan error, 1)
	ops := []cdsTestOp{
		// Add a CDS watch for a cluster name (clusterName1), which returns one
		// matching resource in the response.
		{
			target:         clusterName1,
			responseToSend: &fakexds.Response{Resp: goodCDSResponse1},
			wantCDSCache: map[string]cdsUpdate{
				clusterName1: {serviceName1, true},
			},
			wantWatchCallback: true,
		},
		// Push a CDS response which contains a new resource (apart from the
		// one received in the previous response). This should be cached.
		{
			responseToSend: &fakexds.Response{Resp: cdsResponseWithMultipleResources},
			wantCDSCache: map[string]cdsUpdate{
				clusterName1: {serviceName1, true},
				clusterName2: {serviceName2, false},
			},
			wantWatchCallback: true,
		},
		// Switch the watch target to clusterName2, which was already cached. No
		// response is received from the server (as expected), but we want the
		// callback to be invoked with the new serviceName.
		{
			target: clusterName2,
			wantCDSCache: map[string]cdsUpdate{
				clusterName1: {serviceName1, true},
				clusterName2: {serviceName2, false},
			},
			wantWatchCallback: true,
		},
		// Push an empty CDS response. This should clear the cache.
		{
			responseToSend:    &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: clusterURL}},
			wantOpErr:         false,
			wantCDSCache:      map[string]cdsUpdate{},
			wantWatchCallback: true,
		},
	}
	go testCDSCaching(t, ops, errCh)

	timer := time.NewTimer(defaultTestTimeout)
	select {
	case <-timer.C:
		t.Fatal("Timeout when expecting CDS update")
	case err := <-errCh:
		timer.Stop()
		if err != nil {
			t.Fatal(err)
		}
	}
}

// TestCDSWatchExpiryTimer tests the case where the client does not receive a
// CDS response for the request that it sends out. We want the watch callback
// to be invoked with an error once the watchExpiryTimer fires.
func TestCDSWatchExpiryTimer(t *testing.T) {
	oldWatchExpiryTimeout := defaultWatchExpiryTimeout
	defaultWatchExpiryTimeout = 1 * time.Second
	defer func() {
		defaultWatchExpiryTimeout = oldWatchExpiryTimeout
	}()

	fakeServer, sCleanup := fakexds.StartServer(t)
	client, cCleanup := fakeServer.GetClientConn(t)
	defer func() {
		cCleanup()
		sCleanup()
	}()
	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
	defer v2c.close()
	t.Log("Started xds v2Client...")

	cdsCallbackCh := make(chan error, 1)
	v2c.watchCDS(clusterName1, func(u cdsUpdate, err error) {
		t.Logf("Received callback with cdsUpdate {%+v} and error {%v}", u, err)
		if u.serviceName != "" {
			cdsCallbackCh <- fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.serviceName)
		}
		if err == nil {
			cdsCallbackCh <- errors.New("received nil error in cdsCallback")
		}
		cdsCallbackCh <- nil
	})
	<-fakeServer.RequestChan

	timer := time.NewTimer(2 * time.Second)
	select {
	case <-timer.C:
		t.Fatalf("Timeout expired when expecting CDS update")
	case err := <-cdsCallbackCh:
		timer.Stop()
		if err != nil {
			t.Fatal(err)
		}
	}
}

var (
	badlyMarshaledCDSResponse = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: clusterURL,
				Value:   []byte{1, 2, 3, 4},
			},
		},
		TypeUrl: clusterURL,
	}
	goodCluster1 = &xdspb.Cluster{
		Name:                 clusterName1,
		ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
		EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
			EdsConfig: &corepb.ConfigSource{
				ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
					Ads: &corepb.AggregatedConfigSource{},
				},
			},
			ServiceName: serviceName1,
		},
		LbPolicy: xdspb.Cluster_ROUND_ROBIN,
		LrsServer: &corepb.ConfigSource{
			ConfigSourceSpecifier: &corepb.ConfigSource_Self{
				Self: &corepb.SelfConfigSource{},
			},
		},
	}
	marshaledCluster1, _ = proto.Marshal(goodCluster1)
	goodCluster2 = &xdspb.Cluster{
		Name:                 clusterName2,
		ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
		EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
			EdsConfig: &corepb.ConfigSource{
				ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
					Ads: &corepb.AggregatedConfigSource{},
				},
			},
			ServiceName: serviceName2,
		},
		LbPolicy: xdspb.Cluster_ROUND_ROBIN,
	}
	marshaledCluster2, _ = proto.Marshal(goodCluster2)
	goodCDSResponse1 = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: clusterURL,
				Value:   marshaledCluster1,
			},
		},
		TypeUrl: clusterURL,
	}
	goodCDSResponse2 = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: clusterURL,
				Value:   marshaledCluster2,
			},
		},
		TypeUrl: clusterURL,
	}
	cdsResponseWithMultipleResources = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: clusterURL,
				Value:   marshaledCluster1,
			},
			{
				TypeUrl: clusterURL,
				Value:   marshaledCluster2,
			},
		},
		TypeUrl: clusterURL,
	}
)
@@ -224,7 +224,7 @@ func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
 	}
 
 	if returnUpdate != nil {
-		wi.expiryTimer.Stop()
+		wi.stopTimer()
 		wi.callback.(edsCallback)(returnUpdate, nil)
 	}
@@ -32,7 +32,7 @@ import (
 	"google.golang.org/grpc/xds/internal/client/fakexds"
 )
 
-func TestParseEDSRespProto(t *testing.T) {
+func TestEDSParseRespProto(t *testing.T) {
 	tests := []struct {
 		name string
 		m    *xdspb.ClusterLoadAssignment
@@ -161,7 +161,7 @@ var (
 	}
 )
 
-func TestHandleEDSResponse(t *testing.T) {
+func TestEDSHandleResponse(t *testing.T) {
 	fakeServer, sCleanup := fakexds.StartServer(t)
 	client, cCleanup := fakeServer.GetClientConn(t)
 	defer func() {
@@ -169,6 +169,7 @@ func TestHandleEDSResponse(t *testing.T) {
 		sCleanup()
 	}()
 	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
 
 	tests := []struct {
 		name string
@@ -272,9 +273,9 @@ func TestHandleEDSResponse(t *testing.T) {
 	}
 }
 
-// TestHandleEDSResponseWithoutEDSWatch tests the case where the v2Client
+// TestEDSHandleResponseWithoutWatch tests the case where the v2Client
 // receives an EDS response without a registered EDS watcher.
-func TestHandleEDSResponseWithoutEDSWatch(t *testing.T) {
+func TestEDSHandleResponseWithoutWatch(t *testing.T) {
 	fakeServer, sCleanup := fakexds.StartServer(t)
 	client, cCleanup := fakeServer.GetClientConn(t)
 	defer func() {
@@ -87,7 +87,7 @@ func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error {
 	if routeName == "" {
 		err = fmt.Errorf("xds: LDS target %s not found in received response %+v", wi.target, resp)
 	}
-	wi.expiryTimer.Stop()
+	wi.stopTimer()
 	wi.callback.(ldsCallback)(ldsUpdate{routeName: routeName}, err)
 	return nil
 }
@@ -29,7 +29,7 @@ import (
 	"google.golang.org/grpc/xds/internal/client/fakexds"
 )
 
-func TestGetRouteConfigNameFromListener(t *testing.T) {
+func TestLDSGetRouteConfig(t *testing.T) {
 	tests := []struct {
 		name string
 		lis  *xdspb.Listener
@@ -87,10 +87,10 @@ func TestGetRouteConfigNameFromListener(t *testing.T) {
 	}
 }
 
-// TestHandleLDSResponse starts a fake xDS server, makes a ClientConn to it,
+// TestLDSHandleResponse starts a fake xDS server, makes a ClientConn to it,
 // and creates a v2Client using it. Then, it registers a watchLDS and tests
 // different LDS responses.
-func TestHandleLDSResponse(t *testing.T) {
+func TestLDSHandleResponse(t *testing.T) {
 	fakeServer, sCleanup := fakexds.StartServer(t)
 	client, cCleanup := fakeServer.GetClientConn(t)
 	defer func() {
@@ -98,6 +98,7 @@ func TestHandleLDSResponse(t *testing.T) {
 		sCleanup()
 	}()
 	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
 
 	tests := []struct {
 		name string
@@ -224,9 +225,9 @@ func TestHandleLDSResponse(t *testing.T) {
 	}
 }
 
-// TestHandleLDSResponseWithoutWatch tests the case where the v2Client receives
+// TestLDSHandleResponseWithoutWatch tests the case where the v2Client receives
 // an LDS response without a registered watcher.
-func TestHandleLDSResponseWithoutWatch(t *testing.T) {
+func TestLDSHandleResponseWithoutWatch(t *testing.T) {
 	fakeServer, sCleanup := fakexds.StartServer(t)
 	client, cCleanup := fakeServer.GetClientConn(t)
 	defer func() {
@@ -234,6 +235,7 @@ func TestHandleLDSResponseWithoutWatch(t *testing.T) {
 		sCleanup()
 	}()
 	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
 
 	if v2c.handleLDSResponse(goodLDSResponse1) == nil {
 		t.Fatal("v2c.handleLDSResponse() succeeded, should have failed")
@@ -257,6 +259,7 @@ func TestLDSWatchExpiryTimer(t *testing.T) {
 		sCleanup()
 	}()
 	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
 
 	// Wait till the request makes it to the fakeServer. This ensures that
 	// the watch request has been processed by the v2Client.
@@ -104,7 +104,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
 		// incremental protocol, the fact that we did not receive the resource
 		// that we are watching for in this response does not mean that the
 		// server does not know about it.
-		wi.expiryTimer.Stop()
+		wi.stopTimer()
 		wi.callback.(rdsCallback)(rdsUpdate{clusterName: returnCluster}, nil)
 	}
 	return nil
@@ -41,7 +41,7 @@ func (v2c *v2Client) cloneRDSCacheForTesting() map[string]string {
 	return cloneCache
 }
 
-func TestGetClusterFromRouteConfiguration(t *testing.T) {
+func TestRDSGetClusterFromRouteConfiguration(t *testing.T) {
 	tests := []struct {
 		name string
 		rc   *xdspb.RouteConfiguration
@@ -140,10 +140,10 @@ func TestGetClusterFromRouteConfiguration(t *testing.T) {
 	}
 }
 
-// TestHandleRDSResponse starts a fake xDS server, makes a ClientConn to it,
+// TestRDSHandleResponse starts a fake xDS server, makes a ClientConn to it,
 // and creates a v2Client using it. Then, it registers an LDS and RDS watcher
 // and tests different RDS responses.
-func TestHandleRDSResponse(t *testing.T) {
+func TestRDSHandleResponse(t *testing.T) {
 	fakeServer, sCleanup := fakexds.StartServer(t)
 	client, cCleanup := fakeServer.GetClientConn(t)
 	defer func() {
@@ -151,6 +151,7 @@ func TestHandleRDSResponse(t *testing.T) {
 		sCleanup()
 	}()
 	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
 
 	// Register an LDS watcher, and wait till the request is sent out, the
 	// response is received and the callback is invoked.
@@ -263,9 +264,9 @@ func TestHandleRDSResponse(t *testing.T) {
 	}
 }
 
-// TestHandleRDSResponseWithoutLDSWatch tests the case where the v2Client
+// TestRDSHandleResponseWithoutLDSWatch tests the case where the v2Client
 // receives an RDS response without a registered LDS watcher.
-func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) {
+func TestRDSHandleResponseWithoutLDSWatch(t *testing.T) {
 	fakeServer, sCleanup := fakexds.StartServer(t)
 	client, cCleanup := fakeServer.GetClientConn(t)
 	defer func() {
@@ -273,15 +274,16 @@ func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) {
 		sCleanup()
 	}()
 	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
 
 	if v2c.handleRDSResponse(goodRDSResponse1) == nil {
 		t.Fatal("v2c.handleRDSResponse() succeeded, should have failed")
 	}
 }
 
-// TestHandleRDSResponseWithoutRDSWatch tests the case where the v2Client
+// TestRDSHandleResponseWithoutRDSWatch tests the case where the v2Client
 // receives an RDS response without a registered RDS watcher.
-func TestHandleRDSResponseWithoutRDSWatch(t *testing.T) {
+func TestRDSHandleResponseWithoutRDSWatch(t *testing.T) {
 	fakeServer, sCleanup := fakexds.StartServer(t)
 	client, cCleanup := fakeServer.GetClientConn(t)
 	defer func() {
@@ -289,6 +291,7 @@ func TestHandleRDSResponseWithoutRDSWatch(t *testing.T) {
 		sCleanup()
 	}()
 	v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
+	defer v2c.close()
 
 	// Register an LDS watcher, and wait till the request is sent out, the
 	// response is received and the callback is invoked.
@@ -308,9 +311,9 @@ func TestHandleRDSResponseWithoutRDSWatch(t *testing.T) {
 	}
 }
 
-// testOp contains all data related to one particular test operation. Not all
-// fields make sense for all tests.
-type testOp struct {
+// rdsTestOp contains all data related to one particular test operation. Not
+// all fields make sense for all tests.
+type rdsTestOp struct {
 	// target is the resource name to watch for.
 	target string
 	// responseToSend is the xDS response sent to the client
@@ -326,9 +329,9 @@ type testOp struct {
 // testRDSCaching is a helper function which starts a fake xDS server, makes a
 // ClientConn to it, creates a v2Client using it, registers an LDS watcher and
 // pushes a good LDS response. It then reads a bunch of test operations to be
-// performed from testOps and returns error, if any, on the provided error
+// performed from rdsTestOps and returns error, if any, on the provided error
 // channel. This is executed in a separate goroutine.
-func testRDSCaching(t *testing.T, testOps []testOp, errCh chan error) {
+func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) {
 	t.Helper()
 
 	fakeServer, sCleanup := fakexds.StartServer(t)
@@ -356,15 +359,15 @@ func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) {
 	}
 
 	callbackCh := make(chan struct{}, 1)
-	for _, testOp := range testOps {
+	for _, rdsTestOp := range rdsTestOps {
 		// Register a watcher if required, and use a channel to signal the
 		// successful invocation of the callback.
-		if testOp.target != "" {
-			v2c.watchRDS(testOp.target, func(u rdsUpdate, err error) {
+		if rdsTestOp.target != "" {
+			v2c.watchRDS(rdsTestOp.target, func(u rdsUpdate, err error) {
 				t.Logf("Received callback with rdsUpdate {%+v} and error {%v}", u, err)
 				callbackCh <- struct{}{}
 			})
-			t.Logf("Registered a watcher for LDS target: %v...", testOp.target)
+			t.Logf("Registered a watcher for RDS target: %v...", rdsTestOp.target)
 
 			// Wait till the request makes it to the fakeServer. This ensures that
 			// the watch request has been processed by the v2Client.
@@ -374,8 +377,8 @@ func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) {
 
 		// Directly push the response through a call to handleRDSResponse,
 		// thereby bypassing the fakeServer.
-		if testOp.responseToSend != nil {
-			if err := v2c.handleRDSResponse(testOp.responseToSend.Resp); (err != nil) != testOp.wantOpErr {
+		if rdsTestOp.responseToSend != nil {
+			if err := v2c.handleRDSResponse(rdsTestOp.responseToSend.Resp); (err != nil) != rdsTestOp.wantOpErr {
 				errCh <- fmt.Errorf("v2c.handleRDSResponse() returned err: %v", err)
 				return
 			}
@@ -384,12 +387,12 @@ func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) {
 		// If the test needs the callback to be invoked, just verify that
 		// it was invoked. Since we verify the contents of the cache, it's
 		// ok not to verify the contents of the callback.
-		if testOp.wantWatchCallback {
+		if rdsTestOp.wantWatchCallback {
 			<-callbackCh
 		}
 
-		if !reflect.DeepEqual(v2c.cloneRDSCacheForTesting(), testOp.wantRDSCache) {
-			errCh <- fmt.Errorf("gotRDSCache: %v, wantRDSCache: %v", v2c.rdsCache, testOp.wantRDSCache)
+		if !reflect.DeepEqual(v2c.cloneRDSCacheForTesting(), rdsTestOp.wantRDSCache) {
+			errCh <- fmt.Errorf("gotRDSCache: %v, wantRDSCache: %v", v2c.rdsCache, rdsTestOp.wantRDSCache)
 			return
 		}
 	}
@@ -401,7 +404,7 @@ func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) {
 // verifies the RDS data cached at the v2Client.
 func TestRDSCaching(t *testing.T) {
 	errCh := make(chan error, 1)
-	ops := []testOp{
+	ops := []rdsTestOp{
 		// Add an RDS watch for a resource name (goodRouteName1), which returns one
 		// matching resource in the response.
 		{
@@ -458,6 +461,9 @@ func TestRDSCaching(t *testing.T) {
 	}
 }
 
+// TestRDSWatchExpiryTimer tests the case where the client does not receive an
+// RDS response for the request that it sends out. We want the watch callback
+// to be invoked with an error once the watchExpiryTimer fires.
 func TestRDSWatchExpiryTimer(t *testing.T) {
 	oldWatchExpiryTimeout := defaultWatchExpiryTimeout
 	defaultWatchExpiryTimeout = 1 * time.Second
@@ -69,16 +69,27 @@ func (wi *watchInfo) cancel() {
 	}
 }
 
+// stopTimer stops the expiry timer without cancelling the watch.
+func (wi *watchInfo) stopTimer() {
+	if wi.expiryTimer != nil {
+		wi.expiryTimer.Stop()
+	}
+}
+
 type ldsUpdate struct {
 	routeName string
 }
 type ldsCallback func(ldsUpdate, error)
 
 type rdsUpdate struct {
 	clusterName string
 }
 type rdsCallback func(rdsUpdate, error)
 
+type cdsUpdate struct {
+	serviceName string
+	doLRS       bool
+}
+type cdsCallback func(cdsUpdate, error)
+
 type edsCallback func(*EDSUpdate, error)
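The nil check in stopTimer is load-bearing: a CDS watch that is answered straight from the cache (see checkCacheAndUpdateWatchMap further down) returns before an expiryTimer is ever created, yet handleCDSResponse still calls stopTimer on the watchInfo. A hypothetical sketch of that situation, using only types from this diff:

// Hypothetical: a watchInfo whose watch was satisfied from cdsCache, so
// checkCacheAndUpdateWatchMap never set expiryTimer.
wi := &watchInfo{wType: cdsResource, target: []string{"foo-cluster"}}
wi.stopTimer() // safe no-op: expiryTimer is nil, nothing to stop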
@@ -74,6 +74,16 @@ type v2Client struct {
 	// unrequested resources.
 	// https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints
 	rdsCache map[string]string
+	// cdsCache maintains a mapping of {clusterName --> cdsUpdate} from
+	// validated cluster configurations received in CDS responses. We cache all
+	// valid cluster configurations, whether or not we are interested in them
+	// when we received them (because we could become interested in them in the
+	// future and the server won't send us those resources again). This is only
+	// to support legacy management servers that do not honor the
+	// resource_names field. As per the latest spec, the server should resend
+	// the response when the request changes, even if it had sent the same
+	// resource earlier (when not asked for). Protected by the above mutex.
+	cdsCache map[string]cdsUpdate
 }
 
 // newV2Client creates a new v2Client initialized with the passed arguments.
|
||||||
watchCh: buffer.NewUnbounded(),
|
watchCh: buffer.NewUnbounded(),
|
||||||
watchMap: make(map[resourceType]*watchInfo),
|
watchMap: make(map[resourceType]*watchInfo),
|
||||||
rdsCache: make(map[string]string),
|
rdsCache: make(map[string]string),
|
||||||
|
cdsCache: make(map[string]cdsUpdate),
|
||||||
}
|
}
|
||||||
v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
|
v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
|
@@ -163,6 +174,10 @@ func (v2c *v2Client) sendExisting(stream adsStream) bool {
 			if !v2c.sendRDS(stream, wi.target) {
 				return false
 			}
+		case cdsResource:
+			if !v2c.sendCDS(stream, wi.target) {
+				return false
+			}
 		case edsResource:
 			if !v2c.sendEDS(stream, wi.target) {
 				return false
@@ -206,6 +221,10 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
 			if !v2c.sendRDS(stream, target) {
 				return
 			}
+		case cdsResource:
+			if !v2c.sendCDS(stream, target) {
+				return
+			}
 		case edsResource:
 			if !v2c.sendEDS(stream, target) {
 				return
@@ -239,6 +258,11 @@ func (v2c *v2Client) recv(stream adsStream) bool {
 			grpclog.Warningf("xds: RDS response handler failed: %v", err)
 			return success
 		}
+	case clusterURL:
+		if err := v2c.handleCDSResponse(resp); err != nil {
+			grpclog.Warningf("xds: CDS response handler failed: %v", err)
+			return success
+		}
 	case endpointURL:
 		if err := v2c.handleEDSResponse(resp); err != nil {
 			grpclog.Warningf("xds: EDS response handler failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// watchCDS registers an CDS watcher for the provided clusterName. Updates
|
||||||
|
// corresponding to received CDS responses will be pushed to the provided
|
||||||
|
// callback. The caller can cancel the watch by invoking the returned cancel
|
||||||
|
// function.
|
||||||
|
// The provided callback should not block or perform any expensive operations
|
||||||
|
// or call other methods of the v2Client object.
|
||||||
|
func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) {
|
||||||
|
wi := &watchInfo{wType: cdsResource, target: []string{clusterName}, callback: cdsCb}
|
||||||
|
v2c.watchCh.Put(wi)
|
||||||
|
return func() {
|
||||||
|
v2c.mu.Lock()
|
||||||
|
defer v2c.mu.Unlock()
|
||||||
|
if wi.state == watchEnqueued {
|
||||||
|
wi.state = watchCancelled
|
||||||
|
return
|
||||||
|
}
|
||||||
|
v2c.watchMap[cdsResource].cancel()
|
||||||
|
delete(v2c.watchMap, cdsResource)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// watchEDS registers an EDS watcher for the provided clusterName. Updates
|
// watchEDS registers an EDS watcher for the provided clusterName. Updates
|
||||||
// corresponding to received EDS responses will be pushed to the provided
|
// corresponding to received EDS responses will be pushed to the provided
|
||||||
// callback. The caller can cancel the watch by invoking the returned cancel
|
// callback. The caller can cancel the watch by invoking the returned cancel
|
||||||
|
|
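A minimal usage sketch of the new watcher API (hypothetical caller, not part of the commit; v2c is a *v2Client constructed with newV2Client as in the tests):

cancel := v2c.watchCDS("foo-cluster", func(u cdsUpdate, err error) {
	if err != nil {
		// The watch expired, or the cluster was missing from a response.
		return
	}
	// u.serviceName is the name to use in the subsequent EDS request;
	// u.doLRS reports whether load reporting should be started.
})
defer cancel() // unregisters the CDS watch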
@@ -333,19 +378,17 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
 
 	v2c.watchMap[wi.wType] = wi
 	switch wi.wType {
+	// We need to grab the lock inside of the expiryTimer's afterFunc because
+	// we need to access the watchInfo, which is stored in the watchMap.
 	case ldsResource:
 		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
-			// We need to grab the lock here because we are accessing the
-			// watchInfo (which is now stored in the watchMap) from this
-			// method which will be called when the timer fires.
 			v2c.mu.Lock()
-			wi.callback.(ldsCallback)(ldsUpdate{routeName: ""}, fmt.Errorf("xds: LDS target %s not found", wi.target))
+			wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found", wi.target))
 			v2c.mu.Unlock()
 		})
 	case rdsResource:
 		routeName := wi.target[0]
 		if cluster := v2c.rdsCache[routeName]; cluster != "" {
-			// Invoke the callback now, since we found the entry in the cache.
 			var err error
 			if v2c.watchMap[ldsResource] == nil {
 				cluster = ""
@@ -357,18 +400,27 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
 		// Add the watch expiry timer only for new watches we don't find in
 		// the cache, and return from here.
 		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
-			// We need to grab the lock here because we are accessing the
-			// watchInfo (which is now stored in the watchMap) from this
-			// method which will be called when the timer fires.
 			v2c.mu.Lock()
-			wi.callback.(rdsCallback)(rdsUpdate{clusterName: ""}, fmt.Errorf("xds: RDS target %s not found", wi.target))
+			wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found", wi.target))
 			v2c.mu.Unlock()
 		})
+	case cdsResource:
+		clusterName := wi.target[0]
+		if update, ok := v2c.cdsCache[clusterName]; ok {
+			var err error
+			if v2c.watchMap[cdsResource] == nil {
+				err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName)
+			}
+			wi.callback.(cdsCallback)(update, err)
+			return
+		}
+		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
+			v2c.mu.Lock()
+			wi.callback.(cdsCallback)(cdsUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target))
+			v2c.mu.Unlock()
+		})
 	case edsResource:
 		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
-			// We need to grab the lock here because we are accessing the
-			// watchInfo (which is now stored in the watchMap) from this
-			// method which will be called when the timer fires.
 			v2c.mu.Lock()
 			wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target))
 			v2c.mu.Unlock()
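Putting the cache and the watch path together: once handleCDSResponse has populated cdsCache, a later watch for an already-seen cluster is served from the cache when the watch is processed, with no new round trip to the server and no expiry timer. A hedged sketch reusing names from the tests above:

// After a response containing both clusters has been handled (see
// cdsResponseWithMultipleResources in the tests), switching the watch
// target hits the cache:
v2c.watchCDS(clusterName2, func(u cdsUpdate, err error) {
	// Invoked from checkCacheAndUpdateWatchMap with the cached update
	// {serviceName: serviceName2, doLRS: false}; err is nil on this path.
})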