mirror of https://github.com/tikv/client-go.git
config: support config resolve lock lite threshold (#291)
Signed-off-by: lysu <sulifx@gmail.com>
This commit is contained in:
parent
06c3868595
commit
eb7e4884a8
|
|
@ -71,30 +71,32 @@ type Config struct {
|
||||||
PessimisticTxn PessimisticTxn
|
PessimisticTxn PessimisticTxn
|
||||||
TxnLocalLatches TxnLocalLatches
|
TxnLocalLatches TxnLocalLatches
|
||||||
// StoresRefreshInterval indicates the interval of refreshing stores info, the unit is second.
|
// StoresRefreshInterval indicates the interval of refreshing stores info, the unit is second.
|
||||||
StoresRefreshInterval uint64
|
StoresRefreshInterval uint64
|
||||||
OpenTracingEnable bool
|
OpenTracingEnable bool
|
||||||
Path string
|
Path string
|
||||||
EnableForwarding bool
|
EnableForwarding bool
|
||||||
TxnScope string
|
TxnScope string
|
||||||
EnableAsyncCommit bool
|
EnableAsyncCommit bool
|
||||||
Enable1PC bool
|
Enable1PC bool
|
||||||
|
ResolveLockLiteThreshold uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultConfig returns the default configuration.
|
// DefaultConfig returns the default configuration.
|
||||||
func DefaultConfig() Config {
|
func DefaultConfig() Config {
|
||||||
return Config{
|
return Config{
|
||||||
CommitterConcurrency: 128,
|
CommitterConcurrency: 128,
|
||||||
MaxTxnTTL: 60 * 60 * 1000, // 1hour
|
MaxTxnTTL: 60 * 60 * 1000, // 1hour
|
||||||
TiKVClient: DefaultTiKVClient(),
|
TiKVClient: DefaultTiKVClient(),
|
||||||
PDClient: DefaultPDClient(),
|
PDClient: DefaultPDClient(),
|
||||||
TxnLocalLatches: DefaultTxnLocalLatches(),
|
TxnLocalLatches: DefaultTxnLocalLatches(),
|
||||||
StoresRefreshInterval: DefStoresRefreshInterval,
|
StoresRefreshInterval: DefStoresRefreshInterval,
|
||||||
OpenTracingEnable: false,
|
OpenTracingEnable: false,
|
||||||
Path: "",
|
Path: "",
|
||||||
EnableForwarding: false,
|
EnableForwarding: false,
|
||||||
TxnScope: "",
|
TxnScope: "",
|
||||||
EnableAsyncCommit: false,
|
EnableAsyncCommit: false,
|
||||||
Enable1PC: false,
|
Enable1PC: false,
|
||||||
|
ResolveLockLiteThreshold: 16,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import (
|
||||||
|
|
||||||
"github.com/pingcap/errors"
|
"github.com/pingcap/errors"
|
||||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||||
|
"github.com/tikv/client-go/v2/config"
|
||||||
tikverr "github.com/tikv/client-go/v2/error"
|
tikverr "github.com/tikv/client-go/v2/error"
|
||||||
"github.com/tikv/client-go/v2/internal/client"
|
"github.com/tikv/client-go/v2/internal/client"
|
||||||
"github.com/tikv/client-go/v2/internal/locate"
|
"github.com/tikv/client-go/v2/internal/locate"
|
||||||
|
|
@ -52,8 +53,9 @@ type storage interface {
|
||||||
|
|
||||||
// LockResolver resolves locks and also caches resolved txn status.
|
// LockResolver resolves locks and also caches resolved txn status.
|
||||||
type LockResolver struct {
|
type LockResolver struct {
|
||||||
store storage
|
store storage
|
||||||
mu struct {
|
resolveLockLiteThreshold uint64
|
||||||
|
mu struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
// resolved caches resolved txns (FIFO, txn id -> txnStatus).
|
// resolved caches resolved txns (FIFO, txn id -> txnStatus).
|
||||||
resolved map[uint64]TxnStatus
|
resolved map[uint64]TxnStatus
|
||||||
|
|
@ -67,7 +69,8 @@ type LockResolver struct {
|
||||||
// NewLockResolver creates a new LockResolver instance.
|
// NewLockResolver creates a new LockResolver instance.
|
||||||
func NewLockResolver(store storage) *LockResolver {
|
func NewLockResolver(store storage) *LockResolver {
|
||||||
r := &LockResolver{
|
r := &LockResolver{
|
||||||
store: store,
|
store: store,
|
||||||
|
resolveLockLiteThreshold: config.GetGlobalConfig().ResolveLockLiteThreshold,
|
||||||
}
|
}
|
||||||
r.mu.resolved = make(map[uint64]TxnStatus)
|
r.mu.resolved = make(map[uint64]TxnStatus)
|
||||||
r.mu.recentResolved = list.New()
|
r.mu.recentResolved = list.New()
|
||||||
|
|
@ -137,8 +140,8 @@ func (l *Lock) String() string {
|
||||||
buf.WriteString(hex.EncodeToString(l.Key))
|
buf.WriteString(hex.EncodeToString(l.Key))
|
||||||
buf.WriteString(", primary: ")
|
buf.WriteString(", primary: ")
|
||||||
buf.WriteString(hex.EncodeToString(l.Primary))
|
buf.WriteString(hex.EncodeToString(l.Primary))
|
||||||
return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s, UseAsyncCommit: %t",
|
return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s, UseAsyncCommit: %t, txnSize: %d",
|
||||||
buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType, l.UseAsyncCommit)
|
buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType, l.UseAsyncCommit, l.TxnSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLock creates a new *Lock.
|
// NewLock creates a new *Lock.
|
||||||
|
|
@ -869,12 +872,9 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// bigTxnThreshold : transaction involves keys exceed this threshold can be treated as `big transaction`.
|
|
||||||
const bigTxnThreshold = 16
|
|
||||||
|
|
||||||
func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[locate.RegionVerID]struct{}) error {
|
func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[locate.RegionVerID]struct{}) error {
|
||||||
metrics.LockResolverCountWithResolveLocks.Inc()
|
metrics.LockResolverCountWithResolveLocks.Inc()
|
||||||
resolveLite := lite || l.TxnSize < bigTxnThreshold
|
resolveLite := lite || l.TxnSize < lr.resolveLockLiteThreshold
|
||||||
for {
|
for {
|
||||||
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
|
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue