/*
 *
 * Copyright 2021 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package csds

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"testing"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/google/go-cmp/cmp"
	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/grpctest"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/internal/testutils/xds/e2e"
	"google.golang.org/grpc/internal/xds"
	_ "google.golang.org/grpc/xds/internal/httpfilter/router"
	"google.golang.org/grpc/xds/internal/xdsclient"
	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
	"google.golang.org/protobuf/testing/protocmp"
	"google.golang.org/protobuf/types/known/anypb"

	v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
	v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
	v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)

|
|
|
const defaultTestTimeout = 10 * time.Second
|
|
|
|
var cmpOpts = cmp.Options{
|
|
cmp.Transformer("sort", func(in []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig {
|
|
out := append([]*v3statuspb.ClientConfig_GenericXdsConfig(nil), in...)
|
|
sort.Slice(out, func(i, j int) bool {
|
|
a, b := out[i], out[j]
|
|
if a == nil {
|
|
return true
|
|
}
|
|
if b == nil {
|
|
return false
|
|
}
|
|
if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 {
|
|
return strings.Compare(a.Name, b.Name) < 0
|
|
}
|
|
return strings.Compare(a.TypeUrl, b.TypeUrl) < 0
|
|
})
|
|
return out
|
|
}),
|
|
protocmp.Transform(),
|
|
}
|
|
|
|
// filterFields clears unimportant fields in the proto messages.
|
|
//
|
|
// protocmp.IgnoreFields() doesn't work on nil messages (it panics).
|
|
func filterFields(ms []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig {
|
|
out := append([]*v3statuspb.ClientConfig_GenericXdsConfig{}, ms...)
|
|
for _, m := range out {
|
|
if m == nil {
|
|
continue
|
|
}
|
|
m.LastUpdated = nil
|
|
if m.ErrorState != nil {
|
|
m.ErrorState.Details = "blahblah"
|
|
m.ErrorState.LastUpdateAttempt = nil
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
var (
|
|
ldsTargets = []string{"lds.target.good:0000", "lds.target.good:1111"}
|
|
listeners = make([]*v3listenerpb.Listener, len(ldsTargets))
|
|
listenerAnys = make([]*anypb.Any, len(ldsTargets))
|
|
|
|
rdsTargets = []string{"route-config-0", "route-config-1"}
|
|
routes = make([]*v3routepb.RouteConfiguration, len(rdsTargets))
|
|
routeAnys = make([]*anypb.Any, len(rdsTargets))
|
|
|
|
cdsTargets = []string{"cluster-0", "cluster-1"}
|
|
clusters = make([]*v3clusterpb.Cluster, len(cdsTargets))
|
|
clusterAnys = make([]*anypb.Any, len(cdsTargets))
|
|
|
|
edsTargets = []string{"endpoints-0", "endpoints-1"}
|
|
endpoints = make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets))
|
|
endpointAnys = make([]*anypb.Any, len(edsTargets))
|
|
ips = []string{"0.0.0.0", "1.1.1.1"}
|
|
ports = []uint32{123, 456}
|
|
)
|
|
|
|
type s struct {
|
|
grpctest.Tester
|
|
}
|
|
|
|
func Test(t *testing.T) {
|
|
grpctest.RunSubTests(t, s{})
|
|
}
|
|
|
|
func init() {
|
|
for i := range ldsTargets {
|
|
listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i])
|
|
listenerAnys[i] = testutils.MarshalAny(listeners[i])
|
|
}
|
|
for i := range rdsTargets {
|
|
routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i])
|
|
routeAnys[i] = testutils.MarshalAny(routes[i])
|
|
}
|
|
for i := range cdsTargets {
|
|
clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i], e2e.SecurityLevelNone)
|
|
clusterAnys[i] = testutils.MarshalAny(clusters[i])
|
|
}
|
|
for i := range edsTargets {
|
|
endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1])
|
|
endpointAnys[i] = testutils.MarshalAny(endpoints[i])
|
|
}
|
|
}
|
|
|
|
func (s) TestCSDS(t *testing.T) {
|
|
const retryCount = 10
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
xdsC, mgmServer, nodeID, stream, cleanup := commonSetup(ctx, t)
|
|
defer cleanup()
|
|
|
|
for _, target := range ldsTargets {
|
|
xdsC.WatchListener(target, func(xdsresource.ListenerUpdate, error) {})
|
|
}
|
|
for _, target := range rdsTargets {
|
|
xdsC.WatchRouteConfig(target, func(xdsresource.RouteConfigUpdate, error) {})
|
|
}
|
|
for _, target := range cdsTargets {
|
|
xdsC.WatchCluster(target, func(xdsresource.ClusterUpdate, error) {})
|
|
}
|
|
for _, target := range edsTargets {
|
|
xdsC.WatchEndpoints(target, func(xdsresource.EndpointsUpdate, error) {})
|
|
}
|
|
|
|
for i := 0; i < retryCount; i++ {
|
|
err := checkForRequested(stream)
|
|
if err == nil {
|
|
break
|
|
}
|
|
if i == retryCount-1 {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
time.Sleep(time.Millisecond * 100)
|
|
}
|
|
|
|
if err := mgmServer.Update(ctx, e2e.UpdateOptions{
|
|
NodeID: nodeID,
|
|
Listeners: listeners,
|
|
Routes: routes,
|
|
Clusters: clusters,
|
|
Endpoints: endpoints,
|
|
}); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
for i := 0; i < retryCount; i++ {
|
|
err := checkForACKed(stream)
|
|
if err == nil {
|
|
break
|
|
}
|
|
if i == retryCount-1 {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
time.Sleep(time.Millisecond * 100)
|
|
}
|
|
|
|
const nackResourceIdx = 0
|
|
var (
|
|
nackListeners = append([]*v3listenerpb.Listener{}, listeners...)
|
|
nackRoutes = append([]*v3routepb.RouteConfiguration{}, routes...)
|
|
nackClusters = append([]*v3clusterpb.Cluster{}, clusters...)
|
|
nackEndpoints = append([]*v3endpointpb.ClusterLoadAssignment{}, endpoints...)
|
|
)
|
|
nackListeners[0] = &v3listenerpb.Listener{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}} // 0 will be nacked. 1 will stay the same.
|
|
nackRoutes[0] = &v3routepb.RouteConfiguration{
|
|
Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}},
|
|
}
|
|
nackClusters[0] = &v3clusterpb.Cluster{
|
|
Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC},
|
|
}
|
|
nackEndpoints[0] = &v3endpointpb.ClusterLoadAssignment{
|
|
ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}},
|
|
}
|
|
if err := mgmServer.Update(ctx, e2e.UpdateOptions{
|
|
NodeID: nodeID,
|
|
Listeners: nackListeners,
|
|
Routes: nackRoutes,
|
|
Clusters: nackClusters,
|
|
Endpoints: nackEndpoints,
|
|
SkipValidation: true,
|
|
}); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
for i := 0; i < retryCount; i++ {
|
|
err := checkForNACKed(nackResourceIdx, stream)
|
|
if err == nil {
|
|
break
|
|
}
|
|
if i == retryCount-1 {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
time.Sleep(time.Millisecond * 100)
|
|
}
|
|
}
|
|
|
|
func commonSetup(ctx context.Context, t *testing.T) (xdsclient.XDSClient, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
|
|
t.Helper()
|
|
|
|
// Spin up a xDS management server on a local port.
|
|
nodeID := uuid.New().String()
|
|
fs, err := e2e.StartManagementServer()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Create a bootstrap file in a temporary directory.
|
|
bootstrapCleanup, err := xds.SetupBootstrapFile(xds.BootstrapOptions{
|
|
Version: xds.TransportV3,
|
|
NodeID: nodeID,
|
|
ServerURI: fs.Address,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Create xds_client.
|
|
xdsC, err := xdsclient.New()
|
|
if err != nil {
|
|
t.Fatalf("failed to create xds client: %v", err)
|
|
}
|
|
origNewXDSClient := newXDSClient
|
|
newXDSClient = func() xdsclient.XDSClient { return xdsC }
|
|
|
|
// Initialize an gRPC server and register CSDS on it.
|
|
server := grpc.NewServer()
|
|
csdss, err := NewClientStatusDiscoveryServer()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
|
|
|
|
// Create a local listener and pass it to Serve().
|
|
lis, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
|
|
}
|
|
go func() {
|
|
if err := server.Serve(lis); err != nil {
|
|
t.Errorf("Serve() failed: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Create CSDS client.
|
|
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("cannot connect to server: %v", err)
|
|
}
|
|
c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
|
|
stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
|
|
if err != nil {
|
|
t.Fatalf("cannot get ServerReflectionInfo: %v", err)
|
|
}
|
|
|
|
return xdsC, fs, nodeID, stream, func() {
|
|
fs.Stop()
|
|
conn.Close()
|
|
server.Stop()
|
|
csdss.Close()
|
|
newXDSClient = origNewXDSClient
|
|
xdsC.Close()
|
|
bootstrapCleanup()
|
|
}
|
|
}
|
|
|
|
func checkForRequested(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient) error {
|
|
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
|
|
return fmt.Errorf("failed to send request: %v", err)
|
|
}
|
|
r, err := stream.Recv()
|
|
if err != nil {
|
|
// io.EOF is not ok.
|
|
return fmt.Errorf("failed to recv response: %v", err)
|
|
}
|
|
|
|
if n := len(r.Config); n != 1 {
|
|
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(r))
|
|
}
|
|
|
|
var want []*v3statuspb.ClientConfig_GenericXdsConfig
|
|
// Status is Requested, but version and xds config are all unset.
|
|
for i := range ldsTargets {
|
|
want = append(want, &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: listenerTypeURL, Name: ldsTargets[i], ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
|
|
})
|
|
}
|
|
for i := range rdsTargets {
|
|
want = append(want, &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: routeConfigTypeURL, Name: rdsTargets[i], ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
|
|
})
|
|
}
|
|
for i := range cdsTargets {
|
|
want = append(want, &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: clusterTypeURL, Name: cdsTargets[i], ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
|
|
})
|
|
}
|
|
for i := range edsTargets {
|
|
want = append(want, &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: endpointsTypeURL, Name: edsTargets[i], ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
|
|
})
|
|
}
|
|
if diff := cmp.Diff(filterFields(r.Config[0].GenericXdsConfigs), want, cmpOpts); diff != "" {
|
|
return fmt.Errorf(diff)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func checkForACKed(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient) error {
|
|
const wantVersion = "1"
|
|
|
|
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
|
|
return fmt.Errorf("failed to send: %v", err)
|
|
}
|
|
r, err := stream.Recv()
|
|
if err != nil {
|
|
// io.EOF is not ok.
|
|
return fmt.Errorf("failed to recv response: %v", err)
|
|
}
|
|
|
|
if n := len(r.Config); n != 1 {
|
|
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(r))
|
|
}
|
|
|
|
var want []*v3statuspb.ClientConfig_GenericXdsConfig
|
|
// Status is Acked, config is filled with the prebuilt Anys.
|
|
for i := range ldsTargets {
|
|
want = append(want, &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: listenerTypeURL,
|
|
Name: ldsTargets[i],
|
|
VersionInfo: wantVersion,
|
|
XdsConfig: listenerAnys[i],
|
|
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
|
|
})
|
|
}
|
|
for i := range rdsTargets {
|
|
want = append(want, &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: routeConfigTypeURL,
|
|
Name: rdsTargets[i],
|
|
VersionInfo: wantVersion,
|
|
XdsConfig: routeAnys[i],
|
|
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
|
|
})
|
|
}
|
|
for i := range cdsTargets {
|
|
want = append(want, &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: clusterTypeURL,
|
|
Name: cdsTargets[i],
|
|
VersionInfo: wantVersion,
|
|
XdsConfig: clusterAnys[i],
|
|
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
|
|
})
|
|
}
|
|
for i := range edsTargets {
|
|
want = append(want, &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: endpointsTypeURL,
|
|
Name: edsTargets[i],
|
|
VersionInfo: wantVersion,
|
|
XdsConfig: endpointAnys[i],
|
|
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
|
|
})
|
|
}
|
|
if diff := cmp.Diff(filterFields(r.Config[0].GenericXdsConfigs), want, cmpOpts); diff != "" {
|
|
return fmt.Errorf(diff)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient) error {
|
|
const (
|
|
ackVersion = "1"
|
|
nackVersion = "2"
|
|
)
|
|
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
|
|
return fmt.Errorf("failed to send: %v", err)
|
|
}
|
|
r, err := stream.Recv()
|
|
if err != nil {
|
|
// io.EOF is not ok.
|
|
return fmt.Errorf("failed to recv response: %v", err)
|
|
}
|
|
|
|
if n := len(r.Config); n != 1 {
|
|
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(r))
|
|
}
|
|
|
|
var want []*v3statuspb.ClientConfig_GenericXdsConfig
|
|
// Resources with the nackIdx are NACKed.
|
|
for i := range ldsTargets {
|
|
config := &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: listenerTypeURL,
|
|
Name: ldsTargets[i],
|
|
VersionInfo: nackVersion,
|
|
XdsConfig: listenerAnys[i],
|
|
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
|
|
}
|
|
if i == nackResourceIdx {
|
|
config.VersionInfo = ackVersion
|
|
config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
|
|
config.ErrorState = &v3adminpb.UpdateFailureState{
|
|
Details: "blahblah",
|
|
VersionInfo: nackVersion,
|
|
}
|
|
}
|
|
want = append(want, config)
|
|
}
|
|
for i := range rdsTargets {
|
|
config := &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: routeConfigTypeURL,
|
|
Name: rdsTargets[i],
|
|
VersionInfo: nackVersion,
|
|
XdsConfig: routeAnys[i],
|
|
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
|
|
}
|
|
if i == nackResourceIdx {
|
|
config.VersionInfo = ackVersion
|
|
config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
|
|
config.ErrorState = &v3adminpb.UpdateFailureState{
|
|
Details: "blahblah",
|
|
VersionInfo: nackVersion,
|
|
}
|
|
}
|
|
want = append(want, config)
|
|
}
|
|
for i := range cdsTargets {
|
|
config := &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: clusterTypeURL,
|
|
Name: cdsTargets[i],
|
|
VersionInfo: nackVersion,
|
|
XdsConfig: clusterAnys[i],
|
|
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
|
|
}
|
|
if i == nackResourceIdx {
|
|
config.VersionInfo = ackVersion
|
|
config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
|
|
config.ErrorState = &v3adminpb.UpdateFailureState{
|
|
Details: "blahblah",
|
|
VersionInfo: nackVersion,
|
|
}
|
|
}
|
|
want = append(want, config)
|
|
}
|
|
for i := range edsTargets {
|
|
config := &v3statuspb.ClientConfig_GenericXdsConfig{
|
|
TypeUrl: endpointsTypeURL,
|
|
Name: edsTargets[i],
|
|
VersionInfo: nackVersion,
|
|
XdsConfig: endpointAnys[i],
|
|
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
|
|
}
|
|
if i == nackResourceIdx {
|
|
config.VersionInfo = ackVersion
|
|
config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
|
|
config.ErrorState = &v3adminpb.UpdateFailureState{
|
|
Details: "blahblah",
|
|
VersionInfo: nackVersion,
|
|
}
|
|
}
|
|
want = append(want, config)
|
|
}
|
|
if diff := cmp.Diff(filterFields(r.Config[0].GenericXdsConfigs), want, cmpOpts); diff != "" {
|
|
return fmt.Errorf(diff)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s) TestCSDSNoXDSClient(t *testing.T) {
|
|
oldNewXDSClient := newXDSClient
|
|
newXDSClient = func() xdsclient.XDSClient { return nil }
|
|
defer func() { newXDSClient = oldNewXDSClient }()
|
|
|
|
// Initialize an gRPC server and register CSDS on it.
|
|
server := grpc.NewServer()
|
|
csdss, err := NewClientStatusDiscoveryServer()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer csdss.Close()
|
|
v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
|
|
// Create a local listener and pass it to Serve().
|
|
lis, err := testutils.LocalTCPListener()
|
|
if err != nil {
|
|
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
|
|
}
|
|
go func() {
|
|
if err := server.Serve(lis); err != nil {
|
|
t.Errorf("Serve() failed: %v", err)
|
|
}
|
|
}()
|
|
defer server.Stop()
|
|
|
|
// Create CSDS client.
|
|
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
if err != nil {
|
|
t.Fatalf("cannot connect to server: %v", err)
|
|
}
|
|
defer conn.Close()
|
|
c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
|
|
if err != nil {
|
|
t.Fatalf("cannot get ServerReflectionInfo: %v", err)
|
|
}
|
|
|
|
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
|
|
t.Fatalf("failed to send: %v", err)
|
|
}
|
|
r, err := stream.Recv()
|
|
if err != nil {
|
|
// io.EOF is not ok.
|
|
t.Fatalf("failed to recv response: %v", err)
|
|
}
|
|
if n := len(r.Config); n != 0 {
|
|
t.Fatalf("got %d configs, want 0: %v", n, proto.MarshalTextString(r))
|
|
}
|
|
}
|
|
|
|
func Test_nodeProtoToV3(t *testing.T) {
|
|
const (
|
|
testID = "test-id"
|
|
testCluster = "test-cluster"
|
|
testZone = "test-zone"
|
|
)
|
|
tests := []struct {
|
|
name string
|
|
n proto.Message
|
|
want *v3corepb.Node
|
|
}{
|
|
{
|
|
name: "v3",
|
|
n: &v3corepb.Node{
|
|
Id: testID,
|
|
Cluster: testCluster,
|
|
Locality: &v3corepb.Locality{Zone: testZone},
|
|
},
|
|
want: &v3corepb.Node{
|
|
Id: testID,
|
|
Cluster: testCluster,
|
|
Locality: &v3corepb.Locality{Zone: testZone},
|
|
},
|
|
},
|
|
{
|
|
name: "v2",
|
|
n: &v2corepb.Node{
|
|
Id: testID,
|
|
Cluster: testCluster,
|
|
Locality: &v2corepb.Locality{Zone: testZone},
|
|
},
|
|
want: &v3corepb.Node{
|
|
Id: testID,
|
|
Cluster: testCluster,
|
|
Locality: &v3corepb.Locality{Zone: testZone},
|
|
},
|
|
},
|
|
{
|
|
name: "not node",
|
|
n: &v2corepb.Locality{Zone: testZone},
|
|
want: nil, // Input is not a node, should return nil.
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
got := nodeProtoToV3(tt.n)
|
|
if diff := cmp.Diff(got, tt.want, protocmp.Transform()); diff != "" {
|
|
t.Errorf("nodeProtoToV3() got unexpected result, diff (-got, +want): %v", diff)
|
|
}
|
|
})
|
|
}
|
|
}
|