From 97c314341871cd9f4837a6ffb71bfd991a9ba04e Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 17 Mar 2022 10:34:45 -0700 Subject: [PATCH] xds/client: accept resources wrapped in discoverypb.Resource message (#5242) --- xds/internal/xdsclient/bootstrap/bootstrap.go | 3 +- .../xdsclient/bootstrap/bootstrap_test.go | 6 +- xds/internal/xdsclient/xdsresource/type.go | 17 ++++++ .../xdsclient/xdsresource/unmarshal_cds.go | 5 ++ .../xdsresource/unmarshal_cds_test.go | 31 +++++++++++ .../xdsclient/xdsresource/unmarshal_eds.go | 5 ++ .../xdsresource/unmarshal_eds_test.go | 37 +++++++++++++ .../xdsclient/xdsresource/unmarshal_lds.go | 5 ++ .../xdsresource/unmarshal_lds_test.go | 23 ++++++++ .../xdsclient/xdsresource/unmarshal_rds.go | 5 ++ .../xdsresource/unmarshal_rds_test.go | 55 +++++++++++++++++++ .../xdsclient/xdsresource/version/version.go | 2 + 12 files changed, 190 insertions(+), 4 deletions(-) diff --git a/xds/internal/xdsclient/bootstrap/bootstrap.go b/xds/internal/xdsclient/bootstrap/bootstrap.go index 4523a6131..97fe4a8b0 100644 --- a/xds/internal/xdsclient/bootstrap/bootstrap.go +++ b/xds/internal/xdsclient/bootstrap/bootstrap.go @@ -53,6 +53,7 @@ const ( gRPCUserAgentName = "gRPC Go" clientFeatureNoOverprovisioning = "envoy.lb.does_not_support_overprovisioning" + clientFeatureResourceWrapper = "xds.config.resource-in-sotw" ) func init() { @@ -499,7 +500,7 @@ func (c *Config) updateNodeProto(node *v3corepb.Node) error { } v3.UserAgentName = gRPCUserAgentName v3.UserAgentVersionType = &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version} - v3.ClientFeatures = append(v3.ClientFeatures, clientFeatureNoOverprovisioning) + v3.ClientFeatures = append(v3.ClientFeatures, clientFeatureNoOverprovisioning, clientFeatureResourceWrapper) v3bytes, err := proto.Marshal(v3) if err != nil { diff --git a/xds/internal/xdsclient/bootstrap/bootstrap_test.go b/xds/internal/xdsclient/bootstrap/bootstrap_test.go index 36b4302c8..6aa047d6d 100644 --- a/xds/internal/xdsclient/bootstrap/bootstrap_test.go +++ b/xds/internal/xdsclient/bootstrap/bootstrap_test.go @@ -200,14 +200,14 @@ var ( BuildVersion: gRPCVersion, UserAgentName: gRPCUserAgentName, UserAgentVersionType: &v2corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, - ClientFeatures: []string{clientFeatureNoOverprovisioning}, + ClientFeatures: []string{clientFeatureNoOverprovisioning, clientFeatureResourceWrapper}, } v3NodeProto = &v3corepb.Node{ Id: "ENVOY_NODE_ID", Metadata: metadata, UserAgentName: gRPCUserAgentName, UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, - ClientFeatures: []string{clientFeatureNoOverprovisioning}, + ClientFeatures: []string{clientFeatureNoOverprovisioning, clientFeatureResourceWrapper}, } nilCredsConfigV2 = &Config{ XDSServer: &ServerConfig{ @@ -401,7 +401,7 @@ func TestNewConfigV2ProtoSuccess(t *testing.T) { BuildVersion: gRPCVersion, UserAgentName: gRPCUserAgentName, UserAgentVersionType: &v2corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, - ClientFeatures: []string{clientFeatureNoOverprovisioning}, + ClientFeatures: []string{clientFeatureNoOverprovisioning, clientFeatureResourceWrapper}, }, }, ClientDefaultListenerResourceNameTemplate: "%s", diff --git a/xds/internal/xdsclient/xdsresource/type.go b/xds/internal/xdsclient/xdsresource/type.go index c64f7c609..faf34f98e 100644 --- a/xds/internal/xdsclient/xdsresource/type.go +++ b/xds/internal/xdsclient/xdsresource/type.go @@ -20,6 +20,8 @@ package xdsresource import ( "time" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/golang/protobuf/proto" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" "google.golang.org/protobuf/types/known/anypb" ) @@ -76,6 +78,21 @@ func IsEndpointsResource(url string) bool { return url == version.V2EndpointsURL || url == version.V3EndpointsURL } +// unwrapResource unwraps and returns the inner resource if it's in a resource +// wrapper. The original resource is returned if it's not wrapped. +func unwrapResource(r *anypb.Any) (*anypb.Any, error) { + url := r.GetTypeUrl() + if url != version.V2ResourceWrapperURL && url != version.V3ResourceWrapperURL { + // Not wrapped. + return r, nil + } + inner := &v3discoverypb.Resource{} + if err := proto.Unmarshal(r.GetValue(), inner); err != nil { + return nil, err + } + return inner.Resource, nil +} + // ServiceStatus is the status of the update. type ServiceStatus int diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go index eba78716e..572941efb 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go @@ -51,6 +51,11 @@ func UnmarshalCluster(opts *UnmarshalOptions) (map[string]ClusterUpdateErrTuple, } func unmarshalClusterResource(r *anypb.Any, f UpdateValidatorFunc, logger *grpclog.PrefixLogger) (string, ClusterUpdate, error) { + r, err := unwrapResource(r) + if err != nil { + return "", ClusterUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err) + } + if !IsClusterResource(r.GetTypeUrl()) { return "", ClusterUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl()) } diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go index 096c4fb0e..4569d1358 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/internal/envconfig" @@ -1517,6 +1518,21 @@ func (s) TestUnmarshalCluster(t *testing.T) { Version: testVersion, }, }, + { + name: "v2 cluster wrapped", + resources: []*anypb.Any{testutils.MarshalAny(&v2xdspb.Resource{Resource: v2ClusterAny})}, + wantUpdate: map[string]ClusterUpdateErrTuple{ + v2ClusterName: {Update: ClusterUpdate{ + ClusterName: v2ClusterName, + EDSServiceName: v2Service, LRSServerConfig: ClusterLRSServerSelf, + Raw: v2ClusterAny, + }}, + }, + wantMD: UpdateMetadata{ + Status: ServiceStatusACKed, + Version: testVersion, + }, + }, { name: "v3 cluster", resources: []*anypb.Any{v3ClusterAny}, @@ -1532,6 +1548,21 @@ func (s) TestUnmarshalCluster(t *testing.T) { Version: testVersion, }, }, + { + name: "v3 cluster wrapped", + resources: []*anypb.Any{testutils.MarshalAny(&v3discoverypb.Resource{Resource: v3ClusterAny})}, + wantUpdate: map[string]ClusterUpdateErrTuple{ + v3ClusterName: {Update: ClusterUpdate{ + ClusterName: v3ClusterName, + EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf, + Raw: v3ClusterAny, + }}, + }, + wantMD: UpdateMetadata{ + Status: ServiceStatusACKed, + Version: testVersion, + }, + }, { name: "v3 cluster with EDS config source self", resources: []*anypb.Any{v3ClusterAnyWithEDSConfigSourceSelf}, diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go index f1774deda..147870cdf 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go @@ -42,6 +42,11 @@ func UnmarshalEndpoints(opts *UnmarshalOptions) (map[string]EndpointsUpdateErrTu } func unmarshalEndpointsResource(r *anypb.Any, logger *grpclog.PrefixLogger) (string, EndpointsUpdate, error) { + r, err := unwrapResource(r) + if err != nil { + return "", EndpointsUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err) + } + if !IsEndpointsResource(r.GetTypeUrl()) { return "", EndpointsUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl()) } diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go index 324d7d250..5c6118d4e 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go @@ -25,6 +25,7 @@ import ( v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3" anypb "github.com/golang/protobuf/ptypes/any" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" @@ -227,6 +228,42 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { Version: testVersion, }, }, + { + name: "v3 endpoints wrapped", + resources: []*anypb.Any{testutils.MarshalAny(&v3discoverypb.Resource{Resource: v3EndpointsAny})}, + wantUpdate: map[string]EndpointsUpdateErrTuple{ + "test": {Update: EndpointsUpdate{ + Drops: nil, + Localities: []Locality{ + { + Endpoints: []Endpoint{{ + Address: "addr1:314", + HealthStatus: EndpointHealthStatusUnhealthy, + Weight: 271, + }}, + ID: internal.LocalityID{SubZone: "locality-1"}, + Priority: 1, + Weight: 1, + }, + { + Endpoints: []Endpoint{{ + Address: "addr2:159", + HealthStatus: EndpointHealthStatusDraining, + Weight: 828, + }}, + ID: internal.LocalityID{SubZone: "locality-2"}, + Priority: 0, + Weight: 1, + }, + }, + Raw: v3EndpointsAny, + }}, + }, + wantMD: UpdateMetadata{ + Status: ServiceStatusACKed, + Version: testVersion, + }, + }, { // To test that unmarshal keeps processing on errors. name: "good and bad endpoints", diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_lds.go b/xds/internal/xdsclient/xdsresource/unmarshal_lds.go index b259c7b87..2e59c0605 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_lds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_lds.go @@ -46,6 +46,11 @@ func UnmarshalListener(opts *UnmarshalOptions) (map[string]ListenerUpdateErrTupl } func unmarshalListenerResource(r *anypb.Any, f UpdateValidatorFunc, logger *grpclog.PrefixLogger) (string, ListenerUpdate, error) { + r, err := unwrapResource(r) + if err != nil { + return "", ListenerUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err) + } + if !IsListenerResource(r.GetTypeUrl()) { return "", ListenerUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl()) } diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_lds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_lds_test.go index 4444421a4..9150d64df 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_lds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_lds_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -605,6 +606,17 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { Version: testVersion, }, }, + { + name: "v2 listener resource wrapped", + resources: []*anypb.Any{testutils.MarshalAny(&v2xdspb.Resource{Resource: v2Lis})}, + wantUpdate: map[string]ListenerUpdateErrTuple{ + v2LDSTarget: {Update: ListenerUpdate{RouteConfigName: v2RouteConfigName, Raw: v2Lis}}, + }, + wantMD: UpdateMetadata{ + Status: ServiceStatusACKed, + Version: testVersion, + }, + }, { name: "v3 listener resource", resources: []*anypb.Any{v3LisWithFilters()}, @@ -616,6 +628,17 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { Version: testVersion, }, }, + { + name: "v3 listener resource wrapped", + resources: []*anypb.Any{testutils.MarshalAny(&v3discoverypb.Resource{Resource: v3LisWithFilters()})}, + wantUpdate: map[string]ListenerUpdateErrTuple{ + v3LDSTarget: {Update: ListenerUpdate{RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second, HTTPFilters: routerFilterList, Raw: v3LisWithFilters()}}, + }, + wantMD: UpdateMetadata{ + Status: ServiceStatusACKed, + Version: testVersion, + }, + }, // "To allow equating RBAC's direct_remote_ip and // remote_ip...HttpConnectionManager.xff_num_trusted_hops must be unset // or zero and HttpConnectionManager.original_ip_detection_extensions diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_rds.go b/xds/internal/xdsclient/xdsresource/unmarshal_rds.go index f43b18292..12c3d560f 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_rds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_rds.go @@ -46,6 +46,11 @@ func UnmarshalRouteConfig(opts *UnmarshalOptions) (map[string]RouteConfigUpdateE } func unmarshalRouteConfigResource(r *anypb.Any, logger *grpclog.PrefixLogger) (string, RouteConfigUpdate, error) { + r, err := unwrapResource(r) + if err != nil { + return "", RouteConfigUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err) + } + if !IsRouteConfigResource(r.GetTypeUrl()) { return "", RouteConfigUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl()) } diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_rds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_rds_test.go index a14d321b8..abae9ff09 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_rds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_rds_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -910,6 +911,33 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { Version: testVersion, }, }, + { + name: "v2 routeConfig resource wrapped", + resources: []*anypb.Any{testutils.MarshalAny(&v2xdspb.Resource{Resource: v2RouteConfig})}, + wantUpdate: map[string]RouteConfigUpdateErrTuple{ + v2RouteConfigName: {Update: RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{ + { + Domains: []string{uninterestingDomain}, + Routes: []*Route{{Prefix: newStringP(""), + WeightedClusters: map[string]WeightedCluster{uninterestingClusterName: {Weight: 1}}, + ActionType: RouteActionRoute}}, + }, + { + Domains: []string{ldsTarget}, + Routes: []*Route{{Prefix: newStringP(""), + WeightedClusters: map[string]WeightedCluster{v2ClusterName: {Weight: 1}}, + ActionType: RouteActionRoute}}, + }, + }, + Raw: v2RouteConfig, + }}, + }, + wantMD: UpdateMetadata{ + Status: ServiceStatusACKed, + Version: testVersion, + }, + }, { name: "v3 routeConfig resource", resources: []*anypb.Any{v3RouteConfig}, @@ -937,6 +965,33 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { Version: testVersion, }, }, + { + name: "v3 routeConfig resource wrapped", + resources: []*anypb.Any{testutils.MarshalAny(&v3discoverypb.Resource{Resource: v3RouteConfig})}, + wantUpdate: map[string]RouteConfigUpdateErrTuple{ + v3RouteConfigName: {Update: RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{ + { + Domains: []string{uninterestingDomain}, + Routes: []*Route{{Prefix: newStringP(""), + WeightedClusters: map[string]WeightedCluster{uninterestingClusterName: {Weight: 1}}, + ActionType: RouteActionRoute}}, + }, + { + Domains: []string{ldsTarget}, + Routes: []*Route{{Prefix: newStringP(""), + WeightedClusters: map[string]WeightedCluster{v3ClusterName: {Weight: 1}}, + ActionType: RouteActionRoute}}, + }, + }, + Raw: v3RouteConfig, + }}, + }, + wantMD: UpdateMetadata{ + Status: ServiceStatusACKed, + Version: testVersion, + }, + }, { name: "multiple routeConfig resources", resources: []*anypb.Any{v2RouteConfig, v3RouteConfig}, diff --git a/xds/internal/xdsclient/xdsresource/version/version.go b/xds/internal/xdsclient/xdsresource/version/version.go index edfa68762..2c4819abd 100644 --- a/xds/internal/xdsclient/xdsresource/version/version.go +++ b/xds/internal/xdsclient/xdsresource/version/version.go @@ -42,6 +42,7 @@ const ( V2ClusterType = "envoy.api.v2.Cluster" V2EndpointsType = "envoy.api.v2.ClusterLoadAssignment" + V2ResourceWrapperURL = googleapiPrefix + "envoy.api.v2.Resource" V2ListenerURL = googleapiPrefix + V2ListenerType V2RouteConfigURL = googleapiPrefix + V2RouteConfigType V2ClusterURL = googleapiPrefix + V2ClusterType @@ -53,6 +54,7 @@ const ( V3ClusterType = "envoy.config.cluster.v3.Cluster" V3EndpointsType = "envoy.config.endpoint.v3.ClusterLoadAssignment" + V3ResourceWrapperURL = googleapiPrefix + "envoy.service.discovery.v3.Resource" V3ListenerURL = googleapiPrefix + V3ListenerType V3RouteConfigURL = googleapiPrefix + V3RouteConfigType V3ClusterURL = googleapiPrefix + V3ClusterType