xds: xds_cluster_impl_balancer part 1 (#4154)

Part of C2P fallback, to support falling back to a DNS cluster.

This PR adds the implementation of xds_cluster_impl_balancer, which will be responsible for circuit breaking and RPC dropping.

This PR only adds RPC dropping. Circuit breaking will be done in a follow-up PR, after some necessary refactoring.
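For reference, here is a minimal sketch of the new policy's LB config in its JSON form. The values are made up; the field names follow the lbConfig struct added in config.go, and the shape matches the config exercised by the tests in this commit.

// Hypothetical example only: an xds_cluster_impl_experimental config with a
// single drop category. With these values, roughly 10% of RPCs (100000 out
// of every million) would be dropped and reported under "throttle".
const exampleClusterImplConfig = `{
  "cluster": "cluster_a",
  "edsServiceName": "eds_service_a",
  "lrsLoadReportingServerName": "lrs_server",
  "dropCategories": [
    {"category": "throttle", "requestsPerMillion": 100000}
  ],
  "childPolicy": [{"round_robin": {}}]
}`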
Menghan Li 2021-02-11 15:03:39 -08:00 committed by GitHub
parent c9217c7195
commit 9f3606cd0f
9 changed files with 1007 additions and 78 deletions

vet.sh

@@ -141,8 +141,11 @@ not grep -Fv '.CredsBundle
.NewAddress
.NewServiceConfig
.Type is deprecated: use Attributes
BuildVersion is deprecated
balancer.ErrTransientFailure
balancer.Picker
extDesc.Filename is deprecated
github.com/golang/protobuf/jsonpb is deprecated
grpc.CallCustomCodec
grpc.Code
grpc.Compressor
@@ -164,13 +167,7 @@ grpc.WithServiceConfig
grpc.WithTimeout
http.CloseNotifier
info.SecurityVersion
resolver.Backend
resolver.GRPCLB
extDesc.Filename is deprecated
BuildVersion is deprecated
github.com/golang/protobuf/jsonpb is deprecated
proto is deprecated
xxx_messageInfo_
proto.InternalMessageInfo is deprecated
proto.EnumName is deprecated
proto.ErrInternalBadWireType is deprecated
@@ -184,7 +181,12 @@ proto.RegisterExtension is deprecated
proto.RegisteredExtension is deprecated
proto.RegisteredExtensions is deprecated
proto.RegisterMapType is deprecated
proto.Unmarshaler is deprecated' "${SC_OUT}"
proto.Unmarshaler is deprecated
resolver.Backend
resolver.GRPCLB
Target is deprecated: Use the Target field in the BuildOptions instead.
xxx_messageInfo_
' "${SC_OUT}"
# - special golint on package comments.
lint_package_comment_per_package() {


@@ -0,0 +1,216 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterimpl
import (
"context"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
)
const (
defaultTestTimeout = 1 * time.Second
testClusterName = "test-cluster"
testServiceName = "test-eds-service"
testLRSServerName = "test-lrs-name"
)
var (
testBackendAddrs = []resolver.Address{
{Addr: "1.1.1.1:1"},
}
cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(load.Data{}, "ReportInterval"),
}
)
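// The test replaces the random WRR with testutils.NewTestWRR, which is
// deterministic, so drops land on predictable RPC indexes (every other pick
// for a 1/2 drop rate, every 4th pick for 1/4). The exact-index assertions
// in TestDrop below rely on this.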
func init() {
newRandomWRR = testutils.NewTestWRR
}
// TestDrop verifies that the balancer correctly drops the picks, and that
// the drops are reported.
func TestDrop(t *testing.T) {
xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
defer func() { newXDSClient = oldNewXDSClient }()
builder := balancer.Get(clusterImplName)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
const (
dropReason = "test-dropping-category"
dropNumerator = 1
dropDenominator = 2
)
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
BalancerConfig: &lbConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LRSLoadReportingServerName: newString(testLRSServerName),
DropCategories: []dropCategory{{
Category: dropReason,
RequestsPerMillion: million * dropNumerator / dropDenominator,
}},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
got, err := xdsC.WaitForReportLoad(ctx)
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerName {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
}
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
p0 := <-cc.NewPickerCh
for i := 0; i < 10; i++ {
_, err := p0.Pick(balancer.PickInfo{})
if err != balancer.ErrNoSubConnAvailable {
t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
}
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
const rpcCount = 20
for i := 0; i < rpcCount; i++ {
gotSCSt, err := p1.Pick(balancer.PickInfo{})
// Even RPCs are dropped.
if i%2 == 0 {
if err == nil || !strings.Contains(err.Error(), "dropped") {
t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
}
continue
}
if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
if gotSCSt.Done != nil {
gotSCSt.Done(balancer.DoneInfo{})
}
}
// Dump load data from the store and compare with expected counts.
loadStore := xdsC.LoadStore()
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
const dropCount = rpcCount * dropNumerator / dropDenominator
wantStatsData0 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: dropCount,
Drops: map[string]uint64{dropReason: dropCount},
}}
gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
}
// Send an update with new drop configs.
const (
dropReason2 = "test-dropping-category-2"
dropNumerator2 = 1
dropDenominator2 = 4
)
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
BalancerConfig: &lbConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LRSLoadReportingServerName: newString(testLRSServerName),
DropCategories: []dropCategory{{
Category: dropReason2,
RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
}},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
p2 := <-cc.NewPickerCh
for i := 0; i < rpcCount; i++ {
gotSCSt, err := p2.Pick(balancer.PickInfo{})
// Every fourth RPC (i%4 == 0) is dropped.
if i%4 == 0 {
if err == nil || !strings.Contains(err.Error(), "dropped") {
t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
}
continue
}
if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
if gotSCSt.Done != nil {
gotSCSt.Done(balancer.DoneInfo{})
}
}
const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
wantStatsData1 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: dropCount2,
Drops: map[string]uint64{dropReason2: dropCount2},
}}
gotStatsData1 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" {
t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
}
}


@@ -0,0 +1,312 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package clusterimpl implements the xds_cluster_impl balancing policy. It
// handles the cluster features (e.g. circuit_breaking, RPC dropping).
//
// Note that it doesn't handle name resolution, which is done by policy
// xds_cluster_resolver.
package clusterimpl
import (
"encoding/json"
"fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/loadstore"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
)
const (
clusterImplName = "xds_cluster_impl_experimental"
// TODO: define defaultRequestCountMax = 1024
)
func init() {
balancer.Register(clusterImplBB{})
}
var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
type clusterImplBB struct{}
func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &clusterImplBalancer{
ClientConn: cc,
bOpts: bOpts,
closed: grpcsync.NewEvent(),
loadWrapper: loadstore.NewWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
}
b.logger = prefixLogger(b)
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.xdsC = client
go b.run()
b.logger.Infof("Created")
return b
}
func (clusterImplBB) Name() string {
return clusterImplName
}
func (clusterImplBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
// xdsClientInterface contains only the xdsclient methods used by the
// clusterimpl balancer. It's defined so we can override the xdsclient in tests.
type xdsClientInterface interface {
ReportLoad(server string) (*load.Store, func())
Close()
}
type clusterImplBalancer struct {
balancer.ClientConn
bOpts balancer.BuildOptions
closed *grpcsync.Event
logger *grpclog.PrefixLogger
xdsC xdsClientInterface
config *lbConfig
childLB balancer.Balancer
cancelLoadReport func()
clusterName string
edsServiceName string
lrsServerName string
loadWrapper *loadstore.Wrapper
// childState/drops/requestCounter can only be accessed in run(). run() is
// the only goroutine that sends pickers to the parent ClientConn, so all
// picker updates must go through pickerUpdateCh.
childState balancer.State
drops []*dropper
// TODO: add serviceRequestCount and maxRequestCount for circuit breaking.
pickerUpdateCh *buffer.Unbounded
}
// updateLoadStore checks the load reporting fields in the config, and decides
// whether the load reporting stream needs to be restarted.
func (cib *clusterImplBalancer) updateLoadStore(newConfig *lbConfig) error {
var updateLoadClusterAndService bool
// The cluster/service names used for load reporting come from Cluster and
// EDSServiceName. If either changes, the load wrapper needs to be updated.
if cib.clusterName != newConfig.Cluster {
updateLoadClusterAndService = true
cib.clusterName = newConfig.Cluster
}
if cib.edsServiceName != newConfig.EDSServiceName {
updateLoadClusterAndService = true
cib.edsServiceName = newConfig.EDSServiceName
}
if updateLoadClusterAndService {
// This updates the clusterName and serviceName that will be reported for
// the loads. The update here is too early; the ideal timing is when the
// picker is updated with the new connection. But from this balancer's point
// of view, it's impossible to tell.
//
// On the other hand, this will almost never happen. Each LRS policy
// shouldn't receive an updated config; the parent should do a graceful
// switch when the clusterName or serviceName changes.
cib.loadWrapper.UpdateClusterAndService(cib.clusterName, cib.edsServiceName)
}
// Check if it's necessary to restart load report.
var newLRSServerName string
if newConfig.LRSLoadReportingServerName != nil {
newLRSServerName = *newConfig.LRSLoadReportingServerName
}
if cib.lrsServerName != newLRSServerName {
// LRSLoadReportingServerName is different; load should be reported to a
// different server, so restart the stream.
cib.lrsServerName = newLRSServerName
if cib.cancelLoadReport != nil {
cib.cancelLoadReport()
cib.cancelLoadReport = nil
}
var loadStore *load.Store
if cib.xdsC != nil {
loadStore, cib.cancelLoadReport = cib.xdsC.ReportLoad(cib.lrsServerName)
}
cib.loadWrapper.UpdateLoadStore(loadStore)
}
return nil
}
func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if cib.closed.HasFired() {
cib.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
return nil
}
newConfig, ok := s.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
// Need to check for potential errors at the beginning of this function, so
// that on errors, we reject the whole config, instead of applying part of
// it.
bb := balancer.Get(newConfig.ChildPolicy.Name)
if bb == nil {
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
}
// Update load reporting config. This needs to be done before updating the
// child policy because we need the loadStore from the updated client to be
// passed to the ccWrapper, so that the next picker from the child policy
// will pick up the new loadStore.
if err := cib.updateLoadStore(newConfig); err != nil {
return err
}
// Compare the new drop config, and update the picker if it changed.
var updatePicker bool
if cib.config == nil || !equalDropCategories(cib.config.DropCategories, newConfig.DropCategories) {
cib.drops = make([]*dropper, 0, len(newConfig.DropCategories))
for _, c := range newConfig.DropCategories {
cib.drops = append(cib.drops, newDropper(c))
}
updatePicker = true
}
// TODO: compare the cluster name, and update the picker if it changed,
// because circuit breaking's stream counter will be different. Set
// `updatePicker` to trigger the update.
//
// TODO: compare the upper bound of the stream count, and update the picker
// if it changed. This is also for circuit breaking. Set `updatePicker` to
// trigger the update.
if updatePicker {
cib.pickerUpdateCh.Put(&dropConfigs{
drops: cib.drops,
})
}
// If child policy is a different type, recreate the sub-balancer.
if cib.config == nil || cib.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
if cib.childLB != nil {
cib.childLB.Close()
}
cib.childLB = bb.Build(cib, cib.bOpts)
}
cib.config = newConfig
if cib.childLB == nil {
// This is not an expected situation, and should be super rare in
// practice.
//
// When this happens, we already applied all the other configurations
// (drop/circuit breaking), but there's no child policy. This balancer
// will be stuck, and we report the error to the parent.
return fmt.Errorf("child policy is nil, this means balancer %q's Build() returned nil", newConfig.ChildPolicy.Name)
}
// Addresses and sub-balancer config are sent to sub-balancer.
return cib.childLB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: cib.config.ChildPolicy.Config,
})
}
func (cib *clusterImplBalancer) ResolverError(err error) {
if cib.closed.HasFired() {
cib.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
return
}
if cib.childLB != nil {
cib.childLB.ResolverError(err)
}
}
func (cib *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if cib.closed.HasFired() {
cib.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
return
}
if cib.childLB != nil {
cib.childLB.UpdateSubConnState(sc, s)
}
}
func (cib *clusterImplBalancer) Close() {
if cib.childLB != nil {
cib.childLB.Close()
cib.childLB = nil
}
cib.xdsC.Close()
cib.closed.Fire()
cib.logger.Infof("Shutdown")
}
// Override methods to accept updates from the child LB.
func (cib *clusterImplBalancer) UpdateState(state balancer.State) {
// Instead of updating parent ClientConn inline, send state to run().
cib.pickerUpdateCh.Put(state)
}
type dropConfigs struct {
drops []*dropper
}
func (cib *clusterImplBalancer) run() {
for {
select {
case update := <-cib.pickerUpdateCh.Get():
cib.pickerUpdateCh.Load()
switch u := update.(type) {
case balancer.State:
cib.childState = u
cib.ClientConn.UpdateState(balancer.State{
ConnectivityState: cib.childState.ConnectivityState,
Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper),
})
case *dropConfigs:
cib.drops = u.drops
// cib.requestCounter = u.requestCounter
if cib.childState.Picker != nil {
cib.ClientConn.UpdateState(balancer.State{
ConnectivityState: cib.childState.ConnectivityState,
Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper),
})
}
}
case <-cib.closed.Done():
return
}
}
}


@@ -0,0 +1,63 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterimpl
import (
"encoding/json"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
)
type dropCategory struct {
Category string
RequestsPerMillion uint32
}
// lbConfig is the balancer config for the xds_cluster_impl balancer.
type lbConfig struct {
serviceconfig.LoadBalancingConfig
Cluster string
EDSServiceName string
LRSLoadReportingServerName *string
MaxConcurrentRequests *uint32
DropCategories []dropCategory
ChildPolicy *internalserviceconfig.BalancerConfig
}
func parseConfig(c json.RawMessage) (*lbConfig, error) {
var cfg lbConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, err
}
return &cfg, nil
}
func equalDropCategories(a, b []dropCategory) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}


@@ -0,0 +1,144 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterimpl
import (
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/roundrobin"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget"
)
const (
testJSONConfig = `{
"cluster": "test_cluster",
"edsServiceName": "test-eds",
"lrsLoadReportingServerName": "lrs_server",
"maxConcurrentRequests": 123,
"dropCategories": [
{
"category": "drop-1",
"requestsPerMillion": 314
},
{
"category": "drop-2",
"requestsPerMillion": 159
}
],
"childPolicy": [
{
"weighted_target_experimental": {
"targets": {
"wt-child-1": {
"weight": 75,
"childPolicy":[{"round_robin":{}}]
},
"wt-child-2": {
"weight": 25,
"childPolicy":[{"round_robin":{}}]
}
}
}
}
]
}`
wtName = "weighted_target_experimental"
)
var (
wtConfigParser = balancer.Get(wtName).(balancer.ConfigParser)
wtConfigJSON = `{
"targets": {
"wt-child-1": {
"weight": 75,
"childPolicy":[{"round_robin":{}}]
},
"wt-child-2": {
"weight": 25,
"childPolicy":[{"round_robin":{}}]
}
}
}`
wtConfig, _ = wtConfigParser.ParseConfig([]byte(wtConfigJSON))
)
func TestParseConfig(t *testing.T) {
tests := []struct {
name string
js string
want *lbConfig
wantErr bool
}{
{
name: "empty json",
js: "",
want: nil,
wantErr: true,
},
{
name: "bad json",
js: "{",
want: nil,
wantErr: true,
},
{
name: "OK",
js: testJSONConfig,
want: &lbConfig{
Cluster: "test_cluster",
EDSServiceName: "test-eds",
LRSLoadReportingServerName: newString("lrs_server"),
MaxConcurrentRequests: newUint32(123),
DropCategories: []dropCategory{
{Category: "drop-1", RequestsPerMillion: 314},
{Category: "drop-2", RequestsPerMillion: 159},
},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: wtName,
Config: wtConfig,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseConfig([]byte(tt.js))
if (err != nil) != tt.wantErr {
t.Fatalf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
}
if !cmp.Equal(got, tt.want) {
t.Errorf("parseConfig() got unexpected result, diff: %v", cmp.Diff(got, tt.want))
}
})
}
}
func newString(s string) *string {
return &s
}
func newUint32(i uint32) *uint32 {
return &i
}


@@ -0,0 +1,34 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterimpl
import (
"fmt"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
)
const prefix = "[xds-cluster-impl-lb %p] "
var logger = grpclog.Component("xds")
func prefixLogger(p *clusterImplBalancer) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}


@@ -0,0 +1,104 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterimpl
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/client/load"
)
var newRandomWRR = wrr.NewRandom
const million = 1000000
type dropper struct {
category string
w wrr.WRR
}
// gcd returns the greatest common divisor (GCD) of a and b, via the
// Euclidean algorithm.
func gcd(a, b uint32) uint32 {
for b != 0 {
t := b
b = a % b
a = t
}
return a
}
func newDropper(c dropCategory) *dropper {
w := newRandomWRR()
gcdv := gcd(c.RequestsPerMillion, million)
// The WRR returns true for RequestsPerMillion out of every million picks,
// and false for the rest.
w.Add(true, int64(c.RequestsPerMillion/gcdv))
w.Add(false, int64((million-c.RequestsPerMillion)/gcdv))
return &dropper{
category: c.Category,
w: w,
}
}
func (d *dropper) drop() (ret bool) {
return d.w.Next().(bool)
}
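// Illustrative arithmetic (not part of this commit): with
// RequestsPerMillion = 500000, gcd(500000, million) = 500000, so the WRR is
// built with weights true:1 and false:1 and drops half of the picks. With
// RequestsPerMillion = 250000 the weights are true:1 and false:3, i.e. one
// in four picks is dropped.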
// loadReporter wraps the methods from the loadStore that are used here.
type loadReporter interface {
CallDropped(locality string)
}
type dropPicker struct {
drops []*dropper
s balancer.State
loadStore loadReporter
// TODO: add serviceRequestCount and maxRequestCount for circuit breaking.
}
func newDropPicker(s balancer.State, drops []*dropper, loadStore load.PerClusterReporter) *dropPicker {
return &dropPicker{
drops: drops,
s: s,
loadStore: loadStore,
}
}
func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// Don't drop unless the inner picker is READY. Similar to
// https://github.com/grpc/grpc-go/issues/2622.
if d.s.ConnectivityState != connectivity.Ready {
return d.s.Picker.Pick(info)
}
for _, dp := range d.drops {
if dp.drop() {
if d.loadStore != nil {
d.loadStore.CallDropped(dp.category)
}
return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
}
}
// TODO: support circuit breaking, check if d.maxRequestCount >=
// d.counter.StartRequestWithMax().
return d.s.Picker.Pick(info)
}
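// Note (illustrative, not from this commit): the droppers are checked in
// order and each one decides independently, so drop rates compound. With two
// categories at 10% and 20%, roughly 1 - 0.9*0.8 = 28% of READY picks are
// dropped overall, attributed to whichever category fired first.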


@@ -0,0 +1,120 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package loadstore contains the loadStoreWrapper shared by the balancers.
package loadstore
import (
"sync"
"google.golang.org/grpc/xds/internal/client/load"
)
// NewWrapper creates a Wrapper.
func NewWrapper() *Wrapper {
return &Wrapper{}
}
// Wrapper wraps a load store with cluster and edsService.
//
// Its store and cluster/edsService can be updated separately, and it updates
// its internal perCluster store so that new stats are added to the correct
// perCluster.
//
// Note that this struct is a temporary workaround until graceful switch is
// implemented for EDS. Any update to the clusterName and serviceName is too
// early; the ideal timing is when the picker is updated with the new
// connection. This early update could cause picks for the old SubConn to be
// reported to the new services.
//
// When the graceful switch in EDS is done, there should be no need for this
// struct. The policies that record/report load shouldn't need to handle
// updates of lrsServerName/cluster/edsService; their parent should do a
// graceful switch of the whole tree when any of them changes.
type Wrapper struct {
mu sync.RWMutex
cluster string
edsService string
// store and perCluster are initialized as nil. They are only set by the
// balancer when LRS is enabled. Before that, all functions to record loads
// are no-op.
store *load.Store
perCluster load.PerClusterReporter
}
// UpdateClusterAndService updates the cluster name and eds service for this
// wrapper. If any one of them is changed from before, the perCluster store in
// this wrapper will also be updated.
func (lsw *Wrapper) UpdateClusterAndService(cluster, edsService string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if cluster == lsw.cluster && edsService == lsw.edsService {
return
}
lsw.cluster = cluster
lsw.edsService = edsService
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}
// UpdateLoadStore updates the load store for this wrapper. If it is changed
// from before, the perCluster store in this wrapper will also be updated.
func (lsw *Wrapper) UpdateLoadStore(store *load.Store) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store {
return
}
lsw.store = store
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}
// CallStarted records a call started in the store.
func (lsw *Wrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallStarted(locality)
}
}
// CallFinished records a call finished in the store.
func (lsw *Wrapper) CallFinished(locality string, err error) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallFinished(locality, err)
}
}
// CallServerLoad records the server load in the store.
func (lsw *Wrapper) CallServerLoad(locality, name string, val float64) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallServerLoad(locality, name, val)
}
}
// CallDropped records a call dropped in the store.
func (lsw *Wrapper) CallDropped(category string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallDropped(category)
}
}
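A hypothetical usage sketch of the Wrapper, mirroring what clusterimpl's updateLoadStore above does. It cannot compile outside the grpc-go module (these packages are internal), and xdsClient and lrsServerName are assumed to exist in the caller:

// Create the wrapper once; recording calls are no-ops until a store is set.
lw := loadstore.NewWrapper()
// Point future loads at the right cluster / EDS service.
lw.UpdateClusterAndService("cluster_a", "eds_service_a")
// When an LRS server is configured, start load reporting and hand the
// resulting store to the wrapper.
store, cancelLoadReport := xdsClient.ReportLoad(lrsServerName)
lw.UpdateLoadStore(store)
// The picker can then record drops without knowing whether LRS is enabled.
lw.CallDropped("throttle")
// Later, when the LRS server changes or the balancer closes:
cancelLoadReport()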


@@ -22,11 +22,11 @@ package lrs
import (
"encoding/json"
"fmt"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/loadstore"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
)
@@ -162,72 +162,6 @@ type xdsClientInterface interface {
Close()
}
type loadStoreWrapper struct {
mu sync.RWMutex
cluster string
edsService string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name). Note that methods on Store and
// perCluster all handle nil, so there's no need to check nil before calling
// them.
store *load.Store
perCluster load.PerClusterReporter
}
func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if cluster == lsw.cluster && edsService == lsw.edsService {
return
}
lsw.cluster = cluster
lsw.edsService = edsService
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}
func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store {
return
}
lsw.store = store
lsw.perCluster = nil
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}
func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallStarted(locality)
}
}
func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallFinished(locality, err)
}
}
func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallServerLoad(locality, name, val)
}
}
func (lsw *loadStoreWrapper) CallDropped(category string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallDropped(category)
}
}
type xdsClientWrapper struct {
c xdsClientInterface
cancelLoadReport func()
@@ -236,13 +170,13 @@ type xdsClientWrapper struct {
lrsServerName string
// loadWrapper is a wrapper around loadOriginal, with clusterName and
// edsServiceName. It's used by the children to report loads.
loadWrapper *loadStoreWrapper
loadWrapper *loadstore.Wrapper
}
func newXDSClientWrapper(c xdsClientInterface) *xdsClientWrapper {
return &xdsClientWrapper{
c: c,
loadWrapper: &loadStoreWrapper{},
loadWrapper: loadstore.NewWrapper(),
}
}
@@ -274,7 +208,7 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig) error {
// On the other hand, this will almost never happen. Each LRS policy
// shouldn't get updated config. The parent should do a graceful switch when
// the clusterName or serviceName is changed.
w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName)
w.loadWrapper.UpdateClusterAndService(w.clusterName, w.edsServiceName)
}
if w.lrsServerName != newConfig.LrsLoadReportingServerName {
@@ -293,7 +227,7 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig) error {
if w.c != nil {
loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
}
w.loadWrapper.updateLoadStore(loadStore)
w.loadWrapper.UpdateLoadStore(loadStore)
}
return nil