mirror of https://github.com/tikv/client-go.git
666 lines
20 KiB
Go
666 lines
20 KiB
Go
// Copyright 2021 TiKV Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// NOTE: The code in this file is based on code from the
|
|
// TiDB project, licensed under the Apache License v 2.0
|
|
//
|
|
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/async_commit_test.go
|
|
//
|
|
|
|
// Copyright 2020 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package tikv_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
|
"github.com/pingcap/tidb/pkg/store/mockstore/unistore"
|
|
"github.com/pkg/errors"
|
|
"github.com/stretchr/testify/suite"
|
|
tikverr "github.com/tikv/client-go/v2/error"
|
|
"github.com/tikv/client-go/v2/kv"
|
|
"github.com/tikv/client-go/v2/oracle"
|
|
"github.com/tikv/client-go/v2/testutils"
|
|
"github.com/tikv/client-go/v2/tikv"
|
|
"github.com/tikv/client-go/v2/tikvrpc"
|
|
"github.com/tikv/client-go/v2/txnkv"
|
|
"github.com/tikv/client-go/v2/txnkv/transaction"
|
|
"github.com/tikv/client-go/v2/txnkv/txnlock"
|
|
"github.com/tikv/client-go/v2/util"
|
|
)
|
|
|
|
func TestAsyncCommit(t *testing.T) {
|
|
suite.Run(t, new(testAsyncCommitSuite))
|
|
}
|
|
|
|
// testAsyncCommitCommon is used to put common parts that will be both used by
|
|
// testAsyncCommitSuite and testAsyncCommitFailSuite.
|
|
type testAsyncCommitCommon struct {
|
|
suite.Suite
|
|
cluster testutils.Cluster
|
|
store *tikv.KVStore
|
|
}
|
|
|
|
// TODO(youjiali1995): remove it after updating TiDB.
|
|
type unistoreClientWrapper struct {
|
|
*unistore.RPCClient
|
|
}
|
|
|
|
func (c *unistoreClientWrapper) SetEventListener(listener tikv.ClientEventListener) {}
|
|
|
|
func (s *testAsyncCommitCommon) setUpTest() {
|
|
if *withTiKV {
|
|
s.store = NewTestStore(s.T())
|
|
return
|
|
}
|
|
|
|
client, pdClient, cluster, err := unistore.New("", nil)
|
|
s.Require().Nil(err)
|
|
|
|
unistore.BootstrapWithSingleStore(cluster)
|
|
s.cluster = cluster
|
|
store, err := tikv.NewTestTiKVStore(fpClient{Client: &unistoreClientWrapper{client}}, pdClient, nil, nil, 0)
|
|
s.Require().Nil(err)
|
|
|
|
s.store = store
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) tearDownTest() {
|
|
s.store.Close()
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) putAlphabets(enableAsyncCommit bool) {
|
|
for ch := byte('a'); ch <= byte('z'); ch++ {
|
|
s.putKV([]byte{ch}, []byte{ch}, enableAsyncCommit)
|
|
}
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) putKV(key, value []byte, enableAsyncCommit bool) (uint64, uint64) {
|
|
txn := s.beginAsyncCommit()
|
|
err := txn.Set(key, value)
|
|
s.Nil(err)
|
|
err = txn.Commit(context.Background())
|
|
s.Nil(err)
|
|
return txn.StartTS(), txn.CommitTS()
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, expectedValue []byte) {
|
|
v, err := txn.Get(context.Background(), key)
|
|
s.Nil(err)
|
|
s.Equal(v, expectedValue)
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) mustGetLock(key []byte) *txnkv.Lock {
|
|
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
|
|
s.Nil(err)
|
|
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
|
|
Key: key,
|
|
Version: ver,
|
|
})
|
|
bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
|
|
loc, err := s.store.GetRegionCache().LocateKey(bo, key)
|
|
s.Nil(err)
|
|
resp, err := s.store.SendReq(bo, req, loc.Region, time.Second*10)
|
|
s.Nil(err)
|
|
s.NotNil(resp.Resp)
|
|
keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
|
|
s.NotNil(keyErr)
|
|
lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
|
|
s.Nil(err)
|
|
return lock
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) mustPointGet(key, expectedValue []byte) {
|
|
snap := s.store.GetSnapshot(math.MaxUint64)
|
|
value, err := snap.Get(context.Background(), key)
|
|
s.Nil(err)
|
|
s.Equal(value, expectedValue)
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) mustGetFromSnapshot(version uint64, key, expectedValue []byte) {
|
|
snap := s.store.GetSnapshot(version)
|
|
value, err := snap.Get(context.Background(), key)
|
|
s.Nil(err)
|
|
s.Equal(value, expectedValue)
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) mustGetNoneFromSnapshot(version uint64, key []byte) {
|
|
snap := s.store.GetSnapshot(version)
|
|
_, err := snap.Get(context.Background(), key)
|
|
s.Equal(errors.Cause(err), tikverr.ErrNotExist)
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability() transaction.TxnProbe {
|
|
txn := s.beginAsyncCommit()
|
|
txn.SetCausalConsistency(false)
|
|
return txn
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) beginAsyncCommit() transaction.TxnProbe {
|
|
txn, err := s.store.Begin()
|
|
s.Nil(err)
|
|
txn.SetEnableAsyncCommit(true)
|
|
return transaction.TxnProbe{KVTxn: txn}
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) begin() transaction.TxnProbe {
|
|
txn, err := s.store.Begin()
|
|
s.Nil(err)
|
|
return transaction.TxnProbe{KVTxn: txn}
|
|
}
|
|
|
|
func (s *testAsyncCommitCommon) begin1PC() transaction.TxnProbe {
|
|
txn, err := s.store.Begin()
|
|
s.Nil(err)
|
|
txn.SetEnable1PC(true)
|
|
return transaction.TxnProbe{KVTxn: txn}
|
|
}
|
|
|
|
type testAsyncCommitSuite struct {
|
|
testAsyncCommitCommon
|
|
bo *tikv.Backoffer
|
|
}
|
|
|
|
func (s *testAsyncCommitSuite) SetupTest() {
|
|
s.testAsyncCommitCommon.setUpTest()
|
|
s.bo = tikv.NewBackofferWithVars(context.Background(), 5000, nil)
|
|
}
|
|
|
|
func (s *testAsyncCommitSuite) TearDownTest() {
|
|
s.testAsyncCommitCommon.tearDownTest()
|
|
}
|
|
|
|
func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
|
|
txn, err := s.store.Begin()
|
|
s.Nil(err)
|
|
txn.SetEnableAsyncCommit(true)
|
|
for i, k := range keys {
|
|
if len(values[i]) > 0 {
|
|
err = txn.Set(k, values[i])
|
|
} else {
|
|
err = txn.Delete(k)
|
|
}
|
|
s.Nil(err)
|
|
}
|
|
if len(primaryValue) > 0 {
|
|
err = txn.Set(primaryKey, primaryValue)
|
|
} else {
|
|
err = txn.Delete(primaryKey)
|
|
}
|
|
s.Nil(err)
|
|
txnProbe := transaction.TxnProbe{KVTxn: txn}
|
|
tpc, err := txnProbe.NewCommitter(0)
|
|
s.Nil(err)
|
|
tpc.SetPrimaryKey(primaryKey)
|
|
tpc.SetUseAsyncCommit()
|
|
|
|
ctx := context.Background()
|
|
err = tpc.PrewriteAllMutations(ctx)
|
|
s.Nil(err)
|
|
|
|
if commitPrimary {
|
|
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
|
s.Nil(err)
|
|
tpc.SetCommitTS(commitTS)
|
|
err = tpc.CommitMutations(ctx)
|
|
s.Nil(err)
|
|
}
|
|
return txn.StartTS(), tpc.GetCommitTS()
|
|
}
|
|
|
|
func (s *testAsyncCommitSuite) TestCheckSecondaries() {
|
|
// This test doesn't support tikv mode.
|
|
if *withTiKV {
|
|
return
|
|
}
|
|
|
|
s.putAlphabets(true)
|
|
|
|
loc, err := s.store.GetRegionCache().LocateKey(s.bo, []byte("a"))
|
|
s.Nil(err)
|
|
newRegionID, peerID := s.cluster.AllocID(), s.cluster.AllocID()
|
|
s.cluster.Split(loc.Region.GetID(), newRegionID, []byte("e"), []uint64{peerID}, peerID)
|
|
s.store.GetRegionCache().InvalidateCachedRegion(loc.Region)
|
|
|
|
// No locks to check, only primary key is locked, should be successful.
|
|
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, []byte("z"), []byte("z"), false)
|
|
lock := s.mustGetLock([]byte("z"))
|
|
lock.UseAsyncCommit = true
|
|
ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
|
s.Nil(err)
|
|
var lockutil txnlock.LockProbe
|
|
status := lockutil.NewLockStatus(nil, true, ts)
|
|
|
|
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
|
|
err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
|
|
s.Nil(err)
|
|
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
|
s.Nil(err)
|
|
status, err = resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false, nil)
|
|
s.Nil(err)
|
|
s.True(status.IsCommitted())
|
|
s.Equal(status.CommitTS(), ts)
|
|
|
|
// One key is committed (i), one key is locked (a). Should get committed.
|
|
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
|
s.Nil(err)
|
|
commitTs := ts + 10
|
|
|
|
gotCheckA := int64(0)
|
|
gotCheckB := int64(0)
|
|
gotResolve := int64(0)
|
|
gotOther := int64(0)
|
|
mock := mockResolveClient{
|
|
Client: s.store.GetTiKVClient(),
|
|
onCheckSecondaries: func(req *kvrpcpb.CheckSecondaryLocksRequest) (*tikvrpc.Response, error) {
|
|
if req.StartVersion != ts {
|
|
return nil, errors.Errorf("Bad start version: %d, expected: %d", req.StartVersion, ts)
|
|
}
|
|
var resp kvrpcpb.CheckSecondaryLocksResponse
|
|
for _, k := range req.Keys {
|
|
if bytes.Equal(k, []byte("a")) {
|
|
atomic.StoreInt64(&gotCheckA, 1)
|
|
|
|
resp = kvrpcpb.CheckSecondaryLocksResponse{
|
|
Locks: []*kvrpcpb.LockInfo{{Key: []byte("a"), PrimaryLock: []byte("z"), LockVersion: ts, UseAsyncCommit: true}},
|
|
CommitTs: commitTs,
|
|
}
|
|
} else if bytes.Equal(k, []byte("i")) {
|
|
atomic.StoreInt64(&gotCheckB, 1)
|
|
|
|
resp = kvrpcpb.CheckSecondaryLocksResponse{
|
|
Locks: []*kvrpcpb.LockInfo{},
|
|
CommitTs: commitTs,
|
|
}
|
|
} else {
|
|
fmt.Printf("Got other key: %s\n", k)
|
|
atomic.StoreInt64(&gotOther, 1)
|
|
}
|
|
}
|
|
return &tikvrpc.Response{Resp: &resp}, nil
|
|
},
|
|
onResolveLock: func(req *kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error) {
|
|
if req.StartVersion != ts {
|
|
return nil, errors.Errorf("Bad start version: %d, expected: %d", req.StartVersion, ts)
|
|
}
|
|
if req.CommitVersion != commitTs {
|
|
return nil, errors.Errorf("Bad commit version: %d, expected: %d", req.CommitVersion, commitTs)
|
|
}
|
|
for _, k := range req.Keys {
|
|
if bytes.Equal(k, []byte("a")) || bytes.Equal(k, []byte("z")) {
|
|
atomic.StoreInt64(&gotResolve, 1)
|
|
} else {
|
|
atomic.StoreInt64(&gotOther, 1)
|
|
}
|
|
}
|
|
resp := kvrpcpb.ResolveLockResponse{}
|
|
return &tikvrpc.Response{Resp: &resp}, nil
|
|
},
|
|
}
|
|
s.store.SetTiKVClient(&mock)
|
|
|
|
status = lockutil.NewLockStatus([][]byte{[]byte("a"), []byte("i")}, true, 0)
|
|
lock = &txnkv.Lock{
|
|
Key: []byte("a"),
|
|
Primary: []byte("z"),
|
|
TxnID: ts,
|
|
LockType: kvrpcpb.Op_Put,
|
|
UseAsyncCommit: true,
|
|
MinCommitTS: ts + 5,
|
|
}
|
|
|
|
_ = s.beginAsyncCommit()
|
|
|
|
err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
|
|
s.Nil(err)
|
|
s.Equal(gotCheckA, int64(1))
|
|
s.Equal(gotCheckB, int64(1))
|
|
s.Equal(gotOther, int64(0))
|
|
s.Equal(gotResolve, int64(1))
|
|
|
|
// One key has been rolled back (b), one is locked (a). Should be rolled back.
|
|
ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
|
s.Nil(err)
|
|
commitTs = ts + 10
|
|
|
|
gotCheckA = int64(0)
|
|
gotCheckB = int64(0)
|
|
gotResolve = int64(0)
|
|
gotOther = int64(0)
|
|
mock.onResolveLock = func(req *kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error) {
|
|
if req.StartVersion != ts {
|
|
return nil, errors.Errorf("Bad start version: %d, expected: %d", req.StartVersion, ts)
|
|
}
|
|
if req.CommitVersion != commitTs {
|
|
return nil, errors.Errorf("Bad commit version: %d, expected: 0", req.CommitVersion)
|
|
}
|
|
for _, k := range req.Keys {
|
|
if bytes.Equal(k, []byte("a")) || bytes.Equal(k, []byte("z")) {
|
|
atomic.StoreInt64(&gotResolve, 1)
|
|
} else {
|
|
atomic.StoreInt64(&gotOther, 1)
|
|
}
|
|
}
|
|
resp := kvrpcpb.ResolveLockResponse{}
|
|
return &tikvrpc.Response{Resp: &resp}, nil
|
|
}
|
|
|
|
lock.TxnID = ts
|
|
lock.MinCommitTS = ts + 5
|
|
|
|
err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
|
|
s.Nil(err)
|
|
s.Equal(gotCheckA, int64(1))
|
|
s.Equal(gotCheckB, int64(1))
|
|
s.Equal(gotResolve, int64(1))
|
|
s.Equal(gotOther, int64(0))
|
|
}
|
|
|
|
func (s *testAsyncCommitSuite) TestRepeatableRead() {
|
|
var sessionID uint64 = 0
|
|
test := func(isPessimistic bool) {
|
|
s.putKV([]byte("k1"), []byte("v1"), true)
|
|
|
|
sessionID++
|
|
ctx := context.WithValue(context.Background(), util.SessionID, sessionID)
|
|
txn1 := s.beginAsyncCommit()
|
|
txn1.SetPessimistic(isPessimistic)
|
|
s.mustGetFromTxn(txn1, []byte("k1"), []byte("v1"))
|
|
txn1.Set([]byte("k1"), []byte("v2"))
|
|
|
|
for i := 0; i < 20; i++ {
|
|
_, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
|
s.Nil(err)
|
|
}
|
|
|
|
txn2 := s.beginAsyncCommit()
|
|
s.mustGetFromTxn(txn2, []byte("k1"), []byte("v1"))
|
|
|
|
err := txn1.Commit(ctx)
|
|
s.Nil(err)
|
|
// Check txn1 is committed in async commit.
|
|
s.True(txn1.IsAsyncCommit())
|
|
s.mustGetFromTxn(txn2, []byte("k1"), []byte("v1"))
|
|
err = txn2.Rollback()
|
|
s.Nil(err)
|
|
|
|
txn3 := s.beginAsyncCommit()
|
|
s.mustGetFromTxn(txn3, []byte("k1"), []byte("v2"))
|
|
err = txn3.Rollback()
|
|
s.Nil(err)
|
|
}
|
|
|
|
test(false)
|
|
test(true)
|
|
}
|
|
|
|
// It's just a simple validation of linearizability.
|
|
// Extra tests are needed to test this feature with the control of the TiKV cluster.
|
|
func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() {
|
|
t1 := s.beginAsyncCommitWithLinearizability()
|
|
t2 := s.beginAsyncCommitWithLinearizability()
|
|
err := t1.Set([]byte("a"), []byte("a1"))
|
|
s.Nil(err)
|
|
err = t2.Set([]byte("b"), []byte("b1"))
|
|
s.Nil(err)
|
|
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
|
|
// t2 commits earlier than t1
|
|
err = t2.Commit(ctx)
|
|
s.Nil(err)
|
|
err = t1.Commit(ctx)
|
|
s.Nil(err)
|
|
commitTS1 := t1.CommitTS()
|
|
commitTS2 := t2.CommitTS()
|
|
s.Less(commitTS2, commitTS1)
|
|
}
|
|
|
|
// TestAsyncCommitWithMultiDC tests that async commit can only be enabled in global transactions
|
|
func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC() {
|
|
// It requires setting placement rules to run with TiKV
|
|
if *withTiKV {
|
|
return
|
|
}
|
|
|
|
localTxn := s.beginAsyncCommit()
|
|
err := localTxn.Set([]byte("a"), []byte("a1"))
|
|
localTxn.SetScope("bj")
|
|
s.Nil(err)
|
|
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
|
|
err = localTxn.Commit(ctx)
|
|
s.Nil(err)
|
|
s.False(localTxn.IsAsyncCommit())
|
|
|
|
globalTxn := s.beginAsyncCommit()
|
|
err = globalTxn.Set([]byte("b"), []byte("b1"))
|
|
globalTxn.SetScope(oracle.GlobalTxnScope)
|
|
s.Nil(err)
|
|
err = globalTxn.Commit(ctx)
|
|
s.Nil(err)
|
|
s.True(globalTxn.IsAsyncCommit())
|
|
}
|
|
|
|
func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() {
|
|
keys := [][]byte{[]byte("k0"), []byte("k1")}
|
|
values := [][]byte{[]byte("v00"), []byte("v10")}
|
|
initTest := func() transaction.CommitterProbe {
|
|
t0 := s.begin()
|
|
err := t0.Set(keys[0], values[0])
|
|
s.Nil(err)
|
|
err = t0.Set(keys[1], values[1])
|
|
s.Nil(err)
|
|
err = t0.Commit(context.Background())
|
|
s.Nil(err)
|
|
|
|
t1 := s.beginAsyncCommit()
|
|
err = t1.Set(keys[0], []byte("v01"))
|
|
s.Nil(err)
|
|
err = t1.Set(keys[1], []byte("v11"))
|
|
s.Nil(err)
|
|
|
|
committer, err := t1.NewCommitter(1)
|
|
s.Nil(err)
|
|
committer.SetLockTTL(1)
|
|
committer.SetUseAsyncCommit()
|
|
return committer
|
|
}
|
|
prewriteKey := func(committer transaction.CommitterProbe, idx int, fallback bool) {
|
|
bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
|
|
loc, err := s.store.GetRegionCache().LocateKey(bo, keys[idx])
|
|
s.Nil(err)
|
|
req := committer.BuildPrewriteRequest(loc.Region.GetID(), loc.Region.GetConfVer(), loc.Region.GetVer(),
|
|
committer.GetMutations().Slice(idx, idx+1), 1)
|
|
if fallback {
|
|
req.Req.(*kvrpcpb.PrewriteRequest).MaxCommitTs = 1
|
|
}
|
|
resp, err := s.store.SendReq(bo, req, loc.Region, 5000)
|
|
s.Nil(err)
|
|
s.NotNil(resp.Resp)
|
|
}
|
|
readKey := func(idx int) {
|
|
t2 := s.begin()
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
defer cancel()
|
|
val, err := t2.Get(ctx, keys[idx])
|
|
s.Nil(err)
|
|
s.Equal(val, values[idx])
|
|
}
|
|
|
|
// Case 1: Fallback primary, read primary
|
|
committer := initTest()
|
|
prewriteKey(committer, 0, true)
|
|
prewriteKey(committer, 1, false)
|
|
readKey(0)
|
|
readKey(1)
|
|
|
|
// Case 2: Fallback primary, read secondary
|
|
committer = initTest()
|
|
prewriteKey(committer, 0, true)
|
|
prewriteKey(committer, 1, false)
|
|
readKey(1)
|
|
readKey(0)
|
|
|
|
// Case 3: Fallback secondary, read primary
|
|
committer = initTest()
|
|
prewriteKey(committer, 0, false)
|
|
prewriteKey(committer, 1, true)
|
|
readKey(0)
|
|
readKey(1)
|
|
|
|
// Case 4: Fallback secondary, read secondary
|
|
committer = initTest()
|
|
prewriteKey(committer, 0, false)
|
|
prewriteKey(committer, 1, true)
|
|
readKey(1)
|
|
readKey(0)
|
|
|
|
// Case 5: Fallback both, read primary
|
|
committer = initTest()
|
|
prewriteKey(committer, 0, true)
|
|
prewriteKey(committer, 1, true)
|
|
readKey(0)
|
|
readKey(1)
|
|
|
|
// Case 6: Fallback both, read secondary
|
|
committer = initTest()
|
|
prewriteKey(committer, 0, true)
|
|
prewriteKey(committer, 1, true)
|
|
readKey(1)
|
|
readKey(0)
|
|
}
|
|
|
|
type mockResolveClient struct {
|
|
tikv.Client
|
|
onResolveLock func(*kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error)
|
|
onCheckSecondaries func(*kvrpcpb.CheckSecondaryLocksRequest) (*tikvrpc.Response, error)
|
|
}
|
|
|
|
func (m *mockResolveClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
|
|
// Intercept check secondary locks and resolve lock messages if the callback is non-nil.
|
|
// If the callback returns (nil, nil), forward to the inner client.
|
|
if cr, ok := req.Req.(*kvrpcpb.CheckSecondaryLocksRequest); ok && m.onCheckSecondaries != nil {
|
|
result, err := m.onCheckSecondaries(cr)
|
|
if result != nil || err != nil {
|
|
return result, err
|
|
}
|
|
} else if rr, ok := req.Req.(*kvrpcpb.ResolveLockRequest); ok && m.onResolveLock != nil {
|
|
result, err := m.onResolveLock(rr)
|
|
if result != nil || err != nil {
|
|
return result, err
|
|
}
|
|
}
|
|
return m.Client.SendRequest(ctx, addr, req, timeout)
|
|
}
|
|
|
|
// TestPessimisticTxnResolveAsyncCommitLock tests that pessimistic transactions resolve non-expired async-commit locks during the prewrite phase.
|
|
// Pessimistic transactions will resolve locks immediately during the prewrite phase because of the special logic for handling non-pessimistic lock conflict.
|
|
// However, async-commit locks can't be resolved until they expire. This test covers it.
|
|
func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() {
|
|
ctx := context.Background()
|
|
k := []byte("k")
|
|
|
|
// Lock the key with an async-commit lock.
|
|
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false)
|
|
|
|
txn, err := s.store.Begin()
|
|
s.Nil(err)
|
|
txn.SetPessimistic(true)
|
|
err = txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS()}, []byte("k1"))
|
|
s.Nil(err)
|
|
|
|
txn.Set(k, k)
|
|
err = txn.Commit(context.Background())
|
|
s.Nil(err)
|
|
}
|
|
|
|
func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() {
|
|
// This test doesn't support tikv mode.
|
|
|
|
t1 := s.beginAsyncCommit()
|
|
t1.SetPessimistic(true)
|
|
t1.Set([]byte("a"), []byte("a"))
|
|
t1.Set([]byte("z"), []byte("z"))
|
|
committer, err := t1.NewCommitter(1)
|
|
s.Nil(err)
|
|
committer.SetUseAsyncCommit()
|
|
committer.SetLockTTL(1000)
|
|
committer.SetMaxCommitTS(oracle.ComposeTS(oracle.ExtractPhysical(committer.GetStartTS())+1500, 0))
|
|
committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(0, 1))
|
|
s.True(committer.IsAsyncCommit())
|
|
lock := s.mustGetLock([]byte("a"))
|
|
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
|
|
for {
|
|
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
|
s.Nil(err)
|
|
status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil)
|
|
s.Nil(err)
|
|
if status.IsRolledBack() {
|
|
break
|
|
}
|
|
time.Sleep(time.Millisecond * 30)
|
|
}
|
|
s.True(committer.IsAsyncCommit())
|
|
committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(1, 2))
|
|
s.False(committer.IsAsyncCommit())
|
|
}
|
|
|
|
func (s *testAsyncCommitSuite) TestAsyncCommitLifecycleHooks() {
|
|
reachedPre := atomic.Bool{}
|
|
reachedPost := atomic.Bool{}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
t1 := s.beginAsyncCommit()
|
|
t1.SetBackgroundGoroutineLifecycleHooks(transaction.LifecycleHooks{
|
|
Pre: func() {
|
|
wg.Add(1)
|
|
|
|
reachedPre.Store(true)
|
|
},
|
|
Post: func() {
|
|
s.Equal(reachedPre.Load(), true)
|
|
reachedPost.Store(true)
|
|
|
|
wg.Done()
|
|
},
|
|
})
|
|
t1.Set([]byte("a"), []byte("a"))
|
|
t1.Set([]byte("z"), []byte("z"))
|
|
s.Nil(t1.Commit(context.Background()))
|
|
|
|
s.Equal(reachedPre.Load(), true)
|
|
s.Equal(reachedPost.Load(), false)
|
|
wg.Wait()
|
|
s.Equal(reachedPost.Load(), true)
|
|
}
|