mirror of https://github.com/grpc/grpc-go.git
384 lines
12 KiB
Go
384 lines
12 KiB
Go
/*
|
|
*
|
|
* Copyright 2023 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
package idle
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"google.golang.org/grpc/internal/grpctest"
|
|
)
|
|
|
|
const (
|
|
defaultTestTimeout = 10 * time.Second
|
|
defaultTestIdleTimeout = 500 * time.Millisecond // A short idle_timeout for tests.
|
|
defaultTestShortTimeout = 10 * time.Millisecond // A small deadline to wait for events expected to not happen.
|
|
)
|
|
|
|
type s struct {
|
|
grpctest.Tester
|
|
}
|
|
|
|
func Test(t *testing.T) {
|
|
grpctest.RunSubTests(t, s{})
|
|
}
|
|
|
|
type testEnforcer struct {
|
|
exitIdleCh chan struct{}
|
|
enterIdleCh chan struct{}
|
|
}
|
|
|
|
func (ti *testEnforcer) ExitIdleMode() error {
|
|
ti.exitIdleCh <- struct{}{}
|
|
return nil
|
|
|
|
}
|
|
|
|
func (ti *testEnforcer) EnterIdleMode() {
|
|
ti.enterIdleCh <- struct{}{}
|
|
}
|
|
|
|
func newTestEnforcer() *testEnforcer {
|
|
return &testEnforcer{
|
|
exitIdleCh: make(chan struct{}, 1),
|
|
enterIdleCh: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
// overrideNewTimer overrides the new timer creation function by ensuring that a
|
|
// message is pushed on the returned channel everytime the timer fires.
|
|
func overrideNewTimer(t *testing.T) <-chan struct{} {
|
|
t.Helper()
|
|
|
|
ch := make(chan struct{}, 1)
|
|
origTimeAfterFunc := timeAfterFunc
|
|
timeAfterFunc = func(d time.Duration, callback func()) *time.Timer {
|
|
return time.AfterFunc(d, func() {
|
|
select {
|
|
case ch <- struct{}{}:
|
|
default:
|
|
}
|
|
callback()
|
|
})
|
|
}
|
|
t.Cleanup(func() { timeAfterFunc = origTimeAfterFunc })
|
|
return ch
|
|
}
|
|
|
|
// TestManager_Disabled tests the case where the idleness manager is
|
|
// disabled by passing an idle_timeout of 0. Verifies the following things:
|
|
// - timer callback does not fire
|
|
// - an RPC triggers a call to ExitIdleMode on the ClientConn
|
|
// - more calls to RPC termination (as compared to RPC initiation) does not
|
|
// result in an error log
|
|
func (s) TestManager_Disabled(t *testing.T) {
|
|
callbackCh := overrideNewTimer(t)
|
|
|
|
// Create an idleness manager that is disabled because of idleTimeout being
|
|
// set to `0`.
|
|
enforcer := newTestEnforcer()
|
|
mgr := NewManager(enforcer, time.Duration(0))
|
|
|
|
// Ensure that the timer callback does not fire within a short deadline.
|
|
select {
|
|
case <-callbackCh:
|
|
t.Fatal("Idle timer callback fired when manager is disabled")
|
|
case <-time.After(defaultTestShortTimeout):
|
|
}
|
|
|
|
// The first invocation of OnCallBegin() should lead to a call to
|
|
// ExitIdleMode() on the enforcer.
|
|
go mgr.OnCallBegin()
|
|
select {
|
|
case <-enforcer.exitIdleCh:
|
|
case <-time.After(defaultTestShortTimeout):
|
|
t.Fatal("Timeout waiting for channel to move out of idle mode")
|
|
}
|
|
|
|
// If the number of calls to OnCallEnd() exceeds the number of calls to
|
|
// OnCallBegin(), the idleness manager is expected to throw an error log
|
|
// (which will cause our TestLogger to fail the test). But since the manager
|
|
// is disabled, this should not happen.
|
|
mgr.OnCallEnd()
|
|
mgr.OnCallEnd()
|
|
|
|
// The idleness manager is explicitly not closed here. But since the manager
|
|
// is disabled, it will not start the run goroutine, and hence we expect the
|
|
// leak checker to not find any leaked goroutines.
|
|
}
|
|
|
|
// TestManager_Enabled_TimerFires tests the case where the idle manager
|
|
// is enabled. Ensures that when there are no RPCs, the timer callback is
|
|
// invoked and the EnterIdleMode() method is invoked on the enforcer.
|
|
func (s) TestManager_Enabled_TimerFires(t *testing.T) {
|
|
callbackCh := overrideNewTimer(t)
|
|
|
|
enforcer := newTestEnforcer()
|
|
mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout))
|
|
defer mgr.Close()
|
|
mgr.ExitIdleMode()
|
|
|
|
// Ensure that the timer callback fires within an appropriate amount of time.
|
|
select {
|
|
case <-callbackCh:
|
|
case <-time.After(2 * defaultTestIdleTimeout):
|
|
t.Fatal("Timeout waiting for idle timer callback to fire")
|
|
}
|
|
|
|
// Ensure that the channel moves to idle mode eventually.
|
|
select {
|
|
case <-enforcer.enterIdleCh:
|
|
case <-time.After(defaultTestTimeout):
|
|
t.Fatal("Timeout waiting for channel to move to idle")
|
|
}
|
|
}
|
|
|
|
// TestManager_Enabled_OngoingCall tests the case where the idle manager
|
|
// is enabled. Ensures that when there is an ongoing RPC, the channel does not
|
|
// enter idle mode.
|
|
func (s) TestManager_Enabled_OngoingCall(t *testing.T) {
|
|
callbackCh := overrideNewTimer(t)
|
|
|
|
enforcer := newTestEnforcer()
|
|
mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout))
|
|
defer mgr.Close()
|
|
mgr.ExitIdleMode()
|
|
|
|
// Fire up a goroutine that simulates an ongoing RPC that is terminated
|
|
// after the timer callback fires for the first time.
|
|
timerFired := make(chan struct{})
|
|
go func() {
|
|
mgr.OnCallBegin()
|
|
<-timerFired
|
|
mgr.OnCallEnd()
|
|
}()
|
|
|
|
// Ensure that the timer callback fires and unblock the above goroutine.
|
|
select {
|
|
case <-callbackCh:
|
|
close(timerFired)
|
|
case <-time.After(2 * defaultTestIdleTimeout):
|
|
t.Fatal("Timeout waiting for idle timer callback to fire")
|
|
}
|
|
|
|
// The invocation of the timer callback should not put the channel in idle
|
|
// mode since we had an ongoing RPC.
|
|
select {
|
|
case <-enforcer.enterIdleCh:
|
|
t.Fatalf("EnterIdleMode() called on enforcer when active RPC exists")
|
|
case <-time.After(defaultTestShortTimeout):
|
|
}
|
|
|
|
// Since we terminated the ongoing RPC and we have no other active RPCs, the
|
|
// channel must move to idle eventually.
|
|
select {
|
|
case <-enforcer.enterIdleCh:
|
|
case <-time.After(defaultTestTimeout):
|
|
t.Fatal("Timeout waiting for channel to move to idle")
|
|
}
|
|
}
|
|
|
|
// TestManager_Enabled_ActiveSinceLastCheck tests the case where the
|
|
// idle manager is enabled. Ensures that when there are active RPCs in the last
|
|
// period (even though there is no active call when the timer fires), the
|
|
// channel does not enter idle mode.
|
|
func (s) TestManager_Enabled_ActiveSinceLastCheck(t *testing.T) {
|
|
callbackCh := overrideNewTimer(t)
|
|
|
|
enforcer := newTestEnforcer()
|
|
mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout))
|
|
defer mgr.Close()
|
|
mgr.ExitIdleMode()
|
|
|
|
// Fire up a goroutine that simulates unary RPCs until the timer callback
|
|
// fires.
|
|
timerFired := make(chan struct{})
|
|
go func() {
|
|
for ; ; <-time.After(defaultTestShortTimeout) {
|
|
mgr.OnCallBegin()
|
|
mgr.OnCallEnd()
|
|
|
|
select {
|
|
case <-timerFired:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Ensure that the timer callback fires, and that we don't enter idle as
|
|
// part of this invocation of the timer callback, since we had some RPCs in
|
|
// this period.
|
|
select {
|
|
case <-callbackCh:
|
|
close(timerFired)
|
|
case <-time.After(2 * defaultTestIdleTimeout):
|
|
close(timerFired)
|
|
t.Fatal("Timeout waiting for idle timer callback to fire")
|
|
}
|
|
select {
|
|
case <-enforcer.enterIdleCh:
|
|
t.Fatalf("EnterIdleMode() called on enforcer when one RPC completed in the last period")
|
|
case <-time.After(defaultTestShortTimeout):
|
|
}
|
|
|
|
// Since the unary RPC terminated and we have no other active RPCs, the
|
|
// channel must move to idle eventually.
|
|
select {
|
|
case <-enforcer.enterIdleCh:
|
|
case <-time.After(defaultTestTimeout):
|
|
t.Fatal("Timeout waiting for channel to move to idle")
|
|
}
|
|
}
|
|
|
|
// TestManager_Enabled_ExitIdleOnRPC tests the case where the idle
|
|
// manager is enabled. Ensures that the channel moves out of idle when an RPC is
|
|
// initiated.
|
|
func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) {
|
|
overrideNewTimer(t)
|
|
|
|
enforcer := newTestEnforcer()
|
|
mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout))
|
|
defer mgr.Close()
|
|
|
|
mgr.ExitIdleMode()
|
|
<-enforcer.exitIdleCh
|
|
// Ensure that the channel moves to idle since there are no RPCs.
|
|
select {
|
|
case <-enforcer.enterIdleCh:
|
|
case <-time.After(2 * defaultTestIdleTimeout):
|
|
t.Fatal("Timeout waiting for channel to move to idle mode")
|
|
}
|
|
|
|
for i := 0; i < 100; i++ {
|
|
// A call to OnCallBegin and OnCallEnd simulates an RPC.
|
|
go func() {
|
|
if err := mgr.OnCallBegin(); err != nil {
|
|
t.Errorf("OnCallBegin() failed: %v", err)
|
|
}
|
|
mgr.OnCallEnd()
|
|
}()
|
|
}
|
|
|
|
// Ensure that the channel moves out of idle as a result of the above RPC.
|
|
select {
|
|
case <-enforcer.exitIdleCh:
|
|
case <-time.After(2 * defaultTestIdleTimeout):
|
|
t.Fatal("Timeout waiting for channel to move out of idle mode")
|
|
}
|
|
|
|
// Ensure that only one call to exit idle mode is made to the CC.
|
|
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
|
|
defer sCancel()
|
|
select {
|
|
case <-enforcer.exitIdleCh:
|
|
t.Fatal("More than one call to exit idle mode on the ClientConn; only one expected")
|
|
case <-sCtx.Done():
|
|
}
|
|
}
|
|
|
|
type racyState int32
|
|
|
|
const (
|
|
stateInitial racyState = iota
|
|
stateEnteredIdle
|
|
stateExitedIdle
|
|
stateActiveRPCs
|
|
)
|
|
|
|
// racyEnforcer is a test idleness enforcer used specifically to test the
|
|
// race between idle timeout and incoming RPCs.
|
|
type racyEnforcer struct {
|
|
t *testing.T
|
|
state *racyState // Accessed atomically.
|
|
started bool
|
|
}
|
|
|
|
// ExitIdleMode sets the internal state to stateExitedIdle. We should only ever
|
|
// exit idle when we are currently in idle.
|
|
func (ri *racyEnforcer) ExitIdleMode() error {
|
|
// Set only on the initial ExitIdleMode
|
|
if ri.started == false {
|
|
if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateInitial)) {
|
|
return fmt.Errorf("idleness enforcer's first ExitIdleMode after EnterIdleMode")
|
|
}
|
|
ri.started = true
|
|
return nil
|
|
}
|
|
if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) {
|
|
return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// EnterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start.
|
|
func (ri *racyEnforcer) EnterIdleMode() {
|
|
if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateEnteredIdle)) {
|
|
ri.t.Errorf("idleness enforcer asked to enter idle after rpcs started")
|
|
}
|
|
}
|
|
|
|
// TestManager_IdleTimeoutRacesWithOnCallBegin tests the case where firing of
|
|
// the idle timeout races with an incoming RPC. The test verifies that if the
|
|
// timer callback wins the race and puts the channel in idle, the RPCs can kick
|
|
// it out of idle. And if the RPCs win the race and keep the channel active,
|
|
// then the timer callback should not attempt to put the channel in idle mode.
|
|
func (s) TestManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) {
|
|
// Run multiple iterations to simulate different possibilities.
|
|
for i := 0; i < 20; i++ {
|
|
t.Run(fmt.Sprintf("iteration=%d", i), func(t *testing.T) {
|
|
var idlenessState racyState
|
|
enforcer := &racyEnforcer{t: t, state: &idlenessState}
|
|
|
|
// Configure a large idle timeout so that we can control the
|
|
// race between the timer callback and RPCs.
|
|
mgr := NewManager(enforcer, time.Duration(10*time.Minute))
|
|
defer mgr.Close()
|
|
mgr.ExitIdleMode()
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
<-time.After(defaultTestIdleTimeout / 50)
|
|
mgr.handleIdleTimeout()
|
|
}()
|
|
for j := 0; j < 5; j++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
// Wait for the configured idle timeout and simulate an RPC to
|
|
// race with the idle timeout timer callback.
|
|
<-time.After(defaultTestIdleTimeout / 50)
|
|
if err := mgr.OnCallBegin(); err != nil {
|
|
t.Errorf("OnCallBegin() failed: %v", err)
|
|
}
|
|
atomic.StoreInt32((*int32)(&idlenessState), int32(stateActiveRPCs))
|
|
mgr.OnCallEnd()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
})
|
|
}
|
|
}
|