grpc-go/xds/server_resource_ext_test.go

651 lines
26 KiB
Go

/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds_test
import (
"context"
"fmt"
"io"
"net"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/xds"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/wrapperspb"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
// Tests the case where an LDS points to an RDS which returns resource not
// found. Before getting the resource not found, the xDS Server has not received
// all configuration needed, so it should Accept and Close any new connections;
// the test observes this as RPCs failing with UNAVAILABLE. After it has
// received the resource not found error, the server should move to serving,
// successfully Accept Connections, and fail at the L7 level with resource not
// found specified (an UNAVAILABLE status whose message mentions the route
// configuration error and the xDS node ID).
func (s) TestServer_RouteConfiguration_ResourceNotFound(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Start a management server that pushes the resource names of every RDS
	// request onto a channel, so that the test can tell when the route
	// configuration resource has been requested. The send is guarded by the
	// test context so the callback cannot block past the end of the test.
	routeConfigNamesCh := make(chan []string, 1)
	managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
			if req.TypeUrl == version.V3RouteConfigURL {
				select {
				case routeConfigNamesCh <- req.GetResourceNames():
				case <-ctx.Done():
				}
			}
			return nil
		},
		AllowResourceSubset: true,
	})

	// Create bootstrap configuration pointing to the above management server.
	nodeID := uuid.New().String()
	bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)

	// Setup the management server to respond with a listener resource that
	// specifies a route name to watch, and no RDS resource corresponding to
	// this route name.
	lis, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
	}
	host, port, err := hostPortFromListener(lis)
	if err != nil {
		t.Fatalf("Failed to retrieve host and port of server: %v", err)
	}
	const routeConfigResourceName = "routeName"
	listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigResourceName)
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{listener},
		SkipValidation: true,
	}
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Start an xDS-enabled gRPC server that uses an xDS client pool built
	// from the above bootstrap configuration.
	modeChangeHandler := newServingModeChangeHandler(t)
	modeChangeOpt := xds.ServingModeCallback(modeChangeHandler.modeChangeCallback)
	config, err := bootstrap.NewConfigFromContents(bootstrapContents)
	if err != nil {
		t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
	}
	pool := xdsclient.NewPool(config)
	createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool))

	// Wait for the route configuration resource to be requested from the
	// management server.
	select {
	case gotNames := <-routeConfigNamesCh:
		if !cmp.Equal(gotNames, []string{routeConfigResourceName}) {
			t.Fatalf("Requested route config resource names: %v, want %v", gotNames, []string{routeConfigResourceName})
		}
	case <-ctx.Done():
		t.Fatal("Timeout waiting for route config resource to be requested")
	}

	// The route configuration has not been resolved yet, so RPCs are expected
	// to fail with UNAVAILABLE.
	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()
	waitForFailedRPCWithStatus(ctx, t, cc, codes.Unavailable, "", "")

	// Lookup the xDS client in use based on the dedicated well-known key, as
	// defined in A71, used by the xDS enabled gRPC server.
	//
	// The release function is deliberately not named "close", to avoid
	// shadowing the builtin, and the lookup error is included in the failure
	// message so that the cause is visible in test logs.
	xdsC, closeClient, err := pool.GetClientForTesting(xdsclient.NameForServer)
	if err != nil {
		t.Fatalf("Failed to find xDS client for configuration %s: %v", string(bootstrapContents), err)
	}
	defer closeClient()

	// Invoke resource not found error for the route configuration resource.
	// This should cause the server to go SERVING, but fail RPCs with the
	// appropriate error code.
	triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error)
	routeConfigResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type)
	if err := triggerResourceNotFound(xdsC, routeConfigResourceType, routeConfigResourceName); err != nil {
		t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
	}
	select {
	case <-ctx.Done():
		t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go SERVING")
	case gotMode := <-modeChangeHandler.modeCh:
		if gotMode != connectivity.ServingModeServing {
			t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing)
		}
	}
	waitForFailedRPCWithStatus(ctx, t, cc, codes.Unavailable, "error from xDS configuration for matched route configuration", nodeID)
}
// Tests the scenario where the control plane sends the same resource update. It
// verifies that the mode change callback is not invoked and client connections
// to the server are not recycled.
func (s) TestServer_RedundantUpdateSuppression(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})

	// Create bootstrap configuration pointing to the above management server.
	nodeID := uuid.New().String()
	bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)

	// Setup the management server to respond with the listener resources.
	lis, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
	}
	host, port, err := hostPortFromListener(lis)
	if err != nil {
		t.Fatalf("Failed to retrieve host and port of server: %v", err)
	}
	listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
	resources := e2e.UpdateOptions{
		NodeID:    nodeID,
		Listeners: []*v3listenerpb.Listener{listener},
	}
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Start an xDS-enabled gRPC server with the above bootstrap configuration.
	config, err := bootstrap.NewConfigFromContents(bootstrapContents)
	if err != nil {
		t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
	}
	pool := xdsclient.NewPool(config)
	modeChangeHandler := newServingModeChangeHandler(t)
	modeChangeOpt := xds.ServingModeCallback(modeChangeHandler.modeChangeCallback)
	createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool))

	// Wait for the server to report SERVING. On timeout, report the context
	// error: the `err` variable in scope here is the (nil) result of the
	// bootstrap parsing above and says nothing about why the wait failed.
	select {
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for a mode change update: %v", ctx.Err())
	case mode := <-modeChangeHandler.modeCh:
		if mode != connectivity.ServingModeServing {
			t.Fatalf("Listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
		}
	}

	// Create a ClientConn and make a successful RPC.
	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err)
	}
	defer cc.Close()
	waitForSuccessfulRPC(ctx, t, cc)

	// Start a goroutine to make sure that we do not see any connectivity state
	// changes on the client connection. If redundant updates are not
	// suppressed, server will recycle client connections.
	//
	// The channel is buffered so that the goroutine can always deliver its
	// single result and exit, even if the test has already failed elsewhere.
	errCh := make(chan error, 1)
	go func() {
		prev := connectivity.Ready // We know we are READY since we just did an RPC.
		for {
			curr := cc.GetState()
			if !(curr == connectivity.Ready || curr == connectivity.Idle) {
				errCh <- fmt.Errorf("unexpected connectivity state change {%s --> %s} on the client connection", prev, curr)
				return
			}
			if !cc.WaitForStateChange(ctx, curr) {
				// Break out of the for loop when the context has been cancelled.
				break
			}
			prev = curr
		}
		errCh <- nil
	}()

	// Update the management server with the same listener resource. This will
	// update the resource version though, and should result in the management
	// server sending the same resource to the xDS-enabled gRPC server.
	if err := managementServer.Update(ctx, e2e.UpdateOptions{
		NodeID:    nodeID,
		Listeners: []*v3listenerpb.Listener{listener},
	}); err != nil {
		t.Fatal(err)
	}

	// Since redundant resource updates are suppressed, we should not see the
	// mode change callback being invoked. Expiry of the short timeout is the
	// success case here.
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	select {
	case <-sCtx.Done():
	case mode := <-modeChangeHandler.modeCh:
		t.Fatalf("Unexpected mode change callback with new mode %v", mode)
	}

	// Make sure RPCs continue to succeed.
	waitForSuccessfulRPC(ctx, t, cc)

	// Cancel the context to ensure that the WaitForStateChange call exits early
	// and returns false.
	cancel()
	if err := <-errCh; err != nil {
		t.Fatal(err)
	}
}
// Tests the scenario where a route configuration that initially carries a
// non-forwarding action is replaced by one carrying an unsupported route
// action. RPCs are expected to succeed before the replacement and to fail with
// UNAVAILABLE, with a message containing the xDS node ID, after it.
func (s) TestServer_FailWithRouteActionRoute(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Bring up an xDS management server and a bootstrap configuration that
	// points at it.
	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
	nodeID := uuid.New().String()
	bsContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

	// Reserve a local port for the xDS-enabled gRPC server; the listener
	// resource handed to the management server is derived from it.
	lis, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("Failed to listen to local port: %v", err)
	}
	host, port, err := hostPortFromListener(lis)
	if err != nil {
		t.Fatalf("Failed to retrieve host and port of server: %v", err)
	}

	// Publish a listener resource and the route configuration it refers to.
	// The route starts out with a non-forwarding action.
	const routeConfigName = "routeName"
	inboundListener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigName)
	supportedRoute := e2e.RouteConfigNonForwardingAction(routeConfigName)
	resources := e2e.UpdateOptions{
		NodeID:    nodeID,
		Listeners: []*v3listenerpb.Listener{inboundListener},
		Routes:    []*v3routepb.RouteConfiguration{supportedRoute},
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Launch the xDS-enabled gRPC server on the reserved port, backed by an
	// xDS client pool built from the bootstrap configuration. Serving mode
	// changes are only logged by this test.
	config, err := bootstrap.NewConfigFromContents(bsContents)
	if err != nil {
		t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bsContents), err)
	}
	pool := xdsclient.NewPool(config)
	logModeChange := func(addr net.Addr, args xds.ServingModeChangeArgs) {
		t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
	}
	createStubServer(t, lis, xds.ServingModeCallback(logModeChange), xds.ClientPoolForTesting(pool))

	// With the non-forwarding route in place, RPCs over a fresh channel are
	// expected to succeed.
	conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err)
	}
	defer conn.Close()
	waitForSuccessfulRPC(ctx, t, conn)

	// Swap in a route configuration whose action is unsupported on the server
	// side.
	//
	// "NonForwardingAction is expected for all Routes used on server-side; a
	// route with an inappropriate action causes RPCs matching that route to
	// fail with UNAVAILABLE." - A36
	unsupportedRoute := e2e.RouteConfigFilterAction(routeConfigName)
	resources.Routes = []*v3routepb.RouteConfiguration{unsupportedRoute}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}
	waitForFailedRPCWithStatus(ctx, t, conn, codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding", nodeID)
}
// Tests the case where the listener resource is removed from the management
// server. This should cause the xDS server to transition to NOT_SERVING mode,
// and the error message should contain the xDS node ID.
func (s) TestServer_ListenerResourceRemoved(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Start an xDS management server.
	managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

	// Create bootstrap configuration pointing to the above management server.
	nodeID := uuid.New().String()
	bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)

	// Configure the management server with a listener and route configuration
	// resource for the above xDS enabled gRPC server.
	lis, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("Failed to listen to local port: %v", err)
	}
	host, port, err := hostPortFromListener(lis)
	if err != nil {
		t.Fatalf("Failed to retrieve host and port of server: %v", err)
	}
	// The same constant names the route in both the listener and the route
	// configuration, so the two resources cannot drift apart.
	const routeConfigName = "routeName"
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigName)},
		Routes:         []*v3routepb.RouteConfiguration{e2e.RouteConfigNonForwardingAction(routeConfigName)},
		SkipValidation: true,
	}
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Start an xDS-enabled gRPC server with the above bootstrap configuration.
	config, err := bootstrap.NewConfigFromContents(bootstrapContents)
	if err != nil {
		t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
	}
	pool := xdsclient.NewPool(config)
	modeChangeHandler := newServingModeChangeHandler(t)
	modeChangeOpt := xds.ServingModeCallback(modeChangeHandler.modeChangeCallback)
	createStubServer(t, lis, modeChangeOpt, xds.ClientPoolForTesting(pool))
	select {
	case <-ctx.Done():
		t.Fatal("Timeout waiting for the xDS-enabled gRPC server to go SERVING")
	case gotMode := <-modeChangeHandler.modeCh:
		if gotMode != connectivity.ServingModeServing {
			t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeServing)
		}
	}

	// Create a gRPC channel and verify that RPCs succeed.
	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err)
	}
	defer cc.Close()
	waitForSuccessfulRPC(ctx, t, cc)

	// Remove the listener resource from the management server. This should
	// cause the server to go NOT_SERVING, and the error message should contain
	// the xDS node ID.
	resources.Listeners = nil
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}
	select {
	case <-ctx.Done():
		t.Fatal("Timed out waiting for server to go NOT_SERVING")
	case gotMode := <-modeChangeHandler.modeCh:
		if gotMode != connectivity.ServingModeNotServing {
			t.Fatalf("Mode changed to %v, want %v", gotMode, connectivity.ServingModeNotServing)
		}
	}

	// The error that accompanies the mode change is read from a separate
	// channel. Bound this wait by the test context as well, so that a missing
	// error fails the test at its own deadline instead of blocking until the
	// global test timeout.
	select {
	case <-ctx.Done():
		t.Fatal("Timed out waiting for the error associated with the NOT_SERVING mode change")
	case gotErr := <-modeChangeHandler.errCh:
		if gotErr == nil || !strings.Contains(gotErr.Error(), nodeID) {
			t.Fatalf("Unexpected error: %v, want xDS Node id: %s", gotErr, nodeID)
		}
	}
}
// Tests the scenario where the listener resource refers to a route
// configuration that is expected to be NACKed. The server is still expected to
// reach SERVING mode and accept connections, while RPCs fail with UNAVAILABLE
// and an error message that mentions the route configuration problem and the
// xDS node ID.
func (s) TestServer_RouteConfiguration_ResourceNACK(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Bring up an xDS management server and a bootstrap configuration that
	// points at it.
	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
	nodeID := uuid.New().String()
	bsContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

	// Reserve a local port for the xDS-enabled gRPC server.
	lis, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("Failed to listen to local port: %v", err)
	}
	host, port, err := hostPortFromListener(lis)
	if err != nil {
		t.Fatalf("Failed to retrieve host and port of server: %v", err)
	}

	// Publish a listener resource together with a route configuration built
	// by e2e.RouteConfigNoRouteMatch, which is the resource expected to be
	// NACKed. Validation on the management server is skipped for this update.
	const routeConfigName = "routeName"
	inboundListener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigName)
	nackedRoute := e2e.RouteConfigNoRouteMatch(routeConfigName)
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{inboundListener},
		Routes:         []*v3routepb.RouteConfiguration{nackedRoute},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Launch the xDS-enabled gRPC server on the reserved port, with a handler
	// that records serving mode changes.
	config, err := bootstrap.NewConfigFromContents(bsContents)
	if err != nil {
		t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bsContents), err)
	}
	pool := xdsclient.NewPool(config)
	modeHandler := newServingModeChangeHandler(t)
	createStubServer(t, lis, xds.ServingModeCallback(modeHandler.modeChangeCallback), xds.ClientPoolForTesting(pool))

	// Create a gRPC channel to the server. RPCs on it are only attempted once
	// the server has reported SERVING.
	conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err)
	}
	defer conn.Close()

	select {
	case mode := <-modeHandler.modeCh:
		if mode != connectivity.ServingModeServing {
			t.Fatalf("Mode changed to %v, want %v", mode, connectivity.ServingModeServing)
		}
	case <-ctx.Done():
		t.Fatal("Timeout waiting for the server to start serving RPCs")
	}

	// The server is serving, but RPCs are expected to fail with UNAVAILABLE
	// because of the route configuration error.
	waitForFailedRPCWithStatus(ctx, t, conn, codes.Unavailable, "error from xDS configuration for matched route configuration", nodeID)
}
// Tests the scenario where the listener resource refers to more than one route
// configuration resource, in three phases:
//
//  1. The listener refers to route configurations A, B and C. Its filter chain
//     matches incoming connections to route A, so RPCs are expected to succeed.
//     A streaming RPC is opened and left open during this phase.
//  2. The listener is updated so that its only filter chain (route A) does not
//     match, and its default filter chain points at route B. Route B carries a
//     route action of type "Route", which is not supported on the server side,
//     so new RPCs are expected to fail while the already open stream is allowed
//     to complete.
//  3. The listener is updated to refer only to route A through a matching
//     filter chain, and new RPCs are expected to succeed again.
func (s) TestServer_MultipleRouteConfigurations(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Bring up an xDS management server and a bootstrap configuration that
	// points at it.
	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
	nodeID := uuid.New().String()
	bsContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

	// Reserve a local port for the xDS-enabled gRPC server.
	lis, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("Failed to listen to local port: %v", err)
	}
	host, port, err := hostPortFromListener(lis)
	if err != nil {
		t.Fatalf("Failed to retrieve host and port of server: %v", err)
	}

	// Phase 1 resources: a listener whose first filter chain points at route
	// A, followed by two extra filter chains for routes B and C built by
	// filterChainWontMatch, plus the three route configurations themselves.
	const (
		routeConfigNameA = "routeName-A"
		routeConfigNameB = "routeName-B"
		routeConfigNameC = "routeName-C"
	)
	listenerRes := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigNameA)
	listenerRes.FilterChains = append(listenerRes.FilterChains,
		filterChainWontMatch(t, routeConfigNameB, "1.1.1.1", []uint32{1}),
		filterChainWontMatch(t, routeConfigNameC, "2.2.2.2", []uint32{2}),
	)
	routeA := e2e.RouteConfigNonForwardingAction(routeConfigNameA)
	routeB := e2e.RouteConfigFilterAction(routeConfigNameB) // Route action that is unsupported on the server.
	routeC := e2e.RouteConfigFilterAction(routeConfigNameC) // Route action that is unsupported on the server.
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{listenerRes},
		Routes:         []*v3routepb.RouteConfiguration{routeA, routeB, routeC},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Launch the xDS-enabled gRPC server on the reserved port. Serving mode
	// changes are only logged by this test.
	config, err := bootstrap.NewConfigFromContents(bsContents)
	if err != nil {
		t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bsContents), err)
	}
	pool := xdsclient.NewPool(config)
	logModeChange := func(addr net.Addr, args xds.ServingModeChangeArgs) {
		t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
	}
	createStubServer(t, lis, xds.ServingModeCallback(logModeChange), xds.ClientPoolForTesting(pool))

	// Connections match route A at this point, so RPCs are expected to
	// succeed.
	conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.NewClient(%q) failed: %v", lis.Addr(), err)
	}
	defer conn.Close()
	waitForSuccessfulRPC(ctx, t, conn)

	// Open a streaming RPC and leave it open across the next listener update.
	testClient := testgrpc.NewTestServiceClient(conn)
	stream, err := testClient.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("FullDuplexCall failed: %v", err)
	}
	if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
		t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
	}

	// Phase 2: replace the listener so that its single filter chain (route A)
	// no longer matches incoming connections, and the default filter chain
	// points at route B, which carries an unsupported route action.
	listenerRes = e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigNameA)
	listenerRes.FilterChains = []*v3listenerpb.FilterChain{filterChainWontMatch(t, routeConfigNameA, "1.1.1.1", []uint32{1})}
	listenerRes.DefaultFilterChain = filterChainWontMatch(t, routeConfigNameB, "2.2.2.2", []uint32{2})
	resources.Listeners = []*v3listenerpb.Listener{listenerRes}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// xDS is eventually consistent, so poll until the new configuration takes
	// effect.
	//
	// "NonForwardingAction is expected for all Routes used on server-side; a
	// route with an inappropriate action causes RPCs matching that route to
	// fail with UNAVAILABLE." - A36
	waitForFailedRPCWithStatus(ctx, t, conn, codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding", nodeID)

	// The stream opened in phase 1 lives on a connection that is being closed
	// gracefully under the old listener configuration, so it is expected to
	// finish cleanly: closing the send side succeeds and the next receive
	// reports io.EOF.
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
	}
	if _, err := stream.Recv(); err != io.EOF {
		t.Fatalf("unexpected error: %v, expected an EOF error", err)
	}

	// Phase 3: point the listener at route A alone, through a filter chain
	// that is expected to match, and check that RPCs succeed again.
	resources.Listeners = []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, routeConfigNameA)}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}
	waitForSuccessfulRPC(ctx, t, conn)
}
// filterChainWontMatch builds a filter chain that is not expected to match
// connections made when the test runs locally.
//
// The chain is named "<routeName>-wont-match". Its match criteria use
// addressPrefix, with a prefix length of zero, for both the destination and the
// source prefix ranges, restrict the source type to SAME_IP_OR_LOOPBACK, and
// restrict source ports to srcPorts. Its single network filter is an HTTP
// connection manager that fetches the route configuration named routeName over
// ADS and contains only the router HTTP filter.
func filterChainWontMatch(t *testing.T, routeName string, addressPrefix string, srcPorts []uint32) *v3listenerpb.FilterChain {
	// Each call allocates a fresh range, so the destination and source match
	// fields do not share a message.
	zeroLenRange := func() *v3corepb.CidrRange {
		return &v3corepb.CidrRange{
			AddressPrefix: addressPrefix,
			PrefixLen:     &wrapperspb.UInt32Value{Value: 0},
		}
	}

	// HTTP connection manager whose routes come from RDS, delivered over ADS.
	rds := &v3httppb.Rds{
		ConfigSource: &v3corepb.ConfigSource{
			ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
		},
		RouteConfigName: routeName,
	}
	hcm := &v3httppb.HttpConnectionManager{
		RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: rds},
		HttpFilters:    []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
	}

	match := &v3listenerpb.FilterChainMatch{
		PrefixRanges:       []*v3corepb.CidrRange{zeroLenRange()},
		SourceType:         v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
		SourcePorts:        srcPorts,
		SourcePrefixRanges: []*v3corepb.CidrRange{zeroLenRange()},
	}
	networkFilter := &v3listenerpb.Filter{
		Name:       "filter-1",
		ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: testutils.MarshalAny(t, hcm)},
	}

	return &v3listenerpb.FilterChain{
		Name:             routeName + "-wont-match",
		FilterChainMatch: match,
		Filters:          []*v3listenerpb.Filter{networkFilter},
	}
}