From 9d2ad5209ef58965ace7401dfe7303f5abfe1516 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 23 Aug 2021 15:23:11 +0800 Subject: [PATCH] internal/client: remove the lock for idle connection recycle (#278) Signed-off-by: tiancaiamao --- internal/client/client.go | 17 +++++++++-------- internal/client/client_batch.go | 6 ++++-- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 3de1eec5..c34bf814 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -261,9 +261,7 @@ type RPCClient struct { security config.Security idleNotify uint32 - // recycleMu protect the conns from being modified during a connArray is taken out and used. - // That means recycleIdleConnArray() will wait until nobody doing sendBatchRequest() - recycleMu sync.RWMutex + // Periodically check whether there is any connection that is idle and then close and remove these connections. // Implement background cleanup. isClosed bool @@ -297,6 +295,13 @@ func (c *RPCClient) getConnArray(addr string, enableBatch bool, opt ...func(cfg return nil, err } } + + // An idle connArray will not change to active again, this avoid the race condition + // that recycling idle connection close an active connection unexpectedly (idle -> active). + if array.batchConn != nil && array.isIdle() { + return nil, errors.Errorf("rpcClient is idle") + } + return array, nil } @@ -363,22 +368,18 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R ctx = opentracing.ContextWithSpan(ctx, span1) } - start := time.Now() if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) { go c.recycleIdleConnArray() } // TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request. enableBatch := req.StoreTp != tikvrpc.TiDB && req.StoreTp != tikvrpc.TiFlash - c.recycleMu.RLock() - defer c.recycleMu.RUnlock() connArray, err := c.getConnArray(addr, enableBatch) if err != nil { return nil, errors.Trace(err) } - metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds()) - start = time.Now() + start := time.Now() defer func() { stmtExec := ctx.Value(util.ExecDetailsKey) if stmtExec != nil { diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index f8b581aa..5b15f076 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -795,8 +795,7 @@ func sendBatchRequest( } func (c *RPCClient) recycleIdleConnArray() { - c.recycleMu.Lock() - defer c.recycleMu.Unlock() + start := time.Now() var addrs []string c.RLock() @@ -816,8 +815,11 @@ func (c *RPCClient) recycleIdleConnArray() { zap.String("target", addr)) } c.Unlock() + if conn != nil { conn.Close() } } + + metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds()) }