leastrequest: fix data race in leastrequest picker (#6587)

This commit is contained in:
Huang Chong 2023-09-01 02:39:09 +08:00 committed by GitHub
parent 778e638122
commit e498bbc9bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 7 additions and 7 deletions

View File

@ -80,7 +80,7 @@ func (bb) Name() string {
}
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*int32)}
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
b.Balancer = baseBuilder.Build(cc, bOpts)
return b
@ -92,7 +92,7 @@ type leastRequestBalancer struct {
balancer.Balancer
choiceCount uint32
scRPCCounts map[balancer.SubConn]*int32 // Hold onto RPC counts to keep track for subsequent picker updates.
scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
}
func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
@ -108,7 +108,7 @@ func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnStat
type scWithRPCCount struct {
sc balancer.SubConn
numRPCs *int32
numRPCs *atomic.Int32
}
func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
@ -126,7 +126,7 @@ func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picke
// Create new refs if needed.
for sc := range info.ReadySCs {
if _, ok := lrb.scRPCCounts[sc]; !ok {
lrb.scRPCCounts[sc] = new(int32)
lrb.scRPCCounts[sc] = new(atomic.Int32)
}
}
@ -162,18 +162,18 @@ func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
pickedSC = &sc
continue
}
if *sc.numRPCs < *pickedSC.numRPCs {
if sc.numRPCs.Load() < pickedSC.numRPCs.Load() {
pickedSC = &sc
}
}
// "The counter for a subchannel should be atomically incremented by one
// after it has been successfully picked by the picker." - A48
atomic.AddInt32(pickedSC.numRPCs, 1)
pickedSC.numRPCs.Add(1)
// "the picker should add a callback for atomically decrementing the
// subchannel counter once the RPC finishes (regardless of Status code)." -
// A48.
done := func(balancer.DoneInfo) {
atomic.AddInt32(pickedSC.numRPCs, -1)
pickedSC.numRPCs.Add(-1)
}
return balancer.PickResult{
SubConn: pickedSC.sc,