mirror of https://github.com/grpc/grpc-go.git
xds: add HashPolicy fields to RDS update (#4521)
* Add HashPolicy fields to RDS update
This commit is contained in:
parent
45549242f7
commit
22c5358187
|
@ -39,6 +39,7 @@ const (
|
|||
// When both bootstrap FileName and FileContent are set, FileName is used.
|
||||
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
|
||||
|
||||
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
|
||||
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
|
||||
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
|
||||
|
||||
|
@ -59,7 +60,10 @@ var (
|
|||
//
|
||||
// When both bootstrap FileName and FileContent are set, FileName is used.
|
||||
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)
|
||||
|
||||
// RingHashSupport indicates whether ring hash support is enabled, which can
|
||||
// be enabled by setting the environment variable
|
||||
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "true".
|
||||
RingHashSupport = strings.EqualFold(os.Getenv(ringHashSupportEnv), "true")
|
||||
// ClientSideSecuritySupport is used to control processing of security
|
||||
// configuration on the client-side.
|
||||
//
|
||||
|
|
|
@ -269,6 +269,28 @@ type VirtualHost struct {
|
|||
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
|
||||
}
|
||||
|
||||
// HashPolicyType specifies the type of HashPolicy from a received RDS Response.
|
||||
type HashPolicyType int
|
||||
|
||||
const (
|
||||
// HashPolicyTypeHeader specifies to hash a Header in the incoming request.
|
||||
HashPolicyTypeHeader HashPolicyType = iota
|
||||
// HashPolicyTypeChannelID specifies to hash a unique Identifier of the
|
||||
// Channel. In grpc-go, this will be done using the ClientConn pointer.
|
||||
HashPolicyTypeChannelID
|
||||
)
|
||||
|
||||
// HashPolicy specifies the HashPolicy if the upstream cluster uses a hashing
|
||||
// load balancer.
|
||||
type HashPolicy struct {
|
||||
HashPolicyType HashPolicyType
|
||||
Terminal bool
|
||||
// Fields used for type HEADER.
|
||||
HeaderName string
|
||||
Regex *regexp.Regexp
|
||||
RegexSubstitution string
|
||||
}
|
||||
|
||||
// Route is both a specification of how to match a request as well as an
|
||||
// indication of the action to take upon match.
|
||||
type Route struct {
|
||||
|
@ -281,6 +303,8 @@ type Route struct {
|
|||
Headers []*HeaderMatcher
|
||||
Fraction *uint32
|
||||
|
||||
HashPolicies []*HashPolicy
|
||||
|
||||
// If the matchers above indicate a match, the below configuration is used.
|
||||
WeightedClusters map[string]WeightedCluster
|
||||
// If MaxStreamDuration is nil, it indicates neither of the route action's
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
"google.golang.org/grpc/internal/xds/env"
|
||||
"google.golang.org/grpc/xds/internal/httpfilter"
|
||||
"google.golang.org/grpc/xds/internal/version"
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
|
@ -1153,6 +1154,61 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
|
|||
}},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "good-with-channel-id-hash-policy",
|
||||
routes: []*v3routepb.Route{
|
||||
{
|
||||
Match: &v3routepb.RouteMatch{
|
||||
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/a/"},
|
||||
Headers: []*v3routepb.HeaderMatcher{
|
||||
{
|
||||
Name: "th",
|
||||
HeaderMatchSpecifier: &v3routepb.HeaderMatcher_PrefixMatch{
|
||||
PrefixMatch: "tv",
|
||||
},
|
||||
InvertMatch: true,
|
||||
},
|
||||
},
|
||||
RuntimeFraction: &v3corepb.RuntimeFractionalPercent{
|
||||
DefaultValue: &v3typepb.FractionalPercent{
|
||||
Numerator: 1,
|
||||
Denominator: v3typepb.FractionalPercent_HUNDRED,
|
||||
},
|
||||
},
|
||||
},
|
||||
Action: &v3routepb.Route_Route{
|
||||
Route: &v3routepb.RouteAction{
|
||||
ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
|
||||
WeightedClusters: &v3routepb.WeightedCluster{
|
||||
Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
|
||||
{Name: "B", Weight: &wrapperspb.UInt32Value{Value: 60}},
|
||||
{Name: "A", Weight: &wrapperspb.UInt32Value{Value: 40}},
|
||||
},
|
||||
TotalWeight: &wrapperspb.UInt32Value{Value: 100},
|
||||
}},
|
||||
HashPolicy: []*v3routepb.RouteAction_HashPolicy{
|
||||
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}},
|
||||
},
|
||||
}},
|
||||
},
|
||||
},
|
||||
wantRoutes: []*Route{{
|
||||
Prefix: newStringP("/a/"),
|
||||
Headers: []*HeaderMatcher{
|
||||
{
|
||||
Name: "th",
|
||||
InvertMatch: newBoolP(true),
|
||||
PrefixMatch: newStringP("tv"),
|
||||
},
|
||||
},
|
||||
Fraction: newUInt32P(10000),
|
||||
WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60}},
|
||||
HashPolicies: []*HashPolicy{
|
||||
{HashPolicyType: HashPolicyTypeChannelID},
|
||||
},
|
||||
}},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "with custom HTTP filter config",
|
||||
routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": customFilterConfig}),
|
||||
|
@ -1197,7 +1253,9 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
|
|||
return fmt.Sprint(fc)
|
||||
}),
|
||||
}
|
||||
|
||||
oldRingHashSupport := env.RingHashSupport
|
||||
env.RingHashSupport = true
|
||||
defer func() { env.RingHashSupport = oldRingHashSupport }()
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := routesProtoToSlice(tt.routes, nil, false)
|
||||
|
@ -1211,6 +1269,119 @@ func (s) TestRoutesProtoToSlice(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s) TestHashPoliciesProtoToSlice(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
hashPolicies []*v3routepb.RouteAction_HashPolicy
|
||||
wantHashPolicies []*HashPolicy
|
||||
wantErr bool
|
||||
}{
|
||||
// header-hash-policy tests a basic hash policy that specifies to hash a
|
||||
// certain header.
|
||||
{
|
||||
name: "header-hash-policy",
|
||||
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
|
||||
{
|
||||
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
|
||||
Header: &v3routepb.RouteAction_HashPolicy_Header{
|
||||
HeaderName: ":path",
|
||||
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
|
||||
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
|
||||
Substitution: "/products",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
wantHashPolicies: []*HashPolicy{
|
||||
{
|
||||
HashPolicyType: HashPolicyTypeHeader,
|
||||
HeaderName: ":path",
|
||||
Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(),
|
||||
RegexSubstitution: "/products",
|
||||
},
|
||||
},
|
||||
},
|
||||
// channel-id-hash-policy tests a basic hash policy that specifies to
|
||||
// hash a unique identifier of the channel.
|
||||
{
|
||||
name: "channel-id-hash-policy",
|
||||
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
|
||||
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}}},
|
||||
},
|
||||
wantHashPolicies: []*HashPolicy{
|
||||
{HashPolicyType: HashPolicyTypeChannelID},
|
||||
},
|
||||
},
|
||||
// unsupported-filter-state-key tests that an unsupported key in the
|
||||
// filter state hash policy are treated as a no-op.
|
||||
{
|
||||
name: "wrong-filter-state-key",
|
||||
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
|
||||
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "unsupported key"}}},
|
||||
},
|
||||
},
|
||||
// no-op-hash-policy tests that hash policies that are not supported by
|
||||
// grpc are treated as a no-op.
|
||||
{
|
||||
name: "no-op-hash-policy",
|
||||
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
|
||||
{PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{}},
|
||||
},
|
||||
},
|
||||
// header-and-channel-id-hash-policy test that a list of header and
|
||||
// channel id hash policies are successfully converted to an internal
|
||||
// struct.
|
||||
{
|
||||
name: "header-and-channel-id-hash-policy",
|
||||
hashPolicies: []*v3routepb.RouteAction_HashPolicy{
|
||||
{
|
||||
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
|
||||
Header: &v3routepb.RouteAction_HashPolicy_Header{
|
||||
HeaderName: ":path",
|
||||
RegexRewrite: &v3matcherpb.RegexMatchAndSubstitute{
|
||||
Pattern: &v3matcherpb.RegexMatcher{Regex: "/products"},
|
||||
Substitution: "/products",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{Key: "io.grpc.channel_id"}},
|
||||
Terminal: true,
|
||||
},
|
||||
},
|
||||
wantHashPolicies: []*HashPolicy{
|
||||
{
|
||||
HashPolicyType: HashPolicyTypeHeader,
|
||||
HeaderName: ":path",
|
||||
Regex: func() *regexp.Regexp { return regexp.MustCompile("/products") }(),
|
||||
RegexSubstitution: "/products",
|
||||
},
|
||||
{
|
||||
HashPolicyType: HashPolicyTypeChannelID,
|
||||
Terminal: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
oldRingHashSupport := env.RingHashSupport
|
||||
env.RingHashSupport = true
|
||||
defer func() { env.RingHashSupport = oldRingHashSupport }()
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := hashPoliciesProtoToSlice(tt.hashPolicies, nil)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Fatalf("hashPoliciesProtoToSlice() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
if diff := cmp.Diff(got, tt.wantHashPolicies, cmp.AllowUnexported(regexp.Regexp{})); diff != "" {
|
||||
t.Fatalf("hashPoliciesProtoToSlice() returned unexpected diff (-got +want):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newStringP(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
|
|
@ -496,6 +496,16 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,
|
|||
|
||||
route.WeightedClusters = make(map[string]WeightedCluster)
|
||||
action := r.GetRoute()
|
||||
|
||||
// Hash Policies are only applicable for a Ring Hash LB.
|
||||
if env.RingHashSupport {
|
||||
hp, err := hashPoliciesProtoToSlice(action.HashPolicy, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
route.HashPolicies = hp
|
||||
}
|
||||
|
||||
switch a := action.GetClusterSpecifier().(type) {
|
||||
case *v3routepb.RouteAction_Cluster:
|
||||
route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1}
|
||||
|
@ -557,6 +567,37 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger,
|
|||
return routesRet, nil
|
||||
}
|
||||
|
||||
func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy, logger *grpclog.PrefixLogger) ([]*HashPolicy, error) {
|
||||
var hashPoliciesRet []*HashPolicy
|
||||
for _, p := range policies {
|
||||
policy := HashPolicy{Terminal: p.Terminal}
|
||||
switch p.GetPolicySpecifier().(type) {
|
||||
case *v3routepb.RouteAction_HashPolicy_Header_:
|
||||
policy.HashPolicyType = HashPolicyTypeHeader
|
||||
policy.HeaderName = p.GetHeader().GetHeaderName()
|
||||
regex := p.GetHeader().GetRegexRewrite().GetPattern().GetRegex()
|
||||
re, err := regexp.Compile(regex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("hash policy %+v contains an invalid regex %q", p, regex)
|
||||
}
|
||||
policy.Regex = re
|
||||
policy.RegexSubstitution = p.GetHeader().GetRegexRewrite().GetSubstitution()
|
||||
case *v3routepb.RouteAction_HashPolicy_FilterState_:
|
||||
if p.GetFilterState().GetKey() != "io.grpc.channel_id" {
|
||||
logger.Infof("hash policy %+v contains an invalid key for filter state policy %q", p, p.GetFilterState().GetKey())
|
||||
continue
|
||||
}
|
||||
policy.HashPolicyType = HashPolicyTypeChannelID
|
||||
default:
|
||||
logger.Infof("hash policy %T is an unsupported hash policy", p.GetPolicySpecifier())
|
||||
continue
|
||||
}
|
||||
|
||||
hashPoliciesRet = append(hashPoliciesRet, &policy)
|
||||
}
|
||||
return hashPoliciesRet, nil
|
||||
}
|
||||
|
||||
// UnmarshalCluster processes resources received in an CDS response, validates
|
||||
// them, and transforms them into a native struct which contains only fields we
|
||||
// are interested in.
|
||||
|
|
Loading…
Reference in New Issue