xds: client test cleanup. (#3284)

This commit is contained in:
Easwar Swaminathan 2019-12-27 15:00:59 -08:00 committed by GitHub
parent c618975385
commit 8a65b8453b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 505 additions and 845 deletions

View File

@ -34,7 +34,7 @@ import (
xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakexds"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
)
const (
@ -55,7 +55,7 @@ type cdsWatchInfo struct {
// invokeWatchCb invokes the CDS watch callback registered by the cdsBalancer
// and waits for appropriate state to be pushed to the provided edsBalancer.
func invokeWatchCbAndWait(xdsC *fakexds.Client, cdsW cdsWatchInfo, wantCCS balancer.ClientConnState, edsB *testEDSBalancer) error {
func invokeWatchCbAndWait(xdsC *fakeclient.Client, cdsW cdsWatchInfo, wantCCS balancer.ClientConnState, edsB *testEDSBalancer) error {
xdsC.InvokeWatchClusterCallback(cdsW.update, cdsW.err)
if cdsW.err != nil {
return edsB.waitForResolverError(cdsW.err)
@ -224,10 +224,10 @@ func setup() (*cdsBalancer, *testEDSBalancer, func()) {
// setupWithWatch does everything that setup does, and also pushes a ClientConn
// update to the cdsBalancer and waits for a CDS watch call to be registered.
func setupWithWatch(t *testing.T) (*fakexds.Client, *cdsBalancer, *testEDSBalancer, func()) {
func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, func()) {
t.Helper()
xdsC := fakexds.NewClient()
xdsC := fakeclient.NewClient()
cdsB, edsB, cancel := setup()
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
@ -246,7 +246,7 @@ func setupWithWatch(t *testing.T) (*fakexds.Client, *cdsBalancer, *testEDSBalanc
// cdsBalancer with different inputs and verifies that the CDS watch API on the
// provided xdsClient is invoked appropriately.
func TestUpdateClientConnState(t *testing.T) {
xdsC := fakexds.NewClient()
xdsC := fakeclient.NewClient()
tests := []struct {
name string
@ -324,7 +324,7 @@ func TestUpdateClientConnStateAfterClose(t *testing.T) {
defer cancel()
cdsB.Close()
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, fakexds.NewClient())); err != errBalancerClosed {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, fakeclient.NewClient())); err != errBalancerClosed {
t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed)
}
}

View File

@ -32,7 +32,8 @@ import (
xdsinternal "google.golang.org/grpc/xds/internal"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakexds"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
const (
@ -54,7 +55,7 @@ var (
// * Sends updates with different edsServiceNames and expects new watches to be
// registered.
func (s) TestClientWrapperWatchEDS(t *testing.T) {
fakeServer, cleanup, err := fakexds.StartServer()
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
@ -101,7 +102,7 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) {
if err != nil {
t.Fatalf("EDS RPC failed with err: %v", err)
}
edsReq := req.(*fakexds.Request)
edsReq := req.(*fakeserver.Request)
if edsReq.Err != nil {
t.Fatalf("EDS RPC failed with err: %v", edsReq.Err)
}
@ -123,9 +124,9 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) {
//
// The test does the following:
// * Creates a clientWrapper.
// * Creates a fakexds.Client and passes it to the clientWrapper in attributes.
// * Creates a fakeclient.Client and passes it to the clientWrapper in attributes.
// * Verifies the clientWrapper registers an EDS watch.
// * Forces the fakexds.Client to invoke the registered EDS watch callback with
// * Forces the fakeclient.Client to invoke the registered EDS watch callback with
// an error. Verifies that the wrapper does not invoke the top-level
// edsBalancer with the received error.
func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
@ -138,7 +139,7 @@ func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
cw := newXDSClientWrapper(newEDS, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
defer cw.close()
xdsC := fakexds.NewClient()
xdsC := fakeclient.NewClient()
cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC))
gotCluster, err := xdsC.WaitForWatchEDS()
if err != nil {
@ -173,7 +174,7 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
defer cw.close()
// Verify that the eds watch is registered for the expected resource name.
xdsC1 := fakexds.NewClient()
xdsC1 := fakeclient.NewClient()
cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC1))
gotCluster, err := xdsC1.WaitForWatchEDS()
if err != nil {
@ -187,7 +188,7 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
// re-registered on the new client, and that the old client is not closed
// (because clientWrapper only closes clients that it creates, it does not
// close client that are passed through attributes).
xdsC2 := fakexds.NewClient()
xdsC2 := fakeclient.NewClient()
cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC2))
gotCluster, err = xdsC2.WaitForWatchEDS()
if err != nil {

View File

@ -25,7 +25,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils/fakexds"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
)
// TestXDSLoadReporting verifies that the edsBalancer starts the loadReport
@ -40,7 +40,7 @@ func (s) TestXDSLoadReporting(t *testing.T) {
}
defer edsB.Close()
xdsC := fakexds.NewClient()
xdsC := fakeclient.NewClient()
edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &XDSConfig{LrsLoadReportingServerName: new(string)},

View File

@ -43,7 +43,7 @@ import (
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakexds"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
)
var lbABuilder = &balancerABuilder{}
@ -317,7 +317,7 @@ func (s) TestXDSFallbackResolvedAddrs(t *testing.T) {
// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created
// with the provided name. It also make sure that the newly created client
// registers an eds watcher.
func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakexds.Client {
func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakeclient.Client {
t.Helper()
val, err := ch.Receive()
@ -325,7 +325,7 @@ func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantNa
t.Fatalf("error when waiting for a new xds client: %v", err)
return nil
}
xdsC := val.(*fakexds.Client)
xdsC := val.(*fakeclient.Client)
if xdsC.Name() != wantName {
t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName)
return nil
@ -365,7 +365,7 @@ func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
origXdsClientNew := xdsclientNew
xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
xdsC := fakexds.NewClientWithName(opts.Config.BalancerName)
xdsC := fakeclient.NewClientWithName(opts.Config.BalancerName)
defer func() { xdsClientCh.Send(xdsC) }()
return xdsC, nil
}
@ -527,7 +527,7 @@ func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
// will not kick-in as yet.
// * Sends another ClientConn update with fallback addresses. Still fallback
// would not have kicked in because the startupTimeout hasn't expired.
// * Sends an EDSUpdate through the fakexds.Client object. This will trigger
// * Sends an EDSUpdate through the fakeclient.Client object. This will trigger
// the creation of an edsLB object. This is verified.
// * Trigger fallback by directly calling the loseContact method on the
// top-level edsBalancer. This should instantiate the fallbackLB and should

View File

@ -25,11 +25,13 @@ import (
"testing"
"time"
discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/proto"
anypb "github.com/golang/protobuf/ptypes/any"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
const (
@ -164,13 +166,10 @@ func TestValidateCluster(t *testing.T) {
// and creates a v2Client using it. Then, it registers a CDS watcher and tests
// different CDS responses.
func TestCDSHandleResponse(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
tests := []struct {
@ -230,7 +229,7 @@ func TestCDSHandleResponse(t *testing.T) {
wantUpdateErr: test.wantUpdateErr,
cdsWatch: v2c.watchCDS,
watchReqChan: fakeServer.RequestChan,
watchReqChan: fakeServer.XDSRequestChan,
handleXDSResp: v2c.handleCDSResponse,
})
})
@ -240,13 +239,10 @@ func TestCDSHandleResponse(t *testing.T) {
// TestCDSHandleResponseWithoutWatch tests the case where the v2Client receives
// a CDS response without a registered watcher.
func TestCDSHandleResponseWithoutWatch(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
_, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
if v2c.handleCDSResponse(goodCDSResponse1) == nil {
@ -260,7 +256,7 @@ type cdsTestOp struct {
// target is the resource name to watch for.
target string
// responseToSend is the xDS response sent to the client
responseToSend *fakexds.Response
responseToSend *fakeserver.Response
// wantOpErr specfies whether the main operation should return an error.
wantOpErr bool
// wantCDSCache is the expected rdsCache at the end of an operation.
@ -273,16 +269,13 @@ type cdsTestOp struct {
// ClientConn to it, creates a v2Client using it. It then reads a bunch of
// test operations to be performed from cdsTestOps and returns error, if any,
// on the provided error channel. This is executed in a separate goroutine.
func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) {
func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh *testutils.Channel) {
t.Helper()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
@ -299,15 +292,19 @@ func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) {
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
<-fakeServer.RequestChan
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
errCh.Send(fmt.Errorf("Timeout waiting for CDS request: %v", err))
return
}
t.Log("FakeServer received request...")
}
// Directly push the response through a call to handleCDSResponse,
// thereby bypassing the fakeServer.
if cdsTestOp.responseToSend != nil {
if err := v2c.handleCDSResponse(cdsTestOp.responseToSend.Resp); (err != nil) != cdsTestOp.wantOpErr {
errCh <- fmt.Errorf("v2c.handleCDSResponse() returned err: %v", err)
resp := cdsTestOp.responseToSend.Resp.(*discoverypb.DiscoveryResponse)
if err := v2c.handleCDSResponse(resp); (err != nil) != cdsTestOp.wantOpErr {
errCh.Send(fmt.Errorf("v2c.handleRDSResponse(%+v) returned err: %v", resp, err))
return
}
}
@ -320,24 +317,23 @@ func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) {
}
if !reflect.DeepEqual(v2c.cloneCDSCacheForTesting(), cdsTestOp.wantCDSCache) {
errCh <- fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.rdsCache, cdsTestOp.wantCDSCache)
errCh.Send(fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.rdsCache, cdsTestOp.wantCDSCache))
return
}
}
t.Log("Completed all test ops successfully...")
errCh <- nil
errCh.Send(nil)
}
// TestCDSCaching tests some end-to-end CDS flows using a fake xDS server, and
// verifies the CDS data cached at the v2Client.
func TestCDSCaching(t *testing.T) {
errCh := make(chan error, 1)
ops := []cdsTestOp{
// Add an CDS watch for a cluster name (clusterName1), which returns one
// matching resource in the response.
{
target: clusterName1,
responseToSend: &fakexds.Response{Resp: goodCDSResponse1},
responseToSend: &fakeserver.Response{Resp: goodCDSResponse1},
wantCDSCache: map[string]CDSUpdate{
clusterName1: {serviceName1, true},
},
@ -346,7 +342,7 @@ func TestCDSCaching(t *testing.T) {
// Push an CDS response which contains a new resource (apart from the
// one received in the previous response). This should be cached.
{
responseToSend: &fakexds.Response{Resp: cdsResponseWithMultipleResources},
responseToSend: &fakeserver.Response{Resp: cdsResponseWithMultipleResources},
wantCDSCache: map[string]CDSUpdate{
clusterName1: {serviceName1, true},
clusterName2: {serviceName2, false},
@ -366,24 +362,15 @@ func TestCDSCaching(t *testing.T) {
},
// Push an empty CDS response. This should clear the cache.
{
responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}},
responseToSend: &fakeserver.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}},
wantOpErr: false,
wantCDSCache: map[string]CDSUpdate{},
wantWatchCallback: true,
},
}
errCh := testutils.NewChannel()
go testCDSCaching(t, ops, errCh)
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting CDS update")
case err := <-errCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
}
waitForNilErr(t, errCh)
}
// TestCDSWatchExpiryTimer tests the case where the client does not receive an
@ -391,44 +378,36 @@ func TestCDSCaching(t *testing.T) {
// to be invoked with an error once the watchExpiryTimer fires.
func TestCDSWatchExpiryTimer(t *testing.T) {
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 1 * time.Second
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
cdsCallbackCh := make(chan error, 1)
callbackCh := testutils.NewChannel()
v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) {
t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err)
if u.ServiceName != "" {
cdsCallbackCh <- fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName)
callbackCh.Send(fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName))
}
if err == nil {
cdsCallbackCh <- errors.New("received nil error in cdsCallback")
callbackCh.Send(errors.New("received nil error in cdsCallback"))
}
cdsCallbackCh <- nil
callbackCh.Send(nil)
})
<-fakeServer.RequestChan
timer := time.NewTimer(2 * time.Second)
select {
case <-timer.C:
t.Fatalf("Timeout expired when expecting CDS update")
case err := <-cdsCallbackCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an CDS request")
}
waitForNilErr(t, callbackCh)
}
var (

View File

@ -26,7 +26,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
@ -45,7 +46,10 @@ func clientOpts(balancerName string) Options {
}
func TestNew(t *testing.T) {
fakeServer, cleanup := fakexds.StartServer(t)
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
tests := []struct {
@ -107,7 +111,10 @@ func TestNew(t *testing.T) {
// TestWatchService tests the happy case of registering a watcher for
// service updates and receiving a good update.
func TestWatchService(t *testing.T) {
fakeServer, cleanup := fakexds.StartServer(t)
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
@ -117,37 +124,33 @@ func TestWatchService(t *testing.T) {
defer xdsClient.Close()
t.Log("Created an xdsClient...")
callbackCh := make(chan error, 1)
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if err != nil {
callbackCh <- fmt.Errorf("xdsClient.WatchService returned error: %v", err)
callbackCh.Send(fmt.Errorf("xdsClient.WatchService returned error: %v", err))
return
}
if su.Cluster != goodClusterName1 {
callbackCh <- fmt.Errorf("got clusterName: %+v, want clusterName: %+v", su.Cluster, goodClusterName1)
callbackCh.Send(fmt.Errorf("got clusterName: %+v, want clusterName: %+v", su.Cluster, goodClusterName1))
return
}
callbackCh <- nil
callbackCh.Send(nil)
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Make the fakeServer send LDS and RDS responses.
<-fakeServer.RequestChan
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
<-fakeServer.RequestChan
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodRDSResponse1}
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting a service update")
case err := <-callbackCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
// Make the fakeServer send LDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
// Make the fakeServer send RDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an RDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1}
waitForNilErr(t, callbackCh)
}
// TestWatchServiceWithNoResponseFromServer tests the case where the
@ -155,7 +158,10 @@ func TestWatchService(t *testing.T) {
// registering a service update watcher. The underlying v2Client will timeout
// and will send us an error.
func TestWatchServiceWithNoResponseFromServer(t *testing.T) {
fakeServer, cleanup := fakexds.StartServer(t)
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
@ -166,45 +172,40 @@ func TestWatchServiceWithNoResponseFromServer(t *testing.T) {
t.Log("Created an xdsClient...")
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 1 * time.Second
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
callbackCh := make(chan error, 1)
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if su.Cluster != "" {
callbackCh <- fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)
callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster))
return
}
if err == nil {
callbackCh <- errors.New("xdsClient.WatchService returned error non-nil error")
callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error"))
return
}
callbackCh <- nil
callbackCh.Send(nil)
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Wait for one request from the client, but send no reponses.
<-fakeServer.RequestChan
timer := time.NewTimer(2 * time.Second)
select {
case <-timer.C:
t.Fatal("Timeout when expecting a service update")
case err := <-callbackCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
waitForNilErr(t, callbackCh)
}
// TestWatchServiceEmptyRDS tests the case where the underlying
// v2Client receives an empty RDS response.
func TestWatchServiceEmptyRDS(t *testing.T) {
fakeServer, cleanup := fakexds.StartServer(t)
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
@ -215,49 +216,48 @@ func TestWatchServiceEmptyRDS(t *testing.T) {
t.Log("Created an xdsClient...")
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 1 * time.Second
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
callbackCh := make(chan error, 1)
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if su.Cluster != "" {
callbackCh <- fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)
callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster))
return
}
if err == nil {
callbackCh <- errors.New("xdsClient.WatchService returned error non-nil error")
callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error"))
return
}
callbackCh <- nil
callbackCh.Send(nil)
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Send a good LDS response, but send an empty RDS response.
<-fakeServer.RequestChan
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
<-fakeServer.RequestChan
fakeServer.ResponseChan <- &fakexds.Response{Resp: noVirtualHostsInRDSResponse}
timer := time.NewTimer(2 * time.Second)
select {
case <-timer.C:
t.Fatal("Timeout when expecting a service update")
case err := <-callbackCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
// Make the fakeServer send LDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
// Make the fakeServer send an empty RDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an RDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: noVirtualHostsInRDSResponse}
waitForNilErr(t, callbackCh)
}
// TestWatchServiceWithClientClose tests the case where xDS responses are
// received after the client is closed, and we make sure that the registered
// watcher callback is not invoked.
func TestWatchServiceWithClientClose(t *testing.T) {
fakeServer, cleanup := fakexds.StartServer(t)
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
@ -267,29 +267,26 @@ func TestWatchServiceWithClientClose(t *testing.T) {
defer xdsClient.Close()
t.Log("Created an xdsClient...")
callbackCh := make(chan error, 1)
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
callbackCh <- errors.New("watcher callback invoked after client close")
callbackCh.Send(errors.New("watcher callback invoked after client close"))
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Make the fakeServer send LDS response.
<-fakeServer.RequestChan
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
xdsClient.Close()
t.Log("Closing the xdsClient...")
// Push an RDS response from the fakeserver
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodRDSResponse1}
timer := time.NewTimer(1 * time.Second)
select {
case <-timer.C:
// Do nothing. Success.
case err := <-callbackCh:
timer.Stop()
t.Fatal(err)
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1}
if cbErr, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout {
t.Fatal(cbErr)
}
}

View File

@ -29,7 +29,7 @@ import (
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc/xds/internal/testutils"
)
func TestEDSParseRespProto(t *testing.T) {
@ -162,13 +162,10 @@ var (
)
func TestEDSHandleResponse(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
tests := []struct {
@ -235,7 +232,7 @@ func TestEDSHandleResponse(t *testing.T) {
wantUpdateErr: test.wantUpdateErr,
edsWatch: v2c.watchEDS,
watchReqChan: fakeServer.RequestChan,
watchReqChan: fakeServer.XDSRequestChan,
handleXDSResp: v2c.handleEDSResponse,
})
})
@ -245,13 +242,10 @@ func TestEDSHandleResponse(t *testing.T) {
// TestEDSHandleResponseWithoutWatch tests the case where the v2Client
// receives an EDS response without a registered EDS watcher.
func TestEDSHandleResponseWithoutWatch(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
_, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
if v2c.handleEDSResponse(goodEDSResponse1) == nil {
@ -261,42 +255,34 @@ func TestEDSHandleResponseWithoutWatch(t *testing.T) {
func TestEDSWatchExpiryTimer(t *testing.T) {
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 1 * time.Second
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
edsCallbackCh := make(chan error, 1)
callbackCh := testutils.NewChannel()
v2c.watchEDS(goodRouteName1, func(u *EDSUpdate, err error) {
t.Logf("Received callback with edsUpdate {%+v} and error {%v}", u, err)
if u != nil {
edsCallbackCh <- fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u)
callbackCh.Send(fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u))
}
if err == nil {
edsCallbackCh <- errors.New("received nil error in edsCallback")
callbackCh.Send(errors.New("received nil error in edsCallback"))
}
edsCallbackCh <- nil
callbackCh.Send(nil)
})
<-fakeServer.RequestChan
timer := time.NewTimer(2 * time.Second)
select {
case <-timer.C:
t.Fatalf("Timeout expired when expecting EDS update")
case err := <-edsCallbackCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an CDS request")
}
waitForNilErr(t, callbackCh)
}

View File

@ -1,164 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package fakexds provides a very basic fake implementation of the xDS server
// for unit testing purposes.
package fakexds
import (
"context"
"net"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
)
// TODO: Make this a var or a field in the server if there is a need to use a
// value other than this default.
const defaultChannelBufferSize = 50
// Request wraps an xDS request and error.
type Request struct {
Req *discoverypb.DiscoveryRequest
Err error
}
// Response wraps an xDS response and error.
type Response struct {
Resp *discoverypb.DiscoveryResponse
Err error
}
// Server is a very basic implementation of a fake xDS server. It provides a
// request and response channel for the user to control the requests that are
// expected and the responses that needs to be sent out.
type Server struct {
// RequestChan is a buffered channel on which the fake server writes the
// received requests onto.
RequestChan chan *Request
// ResponseChan is a buffered channel from which the fake server reads the
// responses that it must send out to the client.
ResponseChan chan *Response
// Address is the host:port on which the fake xdsServer is listening on.
Address string
// LRS is the LRS server installed.
LRS *LRSServer
}
// StartServer starts a fakexds.Server. The returned function should be invoked
// by the caller once the test is done.
func StartServer(t *testing.T) (*Server, func()) {
t.Helper()
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
server := grpc.NewServer()
lrss := newLRSServer()
lrsgrpc.RegisterLoadReportingServiceServer(server, lrss)
fs := &Server{
RequestChan: make(chan *Request, defaultChannelBufferSize),
ResponseChan: make(chan *Response, defaultChannelBufferSize),
Address: lis.Addr().String(),
LRS: lrss,
}
adsgrpc.RegisterAggregatedDiscoveryServiceServer(server, fs)
go server.Serve(lis)
t.Logf("Starting fake xDS server at %v...", fs.Address)
return fs, func() { server.Stop() }
}
// GetClientConn returns a grpc.ClientConn talking to the fake server. The
// returned function should be invoked by the caller once the test is done.
func (fs *Server) GetClientConn(t *testing.T) (*grpc.ClientConn, func()) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, fs.Address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
t.Fatalf("grpc.DialContext(%s) failed: %v", fs.Address, err)
}
t.Log("Started xDS gRPC client...")
return cc, func() {
cc.Close()
}
}
// StreamAggregatedResources is the fake implementation to handle an ADS
// stream.
func (fs *Server) StreamAggregatedResources(s adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
errCh := make(chan error, 2)
go func() {
for {
req, err := s.Recv()
if err != nil {
errCh <- err
return
}
fs.RequestChan <- &Request{req, err}
}
}()
go func() {
var retErr error
defer func() {
errCh <- retErr
}()
for {
select {
case r := <-fs.ResponseChan:
if r.Err != nil {
retErr = r.Err
return
}
if err := s.Send(r.Resp); err != nil {
retErr = err
return
}
case <-s.Context().Done():
retErr = s.Context().Err()
return
}
}
}()
if err := <-errCh; err != nil {
return err
}
return nil
}
// DeltaAggregatedResources helps implement the ADS service.
func (fs *Server) DeltaAggregatedResources(adsgrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return status.Error(codes.Unimplemented, "")
}

View File

@ -1,104 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fakexds
import (
"io"
"sync"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
"github.com/golang/protobuf/proto"
durationpb "github.com/golang/protobuf/ptypes/duration"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// LRSServer implements the LRS service, and is to be installed on the fakexds
// server. It collects load reports, and returned them later for comparison.
type LRSServer struct {
// ReportingInterval will be sent in the first response to control reporting
// interval.
ReportingInterval *durationpb.Duration
// ExpectedEDSClusterName is checked against the first LRS request. The RPC
// is failed if they don't match.
ExpectedEDSClusterName string
mu sync.Mutex
dropTotal uint64
drops map[string]uint64
}
func newLRSServer() *LRSServer {
return &LRSServer{
drops: make(map[string]uint64),
ReportingInterval: &durationpb.Duration{
Seconds: 60 * 60, // 1 hour, each test can override this to a shorter duration.
},
}
}
// StreamLoadStats implements LRS service.
func (lrss *LRSServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
wantReq := &lrspb.LoadStatsRequest{
ClusterStats: []*endpointpb.ClusterStats{{
ClusterName: lrss.ExpectedEDSClusterName,
}},
Node: &corepb.Node{},
}
if !proto.Equal(req, wantReq) {
return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v, want %+v, diff: %s", req, wantReq, cmp.Diff(req, wantReq, cmp.Comparer(proto.Equal)))
}
if err := stream.Send(&lrspb.LoadStatsResponse{
Clusters: []string{lrss.ExpectedEDSClusterName},
LoadReportingInterval: lrss.ReportingInterval,
}); err != nil {
return err
}
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
stats := req.ClusterStats[0]
lrss.mu.Lock()
lrss.dropTotal += stats.TotalDroppedRequests
for _, d := range stats.DroppedRequests {
lrss.drops[d.Category] += d.DroppedCount
}
lrss.mu.Unlock()
}
}
// GetDrops returns the drops reported to this server.
func (lrss *LRSServer) GetDrops() map[string]uint64 {
lrss.mu.Lock()
defer lrss.mu.Unlock()
return lrss.drops
}

View File

@ -25,7 +25,7 @@ import (
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc/xds/internal/testutils"
)
func TestLDSGetRouteConfig(t *testing.T) {
@ -90,13 +90,10 @@ func TestLDSGetRouteConfig(t *testing.T) {
// and creates a v2Client using it. Then, it registers a watchLDS and tests
// different LDS responses.
func TestLDSHandleResponse(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
tests := []struct {
@ -186,7 +183,7 @@ func TestLDSHandleResponse(t *testing.T) {
wantUpdateErr: test.wantUpdateErr,
ldsWatch: v2c.watchLDS,
watchReqChan: fakeServer.RequestChan,
watchReqChan: fakeServer.XDSRequestChan,
handleXDSResp: v2c.handleLDSResponse,
})
})
@ -196,13 +193,10 @@ func TestLDSHandleResponse(t *testing.T) {
// TestLDSHandleResponseWithoutWatch tests the case where the v2Client receives
// an LDS response without a registered watcher.
func TestLDSHandleResponseWithoutWatch(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
_, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
if v2c.handleLDSResponse(goodLDSResponse1) == nil {
@ -215,43 +209,33 @@ func TestLDSHandleResponseWithoutWatch(t *testing.T) {
// to be invoked with an error once the watchExpiryTimer fires.
func TestLDSWatchExpiryTimer(t *testing.T) {
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 1 * time.Second
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
callbackCh := make(chan error, 1)
callbackCh := testutils.NewChannel()
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
if u.routeName != "" {
callbackCh <- fmt.Errorf("received routeName %v in ldsCallback, wanted empty string", u.routeName)
callbackCh.Send(fmt.Errorf("received routeName %v in ldsCallback, wanted empty string", u.routeName))
}
if err == nil {
callbackCh <- errors.New("received nil error in ldsCallback")
callbackCh.Send(errors.New("received nil error in ldsCallback"))
}
callbackCh <- nil
callbackCh.Send(nil)
})
<-fakeServer.RequestChan
timer := time.NewTimer(2 * time.Second)
select {
case <-timer.C:
t.Fatalf("Timeout expired when expecting LDS update")
case err := <-callbackCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
waitForNilErr(t, callbackCh)
}

View File

@ -25,9 +25,11 @@ import (
"testing"
"time"
discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
func (v2c *v2Client) cloneRDSCacheForTesting() map[string]string {
@ -165,34 +167,36 @@ func TestRDSGetClusterFromRouteConfiguration(t *testing.T) {
// This is called by RDS tests to start LDS first, because LDS is a
// pre-requirement for RDS, and RDS handle would fail without an existing LDS
// watch.
func doLDS(t *testing.T, v2c *v2Client, fakeServer *fakexds.Server) {
func doLDS(t *testing.T, v2c *v2Client, fakeServer *fakeserver.Server) {
// Register an LDS watcher, and wait till the request is sent out, the
// response is received and the callback is invoked.
cbCh := make(chan error, 1)
cbCh := testutils.NewChannel()
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
cbCh <- err
cbCh.Send(err)
})
<-fakeServer.RequestChan
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
if err := <-cbCh; err != nil {
t.Fatalf("v2c.watchLDS returned error in callback: %v", err)
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout waiting for LDS request: %v", err)
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
waitForNilErr(t, cbCh)
// Read the LDS ack, to clear RequestChan for following tests.
<-fakeServer.RequestChan
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout waiting for LDS ACK: %v", err)
}
}
// TestRDSHandleResponse starts a fake xDS server, makes a ClientConn to it,
// and creates a v2Client using it. Then, it registers an LDS and RDS watcher
// and tests different RDS responses.
func TestRDSHandleResponse(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
doLDS(t, v2c, fakeServer)
@ -256,7 +260,7 @@ func TestRDSHandleResponse(t *testing.T) {
wantUpdateErr: test.wantUpdateErr,
rdsWatch: v2c.watchRDS,
watchReqChan: fakeServer.RequestChan,
watchReqChan: fakeServer.XDSRequestChan,
handleXDSResp: v2c.handleRDSResponse,
})
})
@ -266,13 +270,10 @@ func TestRDSHandleResponse(t *testing.T) {
// TestRDSHandleResponseWithoutLDSWatch tests the case where the v2Client
// receives an RDS response without a registered LDS watcher.
func TestRDSHandleResponseWithoutLDSWatch(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
_, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
if v2c.handleRDSResponse(goodRDSResponse1) == nil {
@ -283,13 +284,10 @@ func TestRDSHandleResponseWithoutLDSWatch(t *testing.T) {
// TestRDSHandleResponseWithoutRDSWatch tests the case where the v2Client
// receives an RDS response without a registered RDS watcher.
func TestRDSHandleResponseWithoutRDSWatch(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
doLDS(t, v2c, fakeServer)
@ -304,7 +302,7 @@ type rdsTestOp struct {
// target is the resource name to watch for.
target string
// responseToSend is the xDS response sent to the client
responseToSend *fakexds.Response
responseToSend *fakeserver.Response
// wantOpErr specfies whether the main operation should return an error.
wantOpErr bool
// wantRDSCache is the expected rdsCache at the end of an operation.
@ -318,16 +316,13 @@ type rdsTestOp struct {
// pushes a good LDS response. It then reads a bunch of test operations to be
// performed from rdsTestOps and returns error, if any, on the provided error
// channel. This is executed in a separate goroutine.
func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) {
func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh *testutils.Channel) {
t.Helper()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
doLDS(t, v2c, fakeServer)
@ -345,15 +340,18 @@ func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) {
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
<-fakeServer.RequestChan
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
errCh.Send(fmt.Errorf("Timeout waiting for RDS request: %v", err))
}
t.Log("FakeServer received request...")
}
// Directly push the response through a call to handleRDSResponse,
// thereby bypassing the fakeServer.
if rdsTestOp.responseToSend != nil {
if err := v2c.handleRDSResponse(rdsTestOp.responseToSend.Resp); (err != nil) != rdsTestOp.wantOpErr {
errCh <- fmt.Errorf("v2c.handleRDSResponse() returned err: %v", err)
resp := rdsTestOp.responseToSend.Resp.(*discoverypb.DiscoveryResponse)
if err := v2c.handleRDSResponse(resp); (err != nil) != rdsTestOp.wantOpErr {
errCh.Send(fmt.Errorf("v2c.handleRDSResponse(%+v) returned err: %v", resp, err))
return
}
}
@ -366,24 +364,23 @@ func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) {
}
if !reflect.DeepEqual(v2c.cloneRDSCacheForTesting(), rdsTestOp.wantRDSCache) {
errCh <- fmt.Errorf("gotRDSCache: %v, wantRDSCache: %v", v2c.rdsCache, rdsTestOp.wantRDSCache)
errCh.Send(fmt.Errorf("gotRDSCache: %v, wantRDSCache: %v", v2c.rdsCache, rdsTestOp.wantRDSCache))
return
}
}
t.Log("Completed all test ops successfully...")
errCh <- nil
errCh.Send(nil)
}
// TestRDSCaching tests some end-to-end RDS flows using a fake xDS server, and
// verifies the RDS data cached at the v2Client.
func TestRDSCaching(t *testing.T) {
errCh := make(chan error, 1)
ops := []rdsTestOp{
// Add an RDS watch for a resource name (goodRouteName1), which returns one
// matching resource in the response.
{
target: goodRouteName1,
responseToSend: &fakexds.Response{Resp: goodRDSResponse1},
responseToSend: &fakeserver.Response{Resp: goodRDSResponse1},
wantRDSCache: map[string]string{goodRouteName1: goodClusterName1},
wantWatchCallback: true,
},
@ -392,7 +389,7 @@ func TestRDSCaching(t *testing.T) {
// routeConfigName does not match our RDS watch (so the watch callback will
// not be invoked). But this should still be cached.
{
responseToSend: &fakexds.Response{Resp: goodRDSResponse2},
responseToSend: &fakeserver.Response{Resp: goodRDSResponse2},
wantRDSCache: map[string]string{
goodRouteName1: goodClusterName1,
goodRouteName2: goodClusterName2,
@ -402,7 +399,7 @@ func TestRDSCaching(t *testing.T) {
// to return an error. But the watch callback should not be invoked, and
// the cache should not be updated.
{
responseToSend: &fakexds.Response{Resp: uninterestingRDSResponse},
responseToSend: &fakeserver.Response{Resp: uninterestingRDSResponse},
wantOpErr: true,
wantRDSCache: map[string]string{
goodRouteName1: goodClusterName1,
@ -421,18 +418,9 @@ func TestRDSCaching(t *testing.T) {
wantWatchCallback: true,
},
}
errCh := testutils.NewChannel()
go testRDSCaching(t, ops, errCh)
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting RDS update")
case err := <-errCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
}
waitForNilErr(t, errCh)
}
// TestRDSWatchExpiryTimer tests the case where the client does not receive an
@ -440,47 +428,37 @@ func TestRDSCaching(t *testing.T) {
// to be invoked with an error once the watchExpiryTimer fires.
func TestRDSWatchExpiryTimer(t *testing.T) {
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 1 * time.Second
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
doLDS(t, v2c, fakeServer)
rdsCallbackCh := make(chan error, 1)
callbackCh := testutils.NewChannel()
v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("Received callback with rdsUpdate {%+v} and error {%v}", u, err)
if u.clusterName != "" {
rdsCallbackCh <- fmt.Errorf("received clusterName %v in rdsCallback, wanted empty string", u.clusterName)
callbackCh.Send(fmt.Errorf("received clusterName %v in rdsCallback, wanted empty string", u.clusterName))
}
if err == nil {
rdsCallbackCh <- errors.New("received nil error in rdsCallback")
callbackCh.Send(errors.New("received nil error in rdsCallback"))
}
rdsCallbackCh <- nil
callbackCh.Send(nil)
})
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
<-fakeServer.RequestChan
timer := time.NewTimer(2 * defaultWatchExpiryTimeout)
select {
case <-timer.C:
t.Fatalf("Timeout expired when expecting RDS update")
case err := <-rdsCallbackCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an RDS request")
}
waitForNilErr(t, callbackCh)
}
func TestHostFromTarget(t *testing.T) {

View File

@ -20,11 +20,12 @@ package client
import (
"reflect"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
type watchHandleTestcase struct {
@ -39,7 +40,7 @@ type watchHandleTestcase struct {
rdsWatch func(routeName string, rdsCb rdsCallback) (cancel func())
cdsWatch func(clusterName string, cdsCb cdsCallback) (cancel func())
edsWatch func(clusterName string, edsCb edsCallback) (cancel func())
watchReqChan chan *fakexds.Request // The request sent for watch will be sent to this channel.
watchReqChan *testutils.Channel // The request sent for watch will be sent to this channel.
handleXDSResp func(response *xdspb.DiscoveryResponse) error
}
@ -50,8 +51,11 @@ type watchHandleTestcase struct {
// handleXDSResp with responseToHandle (if it's set). It then compares the
// update received by watch callback with the expected results.
func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
gotUpdateCh := make(chan interface{}, 1)
gotUpdateErrCh := make(chan error, 1)
type updateErr struct {
u interface{}
err error
}
gotUpdateCh := testutils.NewChannel()
var cancelWatch func()
// Register the watcher, this will also trigger the v2Client to send the xDS
@ -60,26 +64,22 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
case test.ldsWatch != nil:
cancelWatch = test.ldsWatch(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
gotUpdateCh <- u
gotUpdateErrCh <- err
gotUpdateCh.Send(updateErr{u, err})
})
case test.rdsWatch != nil:
cancelWatch = test.rdsWatch(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("in v2c.watchRDS callback, rdsUpdate: %+v, err: %v", u, err)
gotUpdateCh <- u
gotUpdateErrCh <- err
gotUpdateCh.Send(updateErr{u, err})
})
case test.cdsWatch != nil:
cancelWatch = test.cdsWatch(clusterName1, func(u CDSUpdate, err error) {
t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err)
gotUpdateCh <- u
gotUpdateErrCh <- err
gotUpdateCh.Send(updateErr{u, err})
})
case test.edsWatch != nil:
cancelWatch = test.edsWatch(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("in v2c.watchEDS callback, edsUpdate: %+v, err: %v", u, err)
gotUpdateCh <- *u
gotUpdateErrCh <- err
gotUpdateCh.Send(updateErr{*u, err})
})
default:
t.Fatalf("no watch() is set")
@ -88,9 +88,9 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
//
// TODO: switch to new fakexds server request channel with timed receive.
<-test.watchReqChan
if _, err := test.watchReqChan.Receive(); err != nil {
t.Fatalf("Timeout waiting for an xDS request: %v", err)
}
// Directly push the response through a call to handleXDSResp. This bypasses
// the fakeServer, so it's only testing the handle logic. Client response
@ -108,30 +108,62 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
// Cannot directly compare test.wantUpdate with nil (typed vs non-typed nil:
// https://golang.org/doc/faq#nil_error).
if c := test.wantUpdate; c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) {
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
update, err := gotUpdateCh.Receive()
if err == testutils.ErrRecvTimeout {
return
case <-gotUpdateCh:
t.Fatal("Unexpected update")
}
t.Fatalf("Unexpected update: +%v", update)
}
wantUpdate := reflect.ValueOf(test.wantUpdate).Elem().Interface()
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout expecting RDS update")
case gotUpdate := <-gotUpdateCh:
timer.Stop()
if diff := cmp.Diff(gotUpdate, wantUpdate, cmp.AllowUnexported(rdsUpdate{}, ldsUpdate{}, CDSUpdate{}, EDSUpdate{})); diff != "" {
t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff)
}
uErr, err := gotUpdateCh.Receive()
if err == testutils.ErrRecvTimeout {
t.Fatal("Timeout expecting xDS update")
}
// Since the callback that we registered pushes to both channels at
// the same time, this channel read should return immediately.
gotUpdateErr := <-gotUpdateErrCh
gotUpdate := uErr.(updateErr).u
opt := cmp.AllowUnexported(rdsUpdate{}, ldsUpdate{}, CDSUpdate{}, EDSUpdate{})
if diff := cmp.Diff(gotUpdate, wantUpdate, opt); diff != "" {
t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff)
}
gotUpdateErr := uErr.(updateErr).err
if (gotUpdateErr != nil) != test.wantUpdateErr {
t.Fatalf("got RDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
t.Fatalf("got xDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
}
}
// startServerAndGetCC starts a fake XDS server and also returns a ClientConn
// connected to it.
func startServerAndGetCC(t *testing.T) (*fakeserver.Server, *grpc.ClientConn, func()) {
t.Helper()
fs, sCleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
cc, ccCleanup, err := fs.XDSClientConn()
if err != nil {
sCleanup()
t.Fatalf("Failed to get a clientConn to the fake xDS server: %v", err)
}
return fs, cc, func() {
sCleanup()
ccCleanup()
}
}
// waitForNilErr waits for a nil error value to be received on the
// provided channel.
func waitForNilErr(t *testing.T, ch *testutils.Channel) {
t.Helper()
val, err := ch.Receive()
if err == testutils.ErrRecvTimeout {
t.Fatalf("Timeout expired when expecting update")
}
if val != nil {
if cbErr := val.(error); cbErr != nil {
t.Fatal(cbErr)
}
}
}

View File

@ -27,60 +27,121 @@ import (
"github.com/golang/protobuf/proto"
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
func emptyChanRecvWithTimeout(ch <-chan struct{}, d time.Duration) error {
timer := time.NewTimer(d)
select {
case <-timer.C:
return fmt.Errorf("timeout")
case <-ch:
timer.Stop()
return nil
}
}
func requestChanRecvWithTimeout(ch <-chan *fakexds.Request, d time.Duration) (*fakexds.Request, error) {
timer := time.NewTimer(d)
select {
case <-timer.C:
return nil, fmt.Errorf("timeout waiting for request")
case r := <-ch:
timer.Stop()
return r, nil
}
}
// compareXDSRequest reads requests from channel, compare it with want.
func compareXDSRequest(ch <-chan *fakexds.Request, d time.Duration, want *xdspb.DiscoveryRequest, version, nonce string) error {
r, err := requestChanRecvWithTimeout(ch, d)
func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, version, nonce string) error {
val, err := ch.Receive()
if err != nil {
return err
}
if r.Err != nil {
return fmt.Errorf("unexpected error from request: %v", r.Err)
req := val.(*fakeserver.Request)
if req.Err != nil {
return fmt.Errorf("unexpected error from request: %v", req.Err)
}
wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
wantClone.VersionInfo = version
wantClone.ResponseNonce = nonce
if !cmp.Equal(r.Req, wantClone, cmp.Comparer(proto.Equal)) {
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(r.Req, wantClone))
if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) {
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone))
}
return nil
}
func sendXDSRespWithVersion(ch chan<- *fakexds.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) {
func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) {
respToSend := proto.Clone(respWithoutVersion).(*xdspb.DiscoveryResponse)
respToSend.VersionInfo = strconv.Itoa(version)
nonce = strconv.Itoa(int(time.Now().UnixNano()))
respToSend.Nonce = nonce
ch <- &fakexds.Response{Resp: respToSend}
ch <- &fakeserver.Response{Resp: respToSend}
return
}
// TestV2ClientAck verifies that valid responses are acked, and invalid ones are
// nacked.
// startXDS calls watch to send the first request. It then sends a good response
// and checks for ack.
func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest) *testutils.Channel {
callbackCh := testutils.NewChannel()
switch xdsname {
case "LDS":
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
case "RDS":
v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
case "CDS":
v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
case "EDS":
v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
}
if err := compareXDSRequest(reqChan, req, "", ""); err != nil {
t.Fatalf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("FakeServer received %s request...", xdsname)
return callbackCh
}
// sendGoodResp sends the good response, with the given version, and a random
// nonce.
//
// It also waits and checks that the ack request contains the given version, and
// the generated nonce.
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) {
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version)
t.Logf("Good %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Good %s response acked", xdsname)
if _, err := callbackCh.Receive(); err != nil {
t.Errorf("Timeout when expecting %s update", xdsname)
}
t.Logf("Good %s response callback executed", xdsname)
}
// sendBadResp sends a bad response with the given version. This response will
// be nacked, so we expect a request with the previous version (version-1).
//
// But the nonce in request should be the new nonce.
func sendBadResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, wantReq *xdspb.DiscoveryRequest) {
var typeURL string
switch xdsname {
case "LDS":
typeURL = ldsURL
case "RDS":
typeURL = rdsURL
case "CDS":
typeURL = cdsURL
case "EDS":
typeURL = edsURL
}
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: typeURL,
}, version)
t.Logf("Bad %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version-1), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Bad %s response nacked", xdsname)
}
// TestV2ClientAck verifies that valid responses are acked, and invalid ones
// are nacked.
//
// This test also verifies the version for different types are independent.
func TestV2ClientAck(t *testing.T) {
@ -91,27 +152,23 @@ func TestV2ClientAck(t *testing.T) {
versionEDS = 4000
)
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest)
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
cbRDS := startXDS(t, "RDS", v2c, fakeServer, goodRDSRequest)
cbRDS := startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest)
sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS)
versionRDS++
cbCDS := startXDS(t, "CDS", v2c, fakeServer, goodCDSRequest)
cbCDS := startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest)
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
versionCDS++
cbEDS := startXDS(t, "EDS", v2c, fakeServer, goodEDSRequest)
cbEDS := startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest)
sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS)
versionEDS++
@ -136,105 +193,21 @@ func TestV2ClientAck(t *testing.T) {
versionEDS++
}
// startXDS calls watch to send the first request. It then sends a good response
// and checks for ack.
func startXDS(t *testing.T, xdsname string, v2c *v2Client, fakeServer *fakexds.Server, goodReq *xdspb.DiscoveryRequest) <-chan struct{} {
callbackCh := make(chan struct{}, 1)
switch xdsname {
case "LDS":
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh <- struct{}{}
})
case "RDS":
v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh <- struct{}{}
})
case "CDS":
v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh <- struct{}{}
})
case "EDS":
v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh <- struct{}{}
})
}
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodReq, "", ""); err != nil {
t.Fatalf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("FakeServer received %s request...", xdsname)
return callbackCh
}
// sendGoodResp sends the good response, with the given version, and a random
// nonce.
//
// It also waits and checks that the ack request contains the given version, and
// the generated nonce.
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakexds.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh <-chan struct{}) {
nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, goodResp, version)
t.Logf("Good %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, wantReq, strconv.Itoa(version), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Good %s response acked", xdsname)
if err := emptyChanRecvWithTimeout(callbackCh, defaultTestTimeout); err != nil {
t.Errorf("Timeout when expecting %s update", xdsname)
}
t.Logf("Good %s response callback executed", xdsname)
}
// sendBadResp sends a bad response with the given version. This response will
// be nacked, so we expect a request with the previous version (version-1).
//
// But the nonce in request should be the new nonce.
func sendBadResp(t *testing.T, xdsname string, fakeServer *fakexds.Server, version int, wantReq *xdspb.DiscoveryRequest) {
var typeURL string
switch xdsname {
case "LDS":
typeURL = ldsURL
case "RDS":
typeURL = rdsURL
case "CDS":
typeURL = cdsURL
case "EDS":
typeURL = edsURL
}
nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: typeURL,
}, version)
t.Logf("Bad %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, wantReq, strconv.Itoa(version-1), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Bad %s response nacked", xdsname)
}
// Test when the first response is invalid, and is nacked, the nack requests
// should have an empty version string.
func TestV2ClientAckFirstIsNack(t *testing.T) {
var versionLDS = 1000
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest)
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: ldsURL,
}, versionLDS)
@ -242,7 +215,7 @@ func TestV2ClientAckFirstIsNack(t *testing.T) {
// The expected version string is an empty string, because this is the first
// response, and it's nacked (so there's no previous ack version).
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodLDSRequest, "", nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
@ -257,33 +230,29 @@ func TestV2ClientAckFirstIsNack(t *testing.T) {
func TestV2ClientAckNackAfterNewWatch(t *testing.T) {
var versionLDS = 1000
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest)
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
// Start a new watch.
cbLDS = startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest)
cbLDS = startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
// This is an invalid response after the new watch.
nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: ldsURL,
}, versionLDS)
t.Logf("Bad response pushed to fakeServer...")
// The expected version string is the previous acked version.
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")

