txnkv/transaction: init package (#260)

Signed-off-by: shirly <AndreMouche@126.com>
Co-authored-by: disksing <i@disksing.com>
This commit is contained in:
disksing 2021-07-31 16:26:22 +08:00 committed by GitHub
parent 21ea47fe19
commit d535b62b62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 725 additions and 537 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv"
)
// KV represents a Key-Value pair.
@ -46,7 +47,7 @@ func initStore() {
}
}
func begin_pessimistic_txn() (txn *tikv.KVTxn) {
func begin_pessimistic_txn() (txn *txnkv.KVTxn) {
txn, err := client.Begin()
if err != nil {
panic(err)

View File

@ -60,6 +60,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnlock"
)
@ -79,14 +80,14 @@ type testCommitterSuite struct {
}
func (s *testCommitterSuite) SetupSuite() {
atomic.StoreUint64(&tikv.ManagedLockTTL, 3000) // 3s
atomic.StoreUint64(&tikv.CommitMaxBackoff, 1000)
atomic.StoreUint64(&tikv.VeryLongMaxBackoff, 1000)
atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // 3s
atomic.StoreUint64(&transaction.CommitMaxBackoff, 1000)
atomic.StoreUint64(&transaction.VeryLongMaxBackoff, 1000)
}
func (s *testCommitterSuite) TearDownSuite() {
atomic.StoreUint64(&tikv.CommitMaxBackoff, 20000)
atomic.StoreUint64(&tikv.VeryLongMaxBackoff, 600000)
atomic.StoreUint64(&transaction.CommitMaxBackoff, 20000)
atomic.StoreUint64(&transaction.VeryLongMaxBackoff, 600000)
}
func (s *testCommitterSuite) SetupTest() {
@ -107,13 +108,13 @@ func (s *testCommitterSuite) TearDownTest() {
s.store.Close()
}
func (s *testCommitterSuite) begin() tikv.TxnProbe {
func (s *testCommitterSuite) begin() transaction.TxnProbe {
txn, err := s.store.Begin()
s.Require().Nil(err)
return txn
}
func (s *testCommitterSuite) beginAsyncCommit() tikv.TxnProbe {
func (s *testCommitterSuite) beginAsyncCommit() transaction.TxnProbe {
txn, err := s.store.Begin()
s.Require().Nil(err)
txn.SetEnableAsyncCommit(true)
@ -703,7 +704,7 @@ func (s *testCommitterSuite) TestPessimisticTTL() {
expire := oracle.ExtractPhysical(txn.StartTS()) + int64(lockInfoNew.LockTtl)
now := oracle.ExtractPhysical(currentTS)
s.True(expire > now)
s.True(uint64(expire-now) <= atomic.LoadUint64(&tikv.ManagedLockTTL))
s.True(uint64(expire-now) <= atomic.LoadUint64(&transaction.ManagedLockTTL))
return true
}
return false
@ -742,8 +743,8 @@ func (s *testCommitterSuite) TestElapsedTTL() {
err := txn.LockKeys(context.Background(), lockCtx, key)
s.Nil(err)
lockInfo := s.getLockInfo(key)
s.GreaterOrEqual(lockInfo.LockTtl-atomic.LoadUint64(&tikv.ManagedLockTTL), uint64(100))
s.Less(lockInfo.LockTtl-atomic.LoadUint64(&tikv.ManagedLockTTL), uint64(150))
s.GreaterOrEqual(lockInfo.LockTtl-atomic.LoadUint64(&transaction.ManagedLockTTL), uint64(100))
s.Less(lockInfo.LockTtl-atomic.LoadUint64(&transaction.ManagedLockTTL), uint64(150))
}
func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary() {
@ -859,8 +860,8 @@ func (s *testCommitterSuite) TestDeleteAllYourWritesWithSFU() {
// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction.
// The lock's own TTL is expired but the primary key is still alive due to heartbeats.
func (s *testCommitterSuite) TestAcquireFalseTimeoutLock() {
atomic.StoreUint64(&tikv.ManagedLockTTL, 1000) // 1s
defer atomic.StoreUint64(&tikv.ManagedLockTTL, 3000) // restore default test value
atomic.StoreUint64(&transaction.ManagedLockTTL, 1000) // 1s
defer atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // restore default test value
// k1 is the primary lock of txn1
k1 := []byte("k1")
@ -881,7 +882,7 @@ func (s *testCommitterSuite) TestAcquireFalseTimeoutLock() {
// Heartbeats will increase the TTL of the primary key
// wait until secondary key exceeds its own TTL
time.Sleep(time.Duration(atomic.LoadUint64(&tikv.ManagedLockTTL)) * time.Millisecond)
time.Sleep(time.Duration(atomic.LoadUint64(&transaction.ManagedLockTTL)) * time.Millisecond)
txn2 := s.begin()
txn2.SetPessimistic(true)
@ -919,8 +920,8 @@ func (s *testCommitterSuite) getLockInfo(key []byte) *kvrpcpb.LockInfo {
}
func (s *testCommitterSuite) TestPkNotFound() {
atomic.StoreUint64(&tikv.ManagedLockTTL, 100) // 100ms
defer atomic.StoreUint64(&tikv.ManagedLockTTL, 3000) // restore default value
atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // 100ms
defer atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // restore default value
ctx := context.Background()
// k1 is the primary lock of txn1.
k1 := []byte("k1")
@ -976,7 +977,7 @@ func (s *testCommitterSuite) TestPkNotFound() {
Key: k3,
Primary: k1,
TxnID: txn1.StartTS(),
TTL: tikv.ManagedLockTTL,
TTL: transaction.ManagedLockTTL,
TxnSize: txnCommitBatchSize,
LockType: kvrpcpb.Op_PessimisticLock,
LockForUpdateTS: txn1.StartTS() - 1,
@ -1154,8 +1155,8 @@ func (s *testCommitterSuite) TestPushPessimisticLock() {
// TestResolveMixed tests mixed resolve with left behind optimistic locks and pessimistic locks,
// using clean whole region resolve path
func (s *testCommitterSuite) TestResolveMixed() {
atomic.StoreUint64(&tikv.ManagedLockTTL, 100) // 100ms
defer atomic.StoreUint64(&tikv.ManagedLockTTL, 3000) // restore default value
atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // 100ms
defer atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // restore default value
ctx := context.Background()
// pk is the primary lock of txn1
@ -1196,13 +1197,13 @@ func (s *testCommitterSuite) TestResolveMixed() {
// stop txn ttl manager and remove primary key, make the other keys left behind
committer.CloseTTLManager()
muts := tikv.NewPlainMutations(1)
muts := transaction.NewPlainMutations(1)
muts.Push(kvrpcpb.Op_Lock, pk, nil, true)
err = committer.PessimisticRollbackMutations(context.Background(), &muts)
s.Nil(err)
// try to resolve the left optimistic locks, use clean whole region
time.Sleep(time.Duration(atomic.LoadUint64(&tikv.ManagedLockTTL)) * time.Millisecond)
time.Sleep(time.Duration(atomic.LoadUint64(&transaction.ManagedLockTTL)) * time.Millisecond)
optimisticLockInfo := s.getLockInfo(optimisticLockKey)
lock := txnlock.NewLock(optimisticLockInfo)
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())

View File

@ -52,6 +52,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util"
)
@ -104,7 +105,7 @@ func (s *testAsyncCommitCommon) putKV(key, value []byte, enableAsyncCommit bool)
return txn.StartTS(), txn.GetCommitTS()
}
func (s *testAsyncCommitCommon) mustGetFromTxn(txn tikv.TxnProbe, key, expectedValue []byte) {
func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, expectedValue []byte) {
v, err := txn.Get(context.Background(), key)
s.Nil(err)
s.Equal(v, expectedValue)
@ -150,30 +151,30 @@ func (s *testAsyncCommitCommon) mustGetNoneFromSnapshot(version uint64, key []by
s.Equal(errors.Cause(err), tikverr.ErrNotExist)
}
func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability() tikv.TxnProbe {
func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability() transaction.TxnProbe {
txn := s.beginAsyncCommit()
txn.SetCausalConsistency(false)
return txn
}
func (s *testAsyncCommitCommon) beginAsyncCommit() tikv.TxnProbe {
func (s *testAsyncCommitCommon) beginAsyncCommit() transaction.TxnProbe {
txn, err := s.store.Begin()
s.Nil(err)
txn.SetEnableAsyncCommit(true)
return tikv.TxnProbe{KVTxn: txn}
return transaction.TxnProbe{KVTxn: txn}
}
func (s *testAsyncCommitCommon) begin() tikv.TxnProbe {
func (s *testAsyncCommitCommon) begin() transaction.TxnProbe {
txn, err := s.store.Begin()
s.Nil(err)
return tikv.TxnProbe{KVTxn: txn}
return transaction.TxnProbe{KVTxn: txn}
}
func (s *testAsyncCommitCommon) begin1PC() tikv.TxnProbe {
func (s *testAsyncCommitCommon) begin1PC() transaction.TxnProbe {
txn, err := s.store.Begin()
s.Nil(err)
txn.SetEnable1PC(true)
return tikv.TxnProbe{KVTxn: txn}
return transaction.TxnProbe{KVTxn: txn}
}
type testAsyncCommitSuite struct {
@ -208,7 +209,7 @@ func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, pr
err = txn.Delete(primaryKey)
}
s.Nil(err)
txnProbe := tikv.TxnProbe{KVTxn: txn}
txnProbe := transaction.TxnProbe{KVTxn: txn}
tpc, err := txnProbe.NewCommitter(0)
s.Nil(err)
tpc.SetPrimaryKey(primaryKey)
@ -462,7 +463,7 @@ func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC() {
func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() {
keys := [][]byte{[]byte("k0"), []byte("k1")}
values := [][]byte{[]byte("v00"), []byte("v10")}
initTest := func() tikv.CommitterProbe {
initTest := func() transaction.CommitterProbe {
t0 := s.begin()
err := t0.Set(keys[0], values[0])
s.Nil(err)
@ -483,7 +484,7 @@ func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() {
committer.SetUseAsyncCommit()
return committer
}
prewriteKey := func(committer tikv.CommitterProbe, idx int, fallback bool) {
prewriteKey := func(committer transaction.CommitterProbe, idx int, fallback bool) {
bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
loc, err := s.store.GetRegionCache().LocateKey(bo, keys[idx])
s.Nil(err)

View File

@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
)
func TestIsolation(t *testing.T) {
@ -82,7 +83,7 @@ func (s *testIsolationSuite) SetWithRetry(k, v []byte) writeRecord {
txnRaw, err := s.store.Begin()
s.Nil(err)
txn := tikv.TxnProbe{KVTxn: txnRaw}
txn := transaction.TxnProbe{KVTxn: txnRaw}
err = txn.Set(k, v)
s.Nil(err)

View File

@ -53,6 +53,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnlock"
)
@ -240,7 +241,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL() {
txn.Set([]byte("key"), []byte("value"))
s.prewriteTxnWithTTL(txn, 1000)
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(context.Background(), transaction.PrewriteMaxBackoff, nil)
lr := s.store.NewLockResolver()
callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.Nil(err)
@ -307,7 +308,7 @@ func (s *testLockSuite) TestCheckTxnStatus() {
s.Nil(err)
s.Greater(currentTS, txn.StartTS())
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(context.Background(), transaction.PrewriteMaxBackoff, nil)
resolver := s.store.NewLockResolver()
// Call getTxnStatus to check the lock status.
status, err := resolver.GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, false, nil)
@ -364,7 +365,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() {
o := s.store.GetOracle()
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.Nil(err)
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(context.Background(), transaction.PrewriteMaxBackoff, nil)
resolver := s.store.NewLockResolver()
// Call getTxnStatus for the TxnNotFound case.
@ -406,11 +407,11 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() {
s.Equal(status.Action(), kvrpcpb.Action_LockNotExistRollback)
}
func (s *testLockSuite) prewriteTxn(txn tikv.TxnProbe) {
func (s *testLockSuite) prewriteTxn(txn transaction.TxnProbe) {
s.prewriteTxnWithTTL(txn, 0)
}
func (s *testLockSuite) prewriteTxnWithTTL(txn tikv.TxnProbe, ttl uint64) {
func (s *testLockSuite) prewriteTxnWithTTL(txn transaction.TxnProbe, ttl uint64) {
committer, err := txn.NewCommitter(0)
s.Nil(err)
if ttl > 0 {
@ -449,9 +450,9 @@ func (s *testLockSuite) ttlEquals(x, y uint64) {
}
func (s *testLockSuite) TestLockTTL() {
managedLockTTL := atomic.LoadUint64(&tikv.ManagedLockTTL)
atomic.StoreUint64(&tikv.ManagedLockTTL, 20000) // set to 20s
defer atomic.StoreUint64(&tikv.ManagedLockTTL, managedLockTTL) // restore value
managedLockTTL := atomic.LoadUint64(&transaction.ManagedLockTTL)
atomic.StoreUint64(&transaction.ManagedLockTTL, 20000) // set to 20s
defer atomic.StoreUint64(&transaction.ManagedLockTTL, managedLockTTL) // restore value
defaultLockTTL := tikv.ConfigProbe{}.GetDefaultLockTTL()
ttlFactor := tikv.ConfigProbe{}.GetTTLFactor()
@ -561,7 +562,7 @@ func (s *testLockSuite) TestZeroMinCommitTS() {
txn, err := s.store.Begin()
s.Nil(err)
txn.Set([]byte("key"), []byte("value"))
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(context.Background(), transaction.PrewriteMaxBackoff, nil)
mockValue := fmt.Sprintf(`return(%d)`, txn.StartTS())
s.Nil(failpoint.Enable("tikvclient/mockZeroCommitTS", mockValue))
@ -674,7 +675,7 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit() {
func (s *testLockSuite) TestDeadlockReportWaitChain() {
// Utilities to make the test logic clear and simple.
type txnWrapper struct {
tikv.TxnProbe
transaction.TxnProbe
wg sync.WaitGroup
}

View File

@ -40,6 +40,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
)
func TestSetMinCommitTSInAsyncCommit(t *testing.T) {
@ -54,7 +55,7 @@ func TestSetMinCommitTSInAsyncCommit(t *testing.T) {
tx, err := store.Begin()
require.Nil(err)
txn := tikv.TxnProbe{KVTxn: tx}
txn := transaction.TxnProbe{KVTxn: tx}
err = txn.Set([]byte("k"), []byte("v"))
assert.Nil(err)
committer, err := txn.NewCommitter(1)

View File

@ -43,6 +43,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
)
func TestSafepoint(t *testing.T) {
@ -65,7 +66,7 @@ func (s *testSafePointSuite) TearDownSuite() {
s.Require().Nil(err)
}
func (s *testSafePointSuite) beginTxn() tikv.TxnProbe {
func (s *testSafePointSuite) beginTxn() transaction.TxnProbe {
txn, err := s.store.Begin()
s.Require().Nil(err)
return txn

View File

@ -48,6 +48,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
)
func TestSnapshot(t *testing.T) {
@ -84,7 +85,7 @@ func (s *testSnapshotSuite) TearDownSuite() {
s.Nil(err)
}
func (s *testSnapshotSuite) beginTxn() tikv.TxnProbe {
func (s *testSnapshotSuite) beginTxn() transaction.TxnProbe {
txn, err := s.store.Begin()
s.Require().Nil(err)
return txn

View File

@ -44,6 +44,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
pd "github.com/tikv/pd/client"
)
@ -74,7 +75,7 @@ func (s *testSplitSuite) TearDownTest() {
s.store.Close()
}
func (s *testSplitSuite) begin() tikv.TxnProbe {
func (s *testSplitSuite) begin() transaction.TxnProbe {
txn, err := s.store.Begin()
s.Require().Nil(err)
return txn

View File

@ -47,6 +47,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/util/codec"
pd "github.com/tikv/pd/client"
)
@ -117,7 +118,7 @@ func s08d(prefix string, n int) string {
return fmt.Sprintf("%s%08d", prefix, n)
}
func toTiDBTxn(txn *tikv.TxnProbe) kv.Transaction {
func toTiDBTxn(txn *transaction.TxnProbe) kv.Transaction {
return txndriver.NewTiKVTxn(txn.KVTxn)
}

View File

@ -48,15 +48,6 @@ type BackoffConfig = retry.Config
// Maximum total sleep time(in ms) for kv/cop commands.
const (
gcResolveLockMaxBackoff = 100000
// CommitSecondaryMaxBackoff is max sleep time of the 'commit' command
CommitSecondaryMaxBackoff = 41000
)
var (
// CommitMaxBackoff is max sleep time of the 'commit' command
CommitMaxBackoff = uint64(41000)
// PrewriteMaxBackoff is max sleep time of the `pre-write` command.
PrewriteMaxBackoff = 20000
)
// NewBackofferWithVars creates a Backoffer with maximum sleep time(in ms) and kv.Variables.

View File

@ -60,6 +60,7 @@ import (
"github.com/tikv/client-go/v2/oracle/oracles"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
@ -277,13 +278,28 @@ func (s *KVStore) runSafePointChecker() {
}
// Begin a global transaction.
func (s *KVStore) Begin() (*KVTxn, error) {
func (s *KVStore) Begin() (*transaction.KVTxn, error) {
return s.BeginWithOption(DefaultStartTSOption())
}
// BeginWithOption begins a transaction with the given StartTSOption
func (s *KVStore) BeginWithOption(options StartTSOption) (*KVTxn, error) {
return newTiKVTxnWithOptions(s, options)
func (s *KVStore) BeginWithOption(options StartTSOption) (*transaction.KVTxn, error) {
if options.TxnScope == "" {
options.TxnScope = oracle.GlobalTxnScope
}
if options.StartTS != nil {
snapshot := txnsnapshot.NewTiKVSnapshot(s, *options.StartTS, s.nextReplicaReadSeed())
return transaction.NewTiKVTxn(s, snapshot, *options.StartTS, options.TxnScope)
}
bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil)
startTS, err := s.getTimestampWithRetry(bo, options.TxnScope)
if err != nil {
return nil, errors.Trace(err)
}
snapshot := txnsnapshot.NewTiKVSnapshot(s, startTS, s.nextReplicaReadSeed())
return transaction.NewTiKVTxn(s, snapshot, startTS, options.TxnScope)
}
// DeleteRange delete all versions of all keys in the range[startKey,endKey) immediately.
@ -335,7 +351,7 @@ func (s *KVStore) UUID() string {
// CurrentTimestamp returns current timestamp with the given txnScope (local or global).
func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) {
bo := retry.NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil)
startTS, err := s.getTimestampWithRetry(bo, txnScope)
if err != nil {
return 0, errors.Trace(err)
@ -343,6 +359,11 @@ func (s *KVStore) CurrentTimestamp(txnScope string) (uint64, error) {
return startTS, nil
}
// GetTimestampWithRetry returns latest timestamp.
func (s *KVStore) GetTimestampWithRetry(bo *Backoffer, scope string) (uint64, error) {
return s.getTimestampWithRetry(bo, scope)
}
func (s *KVStore) getTimestampWithRetry(bo *Backoffer, txnScope string) (uint64, error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("TiKVStore.getTimestampWithRetry", opentracing.ChildOf(span.Context()))
@ -457,6 +478,26 @@ func (s *KVStore) GetMinSafeTS(txnScope string) uint64 {
return s.getMinSafeTSByStores(stores)
}
// Ctx returns ctx.
func (s *KVStore) Ctx() context.Context {
return s.ctx
}
// WaitGroup returns wg
func (s *KVStore) WaitGroup() *sync.WaitGroup {
return &s.wg
}
// TxnLatches returns txnLatches.
func (s *KVStore) TxnLatches() *latch.LatchesScheduler {
return s.txnLatches
}
// GetClusterID returns store's cluster id.
func (s *KVStore) GetClusterID() uint64 {
return s.clusterID
}
func (s *KVStore) getSafeTS(storeID uint64) uint64 {
safeTS, ok := s.safeTSMap.Load(storeID)
if !ok {
@ -572,3 +613,52 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.Cl
}
return s.lockResolver, nil
}
// StartTSOption indicates the option when beginning a transaction
// `TxnScope` must be set for each object
// Every other fields are optional, but currently at most one of them can be set
type StartTSOption struct {
TxnScope string
StartTS *uint64
}
// DefaultStartTSOption creates a default StartTSOption, ie. Work in GlobalTxnScope and get start ts when got used
func DefaultStartTSOption() StartTSOption {
return StartTSOption{TxnScope: oracle.GlobalTxnScope}
}
// SetStartTS returns a new StartTSOption with StartTS set to the given startTS
func (to StartTSOption) SetStartTS(startTS uint64) StartTSOption {
to.StartTS = &startTS
return to
}
// SetTxnScope returns a new StartTSOption with TxnScope set to txnScope
func (to StartTSOption) SetTxnScope(txnScope string) StartTSOption {
to.TxnScope = txnScope
return to
}
// TODO: remove once tidb and br are ready
// KVTxn contains methods to interact with a TiKV transaction.
type KVTxn = transaction.KVTxn
// BinlogWriteResult defines the result of prewrite binlog.
type BinlogWriteResult = transaction.BinlogWriteResult
// KVFilter is a filter that filters out unnecessary KV pairs.
type KVFilter = transaction.KVFilter
// SchemaLeaseChecker is used to validate schema version is not changed during transaction execution.
type SchemaLeaseChecker = transaction.SchemaLeaseChecker
// SchemaVer is the infoSchema which will return the schema version.
type SchemaVer = transaction.SchemaVer
// SchemaAmender is used by pessimistic transactions to amend commit mutations for schema change during 2pc.
type SchemaAmender = transaction.SchemaAmender
// MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit.
// We use it to abort the transaction to guarantee GC worker will not influence it.
const MaxTxnTimeUse = transaction.MaxTxnTimeUse

View File

@ -37,7 +37,6 @@ import (
"context"
"fmt"
"math"
"sync/atomic"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
@ -265,43 +264,6 @@ func (s *KVStore) scatterRegion(bo *Backoffer, regionID uint64, tableID *int64)
return nil
}
func (s *KVStore) preSplitRegion(ctx context.Context, group groupedMutations) bool {
splitKeys := make([][]byte, 0, 4)
preSplitSizeThresholdVal := atomic.LoadUint32(&preSplitSizeThreshold)
regionSize := 0
keysLength := group.mutations.Len()
// The value length maybe zero for pessimistic lock keys
for i := 0; i < keysLength; i++ {
regionSize = regionSize + len(group.mutations.GetKey(i)) + len(group.mutations.GetValue(i))
// The second condition is used for testing.
if regionSize >= int(preSplitSizeThresholdVal) {
regionSize = 0
splitKeys = append(splitKeys, group.mutations.GetKey(i))
}
}
if len(splitKeys) == 0 {
return false
}
regionIDs, err := s.SplitRegions(ctx, splitKeys, true, nil)
if err != nil {
logutil.BgLogger().Warn("2PC split regions failed", zap.Uint64("regionID", group.region.GetID()),
zap.Int("keys count", keysLength), zap.Error(err))
return false
}
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(ctx, regionID, 0)
if err != nil {
logutil.BgLogger().Warn("2PC wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
// Invalidate the old region cache information.
s.regionCache.InvalidateCachedRegion(group.region)
return true
}
const waitScatterRegionFinishBackoff = 120000
// WaitScatterRegionFinish implements SplittableStore interface.

View File

@ -33,17 +33,12 @@
package tikv
import (
"bytes"
"context"
"sync/atomic"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/internal/unionstore"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
pd "github.com/tikv/pd/client"
@ -60,15 +55,10 @@ func (s StoreProbe) NewLockResolver() LockResolverProbe {
return LockResolverProbe{&txnLockResolver}
}
// GetTimestampWithRetry returns latest timestamp.
func (s StoreProbe) GetTimestampWithRetry(bo *Backoffer, scope string) (uint64, error) {
return s.getTimestampWithRetry(bo, scope)
}
// Begin starts a transaction.
func (s StoreProbe) Begin() (TxnProbe, error) {
func (s StoreProbe) Begin() (transaction.TxnProbe, error) {
txn, err := s.KVStore.Begin()
return TxnProbe{KVTxn: txn}, err
return transaction.TxnProbe{KVTxn: txn}, err
}
// GetSnapshot returns a snapshot.
@ -92,8 +82,8 @@ func (s StoreProbe) ClearTxnLatches() {
// SendTxnHeartbeat renews a txn's ttl.
func (s StoreProbe) SendTxnHeartbeat(ctx context.Context, key []byte, startTS uint64, ttl uint64) (uint64, error) {
bo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil)
newTTL, _, err := sendTxnHeartBeat(bo, s.KVStore, key, startTS, ttl)
bo := retry.NewBackofferWithVars(ctx, transaction.PrewriteMaxBackoff, nil)
newTTL, _, err := transaction.SendTxnHeartBeat(bo, s.KVStore, key, startTS, ttl)
return newTTL, err
}
@ -117,298 +107,6 @@ func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) {
s.setSafeTS(storeID, safeTS)
}
// TxnProbe wraps a txn and exports internal states for testing purpose.
type TxnProbe struct {
*KVTxn
}
// SetStartTS resets the txn's start ts.
func (txn TxnProbe) SetStartTS(ts uint64) {
txn.startTS = ts
}
// GetCommitTS returns the commit ts.
func (txn TxnProbe) GetCommitTS() uint64 {
return txn.commitTS
}
// GetUnionStore returns transaction's embedded unionstore.
func (txn TxnProbe) GetUnionStore() *unionstore.KVUnionStore {
return txn.us
}
// IsAsyncCommit returns if the txn is committed using async commit.
func (txn TxnProbe) IsAsyncCommit() bool {
return txn.committer.isAsyncCommit()
}
// NewCommitter creates an committer.
func (txn TxnProbe) NewCommitter(sessionID uint64) (CommitterProbe, error) {
committer, err := newTwoPhaseCommitterWithInit(txn.KVTxn, sessionID)
return CommitterProbe{twoPhaseCommitter: committer}, err
}
// GetCommitter returns the transaction committer.
func (txn TxnProbe) GetCommitter() CommitterProbe {
return CommitterProbe{txn.committer}
}
// SetCommitter sets the bind committer of a transaction.
func (txn TxnProbe) SetCommitter(committer CommitterProbe) {
txn.committer = committer.twoPhaseCommitter
}
// CollectLockedKeys returns all locked keys of a transaction.
func (txn TxnProbe) CollectLockedKeys() [][]byte {
return txn.collectLockedKeys()
}
// BatchGetSingleRegion gets a batch of keys from a region.
func (txn TxnProbe) BatchGetSingleRegion(bo *Backoffer, region locate.RegionVerID, keys [][]byte, collect func([]byte, []byte)) error {
snapshot := txnsnapshot.SnapshotProbe{KVSnapshot: txn.GetSnapshot()}
return snapshot.BatchGetSingleRegion(bo, region, keys, collect)
}
// NewScanner returns a scanner to iterate given key range.
func (txn TxnProbe) NewScanner(start, end []byte, batchSize int, reverse bool) (*txnsnapshot.Scanner, error) {
snapshot := txnsnapshot.SnapshotProbe{KVSnapshot: txn.GetSnapshot()}
return snapshot.NewScanner(start, end, batchSize, reverse)
}
// GetStartTime returns the time when txn starts.
func (txn TxnProbe) GetStartTime() time.Time {
return txn.startTime
}
func newTwoPhaseCommitterWithInit(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
c, err := newTwoPhaseCommitter(txn, sessionID)
if err != nil {
return nil, errors.Trace(err)
}
if err = c.initKeysAndMutations(); err != nil {
return nil, errors.Trace(err)
}
return c, nil
}
// CommitterProbe wraps a 2PC committer and exports internal states for testing purpose.
type CommitterProbe struct {
*twoPhaseCommitter
}
// InitKeysAndMutations prepares the committer for commit.
func (c CommitterProbe) InitKeysAndMutations() error {
return c.initKeysAndMutations()
}
// SetPrimaryKey resets the committer's commit ts.
func (c CommitterProbe) SetPrimaryKey(key []byte) {
c.primaryKey = key
}
// GetPrimaryKey returns primary key of the committer.
func (c CommitterProbe) GetPrimaryKey() []byte {
return c.primaryKey
}
// GetMutations returns the mutation buffer to commit.
func (c CommitterProbe) GetMutations() CommitterMutations {
return c.mutations
}
// SetMutations replace the mutation buffer.
func (c CommitterProbe) SetMutations(muts CommitterMutations) {
c.mutations = muts.(*memBufferMutations)
}
// SetCommitTS resets the committer's commit ts.
func (c CommitterProbe) SetCommitTS(ts uint64) {
atomic.StoreUint64(&c.commitTS, ts)
}
// GetCommitTS returns the commit ts of the committer.
func (c CommitterProbe) GetCommitTS() uint64 {
return atomic.LoadUint64(&c.commitTS)
}
// GetMinCommitTS returns the minimal commit ts can be used.
func (c CommitterProbe) GetMinCommitTS() uint64 {
return c.minCommitTS
}
// SetMinCommitTS sets the minimal commit ts can be used.
func (c CommitterProbe) SetMinCommitTS(ts uint64) {
c.minCommitTS = ts
}
// SetMaxCommitTS sets the max commit ts can be used.
func (c CommitterProbe) SetMaxCommitTS(ts uint64) {
c.maxCommitTS = ts
}
// SetSessionID sets the session id of the committer.
func (c CommitterProbe) SetSessionID(id uint64) {
c.sessionID = id
}
// GetForUpdateTS returns the pessimistic ForUpdate ts.
func (c CommitterProbe) GetForUpdateTS() uint64 {
return c.forUpdateTS
}
// SetForUpdateTS sets pessimistic ForUpdate ts.
func (c CommitterProbe) SetForUpdateTS(ts uint64) {
c.forUpdateTS = ts
}
// GetStartTS returns the start ts of the transaction.
func (c CommitterProbe) GetStartTS() uint64 {
return c.startTS
}
// GetLockTTL returns the lock ttl duration of the transaction.
func (c CommitterProbe) GetLockTTL() uint64 {
return c.lockTTL
}
// SetLockTTL sets the lock ttl duration.
func (c CommitterProbe) SetLockTTL(ttl uint64) {
c.lockTTL = ttl
}
// SetLockTTLByTimeAndSize sets the lock ttl duration by time and size.
func (c CommitterProbe) SetLockTTLByTimeAndSize(start time.Time, size int) {
c.lockTTL = txnLockTTL(start, size)
}
// SetTxnSize resets the txn size of the committer and updates lock TTL.
func (c CommitterProbe) SetTxnSize(sz int) {
c.txnSize = sz
c.lockTTL = txnLockTTL(c.txn.startTime, sz)
}
// SetUseAsyncCommit enables async commit feature.
func (c CommitterProbe) SetUseAsyncCommit() {
c.useAsyncCommit = 1
}
// Execute runs the commit process.
func (c CommitterProbe) Execute(ctx context.Context) error {
return c.execute(ctx)
}
// PrewriteAllMutations performs the first phase of commit.
func (c CommitterProbe) PrewriteAllMutations(ctx context.Context) error {
return c.PrewriteMutations(ctx, c.mutations)
}
// PrewriteMutations performs the first phase of commit for given keys.
func (c CommitterProbe) PrewriteMutations(ctx context.Context, mutations CommitterMutations) error {
return c.prewriteMutations(retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil), mutations)
}
// CommitMutations performs the second phase of commit.
func (c CommitterProbe) CommitMutations(ctx context.Context) error {
return c.commitMutations(retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), nil), c.mutationsOfKeys([][]byte{c.primaryKey}))
}
// MutationsOfKeys returns mutations match the keys.
func (c CommitterProbe) MutationsOfKeys(keys [][]byte) CommitterMutations {
return c.mutationsOfKeys(keys)
}
// PessimisticRollbackMutations rolls mutations back.
func (c CommitterProbe) PessimisticRollbackMutations(ctx context.Context, muts CommitterMutations) error {
return c.pessimisticRollbackMutations(retry.NewBackofferWithVars(ctx, pessimisticRollbackMaxBackoff, nil), muts)
}
// Cleanup cleans dirty data of a committer.
func (c CommitterProbe) Cleanup(ctx context.Context) {
c.cleanup(ctx)
c.cleanWg.Wait()
}
// WaitCleanup waits for the committer to complete.
func (c CommitterProbe) WaitCleanup() {
c.cleanWg.Wait()
}
// IsOnePC returns if the committer is using one PC.
func (c CommitterProbe) IsOnePC() bool {
return c.isOnePC()
}
// BuildPrewriteRequest builds rpc request for mutation.
func (c CommitterProbe) BuildPrewriteRequest(regionID, regionConf, regionVersion uint64, mutations CommitterMutations, txnSize uint64) *tikvrpc.Request {
var batch batchMutations
batch.mutations = mutations
batch.region = locate.NewRegionVerID(regionID, regionConf, regionVersion)
for _, key := range mutations.GetKeys() {
if bytes.Equal(key, c.primary()) {
batch.isPrimary = true
break
}
}
return c.buildPrewriteRequest(batch, txnSize)
}
// IsAsyncCommit returns if the committer uses async commit.
func (c CommitterProbe) IsAsyncCommit() bool {
return c.isAsyncCommit()
}
// CheckAsyncCommit returns if async commit is available.
func (c CommitterProbe) CheckAsyncCommit() bool {
return c.checkAsyncCommit()
}
// GetOnePCCommitTS returns the commit ts of one pc.
func (c CommitterProbe) GetOnePCCommitTS() uint64 {
return c.onePCCommitTS
}
// IsTTLUninitialized returns if the TTL manager is uninitialized.
func (c CommitterProbe) IsTTLUninitialized() bool {
state := atomic.LoadUint32((*uint32)(&c.ttlManager.state))
return state == uint32(stateUninitialized)
}
// IsTTLRunning returns if the TTL manager is running state.
func (c CommitterProbe) IsTTLRunning() bool {
state := atomic.LoadUint32((*uint32)(&c.ttlManager.state))
return state == uint32(stateRunning)
}
// CloseTTLManager closes the TTL manager.
func (c CommitterProbe) CloseTTLManager() {
c.ttlManager.close()
}
// GetUndeterminedErr returns the encountered undetermined error (if any).
func (c CommitterProbe) GetUndeterminedErr() error {
c.mu.RLock()
defer c.mu.RUnlock()
return c.mu.undeterminedErr
}
// SetNoFallBack disallows async commit to fall back to normal mode.
func (c CommitterProbe) SetNoFallBack() {
c.testingKnobs.noFallBack = true
}
// SetPrimaryKeyBlocker is used to block committer after primary is sent.
func (c CommitterProbe) SetPrimaryKeyBlocker(ac, bk chan struct{}) {
c.testingKnobs.acAfterCommitPrimary = ac
c.testingKnobs.bkAfterCommitPrimary = bk
}
// CleanupMutations performs the clean up phase.
func (c CommitterProbe) CleanupMutations(ctx context.Context) error {
bo := retry.NewBackofferWithVars(ctx, cleanupMaxBackoff, nil)
return c.cleanupMutations(bo, c.mutations)
}
// LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose.
type LockResolverProbe struct {
*txnlock.LockResolverProbe
@ -422,13 +120,13 @@ func NewLockResolverProb(r *txnlock.LockResolver) *LockResolverProbe {
// ResolveLock resolves single lock.
func (l LockResolverProbe) ResolveLock(ctx context.Context, lock *txnlock.Lock) error {
bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, nil)
bo := retry.NewBackofferWithVars(ctx, transaction.ConfigProbe{}.GetPessimisticLockMaxBackoff(), nil)
return l.LockResolverProbe.ResolveLock(bo, lock)
}
// ResolvePessimisticLock resolves single pessimistic lock.
func (l LockResolverProbe) ResolvePessimisticLock(ctx context.Context, lock *txnlock.Lock) error {
bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, nil)
bo := retry.NewBackofferWithVars(ctx, transaction.ConfigProbe{}.GetPessimisticLockMaxBackoff(), nil)
return l.LockResolverProbe.ResolvePessimisticLock(bo, lock)
}
@ -437,7 +135,7 @@ type ConfigProbe struct{}
// GetTxnCommitBatchSize returns the batch size to commit txn.
func (c ConfigProbe) GetTxnCommitBatchSize() uint64 {
return txnCommitBatchSize
return transaction.ConfigProbe{}.GetTxnCommitBatchSize()
}
// GetBigTxnThreshold returns the txn size to be considered as big txn.
@ -454,12 +152,12 @@ func (c ConfigProbe) GetScanBatchSize() int {
// GetDefaultLockTTL returns the default lock TTL.
func (c ConfigProbe) GetDefaultLockTTL() uint64 {
return defaultLockTTL
return transaction.ConfigProbe{}.GetDefaultLockTTL()
}
// GetTTLFactor returns the factor to calculate txn TTL.
func (c ConfigProbe) GetTTLFactor() int {
return ttlFactor
return transaction.ConfigProbe{}.GetTTLFactor()
}
// GetGetMaxBackoff returns the max sleep for get command.
@ -469,22 +167,22 @@ func (c ConfigProbe) GetGetMaxBackoff() int {
// LoadPreSplitDetectThreshold returns presplit detect threshold config.
func (c ConfigProbe) LoadPreSplitDetectThreshold() uint32 {
return atomic.LoadUint32(&preSplitDetectThreshold)
return transaction.ConfigProbe{}.LoadPreSplitDetectThreshold()
}
// StorePreSplitDetectThreshold updates presplit detect threshold config.
func (c ConfigProbe) StorePreSplitDetectThreshold(v uint32) {
atomic.StoreUint32(&preSplitDetectThreshold, v)
transaction.ConfigProbe{}.StorePreSplitDetectThreshold(v)
}
// LoadPreSplitSizeThreshold returns presplit size threshold config.
func (c ConfigProbe) LoadPreSplitSizeThreshold() uint32 {
return atomic.LoadUint32(&preSplitSizeThreshold)
return transaction.ConfigProbe{}.LoadPreSplitSizeThreshold()
}
// StorePreSplitSizeThreshold updates presplit size threshold config.
func (c ConfigProbe) StorePreSplitSizeThreshold(v uint32) {
atomic.StoreUint32(&preSplitSizeThreshold, v)
transaction.ConfigProbe{}.StorePreSplitSizeThreshold(v)
}
// SetOracleUpdateInterval sets the interval of updating cached ts.

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
package transaction
import (
"bytes"
@ -51,6 +51,7 @@ import (
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/latch"
"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/internal/retry"
@ -59,6 +60,7 @@ import (
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util"
zap "go.uber.org/zap"
)
@ -67,7 +69,7 @@ import (
const slowRequestThreshold = time.Minute
type twoPhaseCommitAction interface {
handleSingleBatch(*twoPhaseCommitter, *Backoffer, batchMutations) error
handleSingleBatch(*twoPhaseCommitter, *retry.Backoffer, batchMutations) error
tiKVTxnRegionsNumHistogram() prometheus.Observer
String() string
}
@ -77,9 +79,43 @@ var (
ManagedLockTTL uint64 = 20000 // 20s
)
var (
// CommitMaxBackoff is max sleep time of the 'commit' command
CommitMaxBackoff = uint64(41000)
// PrewriteMaxBackoff is max sleep time of the `pre-write` command.
PrewriteMaxBackoff = 20000
)
type kvstore interface {
// GetRegionCache gets the RegionCache.
GetRegionCache() *locate.RegionCache
// SplitRegions splits regions by splitKeys.
SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool, tableID *int64) (regionIDs []uint64, err error)
// WaitScatterRegionFinish implements SplittableStore interface.
// backOff is the back off time of the wait scatter region.(Milliseconds)
// if backOff <= 0, the default wait scatter back off time will be used.
WaitScatterRegionFinish(ctx context.Context, regionID uint64, backOff int) error
// GetTimestampWithRetry returns latest timestamp.
GetTimestampWithRetry(bo *retry.Backoffer, scope string) (uint64, error)
// GetOracle gets a timestamp oracle client.
GetOracle() oracle.Oracle
CurrentTimestamp(txnScope string) (uint64, error)
// SendReq sends a request to TiKV.
SendReq(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error)
// GetTiKVClient gets the client instance.
GetTiKVClient() (client client.Client)
GetLockResolver() *txnlock.LockResolver
Ctx() context.Context
WaitGroup() *sync.WaitGroup
// TxnLatches returns txnLatches.
TxnLatches() *latch.LatchesScheduler
GetClusterID() uint64
}
// twoPhaseCommitter executes a two-phase commit protocol.
type twoPhaseCommitter struct {
store *KVStore
store kvstore
txn *KVTxn
startTS uint64
mutations *memBufferMutations
@ -132,17 +168,14 @@ type twoPhaseCommitter struct {
binlog BinlogExecutor
resourceGroupTag []byte
storeWg *sync.WaitGroup
storeCtx context.Context
}
type memBufferMutations struct {
storage *MemDB
storage *unionstore.MemDB
handles []unionstore.MemKeyHandle
}
func newMemBufferMutations(sizeHint int, storage *MemDB) *memBufferMutations {
func newMemBufferMutations(sizeHint int, storage *unionstore.MemDB) *memBufferMutations {
return &memBufferMutations{
handles: make([]unionstore.MemKeyHandle, 0, sizeHint),
storage: storage,
@ -332,8 +365,6 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
},
isPessimistic: txn.IsPessimistic(),
binlog: txn.binlog,
storeWg: &txn.store.wg,
storeCtx: txn.store.ctx,
}, nil
}
@ -523,7 +554,7 @@ var preSplitSizeThreshold uint32 = 32 << 20
// doActionOnMutations groups keys into primary batch and secondary batches, if primary batch exists in the key,
// it does action on primary batch first, then on secondary batches. If action is commit, secondary batches
// is done in background goroutine.
func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCommitAction, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) doActionOnMutations(bo *retry.Backoffer, action twoPhaseCommitAction, mutations CommitterMutations) error {
if mutations.Len() == 0 {
return nil
}
@ -540,15 +571,15 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
}
type groupedMutations struct {
region RegionVerID
region locate.RegionVerID
mutations CommitterMutations
}
// groupSortedMutationsByRegion separates keys into groups by their belonging Regions.
func groupSortedMutationsByRegion(c *RegionCache, bo *retry.Backoffer, m CommitterMutations) ([]groupedMutations, error) {
func groupSortedMutationsByRegion(c *locate.RegionCache, bo *retry.Backoffer, m CommitterMutations) ([]groupedMutations, error) {
var (
groups []groupedMutations
lastLoc *KeyLocation
lastLoc *locate.KeyLocation
)
lastUpperBound := 0
for i := 0; i < m.Len(); i++ {
@ -577,8 +608,8 @@ func groupSortedMutationsByRegion(c *RegionCache, bo *retry.Backoffer, m Committ
}
// groupMutations groups mutations by region, then checks for any large groups and in that case pre-splits the region.
func (c *twoPhaseCommitter) groupMutations(bo *Backoffer, mutations CommitterMutations) ([]groupedMutations, error) {
groups, err := groupSortedMutationsByRegion(c.store.regionCache, bo, mutations)
func (c *twoPhaseCommitter) groupMutations(bo *retry.Backoffer, mutations CommitterMutations) ([]groupedMutations, error) {
groups, err := groupSortedMutationsByRegion(c.store.GetRegionCache(), bo, mutations)
if err != nil {
return nil, errors.Trace(err)
}
@ -592,14 +623,14 @@ func (c *twoPhaseCommitter) groupMutations(bo *Backoffer, mutations CommitterMut
logutil.BgLogger().Info("2PC detect large amount of mutations on a single region",
zap.Uint64("region", group.region.GetID()),
zap.Int("mutations count", group.mutations.Len()))
if c.store.preSplitRegion(bo.GetCtx(), group) {
if c.preSplitRegion(bo.GetCtx(), group) {
didPreSplit = true
}
}
}
// Reload region cache again.
if didPreSplit {
groups, err = groupSortedMutationsByRegion(c.store.regionCache, bo, mutations)
groups, err = groupSortedMutationsByRegion(c.store.GetRegionCache(), bo, mutations)
if err != nil {
return nil, errors.Trace(err)
}
@ -608,9 +639,49 @@ func (c *twoPhaseCommitter) groupMutations(bo *Backoffer, mutations CommitterMut
return groups, nil
}
func (c *twoPhaseCommitter) preSplitRegion(ctx context.Context, group groupedMutations) bool {
splitKeys := make([][]byte, 0, 4)
preSplitSizeThresholdVal := atomic.LoadUint32(&preSplitSizeThreshold)
regionSize := 0
keysLength := group.mutations.Len()
// The value length maybe zero for pessimistic lock keys
for i := 0; i < keysLength; i++ {
regionSize = regionSize + len(group.mutations.GetKey(i)) + len(group.mutations.GetValue(i))
// The second condition is used for testing.
if regionSize >= int(preSplitSizeThresholdVal) {
regionSize = 0
splitKeys = append(splitKeys, group.mutations.GetKey(i))
}
}
if len(splitKeys) == 0 {
return false
}
regionIDs, err := c.store.SplitRegions(ctx, splitKeys, true, nil)
if err != nil {
logutil.BgLogger().Warn("2PC split regions failed", zap.Uint64("regionID", group.region.GetID()),
zap.Int("keys count", keysLength), zap.Error(err))
return false
}
for _, regionID := range regionIDs {
err := c.store.WaitScatterRegionFinish(ctx, regionID, 0)
if err != nil {
logutil.BgLogger().Warn("2PC wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
// Invalidate the old region cache information.
c.store.GetRegionCache().InvalidateCachedRegion(group.region)
return true
}
// CommitSecondaryMaxBackoff is max sleep time of the 'commit' command
const CommitSecondaryMaxBackoff = 41000
// doActionOnGroupedMutations splits groups into batches (there is one group per region, and potentially many batches per group, but all mutations
// in a batch will belong to the same region).
func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error {
func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error {
action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups)))
var sizeFunc = c.keySize
@ -683,10 +754,10 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
}
// Already spawned a goroutine for async commit transaction.
if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
secondaryBo := retry.NewBackofferWithVars(c.storeCtx, CommitSecondaryMaxBackoff, c.txn.vars)
c.storeWg.Add(1)
secondaryBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
c.store.WaitGroup().Add(1)
go func() {
defer c.storeWg.Done()
defer c.store.WaitGroup().Done()
if c.sessionID > 0 {
if v, err := util.EvalFailpoint("beforeCommitSecondaries"); err == nil {
if s, ok := v.(string); !ok {
@ -717,7 +788,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
}
// doActionOnBatches does action to batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchMutations) error {
func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPhaseCommitAction, batches []batchMutations) error {
if len(batches) == 0 {
return nil
}
@ -819,7 +890,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
return
}
bo := retry.NewBackofferWithVars(context.Background(), keepAliveMaxBackoff, c.txn.vars)
now, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
now, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope())
if err != nil {
logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail",
zap.Error(err))
@ -869,7 +940,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
}
}
func sendTxnHeartBeat(bo *Backoffer, store *KVStore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) {
func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) {
req := tikvrpc.NewRequest(tikvrpc.CmdTxnHeartBeat, &kvrpcpb.TxnHeartBeatRequest{
PrimaryLock: primary,
StartVersion: startTS,
@ -984,7 +1055,8 @@ func (c *twoPhaseCommitter) checkOnePCFallBack(action twoPhaseCommitAction, batc
const (
cleanupMaxBackoff = 20000
tsoMaxBackoff = 15000
// TsoMaxBackoff is the max sleep time to get tso.
TsoMaxBackoff = 15000
)
// VeryLongMaxBackoff is the max sleep time of transaction commit.
@ -992,9 +1064,9 @@ var VeryLongMaxBackoff = uint64(600000) // 10mins
func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
c.cleanWg.Add(1)
c.storeWg.Add(1)
c.store.WaitGroup().Add(1)
go func() {
defer c.storeWg.Done()
defer c.store.WaitGroup().Done()
if _, err := util.EvalFailpoint("commitFailedSkipCleanup"); err == nil {
logutil.Logger(ctx).Info("[failpoint] injected skip cleanup secondaries on failure",
zap.Uint64("txnStartTS", c.startTS))
@ -1002,7 +1074,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
return
}
cleanupKeysCtx := context.WithValue(c.storeCtx, retry.TxnStartKey, ctx.Value(retry.TxnStartKey))
cleanupKeysCtx := context.WithValue(c.store.Ctx(), retry.TxnStartKey, ctx.Value(retry.TxnStartKey))
var err error
if !c.isOnePC() {
err = c.cleanupMutations(retry.NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
@ -1103,7 +1175,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
// from PD and plus one as our MinCommitTS.
if commitTSMayBeCalculated && c.needLinearizability() {
util.EvalFailpoint("getMinCommitTSFromTSO")
latestTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
latestTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope())
// If we fail to get a timestamp from PD, we just propagate the failure
// instead of falling back to the normal 2PC because a normal 2PC will
// also be likely to fail due to the same timestamp issue.
@ -1209,7 +1281,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
} else {
start = time.Now()
logutil.Event(ctx, "start get commit ts")
commitTS, err = c.store.getTimestampWithRetry(retry.NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope())
commitTS, err = c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars), c.txn.GetScope())
if err != nil {
logutil.Logger(ctx).Warn("2PC get commitTS failed",
zap.Error(err),
@ -1255,7 +1327,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
}
atomic.StoreUint64(&c.commitTS, commitTS)
if c.store.oracle.IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
if c.store.GetOracle().IsExpired(c.startTS, MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
err = errors.Errorf("session %d txn takes too much time, txnStartTS: %d, comm: %d",
c.sessionID, c.startTS, c.commitTS)
return err
@ -1288,13 +1360,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
logutil.Logger(ctx).Debug("2PC will use async commit protocol to commit this txn",
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
zap.Uint64("sessionID", c.sessionID))
c.storeWg.Add(1)
c.store.WaitGroup().Add(1)
go func() {
defer c.storeWg.Done()
defer c.store.WaitGroup().Done()
if _, err := util.EvalFailpoint("asyncCommitDoNothing"); err == nil {
return
}
commitBo := retry.NewBackofferWithVars(c.storeCtx, CommitSecondaryMaxBackoff, c.txn.vars)
commitBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
err := c.commitMutations(commitBo, c.mutations)
if err != nil {
logutil.Logger(ctx).Warn("2PC async commit failed", zap.Uint64("sessionID", c.sessionID),
@ -1477,7 +1549,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch
func (c *twoPhaseCommitter) getCommitTS(ctx context.Context, commitDetail *util.CommitDetails) (uint64, error) {
start := time.Now()
logutil.Event(ctx, "start get commit ts")
commitTS, err := c.store.getTimestampWithRetry(retry.NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope())
commitTS, err := c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars), c.txn.GetScope())
if err != nil {
logutil.Logger(ctx).Warn("2PC get commitTS failed",
zap.Error(err),
@ -1574,7 +1646,7 @@ type batchMutations struct {
isPrimary bool
}
func (b *batchMutations) relocate(bo *Backoffer, c *RegionCache) (bool, error) {
func (b *batchMutations) relocate(bo *retry.Backoffer, c *locate.RegionCache) (bool, error) {
begin, end := b.mutations.GetKey(0), b.mutations.GetKey(b.mutations.Len()-1)
loc, err := c.LocateKey(bo, begin)
if err != nil {
@ -1663,13 +1735,13 @@ type batchExecutor struct {
rateLimiter *util.RateLimit // rate limiter for concurrency control, maybe more strategies
committer *twoPhaseCommitter // here maybe more different type committer in the future
action twoPhaseCommitAction // the work action type
backoffer *Backoffer // Backoffer
backoffer *retry.Backoffer // Backoffer
tokenWaitDuration time.Duration // get token wait time
}
// newBatchExecutor create processor to handle concurrent batch works(prewrite/commit etc)
func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
action twoPhaseCommitAction, backoffer *Backoffer) *batchExecutor {
action twoPhaseCommitAction, backoffer *retry.Backoffer) *batchExecutor {
return &batchExecutor{rateLimit, nil, committer,
action, backoffer, 0}
}
@ -1690,7 +1762,7 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error,
batch := batch1
go func() {
defer batchExe.rateLimiter.PutToken()
var singleBatchBackoffer *Backoffer
var singleBatchBackoffer *retry.Backoffer
if _, ok := batchExe.action.(actionCommit); ok {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not

View File

@ -30,19 +30,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
package transaction
import (
"context"
"github.com/pingcap/errors"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/unionstore"
)
// BatchBufferGetter is the interface for BatchGet.
type BatchBufferGetter interface {
Len() int
Getter
unionstore.Getter
}
// BatchGetter is the interface for BatchGet.

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
package transaction
import (
"context"

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
package transaction
import (
"context"

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
package transaction
import (
"github.com/pingcap/errors"
@ -56,7 +56,7 @@ func (actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
return metrics.TxnRegionsNumHistogramCleanup
}
func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{
Keys: batch.mutations.GetKeys(),
StartVersion: c.startTS,
@ -87,6 +87,6 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batc
return nil
}
func (c *twoPhaseCommitter) cleanupMutations(bo *Backoffer, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) cleanupMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
return c.doActionOnMutations(bo, actionCleanup{}, mutations)
}

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
package transaction
import (
"encoding/hex"
@ -62,7 +62,7 @@ func (actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
return metrics.TxnRegionsNumHistogramCommit
}
func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
keys := batch.mutations.GetKeys()
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &kvrpcpb.CommitRequest{
StartVersion: c.startTS,
@ -73,7 +73,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
tBegin := time.Now()
attempts := 0
sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
for {
attempts++
if time.Since(tBegin) > slowRequestThreshold {
@ -110,7 +110,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
return errors.Trace(err)
}
}
same, err := batch.relocate(bo, c.store.regionCache)
same, err := batch.relocate(bo, c.store.GetRegionCache())
if err != nil {
return errors.Trace(err)
}
@ -145,7 +145,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
}
// Update commit ts and retry.
commitTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope())
if err != nil {
logutil.Logger(bo.GetCtx()).Warn("2PC get commitTS failed",
zap.Error(err),
@ -198,7 +198,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
return nil
}
func (c *twoPhaseCommitter) commitMutations(bo *Backoffer, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) commitMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("twoPhaseCommitter.commitMutations", opentracing.ChildOf(span.Context()))
defer span1.Finish()

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
package transaction
import (
"encoding/hex"
@ -81,7 +81,7 @@ func (actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observe
return metrics.TxnRegionsNumHistogramPessimisticRollback
}
func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
m := batch.mutations
mutations := make([]*kvrpcpb.Mutation, m.Len())
for i := 0; i < m.Len(); i++ {
@ -154,7 +154,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
return errors.Trace(err)
}
}
same, err := batch.relocate(bo, c.store.regionCache)
same, err := batch.relocate(bo, c.store.GetRegionCache())
if err != nil {
return errors.Trace(err)
}
@ -200,7 +200,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
startTime = time.Now()
msBeforeTxnExpired, _, err := c.store.lockResolver.ResolveLocks(bo, 0, locks)
msBeforeTxnExpired, _, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks)
if err != nil {
return errors.Trace(err)
}
@ -240,7 +240,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
}
}
func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
@ -265,7 +265,7 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Bac
return nil
}
func (c *twoPhaseCommitter) pessimisticLockMutations(bo *Backoffer, lockCtx *kv.LockCtx, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCtx *kv.LockCtx, mutations CommitterMutations) error {
if c.sessionID > 0 {
if val, err := util.EvalFailpoint("beforePessimisticLock"); err == nil {
// Pass multiple instructions in one string, delimited by commas, to trigger multiple behaviors, like
@ -289,6 +289,6 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *Backoffer, lockCtx *kv.
return c.doActionOnMutations(bo, actionPessimisticLock{lockCtx}, mutations)
}
func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *Backoffer, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
return c.doActionOnMutations(bo, actionPessimisticRollback{}, mutations)
}

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
package transaction
import (
"encoding/hex"
@ -141,7 +141,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})
}
func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) (err error) {
func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) (err error) {
// WARNING: This function only tries to send a single request to a single region, so it don't
// need to unset the `useOnePC` flag when it fails. A special case is that when TiKV returns
// regionErr, it's uncertain if the request will be splitted into multiple and sent to multiple
@ -181,7 +181,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
attempts := 0
req := c.buildPrewriteRequest(batch, txnSize)
sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
defer func() {
if err != nil {
// If we fail to receive response for async commit prewrite, it will be undetermined whether this
@ -220,7 +220,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
return errors.Trace(err)
}
}
same, err := batch.relocate(bo, c.store.regionCache)
same, err := batch.relocate(bo, c.store.GetRegionCache())
if err != nil {
return errors.Trace(err)
}
@ -314,7 +314,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
locks = append(locks, lock)
}
start := time.Now()
msBeforeExpired, err := c.store.lockResolver.ResolveLocksForWrite(bo, c.startTS, c.forUpdateTS, locks)
msBeforeExpired, err := c.store.GetLockResolver().ResolveLocksForWrite(bo, c.startTS, c.forUpdateTS, locks)
if err != nil {
return errors.Trace(err)
}
@ -328,7 +328,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
}
}
func (c *twoPhaseCommitter) prewriteMutations(bo *Backoffer, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteMutations", opentracing.ChildOf(span.Context()))
defer span1.Finish()

View File

@ -0,0 +1,368 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package transaction
import (
"bytes"
"context"
"sync/atomic"
"time"
"github.com/pingcap/errors"
"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/internal/unionstore"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)
// TxnProbe wraps a txn and exports internal states for testing purpose.
type TxnProbe struct {
*KVTxn
}
// SetStartTS resets the txn's start ts.
func (txn TxnProbe) SetStartTS(ts uint64) {
txn.startTS = ts
}
// GetCommitTS returns the commit ts.
func (txn TxnProbe) GetCommitTS() uint64 {
return txn.commitTS
}
// GetUnionStore returns transaction's embedded unionstore.
func (txn TxnProbe) GetUnionStore() *unionstore.KVUnionStore {
return txn.us
}
// IsAsyncCommit returns if the txn is committed using async commit.
func (txn TxnProbe) IsAsyncCommit() bool {
return txn.committer.isAsyncCommit()
}
// NewCommitter creates an committer.
func (txn TxnProbe) NewCommitter(sessionID uint64) (CommitterProbe, error) {
committer, err := newTwoPhaseCommitterWithInit(txn.KVTxn, sessionID)
return CommitterProbe{twoPhaseCommitter: committer}, err
}
// GetCommitter returns the transaction committer.
func (txn TxnProbe) GetCommitter() CommitterProbe {
return CommitterProbe{txn.committer}
}
// SetCommitter sets the bind committer of a transaction.
func (txn TxnProbe) SetCommitter(committer CommitterProbe) {
txn.committer = committer.twoPhaseCommitter
}
// CollectLockedKeys returns all locked keys of a transaction.
func (txn TxnProbe) CollectLockedKeys() [][]byte {
return txn.collectLockedKeys()
}
// BatchGetSingleRegion gets a batch of keys from a region.
func (txn TxnProbe) BatchGetSingleRegion(bo *retry.Backoffer, region locate.RegionVerID, keys [][]byte, collect func([]byte, []byte)) error {
snapshot := txnsnapshot.SnapshotProbe{KVSnapshot: txn.GetSnapshot()}
return snapshot.BatchGetSingleRegion(bo, region, keys, collect)
}
// NewScanner returns a scanner to iterate given key range.
func (txn TxnProbe) NewScanner(start, end []byte, batchSize int, reverse bool) (*txnsnapshot.Scanner, error) {
snapshot := txnsnapshot.SnapshotProbe{KVSnapshot: txn.GetSnapshot()}
return snapshot.NewScanner(start, end, batchSize, reverse)
}
// GetStartTime returns the time when txn starts.
func (txn TxnProbe) GetStartTime() time.Time {
return txn.startTime
}
func newTwoPhaseCommitterWithInit(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
c, err := newTwoPhaseCommitter(txn, sessionID)
if err != nil {
return nil, errors.Trace(err)
}
if err = c.initKeysAndMutations(); err != nil {
return nil, errors.Trace(err)
}
return c, nil
}
// CommitterProbe wraps a 2PC committer and exports internal states for testing purpose.
type CommitterProbe struct {
*twoPhaseCommitter
}
// InitKeysAndMutations prepares the committer for commit.
func (c CommitterProbe) InitKeysAndMutations() error {
return c.initKeysAndMutations()
}
// SetPrimaryKey resets the committer's commit ts.
func (c CommitterProbe) SetPrimaryKey(key []byte) {
c.primaryKey = key
}
// GetPrimaryKey returns primary key of the committer.
func (c CommitterProbe) GetPrimaryKey() []byte {
return c.primaryKey
}
// GetMutations returns the mutation buffer to commit.
func (c CommitterProbe) GetMutations() CommitterMutations {
return c.mutations
}
// SetMutations replace the mutation buffer.
func (c CommitterProbe) SetMutations(muts CommitterMutations) {
c.mutations = muts.(*memBufferMutations)
}
// SetCommitTS resets the committer's commit ts.
func (c CommitterProbe) SetCommitTS(ts uint64) {
atomic.StoreUint64(&c.commitTS, ts)
}
// GetCommitTS returns the commit ts of the committer.
func (c CommitterProbe) GetCommitTS() uint64 {
return atomic.LoadUint64(&c.commitTS)
}
// GetMinCommitTS returns the minimal commit ts can be used.
func (c CommitterProbe) GetMinCommitTS() uint64 {
return c.minCommitTS
}
// SetMinCommitTS sets the minimal commit ts can be used.
func (c CommitterProbe) SetMinCommitTS(ts uint64) {
c.minCommitTS = ts
}
// SetMaxCommitTS sets the max commit ts can be used.
func (c CommitterProbe) SetMaxCommitTS(ts uint64) {
c.maxCommitTS = ts
}
// SetSessionID sets the session id of the committer.
func (c CommitterProbe) SetSessionID(id uint64) {
c.sessionID = id
}
// GetForUpdateTS returns the pessimistic ForUpdate ts.
func (c CommitterProbe) GetForUpdateTS() uint64 {
return c.forUpdateTS
}
// SetForUpdateTS sets pessimistic ForUpdate ts.
func (c CommitterProbe) SetForUpdateTS(ts uint64) {
c.forUpdateTS = ts
}
// GetStartTS returns the start ts of the transaction.
func (c CommitterProbe) GetStartTS() uint64 {
return c.startTS
}
// GetLockTTL returns the lock ttl duration of the transaction.
func (c CommitterProbe) GetLockTTL() uint64 {
return c.lockTTL
}
// SetLockTTL sets the lock ttl duration.
func (c CommitterProbe) SetLockTTL(ttl uint64) {
c.lockTTL = ttl
}
// SetLockTTLByTimeAndSize sets the lock ttl duration by time and size.
func (c CommitterProbe) SetLockTTLByTimeAndSize(start time.Time, size int) {
c.lockTTL = txnLockTTL(start, size)
}
// SetTxnSize resets the txn size of the committer and updates lock TTL.
func (c CommitterProbe) SetTxnSize(sz int) {
c.txnSize = sz
c.lockTTL = txnLockTTL(c.txn.startTime, sz)
}
// SetUseAsyncCommit enables async commit feature.
func (c CommitterProbe) SetUseAsyncCommit() {
c.useAsyncCommit = 1
}
// Execute runs the commit process.
func (c CommitterProbe) Execute(ctx context.Context) error {
return c.execute(ctx)
}
// PrewriteAllMutations performs the first phase of commit.
func (c CommitterProbe) PrewriteAllMutations(ctx context.Context) error {
return c.PrewriteMutations(ctx, c.mutations)
}
// PrewriteMutations performs the first phase of commit for given keys.
func (c CommitterProbe) PrewriteMutations(ctx context.Context, mutations CommitterMutations) error {
return c.prewriteMutations(retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil), mutations)
}
// CommitMutations performs the second phase of commit.
func (c CommitterProbe) CommitMutations(ctx context.Context) error {
return c.commitMutations(retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), nil), c.mutationsOfKeys([][]byte{c.primaryKey}))
}
// MutationsOfKeys returns mutations match the keys.
func (c CommitterProbe) MutationsOfKeys(keys [][]byte) CommitterMutations {
return c.mutationsOfKeys(keys)
}
// PessimisticRollbackMutations rolls mutations back.
func (c CommitterProbe) PessimisticRollbackMutations(ctx context.Context, muts CommitterMutations) error {
return c.pessimisticRollbackMutations(retry.NewBackofferWithVars(ctx, pessimisticRollbackMaxBackoff, nil), muts)
}
// Cleanup cleans dirty data of a committer.
func (c CommitterProbe) Cleanup(ctx context.Context) {
c.cleanup(ctx)
c.cleanWg.Wait()
}
// WaitCleanup waits for the committer to complete.
func (c CommitterProbe) WaitCleanup() {
c.cleanWg.Wait()
}
// IsOnePC returns if the committer is using one PC.
func (c CommitterProbe) IsOnePC() bool {
return c.isOnePC()
}
// BuildPrewriteRequest builds rpc request for mutation.
func (c CommitterProbe) BuildPrewriteRequest(regionID, regionConf, regionVersion uint64, mutations CommitterMutations, txnSize uint64) *tikvrpc.Request {
var batch batchMutations
batch.mutations = mutations
batch.region = locate.NewRegionVerID(regionID, regionConf, regionVersion)
for _, key := range mutations.GetKeys() {
if bytes.Equal(key, c.primary()) {
batch.isPrimary = true
break
}
}
return c.buildPrewriteRequest(batch, txnSize)
}
// IsAsyncCommit returns if the committer uses async commit.
func (c CommitterProbe) IsAsyncCommit() bool {
return c.isAsyncCommit()
}
// CheckAsyncCommit returns if async commit is available.
func (c CommitterProbe) CheckAsyncCommit() bool {
return c.checkAsyncCommit()
}
// GetOnePCCommitTS returns the commit ts of one pc.
func (c CommitterProbe) GetOnePCCommitTS() uint64 {
return c.onePCCommitTS
}
// IsTTLUninitialized returns if the TTL manager is uninitialized.
func (c CommitterProbe) IsTTLUninitialized() bool {
state := atomic.LoadUint32((*uint32)(&c.ttlManager.state))
return state == uint32(stateUninitialized)
}
// IsTTLRunning returns if the TTL manager is running state.
func (c CommitterProbe) IsTTLRunning() bool {
state := atomic.LoadUint32((*uint32)(&c.ttlManager.state))
return state == uint32(stateRunning)
}
// CloseTTLManager closes the TTL manager.
func (c CommitterProbe) CloseTTLManager() {
c.ttlManager.close()
}
// GetUndeterminedErr returns the encountered undetermined error (if any).
func (c CommitterProbe) GetUndeterminedErr() error {
c.mu.RLock()
defer c.mu.RUnlock()
return c.mu.undeterminedErr
}
// SetNoFallBack disallows async commit to fall back to normal mode.
func (c CommitterProbe) SetNoFallBack() {
c.testingKnobs.noFallBack = true
}
// SetPrimaryKeyBlocker is used to block committer after primary is sent.
func (c CommitterProbe) SetPrimaryKeyBlocker(ac, bk chan struct{}) {
c.testingKnobs.acAfterCommitPrimary = ac
c.testingKnobs.bkAfterCommitPrimary = bk
}
// CleanupMutations performs the clean up phase.
func (c CommitterProbe) CleanupMutations(ctx context.Context) error {
bo := retry.NewBackofferWithVars(ctx, cleanupMaxBackoff, nil)
return c.cleanupMutations(bo, c.mutations)
}
// SendTxnHeartBeat renews a txn's ttl.
func SendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) {
return sendTxnHeartBeat(bo, store, primary, startTS, ttl)
}
// ConfigProbe exposes configurations and global variables for testing purpose.
type ConfigProbe struct{}
// GetTxnCommitBatchSize returns the batch size to commit txn.
func (c ConfigProbe) GetTxnCommitBatchSize() uint64 {
return txnCommitBatchSize
}
// GetPessimisticLockMaxBackoff returns pessimisticLockMaxBackoff
func (c ConfigProbe) GetPessimisticLockMaxBackoff() int {
return pessimisticLockMaxBackoff
}
// GetDefaultLockTTL returns the default lock TTL.
func (c ConfigProbe) GetDefaultLockTTL() uint64 {
return defaultLockTTL
}
// GetTTLFactor returns the factor to calculate txn TTL.
func (c ConfigProbe) GetTTLFactor() int {
return ttlFactor
}
// LoadPreSplitDetectThreshold returns presplit detect threshold config.
func (c ConfigProbe) LoadPreSplitDetectThreshold() uint32 {
return atomic.LoadUint32(&preSplitDetectThreshold)
}
// StorePreSplitDetectThreshold updates presplit detect threshold config.
func (c ConfigProbe) StorePreSplitDetectThreshold(v uint32) {
atomic.StoreUint32(&preSplitDetectThreshold, v)
}
// LoadPreSplitSizeThreshold returns presplit size threshold config.
func (c ConfigProbe) LoadPreSplitSizeThreshold() uint32 {
return atomic.LoadUint32(&preSplitSizeThreshold)
}
// StorePreSplitSizeThreshold updates presplit size threshold config.
func (c ConfigProbe) StorePreSplitSizeThreshold(v uint32) {
atomic.StoreUint32(&preSplitSizeThreshold, v)
}

View File

@ -30,7 +30,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
package transaction
import (
"bytes"
@ -56,7 +56,6 @@ import (
"github.com/tikv/client-go/v2/internal/unionstore"
tikv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/txnkv/txnutil"
"github.com/tikv/client-go/v2/util"
@ -74,36 +73,11 @@ type SchemaAmender interface {
AmendTxn(ctx context.Context, startInfoSchema SchemaVer, change *RelatedSchemaChange, mutations CommitterMutations) (CommitterMutations, error)
}
// StartTSOption indicates the option when beginning a transaction
// `TxnScope` must be set for each object
// Every other fields are optional, but currently at most one of them can be set
type StartTSOption struct {
TxnScope string
StartTS *uint64
}
// DefaultStartTSOption creates a default StartTSOption, ie. Work in GlobalTxnScope and get start ts when got used
func DefaultStartTSOption() StartTSOption {
return StartTSOption{TxnScope: oracle.GlobalTxnScope}
}
// SetStartTS returns a new StartTSOption with StartTS set to the given startTS
func (to StartTSOption) SetStartTS(startTS uint64) StartTSOption {
to.StartTS = &startTS
return to
}
// SetTxnScope returns a new StartTSOption with TxnScope set to txnScope
func (to StartTSOption) SetTxnScope(txnScope string) StartTSOption {
to.TxnScope = txnScope
return to
}
// KVTxn contains methods to interact with a TiKV transaction.
type KVTxn struct {
snapshot *txnsnapshot.KVSnapshot
us *unionstore.KVUnionStore
store *KVStore // for connection to region.
store kvstore // for connection to region.
startTS uint64
startTime time.Time // Monotonic timestamp for recording txn time consuming.
commitTS uint64
@ -135,24 +109,8 @@ type KVTxn struct {
resourceGroupTag []byte
}
// ExtractStartTS use `option` to get the proper startTS for a transaction.
func ExtractStartTS(store *KVStore, option StartTSOption) (uint64, error) {
if option.StartTS != nil {
return *option.StartTS, nil
}
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
return store.getTimestampWithRetry(bo, option.TxnScope)
}
func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error) {
if options.TxnScope == "" {
options.TxnScope = oracle.GlobalTxnScope
}
startTS, err := ExtractStartTS(store, options)
if err != nil {
return nil, errors.Trace(err)
}
snapshot := txnsnapshot.NewTiKVSnapshot(store, startTS, store.nextReplicaReadSeed())
// NewTiKVTxn creates a new KVTxn.
func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, scope string) (*KVTxn, error) {
cfg := config.GetGlobalConfig()
newTiKVTxn := &KVTxn{
snapshot: snapshot,
@ -162,7 +120,7 @@ func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error
startTime: time.Now(),
valid: true,
vars: tikv.DefaultVars,
scope: options.TxnScope,
scope: scope,
enableAsyncCommit: cfg.EnableAsyncCommit,
enable1PC: cfg.Enable1PC,
}
@ -224,12 +182,12 @@ func (txn *KVTxn) String() string {
// If such entry is not found, it returns an invalid Iterator with no error.
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
// The Iterator must be Closed after use.
func (txn *KVTxn) Iter(k []byte, upperBound []byte) (Iterator, error) {
func (txn *KVTxn) Iter(k []byte, upperBound []byte) (unionstore.Iterator, error) {
return txn.us.Iter(k, upperBound)
}
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
func (txn *KVTxn) IterReverse(k []byte) (Iterator, error) {
func (txn *KVTxn) IterReverse(k []byte) (unionstore.Iterator, error) {
return txn.us.IterReverse(k)
}
@ -397,7 +355,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
}()
// latches disabled
// pessimistic transaction should also bypass latch.
if txn.store.txnLatches == nil || txn.IsPessimistic() {
if txn.store.TxnLatches() == nil || txn.IsPessimistic() {
err = committer.execute(ctx)
if val == nil || sessionID > 0 {
txn.onCommitted(err)
@ -409,13 +367,13 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
// latches enabled
// for transactions which need to acquire latches
start = time.Now()
lock := txn.store.txnLatches.Lock(committer.startTS, committer.mutations.GetKeys())
lock := txn.store.TxnLatches().Lock(committer.startTS, committer.mutations.GetKeys())
commitDetail := committer.getDetail()
commitDetail.LocalLatchTime = time.Since(start)
if commitDetail.LocalLatchTime > 0 {
metrics.TiKVLocalLatchWaitTimeHistogram.Observe(commitDetail.LocalLatchTime.Seconds())
}
defer txn.store.txnLatches.UnLock(lock)
defer txn.store.TxnLatches().UnLock(lock)
if lock.IsStale() {
return &tikverr.ErrWriteConflictInLatch{StartTS: txn.startTS}
}
@ -521,8 +479,8 @@ func (txn *KVTxn) onCommitted(err error) {
func (txn *KVTxn) LockKeysWithWaitTime(ctx context.Context, lockWaitTime int64, keysInput ...[]byte) (err error) {
forUpdateTs := txn.startTS
if txn.IsPessimistic() {
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
forUpdateTs, err = txn.store.getTimestampWithRetry(bo, txn.scope)
bo := retry.NewBackofferWithVars(context.Background(), TsoMaxBackoff, nil)
forUpdateTs, err = txn.store.GetTimestampWithRetry(bo, txn.scope)
if err != nil {
return err
}
@ -787,7 +745,7 @@ func (txn *KVTxn) GetUnionStore() *unionstore.KVUnionStore {
}
// GetMemBuffer return the MemBuffer binding to this transaction.
func (txn *KVTxn) GetMemBuffer() *MemDB {
func (txn *KVTxn) GetMemBuffer() *unionstore.MemDB {
return txn.us.GetMemBuffer()
}
@ -806,5 +764,5 @@ func (txn *KVTxn) SetBinlogExecutor(binlog BinlogExecutor) {
// GetClusterID returns store's cluster id.
func (txn *KVTxn) GetClusterID() uint64 {
return txn.store.clusterID
return txn.store.GetClusterID()
}

View File

@ -0,0 +1,38 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package txnkv
import "github.com/tikv/client-go/v2/txnkv/transaction"
// KVTxn contains methods to interact with a TiKV transaction.
type KVTxn = transaction.KVTxn
// BinlogWriteResult defines the result of prewrite binlog.
type BinlogWriteResult = transaction.BinlogWriteResult
// KVFilter is a filter that filters out unnecessary KV pairs.
type KVFilter = transaction.KVFilter
// SchemaLeaseChecker is used to validate schema version is not changed during transaction execution.
type SchemaLeaseChecker = transaction.SchemaLeaseChecker
// SchemaVer is the infoSchema which will return the schema version.
type SchemaVer = transaction.SchemaVer
// SchemaAmender is used by pessimistic transactions to amend commit mutations for schema change during 2pc.
type SchemaAmender = transaction.SchemaAmender
// MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit.
// We use it to abort the transaction to guarantee GC worker will not influence it.
const MaxTxnTimeUse = transaction.MaxTxnTimeUse