mirror of https://github.com/grpc/grpc-go.git
				
				
				
			
		
			
				
	
	
		
			349 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			349 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright 2020 gRPC authors.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  *
 | |
|  */
 | |
| 
 | |
| package resolver
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"math/bits"
 | |
| 	"math/rand"
 | |
| 	"strings"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	xxhash "github.com/cespare/xxhash/v2"
 | |
| 	"google.golang.org/grpc/codes"
 | |
| 	"google.golang.org/grpc/internal/grpcutil"
 | |
| 	iresolver "google.golang.org/grpc/internal/resolver"
 | |
| 	"google.golang.org/grpc/internal/serviceconfig"
 | |
| 	"google.golang.org/grpc/internal/wrr"
 | |
| 	"google.golang.org/grpc/metadata"
 | |
| 	"google.golang.org/grpc/status"
 | |
| 	"google.golang.org/grpc/xds/internal/balancer/clustermanager"
 | |
| 	"google.golang.org/grpc/xds/internal/balancer/ringhash"
 | |
| 	"google.golang.org/grpc/xds/internal/httpfilter"
 | |
| 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	cdsName                      = "cds_experimental"
 | |
| 	xdsClusterManagerName        = "xds_cluster_manager_experimental"
 | |
| 	clusterPrefix                = "cluster:"
 | |
| 	clusterSpecifierPluginPrefix = "cluster_specifier_plugin:"
 | |
| )
 | |
| 
 | |
| type serviceConfig struct {
 | |
| 	LoadBalancingConfig balancerConfig `json:"loadBalancingConfig"`
 | |
| }
 | |
| 
 | |
| type balancerConfig []map[string]any
 | |
| 
 | |
| func newBalancerConfig(name string, config any) balancerConfig {
 | |
| 	return []map[string]any{{name: config}}
 | |
| }
 | |
| 
 | |
| type cdsBalancerConfig struct {
 | |
| 	Cluster string `json:"cluster"`
 | |
| }
 | |
| 
 | |
| type xdsChildConfig struct {
 | |
| 	ChildPolicy balancerConfig `json:"childPolicy"`
 | |
| }
 | |
| 
 | |
| type xdsClusterManagerConfig struct {
 | |
| 	Children map[string]xdsChildConfig `json:"children"`
 | |
| }
 | |
| 
 | |
| // serviceConfigJSON produces a service config in JSON format representing all
 | |
| // the clusters referenced in activeClusters.  This includes clusters with zero
 | |
| // references, so they must be pruned first.
 | |
| func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
 | |
| 	// Generate children (all entries in activeClusters).
 | |
| 	children := make(map[string]xdsChildConfig)
 | |
| 	for cluster, ci := range activeClusters {
 | |
| 		children[cluster] = ci.cfg
 | |
| 	}
 | |
| 
 | |
| 	sc := serviceConfig{
 | |
| 		LoadBalancingConfig: newBalancerConfig(
 | |
| 			xdsClusterManagerName, xdsClusterManagerConfig{Children: children},
 | |
| 		),
 | |
| 	}
 | |
| 
 | |
| 	bs, err := json.Marshal(sc)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to marshal json: %v", err)
 | |
| 	}
 | |
| 	return bs, nil
 | |
| }
 | |
| 
 | |
| type virtualHost struct {
 | |
| 	// map from filter name to its config
 | |
| 	httpFilterConfigOverride map[string]httpfilter.FilterConfig
 | |
| 	// retry policy present in virtual host
 | |
| 	retryConfig *xdsresource.RetryConfig
 | |
| }
 | |
| 
 | |
| // routeCluster holds information about a cluster as referenced by a route.
 | |
| type routeCluster struct {
 | |
| 	name string
 | |
| 	// map from filter name to its config
 | |
| 	httpFilterConfigOverride map[string]httpfilter.FilterConfig
 | |
| }
 | |
| 
 | |
| type route struct {
 | |
| 	m                 *xdsresource.CompositeMatcher // converted from route matchers
 | |
| 	actionType        xdsresource.RouteActionType   // holds route action type
 | |
| 	clusters          wrr.WRR                       // holds *routeCluster entries
 | |
| 	maxStreamDuration time.Duration
 | |
| 	// map from filter name to its config
 | |
| 	httpFilterConfigOverride map[string]httpfilter.FilterConfig
 | |
| 	retryConfig              *xdsresource.RetryConfig
 | |
| 	hashPolicies             []*xdsresource.HashPolicy
 | |
| }
 | |
| 
 | |
| func (r route) String() string {
 | |
| 	return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration)
 | |
| }
 | |
| 
 | |
