xds/internal/server: stop using a fake xDS client in listenerWrapper tests (#6700)

This commit is contained in:
Easwar Swaminathan 2023-10-12 18:00:12 -07:00 committed by GitHub
parent c76442cdaf
commit 3e9b85c6a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 360 additions and 412 deletions

View File

@ -137,9 +137,48 @@ func marshalAny(m proto.Message) *anypb.Any {
return a
}
// DefaultServerListener returns a basic xds Listener resource to be used on
// the server side.
func DefaultServerListener(host string, port uint32, secLevel SecurityLevel) *v3listenerpb.Listener {
// DefaultServerListener returns a basic xds Listener resource to be used on the
// server side. The returned Listener resource contains an inline route
// configuration with the name of routeName.
func DefaultServerListener(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
return defaultServerListenerCommon(host, port, secLevel, routeName, true)
}
func defaultServerListenerCommon(host string, port uint32, secLevel SecurityLevel, routeName string, inlineRouteConfig bool) *v3listenerpb.Listener {
var hcm *v3httppb.HttpConnectionManager
if inlineRouteConfig {
hcm = &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}},
},
HttpFilters: []*v3httppb.HttpFilter{RouterHTTPFilter},
}
} else {
hcm = &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: routeName,
},
},
HttpFilters: []*v3httppb.HttpFilter{RouterHTTPFilter},
}
}
var tlsContext *v3tlspb.DownstreamTlsContext
switch secLevel {
case SecurityLevelNone:
@ -212,27 +251,8 @@ func DefaultServerListener(host string, port uint32, secLevel SecurityLevel) *v3
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{
TypedConfig: marshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: "routeName",
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}},
},
HttpFilters: []*v3httppb.HttpFilter{RouterHTTPFilter},
}),
},
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: marshalAny(hcm)},
},
},
TransportSocket: ts,
@ -260,27 +280,8 @@ func DefaultServerListener(host string, port uint32, secLevel SecurityLevel) *v3
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{
TypedConfig: marshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: "routeName",
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}},
},
HttpFilters: []*v3httppb.HttpFilter{RouterHTTPFilter},
}),
},
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: marshalAny(hcm)},
},
},
TransportSocket: ts,
@ -289,6 +290,13 @@ func DefaultServerListener(host string, port uint32, secLevel SecurityLevel) *v3
}
}
// DefaultServerListenerWithRouteConfigName returns a basic xds Listener
// resource to be used on the server side. The returned Listener resource
// contains a RouteCongiguration resource name that needs to be resolved.
func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
return defaultServerListenerCommon(host, port, secLevel, routeName, false)
}
// HTTPFilter constructs an xds HttpFilter with the provided name and config.
func HTTPFilter(name string, config proto.Message) *v3httppb.HttpFilter {
return &v3httppb.HttpFilter{

View File

@ -337,7 +337,7 @@ func resourceWithListenerForGRPCServer(t *testing.T, nodeID string) (e2e.UpdateO
if err != nil {
t.Fatalf("Failed to retrieve host and port of listener at %q: %v", lis.Addr(), err)
}
listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone)
listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*listenerpb.Listener{listener},

View File

@ -169,7 +169,7 @@ func (s) TestUnmarshalListener_WithUpdateValidatorFunc(t *testing.T) {
})
// Create an inbound xDS listener resource for the server side.
inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS)
inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")
for _, fc := range inboundLis.GetFilterChains() {
fc.TransportSocket = test.securityConfig
}

View File

@ -153,7 +153,7 @@ func (s) TestServerSideXDS_Fallback(t *testing.T) {
// Create an inbound xDS listener resource for the server side that does not
// contain any security configuration. This should force the server-side
// xdsCredentials to use fallback.
inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone)
inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
resources.Listeners = append(resources.Listeners, inboundLis)
// Setup the management server with client and server-side resources.
@ -238,7 +238,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) {
// Create an inbound xDS listener resource for the server side that
// contains security configuration pointing to the file watcher
// plugin.
inboundLis := e2e.DefaultServerListener(host, port, test.secLevel)
inboundLis := e2e.DefaultServerListener(host, port, test.secLevel, "routeName")
resources.Listeners = append(resources.Listeners, inboundLis)
// Setup the management server with client and server resources.
@ -306,7 +306,7 @@ func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) {
// Create an inbound xDS listener resource for the server side that does not
// contain any security configuration. This should force the xDS credentials
// on server to use its fallback.
inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone)
inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
resources.Listeners = append(resources.Listeners, inboundLis)
// Setup the management server with client and server-side resources.
@ -360,7 +360,7 @@ func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) {
Port: port,
SecLevel: e2e.SecurityLevelMTLS,
})
inboundLis = e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS)
inboundLis = e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")
resources.Listeners = append(resources.Listeners, inboundLis)
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)

