txnkv/txnsnapshot: init package for txnsnapshot (#245)

Signed-off-by: shirly <AndreMouche@126.com>
This commit is contained in:
Shirly 2021-07-27 12:10:35 +08:00 committed by GitHub
parent ce977e34b0
commit 070dd85543
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 281 additions and 141 deletions

View File

@ -39,6 +39,9 @@ import (
"github.com/pingcap/errors" "github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
) )
var ( var (
@ -220,3 +223,37 @@ type ErrTokenLimit struct {
func (e *ErrTokenLimit) Error() string { func (e *ErrTokenLimit) Error() string {
return fmt.Sprintf("Store token is up to the limit, store id = %d.", e.StoreID) return fmt.Sprintf("Store token is up to the limit, store id = %d.", e.StoreID)
} }
// ExtractKeyErr extracts a KeyError.
func ExtractKeyErr(keyErr *kvrpcpb.KeyError) error {
if val, err := util.EvalFailpoint("mockRetryableErrorResp"); err == nil {
if val.(bool) {
keyErr.Conflict = nil
keyErr.Retryable = "mock retryable error"
}
}
if keyErr.Conflict != nil {
return &ErrWriteConflict{WriteConflict: keyErr.GetConflict()}
}
if keyErr.Retryable != "" {
return &ErrRetryable{Retryable: keyErr.Retryable}
}
if keyErr.Abort != "" {
err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort())
logutil.BgLogger().Warn("2PC failed", zap.Error(err))
return errors.Trace(err)
}
if keyErr.CommitTsTooLarge != nil {
err := errors.Errorf("commit TS %v is too large", keyErr.CommitTsTooLarge.CommitTs)
logutil.BgLogger().Warn("2PC failed", zap.Error(err))
return errors.Trace(err)
}
if keyErr.TxnNotFound != nil {
err := errors.Errorf("txn %d not found", keyErr.TxnNotFound.StartTs)
return errors.Trace(err)
}
return errors.Errorf("unexpected KeyError: %s", keyErr.String())
}

View File

@ -125,7 +125,7 @@ func (s *testAsyncCommitCommon) mustGetLock(key []byte) *txnkv.Lock {
s.NotNil(resp.Resp) s.NotNil(resp.Resp)
keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError() keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
s.NotNil(keyErr) s.NotNil(keyErr)
lock, err := tikv.ExtractLockFromKeyErr(keyErr) lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
s.Nil(err) s.Nil(err)
return lock return lock
} }

View File

@ -436,7 +436,7 @@ func (s *testLockSuite) mustGetLock(key []byte) *txnkv.Lock {
s.NotNil(resp.Resp) s.NotNil(resp.Resp)
keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError() keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
s.NotNil(keyErr) s.NotNil(keyErr)
lock, err := tikv.ExtractLockFromKeyErr(keyErr) lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
s.Nil(err) s.Nil(err)
return lock return lock
} }

View File

@ -905,7 +905,7 @@ func sendTxnHeartBeat(bo *Backoffer, store *KVStore, primary []byte, startTS, tt
} }
cmdResp := resp.Resp.(*kvrpcpb.TxnHeartBeatResponse) cmdResp := resp.Resp.(*kvrpcpb.TxnHeartBeatResponse)
if keyErr := cmdResp.GetError(); keyErr != nil { if keyErr := cmdResp.GetError(); keyErr != nil {
return 0, true, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, hex.EncodeToString(primary), extractKeyErr(keyErr)) return 0, true, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, hex.EncodeToString(primary), tikverr.ExtractKeyErr(keyErr))
} }
return cmdResp.GetLockTtl(), false, nil return cmdResp.GetLockTtl(), false, nil
} }

View File

