client: simplify initialization and cleanup a bit (#6798)

This commit is contained in:
Doug Fawley 2023-11-15 10:47:19 -08:00 committed by GitHub
parent b98104ec5a
commit ce3b538586
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 70 deletions

View File

@ -76,17 +76,14 @@ type ccBalancerWrapper struct {
mode ccbMode // Tracks the current mode of the wrapper.
}
// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
// is not created until the switchTo() method is invoked.
// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(context.Background())
ccb := &ccBalancerWrapper{
cc: cc,
opts: bopts,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
cc: cc,
opts: bopts,
mode: ccbModeIdle,
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
return ccb
}
@ -258,7 +255,7 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {
// exitIdleMode(), and since we just created a new serializer, we can be
// sure that the below function will be scheduled.
done := make(chan struct{})
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(context.Context) {
defer close(done)
ccb.mu.Lock()
@ -271,7 +268,11 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {
// Gracefulswitch balancer does not support a switchTo operation after
// being closed. Hence we need to create a new one here.
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
opts := ccb.opts
if c := opts.DialCreds; c != nil {
opts.DialCreds = c.Clone()
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, opts)
ccb.mode = ccbModeActive
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
@ -337,7 +338,7 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.
ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.pickerWrapper.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}

View File

@ -160,6 +160,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.exitIdleCond = sync.NewCond(&cc.mu)
// Apply dial options.
disableGlobalOpts := false
for _, opt := range opts {
if _, ok := opt.(*disableGlobalDialOptions); ok {
@ -177,21 +178,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
for _, opt := range opts {
opt.apply(&cc.dopts)
}
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)
defer func() {
if err != nil {
cc.Close()
}
}()
// Register ClientConn with channelz.
cc.channelzRegistration(target)
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
@ -211,6 +200,37 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.dopts.copts.UserAgent = grpcUA
}
// Register ClientConn with channelz.
cc.channelzRegistration(target)
// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
channelz.RemoveEntry(cc.channelzID)
return nil, err
}
if err = cc.determineAuthority(); err != nil {
channelz.RemoveEntry(cc.channelzID)
return nil, err
}
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: cc.dopts.copts.TransportCredentials,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})
defer func() {
if err != nil {
cc.Close()
}
}()
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
@ -235,14 +255,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.dopts.bs = backoff.DefaultExponential
}
// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
return nil, err
}
if err = cc.determineAuthority(); err != nil {
return nil, err
}
if cc.dopts.scChan != nil {
// Blocking wait for the initial service config.
select {
@ -359,31 +371,13 @@ func (cc *ClientConn) exitIdleMode() error {
}()
cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
}
cc.pickerWrapper.exitIdleMode()
var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
if cc.balancerWrapper == nil {
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})
} else {
cc.balancerWrapper.exitIdleMode()
}
cc.balancerWrapper.exitIdleMode()
cc.firstResolveEvent = grpcsync.NewEvent()
cc.mu.Unlock()
@ -394,9 +388,7 @@ func (cc *ClientConn) exitIdleMode() error {
return err
}
if exitedIdle {
cc.addTraceEvent("exiting idle mode")
}
cc.addTraceEvent("exiting idle mode")
return nil
}
@ -427,7 +419,7 @@ func (cc *ClientConn) enterIdleMode() error {
// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
// try to do the same for the balancer and picker wrappers too.
cc.resolverWrapper.close()
cc.blockingpicker.enterIdleMode()
cc.pickerWrapper.enterIdleMode()
cc.balancerWrapper.enterIdleMode()
cc.csMgr.updateState(connectivity.Idle)
cc.idlenessState = ccIdlenessStateIdle
@ -655,7 +647,7 @@ type ClientConn struct {
// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
csMgr *connectivityStateManager
blockingpicker *pickerWrapper
pickerWrapper *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
czData *channelzData
retryThrottler atomic.Value // Updated from service config.
@ -910,7 +902,7 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
}
@ -1174,7 +1166,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
@ -1267,24 +1259,18 @@ func (cc *ClientConn) Close() error {
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
pWrapper := cc.blockingpicker
rWrapper := cc.resolverWrapper
bWrapper := cc.balancerWrapper
idlenessMgr := cc.idlenessMgr
// We can safely unlock and continue to access all fields now as
// cc.conns==nil, preventing any further operations on cc.
cc.mu.Unlock()
// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
if pWrapper != nil {
pWrapper.close()
}
if bWrapper != nil {
bWrapper.close()
}
if rWrapper != nil {
cc.pickerWrapper.close()
cc.balancerWrapper.close()
if rWrapper := cc.resolverWrapper; rWrapper != nil {
rWrapper.close()
}
if idlenessMgr != nil {
if idlenessMgr := cc.idlenessMgr; idlenessMgr != nil {
idlenessMgr.Close()
}