mirror of https://github.com/grpc/grpc-go.git
				
				
				
			xds: Add Cluster Specifier Plugin to xdsclient (#4967)
* xds: Add Cluster Specifier Plugin to xdsclient
This commit is contained in:
		
							parent
							
								
									dd767416a6
								
							
						
					
					
						commit
						d57363ab5d
					
				|  | @ -0,0 +1,67 @@ | |||
| /* | ||||
|  * | ||||
|  * Copyright 2021 gRPC authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  * | ||||
|  */ | ||||
| 
 | ||||
| // Package clusterspecifier contains the ClusterSpecifier interface and a registry for
 | ||||
| // storing and retrieving their implementations.
 | ||||
| package clusterspecifier | ||||
| 
 | ||||
| import ( | ||||
| 	"github.com/golang/protobuf/proto" | ||||
| ) | ||||
| 
 | ||||
| // BalancerConfig is the Go Native JSON representation of a balancer
 | ||||
| // configuration.
 | ||||
| type BalancerConfig []map[string]interface{} | ||||
| 
 | ||||
| // ClusterSpecifier defines the parsing functionality of a Cluster Specifier.
 | ||||
| type ClusterSpecifier interface { | ||||
| 	// TypeURLs are the proto message types supported by this
 | ||||
| 	// ClusterSpecifierPlugin. A ClusterSpecifierPlugin will be registered by
 | ||||
| 	// each of its supported message types.
 | ||||
| 	TypeURLs() []string | ||||
| 	// ParseClusterSpecifierConfig parses the provided configuration
 | ||||
| 	// proto.Message from the top level RDS configuration. The resulting
 | ||||
| 	// BalancerConfig will be used as configuration for a child LB Policy of the
 | ||||
| 	// Cluster Manager LB Policy.
 | ||||
| 	ParseClusterSpecifierConfig(proto.Message) (BalancerConfig, error) | ||||
| } | ||||
| 
 | ||||
| var ( | ||||
| 	// m is a map from scheme to filter.
 | ||||
| 	m = make(map[string]ClusterSpecifier) | ||||
| ) | ||||
| 
 | ||||
| // Register registers the ClusterSpecifierPlugin to the ClusterSpecifier map.
 | ||||
| // cs.TypeURLs() will be used as the types for this ClusterSpecifierPlugin.
 | ||||
| //
 | ||||
| // NOTE: this function must only be called during initialization time (i.e. in
 | ||||
| // an init() function), and is not thread-safe. If multiple cluster specifier
 | ||||
| // plugins are registered with the same type URL, the one registered last will
 | ||||
| // take effect.
 | ||||
| func Register(cs ClusterSpecifier) { | ||||
| 	for _, u := range cs.TypeURLs() { | ||||
| 		m[u] = cs | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Get returns the ClusterSpecifier registered with typeURL.
 | ||||
| //
 | ||||
| // If no cluster specifier is registered with typeURL, nil will be returned.
 | ||||
| func Get(typeURL string) ClusterSpecifier { | ||||
| 	return m[typeURL] | ||||
| } | ||||
|  | @ -23,6 +23,7 @@ import ( | |||
| 
 | ||||
| 	"google.golang.org/grpc/codes" | ||||
| 	"google.golang.org/grpc/internal/xds/matcher" | ||||
| 	"google.golang.org/grpc/xds/internal/clusterspecifier" | ||||
| 	"google.golang.org/grpc/xds/internal/httpfilter" | ||||
| 	"google.golang.org/protobuf/types/known/anypb" | ||||
| ) | ||||
|  | @ -31,6 +32,9 @@ import ( | |||
| // of interest to the registered RDS watcher.
 | ||||
| type RouteConfigUpdate struct { | ||||
| 	VirtualHosts []*VirtualHost | ||||
| 	// ClusterSpecifierPlugins are the LB Configurations for any
 | ||||
| 	// ClusterSpecifierPlugins referenced by the Route Table.
 | ||||
| 	ClusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig | ||||
| 	// Raw is the resource from the xds response.
 | ||||
| 	Raw *anypb.Any | ||||
| } | ||||
|  |  | |||
|  | @ -30,6 +30,7 @@ import ( | |||
| 	"google.golang.org/grpc/internal/grpclog" | ||||
| 	"google.golang.org/grpc/internal/pretty" | ||||
| 	"google.golang.org/grpc/internal/xds/env" | ||||
| 	"google.golang.org/grpc/xds/internal/clusterspecifier" | ||||
| 	"google.golang.org/grpc/xds/internal/version" | ||||
| 	"google.golang.org/protobuf/types/known/anypb" | ||||
| ) | ||||
|  | @ -82,11 +83,22 @@ func unmarshalRouteConfigResource(r *anypb.Any, logger *grpclog.PrefixLogger) (s | |||
| // we are looking for.
 | ||||
| func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration, logger *grpclog.PrefixLogger, v2 bool) (RouteConfigUpdate, error) { | ||||
| 	vhs := make([]*VirtualHost, 0, len(rc.GetVirtualHosts())) | ||||
| 	csps, err := processClusterSpecifierPlugins(rc.ClusterSpecifierPlugins) | ||||
| 	if err != nil { | ||||
| 		return RouteConfigUpdate{}, fmt.Errorf("received route is invalid %v", err) | ||||
| 	} | ||||
| 	// cspNames represents all the cluster specifiers referenced by Route
 | ||||
| 	// Actions - any cluster specifiers not referenced by a Route Action can be
 | ||||
| 	// ignored and not emitted by the xdsclient.
 | ||||
| 	var cspNames = make(map[string]bool) | ||||
| 	for _, vh := range rc.GetVirtualHosts() { | ||||
| 		routes, err := routesProtoToSlice(vh.Routes, logger, v2) | ||||
| 		routes, cspNs, err := routesProtoToSlice(vh.Routes, csps, logger, v2) | ||||
| 		if err != nil { | ||||
| 			return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err) | ||||
| 		} | ||||
| 		for n := range cspNs { | ||||
| 			cspNames[n] = true | ||||
| 		} | ||||
| 		rc, err := generateRetryConfig(vh.GetRetryPolicy()) | ||||
| 		if err != nil { | ||||
| 			return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err) | ||||
|  | @ -105,7 +117,44 @@ func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration, l | |||
| 		} | ||||
| 		vhs = append(vhs, vhOut) | ||||
| 	} | ||||
| 	return RouteConfigUpdate{VirtualHosts: vhs}, nil | ||||
| 
 | ||||
| 	// "For any entry in the RouteConfiguration.cluster_specifier_plugins not
 | ||||
| 	// referenced by an enclosed RouteAction's cluster_specifier_plugin, the xDS
 | ||||
| 	// client should not provide it to its consumers." - RLS in xDS Design
 | ||||
| 	for name := range csps { | ||||
| 		if !cspNames[name] { | ||||
| 			delete(csps, name) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return RouteConfigUpdate{VirtualHosts: vhs, ClusterSpecifierPlugins: csps}, nil | ||||
| } | ||||
| 
 | ||||
| func processClusterSpecifierPlugins(csps []*v3routepb.ClusterSpecifierPlugin) (map[string]clusterspecifier.BalancerConfig, error) { | ||||
| 	cspCfgs := make(map[string]clusterspecifier.BalancerConfig) | ||||
| 	// "The xDS client will inspect all elements of the
 | ||||
| 	// cluster_specifier_plugins field looking up a plugin based on the
 | ||||
| 	// extension.typed_config of each." - RLS in xDS design
 | ||||
| 	for _, csp := range csps { | ||||
| 		cs := clusterspecifier.Get(csp.GetExtension().GetTypedConfig().GetTypeUrl()) | ||||
| 		if cs == nil { | ||||
| 			// "If no plugin is registered for it, the resource will be NACKed."
 | ||||
| 			// - RLS in xDS design
 | ||||
| 			return nil, fmt.Errorf("cluster specifier %q of type %q was not found", csp.GetExtension().GetName(), csp.GetExtension().GetTypedConfig().GetTypeUrl()) | ||||
| 		} | ||||
| 		lbCfg, err := cs.ParseClusterSpecifierConfig(csp.GetExtension().GetTypedConfig()) | ||||
| 		if err != nil { | ||||
| 			// "If a plugin is found, the value of the typed_config field will
 | ||||
| 			// be passed to it's conversion method, and if an error is
 | ||||
| 			// encountered, the resource will be NACKED." - RLS in xDS design
 | ||||
| 			return nil, fmt.Errorf("error: %q parsing config %q for cluster specifier %q of type %q", err, csp.GetExtension().GetTypedConfig(), csp.GetExtension().GetName(), csp.GetExtension().GetTypedConfig().GetTypeUrl()) | ||||
| 		} | ||||
| 		// "If all cluster specifiers are valid, the xDS client will store the
 | ||||
| 		// configurations in a map keyed by the name of the extension instance." -
 | ||||
| 		// RLS in xDS Design
 | ||||
| 		cspCfgs[csp.GetExtension().GetName()] = lbCfg | ||||
| 	} | ||||
| 	return cspCfgs, nil | ||||
| } | ||||
| 
 | ||||
| func generateRetryConfig(rp *v3routepb.RetryPolicy) (*RetryConfig, error) { | ||||
|  | @ -162,12 +211,13 @@ func generateRetryConfig(rp *v3routepb.RetryPolicy) (*RetryConfig, error) { | |||
| 	return cfg, nil | ||||
| } | ||||
| 
 | ||||
| func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, v2 bool) ([]*Route, error) { | ||||
| func routesProtoToSlice(routes []*v3routepb.Route, csps map[string]clusterspecifier.BalancerConfig, logger *grpclog.PrefixLogger, v2 bool) ([]*Route, map[string]bool, error) { | ||||
| 	var routesRet []*Route | ||||
| 	var cspNames = make(map[string]bool) | ||||
| 	for _, r := range routes { | ||||
| 		match := r.GetMatch() | ||||
| 		if match == nil { | ||||
| 			return nil, fmt.Errorf("route %+v doesn't have a match", r) | ||||
| 			return nil, nil, fmt.Errorf("route %+v doesn't have a match", r) | ||||
| 		} | ||||
| 
 | ||||
| 		if len(match.GetQueryParameters()) != 0 { | ||||
|  | @ -178,7 +228,7 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, | |||
| 
 | ||||
| 		pathSp := match.GetPathSpecifier() | ||||
| 		if pathSp == nil { | ||||
| 			return nil, fmt.Errorf("route %+v doesn't have a path specifier", r) | ||||
| 			return nil, nil, fmt.Errorf("route %+v doesn't have a path specifier", r) | ||||
| 		} | ||||
| 
 | ||||
| 		var route Route | ||||
|  | @ -191,11 +241,11 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, | |||
| 			regex := pt.SafeRegex.GetRegex() | ||||
| 			re, err := regexp.Compile(regex) | ||||
| 			if err != nil { | ||||
| 				return nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex) | ||||
| 				return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex) | ||||
| 			} | ||||
| 			route.Regex = re | ||||
| 		default: | ||||
| 			return nil, fmt.Errorf("route %+v has an unrecognized path specifier: %+v", r, pt) | ||||
| 			return nil, nil, fmt.Errorf("route %+v has an unrecognized path specifier: %+v", r, pt) | ||||
| 		} | ||||
| 
 | ||||
| 		if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil { | ||||
|  | @ -211,7 +261,7 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, | |||
| 				regex := ht.SafeRegexMatch.GetRegex() | ||||
| 				re, err := regexp.Compile(regex) | ||||
| 				if err != nil { | ||||
| 					return nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex) | ||||
| 					return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex) | ||||
| 				} | ||||
| 				header.RegexMatch = re | ||||
| 			case *v3routepb.HeaderMatcher_RangeMatch: | ||||
|  | @ -226,7 +276,7 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, | |||
| 			case *v3routepb.HeaderMatcher_SuffixMatch: | ||||
| 				header.SuffixMatch = &ht.SuffixMatch | ||||
| 			default: | ||||
| 				return nil, fmt.Errorf("route %+v has an unrecognized header matcher: %+v", r, ht) | ||||
| 				return nil, nil, fmt.Errorf("route %+v has an unrecognized header matcher: %+v", r, ht) | ||||
| 			} | ||||
| 			header.Name = h.GetName() | ||||
| 			invert := h.GetInvertMatch() | ||||
|  | @ -256,7 +306,7 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, | |||
| 			if env.RingHashSupport { | ||||
| 				hp, err := hashPoliciesProtoToSlice(action.HashPolicy, logger) | ||||
| 				if err != nil { | ||||
| 					return nil, err | ||||
| 					return nil, nil, err | ||||
| 				} | ||||
| 				route.HashPolicies = hp | ||||
| 			} | ||||
|  | @ -276,7 +326,7 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, | |||
| 					if !v2 { | ||||
| 						cfgs, err := processHTTPFilterOverrides(c.GetTypedPerFilterConfig()) | ||||
| 						if err != nil { | ||||
| 							return nil, fmt.Errorf("route %+v, action %+v: %v", r, a, err) | ||||
| 							return nil, nil, fmt.Errorf("route %+v, action %+v: %v", r, a, err) | ||||
| 						} | ||||
| 						wc.HTTPFilterConfigOverride = cfgs | ||||
| 					} | ||||
|  | @ -290,15 +340,24 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, | |||
| 					wantTotalWeight = tw.GetValue() | ||||
| 				} | ||||
| 				if totalWeight != wantTotalWeight { | ||||
| 					return nil, fmt.Errorf("route %+v, action %+v, weights of clusters do not add up to total total weight, got: %v, expected total weight from response: %v", r, a, totalWeight, wantTotalWeight) | ||||
| 					return nil, nil, fmt.Errorf("route %+v, action %+v, weights of clusters do not add up to total total weight, got: %v, expected total weight from response: %v", r, a, totalWeight, wantTotalWeight) | ||||
| 				} | ||||
| 				if totalWeight == 0 { | ||||
| 					return nil, fmt.Errorf("route %+v, action %+v, has no valid cluster in WeightedCluster action", r, a) | ||||
| 					return nil, nil, fmt.Errorf("route %+v, action %+v, has no valid cluster in WeightedCluster action", r, a) | ||||
| 				} | ||||
| 			case *v3routepb.RouteAction_ClusterHeader: | ||||
| 				continue | ||||
| 			case *v3routepb.RouteAction_ClusterSpecifierPlugin: | ||||
| 				if _, ok := csps[a.ClusterSpecifierPlugin]; !ok { | ||||
| 					// "When processing RouteActions, if any action includes a
 | ||||
| 					// cluster_specifier_plugin value that is not in
 | ||||
| 					// RouteConfiguration.cluster_specifier_plugins, the
 | ||||
| 					// resource will be NACKed." - RLS in xDS design
 | ||||
| 					return nil, nil, fmt.Errorf("route %+v, action %+v, specifies a cluster specifier plugin %+v that is not in Route Configuration", r, a, a.ClusterSpecifierPlugin) | ||||
| 				} | ||||
| 				cspNames[a.ClusterSpecifierPlugin] = true | ||||
| 			default: | ||||
| 				return nil, fmt.Errorf("route %+v, has an unknown ClusterSpecifier: %+v", r, a) | ||||
| 				return nil, nil, fmt.Errorf("route %+v, has an unknown ClusterSpecifier: %+v", r, a) | ||||
| 			} | ||||
| 
 | ||||
| 			msd := action.GetMaxStreamDuration() | ||||
|  | @ -315,7 +374,7 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, | |||
| 			var err error | ||||
| 			route.RetryConfig, err = generateRetryConfig(action.GetRetryPolicy()) | ||||
| 			if err != nil { | ||||
| 				return nil, fmt.Errorf("route %+v, action %+v: %v", r, action, err) | ||||
| 				return nil, nil, fmt.Errorf("route %+v, action %+v: %v", r, action, err) | ||||
| 			} | ||||
| 
 | ||||
| 			route.RouteAction = RouteActionRoute | ||||
|  | @ -330,13 +389,13 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, | |||
| 		if !v2 { | ||||
| 			cfgs, err := processHTTPFilterOverrides(r.GetTypedPerFilterConfig()) | ||||
| 			if err != nil { | ||||
| 				return nil, fmt.Errorf("route %+v: %v", r, err) | ||||
| 				return nil, nil, fmt.Errorf("route %+v: %v", r, err) | ||||
| 			} | ||||
| 			route.HTTPFilterConfigOverride = cfgs | ||||
| 		} | ||||
| 		routesRet = append(routesRet, &route) | ||||
| 	} | ||||
| 	return routesRet, nil | ||||
| 	return routesRet, cspNames, nil | ||||
| } | ||||
| 
 | ||||