@ -163,7 +163,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
err = extractKeyErr(keyErr) err = tikverr.ExtractKeyErr(keyErr)
if c.mu.committed { if c.mu.committed {
// No secondary key could be rolled back after it's primary key is committed. // No secondary key could be rolled back after it's primary key is committed.
// There must be a serious bug somewhere. // There must be a serious bug somewhere.

View File

@ -61,6 +61,8 @@ import (
"github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/rangetask" "github.com/tikv/client-go/v2/txnkv/rangetask"
"github.com/tikv/client-go/v2/txnkv/txnlock" "github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/txnkv/txnutil"
"github.com/tikv/client-go/v2/util" "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client" pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
@ -299,8 +301,8 @@ func (s *KVStore) DeleteRange(ctx context.Context, startKey []byte, endKey []byt
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ts is MaxVersion or > current max committed version, we will use current version for this snapshot. // if ts is MaxVersion or > current max committed version, we will use current version for this snapshot.
func (s *KVStore) GetSnapshot(ts uint64) *KVSnapshot { func (s *KVStore) GetSnapshot(ts uint64) *txnsnapshot.KVSnapshot {
snapshot := newTiKVSnapshot(s, ts, s.nextReplicaReadSeed()) snapshot := txnsnapshot.NewTiKVSnapshot(s, ts, s.nextReplicaReadSeed())
return snapshot return snapshot
} }
@ -572,6 +574,36 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.Cl
return s.lockResolver, nil return s.lockResolver, nil
} }
// TODO: remove it when tidb/br is ready.
// Scanner support tikv scan
type Scanner = txnsnapshot.Scanner
// KVSnapshot implements the tidbkv.Snapshot interface.
type KVSnapshot = txnsnapshot.KVSnapshot
// SnapshotRuntimeStats records the runtime stats of snapshot.
type SnapshotRuntimeStats = txnsnapshot.SnapshotRuntimeStats
// IsoLevel is the transaction's isolation level.
type IsoLevel = txnsnapshot.IsoLevel
// IsoLevel value for transaction priority.
const (
SI = txnsnapshot.SI
RC = txnsnapshot.RC
)
// Priority is the priority for tikv to execute a command.
type Priority = txnutil.Priority
// Priority value for transaction priority.
const (
PriorityHigh = txnutil.PriorityHigh
PriorityNormal = txnutil.PriorityNormal
PriorityLow = txnutil.PriorityLow
)
// TODO: remove Lock&LockResolver&TxnStatus once tidb and br are ready. // TODO: remove Lock&LockResolver&TxnStatus once tidb and br are ready.
// Lock represents a lock from tikv server. // Lock represents a lock from tikv server.

View File

@ -191,7 +191,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
} }
// Extract lock from key error // Extract lock from key error
lock, err1 := extractLockFromKeyErr(keyErr) lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr)
if err1 != nil { if err1 != nil {
return errors.Trace(err1) return errors.Trace(err1)
} }

View File

@ -304,7 +304,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
} }
// Extract lock from key error // Extract lock from key error
lock, err1 := extractLockFromKeyErr(keyErr) lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr)
if err1 != nil { if err1 != nil {
return errors.Trace(err1) return errors.Trace(err1)
} }

View File

