xdsclient: switch more transport tests to e2e style (2/N) (#7693)

This commit is contained in:
Easwar Swaminathan 2024-10-07 15:46:06 -07:00 committed by GitHub
parent 9afb2321c4
commit 98d15504f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 511 additions and 896 deletions

View File

@ -104,6 +104,7 @@ type authorityArgs struct {
serializer *grpcsync.CallbackSerializer
resourceTypeGetter func(string) xdsresource.Type
watchExpiryTimeout time.Duration
backoff func(int) time.Duration // Backoff for ADS and LRS stream failures.
logger *grpclog.PrefixLogger
}
@ -123,6 +124,7 @@ func newAuthority(args authorityArgs) (*authority, error) {
OnRecvHandler: ret.handleResourceUpdate,
OnErrorHandler: ret.newConnectionError,
OnSendHandler: ret.transportOnSendHandler,
Backoff: args.backoff,
Logger: args.logger,
NodeProto: args.bootstrapCfg.Node(),
})

View File

@ -25,6 +25,7 @@ import (
"time"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
@ -53,16 +54,17 @@ const NameForServer = "#server"
// only when all references are released, and it is safe for the caller to
// invoke this close function multiple times.
func New(name string) (XDSClient, func(), error) {
return newRefCounted(name, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout)
return newRefCounted(name, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout, backoff.DefaultExponential.Backoff)
}
// newClientImpl returns a new xdsClient with the given config.
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (*clientImpl, error) {
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &clientImpl{
done: grpcsync.NewEvent(),
config: config,
watchExpiryTimeout: watchExpiryTimeout,
backoff: streamBackoff,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerClose: cancel,
resourceTypes: newResourceTypeRegistry(),
@ -90,6 +92,11 @@ type OptionsForTesting struct {
// AuthorityIdleTimeout is the timeout before idle authorities are deleted.
// If unspecified, uses the default value used in non-test code.
AuthorityIdleTimeout time.Duration
// StreamBackoffAfterFailure is the backoff function used to determine the
// backoff duration after stream failures. If unspecified, uses the default
// value used in non-test code.
StreamBackoffAfterFailure func(int) time.Duration
}
// NewForTesting returns an xDS client configured with the provided options.
@ -111,11 +118,14 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
if opts.AuthorityIdleTimeout == 0 {
opts.AuthorityIdleTimeout = defaultIdleAuthorityDeleteTimeout
}
if opts.StreamBackoffAfterFailure == nil {
opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc
}
if err := bootstrap.SetFallbackBootstrapConfig(opts.Contents); err != nil {
return nil, nil, err
}
client, cancel, err := newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout)
client, cancel, err := newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout, opts.StreamBackoffAfterFailure)
return client, func() { bootstrap.UnsetFallbackBootstrapConfigForTesting(); cancel() }, err
}

View File

@ -23,6 +23,7 @@ import (
"sync/atomic"
"time"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
)
@ -37,6 +38,8 @@ var (
// overridden in tests to give them visibility into certain events.
xdsClientImplCreateHook = func(string) {}
xdsClientImplCloseHook = func(string) {}
defaultStreamBackoffFunc = backoff.DefaultExponential.Backoff
)
func clientRefCountedClose(name string) {
@ -60,7 +63,7 @@ func clientRefCountedClose(name string) {
// newRefCounted creates a new reference counted xDS client implementation for
// name, if one does not exist already. If an xDS client for the given name
// exists, it gets a reference to it and returns it.
func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Duration) (XDSClient, func(), error) {
func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
clientsMu.Lock()
defer clientsMu.Unlock()
@ -74,7 +77,7 @@ func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Du
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err)
}
c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout)
c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout, streamBackoff)
if err != nil {
return nil, nil, err
}

View File

@ -37,6 +37,7 @@ type clientImpl struct {
config *bootstrap.Config
logger *grpclog.PrefixLogger
watchExpiryTimeout time.Duration
backoff func(int) time.Duration // Backoff for ADS and LRS stream failures.
serializer *grpcsync.CallbackSerializer
serializerClose func()
resourceTypes *resourceTypeRegistry

View File

@ -114,6 +114,7 @@ func (c *clientImpl) newAuthorityLocked(config *bootstrap.ServerConfig) (_ *auth
serializer: c.serializer,
resourceTypeGetter: c.resourceTypes.get,
watchExpiryTimeout: c.watchExpiryTimeout,
backoff: c.backoff,
logger: grpclog.NewPrefixLogger(logger, authorityPrefix(c, config.ServerURI())),
})
if err != nil {

View File

@ -0,0 +1,448 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsclient_test
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/testing/protocmp"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
// Creates an xDS client with the given bootstrap contents and backoff function.
func createXDSClientWithBackoff(t *testing.T, bootstrapContents []byte, streamBackoff func(int) time.Duration) xdsclient.XDSClient {
t.Helper()
client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
Name: t.Name(),
StreamBackoffAfterFailure: streamBackoff,
Contents: bootstrapContents,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
t.Cleanup(close)
return client
}
// Tests the case where the management server returns an error in the ADS
// streaming RPC. Verifies that the ADS stream is restarted after a backoff
// period, and that the previously requested resources are re-requested on the
// new stream.
func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) {
// Channels used for verifying different events in the test.
streamCloseCh := make(chan struct{}, 1) // ADS stream is closed.
ldsResourcesCh := make(chan []string, 1) // Listener resource names in the discovery request.
backoffCh := make(chan struct{}, 1) // Backoff after stream failure.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Create an xDS management server that returns RPC errors.
streamErr := errors.New("ADS stream error")
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
// Push the requested resource names on to a channel.
if req.GetTypeUrl() == version.V3ListenerURL {
t.Logf("Received LDS request for resources: %v", req.GetResourceNames())
select {
case ldsResourcesCh <- req.GetResourceNames():
case <-ctx.Done():
}
}
// Return an error everytime a request is sent on the stream. This
// should cause the transport to backoff before attempting to
// recreate the stream.
return streamErr
},
// Push on a channel whenever the stream is closed.
OnStreamClosed: func(int64, *v3corepb.Node) {
select {
case streamCloseCh <- struct{}{}:
case <-ctx.Done():
}
},
})
// Override the backoff implementation to push on a channel that is read by
// the test goroutine.
streamBackoff := func(v int) time.Duration {
select {
case backoffCh <- struct{}{}:
case <-ctx.Done():
}
return 0
}
// Create an xDS client with bootstrap pointing to the above server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bc)
client := createXDSClientWithBackoff(t, bc, streamBackoff)
// Register a watch for a listener resource.
const listenerName = "listener"
lw := newListenerWatcher()
ldsCancel := xdsresource.WatchListener(client, listenerName, lw)
defer ldsCancel()
// Verify that an ADS stream is created and an LDS request with the above
// resource name is sent.
if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
t.Fatal(err)
}
// Verify that the received stream error is reported to the watcher.
u, err := lw.updateCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout when waiting for an error callback on the listener watcher")
}
gotErr := u.(listenerUpdateErrTuple).err
if !strings.Contains(gotErr.Error(), streamErr.Error()) {
t.Fatalf("Received stream error: %v, wantErr: %v", gotErr, streamErr)
}
// Verify that the stream is closed.
select {
case <-streamCloseCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for stream to be closed after an error")
}
// Verify that the ADS stream backs off before recreating the stream.
select {
case <-backoffCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for ADS stream to backoff after stream failure")
}
// Verify that the same resource name is re-requested on the new stream.
if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
t.Fatal(err)
}
}
// Tests the case where a stream breaks because the server goes down. Verifies
// that when the server comes back up, the same resources are re-requested, this
// time with the previously acked version and an empty nonce.
func (s) TestADS_RetriesAfterBrokenStream(t *testing.T) {
// Channels used for verifying different events in the test.
streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received.
streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) // Discovery response is received.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Create an xDS management server listening on a local port.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create a local listener for the xDS management server: %v", err)
}
lis := testutils.NewRestartableListener(l)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
// Push the received request on to a channel for the test goroutine to
// verify that it matches expectations.
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
select {
case streamRequestCh <- req:
case <-ctx.Done():
}
return nil
},
// Push the response that the management server is about to send on to a
// channel. The test goroutine to uses this to extract the version and
// nonce, expected on subsequent requests.
OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) {
select {
case streamResponseCh <- resp:
case <-ctx.Done():
}
},
})
// Create a listener resource on the management server.
const listenerName = "listener"
const routeConfigName = "route-config"
nodeID := uuid.New().String()
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Override the backoff implementation to always return 0, to reduce test
// run time. Instead control when the backoff returns by blocking on a
// channel, that the test closes.
backoffCh := make(chan struct{})
streamBackoff := func(v int) time.Duration {
select {
case backoffCh <- struct{}{}:
case <-ctx.Done():
}
return 0
}
// Create an xDS client with bootstrap pointing to the above server.
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bc)
client := createXDSClientWithBackoff(t, bc, streamBackoff)
// Register a watch for a listener resource.
lw := newListenerWatcher()
ldsCancel := xdsresource.WatchListener(client, listenerName, lw)
defer ldsCancel()
// Verify that the initial discovery request matches expectation.
var gotReq *v3discoverypb.DiscoveryRequest
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for discovery request on the stream")
}
wantReq := &v3discoverypb.DiscoveryRequest{
VersionInfo: "",
Node: &v3corepb.Node{
Id: nodeID,
UserAgentName: "gRPC Go",
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version},
ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"},
},
ResourceNames: []string{listenerName},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
ResponseNonce: "",
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
// Capture the version and nonce from the response.
var gotResp *v3discoverypb.DiscoveryResponse
select {
case gotResp = <-streamResponseCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for discovery response on the stream")
}
version := gotResp.GetVersionInfo()
nonce := gotResp.GetNonce()
// Verify that the ACK contains the appropriate version and nonce.
wantReq.VersionInfo = version
wantReq.ResponseNonce = nonce
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for the discovery request ACK on the stream")
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
// Verify the update received by the watcher.
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: routeConfigName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// Bring down the management server to simulate a broken stream.
lis.Stop()
// Verify that the error callback on the watcher is not invoked.
verifyNoListenerUpdate(ctx, lw.updateCh)
// Wait for backoff to kick in, and unblock the first backoff attempt.
select {
case <-backoffCh:
case <-ctx.Done():
t.Fatal("Timeout waiting for stream backoff")
}
// Bring up the management server. The test does not have prcecise control
// over when new streams to the management server will start succeeding. The
// ADS stream implementation will backoff as many times as required before
// it can successfully create a new stream. Therefore, we need to receive on
// the backoffCh as many times as required, and unblock the backoff
// implementation.
lis.Restart()
go func() {
for {
select {
case <-backoffCh:
case <-ctx.Done():
return
}
}
}()
// Verify that the transport creates a new stream and sends out a new
// request which contains the previously acked version, but an empty nonce.
wantReq.ResponseNonce = ""
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for the discovery request ACK on the stream")
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
}
// Tests the case where a resource is requested before the a valid ADS stream
// exists. Verifies that the a discovery request is sent out for the previously
// requested resource once a valid stream is created.
func (s) TestADS_ResourceRequestedBeforeStreamCreation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Channels used for verifying different events in the test.
streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received.
// Create an xDS management server listening on a local port.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create a local listener: %v", err)
}
lis := testutils.NewRestartableListener(l)
streamErr := errors.New("ADS stream error")
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
// Return an error everytime a request is sent on the stream. This
// should cause the transport to backoff before attempting to recreate
// the stream.
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
select {
case streamRequestCh <- req:
default:
}
return streamErr
},
})
// Bring down the management server before creating the transport. This
// allows us to test the case where SendRequest() is called when there is no
// stream to the management server.
lis.Stop()
// Override the backoff implementation to always return 0, to reduce test
// run time. Instead control when the backoff returns by blocking on a
// channel, that the test closes.
backoffCh := make(chan struct{}, 1)
unblockBackoffCh := make(chan struct{})
streamBackoff := func(v int) time.Duration {
select {
case backoffCh <- struct{}{}:
default:
}
<-unblockBackoffCh
return 0
}
// Create an xDS client with bootstrap pointing to the above server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
testutils.CreateBootstrapFileForTesting(t, bc)
client := createXDSClientWithBackoff(t, bc, streamBackoff)
// Register a watch for a listener resource.
const listenerName = "listener"
lw := newListenerWatcher()
ldsCancel := xdsresource.WatchListener(client, listenerName, lw)
defer ldsCancel()
// The above watch results in an attempt to create a new stream, which will
// fail, and will result in backoff. Wait for backoff to kick in.
select {
case <-backoffCh:
case <-ctx.Done():
t.Fatal("Timeout waiting for stream backoff")
}
// Bring up the connection to the management server, and unblock the backoff
// implementation.
lis.Restart()
close(unblockBackoffCh)
// Verify that the initial discovery request matches expectation.
var gotReq *v3discoverypb.DiscoveryRequest
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for discovery request on the stream")
}
wantReq := &v3discoverypb.DiscoveryRequest{
VersionInfo: "",
Node: &v3corepb.Node{
Id: nodeID,
UserAgentName: "gRPC Go",
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version},
ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"},
},
ResourceNames: []string{listenerName},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
ResponseNonce: "",
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
}
// waitForResourceNames waits for the wantNames to be received on namesCh.
// Returns a non-nil error if the context expires before that.
func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error {
t.Helper()
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
select {
case <-ctx.Done():
case gotNames := <-namesCh:
if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) {
return nil
}
t.Logf("Received resource names %v, want %v", gotNames, wantNames)
}
}
return fmt.Errorf("timeout waiting for resource to be requested from the management server")
}

