mirror of https://github.com/grpc/grpc-go.git
xds: Support server_listener_resource_name_template (#4233)
This commit is contained in:
parent
d5b628860d
commit
a45f13b160
|
@ -79,10 +79,12 @@ type Config struct {
|
|||
// CertProviderConfigs contains a mapping from certificate provider plugin
|
||||
// instance names to parsed buildable configs.
|
||||
CertProviderConfigs map[string]*certprovider.BuildableConfig
|
||||
// ServerResourceNameID contains the value to be used as the id in the
|
||||
// resource name used to fetch the Listener resource on the xDS-enabled gRPC
|
||||
// server.
|
||||
ServerResourceNameID string
|
||||
// ServerListenerResourceNameTemplate is a template for the name of the
|
||||
// Listener resource to subscribe to for a gRPC server. If the token `%s` is
|
||||
// present in the string, it will be replaced with the server's listening
|
||||
// "IP:port" (e.g., "0.0.0.0:8080", "[::]:8080"). For example, a value of
|
||||
// "example/resource/%s" could become "example/resource/0.0.0.0:8080".
|
||||
ServerListenerResourceNameTemplate string
|
||||
}
|
||||
|
||||
type channelCreds struct {
|
||||
|
@ -145,7 +147,7 @@ func bootstrapConfigFromEnvVariable() ([]byte, error) {
|
|||
// "config": { foo plugin config in JSON }
|
||||
// }
|
||||
// },
|
||||
// "grpc_server_resource_name_id": "grpc/server"
|
||||
// "server_listener_resource_name_template": "grpc/server?xds.resource.listening_address=%s"
|
||||
// }
|
||||
//
|
||||
// Currently, we support exactly one type of credential, which is
|
||||
|
@ -241,8 +243,8 @@ func NewConfig() (*Config, error) {
|
|||
configs[instance] = bc
|
||||
}
|
||||
config.CertProviderConfigs = configs
|
||||
case "grpc_server_resource_name_id":
|
||||
if err := json.Unmarshal(v, &config.ServerResourceNameID); err != nil {
|
||||
case "server_listener_resource_name_template":
|
||||
if err := json.Unmarshal(v, &config.ServerListenerResourceNameTemplate); err != nil {
|
||||
return nil, fmt.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -240,8 +240,8 @@ func (c *Config) compare(want *Config) error {
|
|||
if diff := cmp.Diff(want.NodeProto, c.NodeProto, cmp.Comparer(proto.Equal)); diff != "" {
|
||||
return fmt.Errorf("config.NodeProto diff (-want, +got):\n%s", diff)
|
||||
}
|
||||
if c.ServerResourceNameID != want.ServerResourceNameID {
|
||||
return fmt.Errorf("config.ServerResourceNameID is %q, want %q", c.ServerResourceNameID, want.ServerResourceNameID)
|
||||
if c.ServerListenerResourceNameTemplate != want.ServerListenerResourceNameTemplate {
|
||||
return fmt.Errorf("config.ServerListenerResourceNameTemplate is %q, want %q", c.ServerListenerResourceNameTemplate, want.ServerListenerResourceNameTemplate)
|
||||
}
|
||||
|
||||
// A vanilla cmp.Equal or cmp.Diff will not produce useful error message
|
||||
|
@ -711,9 +711,9 @@ func TestNewConfigWithCertificateProviders(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestNewConfigWithServerResourceNameID(t *testing.T) {
|
||||
func TestNewConfigWithServerListenerResourceNameTemplate(t *testing.T) {
|
||||
cancel := setupBootstrapOverride(map[string]string{
|
||||
"badServerResourceNameID": `
|
||||
"badServerListenerResourceNameTemplate:": `
|
||||
{
|
||||
"node": {
|
||||
"id": "ENVOY_NODE_ID",
|
||||
|
@ -727,9 +727,9 @@ func TestNewConfigWithServerResourceNameID(t *testing.T) {
|
|||
{ "type": "google_default" }
|
||||
]
|
||||
}],
|
||||
"grpc_server_resource_name_id": 123456789
|
||||
"server_listener_resource_name_template": 123456789
|
||||
}`,
|
||||
"goodServerResourceNameID": `
|
||||
"goodServerListenerResourceNameTemplate": `
|
||||
{
|
||||
"node": {
|
||||
"id": "ENVOY_NODE_ID",
|
||||
|
@ -743,7 +743,7 @@ func TestNewConfigWithServerResourceNameID(t *testing.T) {
|
|||
{ "type": "google_default" }
|
||||
]
|
||||
}],
|
||||
"grpc_server_resource_name_id": "grpc/server"
|
||||
"server_listener_resource_name_template": "grpc/server?xds.resource.listening_address=%s"
|
||||
}`,
|
||||
})
|
||||
defer cancel()
|
||||
|
@ -754,17 +754,17 @@ func TestNewConfigWithServerResourceNameID(t *testing.T) {
|
|||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "badServerResourceNameID",
|
||||
name: "badServerListenerResourceNameTemplate",
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "goodServerResourceNameID",
|
||||
name: "goodServerListenerResourceNameTemplate",
|
||||
wantConfig: &Config{
|
||||
BalancerName: "trafficdirector.googleapis.com:443",
|
||||
Creds: grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()),
|
||||
TransportAPI: version.TransportV2,
|
||||
NodeProto: v2NodeProto,
|
||||
ServerResourceNameID: "grpc/server",
|
||||
BalancerName: "trafficdirector.googleapis.com:443",
|
||||
Creds: grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()),
|
||||
TransportAPI: version.TransportV2,
|
||||
NodeProto: v2NodeProto,
|
||||
ServerListenerResourceNameTemplate: "grpc/server?xds.resource.listening_address=%s",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -203,6 +203,8 @@ type ListenerUpdate struct {
|
|||
// HTTPFilters is a list of HTTP filters (name, config) from the LDS
|
||||
// response.
|
||||
HTTPFilters []HTTPFilter
|
||||
// InboundListenerCfg contains inbound listener configuration.
|
||||
InboundListenerCfg *InboundListenerConfig
|
||||
|
||||
// Raw is the resource from the xds response.
|
||||
Raw *anypb.Any
|
||||
|
@ -221,6 +223,17 @@ type HTTPFilter struct {
|
|||
Config httpfilter.FilterConfig
|
||||
}
|
||||
|
||||
// InboundListenerConfig contains information about the inbound listener, i.e
|
||||
// the server-side listener.
|
||||
type InboundListenerConfig struct {
|
||||
// Address is the local address on which the inbound listener is expected to
|
||||
// accept incoming connections.
|
||||
Address string
|
||||
// Port is the local port on which the inbound listener is expected to
|
||||
// accept incoming connections.
|
||||
Port string
|
||||
}
|
||||
|
||||
func (lu *ListenerUpdate) String() string {
|
||||
return fmt.Sprintf("{RouteConfigName: %q, SecurityConfig: %+v", lu.RouteConfigName, lu.SecurityCfg)
|
||||
}
|
||||
|
|
|
@ -725,7 +725,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
|
|||
}
|
||||
|
||||
func (s) TestUnmarshalListener_ServerSide(t *testing.T) {
|
||||
const v3LDSTarget = "grpc/server?udpa.resource.listening_address=0.0.0.0:9999"
|
||||
const v3LDSTarget = "grpc/server?xds.resource.listening_address=0.0.0.0:9999"
|
||||
|
||||
var (
|
||||
listenerEmptyTransportSocket = &anypb.Any{
|
||||
|
@ -912,111 +912,6 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) {
|
|||
},
|
||||
wantErr: "no socket_address field in LDS response",
|
||||
},
|
||||
{
|
||||
name: "listener name does not match expected format",
|
||||
resources: []*anypb.Any{
|
||||
{
|
||||
TypeUrl: version.V3ListenerURL,
|
||||
Value: func() []byte {
|
||||
lis := &v3listenerpb.Listener{
|
||||
Name: "foo",
|
||||
Address: &v3corepb.Address{
|
||||
Address: &v3corepb.Address_SocketAddress{
|
||||
SocketAddress: &v3corepb.SocketAddress{
|
||||
Address: "0.0.0.0",
|
||||
PortSpecifier: &v3corepb.SocketAddress_PortValue{
|
||||
PortValue: 9999,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
mLis, _ := proto.Marshal(lis)
|
||||
return mLis
|
||||
}(),
|
||||
},
|
||||
},
|
||||
wantUpdate: map[string]ListenerUpdate{"foo": {}},
|
||||
wantMD: UpdateMetadata{
|
||||
Status: ServiceStatusNACKed,
|
||||
Version: testVersion,
|
||||
ErrState: &UpdateErrorMetadata{
|
||||
Version: testVersion,
|
||||
Err: errPlaceHolder,
|
||||
},
|
||||
},
|
||||
wantErr: "no host:port in name field of LDS response",
|
||||
},
|
||||
{
|
||||
name: "host mismatch",
|
||||
resources: []*anypb.Any{
|
||||
{
|
||||
TypeUrl: version.V3ListenerURL,
|
||||
Value: func() []byte {
|
||||
lis := &v3listenerpb.Listener{
|
||||
Name: v3LDSTarget,
|
||||
Address: &v3corepb.Address{
|
||||
Address: &v3corepb.Address_SocketAddress{
|
||||
SocketAddress: &v3corepb.SocketAddress{
|
||||
Address: "1.2.3.4",
|
||||
PortSpecifier: &v3corepb.SocketAddress_PortValue{
|
||||
PortValue: 9999,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
mLis, _ := proto.Marshal(lis)
|
||||
return mLis
|
||||
}(),
|
||||
},
|
||||
},
|
||||
wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}},
|
||||
wantMD: UpdateMetadata{
|
||||
Status: ServiceStatusNACKed,
|
||||
Version: testVersion,
|
||||
ErrState: &UpdateErrorMetadata{
|
||||
Version: testVersion,
|
||||
Err: errPlaceHolder,
|
||||
},
|
||||
},
|
||||
wantErr: "socket_address host does not match the one in name",
|
||||
},
|
||||
{
|
||||
name: "port mismatch",
|
||||
resources: []*anypb.Any{
|
||||
{
|
||||
TypeUrl: version.V3ListenerURL,
|
||||
Value: func() []byte {
|
||||
lis := &v3listenerpb.Listener{
|
||||
Name: v3LDSTarget,
|
||||
Address: &v3corepb.Address{
|
||||
Address: &v3corepb.Address_SocketAddress{
|
||||
SocketAddress: &v3corepb.SocketAddress{
|
||||
Address: "0.0.0.0",
|
||||
PortSpecifier: &v3corepb.SocketAddress_PortValue{
|
||||
PortValue: 1234,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
mLis, _ := proto.Marshal(lis)
|
||||
return mLis
|
||||
}(),
|
||||
},
|
||||
},
|
||||
wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}},
|
||||
wantMD: UpdateMetadata{
|
||||
Status: ServiceStatusNACKed,
|
||||
Version: testVersion,
|
||||
ErrState: &UpdateErrorMetadata{
|
||||
Version: testVersion,
|
||||
Err: errPlaceHolder,
|
||||
},
|
||||
},
|
||||
wantErr: "socket_address port does not match the one in name",
|
||||
},
|
||||
{
|
||||
name: "unexpected number of filter chains",
|
||||
resources: []*anypb.Any{
|
||||
|
@ -1314,7 +1209,13 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) {
|
|||
name: "empty transport socket",
|
||||
resources: []*anypb.Any{listenerEmptyTransportSocket},
|
||||
wantUpdate: map[string]ListenerUpdate{
|
||||
v3LDSTarget: {Raw: listenerEmptyTransportSocket},
|
||||
v3LDSTarget: {
|
||||
InboundListenerCfg: &InboundListenerConfig{
|
||||
Address: "0.0.0.0",
|
||||
Port: "9999",
|
||||
},
|
||||
Raw: listenerEmptyTransportSocket,
|
||||
},
|
||||
},
|
||||
wantMD: UpdateMetadata{
|
||||
Status: ServiceStatusACKed,
|
||||
|
@ -1446,6 +1347,10 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) {
|
|||
IdentityInstanceName: "identityPluginInstance",
|
||||
IdentityCertName: "identityCertName",
|
||||
},
|
||||
InboundListenerCfg: &InboundListenerConfig{
|
||||
Address: "0.0.0.0",
|
||||
Port: "9999",
|
||||
},
|
||||
Raw: listenerNoValidationContext,
|
||||
},
|
||||
},
|
||||
|
@ -1466,6 +1371,10 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) {
|
|||
IdentityCertName: "identityCertName",
|
||||
RequireClientCert: true,
|
||||
},
|
||||
InboundListenerCfg: &InboundListenerConfig{
|
||||
Address: "0.0.0.0",
|
||||
Port: "9999",
|
||||
},
|
||||
Raw: listenerWithValidationContext,
|
||||
},
|
||||
},
|
||||
|
|
|
@ -246,10 +246,6 @@ func processHTTPFilters(filters []*v3httppb.HttpFilter, server bool) ([]HTTPFilt
|
|||
}
|
||||
|
||||
func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
|
||||
// Make sure that an address encoded in the received listener resource, and
|
||||
// that it matches the one specified in the name. Listener names on the
|
||||
// server-side as in the following format:
|
||||
// grpc/server?udpa.resource.listening_address=IP:Port.
|
||||
addr := lis.GetAddress()
|
||||
if addr == nil {
|
||||
return nil, fmt.Errorf("no address field in LDS response: %+v", lis)
|
||||
|
@ -258,15 +254,11 @@ func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err
|
|||
if sockAddr == nil {
|
||||
return nil, fmt.Errorf("no socket_address field in LDS response: %+v", lis)
|
||||
}
|
||||
host, port, err := getAddressFromName(lis.GetName())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("no host:port in name field of LDS response: %+v, error: %v", lis, err)
|
||||
}
|
||||
if h := sockAddr.GetAddress(); host != h {
|
||||
return nil, fmt.Errorf("socket_address host does not match the one in name. Got %q, want %q", h, host)
|
||||
}
|
||||
if p := strconv.Itoa(int(sockAddr.GetPortValue())); port != p {
|
||||
return nil, fmt.Errorf("socket_address port does not match the one in name. Got %q, want %q", p, port)
|
||||
lu := &ListenerUpdate{
|
||||
InboundListenerCfg: &InboundListenerConfig{
|
||||
Address: sockAddr.GetAddress(),
|
||||
Port: strconv.Itoa(int(sockAddr.GetPortValue())),
|
||||
},
|
||||
}
|
||||
|
||||
// Make sure the listener resource contains a single filter chain. We do not
|
||||
|
@ -283,7 +275,7 @@ func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err
|
|||
// xdsCredentials.
|
||||
ts := fc.GetTransportSocket()
|
||||
if ts == nil {
|
||||
return &ListenerUpdate{}, nil
|
||||
return lu, nil
|
||||
}
|
||||
if name := ts.GetName(); name != transportSocketName {
|
||||
return nil, fmt.Errorf("transport_socket field has unexpected name: %s", name)
|
||||
|
@ -310,15 +302,9 @@ func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err
|
|||
if sc.RequireClientCert && sc.RootInstanceName == "" {
|
||||
return nil, errors.New("security configuration on the server-side does not contain root certificate provider instance name, but require_client_cert field is set")
|
||||
}
|
||||
return &ListenerUpdate{SecurityCfg: sc}, nil
|
||||
}
|
||||
lu.SecurityCfg = sc
|
||||
|
||||
func getAddressFromName(name string) (host string, port string, err error) {
|
||||
parts := strings.SplitN(name, "udpa.resource.listening_address=", 2)
|
||||
if len(parts) != 2 {
|
||||
return "", "", fmt.Errorf("udpa.resource_listening_address not found in name: %v", name)
|
||||
}
|
||||
return net.SplitHostPort(parts[1])
|
||||
return lu, nil
|
||||
}
|
||||
|
||||
// UnmarshalRouteConfig processes resources received in an RDS response,
|
||||
|
|
|
@ -55,10 +55,9 @@ func clientSetup(t *testing.T) (*e2e.ManagementServer, string, uint32, func()) {
|
|||
|
||||
// Create a bootstrap file in a temporary directory.
|
||||
bootstrapCleanup, err := e2e.SetupBootstrapFile(e2e.BootstrapOptions{
|
||||
Version: e2e.TransportV3,
|
||||
NodeID: nodeID,
|
||||
ServerURI: fs.Address,
|
||||
ServerResourceNameID: "grpc/server",
|
||||
Version: e2e.TransportV3,
|
||||
NodeID: nodeID,
|
||||
ServerURI: fs.Address,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -60,6 +60,9 @@ const (
|
|||
certFile = "cert.pem"
|
||||
keyFile = "key.pem"
|
||||
rootFile = "ca.pem"
|
||||
|
||||
// Template for server Listener resource name.
|
||||
serverListenerResourceNameTemplate = "grpc/server?xds.resource.listening_address=%s"
|
||||
)
|
||||
|
||||
func createTmpFile(t *testing.T, src, dst string) {
|
||||
|
@ -148,11 +151,11 @@ func commonSetup(t *testing.T) (*e2e.ManagementServer, string, net.Listener, fun
|
|||
|
||||
// Create a bootstrap file in a temporary directory.
|
||||
bootstrapCleanup, err := e2e.SetupBootstrapFile(e2e.BootstrapOptions{
|
||||
Version: e2e.TransportV3,
|
||||
NodeID: nodeID,
|
||||
ServerURI: fs.Address,
|
||||
CertificateProviders: cpc,
|
||||
ServerResourceNameID: "grpc/server",
|
||||
Version: e2e.TransportV3,
|
||||
NodeID: nodeID,
|
||||
ServerURI: fs.Address,
|
||||
CertificateProviders: cpc,
|
||||
ServerListenerResourceNameTemplate: serverListenerResourceNameTemplate,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -211,7 +214,7 @@ func listenerResourceWithoutSecurityConfig(t *testing.T, lis net.Listener) *v3li
|
|||
host, port := hostPortFromListener(t, lis)
|
||||
return &v3listenerpb.Listener{
|
||||
// This needs to match the name we are querying for.
|
||||
Name: fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", lis.Addr().String()),
|
||||
Name: fmt.Sprintf(serverListenerResourceNameTemplate, lis.Addr().String()),
|
||||
Address: &v3corepb.Address{
|
||||
Address: &v3corepb.Address_SocketAddress{
|
||||
SocketAddress: &v3corepb.SocketAddress{
|
||||
|
@ -237,7 +240,7 @@ func listenerResourceWithSecurityConfig(t *testing.T, lis net.Listener) *v3liste
|
|||
host, port := hostPortFromListener(t, lis)
|
||||
return &v3listenerpb.Listener{
|
||||
// This needs to match the name we are querying for.
|
||||
Name: fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", lis.Addr().String()),
|
||||
Name: fmt.Sprintf(serverListenerResourceNameTemplate, lis.Addr().String()),
|
||||
Address: &v3corepb.Address{
|
||||
Address: &v3corepb.Address_SocketAddress{
|
||||
SocketAddress: &v3corepb.SocketAddress{
|
||||
|
|
|
@ -46,8 +46,8 @@ type BootstrapOptions struct {
|
|||
NodeID string
|
||||
// ServerURI is the address of the management server.
|
||||
ServerURI string
|
||||
// ServerResourceNameID is the Listener resource name to fetch.
|
||||
ServerResourceNameID string
|
||||
// ServerListenerResourceNameTemplate is the Listener resource name to fetch.
|
||||
ServerListenerResourceNameTemplate string
|
||||
// CertificateProviders is the certificate providers configuration.
|
||||
CertificateProviders map[string]json.RawMessage
|
||||
}
|
||||
|
@ -79,8 +79,8 @@ func SetupBootstrapFile(opts BootstrapOptions) (func(), error) {
|
|||
Node: node{
|
||||
ID: opts.NodeID,
|
||||
},
|
||||
CertificateProviders: opts.CertificateProviders,
|
||||
GRPCServerResourceNameID: opts.ServerResourceNameID,
|
||||
CertificateProviders: opts.CertificateProviders,
|
||||
ServerListenerResourceNameTemplate: opts.ServerListenerResourceNameTemplate,
|
||||
}
|
||||
switch opts.Version {
|
||||
case TransportV2:
|
||||
|
@ -127,10 +127,10 @@ func DefaultFileWatcherConfig(certPath, keyPath, caPath string) map[string]json.
|
|||
}
|
||||
|
||||
type bootstrapConfig struct {
|
||||
XdsServers []server `json:"xds_servers,omitempty"`
|
||||
Node node `json:"node,omitempty"`
|
||||
CertificateProviders map[string]json.RawMessage `json:"certificate_providers,omitempty"`
|
||||
GRPCServerResourceNameID string `json:"grpc_server_resource_name_id,omitempty"`
|
||||
XdsServers []server `json:"xds_servers,omitempty"`
|
||||
Node node `json:"node,omitempty"`
|
||||
CertificateProviders map[string]json.RawMessage `json:"certificate_providers,omitempty"`
|
||||
ServerListenerResourceNameTemplate string `json:"server_listener_resource_name_template,omitempty"`
|
||||
}
|
||||
|
||||
type server struct {
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -220,17 +221,20 @@ func (s *GRPCServer) newListenerWrapper(lis net.Listener) (*listenerWrapper, err
|
|||
// or not.
|
||||
goodUpdate := grpcsync.NewEvent()
|
||||
|
||||
// The resource_name in the LDS request sent by the xDS-enabled gRPC server
|
||||
// is of the following format:
|
||||
// "/path/to/resource?udpa.resource.listening_address=IP:Port". The
|
||||
// `/path/to/resource` part of the name is sourced from the bootstrap config
|
||||
// field `grpc_server_resource_name_id`. If this field is not specified in
|
||||
// the bootstrap file, we will use a default of `grpc/server`.
|
||||
path := "grpc/server"
|
||||
if cfg := s.xdsC.BootstrapConfig(); cfg != nil && cfg.ServerResourceNameID != "" {
|
||||
path = cfg.ServerResourceNameID
|
||||
// The server listener resource name template from the bootstrap
|
||||
// configuration contains a template for the name of the Listener resource
|
||||
// to subscribe to for a gRPC server. If the token `%s` is present in the
|
||||
// string, it will be replaced with the server's listening "IP:port" (e.g.,
|
||||
// "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated
|
||||
// as an error since we do not have any default value for this.
|
||||
cfg := s.xdsC.BootstrapConfig()
|
||||
if cfg == nil || cfg.ServerListenerResourceNameTemplate == "" {
|
||||
return nil, errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
|
||||
}
|
||||
name := cfg.ServerListenerResourceNameTemplate
|
||||
if strings.Contains(cfg.ServerListenerResourceNameTemplate, "%s") {
|
||||
name = strings.ReplaceAll(cfg.ServerListenerResourceNameTemplate, "%s", lis.Addr().String())
|
||||
}
|
||||
name := fmt.Sprintf("%s?udpa.resource.listening_address=%s", path, lis.Addr().String())
|
||||
|
||||
// Register an LDS watch using our xdsClient, and specify the listening
|
||||
// address as the resource name.
|
||||
|
@ -288,6 +292,38 @@ func (s *GRPCServer) handleListenerUpdate(update listenerUpdate) {
|
|||
}
|
||||
s.logger.Infof("Received update for resource %q: %+v", update.name, update.lds.String())
|
||||
|
||||
// Make sure that the socket address on the received Listener resource
|
||||
// matches the address of the net.Listener passed to us by the user. This
|
||||
// check is done here instead of at the xdsClient layer because of the
|
||||
// following couple of reasons:
|
||||
// - xdsClient cannot know the listening address of every listener in the
|
||||
// system, and hence cannot perform this check.
|
||||
// - this is a very context-dependent check and only the server has the
|
||||
// appropriate context to perform this check.
|
||||
//
|
||||
// What this means is that the xdsClient has ACKed a resource which is going
|
||||
// to push the server into a "not serving" state. This is not ideal, but
|
||||
// this is what we have decided to do. See gRPC A36 for more details.
|
||||
// TODO(easwars): Switch to "not serving" if the host:port does not match.
|
||||
lisAddr := update.lw.Listener.Addr().String()
|
||||
addr, port, err := net.SplitHostPort(lisAddr)
|
||||
if err != nil {
|
||||
// This is never expected to return a non-nil error since we have made
|
||||
// sure that the listener is a TCP listener at the beginning of Serve().
|
||||
// This is simply paranoia.
|
||||
s.logger.Warningf("Local listener address %q failed to parse as IP:port: %v", lisAddr, err)
|
||||
return
|
||||
}
|
||||
ilc := update.lds.InboundListenerCfg
|
||||
if ilc == nil {
|
||||
s.logger.Warningf("Missing host:port in Listener updates")
|
||||
return
|
||||
}
|
||||
if ilc.Address != addr || ilc.Port != port {
|
||||
s.logger.Warningf("Received host:port (%s:%d) in Listener update does not match local listening address: %s", ilc.Address, ilc.Port, lisAddr)
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.handleSecurityConfig(update.lds.SecurityCfg, update.lw); err != nil {
|
||||
s.logger.Warningf("Invalid security config update: %v", err)
|
||||
return
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -40,9 +41,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultTestTimeout = 5 * time.Second
|
||||
defaultTestShortTimeout = 10 * time.Millisecond
|
||||
testServerResourceNameID = "/path/to/resource"
|
||||
defaultTestTimeout = 5 * time.Second
|
||||
defaultTestShortTimeout = 10 * time.Millisecond
|
||||
testServerListenerResourceNameTemplate = "/path/to/resource/%s/%s"
|
||||
)
|
||||
|
||||
type s struct {
|
||||
|
@ -90,6 +91,14 @@ func newFakeGRPCServer() *fakeGRPCServer {
|
|||
}
|
||||
}
|
||||
|
||||
func splitHostPort(hostport string) (string, string) {
|
||||
addr, port, err := net.SplitHostPort(hostport)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("listener address %q does not parse: %v", hostport, err))
|
||||
}
|
||||
return addr, port
|
||||
}
|
||||
|
||||
func (s) TestNewServer(t *testing.T) {
|
||||
xdsCreds, err := xds.NewServerCredentials(xds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
|
||||
if err != nil {
|
||||
|
@ -228,11 +237,11 @@ func setupOverrides() (*fakeGRPCServer, *testutils.Channel, *testutils.Channel,
|
|||
newXDSClient = func() (xdsClientInterface, error) {
|
||||
c := fakeclient.NewClient()
|
||||
c.SetBootstrapConfig(&bootstrap.Config{
|
||||
BalancerName: "dummyBalancer",
|
||||
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
NodeProto: xdstestutils.EmptyNodeProtoV3,
|
||||
ServerResourceNameID: testServerResourceNameID,
|
||||
CertProviderConfigs: certProviderConfigs,
|
||||
BalancerName: "dummyBalancer",
|
||||
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
NodeProto: xdstestutils.EmptyNodeProtoV3,
|
||||
ServerListenerResourceNameTemplate: testServerListenerResourceNameTemplate,
|
||||
CertProviderConfigs: certProviderConfigs,
|
||||
})
|
||||
clientCh.Send(c)
|
||||
return c, nil
|
||||
|
@ -267,10 +276,10 @@ func setupOverridesForXDSCreds(includeCertProviderCfg bool) (*testutils.Channel,
|
|||
newXDSClient = func() (xdsClientInterface, error) {
|
||||
c := fakeclient.NewClient()
|
||||
bc := &bootstrap.Config{
|
||||
BalancerName: "dummyBalancer",
|
||||
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
NodeProto: xdstestutils.EmptyNodeProtoV3,
|
||||
ServerResourceNameID: testServerResourceNameID,
|
||||
BalancerName: "dummyBalancer",
|
||||
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
NodeProto: xdstestutils.EmptyNodeProtoV3,
|
||||
ServerListenerResourceNameTemplate: testServerListenerResourceNameTemplate,
|
||||
}
|
||||
if includeCertProviderCfg {
|
||||
bc.CertProviderConfigs = certProviderConfigs
|
||||
|
@ -337,7 +346,7 @@ func (s) TestServeSuccess(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("error when waiting for a ListenerWatch: %v", err)
|
||||
}
|
||||
wantName := fmt.Sprintf("%s?udpa.resource.listening_address=%s", client.BootstrapConfig().ServerResourceNameID, lis.Addr().String())
|
||||
wantName := strings.ReplaceAll(testServerListenerResourceNameTemplate, "%s", lis.Addr().String())
|
||||
if name != wantName {
|
||||
t.Fatalf("LDS watch registered for name %q, want %q", name, wantName)
|
||||
}
|
||||
|
@ -353,10 +362,35 @@ func (s) TestServeSuccess(t *testing.T) {
|
|||
|
||||
// Push a good LDS response, and wait for Serve() to be invoked on the
|
||||
// underlying grpc.Server.
|
||||
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: "routeconfig"}, nil)
|
||||
addr, port := splitHostPort(lis.Addr().String())
|
||||
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
|
||||
RouteConfigName: "routeconfig",
|
||||
InboundListenerCfg: &xdsclient.InboundListenerConfig{
|
||||
Address: addr,
|
||||
Port: port,
|
||||
},
|
||||
}, nil)
|
||||
if _, err := fs.serveCh.Receive(ctx); err != nil {
|
||||
t.Fatalf("error when waiting for Serve() to be invoked on the grpc.Server")
|
||||
}
|
||||
|
||||
// Push an update to the registered listener watch callback with a Listener
|
||||
// resource whose host:port does not match the actual listening address and
|
||||
// port. Serve() should not return and should continue to use the old state.
|
||||
//
|
||||
// This will change once we add start tracking serving state.
|
||||
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
|
||||
RouteConfigName: "routeconfig",
|
||||
InboundListenerCfg: &xdsclient.InboundListenerConfig{
|
||||
Address: "10.20.30.40",
|
||||
Port: "666",
|
||||
},
|
||||
}, nil)
|
||||
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
|
||||
defer sCancel()
|
||||
if _, err := serveDone.Receive(sCtx); err != context.DeadlineExceeded {
|
||||
t.Fatal("Serve() returned after a bad LDS response")
|
||||
}
|
||||
}
|
||||
|
||||
// TestServeWithStop tests the case where Stop() is called before an LDS update
|
||||
|
@ -399,7 +433,7 @@ func (s) TestServeWithStop(t *testing.T) {
|
|||
server.Stop()
|
||||
t.Fatalf("error when waiting for a ListenerWatch: %v", err)
|
||||
}
|
||||
wantName := fmt.Sprintf("%s?udpa.resource.listening_address=%s", client.BootstrapConfig().ServerResourceNameID, lis.Addr().String())
|
||||
wantName := strings.ReplaceAll(testServerListenerResourceNameTemplate, "%s", lis.Addr().String())
|
||||
if name != wantName {
|
||||
server.Stop()
|
||||
t.Fatalf("LDS watch registered for name %q, wantPrefix %q", name, wantName)
|
||||
|
@ -448,39 +482,79 @@ func (s) TestServeBootstrapFailure(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestServeBootstrapWithMissingCertProviders tests the case where the bootstrap
|
||||
// config does not contain certificate provider configuration, but xdsCreds are
|
||||
// passed to the server. Verifies that the call to Serve() fails.
|
||||
func (s) TestServeBootstrapWithMissingCertProviders(t *testing.T) {
|
||||
_, _, cleanup := setupOverridesForXDSCreds(false)
|
||||
defer cleanup()
|
||||
|
||||
xdsCreds, err := xds.NewServerCredentials(xds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create xds server credentials: %v", err)
|
||||
}
|
||||
server := NewGRPCServer(grpc.Creds(xdsCreds))
|
||||
defer server.Stop()
|
||||
|
||||
lis, err := xdstestutils.LocalTCPListener()
|
||||
if err != nil {
|
||||
t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err)
|
||||
// TestServeBootstrapConfigInvalid tests the cases where the bootstrap config
|
||||
// does not contain expected fields. Verifies that the call to Serve() fails.
|
||||
func (s) TestServeBootstrapConfigInvalid(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
bootstrapConfig *bootstrap.Config
|
||||
}{
|
||||
{
|
||||
desc: "bootstrap config is missing",
|
||||
bootstrapConfig: nil,
|
||||
},
|
||||
{
|
||||
desc: "certificate provider config is missing",
|
||||
bootstrapConfig: &bootstrap.Config{
|
||||
BalancerName: "dummyBalancer",
|
||||
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
NodeProto: xdstestutils.EmptyNodeProtoV3,
|
||||
ServerListenerResourceNameTemplate: testServerListenerResourceNameTemplate,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "server_listener_resource_name_template is missing",
|
||||
bootstrapConfig: &bootstrap.Config{
|
||||
BalancerName: "dummyBalancer",
|
||||
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
NodeProto: xdstestutils.EmptyNodeProtoV3,
|
||||
CertProviderConfigs: certProviderConfigs,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
serveDone := testutils.NewChannel()
|
||||
go func() {
|
||||
err := server.Serve(lis)
|
||||
serveDone.Send(err)
|
||||
}()
|
||||
for _, test := range tests {
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
// Override the xdsClient creation with one that returns a fake
|
||||
// xdsClient with the specified bootstrap configuration.
|
||||
clientCh := testutils.NewChannel()
|
||||
origNewXDSClient := newXDSClient
|
||||
newXDSClient = func() (xdsClientInterface, error) {
|
||||
c := fakeclient.NewClient()
|
||||
c.SetBootstrapConfig(test.bootstrapConfig)
|
||||
clientCh.Send(c)
|
||||
return c, nil
|
||||
}
|
||||
defer func() { newXDSClient = origNewXDSClient }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
v, err := serveDone.Receive(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error when waiting for Serve() to exit: %v", err)
|
||||
}
|
||||
if err, ok := v.(error); !ok || err == nil {
|
||||
t.Fatal("Serve() did not exit with error")
|
||||
xdsCreds, err := xds.NewServerCredentials(xds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create xds server credentials: %v", err)
|
||||
}
|
||||
server := NewGRPCServer(grpc.Creds(xdsCreds))
|
||||
defer server.Stop()
|
||||
|
||||
lis, err := xdstestutils.LocalTCPListener()
|
||||
if err != nil {
|
||||
t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err)
|
||||
}
|
||||
|
||||
serveDone := testutils.NewChannel()
|
||||
go func() {
|
||||
err := server.Serve(lis)
|
||||
serveDone.Send(err)
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
v, err := serveDone.Receive(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("error when waiting for Serve() to exit: %v", err)
|
||||
}
|
||||
if err, ok := v.(error); !ok || err == nil {
|
||||
t.Fatal("Serve() did not exit with error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -556,7 +630,7 @@ func (s) TestHandleListenerUpdate_NoXDSCreds(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("error when waiting for a ListenerWatch: %v", err)
|
||||
}
|
||||
wantName := fmt.Sprintf("%s?udpa.resource.listening_address=%s", client.BootstrapConfig().ServerResourceNameID, lis.Addr().String())
|
||||
wantName := strings.ReplaceAll(testServerListenerResourceNameTemplate, "%s", lis.Addr().String())
|
||||
if name != wantName {
|
||||
t.Fatalf("LDS watch registered for name %q, want %q", name, wantName)
|
||||
}
|
||||
|
@ -564,6 +638,7 @@ func (s) TestHandleListenerUpdate_NoXDSCreds(t *testing.T) {
|
|||
// Push a good LDS response with security config, and wait for Serve() to be
|
||||
// invoked on the underlying grpc.Server. Also make sure that certificate
|
||||
// providers are not created.
|
||||
addr, port := splitHostPort(lis.Addr().String())
|
||||
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
|
||||
RouteConfigName: "routeconfig",
|
||||
SecurityCfg: &xdsclient.SecurityConfig{
|
||||
|
@ -571,6 +646,10 @@ func (s) TestHandleListenerUpdate_NoXDSCreds(t *testing.T) {
|
|||
IdentityInstanceName: "default2",
|
||||
RequireClientCert: true,
|
||||
},
|
||||
InboundListenerCfg: &xdsclient.InboundListenerConfig{
|
||||
Address: addr,
|
||||
Port: port,
|
||||
},
|
||||
}, nil)
|
||||
if _, err := fs.serveCh.Receive(ctx); err != nil {
|
||||
t.Fatalf("error when waiting for Serve() to be invoked on the grpc.Server")
|
||||
|
@ -627,21 +706,14 @@ func (s) TestHandleListenerUpdate_ErrorUpdate(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("error when waiting for a ListenerWatch: %v", err)
|
||||
}
|
||||
wantName := fmt.Sprintf("%s?udpa.resource.listening_address=%s", client.BootstrapConfig().ServerResourceNameID, lis.Addr().String())
|
||||
wantName := strings.ReplaceAll(testServerListenerResourceNameTemplate, "%s", lis.Addr().String())
|
||||
if name != wantName {
|
||||
t.Fatalf("LDS watch registered for name %q, want %q", name, wantName)
|
||||
}
|
||||
|
||||
// Push an error to the registered listener watch callback and make sure
|
||||
// that Serve does not return.
|
||||
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
|
||||
RouteConfigName: "routeconfig",
|
||||
SecurityCfg: &xdsclient.SecurityConfig{
|
||||
RootInstanceName: "default1",
|
||||
IdentityInstanceName: "default2",
|
||||
RequireClientCert: true,
|
||||
},
|
||||
}, errors.New("LDS error"))
|
||||
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{}, errors.New("LDS error"))
|
||||
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
|
||||
defer sCancel()
|
||||
if _, err := serveDone.Receive(sCtx); err != context.DeadlineExceeded {
|
||||
|
@ -691,7 +763,7 @@ func (s) TestHandleListenerUpdate_ClosedListener(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("error when waiting for a ListenerWatch: %v", err)
|
||||
}
|
||||
wantName := fmt.Sprintf("%s?udpa.resource.listening_address=%s", client.BootstrapConfig().ServerResourceNameID, lis.Addr().String())
|
||||
wantName := strings.ReplaceAll(testServerListenerResourceNameTemplate, "%s", lis.Addr().String())
|
||||
if name != wantName {
|
||||
t.Fatalf("LDS watch registered for name %q, want %q", name, wantName)
|
||||
}
|
||||
|
@ -699,9 +771,14 @@ func (s) TestHandleListenerUpdate_ClosedListener(t *testing.T) {
|
|||
// Push a good update to the registered listener watch callback. This will
|
||||
// unblock the xds-enabled server which is waiting for a good listener
|
||||
// update before calling Serve() on the underlying grpc.Server.
|
||||
addr, port := splitHostPort(lis.Addr().String())
|
||||
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
|
||||
RouteConfigName: "routeconfig",
|
||||
SecurityCfg: &xdsclient.SecurityConfig{IdentityInstanceName: "default2"},
|
||||
InboundListenerCfg: &xdsclient.InboundListenerConfig{
|
||||
Address: addr,
|
||||
Port: port,
|
||||
},
|
||||
}, nil)
|
||||
if _, err := providerCh.Receive(ctx); err != nil {
|
||||
t.Fatal("error when waiting for certificate provider to be created")
|
||||
|
@ -723,6 +800,10 @@ func (s) TestHandleListenerUpdate_ClosedListener(t *testing.T) {
|
|||
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
|
||||
RouteConfigName: "routeconfig",
|
||||
SecurityCfg: &xdsclient.SecurityConfig{IdentityInstanceName: "default1"},
|
||||
InboundListenerCfg: &xdsclient.InboundListenerConfig{
|
||||
Address: addr,
|
||||
Port: port,
|
||||
},
|
||||
}, nil)
|
||||
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
|
||||
defer sCancel()
|
||||
|
|
Loading…
Reference in New Issue