internal/idle: add a test that invokes ClientConn methods concurrently (#6659)

This commit is contained in:
Easwar Swaminathan 2023-09-29 14:23:45 -07:00 committed by GitHub
parent fd9ef7263a
commit 1466283cc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 115 additions and 25 deletions

View File

@ -337,8 +337,8 @@ func (cc *ClientConn) exitIdleMode() error {
return errConnClosing
}
if cc.idlenessState != ccIdlenessStateIdle {
cc.mu.Unlock()
channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState)
cc.mu.Unlock()
return nil
}
@ -404,13 +404,13 @@ func (cc *ClientConn) exitIdleMode() error {
// name resolver, load balancer and any subchannels.
func (cc *ClientConn) enterIdleMode() error {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
if cc.idlenessState != ccIdlenessStateActive {
channelz.Errorf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState)
cc.mu.Unlock()
channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState)
return nil
}
@ -431,14 +431,14 @@ func (cc *ClientConn) enterIdleMode() error {
cc.balancerWrapper.enterIdleMode()
cc.csMgr.updateState(connectivity.Idle)
cc.idlenessState = ccIdlenessStateIdle
cc.mu.Unlock()
cc.addTraceEvent("entering idle mode")
go func() {
cc.addTraceEvent("entering idle mode")
for ac := range conns {
ac.tearDown(errConnIdling)
}
}()
return nil
}
@ -804,6 +804,12 @@ func init() {
internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
return cc.csMgr.pubSub.Subscribe(s)
}
internal.EnterIdleModeForTesting = func(cc *ClientConn) error {
return cc.enterIdleMode()
}
internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
return cc.exitIdleMode()
}
}
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {

View File

@ -23,6 +23,7 @@ import (
"fmt"
"io"
"strings"
"sync"
"testing"
"time"
@ -32,6 +33,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
@ -132,11 +134,11 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()
// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that the ClientConn moves to READY.
@ -178,12 +180,12 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()
// Start a test backend and push an address update via the resolver.
lis := testutils.NewListenerWrapper(t, nil)
backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
t.Cleanup(backend.Stop)
defer backend.Stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that the ClientConn moves to READY.
@ -266,11 +268,10 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()
// Start a test backend which keeps a unary RPC call active by blocking on a
// channel that is closed by the test later on. Also push an address update
// via the resolver.
// Start a test backend that keeps the RPC call active by blocking
// on a channel that is closed by the test later on.
blockCh := make(chan struct{})
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
@ -285,7 +286,10 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Cleanup(backend.Stop)
defer backend.Stop()
// Push an address update containing the address of the above
// backend via the manual resolver.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that the ClientConn moves to READY.
@ -293,8 +297,8 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
// Spawn a goroutine which checks expected state transitions and idleness
// channelz trace events.
// Spawn a goroutine to check for expected behavior while a blocking
// RPC all is made from the main test goroutine.
errCh := make(chan error, 1)
go func() {
defer close(blockCh)
@ -353,11 +357,11 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()
// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that the ClientConn moves to READY.
@ -408,7 +412,7 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Create a ClientConn with a short idle_timeout.
@ -422,7 +426,7 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -473,7 +477,7 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Create a ClientConn with a short idle_timeout.
@ -487,7 +491,7 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -516,7 +520,7 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Create a ClientConn with a short idle_timeout.
@ -530,7 +534,7 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
defer cc.Close()
// Verify that the ClientConn moves to IDLE.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -544,3 +548,77 @@ func (s) TestChannelIdleness_Connect(t *testing.T) {
// Verify that the ClientConn moves back to READY.
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
}
// runFunc runs f repeatedly until the context expires.
func runFunc(ctx context.Context, f func()) {
for {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Millisecond):
f()
}
}
}
// Tests the scenario where there are concurrent calls to exit and enter idle
// mode on the ClientConn. Verifies that there is no race under this scenario.
func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) {
// Start a test backend and set the bootstrap state of the resolver to
// include this address. This will ensure that when the resolver is
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Create a ClientConn with a long idle_timeout. We will explicitly trigger
// entering and exiting IDLE mode from the test.
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(30 * time.Minute),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"pick_first":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn) error)
enterIdleFunc := func() {
if err := enterIdle(cc); err != nil {
t.Errorf("Failed to enter idle mode: %v", err)
}
}
exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error)
exitIdleFunc := func() {
if err := exitIdle(cc); err != nil {
t.Errorf("Failed to exit idle mode: %v", err)
}
}
// Spawn goroutines that call methods on the ClientConn to enter and exit
// idle mode concurrently for one second.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
var wg sync.WaitGroup
wg.Add(4)
go func() {
runFunc(ctx, enterIdleFunc)
wg.Done()
}()
go func() {
runFunc(ctx, enterIdleFunc)
wg.Done()
}()
go func() {
runFunc(ctx, exitIdleFunc)
wg.Done()
}()
go func() {
runFunc(ctx, exitIdleFunc)
wg.Done()
}()
wg.Wait()
}

View File

@ -175,6 +175,12 @@ var (
// GRPCResolverSchemeExtraMetadata determines when gRPC will add extra
// metadata to RPCs.
GRPCResolverSchemeExtraMetadata string = "xds"
// EnterIdleModeForTesting gets the ClientConn to enter IDLE mode.
EnterIdleModeForTesting any // func(*grpc.ClientConn) error
// ExitIdleModeForTesting gets the ClientConn to exit IDLE mode.
ExitIdleModeForTesting any // func(*grpc.ClientConn) error
)
// HealthChecker defines the signature of the client-side LB channel health checking function.