mirror of https://github.com/grpc/grpc-go.git
				
				
				
			
		
			
				
	
	
		
			462 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			462 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright 2021 gRPC authors.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  *
 | |
|  */
 | |
| 
 | |
| package csds_test
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/google/go-cmp/cmp"
 | |
| 	"github.com/google/uuid"
 | |
| 	"google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/credentials/insecure"
 | |
| 	"google.golang.org/grpc/internal/grpctest"
 | |
| 	"google.golang.org/grpc/internal/testutils"
 | |
| 	"google.golang.org/grpc/internal/testutils/xds/bootstrap"
 | |
| 	"google.golang.org/grpc/internal/testutils/xds/e2e"
 | |
| 	"google.golang.org/grpc/xds/csds"
 | |
| 	"google.golang.org/grpc/xds/internal/xdsclient"
 | |
| 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 | |
| 	"google.golang.org/protobuf/encoding/prototext"
 | |
| 	"google.golang.org/protobuf/testing/protocmp"
 | |
| 	"google.golang.org/protobuf/types/known/anypb"
 | |
| 
 | |
| 	v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
 | |
| 	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
 | |
| 	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
 | |
| 	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
 | |
| 	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
 | |
| 	v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
 | |
| 	v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
 | |
| 
 | |
| 	_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter
 | |
| )
 | |
| 
 | |
| const defaultTestTimeout = 5 * time.Second
 | |
| 
 | |
| var cmpOpts = cmp.Options{
 | |
| 	cmp.Transformer("sort", func(in []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig {
 | |
| 		out := append([]*v3statuspb.ClientConfig_GenericXdsConfig(nil), in...)
 | |
| 		sort.Slice(out, func(i, j int) bool {
 | |
| 			a, b := out[i], out[j]
 | |
| 			if a == nil {
 | |
| 				return true
 | |
| 			}
 | |
| 			if b == nil {
 | |
| 				return false
 | |
| 			}
 | |
| 			if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 {
 | |
| 				return strings.Compare(a.Name, b.Name) < 0
 | |
| 			}
 | |
| 			return strings.Compare(a.TypeUrl, b.TypeUrl) < 0
 | |
| 		})
 | |
| 		return out
 | |
| 	}),
 | |
| 	protocmp.Transform(),
 | |
| 	protocmp.IgnoreFields((*v3statuspb.ClientConfig_GenericXdsConfig)(nil), "last_updated"),
 | |
| 	protocmp.IgnoreFields((*v3adminpb.UpdateFailureState)(nil), "last_update_attempt", "details"),
 | |
| }
 | |
| 
 | |
| type s struct {
 | |
| 	grpctest.Tester
 | |
| }
 | |
| 
 | |
| func Test(t *testing.T) {
 | |
| 	grpctest.RunSubTests(t, s{})
 | |
| }
 | |
| 
 | |
| // The following watcher implementations are no-ops since we don't really care
 | |
| // about the callback received by these watchers in the test. We only care
 | |
| // whether CSDS reports the expected state.
 | |
| 
 | |
| type unimplementedListenerWatcher struct{}
 | |
| 
 | |
| func (unimplementedListenerWatcher) OnUpdate(*xdsresource.ListenerResourceData) {}
 | |
| func (unimplementedListenerWatcher) OnError(error)                              {}
 | |
| func (unimplementedListenerWatcher) OnResourceDoesNotExist()                    {}
 | |
| 
 | |
| type unimplementedRouteConfigWatcher struct{}
 | |
| 
 | |
| func (unimplementedRouteConfigWatcher) OnUpdate(*xdsresource.RouteConfigResourceData) {}
 | |
| func (unimplementedRouteConfigWatcher) OnError(error)                                 {}
 | |
| func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist()                       {}
 | |
| 
 | |
| type unimplementedClusterWatcher struct{}
 | |
| 
 | |
| func (unimplementedClusterWatcher) OnUpdate(*xdsresource.ClusterResourceData) {}
 | |
| func (unimplementedClusterWatcher) OnError(error)                             {}
 | |
| func (unimplementedClusterWatcher) OnResourceDoesNotExist()                   {}
 | |
| 
 | |
| type unimplementedEndpointsWatcher struct{}
 | |
| 
 | |
| func (unimplementedEndpointsWatcher) OnUpdate(*xdsresource.EndpointsResourceData) {}
 | |
| func (unimplementedEndpointsWatcher) OnError(error)                               {}
 | |
| func (unimplementedEndpointsWatcher) OnResourceDoesNotExist()                     {}
 | |
| 
 | |
| func (s) TestCSDS(t *testing.T) {
 | |
| 	// Spin up a xDS management server on a local port.
 | |
| 	nodeID := uuid.New().String()
 | |
| 	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	defer mgmtServer.Stop()
 | |
| 
 | |
| 	// Create a bootstrap file in a temporary directory.
 | |
| 	bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{
 | |
| 		NodeID:    nodeID,
 | |
| 		ServerURI: mgmtServer.Address,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	defer bootstrapCleanup()
 | |
| 
 | |
| 	// Create an xDS client. This will end up using the same singleton as used
 | |
| 	// by the CSDS service.
 | |
| 	xdsC, close, err := xdsclient.New()
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("Failed to create xDS client: %v", err)
 | |
| 	}
 | |
| 	defer close()
 | |
| 
 | |
| 	// Initialize an gRPC server and register CSDS on it.
 | |
| 	server := grpc.NewServer()
 | |
| 	csdss, err := csds.NewClientStatusDiscoveryServer()
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
 | |
| 	defer func() {
 | |
| 		server.Stop()
 | |
| 		csdss.Close()
 | |
| 	}()
 | |
| 
 | |
| 	// Create a local listener and pass it to Serve().
 | |
| 	lis, err := testutils.LocalTCPListener()
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
 | |
| 	}
 | |
| 	go func() {
 | |
| 		if err := server.Serve(lis); err != nil {
 | |
| 			t.Errorf("Serve() failed: %v", err)
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	// Create a client to the CSDS server.
 | |
| 	conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err)
 | |
| 	}
 | |
| 	c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("Failed to create a stream for CSDS: %v", err)
 | |
| 	}
 | |