| func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy, logger *grpclog.PrefixLogger) ([]*HashPolicy, error) { | ||||
|  |  | |||
|  | @ -18,16 +18,19 @@ | |||
| package xdsresource | ||||
| 
 | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"regexp" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/golang/protobuf/proto" | ||||
| 	"github.com/google/go-cmp/cmp" | ||||
| 	"github.com/google/go-cmp/cmp/cmpopts" | ||||
| 	"google.golang.org/grpc/codes" | ||||
| 	"google.golang.org/grpc/internal/testutils" | ||||
| 	"google.golang.org/grpc/internal/xds/env" | ||||
| 	"google.golang.org/grpc/xds/internal/clusterspecifier" | ||||
| 	"google.golang.org/grpc/xds/internal/httpfilter" | ||||
| 	"google.golang.org/grpc/xds/internal/version" | ||||
| 	"google.golang.org/protobuf/types/known/durationpb" | ||||
|  | @ -67,6 +70,31 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { | |||
| 				}}, | ||||
| 			} | ||||
| 		} | ||||
| 		goodRouteConfigWithClusterSpecifierPlugins = func(csps []*v3routepb.ClusterSpecifierPlugin, cspReferences []string) *v3routepb.RouteConfiguration { | ||||
| 			var rs []*v3routepb.Route | ||||
| 
 | ||||
| 			for i, cspReference := range cspReferences { | ||||
| 				rs = append(rs, &v3routepb.Route{ | ||||
| 					Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: fmt.Sprint(i + 1)}}, | ||||
| 					Action: &v3routepb.Route_Route{ | ||||
| 						Route: &v3routepb.RouteAction{ | ||||
| 							ClusterSpecifier: &v3routepb.RouteAction_ClusterSpecifierPlugin{ClusterSpecifierPlugin: cspReference}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}) | ||||
| 			} | ||||
| 
 | ||||
| 			rc := &v3routepb.RouteConfiguration{ | ||||
| 				Name: routeName, | ||||
| 				VirtualHosts: []*v3routepb.VirtualHost{{ | ||||
| 					Domains: []string{ldsTarget}, | ||||
| 					Routes:  rs, | ||||
| 				}}, | ||||
| 				ClusterSpecifierPlugins: csps, | ||||
| 			} | ||||
| 
 | ||||
| 			return rc | ||||
| 		} | ||||
| 		goodUpdateWithFilterConfigs = func(cfgs map[string]httpfilter.FilterConfig) RouteConfigUpdate { | ||||
| 			return RouteConfigUpdate{ | ||||
| 				VirtualHosts: []*VirtualHost{{ | ||||
|  | @ -80,6 +108,26 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { | |||
| 				}}, | ||||
| 			} | ||||
| 		} | ||||
| 		goodUpdateWithClusterSpecifierPluginA = RouteConfigUpdate{ | ||||
| 			VirtualHosts: []*VirtualHost{{ | ||||
| 				Domains: []string{ldsTarget}, | ||||
| 				Routes: []*Route{{ | ||||
| 					Prefix:      newStringP("1"), | ||||
| 					RouteAction: RouteActionRoute, | ||||
| 				}}, | ||||
| 			}}, | ||||
| 			ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{ | ||||
| 				"cspA": nil, | ||||
| 			}, | ||||
| 		} | ||||
| 		clusterSpecifierPlugin = func(name string, config *anypb.Any) *v3routepb.ClusterSpecifierPlugin { | ||||
| 			return &v3routepb.ClusterSpecifierPlugin{ | ||||
| 				Extension: &v3corepb.TypedExtensionConfig{ | ||||
| 					Name:        name, | ||||
| 					TypedConfig: config, | ||||
| 				}, | ||||
| 			} | ||||
| 		} | ||||
| 		goodRouteConfigWithRetryPolicy = func(vhrp *v3routepb.RetryPolicy, rrp *v3routepb.RetryPolicy) *v3routepb.RouteConfiguration { | ||||
| 			return &v3routepb.RouteConfiguration{ | ||||
| 				Name: routeName, | ||||
|  | @ -565,6 +613,42 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { | |||
| 			wantUpdate: RouteConfigUpdate{}, | ||||
| 			wantError:  true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "cluster-specifier-declared-which-not-registered", | ||||
| 			rc: goodRouteConfigWithClusterSpecifierPlugins([]*v3routepb.ClusterSpecifierPlugin{ | ||||
| 				clusterSpecifierPlugin("cspA", configOfClusterSpecifierDoesntExist), | ||||
| 			}, []string{"cspA"}), | ||||
| 			wantError: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "error-in-cluster-specifier-plugin-conversion-method", | ||||
| 			rc: goodRouteConfigWithClusterSpecifierPlugins([]*v3routepb.ClusterSpecifierPlugin{ | ||||
| 				clusterSpecifierPlugin("cspA", errorClusterSpecifierConfig), | ||||
| 			}, []string{"cspA"}), | ||||
| 			wantError: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "route-action-that-references-undeclared-cluster-specifier-plugin", | ||||
| 			rc: goodRouteConfigWithClusterSpecifierPlugins([]*v3routepb.ClusterSpecifierPlugin{ | ||||
| 				clusterSpecifierPlugin("cspA", mockClusterSpecifierConfig), | ||||
| 			}, []string{"cspA", "cspB"}), | ||||
| 			wantError: true, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "emitted-cluster-specifier-plugins", | ||||
| 			rc: goodRouteConfigWithClusterSpecifierPlugins([]*v3routepb.ClusterSpecifierPlugin{ | ||||
| 				clusterSpecifierPlugin("cspA", mockClusterSpecifierConfig), | ||||
| 			}, []string{"cspA"}), | ||||
| 			wantUpdate: goodUpdateWithClusterSpecifierPluginA, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "deleted-cluster-specifier-plugins-not-referenced", | ||||
| 			rc: goodRouteConfigWithClusterSpecifierPlugins([]*v3routepb.ClusterSpecifierPlugin{ | ||||
| 				clusterSpecifierPlugin("cspA", mockClusterSpecifierConfig), | ||||
| 				clusterSpecifierPlugin("cspB", mockClusterSpecifierConfig), | ||||
| 			}, []string{"cspA"}), | ||||
| 			wantUpdate: goodUpdateWithClusterSpecifierPluginA, | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, test := range tests { | ||||
| 		t.Run(test.name, func(t *testing.T) { | ||||
|  | @ -580,6 +664,47 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| var configOfClusterSpecifierDoesntExist = &anypb.Any{ | ||||
| 	TypeUrl: "does.not.exist", | ||||
| 	Value:   []byte{1, 2, 3}, | ||||
| } | ||||
| 
 | ||||
| var mockClusterSpecifierConfig = &anypb.Any{ | ||||
| 	TypeUrl: "mock.cluster.specifier.plugin", | ||||
| 	Value:   []byte{1, 2, 3}, | ||||
| } | ||||
| 
 | ||||
| var errorClusterSpecifierConfig = &anypb.Any{ | ||||
| 	TypeUrl: "error.cluster.specifier.plugin", | ||||
| 	Value:   []byte{1, 2, 3}, | ||||
| } | ||||
| 
 | ||||
| func init() { | ||||
| 	clusterspecifier.Register(mockClusterSpecifierPlugin{}) | ||||
| 	clusterspecifier.Register(errorClusterSpecifierPlugin{}) | ||||
| } | ||||
| 
 | ||||
| type mockClusterSpecifierPlugin struct { | ||||
| } | ||||
| 
 | ||||
| func (mockClusterSpecifierPlugin) TypeURLs() []string { | ||||
| 	return []string{"mock.cluster.specifier.plugin"} | ||||
| } | ||||
| 
 | ||||
| func (mockClusterSpecifierPlugin) ParseClusterSpecifierConfig(proto.Message) (clusterspecifier.BalancerConfig, error) { | ||||
| 	return nil, nil | ||||
| } | ||||
| 
 | ||||
| type errorClusterSpecifierPlugin struct{} | ||||
| 
 | ||||
| func (errorClusterSpecifierPlugin) TypeURLs() []string { | ||||
| 	return []string{"error.cluster.specifier.plugin"} | ||||
| } | ||||
| 
 | ||||
| func (errorClusterSpecifierPlugin) ParseClusterSpecifierConfig(proto.Message) (clusterspecifier.BalancerConfig, error) { | ||||
| 	return nil, errors.New("error from cluster specifier conversion function") | ||||
| } | ||||
| 
 | ||||
| func (s) TestUnmarshalRouteConfig(t *testing.T) { | ||||
| 	const ( | ||||
| 		ldsTarget                = "lds.target.good:1111" | ||||
|  | @ -1458,7 +1583,7 @@ func (s) TestRoutesProtoToSlice(t *testing.T) { | |||
| 	defer func() { env.RingHashSupport = oldRingHashSupport }() | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			got, err := routesProtoToSlice(tt.routes, nil, false) | ||||
| 			got, _, err := routesProtoToSlice(tt.routes, nil, nil, false) | ||||
| 			if (err != nil) != tt.wantErr { | ||||
| 				t.Fatalf("routesProtoToSlice() error = %v, wantErr %v", err, tt.wantErr) | ||||
| 			} | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue