mirror of https://github.com/grpc/grpc-go.git
xds: add Outlier Detection Balancer (#5435)
* xds: add Outlier Detection Balancer
This commit is contained in:
parent 182e9df160
commit f7d2036712
@@ -52,6 +52,13 @@ func Intn(n int) int {
	return r.Intn(n)
}

// Int31n implements rand.Int31n on the grpcrand global source.
func Int31n(n int32) int32 {
	mu.Lock()
	defer mu.Unlock()
	return r.Int31n(n)
}

// Float64 implements rand.Float64 on the grpcrand global source.
func Float64() float64 {
	mu.Lock()
@@ -0,0 +1,219 @@
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package xds_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
|
||||
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
|
||||
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/envconfig"
|
||||
"google.golang.org/grpc/internal/stubserver"
|
||||
"google.golang.org/grpc/internal/testutils/xds/e2e"
|
||||
testgrpc "google.golang.org/grpc/test/grpc_testing"
|
||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
"google.golang.org/protobuf/types/known/wrapperspb"
|
||||
)
|
||||
|
||||
// TestOutlierDetection_NoopConfig tests the scenario where the Outlier
|
||||
// Detection feature is enabled on the gRPC client, but it receives no Outlier
|
||||
// Detection configuration from the management server. This should result in a
|
||||
// no-op Outlier Detection configuration being used to configure the Outlier
|
||||
// Detection balancer. This test verifies that an RPC is able to proceed
|
||||
// normally with this configuration.
|
||||
func (s) TestOutlierDetection_NoopConfig(t *testing.T) {
|
||||
oldOD := envconfig.XDSOutlierDetection
|
||||
envconfig.XDSOutlierDetection = true
|
||||
internal.RegisterOutlierDetectionBalancerForTesting()
|
||||
defer func() {
|
||||
envconfig.XDSOutlierDetection = oldOD
|
||||
internal.UnregisterOutlierDetectionBalancerForTesting()
|
||||
}()
|
||||
|
||||
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil)
|
||||
defer cleanup1()
|
||||
|
||||
port, cleanup2 := startTestService(t, nil)
|
||||
defer cleanup2()
|
||||
|
||||
const serviceName = "my-service-client-side-xds"
|
||||
resources := e2e.DefaultClientResources(e2e.ResourceParams{
|
||||
DialTarget: serviceName,
|
||||
NodeID: nodeID,
|
||||
Host: "localhost",
|
||||
Port: port,
|
||||
SecLevel: e2e.SecurityLevelNone,
|
||||
})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
if err := managementServer.Update(ctx, resources); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Create a ClientConn and make a successful RPC.
|
||||
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to dial local test server: %v", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
client := testgrpc.NewTestServiceClient(cc)
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
|
||||
t.Fatalf("rpc EmptyCall() failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// clientResourcesMultipleBackendsAndOD returns xDS resources which correspond
// to multiple upstreams, i.e. different backends listening on different
// localhost:port combinations. The resources also configure an Outlier
// Detection Balancer set up with the Failure Percentage Algorithm, which
// ejects endpoints based on failure rate.
|
||||
func clientResourcesMultipleBackendsAndOD(params e2e.ResourceParams, ports []uint32) e2e.UpdateOptions {
|
||||
routeConfigName := "route-" + params.DialTarget
|
||||
clusterName := "cluster-" + params.DialTarget
|
||||
endpointsName := "endpoints-" + params.DialTarget
|
||||
return e2e.UpdateOptions{
|
||||
NodeID: params.NodeID,
|
||||
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
|
||||
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, params.DialTarget, clusterName)},
|
||||
Clusters: []*v3clusterpb.Cluster{clusterWithOutlierDetection(clusterName, endpointsName, params.SecLevel)},
|
||||
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, ports)},
|
||||
}
|
||||
}
|
||||
|
||||
func clusterWithOutlierDetection(clusterName, edsServiceName string, secLevel e2e.SecurityLevel) *v3clusterpb.Cluster {
|
||||
cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel)
|
||||
cluster.OutlierDetection = &v3clusterpb.OutlierDetection{
|
||||
Interval: &durationpb.Duration{Nanos: 50000000}, // 50ms
BaseEjectionTime: &durationpb.Duration{Seconds: 30},
|
||||
MaxEjectionTime: &durationpb.Duration{Seconds: 300},
|
||||
MaxEjectionPercent: &wrapperspb.UInt32Value{Value: 1},
|
||||
FailurePercentageThreshold: &wrapperspb.UInt32Value{Value: 50},
|
||||
EnforcingFailurePercentage: &wrapperspb.UInt32Value{Value: 100},
|
||||
FailurePercentageRequestVolume: &wrapperspb.UInt32Value{Value: 1},
|
||||
FailurePercentageMinimumHosts: &wrapperspb.UInt32Value{Value: 1},
|
||||
}
|
||||
return cluster
|
||||
}
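As an illustrative aside (not part of this commit): the hand-rolled Duration literals above are easy to misread; durationpb.New converts a time.Duration directly. A minimal sketch, assuming the same values as the cluster config above:

package main

import (
	"fmt"
	"time"

	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	"google.golang.org/protobuf/types/known/durationpb"
)

func main() {
	// Equivalent to the literals above: a 50ms interval, 30s base ejection
	// time, and 300s max ejection time.
	od := &v3clusterpb.OutlierDetection{
		Interval:         durationpb.New(50 * time.Millisecond),
		BaseEjectionTime: durationpb.New(30 * time.Second),
		MaxEjectionTime:  durationpb.New(300 * time.Second),
	}
	fmt.Println(od.GetInterval().AsDuration()) // 50ms
}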
|
||||
|
||||
// TestOutlierDetectionWithOutlier tests the Outlier Detection Balancer e2e. It
|
||||
// spins up three backends, one which consistently errors, and configures the
|
||||
// ClientConn using xDS to connect to all three of those backends. The Outlier
|
||||
// Detection Balancer should eject the connection to the backend which
// constantly errors, and thus RPC's should mainly go to backends 1 and 2.
|
||||
func (s) TestOutlierDetectionWithOutlier(t *testing.T) {
|
||||
oldOD := envconfig.XDSOutlierDetection
|
||||
envconfig.XDSOutlierDetection = true
|
||||
internal.RegisterOutlierDetectionBalancerForTesting()
|
||||
defer func() {
|
||||
envconfig.XDSOutlierDetection = oldOD
|
||||
internal.UnregisterOutlierDetectionBalancerForTesting()
|
||||
}()
|
||||
|
||||
managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, nil)
|
||||
defer cleanup()
|
||||
|
||||
// Counters for how many times backends got called.
|
||||
var count1, count2, count3 int
|
||||
|
||||
// Working backend 1.
|
||||
port1, cleanup1 := startTestService(t, &stubserver.StubServer{
|
||||
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
|
||||
count1++
|
||||
return &testpb.Empty{}, nil
|
||||
},
|
||||
Address: "localhost:0",
|
||||
})
|
||||
defer cleanup1()
|
||||
|
||||
// Working backend 2.
|
||||
port2, cleanup2 := startTestService(t, &stubserver.StubServer{
|
||||
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
|
||||
count2++
|
||||
return &testpb.Empty{}, nil
|
||||
},
|
||||
Address: "localhost:0",
|
||||
})
|
||||
defer cleanup2()
|
||||
|
||||
// Backend 3 that will always return an error and eventually be ejected.
port3, cleanup3 := startTestService(t, &stubserver.StubServer{
|
||||
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
|
||||
count3++
|
||||
return nil, errors.New("some error")
|
||||
},
|
||||
Address: "localhost:0",
|
||||
})
|
||||
defer cleanup3()
|
||||
|
||||
const serviceName = "my-service-client-side-xds"
|
||||
resources := clientResourcesMultipleBackendsAndOD(e2e.ResourceParams{
|
||||
DialTarget: serviceName,
|
||||
NodeID: nodeID,
|
||||
Host: "localhost",
|
||||
SecLevel: e2e.SecurityLevelNone,
|
||||
}, []uint32{port1, port2, port3})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
if err := managementServer.Update(ctx, resources); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to dial local test server: %v", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
client := testgrpc.NewTestServiceClient(cc)
|
||||
for i := 0; i < 2000; i++ {
|
||||
// Can either error or not depending on the backend called.
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil && !strings.Contains(err.Error(), "some error") {
|
||||
t.Fatalf("rpc EmptyCall() failed: %v", err)
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
// Backend 1 should've gotten more than 1/3rd of the load as backend 3
|
||||
// should get ejected, leaving only 1 and 2.
|
||||
if count1 < 700 {
|
||||
t.Fatalf("backend 1 should've gotten more than 1/3rd of the load")
|
||||
}
|
||||
// Backend 2 should've gotten more than 1/3rd of the load as backend 3
|
||||
// should get ejected, leaving only 1 and 2.
|
||||
if count2 < 700 {
|
||||
t.Fatalf("backend 2 should've gotten more than 1/3rd of the load")
|
||||
}
|
||||
// Backend 3 should've gotten less than 1/3rd of the load since it gets
|
||||
// ejected.
|
||||
if count3 > 650 {
t.Fatalf("backend 3 should've gotten less than 1/3rd of the load")
|
||||
}
|
||||
}
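To make the thresholds above concrete (an informal reading of the test's expectations, not part of the commit): the loop issues 2000 RPCs. If backend 3 were never ejected, each backend would receive roughly 2000/3 ≈ 666 RPCs; once backend 3 is ejected, the remaining load is split between backends 1 and 2, pushing each of them well above 700 while backend 3 stays below 650.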
|
||||
|
|
@@ -25,13 +25,31 @@ import (
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/balancer/gracefulswitch"
	"google.golang.org/grpc/internal/buffer"
	"google.golang.org/grpc/internal/envconfig"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
)

// Globals to stub out in tests.
var (
	afterFunc = time.AfterFunc
	now       = time.Now
)

// Name is the name of the outlier detection balancer.
const Name = "outlier_detection_experimental"
@@ -51,7 +69,20 @@ func init() {
type bb struct{}
|
||||
|
||||
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
|
||||
return nil
|
||||
b := &outlierDetectionBalancer{
|
||||
cc: cc,
|
||||
closed: grpcsync.NewEvent(),
|
||||
done: grpcsync.NewEvent(),
|
||||
addrs: make(map[string]*addressInfo),
|
||||
scWrappers: make(map[balancer.SubConn]*subConnWrapper),
|
||||
scUpdateCh: buffer.NewUnbounded(),
|
||||
pickerUpdateCh: buffer.NewUnbounded(),
|
||||
}
|
||||
b.logger = prefixLogger(b)
|
||||
b.logger.Infof("Created")
|
||||
b.child = gracefulswitch.NewBalancer(b, bOpts)
|
||||
go b.run()
|
||||
return b
|
||||
}
|
||||
|
||||
func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
|
||||
|
|
@@ -82,6 +113,7 @@ func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, err
return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.base_ejection_time = %s; must be >= 0", lbCfg.BaseEjectionTime)
|
||||
case lbCfg.MaxEjectionTime < 0:
|
||||
return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.max_ejection_time = %s; must be >= 0", lbCfg.MaxEjectionTime)
|
||||
|
||||
// "The fields max_ejection_percent,
|
||||
// success_rate_ejection.enforcement_percentage,
|
||||
// failure_percentage_ejection.threshold, and
|
||||
|
|
@@ -105,3 +137,766 @@ func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, err
func (bb) Name() string {
|
||||
return Name
|
||||
}
|
||||
|
||||
// scUpdate wraps a subConn update to be sent to the child balancer.
|
||||
type scUpdate struct {
|
||||
scw *subConnWrapper
|
||||
state balancer.SubConnState
|
||||
}
|
||||
|
||||
type ejectionUpdate struct {
|
||||
scw *subConnWrapper
|
||||
isEjected bool // true for ejected, false for unejected
|
||||
}
|
||||
|
||||
type lbCfgUpdate struct {
|
||||
lbCfg *LBConfig
|
||||
// to make sure picker is updated synchronously.
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
type outlierDetectionBalancer struct {
|
||||
// These fields are safe to be accessed without holding any mutex because
|
||||
// they are synchronized in run(), which makes these field accesses happen
|
||||
// serially.
|
||||
//
|
||||
// childState is the latest balancer state received from the child.
|
||||
childState balancer.State
|
||||
// recentPickerNoop represents whether the most recent picker sent upward to
|
||||
// the balancer.ClientConn is a noop picker, which doesn't count RPC's. Used
|
||||
// to suppress redundant picker updates.
|
||||
recentPickerNoop bool
|
||||
|
||||
closed *grpcsync.Event
|
||||
done *grpcsync.Event
|
||||
cc balancer.ClientConn
|
||||
logger *grpclog.PrefixLogger
|
||||
|
||||
// childMu guards calls into child (to uphold the balancer.Balancer API
|
||||
// guarantee of synchronous calls).
|
||||
childMu sync.Mutex
|
||||
child *gracefulswitch.Balancer
|
||||
|
||||
// mu guards access to the following fields. It also helps to synchronize
|
||||
// behaviors of the following events: config updates, firing of the interval
|
||||
// timer, SubConn State updates, SubConn address updates, and child state
|
||||
// updates.
|
||||
//
|
||||
// For example, when we receive a config update in the middle of the
|
||||
// interval timer algorithm, which uses knobs present in the config, the
|
||||
// balancer will wait for the interval timer algorithm to finish before
|
||||
// persisting the new configuration.
|
||||
//
|
||||
// Another example would be the updating of the addrs map, such as from a
|
||||
// SubConn address update in the middle of the interval timer algorithm
|
||||
// which uses addrs. This balancer waits for the interval timer algorithm to
|
||||
// finish before making the update to the addrs map.
|
||||
//
|
||||
// This mutex is never held at the same time as childMu (within the context
|
||||
// of a single goroutine).
|
||||
mu sync.Mutex
|
||||
addrs map[string]*addressInfo
|
||||
cfg *LBConfig
|
||||
scWrappers map[balancer.SubConn]*subConnWrapper
|
||||
timerStartTime time.Time
|
||||
intervalTimer *time.Timer
|
||||
inhibitPickerUpdates bool
|
||||
updateUnconditionally bool
|
||||
numAddrsEjected int // For fast calculations of percentage of addrs ejected
|
||||
|
||||
scUpdateCh *buffer.Unbounded
|
||||
pickerUpdateCh *buffer.Unbounded
|
||||
}
|
||||
|
||||
// noopConfig returns whether this balancer is configured with a logical no-op
|
||||
// configuration or not.
|
||||
//
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) noopConfig() bool {
|
||||
return b.cfg.SuccessRateEjection == nil && b.cfg.FailurePercentageEjection == nil
|
||||
}
|
||||
|
||||
// onIntervalConfig handles logic required specifically on the receipt of a
|
||||
// configuration which specifies to count RPC's and periodically perform passive
|
||||
// health checking based on heuristics defined in configuration every configured
|
||||
// interval.
|
||||
//
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) onIntervalConfig() {
|
||||
var interval time.Duration
|
||||
if b.timerStartTime.IsZero() {
|
||||
b.timerStartTime = time.Now()
|
||||
for _, addrInfo := range b.addrs {
|
||||
addrInfo.callCounter.clear()
|
||||
}
|
||||
interval = b.cfg.Interval
|
||||
} else {
|
||||
interval = b.cfg.Interval - now().Sub(b.timerStartTime)
|
||||
if interval < 0 {
|
||||
interval = 0
|
||||
}
|
||||
}
|
||||
b.intervalTimer = afterFunc(interval, b.intervalTimerAlgorithm)
|
||||
}
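For example (illustrative values, not from the commit): with a configured interval of 10s and a timer that originally started 4s ago, the timer is re-armed for the remaining 10s - 4s = 6s; if the interval has already fully elapsed, the remainder is negative and is clamped to 0 so the interval algorithm runs immediately.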
|
||||
|
||||
// onNoopConfig handles logic required specifically on the receipt of a
|
||||
// configuration which specifies the balancer to be a noop.
|
||||
//
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) onNoopConfig() {
|
||||
// "If a config is provided with both the `success_rate_ejection` and
|
||||
// `failure_percentage_ejection` fields unset, skip starting the timer and
|
||||
// do the following:"
|
||||
// "Unset the timer start timestamp."
|
||||
b.timerStartTime = time.Time{}
|
||||
for _, addrInfo := range b.addrs {
|
||||
// "Uneject all currently ejected addresses."
|
||||
if !addrInfo.latestEjectionTimestamp.IsZero() {
|
||||
b.unejectAddress(addrInfo)
|
||||
}
|
||||
// "Reset each address's ejection time multiplier to 0."
|
||||
addrInfo.ejectionTimeMultiplier = 0
|
||||
}
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
||||
lbCfg, ok := s.BalancerConfig.(*LBConfig)
|
||||
if !ok {
|
||||
b.logger.Errorf("received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
|
||||
return balancer.ErrBadResolverState
|
||||
}
|
||||
|
||||
// Reject whole config if child policy doesn't exist, don't persist it for
|
||||
// later.
|
||||
bb := balancer.Get(lbCfg.ChildPolicy.Name)
|
||||
if bb == nil {
|
||||
return fmt.Errorf("outlier detection: child balancer %q not registered", lbCfg.ChildPolicy.Name)
|
||||
}
|
||||
|
||||
// It is safe to read b.cfg here without holding the mutex, as the only
|
||||
// write to b.cfg happens later in this function. This function is part of
|
||||
// the balancer.Balancer API, so it is guaranteed to be called in a
|
||||
// synchronous manner, so it cannot race with this read.
|
||||
if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name {
|
||||
b.childMu.Lock()
|
||||
err := b.child.SwitchTo(bb)
|
||||
if err != nil {
|
||||
b.childMu.Unlock()
|
||||
return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err)
|
||||
}
|
||||
b.childMu.Unlock()
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
// Inhibit child picker updates until this UpdateClientConnState() call
|
||||
// completes. If needed, a picker update containing the no-op config bit
|
||||
// determined from this config and most recent state from the child will be
|
||||
// sent synchronously upward at the end of this UpdateClientConnState()
|
||||
// call.
|
||||
b.inhibitPickerUpdates = true
|
||||
b.updateUnconditionally = false
|
||||
b.cfg = lbCfg
|
||||
|
||||
addrs := make(map[string]bool, len(s.ResolverState.Addresses))
|
||||
for _, addr := range s.ResolverState.Addresses {
|
||||
addrs[addr.Addr] = true
|
||||
if _, ok := b.addrs[addr.Addr]; !ok {
|
||||
b.addrs[addr.Addr] = newAddressInfo()
|
||||
}
|
||||
}
|
||||
for addr := range b.addrs {
|
||||
if !addrs[addr] {
|
||||
delete(b.addrs, addr)
|
||||
}
|
||||
}
|
||||
|
||||
if b.intervalTimer != nil {
|
||||
b.intervalTimer.Stop()
|
||||
}
|
||||
|
||||
if b.noopConfig() {
|
||||
b.onNoopConfig()
|
||||
} else {
|
||||
b.onIntervalConfig()
|
||||
}
|
||||
b.mu.Unlock()
|
||||
|
||||
b.childMu.Lock()
|
||||
err := b.child.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: s.ResolverState,
|
||||
BalancerConfig: b.cfg.ChildPolicy.Config,
|
||||
})
|
||||
b.childMu.Unlock()
|
||||
|
||||
done := make(chan struct{})
|
||||
b.pickerUpdateCh.Put(lbCfgUpdate{
|
||||
lbCfg: lbCfg,
|
||||
done: done,
|
||||
})
|
||||
<-done
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) ResolverError(err error) {
|
||||
b.childMu.Lock()
|
||||
defer b.childMu.Unlock()
|
||||
b.child.ResolverError(err)
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
scw, ok := b.scWrappers[sc]
|
||||
if !ok {
|
||||
// Shouldn't happen if passed down a SubConnWrapper to child on SubConn
|
||||
// creation.
|
||||
b.logger.Errorf("UpdateSubConnState called with SubConn that has no corresponding SubConnWrapper")
|
||||
return
|
||||
}
|
||||
if state.ConnectivityState == connectivity.Shutdown {
|
||||
delete(b.scWrappers, scw.SubConn)
|
||||
}
|
||||
b.scUpdateCh.Put(&scUpdate{
|
||||
scw: scw,
|
||||
state: state,
|
||||
})
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) Close() {
|
||||
b.closed.Fire()
|
||||
<-b.done.Done()
|
||||
b.childMu.Lock()
|
||||
b.child.Close()
|
||||
b.childMu.Unlock()
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if b.intervalTimer != nil {
|
||||
b.intervalTimer.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) ExitIdle() {
|
||||
b.childMu.Lock()
|
||||
defer b.childMu.Unlock()
|
||||
b.child.ExitIdle()
|
||||
}
|
||||
|
||||
// wrappedPicker delegates to the child policy's picker, and when the request
|
||||
// finishes, it increments the corresponding counter in the map entry referenced
|
||||
// by the subConnWrapper that was picked. If both the `success_rate_ejection`
|
||||
// and `failure_percentage_ejection` fields are unset in the configuration, this
|
||||
// picker will not count.
|
||||
type wrappedPicker struct {
|
||||
childPicker balancer.Picker
|
||||
noopPicker bool
|
||||
}
|
||||
|
||||
func (wp *wrappedPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
|
||||
pr, err := wp.childPicker.Pick(info)
|
||||
if err != nil {
|
||||
return balancer.PickResult{}, err
|
||||
}
|
||||
|
||||
done := func(di balancer.DoneInfo) {
|
||||
if !wp.noopPicker {
|
||||
incrementCounter(pr.SubConn, di)
|
||||
}
|
||||
if pr.Done != nil {
|
||||
pr.Done(di)
|
||||
}
|
||||
}
|
||||
scw, ok := pr.SubConn.(*subConnWrapper)
|
||||
if !ok {
|
||||
// This can never happen, but check is present for defensive
|
||||
// programming.
|
||||
logger.Errorf("Picked SubConn from child picker is not a SubConnWrapper")
|
||||
return balancer.PickResult{
|
||||
SubConn: pr.SubConn,
|
||||
Done: done,
|
||||
}, nil
|
||||
}
|
||||
return balancer.PickResult{
|
||||
SubConn: scw.SubConn,
|
||||
Done: done,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func incrementCounter(sc balancer.SubConn, info balancer.DoneInfo) {
|
||||
scw, ok := sc.(*subConnWrapper)
|
||||
if !ok {
|
||||
// Shouldn't happen, as comes from child
|
||||
return
|
||||
}
|
||||
|
||||
// scw.addressInfo and callCounter.activeBucket can be written to
|
||||
// concurrently (the pointers themselves). Thus, protect the reads here with
|
||||
// atomics to prevent data corruption. There exists a race in which you read
|
||||
// the addressInfo or active bucket pointer and then that pointer points to
|
||||
// deprecated memory. If this goroutine yields the processor, in between
|
||||
// reading the addressInfo pointer and writing to the active bucket,
|
||||
// UpdateAddresses can switch the addressInfo the scw points to. Writing to
|
||||
// an outdated address is a very small race and tolerable. After reading
// callCounter.activeBucket in this picker a swap call can concurrently
|
||||
// change what activeBucket points to. A50 says to swap the pointer, which
|
||||
// will cause this race to write to deprecated memory the interval timer
|
||||
// algorithm will never read, which makes this race alright.
|
||||
addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo))
|
||||
if addrInfo == nil {
|
||||
return
|
||||
}
|
||||
ab := (*bucket)(atomic.LoadPointer(&addrInfo.callCounter.activeBucket))
|
||||
|
||||
if info.Err == nil {
|
||||
atomic.AddUint32(&ab.numSuccesses, 1)
|
||||
} else {
|
||||
atomic.AddUint32(&ab.numFailures, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) UpdateState(s balancer.State) {
|
||||
b.pickerUpdateCh.Put(s)
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
||||
sc, err := b.cc.NewSubConn(addrs, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
scw := &subConnWrapper{
|
||||
SubConn: sc,
|
||||
addresses: addrs,
|
||||
scUpdateCh: b.scUpdateCh,
|
||||
}
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
b.scWrappers[sc] = scw
|
||||
if len(addrs) != 1 {
|
||||
return scw, nil
|
||||
}
|
||||
addrInfo, ok := b.addrs[addrs[0].Addr]
|
||||
if !ok {
|
||||
return scw, nil
|
||||
}
|
||||
addrInfo.sws = append(addrInfo.sws, scw)
|
||||
atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(addrInfo))
|
||||
if !addrInfo.latestEjectionTimestamp.IsZero() {
|
||||
scw.eject()
|
||||
}
|
||||
return scw, nil
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) {
|
||||
scw, ok := sc.(*subConnWrapper)
|
||||
if !ok { // Shouldn't happen
|
||||
return
|
||||
}
|
||||
// Remove the wrapped SubConn from the parent ClientConn. We don't remove
// the map entry until we get a Shutdown state for the SubConn, as we need
// that data to forward that state down.
|
||||
b.cc.RemoveSubConn(scw.SubConn)
|
||||
}
|
||||
|
||||
// appendIfPresent appends the scw to the address, if the address is present in
// the Outlier Detection balancer's address map. Returns nil if not present,
// and the map entry if present.
|
||||
//
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) appendIfPresent(addr string, scw *subConnWrapper) *addressInfo {
|
||||
addrInfo, ok := b.addrs[addr]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
addrInfo.sws = append(addrInfo.sws, scw)
|
||||
atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(addrInfo))
|
||||
return addrInfo
|
||||
}
|
||||
|
||||
// removeSubConnFromAddressesMapEntry removes the scw from its map entry if
|
||||
// present.
|
||||
//
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) removeSubConnFromAddressesMapEntry(scw *subConnWrapper) {
|
||||
addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo))
|
||||
if addrInfo == nil {
|
||||
return
|
||||
}
|
||||
for i, sw := range addrInfo.sws {
|
||||
if scw == sw {
|
||||
addrInfo.sws = append(addrInfo.sws[:i], addrInfo.sws[i+1:]...)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
|
||||
scw, ok := sc.(*subConnWrapper)
|
||||
if !ok {
|
||||
// Return, shouldn't happen if passed up scw
|
||||
return
|
||||
}
|
||||
|
||||
b.cc.UpdateAddresses(scw.SubConn, addrs)
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// Note that 0 addresses is a valid update/state for a SubConn to be in.
|
||||
// This is correctly handled by this algorithm (handled as part of a
// non-singular old address/new address).
|
||||
switch {
|
||||
case len(scw.addresses) == 1 && len(addrs) == 1: // single address to single address
|
||||
// If the updated address is the same, then there is nothing to do
|
||||
// past this point.
|
||||
if scw.addresses[0].Addr == addrs[0].Addr {
|
||||
return
|
||||
}
|
||||
b.removeSubConnFromAddressesMapEntry(scw)
|
||||
addrInfo := b.appendIfPresent(addrs[0].Addr, scw)
|
||||
if addrInfo == nil { // uneject unconditionally because could have come from an ejected address
|
||||
scw.uneject()
|
||||
break
|
||||
}
|
||||
if addrInfo.latestEjectionTimestamp.IsZero() { // relay new updated subconn state
|
||||
scw.uneject()
|
||||
} else {
|
||||
scw.eject()
|
||||
}
|
||||
case len(scw.addresses) == 1: // single address to multiple/no addresses
|
||||
b.removeSubConnFromAddressesMapEntry(scw)
|
||||
addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo))
|
||||
if addrInfo != nil {
|
||||
addrInfo.callCounter.clear()
|
||||
}
|
||||
scw.uneject()
|
||||
case len(addrs) == 1: // multiple/no addresses to single address
|
||||
addrInfo := b.appendIfPresent(addrs[0].Addr, scw)
|
||||
if addrInfo != nil && !addrInfo.latestEjectionTimestamp.IsZero() {
|
||||
scw.eject()
|
||||
}
|
||||
} // otherwise multiple/no addresses to multiple/no addresses; ignore
|
||||
|
||||
scw.addresses = addrs
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) ResolveNow(opts resolver.ResolveNowOptions) {
|
||||
b.cc.ResolveNow(opts)
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) Target() string {
|
||||
return b.cc.Target()
|
||||
}
|
||||
|
||||
func max(x, y int64) int64 {
|
||||
if x < y {
|
||||
return y
|
||||
}
|
||||
return x
|
||||
}
|
||||
|
||||
func min(x, y int64) int64 {
|
||||
if x < y {
|
||||
return x
|
||||
}
|
||||
return y
|
||||
}
|
||||
|
||||
// handleSubConnUpdate stores the recent state and forwards the update
// if the SubConn is not ejected.
|
||||
func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) {
|
||||
scw := u.scw
|
||||
scw.latestState = u.state
|
||||
if !scw.ejected {
|
||||
b.childMu.Lock()
|
||||
b.child.UpdateSubConnState(scw, u.state)
|
||||
b.childMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// handleEjectedUpdate handles any SubConns that get ejected/unejected, and
|
||||
// forwards the appropriate corresponding subConnState to the child policy.
|
||||
func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) {
|
||||
scw := u.scw
|
||||
scw.ejected = u.isEjected
|
||||
// If scw.latestState has never been written to, it will default to
// connectivity IDLE, which is fine.
|
||||
stateToUpdate := scw.latestState
|
||||
if u.isEjected {
|
||||
stateToUpdate = balancer.SubConnState{
|
||||
ConnectivityState: connectivity.TransientFailure,
|
||||
}
|
||||
}
|
||||
b.childMu.Lock()
|
||||
b.child.UpdateSubConnState(scw, stateToUpdate)
|
||||
b.childMu.Unlock()
|
||||
}
|
||||
|
||||
// handleChildStateUpdate forwards the picker update wrapped in a wrapped picker
|
||||
// with the noop picker bit present.
|
||||
func (b *outlierDetectionBalancer) handleChildStateUpdate(u balancer.State) {
|
||||
b.childState = u
|
||||
b.mu.Lock()
|
||||
if b.inhibitPickerUpdates {
|
||||
// If a child's state is updated during the suppression of child
|
||||
// updates, the synchronous handleLBConfigUpdate function with respect
|
||||
// to UpdateClientConnState should return a picker unconditionally.
|
||||
b.updateUnconditionally = true
|
||||
b.mu.Unlock()
|
||||
return
|
||||
}
|
||||
noopCfg := b.noopConfig()
|
||||
b.mu.Unlock()
|
||||
b.recentPickerNoop = noopCfg
|
||||
b.cc.UpdateState(balancer.State{
|
||||
ConnectivityState: b.childState.ConnectivityState,
|
||||
Picker: &wrappedPicker{
|
||||
childPicker: b.childState.Picker,
|
||||
noopPicker: noopCfg,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// handleLBConfigUpdate compares whether the new config is a noop config or not,
|
||||
// to the noop bit in the picker if present. It updates the picker if this bit
|
||||
// changed compared to the picker currently in use.
|
||||
func (b *outlierDetectionBalancer) handleLBConfigUpdate(u lbCfgUpdate) {
|
||||
lbCfg := u.lbCfg
|
||||
noopCfg := lbCfg.SuccessRateEjection == nil && lbCfg.FailurePercentageEjection == nil
|
||||
// If the child has sent its first update and this config flips the noop
// bit compared to the most recent picker update sent upward, then a new
// picker with this updated bit needs to be forwarded upward. If a child
// update was received during the suppression of child updates within
// UpdateClientConnState(), then a new picker needs to be forwarded with
// this updated state, regardless of whether this new configuration flips
// the bit.
|
||||
if b.childState.Picker != nil && noopCfg != b.recentPickerNoop || b.updateUnconditionally {
|
||||
b.recentPickerNoop = noopCfg
|
||||
b.cc.UpdateState(balancer.State{
|
||||
ConnectivityState: b.childState.ConnectivityState,
|
||||
Picker: &wrappedPicker{
|
||||
childPicker: b.childState.Picker,
|
||||
noopPicker: noopCfg,
|
||||
},
|
||||
})
|
||||
}
|
||||
b.inhibitPickerUpdates = false
|
||||
b.updateUnconditionally = false
|
||||
close(u.done)
|
||||
}
|
||||
|
||||
func (b *outlierDetectionBalancer) run() {
|
||||
defer b.done.Fire()
|
||||
for {
|
||||
select {
|
||||
case update := <-b.scUpdateCh.Get():
|
||||
b.scUpdateCh.Load()
|
||||
if b.closed.HasFired() { // don't send SubConn updates to child after the balancer has been closed
|
||||
return
|
||||
}
|
||||
switch u := update.(type) {
|
||||
case *scUpdate:
|
||||
b.handleSubConnUpdate(u)
|
||||
case *ejectionUpdate:
|
||||
b.handleEjectedUpdate(u)
|
||||
}
|
||||
case update := <-b.pickerUpdateCh.Get():
|
||||
b.pickerUpdateCh.Load()
|
||||
if b.closed.HasFired() { // don't send picker updates to grpc after the balancer has been closed
|
||||
return
|
||||
}
|
||||
switch u := update.(type) {
|
||||
case balancer.State:
|
||||
b.handleChildStateUpdate(u)
|
||||
case lbCfgUpdate:
|
||||
b.handleLBConfigUpdate(u)
|
||||
}
|
||||
case <-b.closed.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// intervalTimerAlgorithm ejects and unejects addresses based on the Outlier
|
||||
// Detection configuration and data about each address from the previous
|
||||
// interval.
|
||||
func (b *outlierDetectionBalancer) intervalTimerAlgorithm() {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
b.timerStartTime = time.Now()
|
||||
|
||||
for _, addrInfo := range b.addrs {
|
||||
addrInfo.callCounter.swap()
|
||||
}
|
||||
|
||||
if b.cfg.SuccessRateEjection != nil {
|
||||
b.successRateAlgorithm()
|
||||
}
|
||||
|
||||
if b.cfg.FailurePercentageEjection != nil {
|
||||
b.failurePercentageAlgorithm()
|
||||
}
|
||||
|
||||
for _, addrInfo := range b.addrs {
|
||||
if addrInfo.latestEjectionTimestamp.IsZero() && addrInfo.ejectionTimeMultiplier > 0 {
|
||||
addrInfo.ejectionTimeMultiplier--
|
||||
continue
|
||||
}
|
||||
if addrInfo.latestEjectionTimestamp.IsZero() {
|
||||
// Address is already not ejected, so no need to check for whether
|
||||
// to uneject the address below.
|
||||
continue
|
||||
}
|
||||
et := b.cfg.BaseEjectionTime.Nanoseconds() * addrInfo.ejectionTimeMultiplier
|
||||
met := max(b.cfg.BaseEjectionTime.Nanoseconds(), b.cfg.MaxEjectionTime.Nanoseconds())
|
||||
curTimeAfterEt := now().After(addrInfo.latestEjectionTimestamp.Add(time.Duration(min(et, met))))
|
||||
if curTimeAfterEt {
|
||||
b.unejectAddress(addrInfo)
|
||||
}
|
||||
}
|
||||
|
||||
// This conditional is only needed for testing (since the interval timer
// algorithm is called manually there); it will never be hit in production.
|
||||
if b.intervalTimer != nil {
|
||||
b.intervalTimer.Stop()
|
||||
}
|
||||
b.intervalTimer = afterFunc(b.cfg.Interval, b.intervalTimerAlgorithm)
|
||||
}
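An illustrative sketch (not part of this commit) of the un-ejection arithmetic above: an address stays ejected for min(base_ejection_time * multiplier, max(base_ejection_time, max_ejection_time)), so repeated ejections back off linearly up to the configured cap. Assuming a hypothetical 30s base and 300s max:

package main

import (
	"fmt"
	"time"
)

// ejectionDuration mirrors the et/met computation in intervalTimerAlgorithm.
func ejectionDuration(base, maxEj time.Duration, multiplier int64) time.Duration {
	et := base.Nanoseconds() * multiplier
	met := base.Nanoseconds()
	if maxEj.Nanoseconds() > met {
		met = maxEj.Nanoseconds()
	}
	if et < met {
		return time.Duration(et)
	}
	return time.Duration(met)
}

func main() {
	base, maxEj := 30*time.Second, 300*time.Second
	fmt.Println(ejectionDuration(base, maxEj, 1))  // 30s
	fmt.Println(ejectionDuration(base, maxEj, 4))  // 2m0s
	fmt.Println(ejectionDuration(base, maxEj, 20)) // 5m0s (capped at max_ejection_time)
}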
|
||||
|
||||
// addrsWithAtLeastRequestVolume returns a slice of address information of all
|
||||
// addresses with at least request volume passed in.
|
||||
//
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) addrsWithAtLeastRequestVolume(requestVolume uint32) []*addressInfo {
|
||||
var addrs []*addressInfo
|
||||
for _, addrInfo := range b.addrs {
|
||||
bucket := addrInfo.callCounter.inactiveBucket
|
||||
rv := bucket.numSuccesses + bucket.numFailures
|
||||
if rv >= requestVolume {
|
||||
addrs = append(addrs, addrInfo)
|
||||
}
|
||||
}
|
||||
return addrs
|
||||
}
|
||||
|
||||
// meanAndStdDev returns the mean and std dev of the fractions of successful
|
||||
// requests of the addresses passed in.
|
||||
//
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) meanAndStdDev(addrs []*addressInfo) (float64, float64) {
|
||||
var totalFractionOfSuccessfulRequests float64
|
||||
var mean float64
|
||||
for _, addrInfo := range addrs {
|
||||
bucket := addrInfo.callCounter.inactiveBucket
|
||||
rv := bucket.numSuccesses + bucket.numFailures
|
||||
totalFractionOfSuccessfulRequests += float64(bucket.numSuccesses) / float64(rv)
|
||||
}
|
||||
mean = totalFractionOfSuccessfulRequests / float64(len(addrs))
|
||||
var sumOfSquares float64
|
||||
for _, addrInfo := range addrs {
|
||||
bucket := addrInfo.callCounter.inactiveBucket
|
||||
rv := bucket.numSuccesses + bucket.numFailures
|
||||
devFromMean := (float64(bucket.numSuccesses) / float64(rv)) - mean
|
||||
sumOfSquares += devFromMean * devFromMean
|
||||
}
|
||||
variance := sumOfSquares / float64(len(addrs))
|
||||
return mean, math.Sqrt(variance)
|
||||
}
|
||||
|
||||
// successRateAlgorithm ejects any address whose success rate falls
// significantly below the mean success rate of the other addresses, as
// determined by the configured standard deviation factor, subject to the
// other configured heuristics (minimum hosts, request volume, max ejection
// percent, and enforcement percentage).
|
||||
//
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) successRateAlgorithm() {
|
||||
addrsToConsider := b.addrsWithAtLeastRequestVolume(b.cfg.SuccessRateEjection.RequestVolume)
|
||||
if len(addrsToConsider) < int(b.cfg.SuccessRateEjection.MinimumHosts) {
|
||||
return
|
||||
}
|
||||
mean, stddev := b.meanAndStdDev(addrsToConsider)
|
||||
for _, addrInfo := range addrsToConsider {
|
||||
bucket := addrInfo.callCounter.inactiveBucket
|
||||
ejectionCfg := b.cfg.SuccessRateEjection
|
||||
if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 >= float64(b.cfg.MaxEjectionPercent) {
|
||||
return
|
||||
}
|
||||
successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures)
|
||||
if successRate < (mean - stddev*(float64(ejectionCfg.StdevFactor)/1000)) {
|
||||
if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage {
|
||||
b.ejectAddress(addrInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
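A worked example of the ejection criterion above (hypothetical numbers, not part of this commit): with per-address success rates of 1.0, 1.0 and 0.4 and a stdev_factor of 1000, the third address falls below mean - stddev and becomes an ejection candidate.

package main

import (
	"fmt"
	"math"
)

func main() {
	// Hypothetical success rates for three addresses over one interval.
	rates := []float64{1.0, 1.0, 0.4}
	var mean float64
	for _, r := range rates {
		mean += r
	}
	mean /= float64(len(rates))

	var sumSq float64
	for _, r := range rates {
		d := r - mean
		sumSq += d * d
	}
	stddev := math.Sqrt(sumSq / float64(len(rates)))

	// stdev_factor is divided by 1000, so 1000 means one standard deviation.
	required := mean - stddev*(1000.0/1000.0)
	fmt.Printf("mean=%.3f stddev=%.3f required=%.3f\n", mean, stddev, required)
	// Prints mean=0.800 stddev=0.283 required=0.517: the 0.4 address is below
	// the requirement and may be ejected, subject to enforcement_percentage
	// and max_ejection_percent.
}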
|
||||
|
||||
// failurePercentageAlgorithm ejects any address whose failure percentage
// exceeds the configured threshold, subject to the other configured
// heuristics (minimum hosts, request volume, max ejection percent, and
// enforcement percentage).
|
||||
//
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) failurePercentageAlgorithm() {
|
||||
addrsToConsider := b.addrsWithAtLeastRequestVolume(b.cfg.FailurePercentageEjection.RequestVolume)
|
||||
if len(addrsToConsider) < int(b.cfg.FailurePercentageEjection.MinimumHosts) {
|
||||
return
|
||||
}
|
||||
|
||||
for _, addrInfo := range addrsToConsider {
|
||||
bucket := addrInfo.callCounter.inactiveBucket
|
||||
ejectionCfg := b.cfg.FailurePercentageEjection
|
||||
if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 >= float64(b.cfg.MaxEjectionPercent) {
|
||||
return
|
||||
}
|
||||
failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100
|
||||
if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) {
|
||||
if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage {
|
||||
b.ejectAddress(addrInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
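For instance (illustrative numbers, not from the commit): an address with 6 failures out of 10 requests in an interval has a failure percentage of 6/10 * 100 = 60, which exceeds a threshold of 50, so it becomes an ejection candidate, again subject to enforcement_percentage and max_ejection_percent.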
|
||||
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) ejectAddress(addrInfo *addressInfo) {
|
||||
b.numAddrsEjected++
|
||||
addrInfo.latestEjectionTimestamp = b.timerStartTime
|
||||
addrInfo.ejectionTimeMultiplier++
|
||||
for _, sbw := range addrInfo.sws {
|
||||
sbw.eject()
|
||||
}
|
||||
}
|
||||
|
||||
// Caller must hold b.mu.
|
||||
func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) {
|
||||
b.numAddrsEjected--
|
||||
addrInfo.latestEjectionTimestamp = time.Time{}
|
||||
for _, sbw := range addrInfo.sws {
|
||||
sbw.uneject()
|
||||
}
|
||||
}
|
||||
|
||||
// addressInfo contains the runtime information about an address that pertains
|
||||
// to Outlier Detection. This struct and all of its fields are protected by
// outlierDetectionBalancer.mu in the case where it is accessed through the
|
||||
// address map. In the case of Picker callbacks, the writes to the activeBucket
|
||||
// of callCounter are protected by atomically loading and storing
|
||||
// unsafe.Pointers (see further explanation in incrementCounter()).
|
||||
type addressInfo struct {
|
||||
// The call result counter object.
|
||||
callCounter *callCounter
|
||||
|
||||
// The latest ejection timestamp, or zero if the address is currently not
|
||||
// ejected.
|
||||
latestEjectionTimestamp time.Time
|
||||
|
||||
// The current ejection time multiplier, starting at 0.
|
||||
ejectionTimeMultiplier int64
|
||||
|
||||
// A list of subchannel wrapper objects that correspond to this address.
|
||||
sws []*subConnWrapper
|
||||
}
|
||||
|
||||
func newAddressInfo() *addressInfo {
|
||||
return &addressInfo{
|
||||
callCounter: newCallCounter(),
|
||||
}
|
||||
}
File diff suppressed because it is too large
@@ -0,0 +1,66 @@
/*
 *
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package outlierdetection

import (
	"sync/atomic"
	"unsafe"
)

type bucket struct {
	numSuccesses uint32
	numFailures  uint32
}

func newCallCounter() *callCounter {
	return &callCounter{
		activeBucket:   unsafe.Pointer(&bucket{}),
		inactiveBucket: &bucket{},
	}
}

// callCounter has two buckets, which each count successful and failing RPC's.
// The activeBucket is used to actively count any finished RPC's, and the
// inactiveBucket is populated with this activeBucket's data every interval for
// use by the Outlier Detection algorithm.
type callCounter struct {
	// activeBucket updates every time a call finishes (from picker passed to
	// Client Conn), so protect pointer read with atomic load of unsafe.Pointer
	// so picker does not have to grab a mutex per RPC, the critical path.
	activeBucket   unsafe.Pointer // bucket
	inactiveBucket *bucket
}

func (cc *callCounter) clear() {
	atomic.StorePointer(&cc.activeBucket, unsafe.Pointer(&bucket{}))
	cc.inactiveBucket = &bucket{}
}

// "When the timer triggers, the inactive bucket is zeroed and swapped with the
// active bucket. Then the inactive bucket contains the number of successes and
// failures since the last time the timer triggered. Those numbers are used to
// evaluate the ejection criteria." - A50.
func (cc *callCounter) swap() {
	ib := cc.inactiveBucket
	*ib = bucket{}
	ab := (*bucket)(atomic.SwapPointer(&cc.activeBucket, unsafe.Pointer(ib)))
	cc.inactiveBucket = &bucket{
		numSuccesses: atomic.LoadUint32(&ab.numSuccesses),
		numFailures:  atomic.LoadUint32(&ab.numFailures),
	}
}
|
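A minimal sketch (hypothetical, not part of this commit) of the two-bucket scheme above, without the atomics: the active bucket keeps absorbing RPC results while the interval timer reads a stable snapshot from the inactive bucket.

package main

import "fmt"

type counts struct{ successes, failures uint32 }

func main() {
	active, inactive := &counts{}, &counts{}
	active.successes, active.failures = 8, 2 // results counted this interval

	// On the interval timer: zero the inactive bucket, swap the roles, and
	// evaluate the ejection criteria on the snapshot.
	*inactive = counts{}
	active, inactive = inactive, active

	fmt.Println(*inactive) // {8 2} -> snapshot used by the interval algorithm
	fmt.Println(*active)   // {0 0} -> keeps counting new RPC results
}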
@@ -0,0 +1,94 @@
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package outlierdetection
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"unsafe"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
)
|
||||
|
||||
func (b1 *bucket) Equal(b2 *bucket) bool {
|
||||
if b1 == nil && b2 == nil {
|
||||
return true
|
||||
}
|
||||
if (b1 != nil) != (b2 != nil) {
|
||||
return false
|
||||
}
|
||||
if b1.numSuccesses != b2.numSuccesses {
|
||||
return false
|
||||
}
|
||||
return b1.numFailures == b2.numFailures
|
||||
}
|
||||
|
||||
func (cc1 *callCounter) Equal(cc2 *callCounter) bool {
|
||||
if cc1 == nil && cc2 == nil {
|
||||
return true
|
||||
}
|
||||
if (cc1 != nil) != (cc2 != nil) {
|
||||
return false
|
||||
}
|
||||
ab1 := (*bucket)(atomic.LoadPointer(&cc1.activeBucket))
|
||||
ab2 := (*bucket)(atomic.LoadPointer(&cc2.activeBucket))
|
||||
if !ab1.Equal(ab2) {
|
||||
return false
|
||||
}
|
||||
return cc1.inactiveBucket.Equal(cc2.inactiveBucket)
|
||||
}
|
||||
|
||||
// TestClear tests that clear on the call counter clears (everything set to 0)
|
||||
// the active and inactive buckets.
|
||||
func (s) TestClear(t *testing.T) {
|
||||
cc := newCallCounter()
|
||||
ab := (*bucket)(atomic.LoadPointer(&cc.activeBucket))
|
||||
ab.numSuccesses = 1
|
||||
ab.numFailures = 2
|
||||
cc.inactiveBucket.numSuccesses = 4
|
||||
cc.inactiveBucket.numFailures = 5
|
||||
cc.clear()
|
||||
// Both the active and inactive buckets should be cleared.
|
||||
ccWant := newCallCounter()
|
||||
if diff := cmp.Diff(cc, ccWant); diff != "" {
|
||||
t.Fatalf("callCounter is different than expected, diff (-got +want): %v", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSwap tests that swap() on the callCounter successfully has the desired
|
||||
// end result of inactive bucket containing the previous active buckets data,
|
||||
// and the active bucket being cleared.
|
||||
func (s) TestSwap(t *testing.T) {
|
||||
cc := newCallCounter()
|
||||
ab := (*bucket)(atomic.LoadPointer(&cc.activeBucket))
|
||||
ab.numSuccesses = 1
|
||||
ab.numFailures = 2
|
||||
cc.inactiveBucket.numSuccesses = 4
|
||||
cc.inactiveBucket.numFailures = 5
|
||||
ib := cc.inactiveBucket
|
||||
cc.swap()
|
||||
// Inactive should pick up active's data, active should be swapped to zeroed
|
||||
// inactive.
|
||||
ccWant := newCallCounter()
|
||||
ccWant.inactiveBucket.numSuccesses = 1
|
||||
ccWant.inactiveBucket.numFailures = 2
|
||||
atomic.StorePointer(&ccWant.activeBucket, unsafe.Pointer(ib))
|
||||
if diff := cmp.Diff(cc, ccWant); diff != "" {
|
||||
t.Fatalf("callCounter is different than expected, diff (-got +want): %v", diff)
|
||||
}
|
||||
}
|
||||
|
|
@@ -0,0 +1,369 @@
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
// Package e2e_test contains e2e test cases for the Outlier Detection LB Policy.
|
||||
package e2e_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/stubserver"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/resolver/manual"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||
_ "google.golang.org/grpc/xds/internal/balancer/outlierdetection" // To register helper functions which register/unregister Outlier Detection LB Policy.
|
||||
)
|
||||
|
||||
var defaultTestTimeout = 5 * time.Second
|
||||
|
||||
type s struct {
|
||||
grpctest.Tester
|
||||
}
|
||||
|
||||
func Test(t *testing.T) {
|
||||
grpctest.RunSubTests(t, s{})
|
||||
}
|
||||
|
||||
// setupBackends spins up three test backends, each listening on a port on
// localhost. Two of the backends are configured to always reply with an empty
// response and no error, and one is configured to always return an error.
|
||||
func setupBackends(t *testing.T) ([]string, func()) {
|
||||
t.Helper()
|
||||
|
||||
backends := make([]*stubserver.StubServer, 3)
|
||||
addresses := make([]string, 3)
|
||||
// Construct and start 2 working backends.
|
||||
for i := 0; i < 2; i++ {
|
||||
backend := &stubserver.StubServer{
|
||||
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
||||
return &testpb.Empty{}, nil
|
||||
},
|
||||
}
|
||||
if err := backend.StartServer(); err != nil {
|
||||
t.Fatalf("Failed to start backend: %v", err)
|
||||
}
|
||||
t.Logf("Started good TestService backend at: %q", backend.Address)
|
||||
backends[i] = backend
|
||||
addresses[i] = backend.Address
|
||||
}
|
||||
|
||||
// Construct and start a failing backend.
|
||||
backend := &stubserver.StubServer{
|
||||
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
||||
return nil, errors.New("some error")
|
||||
},
|
||||
}
|
||||
if err := backend.StartServer(); err != nil {
|
||||
t.Fatalf("Failed to start backend: %v", err)
|
||||
}
|
||||
t.Logf("Started bad TestService backend at: %q", backend.Address)
|
||||
backends[2] = backend
|
||||
addresses[2] = backend.Address
|
||||
cancel := func() {
|
||||
for _, backend := range backends {
|
||||
backend.Stop()
|
||||
}
|
||||
}
|
||||
return addresses, cancel
|
||||
}
|
||||
|
||||
// checkRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn,
|
||||
// connected to a server exposing the test.grpc_testing.TestService, are
|
||||
// roundrobined across the given backend addresses.
|
||||
//
|
||||
// Returns a non-nil error if context deadline expires before RPCs start to get
|
||||
// roundrobined across the given backends.
|
||||
func checkRoundRobinRPCs(ctx context.Context, client testpb.TestServiceClient, addrs []resolver.Address) error {
|
||||
wantAddrCount := make(map[string]int)
|
||||
for _, addr := range addrs {
|
||||
wantAddrCount[addr.Addr]++
|
||||
}
|
||||
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
|
||||
// Perform 3 iterations.
|
||||
var iterations [][]string
|
||||
for i := 0; i < 3; i++ {
|
||||
iteration := make([]string, len(addrs))
|
||||
for c := 0; c < len(addrs); c++ {
|
||||
var peer peer.Peer
|
||||
client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer))
|
||||
if peer.Addr != nil {
|
||||
iteration[c] = peer.Addr.String()
|
||||
}
|
||||
}
|
||||
iterations = append(iterations, iteration)
|
||||
}
|
||||
// Ensure that the first iteration contains all addresses in addrs.
gotAddrCount := make(map[string]int)
|
||||
for _, addr := range iterations[0] {
|
||||
gotAddrCount[addr]++
|
||||
}
|
||||
if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
|
||||
continue
|
||||
}
|
||||
// Ensure all three iterations contain the same addresses.
|
||||
if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs)
|
||||
}
|
||||
|
||||
// TestOutlierDetectionAlgorithmsE2E tests the Outlier Detection Success Rate
// and Failure Percentage algorithms in an e2e fashion. The Outlier Detection
// Balancer is configured as the top level LB Policy of the channel with a Round
// Robin child, and connects to three upstreams. Two of the upstreams are
// healthy and one is unhealthy. Each algorithm should at some point eject the
// failing upstream, causing RPC's to no longer be routed to it and to only be
// Round Robined across the two healthy upstreams. Outside of the intervals in
// which the unhealthy upstream is ejected, RPC's should regularly round robin
// across all three upstreams.
|
||||
func (s) TestOutlierDetectionAlgorithmsE2E(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
odscJSON string
|
||||
}{
|
||||
{
|
||||
name: "Success Rate Algorithm",
|
||||
odscJSON: `
|
||||
{
|
||||
"loadBalancingConfig": [
|
||||
{
|
||||
"outlier_detection_experimental": {
|
||||
"interval": 50000000,
|
||||
"baseEjectionTime": 100000000,
|
||||
"maxEjectionTime": 300000000000,
|
||||
"maxEjectionPercent": 33,
|
||||
"successRateEjection": {
|
||||
"stdevFactor": 50,
|
||||
"enforcementPercentage": 100,
|
||||
"minimumHosts": 3,
|
||||
"requestVolume": 5
|
||||
},
|
||||
"childPolicy": [{"round_robin": {}}]
|
||||
}
|
||||
}
|
||||
]
|
||||
}`,
|
||||
},
|
||||
{
|
||||
name: "Failure Percentage Algorithm",
|
||||
odscJSON: `
|
||||
{
|
||||
"loadBalancingConfig": [
|
||||
{
|
||||
"outlier_detection_experimental": {
|
||||
"interval": 50000000,
|
||||
"baseEjectionTime": 100000000,
|
||||
"maxEjectionTime": 300000000000,
|
||||
"maxEjectionPercent": 33,
|
||||
"failurePercentageEjection": {
|
||||
"threshold": 50,
|
||||
"enforcementPercentage": 100,
|
||||
"minimumHosts": 3,
|
||||
"requestVolume": 5
|
||||
},
|
||||
"childPolicy": [{"round_robin": {}}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}`,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
internal.RegisterOutlierDetectionBalancerForTesting()
|
||||
defer internal.UnregisterOutlierDetectionBalancerForTesting()
|
||||
addresses, cancel := setupBackends(t)
|
||||
defer cancel()
|
||||
|
||||
mr := manual.NewBuilderWithScheme("od-e2e")
|
||||
defer mr.Close()
|
||||
|
||||
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(test.odscJSON)
|
||||
// The full list of addresses.
|
||||
fullAddresses := []resolver.Address{
|
||||
{Addr: addresses[0]},
|
||||
{Addr: addresses[1]},
|
||||
{Addr: addresses[2]},
|
||||
}
|
||||
mr.InitialState(resolver.State{
|
||||
Addresses: fullAddresses,
|
||||
ServiceConfig: sc,
|
||||
})
|
||||
|
||||
cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
t.Fatalf("grpc.Dial() failed: %v", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
testServiceClient := testpb.NewTestServiceClient(cc)
|
||||
|
||||
// At first, due to no statistics on each of the backends, the 3
|
||||
// upstreams should all be round robined across.
|
||||
if err = checkRoundRobinRPCs(ctx, testServiceClient, fullAddresses); err != nil {
|
||||
t.Fatalf("error in expected round robin: %v", err)
|
||||
}
|
||||
|
||||
// The addresses which don't return errors.
|
||||
okAddresses := []resolver.Address{
|
||||
{Addr: addresses[0]},
|
||||
{Addr: addresses[1]},
|
||||
}
|
||||
// After calling the three upstreams, one of them constantly errors
// and should eventually be ejected for a period of time. This
|
||||
// period of time should cause the RPC's to be round robined only
|
||||
// across the two that are healthy.
|
||||
if err = checkRoundRobinRPCs(ctx, testServiceClient, okAddresses); err != nil {
|
||||
t.Fatalf("error in expected round robin: %v", err)
|
||||
}
|
||||
|
||||
// The failing upstream isn't ejected indefinitely, and eventually
|
||||
// should be unejected in subsequent iterations of the interval
|
||||
// algorithm as per the spec for the two specific algorithms.
|
||||
if err = checkRoundRobinRPCs(ctx, testServiceClient, fullAddresses); err != nil {
|
||||
t.Fatalf("error in expected round robin: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestNoopConfiguration tests the Outlier Detection Balancer configured with a
// noop configuration. The noop configuration should cause the Outlier
// Detection Balancer to not count RPCs, and thus never eject any upstream,
// continuing to route to every connected upstream even if it constantly
// errors. Once the Outlier Detection Balancer is reconfigured with a
// configuration that requires counting RPCs, it should start ejecting
// upstreams as specified in the configuration.
func (s) TestNoopConfiguration(t *testing.T) {
	internal.RegisterOutlierDetectionBalancerForTesting()
	defer internal.UnregisterOutlierDetectionBalancerForTesting()
	addresses, cancel := setupBackends(t)
	defer cancel()

	mr := manual.NewBuilderWithScheme("od-e2e")
	defer mr.Close()

	noopODServiceConfigJSON := `
{
  "loadBalancingConfig": [
    {
      "outlier_detection_experimental": {
        "interval": 50000000,
        "baseEjectionTime": 100000000,
        "maxEjectionTime": 300000000000,
        "maxEjectionPercent": 33,
        "childPolicy": [{"round_robin": {}}]
      }
    }
  ]
}`
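	// What makes this configuration a noop: it sets neither
	// "successRateEjection" nor "failurePercentageEjection", so no ejection
	// algorithm is configured, no call results are counted, and no upstream is
	// ever ejected.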
	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(noopODServiceConfigJSON)
	// The full list of addresses.
	fullAddresses := []resolver.Address{
		{Addr: addresses[0]},
		{Addr: addresses[1]},
		{Addr: addresses[2]},
	}
	mr.InitialState(resolver.State{
		Addresses:     fullAddresses,
		ServiceConfig: sc,
	})
	cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer cc.Close()
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	testServiceClient := testpb.NewTestServiceClient(cc)

	for i := 0; i < 2; i++ {
		// Since the Outlier Detection Balancer starts with a noop
		// configuration, it shouldn't count RPCs or eject any upstreams. Thus,
		// even though an upstream it connects to constantly errors, it should
		// continue to round robin across every upstream.
		if err := checkRoundRobinRPCs(ctx, testServiceClient, fullAddresses); err != nil {
			t.Fatalf("error in expected round robin: %v", err)
		}
	}

	// Reconfigure the Outlier Detection Balancer with a configuration that
	// specifies to count RPCs and eject upstreams. Since the balancer is no
	// longer a noop, it should eject any unhealthy addresses as specified by
	// the failure percentage portion of the configuration.
	countingODServiceConfigJSON := `
{
  "loadBalancingConfig": [
    {
      "outlier_detection_experimental": {
        "interval": 50000000,
        "baseEjectionTime": 100000000,
        "maxEjectionTime": 300000000000,
        "maxEjectionPercent": 33,
        "failurePercentageEjection": {
          "threshold": 50,
          "enforcementPercentage": 100,
          "minimumHosts": 3,
          "requestVolume": 5
        },
        "childPolicy": [{"round_robin": {}}]
      }
    }
  ]
}`
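	// With this configuration, an address should be ejected once it has seen
	// at least 5 calls in an interval ("requestVolume") of which at least 50%
	// failed ("threshold"), and "maxEjectionPercent": 33 keeps at most one of
	// the three backends ejected at any one time.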
	sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON)

	mr.UpdateState(resolver.State{
		Addresses:     fullAddresses,
		ServiceConfig: sc,
	})

	// Right after the reconfiguration, the balancer has no stats collected
	// about the upstreams, so it should initially route across the full
	// upstream list.
	if err = checkRoundRobinRPCs(ctx, testServiceClient, fullAddresses); err != nil {
		t.Fatalf("error in expected round robin: %v", err)
	}

	// The addresses which don't return errors.
	okAddresses := []resolver.Address{
		{Addr: addresses[0]},
		{Addr: addresses[1]},
	}
	// Now that the reconfigured balancer has data about the failing upstream,
	// it should eject the upstream and only route across the two healthy
	// upstreams.
	if err = checkRoundRobinRPCs(ctx, testServiceClient, okAddresses); err != nil {
		t.Fatalf("error in expected round robin: %v", err)
	}
}

@@ -0,0 +1,34 @@
/*
 *
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package outlierdetection

import (
	"fmt"

	"google.golang.org/grpc/grpclog"
	internalgrpclog "google.golang.org/grpc/internal/grpclog"
)

const prefix = "[outlier-detection-lb %p] "

var logger = grpclog.Component("xds")

func prefixLogger(p *outlierDetectionBalancer) *internalgrpclog.PrefixLogger {
	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}
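// Illustrative usage (an assumption, not part of this commit): the balancer
// would typically store the returned logger at construction time and use it
// for all of its logging, e.g.:
//
//	b := &outlierDetectionBalancer{}
//	b.logger = prefixLogger(b)
//	b.logger.Infof("Created")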

@@ -0,0 +1,68 @@
/*
 *
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package outlierdetection

import (
	"unsafe"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/buffer"
	"google.golang.org/grpc/resolver"
)

// subConnWrapper wraps every created SubConn in the Outlier Detection
// Balancer, to help track the latest state update from the underlying SubConn,
// and also whether or not this SubConn is ejected.
type subConnWrapper struct {
	balancer.SubConn

	// addressInfo is a pointer to the subConnWrapper's corresponding address
	// map entry, if the map entry exists.
	addressInfo unsafe.Pointer // *addressInfo
	// These two pieces of state reach eventual consistency due to the
	// synchronization in run(), and the child balancer always sees the
	// correctly updated SubConnState.
	// latestState is the latest state update from the underlying SubConn. This
	// is used whenever a SubConn gets unejected.
	latestState balancer.SubConnState
	ejected     bool

	scUpdateCh *buffer.Unbounded

	// addresses is the list of address(es) this SubConn was created with to
	// help support any change in address(es).
	addresses []resolver.Address
}

// eject causes the wrapper to report a state update with the TRANSIENT_FAILURE
// state, and to stop passing along updates from the underlying subchannel.
func (scw *subConnWrapper) eject() {
	scw.scUpdateCh.Put(&ejectionUpdate{
		scw:       scw,
		isEjected: true,
	})
}

// uneject causes the wrapper to report a state update with the latest update
// from the underlying subchannel, and to resume passing along updates from the
// underlying subchannel.
func (scw *subConnWrapper) uneject() {
	scw.scUpdateCh.Put(&ejectionUpdate{
		scw:       scw,
		isEjected: false,
	})
}
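// Illustrative only (not part of this commit): a sketch of how the balancer's
// run() goroutine might drain scUpdateCh and act on ejectionUpdate values. The
// handler name and the child/connectivity plumbing are assumptions, not the
// actual implementation in this PR.
//
//	func (b *outlierDetectionBalancer) handleEjectionUpdate(u *ejectionUpdate) {
//		scw := u.scw
//		scw.ejected = u.isEjected
//		if u.isEjected {
//			// While ejected, report TRANSIENT_FAILURE to the child balancer
//			// and swallow further updates from the underlying SubConn.
//			b.child.UpdateSubConnState(scw, balancer.SubConnState{
//				ConnectivityState: connectivity.TransientFailure,
//			})
//			return
//		}
//		// On unejection, replay the most recent state received from the
//		// underlying SubConn so the child sees an up-to-date picture.
//		b.child.UpdateSubConnState(scw, scw.latestState)
//	}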