mirror of https://github.com/grpc/grpc-go.git
idle: use LB policy close event as a proxy for channel idleness (#6628)
This commit is contained in:
parent
2d1bb21e4d
commit
9deee9ba5f
|
@ -20,7 +20,6 @@ package idle_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
@ -28,9 +27,12 @@ import (
|
|||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/internal/balancer/stub"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/stubserver"
|
||||
|
@ -83,26 +85,41 @@ func channelzTraceEventFound(ctx context.Context, wantDesc string) error {
|
|||
return fmt.Errorf("when looking for channelz trace event with description %q, %w", wantDesc, ctx.Err())
|
||||
}
|
||||
|
||||
// channelzTraceEventNotFound looks up the top-channels in channelz (expects a
|
||||
// single one), and verifies that there is no trace event on the channel
|
||||
// matching the provided description string.
|
||||
func channelzTraceEventNotFound(ctx context.Context, wantDesc string) error {
|
||||
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
|
||||
defer sCancel()
|
||||
// Registers a wrapped round_robin LB policy for the duration of this test that
|
||||
// retains all the functionality of the round_robin LB policy and makes the
|
||||
// balancer close event available for inspection by the test.
|
||||
//
|
||||
// Returns a channel that gets pinged when the balancer is closed.
|
||||
func registerWrappedRoundRobinPolicy(t *testing.T) chan struct{} {
|
||||
rrBuilder := balancer.Get(roundrobin.Name)
|
||||
closeCh := make(chan struct{}, 1)
|
||||
stub.Register(roundrobin.Name, stub.BalancerFuncs{
|
||||
Init: func(bd *stub.BalancerData) {
|
||||
bd.Data = rrBuilder.Build(bd.ClientConn, bd.BuildOptions)
|
||||
},
|
||||
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
|
||||
bal := bd.Data.(balancer.Balancer)
|
||||
return bal.UpdateClientConnState(ccs)
|
||||
},
|
||||
Close: func(bd *stub.BalancerData) {
|
||||
select {
|
||||
case closeCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
bal := bd.Data.(balancer.Balancer)
|
||||
bal.Close()
|
||||
},
|
||||
})
|
||||
t.Cleanup(func() { balancer.Register(rrBuilder) })
|
||||
|
||||
err := channelzTraceEventFound(sCtx, wantDesc)
|
||||
if err == nil {
|
||||
return fmt.Errorf("found channelz trace event with description %q, when expected not to", wantDesc)
|
||||
}
|
||||
if !errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return closeCh
|
||||
}
|
||||
|
||||
// Tests the case where channel idleness is disabled by passing an idle_timeout
|
||||
// of 0. Verifies that a READY channel with no RPCs does not move to IDLE.
|
||||
func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
|
||||
closeCh := registerWrappedRoundRobinPolicy(t)
|
||||
|
||||
// Create a ClientConn with idle_timeout set to 0.
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
dopts := []grpc.DialOption{
|
||||
|
@ -127,17 +144,19 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
|
|||
defer cancel()
|
||||
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
|
||||
|
||||
// Verify that the ClientConn stay in READY.
|
||||
// Verify that the ClientConn stays in READY.
|
||||
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
|
||||
defer sCancel()
|
||||
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)
|
||||
|
||||
// Verify that there are no idleness related channelz events.
|
||||
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
|
||||
t.Fatal(err)
|
||||
// Verify that the LB policy is not closed which is expected to happen when
|
||||
// the channel enters IDLE.
|
||||
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortIdleTimeout)
|
||||
defer sCancel()
|
||||
select {
|
||||
case <-sCtx.Done():
|
||||
case <-closeCh:
|
||||
t.Fatal("LB policy closed when expected not to")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,6 +164,8 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
|
|||
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE, and
|
||||
// the connection to the backend is closed.
|
||||
func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
||||
closeCh := registerWrappedRoundRobinPolicy(t)
|
||||
|
||||
// Create a ClientConn with a short idle_timeout.
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
dopts := []grpc.DialOption{
|
||||
|
@ -189,6 +210,13 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
|||
if _, err := conn.CloseCh.Receive(ctx); err != nil {
|
||||
t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err)
|
||||
}
|
||||
|
||||
// Verify that the LB policy is closed.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Timeout waiting for LB policy to be closed after the channel enters IDLE")
|
||||
case <-closeCh:
|
||||
}
|
||||
}
|
||||
|
||||
// Tests the case where channel idleness is enabled by passing a small value for
|
||||
|
@ -224,6 +252,8 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
closeCh := registerWrappedRoundRobinPolicy(t)
|
||||
|
||||
// Create a ClientConn with a short idle_timeout.
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
dopts := []grpc.DialOption{
|
||||
|
@ -277,15 +307,16 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
|
|||
return
|
||||
}
|
||||
|
||||
// Verify that there are no idleness related channelz events.
|
||||
//
|
||||
// TODO: Improve the checks here. If these log strings are
|
||||
// changed in the code, these checks will continue to pass.
|
||||
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
// Verify that the LB policy is not closed which is expected to happen when
|
||||
// the channel enters IDLE.
|
||||
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortIdleTimeout)
|
||||
defer sCancel()
|
||||
select {
|
||||
case <-sCtx.Done():
|
||||
case <-closeCh:
|
||||
errCh <- fmt.Errorf("LB policy closed when expected not to")
|
||||
}
|
||||
errCh <- channelzTraceEventNotFound(ctx, "exiting idle mode")
|
||||
errCh <- nil
|
||||
}()
|
||||
|
||||
if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil {
|
||||
|
@ -308,6 +339,8 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
|
|||
// idle_timeout. Verifies that activity on a READY channel (frequent and short
|
||||
// RPCs) keeps it from moving to IDLE.
|
||||
func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
|
||||
closeCh := registerWrappedRoundRobinPolicy(t)
|
||||
|
||||
// Create a ClientConn with a short idle_timeout.
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
dopts := []grpc.DialOption{
|
||||
|
@ -352,15 +385,15 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
// Verify that the ClientConn stay in READY.
|
||||
// Verify that the ClientConn stays in READY.
|
||||
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)
|
||||
|
||||
// Verify that there are no idleness related channelz events.
|
||||
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
|
||||
t.Fatal(err)
|
||||
// Verify that the LB policy is not closed which is expected to happen when
|
||||
// the channel enters IDLE.
|
||||
select {
|
||||
case <-sCtx.Done():
|
||||
case <-closeCh:
|
||||
t.Fatal("LB policy closed when expected not to")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -368,6 +401,8 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
|
|||
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE. Also
|
||||
// verifies that a subsequent RPC on the IDLE channel kicks it out of IDLE.
|
||||
func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
|
||||
closeCh := registerWrappedRoundRobinPolicy(t)
|
||||
|
||||
// Start a test backend and set the bootstrap state of the resolver to
|
||||
// include this address. This will ensure that when the resolver is
|
||||
// restarted when exiting idle, it will push the same address to grpc again.
|
||||
|
@ -402,6 +437,13 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify that the LB policy is closed.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Timeout waiting for LB policy to be closed after the channel enters IDLE")
|
||||
case <-closeCh:
|
||||
}
|
||||
|
||||
// Make an RPC and ensure that it succeeds and moves the channel back to
|
||||
// READY.
|
||||
client := testgrpc.NewTestServiceClient(cc)
|
||||
|
|
Loading…
Reference in New Issue