From 274830d67a236ec65c1e705c4f4b92be6e326a82 Mon Sep 17 00:00:00 2001 From: Arjan Singh Bal <46515553+arjan-bal@users.noreply.github.com> Date: Wed, 13 Nov 2024 12:22:49 +0530 Subject: [PATCH] balancer: Add a SubConn.RegisterHealthListener API and default implementation (#7780) --- balancer/balancer.go | 8 + balancer/base/balancer_test.go | 3 + balancer_wrapper.go | 76 +++++++++ internal/testutils/balancer.go | 3 + test/balancer_test.go | 299 +++++++++++++++++++++++++++++++++ 5 files changed, 389 insertions(+) diff --git a/balancer/balancer.go b/balancer/balancer.go index 324915c16..b0d11d139 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -155,6 +155,14 @@ type SubConn interface { // indicate the shutdown operation. This may be delivered before // in-progress RPCs are complete and the actual connection is closed. Shutdown() + // RegisterHealthListener registers a health listener that receives health + // updates for a Ready SubConn. Only one health listener can be registered + // at a time. A health listener should be registered each time the SubConn's + // connectivity state changes to READY. Registering a health listener when + // the connectivity state is not READY may result in undefined behaviour. + // This method must not be called synchronously while handling an update + // from a previously registered health listener. + RegisterHealthListener(func(SubConnState)) // enforceEmbedding is an unexported method to force implementers embed // this interface, allowing gRPC to add methods without breaking users. enforceEmbedding() diff --git a/balancer/base/balancer_test.go b/balancer/base/balancer_test.go index ea4a4fda2..7486cf1c4 100644 --- a/balancer/base/balancer_test.go +++ b/balancer/base/balancer_test.go @@ -55,6 +55,9 @@ func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Pr return nil, nil } +// RegisterHealthListener is a no-op. 
+func (*testSubConn) RegisterHealthListener(func(balancer.SubConnState)) {} + // testPickBuilder creates balancer.Picker for test. type testPickBuilder struct { validate func(info PickerBuildInfo) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 80620d310..747669fda 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -189,6 +189,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer), stateListener: opts.StateListener, + healthData: newHealthData(connectivity.Idle), } ac.acbw = acbw return acbw, nil @@ -261,6 +262,25 @@ type acBalancerWrapper struct { producersMu sync.Mutex producers map[balancer.ProducerBuilder]*refCountedProducer + + // Access to healthData is protected by healthMu. + healthMu sync.Mutex + // healthData is stored as a pointer to detect when the health listener is + // dropped or updated. This is required as closures can't be compared for + // equality. + healthData *healthData +} + +// healthData holds data related to health state reporting. +type healthData struct { + // connectivityState stores the most recent connectivity state delivered + // to the LB policy. This is stored to avoid sending updates when the + // SubConn has already exited connectivity state READY. + connectivityState connectivity.State +} + +func newHealthData(s connectivity.State) *healthData { + return &healthData{connectivityState: s} } // updateState is invoked by grpc to push a subConn state update to the @@ -280,6 +300,24 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve if s == connectivity.Ready { setConnectedAddress(&scs, curAddr) } + // Invalidate the health listener by updating the healthData. + acbw.healthMu.Lock() + // A race may occur if a health listener is registered soon after the + // connectivity state is set but before the stateListener is called. + // Two cases may arise: + // 1. 
The new state is not READY: RegisterHealthListener has checks to + // ensure no updates are sent when the connectivity state is not + // READY. + // 2. The new state is READY: This means that the old state wasn't Ready. + // The RegisterHealthListener API mentions that a health listener + // must not be registered when a SubConn is not ready to avoid such + // races. When this happens, the LB policy would get health updates + // on the old listener. When the LB policy registers a new listener + // on receiving the connectivity update, the health updates will be + // sent to the new health listener. + acbw.healthData = newHealthData(scs.ConnectivityState) + acbw.healthMu.Unlock() + acbw.stateListener(scs) }) } @@ -374,3 +412,41 @@ func (acbw *acBalancerWrapper) closeProducers() { delete(acbw.producers, pb) } } + +// RegisterHealthListener accepts a health listener from the LB policy. It sends +// updates to the health listener as long as the SubConn's connectivity state +// doesn't change and a new health listener is not registered. To invalidate +// the currently registered health listener, acbw updates the healthData. If a +// nil listener is registered, the active health listener is dropped. +func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { + acbw.healthMu.Lock() + defer acbw.healthMu.Unlock() + // listeners should not be registered when the connectivity state + // isn't Ready. This may happen when the balancer registers a listener + // after the connectivityState is updated, but before it is notified + // of the update. + if acbw.healthData.connectivityState != connectivity.Ready { + return + } + // Replace the health data to stop sending updates to any previously + // registered health listeners. 
+ hd := newHealthData(connectivity.Ready) + acbw.healthData = hd + if listener == nil { + return + } + + acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { + if ctx.Err() != nil || acbw.ccb.balancer == nil { + return + } + // Don't send updates if a new listener is registered. + acbw.healthMu.Lock() + defer acbw.healthMu.Unlock() + curHD := acbw.healthData + if curHD != hd { + return + } + listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + }) +} diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index d1b4292b4..5a446b147 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -93,6 +93,9 @@ func (tsc *TestSubConn) String() string { return tsc.id } +// RegisterHealthListener is a no-op. +func (*TestSubConn) RegisterHealthListener(func(balancer.SubConnState)) {} + // BalancerClientConn is a mock balancer.ClientConn used in tests. type BalancerClientConn struct { logger Logger diff --git a/test/balancer_test.go b/test/balancer_test.go index c2405808f..bb45e11d9 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -981,3 +981,302 @@ func (s) TestSubConnShutdown(t *testing.T) { }) } } + +type subConnStoringCCWrapper struct { + balancer.ClientConn + stateListener func(balancer.SubConnState) + scChan chan balancer.SubConn +} + +func (ccw *subConnStoringCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + if ccw.stateListener != nil { + origListener := opts.StateListener + opts.StateListener = func(scs balancer.SubConnState) { + ccw.stateListener(scs) + origListener(scs) + } + } + sc, err := ccw.ClientConn.NewSubConn(addrs, opts) + ccw.scChan <- sc + return sc, err +} + +// Test calls RegisterHealthListener on a SubConn to verify that expected health +// updates are sent only to the most recently registered listener. 
+func (s) TestSubConn_RegisterHealthListener(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scChan := make(chan balancer.SubConn, 1) + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + cc := bd.ClientConn + ccw := &subConnStoringCCWrapper{ + ClientConn: cc, + scChan: scChan, + } + bd.Data = balancer.Get(pickfirst.Name).Build(ccw, bd.BuildOptions) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + ExitIdle: func(bd *stub.BalancerData) { + bd.Data.(balancer.ExitIdler).ExitIdle() + }, + } + + stub.Register(t.Name(), bf) + svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name()) + backend := stubserver.StartTestService(t, nil) + defer backend.Stop() + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(svcCfg), + } + cc, err := grpc.NewClient(backend.Address, opts...) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed: %v", backend.Address, err) + + } + defer cc.Close() + + cc.Connect() + + var sc balancer.SubConn + select { + case sc = <-scChan: + case <-ctx.Done(): + t.Fatal("Context timed out waiting for SubConn creation") + } + healthUpdateChan := make(chan balancer.SubConnState, 1) + + // Register listener while Ready and verify it gets a health update. 
+ testutils.AwaitState(ctx, t, cc, connectivity.Ready) + for i := 0; i < 2; i++ { + sc.RegisterHealthListener(func(scs balancer.SubConnState) { + healthUpdateChan <- scs + }) + select { + case scs := <-healthUpdateChan: + if scs.ConnectivityState != connectivity.Ready { + t.Fatalf("Received health update = %v, want = %v", scs.ConnectivityState, connectivity.Ready) + } + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for health update") + } + + // No further updates are expected. + select { + case scs := <-healthUpdateChan: + t.Fatalf("Received unexpected health update while channel is in state %v: %v", cc.GetState(), scs) + case <-time.After(defaultTestShortTimeout): + } + } + + // Make the SubConn enter IDLE and verify that health updates are received + // on registering a new listener. + backend.S.Stop() + backend.S = nil + testutils.AwaitState(ctx, t, cc, connectivity.Idle) + if err := backend.StartServer(); err != nil { + t.Fatalf("Error while restarting the backend server: %v", err) + } + cc.Connect() + testutils.AwaitState(ctx, t, cc, connectivity.Ready) + sc.RegisterHealthListener(func(scs balancer.SubConnState) { + healthUpdateChan <- scs + }) + select { + case scs := <-healthUpdateChan: + if scs.ConnectivityState != connectivity.Ready { + t.Fatalf("Received health update = %v, want = %v", scs.ConnectivityState, connectivity.Ready) + } + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for health update") + } +} + +// Test calls RegisterHealthListener on a SubConn twice while handling the +// connectivity update. The test verifies that only the latest listener +// receives the health update. 
+func (s) TestSubConn_RegisterHealthListener_RegisterTwice(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scChan := make(chan balancer.SubConn, 1) + readyUpdateResumeCh := make(chan struct{}) + readyUpdateReceivedCh := make(chan struct{}) + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + cc := bd.ClientConn + ccw := &subConnStoringCCWrapper{ + ClientConn: cc, + scChan: scChan, + stateListener: func(scs balancer.SubConnState) { + if scs.ConnectivityState != connectivity.Ready { + return + } + close(readyUpdateReceivedCh) + select { + case <-readyUpdateResumeCh: + case <-ctx.Done(): + t.Error("Context timed out waiting for update on ready channel") + } + }, + } + bd.Data = balancer.Get(pickfirst.Name).Build(ccw, bd.BuildOptions) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + } + + stub.Register(t.Name(), bf) + svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name()) + backend := stubserver.StartTestService(t, nil) + defer backend.Stop() + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(svcCfg), + } + cc, err := grpc.NewClient(backend.Address, opts...) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed: %v", backend.Address, err) + + } + defer cc.Close() + + cc.Connect() + + var sc balancer.SubConn + select { + case sc = <-scChan: + case <-ctx.Done(): + t.Fatal("Context timed out waiting for SubConn creation") + } + + // Wait for the SubConn to enter READY. 
+ select { + case <-readyUpdateReceivedCh: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for SubConn to enter READY") + } + + healthChan1 := make(chan balancer.SubConnState, 1) + healthChan2 := make(chan balancer.SubConnState, 1) + + sc.RegisterHealthListener(func(scs balancer.SubConnState) { + healthChan1 <- scs + }) + sc.RegisterHealthListener(func(scs balancer.SubConnState) { + healthChan2 <- scs + }) + close(readyUpdateResumeCh) + + select { + case scs := <-healthChan2: + if scs.ConnectivityState != connectivity.Ready { + t.Fatalf("Received health update = %v, want = %v", scs.ConnectivityState, connectivity.Ready) + } + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for health update") + } + + // No updates should be received on the first listener. + select { + case scs := <-healthChan1: + t.Fatalf("Received unexpected health update on first listener: %v", scs) + case <-time.After(defaultTestShortTimeout): + } +} + +// Test calls RegisterHealthListener on a SubConn with a nil listener and +// verifies that the listener registered before the nil listener doesn't receive +// any further updates. 
+func (s) TestSubConn_RegisterHealthListener_NilListener(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + scChan := make(chan balancer.SubConn, 1) + readyUpdateResumeCh := make(chan struct{}) + readyUpdateReceivedCh := make(chan struct{}) + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + cc := bd.ClientConn + ccw := &subConnStoringCCWrapper{ + ClientConn: cc, + scChan: scChan, + stateListener: func(scs balancer.SubConnState) { + if scs.ConnectivityState != connectivity.Ready { + return + } + close(readyUpdateReceivedCh) + select { + case <-readyUpdateResumeCh: + case <-ctx.Done(): + t.Error("Context timed out waiting for update on ready channel") + } + }, + } + bd.Data = balancer.Get(pickfirst.Name).Build(ccw, bd.BuildOptions) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs) + }, + } + + stub.Register(t.Name(), bf) + svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name()) + backend := stubserver.StartTestService(t, nil) + defer backend.Stop() + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(svcCfg), + } + cc, err := grpc.NewClient(backend.Address, opts...) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed: %v", backend.Address, err) + + } + defer cc.Close() + + cc.Connect() + + var sc balancer.SubConn + select { + case sc = <-scChan: + case <-ctx.Done(): + t.Fatal("Context timed out waiting for SubConn creation") + } + + // Wait for the SubConn to enter READY. 
+ select { + case <-readyUpdateReceivedCh: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for SubConn to enter READY") + } + + healthChan := make(chan balancer.SubConnState, 1) + + sc.RegisterHealthListener(func(scs balancer.SubConnState) { + healthChan <- scs + }) + + // Registering a nil listener should invalidate the previously registered + // listener. + sc.RegisterHealthListener(nil) + close(readyUpdateResumeCh) + + // No updates should be received on the listener. + select { + case scs := <-healthChan: + t.Fatalf("Received unexpected health update on the listener: %v", scs) + case <-time.After(defaultTestShortTimeout): + } +}