diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index e9f290900..508e778f3 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -34,7 +34,7 @@ import ( xdsbalancer "google.golang.org/grpc/xds/internal/balancer" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakexds" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" ) const ( @@ -55,7 +55,7 @@ type cdsWatchInfo struct { // invokeWatchCb invokes the CDS watch callback registered by the cdsBalancer // and waits for appropriate state to be pushed to the provided edsBalancer. -func invokeWatchCbAndWait(xdsC *fakexds.Client, cdsW cdsWatchInfo, wantCCS balancer.ClientConnState, edsB *testEDSBalancer) error { +func invokeWatchCbAndWait(xdsC *fakeclient.Client, cdsW cdsWatchInfo, wantCCS balancer.ClientConnState, edsB *testEDSBalancer) error { xdsC.InvokeWatchClusterCallback(cdsW.update, cdsW.err) if cdsW.err != nil { return edsB.waitForResolverError(cdsW.err) @@ -224,10 +224,10 @@ func setup() (*cdsBalancer, *testEDSBalancer, func()) { // setupWithWatch does everything that setup does, and also pushes a ClientConn // update to the cdsBalancer and waits for a CDS watch call to be registered. -func setupWithWatch(t *testing.T) (*fakexds.Client, *cdsBalancer, *testEDSBalancer, func()) { +func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, func()) { t.Helper() - xdsC := fakexds.NewClient() + xdsC := fakeclient.NewClient() cdsB, edsB, cancel := setup() if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) @@ -246,7 +246,7 @@ func setupWithWatch(t *testing.T) (*fakexds.Client, *cdsBalancer, *testEDSBalanc // cdsBalancer with different inputs and verifies that the CDS watch API on the // provided xdsClient is invoked appropriately. func TestUpdateClientConnState(t *testing.T) { - xdsC := fakexds.NewClient() + xdsC := fakeclient.NewClient() tests := []struct { name string @@ -324,7 +324,7 @@ func TestUpdateClientConnStateAfterClose(t *testing.T) { defer cancel() cdsB.Close() - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, fakexds.NewClient())); err != errBalancerClosed { + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, fakeclient.NewClient())); err != errBalancerClosed { t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed) } } diff --git a/xds/internal/balancer/xds_client_test.go b/xds/internal/balancer/xds_client_test.go index c640f18b6..b9f48a762 100644 --- a/xds/internal/balancer/xds_client_test.go +++ b/xds/internal/balancer/xds_client_test.go @@ -32,7 +32,8 @@ import ( xdsinternal "google.golang.org/grpc/xds/internal" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakexds" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" ) const ( @@ -54,7 +55,7 @@ var ( // * Sends updates with different edsServiceNames and expects new watches to be // registered. func (s) TestClientWrapperWatchEDS(t *testing.T) { - fakeServer, cleanup, err := fakexds.StartServer() + fakeServer, cleanup, err := fakeserver.StartServer() if err != nil { t.Fatalf("Failed to start fake xDS server: %v", err) } @@ -101,7 +102,7 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) { if err != nil { t.Fatalf("EDS RPC failed with err: %v", err) } - edsReq := req.(*fakexds.Request) + edsReq := req.(*fakeserver.Request) if edsReq.Err != nil { t.Fatalf("EDS RPC failed with err: %v", edsReq.Err) } @@ -123,9 +124,9 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) { // // The test does the following: // * Creates a clientWrapper. -// * Creates a fakexds.Client and passes it to the clientWrapper in attributes. +// * Creates a fakeclient.Client and passes it to the clientWrapper in attributes. // * Verifies the clientWrapper registers an EDS watch. -// * Forces the fakexds.Client to invoke the registered EDS watch callback with +// * Forces the fakeclient.Client to invoke the registered EDS watch callback with // an error. Verifies that the wrapper does not invoke the top-level // edsBalancer with the received error. func (s) TestClientWrapperHandleUpdateError(t *testing.T) { @@ -138,7 +139,7 @@ func (s) TestClientWrapperHandleUpdateError(t *testing.T) { cw := newXDSClientWrapper(newEDS, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil) defer cw.close() - xdsC := fakexds.NewClient() + xdsC := fakeclient.NewClient() cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC)) gotCluster, err := xdsC.WaitForWatchEDS() if err != nil { @@ -173,7 +174,7 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) { defer cw.close() // Verify that the eds watch is registered for the expected resource name. - xdsC1 := fakexds.NewClient() + xdsC1 := fakeclient.NewClient() cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC1)) gotCluster, err := xdsC1.WaitForWatchEDS() if err != nil { @@ -187,7 +188,7 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) { // re-registered on the new client, and that the old client is not closed // (because clientWrapper only closes clients that it creates, it does not // close client that are passed through attributes). - xdsC2 := fakexds.NewClient() + xdsC2 := fakeclient.NewClient() cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC2)) gotCluster, err = xdsC2.WaitForWatchEDS() if err != nil { diff --git a/xds/internal/balancer/xds_lrs_test.go b/xds/internal/balancer/xds_lrs_test.go index 9c2df72c7..0bd329fa1 100644 --- a/xds/internal/balancer/xds_lrs_test.go +++ b/xds/internal/balancer/xds_lrs_test.go @@ -25,7 +25,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/resolver" xdsinternal "google.golang.org/grpc/xds/internal" - "google.golang.org/grpc/xds/internal/testutils/fakexds" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" ) // TestXDSLoadReporting verifies that the edsBalancer starts the loadReport @@ -40,7 +40,7 @@ func (s) TestXDSLoadReporting(t *testing.T) { } defer edsB.Close() - xdsC := fakexds.NewClient() + xdsC := fakeclient.NewClient() edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)}, BalancerConfig: &XDSConfig{LrsLoadReportingServerName: new(string)}, diff --git a/xds/internal/balancer/xds_test.go b/xds/internal/balancer/xds_test.go index 27d3e1dc3..6dc04ba29 100644 --- a/xds/internal/balancer/xds_test.go +++ b/xds/internal/balancer/xds_test.go @@ -43,7 +43,7 @@ import ( xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakexds" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" ) var lbABuilder = &balancerABuilder{} @@ -317,7 +317,7 @@ func (s) TestXDSFallbackResolvedAddrs(t *testing.T) { // waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created // with the provided name. It also make sure that the newly created client // registers an eds watcher. -func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakexds.Client { +func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakeclient.Client { t.Helper() val, err := ch.Receive() @@ -325,7 +325,7 @@ func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantNa t.Fatalf("error when waiting for a new xds client: %v", err) return nil } - xdsC := val.(*fakexds.Client) + xdsC := val.(*fakeclient.Client) if xdsC.Name() != wantName { t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName) return nil @@ -365,7 +365,7 @@ func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() { origXdsClientNew := xdsclientNew xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { - xdsC := fakexds.NewClientWithName(opts.Config.BalancerName) + xdsC := fakeclient.NewClientWithName(opts.Config.BalancerName) defer func() { xdsClientCh.Send(xdsC) }() return xdsC, nil } @@ -527,7 +527,7 @@ func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) { // will not kick-in as yet. // * Sends another ClientConn update with fallback addresses. Still fallback // would not have kicked in because the startupTimeout hasn't expired. -// * Sends an EDSUpdate through the fakexds.Client object. This will trigger +// * Sends an EDSUpdate through the fakeclient.Client object. This will trigger // the creation of an edsLB object. This is verified. // * Trigger fallback by directly calling the loseContact method on the // top-level edsBalancer. This should instantiate the fallbackLB and should diff --git a/xds/internal/client/cds_test.go b/xds/internal/client/cds_test.go index 086184bdd..156026699 100644 --- a/xds/internal/client/cds_test.go +++ b/xds/internal/client/cds_test.go @@ -25,11 +25,13 @@ import ( "testing" "time" + discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" "github.com/golang/protobuf/proto" anypb "github.com/golang/protobuf/ptypes/any" - "google.golang.org/grpc/xds/internal/client/fakexds" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" ) const ( @@ -164,13 +166,10 @@ func TestValidateCluster(t *testing.T) { // and creates a v2Client using it. Then, it registers a CDS watcher and tests // different CDS responses. func TestCDSHandleResponse(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() tests := []struct { @@ -230,7 +229,7 @@ func TestCDSHandleResponse(t *testing.T) { wantUpdateErr: test.wantUpdateErr, cdsWatch: v2c.watchCDS, - watchReqChan: fakeServer.RequestChan, + watchReqChan: fakeServer.XDSRequestChan, handleXDSResp: v2c.handleCDSResponse, }) }) @@ -240,13 +239,10 @@ func TestCDSHandleResponse(t *testing.T) { // TestCDSHandleResponseWithoutWatch tests the case where the v2Client receives // a CDS response without a registered watcher. func TestCDSHandleResponseWithoutWatch(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + _, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() if v2c.handleCDSResponse(goodCDSResponse1) == nil { @@ -260,7 +256,7 @@ type cdsTestOp struct { // target is the resource name to watch for. target string // responseToSend is the xDS response sent to the client - responseToSend *fakexds.Response + responseToSend *fakeserver.Response // wantOpErr specfies whether the main operation should return an error. wantOpErr bool // wantCDSCache is the expected rdsCache at the end of an operation. @@ -273,16 +269,13 @@ type cdsTestOp struct { // ClientConn to it, creates a v2Client using it. It then reads a bunch of // test operations to be performed from cdsTestOps and returns error, if any, // on the provided error channel. This is executed in a separate goroutine. -func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) { +func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh *testutils.Channel) { t.Helper() - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") @@ -299,15 +292,19 @@ func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) { // Wait till the request makes it to the fakeServer. This ensures that // the watch request has been processed by the v2Client. - <-fakeServer.RequestChan + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + errCh.Send(fmt.Errorf("Timeout waiting for CDS request: %v", err)) + return + } t.Log("FakeServer received request...") } // Directly push the response through a call to handleCDSResponse, // thereby bypassing the fakeServer. if cdsTestOp.responseToSend != nil { - if err := v2c.handleCDSResponse(cdsTestOp.responseToSend.Resp); (err != nil) != cdsTestOp.wantOpErr { - errCh <- fmt.Errorf("v2c.handleCDSResponse() returned err: %v", err) + resp := cdsTestOp.responseToSend.Resp.(*discoverypb.DiscoveryResponse) + if err := v2c.handleCDSResponse(resp); (err != nil) != cdsTestOp.wantOpErr { + errCh.Send(fmt.Errorf("v2c.handleRDSResponse(%+v) returned err: %v", resp, err)) return } } @@ -320,24 +317,23 @@ func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) { } if !reflect.DeepEqual(v2c.cloneCDSCacheForTesting(), cdsTestOp.wantCDSCache) { - errCh <- fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.rdsCache, cdsTestOp.wantCDSCache) + errCh.Send(fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.rdsCache, cdsTestOp.wantCDSCache)) return } } t.Log("Completed all test ops successfully...") - errCh <- nil + errCh.Send(nil) } // TestCDSCaching tests some end-to-end CDS flows using a fake xDS server, and // verifies the CDS data cached at the v2Client. func TestCDSCaching(t *testing.T) { - errCh := make(chan error, 1) ops := []cdsTestOp{ // Add an CDS watch for a cluster name (clusterName1), which returns one // matching resource in the response. { target: clusterName1, - responseToSend: &fakexds.Response{Resp: goodCDSResponse1}, + responseToSend: &fakeserver.Response{Resp: goodCDSResponse1}, wantCDSCache: map[string]CDSUpdate{ clusterName1: {serviceName1, true}, }, @@ -346,7 +342,7 @@ func TestCDSCaching(t *testing.T) { // Push an CDS response which contains a new resource (apart from the // one received in the previous response). This should be cached. { - responseToSend: &fakexds.Response{Resp: cdsResponseWithMultipleResources}, + responseToSend: &fakeserver.Response{Resp: cdsResponseWithMultipleResources}, wantCDSCache: map[string]CDSUpdate{ clusterName1: {serviceName1, true}, clusterName2: {serviceName2, false}, @@ -366,24 +362,15 @@ func TestCDSCaching(t *testing.T) { }, // Push an empty CDS response. This should clear the cache. { - responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}}, + responseToSend: &fakeserver.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}}, wantOpErr: false, wantCDSCache: map[string]CDSUpdate{}, wantWatchCallback: true, }, } + errCh := testutils.NewChannel() go testCDSCaching(t, ops, errCh) - - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Timeout when expecting CDS update") - case err := <-errCh: - timer.Stop() - if err != nil { - t.Fatal(err) - } - } + waitForNilErr(t, errCh) } // TestCDSWatchExpiryTimer tests the case where the client does not receive an @@ -391,44 +378,36 @@ func TestCDSCaching(t *testing.T) { // to be invoked with an error once the watchExpiryTimer fires. func TestCDSWatchExpiryTimer(t *testing.T) { oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 1 * time.Second + defaultWatchExpiryTimeout = 500 * time.Millisecond defer func() { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") - cdsCallbackCh := make(chan error, 1) + callbackCh := testutils.NewChannel() v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) { t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err) if u.ServiceName != "" { - cdsCallbackCh <- fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName) + callbackCh.Send(fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName)) } if err == nil { - cdsCallbackCh <- errors.New("received nil error in cdsCallback") + callbackCh.Send(errors.New("received nil error in cdsCallback")) } - cdsCallbackCh <- nil + callbackCh.Send(nil) }) - <-fakeServer.RequestChan - timer := time.NewTimer(2 * time.Second) - select { - case <-timer.C: - t.Fatalf("Timeout expired when expecting CDS update") - case err := <-cdsCallbackCh: - timer.Stop() - if err != nil { - t.Fatal(err) - } + // Wait till the request makes it to the fakeServer. This ensures that + // the watch request has been processed by the v2Client. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an CDS request") } + waitForNilErr(t, callbackCh) } var ( diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go index f9edff4f1..d74d6c8ef 100644 --- a/xds/internal/client/client_test.go +++ b/xds/internal/client/client_test.go @@ -26,7 +26,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/xds/internal/client/bootstrap" - "google.golang.org/grpc/xds/internal/client/fakexds" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" ) @@ -45,7 +46,10 @@ func clientOpts(balancerName string) Options { } func TestNew(t *testing.T) { - fakeServer, cleanup := fakexds.StartServer(t) + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } defer cleanup() tests := []struct { @@ -107,7 +111,10 @@ func TestNew(t *testing.T) { // TestWatchService tests the happy case of registering a watcher for // service updates and receiving a good update. func TestWatchService(t *testing.T) { - fakeServer, cleanup := fakexds.StartServer(t) + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } defer cleanup() xdsClient, err := New(clientOpts(fakeServer.Address)) @@ -117,37 +124,33 @@ func TestWatchService(t *testing.T) { defer xdsClient.Close() t.Log("Created an xdsClient...") - callbackCh := make(chan error, 1) + callbackCh := testutils.NewChannel() cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { if err != nil { - callbackCh <- fmt.Errorf("xdsClient.WatchService returned error: %v", err) + callbackCh.Send(fmt.Errorf("xdsClient.WatchService returned error: %v", err)) return } if su.Cluster != goodClusterName1 { - callbackCh <- fmt.Errorf("got clusterName: %+v, want clusterName: %+v", su.Cluster, goodClusterName1) + callbackCh.Send(fmt.Errorf("got clusterName: %+v, want clusterName: %+v", su.Cluster, goodClusterName1)) return } - callbackCh <- nil + callbackCh.Send(nil) }) defer cancelWatch() t.Log("Registered a watcher for service updates...") - // Make the fakeServer send LDS and RDS responses. - <-fakeServer.RequestChan - fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1} - <-fakeServer.RequestChan - fakeServer.ResponseChan <- &fakexds.Response{Resp: goodRDSResponse1} - - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Timeout when expecting a service update") - case err := <-callbackCh: - timer.Stop() - if err != nil { - t.Fatal(err) - } + // Make the fakeServer send LDS response. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} + + // Make the fakeServer send RDS response. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an RDS request") + } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1} + waitForNilErr(t, callbackCh) } // TestWatchServiceWithNoResponseFromServer tests the case where the @@ -155,7 +158,10 @@ func TestWatchService(t *testing.T) { // registering a service update watcher. The underlying v2Client will timeout // and will send us an error. func TestWatchServiceWithNoResponseFromServer(t *testing.T) { - fakeServer, cleanup := fakexds.StartServer(t) + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } defer cleanup() xdsClient, err := New(clientOpts(fakeServer.Address)) @@ -166,45 +172,40 @@ func TestWatchServiceWithNoResponseFromServer(t *testing.T) { t.Log("Created an xdsClient...") oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 1 * time.Second + defaultWatchExpiryTimeout = 500 * time.Millisecond defer func() { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - callbackCh := make(chan error, 1) + callbackCh := testutils.NewChannel() cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { if su.Cluster != "" { - callbackCh <- fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster) + callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)) return } if err == nil { - callbackCh <- errors.New("xdsClient.WatchService returned error non-nil error") + callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error")) return } - callbackCh <- nil + callbackCh.Send(nil) }) defer cancelWatch() t.Log("Registered a watcher for service updates...") // Wait for one request from the client, but send no reponses. - <-fakeServer.RequestChan - - timer := time.NewTimer(2 * time.Second) - select { - case <-timer.C: - t.Fatal("Timeout when expecting a service update") - case err := <-callbackCh: - timer.Stop() - if err != nil { - t.Fatal(err) - } + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") } + waitForNilErr(t, callbackCh) } // TestWatchServiceEmptyRDS tests the case where the underlying // v2Client receives an empty RDS response. func TestWatchServiceEmptyRDS(t *testing.T) { - fakeServer, cleanup := fakexds.StartServer(t) + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } defer cleanup() xdsClient, err := New(clientOpts(fakeServer.Address)) @@ -215,49 +216,48 @@ func TestWatchServiceEmptyRDS(t *testing.T) { t.Log("Created an xdsClient...") oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 1 * time.Second + defaultWatchExpiryTimeout = 500 * time.Millisecond defer func() { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - callbackCh := make(chan error, 1) + callbackCh := testutils.NewChannel() cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { if su.Cluster != "" { - callbackCh <- fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster) + callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)) return } if err == nil { - callbackCh <- errors.New("xdsClient.WatchService returned error non-nil error") + callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error")) return } - callbackCh <- nil + callbackCh.Send(nil) }) defer cancelWatch() t.Log("Registered a watcher for service updates...") - // Send a good LDS response, but send an empty RDS response. - <-fakeServer.RequestChan - fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1} - <-fakeServer.RequestChan - fakeServer.ResponseChan <- &fakexds.Response{Resp: noVirtualHostsInRDSResponse} - - timer := time.NewTimer(2 * time.Second) - select { - case <-timer.C: - t.Fatal("Timeout when expecting a service update") - case err := <-callbackCh: - timer.Stop() - if err != nil { - t.Fatal(err) - } + // Make the fakeServer send LDS response. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} + + // Make the fakeServer send an empty RDS response. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an RDS request") + } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: noVirtualHostsInRDSResponse} + waitForNilErr(t, callbackCh) } // TestWatchServiceWithClientClose tests the case where xDS responses are // received after the client is closed, and we make sure that the registered // watcher callback is not invoked. func TestWatchServiceWithClientClose(t *testing.T) { - fakeServer, cleanup := fakexds.StartServer(t) + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } defer cleanup() xdsClient, err := New(clientOpts(fakeServer.Address)) @@ -267,29 +267,26 @@ func TestWatchServiceWithClientClose(t *testing.T) { defer xdsClient.Close() t.Log("Created an xdsClient...") - callbackCh := make(chan error, 1) + callbackCh := testutils.NewChannel() cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - callbackCh <- errors.New("watcher callback invoked after client close") + callbackCh.Send(errors.New("watcher callback invoked after client close")) }) defer cancelWatch() t.Log("Registered a watcher for service updates...") // Make the fakeServer send LDS response. - <-fakeServer.RequestChan - fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1} + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} xdsClient.Close() t.Log("Closing the xdsClient...") // Push an RDS response from the fakeserver - fakeServer.ResponseChan <- &fakexds.Response{Resp: goodRDSResponse1} - - timer := time.NewTimer(1 * time.Second) - select { - case <-timer.C: - // Do nothing. Success. - case err := <-callbackCh: - timer.Stop() - t.Fatal(err) + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1} + if cbErr, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout { + t.Fatal(cbErr) } + } diff --git a/xds/internal/client/eds_test.go b/xds/internal/client/eds_test.go index cf6fcceea..70afd655b 100644 --- a/xds/internal/client/eds_test.go +++ b/xds/internal/client/eds_test.go @@ -29,7 +29,7 @@ import ( anypb "github.com/golang/protobuf/ptypes/any" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/xds/internal" - "google.golang.org/grpc/xds/internal/client/fakexds" + "google.golang.org/grpc/xds/internal/testutils" ) func TestEDSParseRespProto(t *testing.T) { @@ -162,13 +162,10 @@ var ( ) func TestEDSHandleResponse(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() tests := []struct { @@ -235,7 +232,7 @@ func TestEDSHandleResponse(t *testing.T) { wantUpdateErr: test.wantUpdateErr, edsWatch: v2c.watchEDS, - watchReqChan: fakeServer.RequestChan, + watchReqChan: fakeServer.XDSRequestChan, handleXDSResp: v2c.handleEDSResponse, }) }) @@ -245,13 +242,10 @@ func TestEDSHandleResponse(t *testing.T) { // TestEDSHandleResponseWithoutWatch tests the case where the v2Client // receives an EDS response without a registered EDS watcher. func TestEDSHandleResponseWithoutWatch(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + _, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() if v2c.handleEDSResponse(goodEDSResponse1) == nil { @@ -261,42 +255,34 @@ func TestEDSHandleResponseWithoutWatch(t *testing.T) { func TestEDSWatchExpiryTimer(t *testing.T) { oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 1 * time.Second + defaultWatchExpiryTimeout = 500 * time.Millisecond defer func() { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") - edsCallbackCh := make(chan error, 1) + callbackCh := testutils.NewChannel() v2c.watchEDS(goodRouteName1, func(u *EDSUpdate, err error) { t.Logf("Received callback with edsUpdate {%+v} and error {%v}", u, err) if u != nil { - edsCallbackCh <- fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u) + callbackCh.Send(fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u)) } if err == nil { - edsCallbackCh <- errors.New("received nil error in edsCallback") + callbackCh.Send(errors.New("received nil error in edsCallback")) } - edsCallbackCh <- nil + callbackCh.Send(nil) }) - <-fakeServer.RequestChan - timer := time.NewTimer(2 * time.Second) - select { - case <-timer.C: - t.Fatalf("Timeout expired when expecting EDS update") - case err := <-edsCallbackCh: - timer.Stop() - if err != nil { - t.Fatal(err) - } + // Wait till the request makes it to the fakeServer. This ensures that + // the watch request has been processed by the v2Client. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an CDS request") } + waitForNilErr(t, callbackCh) } diff --git a/xds/internal/client/fakexds/fakexds.go b/xds/internal/client/fakexds/fakexds.go deleted file mode 100644 index 9505e5ac9..000000000 --- a/xds/internal/client/fakexds/fakexds.go +++ /dev/null @@ -1,164 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package fakexds provides a very basic fake implementation of the xDS server -// for unit testing purposes. -package fakexds - -import ( - "context" - "net" - "testing" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" -) - -// TODO: Make this a var or a field in the server if there is a need to use a -// value other than this default. -const defaultChannelBufferSize = 50 - -// Request wraps an xDS request and error. -type Request struct { - Req *discoverypb.DiscoveryRequest - Err error -} - -// Response wraps an xDS response and error. -type Response struct { - Resp *discoverypb.DiscoveryResponse - Err error -} - -// Server is a very basic implementation of a fake xDS server. It provides a -// request and response channel for the user to control the requests that are -// expected and the responses that needs to be sent out. -type Server struct { - // RequestChan is a buffered channel on which the fake server writes the - // received requests onto. - RequestChan chan *Request - // ResponseChan is a buffered channel from which the fake server reads the - // responses that it must send out to the client. - ResponseChan chan *Response - // Address is the host:port on which the fake xdsServer is listening on. - Address string - // LRS is the LRS server installed. - LRS *LRSServer -} - -// StartServer starts a fakexds.Server. The returned function should be invoked -// by the caller once the test is done. -func StartServer(t *testing.T) (*Server, func()) { - t.Helper() - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("net.Listen() failed: %v", err) - } - server := grpc.NewServer() - - lrss := newLRSServer() - lrsgrpc.RegisterLoadReportingServiceServer(server, lrss) - - fs := &Server{ - RequestChan: make(chan *Request, defaultChannelBufferSize), - ResponseChan: make(chan *Response, defaultChannelBufferSize), - Address: lis.Addr().String(), - LRS: lrss, - } - adsgrpc.RegisterAggregatedDiscoveryServiceServer(server, fs) - - go server.Serve(lis) - t.Logf("Starting fake xDS server at %v...", fs.Address) - - return fs, func() { server.Stop() } -} - -// GetClientConn returns a grpc.ClientConn talking to the fake server. The -// returned function should be invoked by the caller once the test is done. -func (fs *Server) GetClientConn(t *testing.T) (*grpc.ClientConn, func()) { - t.Helper() - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - cc, err := grpc.DialContext(ctx, fs.Address, grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - t.Fatalf("grpc.DialContext(%s) failed: %v", fs.Address, err) - } - t.Log("Started xDS gRPC client...") - - return cc, func() { - cc.Close() - } -} - -// StreamAggregatedResources is the fake implementation to handle an ADS -// stream. -func (fs *Server) StreamAggregatedResources(s adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { - errCh := make(chan error, 2) - go func() { - for { - req, err := s.Recv() - if err != nil { - errCh <- err - return - } - fs.RequestChan <- &Request{req, err} - } - }() - go func() { - var retErr error - defer func() { - errCh <- retErr - }() - - for { - select { - case r := <-fs.ResponseChan: - if r.Err != nil { - retErr = r.Err - return - } - if err := s.Send(r.Resp); err != nil { - retErr = err - return - } - case <-s.Context().Done(): - retErr = s.Context().Err() - return - } - } - }() - - if err := <-errCh; err != nil { - return err - } - return nil -} - -// DeltaAggregatedResources helps implement the ADS service. -func (fs *Server) DeltaAggregatedResources(adsgrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { - return status.Error(codes.Unimplemented, "") -} diff --git a/xds/internal/client/fakexds/lrsserver.go b/xds/internal/client/fakexds/lrsserver.go deleted file mode 100644 index 55a6feeff..000000000 --- a/xds/internal/client/fakexds/lrsserver.go +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package fakexds - -import ( - "io" - "sync" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" - lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" - lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" - "github.com/golang/protobuf/proto" - durationpb "github.com/golang/protobuf/ptypes/duration" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// LRSServer implements the LRS service, and is to be installed on the fakexds -// server. It collects load reports, and returned them later for comparison. -type LRSServer struct { - // ReportingInterval will be sent in the first response to control reporting - // interval. - ReportingInterval *durationpb.Duration - // ExpectedEDSClusterName is checked against the first LRS request. The RPC - // is failed if they don't match. - ExpectedEDSClusterName string - - mu sync.Mutex - dropTotal uint64 - drops map[string]uint64 -} - -func newLRSServer() *LRSServer { - return &LRSServer{ - drops: make(map[string]uint64), - ReportingInterval: &durationpb.Duration{ - Seconds: 60 * 60, // 1 hour, each test can override this to a shorter duration. - }, - } -} - -// StreamLoadStats implements LRS service. -func (lrss *LRSServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_StreamLoadStatsServer) error { - req, err := stream.Recv() - if err != nil { - return err - } - wantReq := &lrspb.LoadStatsRequest{ - ClusterStats: []*endpointpb.ClusterStats{{ - ClusterName: lrss.ExpectedEDSClusterName, - }}, - Node: &corepb.Node{}, - } - if !proto.Equal(req, wantReq) { - return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v, want %+v, diff: %s", req, wantReq, cmp.Diff(req, wantReq, cmp.Comparer(proto.Equal))) - } - if err := stream.Send(&lrspb.LoadStatsResponse{ - Clusters: []string{lrss.ExpectedEDSClusterName}, - LoadReportingInterval: lrss.ReportingInterval, - }); err != nil { - return err - } - - for { - req, err := stream.Recv() - if err != nil { - if err == io.EOF { - return nil - } - return err - } - stats := req.ClusterStats[0] - lrss.mu.Lock() - lrss.dropTotal += stats.TotalDroppedRequests - for _, d := range stats.DroppedRequests { - lrss.drops[d.Category] += d.DroppedCount - } - lrss.mu.Unlock() - } -} - -// GetDrops returns the drops reported to this server. -func (lrss *LRSServer) GetDrops() map[string]uint64 { - lrss.mu.Lock() - defer lrss.mu.Unlock() - return lrss.drops -} diff --git a/xds/internal/client/lds_test.go b/xds/internal/client/lds_test.go index 6ac39a46a..9eee4e896 100644 --- a/xds/internal/client/lds_test.go +++ b/xds/internal/client/lds_test.go @@ -25,7 +25,7 @@ import ( "time" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "google.golang.org/grpc/xds/internal/client/fakexds" + "google.golang.org/grpc/xds/internal/testutils" ) func TestLDSGetRouteConfig(t *testing.T) { @@ -90,13 +90,10 @@ func TestLDSGetRouteConfig(t *testing.T) { // and creates a v2Client using it. Then, it registers a watchLDS and tests // different LDS responses. func TestLDSHandleResponse(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() tests := []struct { @@ -186,7 +183,7 @@ func TestLDSHandleResponse(t *testing.T) { wantUpdateErr: test.wantUpdateErr, ldsWatch: v2c.watchLDS, - watchReqChan: fakeServer.RequestChan, + watchReqChan: fakeServer.XDSRequestChan, handleXDSResp: v2c.handleLDSResponse, }) }) @@ -196,13 +193,10 @@ func TestLDSHandleResponse(t *testing.T) { // TestLDSHandleResponseWithoutWatch tests the case where the v2Client receives // an LDS response without a registered watcher. func TestLDSHandleResponseWithoutWatch(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + _, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() if v2c.handleLDSResponse(goodLDSResponse1) == nil { @@ -215,43 +209,33 @@ func TestLDSHandleResponseWithoutWatch(t *testing.T) { // to be invoked with an error once the watchExpiryTimer fires. func TestLDSWatchExpiryTimer(t *testing.T) { oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 1 * time.Second + defaultWatchExpiryTimeout = 500 * time.Millisecond defer func() { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() - // Wait till the request makes it to the fakeServer. This ensures that - // the watch request has been processed by the v2Client. - callbackCh := make(chan error, 1) + callbackCh := testutils.NewChannel() v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err) if u.routeName != "" { - callbackCh <- fmt.Errorf("received routeName %v in ldsCallback, wanted empty string", u.routeName) + callbackCh.Send(fmt.Errorf("received routeName %v in ldsCallback, wanted empty string", u.routeName)) } if err == nil { - callbackCh <- errors.New("received nil error in ldsCallback") + callbackCh.Send(errors.New("received nil error in ldsCallback")) } - callbackCh <- nil + callbackCh.Send(nil) }) - <-fakeServer.RequestChan - timer := time.NewTimer(2 * time.Second) - select { - case <-timer.C: - t.Fatalf("Timeout expired when expecting LDS update") - case err := <-callbackCh: - timer.Stop() - if err != nil { - t.Fatal(err) - } + // Wait till the request makes it to the fakeServer. This ensures that + // the watch request has been processed by the v2Client. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") } + waitForNilErr(t, callbackCh) } diff --git a/xds/internal/client/rds_test.go b/xds/internal/client/rds_test.go index 3c49f96f2..a947fc180 100644 --- a/xds/internal/client/rds_test.go +++ b/xds/internal/client/rds_test.go @@ -25,9 +25,11 @@ import ( "testing" "time" + discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" - "google.golang.org/grpc/xds/internal/client/fakexds" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" ) func (v2c *v2Client) cloneRDSCacheForTesting() map[string]string { @@ -165,34 +167,36 @@ func TestRDSGetClusterFromRouteConfiguration(t *testing.T) { // This is called by RDS tests to start LDS first, because LDS is a // pre-requirement for RDS, and RDS handle would fail without an existing LDS // watch. -func doLDS(t *testing.T, v2c *v2Client, fakeServer *fakexds.Server) { +func doLDS(t *testing.T, v2c *v2Client, fakeServer *fakeserver.Server) { // Register an LDS watcher, and wait till the request is sent out, the // response is received and the callback is invoked. - cbCh := make(chan error, 1) + cbCh := testutils.NewChannel() v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { t.Logf("v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err) - cbCh <- err + cbCh.Send(err) }) - <-fakeServer.RequestChan - fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1} - if err := <-cbCh; err != nil { - t.Fatalf("v2c.watchLDS returned error in callback: %v", err) + + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout waiting for LDS request: %v", err) } + + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} + waitForNilErr(t, cbCh) + // Read the LDS ack, to clear RequestChan for following tests. - <-fakeServer.RequestChan + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout waiting for LDS ACK: %v", err) + } } // TestRDSHandleResponse starts a fake xDS server, makes a ClientConn to it, // and creates a v2Client using it. Then, it registers an LDS and RDS watcher // and tests different RDS responses. func TestRDSHandleResponse(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() doLDS(t, v2c, fakeServer) @@ -256,7 +260,7 @@ func TestRDSHandleResponse(t *testing.T) { wantUpdateErr: test.wantUpdateErr, rdsWatch: v2c.watchRDS, - watchReqChan: fakeServer.RequestChan, + watchReqChan: fakeServer.XDSRequestChan, handleXDSResp: v2c.handleRDSResponse, }) }) @@ -266,13 +270,10 @@ func TestRDSHandleResponse(t *testing.T) { // TestRDSHandleResponseWithoutLDSWatch tests the case where the v2Client // receives an RDS response without a registered LDS watcher. func TestRDSHandleResponseWithoutLDSWatch(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + _, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() if v2c.handleRDSResponse(goodRDSResponse1) == nil { @@ -283,13 +284,10 @@ func TestRDSHandleResponseWithoutLDSWatch(t *testing.T) { // TestRDSHandleResponseWithoutRDSWatch tests the case where the v2Client // receives an RDS response without a registered RDS watcher. func TestRDSHandleResponseWithoutRDSWatch(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() doLDS(t, v2c, fakeServer) @@ -304,7 +302,7 @@ type rdsTestOp struct { // target is the resource name to watch for. target string // responseToSend is the xDS response sent to the client - responseToSend *fakexds.Response + responseToSend *fakeserver.Response // wantOpErr specfies whether the main operation should return an error. wantOpErr bool // wantRDSCache is the expected rdsCache at the end of an operation. @@ -318,16 +316,13 @@ type rdsTestOp struct { // pushes a good LDS response. It then reads a bunch of test operations to be // performed from rdsTestOps and returns error, if any, on the provided error // channel. This is executed in a separate goroutine. -func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) { +func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh *testutils.Channel) { t.Helper() - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") doLDS(t, v2c, fakeServer) @@ -345,15 +340,18 @@ func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) { // Wait till the request makes it to the fakeServer. This ensures that // the watch request has been processed by the v2Client. - <-fakeServer.RequestChan + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + errCh.Send(fmt.Errorf("Timeout waiting for RDS request: %v", err)) + } t.Log("FakeServer received request...") } // Directly push the response through a call to handleRDSResponse, // thereby bypassing the fakeServer. if rdsTestOp.responseToSend != nil { - if err := v2c.handleRDSResponse(rdsTestOp.responseToSend.Resp); (err != nil) != rdsTestOp.wantOpErr { - errCh <- fmt.Errorf("v2c.handleRDSResponse() returned err: %v", err) + resp := rdsTestOp.responseToSend.Resp.(*discoverypb.DiscoveryResponse) + if err := v2c.handleRDSResponse(resp); (err != nil) != rdsTestOp.wantOpErr { + errCh.Send(fmt.Errorf("v2c.handleRDSResponse(%+v) returned err: %v", resp, err)) return } } @@ -366,24 +364,23 @@ func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) { } if !reflect.DeepEqual(v2c.cloneRDSCacheForTesting(), rdsTestOp.wantRDSCache) { - errCh <- fmt.Errorf("gotRDSCache: %v, wantRDSCache: %v", v2c.rdsCache, rdsTestOp.wantRDSCache) + errCh.Send(fmt.Errorf("gotRDSCache: %v, wantRDSCache: %v", v2c.rdsCache, rdsTestOp.wantRDSCache)) return } } t.Log("Completed all test ops successfully...") - errCh <- nil + errCh.Send(nil) } // TestRDSCaching tests some end-to-end RDS flows using a fake xDS server, and // verifies the RDS data cached at the v2Client. func TestRDSCaching(t *testing.T) { - errCh := make(chan error, 1) ops := []rdsTestOp{ // Add an RDS watch for a resource name (goodRouteName1), which returns one // matching resource in the response. { target: goodRouteName1, - responseToSend: &fakexds.Response{Resp: goodRDSResponse1}, + responseToSend: &fakeserver.Response{Resp: goodRDSResponse1}, wantRDSCache: map[string]string{goodRouteName1: goodClusterName1}, wantWatchCallback: true, }, @@ -392,7 +389,7 @@ func TestRDSCaching(t *testing.T) { // routeConfigName does not match our RDS watch (so the watch callback will // not be invoked). But this should still be cached. { - responseToSend: &fakexds.Response{Resp: goodRDSResponse2}, + responseToSend: &fakeserver.Response{Resp: goodRDSResponse2}, wantRDSCache: map[string]string{ goodRouteName1: goodClusterName1, goodRouteName2: goodClusterName2, @@ -402,7 +399,7 @@ func TestRDSCaching(t *testing.T) { // to return an error. But the watch callback should not be invoked, and // the cache should not be updated. { - responseToSend: &fakexds.Response{Resp: uninterestingRDSResponse}, + responseToSend: &fakeserver.Response{Resp: uninterestingRDSResponse}, wantOpErr: true, wantRDSCache: map[string]string{ goodRouteName1: goodClusterName1, @@ -421,18 +418,9 @@ func TestRDSCaching(t *testing.T) { wantWatchCallback: true, }, } + errCh := testutils.NewChannel() go testRDSCaching(t, ops, errCh) - - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Timeout when expecting RDS update") - case err := <-errCh: - timer.Stop() - if err != nil { - t.Fatal(err) - } - } + waitForNilErr(t, errCh) } // TestRDSWatchExpiryTimer tests the case where the client does not receive an @@ -440,47 +428,37 @@ func TestRDSCaching(t *testing.T) { // to be invoked with an error once the watchExpiryTimer fires. func TestRDSWatchExpiryTimer(t *testing.T) { oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 1 * time.Second + defaultWatchExpiryTimeout = 500 * time.Millisecond defer func() { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") doLDS(t, v2c, fakeServer) - rdsCallbackCh := make(chan error, 1) + callbackCh := testutils.NewChannel() v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) { t.Logf("Received callback with rdsUpdate {%+v} and error {%v}", u, err) if u.clusterName != "" { - rdsCallbackCh <- fmt.Errorf("received clusterName %v in rdsCallback, wanted empty string", u.clusterName) + callbackCh.Send(fmt.Errorf("received clusterName %v in rdsCallback, wanted empty string", u.clusterName)) } if err == nil { - rdsCallbackCh <- errors.New("received nil error in rdsCallback") + callbackCh.Send(errors.New("received nil error in rdsCallback")) } - rdsCallbackCh <- nil + callbackCh.Send(nil) }) + // Wait till the request makes it to the fakeServer. This ensures that // the watch request has been processed by the v2Client. - <-fakeServer.RequestChan - - timer := time.NewTimer(2 * defaultWatchExpiryTimeout) - select { - case <-timer.C: - t.Fatalf("Timeout expired when expecting RDS update") - case err := <-rdsCallbackCh: - timer.Stop() - if err != nil { - t.Fatal(err) - } + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an RDS request") } + waitForNilErr(t, callbackCh) } func TestHostFromTarget(t *testing.T) { diff --git a/xds/internal/client/testutil_test.go b/xds/internal/client/testutil_test.go index da46757b8..2fbc71839 100644 --- a/xds/internal/client/testutil_test.go +++ b/xds/internal/client/testutil_test.go @@ -20,11 +20,12 @@ package client import ( "reflect" "testing" - "time" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/xds/internal/client/fakexds" + "google.golang.org/grpc" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" ) type watchHandleTestcase struct { @@ -39,7 +40,7 @@ type watchHandleTestcase struct { rdsWatch func(routeName string, rdsCb rdsCallback) (cancel func()) cdsWatch func(clusterName string, cdsCb cdsCallback) (cancel func()) edsWatch func(clusterName string, edsCb edsCallback) (cancel func()) - watchReqChan chan *fakexds.Request // The request sent for watch will be sent to this channel. + watchReqChan *testutils.Channel // The request sent for watch will be sent to this channel. handleXDSResp func(response *xdspb.DiscoveryResponse) error } @@ -50,8 +51,11 @@ type watchHandleTestcase struct { // handleXDSResp with responseToHandle (if it's set). It then compares the // update received by watch callback with the expected results. func testWatchHandle(t *testing.T, test *watchHandleTestcase) { - gotUpdateCh := make(chan interface{}, 1) - gotUpdateErrCh := make(chan error, 1) + type updateErr struct { + u interface{} + err error + } + gotUpdateCh := testutils.NewChannel() var cancelWatch func() // Register the watcher, this will also trigger the v2Client to send the xDS @@ -60,26 +64,22 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) { case test.ldsWatch != nil: cancelWatch = test.ldsWatch(goodLDSTarget1, func(u ldsUpdate, err error) { t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err) - gotUpdateCh <- u - gotUpdateErrCh <- err + gotUpdateCh.Send(updateErr{u, err}) }) case test.rdsWatch != nil: cancelWatch = test.rdsWatch(goodRouteName1, func(u rdsUpdate, err error) { t.Logf("in v2c.watchRDS callback, rdsUpdate: %+v, err: %v", u, err) - gotUpdateCh <- u - gotUpdateErrCh <- err + gotUpdateCh.Send(updateErr{u, err}) }) case test.cdsWatch != nil: cancelWatch = test.cdsWatch(clusterName1, func(u CDSUpdate, err error) { t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err) - gotUpdateCh <- u - gotUpdateErrCh <- err + gotUpdateCh.Send(updateErr{u, err}) }) case test.edsWatch != nil: cancelWatch = test.edsWatch(goodEDSName, func(u *EDSUpdate, err error) { t.Logf("in v2c.watchEDS callback, edsUpdate: %+v, err: %v", u, err) - gotUpdateCh <- *u - gotUpdateErrCh <- err + gotUpdateCh.Send(updateErr{*u, err}) }) default: t.Fatalf("no watch() is set") @@ -88,9 +88,9 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) { // Wait till the request makes it to the fakeServer. This ensures that // the watch request has been processed by the v2Client. - // - // TODO: switch to new fakexds server request channel with timed receive. - <-test.watchReqChan + if _, err := test.watchReqChan.Receive(); err != nil { + t.Fatalf("Timeout waiting for an xDS request: %v", err) + } // Directly push the response through a call to handleXDSResp. This bypasses // the fakeServer, so it's only testing the handle logic. Client response @@ -108,30 +108,62 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) { // Cannot directly compare test.wantUpdate with nil (typed vs non-typed nil: // https://golang.org/doc/faq#nil_error). if c := test.wantUpdate; c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) { - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: + update, err := gotUpdateCh.Receive() + if err == testutils.ErrRecvTimeout { return - case <-gotUpdateCh: - t.Fatal("Unexpected update") } + t.Fatalf("Unexpected update: +%v", update) } wantUpdate := reflect.ValueOf(test.wantUpdate).Elem().Interface() - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Timeout expecting RDS update") - case gotUpdate := <-gotUpdateCh: - timer.Stop() - if diff := cmp.Diff(gotUpdate, wantUpdate, cmp.AllowUnexported(rdsUpdate{}, ldsUpdate{}, CDSUpdate{}, EDSUpdate{})); diff != "" { - t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff) - } + uErr, err := gotUpdateCh.Receive() + if err == testutils.ErrRecvTimeout { + t.Fatal("Timeout expecting xDS update") } - // Since the callback that we registered pushes to both channels at - // the same time, this channel read should return immediately. - gotUpdateErr := <-gotUpdateErrCh + gotUpdate := uErr.(updateErr).u + opt := cmp.AllowUnexported(rdsUpdate{}, ldsUpdate{}, CDSUpdate{}, EDSUpdate{}) + if diff := cmp.Diff(gotUpdate, wantUpdate, opt); diff != "" { + t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff) + } + gotUpdateErr := uErr.(updateErr).err if (gotUpdateErr != nil) != test.wantUpdateErr { - t.Fatalf("got RDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr) + t.Fatalf("got xDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr) + } +} + +// startServerAndGetCC starts a fake XDS server and also returns a ClientConn +// connected to it. +func startServerAndGetCC(t *testing.T) (*fakeserver.Server, *grpc.ClientConn, func()) { + t.Helper() + + fs, sCleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } + + cc, ccCleanup, err := fs.XDSClientConn() + if err != nil { + sCleanup() + t.Fatalf("Failed to get a clientConn to the fake xDS server: %v", err) + } + return fs, cc, func() { + sCleanup() + ccCleanup() + } +} + +// waitForNilErr waits for a nil error value to be received on the +// provided channel. +func waitForNilErr(t *testing.T, ch *testutils.Channel) { + t.Helper() + + val, err := ch.Receive() + if err == testutils.ErrRecvTimeout { + t.Fatalf("Timeout expired when expecting update") + } + if val != nil { + if cbErr := val.(error); cbErr != nil { + t.Fatal(cbErr) + } } } diff --git a/xds/internal/client/v2client_ack_test.go b/xds/internal/client/v2client_ack_test.go index 1e73dd529..1896cb1b8 100644 --- a/xds/internal/client/v2client_ack_test.go +++ b/xds/internal/client/v2client_ack_test.go @@ -27,60 +27,121 @@ import ( "github.com/golang/protobuf/proto" anypb "github.com/golang/protobuf/ptypes/any" "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/xds/internal/client/fakexds" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" ) -func emptyChanRecvWithTimeout(ch <-chan struct{}, d time.Duration) error { - timer := time.NewTimer(d) - select { - case <-timer.C: - return fmt.Errorf("timeout") - case <-ch: - timer.Stop() - return nil - } -} - -func requestChanRecvWithTimeout(ch <-chan *fakexds.Request, d time.Duration) (*fakexds.Request, error) { - timer := time.NewTimer(d) - select { - case <-timer.C: - return nil, fmt.Errorf("timeout waiting for request") - case r := <-ch: - timer.Stop() - return r, nil - } -} - // compareXDSRequest reads requests from channel, compare it with want. -func compareXDSRequest(ch <-chan *fakexds.Request, d time.Duration, want *xdspb.DiscoveryRequest, version, nonce string) error { - r, err := requestChanRecvWithTimeout(ch, d) +func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, version, nonce string) error { + val, err := ch.Receive() if err != nil { return err } - if r.Err != nil { - return fmt.Errorf("unexpected error from request: %v", r.Err) + req := val.(*fakeserver.Request) + if req.Err != nil { + return fmt.Errorf("unexpected error from request: %v", req.Err) } wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest) wantClone.VersionInfo = version wantClone.ResponseNonce = nonce - if !cmp.Equal(r.Req, wantClone, cmp.Comparer(proto.Equal)) { - return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(r.Req, wantClone)) + if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) { + return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone)) } return nil } -func sendXDSRespWithVersion(ch chan<- *fakexds.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) { +func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) { respToSend := proto.Clone(respWithoutVersion).(*xdspb.DiscoveryResponse) respToSend.VersionInfo = strconv.Itoa(version) nonce = strconv.Itoa(int(time.Now().UnixNano())) respToSend.Nonce = nonce - ch <- &fakexds.Response{Resp: respToSend} + ch <- &fakeserver.Response{Resp: respToSend} return } -// TestV2ClientAck verifies that valid responses are acked, and invalid ones are -// nacked. +// startXDS calls watch to send the first request. It then sends a good response +// and checks for ack. +func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest) *testutils.Channel { + callbackCh := testutils.NewChannel() + switch xdsname { + case "LDS": + v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) + callbackCh.Send(struct{}{}) + }) + case "RDS": + v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) + callbackCh.Send(struct{}{}) + }) + case "CDS": + v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) + callbackCh.Send(struct{}{}) + }) + case "EDS": + v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) + callbackCh.Send(struct{}{}) + }) + } + + if err := compareXDSRequest(reqChan, req, "", ""); err != nil { + t.Fatalf("Failed to receive %s request: %v", xdsname, err) + } + t.Logf("FakeServer received %s request...", xdsname) + return callbackCh +} + +// sendGoodResp sends the good response, with the given version, and a random +// nonce. +// +// It also waits and checks that the ack request contains the given version, and +// the generated nonce. +func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) { + nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version) + t.Logf("Good %s response pushed to fakeServer...", xdsname) + + if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version), nonce); err != nil { + t.Errorf("Failed to receive %s request: %v", xdsname, err) + } + t.Logf("Good %s response acked", xdsname) + + if _, err := callbackCh.Receive(); err != nil { + t.Errorf("Timeout when expecting %s update", xdsname) + } + t.Logf("Good %s response callback executed", xdsname) +} + +// sendBadResp sends a bad response with the given version. This response will +// be nacked, so we expect a request with the previous version (version-1). +// +// But the nonce in request should be the new nonce. +func sendBadResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, wantReq *xdspb.DiscoveryRequest) { + var typeURL string + switch xdsname { + case "LDS": + typeURL = ldsURL + case "RDS": + typeURL = rdsURL + case "CDS": + typeURL = cdsURL + case "EDS": + typeURL = edsURL + } + nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{{}}, + TypeUrl: typeURL, + }, version) + t.Logf("Bad %s response pushed to fakeServer...", xdsname) + if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version-1), nonce); err != nil { + t.Errorf("Failed to receive %s request: %v", xdsname, err) + } + t.Logf("Bad %s response nacked", xdsname) +} + +// TestV2ClientAck verifies that valid responses are acked, and invalid ones +// are nacked. // // This test also verifies the version for different types are independent. func TestV2ClientAck(t *testing.T) { @@ -91,27 +152,23 @@ func TestV2ClientAck(t *testing.T) { versionEDS = 4000 ) - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") // Start the watch, send a good response, and check for ack. - cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest) + cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) versionLDS++ - cbRDS := startXDS(t, "RDS", v2c, fakeServer, goodRDSRequest) + cbRDS := startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest) sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS) versionRDS++ - cbCDS := startXDS(t, "CDS", v2c, fakeServer, goodCDSRequest) + cbCDS := startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest) sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) versionCDS++ - cbEDS := startXDS(t, "EDS", v2c, fakeServer, goodEDSRequest) + cbEDS := startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest) sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS) versionEDS++ @@ -136,105 +193,21 @@ func TestV2ClientAck(t *testing.T) { versionEDS++ } -// startXDS calls watch to send the first request. It then sends a good response -// and checks for ack. -func startXDS(t *testing.T, xdsname string, v2c *v2Client, fakeServer *fakexds.Server, goodReq *xdspb.DiscoveryRequest) <-chan struct{} { - callbackCh := make(chan struct{}, 1) - switch xdsname { - case "LDS": - v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh <- struct{}{} - }) - case "RDS": - v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh <- struct{}{} - }) - case "CDS": - v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh <- struct{}{} - }) - case "EDS": - v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh <- struct{}{} - }) - } - - if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodReq, "", ""); err != nil { - t.Fatalf("Failed to receive %s request: %v", xdsname, err) - } - t.Logf("FakeServer received %s request...", xdsname) - return callbackCh -} - -// sendGoodResp sends the good response, with the given version, and a random -// nonce. -// -// It also waits and checks that the ack request contains the given version, and -// the generated nonce. -func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakexds.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh <-chan struct{}) { - nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, goodResp, version) - t.Logf("Good %s response pushed to fakeServer...", xdsname) - - if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, wantReq, strconv.Itoa(version), nonce); err != nil { - t.Errorf("Failed to receive %s request: %v", xdsname, err) - } - t.Logf("Good %s response acked", xdsname) - if err := emptyChanRecvWithTimeout(callbackCh, defaultTestTimeout); err != nil { - t.Errorf("Timeout when expecting %s update", xdsname) - } - t.Logf("Good %s response callback executed", xdsname) -} - -// sendBadResp sends a bad response with the given version. This response will -// be nacked, so we expect a request with the previous version (version-1). -// -// But the nonce in request should be the new nonce. -func sendBadResp(t *testing.T, xdsname string, fakeServer *fakexds.Server, version int, wantReq *xdspb.DiscoveryRequest) { - var typeURL string - switch xdsname { - case "LDS": - typeURL = ldsURL - case "RDS": - typeURL = rdsURL - case "CDS": - typeURL = cdsURL - case "EDS": - typeURL = edsURL - } - nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{{}}, - TypeUrl: typeURL, - }, version) - t.Logf("Bad %s response pushed to fakeServer...", xdsname) - if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, wantReq, strconv.Itoa(version-1), nonce); err != nil { - t.Errorf("Failed to receive %s request: %v", xdsname, err) - } - t.Logf("Bad %s response nacked", xdsname) -} - // Test when the first response is invalid, and is nacked, the nack requests // should have an empty version string. func TestV2ClientAckFirstIsNack(t *testing.T) { var versionLDS = 1000 - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") // Start the watch, send a good response, and check for ack. - cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest) + cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) - nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{ + nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{{}}, TypeUrl: ldsURL, }, versionLDS) @@ -242,7 +215,7 @@ func TestV2ClientAckFirstIsNack(t *testing.T) { // The expected version string is an empty string, because this is the first // response, and it's nacked (so there's no previous ack version). - if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodLDSRequest, "", nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil { t.Errorf("Failed to receive request: %v", err) } t.Logf("Bad response nacked") @@ -257,33 +230,29 @@ func TestV2ClientAckFirstIsNack(t *testing.T) { func TestV2ClientAckNackAfterNewWatch(t *testing.T) { var versionLDS = 1000 - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") // Start the watch, send a good response, and check for ack. - cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest) + cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) versionLDS++ // Start a new watch. - cbLDS = startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest) + cbLDS = startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) // This is an invalid response after the new watch. - nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{ + nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{{}}, TypeUrl: ldsURL, }, versionLDS) t.Logf("Bad response pushed to fakeServer...") // The expected version string is the previous acked version. - if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil { t.Errorf("Failed to receive request: %v", err) } t.Logf("Bad response nacked") diff --git a/xds/internal/client/v2client_test.go b/xds/internal/client/v2client_test.go index a844f6888..a7378f333 100644 --- a/xds/internal/client/v2client_test.go +++ b/xds/internal/client/v2client_test.go @@ -24,7 +24,8 @@ import ( "time" "github.com/golang/protobuf/proto" - "google.golang.org/grpc/xds/internal/client/fakexds" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" basepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" @@ -36,7 +37,7 @@ import ( ) const ( - defaultTestTimeout = 2 * time.Second + defaultTestTimeout = 1 * time.Second goodLDSTarget1 = "lds.target.good:1111" goodLDSTarget2 = "lds.target.good:2222" goodRouteName1 = "GoodRouteConfig1" @@ -390,12 +391,8 @@ var ( // TestV2ClientBackoffAfterRecvError verifies if the v2Client backoffs when it // encounters a Recv error while receiving an LDS response. func TestV2ClientBackoffAfterRecvError(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() // Override the v2Client backoff function with this, so that we can verify // that a backoff actually was triggerred. @@ -405,7 +402,7 @@ func TestV2ClientBackoffAfterRecvError(t *testing.T) { return 0 } - v2c := newV2Client(client, goodNodeProto, clientBackoff) + v2c := newV2Client(cc, goodNodeProto, clientBackoff) defer v2c.close() t.Log("Started xds v2Client...") @@ -413,10 +410,12 @@ func TestV2ClientBackoffAfterRecvError(t *testing.T) { v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { close(callbackCh) }) - <-fakeServer.RequestChan + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } t.Log("FakeServer received request...") - fakeServer.ResponseChan <- &fakexds.Response{Err: errors.New("RPC error")} + fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")} t.Log("Bad LDS response pushed to fakeServer...") timer := time.NewTimer(defaultTestTimeout) @@ -435,95 +434,81 @@ func TestV2ClientBackoffAfterRecvError(t *testing.T) { // encountered a Recv() error, and is expected to send out xDS requests for // registered watchers once it comes back up again. func TestV2ClientRetriesAfterBrokenStream(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") - callbackCh := make(chan struct{}, 1) + callbackCh := testutils.NewChannel() v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err) - callbackCh <- struct{}{} + callbackCh.Send(struct{}{}) }) - <-fakeServer.RequestChan + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } t.Log("FakeServer received request...") - fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1} + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} t.Log("Good LDS response pushed to fakeServer...") - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: + if _, err := callbackCh.Receive(); err != nil { t.Fatal("Timeout when expecting LDS update") - case <-callbackCh: - timer.Stop() } - // Read the ack, so the next request is sent after stream re-creation. - <-fakeServer.RequestChan - fakeServer.ResponseChan <- &fakexds.Response{Err: errors.New("RPC error")} + // Read the ack, so the next request is sent after stream re-creation. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS ACK") + } + + fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")} t.Log("Bad LDS response pushed to fakeServer...") - timer = time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Timeout when expecting LDS update") - case gotRequest := <-fakeServer.RequestChan: - timer.Stop() - t.Log("received LDS request after stream re-creation") - if !proto.Equal(gotRequest.Req, goodLDSRequest) { - t.Fatalf("gotRequest: %+v, wantRequest: %+v", gotRequest.Req, goodLDSRequest) - } + val, err := fakeServer.XDSRequestChan.Receive() + if err == testutils.ErrRecvTimeout { + t.Fatalf("Timeout expired when expecting LDS update") + } + gotRequest := val.(*fakeserver.Request) + if !proto.Equal(gotRequest.Req, goodLDSRequest) { + t.Fatalf("gotRequest: %+v, wantRequest: %+v", gotRequest.Req, goodLDSRequest) } } // TestV2ClientCancelWatch verifies that the registered watch callback is not // invoked if a response is received after the watcher is cancelled. func TestV2ClientCancelWatch(t *testing.T) { - fakeServer, sCleanup := fakexds.StartServer(t) - client, cCleanup := fakeServer.GetClientConn(t) - defer func() { - cCleanup() - sCleanup() - }() - v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") - callbackCh := make(chan struct{}, 1) + callbackCh := testutils.NewChannel() cancelFunc := v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err) - callbackCh <- struct{}{} + callbackCh.Send(struct{}{}) }) - <-fakeServer.RequestChan + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } t.Log("FakeServer received request...") - fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1} + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} t.Log("Good LDS response pushed to fakeServer...") - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: + if _, err := callbackCh.Receive(); err != nil { t.Fatal("Timeout when expecting LDS update") - case <-callbackCh: - timer.Stop() } cancelFunc() - fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1} + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} t.Log("Another good LDS response pushed to fakeServer...") - timer = time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - case <-callbackCh: - timer.Stop() + if _, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout { t.Fatalf("Watch callback invoked after the watcher was cancelled") } } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index aa33d9328..c471b8049 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -34,7 +34,7 @@ import ( xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakexds" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" ) @@ -98,7 +98,7 @@ func getXDSClientMakerFunc(wantOpts xdsclient.Options) func(xdsclient.Options) ( if len(gotOpts.DialOpts) != len(wantOpts.DialOpts) { return nil, fmt.Errorf("got len(DialOpts): %v, want: %v", len(gotOpts.DialOpts), len(wantOpts.DialOpts)) } - return fakexds.NewClient(), nil + return fakeclient.NewClient(), nil } } @@ -232,7 +232,7 @@ func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *testClientConn, fun // waitForWatchService waits for the WatchService method to be called on the // xdsClient within a reasonable amount of time, and also verifies that the // watch is called with the expected target. -func waitForWatchService(t *testing.T, xdsC *fakexds.Client, wantTarget string) { +func waitForWatchService(t *testing.T, xdsC *fakeclient.Client, wantTarget string) { t.Helper() gotTarget, err := xdsC.WaitForWatchService() @@ -247,7 +247,7 @@ func waitForWatchService(t *testing.T, xdsC *fakexds.Client, wantTarget string) // TestXDSResolverWatchCallbackAfterClose tests the case where a service update // from the underlying xdsClient is received after the resolver is closed. func TestXDSResolverWatchCallbackAfterClose(t *testing.T) { - xdsC := fakexds.NewClient() + xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ config: &validConfig, xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil }, @@ -268,7 +268,7 @@ func TestXDSResolverWatchCallbackAfterClose(t *testing.T) { // TestXDSResolverBadServiceUpdate tests the case the xdsClient returns a bad // service update. func TestXDSResolverBadServiceUpdate(t *testing.T) { - xdsC := fakexds.NewClient() + xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ config: &validConfig, xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil }, @@ -292,7 +292,7 @@ func TestXDSResolverBadServiceUpdate(t *testing.T) { // TestXDSResolverGoodServiceUpdate tests the happy case where the resolver // gets a good service update from the xdsClient. func TestXDSResolverGoodServiceUpdate(t *testing.T) { - xdsC := fakexds.NewClient() + xdsC := fakeclient.NewClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ config: &validConfig, xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil }, diff --git a/xds/internal/testutils/fakexds/client.go b/xds/internal/testutils/fakeclient/client.go similarity index 98% rename from xds/internal/testutils/fakexds/client.go rename to xds/internal/testutils/fakeclient/client.go index d9c66a571..5ad6e128e 100644 --- a/xds/internal/testutils/fakexds/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -16,9 +16,8 @@ * */ -// Package fakexds provides fake implementation of multiple types, for use in -// xds tests. -package fakexds +// Package fakeclient provides a fake implementation of an xDS client. +package fakeclient import ( "sync" diff --git a/xds/internal/testutils/fakexds/server.go b/xds/internal/testutils/fakeserver/server.go similarity index 86% rename from xds/internal/testutils/fakexds/server.go rename to xds/internal/testutils/fakeserver/server.go index 86380e820..3185386cc 100644 --- a/xds/internal/testutils/fakexds/server.go +++ b/xds/internal/testutils/fakeserver/server.go @@ -16,12 +16,15 @@ * */ -package fakexds +// Package fakeserver provides a fake implementation of an xDS server. +package fakeserver import ( + "context" "fmt" "io" "net" + "time" "github.com/golang/protobuf/proto" "google.golang.org/grpc" @@ -35,9 +38,12 @@ import ( lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" ) -// TODO: Make this a var or a field in the server if there is a need to use a -// value other than this default. -const defaultChannelBufferSize = 50 +const ( + // TODO: Make this a var or a field in the server if there is a need to use a + // value other than this default. + defaultChannelBufferSize = 50 + defaultDialTimeout = 5 * time.Second +) // Request wraps the request protobuf (xds/LRS) and error received by the // Server in a call to stream.Recv(). @@ -104,6 +110,18 @@ func StartServer() (*Server, func(), error) { return s, func() { server.Stop() }, nil } +// XDSClientConn returns a grpc.ClientConn connected to the fakeServer. +func (xdsS *Server) XDSClientConn() (*grpc.ClientConn, func(), error) { + ctx, cancel := context.WithTimeout(context.Background(), defaultDialTimeout) + defer cancel() + + cc, err := grpc.DialContext(ctx, xdsS.Address, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + return nil, nil, fmt.Errorf("grpc.DialContext(%s) failed: %v", xdsS.Address, err) + } + return cc, func() { cc.Close() }, nil +} + type xdsServer struct { reqChan *testutils.Channel respChan chan *Response