| type configSelector struct {
 | |
| 	r                *xdsResolver
 | |
| 	virtualHost      virtualHost
 | |
| 	routes           []route
 | |
| 	clusters         map[string]*clusterInfo
 | |
| 	httpFilterConfig []xdsresource.HTTPFilter
 | |
| }
 | |
| 
 | |
| var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")
 | |
| var errUnsupportedClientRouteAction = status.Errorf(codes.Unavailable, "matched route does not have a supported route action type")
 | |
| 
 | |
| func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
 | |
| 	if cs == nil {
 | |
| 		return nil, status.Errorf(codes.Unavailable, "no valid clusters")
 | |
| 	}
 | |
| 	var rt *route
 | |
| 	// Loop through routes in order and select first match.
 | |
| 	for _, r := range cs.routes {
 | |
| 		if r.m.Match(rpcInfo) {
 | |
| 			rt = &r
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if rt == nil || rt.clusters == nil {
 | |
| 		return nil, errNoMatchedRouteFound
 | |
| 	}
 | |
| 
 | |
| 	if rt.actionType != xdsresource.RouteActionRoute {
 | |
| 		return nil, errUnsupportedClientRouteAction
 | |
| 	}
 | |
| 
 | |
| 	cluster, ok := rt.clusters.Next().(*routeCluster)
 | |
| 	if !ok {
 | |
| 		return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
 | |
| 	}
 | |
| 
 | |
| 	// Add a ref to the selected cluster, as this RPC needs this cluster until
 | |
| 	// it is committed.
 | |
| 	ref := &cs.clusters[cluster.name].refCount
 | |
| 	atomic.AddInt32(ref, 1)
 | |
| 
 | |
| 	interceptor, err := cs.newInterceptor(rt, cluster)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name)
 | |
| 	lbCtx = ringhash.SetRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies))
 | |
| 
 | |
| 	config := &iresolver.RPCConfig{
 | |
| 		// Communicate to the LB policy the chosen cluster and request hash, if Ring Hash LB policy.
 | |
| 		Context: lbCtx,
 | |
| 		OnCommitted: func() {
 | |
| 			// When the RPC is committed, the cluster is no longer required.
 | |
| 			// Decrease its ref.
 | |
| 			if v := atomic.AddInt32(ref, -1); v == 0 {
 | |
| 				// This entry will be removed from activeClusters when
 | |
| 				// producing the service config for the empty update.
 | |
| 				cs.r.serializer.Schedule(func(context.Context) {
 | |
| 					cs.r.onClusterRefDownToZero()
 | |
| 				})
 | |
| 			}
 | |
| 		},
 | |
| 		Interceptor: interceptor,
 | |
| 	}
 | |
| 
 | |
| 	if rt.maxStreamDuration != 0 {
 | |
| 		config.MethodConfig.Timeout = &rt.maxStreamDuration
 | |
| 	}
 | |
| 	if rt.retryConfig != nil {
 | |
| 		config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig)
 | |
| 	} else if cs.virtualHost.retryConfig != nil {
 | |
| 		config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig)
 | |
| 	}
 | |
| 
 | |
| 	return config, nil
 | |
| }
 | |
| 
 | |
