mirror of https://github.com/grpc/grpc-go.git
				
				
				
			
		
			
				
	
	
		
			361 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			361 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright 2023 gRPC authors.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  *
 | |
|  */
 | |
| 
 | |
| package grpc
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	defaultTestIdleTimeout  = 500 * time.Millisecond // A short idle_timeout for tests.
 | |
| 	defaultTestShortTimeout = 10 * time.Millisecond  // A small deadline to wait for events expected to not happen.
 | |
| )
 | |
| 
 | |
| type testIdlenessEnforcer struct {
 | |
| 	exitIdleCh  chan struct{}
 | |
| 	enterIdleCh chan struct{}
 | |
| }
 | |
| 
 | |
| func (ti *testIdlenessEnforcer) exitIdleMode() error {
 | |
| 	ti.exitIdleCh <- struct{}{}
 | |
| 	return nil
 | |
| 
 | |
| }
 | |
| 
 | |
| func (ti *testIdlenessEnforcer) enterIdleMode() error {
 | |
| 	ti.enterIdleCh <- struct{}{}
 | |
| 	return nil
 | |
| 
 | |
| }
 | |
| 
 | |
| func newTestIdlenessEnforcer() *testIdlenessEnforcer {
 | |
| 	return &testIdlenessEnforcer{
 | |
| 		exitIdleCh:  make(chan struct{}, 1),
 | |
| 		enterIdleCh: make(chan struct{}, 1),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // overrideNewTimer overrides the new timer creation function by ensuring that a
 | |
| // message is pushed on the returned channel everytime the timer fires.
 | |
| func overrideNewTimer(t *testing.T) <-chan struct{} {
 | |
| 	t.Helper()
 | |
| 
 | |
| 	ch := make(chan struct{}, 1)
 | |
| 	origTimeAfterFunc := timeAfterFunc
 | |
| 	timeAfterFunc = func(d time.Duration, callback func()) *time.Timer {
 | |
| 		return time.AfterFunc(d, func() {
 | |
| 			select {
 | |
| 			case ch <- struct{}{}:
 | |
| 			default:
 | |
| 			}
 | |
| 			callback()
 | |
| 		})
 | |
| 	}
 | |
| 	t.Cleanup(func() { timeAfterFunc = origTimeAfterFunc })
 | |
| 	return ch
 | |
| }
 | |
| 
 | |
| // TestIdlenessManager_Disabled tests the case where the idleness manager is
 | |
| // disabled by passing an idle_timeout of 0. Verifies the following things:
 | |
| //   - timer callback does not fire
 | |
| //   - an RPC does not trigger a call to exitIdleMode on the ClientConn
 | |
| //   - more calls to RPC termination (as compared to RPC initiation) does not
 | |
| //     result in an error log
 | |
| func (s) TestIdlenessManager_Disabled(t *testing.T) {
 | |
| 	callbackCh := overrideNewTimer(t)
 | |
| 
 | |
| 	// Create an idleness manager that is disabled because of idleTimeout being
 | |
| 	// set to `0`.
 | |
| 	enforcer := newTestIdlenessEnforcer()
 | |
| 	mgr := newIdlenessManager(enforcer, time.Duration(0))
 | |
| 
 | |
| 	// Ensure that the timer callback does not fire within a short deadline.
 | |
| 	select {
 | |
| 	case <-callbackCh:
 | |
| 		t.Fatal("Idle timer callback fired when manager is disabled")
 | |
| 	case <-time.After(defaultTestShortTimeout):
 | |
| 	}
 | |
| 
 | |
| 	// The first invocation of onCallBegin() would lead to a call to
 | |
| 	// exitIdleMode() on the enforcer, unless the idleness manager is disabled.
 | |
| 	mgr.onCallBegin()
 | |
| 	select {
 | |
| 	case <-enforcer.exitIdleCh:
 | |
| 		t.Fatalf("exitIdleMode() called on enforcer when manager is disabled")
 | |
| 	case <-time.After(defaultTestShortTimeout):
 | |
| 	}
 | |
| 
 | |
| 	// If the number of calls to onCallEnd() exceeds the number of calls to
 | |
| 	// onCallBegin(), the idleness manager is expected to throw an error log
 | |
| 	// (which will cause our TestLogger to fail the test). But since the manager
 | |
| 	// is disabled, this should not happen.
 | |
| 	mgr.onCallEnd()
 | |
| 	mgr.onCallEnd()
 | |
| 
 | |
| 	// The idleness manager is explicitly not closed here. But since the manager
 | |
| 	// is disabled, it will not start the run goroutine, and hence we expect the
 | |
| 	// leakchecker to not find any leaked goroutines.
 | |
| }
 | |
| 
 | |
| // TestIdlenessManager_Enabled_TimerFires tests the case where the idle manager
 | |
| // is enabled. Ensures that when there are no RPCs, the timer callback is
 | |
| // invoked and the enterIdleMode() method is invoked on the enforcer.
 | |
| func (s) TestIdlenessManager_Enabled_TimerFires(t *testing.T) {
 | |
| 	callbackCh := overrideNewTimer(t)
 | |
| 
 | |
| 	enforcer := newTestIdlenessEnforcer()
 | |
| 	mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
 | |
| 	defer mgr.close()
 | |
| 
 | |
| 	// Ensure that the timer callback fires within a appropriate amount of time.
 | |
| 	select {
 | |
| 	case <-callbackCh:
 | |
| 	case <-time.After(2 * defaultTestIdleTimeout):
 | |
| 		t.Fatal("Timeout waiting for idle timer callback to fire")
 | |
| 	}
 | |
| 
 | |
| 	// Ensure that the channel moves to idle mode eventually.
 | |
| 	select {
 | |
| 	case <-enforcer.enterIdleCh:
 | |
| 	case <-time.After(defaultTestTimeout):
 | |
| 		t.Fatal("Timeout waiting for channel to move to idle")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestIdlenessManager_Enabled_OngoingCall tests the case where the idle manager
 | |
| // is enabled. Ensures that when there is an ongoing RPC, the channel does not
 | |
| // enter idle mode.
 | |
| func (s) TestIdlenessManager_Enabled_OngoingCall(t *testing.T) {
 | |
| 	callbackCh := overrideNewTimer(t)
 | |
| 
 | |
| 	enforcer := newTestIdlenessEnforcer()
 | |
| 	mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
 | |
| 	defer mgr.close()
 | |
| 
 | |
| 	// Fire up a goroutine that simulates an ongoing RPC that is terminated
 | |
| 	// after the timer callback fires for the first time.
 | |
| 	timerFired := make(chan struct{})
 | |
| 	go func() {
 | |
| 		mgr.onCallBegin()
 | |
| 		<-timerFired
 | |
| 		mgr.onCallEnd()
 | |
| 	}()
 | |
| 
 | |
| 	// Ensure that the timer callback fires and unblock the above goroutine.
 | |
| 	select {
 | |
| 	case <-callbackCh:
 | |
| 		close(timerFired)
 | |
| 	case <-time.After(2 * defaultTestIdleTimeout):
 | |
| 		t.Fatal("Timeout waiting for idle timer callback to fire")
 | |
| 	}
 | |
| 
 | |
| 	// The invocation of the timer callback should not put the channel in idle
 | |
| 	// mode since we had an ongoing RPC.
 | |
| 	select {
 | |
| 	case <-enforcer.enterIdleCh:
 | |
| 		t.Fatalf("enterIdleMode() called on enforcer when active RPC exists")
 | |
| 	case <-time.After(defaultTestShortTimeout):
 | |
| 	}
 | |
| 
 | |
| 	// Since we terminated the ongoing RPC and we have no other active RPCs, the
 | |
| 	// channel must move to idle eventually.
 | |
| 	select {
 | |
| 	case <-enforcer.enterIdleCh:
 | |
| 	case <-time.After(defaultTestTimeout):
 | |
| 		t.Fatal("Timeout waiting for channel to move to idle")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestIdlenessManager_Enabled_ActiveSinceLastCheck tests the case where the
 | |
| // idle manager is enabled. Ensures that when there are active RPCs in the last
 | |
| // period (even though there is no active call when the timer fires), the
 | |
| // channel does not enter idle mode.
 | |
| func (s) TestIdlenessManager_Enabled_ActiveSinceLastCheck(t *testing.T) {
 | |
| 	callbackCh := overrideNewTimer(t)
 | |
| 
 | |
| 	enforcer := newTestIdlenessEnforcer()
 | |
| 	mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
 | |
| 	defer mgr.close()
 | |
| 
 | |
| 	// Fire up a goroutine that simulates unary RPCs until the timer callback
 | |
| 	// fires.
 | |
| 	timerFired := make(chan struct{})
 | |
| 	go func() {
 | |
| 		for ; ; <-time.After(defaultTestShortTimeout) {
 | |
| 			mgr.onCallBegin()
 | |
| 			mgr.onCallEnd()
 | |
| 
 | |
| 			select {
 | |
| 			case <-timerFired:
 | |
| 				return
 | |
| 			default:
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	// Ensure that the timer callback fires, and that we don't enter idle as
 | |
| 	// part of this invocation of the timer callback, since we had some RPCs in
 | |
| 	// this period.
 | |
| 	select {
 | |
| 	case <-callbackCh:
 | |
| 		close(timerFired)
 | |
| 	case <-time.After(2 * defaultTestIdleTimeout):
 | |
| 		t.Fatal("Timeout waiting for idle timer callback to fire")
 | |
| 	}
 | |
| 	select {
 | |
| 	case <-enforcer.enterIdleCh:
 | |
| 		t.Fatalf("enterIdleMode() called on enforcer when one RPC completed in the last period")
 | |
| 	case <-time.After(defaultTestShortTimeout):
 | |
| 	}
 | |
| 
 | |
| 	// Since the unrary RPC terminated and we have no other active RPCs, the
 | |
| 	// channel must move to idle eventually.
 | |
| 	select {
 | |
| 	case <-enforcer.enterIdleCh:
 | |
| 	case <-time.After(defaultTestTimeout):
 | |
| 		t.Fatal("Timeout waiting for channel to move to idle")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestIdlenessManager_Enabled_ExitIdleOnRPC tests the case where the idle
 | |
| // manager is enabled. Ensures that the channel moves out of idle when an RPC is
 | |
| // initiated.
 | |
| func (s) TestIdlenessManager_Enabled_ExitIdleOnRPC(t *testing.T) {
 | |
| 	overrideNewTimer(t)
 | |
| 
 | |
| 	enforcer := newTestIdlenessEnforcer()
 | |
| 	mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
 | |
| 	defer mgr.close()
 | |
| 
 | |
| 	// Ensure that the channel moves to idle since there are no RPCs.
 | |
| 	select {
 | |
| 	case <-enforcer.enterIdleCh:
 | |
| 	case <-time.After(2 * defaultTestIdleTimeout):
 | |
| 		t.Fatal("Timeout waiting for channel to move to idle mode")
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < 100; i++ {
 | |
| 		// A call to onCallBegin and onCallEnd simulates an RPC.
 | |
| 		go func() {
 | |
| 			if err := mgr.onCallBegin(); err != nil {
 | |
| 				t.Errorf("onCallBegin() failed: %v", err)
 | |
| 			}
 | |
| 			mgr.onCallEnd()
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	// Ensure that the channel moves out of idle as a result of the above RPC.
 | |
| 	select {
 | |
| 	case <-enforcer.exitIdleCh:
 | |
| 	case <-time.After(2 * defaultTestIdleTimeout):
 | |
| 		t.Fatal("Timeout waiting for channel to move out of idle mode")
 | |
| 	}
 | |
| 
 | |
| 	// Ensure that only one call to exit idle mode is made to the CC.
 | |
| 	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
 | |
| 	defer sCancel()
 | |
| 	select {
 | |
| 	case <-enforcer.exitIdleCh:
 | |
| 		t.Fatal("More than one call to exit idle mode on the ClientConn; only one expected")
 | |
| 	case <-sCtx.Done():
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type racyIdlenessState int32
 | |
| 
 | |
| const (
 | |
| 	stateInital racyIdlenessState = iota
 | |
| 	stateEnteredIdle
 | |
| 	stateExitedIdle
 | |
| 	stateActiveRPCs
 | |
| )
 | |
| 
 | |
| // racyIdlnessEnforcer is a test idleness enforcer used specifically to test the
 | |
| // race between idle timeout and incoming RPCs.
 | |
| type racyIdlenessEnforcer struct {
 | |
| 	state *racyIdlenessState // Accessed atomically.
 | |
| }
 | |
| 
 | |
| // exitIdleMode sets the internal state to stateExitedIdle. We should only ever
 | |
| // exit idle when we are currently in idle.
 | |
| func (ri *racyIdlenessEnforcer) exitIdleMode() error {
 | |
| 	if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) {
 | |
| 		return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // enterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start.
 | |
| func (ri *racyIdlenessEnforcer) enterIdleMode() error {
 | |
| 	if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInital), int32(stateEnteredIdle)) {
 | |
| 		return fmt.Errorf("idleness enforcer asked to enter idle after rpcs started")
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // TestIdlenessManager_IdleTimeoutRacesWithOnCallBegin tests the case where
 | |
| // firing of the idle timeout races with an incoming RPC. The test verifies that
 | |
| // if the timer callback win the race and puts the channel in idle, the RPCs can
 | |
| // kick it out of idle. And if the RPCs win the race and keep the channel
 | |
| // active, then the timer callback should not attempt to put the channel in idle
 | |
| // mode.
 | |
| func (s) TestIdlenessManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) {
 | |
| 	// Run multiple iterations to simulate different possibilities.
 | |
| 	for i := 0; i < 10; i++ {
 | |
| 		t.Run(fmt.Sprintf("iteration=%d", i), func(t *testing.T) {
 | |
| 			var idlenessState racyIdlenessState
 | |
| 			enforcer := &racyIdlenessEnforcer{state: &idlenessState}
 | |
| 
 | |
| 			// Configure a large idle timeout so that we can control the
 | |
| 			// race between the timer callback and RPCs.
 | |
| 			mgr := newIdlenessManager(enforcer, time.Duration(10*time.Minute))
 | |
| 			defer mgr.close()
 | |
| 
 | |
| 			var wg sync.WaitGroup
 | |
| 			wg.Add(1)
 | |
| 			go func() {
 | |
| 				defer wg.Done()
 | |
| 				m := mgr.(interface{ handleIdleTimeout() })
 | |
| 				<-time.After(defaultTestIdleTimeout)
 | |
| 				m.handleIdleTimeout()
 | |
| 			}()
 | |
| 			for j := 0; j < 100; j++ {
 | |
| 				wg.Add(1)
 | |
| 				go func() {
 | |
| 					defer wg.Done()
 | |
| 					// Wait for the configured idle timeout and simulate an RPC to
 | |
| 					// race with the idle timeout timer callback.
 | |
| 					<-time.After(defaultTestIdleTimeout)
 | |
| 					if err := mgr.onCallBegin(); err != nil {
 | |
| 						t.Errorf("onCallBegin() failed: %v", err)
 | |
| 					}
 | |
| 					atomic.StoreInt32((*int32)(&idlenessState), int32(stateActiveRPCs))
 | |
| 					mgr.onCallEnd()
 | |
| 				}()
 | |
| 			}
 | |
| 			wg.Wait()
 | |
| 		})
 | |
| 	}
 | |
| }
 |