@ -39,13 +39,13 @@ import (
"time" "time"
"github.com/pingcap/errors" "github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/internal/unionstore" "github.com/tikv/client-go/v2/internal/unionstore"
"github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock" "github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
pd "github.com/tikv/pd/client" pd "github.com/tikv/pd/client"
) )
@ -72,9 +72,9 @@ func (s StoreProbe) Begin() (TxnProbe, error) {
} }
// GetSnapshot returns a snapshot. // GetSnapshot returns a snapshot.
func (s StoreProbe) GetSnapshot(ts uint64) SnapshotProbe { func (s StoreProbe) GetSnapshot(ts uint64) txnsnapshot.SnapshotProbe {
snap := s.KVStore.GetSnapshot(ts) snap := s.KVStore.GetSnapshot(ts)
return SnapshotProbe{KVSnapshot: snap} return txnsnapshot.SnapshotProbe{KVSnapshot: snap}
} }
// SetRegionCachePDClient replaces pd client inside region cache. // SetRegionCachePDClient replaces pd client inside region cache.
@ -165,13 +165,15 @@ func (txn TxnProbe) CollectLockedKeys() [][]byte {
// BatchGetSingleRegion gets a batch of keys from a region. // BatchGetSingleRegion gets a batch of keys from a region.
func (txn TxnProbe) BatchGetSingleRegion(bo *Backoffer, region locate.RegionVerID, keys [][]byte, collect func([]byte, []byte)) error { func (txn TxnProbe) BatchGetSingleRegion(bo *Backoffer, region locate.RegionVerID, keys [][]byte, collect func([]byte, []byte)) error {
snapshot := txn.GetSnapshot() snapshot := txnsnapshot.SnapshotProbe{KVSnapshot: txn.GetSnapshot()}
return snapshot.batchGetSingleRegion(bo, batchKeys{region: region, keys: keys}, collect)
return snapshot.BatchGetSingleRegion(bo, region, keys, collect)
} }
// NewScanner returns a scanner to iterate given key range. // NewScanner returns a scanner to iterate given key range.
func (txn TxnProbe) NewScanner(start, end []byte, batchSize int, reverse bool) (*Scanner, error) { func (txn TxnProbe) NewScanner(start, end []byte, batchSize int, reverse bool) (*txnsnapshot.Scanner, error) {
return newScanner(txn.GetSnapshot(), start, end, batchSize, reverse) snapshot := txnsnapshot.SnapshotProbe{KVSnapshot: txn.GetSnapshot()}
return snapshot.NewScanner(start, end, batchSize, reverse)
} }
// GetStartTime returns the time when txn starts. // GetStartTime returns the time when txn starts.
@ -407,33 +409,6 @@ func (c CommitterProbe) CleanupMutations(ctx context.Context) error {
return c.cleanupMutations(bo, c.mutations) return c.cleanupMutations(bo, c.mutations)
} }
// SnapshotProbe exposes some snapshot utilities for testing purpose.
type SnapshotProbe struct {
*KVSnapshot
}
// MergeRegionRequestStats merges RPC runtime stats into snapshot's stats.
func (s SnapshotProbe) MergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) {
s.mergeRegionRequestStats(stats)
}
// RecordBackoffInfo records backoff stats into snapshot's stats.
func (s SnapshotProbe) RecordBackoffInfo(bo *Backoffer) {
s.recordBackoffInfo(bo)
}
// MergeExecDetail merges exec stats into snapshot's stats.
func (s SnapshotProbe) MergeExecDetail(detail *kvrpcpb.ExecDetailsV2) {
s.mergeExecDetail(detail)
}
// FormatStats dumps information of stats.
func (s SnapshotProbe) FormatStats() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.stats.String()
}
// LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose. // LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose.
type LockResolverProbe struct { type LockResolverProbe struct {
*txnlock.LockResolverProbe *txnlock.LockResolverProbe
@ -457,11 +432,6 @@ func (l LockResolverProbe) ResolvePessimisticLock(ctx context.Context, lock *txn
return l.LockResolverProbe.ResolvePessimisticLock(bo, lock) return l.LockResolverProbe.ResolvePessimisticLock(bo, lock)
} }
// ExtractLockFromKeyErr makes a Lock based on a key error.
func ExtractLockFromKeyErr(err *kvrpcpb.KeyError) (*txnlock.Lock, error) {
return extractLockFromKeyErr(err)
}
// ConfigProbe exposes configurations and global variables for testing purpose. // ConfigProbe exposes configurations and global variables for testing purpose.
type ConfigProbe struct{} type ConfigProbe struct{}
@ -479,7 +449,7 @@ func (c ConfigProbe) GetBigTxnThreshold() int {
// GetScanBatchSize returns the batch size to scan ranges. // GetScanBatchSize returns the batch size to scan ranges.
func (c ConfigProbe) GetScanBatchSize() int { func (c ConfigProbe) GetScanBatchSize() int {
return defaultScanBatchSize return txnsnapshot.ConfigProbe{}.GetScanBatchSize()
} }
// GetDefaultLockTTL returns the default lock TTL. // GetDefaultLockTTL returns the default lock TTL.
@ -494,7 +464,7 @@ func (c ConfigProbe) GetTTLFactor() int {
// GetGetMaxBackoff returns the max sleep for get command. // GetGetMaxBackoff returns the max sleep for get command.
func (c ConfigProbe) GetGetMaxBackoff() int { func (c ConfigProbe) GetGetMaxBackoff() int {
return getMaxBackoff return txnsnapshot.ConfigProbe{}.GetGetMaxBackoff()
} }
// LoadPreSplitDetectThreshold returns presplit detect threshold config. // LoadPreSplitDetectThreshold returns presplit detect threshold config.

View File

@ -57,6 +57,8 @@ import (
tikv "github.com/tikv/client-go/v2/kv" tikv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/txnkv/txnutil"
"github.com/tikv/client-go/v2/util" "github.com/tikv/client-go/v2/util"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -99,7 +101,7 @@ func (to StartTSOption) SetTxnScope(txnScope string) StartTSOption {
// KVTxn contains methods to interact with a TiKV transaction. // KVTxn contains methods to interact with a TiKV transaction.
type KVTxn struct { type KVTxn struct {
snapshot *KVSnapshot snapshot *txnsnapshot.KVSnapshot
us *unionstore.KVUnionStore us *unionstore.KVUnionStore
store *KVStore // for connection to region. store *KVStore // for connection to region.
startTS uint64 startTS uint64
@ -123,7 +125,7 @@ type KVTxn struct {
binlog BinlogExecutor binlog BinlogExecutor
schemaLeaseChecker SchemaLeaseChecker schemaLeaseChecker SchemaLeaseChecker
syncLog bool syncLog bool
priority Priority priority txnutil.Priority
isPessimistic bool isPessimistic bool
enableAsyncCommit bool enableAsyncCommit bool
enable1PC bool enable1PC bool
@ -150,7 +152,7 @@ func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, errors.Trace(err)
} }
snapshot := newTiKVSnapshot(store, startTS, store.nextReplicaReadSeed()) snapshot := txnsnapshot.NewTiKVSnapshot(store, startTS, store.nextReplicaReadSeed())
cfg := config.GetGlobalConfig() cfg := config.GetGlobalConfig()
newTiKVTxn := &KVTxn{ newTiKVTxn := &KVTxn{
snapshot: snapshot, snapshot: snapshot,
@ -173,7 +175,7 @@ var SetSuccess = false
// SetVars sets variables to the transaction. // SetVars sets variables to the transaction.
func (txn *KVTxn) SetVars(vars *tikv.Variables) { func (txn *KVTxn) SetVars(vars *tikv.Variables) {
txn.vars = vars txn.vars = vars
txn.snapshot.vars = vars txn.snapshot.SetVars(vars)
if val, err := util.EvalFailpoint("probeSetVars"); err == nil { if val, err := util.EvalFailpoint("probeSetVars"); err == nil {
if val.(bool) { if val.(bool) {
SetSuccess = true SetSuccess = true
@ -257,7 +259,7 @@ func (txn *KVTxn) SetSchemaVer(schemaVer SchemaVer) {
} }
// SetPriority sets the priority for both write and read. // SetPriority sets the priority for both write and read.
func (txn *KVTxn) SetPriority(pri Priority) { func (txn *KVTxn) SetPriority(pri txnutil.Priority) {
txn.priority = pri txn.priority = pri
txn.GetSnapshot().SetPriority(pri) txn.GetSnapshot().SetPriority(pri)
} }
@ -790,7 +792,7 @@ func (txn *KVTxn) GetMemBuffer() *MemDB {
} }
// GetSnapshot returns the Snapshot binding to this transaction. // GetSnapshot returns the Snapshot binding to this transaction.
func (txn *KVTxn) GetSnapshot() *KVSnapshot { func (txn *KVTxn) GetSnapshot() *txnsnapshot.KVSnapshot {
return txn.snapshot return txn.snapshot
} }

27
txnkv/txnlock/lock.go Normal file
View File

@ -0,0 +1,27 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package txnlock
import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
tikverr "github.com/tikv/client-go/v2/error"
)
// ExtractLockFromKeyErr extracts the KeyError.
func ExtractLockFromKeyErr(keyErr *kvrpcpb.KeyError) (*Lock, error) {
if locked := keyErr.GetLocked(); locked != nil {
return NewLock(locked), nil
}
return nil, tikverr.ExtractKeyErr(keyErr)
}

View File

@ -30,12 +30,14 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package tikv package txnsnapshot
import ( import (
"time" "time"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock" "github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util" "github.com/tikv/client-go/v2/util"
@ -52,13 +54,13 @@ type ClientHelper struct {
lockResolver *txnlock.LockResolver lockResolver *txnlock.LockResolver
regionCache *locate.RegionCache regionCache *locate.RegionCache
resolvedLocks *util.TSSet resolvedLocks *util.TSSet
client Client client client.Client
resolveLite bool resolveLite bool
locate.RegionRequestRuntimeStats locate.RegionRequestRuntimeStats
} }
// NewClientHelper creates a helper instance. // NewClientHelper creates a helper instance.
func NewClientHelper(store *KVStore, resolvedLocks *util.TSSet, resolveLite bool) *ClientHelper { func NewClientHelper(store kvstore, resolvedLocks *util.TSSet, resolveLite bool) *ClientHelper {
return &ClientHelper{ return &ClientHelper{
lockResolver: store.GetLockResolver(), lockResolver: store.GetLockResolver(),
regionCache: store.GetRegionCache(), regionCache: store.GetRegionCache(),
@ -69,7 +71,7 @@ func NewClientHelper(store *KVStore, resolvedLocks *util.TSSet, resolveLite bool
} }
// ResolveLocks wraps the ResolveLocks function and store the resolved result. // ResolveLocks wraps the ResolveLocks function and store the resolved result.
func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) { func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) {
var err error var err error
var resolvedLocks []uint64 var resolvedLocks []uint64
var msBeforeTxnExpired int64 var msBeforeTxnExpired int64
@ -94,7 +96,7 @@ func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
} }
// SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context. // SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context.
func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration, et tikvrpc.EndpointType, directStoreAddr string, opts ...locate.StoreSelectorOption) (*tikvrpc.Response, *locate.RPCContext, string, error) { func (ch *ClientHelper) SendReqCtx(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration, et tikvrpc.EndpointType, directStoreAddr string, opts ...locate.StoreSelectorOption) (*tikvrpc.Response, *locate.RPCContext, string, error) {
sender := locate.NewRegionRequestSender(ch.regionCache, ch.client) sender := locate.NewRegionRequestSender(ch.regionCache, ch.client)
if len(directStoreAddr) > 0 { if len(directStoreAddr) > 0 {
sender.SetStoreAddr(directStoreAddr) sender.SetStoreAddr(directStoreAddr)

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package tikv package txnsnapshot
import ( import (
"bytes" "bytes"
@ -169,7 +169,7 @@ func (s *Scanner) startTS() uint64 {
return s.snapshot.version return s.snapshot.version
} }
func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *kvrpcpb.KvPair) error { func (s *Scanner) resolveCurrentLock(bo *retry.Backoffer, current *kvrpcpb.KvPair) error {
ctx := context.Background() ctx := context.Background()
val, err := s.snapshot.get(ctx, bo, current.Key) val, err := s.snapshot.get(ctx, bo, current.Key)
if err != nil { if err != nil {
@ -180,21 +180,21 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *kvrpcpb.KvPair) err
return nil return nil
} }
func (s *Scanner) getData(bo *Backoffer) error { func (s *Scanner) getData(bo *retry.Backoffer) error {
logutil.BgLogger().Debug("txn getData", logutil.BgLogger().Debug("txn getData",
zap.String("nextStartKey", kv.StrKey(s.nextStartKey)), zap.String("nextStartKey", kv.StrKey(s.nextStartKey)),
zap.String("nextEndKey", kv.StrKey(s.nextEndKey)), zap.String("nextEndKey", kv.StrKey(s.nextEndKey)),
zap.Bool("reverse", s.reverse), zap.Bool("reverse", s.reverse),
zap.Uint64("txnStartTS", s.startTS())) zap.Uint64("txnStartTS", s.startTS()))
sender := locate.NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.GetTiKVClient()) sender := locate.NewRegionRequestSender(s.snapshot.store.GetRegionCache(), s.snapshot.store.GetTiKVClient())
var reqEndKey, reqStartKey []byte var reqEndKey, reqStartKey []byte
var loc *locate.KeyLocation var loc *locate.KeyLocation
var err error var err error
for { for {
if !s.reverse { if !s.reverse {
loc, err = s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey) loc, err = s.snapshot.store.GetRegionCache().LocateKey(bo, s.nextStartKey)
} else { } else {
loc, err = s.snapshot.store.regionCache.LocateEndKey(bo, s.nextEndKey) loc, err = s.snapshot.store.GetRegionCache().LocateEndKey(bo, s.nextEndKey)
} }
if err != nil { if err != nil {
return errors.Trace(err) return errors.Trace(err)
@ -274,7 +274,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
// When there is a response-level key error, the returned pairs are incomplete. // When there is a response-level key error, the returned pairs are incomplete.
// We should resolve the lock first and then retry the same request. // We should resolve the lock first and then retry the same request.
if keyErr := cmdScanResp.GetError(); keyErr != nil { if keyErr := cmdScanResp.GetError(); keyErr != nil {
lock, err := extractLockFromKeyErr(keyErr) lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil { if err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
@ -295,7 +295,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
// Check if kvPair contains error, it should be a Lock. // Check if kvPair contains error, it should be a Lock.
for _, pair := range kvPairs { for _, pair := range kvPairs {
if keyErr := pair.GetError(); keyErr != nil && len(pair.Key) == 0 { if keyErr := pair.GetError(); keyErr != nil && len(pair.Key) == 0 {
lock, err := extractLockFromKeyErr(keyErr) lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil { if err != nil {
return errors.Trace(err) return errors.Trace(err)
} }

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package tikv package txnsnapshot
import ( import (
"bytes" "bytes"
@ -51,10 +51,13 @@ import (
"github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/internal/unionstore"
"github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock" "github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/txnkv/txnutil"
"github.com/tikv/client-go/v2/util" "github.com/tikv/client-go/v2/util"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -65,21 +68,6 @@ const (
maxTimestamp = math.MaxUint64 maxTimestamp = math.MaxUint64
) )
// Priority is the priority for tikv to execute a command.
type Priority kvrpcpb.CommandPri
// Priority value for transaction priority.
const (
PriorityNormal = Priority(kvrpcpb.CommandPri_Normal)
PriorityLow = Priority(kvrpcpb.CommandPri_Low)
PriorityHigh = Priority(kvrpcpb.CommandPri_High)
)
// ToPB converts priority to wire type.
func (p Priority) ToPB() kvrpcpb.CommandPri {
return kvrpcpb.CommandPri(p)
}
// IsoLevel is the transaction's isolation level. // IsoLevel is the transaction's isolation level.
type IsoLevel kvrpcpb.IsolationLevel type IsoLevel kvrpcpb.IsolationLevel
@ -95,12 +83,25 @@ func (l IsoLevel) ToPB() kvrpcpb.IsolationLevel {
return kvrpcpb.IsolationLevel(l) return kvrpcpb.IsolationLevel(l)
} }
type kvstore interface {
CheckVisibility(startTime uint64) error
// GetRegionCache gets the RegionCache.
GetRegionCache() *locate.RegionCache
GetLockResolver() *txnlock.LockResolver
GetTiKVClient() (client client.Client)
// SendReq sends a request to TiKV.
SendReq(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error)
// GetOracle gets a timestamp oracle client.
GetOracle() oracle.Oracle
}
// KVSnapshot implements the tidbkv.Snapshot interface. // KVSnapshot implements the tidbkv.Snapshot interface.
type KVSnapshot struct { type KVSnapshot struct {
store *KVStore store kvstore
version uint64 version uint64
isolationLevel IsoLevel isolationLevel IsoLevel
priority Priority priority txnutil.Priority
notFillCache bool notFillCache bool
keyOnly bool keyOnly bool
vars *kv.Variables vars *kv.Variables
@ -133,8 +134,8 @@ type KVSnapshot struct {
resourceGroupTag []byte resourceGroupTag []byte
} }
// newTiKVSnapshot creates a snapshot of an TiKV store. // NewTiKVSnapshot creates a snapshot of an TiKV store.
func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnapshot { func NewTiKVSnapshot(store kvstore, ts uint64, replicaReadSeed uint32) *KVSnapshot {
// Sanity check for snapshot version. // Sanity check for snapshot version.
if ts >= math.MaxInt64 && ts != math.MaxUint64 { if ts >= math.MaxInt64 && ts != math.MaxUint64 {
err := errors.Errorf("try to get snapshot with a large ts %d", ts) err := errors.Errorf("try to get snapshot with a large ts %d", ts)
@ -144,7 +145,7 @@ func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnaps
store: store, store: store,
version: ts, version: ts,
scanBatchSize: defaultScanBatchSize, scanBatchSize: defaultScanBatchSize,
priority: PriorityNormal, priority: txnutil.PriorityNormal,
vars: kv.DefaultVars, vars: kv.DefaultVars,
replicaReadSeed: replicaReadSeed, replicaReadSeed: replicaReadSeed,
} }
@ -253,7 +254,7 @@ type batchKeys struct {
keys [][]byte keys [][]byte
} }
func (b *batchKeys) relocate(bo *Backoffer, c *RegionCache) (bool, error) { func (b *batchKeys) relocate(bo *retry.Backoffer, c *locate.RegionCache) (bool, error) {
loc, err := c.LocateKey(bo, b.keys[0]) loc, err := c.LocateKey(bo, b.keys[0])
if err != nil { if err != nil {
return false, errors.Trace(err) return false, errors.Trace(err)
@ -285,11 +286,11 @@ func appendBatchKeysBySize(b []batchKeys, region locate.RegionVerID, keys [][]by
return b return b
} }
func (s *KVSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error { func (s *KVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
defer func(start time.Time) { defer func(start time.Time) {
metrics.TxnCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds()) metrics.TxnCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds())
}(time.Now()) }(time.Now())
groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys, nil) groups, _, err := s.store.GetRegionCache().GroupKeysByRegion(bo, keys, nil)
if err != nil { if err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
@ -327,7 +328,7 @@ func (s *KVSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collect
return errors.Trace(err) return errors.Trace(err)
} }
func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error { func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
cli := NewClientHelper(s.store, &s.resolvedLocks, false) cli := NewClientHelper(s.store, &s.resolvedLocks, false)
s.mu.RLock() s.mu.RLock()
if s.mu.stats != nil { if s.mu.stats != nil {
@ -358,7 +359,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec
if isStaleness { if isStaleness {
req.EnableStaleRead() req.EnableStaleRead()
} }
ops := make([]StoreSelectorOption, 0, 2) ops := make([]locate.StoreSelectorOption, 0, 2)
if len(matchStoreLabels) > 0 { if len(matchStoreLabels) > 0 {
ops = append(ops, locate.WithMatchLabels(matchStoreLabels)) ops = append(ops, locate.WithMatchLabels(matchStoreLabels))
} }
@ -400,7 +401,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec
) )
if keyErr := batchGetResp.GetError(); keyErr != nil { if keyErr := batchGetResp.GetError(); keyErr != nil {
// If a response-level error happens, skip reading pairs. // If a response-level error happens, skip reading pairs.
lock, err := extractLockFromKeyErr(keyErr) lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil { if err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
@ -413,7 +414,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec
collectF(pair.GetKey(), pair.GetValue()) collectF(pair.GetKey(), pair.GetValue())
continue continue
} }
lock, err := extractLockFromKeyErr(keyErr) lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil { if err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
@ -475,7 +476,7 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) {
return val, nil return val, nil
} }
func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, error) { func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]byte, error) {
// Check the cached values first. // Check the cached values first.
s.mu.RLock() s.mu.RLock()
if s.mu.cached != nil { if s.mu.cached != nil {
@ -532,7 +533,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
var firstLock *txnlock.Lock var firstLock *txnlock.Lock
for { for {
util.EvalFailpoint("beforeSendPointGet") util.EvalFailpoint("beforeSendPointGet")
loc, err := s.store.regionCache.LocateKey(bo, k) loc, err := s.store.GetRegionCache().LocateKey(bo, k)
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, errors.Trace(err)
} }
@ -568,7 +569,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
} }
val := cmdGetResp.GetValue() val := cmdGetResp.GetValue()
if keyErr := cmdGetResp.GetError(); keyErr != nil { if keyErr := cmdGetResp.GetError(); keyErr != nil {
lock, err := extractLockFromKeyErr(keyErr) lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, errors.Trace(err)
} }
@ -615,13 +616,13 @@ func (s *KVSnapshot) mergeExecDetail(detail *kvrpcpb.ExecDetailsV2) {
} }
// Iter return a list of key-value pair after `k`. // Iter return a list of key-value pair after `k`.
func (s *KVSnapshot) Iter(k []byte, upperBound []byte) (Iterator, error) { func (s *KVSnapshot) Iter(k []byte, upperBound []byte) (unionstore.Iterator, error) {
scanner, err := newScanner(s, k, upperBound, s.scanBatchSize, false) scanner, err := newScanner(s, k, upperBound, s.scanBatchSize, false)
return scanner, errors.Trace(err) return scanner, errors.Trace(err)
} }
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
func (s *KVSnapshot) IterReverse(k []byte) (Iterator, error) { func (s *KVSnapshot) IterReverse(k []byte) (unionstore.Iterator, error) {
scanner, err := newScanner(s, nil, k, s.scanBatchSize, true) scanner, err := newScanner(s, nil, k, s.scanBatchSize, true)
return scanner, errors.Trace(err) return scanner, errors.Trace(err)
} }
@ -660,7 +661,7 @@ func (s *KVSnapshot) SetSampleStep(step uint32) {
} }
// SetPriority sets the priority for tikv to execute commands. // SetPriority sets the priority for tikv to execute commands.
func (s *KVSnapshot) SetPriority(pri Priority) { func (s *KVSnapshot) SetPriority(pri txnutil.Priority) {
s.priority = pri s.priority = pri
} }
@ -718,47 +719,12 @@ func (s *KVSnapshot) SnapCacheSize() int {
return len(s.mu.cached) return len(s.mu.cached)
} }
func extractLockFromKeyErr(keyErr *kvrpcpb.KeyError) (*txnlock.Lock, error) { // SetVars sets variables to the transaction.
if locked := keyErr.GetLocked(); locked != nil { func (s *KVSnapshot) SetVars(vars *kv.Variables) {
return txnlock.NewLock(locked), nil s.vars = vars
}
return nil, extractKeyErr(keyErr)
} }
func extractKeyErr(keyErr *kvrpcpb.KeyError) error { func (s *KVSnapshot) recordBackoffInfo(bo *retry.Backoffer) {
if val, err := util.EvalFailpoint("mockRetryableErrorResp"); err == nil {
if val.(bool) {
keyErr.Conflict = nil
keyErr.Retryable = "mock retryable error"
}
}
if keyErr.Conflict != nil {
return &tikverr.ErrWriteConflict{WriteConflict: keyErr.GetConflict()}
}
if keyErr.Retryable != "" {
return &tikverr.ErrRetryable{Retryable: keyErr.Retryable}
}
if keyErr.Abort != "" {
err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort())
logutil.BgLogger().Warn("2PC failed", zap.Error(err))
return errors.Trace(err)
}
if keyErr.CommitTsTooLarge != nil {
err := errors.Errorf("commit TS %v is too large", keyErr.CommitTsTooLarge.CommitTs)
logutil.BgLogger().Warn("2PC failed", zap.Error(err))
return errors.Trace(err)
}
if keyErr.TxnNotFound != nil {
err := errors.Errorf("txn %d not found", keyErr.TxnNotFound.StartTs)
return errors.Trace(err)
}
return errors.Errorf("unexpected KeyError: %s", keyErr.String())
}
func (s *KVSnapshot) recordBackoffInfo(bo *Backoffer) {
s.mu.RLock() s.mu.RLock()
if s.mu.stats == nil || bo.GetTotalSleep() == 0 { if s.mu.stats == nil || bo.GetTotalSleep() == 0 {
s.mu.RUnlock() s.mu.RUnlock()

View File

@ -0,0 +1,71 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package txnsnapshot
import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/tikvrpc"
)
// SnapshotProbe exposes some snapshot utilities for testing purpose.
type SnapshotProbe struct {
*KVSnapshot
}
// MergeRegionRequestStats merges RPC runtime stats into snapshot's stats.
func (s SnapshotProbe) MergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) {
s.mergeRegionRequestStats(stats)
}
// RecordBackoffInfo records backoff stats into snapshot's stats.
func (s SnapshotProbe) RecordBackoffInfo(bo *retry.Backoffer) {
s.recordBackoffInfo(bo)
}
// MergeExecDetail merges exec stats into snapshot's stats.
func (s SnapshotProbe) MergeExecDetail(detail *kvrpcpb.ExecDetailsV2) {
s.mergeExecDetail(detail)
}
// FormatStats dumps information of stats.
func (s SnapshotProbe) FormatStats() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.stats.String()
}
// BatchGetSingleRegion gets a batch of keys from a region.
func (s SnapshotProbe) BatchGetSingleRegion(bo *retry.Backoffer, region locate.RegionVerID, keys [][]byte, collectF func(k, v []byte)) error {
return s.batchGetSingleRegion(bo, batchKeys{region: region, keys: keys}, collectF)
}
// NewScanner returns a scanner to iterate given key range.
func (s SnapshotProbe) NewScanner(start, end []byte, batchSize int, reverse bool) (*Scanner, error) {
return newScanner(s.KVSnapshot, start, end, batchSize, reverse)
}
// ConfigProbe exposes configurations and global variables for testing purpose.
type ConfigProbe struct{}
// GetScanBatchSize returns the batch size to scan ranges.
func (c ConfigProbe) GetScanBatchSize() int {
return defaultScanBatchSize
}
// GetGetMaxBackoff returns the max sleep for get command.
func (c ConfigProbe) GetGetMaxBackoff() int {
return getMaxBackoff
}

33
txnkv/txnutil/priority.go Normal file
View File

@ -0,0 +1,33 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package txnutil
import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)
// Priority is the priority for tikv to execute a command.
type Priority kvrpcpb.CommandPri
// Priority value for transaction priority.
const (
PriorityNormal = Priority(kvrpcpb.CommandPri_Normal)
PriorityLow = Priority(kvrpcpb.CommandPri_Low)
PriorityHigh = Priority(kvrpcpb.CommandPri_High)
)
// ToPB converts priority to wire type.
func (p Priority) ToPB() kvrpcpb.CommandPri {
return kvrpcpb.CommandPri(p)
}