| 	defer conn.Close()
 | |
| 
 | |
| 	// Verify that the xDS client reports an empty config.
 | |
| 	if err := checkClientStatusResponse(stream, nil); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	// Initialize the xDS resources to be used in this test.
 | |
| 	ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"}
 | |
| 	rdsTargets := []string{"route-config-0", "route-config-1"}
 | |
| 	cdsTargets := []string{"cluster-0", "cluster-1"}
 | |
| 	edsTargets := []string{"endpoints-0", "endpoints-1"}
 | |
| 	listeners := make([]*v3listenerpb.Listener, len(ldsTargets))
 | |
| 	listenerAnys := make([]*anypb.Any, len(ldsTargets))
 | |
| 	for i := range ldsTargets {
 | |
| 		listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i])
 | |
| 		listenerAnys[i] = testutils.MarshalAny(t, listeners[i])
 | |
| 	}
 | |
| 	routes := make([]*v3routepb.RouteConfiguration, len(rdsTargets))
 | |
| 	routeAnys := make([]*anypb.Any, len(rdsTargets))
 | |
| 	for i := range rdsTargets {
 | |
| 		routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i])
 | |
| 		routeAnys[i] = testutils.MarshalAny(t, routes[i])
 | |
| 	}
 | |
| 	clusters := make([]*v3clusterpb.Cluster, len(cdsTargets))
 | |
| 	clusterAnys := make([]*anypb.Any, len(cdsTargets))
 | |
| 	for i := range cdsTargets {
 | |
| 		clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i], e2e.SecurityLevelNone)
 | |
| 		clusterAnys[i] = testutils.MarshalAny(t, clusters[i])
 | |
| 	}
 | |
| 	endpoints := make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets))
 | |
| 	endpointAnys := make([]*anypb.Any, len(edsTargets))
 | |
| 	ips := []string{"0.0.0.0", "1.1.1.1"}
 | |
| 	ports := []uint32{123, 456}
 | |
| 	for i := range edsTargets {
 | |
| 		endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1])
 | |
| 		endpointAnys[i] = testutils.MarshalAny(t, endpoints[i])
 | |
| 	}
 | |
| 
 | |
| 	// Register watches on the xDS client for two resources of each type.
 | |
| 	for _, target := range ldsTargets {
 | |
| 		xdsresource.WatchListener(xdsC, target, unimplementedListenerWatcher{})
 | |
| 	}
 | |
| 	for _, target := range rdsTargets {
 | |
| 		xdsresource.WatchRouteConfig(xdsC, target, unimplementedRouteConfigWatcher{})
 | |
| 	}
 | |
