mirror of https://github.com/grpc/grpc-go.git
xds: report drops by circuit breaking (#4171)
commit 0bc741730b, parent e526a29227
@@ -515,6 +515,11 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
 	}
 	if d.counter != nil {
 		if err := d.counter.StartRequest(); err != nil {
+			// Drops by circuit breaking are reported with empty category. They
+			// will be reported only in total drops, but not in per category.
+			if d.loadStore != nil {
+				d.loadStore.CallDropped("")
+			}
 			return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
 		}
 		pr, err := d.p.Pick(info)
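The picker drops an RPC whenever the circuit-breaking counter refuses it, and reports that drop to the load store under the empty category "". The following is a minimal, self-contained sketch of the gate semantics that StartRequest/EndRequest provide; the requestGate type and the startRequest body are illustrative assumptions (the real counter is ServiceRequestsCounter), and only the atomic-decrement trick is taken directly from the diff.

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    // requestGate mirrors the counter shape the diff relies on: numRequests
    // is bumped per in-flight RPC and checked against maxRequests.
    type requestGate struct {
    	numRequests uint32
    	maxRequests uint32
    }

    // startRequest admits an RPC, or fails once the in-flight count reaches
    // the maximum; this is the error that Pick turns into an empty-category
    // drop.
    func (g *requestGate) startRequest() error {
    	if atomic.LoadUint32(&g.numRequests) >= atomic.LoadUint32(&g.maxRequests) {
    		return fmt.Errorf("max requests %d exceeded", g.maxRequests)
    	}
    	atomic.AddUint32(&g.numRequests, 1)
    	return nil
    }

    // endRequest releases one slot; adding ^uint32(0) is the atomic-decrement
    // idiom the real EndRequest uses.
    func (g *requestGate) endRequest() {
    	atomic.AddUint32(&g.numRequests, ^uint32(0))
    }

    func main() {
    	g := &requestGate{maxRequests: 1}
    	fmt.Println(g.startRequest()) // <nil>: admitted
    	fmt.Println(g.startRequest()) // error: this RPC would be dropped
    	g.endRequest()
    	fmt.Println(g.startRequest()) // <nil>: slot freed, admitted again
    }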
@@ -142,7 +142,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
 	}

 	// The same locality, different drop rate, dropping 50%.
-	clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
+	clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50})
 	clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
 	edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab5.Build()))

@@ -746,6 +746,10 @@ func (s) TestDropPicker(t *testing.T) {
 }

 func (s) TestEDS_LoadReport(t *testing.T) {
+	origCircuitBreakingSupport := env.CircuitBreakingSupport
+	env.CircuitBreakingSupport = true
+	defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()
+
 	// We create an xdsClientWrapper with a dummy xdsClientInterface which only
 	// implements the LoadStore() method to return the underlying load.Store to
 	// be used.
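The first three added lines are the usual save/flip/defer-restore dance for a package-level feature gate. A generic helper showing the same pattern; setForTest is hypothetical and not part of grpc-go, while env.CircuitBreakingSupport is the real flag:

    // setForTest flips a boolean gate and returns a restore func to defer.
    func setForTest(flag *bool, val bool) (restore func()) {
    	orig := *flag
    	*flag = val
    	return func() { *flag = orig }
    }

    // Usage in a test:
    //	defer setForTest(&env.CircuitBreakingSupport, true)()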
@@ -758,10 +762,20 @@ func (s) TestEDS_LoadReport(t *testing.T) {
 	edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil)
 	edsb.enqueueChildBalancerStateUpdate = edsb.updateState

+	const (
+		testServiceName = "test-service"
+		cbMaxRequests   = 20
+	)
+	var maxRequestsTemp uint32 = cbMaxRequests
+	client.SetMaxRequests(testServiceName, &maxRequestsTemp)
+	defer client.ClearCounterForTesting(testServiceName)
+	edsb.updateServiceRequestsCounter(testServiceName)
+
 	backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID)

+	const testDropCategory = "test-drop"
 	// Two localities, each with one backend.
-	clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
+	clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{testDropCategory: 50})
 	clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
 	edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
 	sc1 := <-cc.NewSubConnCh
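A small Go detail explains the maxRequestsTemp copy: SetMaxRequests takes a *uint32, and a Go constant has no address, so the test must copy cbMaxRequests into a variable before taking a pointer to it:

    const cbMaxRequests = 20
    // client.SetMaxRequests(testServiceName, &cbMaxRequests) // would not compile
    var maxRequestsTemp uint32 = cbMaxRequests // addressable copy
    client.SetMaxRequests(testServiceName, &maxRequestsTemp)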
@@ -788,20 +802,42 @@ func (s) TestEDS_LoadReport(t *testing.T) {
 	// the picks on sc1 should show up as inProgress.
 	locality1JSON, _ := locality1.ToString()
 	locality2JSON, _ := locality2.ToString()
+	const (
+		rpcCount = 100
+		// 50% will be dropped with category testDropCategory.
+		dropWithCategory = rpcCount / 2
+		// In the remaining RPCs, only cbMaxRequests are allowed by circuit
+		// breaking. Others will be dropped by CB.
+		dropWithCB = rpcCount - dropWithCategory - cbMaxRequests
+
+		rpcInProgress = cbMaxRequests / 2 // 50% of RPCs will be never done.
+		rpcSucceeded  = cbMaxRequests / 2 // 50% of RPCs will succeed.
+	)
 	wantStoreData := []*load.Data{{
 		Cluster: testClusterNames[0],
 		Service: "",
 		LocalityStats: map[string]load.LocalityData{
-			locality1JSON: {RequestStats: load.RequestData{InProgress: 5}},
-			locality2JSON: {RequestStats: load.RequestData{Succeeded: 5}},
+			locality1JSON: {RequestStats: load.RequestData{InProgress: rpcInProgress}},
+			locality2JSON: {RequestStats: load.RequestData{Succeeded: rpcSucceeded}},
 		},
+		TotalDrops: dropWithCategory + dropWithCB,
+		Drops: map[string]uint64{
+			testDropCategory: dropWithCategory,
+		},
 	}}
-	for i := 0; i < 10; i++ {
+
+	var rpcsToBeDone []balancer.PickResult
+	// Run the picks, but only pick with sc1 will be done later.
+	for i := 0; i < rpcCount; i++ {
 		scst, _ := p1.Pick(balancer.PickInfo{})
 		if scst.Done != nil && scst.SubConn != sc1 {
-			scst.Done(balancer.DoneInfo{})
+			rpcsToBeDone = append(rpcsToBeDone, scst)
 		}
 	}
+	// Call done on those sc1 picks.
+	for _, scst := range rpcsToBeDone {
+		scst.Done(balancer.DoneInfo{})
+	}

 	gotStoreData := loadStore.Stats(testClusterNames[0:1])
 	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" {
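The expected numbers follow directly from the constants: the test issues rpcCount picks, half are dropped by category, the counter admits only cbMaxRequests of the remainder, and the admitted RPCs round-robin evenly across the two equally weighted localities (sc1 picks are left in flight, sc2 picks are completed). Worked out:

    // rpcCount = 100, cbMaxRequests = 20:
    //	dropWithCategory = 100 / 2        // = 50, recorded under Drops["test-drop"]
    //	dropWithCB       = 100 - 50 - 20  // = 30, empty category: TotalDrops only
    //	TotalDrops       = 50 + 30        // = 80
    //	rpcInProgress    = 20 / 2         // = 10 picks on sc1, Done never called
    //	rpcSucceeded     = 20 / 2         // = 10 picks on sc2, Done called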
@@ -87,3 +87,16 @@ func (c *ServiceRequestsCounter) StartRequest() error {
 func (c *ServiceRequestsCounter) EndRequest() {
 	atomic.AddUint32(&c.numRequests, ^uint32(0))
 }
+
+// ClearCounterForTesting clears the counter for the service. Should be only
+// used in tests.
+func ClearCounterForTesting(serviceName string) {
+	src.mu.Lock()
+	defer src.mu.Unlock()
+	c, ok := src.services[serviceName]
+	if !ok {
+		return
+	}
+	c.maxRequests = defaultMaxRequests
+	c.numRequests = 0
+}
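EndRequest's atomic.AddUint32(&c.numRequests, ^uint32(0)) is the standard atomic-decrement idiom: ^uint32(0) is the all-ones value (math.MaxUint32), and unsigned addition wraps, so adding it subtracts one. A self-contained illustration:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    func main() {
    	var n uint32 = 5
    	atomic.AddUint32(&n, ^uint32(0)) // wraps around: effectively n--
    	fmt.Println(n)                   // prints 4
    }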
@@ -283,7 +283,12 @@ func (ls *perClusterStore) stats() *Data {
 			return true
 		}
 		sd.TotalDrops += d
-		sd.Drops[key.(string)] = d
+		keyStr := key.(string)
+		if keyStr != "" {
+			// Skip drops without category. They are counted in total_drops, but
+			// not in per category. One example is drops by circuit breaking.
+			sd.Drops[keyStr] = d
+		}
 		return true
 	})
 	ls.localityRPCCount.Range(func(key, val interface{}) bool {
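The upshot, sketched with hypothetical counts: an empty-category drop raises the total but never creates a "" key in the per-category map.

    // After CallDropped("lb"), CallDropped("lb"), CallDropped(""):
    //	stats() → Data{
    //		TotalDrops: 3,
    //		Drops:      map[string]uint64{"lb": 2}, // no "" entry
    //	}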
@@ -47,9 +47,10 @@ func TestDrops(t *testing.T) {
 		drops = map[string]int{
 			dropCategories[0]: 30,
 			dropCategories[1]: 40,
+			"":                10,
 		}
 		wantStoreData = &Data{
-			TotalDrops: 70,
+			TotalDrops: 80,
 			Drops: map[string]uint64{
 				dropCategories[0]: 30,
 				dropCategories[1]: 40,
@@ -18,7 +18,6 @@
 package testutils

 import (
-	"fmt"
 	"net"
 	"strconv"

@@ -59,11 +58,11 @@ type ClusterLoadAssignmentBuilder struct {
 }

 // NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder.
-func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder {
+func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents map[string]uint32) *ClusterLoadAssignmentBuilder {
 	var drops []*v2xdspb.ClusterLoadAssignment_Policy_DropOverload
-	for i, d := range dropPercents {
+	for n, d := range dropPercents {
 		drops = append(drops, &v2xdspb.ClusterLoadAssignment_Policy_DropOverload{
-			Category: fmt.Sprintf("test-drop-%d", i),
+			Category: n,
 			DropPercentage: &v2typepb.FractionalPercent{
 				Numerator:   d,
 				Denominator: v2typepb.FractionalPercent_HUNDRED,
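Call sites migrate from positional drop percentages to named categories; the cluster name and rate below are illustrative:

    // Before: categories were synthesized from the slice index ("test-drop-0", ...).
    clabOld := testutils.NewClusterLoadAssignmentBuilder("cluster-a", []uint32{50})

    // After: the caller picks the category name, so tests can assert on it directly.
    clabNew := testutils.NewClusterLoadAssignmentBuilder("cluster-a", map[string]uint32{"test-drop": 50})

One side effect of the map form: with more than one category, the order of the generated DropOverload entries is no longer deterministic. The call sites in this diff each pass a single category, so the tests are unaffected.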