View File

@ -75,7 +75,7 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) {
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone)
listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
@ -221,12 +221,12 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener1 := e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone)
listener1 := e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone, "routeName")
host2, port2, err := hostPortFromListener(lis2)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener2 := e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone)
listener2 := e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener1, listener2},

View File

@ -21,148 +21,32 @@ package server
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"testing"
"time"
"google.golang.org/grpc/internal/envconfig"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
_ "google.golang.org/grpc/xds/internal/httpfilter/router"
_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter
)
const (
fakeListenerHost = "0.0.0.0"
fakeListenerPort = 50051
testListenerResourceName = "lds.target.1.2.3.4:1111"
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
fakeListenerHost = "0.0.0.0"
fakeListenerPort = 50051
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
)
func listenerWithRouteConfiguration(t *testing.T) *v3listenerpb.Listener {
return &v3listenerpb.Listener{
FilterChains: []*v3listenerpb.FilterChain{
{
FilterChainMatch: &v3listenerpb.FilterChainMatch{
PrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
SourcePrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourcePorts: []uint32{80},
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{
TypedConfig: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: "route-1",
},
},
HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
}),
},
},
},
},
},
}
}
func listenerWithFilterChains(t *testing.T) *v3listenerpb.Listener {
return &v3listenerpb.Listener{
FilterChains: []*v3listenerpb.FilterChain{
{
FilterChainMatch: &v3listenerpb.FilterChainMatch{
PrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
SourcePrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourcePorts: []uint32{80},
},
TransportSocket: &v3corepb.TransportSocket{
Name: "envoy.transport_sockets.tls",
ConfigType: &v3corepb.TransportSocket_TypedConfig{
TypedConfig: testutils.MarshalAny(t, &v3tlspb.DownstreamTlsContext{
CommonTlsContext: &v3tlspb.CommonTlsContext{
TlsCertificateCertificateProviderInstance: &v3tlspb.CommonTlsContext_CertificateProviderInstance{
InstanceName: "identityPluginInstance",
CertificateName: "identityCertName",
},
},
}),
},
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{
TypedConfig: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: "routeName",
VirtualHosts: []*v3routepb.VirtualHost{{
Domains: []string{"lds.target.good:3333"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}},
},
HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
}),
},
},
},
},
},
}
}
type s struct {
grpctest.Tester
}
@ -171,6 +55,191 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// badListenerResource returns a listener resource for the given name which does
// not contain the `RouteSpecifier` field in the HTTPConnectionManager, and
// hence is expected to be NACKed by the client.
func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener {
hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
})
return &v3listenerpb.Listener{
Name: name,
ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
FilterChains: []*v3listenerpb.FilterChain{{
Name: "filter-chain-name",
Filters: []*v3listenerpb.Filter{{
Name: wellknown.HTTPConnectionManager,
ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
}},
}},
}
}
func hostPortFromListener(t *testing.T, lis net.Listener) (string, uint32) {
t.Helper()
host, p, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
t.Fatalf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err)
}
port, err := strconv.ParseInt(p, 10, 32)
if err != nil {
t.Fatalf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err)
}
return host, uint32(port)
}
// Creates a local TCP net.Listener and creates a listenerWrapper by passing
// that and the provided xDS client.
//
// Returns the following:
// - the ready channel of the listenerWrapper
// - host of the listener
// - port of the listener
// - listener resource name to use when requesting this resource from the
// management server
func createListenerWrapper(t *testing.T, xdsC XDSClient) (<-chan struct{}, string, uint32, string) {
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create a local TCP listener: %v", err)
}
host, port := hostPortFromListener(t, lis)
lisResourceName := fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port))))
params := ListenerWrapperParams{
Listener: lis,
ListenerResourceName: lisResourceName,
XDSClient: xdsC,
}
l, readyCh := NewListenerWrapper(params)
if l == nil {
t.Fatalf("NewListenerWrapper(%+v) returned nil", params)
}
t.Cleanup(func() { l.Close() })
return readyCh, host, port, lisResourceName
}
// Tests the case where a listenerWrapper is created and following happens:
//
// - the management server returns a Listener resource that is NACKed. Test
// verifies that the listenerWrapper does not become ready.
// - the management server returns a Listener resource that does not match the
// address to which our net.Listener is bound to. Test verifies that the
// listenerWrapper does not become ready.
// - the management server returns a Listener resource that that matches the
// address to which our net.Listener is bound to. Also, it contains an
// inline Route Configuration. Test verifies that the listenerWrapper
// becomes ready.
func (s) TestListenerWrapper_InlineRouteConfig(t *testing.T) {
mgmtServer, nodeID, ldsResourceNamesCh, _, xdsC := xdsSetupFoTests(t)
readyCh, host, port, lisResourceName := createListenerWrapper(t, xdsC)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, ldsResourceNamesCh, []string{lisResourceName})
// Configure the management server with a listener resource that is expected
// to be NACKed.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{badListenerResource(t, lisResourceName)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that there is no message on the ready channel.
sCtx, sCtxCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCtxCancel()
select {
case <-readyCh:
t.Fatalf("Ready channel written to after receipt of a bad Listener update")
case <-sCtx.Done():
}
// Configure the management server with a listener resource that does not
// match the address to which our listener is bound to.
resources.Listeners = []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port+1, e2e.SecurityLevelNone, route1)}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that there is no message on the ready channel.
select {
case <-readyCh:
t.Fatalf("Ready channel written to after receipt of a bad Listener update")
case <-sCtx.Done():
}
// Configure the management server with a Listener resource that contains
// the expected host and port. Also, it does not contain any rds names that
// need reolution. Therefore the listenerWrapper is expected to become
// ready.
resources.Listeners = []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, route1)}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that the listener wrapper becomes ready.
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for the ready channel to be written to after receipt of a good Listener update")
case <-readyCh:
}
}
// Tests the case where a listenerWrapper is created and the management server
// returns a Listener resource that specifies the name of a Route Configuration
// resource. The test verifies that the listenerWrapper does not become ready
// when waiting for the Route Configuration resource and becomes ready once it
// receives the Route Configuration resource.
func (s) TestListenerWrapper_RouteNames(t *testing.T) {
mgmtServer, nodeID, ldsResourceNamesCh, rdsResourceNamesCh, xdsC := xdsSetupFoTests(t)
readyCh, host, port, lisResourceName := createListenerWrapper(t, xdsC)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, ldsResourceNamesCh, []string{lisResourceName})
// Configure the management server with a listener resource that specifies
// the name of RDS resources that need to be resolved.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, route1)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
waitForResourceNames(ctx, t, rdsResourceNamesCh, []string{route1})
// Verify that there is no message on the ready channel.
sCtx, sCtxCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCtxCancel()
select {
case <-readyCh:
t.Fatalf("Ready channel written to without rds configuration specified")
case <-sCtx.Done():
}
// Configure the management server with the route configuration resource
// specified by the listener resource.
resources.Routes = []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(route1, lisResourceName, clusterName)}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// All of the xDS updates have completed, so can expect to send a ping on
// good update channel.
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good rds update")
case <-readyCh:
}
}
type tempError struct{}
func (tempError) Error() string {
@ -233,174 +302,17 @@ func (fc *fakeConn) Close() error {
return nil
}
func newListenerWrapper(t *testing.T) (*listenerWrapper, <-chan struct{}, *fakeclient.Client, *fakeListener, func()) {
t.Helper()
// Create a listener wrapper with a fake listener and fake XDSClient and
// verify that it extracts the host and port from the passed in listener.
lis := &fakeListener{
acceptCh: make(chan connAndErr, 1),
closeCh: testutils.NewChannel(),
}
xdsC := fakeclient.NewClient()
lParams := ListenerWrapperParams{
Listener: lis,
ListenerResourceName: testListenerResourceName,
XDSClient: xdsC,
}
l, readyCh := NewListenerWrapper(lParams)
if l == nil {
t.Fatalf("NewListenerWrapper(%+v) returned nil", lParams)
}
lw, ok := l.(*listenerWrapper)
if !ok {
t.Fatalf("NewListenerWrapper(%+v) returned listener of type %T want *listenerWrapper", lParams, l)
}
if lw.addr != fakeListenerHost || lw.port != strconv.Itoa(fakeListenerPort) {
t.Fatalf("listenerWrapper has host:port %s:%s, want %s:%d", lw.addr, lw.port, fakeListenerHost, fakeListenerPort)
}
return lw, readyCh, xdsC, lis, func() { l.Close() }
}
func (s) TestNewListenerWrapper(t *testing.T) {
_, readyCh, xdsC, _, cleanup := newListenerWrapper(t)
defer cleanup()
// Verify that the listener wrapper registers a listener watch for the
// expected Listener resource name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
name, err := xdsC.WaitForWatchListener(ctx)
if err != nil {
t.Fatalf("error when waiting for a watch on a Listener resource: %v", err)
}
if name != testListenerResourceName {
t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", name, testListenerResourceName)
}
// Push an error to the listener update handler.
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{}, errors.New("bad listener update"))
timer := time.NewTimer(defaultTestShortTimeout)
select {
case <-timer.C:
timer.Stop()
case <-readyCh:
t.Fatalf("ready channel written to after receipt of a bad Listener update")
}
fcm, err := xdsresource.NewFilterChainManager(listenerWithFilterChains(t))
if err != nil {
t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err)
}
// Push an update whose address does not match the address to which our
// listener is bound, and verify that the ready channel is not written to.
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{
InboundListenerCfg: &xdsresource.InboundListenerConfig{
Address: "10.0.0.1",
Port: "50051",
FilterChains: fcm,
}}, nil)
timer = time.NewTimer(defaultTestShortTimeout)
select {
case <-timer.C:
timer.Stop()
case <-readyCh:
t.Fatalf("ready channel written to after receipt of a bad Listener update")
}
// Push a good update, and verify that the ready channel is written to.
// Since there are no dynamic RDS updates needed to be received, the
// ListenerWrapper does not have to wait for anything else before telling
// that it is ready.
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{
InboundListenerCfg: &xdsresource.InboundListenerConfig{
Address: fakeListenerHost,
Port: strconv.Itoa(fakeListenerPort),
FilterChains: fcm,
}}, nil)
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good Listener update")
case <-readyCh:
}
}
// TestNewListenerWrapperWithRouteUpdate tests the scenario where the listener
// gets built, starts a watch, that watch returns a list of Route Names to
// return, than receives an update from the rds handler. Only after receiving
// the update from the rds handler should it move the server to
// ServingModeServing.
func (s) TestNewListenerWrapperWithRouteUpdate(t *testing.T) {
oldRBAC := envconfig.XDSRBAC
envconfig.XDSRBAC = true
defer func() {
envconfig.XDSRBAC = oldRBAC
}()
_, readyCh, xdsC, _, cleanup := newListenerWrapper(t)
defer cleanup()
// Verify that the listener wrapper registers a listener watch for the
// expected Listener resource name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
name, err := xdsC.WaitForWatchListener(ctx)
if err != nil {
t.Fatalf("error when waiting for a watch on a Listener resource: %v", err)
}
if name != testListenerResourceName {
t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", name, testListenerResourceName)
}
fcm, err := xdsresource.NewFilterChainManager(listenerWithRouteConfiguration(t))
if err != nil {
t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err)
}
// Push a good update which contains a Filter Chain that specifies dynamic
// RDS Resources that need to be received. This should ping rds handler
// about which rds names to start, which will eventually start a watch on
// xds client for rds name "route-1".
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{
InboundListenerCfg: &xdsresource.InboundListenerConfig{
Address: fakeListenerHost,
Port: strconv.Itoa(fakeListenerPort),
FilterChains: fcm,
}}, nil)
// This should start a watch on xds client for rds name "route-1".
routeName, err := xdsC.WaitForWatchRouteConfig(ctx)
if err != nil {
t.Fatalf("error when waiting for a watch on a Route resource: %v", err)
}
if routeName != "route-1" {
t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", routeName, "route-1")
}
// This shouldn't invoke good update channel, as has not received rds updates yet.
timer := time.NewTimer(defaultTestShortTimeout)
select {
case <-timer.C:
timer.Stop()
case <-readyCh:
t.Fatalf("ready channel written to without rds configuration specified")
}
// Invoke rds callback for the started rds watch. This valid rds callback
// should trigger the listener wrapper to fire GoodUpdate, as it has
// received both it's LDS Configuration and also RDS Configuration,
// specified in LDS Configuration.
xdsC.InvokeWatchRouteConfigCallback("route-1", xdsresource.RouteConfigUpdate{}, nil)
// All of the xDS updates have completed, so can expect to send a ping on
// good update channel.
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good rds update")
case <-readyCh:
}
}
// Tests the case where a listenerWrapper is created with a fake net.Listener.
// The test verifies that the listenerWrapper becomes ready once it receives
// configuration from the management server. The test then performs the
// following:
// - injects a non-temp error via the fake net.Listener and verifies that
// Accept() returns with the same error.
// - injects a temp error via the fake net.Listener and verifies that Accept()
// backs off.
// - injects a fake net.Conn via the fake net.Listener. This Conn is expected
// to match the filter chains on the Listener resource. Verifies that
// Accept() does not return an error in this case.
func (s) TestListenerWrapper_Accept(t *testing.T) {
boCh := testutils.NewChannel()
origBackoffFunc := backoffFunc
@ -410,27 +322,52 @@ func (s) TestListenerWrapper_Accept(t *testing.T) {
}
defer func() { backoffFunc = origBackoffFunc }()
lw, readyCh, xdsC, lis, cleanup := newListenerWrapper(t)
defer cleanup()
mgmtServer, nodeID, ldsResourceNamesCh, _, xdsC := xdsSetupFoTests(t)
// Push a good update with a filter chain which accepts local connections on
// 192.168.0.0/16 subnet and port 80.
fcm, err := xdsresource.NewFilterChainManager(listenerWithFilterChains(t))
if err != nil {
t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err)
// Create a listener wrapper with a fake listener and verify that it
// extracts the host and port from the passed in listener.
lis := &fakeListener{
acceptCh: make(chan connAndErr, 1),
closeCh: testutils.NewChannel(),
}
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{
InboundListenerCfg: &xdsresource.InboundListenerConfig{
Address: fakeListenerHost,
Port: strconv.Itoa(fakeListenerPort),
FilterChains: fcm,
}}, nil)
lisResourceName := fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(fakeListenerHost, strconv.Itoa(int(fakeListenerPort))))
params := ListenerWrapperParams{
Listener: lis,
ListenerResourceName: lisResourceName,
XDSClient: xdsC,
}
l, readyCh := NewListenerWrapper(params)
if l == nil {
t.Fatalf("NewListenerWrapper(%+v) returned nil", params)
}
lw, ok := l.(*listenerWrapper)
if !ok {
t.Fatalf("NewListenerWrapper(%+v) returned listener of type %T want *listenerWrapper", params, l)
}
if lw.addr != fakeListenerHost || lw.port != strconv.Itoa(fakeListenerPort) {
t.Fatalf("listenerWrapper has host:port %s:%s, want %s:%d", lw.addr, lw.port, fakeListenerHost, fakeListenerPort)
}
defer l.Close()
// Verify that the expected listener resource is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
defer close(lis.acceptCh)
waitForResourceNames(ctx, t, ldsResourceNamesCh, []string{lisResourceName})
// Configure the management server with a listener resource.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(fakeListenerHost, fakeListenerPort, e2e.SecurityLevelNone, route1)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that the listener wrapper becomes ready.
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good Listener update")
t.Fatalf("Timeout waiting for the ready channel to be written to after receipt of a good Listener update")
case <-readyCh:
}
@ -465,21 +402,9 @@ func (s) TestListenerWrapper_Accept(t *testing.T) {
t.Fatalf("error when waiting for Accept() to backoff on temporary errors: %v", err)
}
// Push a fakeConn which does not match any filter chains configured on the
// received Listener resource. Verify that the conn is closed.
fc := &fakeConn{
local: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2), Port: 79},
remote: &net.TCPAddr{IP: net.IPv4(10, 1, 1, 1), Port: 80},
closeCh: testutils.NewChannel(),
}
lis.acceptCh <- connAndErr{conn: fc}
if _, err := fc.closeCh.Receive(ctx); err != nil {
t.Fatalf("error when waiting for conn to be closed on no filter chain match: %v", err)
}
// Push a fakeConn which matches the filter chains configured on the
// received Listener resource. Verify that Accept() returns.
fc = &fakeConn{
fc := &fakeConn{
local: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2)},
remote: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2), Port: 80},
closeCh: testutils.NewChannel(),

