xds/server: fix RDS handling for non-inline route configs (#6915)

Zach Reyes 2024-01-16 19:03:18 -05:00 committed by GitHub
parent 8b455deef5
commit ddd377f198
21 changed files with 1365 additions and 1135 deletions


@ -68,11 +68,6 @@ var (
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
CanonicalString any // func (codes.Code) string
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports any // func(*grpc.Server, string)
// IsRegisteredMethod returns whether the passed in method is registered as
// a method on the server.
IsRegisteredMethod any // func(*grpc.Server, string) bool
@ -188,6 +183,15 @@ var (
ExitIdleModeForTesting any // func(*grpc.ClientConn) error
ChannelzTurnOffForTesting func()
// TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found
// error for a given resource type and name. This is usually triggered when
// the associated watch timer fires. For testing purposes, having this
// function makes events more predictable than relying on timer events.
TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error
// TriggerXDSResourceNameNotFoundClient invokes the testing xDS Client
// singleton to trigger resource-not-found for the given resource type name
// and resource name.
TriggerXDSResourceNameNotFoundClient any // func(string, string) error
)
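Because these hooks are declared with type any, callers must type-assert them to the documented function signature before invoking them. A minimal sketch of invoking the new client hook from a test, mirroring its use in the server tests later in this commit (the resource type name and route name shown are illustrative):

if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "routeName"); err != nil {
	t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}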
// HealthChecker defines the signature of the client-side LB channel health checking function.


@ -137,6 +137,72 @@ func marshalAny(m proto.Message) *anypb.Any {
return a
}
// filterChainWontMatch returns a filter chain that won't match any incoming
// connections when the test is run locally.
func filterChainWontMatch(routeName string, addressPrefix string, srcPorts []uint32) *v3listenerpb.FilterChain {
hcm := &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: routeName,
},
},
HttpFilters: []*v3httppb.HttpFilter{RouterHTTPFilter},
}
return &v3listenerpb.FilterChain{
Name: routeName + "-wont-match",
FilterChainMatch: &v3listenerpb.FilterChainMatch{
PrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: addressPrefix,
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(0),
},
},
},
SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
SourcePorts: srcPorts,
SourcePrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: addressPrefix,
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(0),
},
},
},
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: marshalAny(hcm)},
},
},
}
}
// ListenerResourceThreeRouteResources returns a listener resource that points
// to three route configurations. Only the filter chain that points to the
// first route config will be matched.
func ListenerResourceThreeRouteResources(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
lis := defaultServerListenerCommon(host, port, secLevel, routeName, false)
lis.FilterChains = append(lis.FilterChains, filterChainWontMatch("routeName2", "1.1.1.1", []uint32{1}))
lis.FilterChains = append(lis.FilterChains, filterChainWontMatch("routeName3", "2.2.2.2", []uint32{2}))
return lis
}
// ListenerResourceFallbackToDefault returns a listener resource that contains a
// filter chain that will never get chosen to process traffic and a default
// filter chain. The default filter chain points to routeName2.
func ListenerResourceFallbackToDefault(host string, port uint32, secLevel SecurityLevel) *v3listenerpb.Listener {
lis := defaultServerListenerCommon(host, port, secLevel, "", false)
lis.FilterChains = nil
lis.FilterChains = append(lis.FilterChains, filterChainWontMatch("routeName", "1.1.1.1", []uint32{1}))
lis.DefaultFilterChain = filterChainWontMatch("routeName2", "2.2.2.2", []uint32{2})
return lis
}
// DefaultServerListener returns a basic xds Listener resource to be used on the
// server side. The returned Listener resource contains an inline route
// configuration with the name of routeName.
@ -290,13 +356,6 @@ func defaultServerListenerCommon(host string, port uint32, secLevel SecurityLeve
}
}
// DefaultServerListenerWithRouteConfigName returns a basic xds Listener
// resource to be used on the server side. The returned Listener resource
// contains a RouteConfiguration resource name that needs to be resolved.
func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
return defaultServerListenerCommon(host, port, secLevel, routeName, false)
}
// HTTPFilter constructs an xds HttpFilter with the provided name and config.
func HTTPFilter(name string, config proto.Message) *v3httppb.HttpFilter {
return &v3httppb.HttpFilter{
@ -356,7 +415,6 @@ type RouteConfigOptions struct {
ListenerName string
// ClusterSpecifierType determines the cluster specifier type.
ClusterSpecifierType RouteConfigClusterSpecifierType
// ClusterName is the name of the cluster resource used when the cluster
// specifier type is set to RouteConfigClusterSpecifierTypeCluster.
//
@ -722,3 +780,65 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
}
return cla
}
// DefaultServerListenerWithRouteConfigName returns a basic xds Listener
// resource to be used on the server side. The returned Listener resource
// contains a RouteConfiguration resource name that needs to be resolved.
func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
return defaultServerListenerCommon(host, port, secLevel, routeName, false)
}
// RouteConfigNoRouteMatch returns an xDS RouteConfig resource with a route
// that has no route match. This will be NACKed by the xDS Client.
func RouteConfigNoRouteMatch(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Action: &v3routepb.Route_NonForwardingAction{},
}}}}}
}
// RouteConfigNonForwardingAction returns an xDS RouteConfig resource whose
// route specifies the non-forwarding action. This is intended to be used on
// the server side for RDS requests, and corresponds to the inline route
// configuration in DefaultServerListener.
func RouteConfigNonForwardingAction(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}}
}
// RouteConfigFilterAction returns an xDS RouteConfig resource whose route
// specifies a filter action. Since this is not the non-forwarding action,
// RPCs that match this route on the server side should fail.
func RouteConfigFilterAction(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to
// ensure any incoming RPC matches to Route_Route and will fail with
// UNAVAILABLE.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_FilterAction{},
}}}}}
}


@ -74,9 +74,6 @@ func init() {
return srv.isRegisteredMethod(method)
}
internal.ServerFromContext = serverFromContext
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
globalServerOptions = append(globalServerOptions, opt...)
}
@ -932,6 +929,12 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
return
}
if cc, ok := rawConn.(interface {
PassServerTransport(transport.ServerTransport)
}); ok {
cc.PassServerTransport(st)
}
if !s.addConn(lisAddr, st) {
return
}
@ -941,15 +944,6 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
}()
}
func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain("")
}
s.mu.Unlock()
}
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {


@ -28,14 +28,12 @@ import (
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds"
"google.golang.org/protobuf/types/known/wrapperspb"
@ -233,12 +231,7 @@ func (s) TestServerSideXDS_WithNoCertificateProvidersInBootstrap_Failure(t *test
}
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() failed: %v, wantCode: %s", err, codes.DeadlineExceeded)
}
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
}
// Tests the case where the bootstrap configuration contains one certificate
@ -484,10 +477,5 @@ func (s) TestServerSideXDS_WithValidAndInvalidSecurityConfiguration(t *testing.T
}
defer cc2.Close()
client2 := testgrpc.NewTestServiceClient(cc2)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client2.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() failed: %v, wantCode: %s", err, codes.DeadlineExceeded)
}
waitForFailedRPCWithStatus(ctx, t, cc2, errAcceptAndClose)
}


@ -21,6 +21,7 @@ package xds_test
import (
"context"
"fmt"
"io"
"net"
"strconv"
"testing"
@ -50,6 +51,15 @@ func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.S
return &testpb.SimpleResponse{}, nil
}
func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
// Block until the client closes the stream (io.EOF) or it fails with an
// error; without a clean shutdown, Recv blocks here forever.
_, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
}
func testModeChangeServerOption(t *testing.T) grpc.ServerOption {
// Create a server option to get notified about serving mode changes. We don't
// do anything other than throwing a log entry here. But this is required,


@ -386,7 +386,7 @@ func waitForFailedRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
return
}
ticker := time.NewTimer(1 * time.Second)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {

test/xds/xds_server_test.go (new file, 564 lines)

@ -0,0 +1,564 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds_test
import (
"context"
"io"
"net"
"strings"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
var (
errAcceptAndClose = status.New(codes.Unavailable, "")
)
// TestServeLDSRDS tests the case where a server receives an LDS resource which
// specifies RDS. LDS and RDS resources are configured on the management
// server, which the server should pick up. The server should successfully
// accept connections, and RPCs should work on these accepted connections. It
// then switches the RDS resource so that incoming RPCs match a route whose
// action type is not non-forwarding. The connection should pick this up
// dynamically, and subsequent RPCs on that connection should start failing
// with status code UNAVAILABLE.
func (s) TestServeLDSRDS(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch, and a RDS resource corresponding to this
// route name.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
select {
case <-ctx.Done():
t.Fatal("timeout waiting for the xDS Server to go Serving")
case <-serving.Done():
}
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
waitForSuccessfulRPC(ctx, t, cc) // Eventually, the LDS and dynamic RDS updates are processed and RPCs succeed.
// Switch the route config to a route with a filter action, which the RPC
// will match. This should eventually be reflected in the Conn's routing
// configuration and fail the RPC with status code UNAVAILABLE.
routeConfig = e2e.RouteConfigFilterAction("routeName")
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener}, // Same listener resource, so the update will be suppressed as redundant by the xDS Client.
Routes: []*v3routepb.RouteConfiguration{routeConfig},
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// "NonForwardingAction is expected for all Routes used on server-side; a
// route with an inappropriate action causes RPCs matching that route to
// fail with UNAVAILABLE." - A36
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
}
// waitForFailedRPCWithStatus polls with unary RPCs until one fails with the
// expected status. Fails the test if no RPC returns the expected status
// before the context expires.
func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, st *status.Status) {
t.Helper()
c := testgrpc.NewTestServiceClient(cc)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
var err error
for {
select {
case <-ctx.Done():
t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", st, ctx.Err(), err)
case <-ticker.C:
_, err = c.EmptyCall(ctx, &testpb.Empty{})
if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) {
t.Logf("most recent error happy case: %v", err.Error())
return
}
}
}
}
// TestRDSNack tests the case where an LDS resource points to an RDS resource
// that gets NACKed. The server should still move to serving, successfully
// accept connections, and fail RPCs at the L7 level with a certain error
// message.
func (s) TestRDSNack(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch, and an RDS resource for that route name
// that will be NACKed.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
routeConfig := e2e.RouteConfigNoRouteMatch("routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
<-serving.Done()
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}
// TestResourceNotFoundRDS tests the case where an LDS resource points to an
// RDS resource for which the management server reports resource-not-found.
// Before receiving the resource-not-found error, the xDS server has not
// received all the configuration it needs, so it should accept and close any
// new connections. After receiving the error, the server should move to
// serving, successfully accept connections, and fail RPCs at the L7 level
// with an error that mentions the missing resource.
func (s) TestResourceNotFoundRDS(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch, and no RDS resource corresponding to
// this route name.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
// Invoke resource-not-found for the route configuration. This should move
// the server to serving, and RPCs should then fail at the L7 level with
// UNAVAILABLE. Poll, since the RDS watch might not have started yet when
// resource-not-found is first triggered.
loop:
for {
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "routeName"); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
select {
case <-serving.Done():
break loop
case <-ctx.Done():
t.Fatalf("timed out waiting for serving mode to go serving")
case <-time.After(time.Millisecond):
}
}
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}
// TestServingModeChanges tests the server's logic as it transitions from not
// serving to serving, then back to not serving. Before it goes serving,
// connections should be accepted and closed. After it goes serving, RPCs
// should proceed as normal according to the matched route configuration. After
// it transitions back into not serving (through an explicit LDS
// resource-not-found), streams that are already running should be allowed to
// complete gracefully, and new RPCs should fail.
func (s) TestServingModeChanges(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch. Since the full configuration has not
// been received yet, this should leave the server in mode not serving.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("timeout waiting for the xDS Server to go Serving")
case <-serving.Done():
}
// A unary RPC should work once the server transitions into serving.
waitForSuccessfulRPC(ctx, t, cc)
// Start a stream before switching the server to not serving. Due to the
// stream being created before the graceful stop of the underlying
// connection, it should be able to continue even after the server switches
// to not serving.
c := testgrpc.NewTestServiceClient(cc)
stream, err := c.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("cc.FullDuplexCall failed: %f", err)
}
// Invoke LDS resource-not-found. This should cause the server to switch to
// not serving, gracefully drain existing connections, and eventually fail
// new RPCs. This also exercises the graceful close path.
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
// Due to the graceful stop, streams that were already started should
// continue to work.
if err = stream.Send(&testgrpc.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
}
if err = stream.CloseSend(); err != nil {
t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
}
if _, err = stream.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
}
// New RPCs on that connection should eventually start failing.
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
}
// TestMultipleUpdatesImmediatelySwitch tests the case where the server gets an
// LDS resource specifying RDS A, B, and C, with A being the one incoming RPCs
// match to. The server should stay in not serving until it receives all three
// RDS configurations, and then transition into serving. RPCs will match to RDS
// A and work properly. Afterward, it receives an LDS resource specifying RDS A
// and B. The filter chain pointing to RDS A doesn't get matched, and the
// default filter chain pointing to RDS B does get matched. RDS B is of the
// wrong route type for the server side, so RPCs are expected to eventually
// fail with that information. However, any RPCs on the old configuration
// should be allowed to complete, since the transition is a graceful stop.
// Finally, it receives an LDS resource specifying RDS A, which incoming RPCs
// will match to. This configuration should eventually be reflected in the
// server's state, and RPCs should proceed successfully.
func (s) TestMultipleUpdatesImmediatelySwitch(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Setup the management server to respond with a listener resource that
// specifies three route names to watch.
ldsResource := e2e.ListenerResourceThreeRouteResources(host, port, e2e.SecurityLevelNone, "routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{ldsResource},
SkipValidation: true,
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), testModeChangeServerOption(t), xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
routeConfig1 := e2e.RouteConfigNonForwardingAction("routeName")
routeConfig2 := e2e.RouteConfigFilterAction("routeName2")
routeConfig3 := e2e.RouteConfigFilterAction("routeName3")
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{ldsResource},
Routes: []*v3routepb.RouteConfiguration{routeConfig1, routeConfig2, routeConfig3},
SkipValidation: true,
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
pollForSuccessfulRPC(ctx, t, cc)
c := testgrpc.NewTestServiceClient(cc)
stream, err := c.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("cc.FullDuplexCall failed: %f", err)
}
if err = stream.Send(&testgrpc.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
}
// Configure an LDS resource with a filter chain that doesn't get matched
// and a default filter chain that points to RDS B (routeName2).
ldsResource = e2e.ListenerResourceFallbackToDefault(host, port, e2e.SecurityLevelNone)
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{ldsResource},
Routes: []*v3routepb.RouteConfiguration{routeConfig1, routeConfig2, routeConfig3},
SkipValidation: true,
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatalf("error updating management server: %v", err)
}
// xDS is eventually consistent. So simply poll for the new change to be
// reflected.
// "NonForwardingAction is expected for all Routes used on server-side; a
// route with an inappropriate action causes RPCs matching that route to
// fail with UNAVAILABLE." - A36
waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
// The stream should be allowed to continue on the old working
// configuration, as it is on a connection that is being gracefully closed
// (the old filter chain / LDS configuration is allowed to continue).
if err = stream.CloseSend(); err != nil {
t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
}
if _, err = stream.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
}
ldsResource = e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{ldsResource},
Routes: []*v3routepb.RouteConfiguration{routeConfig1, routeConfig2, routeConfig3},
SkipValidation: true,
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
pollForSuccessfulRPC(ctx, t, cc)
}
func pollForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
t.Helper()
c := testgrpc.NewTestServiceClient(cc)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for RPCs to succeed")
case <-ticker.C:
if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err == nil {
return
}
}
}
}


@ -22,10 +22,12 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/credentials/tls/certprovider"
xdsinternal "google.golang.org/grpc/internal/credentials/xds"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@ -36,8 +38,9 @@ import (
// key material from the certificate providers.
// 2. Implements the XDSHandshakeInfo() method used by the xdsCredentials to
// retrieve the configured certificate providers.
// 3. xDS filter_chain matching logic to select appropriate security
// configuration for the incoming connection.
// 3. xDS filter_chain configuration determines the security configuration.
// 4. Dynamically reads the routing configuration in
// UsableRouteConfiguration(), which is called to process incoming RPCs
// (LDS + RDS configuration).
type connWrapper struct {
net.Conn
@ -58,14 +61,19 @@ type connWrapper struct {
deadlineMu sync.Mutex
deadline time.Time
mu sync.Mutex
st transport.ServerTransport
draining bool
// The virtual hosts with matchable routes and instantiated HTTP Filters per
// route.
virtualHosts []xdsresource.VirtualHostWithInterceptors
// route, or an error.
urc *atomic.Pointer[xdsresource.UsableRouteConfiguration]
}
// VirtualHosts returns the virtual hosts to be used for server side routing.
func (c *connWrapper) VirtualHosts() []xdsresource.VirtualHostWithInterceptors {
return c.virtualHosts
// UsableRouteConfiguration returns the UsableRouteConfiguration to be used for
// server side routing.
func (c *connWrapper) UsableRouteConfiguration() xdsresource.UsableRouteConfiguration {
return *c.urc.Load()
}
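The atomic pointer is what lets the xDS update path publish a new routing configuration while RPC-serving goroutines read the current one without taking locks. A self-contained sketch of that writer/reader pattern, using an illustrative stand-in type rather than the actual xdsresource type:

package main

import (
	"fmt"
	"sync/atomic"
)

// routeConfig stands in for xdsresource.UsableRouteConfiguration.
type routeConfig struct{ routes []string }

func main() {
	var urc atomic.Pointer[routeConfig]
	// Writer side (xDS update callback): atomically swap in a fresh config.
	urc.Store(&routeConfig{routes: []string{"/grpc.testing.TestService/EmptyCall"}})
	// Reader side (per-RPC path): lock-free load of the current config.
	fmt.Println(urc.Load().routes)
}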
// SetDeadline makes a copy of the passed in deadline and forwards the call to
@ -121,6 +129,30 @@ func (c *connWrapper) XDSHandshakeInfo() (*xdsinternal.HandshakeInfo, error) {
return xdsinternal.NewHandshakeInfo(c.rootProvider, c.identityProvider, nil, secCfg.RequireClientCert), nil
}
// PassServerTransport drains the passed in ServerTransport if draining has
// already been set, or stores it to be drained once Drain is called.
func (c *connWrapper) PassServerTransport(st transport.ServerTransport) {
c.mu.Lock()
defer c.mu.Unlock()
if c.draining {
st.Drain("draining")
} else {
c.st = st
}
}
// Drain drains the associated ServerTransport, or sets draining to true so
// that the transport will be drained once it is passed in.
func (c *connWrapper) Drain() {
c.mu.Lock()
defer c.mu.Unlock()
if c.st == nil {
c.draining = true
} else {
c.st.Drain("draining")
}
}
// Close closes the providers and the underlying connection.
func (c *connWrapper) Close() error {
if c.identityProvider != nil {


@ -21,13 +21,10 @@
package server
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
@ -58,13 +55,6 @@ var (
// non-nil error if the server has transitioned into not-serving mode.
type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err error)
// DrainCallback is the callback that an xDS-enabled server registers to get
// notified about updates to the Listener configuration. The server is expected
// to gracefully shutdown existing connections, thereby forcing clients to
// reconnect and have the new configuration applied to the newly created
// connections.
type DrainCallback func(addr net.Addr)
// XDSClient wraps the methods on the XDSClient which are required by
// the listenerWrapper.
type XDSClient interface {
@ -82,29 +72,23 @@ type ListenerWrapperParams struct {
XDSClient XDSClient
// ModeCallback is the callback to invoke when the serving mode changes.
ModeCallback ServingModeCallback
// DrainCallback is the callback to invoke when the Listener gets a LDS
// update.
DrainCallback DrainCallback
}
// NewListenerWrapper creates a new listenerWrapper with params. It returns a
// net.Listener and a channel which is written to, indicating that the former is
// ready to be passed to grpc.Serve().
// net.Listener. It starts in state not serving, which triggers Accept() +
// Close() on any incoming connections.
//
// Only TCP listeners are supported.
func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan struct{}) {
func NewListenerWrapper(params ListenerWrapperParams) net.Listener {
lw := &listenerWrapper{
Listener: params.Listener,
name: params.ListenerResourceName,
xdsC: params.XDSClient,
modeCallback: params.ModeCallback,
drainCallback: params.DrainCallback,
isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),
mode: connectivity.ServingModeStarting,
closed: grpcsync.NewEvent(),
goodUpdate: grpcsync.NewEvent(),
rdsUpdateCh: make(chan rdsHandlerUpdate, 1),
mode: connectivity.ServingModeNotServing,
closed: grpcsync.NewEvent(),
}
lw.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", lw))
@ -113,14 +97,13 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
lisAddr := lw.Listener.Addr().String()
lw.addr, lw.port, _ = net.SplitHostPort(lisAddr)
lw.rdsHandler = newRDSHandler(lw.xdsC, lw.logger, lw.rdsUpdateCh)
lw.rdsHandler = newRDSHandler(lw.handleRDSUpdate, lw.xdsC, lw.logger)
lw.cancelWatch = xdsresource.WatchListener(lw.xdsC, lw.name, &ldsWatcher{
parent: lw,
logger: lw.logger,
name: lw.name,
})
go lw.run()
return lw, lw.goodUpdate.Done()
return lw
}
// listenerWrapper wraps the net.Listener associated with the listening address
@ -130,11 +113,10 @@ type listenerWrapper struct {
net.Listener
logger *internalgrpclog.PrefixLogger
name string
xdsC XDSClient
cancelWatch func()
modeCallback ServingModeCallback
drainCallback DrainCallback
name string
xdsC XDSClient
cancelWatch func()
modeCallback ServingModeCallback
// Set to true if the listener is bound to the IP_ANY address (which is
// "0.0.0.0" for IPv4 and "::" for IPv6).
@ -143,11 +125,6 @@ type listenerWrapper struct {
// Listener resource received from the control plane.
addr, port string
// This is used to notify that a good update has been received and that
// Serve() can be invoked on the underlying gRPC server. Using an event
// instead of a vanilla channel simplifies the update handler as it need not
// keep track of whether the received update is the first one or not.
goodUpdate *grpcsync.Event
// A small race exists in the XDSClient code between the receipt of an xDS
// response and the user cancelling the associated watch. In this window,
// the registered callback may be invoked after the watch is canceled, and
@ -156,24 +133,133 @@ type listenerWrapper struct {
// updates received in the callback if this event has fired.
closed *grpcsync.Event
// mu guards access to the current serving mode and the filter chains. The
// reason for using an rw lock here is that these fields are read in
// Accept() for all incoming connections, but writes happen rarely (when we
// get a Listener resource update).
// mu guards access to the current serving mode and the active filter chain
// manager.
mu sync.RWMutex
// Current serving mode.
mode connectivity.ServingMode
// Filter chains received as part of the last good update.
filterChains *xdsresource.FilterChainManager
// Filter chain manager currently serving.
activeFilterChainManager *xdsresource.FilterChainManager
// conns accepted with configuration from activeFilterChainManager.
conns []*connWrapper
// These fields are read/written to in the context of xDS updates, which are
// guaranteed to be emitted synchronously from the xDS Client. Thus, they do
// not need further synchronization.
// Pending filter chain manager. Will go active once rdsHandler has received
// all the RDS resources this filter chain manager needs.
pendingFilterChainManager *xdsresource.FilterChainManager
// rdsHandler is used for any dynamic RDS resources specified in a LDS
// update.
rdsHandler *rdsHandler
// rdsUpdates are the RDS resources received from the management
// server, keyed on the RouteName of the RDS resource.
rdsUpdates unsafe.Pointer // map[string]xdsclient.RouteConfigUpdate
// rdsUpdateCh is a channel for XDSClient RDS updates.
rdsUpdateCh chan rdsHandlerUpdate
}
func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) {
ilc := update.InboundListenerCfg
// Make sure that the socket address on the received Listener resource
// matches the address of the net.Listener passed to us by the user. This
// check is done here instead of at the XDSClient layer because of the
// following couple of reasons:
// - XDSClient cannot know the listening address of every listener in the
// system, and hence cannot perform this check.
// - this is a very context-dependent check and only the server has the
// appropriate context to perform this check.
//
// What this means is that the XDSClient has ACKed a resource which can push
// the server into a "not serving" mode. This is not ideal, but this is
// what we have decided to do.
if ilc.Address != l.addr || ilc.Port != l.port {
l.mu.Lock()
l.switchModeLocked(connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
l.mu.Unlock()
return
}
l.pendingFilterChainManager = ilc.FilterChains
l.rdsHandler.updateRouteNamesToWatch(ilc.FilterChains.RouteConfigNames)
if l.rdsHandler.determineRouteConfigurationReady() {
l.maybeUpdateFilterChains()
}
}
// maybeUpdateFilterChains swaps in the pending filter chain manager to the
// active one if the pending filter chain manager is present. If a swap occurs,
// it also drains (gracefully stops) any connections that were accepted on the
// old active filter chain manager, and puts this listener in state SERVING.
// Must be called within an xDS Client Callback.
func (l *listenerWrapper) maybeUpdateFilterChains() {
if l.pendingFilterChainManager == nil {
// Nothing to update, return early.
return
}
l.mu.Lock()
l.switchModeLocked(connectivity.ServingModeServing, nil)
// "Updates to a Listener cause all older connections on that Listener to be
// gracefully shut down with a grace period of 10 minutes for long-lived
// RPC's, such that clients will reconnect and have the updated
// configuration apply." - A36
var connsToClose []*connWrapper
if l.activeFilterChainManager != nil { // If there is a filter chain manager to clean up.
connsToClose = l.conns
l.conns = nil
}
l.activeFilterChainManager = l.pendingFilterChainManager
l.pendingFilterChainManager = nil
l.instantiateFilterChainRoutingConfigurationsLocked()
l.mu.Unlock()
go func() {
for _, conn := range connsToClose {
conn.Drain()
}
}()
}
// handleRDSUpdate rebuilds the server-side routing configuration for any
// filter chains that point to this RDS resource, and potentially swaps the
// pending LDS configuration in as the active one.
func (l *listenerWrapper) handleRDSUpdate(routeName string, rcu rdsWatcherUpdate) {
// Update any filter chains that point to this route configuration.
if l.activeFilterChainManager != nil {
for _, fc := range l.activeFilterChainManager.FilterChains() {
if fc.RouteConfigName != routeName {
continue
}
if rcu.err != nil && rcu.data == nil { // Either a NACK received before any good update, or resource-not-found, lands here.
fc.UsableRouteConfiguration.Store(&xdsresource.UsableRouteConfiguration{
Err: rcu.err,
})
continue
}
fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*rcu.data))
}
}
if l.rdsHandler.determineRouteConfigurationReady() {
l.maybeUpdateFilterChains()
}
}
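determineRouteConfigurationReady is called from both update paths above, but its implementation (in rds_handler.go) is not part of this diff. A plausible sketch under stated assumptions: the handler tracks the set of route names it currently watches alongside the updates map referenced just below, and reports ready once every watched name has received either data or a terminal error. The routeNamesToWatch field name is hypothetical:

// Sketch only; routeNamesToWatch is an assumed field name.
func (rh *rdsHandler) determineRouteConfigurationReady() bool {
	for name := range rh.routeNamesToWatch {
		if _, ok := rh.updates[name]; !ok {
			// Still waiting on at least one RDS resource.
			return false
		}
	}
	return true
}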
// instantiateFilterChainRoutingConfigurationsLocked instantiates all of the
// routing configuration for the newly active filter chains. For filter chains
// with an inline route configuration, it uses that; otherwise it uses the
// cached rdsHandler updates. Must be called within an xDS Client callback.
func (l *listenerWrapper) instantiateFilterChainRoutingConfigurationsLocked() {
for _, fc := range l.activeFilterChainManager.FilterChains() {
if fc.InlineRouteConfig != nil {
// The inline configuration is constructed once here and remains for the
// lifetime of the filter chain. This can't race with an incoming RPC, but
// there is no harm in making the store atomic.
fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*fc.InlineRouteConfig))
continue
}
rcu := l.rdsHandler.updates[fc.RouteConfigName]
if rcu.err != nil && rcu.data == nil {
fc.UsableRouteConfiguration.Store(&xdsresource.UsableRouteConfiguration{Err: rcu.err})
continue
}
fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*rcu.data)) // Can't race with an RPC coming in but no harm making atomic.
}
}
// Accept blocks on an Accept() on the underlying listener, and wraps the
@ -230,14 +316,15 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
conn.Close()
continue
}
fc, err := l.filterChains.Lookup(xdsresource.FilterChainLookupParams{
fc, err := l.activeFilterChainManager.Lookup(xdsresource.FilterChainLookupParams{
IsUnspecifiedListener: l.isUnspecifiedAddr,
DestAddr: destAddr.IP,
SourceAddr: srcAddr.IP,
SourcePort: srcAddr.Port,
})
l.mu.RUnlock()
if err != nil {
l.mu.RUnlock()
// When a matching filter chain is not found, we close the
// connection right away, but do not return an error back to
// `grpc.Serve()` from where this Accept() was invoked. Returning an
@ -253,36 +340,10 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
conn.Close()
continue
}
var rc xdsresource.RouteConfigUpdate
if fc.InlineRouteConfig != nil {
rc = *fc.InlineRouteConfig
} else {
rcPtr := atomic.LoadPointer(&l.rdsUpdates)
rcuPtr := (*map[string]xdsresource.RouteConfigUpdate)(rcPtr)
// This shouldn't happen, but this error protects against a panic.
if rcuPtr == nil {
return nil, errors.New("route configuration pointer is nil")
}
rcu := *rcuPtr
rc = rcu[fc.RouteConfigName]
}
// The filter chain will construct a usable route table on each
// connection accept. This is done because preinstantiating every route
// table before it is needed for a connection would potentially lead to
// a lot of cpu time and memory allocated for route tables that will
// never be used. There was also a thought to cache this configuration,
// and reuse it for the next accepted connection. However, this would
// lead to a lot of code complexity (RDS updates for a given route name
// can come in at any time), and connections aren't accepted too often,
// so this re-instantiation of the route configuration is an acceptable
// tradeoff for simplicity.
vhswi, err := fc.ConstructUsableRouteConfiguration(rc)
if err != nil {
l.logger.Warningf("Failed to construct usable route configuration: %v", err)
conn.Close()
continue
}
return &connWrapper{Conn: conn, filterChain: fc, parent: l, virtualHosts: vhswi}, nil
cw := &connWrapper{Conn: conn, filterChain: fc, parent: l, urc: fc.UsableRouteConfiguration}
l.conns = append(l.conns, cw)
l.mu.RUnlock()
return cw, nil
}
}
@ -299,88 +360,11 @@ func (l *listenerWrapper) Close() error {
return nil
}
// run is a long running goroutine which handles all xds updates. LDS and RDS
// push updates onto a channel which is read and acted upon from this goroutine.
func (l *listenerWrapper) run() {
for {
select {
case <-l.closed.Done():
return
case u := <-l.rdsUpdateCh:
l.handleRDSUpdate(u)
}
}
}
// handleRDSUpdate handles a full rds update from rds handler. On a successful
// update, the server will switch to ServingModeServing as the full
// configuration (both LDS and RDS) has been received.
func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
if l.closed.HasFired() {
l.logger.Warningf("RDS received update: %v with error: %v, after listener was closed", update.updates, update.err)
return
}
if update.err != nil {
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}
atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates))
l.switchMode(l.filterChains, connectivity.ServingModeServing, nil)
l.goodUpdate.Fire()
}
func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) {
// Make sure that the socket address on the received Listener resource
// matches the address of the net.Listener passed to us by the user. This
// check is done here instead of at the XDSClient layer because of the
// following couple of reasons:
// - XDSClient cannot know the listening address of every listener in the
// system, and hence cannot perform this check.
// - this is a very context-dependent check and only the server has the
// appropriate context to perform this check.
//
// What this means is that the XDSClient has ACKed a resource which can push
// the server into a "not serving" mode. This is not ideal, but this is
// what we have decided to do. See gRPC A36 for more details.
ilc := update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
}
// "Updates to a Listener cause all older connections on that Listener to be
// gracefully shut down with a grace period of 10 minutes for long-lived
// RPC's, such that clients will reconnect and have the updated
// configuration apply." - A36 Note that this is not the same as moving the
// Server's state to ServingModeNotServing. That prevents new connections
// from being accepted, whereas here we simply want the clients to reconnect
// to get the updated configuration.
if l.drainCallback != nil {
l.drainCallback(l.Listener.Addr())
}
l.rdsHandler.updateRouteNamesToWatch(ilc.FilterChains.RouteConfigNames)
// If there are no dynamic RDS Configurations still needed to be received
// from the management server, this listener has all the configuration
// needed, and is ready to serve.
if len(ilc.FilterChains.RouteConfigNames) == 0 {
l.switchMode(ilc.FilterChains, connectivity.ServingModeServing, nil)
l.goodUpdate.Fire()
}
}
// switchMode updates the value of serving mode and filter chains stored in the
// listenerWrapper. And if the serving mode has changed, it invokes the
// registered mode change callback.
func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMode connectivity.ServingMode, err error) {
l.mu.Lock()
defer l.mu.Unlock()
l.filterChains = fcs
// switchModeLocked switches the current mode of the listener wrapper. It also
// gracefully closes any connections if the listener wrapper transitions into
// not serving. If the serving mode has changed, it invokes the registered mode
// change callback.
func (l *listenerWrapper) switchModeLocked(newMode connectivity.ServingMode, err error) {
if l.mode == newMode && l.mode == connectivity.ServingModeServing {
// Redundant updates are suppressed only when we are SERVING and the new
// mode is also SERVING. In the other case (where we are NOT_SERVING and the
@ -390,11 +374,34 @@ func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMod
return
}
l.mode = newMode
if l.mode == connectivity.ServingModeNotServing {
connsToClose := l.conns
l.conns = nil
go func() {
for _, conn := range connsToClose {
conn.Drain()
}
}()
}
// The XdsServer API will allow applications to register a "serving state"
// callback to be invoked when the server begins serving and when the
// server encounters errors that force it to be "not serving". If "not
// serving", the callback must be provided error information, for
// debugging use by developers - A36.
if l.modeCallback != nil {
l.modeCallback(l.Listener.Addr(), newMode, err)
}
}
func (l *listenerWrapper) onLDSResourceDoesNotExist(err error) {
l.mu.Lock()
defer l.mu.Unlock()
l.switchModeLocked(connectivity.ServingModeNotServing, err)
l.activeFilterChainManager = nil
l.pendingFilterChainManager = nil
l.rdsHandler.updateRouteNamesToWatch(make(map[string]bool))
}
// ldsWatcher implements the xdsresource.ListenerWatcher interface and is
// passed to the WatchListener API.
type ldsWatcher struct {
@ -434,6 +441,7 @@ func (lw *ldsWatcher) OnResourceDoesNotExist() {
if lw.logger.V(2) {
lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error", lw.name)
}
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name)
lw.parent.switchMode(nil, connectivity.ServingModeNotServing, err)
lw.parent.onLDSResourceDoesNotExist(err)
}


@ -20,59 +20,28 @@ package server
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"testing"
"time"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter
)
const (
fakeListenerHost = "0.0.0.0"
fakeListenerPort = 50051
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
)
type s struct {
grpctest.Tester
type verifyMode struct {
modeCh chan connectivity.ServingMode
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// badListenerResource returns a listener resource for the given name which does
// not contain the `RouteSpecifier` field in the HTTPConnectionManager, and
// hence is expected to be NACKed by the client.
func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener {
hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
})
return &v3listenerpb.Listener{
Name: name,
ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
FilterChains: []*v3listenerpb.FilterChain{{
Name: "filter-chain-name",
Filters: []*v3listenerpb.Filter{{
Name: wellknown.HTTPConnectionManager,
ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
}},
}},
}
func (vm *verifyMode) verifyModeCallback(_ net.Addr, mode connectivity.ServingMode, _ error) {
vm.modeCh <- mode
}
func hostPortFromListener(t *testing.T, lis net.Listener) (string, uint32) {
@ -89,121 +58,44 @@ func hostPortFromListener(t *testing.T, lis net.Listener) (string, uint32) {
return host, uint32(port)
}
// Creates a local TCP net.Listener and creates a listenerWrapper by passing
// that and the provided xDS client.
//
// Returns the following:
// - the ready channel of the listenerWrapper
// - host of the listener
// - port of the listener
// - listener resource name to use when requesting this resource from the
// management server
func createListenerWrapper(t *testing.T, xdsC XDSClient) (<-chan struct{}, string, uint32, string) {
// TestListenerWrapper tests the listener wrapper. It configures the listener
// wrapper with a certain LDS resource, and makes sure that it requests the LDS
// name. It then receives an LDS resource that points to an RDS resource for
// route configuration. The listener wrapper should then start a watch for the
// RDS name. This should not trigger a mode change (the mode starts out not
// serving). Then an RDS resource is configured to be returned for the RDS
// name. This should transition the listener wrapper to serving.
func (s) TestListenerWrapper(t *testing.T) {
mgmtServer, nodeID, ldsResourceNamesCh, rdsResourceNamesCh, xdsC := xdsSetupForTests(t)
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create a local TCP listener: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
modeCh := make(chan connectivity.ServingMode, 1)
vm := verifyMode{
modeCh: modeCh,
}
host, port := hostPortFromListener(t, lis)
lisResourceName := fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port))))
params := ListenerWrapperParams{
Listener: lis,
ListenerResourceName: lisResourceName,
XDSClient: xdsC,
ModeCallback: vm.verifyModeCallback,
}
l, readyCh := NewListenerWrapper(params)
l := NewListenerWrapper(params)
if l == nil {
t.Fatalf("NewListenerWrapper(%+v) returned nil", params)
}
t.Cleanup(func() { l.Close() })
return readyCh, host, port, lisResourceName
}
// Tests the case where a listenerWrapper is created and the following happens:
//
// - the management server returns a Listener resource that is NACKed. Test
// verifies that the listenerWrapper does not become ready.
// - the management server returns a Listener resource that does not match the
// address to which our net.Listener is bound. Test verifies that the
// listenerWrapper does not become ready.
// - the management server returns a Listener resource that matches the
// address to which our net.Listener is bound. Also, it contains an
// inline route configuration. Test verifies that the listenerWrapper
// becomes ready.
func (s) TestListenerWrapper_InlineRouteConfig(t *testing.T) {
mgmtServer, nodeID, ldsResourceNamesCh, _, xdsC := xdsSetupFoTests(t)
readyCh, host, port, lisResourceName := createListenerWrapper(t, xdsC)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
defer l.Close()
waitForResourceNames(ctx, t, ldsResourceNamesCh, []string{lisResourceName})
// Configure the management server with a listener resource that is expected
// to be NACKed.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{badListenerResource(t, lisResourceName)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that there is no message on the ready channel.
sCtx, sCtxCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCtxCancel()
select {
case <-readyCh:
t.Fatalf("Ready channel written to after receipt of a bad Listener update")
case <-sCtx.Done():
}
// Configure the management server with a listener resource that does not
// match the address to which our listener is bound to.
resources.Listeners = []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port+1, e2e.SecurityLevelNone, route1)}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that there is no message on the ready channel.
select {
case <-readyCh:
t.Fatalf("Ready channel written to after receipt of a bad Listener update")
case <-sCtx.Done():
}
// Configure the management server with a Listener resource that contains
// the expected host and port. Also, it does not contain any rds names that
// need resolution. Therefore the listenerWrapper is expected to become
// ready.
resources.Listeners = []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, route1)}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that the listener wrapper becomes ready.
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for the ready channel to be written to after receipt of a good Listener update")
case <-readyCh:
}
}
// Tests the case where a listenerWrapper is created and the management server
// returns a Listener resource that specifies the name of a Route Configuration
// resource. The test verifies that the listenerWrapper does not become ready
// when waiting for the Route Configuration resource and becomes ready once it
// receives the Route Configuration resource.
func (s) TestListenerWrapper_RouteNames(t *testing.T) {
mgmtServer, nodeID, ldsResourceNamesCh, rdsResourceNamesCh, xdsC := xdsSetupFoTests(t)
readyCh, host, port, lisResourceName := createListenerWrapper(t, xdsC)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, ldsResourceNamesCh, []string{lisResourceName})
// Configure the management server with a listener resource that specifies
// the name of RDS resources that need to be resolved.
listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, route1)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, route1)},
@ -215,12 +107,12 @@ func (s) TestListenerWrapper_RouteNames(t *testing.T) {
waitForResourceNames(ctx, t, rdsResourceNamesCh, []string{route1})
// Verify that there is no message on the ready channel.
sCtx, sCtxCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCtxCancel()
// Verify that there is no mode change.
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
select {
case <-readyCh:
t.Fatalf("Ready channel written to without rds configuration specified")
case mode := <-modeCh:
t.Fatalf("received mode change to %v when no mode expected", mode)
case <-sCtx.Done():
}
@ -231,186 +123,27 @@ func (s) TestListenerWrapper_RouteNames(t *testing.T) {
t.Fatal(err)
}
// All of the xDS updates have completed, so can expect to send a ping on
// good update channel.
// Mode should go serving.
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good rds update")
case <-readyCh:
}
}
type tempError struct{}
func (tempError) Error() string {
return "listenerWrapper test temporary error"
}
func (tempError) Temporary() bool {
return true
}
// connAndErr wraps a net.Conn and an error.
type connAndErr struct {
conn net.Conn
err error
}
// fakeListener allows the user to inject conns returned by Accept().
type fakeListener struct {
acceptCh chan connAndErr
closeCh *testutils.Channel
}
func (fl *fakeListener) Accept() (net.Conn, error) {
cne, ok := <-fl.acceptCh
if !ok {
return nil, errors.New("a non-temporary error")
}
return cne.conn, cne.err
}
func (fl *fakeListener) Close() error {
fl.closeCh.Send(nil)
return nil
}
func (fl *fakeListener) Addr() net.Addr {
return &net.TCPAddr{
IP: net.IPv4(0, 0, 0, 0),
Port: fakeListenerPort,
}
}
// fakeConn overrides LocalAddr, RemoteAddr and Close methods.
type fakeConn struct {
net.Conn
local, remote net.Addr
closeCh *testutils.Channel
}
func (fc *fakeConn) LocalAddr() net.Addr {
return fc.local
}
func (fc *fakeConn) RemoteAddr() net.Addr {
return fc.remote
}
func (fc *fakeConn) Close() error {
fc.closeCh.Send(nil)
return nil
}
// Tests the case where a listenerWrapper is created with a fake net.Listener.
// The test verifies that the listenerWrapper becomes ready once it receives
// configuration from the management server. The test then performs the
// following:
// - injects a non-temp error via the fake net.Listener and verifies that
// Accept() returns with the same error.
// - injects a temp error via the fake net.Listener and verifies that Accept()
// backs off.
// - injects a fake net.Conn via the fake net.Listener. This Conn is expected
// to match the filter chains on the Listener resource. Verifies that
// Accept() does not return an error in this case.
func (s) TestListenerWrapper_Accept(t *testing.T) {
boCh := testutils.NewChannel()
origBackoffFunc := backoffFunc
backoffFunc = func(v int) time.Duration {
boCh.Send(v)
return 0
}
defer func() { backoffFunc = origBackoffFunc }()
mgmtServer, nodeID, ldsResourceNamesCh, _, xdsC := xdsSetupFoTests(t)
// Create a listener wrapper with a fake listener and verify that it
// extracts the host and port from the passed in listener.
lis := &fakeListener{
acceptCh: make(chan connAndErr, 1),
closeCh: testutils.NewChannel(),
}
lisResourceName := fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(fakeListenerHost, strconv.Itoa(int(fakeListenerPort))))
params := ListenerWrapperParams{
Listener: lis,
ListenerResourceName: lisResourceName,
XDSClient: xdsC,
}
l, readyCh := NewListenerWrapper(params)
if l == nil {
t.Fatalf("NewListenerWrapper(%+v) returned nil", params)
}
lw, ok := l.(*listenerWrapper)
if !ok {
t.Fatalf("NewListenerWrapper(%+v) returned listener of type %T want *listenerWrapper", params, l)
}
if lw.addr != fakeListenerHost || lw.port != strconv.Itoa(fakeListenerPort) {
t.Fatalf("listenerWrapper has host:port %s:%s, want %s:%d", lw.addr, lw.port, fakeListenerHost, fakeListenerPort)
}
defer l.Close()
// Verify that the expected listener resource is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, ldsResourceNamesCh, []string{lisResourceName})
// Configure the management server with a listener resource.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(fakeListenerHost, fakeListenerPort, e2e.SecurityLevelNone, route1)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
t.Fatalf("timeout waiting for mode change")
case mode := <-modeCh:
if mode != connectivity.ServingModeServing {
t.Fatalf("mode change received: %v, want: %v", mode, connectivity.ServingModeServing)
}
}
// Verify that the listener wrapper becomes ready.
// Invoke LDS resource-not-found; the mode should go back to not-serving.
if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil {
t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for the ready channel to be written to after receipt of a good Listener update")
case <-readyCh:
}
// Push a non-temporary error into Accept().
nonTempErr := errors.New("a non-temporary error")
lis.acceptCh <- connAndErr{err: nonTempErr}
if _, err := lw.Accept(); err != nonTempErr {
t.Fatalf("listenerWrapper.Accept() returned error: %v, want: %v", err, nonTempErr)
}
// Invoke Accept() in a goroutine since we expect it to swallow:
// 1. temporary errors returned from the underlying listener
// 2. errors related to finding a matching filter chain for the incoming
// connection.
errCh := testutils.NewChannel()
go func() {
conn, err := lw.Accept()
if err != nil {
errCh.Send(err)
return
t.Fatalf("timeout waiting for mode change")
case mode := <-modeCh:
if mode != connectivity.ServingModeNotServing {
t.Fatalf("mode change received: %v, want: %v", mode, connectivity.ServingModeNotServing)
}
if _, ok := conn.(*connWrapper); !ok {
errCh.Send(fmt.Errorf("listenerWrapper.Accept() returned a Conn of type %T, want *connWrapper", conn))
return
}
errCh.Send(nil)
}()
// Push a temporary error into Accept() and verify that it backs off.
lis.acceptCh <- connAndErr{err: tempError{}}
if _, err := boCh.Receive(ctx); err != nil {
t.Fatalf("error when waiting for Accept() to backoff on temporary errors: %v", err)
}
// Push a fakeConn which matches the filter chains configured on the
// received Listener resource. Verify that Accept() returns.
fc := &fakeConn{
local: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2)},
remote: &net.TCPAddr{IP: net.IPv4(192, 168, 1, 2), Port: 80},
closeCh: testutils.NewChannel(),
}
lis.acceptCh <- connAndErr{conn: fc}
if _, err := errCh.Receive(ctx); err != nil {
t.Fatalf("error when waiting for Accept() to return the conn on filter chain match: %v", err)
}
}

View File

@ -25,58 +25,64 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// rdsHandlerUpdate wraps the full set of RouteConfigUpdates that are
// dynamically queried for a given server-side listener.
type rdsHandlerUpdate struct {
updates map[string]xdsresource.RouteConfigUpdate
err error
}
// rdsHandler handles any RDS queries that need to be started for a given
// server-side listener's filter chains (i.e. not inline).
// server-side listener's filter chains (i.e. not inline). It persists
// rdsWatcher updates for later use and also determines whether all the
// required rdsWatcher updates have been received.
type rdsHandler struct {
xdsC XDSClient
logger *igrpclog.PrefixLogger
mu sync.Mutex
updates map[string]xdsresource.RouteConfigUpdate
cancels map[string]func()
callback func(string, rdsWatcherUpdate)
// For an rdsHandler update, the only update the wrapped listener cares
// about is the most recent one, so this channel will be opportunistically
// drained before sending any new updates.
updateChannel chan rdsHandlerUpdate
// updates is a map from routeName to rdsWatcher update, including
// RouteConfiguration resources and any errors received. If a route name is
// not present in this map, no RouteConfiguration or error has been received
// for it yet. If the update field is set, use it as the valid route
// configuration; otherwise treat it as an error case and fail at the L7
// level.
updates map[string]rdsWatcherUpdate
mu sync.Mutex
cancels map[string]func()
}
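As a hedged illustration of the map semantics described above (not part of the patch), a consumer running inside an xDS client callback could resolve a route name roughly like this; routeConfigFor is a hypothetical helper and the fmt import is assumed:

func routeConfigFor(rh *rdsHandler, routeName string) (*xdsresource.RouteConfigUpdate, error) {
	rwu, ok := rh.updates[routeName]
	if !ok {
		// No entry: neither a RouteConfiguration nor an error has been
		// received for this route name yet.
		return nil, fmt.Errorf("no update received yet for route %q", routeName)
	}
	if rwu.data == nil {
		// An entry with nil data means the last meaningful event was an
		// error: RPCs matched to this route config fail at the L7 level.
		return nil, rwu.err
	}
	return rwu.data, nil
}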
// newRDSHandler creates a new rdsHandler to watch for RDS resources.
// listenerWrapper updates the list of route names to watch by calling
// updateRouteNamesToWatch() upon receipt of new Listener configuration.
func newRDSHandler(xdsC XDSClient, logger *igrpclog.PrefixLogger, ch chan rdsHandlerUpdate) *rdsHandler {
// newRDSHandler creates a new rdsHandler to watch for RouteConfiguration
// resources. listenerWrapper updates the list of route names to watch by
// calling updateRouteNamesToWatch() upon receipt of new Listener configuration.
func newRDSHandler(cb func(string, rdsWatcherUpdate), xdsC XDSClient, logger *igrpclog.PrefixLogger) *rdsHandler {
return &rdsHandler{
xdsC: xdsC,
logger: logger,
updateChannel: ch,
updates: make(map[string]xdsresource.RouteConfigUpdate),
cancels: make(map[string]func()),
xdsC: xdsC,
logger: logger,
callback: cb,
updates: make(map[string]rdsWatcherUpdate),
cancels: make(map[string]func()),
}
}
// updateRouteNamesToWatch handles a list of route names to watch for a given
// server side listener (if a filter chain specifies dynamic RDS configuration).
// This function handles all the logic with respect to any routes that may have
// been added or deleted as compared to what was previously present.
// server side listener (if a filter chain specifies dynamic
// RouteConfiguration). This function handles all the logic with respect to any
// routes that may have been added or deleted as compared to what was previously
// present. Must be called within an xDS Client callback.
func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) {
rh.mu.Lock()
defer rh.mu.Unlock()
// Add and start watches for any routes for any new routes in
// routeNamesToWatch.
// Add and start watches for any new routes in routeNamesToWatch.
for routeName := range routeNamesToWatch {
if _, ok := rh.cancels[routeName]; !ok {
// The xDS client keeps a reference to the watcher until the cancel
// func is invoked. So, we don't need to keep a reference for fear
// of it being garbage collected.
w := &rdsWatcher{parent: rh, routeName: routeName}
rh.cancels[routeName] = xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
cancel := xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
// Wrap the cancel function so that it sets a bit on the watcher, eating
// any RouteConfiguration callbacks delivered after cancellation.
rh.cancels[routeName] = func() {
w.mu.Lock()
w.canceled = true
w.mu.Unlock()
cancel()
}
}
}
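The wrapped cancel function above is a general pattern for tearing down callback-based watches: flip a guarded bit first so late callbacks are dropped, then run the real cancellation. A self-contained sketch of the same pattern with hypothetical names, not code from this patch:

package main

import (
	"fmt"
	"sync"
)

type watcher struct {
	mu       sync.Mutex
	canceled bool
}

// onUpdate drops any callback delivered after cancellation.
func (w *watcher) onUpdate(v string) {
	w.mu.Lock()
	if w.canceled {
		w.mu.Unlock()
		return
	}
	w.mu.Unlock()
	fmt.Println("update:", v)
}

func main() {
	w := &watcher{}
	realCancel := func() { /* would unregister from the update source */ }
	cancel := func() {
		w.mu.Lock()
		w.canceled = true
		w.mu.Unlock()
		realCancel()
	}
	w.onUpdate("delivered") // printed
	cancel()
	w.onUpdate("dropped") // eaten by the canceled bit
}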
@ -89,35 +95,29 @@ func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool)
delete(rh.updates, routeName)
}
}
// If the full list (determined by length) of updates is now successfully
// received, the listener is ready to be updated.
if len(rh.updates) == len(rh.cancels) && len(routeNamesToWatch) != 0 {
drainAndPush(rh.updateChannel, rdsHandlerUpdate{updates: rh.updates})
}
}
// handleRouteUpdate persists the route config for a given route name, and also
// sends an update to the listener wrapper when an error is received or when the
// rds handler has a full collection of updates.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate) {
rh.mu.Lock()
defer rh.mu.Unlock()
rh.updates[routeName] = update
// If the full list (determined by length) of updates has successfully been
// received, the listener is ready to be updated.
if len(rh.updates) == len(rh.cancels) {
drainAndPush(rh.updateChannel, rdsHandlerUpdate{updates: rh.updates})
}
// determineRouteConfigurationReady determines whether every dynamic
// RouteConfiguration being watched has received either a configuration or an
// error. Must be called from an xDS client callback.
func (rh *rdsHandler) determineRouteConfigurationReady() bool {
// Safe to read cancels without the mutex because it is only written to
// from other xDS client callbacks, which are synchronized.
return len(rh.updates) == len(rh.cancels)
}
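A hedged sketch of how a caller might consume this predicate from within an xDS client callback; maybeGoServing is a hypothetical name, and switchMode is assumed to have the signature used by ldsWatcher earlier in this patch:

func (l *listenerWrapper) maybeGoServing() {
	if l.rdsHandler.determineRouteConfigurationReady() {
		// Every watched route name has resolved to either a configuration
		// or an error, so the wrapper can flip to SERVING.
		l.switchMode(nil, connectivity.ServingModeServing, nil)
	}
}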
func drainAndPush(ch chan rdsHandlerUpdate, update rdsHandlerUpdate) {
select {
case <-ch:
default:
// Must be called from an xDS Client Callback.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update rdsWatcherUpdate) {
rwu := rh.updates[routeName]
// Accept the new update if any of the following are true:
// 1. we had no valid update data.
// 2. the update is valid.
// 3. the update error is ResourceNotFound.
if rwu.data == nil || update.err == nil || xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
rwu = update
}
ch <- update
rh.updates[routeName] = rwu
rh.callback(routeName, rwu)
}
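The acceptance rules read as: keep serving a stale-but-valid configuration through transient errors (such as a NACKed update), but replace it when the update is valid or the resource is reported as not found. The same decision distilled into a standalone helper, a restatement of the code above rather than an addition to it:

func accept(stored, incoming rdsWatcherUpdate) rdsWatcherUpdate {
	if stored.data == nil || // rule 1: nothing valid to protect
		incoming.err == nil || // rule 2: the incoming update is valid
		xdsresource.ErrType(incoming.err) == xdsresource.ErrorTypeResourceNotFound { // rule 3
		return incoming
	}
	// Transient error (e.g. a NACK): retain the stale but valid config.
	return stored
}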
// close() is meant to be called by wrapped listener when the wrapped listener
@ -131,32 +131,58 @@ func (rh *rdsHandler) close() {
}
}
type rdsWatcherUpdate struct {
data *xdsresource.RouteConfigUpdate
err error
}
// rdsWatcher implements the xdsresource.RouteConfigWatcher interface and is
// passed to the WatchRouteConfig API.
type rdsWatcher struct {
parent *rdsHandler
logger *igrpclog.PrefixLogger
routeName string
mu sync.Mutex
canceled bool // eats callbacks if true
}
func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
rw.mu.Lock()
if rw.canceled {
rw.mu.Unlock()
return
}
rw.mu.Unlock()
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
}
rw.parent.handleRouteUpdate(rw.routeName, update.Resource)
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
}
func (rw *rdsWatcher) OnError(err error) {
rw.mu.Lock()
if rw.canceled {
rw.mu.Unlock()
return
}
rw.mu.Unlock()
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
}
drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err})
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
}
func (rw *rdsWatcher) OnResourceDoesNotExist() {
rw.mu.Lock()
if rw.canceled {
rw.mu.Unlock()
return
}
rw.mu.Unlock()
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName)
}
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName)
drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err})
rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
}

View File

@ -21,22 +21,33 @@ package server
import (
"context"
"fmt"
"strings"
"testing"
"time"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)
const (
listenerName = "listener"
clusterName = "cluster"
@ -44,9 +55,10 @@ const (
route1 = "route1"
route2 = "route2"
route3 = "route3"
route4 = "route4"
)
// xdsSetupFoTests performs the following setup actions:
// xdsSetupForTests performs the following setup actions:
// - spins up an xDS management server
// - creates an xDS client with a bootstrap configuration pointing to the above
// management server
@ -57,7 +69,7 @@ const (
// - a channel to read lds resource names received by the management server
// - a channel to read rds resource names received by the management server
// - an xDS client to pass to the rdsHandler under test
func xdsSetupFoTests(t *testing.T) (*e2e.ManagementServer, string, chan []string, chan []string, xdsclient.XDSClient) {
func xdsSetupForTests(t *testing.T) (*e2e.ManagementServer, string, chan []string, chan []string, xdsclient.XDSClient) {
t.Helper()
ldsNamesCh := make(chan []string, 1)
@ -68,7 +80,7 @@ func xdsSetupFoTests(t *testing.T) (*e2e.ManagementServer, string, chan []string
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
switch req.GetTypeUrl() {
case version.V3ListenerURL:
case version.V3ListenerURL: // Publish the latest LDS resource names requested.
select {
case <-ldsNamesCh:
default:
@ -77,7 +89,7 @@ func xdsSetupFoTests(t *testing.T) (*e2e.ManagementServer, string, chan []string
case ldsNamesCh <- req.GetResourceNames():
default:
}
case version.V3RouteConfigURL:
case version.V3RouteConfigURL: // Same drain-then-send for RDS resource names.
select {
case <-rdsNamesCh:
default:
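Both cases use the same drain-then-send idiom on a channel of capacity one, so a slow test reader always observes the most recently requested names and the management server callback never blocks. A standalone sketch of the idiom with hypothetical names, mirroring the logic above:

package main

import "fmt"

// publishLatest overwrites any unconsumed value so a slow reader always
// sees the most recent resource-name set.
func publishLatest(ch chan []string, names []string) {
	select {
	case <-ch: // drop the stale value, if any
	default:
	}
	select {
	case ch <- names:
	default: // a concurrent reader raced us; it will see a fresh value next time
	}
}

func main() {
	ch := make(chan []string, 1)
	publishLatest(ch, []string{"route1"})
	publishLatest(ch, []string{"route1", "route2"})
	fmt.Println(<-ch) // [route1 route2]
}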
@ -122,27 +134,6 @@ func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []stri
t.Fatalf("Timeout waiting for resource to be requested from the management server")
}
// Waits for an update to be pushed on updateCh and compares it to wantUpdate.
// Fails the test by calling t.Fatal if the context expires or if the update
// received on the channel does not match wantUpdate.
func verifyUpdateFromChannel(ctx context.Context, t *testing.T, updateCh chan rdsHandlerUpdate, wantUpdate rdsHandlerUpdate) {
t.Helper()
opts := []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"),
cmp.AllowUnexported(rdsHandlerUpdate{}),
}
select {
case gotUpdate := <-updateCh:
if diff := cmp.Diff(gotUpdate, wantUpdate, opts...); diff != "" {
t.Fatalf("Got unexpected route config update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for a route config update")
}
}
func routeConfigResourceForName(name string) *v3routepb.RouteConfiguration {
return e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: name,
@ -152,76 +143,48 @@ func routeConfigResourceForName(name string) *v3routepb.RouteConfiguration {
})
}
var defaultRouteConfigUpdate = xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{{
Domains: []string{listenerName},
Routes: []*xdsresource.Route{{
Prefix: newStringP("/"),
ActionType: xdsresource.RouteActionRoute,
WeightedClusters: map[string]xdsresource.WeightedCluster{clusterName: {Weight: 1}},
}},
}},
type testCallbackVerify struct {
ch chan callbackStruct
}
// Tests the simplest scenario: the rds handler receives a single route name.
//
// The test verifies the following:
// - the handler starts a watch for the given route name
// - once an update is received from the management server, it is pushed to the
// update channel
// - once the handler is closed, the watch for the route name is canceled.
func (s) TestRDSHandler_SuccessCaseOneRDSWatch(t *testing.T) {
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Configure the management server with a route config resource.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
resources := e2e.UpdateOptions{
NodeID: nodeID,
Routes: []*v3routepb.RouteConfiguration{routeConfigResourceForName(route1)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
// Verify that the given route is requested.
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1})
// Verify that the update is pushed to the handler's update channel.
wantUpdate := rdsHandlerUpdate{updates: map[string]xdsresource.RouteConfigUpdate{route1: defaultRouteConfigUpdate}}
verifyUpdateFromChannel(ctx, t, updateCh, wantUpdate)
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
type callbackStruct struct {
routeName string
rwu rdsWatcherUpdate
}
// Tests the case where the rds handler receives two route names to watch. The
// test verifies that when the handler receives an update for only one of them,
// it does not push an update on the channel. And when the handler receives the
// second route, the test verifies that the update is pushed.
func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) {
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
func (tcv *testCallbackVerify) testCallback(routeName string, rwu rdsWatcherUpdate) {
tcv.ch <- callbackStruct{routeName: routeName, rwu: rwu}
}
// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
func verifyRouteName(ctx context.Context, t *testing.T, ch chan callbackStruct, want callbackStruct) {
t.Helper()
select {
case got := <-ch:
if diff := cmp.Diff(got.routeName, want.routeName); diff != "" {
t.Fatalf("unexpected update received (-got, +want):%v, want: %v", got, want)
}
case <-ctx.Done():
t.Fatalf("timeout waiting for callback")
}
}
// TestRDSHandler tests the RDS handler. It first configures the rds handler to
// watch routes 1 and 2. Before receiving both RDS updates, it should not be
// ready, but after receiving both, it should be ready. It then tells the rds
// handler to watch routes 1, 2 and 3. It should not be ready until it receives
// route3 from the management server. It then configures the rds handler to
// watch routes 1 and 3. It should immediately be ready. It then configures the
// rds handler to watch routes 1 and 4. It should not be ready until it receives
// an rds update for route 4.
func (s) TestRDSHandler(t *testing.T) {
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupForTests(t)
ch := make(chan callbackStruct, 1)
tcv := &testCallbackVerify{ch: ch}
rh := newRDSHandler(tcv.testCallback, xdsC, nil)
// Verify that the given route is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1})
// Update the rds handler to watch for two routes.
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
// Configure the management server with a single route config resource.
routeResource1 := routeConfigResourceForName(route1)
@ -234,259 +197,64 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) {
t.Fatal(err)
}
// Verify that the rds handler does not send an update.
sCtx, sCtxCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCtxCancel()
select {
case <-updateCh:
t.Fatal("RDS Handler wrote an update to updateChannel when it shouldn't have, as each route name has not received an update yet")
case <-sCtx.Done():
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
verifyRouteName(ctx, t, ch, callbackStruct{routeName: route1})
// The rds handler update should not be ready.
if got := rh.determineRouteConfigurationReady(); got != false {
t.Fatalf("rh.determineRouteConfigurationReady: %v, want: false", false)
}
// Configure the management server with both route config resources.
// Configure the management server with both route config resources.
routeResource2 := routeConfigResourceForName(route2)
resources.Routes = []*v3routepb.RouteConfiguration{routeResource1, routeResource2}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that the update is pushed to the handler's update channel.
wantUpdate := rdsHandlerUpdate{
updates: map[string]xdsresource.RouteConfigUpdate{
route1: defaultRouteConfigUpdate,
route2: defaultRouteConfigUpdate,
},
}
verifyUpdateFromChannel(ctx, t, updateCh, wantUpdate)
verifyRouteName(ctx, t, ch, callbackStruct{routeName: route2})
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
}
// Tests the case where the rds handler receives an update with two routes, then
// receives an update with only one route. The rds handler is expected to cancel
// the watch for the route no longer present, and push a corresponding update
// with only one route.
func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) {
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Create an rds handler and give it two routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
// Verify that the given routes are requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
// Configure the management server with two route config resources.
routeResource1 := routeConfigResourceForName(route1)
routeResource2 := routeConfigResourceForName(route2)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Routes: []*v3routepb.RouteConfiguration{routeResource1, routeResource2},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
if got := rh.determineRouteConfigurationReady(); got != true {
t.Fatalf("rh.determineRouteConfigurationReady: %v, want: true", got)
}
// Verify that the update is pushed to the handler's update channel.
wantUpdate := rdsHandlerUpdate{
updates: map[string]xdsresource.RouteConfigUpdate{
route1: defaultRouteConfigUpdate,
route2: defaultRouteConfigUpdate,
},
}
verifyUpdateFromChannel(ctx, t, updateCh, wantUpdate)
// Update the handler to watch only one of the two previous routes.
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
// Verify that the other route is no longer requested.
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1})
// Verify that an update is pushed with only one route.
wantUpdate = rdsHandlerUpdate{updates: map[string]xdsresource.RouteConfigUpdate{route1: defaultRouteConfigUpdate}}
verifyUpdateFromChannel(ctx, t, updateCh, wantUpdate)
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
}
// Tests the case where the rds handler receives an update with two routes, and
// then receives an update with two routes, one previously there and one added
// (i.e. 12 -> 23). This should cause the route that is no longer there to be
// deleted and cancelled, and the route that was added should have a watch
// started for it. The test also verifies that an update is not pushed by the
// rds handler until the newly added route config resource is received from the
// management server.
func (s) TestRDSHandler_SuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) {
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Create an rds handler and give it two routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
// Verify that the given routes are requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
// Configure the management server with two route config resources.
routeResource1 := routeConfigResourceForName(route1)
routeResource2 := routeConfigResourceForName(route2)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Routes: []*v3routepb.RouteConfiguration{routeResource1, routeResource2},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that the update is pushed to the handler's update channel.
wantUpdate := rdsHandlerUpdate{
updates: map[string]xdsresource.RouteConfigUpdate{
route1: defaultRouteConfigUpdate,
route2: defaultRouteConfigUpdate,
},
}
verifyUpdateFromChannel(ctx, t, updateCh, wantUpdate)
// Update the handler with a route that was already there and a new route.
rh.updateRouteNamesToWatch(map[string]bool{route2: true, route3: true})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route2, route3})
// The handler should not send an update.
sCtx, sCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCtxCancel()
select {
case <-updateCh:
t.Fatalf("RDS Handler wrote an update to updateChannel when it shouldn't have, as each route name has not received an update yet")
case <-sCtx.Done():
}
// Configure the management server with the third resource.
routeResource3 := routeConfigResourceForName(route3)
resources.Routes = append(resources.Routes, routeResource3)
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Verify that the update is pushed to the handler's update channel.
wantUpdate = rdsHandlerUpdate{
updates: map[string]xdsresource.RouteConfigUpdate{
route2: defaultRouteConfigUpdate,
route3: defaultRouteConfigUpdate,
},
}
verifyUpdateFromChannel(ctx, t, updateCh, wantUpdate)
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
}
// Tests the scenario where the rds handler gets told to watch three rds
// configurations, gets two successful updates, then gets told to watch only
// those two. The rds handler should then write an update to the update buffer.
func (s) TestRDSHandler_SuccessCaseSecondUpdateMakesRouteFull(t *testing.T) {
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Create an rds handler and give it three routes to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true, route3: true})
// Verify that the given routes are requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2, route3})
// Configure the management server with two route config resources.
routeResource1 := routeConfigResourceForName(route1)
routeResource2 := routeConfigResourceForName(route2)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Routes: []*v3routepb.RouteConfiguration{routeResource1, routeResource2},
SkipValidation: true,
if got := rh.determineRouteConfigurationReady(); got != false {
t.Fatalf("rh.determineRouteConfigurationReady: %v, want: false", got)
}
// Configure the management server with route config resources.
routeResource3 := routeConfigResourceForName(route3)
resources.Routes = []*v3routepb.RouteConfiguration{routeResource1, routeResource2, routeResource3}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
verifyRouteName(ctx, t, ch, callbackStruct{routeName: route3})
// The handler should not send an update.
sCtx, sCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCtxCancel()
select {
case <-rh.updateChannel:
t.Fatalf("RDS Handler wrote an update to updateChannel when it shouldn't have, as each route name has not received an update yet")
case <-sCtx.Done():
if got := rh.determineRouteConfigurationReady(); got != true {
t.Fatalf("rh.determineRouteConfigurationReady: %v, want: true", got)
}
// Update to watch only route 1 and route 3. Should immediately go ready.
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route3: true})
if got := rh.determineRouteConfigurationReady(); got != true {
t.Fatalf("rh.determineRouteConfigurationReady: %v, want: true", got)
}
// Tell the rds handler to now only watch Route 1 and Route 2. This should
// trigger the rds handler to write an update to the update buffer as it now
// has full rds configuration.
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
wantUpdate := rdsHandlerUpdate{
updates: map[string]xdsresource.RouteConfigUpdate{
route1: defaultRouteConfigUpdate,
route2: defaultRouteConfigUpdate,
},
}
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
verifyUpdateFromChannel(ctx, t, updateCh, wantUpdate)
// Close the rds handler and verify that the watch is canceled.
rh.close()
waitForResourceNames(ctx, t, rdsNamesCh, []string{})
}
// TestErrorReceived tests the case where the rds handler receives a route name
// to watch, then receives an update with an error. This error should then be
// written to the update channel.
func (s) TestErrorReceived(t *testing.T) {
mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupFoTests(t)
// Create an rds handler and give it a single route to watch.
updateCh := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, nil, updateCh)
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
// Verify that the given route is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1})
// Configure the management server with a single route config resource, that
// is expected to be NACKed.
routeResource := routeConfigResourceForName(route1)
routeResource.VirtualHosts[0].RetryPolicy = &v3routepb.RetryPolicy{NumRetries: &wrapperspb.UInt32Value{Value: 0}}
resources := e2e.UpdateOptions{
NodeID: nodeID,
Routes: []*v3routepb.RouteConfiguration{routeResource},
SkipValidation: true,
// Update to watch route 1 and route 4. No route 4 resource yet, so the handler should not be ready.
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route4: true})
waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route4})
if got := rh.determineRouteConfigurationReady(); got != false {
t.Fatalf("rh.determineRouteConfigurationReady: %v, want: false", got)
}
routeResource4 := routeConfigResourceForName(route4)
resources.Routes = []*v3routepb.RouteConfiguration{routeResource1, routeResource2, routeResource3, routeResource4}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
const wantErr = "received route is invalid: retry_policy.num_retries = 0; must be >= 1"
select {
case update := <-updateCh:
if !strings.Contains(update.err.Error(), wantErr) {
t.Fatalf("Update received with error %v, want error containing %v", update.err, wantErr)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel")
verifyRouteName(ctx, t, ch, callbackStruct{routeName: route4})
if got := rh.determineRouteConfigurationReady(); got != true {
t.Fatalf("rh.determineRouteConfigurationReady: %v, want: true", got)
}
}
func newStringP(s string) *string {
return &s
}

View File

@ -542,6 +542,32 @@ func (a *authority) handleWatchTimerExpiry(rType xdsresource.Type, resourceName
}
}
func (a *authority) triggerResourceNotFoundForTesting(rType xdsresource.Type, resourceName string) {
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()
if a.closed {
return
}
resourceStates := a.resources[rType]
state, ok := resourceStates[resourceName]
if !ok {
return
}
// Skip if the watch was already canceled, or if watchStateTimeout already
// triggered resource-not-found via normal watch expiry.
if state.wState == watchStateCanceled || state.wState == watchStateTimeout {
return
}
state.wState = watchStateTimeout
state.cache = nil
state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
for watcher := range state.watchers {
watcher := watcher
a.serializer.Schedule(func(context.Context) { watcher.OnResourceDoesNotExist() })
}
}
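Note that the watcher callbacks are scheduled on the authority's serializer rather than invoked inline, so they run outside resourcesMu and in a deterministic order. A toy model of such a callback serializer, an assumption about its behavior rather than grpc-go's actual implementation:

package main

import (
	"context"
	"fmt"
)

// serializer runs scheduled callbacks one at a time, in order, on its own
// goroutine, so callers never execute watcher code while holding locks.
type serializer struct{ ch chan func(context.Context) }

func newSerializer(ctx context.Context) *serializer {
	s := &serializer{ch: make(chan func(context.Context), 16)}
	go func() {
		for f := range s.ch {
			f(ctx)
		}
	}()
	return s
}

func (s *serializer) Schedule(f func(context.Context)) { s.ch <- f }

func main() {
	s := newSerializer(context.Background())
	done := make(chan struct{})
	s.Schedule(func(context.Context) { fmt.Println("watcher notified") })
	s.Schedule(func(context.Context) { close(done) })
	<-done
}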
// sendDiscoveryRequestLocked sends a discovery request for the specified
// resource type and resource names. Even though this method does not directly
// access the resource cache, it is important that `resourcesMu` be held when

View File

@ -24,11 +24,14 @@ import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// New returns a new xDS client configured by the bootstrap file specified in env
@ -100,6 +103,17 @@ func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout, autho
return cl, grpcsync.OnceFunc(cl.close), nil
}
func init() {
internal.TriggerXDSResourceNameNotFoundClient = triggerXDSResourceNameNotFoundClient
}
var singletonClientForTesting = atomic.Pointer[clientRefCounted]{}
func triggerXDSResourceNameNotFoundClient(resourceType, resourceName string) error {
c := singletonClientForTesting.Load()
return internal.TriggerXDSResourceNameNotFoundForTesting.(func(func(xdsresource.Type, string) error, string, string) error)(c.clientImpl.triggerResourceNotFoundForTesting, resourceType, resourceName)
}
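For illustration, a test can exercise this hook roughly as follows (a hedged sketch; the resource type string is an assumption, and the call mirrors the type assertion registered above):

if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "route1"); err != nil {
	t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
}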
// NewWithBootstrapContentsForTesting returns an xDS client for this config,
// separate from the global singleton.
//
@ -123,12 +137,14 @@ func NewWithBootstrapContentsForTesting(contents []byte) (XDSClient, func(), err
if err != nil {
return nil, nil, err
}
singletonClientForTesting.Store(c)
return c, grpcsync.OnceFunc(func() {
clientsMu.Lock()
defer clientsMu.Unlock()
if c.decrRef() == 0 {
c.close()
delete(clients, string(contents))
singletonClientForTesting.Store(nil)
}
}), nil
}

View File

@ -103,3 +103,19 @@ func (r *resourceTypeRegistry) maybeRegister(rType xdsresource.Type) error {
r.types[url] = rType
return nil
}
func (c *clientImpl) triggerResourceNotFoundForTesting(rType xdsresource.Type, resourceName string) error {
// Return early if the client is already closed.
if c == nil || c.done.HasFired() {
return fmt.Errorf("attempt to trigger resource-not-found-error for resource %q of type %q, but client is closed", rType.TypeName(), resourceName)
}
n := xdsresource.ParseName(resourceName)
a, unref, err := c.findAuthority(n)
if err != nil {
return fmt.Errorf("attempt to trigger resource-not-found-error for resource %q of type %q, but authority %q is not found", rType.TypeName(), resourceName, n.Authority)
}
defer unref()
a.triggerResourceNotFoundForTesting(rType, n.String())
return nil
}

View File

@ -21,15 +21,17 @@ import (
"errors"
"fmt"
"net"
"sync/atomic"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
)
const (
@ -66,6 +68,9 @@ type FilterChain struct {
//
// Exactly one of RouteConfigName and InlineRouteConfig is set.
InlineRouteConfig *RouteConfigUpdate
// UsableRouteConfiguration is the routing configuration for this filter
// chain (LDS + RDS).
UsableRouteConfiguration *atomic.Pointer[UsableRouteConfiguration]
}
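Keeping the usable route configuration behind an atomic pointer lets the xDS update path publish new routing state while the per-connection serving path reads it without taking a mutex. A hedged sketch of both sides; the helper names are hypothetical:

// Update path: runs in an xDS client callback.
func publishRouteConfig(fc *FilterChain, urc *UsableRouteConfiguration) {
	fc.UsableRouteConfiguration.Store(urc) // one atomic swap per update
}

// Serving path: runs per RPC, lock-free.
func currentRouteConfig(fc *FilterChain) *UsableRouteConfiguration {
	return fc.UsableRouteConfiguration.Load()
}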
// VirtualHostWithInterceptors captures information present in a VirtualHost
@ -92,21 +97,28 @@ type RouteWithInterceptors struct {
Interceptors []resolver.ServerInterceptor
}
type UsableRouteConfiguration struct {
VHS []VirtualHostWithInterceptors
Err error
}
// ConstructUsableRouteConfiguration takes Route Configuration and converts it
// into matchable route configuration, with instantiated HTTP Filters per route.
func (f *FilterChain) ConstructUsableRouteConfiguration(config RouteConfigUpdate) ([]VirtualHostWithInterceptors, error) {
func (fc *FilterChain) ConstructUsableRouteConfiguration(config RouteConfigUpdate) *UsableRouteConfiguration {
vhs := make([]VirtualHostWithInterceptors, 0, len(config.VirtualHosts))
for _, vh := range config.VirtualHosts {
vhwi, err := f.convertVirtualHost(vh)
vhwi, err := fc.convertVirtualHost(vh)
if err != nil {
return nil, fmt.Errorf("virtual host construction: %v", err)
// Non-nil only if LDS + RDS validation fails, which shouldn't happen
// since the xDS client has already validated the resources; treat it as
// an L7 error.
return &UsableRouteConfiguration{Err: fmt.Errorf("virtual host construction: %v", err)}
}
vhs = append(vhs, vhwi)
}
return vhs, nil
return &UsableRouteConfiguration{VHS: vhs}
}
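A hedged sketch of the intended flow when a route configuration arrives for a filter chain; onRouteConfigUpdate is a hypothetical helper, not code from this patch:

func onRouteConfigUpdate(fc *FilterChain, update RouteConfigUpdate) {
	urc := fc.ConstructUsableRouteConfiguration(update)
	// The pointer is stored even when urc.Err is non-nil: matched RPCs are
	// failed at the L7 level instead of the whole listener going down.
	fc.UsableRouteConfiguration.Store(urc)
}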
func (f *FilterChain) convertVirtualHost(virtualHost *VirtualHost) (VirtualHostWithInterceptors, error) {
func (fc *FilterChain) convertVirtualHost(virtualHost *VirtualHost) (VirtualHostWithInterceptors, error) {
rs := make([]RouteWithInterceptors, len(virtualHost.Routes))
for i, r := range virtualHost.Routes {
var err error
@ -115,7 +127,7 @@ func (f *FilterChain) convertVirtualHost(virtualHost *VirtualHost) (VirtualHostW
if err != nil {
return VirtualHostWithInterceptors{}, fmt.Errorf("matcher construction: %v", err)
}
for _, filter := range f.HTTPFilters {
for _, filter := range fc.HTTPFilters {
// Route is highest priority on server side, as there is no concept
// of an upstream cluster on server side.
override := r.HTTPFilterConfigOverride[filter.Name]
@ -194,6 +206,9 @@ type FilterChainManager struct {
def *FilterChain // Default filter chain, if specified.
// Slice of filter chains managed by this filter chain manager.
fcs []*FilterChain
// RouteConfigNames are the names of the route configurations that need to
// be dynamically queried via RDS, for any filter chains that do not specify
// an inline route configuration.
@ -284,6 +299,9 @@ func NewFilterChainManager(lis *v3listenerpb.Listener) (*FilterChainManager, err
}
}
fci.def = def
if fci.def != nil {
fci.fcs = append(fci.fcs, fci.def)
}
// If there are no supported filter chains and no default filter chain, we
// fail here. This will cause the Listener resource to be NACKed.
@ -295,10 +313,10 @@ func NewFilterChainManager(lis *v3listenerpb.Listener) (*FilterChainManager, err
// addFilterChains parses the filter chains in fcs and adds the required
// internal data structures corresponding to the match criteria.
func (fci *FilterChainManager) addFilterChains(fcs []*v3listenerpb.FilterChain) error {
func (fcm *FilterChainManager) addFilterChains(fcs []*v3listenerpb.FilterChain) error {
for _, fc := range fcs {
fcm := fc.GetFilterChainMatch()
if fcm.GetDestinationPort().GetValue() != 0 {
fcMatch := fc.GetFilterChainMatch()
if fcMatch.GetDestinationPort().GetValue() != 0 {
// Destination port is the first match criteria and we do not
// support filter chains which contains this match criteria.
logger.Warningf("Dropping filter chain %+v since it contains unsupported destination_port match field", fc)
@ -306,7 +324,7 @@ func (fci *FilterChainManager) addFilterChains(fcs []*v3listenerpb.FilterChain)
}
// Build the internal representation of the filter chain match fields.
if err := fci.addFilterChainsForDestPrefixes(fc); err != nil {
if err := fcm.addFilterChainsForDestPrefixes(fc); err != nil {
return err
}
}
@ -314,7 +332,7 @@ func (fci *FilterChainManager) addFilterChains(fcs []*v3listenerpb.FilterChain)
return nil
}
func (fci *FilterChainManager) addFilterChainsForDestPrefixes(fc *v3listenerpb.FilterChain) error {
func (fcm *FilterChainManager) addFilterChainsForDestPrefixes(fc *v3listenerpb.FilterChain) error {
ranges := fc.GetFilterChainMatch().GetPrefixRanges()
dstPrefixes := make([]*net.IPNet, 0, len(ranges))
for _, pr := range ranges {
@ -329,24 +347,24 @@ func (fci *FilterChainManager) addFilterChainsForDestPrefixes(fc *v3listenerpb.F
if len(dstPrefixes) == 0 {
// Use the unspecified entry when destination prefix is unspecified, and
// set the `net` field to nil.
if fci.dstPrefixMap[unspecifiedPrefixMapKey] == nil {
fci.dstPrefixMap[unspecifiedPrefixMapKey] = &destPrefixEntry{}
if fcm.dstPrefixMap[unspecifiedPrefixMapKey] == nil {
fcm.dstPrefixMap[unspecifiedPrefixMapKey] = &destPrefixEntry{}
}
return fci.addFilterChainsForServerNames(fci.dstPrefixMap[unspecifiedPrefixMapKey], fc)
return fcm.addFilterChainsForServerNames(fcm.dstPrefixMap[unspecifiedPrefixMapKey], fc)
}
for _, prefix := range dstPrefixes {
p := prefix.String()
if fci.dstPrefixMap[p] == nil {
fci.dstPrefixMap[p] = &destPrefixEntry{net: prefix}
if fcm.dstPrefixMap[p] == nil {
fcm.dstPrefixMap[p] = &destPrefixEntry{net: prefix}
}
if err := fci.addFilterChainsForServerNames(fci.dstPrefixMap[p], fc); err != nil {
if err := fcm.addFilterChainsForServerNames(fcm.dstPrefixMap[p], fc); err != nil {
return err
}
}
return nil
}
func (fci *FilterChainManager) addFilterChainsForServerNames(dstEntry *destPrefixEntry, fc *v3listenerpb.FilterChain) error {
func (fcm *FilterChainManager) addFilterChainsForServerNames(dstEntry *destPrefixEntry, fc *v3listenerpb.FilterChain) error {
// Filter chains specifying server names in their match criteria always fail
// a match at connection time. So, these filter chains can be dropped now.
if len(fc.GetFilterChainMatch().GetServerNames()) != 0 {
@ -354,10 +372,10 @@ func (fci *FilterChainManager) addFilterChainsForServerNames(dstEntry *destPrefi
return nil
}
return fci.addFilterChainsForTransportProtocols(dstEntry, fc)
return fcm.addFilterChainsForTransportProtocols(dstEntry, fc)
}
func (fci *FilterChainManager) addFilterChainsForTransportProtocols(dstEntry *destPrefixEntry, fc *v3listenerpb.FilterChain) error {
func (fcm *FilterChainManager) addFilterChainsForTransportProtocols(dstEntry *destPrefixEntry, fc *v3listenerpb.FilterChain) error {
tp := fc.GetFilterChainMatch().GetTransportProtocol()
switch {
case tp != "" && tp != "raw_buffer":
@ -378,21 +396,21 @@ func (fci *FilterChainManager) addFilterChainsForTransportProtocols(dstEntry *de
dstEntry.rawBufferSeen = true
dstEntry.srcTypeArr = sourceTypesArray{}
}
return fci.addFilterChainsForApplicationProtocols(dstEntry, fc)
return fcm.addFilterChainsForApplicationProtocols(dstEntry, fc)
}
func (fci *FilterChainManager) addFilterChainsForApplicationProtocols(dstEntry *destPrefixEntry, fc *v3listenerpb.FilterChain) error {
func (fcm *FilterChainManager) addFilterChainsForApplicationProtocols(dstEntry *destPrefixEntry, fc *v3listenerpb.FilterChain) error {
if len(fc.GetFilterChainMatch().GetApplicationProtocols()) != 0 {
logger.Warningf("Dropping filter chain %+v since it contains unsupported application_protocols match field", fc)
return nil
}
return fci.addFilterChainsForSourceType(dstEntry, fc)
return fcm.addFilterChainsForSourceType(dstEntry, fc)
}
// addFilterChainsForSourceType adds source types to the internal data
// structures and delegates control to addFilterChainsForSourcePrefixes to
// continue building the internal data structure.
func (fci *FilterChainManager) addFilterChainsForSourceType(dstEntry *destPrefixEntry, fc *v3listenerpb.FilterChain) error {
func (fcm *FilterChainManager) addFilterChainsForSourceType(dstEntry *destPrefixEntry, fc *v3listenerpb.FilterChain) error {
var srcType SourceType
switch st := fc.GetFilterChainMatch().GetSourceType(); st {
case v3listenerpb.FilterChainMatch_ANY:
@ -409,13 +427,13 @@ func (fci *FilterChainManager) addFilterChainsForSourceType(dstEntry *destPrefix
if dstEntry.srcTypeArr[st] == nil {
dstEntry.srcTypeArr[st] = &sourcePrefixes{srcPrefixMap: make(map[string]*sourcePrefixEntry)}
}
return fci.addFilterChainsForSourcePrefixes(dstEntry.srcTypeArr[st].srcPrefixMap, fc)
return fcm.addFilterChainsForSourcePrefixes(dstEntry.srcTypeArr[st].srcPrefixMap, fc)
}
// addFilterChainsForSourcePrefixes adds source prefixes to the internal data
// structures and delegates control to addFilterChainsForSourcePorts to continue
// building the internal data structure.
func (fci *FilterChainManager) addFilterChainsForSourcePrefixes(srcPrefixMap map[string]*sourcePrefixEntry, fc *v3listenerpb.FilterChain) error {
func (fcm *FilterChainManager) addFilterChainsForSourcePrefixes(srcPrefixMap map[string]*sourcePrefixEntry, fc *v3listenerpb.FilterChain) error {
ranges := fc.GetFilterChainMatch().GetSourcePrefixRanges()
srcPrefixes := make([]*net.IPNet, 0, len(ranges))
for _, pr := range fc.GetFilterChainMatch().GetSourcePrefixRanges() {
@ -435,7 +453,7 @@ func (fci *FilterChainManager) addFilterChainsForSourcePrefixes(srcPrefixMap map
srcPortMap: make(map[int]*FilterChain),
}
}
return fci.addFilterChainsForSourcePorts(srcPrefixMap[unspecifiedPrefixMapKey], fc)
return fcm.addFilterChainsForSourcePorts(srcPrefixMap[unspecifiedPrefixMapKey], fc)
}
for _, prefix := range srcPrefixes {
p := prefix.String()
@ -445,7 +463,7 @@ func (fci *FilterChainManager) addFilterChainsForSourcePrefixes(srcPrefixMap map
srcPortMap: make(map[int]*FilterChain),
}
}
if err := fci.addFilterChainsForSourcePorts(srcPrefixMap[p], fc); err != nil {
if err := fcm.addFilterChainsForSourcePorts(srcPrefixMap[p], fc); err != nil {
return err
}
}
@ -456,14 +474,14 @@ func (fci *FilterChainManager) addFilterChainsForSourcePrefixes(srcPrefixMap map
// structures and completes the process of building the internal data structure.
// It is here that we determine if there are multiple filter chains with
// overlapping matching rules.
func (fci *FilterChainManager) addFilterChainsForSourcePorts(srcEntry *sourcePrefixEntry, fcProto *v3listenerpb.FilterChain) error {
func (fcm *FilterChainManager) addFilterChainsForSourcePorts(srcEntry *sourcePrefixEntry, fcProto *v3listenerpb.FilterChain) error {
ports := fcProto.GetFilterChainMatch().GetSourcePorts()
srcPorts := make([]int, 0, len(ports))
for _, port := range ports {
srcPorts = append(srcPorts, int(port))
}
fc, err := fci.filterChainFromProto(fcProto)
fc, err := fcm.filterChainFromProto(fcProto)
if err != nil {
return err
}
@ -474,6 +492,7 @@ func (fci *FilterChainManager) addFilterChainsForSourcePorts(srcEntry *sourcePre
return errors.New("multiple filter chains with overlapping matching rules are defined")
}
srcEntry.srcPortMap[0] = fc
fcm.fcs = append(fcm.fcs, fc)
return nil
}
for _, port := range srcPorts {
@ -482,13 +501,18 @@ func (fci *FilterChainManager) addFilterChainsForSourcePorts(srcEntry *sourcePre
}
srcEntry.srcPortMap[port] = fc
}
fcm.fcs = append(fcm.fcs, fc)
return nil
}
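// FilterChains returns all filter chains managed by this manager, including
// the default filter chain, if one is set.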
func (fcm *FilterChainManager) FilterChains() []*FilterChain {
return fcm.fcs
}
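Since the manager now tracks every constructed FilterChain (including the default), callers can seed routing state for inline configurations up front. A hedged sketch of such a caller, not the exact listenerWrapper code:

for _, fc := range fcm.FilterChains() {
	if fc.InlineRouteConfig != nil {
		// Inline (LDS-supplied) route configuration is usable immediately.
		fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*fc.InlineRouteConfig))
	}
	// Chains that specify RouteConfigName instead wait for an rdsHandler
	// callback to deliver the dynamic configuration.
}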
// filterChainFromProto extracts the relevant information from the FilterChain
// proto and stores it in our internal representation. It also persists any
// RouteNames which need to be queried dynamically via RDS.
func (fci *FilterChainManager) filterChainFromProto(fc *v3listenerpb.FilterChain) (*FilterChain, error) {
func (fcm *FilterChainManager) filterChainFromProto(fc *v3listenerpb.FilterChain) (*FilterChain, error) {
filterChain, err := processNetworkFilters(fc.GetFilters())
if err != nil {
return nil, err
@ -497,7 +521,7 @@ func (fci *FilterChainManager) filterChainFromProto(fc *v3listenerpb.FilterChain
// listener, which receives the LDS response, if specified for the filter
// chain.
if filterChain.RouteConfigName != "" {
fci.RouteConfigNames[filterChain.RouteConfigName] = true
fcm.RouteConfigNames[filterChain.RouteConfigName] = true
}
// If the transport_socket field is not specified, it means that the control
// plane has not sent us any security config. This is fine and the server
@ -550,8 +574,8 @@ func (fci *FilterChainManager) filterChainFromProto(fc *v3listenerpb.FilterChain
}
// Validate takes a function to validate the FilterChains in this manager.
func (fci *FilterChainManager) Validate(f func(fc *FilterChain) error) error {
for _, dst := range fci.dstPrefixMap {
func (fcm *FilterChainManager) Validate(f func(fc *FilterChain) error) error {
for _, dst := range fcm.dstPrefixMap {
for _, srcType := range dst.srcTypeArr {
if srcType == nil {
continue
@ -565,11 +589,15 @@ func (fci *FilterChainManager) Validate(f func(fc *FilterChain) error) error {
}
}
}
return f(fci.def)
return f(fcm.def)
}
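Validate visits every chain in the destination/source maps plus the default chain, so a single callback can veto an entire listener update. A usage sketch; the security-config policy below is illustrative, not the validator gRPC actually installs:
err := fcm.Validate(func(fc *FilterChain) error {
	// Example policy: reject any chain carrying a security config, e.g.
	// when the server has no credentials capable of honoring it.
	if fc != nil && fc.SecurityCfg != nil {
		return fmt.Errorf("unsupported security config: %+v", fc.SecurityCfg)
	}
	return nil
})
if err != nil {
	// Treat the listener resource as unusable (e.g. NACK the update).
}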
func processNetworkFilters(filters []*v3listenerpb.Filter) (*FilterChain, error) {
filterChain := &FilterChain{}
rc := &UsableRouteConfiguration{}
filterChain := &FilterChain{
UsableRouteConfiguration: &atomic.Pointer[UsableRouteConfiguration]{},
}
filterChain.UsableRouteConfiguration.Store(rc)
seenNames := make(map[string]bool, len(filters))
seenHCM := false
for _, filter := range filters {
@ -687,11 +715,11 @@ type FilterChainLookupParams struct {
// Returns a non-nil error if no matching filter chain could be found or if
// multiple matching filter chains were found; in either case, the incoming
// connection must be dropped.
func (fci *FilterChainManager) Lookup(params FilterChainLookupParams) (*FilterChain, error) {
dstPrefixes := filterByDestinationPrefixes(fci.dstPrefixes, params.IsUnspecifiedListener, params.DestAddr)
func (fcm *FilterChainManager) Lookup(params FilterChainLookupParams) (*FilterChain, error) {
dstPrefixes := filterByDestinationPrefixes(fcm.dstPrefixes, params.IsUnspecifiedListener, params.DestAddr)
if len(dstPrefixes) == 0 {
if fci.def != nil {
return fci.def, nil
if fcm.def != nil {
return fcm.def, nil
}
return nil, fmt.Errorf("no matching filter chain based on destination prefix match for %+v", params)
}
@ -702,8 +730,8 @@ func (fci *FilterChainManager) Lookup(params FilterChainLookupParams) (*FilterCh
}
srcPrefixes := filterBySourceType(dstPrefixes, srcType)
if len(srcPrefixes) == 0 {
if fci.def != nil {
return fci.def, nil
if fcm.def != nil {
return fcm.def, nil
}
return nil, fmt.Errorf("no matching filter chain based on source type match for %+v", params)
}
@ -714,8 +742,8 @@ func (fci *FilterChainManager) Lookup(params FilterChainLookupParams) (*FilterCh
if fc := filterBySourcePorts(srcPrefixEntry, params.SourcePort); fc != nil {
return fc, nil
}
if fci.def != nil {
return fci.def, nil
if fcm.def != nil {
return fcm.def, nil
}
return nil, fmt.Errorf("no matching filter chain after all match criteria for %+v", params)
}
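Lookup runs once per accepted connection, narrowing candidates by destination prefix, source type, source prefix, and source port, and falling back to the default chain at each stage. A sketch of the accept path; the SourceAddr field is assumed from the surrounding match logic (only DestAddr, SourcePort, and IsUnspecifiedListener appear in these hunks), and conn is any net.Conn with TCP endpoints:
dst := conn.LocalAddr().(*net.TCPAddr)
src := conn.RemoteAddr().(*net.TCPAddr)
fc, err := fcm.Lookup(FilterChainLookupParams{
	IsUnspecifiedListener: true, // listener bound to the wildcard address
	DestAddr:              dst.IP,
	SourceAddr:            src.IP,
	SourcePort:            src.Port,
})
if err != nil {
	// No match, or multiple overlapping matches: drop the connection.
	conn.Close()
	return
}
_ = fc // hand fc's configuration to the server-side HTTP filters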

View File

@ -2735,14 +2735,14 @@ func (s) TestHTTPFilterInstantiation(t *testing.T) {
fc := FilterChain{
HTTPFilters: test.filters,
}
vhswi, err := fc.ConstructUsableRouteConfiguration(test.routeConfig)
if err != nil {
t.Fatalf("Error constructing usable route configuration: %v", err)
urc := fc.ConstructUsableRouteConfiguration(test.routeConfig)
if urc.Err != nil {
t.Fatalf("Error constructing usable route configuration: %v", urc.Err)
}
// Build out list of errors by iterating through the virtual hosts and routes,
// and running the filters in route configurations.
var errs []string
for _, vh := range vhswi {
for _, vh := range urc.VHS {
for _, r := range vh.Routes {
for _, int := range r.Interceptors {
errs = append(errs, int.AllowRPC(context.Background()).Error())
@ -2759,25 +2759,44 @@ func (s) TestHTTPFilterInstantiation(t *testing.T) {
// The Equal() methods defined below help with using cmp.Equal() on these types
// which contain all unexported fields.
func (fci *FilterChainManager) Equal(other *FilterChainManager) bool {
if (fci == nil) != (other == nil) {
func (fcm *FilterChainManager) Equal(other *FilterChainManager) bool {
if (fcm == nil) != (other == nil) {
return false
}
if fci == nil {
if fcm == nil {
return true
}
switch {
case !cmp.Equal(fci.dstPrefixMap, other.dstPrefixMap, cmpopts.EquateEmpty()):
case !cmp.Equal(fcm.dstPrefixMap, other.dstPrefixMap, cmpopts.EquateEmpty()):
return false
// TODO: Support comparing dstPrefixes slice?
case !cmp.Equal(fci.def, other.def, cmpopts.EquateEmpty(), protocmp.Transform()):
case !cmp.Equal(fcm.def, other.def, cmpopts.EquateEmpty(), protocmp.Transform()):
return false
case !cmp.Equal(fci.RouteConfigNames, other.RouteConfigNames, cmpopts.EquateEmpty()):
case !cmp.Equal(fcm.RouteConfigNames, other.RouteConfigNames, cmpopts.EquateEmpty()):
return false
}
return true
}
func (fc *FilterChain) Equal(other *FilterChain) bool {
if (fc == nil) != (other == nil) {
return false
}
if fc == nil {
return true
}
if !cmp.Equal(fc.SecurityCfg, other.SecurityCfg, cmpopts.EquateEmpty()) {
return false
}
if !cmp.Equal(fc.RouteConfigName, other.RouteConfigName) {
return false
}
if !cmp.Equal(fc.HTTPFilters, other.HTTPFilters, cmpopts.EquateEmpty(), protocmp.Transform()) {
return false
}
return cmp.Equal(fc.InlineRouteConfig, other.InlineRouteConfig, cmpopts.EquateEmpty())
}
func (dpe *destPrefixEntry) Equal(other *destPrefixEntry) bool {
if (dpe == nil) != (other == nil) {
return false
@ -2826,28 +2845,28 @@ func (spe *sourcePrefixEntry) Equal(other *sourcePrefixEntry) bool {
// The String() methods defined below help with debugging test failures as the
// regular %v or %+v formatting directives do not expand pointer fields inside
// structs, and these types have a lot of pointers pointing to other structs.
func (fci *FilterChainManager) String() string {
if fci == nil {
func (fcm *FilterChainManager) String() string {
if fcm == nil {
return ""
}
var sb strings.Builder
if fci.dstPrefixMap != nil {
if fcm.dstPrefixMap != nil {
sb.WriteString("destination_prefix_map: map {\n")
for k, v := range fci.dstPrefixMap {
for k, v := range fcm.dstPrefixMap {
sb.WriteString(fmt.Sprintf("%q: %v\n", k, v))
}
sb.WriteString("}\n")
}
if fci.dstPrefixes != nil {
if fcm.dstPrefixes != nil {
sb.WriteString("destination_prefixes: [")
for _, p := range fci.dstPrefixes {
for _, p := range fcm.dstPrefixes {
sb.WriteString(fmt.Sprintf("%v ", p))
}
sb.WriteString("]")
}
if fci.def != nil {
sb.WriteString(fmt.Sprintf("default_filter_chain: %+v ", fci.def))
if fcm.def != nil {
sb.WriteString(fmt.Sprintf("default_filter_chain: %+v ", fcm.def))
}
return sb.String()
}
@ -2908,11 +2927,11 @@ func (spe *sourcePrefixEntry) String() string {
return sb.String()
}
func (f *FilterChain) String() string {
if f == nil || f.SecurityCfg == nil {
func (fc *FilterChain) String() string {
if fc == nil || fc.SecurityCfg == nil {
return ""
}
return fmt.Sprintf("security_config: %v", f.SecurityCfg)
return fmt.Sprintf("security_config: %v", fc.SecurityCfg)
}
func ipNetFromCIDR(cidr string) *net.IPNet {

View File

@ -25,18 +25,23 @@
package xdsresource
import (
"google.golang.org/grpc/xds/internal"
"fmt"
"google.golang.org/grpc/internal"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/anypb"
)
func init() {
internal.ResourceTypeMapForTesting = make(map[string]any)
internal.ResourceTypeMapForTesting[version.V3ListenerURL] = listenerType
internal.ResourceTypeMapForTesting[version.V3RouteConfigURL] = routeConfigType
internal.ResourceTypeMapForTesting[version.V3ClusterURL] = clusterType
internal.ResourceTypeMapForTesting[version.V3EndpointsURL] = endpointsType
xdsinternal.ResourceTypeMapForTesting = make(map[string]any)
xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL] = listenerType
xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL] = routeConfigType
xdsinternal.ResourceTypeMapForTesting[version.V3ClusterURL] = clusterType
xdsinternal.ResourceTypeMapForTesting[version.V3EndpointsURL] = endpointsType
internal.TriggerXDSResourceNameNotFoundForTesting = triggerResourceNotFoundForTesting
}
// Producer contains a single method to discover resource configuration from a
@ -162,3 +167,20 @@ func (r resourceTypeState) TypeName() string {
func (r resourceTypeState) AllResourcesRequiredInSotW() bool {
return r.allResourcesRequiredInSotW
}
func triggerResourceNotFoundForTesting(cb func(Type, string) error, typeName, resourceName string) error {
var typ Type
switch typeName {
case ListenerResourceTypeName:
typ = listenerType
case RouteConfigTypeName:
typ = routeConfigType
case ClusterResourceTypeName:
typ = clusterType
case EndpointsResourceTypeName:
typ = endpointsType
default:
return fmt.Errorf("unknown type name %q", typeName)
}
return cb(typ, resourceName)
}
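Tests reach this helper through the internal hook wired up in init() above. A sketch of a test driving it; the type assertion mirrors the hook's declared signature, and cb stands in for whatever resource-not-found injection function the xDS client under test exposes (not part of this diff):
trigger := internal.TriggerXDSResourceNameNotFoundForTesting.(func(func(xdsresource.Type, string) error, string, string) error)
if err := trigger(cb, xdsresource.RouteConfigTypeName, "route-1"); err != nil {
	t.Fatalf("Failed to trigger resource-not-found: %v", err)
}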

View File

@ -27,9 +27,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/buffer"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
iresolver "google.golang.org/grpc/internal/resolver"
@ -52,9 +49,6 @@ var (
newGRPCServer = func(opts ...grpc.ServerOption) grpcServer {
return grpc.NewServer(opts...)
}
drainServerTransports = internal.DrainServerTransports.(func(*grpc.Server, string))
logger = grpclog.Component("xds")
)
// grpcServer contains methods from grpc.Server which are used by the
@ -199,93 +193,22 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
cfg := s.xdsC.BootstrapConfig()
name := bootstrap.PopulateResourceTemplate(cfg.ServerListenerResourceNameTemplate, lis.Addr().String())
modeUpdateCh := buffer.NewUnbounded()
go func() {
s.handleServingModeChanges(modeUpdateCh)
}()
// Create a listenerWrapper which handles all functionality required by
// this particular instance of Serve().
lw, goodUpdateCh := server.NewListenerWrapper(server.ListenerWrapperParams{
lw := server.NewListenerWrapper(server.ListenerWrapperParams{
Listener: lis,
ListenerResourceName: name,
XDSClient: s.xdsC,
ModeCallback: func(addr net.Addr, mode connectivity.ServingMode, err error) {
modeUpdateCh.Put(&modeChangeArgs{
addr: addr,
mode: mode,
err: err,
s.opts.modeCallback(addr, ServingModeChangeArgs{
Mode: mode,
Err: err,
})
},
DrainCallback: func(addr net.Addr) {
if gs, ok := s.gs.(*grpc.Server); ok {
drainServerTransports(gs, addr.String())
}
},
})
// Block until a good LDS response is received or the server is stopped.
select {
case <-s.quit.Done():
// Since the listener has not yet been handed over to gs.Serve(), we
// need to explicitly close the listener. Cancellation of the xDS watch
// is handled by the listenerWrapper.
lw.Close()
modeUpdateCh.Close()
return nil
case <-goodUpdateCh:
}
return s.gs.Serve(lw)
}
// modeChangeArgs wraps argument required for invoking mode change callback.
type modeChangeArgs struct {
addr net.Addr
mode connectivity.ServingMode
err error
}
// handleServingModeChanges runs as a separate goroutine, spawned from Serve().
// It reads a channel onto which mode change arguments are pushed, and in turn
// invokes the user-registered callback. It also calls an internal method on the
// underlying grpc.Server to gracefully close existing connections, if the
// listener moved to "not-serving" mode.
func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
for {
select {
case <-s.quit.Done():
return
case u, ok := <-updateCh.Get():
if !ok {
return
}
updateCh.Load()
args := u.(*modeChangeArgs)
if args.mode == connectivity.ServingModeNotServing {
// We type assert our underlying gRPC server to the real
// grpc.Server here before trying to initiate the drain
// operation. This approach avoids performing the same type
// assertion in the grpc package which provides the
// implementation for internal.GetServerCredentials, and allows
// us to use a fake gRPC server in tests.
if gs, ok := s.gs.(*grpc.Server); ok {
drainServerTransports(gs, args.addr.String())
}
}
// The XdsServer API will allow applications to register a "serving state"
// callback to be invoked when the server begins serving and when the
// server encounters errors that force it to be "not serving". If "not
// serving", the callback must be provided error information for
// debugging use by developers (gRFC A36).
s.opts.modeCallback(args.addr, ServingModeChangeArgs{
Mode: args.mode,
Err: args.err,
})
}
}
}
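With the buffer goroutine and drain callback removed here, mode changes now reach the user's callback synchronously from the ModeCallback passed to the listener wrapper. Application code registers that callback the same way as before; a sketch using the public option, with names as in the grpc xds package:
modeCB := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
	if args.Mode == connectivity.ServingModeNotServing {
		log.Printf("listener %v entered not-serving mode: %v", addr, args.Err)
	}
})
server, err := xds.NewGRPCServer(modeCB)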
// Stop stops the underlying gRPC server. It immediately closes all open
// connections. It cancels all active RPCs on the server side and the
// corresponding pending RPCs on the client side will get notified by connection
@ -315,11 +238,23 @@ func (s *GRPCServer) GracefulStop() {
func routeAndProcess(ctx context.Context) error {
conn := transport.GetConnection(ctx)
cw, ok := conn.(interface {
VirtualHosts() []xdsresource.VirtualHostWithInterceptors
UsableRouteConfiguration() xdsresource.UsableRouteConfiguration
})
if !ok {
return errors.New("missing virtual hosts in incoming context")
}
rc := cw.UsableRouteConfiguration()
// Error out at the L7 routing level with status code UNAVAILABLE. This
// represents either a NACK received before a usable route configuration,
// a resource-not-found error for RDS, or an error combining LDS + RDS
// (which shouldn't happen).
if rc.Err != nil {
if logger.V(2) {
logger.Infof("RPC on connection with xDS Configuration error: %v", rc.Err)
}
return status.Error(codes.Unavailable, "error from xDS configuration for matched route configuration")
}
mn, ok := grpc.Method(ctx)
if !ok {
return errors.New("missing method name in incoming context")
@ -332,7 +267,7 @@ func routeAndProcess(ctx context.Context) error {
// the RPC gets to this point, there will be a single, unambiguous authority
// present in the header map.
authority := md.Get(":authority")
vh := xdsresource.FindBestMatchingVirtualHostServer(authority[0], cw.VirtualHosts())
vh := xdsresource.FindBestMatchingVirtualHostServer(authority[0], rc.VHS)
if vh == nil {
return status.Error(codes.Unavailable, "the incoming RPC did not match a configured Virtual Host")
}
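The interface assertion at the top of routeAndProcess means any connection type that can surface the current route configuration is routable; the real implementation lives in the listener wrapper's conn. A minimal illustrative stand-in, assuming the atomic-pointer storage introduced in processNetworkFilters above:
// illustrativeConn satisfies the interface asserted in routeAndProcess.
type illustrativeConn struct {
	net.Conn
	rc *atomic.Pointer[xdsresource.UsableRouteConfiguration]
}

func (c *illustrativeConn) UsableRouteConfiguration() xdsresource.UsableRouteConfiguration {
	return *c.rc.Load()
}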

View File

@ -470,88 +470,6 @@ func (s) TestServeSuccess(t *testing.T) {
}
}
// TestServeWithStop tests the case where Stop() is called before an LDS update
// is received. This should cause Serve() to exit before calling Serve() on the
// underlying grpc.Server.
func (s) TestServeWithStop(t *testing.T) {
// Setup an xDS management server that pushes on a channel when an LDS
// request is received by it. It also blocks on the incoming LDS request
// until unblocked by the test.
ldsRequestCh := make(chan []string, 1)
ldsRequestBlockCh := make(chan struct{})
mgmtServer, nodeID, _, _, cancel := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
if req.GetTypeUrl() == version.V3ListenerURL {
select {
case ldsRequestCh <- req.GetResourceNames():
default:
}
<-ldsRequestBlockCh
}
return nil
},
})
defer cancel()
defer close(ldsRequestBlockCh)
// Override the function to create the underlying grpc.Server to allow the
// test to verify that Serve() is called on the underlying server.
fs := newFakeGRPCServer()
origNewGRPCServer := newGRPCServer
newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { return fs }
defer func() { newGRPCServer = origNewGRPCServer }()
// Create a new xDS enabled gRPC server. Note that we are not deferring the
// Stop() here since we explicitly call it after the LDS watch has been
// registered.
server, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, nodeID, mgmtServer.Address)))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
// Call Serve() in a goroutine, and push on a channel when Serve returns.
lis, err := testutils.LocalTCPListener()
if err != nil {
server.Stop()
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
serveDone := testutils.NewChannel()
go func() {
if err := server.Serve(lis); err != nil {
t.Error(err)
}
serveDone.Send(nil)
}()
// Ensure that the LDS request is sent out for the expected name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
var gotNames []string
select {
case gotNames = <-ldsRequestCh:
case <-ctx.Done():
t.Fatalf("Timeout when waiting for an LDS request to be sent out")
}
wantNames := []string{strings.Replace(e2e.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)}
if !cmp.Equal(gotNames, wantNames) {
t.Fatalf("LDS watch registered for names %v, want %v", gotNames, wantNames)
}
// Call Stop() on the xDS enabled gRPC server before the management server
// can respond.
server.Stop()
if _, err := serveDone.Receive(ctx); err != nil {
t.Fatalf("Timeout when waiting for Serve() to exit: %v", err)
}
// Make sure that Serve() on the underlying grpc.Server is not called.
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if _, err := fs.serveCh.Receive(sCtx); err != context.DeadlineExceeded {
t.Fatal("Serve() called on underlying grpc.Server")
}
}
// TestNewServer_ClientCreationFailure tests the case where the xDS client
// creation fails and verifies that the call to NewGRPCServer() fails.
func (s) TestNewServer_ClientCreationFailure(t *testing.T) {

View File

@ -31,6 +31,7 @@ import (
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
internaladmin "google.golang.org/grpc/internal/admin"
"google.golang.org/grpc/resolver"
@ -48,6 +49,8 @@ import (
v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)
var logger = grpclog.Component("xds")
func init() {
internaladmin.AddService(func(registrar grpc.ServiceRegistrar) (func(), error) {
var grpcServer *grpc.Server