grpc: support channel idleness (#6263)

Easwar Swaminathan 2023-05-22 12:42:45 -07:00 committed by GitHub
parent 098b2d00c5
commit 9b7a947cdc
14 changed files with 1739 additions and 181 deletions

View File

@ -32,6 +32,15 @@ import (
"google.golang.org/grpc/resolver"
)
type ccbMode int
const (
ccbModeActive = iota
ccbModeIdle
ccbModeClosed
ccbModeExitingIdle
)
// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
@ -46,16 +55,25 @@ import (
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
cc *ClientConn
// The following fields are initialized when the wrapper is created and are
// read-only afterwards, and therefore can be accessed without a mutex.
cc *ClientConn
opts balancer.BuildOptions
// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
// mutually exclusive manner as they are scheduled on the
// CallbackSerializer. Fields accessed *only* in serializer callbacks, can
// therefore be accessed without a mutex.
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc
balancer *gracefulswitch.Balancer
curBalancerName string
// mutually exclusive manner as they are scheduled in the serializer. Fields
// accessed *only* in these serializer callbacks can therefore be accessed
// without a mutex.
balancer *gracefulswitch.Balancer
curBalancerName string
// mu guards access to the below fields. Access to the serializer and its
// cancel function needs to be mutex protected because they are overwritten
// when the wrapper exits idle mode.
mu sync.Mutex
serializer *grpcsync.CallbackSerializer // To serialize all outgoing calls.
serializerCancel context.CancelFunc // To close the serializer at close/enterIdle time.
mode ccbMode // Tracks the current mode of the wrapper.
}
// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
@ -64,6 +82,7 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc
ctx, cancel := context.WithCancel(context.Background())
ccb := &ccBalancerWrapper{
cc: cc,
opts: bopts,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
@ -74,8 +93,12 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc
// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.mu.Lock()
errCh := make(chan error, 1)
ccb.serializer.Schedule(func(_ context.Context) {
// Here and everywhere else where Schedule() is called, it is done with the
// lock held. But the lock guards only the scheduling part. The actual
// callback is called asynchronously without the lock being held.
ok := ccb.serializer.Schedule(func(_ context.Context) {
// If the addresses specified in the update contain addresses of type
// "grpclb" and the selected LB policy is not "grpclb", these addresses
// will be filtered out and ccs will be modified with the updated
@ -92,16 +115,19 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
}
errCh <- ccb.balancer.UpdateClientConnState(*ccs)
})
// If the balancer wrapper is closed when waiting for this state update to
// be handled, the callback serializer will be closed as well, and we can
// rely on its Done channel to ensure that we don't block here forever.
select {
case err := <-errCh:
return err
case <-ccb.serializer.Done:
return nil
if !ok {
// If we are unable to schedule a function with the serializer, it
// indicates that it has been closed. A serializer is only closed when
// the wrapper is closed or is in idle.
ccb.mu.Unlock()
return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
}
ccb.mu.Unlock()
// We get here only if the above call to Schedule succeeds, in which case it
// is guaranteed that the scheduled function will run. Therefore it is safe
// to block on this channel.
return <-errCh
}
// updateSubConnState is invoked by grpc to push a subConn state update to the
@ -120,21 +146,19 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti
if sc == nil {
return
}
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
})
}
func (ccb *ccBalancerWrapper) exitIdle() {
ccb.serializer.Schedule(func(_ context.Context) {
ccb.balancer.ExitIdle()
})
ccb.mu.Unlock()
}
func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.balancer.ResolverError(err)
})
ccb.mu.Unlock()
}
// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
@ -148,42 +172,149 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}
// Use the default LB policy, pick_first, if no LB policy with name is
// found in the registry.
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
}
if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
ccb.buildLoadBalancingPolicy(name)
})
ccb.mu.Unlock()
}
// buildLoadBalancingPolicy performs the following:
// - retrieve a balancer builder for the given name. Use the default LB
// policy, pick_first, if no LB policy with name is found in the registry.
// - instruct the gracefulswitch balancer to switch to the above builder. This
// will actually build the new balancer.
// - update the `curBalancerName` field
//
// Must be called from a serializer callback.
func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
}
if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
}
func (ccb *ccBalancerWrapper) close() {
// Close the serializer to ensure that no more calls from gRPC are sent to
// the balancer. We don't have to worry about suppressing calls from a
// closed balancer because these are handled by the ClientConn (balancer
// wrapper is only ever closed when the ClientConn is closed).
ccb.serializerCancel()
<-ccb.serializer.Done
ccb.balancer.Close()
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
ccb.closeBalancer(ccbModeClosed)
}
// enterIdleMode is invoked by grpc when the channel enters idle mode upon
// expiry of idle_timeout. This call blocks until the balancer is closed.
func (ccb *ccBalancerWrapper) enterIdleMode() {
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
ccb.closeBalancer(ccbModeIdle)
}
// closeBalancer is invoked when the channel is being closed or when it enters
// idle mode upon expiry of idle_timeout.
func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
ccb.mu.Lock()
if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
ccb.mu.Unlock()
return
}
ccb.mode = m
done := ccb.serializer.Done
b := ccb.balancer
ok := ccb.serializer.Schedule(func(_ context.Context) {
// Close the serializer to ensure that no more calls from gRPC are sent
// to the balancer.
ccb.serializerCancel()
// Empty the current balancer name because we don't have a balancer
// anymore and also so that we act on the next call to switchTo by
// creating a new balancer specified by the new resolver.
ccb.curBalancerName = ""
})
if !ok {
ccb.mu.Unlock()
return
}
ccb.mu.Unlock()
// Give enqueued callbacks a chance to finish.
<-done
// Spawn a goroutine to close the balancer (since it may block trying to
// cleanup all allocated resources) and return early.
go b.Close()
}
// exitIdleMode is invoked by grpc when the channel exits idle mode either
// because of an RPC or because of an invocation of the Connect() API. This
// recreates the balancer that was closed previously when entering idle mode.
//
// If the channel is not in idle mode, we know for a fact that we are here as a
// result of the user calling the Connect() method on the ClientConn. In this
// case, we can simply forward the call to the underlying balancer, instructing
// it to reconnect to the backends.
func (ccb *ccBalancerWrapper) exitIdleMode() {
ccb.mu.Lock()
if ccb.mode == ccbModeClosed {
// Request to exit idle is a no-op when wrapper is already closed.
ccb.mu.Unlock()
return
}
if ccb.mode == ccbModeIdle {
// Recreate the serializer which was closed when we entered idle.
ctx, cancel := context.WithCancel(context.Background())
ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
ccb.serializerCancel = cancel
}
// The ClientConn guarantees mutual exclusion between close() and
// exitIdleMode(), and since we just created a new serializer, we can be
// sure that the below function will be scheduled.
done := make(chan struct{})
ccb.serializer.Schedule(func(_ context.Context) {
defer close(done)
ccb.mu.Lock()
defer ccb.mu.Unlock()
if ccb.mode != ccbModeIdle {
ccb.balancer.ExitIdle()
return
}
// Gracefulswitch balancer does not support a switchTo operation after
// being closed. Hence we need to create a new one here.
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
ccb.mode = ccbModeActive
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
})
ccb.mu.Unlock()
<-done
}
func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
ccb.mu.Lock()
defer ccb.mu.Unlock()
return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if ccb.isIdleOrClosed() {
return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
}
if len(addrs) <= 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
@ -200,6 +331,18 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
if ccb.isIdleOrClosed() {
// It is safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveSubConn --> cc.removeAddrConn
return
}
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
@ -208,6 +351,10 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
}
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
if ccb.isIdleOrClosed() {
return
}
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
@ -216,6 +363,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
if ccb.isIdleOrClosed() {
return
}
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
@ -226,6 +377,10 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
}
func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
if ccb.isIdleOrClosed() {
return
}
ccb.cc.resolveNow(o)
}

View File

@ -27,6 +27,11 @@ import (
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
if err := cc.idlenessMgr.onCallBegin(); err != nil {
return err
}
defer cc.idlenessMgr.onCallEnd()
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)

View File

@ -69,6 +69,9 @@ var (
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
// errConnIdling indicates that the connection is being closed because the
// channel is moving to idle mode due to inactivity.
errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
// service config.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
@ -134,17 +137,29 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
czData: new(channelzData),
}
// We start the channel off in idle mode, but kick it out of idle at the end
// of this method, instead of waiting for the first RPC. Other gRPC
// implementations do wait for the first RPC to kick the channel out of
// idle. But doing so would be a major behavior change for our users who are
// used to seeing the channel active after Dial.
//
// Taking this approach of kicking it out of idle at the end of this method
// allows us to share the code between channel creation and exiting idle
// mode. This will also make it easy for us to switch to starting the
// channel off in idle, if at all we ever get to do that.
cc.idlenessState = ccIdlenessStateIdle
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.exitIdleCond = sync.NewCond(&cc.mu)
disableGlobalOpts := false
for _, opt := range opts {
@ -243,67 +258,175 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
go cc.scWatcher()
}
// This creates the name resolver, load balancer, blocking picker etc.
if err := cc.exitIdleMode(); err != nil {
return nil, err
}
// Configure idleness support with configured idle timeout or default idle
// timeout duration. Idleness can be explicitly disabled by the user, by
// setting the dial option to 0.
cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout)
// Return early for non-blocking dials.
if !cc.dopts.block {
return cc, nil
}
// A blocking dial blocks until the clientConn is ready.
for {
s := cc.GetState()
if s == connectivity.Idle {
cc.Connect()
}
if s == connectivity.Ready {
return cc, nil
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.connectionError(); err != nil {
terr, ok := err.(interface {
Temporary() bool
})
if ok && !terr.Temporary() {
return nil, err
}
}
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
return nil, err
}
return nil, ctx.Err()
}
}
}
// addTraceEvent is a helper method to add a trace event on the channel. If the
// channel is a nested one, the same event is also added on the parent channel.
func (cc *ClientConn) addTraceEvent(msg string) {
ted := &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Channel %s", msg),
Severity: channelz.CtInfo,
}
if cc.dopts.channelzParentID != nil {
ted.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelzID.Int(), msg),
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
}
// exitIdleMode moves the channel out of idle mode by recreating the name
// resolver and load balancer.
func (cc *ClientConn) exitIdleMode() error {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return errConnClosing
}
if cc.idlenessState != ccIdlenessStateIdle {
cc.mu.Unlock()
logger.Error("ClientConn asked to exit idle mode when not in idle mode")
return nil
}
defer func() {
// When Close() and exitIdleMode() race against each other, one of the
// following two can happen:
// - Close() wins the race and runs first. exitIdleMode() runs after, and
// sees that the ClientConn is already closed and hence returns early.
// - exitIdleMode() wins the race and runs first and recreates the balancer
// and releases the lock before recreating the resolver. If Close() runs
// in this window, it will wait for exitIdleMode to complete.
//
// We achieve this synchronization using the below condition variable.
cc.mu.Lock()
cc.idlenessState = ccIdlenessStateActive
cc.exitIdleCond.Signal()
cc.mu.Unlock()
}()
cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
cc.blockingpicker = newPickerWrapper()
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
}
var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})
// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{
target: cc.parsedTarget,
builder: cc.resolverBuilder,
bOpts: resolver.BuildOptions{
DisableServiceConfig: cc.dopts.disableServiceConfig,
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
},
channelzID: cc.channelzID,
})
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
if cc.balancerWrapper == nil {
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})
} else {
cc.balancerWrapper.exitIdleMode()
}
cc.mu.Lock()
cc.resolverWrapper = rWrapper
cc.firstResolveEvent = grpcsync.NewEvent()
cc.mu.Unlock()
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
for {
cc.Connect()
s := cc.GetState()
if s == connectivity.Ready {
break
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.connectionError(); err != nil {
terr, ok := err.(interface {
Temporary() bool
})
if ok && !terr.Temporary() {
return nil, err
}
}
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
return nil, err
}
return nil, ctx.Err()
}
}
// This needs to be called without cc.mu because this builds a new resolver
// which might update state or report error inline which needs to be handled
// by cc.updateResolverState() which also grabs cc.mu.
if err := cc.initResolverWrapper(credsClone); err != nil {
return err
}
return cc, nil
if exitedIdle {
cc.addTraceEvent("exiting idle mode")
}
return nil
}
// enterIdleMode puts the channel in idle mode, and as part of it shuts down the
// name resolver, load balancer and any subchannels.
func (cc *ClientConn) enterIdleMode() error {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
if cc.idlenessState != ccIdlenessStateActive {
cc.mu.Unlock()
logger.Error("ClientConn asked to enter idle mode when not active")
return nil
}
// cc.conns == nil is a proxy for the ClientConn being closed. So, instead
// of setting it to nil here, we recreate the map. This also means that we
// don't have to do this when exiting idle mode.
conns := cc.conns
cc.conns = make(map[*addrConn]struct{})
// TODO: Currently, we close the resolver wrapper upon entering idle mode
// and create a new one upon exiting idle mode. This means that the
// `cc.resolverWrapper` field would be overwritten every time we exit idle
// mode. While this means that we need to hold `cc.mu` when accessing
// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
// try to do the same for the balancer and picker wrappers too.
cc.resolverWrapper.close()
cc.blockingpicker.enterIdleMode()
cc.balancerWrapper.enterIdleMode()
cc.csMgr.updateState(connectivity.Idle)
cc.idlenessState = ccIdlenessStateIdle
cc.mu.Unlock()
go func() {
cc.addTraceEvent("entering idle mode")
for ac := range conns {
ac.tearDown(errConnIdling)
}
}()
return nil
}
// validateTransportCredentials performs a series of checks on the configured
@ -350,17 +473,7 @@ func (cc *ClientConn) validateTransportCredentials() error {
// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
func (cc *ClientConn) channelzRegistration(target string) {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
ted := &channelz.TraceEventDesc{
Desc: "Channel created",
Severity: channelz.CtInfo,
}
if cc.dopts.channelzParentID != nil {
ted.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()),
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, cc.channelzID, 1, ted)
cc.addTraceEvent("created")
cc.csMgr.channelzID = cc.channelzID
}
@ -509,6 +622,7 @@ type ClientConn struct {
channelzID *channelz.Identifier // Channelz identifier for the channel.
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
idlenessMgr idlenessManager
// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
@ -529,11 +643,31 @@ type ClientConn struct {
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
idlenessState ccIdlenessState // Tracks idleness state of the channel.
exitIdleCond *sync.Cond // Signalled when channel exits idle.
lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
}
// ccIdlenessState tracks the idleness state of the channel.
//
// Channels start off in `active` and move to `idle` after a period of
// inactivity. When moving back to `active` upon an incoming RPC, they
// transition through `exiting_idle`. This state is useful for synchronization
// with Close().
//
// This state tracking is mostly for self-protection. The idlenessManager is
// expected to keep track of the state as well, and is expected not to call into
// the ClientConn unnecessarily.
type ccIdlenessState int8
const (
ccIdlenessStateActive ccIdlenessState = iota
ccIdlenessStateIdle
ccIdlenessStateExitingIdle
)
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
//
@ -573,7 +707,7 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
cc.balancerWrapper.exitIdle()
cc.balancerWrapper.exitIdleMode()
}
func (cc *ClientConn) scWatcher() {
@ -1061,39 +1195,40 @@ func (cc *ClientConn) Close() error {
cc.mu.Unlock()
return ErrClientConnClosing
}
for cc.idlenessState == ccIdlenessStateExitingIdle {
cc.exitIdleCond.Wait()
}
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
pWrapper := cc.blockingpicker
rWrapper := cc.resolverWrapper
cc.resolverWrapper = nil
bWrapper := cc.balancerWrapper
idlenessMgr := cc.idlenessMgr
cc.mu.Unlock()
// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
cc.blockingpicker.close()
if pWrapper != nil {
pWrapper.close()
}
if bWrapper != nil {
bWrapper.close()
}
if rWrapper != nil {
rWrapper.close()
}
if idlenessMgr != nil {
idlenessMgr.close()
}
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
ted := &channelz.TraceEventDesc{
Desc: "Channel deleted",
Severity: channelz.CtInfo,
}
if cc.dopts.channelzParentID != nil {
ted.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()),
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
cc.addTraceEvent("deleted")
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
// trace reference to the entity being deleted, and thus prevent it from being
// deleted right away.
@ -1735,3 +1870,32 @@ func (cc *ClientConn) determineAuthority() error {
channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)
return nil
}
// initResolverWrapper creates a ccResolverWrapper, which builds the name
// resolver. This method grabs the lock to assign the newly built resolver
// wrapper to the cc.resolverWrapper field.
func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error {
rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{
target: cc.parsedTarget,
builder: cc.resolverBuilder,
bOpts: resolver.BuildOptions{
DisableServiceConfig: cc.dopts.disableServiceConfig,
DialCreds: creds,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
},
channelzID: cc.channelzID,
})
if err != nil {
return fmt.Errorf("failed to build resolver: %v", err)
}
// Resolver implementations may report state update or error inline when
// built (or right after), and this is handled in cc.updateResolverState.
// Also, an error from the resolver might lead to a re-resolution request
// from the balancer, which is handled in resolveNow() where
// `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here.
cc.mu.Lock()
cc.resolverWrapper = rw
cc.mu.Unlock()
return nil
}

View File

@ -370,7 +370,7 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
}()
bc := backoff.Config{
BaseDelay: 200 * time.Millisecond,
Multiplier: 1.1,
Multiplier: 2.0,
Jitter: 0,
MaxDelay: 120 * time.Second,
}

View File

@ -77,6 +77,7 @@ type dialOptions struct {
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string
resolvers []resolver.Builder
idleTimeout time.Duration
}
// DialOption configures how we set up the connection.
@ -627,6 +628,7 @@ func defaultDialOptions() dialOptions {
ReadBufferSize: defaultReadBufSize,
UseProxy: true,
},
idleTimeout: 30 * time.Minute,
}
}
@ -655,3 +657,23 @@ func WithResolvers(rs ...resolver.Builder) DialOption {
o.resolvers = append(o.resolvers, rs...)
})
}
// WithIdleTimeout returns a DialOption that configures an idle timeout for the
// channel. If the channel is idle for the configured timeout, i.e. there are no
// ongoing RPCs and no new RPCs are initiated, the channel will enter idle mode
// and as a result the name resolver and load balancer will be shut down. The
// channel will exit idle mode when the Connect() method is called or when an
// RPC is initiated.
//
// A default timeout of 30 minutes is used if this dial option is not set at
// dial time. Idleness can be disabled by passing a timeout of zero.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithIdleTimeout(d time.Duration) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.idleTimeout = d
})
}
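
For reference, a minimal usage sketch of the new dial option follows. It is not part of the diff; the target address, the 20-minute value, and the use of insecure transport credentials are illustrative assumptions.

package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Dial with a 20-minute idle timeout. Passing 0 would disable idleness
	// tracking entirely; omitting the option uses the 30-minute default.
	conn, err := grpc.Dial(
		"localhost:50051", // hypothetical target
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithIdleTimeout(20*time.Minute),
	)
	if err != nil {
		log.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer conn.Close()

	// After 20 minutes without RPCs, the channel enters idle mode and shuts
	// down its name resolver and LB policy. The next RPC, or an explicit call
	// to Connect(), moves it back out of idle.
	conn.Connect()
}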

287
idle.go Normal file
View File

@ -0,0 +1,287 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"
)
// For overriding in unit tests.
var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
return time.AfterFunc(d, f)
}
// idlenessEnforcer is the functionality provided by grpc.ClientConn to enter
// and exit from idle mode.
type idlenessEnforcer interface {
exitIdleMode() error
enterIdleMode() error
}
// idlenessManager defines the functionality required to track RPC activity on a
// channel.
type idlenessManager interface {
onCallBegin() error
onCallEnd()
close()
}
type noopIdlenessManager struct{}
func (noopIdlenessManager) onCallBegin() error { return nil }
func (noopIdlenessManager) onCallEnd() {}
func (noopIdlenessManager) close() {}
// idlenessManagerImpl implements the idlenessManager interface. It uses atomic
// operations to synchronize access to shared state and a mutex to guarantee
// mutual exclusion in a critical section.
type idlenessManagerImpl struct {
// State accessed atomically.
lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback.
closed int32 // Boolean; True when the manager is closed.
// Can be accessed without atomics or mutex since these are set at creation
// time and read-only after that.
enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn.
timeout int64 // Idle timeout duration nanos stored as an int64.
// idleMu is used to guarantee mutual exclusion in two scenarios:
// - Opposing intentions:
// - a: Idle timeout has fired and handleIdleTimeout() is trying to put
// the channel in idle mode because the channel has been inactive.
// - b: At the same time an RPC is made on the channel, and onCallBegin()
// is trying to prevent the channel from going idle.
// - Competing intentions:
// - The channel is in idle mode and there are multiple RPCs starting at
// the same time, all trying to move the channel out of idle. Only one
// of them should succeed in doing so, while the other RPCs should
// piggyback on the first one and be successfully handled.
idleMu sync.RWMutex
actuallyIdle bool
timer *time.Timer
}
// newIdlenessManager creates a new idleness manager implementation for the
// given idle timeout.
func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager {
if idleTimeout == 0 {
return noopIdlenessManager{}
}
i := &idlenessManagerImpl{
enforcer: enforcer,
timeout: int64(idleTimeout),
}
i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout)
return i
}
// resetIdleTimer resets the idle timer to the given duration. This method
// should only be called from the timer callback.
func (i *idlenessManagerImpl) resetIdleTimer(d time.Duration) {
i.idleMu.Lock()
defer i.idleMu.Unlock()
if i.timer == nil {
// Only close sets timer to nil. We are done.
return
}
// It is safe to ignore the return value from Reset() because this method is
// only ever called from the timer callback, which means the timer has
// already fired.
i.timer.Reset(d)
}
// handleIdleTimeout is the timer callback that is invoked upon expiry of the
// configured idle timeout. The channel is considered inactive if there are no
// ongoing calls and no RPC activity since the last time the timer fired.
func (i *idlenessManagerImpl) handleIdleTimeout() {
if i.isClosed() {
return
}
if atomic.LoadInt32(&i.activeCallsCount) > 0 {
i.resetIdleTimer(time.Duration(i.timeout))
return
}
// There has been activity on the channel since we last got here. Reset the
// timer and return.
if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 {
// Set the timer to fire after a duration of idle timeout, calculated
// from the time the most recent RPC completed.
atomic.StoreInt32(&i.activeSinceLastTimerCheck, 0)
i.resetIdleTimer(time.Duration(atomic.LoadInt64(&i.lastCallEndTime) + i.timeout - time.Now().UnixNano()))
return
}
// This CAS operation is extremely likely to succeed given that there has
// been no activity since the last time we were here. Setting the
// activeCallsCount to -math.MaxInt32 indicates to onCallBegin() that the
// channel is either in idle mode or is trying to get there.
if !atomic.CompareAndSwapInt32(&i.activeCallsCount, 0, -math.MaxInt32) {
// This CAS operation can fail if an RPC started after we checked for
// activity at the top of this method, or one was ongoing from before
// the last time we were here. In both cases, reset the timer and return.
i.resetIdleTimer(time.Duration(i.timeout))
return
}
// Now that we've set the active calls count to -math.MaxInt32, it's time to
// actually move to idle mode.
if i.tryEnterIdleMode() {
// Successfully entered idle mode. No timer needed until we exit idle.
return
}
// Failed to enter idle mode due to a concurrent RPC that kept the channel
// active, or because of an error from the channel. Undo the attempt to
// enter idle, and reset the timer to try again later.
atomic.AddInt32(&i.activeCallsCount, math.MaxInt32)
i.resetIdleTimer(time.Duration(i.timeout))
}
// tryEnterIdleMode instructs the channel to enter idle mode. But before
// that, it performs a last minute check to ensure that no new RPC has come in,
// making the channel active.
//
// Return value indicates whether or not the channel moved to idle mode.
//
// Holds idleMu which ensures mutual exclusion with exitIdleMode.
func (i *idlenessManagerImpl) tryEnterIdleMode() bool {
i.idleMu.Lock()
defer i.idleMu.Unlock()
if atomic.LoadInt32(&i.activeCallsCount) != -math.MaxInt32 {
// We raced and lost to a new RPC. Very rare, but stop entering idle.
return false
}
if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 {
// A very short RPC could have come in (and also finished) after we
// checked for calls count and activity in handleIdleTimeout(), but
// before the CAS operation. So, we need to check for activity again.
return false
}
// No new RPCs have come in since we last set the active calls count to
// -math.MaxInt32 in the timer callback. And since we have the lock, it is
// safe to enter idle mode now.
if err := i.enforcer.enterIdleMode(); err != nil {
logger.Errorf("Failed to enter idle mode: %v", err)
return false
}
// Successfully entered idle mode.
i.actuallyIdle = true
return true
}
// onCallBegin is invoked at the start of every RPC.
func (i *idlenessManagerImpl) onCallBegin() error {
if i.isClosed() {
return nil
}
if atomic.AddInt32(&i.activeCallsCount, 1) > 0 {
// Channel is not idle now. Set the activity bit and allow the call.
atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1)
return nil
}
// Channel is either in idle mode or is in the process of moving to idle
// mode. Attempt to exit idle mode to allow this RPC.
if err := i.exitIdleMode(); err != nil {
// Undo the increment to calls count, and return an error causing the
// RPC to fail.
atomic.AddInt32(&i.activeCallsCount, -1)
return err
}
atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1)
return nil
}
// exitIdleMode instructs the channel to exit idle mode.
//
// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
func (i *idlenessManagerImpl) exitIdleMode() error {
i.idleMu.Lock()
defer i.idleMu.Unlock()
if !i.actuallyIdle {
// This can happen in two scenarios:
// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
// tryEnterIdleMode(). But before the latter could grab the lock, an RPC
// came in and onCallBegin() noticed that the calls count is negative.
// - Channel is in idle mode, and multiple new RPCs come in at the same
// time, all of them notice a negative calls count in onCallBegin and get
// here. The first one to get the lock gets the channel to exit idle.
//
// Either way, nothing to do here.
return nil
}
if err := i.enforcer.exitIdleMode(); err != nil {
return fmt.Errorf("channel failed to exit idle mode: %v", err)
}
// Undo the idle entry process. This also respects any new RPC attempts.
atomic.AddInt32(&i.activeCallsCount, math.MaxInt32)
i.actuallyIdle = false
// Start a new timer to fire after the configured idle timeout.
i.timer = timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout)
return nil
}
// onCallEnd is invoked at the end of every RPC.
func (i *idlenessManagerImpl) onCallEnd() {
if i.isClosed() {
return
}
// Record the time at which the most recent call finished.
atomic.StoreInt64(&i.lastCallEndTime, time.Now().UnixNano())
// Decrement the active calls count. This count can temporarily go negative
// when the timer callback is in the process of moving the channel to idle
// mode, but one or more RPCs come in and complete before the timer callback
// can get done with the process of moving to idle mode.
atomic.AddInt32(&i.activeCallsCount, -1)
}
func (i *idlenessManagerImpl) isClosed() bool {
return atomic.LoadInt32(&i.closed) == 1
}
func (i *idlenessManagerImpl) close() {
atomic.StoreInt32(&i.closed, 1)
i.idleMu.Lock()
i.timer.Stop()
i.timer = nil
i.idleMu.Unlock()
}
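
The counter handshake above, where activeCallsCount doubles as both the number of in-flight RPCs and, at -math.MaxInt32, a sentinel meaning 'idle or entering idle', is the core of the design. Below is a stripped-down, self-contained sketch of just that handshake. toyIdler and its methods are hypothetical illustrations, not grpc-go code; the timer, lastCallEndTime bookkeeping, closed flag and enforcer are all omitted.

package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// toyIdler models only the activeCalls counter protocol described above.
type toyIdler struct {
	activeCalls int32 // in-flight calls; -math.MaxInt32 marks idle entry
	mu          sync.Mutex
	idle        bool
}

// onCallBegin mirrors the manager's onCallBegin: a positive count means the
// channel is active and the call proceeds immediately.
func (t *toyIdler) onCallBegin() {
	if atomic.AddInt32(&t.activeCalls, 1) > 0 {
		return
	}
	// Negative count: the timer callback has claimed (or entered) idle mode.
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.idle {
		// The timer set the sentinel but did not finish entering idle; it
		// will notice our increment and undo the sentinel itself.
		return
	}
	t.idle = false
	// Undo the sentinel so the counter once again reflects in-flight calls.
	atomic.AddInt32(&t.activeCalls, math.MaxInt32)
	fmt.Println("exited idle for an incoming call")
}

func (t *toyIdler) onCallEnd() { atomic.AddInt32(&t.activeCalls, -1) }

// onIdleTimeout mirrors handleIdleTimeout + tryEnterIdleMode: claim idleness
// by swapping 0 for -math.MaxInt32, then flip the idle flag under the lock.
func (t *toyIdler) onIdleTimeout() {
	if !atomic.CompareAndSwapInt32(&t.activeCalls, 0, -math.MaxInt32) {
		return // calls are in flight; stay active
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	if atomic.LoadInt32(&t.activeCalls) != -math.MaxInt32 {
		// An RPC raced in after the swap; undo the sentinel and stay active.
		atomic.AddInt32(&t.activeCalls, math.MaxInt32)
		return
	}
	t.idle = true
	fmt.Println("entered idle")
}

func main() {
	ti := &toyIdler{}
	ti.onIdleTimeout() // no calls yet: enters idle
	ti.onCallBegin()   // exits idle, counter becomes 1
	ti.onCallEnd()     // counter back to 0
	ti.onIdleTimeout() // enters idle again
}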

360
idle_test.go Normal file
View File

@ -0,0 +1,360 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
const (
defaultTestIdleTimeout = 500 * time.Millisecond // A short idle_timeout for tests.
defaultTestShortTimeout = 10 * time.Millisecond // A small deadline to wait for events expected to not happen.
)
type testIdlenessEnforcer struct {
exitIdleCh chan struct{}
enterIdleCh chan struct{}
}
func (ti *testIdlenessEnforcer) exitIdleMode() error {
ti.exitIdleCh <- struct{}{}
return nil
}
func (ti *testIdlenessEnforcer) enterIdleMode() error {
ti.enterIdleCh <- struct{}{}
return nil
}
func newTestIdlenessEnforcer() *testIdlenessEnforcer {
return &testIdlenessEnforcer{
exitIdleCh: make(chan struct{}, 1),
enterIdleCh: make(chan struct{}, 1),
}
}
// overrideNewTimer overrides the new timer creation function by ensuring that a
// message is pushed on the returned channel every time the timer fires.
func overrideNewTimer(t *testing.T) <-chan struct{} {
t.Helper()
ch := make(chan struct{}, 1)
origTimeAfterFunc := timeAfterFunc
timeAfterFunc = func(d time.Duration, callback func()) *time.Timer {
return time.AfterFunc(d, func() {
select {
case ch <- struct{}{}:
default:
}
callback()
})
}
t.Cleanup(func() { timeAfterFunc = origTimeAfterFunc })
return ch
}
// TestIdlenessManager_Disabled tests the case where the idleness manager is
// disabled by passing an idle_timeout of 0. Verifies the following things:
// - timer callback does not fire
// - an RPC does not trigger a call to exitIdleMode on the ClientConn
// - more calls to RPC termination (as compared to RPC initiation) do not
// result in an error log
func (s) TestIdlenessManager_Disabled(t *testing.T) {
callbackCh := overrideNewTimer(t)
// Create an idleness manager that is disabled because of idleTimeout being
// set to `0`.
enforcer := newTestIdlenessEnforcer()
mgr := newIdlenessManager(enforcer, time.Duration(0))
// Ensure that the timer callback does not fire within a short deadline.
select {
case <-callbackCh:
t.Fatal("Idle timer callback fired when manager is disabled")
case <-time.After(defaultTestShortTimeout):
}
// The first invocation of onCallBegin() would lead to a call to
// exitIdleMode() on the enforcer, unless the idleness manager is disabled.
mgr.onCallBegin()
select {
case <-enforcer.exitIdleCh:
t.Fatalf("exitIdleMode() called on enforcer when manager is disabled")
case <-time.After(defaultTestShortTimeout):
}
// If the number of calls to onCallEnd() exceeds the number of calls to
// onCallBegin(), the idleness manager is expected to throw an error log
// (which will cause our TestLogger to fail the test). But since the manager
// is disabled, this should not happen.
mgr.onCallEnd()
mgr.onCallEnd()
// The idleness manager is explicitly not closed here. But since the manager
// is disabled, it will not start the run goroutine, and hence we expect the
// leakchecker to not find any leaked goroutines.
}
// TestIdlenessManager_Enabled_TimerFires tests the case where the idle manager
// is enabled. Ensures that when there are no RPCs, the timer callback is
// invoked and the enterIdleMode() method is invoked on the enforcer.
func (s) TestIdlenessManager_Enabled_TimerFires(t *testing.T) {
callbackCh := overrideNewTimer(t)
enforcer := newTestIdlenessEnforcer()
mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
defer mgr.close()
// Ensure that the timer callback fires within an appropriate amount of time.
select {
case <-callbackCh:
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for idle timer callback to fire")
}
// Ensure that the channel moves to idle mode eventually.
select {
case <-enforcer.enterIdleCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout waiting for channel to move to idle")
}
}
// TestIdlenessManager_Enabled_OngoingCall tests the case where the idle manager
// is enabled. Ensures that when there is an ongoing RPC, the channel does not
// enter idle mode.
func (s) TestIdlenessManager_Enabled_OngoingCall(t *testing.T) {
callbackCh := overrideNewTimer(t)
enforcer := newTestIdlenessEnforcer()
mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
defer mgr.close()
// Fire up a goroutine that simulates an ongoing RPC that is terminated
// after the timer callback fires for the first time.
timerFired := make(chan struct{})
go func() {
mgr.onCallBegin()
<-timerFired
mgr.onCallEnd()
}()
// Ensure that the timer callback fires and unblock the above goroutine.
select {
case <-callbackCh:
close(timerFired)
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for idle timer callback to fire")
}
// The invocation of the timer callback should not put the channel in idle
// mode since we had an ongoing RPC.
select {
case <-enforcer.enterIdleCh:
t.Fatalf("enterIdleMode() called on enforcer when active RPC exists")
case <-time.After(defaultTestShortTimeout):
}
// Since we terminated the ongoing RPC and we have no other active RPCs, the
// channel must move to idle eventually.
select {
case <-enforcer.enterIdleCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout waiting for channel to move to idle")
}
}
// TestIdlenessManager_Enabled_ActiveSinceLastCheck tests the case where the
// idle manager is enabled. Ensures that when there are active RPCs in the last
// period (even though there is no active call when the timer fires), the
// channel does not enter idle mode.
func (s) TestIdlenessManager_Enabled_ActiveSinceLastCheck(t *testing.T) {
callbackCh := overrideNewTimer(t)
enforcer := newTestIdlenessEnforcer()
mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
defer mgr.close()
// Fire up a goroutine that simulates unary RPCs until the timer callback
// fires.
timerFired := make(chan struct{})
go func() {
for ; ; <-time.After(defaultTestShortTimeout) {
mgr.onCallBegin()
mgr.onCallEnd()
select {
case <-timerFired:
return
default:
}
}
}()
// Ensure that the timer callback fires, and that we don't enter idle as
// part of this invocation of the timer callback, since we had some RPCs in
// this period.
select {
case <-callbackCh:
close(timerFired)
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for idle timer callback to fire")
}
select {
case <-enforcer.enterIdleCh:
t.Fatalf("enterIdleMode() called on enforcer when one RPC completed in the last period")
case <-time.After(defaultTestShortTimeout):
}
// Since the unary RPC terminated and we have no other active RPCs, the
// channel must move to idle eventually.
select {
case <-enforcer.enterIdleCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout waiting for channel to move to idle")
}
}
// TestIdlenessManager_Enabled_ExitIdleOnRPC tests the case where the idle
// manager is enabled. Ensures that the channel moves out of idle when an RPC is
// initiated.
func (s) TestIdlenessManager_Enabled_ExitIdleOnRPC(t *testing.T) {
overrideNewTimer(t)
enforcer := newTestIdlenessEnforcer()
mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
defer mgr.close()
// Ensure that the channel moves to idle since there are no RPCs.
select {
case <-enforcer.enterIdleCh:
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for channel to move to idle mode")
}
for i := 0; i < 100; i++ {
// A call to onCallBegin and onCallEnd simulates an RPC.
go func() {
if err := mgr.onCallBegin(); err != nil {
t.Errorf("onCallBegin() failed: %v", err)
}
mgr.onCallEnd()
}()
}
// Ensure that the channel moves out of idle as a result of the above RPC.
select {
case <-enforcer.exitIdleCh:
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for channel to move out of idle mode")
}
// Ensure that only one call to exit idle mode is made to the CC.
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
select {
case <-enforcer.exitIdleCh:
t.Fatal("More than one call to exit idle mode on the ClientConn; only one expected")
case <-sCtx.Done():
}
}
type racyIdlenessState int32
const (
stateInital racyIdlenessState = iota
stateEnteredIdle
stateExitedIdle
stateActiveRPCs
)
// racyIdlenessEnforcer is a test idleness enforcer used specifically to test the
// race between idle timeout and incoming RPCs.
type racyIdlenessEnforcer struct {
state *racyIdlenessState // Accessed atomically.
}
// exitIdleMode sets the internal state to stateExitedIdle. We should only ever
// exit idle when we are currently in idle.
func (ri *racyIdlenessEnforcer) exitIdleMode() error {
if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) {
return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
}
return nil
}
// enterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start.
func (ri *racyIdlenessEnforcer) enterIdleMode() error {
if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInital), int32(stateEnteredIdle)) {
return fmt.Errorf("idleness enforcer asked to enter idle after rpcs started")
}
return nil
}
// TestIdlenessManager_IdleTimeoutRacesWithOnCallBegin tests the case where
// firing of the idle timeout races with an incoming RPC. The test verifies that
// if the timer callback wins the race and puts the channel in idle, the RPCs can
// kick it out of idle. And if the RPCs win the race and keep the channel
// active, then the timer callback should not attempt to put the channel in idle
// mode.
func (s) TestIdlenessManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) {
// Run multiple iterations to simulate different possibilities.
for i := 0; i < 10; i++ {
t.Run(fmt.Sprintf("iteration=%d", i), func(t *testing.T) {
var idlenessState racyIdlenessState
enforcer := &racyIdlenessEnforcer{state: &idlenessState}
// Configure a large idle timeout so that we can control the
// race between the timer callback and RPCs.
mgr := newIdlenessManager(enforcer, time.Duration(10*time.Minute))
defer mgr.close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
m := mgr.(interface{ handleIdleTimeout() })
<-time.After(defaultTestIdleTimeout)
m.handleIdleTimeout()
}()
for j := 0; j < 100; j++ {
wg.Add(1)
go func() {
defer wg.Done()
// Wait for the configured idle timeout and simulate an RPC to
// race with the idle timeout timer callback.
<-time.After(defaultTestIdleTimeout)
if err := mgr.onCallBegin(); err != nil {
t.Errorf("onCallBegin() failed: %v", err)
}
atomic.StoreInt32((*int32)(&idlenessState), int32(stateActiveRPCs))
mgr.onCallEnd()
}()
}
wg.Wait()
})
}
}

View File

@ -20,6 +20,7 @@ package grpcsync
import (
"context"
"sync"
"google.golang.org/grpc/internal/buffer"
)
@ -31,19 +32,21 @@ import (
//
// This type is safe for concurrent access.
type CallbackSerializer struct {
// Done is closed once the serializer is shut down completely, i.e a
// scheduled callback, if any, that was running when the context passed to
// NewCallbackSerializer is cancelled, has completed and the serializer has
// deallocated all its resources.
// Done is closed once the serializer is shut down completely, i.e all
// scheduled callbacks are executed and the serializer has deallocated all
// its resources.
Done chan struct{}
callbacks *buffer.Unbounded
closedMu sync.Mutex
closed bool
}
// NewCallbackSerializer returns a new CallbackSerializer instance. The provided
// context will be passed to the scheduled callbacks. Users should cancel the
// provided context to shutdown the CallbackSerializer. It is guaranteed that no
// callbacks will be executed once this context is canceled.
// callbacks will be added once this context is canceled, and any pending un-run
// callbacks will be executed before the serializer is shut down.
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
t := &CallbackSerializer{
Done: make(chan struct{}),
@ -57,17 +60,30 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) {
//
// Return value indicates if the callback was successfully added to the list of
// callbacks to be executed by the serializer. It is not possible to add
// callbacks once the context passed to NewCallbackSerializer is cancelled.
func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
t.closedMu.Lock()
defer t.closedMu.Unlock()
if t.closed {
return false
}
t.callbacks.Put(f)
return true
}
func (t *CallbackSerializer) run(ctx context.Context) {
var backlog []func(context.Context)
defer close(t.Done)
for ctx.Err() == nil {
select {
case <-ctx.Done():
t.callbacks.Close()
return
// Do nothing here. Next iteration of the for loop will not happen,
// since ctx.Err() would be non-nil.
case callback, ok := <-t.callbacks.Get():
if !ok {
return
@ -76,4 +92,28 @@ func (t *CallbackSerializer) run(ctx context.Context) {
callback.(func(ctx context.Context))(ctx)
}
}
// Fetch pending callbacks if any, and execute them before returning from
// this method and closing t.Done.
t.closedMu.Lock()
t.closed = true
backlog = t.fetchPendingCallbacks()
t.callbacks.Close()
t.closedMu.Unlock()
for _, b := range backlog {
b(ctx)
}
}
func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {
var backlog []func(context.Context)
for {
select {
case b := <-t.callbacks.Get():
backlog = append(backlog, b.(func(context.Context)))
t.callbacks.Load()
default:
return backlog
}
}
}
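
To make the revised Schedule/Done contract concrete, here is a small illustrative sketch. exampleUsage is a hypothetical helper assumed to sit in the grpcsync package itself (the package is internal to grpc-go and cannot be imported from outside the module); it is not part of this change.

package grpcsync

import (
	"context"
	"fmt"
)

// exampleUsage demonstrates the contract documented above: callbacks run one
// at a time in scheduling order, cancellation stops new callbacks from being
// accepted, and Done closes only after the pending backlog has been drained.
func exampleUsage() {
	ctx, cancel := context.WithCancel(context.Background())
	cs := NewCallbackSerializer(ctx)

	// These callbacks are accepted and executed serially, in order.
	for i := 0; i < 3; i++ {
		i := i
		cs.Schedule(func(context.Context) { fmt.Println("callback", i) })
	}

	// After cancellation, already-scheduled callbacks still run; Done is
	// closed once they have all finished.
	cancel()
	<-cs.Done

	// New callbacks are rejected once the serializer has shut down.
	if !cs.Schedule(func(context.Context) {}) {
		fmt.Println("serializer closed; callback rejected")
	}
}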

View File

@ -20,7 +20,6 @@ package grpcsync
import (
"context"
"fmt"
"sync"
"testing"
"time"
@ -141,7 +140,10 @@ func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
// are not executed once Close() returns.
func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := NewCallbackSerializer(ctx)
defer cancel()
serializerCtx, serializerCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := NewCallbackSerializer(serializerCtx)
// Schedule a callback which blocks until the context passed to it is
// canceled. It also closes a channel to signal that it has started.
@ -151,36 +153,54 @@ func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
<-ctx.Done()
})
// Schedule a bunch of callbacks. These should not be executed since the first
// one started earlier is blocked.
// Schedule a bunch of callbacks. These should be executed since they are
// scheduled before the serializer is closed.
const numCallbacks = 10
errCh := make(chan error, numCallbacks)
callbackCh := make(chan int, numCallbacks)
for i := 0; i < numCallbacks; i++ {
cs.Schedule(func(_ context.Context) {
errCh <- fmt.Errorf("callback %d executed when not expected to", i)
})
num := i
if !cs.Schedule(func(context.Context) { callbackCh <- num }) {
t.Fatal("Schedule failed to accept a callback when the serializer is yet to be closed")
}
}
// Ensure that none of the newer callbacks are executed at this point.
select {
case <-time.After(defaultTestShortTimeout):
case err := <-errCh:
t.Fatal(err)
case <-callbackCh:
t.Fatal("Newer callback executed when older one is still executing")
}
// Wait for the first callback to start before closing the scheduler.
<-firstCallbackStartedCh
// Cancel the context which will unblock the first callback. None of the
// Cancel the context which will unblock the first callback. All of the
// other callbacks (which have not started executing at this point) should
// be executed after this.
cancel()
serializerCancel()
// Ensure that the newer callbacks are executed.
for i := 0; i < numCallbacks; i++ {
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for callback scheduled before close to be executed")
case num := <-callbackCh:
if num != i {
t.Fatalf("Executing callback %d, want %d", num, i)
}
}
}
<-cs.Done
// Ensure that the newer callbacks are not executed.
done := make(chan struct{})
if cs.Schedule(func(context.Context) { close(done) }) {
t.Fatal("Scheduled a callback after closing the serializer")
}
// Ensure that the latest callback is not executed at this point.
select {
case <-time.After(defaultTestShortTimeout):
case err := <-errCh:
t.Fatal(err)
case <-done:
t.Fatal("Newer callback executed when scheduled after closing serializer")
}
}

View File

@ -36,6 +36,7 @@ import (
type pickerWrapper struct {
mu sync.Mutex
done bool
idle bool
blockingCh chan struct{}
picker balancer.Picker
}
@ -47,7 +48,11 @@ func newPickerWrapper() *pickerWrapper {
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.mu.Lock()
if pw.done {
if pw.done || pw.idle {
// There is a small window where a picker update from the LB policy can
// race with the channel going to idle mode. If the picker is idle here,
// it is because the channel asked it to do so, and therefore it is safe
// to ignore the update from the LB policy.
pw.mu.Unlock()
return
}
@ -187,6 +192,25 @@ func (pw *pickerWrapper) close() {
close(pw.blockingCh)
}
func (pw *pickerWrapper) enterIdleMode() {
pw.mu.Lock()
defer pw.mu.Unlock()
if pw.done {
return
}
pw.idle = true
}
func (pw *pickerWrapper) exitIdleMode() {
pw.mu.Lock()
defer pw.mu.Unlock()
if pw.done {
return
}
pw.blockingCh = make(chan struct{})
pw.idle = false
}
// dropError is a wrapper error that indicates the LB policy wishes to drop the
// RPC and not retry it.
type dropError struct {

View File

@ -21,6 +21,7 @@ package grpc
import (
"context"
"strings"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/channelz"
@ -44,15 +45,20 @@ type ccResolverWrapper struct {
cc resolverStateUpdater
channelzID *channelz.Identifier
ignoreServiceConfig bool
opts ccResolverWrapperOpts
serializer *grpcsync.CallbackSerializer // To serialize all incoming calls.
serializerCancel context.CancelFunc // To close the serializer, accessed only from close().
// Outgoing (gRPC --> resolver) and incoming (resolver --> gRPC) calls are
// guaranteed to execute in a mutually exclusive manner as they are
// scheduled on the CallbackSerializer. Fields accessed *only* in serializer
// callbacks, can therefore be accessed without a mutex.
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc
resolver resolver.Resolver
curState resolver.State
// All incoming (resolver --> gRPC) calls are guaranteed to execute in a
// mutually exclusive manner as they are scheduled on the serializer.
// Fields accessed *only* in these serializer callbacks, can therefore be
// accessed without a mutex.
curState resolver.State
// mu guards access to the below fields.
mu sync.Mutex
closed bool
resolver resolver.Resolver // Accessed only from outgoing calls.
}
// ccResolverWrapperOpts wraps the arguments to be passed when creating a new
@ -72,38 +78,81 @@ func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (
cc: cc,
channelzID: opts.channelzID,
ignoreServiceConfig: opts.bOpts.DisableServiceConfig,
opts: opts,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
// Cannot hold the lock at build time because the resolver can send an
// update or error inline and these incoming calls grab the lock to schedule
// a callback in the serializer.
r, err := opts.builder.Build(opts.target, ccr, opts.bOpts)
if err != nil {
cancel()
return nil, err
}
// Any error reported by the resolver at build time that leads to a
// re-resolution request from the balancer is dropped by grpc until we
// return from this function. So, we don't have to handle pending resolveNow
// requests here.
ccr.mu.Lock()
ccr.resolver = r
ccr.mu.Unlock()
return ccr, nil
}
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
ccr.serializer.Schedule(func(_ context.Context) {
ccr.resolver.ResolveNow(o)
})
ccr.mu.Lock()
defer ccr.mu.Unlock()
// The ccr.resolver field is set only after the call to Build() returns. But
// during the build, the resolver may send an error update which, when
// propagated to the balancer, may result in a re-resolution request.
if ccr.closed || ccr.resolver == nil {
return
}
ccr.resolver.ResolveNow(o)
}
func (ccr *ccResolverWrapper) close() {
ccr.mu.Lock()
if ccr.closed {
ccr.mu.Unlock()
return
}
channelz.Info(logger, ccr.channelzID, "Closing the name resolver")
// Close the serializer to ensure that no more calls from the resolver are
// handled, before closing the resolver.
// handled, before actually closing the resolver.
ccr.serializerCancel()
ccr.closed = true
r := ccr.resolver
ccr.mu.Unlock()
// Give enqueued callbacks a chance to finish.
<-ccr.serializer.Done
ccr.resolver.Close()
// Spawn a goroutine to close the resolver (since it may block trying to
// clean up all allocated resources) and return early.
go r.Close()
}
// serializerScheduleLocked is a convenience method to schedule a function to be
// run on the serializer while holding ccr.mu.
func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) {
ccr.mu.Lock()
ccr.serializer.Schedule(f)
ccr.mu.Unlock()
}
// UpdateState is called by resolver implementations to report new state to gRPC
// which includes addresses and service config.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
errCh := make(chan error, 1)
ccr.serializer.Schedule(func(_ context.Context) {
ok := ccr.serializer.Schedule(func(context.Context) {
ccr.addChannelzTraceEvent(s)
ccr.curState = s
if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
@ -112,22 +161,19 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
}
errCh <- nil
})
// If the resolver wrapper is closed when waiting for this state update to
// be handled, the callback serializer will be closed as well, and we can
// rely on its Done channel to ensure that we don't block here forever.
select {
case err := <-errCh:
return err
case <-ccr.serializer.Done:
if !ok {
// The only time Schedule() fails to add the callback to the
// serializer is when the serializer is closed, and this happens only
// when the resolver wrapper is closed.
return nil
}
return <-errCh
}
// ReportError is called by resolver implementations to report errors
// encountered during name resolution to gRPC.
func (ccr *ccResolverWrapper) ReportError(err error) {
ccr.serializer.Schedule(func(_ context.Context) {
ccr.serializerScheduleLocked(func(_ context.Context) {
channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
ccr.cc.updateResolverState(resolver.State{}, err)
})
@ -136,7 +182,7 @@ func (ccr *ccResolverWrapper) ReportError(err error) {
// NewAddress is called by the resolver implementation to send addresses to
// gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
ccr.serializer.Schedule(func(_ context.Context) {
ccr.serializerScheduleLocked(func(_ context.Context) {
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
ccr.curState.Addresses = addrs
ccr.cc.updateResolverState(ccr.curState, nil)
@ -146,7 +192,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
ccr.serializer.Schedule(func(_ context.Context) {
ccr.serializerScheduleLocked(func(_ context.Context) {
channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc)
if ccr.ignoreServiceConfig {
channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config")


@ -155,6 +155,11 @@ type ClientStream interface {
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
if err := cc.idlenessMgr.onCallBegin(); err != nil {
return nil, err
}
defer cc.idlenessMgr.onCallEnd()
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
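For intuition, a minimal, self-contained sketch of the call-tracking idea behind onCallBegin/onCallEnd (hedged: callTracker and its fields are illustrative assumptions, not the actual idleness manager). Every RPC bumps an in-flight counter on entry and drops it on exit, and an idle transition is only permitted while the counter is zero:

package main

import (
	"fmt"
	"sync/atomic"
)

// callTracker sketches per-call accounting: onCallBegin/onCallEnd bracket
// every RPC, and an idle transition is allowed only with zero in-flight calls.
type callTracker struct {
	activeCalls int64
}

func (t *callTracker) onCallBegin() { atomic.AddInt64(&t.activeCalls, 1) }
func (t *callTracker) onCallEnd()   { atomic.AddInt64(&t.activeCalls, -1) }

func (t *callTracker) canGoIdle() bool {
	return atomic.LoadInt64(&t.activeCalls) == 0
}

func main() {
	t := &callTracker{}
	t.onCallBegin()
	fmt.Println(t.canGoIdle()) // false: an RPC is in flight.
	t.onCallEnd()
	fmt.Println(t.canGoIdle()) // true: no active RPCs, idleness may kick in.
}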


@ -537,3 +537,10 @@ func awaitNotState(ctx context.Context, t *testing.T, cc *grpc.ClientConn, state
}
}
}
func awaitNoStateChange(ctx context.Context, t *testing.T, cc *grpc.ClientConn, currState connectivity.State) {
t.Helper()
if cc.WaitForStateChange(ctx, currState) {
t.Fatalf("State changed from %q to %q when no state change was expected", currState, cc.GetState())
}
}

test/idleness_test.go Normal file

@ -0,0 +1,423 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
const defaultTestShortIdleTimeout = 500 * time.Millisecond
// channelzTraceEventFound looks up the top-channels in channelz (expects a
// single one), and checks if there is a trace event on the channel matching the
// provided description string.
func channelzTraceEventFound(ctx context.Context, wantDesc string) error {
for ctx.Err() == nil {
tcs, _ := channelz.GetTopChannels(0, 0)
if l := len(tcs); l != 1 {
return fmt.Errorf("when looking for channelz trace event with description %q, found %d top-level channels, want 1", wantDesc, l)
}
if tcs[0].Trace == nil {
return fmt.Errorf("when looking for channelz trace event with description %q, no trace events found for top-level channel", wantDesc)
}
for _, e := range tcs[0].Trace.Events {
if strings.Contains(e.Desc, wantDesc) {
return nil
}
}
}
return fmt.Errorf("when looking for channelz trace event with description %q, %w", wantDesc, ctx.Err())
}
// channelzTraceEventNotFound looks up the top-channels in channelz (expects a
// single one), and verifies that there is no trace event on the channel
// matching the provided description string.
func channelzTraceEventNotFound(ctx context.Context, wantDesc string) error {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
err := channelzTraceEventFound(sCtx, wantDesc)
if err == nil {
return fmt.Errorf("found channelz trace event with description %q, when expected not to", wantDesc)
}
if !errors.Is(err, context.DeadlineExceeded) {
return err
}
return nil
}
// Tests the case where channel idleness is disabled by passing an idle_timeout
// of 0. Verifies that a READY channel with no RPCs does not move to IDLE.
func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
// Setup channelz for testing.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
// Create a ClientConn with idle_timeout set to 0.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(0), // Disable idleness.
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)
// Verify that the ClientConn stays in READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
awaitNoStateChange(sCtx, t, cc, connectivity.Ready)
// Verify that there are no idleness related channelz events.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
t.Fatal(err)
}
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
t.Fatal(err)
}
}
// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE.
func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
// Setup channelz for testing.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)
// Verify that the ClientConn moves to IDLE as there is no activity.
awaitState(ctx, t, cc, connectivity.Idle)
// Verify idleness related channelz events.
if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
t.Fatal(err)
}
}
// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
// Setup channelz for testing.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
// Start a test backend which keeps a unary RPC call active by blocking on a
// channel that is closed by the test later on. Also push an address update
// via the resolver.
blockCh := make(chan struct{})
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
<-blockCh
return &testpb.Empty{}, nil
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)
// Spawn a goroutine which checks expected state transitions and idleness
// channelz trace events. It eventually closes `blockCh`, thereby unblocking
// the server RPC handler and the unary call below.
errCh := make(chan error, 1)
go func() {
// Verify that the ClientConn stays in READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
awaitNoStateChange(sCtx, t, cc, connectivity.Ready)
// Verify that there are no idleness related channelz events.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
errCh <- err
return
}
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
errCh <- err
return
}
// Unblock the unary RPC on the server.
close(blockCh)
errCh <- nil
}()
// Make a unary RPC that blocks on the server, thereby ensuring that the
// count of active RPCs on the client is non-zero.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("EmptyCall RPC failed: %v", err)
}
select {
case err := <-errCh:
if err != nil {
t.Fatal(err)
}
case <-ctx.Done():
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
}
}
// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Verifies that activity on a READY channel (frequent and short
// RPCs) keeps it from moving to IDLE.
func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
// Setup channelz for testing.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
// Start a test backend and push an address update via the resolver.
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)
// For a duration of three times the configured idle timeout, make RPCs
// every now and then and ensure that the channel does not move out of
// READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
go func() {
for ; sCtx.Err() == nil; <-time.After(defaultTestShortIdleTimeout / 4) {
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); err != nil {
// While iterating through this for loop, at some point in time,
// the context deadline will expire. It is safe to ignore that
// error code.
if status.Code(err) != codes.DeadlineExceeded {
t.Errorf("EmptyCall RPC failed: %v", err)
return
}
}
}
}()
// Verify that the ClientConn stays in READY.
awaitNoStateChange(sCtx, t, cc, connectivity.Ready)
// Verify that there are no idleness related channelz events.
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
t.Fatal(err)
}
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
t.Fatal(err)
}
}
// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE. Also
// verifies that a subsequent RPC on the IDLE channel kicks it out of IDLE.
func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
// Setup channelz for testing.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
// Start a test backend and set the bootstrap state of the resolver to
// include this address. This ensures that when the resolver is restarted
// upon exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Create a ClientConn with a short idle_timeout.
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)
// Verify that the ClientConn moves to IDLE as there is no activity.
awaitState(ctx, t, cc, connectivity.Idle)
// Verify idleness related channelz events.
if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
t.Fatal(err)
}
// Make an RPC and ensure that it succeeds and moves the channel back to
// READY.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall RPC failed: %v", err)
}
awaitState(ctx, t, cc, connectivity.Ready)
if err := channelzTraceEventFound(ctx, "exiting idle mode"); err != nil {
t.Fatal(err)
}
}
// Tests the case where channel idleness is enabled by passing a small value for
// idle_timeout. Simulates a race between the idle timer firing and RPCs being
// initiated, after a period of inactivity on the channel.
//
// After a period of inactivity (for the configured idle timeout duration), when
// RPCs are started, there are two possibilities:
// - the idle timer wins the race and puts the channel in idle. The RPCs then
// kick it out of idle.
// - the RPCs win the race, and therefore the channel never moves to idle.
//
// In either of these cases, all RPCs must succeed.
func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
// Setup channelz for testing.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
// Start a test backend and set the bootstrap state of the resolver to
// include this address. This ensures that when the resolver is restarted
// upon exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
// Create a ClientConn with a short idle_timeout.
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)
// Make an RPC every defaultTestShortTimeout duration so as to race with the
// idle timeout. Whether the idle timeout wins the race or the RPC wins the
// race, RPCs must succeed.
client := testgrpc.NewTestServiceClient(cc)
for i := 0; i < 20; i++ {
<-time.After(defaultTestShortTimeout)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("EmptyCall RPC failed: %v", err)
}
}
}