View File

@ -54,15 +54,17 @@ import (
_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter.
)
// startFakeManagementServer starts a fake xDS management server and returns a
// startFakeManagementServer starts a fake xDS management server and registers a
// cleanup function to close the fake server.
func startFakeManagementServer(t *testing.T) (*fakeserver.Server, func()) {
func startFakeManagementServer(t *testing.T) *fakeserver.Server {
t.Helper()
fs, sCleanup, err := fakeserver.StartServer(nil)
fs, cleanup, err := fakeserver.StartServer(nil)
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
return fs, sCleanup
t.Logf("Started xDS management server on %s", fs.Address)
t.Cleanup(cleanup)
return fs
}
func compareUpdateMetadata(ctx context.Context, dumpFunc func() *v3statuspb.ClientStatusResponse, want []*v3statuspb.ClientConfig_GenericXdsConfig) error {
@ -276,9 +278,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
// Create a fake xDS management server listening on a local port,
// and set it up with the response to send.
mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
mgmtServer := startFakeManagementServer(t)
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
@ -292,7 +292,6 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
t.Fatalf("Failed to create an xDS client: %v", err)
}
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// Register a watch, and push the results on to a channel.
lw := newListenerWatcher()
@ -555,9 +554,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
// Create a fake xDS management server listening on a local port,
// and set it up with the response to send.
mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
mgmtServer := startFakeManagementServer(t)
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
@ -571,7 +568,6 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) {
t.Fatalf("Failed to create an xDS client: %v", err)
}
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// Register a watch, and push the results on to a channel.
rw := newRouteConfigWatcher()
@ -795,9 +791,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
// Create a fake xDS management server listening on a local port,
// and set it up with the response to send.
mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
mgmtServer := startFakeManagementServer(t)
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
@ -811,7 +805,6 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
t.Fatalf("Failed to create an xDS client: %v", err)
}
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// Register a watch, and push the results on to a channel.
cw := newClusterWatcher()
@ -1147,9 +1140,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
// Create a fake xDS management server listening on a local port,
// and set it up with the response to send.
mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
mgmtServer := startFakeManagementServer(t)
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
@ -1163,7 +1154,6 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
t.Fatalf("Failed to create an xDS client: %v", err)
}
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// Register a watch, and push the results on to a channel.
ew := newEndpointsWatcher()

