csds: implement CSDS service handler (#4243)

Menghan Li 2021-03-16 14:05:05 -07:00 committed by GitHub
parent 1e7119b136
commit 95173a53fe
3 changed files with 1004 additions and 2 deletions

xds/csds/csds.go (new file, +313)

@@ -0,0 +1,313 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package csds implements features to dump the status (xDS responses) the
// xds_client is using.
//
// Notice: This package is EXPERIMENTAL and may be changed or removed in a later
// release.
package csds
import (
"context"
"fmt"
"io"
"time"
v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/protobuf/types/known/timestamppb"
_ "google.golang.org/grpc/xds/internal/client/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/client/v3" // Register v3 xds_client.
)
// xdsClientInterface contains methods from xdsClient.Client which are used by
// the server. This is useful for overriding in unit tests.
type xdsClientInterface interface {
DumpLDS() (string, map[string]client.UpdateWithMD)
DumpRDS() (string, map[string]client.UpdateWithMD)
DumpCDS() (string, map[string]client.UpdateWithMD)
DumpEDS() (string, map[string]client.UpdateWithMD)
BootstrapConfig() *bootstrap.Config
Close()
}
var (
logger = grpclog.Component("xds")
newXDSClient = func() (xdsClientInterface, error) {
return client.New()
}
)
// ClientStatusDiscoveryServer implements the ClientStatusDiscoveryServiceServer interface.
type ClientStatusDiscoveryServer struct {
// xdsClient will always be the same in practice, but we keep a copy in each
// server instance for testing.
xdsClient xdsClientInterface
}
// NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be
// registered on a gRPC server.
func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) {
xdsC, err := newXDSClient()
if err != nil {
return nil, fmt.Errorf("failed to create xds client: %v", err)
}
return &ClientStatusDiscoveryServer{
xdsClient: xdsC,
}, nil
}
// StreamClientStatus implements the StreamClientStatus RPC of the ClientStatusDiscoveryService.
func (s *ClientStatusDiscoveryServer) StreamClientStatus(stream v3statuspb.ClientStatusDiscoveryService_StreamClientStatusServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
resp, err := s.buildClientStatusRespForReq(req)
if err != nil {
return err
}
if err := stream.Send(resp); err != nil {
return err
}
}
}
// FetchClientStatus implements the FetchClientStatus RPC of the ClientStatusDiscoveryService.
func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
return s.buildClientStatusRespForReq(req)
}
// buildClientStatusRespForReq fetches the status from the client, and returns
// the response to be sent back to the client.
//
// If it returns an error, the error is a status error.
func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
// Field NodeMatchers is unsupported, by design
// https://github.com/grpc/proposal/blob/master/A40-csds-support.md#detail-node-matching.
if len(req.NodeMatchers) != 0 {
return nil, status.Errorf(codes.InvalidArgument, "node_matchers are not supported, request contains node_matchers: %v", req.NodeMatchers)
}
ret := &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: nodeProtoToV3(s.xdsClient.BootstrapConfig().NodeProto),
XdsConfig: []*v3statuspb.PerXdsConfig{
s.buildLDSPerXDSConfig(),
s.buildRDSPerXDSConfig(),
s.buildCDSPerXDSConfig(),
s.buildEDSPerXDSConfig(),
},
},
},
}
return ret, nil
}
// Close cleans up the resources.
func (s *ClientStatusDiscoveryServer) Close() {
s.xdsClient.Close()
}
// nodeProtoToV3 converts the given proto into a v3.Node. n comes from the
// bootstrap config and can be either a v2.Node or a v3.Node.
//
// If n is already a v3.Node, return it.
// If n is v2.Node, marshal and unmarshal it to v3.
// Otherwise, return nil.
//
// The default case (neither v2 nor v3) returns nil instead of an error,
// because the resources in the response are more important than the node. In
// the worst case, the user receives no Node info but still gets the resources.
func nodeProtoToV3(n proto.Message) *v3corepb.Node {
var node *v3corepb.Node
switch nn := n.(type) {
case *v3corepb.Node:
node = nn
case *v2corepb.Node:
v2, err := proto.Marshal(nn)
if err != nil {
logger.Warningf("Failed to marshal node (%v): %v", n, err)
break
}
node = new(v3corepb.Node)
if err := proto.Unmarshal(v2, node); err != nil {
logger.Warningf("Failed to unmarshal node (%v): %v", v2, err)
}
default:
logger.Warningf("node from bootstrap is %#v, only v2.Node and v3.Node are supported", nn)
}
return node
}
func (s *ClientStatusDiscoveryServer) buildLDSPerXDSConfig() *v3statuspb.PerXdsConfig {
version, dump := s.xdsClient.DumpLDS()
var resources []*v3adminpb.ListenersConfigDump_DynamicListener
for name, d := range dump {
configDump := &v3adminpb.ListenersConfigDump_DynamicListener{
Name: name,
ClientStatus: serviceStatusToProto(d.MD.Status),
}
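// A zero timestamp means no update has been received for this resource yet
// (e.g. it has only been requested), so ActiveState is populated only for
// resources that have an update to report.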
if (d.MD.Timestamp != time.Time{}) {
configDump.ActiveState = &v3adminpb.ListenersConfigDump_DynamicListenerState{
VersionInfo: d.MD.Version,
Listener: d.Raw,
LastUpdated: timestamppb.New(d.MD.Timestamp),
}
}
if errState := d.MD.ErrState; errState != nil {
configDump.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
resources = append(resources, configDump)
}
return &v3statuspb.PerXdsConfig{
PerXdsConfig: &v3statuspb.PerXdsConfig_ListenerConfig{
ListenerConfig: &v3adminpb.ListenersConfigDump{
VersionInfo: version,
DynamicListeners: resources,
},
},
}
}
func (s *ClientStatusDiscoveryServer) buildRDSPerXDSConfig() *v3statuspb.PerXdsConfig {
_, dump := s.xdsClient.DumpRDS()
var resources []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for _, d := range dump {
configDump := &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
VersionInfo: d.MD.Version,
ClientStatus: serviceStatusToProto(d.MD.Status),
}
if (d.MD.Timestamp != time.Time{}) {
configDump.RouteConfig = d.Raw
configDump.LastUpdated = timestamppb.New(d.MD.Timestamp)
}
if errState := d.MD.ErrState; errState != nil {
configDump.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
resources = append(resources, configDump)
}
return &v3statuspb.PerXdsConfig{
PerXdsConfig: &v3statuspb.PerXdsConfig_RouteConfig{
RouteConfig: &v3adminpb.RoutesConfigDump{
DynamicRouteConfigs: resources,
},
},
}
}
func (s *ClientStatusDiscoveryServer) buildCDSPerXDSConfig() *v3statuspb.PerXdsConfig {
version, dump := s.xdsClient.DumpCDS()
var resources []*v3adminpb.ClustersConfigDump_DynamicCluster
for _, d := range dump {
configDump := &v3adminpb.ClustersConfigDump_DynamicCluster{
VersionInfo: d.MD.Version,
ClientStatus: serviceStatusToProto(d.MD.Status),
}
if (d.MD.Timestamp != time.Time{}) {
configDump.Cluster = d.Raw
configDump.LastUpdated = timestamppb.New(d.MD.Timestamp)
}
if errState := d.MD.ErrState; errState != nil {
configDump.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
resources = append(resources, configDump)
}
return &v3statuspb.PerXdsConfig{
PerXdsConfig: &v3statuspb.PerXdsConfig_ClusterConfig{
ClusterConfig: &v3adminpb.ClustersConfigDump{
VersionInfo: version,
DynamicActiveClusters: resources,
},
},
}
}
func (s *ClientStatusDiscoveryServer) buildEDSPerXDSConfig() *v3statuspb.PerXdsConfig {
_, dump := s.xdsClient.DumpEDS()
var resources []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for _, d := range dump {
configDump := &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
VersionInfo: d.MD.Version,
ClientStatus: serviceStatusToProto(d.MD.Status),
}
if (d.MD.Timestamp != time.Time{}) {
configDump.EndpointConfig = d.Raw
configDump.LastUpdated = timestamppb.New(d.MD.Timestamp)
}
if errState := d.MD.ErrState; errState != nil {
configDump.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
resources = append(resources, configDump)
}
return &v3statuspb.PerXdsConfig{
PerXdsConfig: &v3statuspb.PerXdsConfig_EndpointConfig{
EndpointConfig: &v3adminpb.EndpointsConfigDump{
DynamicEndpointConfigs: resources,
},
},
}
}
func serviceStatusToProto(serviceStatus client.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus {
case client.ServiceStatusUnknown:
return v3adminpb.ClientResourceStatus_UNKNOWN
case client.ServiceStatusRequested:
return v3adminpb.ClientResourceStatus_REQUESTED
case client.ServiceStatusNotExist:
return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
case client.ServiceStatusACKed:
return v3adminpb.ClientResourceStatus_ACKED
case client.ServiceStatusNACKed:
return v3adminpb.ClientResourceStatus_NACKED
default:
return v3adminpb.ClientResourceStatus_UNKNOWN
}
}
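For reference, here is a minimal sketch (not part of this commit) of how the new service could be exposed by an application, assuming the import path google.golang.org/grpc/xds/csds and the go-control-plane v3 status service used in the tests below; the listen address is a placeholder:

package main

import (
	"log"
	"net"

	v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/xds/csds"
)

func main() {
	// The CSDS server creates its own xds_client from the bootstrap config,
	// so the usual xDS bootstrap file/environment must be in place.
	csdss, err := csds.NewClientStatusDiscoveryServer()
	if err != nil {
		log.Fatalf("failed to create CSDS server: %v", err)
	}
	defer csdss.Close()

	// Register it on a regular gRPC server, alongside any other services.
	s := grpc.NewServer()
	v3statuspb.RegisterClientStatusDiscoveryServiceServer(s, csdss)

	lis, err := net.Listen("tcp", "localhost:50051") // placeholder address
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	if err := s.Serve(lis); err != nil {
		log.Fatalf("Serve() failed: %v", err)
	}
}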

xds/csds/csds_test.go (new file, +683)

@@ -0,0 +1,683 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package csds
import (
"context"
"fmt"
"strings"
"testing"
"time"
v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/e2e"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)
const (
defaultTestTimeout = 10 * time.Second
)
type xdsClientInterfaceWithWatch interface {
WatchListener(string, func(client.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(client.RouteConfigUpdate, error)) func()
WatchCluster(string, func(client.ClusterUpdate, error)) func()
WatchEndpoints(string, func(client.EndpointsUpdate, error)) func()
}
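// cmpOpts makes the comparisons in this file deterministic and focused on the
// fields the tests care about: update timestamps and NACK error details are
// treated as volatile and ignored, and repeated config dump entries are sorted
// by resource name because the server builds them from maps.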
var cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmp.Comparer(func(a, b *timestamppb.Timestamp) bool { return true }),
protocmp.IgnoreFields(&v3adminpb.UpdateFailureState{}, "last_update_attempt", "details"),
protocmp.SortRepeated(func(a, b *v3adminpb.ListenersConfigDump_DynamicListener) bool {
return strings.Compare(a.Name, b.Name) < 0
}),
protocmp.SortRepeated(func(a, b *v3adminpb.RoutesConfigDump_DynamicRouteConfig) bool {
if a.RouteConfig == nil {
return false
}
if b.RouteConfig == nil {
return true
}
var at, bt v3routepb.RouteConfiguration
if err := ptypes.UnmarshalAny(a.RouteConfig, &at); err != nil {
panic("failed to unmarshal RouteConfig" + err.Error())
}
if err := ptypes.UnmarshalAny(b.RouteConfig, &bt); err != nil {
panic("failed to unmarshal RouteConfig" + err.Error())
}
return strings.Compare(at.Name, bt.Name) < 0
}),
protocmp.SortRepeated(func(a, b *v3adminpb.ClustersConfigDump_DynamicCluster) bool {
if a.Cluster == nil {
return false
}
if b.Cluster == nil {
return true
}
var at, bt v3clusterpb.Cluster
if err := ptypes.UnmarshalAny(a.Cluster, &at); err != nil {
panic("failed to unmarshal Cluster" + err.Error())
}
if err := ptypes.UnmarshalAny(b.Cluster, &bt); err != nil {
panic("failed to unmarshal Cluster" + err.Error())
}
return strings.Compare(at.Name, bt.Name) < 0
}),
protocmp.SortRepeated(func(a, b *v3adminpb.EndpointsConfigDump_DynamicEndpointConfig) bool {
if a.EndpointConfig == nil {
return false
}
if b.EndpointConfig == nil {
return true
}
var at, bt v3endpointpb.ClusterLoadAssignment
if err := ptypes.UnmarshalAny(a.EndpointConfig, &at); err != nil {
panic("failed to unmarshal Endpoints" + err.Error())
}
if err := ptypes.UnmarshalAny(b.EndpointConfig, &bt); err != nil {
panic("failed to unmarshal Endpoints" + err.Error())
}
return strings.Compare(at.ClusterName, bt.ClusterName) < 0
}),
protocmp.IgnoreFields(&v3adminpb.ListenersConfigDump_DynamicListenerState{}, "last_updated"),
protocmp.IgnoreFields(&v3adminpb.RoutesConfigDump_DynamicRouteConfig{}, "last_updated"),
protocmp.IgnoreFields(&v3adminpb.ClustersConfigDump_DynamicCluster{}, "last_updated"),
protocmp.IgnoreFields(&v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{}, "last_updated"),
protocmp.Transform(),
}
var (
ldsTargets = []string{"lds.target.good:0000", "lds.target.good:1111"}
listeners = make([]*v3listenerpb.Listener, len(ldsTargets))
listenerAnys = make([]*anypb.Any, len(ldsTargets))
rdsTargets = []string{"route-config-0", "route-config-1"}
routes = make([]*v3routepb.RouteConfiguration, len(rdsTargets))
routeAnys = make([]*anypb.Any, len(rdsTargets))
cdsTargets = []string{"cluster-0", "cluster-1"}
clusters = make([]*v3clusterpb.Cluster, len(cdsTargets))
clusterAnys = make([]*anypb.Any, len(cdsTargets))
edsTargets = []string{"endpoints-0", "endpoints-1"}
endpoints = make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets))
endpointAnys = make([]*anypb.Any, len(edsTargets))
ips = []string{"0.0.0.0", "1.1.1.1"}
ports = []uint32{123, 456}
)
func init() {
for i := range ldsTargets {
listeners[i] = e2e.DefaultListener(ldsTargets[i], rdsTargets[i])
listenerAnys[i], _ = ptypes.MarshalAny(listeners[i])
}
for i := range rdsTargets {
routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i])
routeAnys[i], _ = ptypes.MarshalAny(routes[i])
}
for i := range cdsTargets {
clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i])
clusterAnys[i], _ = ptypes.MarshalAny(clusters[i])
}
for i := range edsTargets {
endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i])
endpointAnys[i], _ = ptypes.MarshalAny(endpoints[i])
}
}
func TestCSDS(t *testing.T) {
const retryCount = 10
xdsC, mgmServer, nodeID, stream, cleanup := commonSetup(t)
defer cleanup()
for _, target := range ldsTargets {
xdsC.WatchListener(target, func(client.ListenerUpdate, error) {})
}
for _, target := range rdsTargets {
xdsC.WatchRouteConfig(target, func(client.RouteConfigUpdate, error) {})
}
for _, target := range cdsTargets {
xdsC.WatchCluster(target, func(client.ClusterUpdate, error) {})
}
for _, target := range edsTargets {
xdsC.WatchEndpoints(target, func(client.EndpointsUpdate, error) {})
}
for i := 0; i < retryCount; i++ {
err := checkForRequested(stream)
if err == nil {
break
}
if i == retryCount-1 {
t.Fatalf("%v", err)
}
time.Sleep(time.Millisecond * 100)
}
if err := mgmServer.Update(e2e.UpdateOptions{
NodeID: nodeID,
Listeners: listeners,
Routes: routes,
Clusters: clusters,
Endpoints: endpoints,
}); err != nil {
t.Fatal(err)
}
for i := 0; i < retryCount; i++ {
err := checkForACKed(stream)
if err == nil {
break
}
if i == retryCount-1 {
t.Fatalf("%v", err)
}
time.Sleep(time.Millisecond * 100)
}
const nackResourceIdx = 0
if err := mgmServer.Update(e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{
{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}}, // 0 will be nacked. 1 will stay the same.
},
Routes: []*v3routepb.RouteConfiguration{
{Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{
Routes: []*v3routepb.Route{{}},
}}},
},
Clusters: []*v3clusterpb.Cluster{
{Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}},
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{
{ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}}},
},
SkipValidation: true,
}); err != nil {
t.Fatal(err)
}
for i := 0; i < retryCount; i++ {
err := checkForNACKed(nackResourceIdx, stream)
if err == nil {
break
}
if i == retryCount-1 {
t.Fatalf("%v", err)
}
time.Sleep(time.Millisecond * 100)
}
}
func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServer, string, v3statuspb.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
t.Helper()
// Spin up an xDS management server on a local port.
nodeID := uuid.New().String()
fs, err := e2e.StartManagementServer()
if err != nil {
t.Fatal(err)
}
// Create a bootstrap file in a temporary directory.
bootstrapCleanup, err := e2e.SetupBootstrapFile(e2e.BootstrapOptions{
Version: e2e.TransportV3,
NodeID: nodeID,
ServerURI: fs.Address,
})
if err != nil {
t.Fatal(err)
}
// Create xds_client.
xdsC, err := client.New()
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) {
return xdsC, nil
}
// Initialize a gRPC server and register CSDS on it.
server := grpc.NewServer()
csdss, err := NewClientStatusDiscoveryServer()
if err != nil {
t.Fatal(err)
}
v3statuspb.RegisterClientStatusDiscoveryServiceServer(server, csdss)
// Create a local listener and pass it to Serve().
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
// Create CSDS client.
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatalf("cannot connect to server: %v", err)
}
c := v3statuspb.NewClientStatusDiscoveryServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("cannot get ServerReflectionInfo: %v", err)
}
return xdsC, fs, nodeID, stream, func() {
fs.Stop()
cancel()
conn.Close()
server.Stop()
csdss.Close()
newXDSClient = oldNewXDSClient
xdsC.Close()
bootstrapCleanup()
}
}
func checkForRequested(stream v3statuspb.ClientStatusDiscoveryService_StreamClientStatusClient) error {
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
return fmt.Errorf("failed to send request: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
return fmt.Errorf("failed to recv response: %v", err)
}
if n := len(r.Config); n != 1 {
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(r))
}
if n := len(r.Config[0].XdsConfig); n != 4 {
return fmt.Errorf("got %d xds configs (one for each type), want 4: %v", n, proto.MarshalTextString(r))
}
for _, cfg := range r.Config[0].XdsConfig {
switch config := cfg.PerXdsConfig.(type) {
case *v3statuspb.PerXdsConfig_ListenerConfig:
var wantLis []*v3adminpb.ListenersConfigDump_DynamicListener
for i := range ldsTargets {
wantLis = append(wantLis, &v3adminpb.ListenersConfigDump_DynamicListener{
Name: ldsTargets[i],
ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
})
}
wantDump := &v3adminpb.ListenersConfigDump{
DynamicListeners: wantLis,
}
if diff := cmp.Diff(config.ListenerConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_RouteConfig:
var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for range rdsTargets {
wantRoutes = append(wantRoutes, &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
})
}
wantDump := &v3adminpb.RoutesConfigDump{
DynamicRouteConfigs: wantRoutes,
}
if diff := cmp.Diff(config.RouteConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_ClusterConfig:
var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster
for range cdsTargets {
wantCluster = append(wantCluster, &v3adminpb.ClustersConfigDump_DynamicCluster{
ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
})
}
wantDump := &v3adminpb.ClustersConfigDump{
DynamicActiveClusters: wantCluster,
}
if diff := cmp.Diff(config.ClusterConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_EndpointConfig:
var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for range edsTargets {
wantEndpoint = append(wantEndpoint, &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
ClientStatus: v3adminpb.ClientResourceStatus_REQUESTED,
})
}
wantDump := &v3adminpb.EndpointsConfigDump{
DynamicEndpointConfigs: wantEndpoint,
}
if diff := cmp.Diff(config.EndpointConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
default:
return fmt.Errorf("unexpected PerXdsConfig: %+v; %v", cfg.PerXdsConfig, protoToJSON(r))
}
}
return nil
}
func checkForACKed(stream v3statuspb.ClientStatusDiscoveryService_StreamClientStatusClient) error {
const wantVersion = "1"
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
return fmt.Errorf("failed to send: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
return fmt.Errorf("failed to recv response: %v", err)
}
if n := len(r.Config); n != 1 {
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(r))
}
if n := len(r.Config[0].XdsConfig); n != 4 {
return fmt.Errorf("got %d xds configs (one for each type), want 4: %v", n, proto.MarshalTextString(r))
}
for _, cfg := range r.Config[0].XdsConfig {
switch config := cfg.PerXdsConfig.(type) {
case *v3statuspb.PerXdsConfig_ListenerConfig:
var wantLis []*v3adminpb.ListenersConfigDump_DynamicListener
for i := range ldsTargets {
wantLis = append(wantLis, &v3adminpb.ListenersConfigDump_DynamicListener{
Name: ldsTargets[i],
ActiveState: &v3adminpb.ListenersConfigDump_DynamicListenerState{
VersionInfo: wantVersion,
Listener: listenerAnys[i],
LastUpdated: nil,
},
ErrorState: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
})
}
wantDump := &v3adminpb.ListenersConfigDump{
VersionInfo: wantVersion,
DynamicListeners: wantLis,
}
if diff := cmp.Diff(config.ListenerConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_RouteConfig:
var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for i := range rdsTargets {
wantRoutes = append(wantRoutes, &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
VersionInfo: wantVersion,
RouteConfig: routeAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
})
}
wantDump := &v3adminpb.RoutesConfigDump{
DynamicRouteConfigs: wantRoutes,
}
if diff := cmp.Diff(config.RouteConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_ClusterConfig:
var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster
for i := range cdsTargets {
wantCluster = append(wantCluster, &v3adminpb.ClustersConfigDump_DynamicCluster{
VersionInfo: wantVersion,
Cluster: clusterAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
})
}
wantDump := &v3adminpb.ClustersConfigDump{
VersionInfo: wantVersion,
DynamicActiveClusters: wantCluster,
}
if diff := cmp.Diff(config.ClusterConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_EndpointConfig:
var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for i := range edsTargets {
wantEndpoint = append(wantEndpoint, &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
VersionInfo: wantVersion,
EndpointConfig: endpointAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
})
}
wantDump := &v3adminpb.EndpointsConfigDump{
DynamicEndpointConfigs: wantEndpoint,
}
if diff := cmp.Diff(config.EndpointConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
default:
return fmt.Errorf("unexpected PerXdsConfig: %+v; %v", cfg.PerXdsConfig, protoToJSON(r))
}
}
return nil
}
func checkForNACKed(nackResourceIdx int, stream v3statuspb.ClientStatusDiscoveryService_StreamClientStatusClient) error {
const (
ackVersion = "1"
nackVersion = "2"
)
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
return fmt.Errorf("failed to send: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
return fmt.Errorf("failed to recv response: %v", err)
}
if n := len(r.Config); n != 1 {
return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(r))
}
if n := len(r.Config[0].XdsConfig); n != 4 {
return fmt.Errorf("got %d xds configs (one for each type), want 4: %v", n, proto.MarshalTextString(r))
}
for _, cfg := range r.Config[0].XdsConfig {
switch config := cfg.PerXdsConfig.(type) {
case *v3statuspb.PerXdsConfig_ListenerConfig:
var wantLis []*v3adminpb.ListenersConfigDump_DynamicListener
for i := range ldsTargets {
configDump := &v3adminpb.ListenersConfigDump_DynamicListener{
Name: ldsTargets[i],
ActiveState: &v3adminpb.ListenersConfigDump_DynamicListenerState{
VersionInfo: ackVersion,
Listener: listenerAnys[i],
LastUpdated: nil,
},
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
VersionInfo: nackVersion,
}
}
wantLis = append(wantLis, configDump)
}
wantDump := &v3adminpb.ListenersConfigDump{
VersionInfo: nackVersion,
DynamicListeners: wantLis,
}
if diff := cmp.Diff(config.ListenerConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_RouteConfig:
var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for i := range rdsTargets {
configDump := &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
VersionInfo: ackVersion,
RouteConfig: routeAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
VersionInfo: nackVersion,
}
}
wantRoutes = append(wantRoutes, configDump)
}
wantDump := &v3adminpb.RoutesConfigDump{
DynamicRouteConfigs: wantRoutes,
}
if diff := cmp.Diff(config.RouteConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_ClusterConfig:
var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster
for i := range cdsTargets {
configDump := &v3adminpb.ClustersConfigDump_DynamicCluster{
VersionInfo: ackVersion,
Cluster: clusterAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
VersionInfo: nackVersion,
}
}
wantCluster = append(wantCluster, configDump)
}
wantDump := &v3adminpb.ClustersConfigDump{
VersionInfo: nackVersion,
DynamicActiveClusters: wantCluster,
}
if diff := cmp.Diff(config.ClusterConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
case *v3statuspb.PerXdsConfig_EndpointConfig:
var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for i := range edsTargets {
configDump := &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
VersionInfo: ackVersion,
EndpointConfig: endpointAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
VersionInfo: nackVersion,
}
}
wantEndpoint = append(wantEndpoint, configDump)
}
wantDump := &v3adminpb.EndpointsConfigDump{
DynamicEndpointConfigs: wantEndpoint,
}
if diff := cmp.Diff(config.EndpointConfig, wantDump, cmpOpts); diff != "" {
return fmt.Errorf(diff)
}
default:
return fmt.Errorf("unexpected PerXdsConfig: %+v; %v", cfg.PerXdsConfig, protoToJSON(r))
}
}
return nil
}
func protoToJSON(p proto.Message) string {
mm := jsonpb.Marshaler{
Indent: " ",
}
ret, _ := mm.MarshalToString(p)
return ret
}
func Test_nodeProtoToV3(t *testing.T) {
const (
testID = "test-id"
testCluster = "test-cluster"
testZone = "test-zone"
)
tests := []struct {
name string
n proto.Message
want *v3corepb.Node
}{
{
name: "v3",
n: &v3corepb.Node{
Id: testID,
Cluster: testCluster,
Locality: &v3corepb.Locality{Zone: testZone},
},
want: &v3corepb.Node{
Id: testID,
Cluster: testCluster,
Locality: &v3corepb.Locality{Zone: testZone},
},
},
{
name: "v2",
n: &v2corepb.Node{
Id: testID,
Cluster: testCluster,
Locality: &v2corepb.Locality{Zone: testZone},
},
want: &v3corepb.Node{
Id: testID,
Cluster: testCluster,
Locality: &v3corepb.Locality{Zone: testZone},
},
},
{
name: "not node",
n: &v2corepb.Locality{Zone: testZone},
want: nil, // Input is not a node, should return nil.
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := nodeProtoToV3(tt.n)
if diff := cmp.Diff(got, tt.want, protocmp.Transform()); diff != "" {
t.Errorf("nodeProtoToV3() got unexpected result, diff (-got, +want): %v", diff)
}
})
}
}


@@ -113,6 +113,10 @@ type UpdateOptions struct {
Clusters []*v3clusterpb.Cluster
Routes []*v3routepb.RouteConfiguration
Listeners []*v3listenerpb.Listener
// SkipValidation indicates whether we want to skip validation (by not
// calling snapshot.Consistent()). It can be useful for negative tests,
// where we send updates that the client will NACK.
SkipValidation bool
}
// Update changes the resource snapshot held by the management server, which
@@ -122,8 +126,10 @@ func (s *ManagementServer) Update(opts UpdateOptions) error {
// Create a snapshot with the passed in resources.
snapshot := v3cache.NewSnapshot(strconv.Itoa(s.version), resourceSlice(opts.Endpoints), resourceSlice(opts.Clusters), resourceSlice(opts.Routes), resourceSlice(opts.Listeners), nil /*runtimes*/, nil /*secrets*/)
if !opts.SkipValidation {
if err := snapshot.Consistent(); err != nil {
return fmt.Errorf("failed to create new resource snapshot: %v", err)
}
}
logger.Infof("Created new resource snapshot...")
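For illustration, a NACK-style test can now push a deliberately broken snapshot by setting the new field (mgmtServer, nodeID, and badListener are placeholders; see the CSDS test above for a complete example):

err := mgmtServer.Update(e2e.UpdateOptions{
	NodeID:    nodeID,
	Listeners: []*v3listenerpb.Listener{badListener}, // resource the client is expected to NACK
	// Skip the snapshot.Consistent() check so the intentionally invalid
	// snapshot is still delivered to the client.
	SkipValidation: true,
})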