From eb7e4884a852a29e97a54dd3ac33e5f519aeee82 Mon Sep 17 00:00:00 2001 From: lysu Date: Thu, 2 Sep 2021 13:31:00 +0800 Subject: [PATCH] config: support config resolve lock lite threshold (#291) Signed-off-by: lysu --- config/config.go | 40 ++++++++++++++++++---------------- txnkv/txnlock/lock_resolver.go | 18 +++++++-------- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/config/config.go b/config/config.go index a48fecc9..fbaae568 100644 --- a/config/config.go +++ b/config/config.go @@ -71,30 +71,32 @@ type Config struct { PessimisticTxn PessimisticTxn TxnLocalLatches TxnLocalLatches // StoresRefreshInterval indicates the interval of refreshing stores info, the unit is second. - StoresRefreshInterval uint64 - OpenTracingEnable bool - Path string - EnableForwarding bool - TxnScope string - EnableAsyncCommit bool - Enable1PC bool + StoresRefreshInterval uint64 + OpenTracingEnable bool + Path string + EnableForwarding bool + TxnScope string + EnableAsyncCommit bool + Enable1PC bool + ResolveLockLiteThreshold uint64 } // DefaultConfig returns the default configuration. func DefaultConfig() Config { return Config{ - CommitterConcurrency: 128, - MaxTxnTTL: 60 * 60 * 1000, // 1hour - TiKVClient: DefaultTiKVClient(), - PDClient: DefaultPDClient(), - TxnLocalLatches: DefaultTxnLocalLatches(), - StoresRefreshInterval: DefStoresRefreshInterval, - OpenTracingEnable: false, - Path: "", - EnableForwarding: false, - TxnScope: "", - EnableAsyncCommit: false, - Enable1PC: false, + CommitterConcurrency: 128, + MaxTxnTTL: 60 * 60 * 1000, // 1hour + TiKVClient: DefaultTiKVClient(), + PDClient: DefaultPDClient(), + TxnLocalLatches: DefaultTxnLocalLatches(), + StoresRefreshInterval: DefStoresRefreshInterval, + OpenTracingEnable: false, + Path: "", + EnableForwarding: false, + TxnScope: "", + EnableAsyncCommit: false, + Enable1PC: false, + ResolveLockLiteThreshold: 16, } } diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 5df6c77b..16c5b093 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/tikv/client-go/v2/config" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/internal/locate" @@ -52,8 +53,9 @@ type storage interface { // LockResolver resolves locks and also caches resolved txn status. type LockResolver struct { - store storage - mu struct { + store storage + resolveLockLiteThreshold uint64 + mu struct { sync.RWMutex // resolved caches resolved txns (FIFO, txn id -> txnStatus). resolved map[uint64]TxnStatus @@ -67,7 +69,8 @@ type LockResolver struct { // NewLockResolver creates a new LockResolver instance. func NewLockResolver(store storage) *LockResolver { r := &LockResolver{ - store: store, + store: store, + resolveLockLiteThreshold: config.GetGlobalConfig().ResolveLockLiteThreshold, } r.mu.resolved = make(map[uint64]TxnStatus) r.mu.recentResolved = list.New() @@ -137,8 +140,8 @@ func (l *Lock) String() string { buf.WriteString(hex.EncodeToString(l.Key)) buf.WriteString(", primary: ") buf.WriteString(hex.EncodeToString(l.Primary)) - return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s, UseAsyncCommit: %t", - buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType, l.UseAsyncCommit) + return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s, UseAsyncCommit: %t, txnSize: %d", + buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType, l.UseAsyncCommit, l.TxnSize) } // NewLock creates a new *Lock. @@ -869,12 +872,9 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region return nil } -// bigTxnThreshold : transaction involves keys exceed this threshold can be treated as `big transaction`. -const bigTxnThreshold = 16 - func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[locate.RegionVerID]struct{}) error { metrics.LockResolverCountWithResolveLocks.Inc() - resolveLite := lite || l.TxnSize < bigTxnThreshold + resolveLite := lite || l.TxnSize < lr.resolveLockLiteThreshold for { loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key) if err != nil {