View File

@ -52,6 +52,17 @@ var (
)
)
// startFakeManagementServer starts a fake xDS management server and returns a
// cleanup function to close the fake server.
func startFakeManagementServer(t *testing.T) (*fakeserver.Server, func()) {
t.Helper()
fs, sCleanup, err := fakeserver.StartServer(nil)
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
return fs, sCleanup
}
func (s) TestReportLoad(t *testing.T) {
// Create a fake xDS management server listening on a local port.
mgmtServer, cleanup := startFakeManagementServer(t)

View File

@ -1,449 +0,0 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package transport_test
import (
"context"
"errors"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/transport"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
var strSort = func(s1, s2 string) bool { return s1 < s2 }
var noopRecvHandler = func(_ transport.ResourceUpdate, onDone func()) error {
onDone()
return nil
}
// TestTransport_BackoffAfterStreamFailure tests the case where the management
// server returns an error in the ADS streaming RPC. The test verifies the
// following:
// 1. Initial discovery request matches expectation.
// 2. RPC error is propagated via the stream error handler.
// 3. When the stream is closed, the transport backs off.
// 4. The same discovery request is sent on the newly created stream.
func (s) TestTransport_BackoffAfterStreamFailure(t *testing.T) {
// Channels used for verifying different events in the test.
streamCloseCh := make(chan struct{}, 1) // ADS stream is closed.
streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received.
backoffCh := make(chan struct{}, 1) // Transport backoff after stream failure.
streamErrCh := make(chan error, 1) // Stream error seen by the transport.
// Create an xDS management server listening on a local port.
streamErr := errors.New("ADS stream error")
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
// Push on a channel whenever the stream is closed.
OnStreamClosed: func(int64, *v3corepb.Node) {
select {
case streamCloseCh <- struct{}{}:
default:
}
},
// Return an error everytime a request is sent on the stream. This
// should cause the transport to backoff before attempting to recreate
// the stream.
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
select {
case streamRequestCh <- req:
default:
}
return streamErr
},
})
// Override the backoff implementation to push on a channel that is read by
// the test goroutine.
transportBackoff := func(v int) time.Duration {
select {
case backoffCh <- struct{}{}:
default:
}
return 0
}
serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address})
if err != nil {
t.Fatalf("Failed to create server config for testing: %v", err)
}
// Create a new transport. Since we are only testing backoff behavior here,
// we can pass a no-op data model layer implementation.
nodeID := uuid.New().String()
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: noopRecvHandler, // No data model layer validation.
OnErrorHandler: func(err error) {
select {
case streamErrCh <- err:
default:
}
},
OnSendHandler: func(*transport.ResourceSendInfo) {},
Backoff: transportBackoff,
NodeProto: &v3corepb.Node{Id: nodeID},
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()
// Send a discovery request through the transport.
const resourceName = "resource name"
tr.SendRequest(version.V3ListenerURL, []string{resourceName})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Verify that the initial discovery request matches expectation.
var gotReq *v3discoverypb.DiscoveryRequest
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for discovery request on the stream")
}
wantReq := &v3discoverypb.DiscoveryRequest{
VersionInfo: "",
Node: &v3corepb.Node{Id: nodeID},
ResourceNames: []string{resourceName},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
ResponseNonce: "",
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
// Verify that the received stream error is reported to the user.
var gotErr error
select {
case gotErr = <-streamErrCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for stream error to be reported to the user")
}
if !strings.Contains(gotErr.Error(), streamErr.Error()) {
t.Fatalf("Received stream error: %v, wantErr: %v", gotErr, streamErr)
}
// Verify that the stream is closed.
select {
case <-streamCloseCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for stream to be closed after an error")
}
// Verify that the transport backs off before recreating the stream.
select {
case <-backoffCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for transport to backoff after stream failure")
}
// Verify that the same discovery request is resent on the new stream.
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for discovery request on the stream")
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
}
// TestTransport_RetriesAfterBrokenStream tests the case where a stream breaks
// because the server goes down. The test verifies the following:
// 1. Initial discovery request matches expectation.
// 2. Good response from the server leads to an ACK with appropriate version.
// 3. Management server going down, leads to stream failure.
// 4. Once the management server comes back up, the same resources are
// re-requested, this time with an empty nonce.
func (s) TestTransport_RetriesAfterBrokenStream(t *testing.T) {
// Channels used for verifying different events in the test.
streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received.
streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) // Discovery response is received.
streamErrCh := make(chan error, 1) // Stream error seen by the transport.
// Create an xDS management server listening on a local port.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create a local listener for the xDS management server: %v", err)
}
lis := testutils.NewRestartableListener(l)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
// Push the received request on to a channel for the test goroutine to
// verify that it matches expectations.
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
select {
case streamRequestCh <- req:
default:
}
return nil
},
// Push the response that the management server is about to send on to a
// channel. The test goroutine to uses this to extract the version and
// nonce, expected on subsequent requests.
OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) {
select {
case streamResponseCh <- resp:
default:
}
},
})
// Configure the management server with appropriate resources.
apiListener := &v3listenerpb.ApiListener{
ApiListener: func() *anypb.Any {
return testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: "route-configuration-name",
},
},
})
}(),
}
const resourceName1 = "resource name 1"
const resourceName2 = "resource name 2"
listenerResource1 := &v3listenerpb.Listener{
Name: resourceName1,
ApiListener: apiListener,
}
listenerResource2 := &v3listenerpb.Listener{
Name: resourceName2,
ApiListener: apiListener,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
nodeID := uuid.New().String()
mgmtServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listenerResource1, listenerResource2},
SkipValidation: true,
})
serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address})
if err != nil {
t.Fatalf("Failed to create server config for testing: %v", err)
}
// Create a new transport. Since we are only testing backoff behavior here,
// we can pass a no-op data model layer implementation.
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: noopRecvHandler, // No data model layer validation.
OnErrorHandler: func(err error) {
select {
case streamErrCh <- err:
default:
}
},
OnSendHandler: func(*transport.ResourceSendInfo) {},
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
NodeProto: &v3corepb.Node{Id: nodeID},
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()
// Send a discovery request through the transport.
tr.SendRequest(version.V3ListenerURL, []string{resourceName1, resourceName2})
// Verify that the initial discovery request matches expectation.
var gotReq *v3discoverypb.DiscoveryRequest
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for discovery request on the stream")
}
wantReq := &v3discoverypb.DiscoveryRequest{
VersionInfo: "",
Node: &v3corepb.Node{Id: nodeID},
ResourceNames: []string{resourceName1, resourceName2},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
ResponseNonce: "",
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
// Capture the version and nonce from the response.
var gotResp *v3discoverypb.DiscoveryResponse
select {
case gotResp = <-streamResponseCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for discovery response on the stream")
}
version := gotResp.GetVersionInfo()
nonce := gotResp.GetNonce()
// Verify that the ACK contains the appropriate version and nonce.
wantReq.VersionInfo = version
wantReq.ResponseNonce = nonce
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for the discovery request ACK on the stream")
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
// Bring down the management server to simulate a broken stream.
lis.Stop()
// We don't care about the exact error here and it can vary based on which
// error gets reported first, the Recv() failure or the new stream creation
// failure. So, all we check here is whether we get an error or not.
select {
case <-streamErrCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for stream error to be reported to the user")
}
// Bring up the connection to the management server.
lis.Restart()
// Verify that the transport creates a new stream and sends out a new
// request which contains the previously acked version, but an empty nonce.
wantReq.ResponseNonce = ""
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for the discovery request ACK on the stream")
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), cmpopts.SortSlices(strSort)); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
}
// TestTransport_ResourceRequestedBeforeStreamCreation tests the case where a
// resource is requested before the transport has a valid stream. Verifies that
// the transport sends out the request once it has a valid stream.
func (s) TestTransport_ResourceRequestedBeforeStreamCreation(t *testing.T) {
// Channels used for verifying different events in the test.
streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received.
// Create an xDS management server listening on a local port.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create a local listener for the xDS management server: %v", err)
}
lis := testutils.NewRestartableListener(l)
streamErr := errors.New("ADS stream error")
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
// Return an error everytime a request is sent on the stream. This
// should cause the transport to backoff before attempting to recreate
// the stream.
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
select {
case streamRequestCh <- req:
default:
}
return streamErr
},
})
// Bring down the management server before creating the transport. This
// allows us to test the case where SendRequest() is called when there is no
// stream to the management server.
lis.Stop()
serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address})
if err != nil {
t.Fatalf("Failed to create server config for testing: %v", err)
}
// Create a new transport. Since we are only testing backoff behavior here,
// we can pass a no-op data model layer implementation.
nodeID := uuid.New().String()
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: noopRecvHandler, // No data model layer validation.
OnErrorHandler: func(error) {}, // No stream error handling.
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No on send handler
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
NodeProto: &v3corepb.Node{Id: nodeID},
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()
// Send a discovery request through the transport.
const resourceName = "resource name"
tr.SendRequest(version.V3ListenerURL, []string{resourceName})
// Wait until the transport has attempted to connect to the management
// server and has seen the connection fail. In this case, since the
// connection is down, and the transport creates streams with WaitForReady()
// set to true, stream creation will never fail (unless the context
// expires), and therefore we cannot rely on the stream error handler.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
if tr.ChannelConnectivityStateForTesting() == connectivity.TransientFailure {
break
}
}
lis.Restart()
// Verify that the initial discovery request matches expectation.
var gotReq *v3discoverypb.DiscoveryRequest
select {
case gotReq = <-streamRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout waiting for discovery request on the stream")
}
wantReq := &v3discoverypb.DiscoveryRequest{
VersionInfo: "",
Node: &v3corepb.Node{Id: nodeID},
ResourceNames: []string{resourceName},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
ResponseNonce: "",
}
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff)
}
}