| func retryConfigToPolicy(config *xdsresource.RetryConfig) *serviceconfig.RetryPolicy {
 | |
| 	return &serviceconfig.RetryPolicy{
 | |
| 		MaxAttempts:          int(config.NumRetries) + 1,
 | |
| 		InitialBackoff:       config.RetryBackoff.BaseInterval,
 | |
| 		MaxBackoff:           config.RetryBackoff.MaxInterval,
 | |
| 		BackoffMultiplier:    2,
 | |
| 		RetryableStatusCodes: config.RetryOn,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*xdsresource.HashPolicy) uint64 {
 | |
| 	var hash uint64
 | |
| 	var generatedHash bool
 | |
| 	var md, emd metadata.MD
 | |
| 	var mdRead bool
 | |
| 	for _, policy := range hashPolicies {
 | |
| 		var policyHash uint64
 | |
| 		var generatedPolicyHash bool
 | |
| 		switch policy.HashPolicyType {
 | |
| 		case xdsresource.HashPolicyTypeHeader:
 | |
| 			if strings.HasSuffix(policy.HeaderName, "-bin") {
 | |
| 				continue
 | |
| 			}
 | |
| 			if !mdRead {
 | |
| 				md, _ = metadata.FromOutgoingContext(rpcInfo.Context)
 | |
| 				emd, _ = grpcutil.ExtraMetadata(rpcInfo.Context)
 | |
| 				mdRead = true
 | |
| 			}
 | |
| 			values := emd.Get(policy.HeaderName)
 | |
| 			if len(values) == 0 {
 | |
| 				// Extra metadata (e.g. the "content-type" header) takes
 | |
| 				// precedence over the user's metadata.
 | |
| 				values = md.Get(policy.HeaderName)
 | |
| 				if len(values) == 0 {
 | |
| 					// If the header isn't present at all, this policy is a no-op.
 | |
| 					continue
 | |
| 				}
 | |
| 			}
 | |
| 			joinedValues := strings.Join(values, ",")
 | |
| 			if policy.Regex != nil {
 | |
| 				joinedValues = policy.Regex.ReplaceAllString(joinedValues, policy.RegexSubstitution)
 | |
| 			}
 | |
| 			policyHash = xxhash.Sum64String(joinedValues)
 | |
| 			generatedHash = true
 | |
| 			generatedPolicyHash = true
 | |
| 		case xdsresource.HashPolicyTypeChannelID:
 | |
| 			// Use the static channel ID as the hash for this policy.
 | |
| 			policyHash = cs.r.channelID
 | |
| 			generatedHash = true
 | |
| 			generatedPolicyHash = true
 | |
| 		}
 | |
| 
 | |
| 		// Deterministically combine the hash policies. Rotating prevents
 | |
| 		// duplicate hash policies from cancelling each other out and preserves
 | |
| 		// the 64 bits of entropy.
 | |
| 		if generatedPolicyHash {
 | |
| 			hash = bits.RotateLeft64(hash, 1)
 | |
| 			hash = hash ^ policyHash
 | |
| 		}
 | |
| 
 | |
| 		// If terminal policy and a hash has already been generated, ignore the
 | |
| 		// rest of the policies and use that hash already generated.
 | |
| 		if policy.Terminal && generatedHash {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if generatedHash {
 | |
| 		return hash
 | |
| 	}
 | |
| 	// If no generated hash return a random long. In the grand scheme of things
 | |
| 	// this logically will map to choosing a random backend to route request to.
 | |
| 	return rand.Uint64()
 | |
| }
 | |
| 
 | |
| func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
 | |
| 	if len(cs.httpFilterConfig) == 0 {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 	interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig))
 | |
| 	for _, filter := range cs.httpFilterConfig {
 | |
| 		override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority
 | |
| 		if override == nil {
 | |
| 			override = rt.httpFilterConfigOverride[filter.Name] // route is second priority
 | |
| 		}
 | |
| 		if override == nil {
 | |
| 			override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority
 | |
| 		}
 | |
| 		ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
 | |
| 		if !ok {
 | |
| 			// Should not happen if it passed xdsClient validation.
 | |
| 			return nil, fmt.Errorf("filter does not support use in client")
 | |
| 		}
 | |
| 		i, err := ib.BuildClientInterceptor(filter.Config, override)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("error constructing filter: %v", err)
 | |
| 		}
 | |
| 		if i != nil {
 | |
| 			interceptors = append(interceptors, i)
 | |
| 		}
 | |
| 	}
 | |
| 	return &interceptorList{interceptors: interceptors}, nil
 | |
| }
 | |
| 
 | |
| // stop decrements refs of all clusters referenced by this config selector.
 | |
| func (cs *configSelector) stop() {
 | |
| 	// The resolver's old configSelector may be nil.  Handle that here.
 | |
| 	if cs == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	// If any refs drop to zero, we'll need a service config update to delete
 | |
| 	// the cluster.
 | |
| 	needUpdate := false
 | |
| 	// Loops over cs.clusters, but these are pointers to entries in
 | |
| 	// activeClusters.
 | |
| 	for _, ci := range cs.clusters {
 | |
| 		if v := atomic.AddInt32(&ci.refCount, -1); v == 0 {
 | |
| 			needUpdate = true
 | |
| 		}
 | |
| 	}
 | |
| 	// We stop the old config selector immediately after sending a new config
 | |
| 	// selector; we need another update to delete clusters from the config (if
 | |
| 	// we don't have another update pending already).
 | |
| 	if needUpdate {
 | |
| 		cs.r.serializer.Schedule(func(context.Context) {
 | |
| 			cs.r.onClusterRefDownToZero()
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type interceptorList struct {
 | |
| 	interceptors []iresolver.ClientInterceptor
 | |
| }
 | |
| 
 | |
| func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
 | |
| 	for i := len(il.interceptors) - 1; i >= 0; i-- {
 | |
| 		ns := newStream
 | |
| 		interceptor := il.interceptors[i]
 | |
| 		newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
 | |
| 			return interceptor.NewStream(ctx, ri, done, ns)
 | |
| 		}
 | |
| 	}
 | |
| 	return newStream(ctx, func() {})
 | |
| }
 |