mirror of https://github.com/tikv/client-go.git
Add metrics and statistics about aggressive locking (#687)
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
This commit is contained in:
parent
9b3ecc1dca
commit
50e86f7d3c
|
|
@ -97,6 +97,7 @@ var (
|
|||
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
|
||||
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
|
||||
TiKVGrpcConnectionState *prometheus.GaugeVec
|
||||
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
|
||||
)
|
||||
|
||||
// Label constants.
|
||||
|
|
@ -598,6 +599,14 @@ func initMetrics(namespace, subsystem string) {
|
|||
Help: "State of gRPC connection",
|
||||
}, []string{"connection_id", "store_ip", "grpc_state"})
|
||||
|
||||
TiKVAggressiveLockedKeysCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "aggressive_locking_count",
|
||||
Help: "Counter of keys locked in aggressive locking mode",
|
||||
}, []string{LblType})
|
||||
|
||||
initShortcuts()
|
||||
}
|
||||
|
||||
|
|
@ -669,6 +678,7 @@ func RegisterMetrics() {
|
|||
prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec)
|
||||
prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
|
||||
prometheus.MustRegister(TiKVGrpcConnectionState)
|
||||
prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
|
||||
}
|
||||
|
||||
// readCounter reads the value of a prometheus.Counter.
|
||||
|
|
|
|||
|
|
@ -136,6 +136,11 @@ var (
|
|||
PrewriteAssertionUsageCounterExist prometheus.Counter
|
||||
PrewriteAssertionUsageCounterNotExist prometheus.Counter
|
||||
PrewriteAssertionUsageCounterUnknown prometheus.Counter
|
||||
|
||||
AggressiveLockedKeysNew prometheus.Counter
|
||||
AggressiveLockedKeysDerived prometheus.Counter
|
||||
AggressiveLockedKeysLockedWithConflict prometheus.Counter
|
||||
AggressiveLockedKeysNonForceLock prometheus.Counter
|
||||
)
|
||||
|
||||
func initShortcuts() {
|
||||
|
|
@ -237,4 +242,17 @@ func initShortcuts() {
|
|||
PrewriteAssertionUsageCounterExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("exist")
|
||||
PrewriteAssertionUsageCounterNotExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("not-exist")
|
||||
PrewriteAssertionUsageCounterUnknown = TiKVPrewriteAssertionUsageCounter.WithLabelValues("unknown")
|
||||
|
||||
// Counts new locks trying to acquire inside an aggressive locking stage.
|
||||
AggressiveLockedKeysNew = TiKVAggressiveLockedKeysCounter.WithLabelValues("new")
|
||||
// Counts locks trying to acquire inside an aggressive locking stage, but it's already locked in the previous
|
||||
// aggressive locking stage (before the latest invocation to `RetryAggressiveLocking`), in which case the lock
|
||||
// can be *derived* from the previous stage and no RPC is needed for the key.
|
||||
AggressiveLockedKeysDerived = TiKVAggressiveLockedKeysCounter.WithLabelValues("derived")
|
||||
// Counts locks that's forced acquired ignoring the WriteConflict.
|
||||
AggressiveLockedKeysLockedWithConflict = TiKVAggressiveLockedKeysCounter.WithLabelValues("locked_with_conflict")
|
||||
// Counts locks that's acquired within an aggressive locking stage, but with force-lock disabled (by passing
|
||||
// `WakeUpMode = PessimisticLockWakeUpMode_WakeUpModeNormal`, which will disable `allow_lock_with_conflict` in
|
||||
// TiKV).
|
||||
AggressiveLockedKeysNonForceLock = TiKVAggressiveLockedKeysCounter.WithLabelValues("non_force_lock")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -825,6 +825,28 @@ func (txn *KVTxn) filterAggressiveLockedKeys(lockCtx *tikv.LockCtx, allKeys [][]
|
|||
return keys, nil
|
||||
}
|
||||
|
||||
// collectAggressiveLockingStats collects statistics about aggressive locking and updates metrics if needed.
|
||||
func (txn *KVTxn) collectAggressiveLockingStats(lockCtx *tikv.LockCtx, keys int, skippedLockKeys int, filteredAggressiveLockedKeysCount int, lockWakeUpMode kvrpcpb.PessimisticLockWakeUpMode) {
|
||||
if keys > 0 {
|
||||
lockCtx.Stats.AggressiveLockNewCount += keys - skippedLockKeys
|
||||
|
||||
lockedWithConflictCount := 0
|
||||
for _, v := range lockCtx.Values {
|
||||
if v.LockedWithConflictTS != 0 {
|
||||
lockedWithConflictCount++
|
||||
}
|
||||
}
|
||||
lockCtx.Stats.LockedWithConflictCount += lockedWithConflictCount
|
||||
metrics.AggressiveLockedKeysLockedWithConflict.Add(float64(lockedWithConflictCount))
|
||||
|
||||
if lockWakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal {
|
||||
metrics.AggressiveLockedKeysNonForceLock.Add(float64(keys))
|
||||
}
|
||||
}
|
||||
|
||||
lockCtx.Stats.AggressiveLockDerivedCount += filteredAggressiveLockedKeysCount
|
||||
}
|
||||
|
||||
// LockKeys tries to lock the entries with the keys in KV store.
|
||||
// lockCtx is the context for lock, lockCtx.lockWaitTime in ms
|
||||
func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput ...[]byte) error {
|
||||
|
|
@ -956,7 +978,9 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
|
|||
}
|
||||
keys = deduplicateKeys(keys)
|
||||
checkedExistence := false
|
||||
filteredAggressiveLockedKeysCount := 0
|
||||
var assignedPrimaryKey bool
|
||||
lockWakeUpMode := kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal
|
||||
if txn.IsPessimistic() && lockCtx.ForUpdateTS > 0 {
|
||||
if txn.committer == nil {
|
||||
// sessionID is used for log.
|
||||
|
|
@ -979,6 +1003,10 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
|
|||
txn.committer.forUpdateTS = lockCtx.ForUpdateTS
|
||||
|
||||
allKeys := keys
|
||||
lockCtx.Stats = &util.LockKeysDetails{
|
||||
LockKeys: int32(len(keys)),
|
||||
ResolveLock: util.ResolveLockDetail{},
|
||||
}
|
||||
|
||||
// If aggressive locking is enabled and we don't need to update the primary for all locks, we can avoid sending
|
||||
// RPC to those already locked keys.
|
||||
|
|
@ -988,24 +1016,26 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
|
|||
return err
|
||||
}
|
||||
|
||||
filteredAggressiveLockedKeysCount = len(allKeys) - len(keys)
|
||||
metrics.AggressiveLockedKeysDerived.Add(float64(filteredAggressiveLockedKeysCount))
|
||||
metrics.AggressiveLockedKeysNew.Add(float64(len(keys)))
|
||||
|
||||
if len(keys) == 0 {
|
||||
if lockCtx.Stats != nil {
|
||||
txn.collectAggressiveLockingStats(lockCtx, 0, 0, filteredAggressiveLockedKeysCount, lockWakeUpMode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
lockWaitMode := kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal
|
||||
if txn.aggressiveLockingContext != nil && len(keys) == 1 && !lockCtx.LockOnlyIfExists {
|
||||
lockWaitMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock
|
||||
}
|
||||
lockCtx.Stats = &util.LockKeysDetails{
|
||||
LockKeys: int32(len(keys)),
|
||||
ResolveLock: util.ResolveLockDetail{},
|
||||
lockWakeUpMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock
|
||||
}
|
||||
bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars)
|
||||
// If the number of keys greater than 1, it can be on different region,
|
||||
// concurrently execute on multiple regions may lead to deadlock.
|
||||
txn.committer.isFirstLock = txn.lockedCnt == 0 && len(keys) == 1
|
||||
err = txn.committer.pessimisticLockMutations(bo, lockCtx, lockWaitMode, &PlainMutations{keys: keys})
|
||||
err = txn.committer.pessimisticLockMutations(bo, lockCtx, lockWakeUpMode, &PlainMutations{keys: keys})
|
||||
if lockCtx.Stats != nil && bo.GetTotalSleep() > 0 {
|
||||
atomic.AddInt64(&lockCtx.Stats.BackoffTime, int64(bo.GetTotalSleep())*int64(time.Millisecond))
|
||||
lockCtx.Stats.Mu.Lock()
|
||||
|
|
@ -1095,7 +1125,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
|
|||
}
|
||||
txn.unsetPrimaryKeyIfNeeded(lockCtx)
|
||||
}
|
||||
skipedLockKeys := 0
|
||||
skippedLockKeys := 0
|
||||
for _, key := range keys {
|
||||
valExists := true
|
||||
// PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist.
|
||||
|
|
@ -1124,13 +1154,18 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
|
|||
}
|
||||
// TODO: Fix the calculation when aggressive-locking is active
|
||||
if lockCtx.LockOnlyIfExists && !valExists {
|
||||
skipedLockKeys++
|
||||
skippedLockKeys++
|
||||
continue
|
||||
}
|
||||
memBuf.UpdateFlags(key, tikv.SetKeyLocked, tikv.DelNeedCheckExists, setValExists)
|
||||
}
|
||||
}
|
||||
txn.lockedCnt += len(keys) - skipedLockKeys
|
||||
|
||||
// Update statistics information.
|
||||
txn.lockedCnt += len(keys) - skippedLockKeys
|
||||
if txn.aggressiveLockingContext != nil && lockCtx.Stats != nil {
|
||||
txn.collectAggressiveLockingStats(lockCtx, len(keys), skippedLockKeys, filteredAggressiveLockedKeysCount, lockWakeUpMode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -239,12 +239,15 @@ func (cd *CommitDetails) Clone() *CommitDetails {
|
|||
|
||||
// LockKeysDetails contains pessimistic lock keys detail information.
|
||||
type LockKeysDetails struct {
|
||||
TotalTime time.Duration
|
||||
RegionNum int32
|
||||
LockKeys int32
|
||||
ResolveLock ResolveLockDetail
|
||||
BackoffTime int64
|
||||
Mu struct {
|
||||
TotalTime time.Duration
|
||||
RegionNum int32
|
||||
LockKeys int32
|
||||
AggressiveLockNewCount int
|
||||
AggressiveLockDerivedCount int
|
||||
LockedWithConflictCount int
|
||||
ResolveLock ResolveLockDetail
|
||||
BackoffTime int64
|
||||
Mu struct {
|
||||
sync.Mutex
|
||||
BackoffTypes []string
|
||||
SlowestReqTotalTime time.Duration
|
||||
|
|
@ -262,6 +265,9 @@ func (ld *LockKeysDetails) Merge(lockKey *LockKeysDetails) {
|
|||
ld.TotalTime += lockKey.TotalTime
|
||||
ld.RegionNum += lockKey.RegionNum
|
||||
ld.LockKeys += lockKey.LockKeys
|
||||
ld.AggressiveLockNewCount += lockKey.AggressiveLockNewCount
|
||||
ld.AggressiveLockDerivedCount += lockKey.AggressiveLockDerivedCount
|
||||
ld.LockedWithConflictCount += lockKey.LockedWithConflictCount
|
||||
ld.ResolveLock.ResolveLockTime += lockKey.ResolveLock.ResolveLockTime
|
||||
ld.BackoffTime += lockKey.BackoffTime
|
||||
ld.LockRPCTime += lockKey.LockRPCTime
|
||||
|
|
@ -294,14 +300,17 @@ func (ld *LockKeysDetails) MergeReqDetails(reqDuration time.Duration, regionID u
|
|||
// Clone returns a deep copy of itself.
|
||||
func (ld *LockKeysDetails) Clone() *LockKeysDetails {
|
||||
lock := &LockKeysDetails{
|
||||
TotalTime: ld.TotalTime,
|
||||
RegionNum: ld.RegionNum,
|
||||
LockKeys: ld.LockKeys,
|
||||
BackoffTime: ld.BackoffTime,
|
||||
LockRPCTime: ld.LockRPCTime,
|
||||
LockRPCCount: ld.LockRPCCount,
|
||||
RetryCount: ld.RetryCount,
|
||||
ResolveLock: ld.ResolveLock,
|
||||
TotalTime: ld.TotalTime,
|
||||
RegionNum: ld.RegionNum,
|
||||
LockKeys: ld.LockKeys,
|
||||
AggressiveLockNewCount: ld.AggressiveLockNewCount,
|
||||
AggressiveLockDerivedCount: ld.AggressiveLockDerivedCount,
|
||||
LockedWithConflictCount: ld.LockedWithConflictCount,
|
||||
BackoffTime: ld.BackoffTime,
|
||||
LockRPCTime: ld.LockRPCTime,
|
||||
LockRPCCount: ld.LockRPCCount,
|
||||
RetryCount: ld.RetryCount,
|
||||
ResolveLock: ld.ResolveLock,
|
||||
}
|
||||
lock.Mu.BackoffTypes = append([]string{}, ld.Mu.BackoffTypes...)
|
||||
lock.Mu.SlowestReqTotalTime = ld.Mu.SlowestReqTotalTime
|
||||
|
|
@ -321,12 +330,12 @@ type ExecDetails struct {
|
|||
|
||||
// FormatDuration uses to format duration, this function will prune precision before format duration.
|
||||
// Pruning precision is for human readability. The prune rule is:
|
||||
// 1. if the duration was less than 1us, return the original string.
|
||||
// 2. readable value >=10, keep 1 decimal, otherwise, keep 2 decimal. such as:
|
||||
// 9.412345ms -> 9.41ms
|
||||
// 10.412345ms -> 10.4ms
|
||||
// 5.999s -> 6s
|
||||
// 100.45µs -> 100.5µs
|
||||
// 1. if the duration was less than 1us, return the original string.
|
||||
// 2. readable value >=10, keep 1 decimal, otherwise, keep 2 decimal. such as:
|
||||
// 9.412345ms -> 9.41ms
|
||||
// 10.412345ms -> 10.4ms
|
||||
// 5.999s -> 6s
|
||||
// 100.45µs -> 100.5µs
|
||||
func FormatDuration(d time.Duration) string {
|
||||
if d <= time.Microsecond {
|
||||
return d.String()
|
||||
|
|
|
|||
Loading…
Reference in New Issue