balancer: Add a SubConn.RegisterHealthListener API and default implementation (#7780)

This commit is contained in:
Arjan Singh Bal 2024-11-13 12:22:49 +05:30 committed by GitHub
parent 0553bc318a
commit 274830d67a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 389 additions and 0 deletions

View File

@ -155,6 +155,14 @@ type SubConn interface {
// indicate the shutdown operation. This may be delivered before
// in-progress RPCs are complete and the actual connection is closed.
Shutdown()
// RegisterHealthListener registers a health listener that receives health
// updates for a Ready SubConn. Only one health listener can be registered
// at a time. A health listener should be registered each time the SubConn's
// connectivity state changes to READY. Registering a health listener when
// the connectivity state is not READY may result in undefined behaviour.
// This method must not be called synchronously while handling an update
// from a previously registered health listener.
RegisterHealthListener(func(SubConnState))
// enforceEmbedding is an unexported method to force implementers embed
// this interface, allowing gRPC to add methods without breaking users.
enforceEmbedding()

View File

@ -55,6 +55,9 @@ func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Pr
return nil, nil
}
// RegisterHealthListener exists so that testSubConn provides the
// RegisterHealthListener method declared on balancer.SubConn. It does nothing:
// the supplied listener is dropped and is never invoked by this method.
func (sc *testSubConn) RegisterHealthListener(_ func(balancer.SubConnState)) {
}
// testPickBuilder creates balancer.Picker for test.
type testPickBuilder struct {
validate func(info PickerBuildInfo)

View File

@ -189,6 +189,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
ac: ac,
producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
stateListener: opts.StateListener,
healthData: newHealthData(connectivity.Idle),
}
ac.acbw = acbw
return acbw, nil
@ -261,6 +262,25 @@ type acBalancerWrapper struct {
producersMu sync.Mutex
producers map[balancer.ProducerBuilder]*refCountedProducer
// Access to healthData is protected by healthMu.
healthMu sync.Mutex
// healthData is stored as a pointer to detect when the health listener is
// dropped or updated. This is required as closures can't be compared for
// equality.
healthData *healthData
}
// healthData holds data related to health state reporting.
//
// acBalancerWrapper keeps it behind a pointer and installs a brand new
// instance on every connectivity state update and on every call to
// RegisterHealthListener. A pending health notification remembers the
// instance that was current when it was scheduled, and is dropped if the
// wrapper no longer points at that same instance.
type healthData struct {
// connectivityState stores the most recent connectivity state delivered
// to the LB policy. This is stored to avoid sending updates when the
// SubConn has already exited connectivity state READY; in particular,
// RegisterHealthListener ignores registrations unless this is READY.
connectivityState connectivity.State
}
// newHealthData allocates a new healthData whose recorded connectivity state
// is s. Every call returns a distinct pointer, which callers rely on to tell
// one registration or state update apart from another.
func newHealthData(s connectivity.State) *healthData {
	hd := new(healthData)
	hd.connectivityState = s
	return hd
}
// updateState is invoked by grpc to push a subConn state update to the
@ -280,6 +300,24 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
if s == connectivity.Ready {
setConnectedAddress(&scs, curAddr)
}
// Invalidate the health listener by updating the healthData.
acbw.healthMu.Lock()
// A race may occur if a health listener is registered soon after the
// connectivity state is set but before the stateListener is called.
// Two cases may arise:
// 1. The new state is not READY: RegisterHealthListener has checks to
// ensure no updates are sent when the connectivity state is not
// READY.
// 2. The new state is READY: This means that the old state wasn't Ready.
// The RegisterHealthListener API mentions that a health listener
// must not be registered when a SubConn is not ready to avoid such
// races. When this happens, the LB policy would get health updates
// on the old listener. When the LB policy registers a new listener
// on receiving the connectivity update, the health updates will be
// sent to the new health listener.
acbw.healthData = newHealthData(scs.ConnectivityState)
acbw.healthMu.Unlock()
acbw.stateListener(scs)
})
}
@ -374,3 +412,41 @@ func (acbw *acBalancerWrapper) closeProducers() {
delete(acbw.producers, pb)
}
}
// RegisterHealthListener installs listener as the health listener for this
// SubConn on behalf of the LB policy.
//
// The registration is ignored unless the most recently recorded connectivity
// state is READY. Otherwise a fresh healthData is stored, which invalidates
// whichever listener was registered before; passing a nil listener therefore
// simply drops the active listener. For a non-nil listener, a READY health
// update is scheduled on the balancer wrapper's serializer and is delivered
// only if, by the time it runs, the healthData stored here is still the one
// in place (i.e. neither a connectivity state change nor a newer registration
// has replaced it).
//
// The listener is invoked while healthMu is held, so it must not call
// RegisterHealthListener synchronously.
func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
	acbw.healthMu.Lock()
	defer acbw.healthMu.Unlock()

	// Registering outside of READY is not supported. It can still happen when
	// the balancer registers after the connectivity state has been updated
	// here but before the balancer has been told about that update.
	if acbw.healthData.connectivityState != connectivity.Ready {
		return
	}

	// A new healthData pointer cuts off any previously registered listener:
	// its pending notification will no longer match the stored pointer.
	registered := newHealthData(connectivity.Ready)
	acbw.healthData = registered
	if listener == nil {
		return
	}

	notify := func(ctx context.Context) {
		if ctx.Err() != nil {
			return
		}
		if acbw.ccb.balancer == nil {
			return
		}
		acbw.healthMu.Lock()
		defer acbw.healthMu.Unlock()
		// Deliver only if this registration is still the current one.
		if acbw.healthData == registered {
			listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
		}
	}
	acbw.ccb.serializer.TrySchedule(notify)
}

View File

@ -93,6 +93,9 @@ func (tsc *TestSubConn) String() string {
return tsc.id
}
// RegisterHealthListener exists so that TestSubConn provides the
// RegisterHealthListener method declared on balancer.SubConn. It does nothing:
// the supplied listener is dropped and is never invoked by this method.
func (tsc *TestSubConn) RegisterHealthListener(_ func(balancer.SubConnState)) {
}
// BalancerClientConn is a mock balancer.ClientConn used in tests.
type BalancerClientConn struct {
logger Logger

View File

@ -981,3 +981,302 @@ func (s) TestSubConnShutdown(t *testing.T) {
})
}
}
// subConnStoringCCWrapper wraps a balancer.ClientConn for tests. Its
// NewSubConn method publishes every SubConn created through the wrapper on
// scChan and can optionally intercept SubConn state updates.
type subConnStoringCCWrapper struct {
// ClientConn is the wrapped ClientConn that actually creates SubConns.
balancer.ClientConn
// stateListener, when non-nil, is called with each SubConn state update
// before the update is passed on to the state listener supplied in the
// NewSubConn options.
stateListener func(balancer.SubConnState)
// scChan receives the SubConn returned by every NewSubConn call made
// through this wrapper. The send is blocking.
scChan chan balancer.SubConn
}
// NewSubConn creates a SubConn through the wrapped ClientConn and publishes
// the result on scChan before returning it to the caller.
//
// When ccw.stateListener is set, the state listener in opts is replaced by
// one that first calls ccw.stateListener and then the listener originally
// supplied, so the test observes (and may delay) every state update before
// the LB policy does. The SubConn is sent on scChan regardless of err; the
// send blocks until the channel can accept the value.
func (ccw *subConnStoringCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	if ccw.stateListener != nil {
		forward := opts.StateListener
		opts.StateListener = func(state balancer.SubConnState) {
			ccw.stateListener(state)
			forward(state)
		}
	}

	created, err := ccw.ClientConn.NewSubConn(addrs, opts)
	ccw.scChan <- created
	return created, err
}
// TestSubConn_RegisterHealthListener calls RegisterHealthListener on a SubConn
// and verifies that the expected health updates are sent only to the most
// recently registered listener: every registration made while READY produces
// exactly one READY health update, including after the SubConn has dropped to
// IDLE and reconnected.
func (s) TestSubConn_RegisterHealthListener(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// The stub balancer delegates to pick_first, but hands it a wrapped
	// ClientConn so the test can get hold of the SubConn pick_first creates.
	scChan := make(chan balancer.SubConn, 1)
	stub.Register(t.Name(), stub.BalancerFuncs{
		Init: func(bd *stub.BalancerData) {
			wrapper := &subConnStoringCCWrapper{ClientConn: bd.ClientConn, scChan: scChan}
			bd.Data = balancer.Get(pickfirst.Name).Build(wrapper, bd.BuildOptions)
		},
		Close: func(bd *stub.BalancerData) {
			bd.Data.(balancer.Balancer).Close()
		},
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
		},
		ExitIdle: func(bd *stub.BalancerData) {
			bd.Data.(balancer.ExitIdler).ExitIdle()
		},
	})
	svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())

	backend := stubserver.StartTestService(t, nil)
	defer backend.Stop()

	cc, err := grpc.NewClient(backend.Address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(svcCfg),
	)
	if err != nil {
		t.Fatalf("grpc.NewClient(%q) failed: %v", backend.Address, err)
	}
	defer cc.Close()
	cc.Connect()

	var sc balancer.SubConn
	select {
	case sc = <-scChan:
	case <-ctx.Done():
		t.Fatal("Context timed out waiting for SubConn creation")
	}

	healthUpdateChan := make(chan balancer.SubConnState, 1)
	onHealthUpdate := func(scs balancer.SubConnState) {
		healthUpdateChan <- scs
	}
	// wantReadyHealthUpdate blocks until a health update arrives and fails
	// the test if it is not READY or if the test context expires first.
	wantReadyHealthUpdate := func() {
		t.Helper()
		select {
		case got := <-healthUpdateChan:
			if got.ConnectivityState != connectivity.Ready {
				t.Fatalf("Received health update = %v, want = %v", got.ConnectivityState, connectivity.Ready)
			}
		case <-ctx.Done():
			t.Fatal("Context timed out waiting for health update")
		}
	}

	// Register a listener twice while READY; each registration must yield
	// one health update and nothing more.
	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
	for attempt := 0; attempt < 2; attempt++ {
		sc.RegisterHealthListener(onHealthUpdate)
		wantReadyHealthUpdate()

		select {
		case scs := <-healthUpdateChan:
			t.Fatalf("Received unexpected health update while channel is in state %v: %v", cc.GetState(), scs)
		case <-time.After(defaultTestShortTimeout):
		}
	}

	// Stop the backend to push the SubConn to IDLE, bring it back, and verify
	// that health updates are received on registering a new listener.
	backend.S.Stop()
	backend.S = nil
	testutils.AwaitState(ctx, t, cc, connectivity.Idle)
	if err := backend.StartServer(); err != nil {
		t.Fatalf("Error while restarting the backend server: %v", err)
	}
	cc.Connect()
	testutils.AwaitState(ctx, t, cc, connectivity.Ready)

	sc.RegisterHealthListener(onHealthUpdate)
	wantReadyHealthUpdate()
}
// TestSubConn_RegisterHealthListener_RegisterTwice registers two health
// listeners back to back while the READY connectivity update is still being
// handled, and verifies that only the listener registered last receives the
// health update.
func (s) TestSubConn_RegisterHealthListener_RegisterTwice(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	scChan := make(chan balancer.SubConn, 1)
	readyUpdateResumeCh := make(chan struct{})
	readyUpdateReceivedCh := make(chan struct{})

	// holdReadyUpdate signals the test when the READY update shows up and
	// then parks the update until the test lets it continue. Updates for any
	// other state pass straight through.
	holdReadyUpdate := func(scs balancer.SubConnState) {
		if scs.ConnectivityState != connectivity.Ready {
			return
		}
		close(readyUpdateReceivedCh)
		select {
		case <-readyUpdateResumeCh:
		case <-ctx.Done():
			t.Error("Context timed out waiting for update on ready channel")
		}
	}

	stub.Register(t.Name(), stub.BalancerFuncs{
		Init: func(bd *stub.BalancerData) {
			wrapper := &subConnStoringCCWrapper{
				ClientConn:    bd.ClientConn,
				scChan:        scChan,
				stateListener: holdReadyUpdate,
			}
			bd.Data = balancer.Get(pickfirst.Name).Build(wrapper, bd.BuildOptions)
		},
		Close: func(bd *stub.BalancerData) {
			bd.Data.(balancer.Balancer).Close()
		},
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
		},
	})
	svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())

	backend := stubserver.StartTestService(t, nil)
	defer backend.Stop()

	cc, err := grpc.NewClient(backend.Address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(svcCfg),
	)
	if err != nil {
		t.Fatalf("grpc.NewClient(%q) failed: %v", backend.Address, err)
	}
	defer cc.Close()
	cc.Connect()

	var sc balancer.SubConn
	select {
	case sc = <-scChan:
	case <-ctx.Done():
		t.Fatal("Context timed out waiting for SubConn creation")
	}

	// Block until the READY update has reached the intercepting listener.
	select {
	case <-readyUpdateReceivedCh:
	case <-ctx.Done():
		t.Fatal("Context timed out waiting for SubConn to enter READY")
	}

	// Register both listeners before releasing the held READY update.
	healthChan1 := make(chan balancer.SubConnState, 1)
	healthChan2 := make(chan balancer.SubConnState, 1)
	sc.RegisterHealthListener(func(scs balancer.SubConnState) {
		healthChan1 <- scs
	})
	sc.RegisterHealthListener(func(scs balancer.SubConnState) {
		healthChan2 <- scs
	})
	close(readyUpdateResumeCh)

	// The second listener must be told the SubConn is READY.
	select {
	case got := <-healthChan2:
		if got.ConnectivityState != connectivity.Ready {
			t.Fatalf("Received health update = %v, want = %v", got.ConnectivityState, connectivity.Ready)
		}
	case <-ctx.Done():
		t.Fatal("Context timed out waiting for health update")
	}

	// The first listener was superseded and must stay silent.
	select {
	case scs := <-healthChan1:
		t.Fatalf("Received unexpected health update on first listener: %v", scs)
	case <-time.After(defaultTestShortTimeout):
	}
}
// TestSubConn_RegisterHealthListener_NilListener registers a health listener
// followed by a nil listener while the READY connectivity update is still
// being handled, and verifies that the listener registered before the nil one
// doesn't receive any updates.
func (s) TestSubConn_RegisterHealthListener_NilListener(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	scChan := make(chan balancer.SubConn, 1)
	readyUpdateResumeCh := make(chan struct{})
	readyUpdateReceivedCh := make(chan struct{})

	// holdReadyUpdate signals the test when the READY update shows up and
	// then parks the update until the test lets it continue. Updates for any
	// other state pass straight through.
	holdReadyUpdate := func(scs balancer.SubConnState) {
		if scs.ConnectivityState != connectivity.Ready {
			return
		}
		close(readyUpdateReceivedCh)
		select {
		case <-readyUpdateResumeCh:
		case <-ctx.Done():
			t.Error("Context timed out waiting for update on ready channel")
		}
	}

	stub.Register(t.Name(), stub.BalancerFuncs{
		Init: func(bd *stub.BalancerData) {
			wrapper := &subConnStoringCCWrapper{
				ClientConn:    bd.ClientConn,
				scChan:        scChan,
				stateListener: holdReadyUpdate,
			}
			bd.Data = balancer.Get(pickfirst.Name).Build(wrapper, bd.BuildOptions)
		},
		Close: func(bd *stub.BalancerData) {
			bd.Data.(balancer.Balancer).Close()
		},
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
		},
	})
	svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())

	backend := stubserver.StartTestService(t, nil)
	defer backend.Stop()

	cc, err := grpc.NewClient(backend.Address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(svcCfg),
	)
	if err != nil {
		t.Fatalf("grpc.NewClient(%q) failed: %v", backend.Address, err)
	}
	defer cc.Close()
	cc.Connect()

	var sc balancer.SubConn
	select {
	case sc = <-scChan:
	case <-ctx.Done():
		t.Fatal("Context timed out waiting for SubConn creation")
	}

	// Block until the READY update has reached the intercepting listener.
	select {
	case <-readyUpdateReceivedCh:
	case <-ctx.Done():
		t.Fatal("Context timed out waiting for SubConn to enter READY")
	}

	healthChan := make(chan balancer.SubConnState, 1)
	sc.RegisterHealthListener(func(scs balancer.SubConnState) {
		healthChan <- scs
	})

	// A nil registration should invalidate the listener registered just
	// above. Only then is the held READY update released.
	sc.RegisterHealthListener(nil)
	close(readyUpdateResumeCh)

	// The invalidated listener must stay silent.
	select {
	case scs := <-healthChan:
		t.Fatalf("Received unexpected health update on the listener: %v", scs)
	case <-time.After(defaultTestShortTimeout):
	}
}