From eb55fa50e6c987e6b847fc13381d10f8bf9ab5db Mon Sep 17 00:00:00 2001 From: lyuxuan Date: Fri, 9 Nov 2018 15:31:07 -0800 Subject: [PATCH] resolverWrapper: remove the watcher goroutine (#2446) --- clientconn.go | 23 +++++++---------- resolver_conn_wrapper.go | 56 ++++++++-------------------------------- 2 files changed, 20 insertions(+), 59 deletions(-) diff --git a/clientconn.go b/clientconn.go index 3aaee7d84..d6f8b1b89 100644 --- a/clientconn.go +++ b/clientconn.go @@ -288,19 +288,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } // Build the resolver. - cc.resolverWrapper, err = newCCResolverWrapper(cc) + rWrapper, err := newCCResolverWrapper(cc) if err != nil { return nil, fmt.Errorf("failed to build resolver: %v", err) } - // Start the resolver wrapper goroutine after resolverWrapper is created. - // - // If the goroutine is started before resolverWrapper is ready, the - // following may happen: The goroutine sends updates to cc. cc forwards - // those to balancer. Balancer creates new addrConn. addrConn fails to - // connect, and calls resolveNow(). resolveNow() tries to use the non-ready - // resolverWrapper. - cc.resolverWrapper.start() + cc.mu.Lock() + cc.resolverWrapper = rWrapper + cc.mu.Unlock() // A blocking dial blocks until the clientConn is ready. if cc.dopts.block { for { @@ -389,13 +384,13 @@ type ClientConn struct { csMgr *connectivityStateManager balancerBuildOpts balancer.BuildOptions - resolverWrapper *ccResolverWrapper blockingpicker *pickerWrapper - mu sync.RWMutex - sc ServiceConfig - scRaw string - conns map[*addrConn]struct{} + mu sync.RWMutex + resolverWrapper *ccResolverWrapper + sc ServiceConfig + scRaw string + conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters curBalancerName string diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index a6c02ac9e..9d7602539 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -93,47 +93,6 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { return ccr, nil } -func (ccr *ccResolverWrapper) start() { - go ccr.watcher() -} - -// watcher processes address updates and service config updates sequentially. -// Otherwise, we need to resolve possible races between address and service -// config (e.g. they specify different balancer types). -func (ccr *ccResolverWrapper) watcher() { - for { - select { - case <-ccr.done: - return - default: - } - - select { - case addrs := <-ccr.addrCh: - select { - case <-ccr.done: - return - default: - } - grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs) - if channelz.IsOn() { - ccr.addChannelzTraceEvent(addrs) - } - ccr.cc.handleResolvedAddrs(addrs, nil) - case sc := <-ccr.scCh: - select { - case <-ccr.done: - return - default: - } - grpclog.Infof("ccResolverWrapper: got new service config: %v", sc) - ccr.cc.handleServiceConfig(sc) - case <-ccr.done: - return - } - } -} - func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) { ccr.resolver.ResolveNow(o) } @@ -146,20 +105,27 @@ func (ccr *ccResolverWrapper) close() { // NewAddress is called by the resolver implemenetion to send addresses to gRPC. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { select { - case <-ccr.addrCh: + case <-ccr.done: + return default: } - ccr.addrCh <- addrs + grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs) + if channelz.IsOn() { + ccr.addChannelzTraceEvent(addrs) + } + ccr.cc.handleResolvedAddrs(addrs, nil) } // NewServiceConfig is called by the resolver implemenetion to send service // configs to gRPC. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { select { - case <-ccr.scCh: + case <-ccr.done: + return default: } - ccr.scCh <- sc + grpclog.Infof("ccResolverWrapper: got new service config: %v", sc) + ccr.cc.handleServiceConfig(sc) } func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) {