View File

@ -24,7 +24,8 @@ import (
"time"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
basepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
@ -36,7 +37,7 @@ import (
)
const (
defaultTestTimeout = 2 * time.Second
defaultTestTimeout = 1 * time.Second
goodLDSTarget1 = "lds.target.good:1111"
goodLDSTarget2 = "lds.target.good:2222"
goodRouteName1 = "GoodRouteConfig1"
@ -390,12 +391,8 @@ var (
// TestV2ClientBackoffAfterRecvError verifies if the v2Client backoffs when it
// encounters a Recv error while receiving an LDS response.
func TestV2ClientBackoffAfterRecvError(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
// Override the v2Client backoff function with this, so that we can verify
// that a backoff actually was triggerred.
@ -405,7 +402,7 @@ func TestV2ClientBackoffAfterRecvError(t *testing.T) {
return 0
}
v2c := newV2Client(client, goodNodeProto, clientBackoff)
v2c := newV2Client(cc, goodNodeProto, clientBackoff)
defer v2c.close()
t.Log("Started xds v2Client...")
@ -413,10 +410,12 @@ func TestV2ClientBackoffAfterRecvError(t *testing.T) {
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
close(callbackCh)
})
<-fakeServer.RequestChan
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
t.Log("FakeServer received request...")
fakeServer.ResponseChan <- &fakexds.Response{Err: errors.New("RPC error")}
fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")}
t.Log("Bad LDS response pushed to fakeServer...")
timer := time.NewTimer(defaultTestTimeout)
@ -435,95 +434,81 @@ func TestV2ClientBackoffAfterRecvError(t *testing.T) {
// encountered a Recv() error, and is expected to send out xDS requests for
// registered watchers once it comes back up again.
func TestV2ClientRetriesAfterBrokenStream(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
callbackCh := make(chan struct{}, 1)
callbackCh := testutils.NewChannel()
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err)
callbackCh <- struct{}{}
callbackCh.Send(struct{}{})
})
<-fakeServer.RequestChan
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
t.Log("FakeServer received request...")
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
t.Log("Good LDS response pushed to fakeServer...")
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
if _, err := callbackCh.Receive(); err != nil {
t.Fatal("Timeout when expecting LDS update")
case <-callbackCh:
timer.Stop()
}
// Read the ack, so the next request is sent after stream re-creation.
<-fakeServer.RequestChan
fakeServer.ResponseChan <- &fakexds.Response{Err: errors.New("RPC error")}
// Read the ack, so the next request is sent after stream re-creation.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS ACK")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")}
t.Log("Bad LDS response pushed to fakeServer...")
timer = time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting LDS update")
case gotRequest := <-fakeServer.RequestChan:
timer.Stop()
t.Log("received LDS request after stream re-creation")
if !proto.Equal(gotRequest.Req, goodLDSRequest) {
t.Fatalf("gotRequest: %+v, wantRequest: %+v", gotRequest.Req, goodLDSRequest)
}
val, err := fakeServer.XDSRequestChan.Receive()
if err == testutils.ErrRecvTimeout {
t.Fatalf("Timeout expired when expecting LDS update")
}
gotRequest := val.(*fakeserver.Request)
if !proto.Equal(gotRequest.Req, goodLDSRequest) {
t.Fatalf("gotRequest: %+v, wantRequest: %+v", gotRequest.Req, goodLDSRequest)
}
}
// TestV2ClientCancelWatch verifies that the registered watch callback is not
// invoked if a response is received after the watcher is cancelled.
func TestV2ClientCancelWatch(t *testing.T) {
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
callbackCh := make(chan struct{}, 1)
callbackCh := testutils.NewChannel()
cancelFunc := v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err)
callbackCh <- struct{}{}
callbackCh.Send(struct{}{})
})
<-fakeServer.RequestChan
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
t.Log("FakeServer received request...")
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
t.Log("Good LDS response pushed to fakeServer...")
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
if _, err := callbackCh.Receive(); err != nil {
t.Fatal("Timeout when expecting LDS update")
case <-callbackCh:
timer.Stop()
}
cancelFunc()
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
t.Log("Another good LDS response pushed to fakeServer...")
timer = time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
case <-callbackCh:
timer.Stop()
if _, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout {
t.Fatalf("Watch callback invoked after the watcher was cancelled")
}
}

