grpc-go/xds/csds/csds_e2e_test.go

700 lines
27 KiB
Go

/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package csds_test
import (
"context"
"fmt"
"io"
"slices"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/xds/csds"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter
)
const defaultTestTimeout = 5 * time.Second
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// The following watcher implementations are no-ops since we don't really care
// about the callback received by these watchers in the test. We only care
// whether CSDS reports the expected state.
type nopListenerWatcher struct{}
func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
onDone()
}
func (nopListenerWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopListenerWatcher) AmbientError(_ error, onDone func()) {
onDone()
}
type nopRouteConfigWatcher struct{}
func (nopRouteConfigWatcher) ResourceChanged(_ *xdsresource.RouteConfigResourceData, onDone func()) {
onDone()
}
func (nopRouteConfigWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopRouteConfigWatcher) AmbientError(_ error, onDone func()) {
onDone()
}
type nopClusterWatcher struct{}
func (nopClusterWatcher) ResourceChanged(_ *xdsresource.ClusterResourceData, onDone func()) {
onDone()
}
func (nopClusterWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopClusterWatcher) AmbientError(_ error, onDone func()) {
onDone()
}
type nopEndpointsWatcher struct{}
func (nopEndpointsWatcher) ResourceChanged(_ *xdsresource.EndpointsResourceData, onDone func()) {
onDone()
}
func (nopEndpointsWatcher) ResourceError(_ error, onDone func()) {
onDone()
}
func (nopEndpointsWatcher) AmbientError(_ error, onDone func()) {
onDone()
}
// This watcher writes the onDone callback on to a channel for the test to
// invoke it when it wants to unblock the next read on the ADS stream in the xDS
// client. This is particularly useful when a resource is NACKed, because the
// go-control-plane management server continuously resends the same resource in
// this case, and applying flow control from these watchers ensures that xDS
// client does not spend all of its time receiving and NACKing updates from the
// management server. This was indeed the case on arm64 (before we had support
// for ADS stream level flow control), and was causing CSDS to not receive any
// updates from the xDS client.
type blockingListenerWatcher struct {
testCtxDone <-chan struct{} // Closed when the test is done.
onDoneCh chan func() // Channel to write the onDone callback to.
}
func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher {
return &blockingListenerWatcher{
testCtxDone: testCtxDone,
onDoneCh: make(chan func(), 1),
}
}
func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) ResourceError(_ error, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) AmbientError(_ error, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
// writeOnDone attempts to write the onDone callback on the onDone channel. It
// returns when it can successfully write to the channel or when the test is
// done, which is signalled by testCtxDone being closed.
func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan func(), onDone func()) {
select {
case <-testCtxDone:
case onDoneCh <- onDone:
}
}
// Creates a gRPC server and starts serving a CSDS service implementation on it.
// Returns the address of the newly created gRPC server.
//
// Registers cleanup functions on t to stop the gRPC server and the CSDS
// implementation.
func startCSDSServer(t *testing.T) string {
t.Helper()
server := grpc.NewServer()
t.Cleanup(server.Stop)
csdss, err := csds.NewClientStatusDiscoveryServer()
if err != nil {
t.Fatalf("Failed to create CSDS service implementation: %v", err)
}
v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
t.Cleanup(csdss.Close)
// Create a local listener and pass it to Serve().
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
return lis.Addr().String()
}
func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string) v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient {
conn, err := grpc.NewClient(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to dial CSDS server %q: %v", serverAddr, err)
}
client := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
stream, err := client.StreamClientStatus(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("Failed to create a stream for CSDS: %v", err)
}
t.Cleanup(func() { conn.Close() })
return stream
}
// Tests CSDS functionality. The test performs the following:
// - Spins up a management server and creates two xDS clients talking to it.
// - Registers a set of watches on the xDS clients, and verifies that the CSDS
// response reports resources in REQUESTED state.
// - Configures resources on the management server corresponding to the ones
// being watched by the clients, and verifies that the CSDS response reports
// resources in ACKED state.
//
// For the above operations, the test also verifies that the client_scope field
// in the CSDS response is populated appropriately.
func (s) TestCSDS(t *testing.T) {
// Spin up a xDS management server on a local port.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
// Create a bootstrap contents pointing to the above management server.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
}
// We use the default xDS client pool here because the CSDS service reports
// on the state of the default xDS client which is implicitly managed
// within the xdsclient.DefaultPool.
xdsclient.DefaultPool.SetFallbackBootstrapConfig(config)
defer func() { xdsclient.DefaultPool.UnsetBootstrapConfigForTesting() }()
// Create two xDS clients, with different names. These should end up
// creating two different xDS clients.
const xdsClient1Name = "xds-csds-client-1"
xdsClient1, xdsClose1, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: xdsClient1Name,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer xdsClose1()
const xdsClient2Name = "xds-csds-client-2"
xdsClient2, xdsClose2, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: xdsClient2Name,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer xdsClose2()
// Start a CSDS server and create a client stream to it.
addr := startCSDSServer(t)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream := startCSDSClientStream(ctx, t, addr)
// Verify that the xDS client reports an empty config.
wantNode := &v3corepb.Node{
Id: nodeID,
UserAgentName: "gRPC Go",
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version},
ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"},
}
wantResp := &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: wantNode,
ClientScope: xdsClient1Name,
},
{
Node: wantNode,
ClientScope: xdsClient2Name,
},
},
}
if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
t.Fatal(err)
}
// Initialize the xDS resources to be used in this test.
ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"}
rdsTargets := []string{"route-config-0", "route-config-1"}
cdsTargets := []string{"cluster-0", "cluster-1"}
edsTargets := []string{"endpoints-0", "endpoints-1"}
listeners := make([]*v3listenerpb.Listener, len(ldsTargets))
listenerAnys := make([]*anypb.Any, len(ldsTargets))
for i := range ldsTargets {
listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i])
listenerAnys[i] = testutils.MarshalAny(t, listeners[i])
}
routes := make([]*v3routepb.RouteConfiguration, len(rdsTargets))
routeAnys := make([]*anypb.Any, len(rdsTargets))
for i := range rdsTargets {
routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i])
routeAnys[i] = testutils.MarshalAny(t, routes[i])
}
clusters := make([]*v3clusterpb.Cluster, len(cdsTargets))
clusterAnys := make([]*anypb.Any, len(cdsTargets))
for i := range cdsTargets {
clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i], e2e.SecurityLevelNone)
clusterAnys[i] = testutils.MarshalAny(t, clusters[i])
}
endpoints := make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets))
endpointAnys := make([]*anypb.Any, len(edsTargets))
ips := []string{"0.0.0.0", "1.1.1.1"}
ports := []uint32{123, 456}
for i := range edsTargets {
endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1])
endpointAnys[i] = testutils.MarshalAny(t, endpoints[i])
}
// Register watches on the xDS clients for two resources of each type.
for _, xdsC := range []xdsclient.XDSClient{xdsClient1, xdsClient2} {
for _, target := range ldsTargets {
xdsresource.WatchListener(xdsC, target, nopListenerWatcher{})
}
for _, target := range rdsTargets {
xdsresource.WatchRouteConfig(xdsC, target, nopRouteConfigWatcher{})
}
for _, target := range cdsTargets {
xdsresource.WatchCluster(xdsC, target, nopClusterWatcher{})
}
for _, target := range edsTargets {
xdsresource.WatchEndpoints(xdsC, target, nopEndpointsWatcher{})
}
}
// Verify that the xDS client reports the resources as being in "Requested"
// state, and in version "0".
wantConfigs := []*v3statuspb.ClientConfig_GenericXdsConfig{
makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
}
wantResp = &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: wantNode,
GenericXdsConfigs: wantConfigs,
ClientScope: xdsClient1Name,
},
{
Node: wantNode,
GenericXdsConfigs: wantConfigs,
ClientScope: xdsClient2Name,
},
},
}
if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
t.Fatal(err)
}
// Configure the management server with two resources of each type,
// corresponding to the watches registered above.
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: listeners,
Routes: routes,
Clusters: clusters,
Endpoints: endpoints,
}); err != nil {
t.Fatal(err)
}
// Verify that the xDS client reports the resources as being in "ACKed"
// state, and in version "1".
wantConfigs = []*v3statuspb.ClientConfig_GenericXdsConfig{
makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[0], nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[1], nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[0], nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[1], nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[0], nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[1], nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[0], nil),
makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[1], nil),
}
wantResp = &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: wantNode,
GenericXdsConfigs: wantConfigs,
ClientScope: xdsClient1Name,
},
{
Node: wantNode,
GenericXdsConfigs: wantConfigs,
ClientScope: xdsClient2Name,
},
},
}
if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
t.Fatal(err)
}
}
// Tests CSDS functionality. The test performs the following:
// - Spins up a management server and creates two xDS clients talking to it.
// - Registers one watch on each xDS client, and verifies that the CSDS
// response reports resources in REQUESTED state.
// - Configures two resources on the management server and verifies that the
// CSDS response reports the resources as being in ACKED state.
// - Updates one of two resources on the management server such that it is
// expected to be NACKed by the client. Verifies that the CSDS response
// contains one resource in ACKED state and one in NACKED state.
//
// For the above operations, the test also verifies that the client_scope field
// in the CSDS response is populated appropriately.
//
// This test does a bunch of similar things to the previous test, but has
// reduced complexity because of having to deal with a single resource type.
// This makes it possible to test the NACKing a resource (which results in
// continuous resending of the resource by the go-control-plane management
// server), in an easier and less flaky way.
func (s) TestCSDS_NACK(t *testing.T) {
// Spin up a xDS management server on a local port.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
// Create a bootstrap contents pointing to the above management server.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
}
// We use the default xDS client pool here because the CSDS service reports
// on the state of the default xDS client which is implicitly managed
// within the xdsclient.DefaultPool.
xdsclient.DefaultPool.SetFallbackBootstrapConfig(config)
defer func() { xdsclient.DefaultPool.UnsetBootstrapConfigForTesting() }()
// Create two xDS clients, with different names. These should end up
// creating two different xDS clients.
const xdsClient1Name = "xds-csds-client-1"
xdsClient1, xdsClose1, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: xdsClient1Name,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer xdsClose1()
const xdsClient2Name = "xds-csds-client-2"
xdsClient2, xdsClose2, err := xdsclient.DefaultPool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: xdsClient2Name,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer xdsClose2()
// Start a CSDS server and create a client stream to it.
addr := startCSDSServer(t)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream := startCSDSClientStream(ctx, t, addr)
// Verify that the xDS client reports an empty config.
wantNode := &v3corepb.Node{
Id: nodeID,
UserAgentName: "gRPC Go",
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version},
ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"},
}
wantResp := &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: wantNode,
ClientScope: xdsClient1Name,
},
{
Node: wantNode,
ClientScope: xdsClient2Name,
},
},
}
if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
t.Fatal(err)
}
// Initialize the xDS resources to be used in this test.
const ldsTarget0, ldsTarget1 = "lds.target.good:0000", "lds.target.good:1111"
listener0 := e2e.DefaultClientListener(ldsTarget0, "rds-name")
listener1 := e2e.DefaultClientListener(ldsTarget1, "rds-name")
listenerAny0 := testutils.MarshalAny(t, listener0)
listenerAny1 := testutils.MarshalAny(t, listener1)
// Register the watchers, one for each xDS client.
watcher1 := nopListenerWatcher{}
watcher2 := newBlockingListenerWatcher(ctx.Done())
xdsresource.WatchListener(xdsClient1, ldsTarget0, watcher1)
xdsresource.WatchListener(xdsClient2, ldsTarget1, watcher2)
// Verify that the xDS client reports the resources as being in "Requested"
// state, and in version "0".
wantResp = &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: wantNode,
GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget0, "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
},
ClientScope: xdsClient1Name,
},
{
Node: wantNode,
GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget1, "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil),
},
ClientScope: xdsClient2Name,
},
},
}
if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
t.Fatal(err)
}
// Configure the management server with two listener resources corresponding
// to the watches registered above.
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener0, listener1},
SkipValidation: true,
}); err != nil {
t.Fatal(err)
}
// Verify that the xDS client reports the resources as being in "ACKed"
// state, and in version "1".
wantResp = &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: wantNode,
GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget0, "1", v3adminpb.ClientResourceStatus_ACKED, listenerAny0, nil),
},
ClientScope: xdsClient1Name,
},
{
Node: wantNode,
GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget1, "1", v3adminpb.ClientResourceStatus_ACKED, listenerAny1, nil),
},
ClientScope: xdsClient2Name,
},
},
}
if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
t.Fatal(err)
}
// Unblock reads on the ADS stream by calling the onDone callback sent to
// the watcher.
select {
case <-ctx.Done():
t.Fatal("Timed out waiting for watch callback")
case onDone := <-watcher2.onDoneCh:
onDone()
}
// Update the second resource with an empty ApiListener field which is
// expected to be NACK'ed by the xDS client.
listener1.ApiListener = nil
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener0, listener1},
SkipValidation: true,
}); err != nil {
t.Fatal(err)
}
// Wait for the update to reach the watchers.
select {
case <-ctx.Done():
t.Fatal("Timed out waiting for watch callback")
case onDone := <-watcher2.onDoneCh:
onDone()
}
// Verify that the xDS client reports the first listener resource as being
// ACKed and the second listener resource as being NACKed. The version for
// the ACKed resource would be "2", while that for the NACKed resource would
// be "1". In the NACKed resource, the version which is NACKed is stored in
// the ErrorState field.
wantResp = &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: wantNode,
GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget0, "2", v3adminpb.ClientResourceStatus_ACKED, listenerAny0, nil),
},
ClientScope: xdsClient1Name,
},
{
Node: wantNode,
GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{
makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget1, "1", v3adminpb.ClientResourceStatus_NACKED, listenerAny1, &v3adminpb.UpdateFailureState{VersionInfo: "2"}),
},
ClientScope: xdsClient2Name,
},
},
}
if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil {
t.Fatal(err)
}
}
func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.ClientResourceStatus, config *anypb.Any, failure *v3adminpb.UpdateFailureState) *v3statuspb.ClientConfig_GenericXdsConfig {
return &v3statuspb.ClientConfig_GenericXdsConfig{
TypeUrl: typeURL,
Name: name,
VersionInfo: version,
ClientStatus: status,
XdsConfig: config,
ErrorState: failure,
}
}
// Repeatedly sends CSDS requests and receives CSDS responses on the provided
// stream and verifies that the response matches `want`. Returns an error if
// sending or receiving on the stream fails, or if the context expires before a
// response matching `want` is received.
//
// Expects client configs in `want` to be sorted on `client_scope` and the
// resource dump to be sorted on type_url and resource name.
func checkClientStatusResponse(ctx context.Context, stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, want *v3statuspb.ClientStatusResponse) error {
var cmpOpts = cmp.Options{
protocmp.Transform(),
protocmp.IgnoreFields((*v3statuspb.ClientConfig_GenericXdsConfig)(nil), "last_updated"),
protocmp.IgnoreFields((*v3adminpb.UpdateFailureState)(nil), "last_update_attempt", "details"),
}
var lastErr error
for ; ctx.Err() == nil; <-time.After(100 * time.Millisecond) {
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
if err != io.EOF {
return fmt.Errorf("failed to send ClientStatusRequest: %v", err)
}
// If the stream has closed, we call Recv() until it returns a non-nil
// error to get the actual error on the stream.
for {
if _, err := stream.Recv(); err != nil {
return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
}
}
}
got, err := stream.Recv()
if err != nil {
return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
}
// Sort the client configs based on the `client_scope` field.
slices.SortFunc(got.GetConfig(), func(a, b *v3statuspb.ClientConfig) int {
return strings.Compare(a.ClientScope, b.ClientScope)
})
// Sort the resource configs based on the type_url and name fields.
for _, cc := range got.GetConfig() {
slices.SortFunc(cc.GetGenericXdsConfigs(), func(a, b *v3statuspb.ClientConfig_GenericXdsConfig) int {
if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 {
return strings.Compare(a.Name, b.Name)
}
return strings.Compare(a.TypeUrl, b.TypeUrl)
})
}
diff := cmp.Diff(want, got, cmpOpts)
if diff == "" {
return nil
}
lastErr = fmt.Errorf("received unexpected resource dump, diff (-got, +want):\n%s, got: %s\n want:%s", diff, pretty.ToJSON(got), pretty.ToJSON(want))
}
return fmt.Errorf("timeout when waiting for resource dump to reach expected state: ctxErr: %v, otherErr: %v", ctx.Err(), lastErr)
}
func (s) TestCSDSNoXDSClient(t *testing.T) {
// Create a bootstrap file in a temporary directory. Since we pass an empty
// bootstrap configuration, it will fail xDS client creation because the
// `server_uri` field is unset.
testutils.CreateBootstrapFileForTesting(t, []byte(``))
// Start a CSDS server and create a client stream to it.
addr := startCSDSServer(t)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream := startCSDSClientStream(ctx, t, addr)
if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
t.Fatalf("Failed to send ClientStatusRequest: %v", err)
}
r, err := stream.Recv()
if err != nil {
// io.EOF is not ok.
t.Fatalf("Failed to recv ClientStatusResponse: %v", err)
}
if n := len(r.Config); n != 0 {
t.Fatalf("got %d configs, want 0: %v", n, prototext.Format(r))
}
}