mirror of https://github.com/grpc/grpc-go.git
rls: delegate pick to child policy as long as it is not in TransientFailure (#5656)
This commit is contained in:
parent
7da8a056b6
commit
9c3e589d3e
|
@ -162,21 +162,20 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
|
|||
|
||||
// delegateToChildPolicies is a helper function which iterates through the list
|
||||
// of child policy wrappers in a cache entry and attempts to find a child policy
|
||||
// to which this RPC can be routed to. If there is no child policy in READY
|
||||
// state, we delegate to the first child policy arbitrarily.
|
||||
// to which this RPC can be routed to. If all child policies are in
|
||||
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
|
||||
//
|
||||
// Caller must hold at least a read-lock on p.lb.cacheMu.
|
||||
func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
|
||||
for _, cpw := range dcEntry.childPolicyWrappers {
|
||||
ok, res, err := p.pickIfFeasible(cpw, info)
|
||||
if ok {
|
||||
return res, err
|
||||
for i, cpw := range dcEntry.childPolicyWrappers {
|
||||
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
|
||||
// Delegate to the child policy if it is not in TRANSIENT_FAILURE, or if
|
||||
// it the last one (which handles the case of delegating to the last
|
||||
// child picker if all child polcies are in TRANSIENT_FAILURE).
|
||||
if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 {
|
||||
return state.Picker.Pick(info)
|
||||
}
|
||||
}
|
||||
if len(dcEntry.childPolicyWrappers) != 0 {
|
||||
state := (*balancer.State)(atomic.LoadPointer(&dcEntry.childPolicyWrappers[0].state))
|
||||
return state.Picker.Pick(info)
|
||||
}
|
||||
// In the unlikely event that we have a cache entry with no targets, we end up
|
||||
// queueing the RPC.
|
||||
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
|
||||
|
@ -249,8 +248,8 @@ func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState
|
|||
// target if one is configured, or fails the pick with the given error.
|
||||
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
|
||||
if p.defaultPolicy != nil {
|
||||
_, res, err := p.pickIfFeasible(p.defaultPolicy, info)
|
||||
return res, err
|
||||
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
|
||||
return state.Picker.Pick(info)
|
||||
}
|
||||
return balancer.PickResult{}, errOnNoDefault
|
||||
}
|
||||
|
@ -275,27 +274,6 @@ func (p *rlsPicker) sendRouteLookupRequest(cacheKey cacheKey, bs *backoffState,
|
|||
return throttled
|
||||
}
|
||||
|
||||
// pickIfFeasible determines if a pick can be delegated to child policy based on
|
||||
// its connectivity state.
|
||||
// - If state is CONNECTING, the pick is to be queued
|
||||
// - If state is IDLE, the child policy is instructed to exit idle, and the pick
|
||||
// is to be queued
|
||||
// - If state is READY, pick it delegated to the child policy's picker
|
||||
func (p *rlsPicker) pickIfFeasible(cpw *childPolicyWrapper, info balancer.PickInfo) (bool, balancer.PickResult, error) {
|
||||
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
|
||||
switch state.ConnectivityState {
|
||||
case connectivity.Connecting:
|
||||
return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
|
||||
case connectivity.Idle:
|
||||
p.bg.ExitIdleOne(cpw.target)
|
||||
return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
|
||||
case connectivity.Ready:
|
||||
r, e := state.Picker.Pick(info)
|
||||
return true, r, e
|
||||
}
|
||||
return false, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
|
||||
}
|
||||
|
||||
// handleRouteLookupResponse is the callback invoked by the control channel upon
|
||||
// receipt of an RLS response. Modifies the data cache and pending requests map
|
||||
// and sends a new picker.
|
||||
|
|
|
@ -305,8 +305,44 @@ func DefaultRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.Rou
|
|||
|
||||
// DefaultCluster returns a basic xds Cluster resource.
|
||||
func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) *v3clusterpb.Cluster {
|
||||
return ClusterResourceWithOptions(&ClusterOptions{
|
||||
ClusterName: clusterName,
|
||||
ServiceName: edsServiceName,
|
||||
Policy: LoadBalancingPolicyRoundRobin,
|
||||
SecurityLevel: secLevel,
|
||||
})
|
||||
}
|
||||
|
||||
// LoadBalancingPolicy determines the policy used for balancing load across
|
||||
// endpoints in the Cluster.
|
||||
type LoadBalancingPolicy int
|
||||
|
||||
const (
|
||||
// LoadBalancingPolicyRoundRobin results in the use of the weighted_target
|
||||
// LB policy to balance load across localities and endpoints in the cluster.
|
||||
LoadBalancingPolicyRoundRobin LoadBalancingPolicy = iota
|
||||
// LoadBalancingPolicyRingHash results in the use of the ring_hash LB policy
|
||||
// as the leaf policy.
|
||||
LoadBalancingPolicyRingHash
|
||||
)
|
||||
|
||||
// ClusterOptions contains options to configure a Cluster resource.
|
||||
type ClusterOptions struct {
|
||||
// ClusterName is the name of the Cluster resource.
|
||||
ClusterName string
|
||||
// ServiceName is the EDS service name of the Cluster.
|
||||
ServiceName string
|
||||
// Policy is the LB policy to be used.
|
||||
Policy LoadBalancingPolicy
|
||||
// SecurityLevel determines the security configuration for the Cluster.
|
||||
SecurityLevel SecurityLevel
|
||||
}
|
||||
|
||||
// ClusterResourceWithOptions returns an xDS Cluster resource configured with
|
||||
// the provided options.
|
||||
func ClusterResourceWithOptions(opts *ClusterOptions) *v3clusterpb.Cluster {
|
||||
var tlsContext *v3tlspb.UpstreamTlsContext
|
||||
switch secLevel {
|
||||
switch opts.SecurityLevel {
|
||||
case SecurityLevelNone:
|
||||
case SecurityLevelTLS:
|
||||
tlsContext = &v3tlspb.UpstreamTlsContext{
|
||||
|
@ -333,8 +369,15 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel)
|
|||
}
|
||||
}
|
||||
|
||||
var lbPolicy v3clusterpb.Cluster_LbPolicy
|
||||
switch opts.Policy {
|
||||
case LoadBalancingPolicyRoundRobin:
|
||||
lbPolicy = v3clusterpb.Cluster_ROUND_ROBIN
|
||||
case LoadBalancingPolicyRingHash:
|
||||
lbPolicy = v3clusterpb.Cluster_RING_HASH
|
||||
}
|
||||
cluster := &v3clusterpb.Cluster{
|
||||
Name: clusterName,
|
||||
Name: opts.ClusterName,
|
||||
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
|
||||
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
|
||||
EdsConfig: &v3corepb.ConfigSource{
|
||||
|
@ -342,9 +385,9 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel)
|
|||
Ads: &v3corepb.AggregatedConfigSource{},
|
||||
},
|
||||
},
|
||||
ServiceName: edsServiceName,
|
||||
ServiceName: opts.ServiceName,
|
||||
},
|
||||
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
|
||||
LbPolicy: lbPolicy,
|
||||
}
|
||||
if tlsContext != nil {
|
||||
cluster.TransportSocket = &v3corepb.TransportSocket{
|
||||
|
|
|
@ -46,7 +46,7 @@ import (
|
|||
|
||||
// defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a
|
||||
// client to connect to a server with a RLS Load Balancer as a child of Cluster Manager.
|
||||
func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions {
|
||||
func defaultClientResourcesWithRLSCSP(lb e2e.LoadBalancingPolicy, params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions {
|
||||
routeConfigName := "route-" + params.DialTarget
|
||||
clusterName := "cluster-" + params.DialTarget
|
||||
endpointsName := "endpoints-" + params.DialTarget
|
||||
|
@ -54,7 +54,12 @@ func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb
|
|||
NodeID: params.NodeID,
|
||||
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
|
||||
Routes: []*v3routepb.RouteConfiguration{defaultRouteConfigWithRLSCSP(routeConfigName, params.DialTarget, rlsProto)},
|
||||
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, params.SecLevel)},
|
||||
Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(&e2e.ClusterOptions{
|
||||
ClusterName: clusterName,
|
||||
ServiceName: endpointsName,
|
||||
Policy: lb,
|
||||
SecurityLevel: params.SecLevel,
|
||||
})},
|
||||
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})},
|
||||
}
|
||||
}
|
||||
|
@ -93,6 +98,27 @@ func defaultRouteConfigWithRLSCSP(routeName, ldsTarget string, rlsProto *rlspb.R
|
|||
// target corresponding to this test service. This test asserts an RPC proceeds
|
||||
// as normal with the RLS Balancer as part of system.
|
||||
func (s) TestRLSinxDS(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
lbPolicy e2e.LoadBalancingPolicy
|
||||
}{
|
||||
{
|
||||
name: "roundrobin",
|
||||
lbPolicy: e2e.LoadBalancingPolicyRoundRobin,
|
||||
},
|
||||
{
|
||||
name: "ringhash",
|
||||
lbPolicy: e2e.LoadBalancingPolicyRingHash,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
testRLSinxDS(t, test.lbPolicy)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) {
|
||||
oldRLS := envconfig.XDSRLS
|
||||
envconfig.XDSRLS = true
|
||||
internal.RegisterRLSClusterSpecifierPluginForTesting()
|
||||
|
@ -119,7 +145,7 @@ func (s) TestRLSinxDS(t *testing.T) {
|
|||
}
|
||||
|
||||
const serviceName = "my-service-client-side-xds"
|
||||
resources := defaultClientResourcesWithRLSCSP(e2e.ResourceParams{
|
||||
resources := defaultClientResourcesWithRLSCSP(lbPolicy, e2e.ResourceParams{
|
||||
DialTarget: serviceName,
|
||||
NodeID: nodeID,
|
||||
Host: "localhost",
|
||||
|
|
Loading…
Reference in New Issue