mirror of https://github.com/grpc/grpc-go.git
xds: better way to compare and update dropper in EDS (#3434)
This commit is contained in:
parent
9e4ff32c82
commit
ecea6ed1fc
|
|
@ -87,6 +87,7 @@ type edsBalancerImpl struct {
|
|||
subConnToPriority map[balancer.SubConn]priorityType
|
||||
|
||||
pickerMu sync.Mutex
|
||||
dropConfig []xdsclient.OverloadDropConfig
|
||||
drops []*dropper
|
||||
innerState balancer.State // The state of the picker without drop support.
|
||||
}
|
||||
|
|
@ -144,36 +145,16 @@ func (edsImpl *edsBalancerImpl) HandleChildPolicy(name string, config json.RawMe
|
|||
|
||||
// updateDrops compares new drop policies with the old. If they are different,
|
||||
// it updates the drop policies and send ClientConn an updated picker.
|
||||
func (edsImpl *edsBalancerImpl) updateDrops(dropPolicies []xdsclient.OverloadDropConfig) {
|
||||
var (
|
||||
newDrops []*dropper
|
||||
dropsChanged bool
|
||||
)
|
||||
for i, dropPolicy := range dropPolicies {
|
||||
var (
|
||||
numerator = dropPolicy.Numerator
|
||||
denominator = dropPolicy.Denominator
|
||||
)
|
||||
newDrops = append(newDrops, newDropper(numerator, denominator, dropPolicy.Category))
|
||||
|
||||
// The following reading edsImpl.drops doesn't need mutex because it can only
|
||||
// be updated by the code following.
|
||||
if dropsChanged {
|
||||
continue
|
||||
func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropConfig) {
|
||||
if cmp.Equal(dropConfig, edsImpl.dropConfig) {
|
||||
return
|
||||
}
|
||||
if i >= len(edsImpl.drops) {
|
||||
dropsChanged = true
|
||||
continue
|
||||
}
|
||||
if oldDrop := edsImpl.drops[i]; numerator != oldDrop.numerator || denominator != oldDrop.denominator {
|
||||
dropsChanged = true
|
||||
}
|
||||
}
|
||||
if len(edsImpl.drops) != len(newDrops) {
|
||||
dropsChanged = true
|
||||
}
|
||||
if dropsChanged {
|
||||
edsImpl.pickerMu.Lock()
|
||||
edsImpl.dropConfig = dropConfig
|
||||
var newDrops []*dropper
|
||||
for _, c := range edsImpl.dropConfig {
|
||||
newDrops = append(newDrops, newDropper(c))
|
||||
}
|
||||
edsImpl.drops = newDrops
|
||||
if edsImpl.innerState.Picker != nil {
|
||||
// Update picker with old inner picker, new drops.
|
||||
|
|
@ -184,7 +165,6 @@ func (edsImpl *edsBalancerImpl) updateDrops(dropPolicies []xdsclient.OverloadDro
|
|||
}
|
||||
edsImpl.pickerMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// HandleEDSResponse handles the EDS response and creates/deletes localities and
|
||||
// SubConns. It also handles drops.
|
||||
|
|
@ -454,7 +434,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
|
|||
for _, dp := range d.drops {
|
||||
if dp.drop() {
|
||||
drop = true
|
||||
category = dp.category
|
||||
category = dp.c.Category
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -603,22 +603,22 @@ func (s) TestDropPicker(t *testing.T) {
|
|||
{
|
||||
name: "one drop",
|
||||
drops: []*dropper{
|
||||
newDropper(1, 2, ""),
|
||||
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 2}),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "two drops",
|
||||
drops: []*dropper{
|
||||
newDropper(1, 3, ""),
|
||||
newDropper(1, 2, ""),
|
||||
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 3}),
|
||||
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 2}),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "three drops",
|
||||
drops: []*dropper{
|
||||
newDropper(1, 3, ""),
|
||||
newDropper(1, 4, ""),
|
||||
newDropper(1, 2, ""),
|
||||
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 3}),
|
||||
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 4}),
|
||||
newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 2}),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -634,7 +634,7 @@ func (s) TestDropPicker(t *testing.T) {
|
|||
wantCount = pickCount
|
||||
)
|
||||
for _, dp := range tt.drops {
|
||||
wantCount = wantCount * int(dp.denominator-dp.numerator) / int(dp.denominator)
|
||||
wantCount = wantCount * int(dp.c.Denominator-dp.c.Numerator) / int(dp.c.Denominator)
|
||||
}
|
||||
|
||||
for i := 0; i < pickCount; i++ {
|
||||
|
|
|
|||
|
|
@ -16,26 +16,24 @@
|
|||
|
||||
package edsbalancer
|
||||
|
||||
import "google.golang.org/grpc/internal/wrr"
|
||||
import (
|
||||
"google.golang.org/grpc/internal/wrr"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
type dropper struct {
|
||||
// Drop rate will be numerator/denominator.
|
||||
numerator uint32
|
||||
denominator uint32
|
||||
c xdsclient.OverloadDropConfig
|
||||
w wrr.WRR
|
||||
category string
|
||||
}
|
||||
|
||||
func newDropper(numerator, denominator uint32, category string) *dropper {
|
||||
func newDropper(c xdsclient.OverloadDropConfig) *dropper {
|
||||
w := newRandomWRR()
|
||||
w.Add(true, int64(numerator))
|
||||
w.Add(false, int64(denominator-numerator))
|
||||
w.Add(true, int64(c.Numerator))
|
||||
w.Add(false, int64(c.Denominator-c.Numerator))
|
||||
|
||||
return &dropper{
|
||||
numerator: numerator,
|
||||
denominator: denominator,
|
||||
c: c,
|
||||
w: w,
|
||||
category: category,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"google.golang.org/grpc/internal/wrr"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
// testWRR is a deterministic WRR implementation.
|
||||
|
|
@ -104,7 +105,12 @@ func (s) TestDropper(t *testing.T) {
|
|||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
d := newDropper(tt.args.numerator, tt.args.denominator, "")
|
||||
d := newDropper(xdsclient.OverloadDropConfig{
|
||||
Category: "",
|
||||
Numerator: tt.args.numerator,
|
||||
Denominator: tt.args.denominator,
|
||||
})
|
||||
|
||||
var (
|
||||
dCount int
|
||||
wantCount = int(tt.args.numerator) * repeat
|
||||
|
|
|
|||
Loading…
Reference in New Issue