| 	for _, target := range cdsTargets {
 | |
| 		xdsresource.WatchCluster(xdsC, target, unimplementedClusterWatcher{})
 | |
| 	}
 | |
| 	for _, target := range edsTargets {
 | |
| 		xdsresource.WatchEndpoints(xdsC, target, unimplementedEndpointsWatcher{})
 | |
| 	}
 | |
| 
 | |
| 	// Verify that the xDS client reports the resources as being in "Requested"
 | |
| 	// state.
 | |
| 	want := []*v3statuspb.ClientConfig_GenericXdsConfig{}
 | |
| 	for i := range ldsTargets {
 | |
| 		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
 | |
| 	}
 | |
| 	for i := range rdsTargets {
 | |
| 		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
 | |
| 	}
 | |
| 	for i := range cdsTargets {
 | |
| 		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
 | |
| 	}
 | |
| 	for i := range edsTargets {
 | |
| 		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
 | |
| 	}
 | |
| 	for {
 | |
| 		if err := ctx.Err(); err != nil {
 | |
| 			t.Fatalf("Timeout when waiting for resources in \"Requested\" state: %v", err)
 | |
| 		}
 | |
| 		if err := checkClientStatusResponse(stream, want); err == nil {
 | |
| 			break
 | |
| 		}
 | |
| 		time.Sleep(time.Millisecond * 100)
 | |
| 	}
 | |
| 
 | |
| 	// Configure the management server with two resources of each type,
 | |
| 	// corresponding to the watches registered above.
 | |
| 	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
 | |
| 		NodeID:    nodeID,
 | |
| 		Listeners: listeners,
 | |
| 		Routes:    routes,
 | |
| 		Clusters:  clusters,
 | |
| 		Endpoints: endpoints,
 | |
| 	}); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	// Verify that the xDS client reports the resources as being in "ACKed"
 | |
| 	// state, and in version "1".
 | |
| 	want = nil
 | |
| 	for i := range ldsTargets {
 | |
| 		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i]))
 | |
| 	}
 | |
| 	for i := range rdsTargets {
 | |
| 		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i]))
 | |
| 	}
 | |
| 	for i := range cdsTargets {
 | |
| 		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i]))
 | |
| 	}
 | |
| 	for i := range edsTargets {
 | |
| 		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i]))
 | |
| 	}
 | |
| 	for {
 | |
| 		if err := ctx.Err(); err != nil {
 | |
| 			t.Fatalf("Timeout when waiting for resources in \"ACKed\" state: %v", err)
 | |
| 		}
 | |
| 		err := checkClientStatusResponse(stream, want)
 | |
| 		if err == nil {
 | |
| 			break
 | |
| 		}
 | |
| 		time.Sleep(time.Millisecond * 100)
 | |
| 	}
 | |
| 
 | |
| 	// Update the first resource of each type in the management server to a
 | |
| 	// value which is expected to be NACK'ed by the xDS client.
 | |
| 	const nackResourceIdx = 0
 | |
| 	listeners[nackResourceIdx].ApiListener = &v3listenerpb.ApiListener{}
 | |
| 	routes[nackResourceIdx].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}}
 | |
| 	clusters[nackResourceIdx].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}
 | |
| 	endpoints[nackResourceIdx].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}}
 | |
| 	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
 | |
| 		NodeID:         nodeID,
 | |
| 		Listeners:      listeners,
 | |
| 		Routes:         routes,
 | |
| 		Clusters:       clusters,
 | |
| 		Endpoints:      endpoints,
 | |
| 		SkipValidation: true,
 | |
| 	}); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	// Verify that the xDS client reports the first resource of each type as
 | |
| 	// being in "NACKed" state, and the second resource of each type to be in
 | |
| 	// "ACKed" state. The version for the ACKed resource would be "2", while
 | |
| 	// that for the NACKed resource would be "1". In the NACKed resource, the
 | |
| 	// version which is NACKed is stored in the ErrorState field.
 | |
| 	want = nil
 | |
| 	for i := range ldsTargets {
 | |
| 		config := makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i])
 | |
| 		if i == nackResourceIdx {
 | |
| 			config.VersionInfo = "1"
 | |
| 			config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
 | |
| 			config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
 | |
| 		}
 | |
| 		want = append(want, config)
 | |
| 	}
 | |
| 	for i := range rdsTargets {
 | |
| 		config := makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i])
 | |
| 		if i == nackResourceIdx {
 | |
| 			config.VersionInfo = "1"
 | |
| 			config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
 | |
| 			config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
 | |
| 		}
 | |
