client-go/txnkv/txnlock/lock_resolver.go

// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package txnlock

import (
	"bytes"
	"container/list"
	"context"
	"encoding/hex"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pkg/errors"
	"github.com/tikv/client-go/v2/config"
	tikverr "github.com/tikv/client-go/v2/error"
	"github.com/tikv/client-go/v2/internal/client"
	"github.com/tikv/client-go/v2/internal/locate"
	"github.com/tikv/client-go/v2/internal/logutil"
	"github.com/tikv/client-go/v2/internal/retry"
	"github.com/tikv/client-go/v2/metrics"
	"github.com/tikv/client-go/v2/oracle"
	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/util"
	"go.uber.org/zap"
)

// ResolvedCacheSize is the max number of cached txn statuses.
const ResolvedCacheSize = 2048

const (
	getTxnStatusMaxBackoff     = 20000
	asyncResolveLockMaxBackoff = 40000
)

type storage interface {
// GetRegionCache gets the RegionCache.
GetRegionCache() *locate.RegionCache
// SendReq sends a request to TiKV.
SendReq(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error)
// GetOracle gets a timestamp oracle client.
GetOracle() oracle.Oracle
}
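
// A minimal test double satisfying the storage interface might look like this
// sketch (illustrative; mockStorage is a hypothetical name, not part of this
// package):
//
//	type mockStorage struct {
//		cache *locate.RegionCache
//		ora   oracle.Oracle
//	}
//
//	func (s *mockStorage) GetRegionCache() *locate.RegionCache { return s.cache }
//	func (s *mockStorage) GetOracle() oracle.Oracle            { return s.ora }
//	func (s *mockStorage) SendReq(bo *retry.Backoffer, req *tikvrpc.Request,
//		regionID locate.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
//		return nil, errors.New("not implemented")
//	}
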
// LockResolver resolves locks and also caches resolved txn status.
type LockResolver struct {
store storage
resolveLockLiteThreshold uint64
mu struct {
sync.RWMutex
// resolved caches resolved txns (FIFO, txn id -> txnStatus).
resolved map[uint64]TxnStatus
recentResolved *list.List
}
testingKnobs struct {
meetLock func(locks []*Lock)
}
	// LockResolver may have some goroutines resolving locks in the background.
	// The cancel function below cancels these goroutines so that goleak tests can pass.
asyncResolveCtx context.Context
asyncResolveCancel func()
}
// NewLockResolver creates a new LockResolver instance.
func NewLockResolver(store storage) *LockResolver {
r := &LockResolver{
store: store,
resolveLockLiteThreshold: config.GetGlobalConfig().TiKVClient.ResolveLockLiteThreshold,
}
r.mu.resolved = make(map[uint64]TxnStatus)
r.mu.recentResolved = list.New()
r.asyncResolveCtx, r.asyncResolveCancel = context.WithCancel(context.Background())
return r
}
// Close cancels all background goroutines.
func (lr *LockResolver) Close() {
lr.asyncResolveCancel()
}
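
// Lifecycle sketch (illustrative; `store` is any storage implementation, e.g.
// this client's KVStore):
//
//	resolver := NewLockResolver(store)
//	defer resolver.Close() // cancels background async-resolve goroutines
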
// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback.
type TxnStatus struct {
ttl uint64
commitTS uint64
action kvrpcpb.Action
primaryLock *kvrpcpb.LockInfo
}
// IsCommitted returns true if the txn's final status is Commit.
func (s TxnStatus) IsCommitted() bool { return s.ttl == 0 && s.commitTS > 0 }
// IsRolledBack returns true if the txn's final status is rolled back.
func (s TxnStatus) IsRolledBack() bool { return s.ttl == 0 && s.commitTS == 0 }
// CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true.
func (s TxnStatus) CommitTS() uint64 { return s.commitTS }
// TTL returns the TTL of the transaction if the transaction is still alive.
func (s TxnStatus) TTL() uint64 { return s.ttl }
// Action returns what the CheckTxnStatus request has done to the transaction.
func (s TxnStatus) Action() kvrpcpb.Action { return s.action }
// StatusCacheable checks whether the transaction status is certain. True is
// returned iff its status is certain:
//
// If the transaction is already committed, the result can be cached.
// Otherwise:
// If l.LockType is a pessimistic lock type:
//   - if its primary lock is pessimistic too, the check-txn-status result should not be cached.
//   - if its primary lock is a prewrite lock, the check-txn-status result can be cached.
// If l.LockType is a prewrite lock type:
//   - always cache the check-txn-status result.
//
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
func (s TxnStatus) StatusCacheable() bool {
if s.IsCommitted() {
return true
}
if s.ttl == 0 {
if s.action == kvrpcpb.Action_NoAction ||
s.action == kvrpcpb.Action_LockNotExistRollback ||
s.action == kvrpcpb.Action_TTLExpireRollback {
return true
}
}
return false
}
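
// For example (illustrative values):
//
//	TxnStatus{ttl: 0, commitTS: 400}.StatusCacheable()                            // true: committed
//	TxnStatus{ttl: 0, action: kvrpcpb.Action_TTLExpireRollback}.StatusCacheable() // true: certainly rolled back
//	TxnStatus{ttl: 3000}.StatusCacheable()                                        // false: still alive, uncertain
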
// Lock represents a lock from tikv server.
type Lock struct {
Key []byte
Primary []byte
TxnID uint64
TTL uint64
TxnSize uint64
LockType kvrpcpb.Op
UseAsyncCommit bool
LockForUpdateTS uint64
MinCommitTS uint64
}
func (l *Lock) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 128))
buf.WriteString("key: ")
buf.WriteString(hex.EncodeToString(l.Key))
buf.WriteString(", primary: ")
buf.WriteString(hex.EncodeToString(l.Primary))
return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s, UseAsyncCommit: %t, txnSize: %d",
buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType, l.UseAsyncCommit, l.TxnSize)
}
// NewLock creates a new *Lock.
func NewLock(l *kvrpcpb.LockInfo) *Lock {
return &Lock{
Key: l.GetKey(),
Primary: l.GetPrimaryLock(),
TxnID: l.GetLockVersion(),
TTL: l.GetLockTtl(),
TxnSize: l.GetTxnSize(),
LockType: l.LockType,
UseAsyncCommit: l.UseAsyncCommit,
LockForUpdateTS: l.LockForUpdateTs,
MinCommitTS: l.MinCommitTs,
}
}
func (lr *LockResolver) saveResolved(txnID uint64, status TxnStatus) {
lr.mu.Lock()
defer lr.mu.Unlock()
if _, ok := lr.mu.resolved[txnID]; ok {
return
}
lr.mu.resolved[txnID] = status
lr.mu.recentResolved.PushBack(txnID)
if len(lr.mu.resolved) > ResolvedCacheSize {
front := lr.mu.recentResolved.Front()
delete(lr.mu.resolved, front.Value.(uint64))
lr.mu.recentResolved.Remove(front)
}
}
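
// The map above gives O(1) status lookup while the list records insertion
// order, so once the cache exceeds ResolvedCacheSize the oldest entry is
// evicted first -- a simple FIFO policy rather than LRU.
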
func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) {
lr.mu.RLock()
defer lr.mu.RUnlock()
s, ok := lr.mu.resolved[txnID]
return s, ok
}
// BatchResolveLocks resolves locks in a batch.
// Use it in the GC worker only!
func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, loc locate.RegionVerID) (bool, error) {
if len(locks) == 0 {
return true, nil
}
metrics.LockResolverCountWithBatchResolve.Inc()
	// The GCWorker kills all ongoing transactions, because it must make sure all
	// locks have been cleaned before GC.
expiredLocks := locks
txnInfos := make(map[uint64]uint64)
startTime := time.Now()
for _, l := range expiredLocks {
if _, ok := txnInfos[l.TxnID]; ok {
continue
}
metrics.LockResolverCountWithExpired.Inc()
		// Using currentTS = math.MaxUint64 means rolling back the txn, no matter whether the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}
// If the transaction uses async commit, CheckTxnStatus will reject rolling back the primary lock.
// Then we need to check the secondary locks to determine the final status of the transaction.
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit {
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
if err == nil {
txnInfos[l.TxnID] = resolveData.commitTs
continue
}
if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, true, l)
if err != nil {
return false, err
}
} else {
return false, err
}
}
if status.ttl > 0 {
logutil.BgLogger().Error("BatchResolveLocks fail to clean locks, this result is not expected!")
return false, errors.New("TiDB ask TiKV to rollback locks but it doesn't, the protocol maybe wrong")
}
txnInfos[l.TxnID] = status.commitTS
}
logutil.BgLogger().Info("BatchResolveLocks: lookup txn status",
zap.Duration("cost time", time.Since(startTime)),
zap.Int("num of txn", len(txnInfos)))
listTxnInfos := make([]*kvrpcpb.TxnInfo, 0, len(txnInfos))
for txnID, status := range txnInfos {
listTxnInfos = append(listTxnInfos, &kvrpcpb.TxnInfo{
Txn: txnID,
Status: status,
})
}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos})
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
startTime = time.Now()
resp, err := lr.store.SendReq(bo, req, loc, client.ReadTimeoutShort)
if err != nil {
return false, err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return false, err
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return false, err
}
return false, nil
}
if resp.Resp == nil {
return false, errors.WithStack(tikverr.ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
return false, errors.Errorf("unexpected resolve err: %s", keyErr)
}
logutil.BgLogger().Info("BatchResolveLocks: resolve locks in a batch",
zap.Duration("cost time", time.Since(startTime)),
zap.Int("num of locks", len(expiredLocks)))
return true, nil
}
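
// GC-side usage sketch (illustrative; `ctx`, `resolver`, `locks` and `loc` are
// assumed to come from the caller's lock scan, which is omitted here):
//
//	bo := retry.NewBackoffer(ctx, asyncResolveLockMaxBackoff)
//	ok, err := resolver.BatchResolveLocks(bo, locks, loc)
//	if err == nil && !ok {
//		// A region error occurred and has already been backed off:
//		// re-locate the region, re-scan its locks, and call again.
//	}
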
// ResolveLocks tries to resolve Locks. The resolving process is in 3 steps:
// 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too
//    old are considered orphan locks and will be handled later. If all locks
//    are expired then all locks will be resolved so the returned `ok` will be
//    true, otherwise the caller should sleep a while before retrying.
// 2) For each lock, query the primary key to get the commit status of the txn
//    which left the lock.
// 3) Send a `ResolveLock` cmd to the lock's region to resolve all locks belonging
//    to the same transaction.
func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
ttl, _, _, err := lr.resolveLocks(bo, callerStartTS, locks, false, false)
return ttl, err
}
// ResolveLocksForRead is essentially the same as ResolveLocks, except with some optimizations for read.
// Read operations needn't wait for resolving secondary locks: they can read through a lock (if the lock's
// transaction is committed and its commitTS is less than or equal to callerStartTS) or ignore it (if the
// lock's transaction is rolled back or its minCommitTS has been pushed).
func (lr *LockResolver) ResolveLocksForRead(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
return lr.resolveLocks(bo, callerStartTS, locks, true, lite)
}
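
// Read-path sketch (illustrative; `readTS` plays the role of callerStartTS):
//
//	ttl, canIgnore, canAccess, err := lr.ResolveLocksForRead(bo, readTS, locks, false)
//	// canIgnore: txns whose locks the read may skip (rolled back, minCommitTS
//	//            pushed past readTS, or committed with commitTS > readTS)
//	// canAccess: txns committed with commitTS <= readTS, whose data is visible
//	// ttl > 0:   some lock is still alive; back off before retrying the read
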
func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, forRead bool, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
if lr.testingKnobs.meetLock != nil {
lr.testingKnobs.meetLock(locks)
}
var msBeforeTxnExpired txnExpireTime
if len(locks) == 0 {
return msBeforeTxnExpired.value(), nil, nil, nil
}
metrics.LockResolverCountWithResolve.Inc()
// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
var resolve func(*Lock, bool) (TxnStatus, error)
resolve = func(l *Lock, forceSyncCommit bool) (TxnStatus, error) {
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit)
if err != nil {
return TxnStatus{}, err
}
if status.ttl != 0 {
return status, nil
}
		// If the lock is committed or rolled back, resolve the lock.
metrics.LockResolverCountWithExpired.Inc()
cleanRegions, exists := cleanTxns[l.TxnID]
if !exists {
cleanRegions = make(map[locate.RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
// resolveAsyncCommitLock will resolve all locks of the transaction, so we needn't resolve
// it again if it has been resolved once.
if exists {
return status, nil
}
			// The status of an async-commit transaction is determined by resolveAsyncCommitLock.
status, err = lr.resolveAsyncCommitLock(bo, l, status, forRead)
if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
status, err = resolve(l, true)
}
return status, err
}
if l.LockType == kvrpcpb.Op_PessimisticLock {
			// Pessimistic locks don't block reads, so resolving them needn't be async.
err = lr.resolvePessimisticLock(bo, l)
} else {
if forRead {
asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff)
go func() {
// Pass an empty cleanRegions here to avoid data race and
// let `reqCollapse` deduplicate identical resolve requests.
err := lr.resolveLock(asyncBo, l, status, lite, map[locate.RegionVerID]struct{}{})
if err != nil {
logutil.BgLogger().Info("failed to resolve lock asynchronously",
zap.String("lock", l.String()), zap.Uint64("commitTS", status.CommitTS()), zap.Error(err))
}
}()
} else {
err = lr.resolveLock(bo, l, status, lite, cleanRegions)
}
}
return status, err
}
var canIgnore, canAccess []uint64
for _, l := range locks {
status, err := resolve(l, false)
if err != nil {
msBeforeTxnExpired.update(0)
return msBeforeTxnExpired.value(), nil, nil, err
}
if !forRead {
if status.ttl != 0 {
metrics.LockResolverCountWithNotExpired.Inc()
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
msBeforeTxnExpired.update(msBeforeLockExpired)
continue
}
}
if status.action == kvrpcpb.Action_MinCommitTSPushed || status.IsRolledBack() ||
(status.IsCommitted() && status.CommitTS() > callerStartTS) {
if canIgnore == nil {
canIgnore = make([]uint64, 0, len(locks))
}
canIgnore = append(canIgnore, l.TxnID)
} else if status.IsCommitted() && status.CommitTS() <= callerStartTS {
if canAccess == nil {
canAccess = make([]uint64, 0, len(locks))
}
canAccess = append(canAccess, l.TxnID)
} else {
metrics.LockResolverCountWithNotExpired.Inc()
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
msBeforeTxnExpired.update(msBeforeLockExpired)
}
}
if msBeforeTxnExpired.value() > 0 {
metrics.LockResolverCountWithWaitExpired.Inc()
}
return msBeforeTxnExpired.value(), canIgnore, canAccess, nil
}
type txnExpireTime struct {
initialized bool
txnExpire int64
}
func (t *txnExpireTime) update(lockExpire int64) {
if lockExpire <= 0 {
lockExpire = 0
}
if !t.initialized {
t.txnExpire = lockExpire
t.initialized = true
return
}
if lockExpire < t.txnExpire {
t.txnExpire = lockExpire
}
}
func (t *txnExpireTime) value() int64 {
if !t.initialized {
return 0
}
return t.txnExpire
}
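
// txnExpireTime tracks the minimum of all observed lock expire times, clamped at
// zero. For example (illustrative): after update(1200), update(300) and update(-5),
// value() returns 0, meaning some lock has already expired and the caller need
// not wait before resolving.
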
// GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
// If the primary key is still locked, it will launch a Rollback to abort it.
// To avoid unnecessarily aborting too many txns, it is wiser to wait a few
// seconds before calling it after Prewrite.
func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary []byte) (TxnStatus, error) {
var status TxnStatus
bo := retry.NewBackoffer(context.Background(), getTxnStatusMaxBackoff)
currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, false, nil)
}
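
// Status-query sketch (illustrative; `txnStartTS` and `primaryKey` are assumed
// to be taken from an observed lock):
//
//	status, err := resolver.GetTxnStatus(txnStartTS, callerStartTS, primaryKey)
//	switch {
//	case err != nil:
//		// network error, backoff timeout, etc.
//	case status.TTL() > 0:
//		// the transaction is still alive
//	case status.IsCommitted():
//		_ = status.CommitTS() // data is visible at this version
//	default:
//		// the transaction has been rolled back
//	}
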
func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, callerStartTS uint64, forceSyncCommit bool) (TxnStatus, error) {
var currentTS uint64
var err error
var status TxnStatus
	if l.TTL == 0 {
		// NOTE: l.TTL = 0 is a special protocol!!!
		// When a pessimistic txn's prewrite meets locks of another txn, it should resolve the lock **unconditionally**.
		// In this case, TiKV uses lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!
		// Set currentTS to max uint64 to make the lock expired.
currentTS = math.MaxUint64
} else {
currentTS, err = lr.store.GetOracle().GetLowResolutionTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return TxnStatus{}, err
}
}
rollbackIfNotExist := false
if _, err := util.EvalFailpoint("getTxnStatusDelay"); err == nil {
time.Sleep(100 * time.Millisecond)
}
for {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist, forceSyncCommit, l)
if err == nil {
return status, nil
}
// If the error is something other than txnNotFoundErr, throw the error (network
// unavailable, tikv down, backoff timeout etc) to the caller.
if _, ok := errors.Cause(err).(txnNotFoundErr); !ok {
return TxnStatus{}, err
}
if _, err := util.EvalFailpoint("txnNotFoundRetTTL"); err == nil {
return TxnStatus{ttl: l.TTL, action: kvrpcpb.Action_NoAction}, nil
}
if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) <= 0 {
logutil.Logger(bo.GetCtx()).Warn("lock txn not found, lock has expired",
zap.Uint64("CallerStartTs", callerStartTS),
zap.Stringer("lock str", l))
if l.LockType == kvrpcpb.Op_PessimisticLock {
if _, err := util.EvalFailpoint("txnExpireRetTTL"); err == nil {
return TxnStatus{action: kvrpcpb.Action_LockNotExistDoNothing},
errors.New("error txn not found and lock expired")
}
}
			// For pessimistic lock resolving, if the primary lock does not exist and rollbackIfNotExist is true,
			// Action_LockNotExistDoNothing will be returned as the status.
			rollbackIfNotExist = true
		} else {
			// For a Rollback statement from the user, the pessimistic locks will be rolled back and the primary
			// key in the store has no related information. Other transactions may run checkTxnStatus on these
			// locks and be blocked for up to TTL time, so let the transaction retry acquiring the pessimistic
			// lock if the txn is not found and the lock has not expired yet.
			if l.LockType == kvrpcpb.Op_PessimisticLock {
				return TxnStatus{ttl: l.TTL}, nil
			}
		}
		// Handle the txnNotFound error.
		// getTxnStatus() returns it when the secondary locks exist while the primary lock doesn't.
		// This is likely to happen during concurrent prewrite, when the secondary regions succeed
		// before the primary region.
if err := bo.Backoff(retry.BoTxnNotFound, err); err != nil {
logutil.Logger(bo.GetCtx()).Warn("getTxnStatusFromLock backoff fail", zap.Error(err))
}
}
}
type txnNotFoundErr struct {
*kvrpcpb.TxnNotFound
}
func (e txnNotFoundErr) Error() string {
return e.TxnNotFound.String()
}
// getTxnStatus sends the CheckTxnStatus request to the TiKV server.
// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error.
func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary []byte,
callerStartTS, currentTS uint64, rollbackIfNotExist bool, forceSyncCommit bool, lockInfo *Lock) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
metrics.LockResolverCountWithQueryTxnStatus.Inc()
	// CheckTxnStatus may meet the following cases:
	// 1. LOCK
	//    1.1 Lock expired -- orphan lock, failure to update TTL, crash recovery, etc.
	//    1.2 Lock TTL -- active transaction holding the lock.
	// 2. NO LOCK
	//    2.1 Txn committed.
	//    2.2 Txn rolled back -- rolled back by itself, rolled back by others, GC tomb, etc.
	//    2.3 No lock -- pessimistic lock rollback, concurrent prewrite.
var status TxnStatus
resolvingPessimisticLock := lockInfo != nil && lockInfo.LockType == kvrpcpb.Op_PessimisticLock
req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
PrimaryKey: primary,
LockTs: txnID,
CallerStartTs: callerStartTS,
CurrentTs: currentTS,
RollbackIfNotExist: rollbackIfNotExist,
ForceSyncCommit: forceSyncCommit,
ResolvingPessimisticLock: resolvingPessimisticLock,
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
return status, err
}
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {
return status, err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return status, err
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return status, err
}
continue
}
if resp.Resp == nil {
return status, errors.WithStack(tikverr.ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
txnNotFound := keyErr.GetTxnNotFound()
if txnNotFound != nil {
return status, txnNotFoundErr{txnNotFound}
}
err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID)
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
}
status.action = cmdResp.Action
status.primaryLock = cmdResp.LockInfo
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
if !lr.store.GetOracle().IsExpired(txnID, cmdResp.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
status.ttl = cmdResp.LockTtl
}
} else if cmdResp.LockTtl != 0 {
status.ttl = cmdResp.LockTtl
} else {
if cmdResp.CommitVersion == 0 {
metrics.LockResolverCountWithQueryTxnStatusRolledBack.Inc()
} else {
metrics.LockResolverCountWithQueryTxnStatusCommitted.Inc()
}
status.commitTS = cmdResp.CommitVersion
if status.StatusCacheable() {
lr.saveResolved(txnID, status)
}
}
return status, nil
}
}
// asyncResolveData is data contributed by multiple goroutines when resolving locks using the async commit protocol. All
// data should be protected by the mutex field.
type asyncResolveData struct {
mutex sync.Mutex
// If any key has been committed (missingLock is true), then this is the commit ts. In that case, all locks should
// be committed with the same commit timestamp. If no locks have been committed (missingLock is false), then we will
// use max(all min commit ts) from all locks; i.e., it is the commit ts we should use. Note that a secondary lock's
// commit ts may or may not be the same as the primary lock's min commit ts.
commitTs uint64
keys [][]byte
missingLock bool
}
type nonAsyncCommitLock struct{}
func (*nonAsyncCommitLock) Error() string {
return "CheckSecondaryLocks receives a non-async-commit lock"
}
// addKeys adds the keys from locks to data, keeping other fields up to date. startTS and commitTS are for the
// transaction being resolved.
//
// In the async commit protocol when checking locks, we send a list of keys to check and get back a list of locks. There
// will be a lock for every key which is locked. If there are fewer locks than keys, then a lock is missing because it
// has been committed, rolled back, or was never locked.
//
// In this function, locks is the list of locks, and expected is the number of keys. asyncResolveData.missingLock will be
// set to true if the lengths don't match. If the lengths do match, then the locks are added to asyncResolveData.keys
// and will need to be resolved by the caller.
func (data *asyncResolveData) addKeys(locks []*kvrpcpb.LockInfo, expected int, startTS uint64, commitTS uint64) error {
data.mutex.Lock()
defer data.mutex.Unlock()
// Check locks to see if any have been committed or rolled back.
if len(locks) < expected {
logutil.BgLogger().Debug("addKeys: lock has been committed or rolled back", zap.Uint64("commit ts", commitTS), zap.Uint64("start ts", startTS))
// A lock is missing - the transaction must either have been rolled back or committed.
if !data.missingLock {
// commitTS == 0 => lock has been rolled back.
if commitTS != 0 && commitTS < data.commitTs {
return errors.Errorf("commit TS must be greater or equal to min commit TS: commit ts: %v, min commit ts: %v", commitTS, data.commitTs)
}
data.commitTs = commitTS
}
data.missingLock = true
if data.commitTs != commitTS {
return errors.Errorf("commit TS mismatch in async commit recovery: %v and %v", data.commitTs, commitTS)
}
// We do not need to resolve the remaining locks because TiKV will have resolved them as appropriate.
return nil
}
logutil.BgLogger().Debug("addKeys: all locks present", zap.Uint64("start ts", startTS))
// Save all locks to be resolved.
for _, lockInfo := range locks {
if lockInfo.LockVersion != startTS {
err := errors.Errorf("unexpected timestamp, expected: %v, found: %v", startTS, lockInfo.LockVersion)
logutil.BgLogger().Error("addLocks error", zap.Error(err))
return err
}
if !lockInfo.UseAsyncCommit {
return &nonAsyncCommitLock{}
}
if !data.missingLock && lockInfo.MinCommitTs > data.commitTs {
data.commitTs = lockInfo.MinCommitTs
}
data.keys = append(data.keys, lockInfo.Key)
}
return nil
}
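
// Worked example (illustrative): suppose a txn has primary P (MinCommitTs 100)
// and secondaries {S1, S2}. If CheckSecondaryLocks returns both locks with
// MinCommitTs 105 and 110, missingLock stays false and commitTs is raised to
// max(100, 105, 110) = 110; both keys are queued for resolution. If only one
// lock comes back, missingLock is set and commitTs is taken from the response
// instead (0 meaning the transaction was rolled back).
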
func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curKeys [][]byte, curRegionID locate.RegionVerID, shared *asyncResolveData) error {
checkReq := &kvrpcpb.CheckSecondaryLocksRequest{
Keys: curKeys,
StartVersion: txnID,
}
req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq)
metrics.LockResolverCountWithQueryCheckSecondaryLocks.Inc()
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := lr.store.SendReq(bo, req, curRegionID, client.ReadTimeoutShort)
if err != nil {
return err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return err
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return err
}
logutil.BgLogger().Debug("checkSecondaries: region error, regrouping", zap.Uint64("txn id", txnID), zap.Uint64("region", curRegionID.GetID()))
// If regions have changed, then we might need to regroup the keys. Since this should be rare and for the sake
// of simplicity, we will resolve regions sequentially.
regions, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, curKeys, nil)
if err != nil {
return err
}
for regionID, keys := range regions {
// Recursion will terminate because the resolve request succeeds or the Backoffer reaches its limit.
if err = lr.checkSecondaries(bo, txnID, keys, regionID, shared); err != nil {
return err
}
}
return nil
}
if resp.Resp == nil {
return errors.WithStack(tikverr.ErrBodyMissing)
}
checkResp := resp.Resp.(*kvrpcpb.CheckSecondaryLocksResponse)
return shared.addKeys(checkResp.Locks, len(curKeys), txnID, checkResp.CommitTs)
}
// resolveAsyncResolveData resolves all locks in an async-commit transaction according to the status.
func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, data *asyncResolveData) error {
util.EvalFailpoint("resolveAsyncResolveData")
keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, data.keys, nil)
if err != nil {
return err
}
errChan := make(chan error, len(keysByRegion))
// Resolve every lock in the transaction.
for region, locks := range keysByRegion {
curLocks := locks
curRegion := region
resolveBo, cancel := bo.Fork()
defer cancel()
go func() {
errChan <- lr.resolveRegionLocks(resolveBo, l, curRegion, curLocks, status)
}()
}
var errs []string
for range keysByRegion {
err1 := <-errChan
if err1 != nil {
errs = append(errs, err1.Error())
}
}
if len(errs) > 0 {
return errors.Errorf("async commit recovery (sending ResolveLock) finished with errors: %v", errs)
}
return nil
}
// resolveAsyncCommitLock resolves l, assuming it was locked using the async commit protocol.
func (lr *LockResolver) resolveAsyncCommitLock(bo *retry.Backoffer, l *Lock, status TxnStatus, asyncResolveAll bool) (TxnStatus, error) {
metrics.LockResolverCountWithResolveAsync.Inc()
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
if err != nil {
return TxnStatus{}, err
}
resolveData.keys = append(resolveData.keys, l.Primary)
status.commitTS = resolveData.commitTs
if status.StatusCacheable() {
lr.saveResolved(l.TxnID, status)
}
logutil.BgLogger().Info("resolve async commit", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS))
if asyncResolveAll {
asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff)
go func() {
err := lr.resolveAsyncResolveData(asyncBo, l, status, resolveData)
if err != nil {
logutil.BgLogger().Info("failed to resolve async-commit locks asynchronously",
zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.CommitTS()), zap.Error(err))
}
}()
} else {
err = lr.resolveAsyncResolveData(bo, l, status, resolveData)
}
return status, err
}
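
// Recovery flow sketch (illustrative): for an async-commit lock, the transaction's
// fate is decided by its secondaries rather than by the primary alone:
//
//	// 1. CheckTxnStatus on the primary returns a primaryLock with UseAsyncCommit set.
//	// 2. checkAllSecondaries either finds every secondary lock (commitTs becomes the
//	//    max of all min-commit timestamps) or detects a missing lock (commitTs comes
//	//    from TiKV: >0 means committed, 0 means rolled back).
//	// 3. resolveAsyncResolveData sends ResolveLock for every key, region by region.
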
// checkAllSecondaries checks the secondary locks of an async commit transaction to find out the final
// status of the transaction.
func (lr *LockResolver) checkAllSecondaries(bo *retry.Backoffer, l *Lock, status *TxnStatus) (*asyncResolveData, error) {
regions, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, status.primaryLock.Secondaries, nil)
if err != nil {
return nil, err
}
shared := asyncResolveData{
mutex: sync.Mutex{},
commitTs: status.primaryLock.MinCommitTs,
keys: [][]byte{},
missingLock: false,
}
errChan := make(chan error, len(regions))
for regionID, keys := range regions {
curRegionID := regionID
curKeys := keys
checkBo, cancel := bo.Fork()
defer cancel()
go func() {
errChan <- lr.checkSecondaries(checkBo, l.TxnID, curKeys, curRegionID, &shared)
}()
}
for range regions {
err := <-errChan
if err != nil {
return nil, err
}
}
return &shared, nil
}
// resolveRegionLocks is essentially the same as resolveLock, but we resolve all keys in the same region at the same time.
func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region locate.RegionVerID, keys [][]byte, status TxnStatus) error {
lreq := &kvrpcpb.ResolveLockRequest{
StartVersion: l.TxnID,
}
if status.IsCommitted() {
lreq.CommitVersion = status.CommitTS()
}
lreq.Keys = keys
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := lr.store.SendReq(bo, req, region, client.ReadTimeoutShort)
if err != nil {
return err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return err
}
if regionErr != nil {
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return err
}
logutil.BgLogger().Info("resolveRegionLocks region error, regrouping", zap.String("lock", l.String()), zap.Uint64("region", region.GetID()))
// Regroup locks.
regions, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, keys, nil)
if err != nil {
return err
}
for regionID, keys := range regions {
// Recursion will terminate because the resolve request succeeds or the Backoffer reaches its limit.
if err = lr.resolveRegionLocks(bo, l, regionID, keys, status); err != nil {
return err
}
}
return nil
}
if resp.Resp == nil {
return errors.WithStack(tikverr.ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l)
logutil.BgLogger().Error("resolveLock error", zap.Error(err))
}
return nil
}
func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[locate.RegionVerID]struct{}) error {
util.EvalFailpoint("resolveLock")
metrics.LockResolverCountWithResolveLocks.Inc()
resolveLite := lite || l.TxnSize < lr.resolveLockLiteThreshold
// The lock has been resolved by getTxnStatusFromLock.
if resolveLite && bytes.Equal(l.Key, l.Primary) {
return nil
}
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
if err != nil {
return err
}
if _, ok := cleanRegions[loc.Region]; ok {
return nil
}
lreq := &kvrpcpb.ResolveLockRequest{
StartVersion: l.TxnID,
}
if status.IsCommitted() {
lreq.CommitVersion = status.CommitTS()
} else {
logutil.BgLogger().Info("resolveLock rollback", zap.String("lock", l.String()))
}
		if resolveLite {
			// Only resolve the specified keys when it is a small transaction,
			// to prevent scanning the whole region in this case.
metrics.LockResolverCountWithResolveLockLite.Inc()
lreq.Keys = [][]byte{l.Key}
}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {
return err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return err
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return err
}
continue
}
if resp.Resp == nil {
return errors.WithStack(tikverr.ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l)
logutil.BgLogger().Error("resolveLock error", zap.Error(err))
return err
}
if !resolveLite {
cleanRegions[loc.Region] = struct{}{}
}
return nil
}
}
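
// Config sketch (illustrative; assumes this client's global config helpers): a
// transaction whose TxnSize is below the lite threshold only resolves the
// conflicting key instead of asking TiKV to scan the whole region:
//
//	conf := *config.GetGlobalConfig()
//	conf.TiKVClient.ResolveLockLiteThreshold = 16
//	config.StoreGlobalConfig(&conf)
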
func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error {
metrics.LockResolverCountWithResolveLocks.Inc()
// The lock has been resolved by getTxnStatusFromLock.
if bytes.Equal(l.Key, l.Primary) {
return nil
}
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
if err != nil {
return err
}
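		// A zero LockForUpdateTS means the caller doesn't know the lock's real
		// forUpdateTS; send math.MaxUint64 so the rollback matches the lock
		// regardless of its actual forUpdateTS.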
forUpdateTS := l.LockForUpdateTS
if forUpdateTS == 0 {
forUpdateTS = math.MaxUint64
}
pessimisticRollbackReq := &kvrpcpb.PessimisticRollbackRequest{
StartVersion: l.TxnID,
ForUpdateTs: forUpdateTS,
Keys: [][]byte{l.Key},
}
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq)
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {
return err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return err
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return err
}
continue
}
if resp.Resp == nil {
return errors.WithStack(tikverr.ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.PessimisticRollbackResponse)
if keyErr := cmdResp.GetErrors(); len(keyErr) > 0 {
err = errors.Errorf("unexpected resolve pessimistic lock err: %s, lock: %v", keyErr[0], l)
logutil.Logger(bo.GetCtx()).Error("resolveLock error", zap.Error(err))
return err
}
return nil
}
}