View File

@ -1,422 +0,0 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package transport_test contains e2e style tests for the xDS transport
// implementation. It uses the envoy-go-control-plane as the management server.
package transport_test
import (
"context"
"errors"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/fakeserver"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/transport"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
)
// startFakeManagementServer starts a fake xDS management server and returns a
// cleanup function to close the fake server.
func startFakeManagementServer(t *testing.T) (*fakeserver.Server, func()) {
t.Helper()
fs, sCleanup, err := fakeserver.StartServer(nil)
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
return fs, sCleanup
}
// resourcesWithTypeURL wraps resources and type URL received from server.
type resourcesWithTypeURL struct {
resources []*anypb.Any
url string
}
// TestHandleResponseFromManagementServer covers different scenarios of the
// transport receiving a response from the management server. In all scenarios,
// the transport is expected to pass the received responses as-is to the data
// model layer for validation and not perform any validation on its own.
func (s) TestHandleResponseFromManagementServer(t *testing.T) {
const (
resourceName1 = "resource-name-1"
resourceName2 = "resource-name-2"
)
var (
badlyMarshaledResource = &anypb.Any{
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Value: []byte{1, 2, 3, 4},
}
apiListener = &v3listenerpb.ApiListener{
ApiListener: func() *anypb.Any {
return testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: "route-configuration-name",
},
},
})
}(),
}
resource1 = &v3listenerpb.Listener{
Name: resourceName1,
ApiListener: apiListener,
}
resource2 = &v3listenerpb.Listener{
Name: resourceName2,
ApiListener: apiListener,
}
)
tests := []struct {
desc string
resourceNamesToRequest []string
managementServerResponse *v3discoverypb.DiscoveryResponse
wantURL string
wantResources []*anypb.Any
}{
{
desc: "badly marshaled response",
resourceNamesToRequest: []string{resourceName1},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{badlyMarshaledResource},
},
wantURL: "type.googleapis.com/envoy.config.listener.v3.Listener",
wantResources: []*anypb.Any{badlyMarshaledResource},
},
{
desc: "empty response",
resourceNamesToRequest: []string{resourceName1},
managementServerResponse: &v3discoverypb.DiscoveryResponse{},
wantURL: "",
wantResources: nil,
},
{
desc: "one good resource",
resourceNamesToRequest: []string{resourceName1},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1)},
},
wantURL: "type.googleapis.com/envoy.config.listener.v3.Listener",
wantResources: []*anypb.Any{testutils.MarshalAny(t, resource1)},
},
{
desc: "two good resources",
resourceNamesToRequest: []string{resourceName1, resourceName2},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
},
wantURL: "type.googleapis.com/envoy.config.listener.v3.Listener",
wantResources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
},
{
desc: "two resources when we requested one",
resourceNamesToRequest: []string{resourceName1},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
},
wantURL: "type.googleapis.com/envoy.config.listener.v3.Listener",
wantResources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
// Create a fake xDS management server listening on a local port,
// and set it up with the response to send.
mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
mgmtServer.XDSResponseChan <- &fakeserver.Response{Resp: test.managementServerResponse}
serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address})
if err != nil {
t.Fatalf("Failed to create server config for testing: %v", err)
}
// Create a new transport.
resourcesCh := testutils.NewChannel()
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
// No validation. Simply push received resources on a channel.
OnRecvHandler: func(update transport.ResourceUpdate, onDone func()) error {
resourcesCh.Send(&resourcesWithTypeURL{
resources: update.Resources,
url: update.URL,
// Ignore resource version here.
})
onDone()
return nil
},
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
OnErrorHandler: func(error) {}, // No stream error handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
NodeProto: &v3corepb.Node{Id: uuid.New().String()},
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()
// Send the request, and validate that the response sent by the
// management server is propagated to the data model layer.
tr.SendRequest(version.V3ListenerURL, test.resourceNamesToRequest)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
v, err := resourcesCh.Receive(ctx)
if err != nil {
t.Fatalf("Failed to receive resources at the data model layer: %v", err)
}
gotURL := v.(*resourcesWithTypeURL).url
gotResources := v.(*resourcesWithTypeURL).resources
if gotURL != test.wantURL {
t.Fatalf("Received resource URL in response: %s, want %s", gotURL, test.wantURL)
}
if diff := cmp.Diff(gotResources, test.wantResources, protocmp.Transform()); diff != "" {
t.Fatalf("Received unexpected resources. Diff (-got, +want):\n%s", diff)
}
})
}
}
func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address})
if err != nil {
t.Fatalf("Failed to create server config for testing: %v", err)
}
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: noopRecvHandler, // No data model layer validation.
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
OnErrorHandler: func(error) {}, // No stream error handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
NodeProto: nodeProto,
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()
// Send a request for a listener resource.
const resource = "some-resource"
tr.SendRequest(version.V3ListenerURL, []string{resource})
// Ensure the proper request was sent.
val, err := mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq := val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}
// Remove the subscription by requesting an empty list.
tr.SendRequest(version.V3ListenerURL, []string{})
// Ensure the proper request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
ResourceNames: []string{},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}
// Cause the stream to restart.
mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}
// Ensure no request is sent since there are no resources.
ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer cancel()
if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
}
tr.SendRequest(version.V3ListenerURL, []string{resource})
// Ensure the proper request was sent with the node proto.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}
}
func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
serverCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address})
if err != nil {
t.Fatalf("Failed to create server config for testing: %v", err)
}
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
tr, err := transport.New(transport.Options{
ServerCfg: serverCfg,
OnRecvHandler: noopRecvHandler, // No data model layer validation.
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
OnErrorHandler: func(error) {}, // No stream error handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
NodeProto: nodeProto,
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()
// Send a request for a listener resource.
const resource = "some-resource"
tr.SendRequest(version.V3ListenerURL, []string{resource})
// Ensure the proper request was sent.
val, err := mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq := val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}
// Send a request for a cluster resource.
tr.SendRequest(version.V3ClusterURL, []string{resource})
// Ensure the proper request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}
// Remove the cluster subscription by requesting an empty list.
tr.SendRequest(version.V3ClusterURL, []string{})
// Ensure the proper request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
ResourceNames: []string{},
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}
// Cause the stream to restart.
mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}
// Ensure the proper LDS request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}
// Ensure no cluster request is sent since there are no cluster resources.
ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer cancel()
if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
}
}

View File

@ -22,10 +22,12 @@ import (
"encoding/json"
"net"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/xds/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/transport"
@ -34,6 +36,24 @@ import (
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
)
var noopRecvHandler = func(_ transport.ResourceUpdate, onDone func()) error {
onDone()
return nil
}
func (s) TestNewWithGRPCDial(t *testing.T) {
// Override the dialer with a custom one.
customDialerCalled := false