| 		want = append(want, config)
 | |
| 	}
 | |
| 	for i := range cdsTargets {
 | |
| 		config := makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i])
 | |
| 		if i == nackResourceIdx {
 | |
| 			config.VersionInfo = "1"
 | |
| 			config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
 | |
| 			config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
 | |
| 		}
 | |
| 		want = append(want, config)
 | |
| 	}
 | |
| 	for i := range edsTargets {
 | |
| 		config := makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i])
 | |
| 		if i == nackResourceIdx {
 | |
| 			config.VersionInfo = "1"
 | |
| 			config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
 | |
| 			config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
 | |
| 		}
 | |
| 		want = append(want, config)
 | |
| 	}
 | |
| 	for {
 | |
| 		if err := ctx.Err(); err != nil {
 | |
| 			t.Fatalf("Timeout when waiting for resources in \"NACKed\" state: %v", err)
 | |
| 		}
 | |
| 		err := checkClientStatusResponse(stream, want)
 | |
| 		if err == nil {
 | |
| 			break
 | |
| 		}
 | |
| 		time.Sleep(time.Millisecond * 100)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.ClientResourceStatus, config *anypb.Any) *v3statuspb.ClientConfig_GenericXdsConfig {
 | |
| 	return &v3statuspb.ClientConfig_GenericXdsConfig{
 | |
| 		TypeUrl:      typeURL,
 | |
| 		Name:         name,
 | |
| 		VersionInfo:  version,
 | |
| 		ClientStatus: status,
 | |
| 		XdsConfig:    config,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func checkClientStatusResponse(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, want []*v3statuspb.ClientConfig_GenericXdsConfig) error {
 | |
| 	if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
 | |
| 		if err != io.EOF {
 | |
| 			return fmt.Errorf("failed to send ClientStatusRequest: %v", err)
 | |
| 		}
 | |
| 		// If the stream has closed, we call Recv() until it returns a non-nil
 | |
| 		// error to get the actual error on the stream.
 | |
| 		for {
 | |
| 			if _, err := stream.Recv(); err != nil {
 | |
| 				return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	resp, err := stream.Recv()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	if n := len(resp.Config); n != 1 {
 | |
| 		return fmt.Errorf("got %d configs, want 1: %v", n, prototext.Format(resp))
 | |
| 	}
 | |
| 
 | |
| 	if diff := cmp.Diff(resp.Config[0].GenericXdsConfigs, want, cmpOpts); diff != "" {
 | |
| 		return fmt.Errorf(diff)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s) TestCSDSNoXDSClient(t *testing.T) {
 | |
| 	// Create a bootstrap file in a temporary directory. Since we pass empty
 | |
| 	// options, it would end up creating a bootstrap file with an empty
 | |
| 	// serverURI which will fail xDS client creation.
 | |
| 	bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{})
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	t.Cleanup(func() { bootstrapCleanup() })
 | |
| 
 | |
| 	// Initialize an gRPC server and register CSDS on it.
 | |
| 	server := grpc.NewServer()
 | |
| 	csdss, err := csds.NewClientStatusDiscoveryServer()
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	defer csdss.Close()
 | |
| 	v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
 | |
| 
 | |
| 	// Create a local listener and pass it to Serve().
 | |
| 	lis, err := testutils.LocalTCPListener()
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
 | |
| 	}
 | |
| 	go func() {
 | |
| 		if err := server.Serve(lis); err != nil {
 | |
| 			t.Errorf("Serve() failed: %v", err)
 | |
| 		}
 | |
| 	}()
 | |
| 	defer server.Stop()
 | |
| 
 | |
| 	// Create a client to the CSDS server.
 | |
| 	conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err)
 | |
| 	}
 | |
| 	defer conn.Close()
 | |
| 	c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("Failed to create a stream for CSDS: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
 | |
| 		t.Fatalf("Failed to send ClientStatusRequest: %v", err)
 | |
| 	}
 | |
| 	r, err := stream.Recv()
 | |
| 	if err != nil {
 | |
| 		// io.EOF is not ok.
 | |
| 		t.Fatalf("Failed to recv ClientStatusResponse: %v", err)
 | |
| 	}
 | |
| 	if n := len(r.Config); n != 0 {
 | |
| 		t.Fatalf("got %d configs, want 0: %v", n, prototext.Format(r))
 | |
| 	}
 | |
| }
 |