pickfirst: receive state updates via callback instead of UpdateSubConnState (#6495)

This commit is contained in:
Doug Fawley 2023-08-04 08:14:18 -07:00 committed by GitHub
parent 7aceafcc52
commit d06ab0d4b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 24 deletions

View File

@ -1143,18 +1143,10 @@ func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T
} }
type stateRecordingBalancer struct { type stateRecordingBalancer struct {
notifier chan<- connectivity.State
balancer.Balancer balancer.Balancer
} }
func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {}
b.notifier <- s.ConnectivityState
b.Balancer.UpdateSubConnState(sc, s)
}
func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
b.notifier = r
}
func (b *stateRecordingBalancer) Close() { func (b *stateRecordingBalancer) Close() {
b.Balancer.Close() b.Balancer.Close()
@ -1179,8 +1171,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan
b.notifier = stateNotifications b.notifier = stateNotifications
b.mu.Unlock() b.mu.Unlock()
return &stateRecordingBalancer{ return &stateRecordingBalancer{
notifier: stateNotifications, Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
Balancer: balancer.Get("pick_first").Build(cc, opts),
} }
} }
@ -1192,6 +1183,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.
return ret return ret
} }
type stateRecordingCCWrapper struct {
balancer.ClientConn
notifier chan<- connectivity.State
}
func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
oldListener := opts.StateListener
opts.StateListener = func(s balancer.SubConnState) {
ccw.notifier <- s.ConnectivityState
oldListener(s)
}
return ccw.ClientConn.NewSubConn(addrs, opts)
}
// Keep reading until something causes the connection to die (EOF, server // Keep reading until something causes the connection to die (EOF, server
// closed, etc). Useful as a tool for mindlessly keeping the connection // closed, etc). Useful as a tool for mindlessly keeping the connection
// healthy, since the client will error if things like client prefaces are not // healthy, since the client will error if things like client prefaces are not

View File

@ -146,7 +146,12 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return nil return nil
} }
subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{}) var subConn balancer.SubConn
subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
b.updateSubConnState(subConn, state)
},
})
if err != nil { if err != nil {
if b.logger.V(2) { if b.logger.V(2) {
b.logger.Infof("Failed to create new SubConn: %v", err) b.logger.Infof("Failed to create new SubConn: %v", err)
@ -168,7 +173,13 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return nil return nil
} }
// UpdateSubConnState is unused as a StateListener is always registered when
// creating SubConns.
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
}
func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
if b.logger.V(2) { if b.logger.V(2) {
b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state) b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
} }

View File

@ -444,19 +444,9 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
} }
type stateRecordingBalancer struct { type stateRecordingBalancer struct {
notifier chan<- connectivity.State
balancer.Balancer balancer.Balancer
} }
func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
b.notifier <- s.ConnectivityState
b.Balancer.UpdateSubConnState(sc, s)
}
func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
b.notifier = r
}
func (b *stateRecordingBalancer) Close() { func (b *stateRecordingBalancer) Close() {
b.Balancer.Close() b.Balancer.Close()
} }
@ -480,8 +470,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan
b.notifier = stateNotifications b.notifier = stateNotifications
b.mu.Unlock() b.mu.Unlock()
return &stateRecordingBalancer{ return &stateRecordingBalancer{
notifier: stateNotifications, Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
Balancer: balancer.Get("pick_first").Build(cc, opts),
} }
} }
@ -493,6 +482,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.
return ret return ret
} }
type stateRecordingCCWrapper struct {
balancer.ClientConn
notifier chan<- connectivity.State
}
func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
oldListener := opts.StateListener
opts.StateListener = func(s balancer.SubConnState) {
ccw.notifier <- s.ConnectivityState
oldListener(s)
}
return ccw.ClientConn.NewSubConn(addrs, opts)
}
// Keep reading until something causes the connection to die (EOF, server // Keep reading until something causes the connection to die (EOF, server
// closed, etc). Useful as a tool for mindlessly keeping the connection // closed, etc). Useful as a tool for mindlessly keeping the connection
// healthy, since the client will error if things like client prefaces are not // healthy, since the client will error if things like client prefaces are not