View File

@ -34,7 +34,7 @@ import (
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakexds"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
@ -98,7 +98,7 @@ func getXDSClientMakerFunc(wantOpts xdsclient.Options) func(xdsclient.Options) (
if len(gotOpts.DialOpts) != len(wantOpts.DialOpts) {
return nil, fmt.Errorf("got len(DialOpts): %v, want: %v", len(gotOpts.DialOpts), len(wantOpts.DialOpts))
}
return fakexds.NewClient(), nil
return fakeclient.NewClient(), nil
}
}
@ -232,7 +232,7 @@ func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *testClientConn, fun
// waitForWatchService waits for the WatchService method to be called on the
// xdsClient within a reasonable amount of time, and also verifies that the
// watch is called with the expected target.
func waitForWatchService(t *testing.T, xdsC *fakexds.Client, wantTarget string) {
func waitForWatchService(t *testing.T, xdsC *fakeclient.Client, wantTarget string) {
t.Helper()
gotTarget, err := xdsC.WaitForWatchService()
@ -247,7 +247,7 @@ func waitForWatchService(t *testing.T, xdsC *fakexds.Client, wantTarget string)
// TestXDSResolverWatchCallbackAfterClose tests the case where a service update
// from the underlying xdsClient is received after the resolver is closed.
func TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
xdsC := fakexds.NewClient()
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
config: &validConfig,
xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
@ -268,7 +268,7 @@ func TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
// TestXDSResolverBadServiceUpdate tests the case the xdsClient returns a bad
// service update.
func TestXDSResolverBadServiceUpdate(t *testing.T) {
xdsC := fakexds.NewClient()
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
config: &validConfig,
xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
@ -292,7 +292,7 @@ func TestXDSResolverBadServiceUpdate(t *testing.T) {
// TestXDSResolverGoodServiceUpdate tests the happy case where the resolver
// gets a good service update from the xdsClient.
func TestXDSResolverGoodServiceUpdate(t *testing.T) {
xdsC := fakexds.NewClient()
xdsC := fakeclient.NewClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
config: &validConfig,
xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },

View File

@ -16,9 +16,8 @@
*
*/
// Package fakexds provides fake implementation of multiple types, for use in
// xds tests.
package fakexds
// Package fakeclient provides a fake implementation of an xDS client.
package fakeclient
import (
"sync"

View File

@ -16,12 +16,15 @@
*
*/
package fakexds
// Package fakeserver provides a fake implementation of an xDS server.
package fakeserver
import (
"context"
"fmt"
"io"
"net"
"time"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
@ -35,9 +38,12 @@ import (
lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
)
// TODO: Make this a var or a field in the server if there is a need to use a
// value other than this default.
const defaultChannelBufferSize = 50
const (
// TODO: Make this a var or a field in the server if there is a need to use a
// value other than this default.
defaultChannelBufferSize = 50
defaultDialTimeout = 5 * time.Second
)
// Request wraps the request protobuf (xds/LRS) and error received by the
// Server in a call to stream.Recv().
@ -104,6 +110,18 @@ func StartServer() (*Server, func(), error) {
return s, func() { server.Stop() }, nil
}
// XDSClientConn returns a grpc.ClientConn connected to the fakeServer.
func (xdsS *Server) XDSClientConn() (*grpc.ClientConn, func(), error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
defer cancel()
cc, err := grpc.DialContext(ctx, xdsS.Address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, nil, fmt.Errorf("grpc.DialContext(%s) failed: %v", xdsS.Address, err)
}
return cc, func() { cc.Close() }, nil
}
type xdsServer struct {
reqChan *testutils.Channel
respChan chan *Response