From 070dd85543e17bc1cba0bf9342343bbcfade5fd6 Mon Sep 17 00:00:00 2001 From: Shirly Date: Tue, 27 Jul 2021 12:10:35 +0800 Subject: [PATCH] txnkv/txnsnapshot: init package for txnsnapshot (#245) Signed-off-by: shirly --- error/error.go | 37 ++++++ integration_tests/async_commit_test.go | 2 +- integration_tests/lock_test.go | 2 +- tikv/2pc.go | 2 +- tikv/commit.go | 2 +- tikv/kv.go | 36 +++++- tikv/pessimistic.go | 2 +- tikv/prewrite.go | 2 +- tikv/test_probe.go | 52 ++------- tikv/txn.go | 14 ++- txnkv/txnlock/lock.go | 27 +++++ {tikv => txnkv/txnsnapshot}/client_helper.go | 12 +- {tikv => txnkv/txnsnapshot}/scan.go | 16 +-- {tikv => txnkv/txnsnapshot}/snapshot.go | 112 +++++++------------ txnkv/txnsnapshot/test_probe.go | 71 ++++++++++++ txnkv/txnutil/priority.go | 33 ++++++ 16 files changed, 281 insertions(+), 141 deletions(-) create mode 100644 txnkv/txnlock/lock.go rename {tikv => txnkv/txnsnapshot}/client_helper.go (85%) rename {tikv => txnkv/txnsnapshot}/scan.go (94%) rename {tikv => txnkv/txnsnapshot}/snapshot.go (88%) create mode 100644 txnkv/txnsnapshot/test_probe.go create mode 100644 txnkv/txnutil/priority.go diff --git a/error/error.go b/error/error.go index ccf837f0..289f1deb 100644 --- a/error/error.go +++ b/error/error.go @@ -39,6 +39,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/client-go/v2/internal/logutil" + "github.com/tikv/client-go/v2/util" + "go.uber.org/zap" ) var ( @@ -220,3 +223,37 @@ type ErrTokenLimit struct { func (e *ErrTokenLimit) Error() string { return fmt.Sprintf("Store token is up to the limit, store id = %d.", e.StoreID) } + +// ExtractKeyErr extracts a KeyError. +func ExtractKeyErr(keyErr *kvrpcpb.KeyError) error { + if val, err := util.EvalFailpoint("mockRetryableErrorResp"); err == nil { + if val.(bool) { + keyErr.Conflict = nil + keyErr.Retryable = "mock retryable error" + } + } + + if keyErr.Conflict != nil { + return &ErrWriteConflict{WriteConflict: keyErr.GetConflict()} + } + + if keyErr.Retryable != "" { + return &ErrRetryable{Retryable: keyErr.Retryable} + } + + if keyErr.Abort != "" { + err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort()) + logutil.BgLogger().Warn("2PC failed", zap.Error(err)) + return errors.Trace(err) + } + if keyErr.CommitTsTooLarge != nil { + err := errors.Errorf("commit TS %v is too large", keyErr.CommitTsTooLarge.CommitTs) + logutil.BgLogger().Warn("2PC failed", zap.Error(err)) + return errors.Trace(err) + } + if keyErr.TxnNotFound != nil { + err := errors.Errorf("txn %d not found", keyErr.TxnNotFound.StartTs) + return errors.Trace(err) + } + return errors.Errorf("unexpected KeyError: %s", keyErr.String()) +} diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 8ca4b095..d2509e5d 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -125,7 +125,7 @@ func (s *testAsyncCommitCommon) mustGetLock(key []byte) *txnkv.Lock { s.NotNil(resp.Resp) keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError() s.NotNil(keyErr) - lock, err := tikv.ExtractLockFromKeyErr(keyErr) + lock, err := txnlock.ExtractLockFromKeyErr(keyErr) s.Nil(err) return lock } diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go index e2ad9c86..fa7dee5c 100644 --- a/integration_tests/lock_test.go +++ b/integration_tests/lock_test.go @@ -436,7 +436,7 @@ func (s *testLockSuite) mustGetLock(key []byte) *txnkv.Lock { s.NotNil(resp.Resp) keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError() s.NotNil(keyErr) - lock, err := tikv.ExtractLockFromKeyErr(keyErr) + lock, err := txnlock.ExtractLockFromKeyErr(keyErr) s.Nil(err) return lock } diff --git a/tikv/2pc.go b/tikv/2pc.go index fb830ac6..5df0c7a3 100644 --- a/tikv/2pc.go +++ b/tikv/2pc.go @@ -905,7 +905,7 @@ func sendTxnHeartBeat(bo *Backoffer, store *KVStore, primary []byte, startTS, tt } cmdResp := resp.Resp.(*kvrpcpb.TxnHeartBeatResponse) if keyErr := cmdResp.GetError(); keyErr != nil { - return 0, true, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, hex.EncodeToString(primary), extractKeyErr(keyErr)) + return 0, true, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, hex.EncodeToString(primary), tikverr.ExtractKeyErr(keyErr)) } return cmdResp.GetLockTtl(), false, nil } diff --git a/tikv/commit.go b/tikv/commit.go index 38d12931..46d9e1f8 100644 --- a/tikv/commit.go +++ b/tikv/commit.go @@ -163,7 +163,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch c.mu.RLock() defer c.mu.RUnlock() - err = extractKeyErr(keyErr) + err = tikverr.ExtractKeyErr(keyErr) if c.mu.committed { // No secondary key could be rolled back after it's primary key is committed. // There must be a serious bug somewhere. diff --git a/tikv/kv.go b/tikv/kv.go index 17208c0a..d04fd01d 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -61,6 +61,8 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/rangetask" "github.com/tikv/client-go/v2/txnkv/txnlock" + "github.com/tikv/client-go/v2/txnkv/txnsnapshot" + "github.com/tikv/client-go/v2/txnkv/txnutil" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3" @@ -299,8 +301,8 @@ func (s *KVStore) DeleteRange(ctx context.Context, startKey []byte, endKey []byt // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ts is MaxVersion or > current max committed version, we will use current version for this snapshot. -func (s *KVStore) GetSnapshot(ts uint64) *KVSnapshot { - snapshot := newTiKVSnapshot(s, ts, s.nextReplicaReadSeed()) +func (s *KVStore) GetSnapshot(ts uint64) *txnsnapshot.KVSnapshot { + snapshot := txnsnapshot.NewTiKVSnapshot(s, ts, s.nextReplicaReadSeed()) return snapshot } @@ -572,6 +574,36 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.Cl return s.lockResolver, nil } +// TODO: remove it when tidb/br is ready. + +// Scanner support tikv scan +type Scanner = txnsnapshot.Scanner + +// KVSnapshot implements the tidbkv.Snapshot interface. +type KVSnapshot = txnsnapshot.KVSnapshot + +// SnapshotRuntimeStats records the runtime stats of snapshot. +type SnapshotRuntimeStats = txnsnapshot.SnapshotRuntimeStats + +// IsoLevel is the transaction's isolation level. +type IsoLevel = txnsnapshot.IsoLevel + +// IsoLevel value for transaction priority. +const ( + SI = txnsnapshot.SI + RC = txnsnapshot.RC +) + +// Priority is the priority for tikv to execute a command. +type Priority = txnutil.Priority + +// Priority value for transaction priority. +const ( + PriorityHigh = txnutil.PriorityHigh + PriorityNormal = txnutil.PriorityNormal + PriorityLow = txnutil.PriorityLow +) + // TODO: remove Lock&LockResolver&TxnStatus once tidb and br are ready. // Lock represents a lock from tikv server. diff --git a/tikv/pessimistic.go b/tikv/pessimistic.go index 12954157..3cb7b678 100644 --- a/tikv/pessimistic.go +++ b/tikv/pessimistic.go @@ -191,7 +191,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * } // Extract lock from key error - lock, err1 := extractLockFromKeyErr(keyErr) + lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr) if err1 != nil { return errors.Trace(err1) } diff --git a/tikv/prewrite.go b/tikv/prewrite.go index dfd1eaee..2d412bc0 100644 --- a/tikv/prewrite.go +++ b/tikv/prewrite.go @@ -304,7 +304,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff } // Extract lock from key error - lock, err1 := extractLockFromKeyErr(keyErr) + lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr) if err1 != nil { return errors.Trace(err1) } diff --git a/tikv/test_probe.go b/tikv/test_probe.go index 583734d3..e39964ff 100644 --- a/tikv/test_probe.go +++ b/tikv/test_probe.go @@ -39,13 +39,13 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/internal/unionstore" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnlock" + "github.com/tikv/client-go/v2/txnkv/txnsnapshot" pd "github.com/tikv/pd/client" ) @@ -72,9 +72,9 @@ func (s StoreProbe) Begin() (TxnProbe, error) { } // GetSnapshot returns a snapshot. -func (s StoreProbe) GetSnapshot(ts uint64) SnapshotProbe { +func (s StoreProbe) GetSnapshot(ts uint64) txnsnapshot.SnapshotProbe { snap := s.KVStore.GetSnapshot(ts) - return SnapshotProbe{KVSnapshot: snap} + return txnsnapshot.SnapshotProbe{KVSnapshot: snap} } // SetRegionCachePDClient replaces pd client inside region cache. @@ -165,13 +165,15 @@ func (txn TxnProbe) CollectLockedKeys() [][]byte { // BatchGetSingleRegion gets a batch of keys from a region. func (txn TxnProbe) BatchGetSingleRegion(bo *Backoffer, region locate.RegionVerID, keys [][]byte, collect func([]byte, []byte)) error { - snapshot := txn.GetSnapshot() - return snapshot.batchGetSingleRegion(bo, batchKeys{region: region, keys: keys}, collect) + snapshot := txnsnapshot.SnapshotProbe{KVSnapshot: txn.GetSnapshot()} + + return snapshot.BatchGetSingleRegion(bo, region, keys, collect) } // NewScanner returns a scanner to iterate given key range. -func (txn TxnProbe) NewScanner(start, end []byte, batchSize int, reverse bool) (*Scanner, error) { - return newScanner(txn.GetSnapshot(), start, end, batchSize, reverse) +func (txn TxnProbe) NewScanner(start, end []byte, batchSize int, reverse bool) (*txnsnapshot.Scanner, error) { + snapshot := txnsnapshot.SnapshotProbe{KVSnapshot: txn.GetSnapshot()} + return snapshot.NewScanner(start, end, batchSize, reverse) } // GetStartTime returns the time when txn starts. @@ -407,33 +409,6 @@ func (c CommitterProbe) CleanupMutations(ctx context.Context) error { return c.cleanupMutations(bo, c.mutations) } -// SnapshotProbe exposes some snapshot utilities for testing purpose. -type SnapshotProbe struct { - *KVSnapshot -} - -// MergeRegionRequestStats merges RPC runtime stats into snapshot's stats. -func (s SnapshotProbe) MergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) { - s.mergeRegionRequestStats(stats) -} - -// RecordBackoffInfo records backoff stats into snapshot's stats. -func (s SnapshotProbe) RecordBackoffInfo(bo *Backoffer) { - s.recordBackoffInfo(bo) -} - -// MergeExecDetail merges exec stats into snapshot's stats. -func (s SnapshotProbe) MergeExecDetail(detail *kvrpcpb.ExecDetailsV2) { - s.mergeExecDetail(detail) -} - -// FormatStats dumps information of stats. -func (s SnapshotProbe) FormatStats() string { - s.mu.Lock() - defer s.mu.Unlock() - return s.mu.stats.String() -} - // LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose. type LockResolverProbe struct { *txnlock.LockResolverProbe @@ -457,11 +432,6 @@ func (l LockResolverProbe) ResolvePessimisticLock(ctx context.Context, lock *txn return l.LockResolverProbe.ResolvePessimisticLock(bo, lock) } -// ExtractLockFromKeyErr makes a Lock based on a key error. -func ExtractLockFromKeyErr(err *kvrpcpb.KeyError) (*txnlock.Lock, error) { - return extractLockFromKeyErr(err) -} - // ConfigProbe exposes configurations and global variables for testing purpose. type ConfigProbe struct{} @@ -479,7 +449,7 @@ func (c ConfigProbe) GetBigTxnThreshold() int { // GetScanBatchSize returns the batch size to scan ranges. func (c ConfigProbe) GetScanBatchSize() int { - return defaultScanBatchSize + return txnsnapshot.ConfigProbe{}.GetScanBatchSize() } // GetDefaultLockTTL returns the default lock TTL. @@ -494,7 +464,7 @@ func (c ConfigProbe) GetTTLFactor() int { // GetGetMaxBackoff returns the max sleep for get command. func (c ConfigProbe) GetGetMaxBackoff() int { - return getMaxBackoff + return txnsnapshot.ConfigProbe{}.GetGetMaxBackoff() } // LoadPreSplitDetectThreshold returns presplit detect threshold config. diff --git a/tikv/txn.go b/tikv/txn.go index 7677bc4b..6fd03845 100644 --- a/tikv/txn.go +++ b/tikv/txn.go @@ -57,6 +57,8 @@ import ( tikv "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/txnkv/txnsnapshot" + "github.com/tikv/client-go/v2/txnkv/txnutil" "github.com/tikv/client-go/v2/util" "go.uber.org/zap" ) @@ -99,7 +101,7 @@ func (to StartTSOption) SetTxnScope(txnScope string) StartTSOption { // KVTxn contains methods to interact with a TiKV transaction. type KVTxn struct { - snapshot *KVSnapshot + snapshot *txnsnapshot.KVSnapshot us *unionstore.KVUnionStore store *KVStore // for connection to region. startTS uint64 @@ -123,7 +125,7 @@ type KVTxn struct { binlog BinlogExecutor schemaLeaseChecker SchemaLeaseChecker syncLog bool - priority Priority + priority txnutil.Priority isPessimistic bool enableAsyncCommit bool enable1PC bool @@ -150,7 +152,7 @@ func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error if err != nil { return nil, errors.Trace(err) } - snapshot := newTiKVSnapshot(store, startTS, store.nextReplicaReadSeed()) + snapshot := txnsnapshot.NewTiKVSnapshot(store, startTS, store.nextReplicaReadSeed()) cfg := config.GetGlobalConfig() newTiKVTxn := &KVTxn{ snapshot: snapshot, @@ -173,7 +175,7 @@ var SetSuccess = false // SetVars sets variables to the transaction. func (txn *KVTxn) SetVars(vars *tikv.Variables) { txn.vars = vars - txn.snapshot.vars = vars + txn.snapshot.SetVars(vars) if val, err := util.EvalFailpoint("probeSetVars"); err == nil { if val.(bool) { SetSuccess = true @@ -257,7 +259,7 @@ func (txn *KVTxn) SetSchemaVer(schemaVer SchemaVer) { } // SetPriority sets the priority for both write and read. -func (txn *KVTxn) SetPriority(pri Priority) { +func (txn *KVTxn) SetPriority(pri txnutil.Priority) { txn.priority = pri txn.GetSnapshot().SetPriority(pri) } @@ -790,7 +792,7 @@ func (txn *KVTxn) GetMemBuffer() *MemDB { } // GetSnapshot returns the Snapshot binding to this transaction. -func (txn *KVTxn) GetSnapshot() *KVSnapshot { +func (txn *KVTxn) GetSnapshot() *txnsnapshot.KVSnapshot { return txn.snapshot } diff --git a/txnkv/txnlock/lock.go b/txnkv/txnlock/lock.go new file mode 100644 index 00000000..bff97396 --- /dev/null +++ b/txnkv/txnlock/lock.go @@ -0,0 +1,27 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnlock + +import ( + "github.com/pingcap/kvproto/pkg/kvrpcpb" + tikverr "github.com/tikv/client-go/v2/error" +) + +// ExtractLockFromKeyErr extracts the KeyError. +func ExtractLockFromKeyErr(keyErr *kvrpcpb.KeyError) (*Lock, error) { + if locked := keyErr.GetLocked(); locked != nil { + return NewLock(locked), nil + } + return nil, tikverr.ExtractKeyErr(keyErr) +} diff --git a/tikv/client_helper.go b/txnkv/txnsnapshot/client_helper.go similarity index 85% rename from tikv/client_helper.go rename to txnkv/txnsnapshot/client_helper.go index 95ea8189..828a312d 100644 --- a/tikv/client_helper.go +++ b/txnkv/txnsnapshot/client_helper.go @@ -30,12 +30,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package txnsnapshot import ( "time" + "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/internal/locate" + "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnlock" "github.com/tikv/client-go/v2/util" @@ -52,13 +54,13 @@ type ClientHelper struct { lockResolver *txnlock.LockResolver regionCache *locate.RegionCache resolvedLocks *util.TSSet - client Client + client client.Client resolveLite bool locate.RegionRequestRuntimeStats } // NewClientHelper creates a helper instance. -func NewClientHelper(store *KVStore, resolvedLocks *util.TSSet, resolveLite bool) *ClientHelper { +func NewClientHelper(store kvstore, resolvedLocks *util.TSSet, resolveLite bool) *ClientHelper { return &ClientHelper{ lockResolver: store.GetLockResolver(), regionCache: store.GetRegionCache(), @@ -69,7 +71,7 @@ func NewClientHelper(store *KVStore, resolvedLocks *util.TSSet, resolveLite bool } // ResolveLocks wraps the ResolveLocks function and store the resolved result. -func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) { +func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) { var err error var resolvedLocks []uint64 var msBeforeTxnExpired int64 @@ -94,7 +96,7 @@ func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks } // SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context. -func (ch *ClientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration, et tikvrpc.EndpointType, directStoreAddr string, opts ...locate.StoreSelectorOption) (*tikvrpc.Response, *locate.RPCContext, string, error) { +func (ch *ClientHelper) SendReqCtx(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration, et tikvrpc.EndpointType, directStoreAddr string, opts ...locate.StoreSelectorOption) (*tikvrpc.Response, *locate.RPCContext, string, error) { sender := locate.NewRegionRequestSender(ch.regionCache, ch.client) if len(directStoreAddr) > 0 { sender.SetStoreAddr(directStoreAddr) diff --git a/tikv/scan.go b/txnkv/txnsnapshot/scan.go similarity index 94% rename from tikv/scan.go rename to txnkv/txnsnapshot/scan.go index 9df808f1..bd0169c0 100644 --- a/tikv/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -30,7 +30,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package txnsnapshot import ( "bytes" @@ -169,7 +169,7 @@ func (s *Scanner) startTS() uint64 { return s.snapshot.version } -func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *kvrpcpb.KvPair) error { +func (s *Scanner) resolveCurrentLock(bo *retry.Backoffer, current *kvrpcpb.KvPair) error { ctx := context.Background() val, err := s.snapshot.get(ctx, bo, current.Key) if err != nil { @@ -180,21 +180,21 @@ func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *kvrpcpb.KvPair) err return nil } -func (s *Scanner) getData(bo *Backoffer) error { +func (s *Scanner) getData(bo *retry.Backoffer) error { logutil.BgLogger().Debug("txn getData", zap.String("nextStartKey", kv.StrKey(s.nextStartKey)), zap.String("nextEndKey", kv.StrKey(s.nextEndKey)), zap.Bool("reverse", s.reverse), zap.Uint64("txnStartTS", s.startTS())) - sender := locate.NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.GetTiKVClient()) + sender := locate.NewRegionRequestSender(s.snapshot.store.GetRegionCache(), s.snapshot.store.GetTiKVClient()) var reqEndKey, reqStartKey []byte var loc *locate.KeyLocation var err error for { if !s.reverse { - loc, err = s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey) + loc, err = s.snapshot.store.GetRegionCache().LocateKey(bo, s.nextStartKey) } else { - loc, err = s.snapshot.store.regionCache.LocateEndKey(bo, s.nextEndKey) + loc, err = s.snapshot.store.GetRegionCache().LocateEndKey(bo, s.nextEndKey) } if err != nil { return errors.Trace(err) @@ -274,7 +274,7 @@ func (s *Scanner) getData(bo *Backoffer) error { // When there is a response-level key error, the returned pairs are incomplete. // We should resolve the lock first and then retry the same request. if keyErr := cmdScanResp.GetError(); keyErr != nil { - lock, err := extractLockFromKeyErr(keyErr) + lock, err := txnlock.ExtractLockFromKeyErr(keyErr) if err != nil { return errors.Trace(err) } @@ -295,7 +295,7 @@ func (s *Scanner) getData(bo *Backoffer) error { // Check if kvPair contains error, it should be a Lock. for _, pair := range kvPairs { if keyErr := pair.GetError(); keyErr != nil && len(pair.Key) == 0 { - lock, err := extractLockFromKeyErr(keyErr) + lock, err := txnlock.ExtractLockFromKeyErr(keyErr) if err != nil { return errors.Trace(err) } diff --git a/tikv/snapshot.go b/txnkv/txnsnapshot/snapshot.go similarity index 88% rename from tikv/snapshot.go rename to txnkv/txnsnapshot/snapshot.go index 7ba075f0..595f0695 100644 --- a/tikv/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -30,7 +30,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package txnsnapshot import ( "bytes" @@ -51,10 +51,13 @@ import ( "github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/internal/retry" + "github.com/tikv/client-go/v2/internal/unionstore" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/metrics" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnlock" + "github.com/tikv/client-go/v2/txnkv/txnutil" "github.com/tikv/client-go/v2/util" "go.uber.org/zap" ) @@ -65,21 +68,6 @@ const ( maxTimestamp = math.MaxUint64 ) -// Priority is the priority for tikv to execute a command. -type Priority kvrpcpb.CommandPri - -// Priority value for transaction priority. -const ( - PriorityNormal = Priority(kvrpcpb.CommandPri_Normal) - PriorityLow = Priority(kvrpcpb.CommandPri_Low) - PriorityHigh = Priority(kvrpcpb.CommandPri_High) -) - -// ToPB converts priority to wire type. -func (p Priority) ToPB() kvrpcpb.CommandPri { - return kvrpcpb.CommandPri(p) -} - // IsoLevel is the transaction's isolation level. type IsoLevel kvrpcpb.IsolationLevel @@ -95,12 +83,25 @@ func (l IsoLevel) ToPB() kvrpcpb.IsolationLevel { return kvrpcpb.IsolationLevel(l) } +type kvstore interface { + CheckVisibility(startTime uint64) error + // GetRegionCache gets the RegionCache. + GetRegionCache() *locate.RegionCache + GetLockResolver() *txnlock.LockResolver + GetTiKVClient() (client client.Client) + + // SendReq sends a request to TiKV. + SendReq(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) + // GetOracle gets a timestamp oracle client. + GetOracle() oracle.Oracle +} + // KVSnapshot implements the tidbkv.Snapshot interface. type KVSnapshot struct { - store *KVStore + store kvstore version uint64 isolationLevel IsoLevel - priority Priority + priority txnutil.Priority notFillCache bool keyOnly bool vars *kv.Variables @@ -133,8 +134,8 @@ type KVSnapshot struct { resourceGroupTag []byte } -// newTiKVSnapshot creates a snapshot of an TiKV store. -func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnapshot { +// NewTiKVSnapshot creates a snapshot of an TiKV store. +func NewTiKVSnapshot(store kvstore, ts uint64, replicaReadSeed uint32) *KVSnapshot { // Sanity check for snapshot version. if ts >= math.MaxInt64 && ts != math.MaxUint64 { err := errors.Errorf("try to get snapshot with a large ts %d", ts) @@ -144,7 +145,7 @@ func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnaps store: store, version: ts, scanBatchSize: defaultScanBatchSize, - priority: PriorityNormal, + priority: txnutil.PriorityNormal, vars: kv.DefaultVars, replicaReadSeed: replicaReadSeed, } @@ -253,7 +254,7 @@ type batchKeys struct { keys [][]byte } -func (b *batchKeys) relocate(bo *Backoffer, c *RegionCache) (bool, error) { +func (b *batchKeys) relocate(bo *retry.Backoffer, c *locate.RegionCache) (bool, error) { loc, err := c.LocateKey(bo, b.keys[0]) if err != nil { return false, errors.Trace(err) @@ -285,11 +286,11 @@ func appendBatchKeysBySize(b []batchKeys, region locate.RegionVerID, keys [][]by return b } -func (s *KVSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error { +func (s *KVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, collectF func(k, v []byte)) error { defer func(start time.Time) { metrics.TxnCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds()) }(time.Now()) - groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys, nil) + groups, _, err := s.store.GetRegionCache().GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } @@ -327,7 +328,7 @@ func (s *KVSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collect return errors.Trace(err) } -func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error { +func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, collectF func(k, v []byte)) error { cli := NewClientHelper(s.store, &s.resolvedLocks, false) s.mu.RLock() if s.mu.stats != nil { @@ -358,7 +359,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec if isStaleness { req.EnableStaleRead() } - ops := make([]StoreSelectorOption, 0, 2) + ops := make([]locate.StoreSelectorOption, 0, 2) if len(matchStoreLabels) > 0 { ops = append(ops, locate.WithMatchLabels(matchStoreLabels)) } @@ -400,7 +401,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec ) if keyErr := batchGetResp.GetError(); keyErr != nil { // If a response-level error happens, skip reading pairs. - lock, err := extractLockFromKeyErr(keyErr) + lock, err := txnlock.ExtractLockFromKeyErr(keyErr) if err != nil { return errors.Trace(err) } @@ -413,7 +414,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec collectF(pair.GetKey(), pair.GetValue()) continue } - lock, err := extractLockFromKeyErr(keyErr) + lock, err := txnlock.ExtractLockFromKeyErr(keyErr) if err != nil { return errors.Trace(err) } @@ -475,7 +476,7 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) { return val, nil } -func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, error) { +func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]byte, error) { // Check the cached values first. s.mu.RLock() if s.mu.cached != nil { @@ -532,7 +533,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, var firstLock *txnlock.Lock for { util.EvalFailpoint("beforeSendPointGet") - loc, err := s.store.regionCache.LocateKey(bo, k) + loc, err := s.store.GetRegionCache().LocateKey(bo, k) if err != nil { return nil, errors.Trace(err) } @@ -568,7 +569,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, } val := cmdGetResp.GetValue() if keyErr := cmdGetResp.GetError(); keyErr != nil { - lock, err := extractLockFromKeyErr(keyErr) + lock, err := txnlock.ExtractLockFromKeyErr(keyErr) if err != nil { return nil, errors.Trace(err) } @@ -615,13 +616,13 @@ func (s *KVSnapshot) mergeExecDetail(detail *kvrpcpb.ExecDetailsV2) { } // Iter return a list of key-value pair after `k`. -func (s *KVSnapshot) Iter(k []byte, upperBound []byte) (Iterator, error) { +func (s *KVSnapshot) Iter(k []byte, upperBound []byte) (unionstore.Iterator, error) { scanner, err := newScanner(s, k, upperBound, s.scanBatchSize, false) return scanner, errors.Trace(err) } // IterReverse creates a reversed Iterator positioned on the first entry which key is less than k. -func (s *KVSnapshot) IterReverse(k []byte) (Iterator, error) { +func (s *KVSnapshot) IterReverse(k []byte) (unionstore.Iterator, error) { scanner, err := newScanner(s, nil, k, s.scanBatchSize, true) return scanner, errors.Trace(err) } @@ -660,7 +661,7 @@ func (s *KVSnapshot) SetSampleStep(step uint32) { } // SetPriority sets the priority for tikv to execute commands. -func (s *KVSnapshot) SetPriority(pri Priority) { +func (s *KVSnapshot) SetPriority(pri txnutil.Priority) { s.priority = pri } @@ -718,47 +719,12 @@ func (s *KVSnapshot) SnapCacheSize() int { return len(s.mu.cached) } -func extractLockFromKeyErr(keyErr *kvrpcpb.KeyError) (*txnlock.Lock, error) { - if locked := keyErr.GetLocked(); locked != nil { - return txnlock.NewLock(locked), nil - } - return nil, extractKeyErr(keyErr) +// SetVars sets variables to the transaction. +func (s *KVSnapshot) SetVars(vars *kv.Variables) { + s.vars = vars } -func extractKeyErr(keyErr *kvrpcpb.KeyError) error { - if val, err := util.EvalFailpoint("mockRetryableErrorResp"); err == nil { - if val.(bool) { - keyErr.Conflict = nil - keyErr.Retryable = "mock retryable error" - } - } - - if keyErr.Conflict != nil { - return &tikverr.ErrWriteConflict{WriteConflict: keyErr.GetConflict()} - } - - if keyErr.Retryable != "" { - return &tikverr.ErrRetryable{Retryable: keyErr.Retryable} - } - - if keyErr.Abort != "" { - err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort()) - logutil.BgLogger().Warn("2PC failed", zap.Error(err)) - return errors.Trace(err) - } - if keyErr.CommitTsTooLarge != nil { - err := errors.Errorf("commit TS %v is too large", keyErr.CommitTsTooLarge.CommitTs) - logutil.BgLogger().Warn("2PC failed", zap.Error(err)) - return errors.Trace(err) - } - if keyErr.TxnNotFound != nil { - err := errors.Errorf("txn %d not found", keyErr.TxnNotFound.StartTs) - return errors.Trace(err) - } - return errors.Errorf("unexpected KeyError: %s", keyErr.String()) -} - -func (s *KVSnapshot) recordBackoffInfo(bo *Backoffer) { +func (s *KVSnapshot) recordBackoffInfo(bo *retry.Backoffer) { s.mu.RLock() if s.mu.stats == nil || bo.GetTotalSleep() == 0 { s.mu.RUnlock() diff --git a/txnkv/txnsnapshot/test_probe.go b/txnkv/txnsnapshot/test_probe.go new file mode 100644 index 00000000..1eb4557b --- /dev/null +++ b/txnkv/txnsnapshot/test_probe.go @@ -0,0 +1,71 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnsnapshot + +import ( + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/tikv/client-go/v2/internal/locate" + "github.com/tikv/client-go/v2/internal/retry" + "github.com/tikv/client-go/v2/tikvrpc" +) + +// SnapshotProbe exposes some snapshot utilities for testing purpose. +type SnapshotProbe struct { + *KVSnapshot +} + +// MergeRegionRequestStats merges RPC runtime stats into snapshot's stats. +func (s SnapshotProbe) MergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) { + s.mergeRegionRequestStats(stats) +} + +// RecordBackoffInfo records backoff stats into snapshot's stats. +func (s SnapshotProbe) RecordBackoffInfo(bo *retry.Backoffer) { + s.recordBackoffInfo(bo) +} + +// MergeExecDetail merges exec stats into snapshot's stats. +func (s SnapshotProbe) MergeExecDetail(detail *kvrpcpb.ExecDetailsV2) { + s.mergeExecDetail(detail) +} + +// FormatStats dumps information of stats. +func (s SnapshotProbe) FormatStats() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.stats.String() +} + +// BatchGetSingleRegion gets a batch of keys from a region. +func (s SnapshotProbe) BatchGetSingleRegion(bo *retry.Backoffer, region locate.RegionVerID, keys [][]byte, collectF func(k, v []byte)) error { + return s.batchGetSingleRegion(bo, batchKeys{region: region, keys: keys}, collectF) +} + +// NewScanner returns a scanner to iterate given key range. +func (s SnapshotProbe) NewScanner(start, end []byte, batchSize int, reverse bool) (*Scanner, error) { + return newScanner(s.KVSnapshot, start, end, batchSize, reverse) +} + +// ConfigProbe exposes configurations and global variables for testing purpose. +type ConfigProbe struct{} + +// GetScanBatchSize returns the batch size to scan ranges. +func (c ConfigProbe) GetScanBatchSize() int { + return defaultScanBatchSize +} + +// GetGetMaxBackoff returns the max sleep for get command. +func (c ConfigProbe) GetGetMaxBackoff() int { + return getMaxBackoff +} diff --git a/txnkv/txnutil/priority.go b/txnkv/txnutil/priority.go new file mode 100644 index 00000000..6a4a4f25 --- /dev/null +++ b/txnkv/txnutil/priority.go @@ -0,0 +1,33 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnutil + +import ( + "github.com/pingcap/kvproto/pkg/kvrpcpb" +) + +// Priority is the priority for tikv to execute a command. +type Priority kvrpcpb.CommandPri + +// Priority value for transaction priority. +const ( + PriorityNormal = Priority(kvrpcpb.CommandPri_Normal) + PriorityLow = Priority(kvrpcpb.CommandPri_Low) + PriorityHigh = Priority(kvrpcpb.CommandPri_High) +) + +// ToPB converts priority to wire type. +func (p Priority) ToPB() kvrpcpb.CommandPri { + return kvrpcpb.CommandPri(p) +}