resolver: create AddressMapV2 with generics to replace AddressMap (#8187)

This commit is contained in:
Doug Fawley 2025-03-21 13:09:52 -07:00 committed by GitHub
parent 43a4a84abc
commit b0d1203846
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 63 additions and 56 deletions

View File

@ -41,7 +41,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) ba
cc: cc, cc: cc,
pickerBuilder: bb.pickerBuilder, pickerBuilder: bb.pickerBuilder,
subConns: resolver.NewAddressMap(), subConns: resolver.NewAddressMapV2[balancer.SubConn](),
scStates: make(map[balancer.SubConn]connectivity.State), scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{}, csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config, config: bb.config,
@ -65,7 +65,7 @@ type baseBalancer struct {
csEvltr *balancer.ConnectivityStateEvaluator csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State state connectivity.State
subConns *resolver.AddressMap subConns *resolver.AddressMapV2[balancer.SubConn]
scStates map[balancer.SubConn]connectivity.State scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker picker balancer.Picker
config Config config Config
@ -100,7 +100,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// Successful resolution; clear resolver error and ensure we return nil. // Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil b.resolverErr = nil
// addrsSet is the set converted from addrs, it's used for quick lookup of an address. // addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := resolver.NewAddressMap() addrsSet := resolver.NewAddressMapV2[any]()
for _, a := range s.ResolverState.Addresses { for _, a := range s.ResolverState.Addresses {
addrsSet.Set(a, nil) addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok { if _, ok := b.subConns.Get(a); !ok {
@ -122,8 +122,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
} }
} }
for _, a := range b.subConns.Keys() { for _, a := range b.subConns.Keys() {
sci, _ := b.subConns.Get(a) sc, _ := b.subConns.Get(a)
sc := sci.(balancer.SubConn)
// a was removed by resolver. // a was removed by resolver.
if _, ok := addrsSet.Get(a); !ok { if _, ok := addrsSet.Get(a); !ok {
sc.Shutdown() sc.Shutdown()
@ -173,8 +172,7 @@ func (b *baseBalancer) regeneratePicker() {
// Filter out all ready SCs from full subConn map. // Filter out all ready SCs from full subConn map.
for _, addr := range b.subConns.Keys() { for _, addr := range b.subConns.Keys() {
sci, _ := b.subConns.Get(addr) sc, _ := b.subConns.Get(addr)
sc := sci.(balancer.SubConn)
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[sc] = SubConnInfo{Address: addr} readySCs[sc] = SubConnInfo{Address: addr}
} }

View File

@ -122,7 +122,7 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions)
target: bo.Target.String(), target: bo.Target.String(),
metricsRecorder: cc.MetricsRecorder(), metricsRecorder: cc.MetricsRecorder(),
subConns: resolver.NewAddressMap(), subConns: resolver.NewAddressMapV2[*scData](),
state: connectivity.Connecting, state: connectivity.Connecting,
cancelConnectionTimer: func() {}, cancelConnectionTimer: func() {},
} }
@ -220,7 +220,7 @@ type pickfirstBalancer struct {
// updates. // updates.
state connectivity.State state connectivity.State
// scData for active subonns mapped by address. // scData for active subonns mapped by address.
subConns *resolver.AddressMap subConns *resolver.AddressMapV2[*scData]
addressList addressList addressList addressList
firstPass bool firstPass bool
numTF int numTF int
@ -319,7 +319,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
prevAddr := b.addressList.currentAddress() prevAddr := b.addressList.currentAddress()
prevSCData, found := b.subConns.Get(prevAddr) prevSCData, found := b.subConns.Get(prevAddr)
prevAddrsCount := b.addressList.size() prevAddrsCount := b.addressList.size()
isPrevRawConnectivityStateReady := found && prevSCData.(*scData).rawConnectivityState == connectivity.Ready isPrevRawConnectivityStateReady := found && prevSCData.rawConnectivityState == connectivity.Ready
b.addressList.updateAddrs(newAddrs) b.addressList.updateAddrs(newAddrs)
// If the previous ready SubConn exists in new address list, // If the previous ready SubConn exists in new address list,
@ -381,21 +381,21 @@ func (b *pickfirstBalancer) startFirstPassLocked() {
b.numTF = 0 b.numTF = 0
// Reset the connection attempt record for existing SubConns. // Reset the connection attempt record for existing SubConns.
for _, sd := range b.subConns.Values() { for _, sd := range b.subConns.Values() {
sd.(*scData).connectionFailedInFirstPass = false sd.connectionFailedInFirstPass = false
} }
b.requestConnectionLocked() b.requestConnectionLocked()
} }
func (b *pickfirstBalancer) closeSubConnsLocked() { func (b *pickfirstBalancer) closeSubConnsLocked() {
for _, sd := range b.subConns.Values() { for _, sd := range b.subConns.Values() {
sd.(*scData).subConn.Shutdown() sd.subConn.Shutdown()
} }
b.subConns = resolver.NewAddressMap() b.subConns = resolver.NewAddressMapV2[*scData]()
} }
// deDupAddresses ensures that each address appears only once in the slice. // deDupAddresses ensures that each address appears only once in the slice.
func deDupAddresses(addrs []resolver.Address) []resolver.Address { func deDupAddresses(addrs []resolver.Address) []resolver.Address {
seenAddrs := resolver.NewAddressMap() seenAddrs := resolver.NewAddressMapV2[*scData]()
retAddrs := []resolver.Address{} retAddrs := []resolver.Address{}
for _, addr := range addrs { for _, addr := range addrs {
@ -481,7 +481,7 @@ func addressFamily(address string) ipAddrFamily {
// This ensures that the subchannel map accurately reflects the current set of // This ensures that the subchannel map accurately reflects the current set of
// addresses received from the name resolver. // addresses received from the name resolver.
func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) { func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) {
newAddrsMap := resolver.NewAddressMap() newAddrsMap := resolver.NewAddressMapV2[bool]()
for _, addr := range newAddrs { for _, addr := range newAddrs {
newAddrsMap.Set(addr, true) newAddrsMap.Set(addr, true)
} }
@ -491,7 +491,7 @@ func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address)
continue continue
} }
val, _ := b.subConns.Get(oldAddr) val, _ := b.subConns.Get(oldAddr)
val.(*scData).subConn.Shutdown() val.subConn.Shutdown()
b.subConns.Delete(oldAddr) b.subConns.Delete(oldAddr)
} }
} }
@ -500,13 +500,12 @@ func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address)
// becomes ready, which means that all other subConn must be shutdown. // becomes ready, which means that all other subConn must be shutdown.
func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) { func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
b.cancelConnectionTimer() b.cancelConnectionTimer()
for _, v := range b.subConns.Values() { for _, sd := range b.subConns.Values() {
sd := v.(*scData)
if sd.subConn != selected.subConn { if sd.subConn != selected.subConn {
sd.subConn.Shutdown() sd.subConn.Shutdown()
} }
} }
b.subConns = resolver.NewAddressMap() b.subConns = resolver.NewAddressMapV2[*scData]()
b.subConns.Set(selected.addr, selected) b.subConns.Set(selected.addr, selected)
} }
@ -539,18 +538,17 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
b.subConns.Set(curAddr, sd) b.subConns.Set(curAddr, sd)
} }
scd := sd.(*scData) switch sd.rawConnectivityState {
switch scd.rawConnectivityState {
case connectivity.Idle: case connectivity.Idle:
scd.subConn.Connect() sd.subConn.Connect()
b.scheduleNextConnectionLocked() b.scheduleNextConnectionLocked()
return return
case connectivity.TransientFailure: case connectivity.TransientFailure:
// The SubConn is being re-used and failed during a previous pass // The SubConn is being re-used and failed during a previous pass
// over the addressList. It has not completed backoff yet. // over the addressList. It has not completed backoff yet.
// Mark it as having failed and try the next address. // Mark it as having failed and try the next address.
scd.connectionFailedInFirstPass = true sd.connectionFailedInFirstPass = true
lastErr = scd.lastErr lastErr = sd.lastErr
continue continue
case connectivity.Connecting: case connectivity.Connecting:
// Wait for the connection attempt to complete or the timer to fire // Wait for the connection attempt to complete or the timer to fire
@ -558,7 +556,7 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
b.scheduleNextConnectionLocked() b.scheduleNextConnectionLocked()
return return
default: default:
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.rawConnectivityState) b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", sd.rawConnectivityState)
return return
} }
@ -753,8 +751,7 @@ func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
} }
// Connect() has been called on all the SubConns. The first pass can be // Connect() has been called on all the SubConns. The first pass can be
// ended if all the SubConns have reported a failure. // ended if all the SubConns have reported a failure.
for _, v := range b.subConns.Values() { for _, sd := range b.subConns.Values() {
sd := v.(*scData)
if !sd.connectionFailedInFirstPass { if !sd.connectionFailedInFirstPass {
return return
} }
@ -765,8 +762,7 @@ func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
Picker: &picker{err: lastErr}, Picker: &picker{err: lastErr},
}) })
// Start re-connecting all the SubConns that are already in IDLE. // Start re-connecting all the SubConns that are already in IDLE.
for _, v := range b.subConns.Values() { for _, sd := range b.subConns.Values() {
sd := v.(*scData)
if sd.rawConnectivityState == connectivity.Idle { if sd.rawConnectivityState == connectivity.Idle {
sd.subConn.Connect() sd.subConn.Connect()
} }

View File

@ -104,7 +104,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
ClientConn: cc, ClientConn: cc,
target: bOpts.Target.String(), target: bOpts.Target.String(),
metricsRecorder: cc.MetricsRecorder(), metricsRecorder: cc.MetricsRecorder(),
addressWeights: resolver.NewAddressMap(), addressWeights: resolver.NewAddressMapV2[*endpointWeight](),
endpointToWeight: resolver.NewEndpointMap(), endpointToWeight: resolver.NewEndpointMap(),
scToWeight: make(map[balancer.SubConn]*endpointWeight), scToWeight: make(map[balancer.SubConn]*endpointWeight),
} }
@ -156,7 +156,7 @@ func (bb) Name() string {
// Caller must hold b.mu. // Caller must hold b.mu.
func (b *wrrBalancer) updateEndpointsLocked(endpoints []resolver.Endpoint) { func (b *wrrBalancer) updateEndpointsLocked(endpoints []resolver.Endpoint) {
endpointSet := resolver.NewEndpointMap() endpointSet := resolver.NewEndpointMap()
addressSet := resolver.NewAddressMap() addressSet := resolver.NewAddressMapV2[*endpointWeight]()
for _, endpoint := range endpoints { for _, endpoint := range endpoints {
endpointSet.Set(endpoint, nil) endpointSet.Set(endpoint, nil)
for _, addr := range endpoint.Addresses { for _, addr := range endpoint.Addresses {
@ -214,7 +214,7 @@ type wrrBalancer struct {
cfg *lbConfig // active config cfg *lbConfig // active config
locality string locality string
stopPicker *grpcsync.Event stopPicker *grpcsync.Event
addressWeights *resolver.AddressMap // addr -> endpointWeight addressWeights *resolver.AddressMapV2[*endpointWeight]
endpointToWeight *resolver.EndpointMap // endpoint -> endpointWeight endpointToWeight *resolver.EndpointMap // endpoint -> endpointWeight
scToWeight map[balancer.SubConn]*endpointWeight scToWeight map[balancer.SubConn]*endpointWeight
} }
@ -329,7 +329,7 @@ func (b *wrrBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubC
if err != nil { if err != nil {
return nil, err return nil, err
} }
b.scToWeight[sc] = ewi.(*endpointWeight) b.scToWeight[sc] = ewi
return sc, nil return sc, nil
} }

View File

@ -24,16 +24,22 @@ import (
"strings" "strings"
) )
type addressMapEntry struct { type addressMapEntry[T any] struct {
addr Address addr Address
value any value T
} }
// AddressMap is a map of addresses to arbitrary values taking into account // AddressMap is an AddressMapV2[any]. It will be deleted in an upcoming
// release of grpc-go.
//
// Deprecated: use the generic AddressMapV2 type instead.
type AddressMap = AddressMapV2[any]
// AddressMapV2 is a map of addresses to arbitrary values taking into account
// Attributes. BalancerAttributes are ignored, as are Metadata and Type. // Attributes. BalancerAttributes are ignored, as are Metadata and Type.
// Multiple accesses may not be performed concurrently. Must be created via // Multiple accesses may not be performed concurrently. Must be created via
// NewAddressMap; do not construct directly. // NewAddressMap; do not construct directly.
type AddressMap struct { type AddressMapV2[T any] struct {
// The underlying map is keyed by an Address with fields that we don't care // The underlying map is keyed by an Address with fields that we don't care
// about being set to their zero values. The only fields that we care about // about being set to their zero values. The only fields that we care about
// are `Addr`, `ServerName` and `Attributes`. Since we need to be able to // are `Addr`, `ServerName` and `Attributes`. Since we need to be able to
@ -47,23 +53,30 @@ type AddressMap struct {
// The value type of the map contains a slice of addresses which match the key // The value type of the map contains a slice of addresses which match the key
// in their `Addr` and `ServerName` fields and contain the corresponding value // in their `Addr` and `ServerName` fields and contain the corresponding value
// associated with them. // associated with them.
m map[Address]addressMapEntryList m map[Address]addressMapEntryList[T]
} }
func toMapKey(addr *Address) Address { func toMapKey(addr *Address) Address {
return Address{Addr: addr.Addr, ServerName: addr.ServerName} return Address{Addr: addr.Addr, ServerName: addr.ServerName}
} }
type addressMapEntryList []*addressMapEntry type addressMapEntryList[T any] []*addressMapEntry[T]
// NewAddressMap creates a new AddressMap. // NewAddressMap creates a new AddressMapV2[any].
//
// Deprecated: use the generic NewAddressMapV2 constructor instead.
func NewAddressMap() *AddressMap { func NewAddressMap() *AddressMap {
return &AddressMap{m: make(map[Address]addressMapEntryList)} return NewAddressMapV2[any]()
}
// NewAddressMapV2 creates a new AddressMapV2.
func NewAddressMapV2[T any]() *AddressMapV2[T] {
return &AddressMapV2[T]{m: make(map[Address]addressMapEntryList[T])}
} }
// find returns the index of addr in the addressMapEntry slice, or -1 if not // find returns the index of addr in the addressMapEntry slice, or -1 if not
// present. // present.
func (l addressMapEntryList) find(addr Address) int { func (l addressMapEntryList[T]) find(addr Address) int {
for i, entry := range l { for i, entry := range l {
// Attributes are the only thing to match on here, since `Addr` and // Attributes are the only thing to match on here, since `Addr` and
// `ServerName` are already equal. // `ServerName` are already equal.
@ -75,28 +88,28 @@ func (l addressMapEntryList) find(addr Address) int {
} }
// Get returns the value for the address in the map, if present. // Get returns the value for the address in the map, if present.
func (a *AddressMap) Get(addr Address) (value any, ok bool) { func (a *AddressMapV2[T]) Get(addr Address) (value T, ok bool) {
addrKey := toMapKey(&addr) addrKey := toMapKey(&addr)
entryList := a.m[addrKey] entryList := a.m[addrKey]
if entry := entryList.find(addr); entry != -1 { if entry := entryList.find(addr); entry != -1 {
return entryList[entry].value, true return entryList[entry].value, true
} }
return nil, false return value, false
} }
// Set updates or adds the value to the address in the map. // Set updates or adds the value to the address in the map.
func (a *AddressMap) Set(addr Address, value any) { func (a *AddressMapV2[T]) Set(addr Address, value T) {
addrKey := toMapKey(&addr) addrKey := toMapKey(&addr)
entryList := a.m[addrKey] entryList := a.m[addrKey]
if entry := entryList.find(addr); entry != -1 { if entry := entryList.find(addr); entry != -1 {
entryList[entry].value = value entryList[entry].value = value
return return
} }
a.m[addrKey] = append(entryList, &addressMapEntry{addr: addr, value: value}) a.m[addrKey] = append(entryList, &addressMapEntry[T]{addr: addr, value: value})
} }
// Delete removes addr from the map. // Delete removes addr from the map.
func (a *AddressMap) Delete(addr Address) { func (a *AddressMapV2[T]) Delete(addr Address) {
addrKey := toMapKey(&addr) addrKey := toMapKey(&addr)
entryList := a.m[addrKey] entryList := a.m[addrKey]
entry := entryList.find(addr) entry := entryList.find(addr)
@ -113,7 +126,7 @@ func (a *AddressMap) Delete(addr Address) {
} }
// Len returns the number of entries in the map. // Len returns the number of entries in the map.
func (a *AddressMap) Len() int { func (a *AddressMapV2[T]) Len() int {
ret := 0 ret := 0
for _, entryList := range a.m { for _, entryList := range a.m {
ret += len(entryList) ret += len(entryList)
@ -122,7 +135,7 @@ func (a *AddressMap) Len() int {
} }
// Keys returns a slice of all current map keys. // Keys returns a slice of all current map keys.
func (a *AddressMap) Keys() []Address { func (a *AddressMapV2[T]) Keys() []Address {
ret := make([]Address, 0, a.Len()) ret := make([]Address, 0, a.Len())
for _, entryList := range a.m { for _, entryList := range a.m {
for _, entry := range entryList { for _, entry := range entryList {
@ -133,8 +146,8 @@ func (a *AddressMap) Keys() []Address {
} }
// Values returns a slice of all current map values. // Values returns a slice of all current map values.
func (a *AddressMap) Values() []any { func (a *AddressMapV2[T]) Values() []T {
ret := make([]any, 0, a.Len()) ret := make([]T, 0, a.Len())
for _, entryList := range a.m { for _, entryList := range a.m {
for _, entry := range entryList { for _, entry := range entryList {
ret = append(ret, entry.value) ret = append(ret, entry.value)

View File

@ -52,7 +52,7 @@ var (
) )
func (s) TestAddressMap_Length(t *testing.T) { func (s) TestAddressMap_Length(t *testing.T) {
addrMap := NewAddressMap() addrMap := NewAddressMapV2[any]()
if got := addrMap.Len(); got != 0 { if got := addrMap.Len(); got != 0 {
t.Fatalf("addrMap.Len() = %v; want 0", got) t.Fatalf("addrMap.Len() = %v; want 0", got)
} }
@ -72,7 +72,7 @@ func (s) TestAddressMap_Length(t *testing.T) {
} }
func (s) TestAddressMap_Get(t *testing.T) { func (s) TestAddressMap_Get(t *testing.T) {
addrMap := NewAddressMap() addrMap := NewAddressMapV2[any]()
addrMap.Set(addr1, 1) addrMap.Set(addr1, 1)
if got, ok := addrMap.Get(addr2); ok || got != nil { if got, ok := addrMap.Get(addr2); ok || got != nil {
@ -109,7 +109,7 @@ func (s) TestAddressMap_Get(t *testing.T) {
} }
func (s) TestAddressMap_Delete(t *testing.T) { func (s) TestAddressMap_Delete(t *testing.T) {
addrMap := NewAddressMap() addrMap := NewAddressMapV2[any]()
addrMap.Set(addr1, 1) addrMap.Set(addr1, 1)
addrMap.Set(addr2, 2) addrMap.Set(addr2, 2)
if got, want := addrMap.Len(), 2; got != want { if got, want := addrMap.Len(), 2; got != want {
@ -132,7 +132,7 @@ func (s) TestAddressMap_Delete(t *testing.T) {
} }
func (s) TestAddressMap_Keys(t *testing.T) { func (s) TestAddressMap_Keys(t *testing.T) {
addrMap := NewAddressMap() addrMap := NewAddressMapV2[any]()
addrMap.Set(addr1, 1) addrMap.Set(addr1, 1)
addrMap.Set(addr2, 2) addrMap.Set(addr2, 2)
addrMap.Set(addr3, 3) addrMap.Set(addr3, 3)
@ -153,7 +153,7 @@ func (s) TestAddressMap_Keys(t *testing.T) {
} }
func (s) TestAddressMap_Values(t *testing.T) { func (s) TestAddressMap_Values(t *testing.T) {
addrMap := NewAddressMap() addrMap := NewAddressMapV2[any]()
addrMap.Set(addr1, 1) addrMap.Set(addr1, 1)
addrMap.Set(addr2, 2) addrMap.Set(addr2, 2)
addrMap.Set(addr3, 3) addrMap.Set(addr3, 3)