diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index e6dec3957..fcafbbdbd 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -87,6 +87,7 @@ type edsBalancerImpl struct { subConnToPriority map[balancer.SubConn]priorityType pickerMu sync.Mutex + dropConfig []xdsclient.OverloadDropConfig drops []*dropper innerState balancer.State // The state of the picker without drop support. } @@ -144,46 +145,25 @@ func (edsImpl *edsBalancerImpl) HandleChildPolicy(name string, config json.RawMe // updateDrops compares new drop policies with the old. If they are different, // it updates the drop policies and send ClientConn an updated picker. -func (edsImpl *edsBalancerImpl) updateDrops(dropPolicies []xdsclient.OverloadDropConfig) { - var ( - newDrops []*dropper - dropsChanged bool - ) - for i, dropPolicy := range dropPolicies { - var ( - numerator = dropPolicy.Numerator - denominator = dropPolicy.Denominator +func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropConfig) { + if cmp.Equal(dropConfig, edsImpl.dropConfig) { + return + } + edsImpl.pickerMu.Lock() + edsImpl.dropConfig = dropConfig + var newDrops []*dropper + for _, c := range edsImpl.dropConfig { + newDrops = append(newDrops, newDropper(c)) + } + edsImpl.drops = newDrops + if edsImpl.innerState.Picker != nil { + // Update picker with old inner picker, new drops. + edsImpl.cc.UpdateState(balancer.State{ + ConnectivityState: edsImpl.innerState.ConnectivityState, + Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadStore)}, ) - newDrops = append(newDrops, newDropper(numerator, denominator, dropPolicy.Category)) - - // The following reading edsImpl.drops doesn't need mutex because it can only - // be updated by the code following. - if dropsChanged { - continue - } - if i >= len(edsImpl.drops) { - dropsChanged = true - continue - } - if oldDrop := edsImpl.drops[i]; numerator != oldDrop.numerator || denominator != oldDrop.denominator { - dropsChanged = true - } - } - if len(edsImpl.drops) != len(newDrops) { - dropsChanged = true - } - if dropsChanged { - edsImpl.pickerMu.Lock() - edsImpl.drops = newDrops - if edsImpl.innerState.Picker != nil { - // Update picker with old inner picker, new drops. - edsImpl.cc.UpdateState(balancer.State{ - ConnectivityState: edsImpl.innerState.ConnectivityState, - Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadStore)}, - ) - } - edsImpl.pickerMu.Unlock() } + edsImpl.pickerMu.Unlock() } // HandleEDSResponse handles the EDS response and creates/deletes localities and @@ -454,7 +434,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { for _, dp := range d.drops { if dp.drop() { drop = true - category = dp.category + category = dp.c.Category break } } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 60b6f23be..ec8942437 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -603,22 +603,22 @@ func (s) TestDropPicker(t *testing.T) { { name: "one drop", drops: []*dropper{ - newDropper(1, 2, ""), + newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 2}), }, }, { name: "two drops", drops: []*dropper{ - newDropper(1, 3, ""), - newDropper(1, 2, ""), + newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 3}), + newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 2}), }, }, { name: "three drops", drops: []*dropper{ - newDropper(1, 3, ""), - newDropper(1, 4, ""), - newDropper(1, 2, ""), + newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 3}), + newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 4}), + newDropper(xdsclient.OverloadDropConfig{Numerator: 1, Denominator: 2}), }, }, } @@ -634,7 +634,7 @@ func (s) TestDropPicker(t *testing.T) { wantCount = pickCount ) for _, dp := range tt.drops { - wantCount = wantCount * int(dp.denominator-dp.numerator) / int(dp.denominator) + wantCount = wantCount * int(dp.c.Denominator-dp.c.Numerator) / int(dp.c.Denominator) } for i := 0; i < pickCount; i++ { diff --git a/xds/internal/balancer/edsbalancer/util.go b/xds/internal/balancer/edsbalancer/util.go index 941b09805..06322498e 100644 --- a/xds/internal/balancer/edsbalancer/util.go +++ b/xds/internal/balancer/edsbalancer/util.go @@ -16,26 +16,24 @@ package edsbalancer -import "google.golang.org/grpc/internal/wrr" +import ( + "google.golang.org/grpc/internal/wrr" + xdsclient "google.golang.org/grpc/xds/internal/client" +) type dropper struct { - // Drop rate will be numerator/denominator. - numerator uint32 - denominator uint32 - w wrr.WRR - category string + c xdsclient.OverloadDropConfig + w wrr.WRR } -func newDropper(numerator, denominator uint32, category string) *dropper { +func newDropper(c xdsclient.OverloadDropConfig) *dropper { w := newRandomWRR() - w.Add(true, int64(numerator)) - w.Add(false, int64(denominator-numerator)) + w.Add(true, int64(c.Numerator)) + w.Add(false, int64(c.Denominator-c.Numerator)) return &dropper{ - numerator: numerator, - denominator: denominator, - w: w, - category: category, + c: c, + w: w, } } diff --git a/xds/internal/balancer/edsbalancer/util_test.go b/xds/internal/balancer/edsbalancer/util_test.go index aea7da8ac..f4d5e7822 100644 --- a/xds/internal/balancer/edsbalancer/util_test.go +++ b/xds/internal/balancer/edsbalancer/util_test.go @@ -21,6 +21,7 @@ import ( "testing" "google.golang.org/grpc/internal/wrr" + xdsclient "google.golang.org/grpc/xds/internal/client" ) // testWRR is a deterministic WRR implementation. @@ -104,7 +105,12 @@ func (s) TestDropper(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - d := newDropper(tt.args.numerator, tt.args.denominator, "") + d := newDropper(xdsclient.OverloadDropConfig{ + Category: "", + Numerator: tt.args.numerator, + Denominator: tt.args.denominator, + }) + var ( dCount int wantCount = int(tt.args.numerator) * repeat