View File

@ -46,7 +46,7 @@ const (
route3 = "route3"
)
// setupForRDSHandlerTests performs the following setup actions:
// xdsSetupFoTests performs the following setup actions:
// - spins up an xDS management server
// - creates an xDS client with a bootstrap configuration pointing to the above
// management server
@ -54,26 +54,41 @@ const (
// Returns the following:
// - a reference to the management server
// - nodeID to use when pushing resources to the management server
// - a channel to read resource names received by the management server
// - a channel to read lds resource names received by the management server
// - a channel to read rds resource names received by the management server
// - an xDS client to pass to the rdsHandler under test
func setupForRDSHandlerTests(t *testing.T) (*e2e.ManagementServer, string, chan []string, xdsclient.XDSClient) {
namesCh := make(chan []string, 1)
func xdsSetupFoTests(t *testing.T) (*e2e.ManagementServer, string, chan []string, chan []string, xdsclient.XDSClient) {
t.Helper()
ldsNamesCh := make(chan []string, 1)
rdsNamesCh := make(chan []string, 1)
// Setup the management server to push the requested route configuration
// resource names on to a channel for the test to inspect.
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
if req.GetTypeUrl() != version.V3RouteConfigURL {
switch req.GetTypeUrl() {
case version.V3ListenerURL:
select {
case <-ldsNamesCh:
default:
}
select {
case ldsNamesCh <- req.GetResourceNames():
default:
}
case version.V3RouteConfigURL:
select {
case <-rdsNamesCh:
default:
}
select {
case rdsNamesCh <- req.GetResourceNames():
default:
}
default:
return fmt.Errorf("unexpected resources %v of type %q requested", req.GetResourceNames(), req.GetTypeUrl())
}
select {
case <-namesCh:
default:
}
select {
case namesCh <- req.GetResourceNames():
default:
}
return nil
},
AllowResourceSubset: true,
@ -86,7 +101,7 @@ func setupForRDSHandlerTests(t *testing.T) (*e2e.ManagementServer, string, chan
}
t.Cleanup(cancel)
return mgmtServer, nodeID, namesCh, xdsC
return mgmtServer, nodeID, ldsNamesCh, rdsNamesCh, xdsC
}
// Waits for the wantNames to be pushed on to namesCh. Fails the test by calling
@ -156,7 +171,7 @@ var defaultRouteConfigUpdate = xdsresource.RouteConfigUpdate{
// update channel
// - once the handler is closed, the watch for the route name is canceled.
func (s) TestRDSHandler_SuccessCaseOneRDSWatch(t *testing.T) {
mgmtServer, nodeID, resourceNamesCh, xdsC := setupForRDSHandlerTests(t)
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Configure the management server with a route config resource.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -176,7 +191,7 @@ func (s) TestRDSHandler_SuccessCaseOneRDSWatch(t *testing.T) {
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
// Verify that the given route is requested.
waitForResourceNames(ctx, t, resourceNamesCh, []string{route1})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1})
// Verify that the update is pushed to the handler's update channel.
wantUpdate := rdsHandlerUpdate{updates: map[string]xdsresource.RouteConfigUpdate{route1: defaultRouteConfigUpdate}}
@ -184,7 +199,7 @@ func (s) TestRDSHandler_SuccessCaseOneRDSWatch(t *testing.T) {
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, resourceNamesCh, []string{})
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
}
// Tests the case where the rds handler receives two route names to watch. The
@ -192,7 +207,7 @@ func (s) TestRDSHandler_SuccessCaseOneRDSWatch(t *testing.T) {
// push an update on the channel. And when the handler receives the second
// route, the test verifies that the update is pushed.
func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) {
mgmtServer, nodeID, resourceNamesCh, xdsC := setupForRDSHandlerTests(t)
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
@ -202,11 +217,11 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) {
// Verify that the given route is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, resourceNamesCh, []string{route1})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1})
// Update the rds handler to watch for two routes.
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
waitForResourceNames(ctx, t, resourceNamesCh, []string{route1, route2})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
// Configure the management server with a single route config resource.
routeResource1 := routeConfigResourceForName(route1)
@ -246,7 +261,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) {
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, resourceNamesCh, []string{})
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
}
// Tests the case where the rds handler receives an update with two routes, then
@ -254,7 +269,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) {
// the watch for the route no longer present, and push a corresponding update
// with only one route.
func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) {
mgmtServer, nodeID, resourceNamesCh, xdsC := setupForRDSHandlerTests(t)
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Create an rds handler and give it two routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
@ -264,7 +279,7 @@ func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) {
// Verify that the given routes are requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, resourceNamesCh, []string{route1, route2})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
// Configure the management server with two route config resources.
routeResource1 := routeConfigResourceForName(route1)
@ -291,7 +306,7 @@ func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) {
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
// Verify that the other route is no longer requested.
waitForResourceNames(ctx, t, resourceNamesCh, []string{route1})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1})
// Verify that an update is pushed with only one route.
wantUpdate = rdsHandlerUpdate{updates: map[string]xdsresource.RouteConfigUpdate{route1: defaultRouteConfigUpdate}}
@ -299,7 +314,7 @@ func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) {
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, resourceNamesCh, []string{})
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
}
// Tests the case where the rds handler receives an update with two routes, and
@ -310,7 +325,7 @@ func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) {
// rds handler until the newly added route config resource is received from the
// management server.
func (s) TestRDSHandler_SuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) {
mgmtServer, nodeID, resourceNamesCh, xdsC := setupForRDSHandlerTests(t)
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Create an rds handler and give it two routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
@ -320,7 +335,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) {
// Verify that the given routes are requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, resourceNamesCh, []string{route1, route2})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
// Configure the management server with two route config resources.
routeResource1 := routeConfigResourceForName(route1)
@ -345,7 +360,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) {
// Update the handler with a route that was already there and a new route.
rh.updateRouteNamesToWatch(map[string]bool{route2: true, route3: true})
waitForResourceNames(ctx, t, resourceNamesCh, []string{route2, route3})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route2, route3})
// The handler should not send an update.
sCtx, sCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
@ -374,14 +389,14 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) {
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, resourceNamesCh, []string{})
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
}
// Tests the scenario where the rds handler gets told to watch three rds
// configurations, gets two successful updates, then gets told to watch only
// those two. The rds handler should then write an update to update buffer.
func (s) TestRDSHandler_SuccessCaseSecondUpdateMakesRouteFull(t *testing.T) {
mgmtServer, nodeID, resourceNamesCh, xdsC := setupForRDSHandlerTests(t)
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Create an rds handler and give it three routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
@ -391,7 +406,7 @@ func (s) TestRDSHandler_SuccessCaseSecondUpdateMakesRouteFull(t *testing.T) {
// Verify that the given routes are requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, resourceNamesCh, []string{route1, route2, route3})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2, route3})
// Configure the management server with two route config resources.
routeResource1 := routeConfigResourceForName(route1)
@ -424,19 +439,19 @@ func (s) TestRDSHandler_SuccessCaseSecondUpdateMakesRouteFull(t *testing.T) {
route2: defaultRouteConfigUpdate,
},
}
waitForResourceNames(ctx, t, resourceNamesCh, []string{route1, route2})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
verifyUpdateFromChannel(ctx, t, updateCh, wantUpdate)
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, resourceNamesCh, []string{})
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
}
// TestErrorReceived tests the case where the rds handler receives a route name
// to watch, then receives an update with an error. This error should be then
// written to the update channel.
func (s) TestErrorReceived(t *testing.T) {
mgmtServer, nodeID, resourceNamesCh, xdsC := setupForRDSHandlerTests(t)
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
@ -446,7 +461,7 @@ func (s) TestErrorReceived(t *testing.T) {
// Verify that the given route is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, resourceNamesCh, []string{route1})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1})
// Configure the management server with a single route config resource, that
// is expected to be NACKed.

View File

@ -440,7 +440,7 @@ func (s) TestServeSuccess(t *testing.T) {
host, port := hostPortFromListener(t, lis)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone)},
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")},
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
@ -647,7 +647,7 @@ func (s) TestHandleListenerUpdate_NoXDSCreds(t *testing.T) {
host, port := hostPortFromListener(t, lis)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS)},
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")},
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
@ -731,7 +731,7 @@ func (s) TestHandleListenerUpdate_ErrorUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
host, port := hostPortFromListener(t, lis)
listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS)
listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")
listener.ListenerFilters = []*v3listenerpb.ListenerFilter{{Name: "foo"}}
resources := e2e.UpdateOptions{
NodeID: nodeID,