internal/client: remove the lock for idle connection recycle (#278)

Signed-off-by: tiancaiamao <tiancaiamao@gmail.com>
Author: tiancaiamao <tiancaiamao@gmail.com> (committed via GitHub)
Date:   2021-08-23 15:23:11 +08:00
Parent: e275d06835
Commit: 9d2ad5209e
2 changed files with 13 additions and 10 deletions


@@ -261,9 +261,7 @@ type RPCClient struct {
 	security config.Security

 	idleNotify uint32
-	// recycleMu protect the conns from being modified during a connArray is taken out and used.
-	// That means recycleIdleConnArray() will wait until nobody doing sendBatchRequest()
-	recycleMu sync.RWMutex
 	// Periodically check whether there is any connection that is idle and then close and remove these connections.
 	// Implement background cleanup.
 	isClosed bool
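
For context, the removed recycleMu coupled every request to the recycler: senders held the read lock for the duration of a call, and the recycler's write lock had to wait for all of them to drain. A condensed sketch of that old scheme (type and method bodies are illustrative, not the full client-go code):

package client

import "sync"

type oldRPCClient struct {
	// recycleMu: the lock this commit removes.
	recycleMu sync.RWMutex
}

func (c *oldRPCClient) sendRequest() {
	c.recycleMu.RLock() // taken on every single request
	defer c.recycleMu.RUnlock()
	// ... take a connArray out and send on it ...
}

func (c *oldRPCClient) recycleIdleConnArray() {
	c.recycleMu.Lock() // waits until no sendRequest is in flight
	defer c.recycleMu.Unlock()
	// ... close and remove idle connArrays ...
}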
@@ -297,6 +295,13 @@ func (c *RPCClient) getConnArray(addr string, enableBatch bool, opt ...func(cfg
 			return nil, err
 		}
 	}
+	// An idle connArray will not change back to active, which avoids the race
+	// condition where recycling an idle connection closes an active one (idle -> active).
+	if array.batchConn != nil && array.isIdle() {
+		return nil, errors.Errorf("rpcClient is idle")
+	}
 	return array, nil
 }
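
The safety argument for dropping the lock is a one-way state transition: a connArray may go from active to idle, but never from idle back to active, so checking isIdle() before handing the array out is enough. A minimal sketch of that invariant, assuming an atomic flag (the field and the markIdle helper are hypothetical names, not the exact client-go implementation):

package client

import "sync/atomic"

type connArray struct {
	idle uint32 // 0 = active, 1 = idle; transitions one way only
}

// markIdle (hypothetical name) would be called by the background
// recycler. There is deliberately no markActive: once idle, the
// array is closed and callers must build a fresh one.
func (a *connArray) markIdle() { atomic.StoreUint32(&a.idle, 1) }

// isIdle is checked before the array is handed to a sender, so a
// recycled array is never returned from getConnArray.
func (a *connArray) isIdle() bool { return atomic.LoadUint32(&a.idle) == 1 }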
@@ -363,22 +368,18 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
 		ctx = opentracing.ContextWithSpan(ctx, span1)
 	}

-	start := time.Now()
 	if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) {
 		go c.recycleIdleConnArray()
 	}

 	// TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request.
 	enableBatch := req.StoreTp != tikvrpc.TiDB && req.StoreTp != tikvrpc.TiFlash
-	c.recycleMu.RLock()
-	defer c.recycleMu.RUnlock()
 	connArray, err := c.getConnArray(addr, enableBatch)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds())

-	start = time.Now()
+	start := time.Now()
 	defer func() {
 		stmtExec := ctx.Value(util.ExecDetailsKey)
 		if stmtExec != nil {
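
On the hot path, SendRequest no longer touches recycleMu at all; the idle check it relies on is a single atomic load instead of a read-lock/unlock pair that contends with the recycler. A rough micro-benchmark of the two costs (hypothetical, not part of the commit):

package client

import (
	"sync"
	"sync/atomic"
	"testing"
)

// BenchmarkRLock models the old per-request cost: RLock/RUnlock are
// two atomic read-modify-write operations and a point of contention
// against the recycler's write lock.
func BenchmarkRLock(b *testing.B) {
	var mu sync.RWMutex
	for i := 0; i < b.N; i++ {
		mu.RLock()
		mu.RUnlock()
	}
}

// BenchmarkAtomicLoad models the new cost: one plain atomic load of
// the idle flag, which never blocks the recycler.
func BenchmarkAtomicLoad(b *testing.B) {
	var idle uint32
	for i := 0; i < b.N; i++ {
		if atomic.LoadUint32(&idle) == 1 {
			b.Fatal("flag should stay clear in this benchmark")
		}
	}
}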


@@ -795,8 +795,7 @@ func sendBatchRequest(
 }

 func (c *RPCClient) recycleIdleConnArray() {
-	c.recycleMu.Lock()
-	defer c.recycleMu.Unlock()
+	start := time.Now()

 	var addrs []string
 	c.RLock()
@@ -816,8 +815,11 @@ func (c *RPCClient) recycleIdleConnArray() {
 				zap.String("target", addr))
 		}
 		c.Unlock()
+
 		if conn != nil {
 			conn.Close()
 		}
 	}
+
+	metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds())
 }
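
The histogram also changes meaning: TiKVBatchClientRecycle previously timed the lock dance around getConnArray on every request; now it records the duration of each recycle pass. A self-contained sketch of the relocated timing pattern (observe stands in for the prometheus histogram's Observe method):

package client

import (
	"log"
	"time"
)

// observe stands in for metrics.TiKVBatchClientRecycle.Observe.
func observe(seconds float64) { log.Printf("recycle pass took %.6fs", seconds) }

// recycleIdleConns mirrors the commit's timing: measure the whole
// pass and report once at the end, so the metric reflects recycle
// work rather than per-request lock waits.
func recycleIdleConns(closeIdle func()) {
	start := time.Now()
	closeIdle() // close and remove idle connections
	observe(time.Since(start).Seconds())
}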