// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/async_commit_test.go
//

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv_test

import (
	"bytes"
	"context"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/pkg/store/mockstore/unistore"
	"github.com/pkg/errors"
	"github.com/stretchr/testify/suite"
	tikverr "github.com/tikv/client-go/v2/error"
	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/oracle"
	"github.com/tikv/client-go/v2/testutils"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/txnkv"
	"github.com/tikv/client-go/v2/txnkv/transaction"
	"github.com/tikv/client-go/v2/txnkv/txnlock"
	"github.com/tikv/client-go/v2/util"
)

func TestAsyncCommit(t *testing.T) {
	suite.Run(t, new(testAsyncCommitSuite))
}

// testAsyncCommitCommon is used to put common parts that will be used by both
// testAsyncCommitSuite and testAsyncCommitFailSuite.
type testAsyncCommitCommon struct {
	suite.Suite
	cluster testutils.Cluster
	store   *tikv.KVStore
}

// TODO(youjiali1995): remove it after updating TiDB.
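// unistoreClientWrapper adapts unistore's RPCClient to the tikv.Client interface
// expected by NewTestTiKVStore. SetEventListener is a no-op because the embedded
// unistore client has no client events to forward.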
type unistoreClientWrapper struct {
	*unistore.RPCClient
}

func (c *unistoreClientWrapper) SetEventListener(listener tikv.ClientEventListener) {}

func (s *testAsyncCommitCommon) setUpTest() {
	if *withTiKV {
		s.store = NewTestStore(s.T())
		return
	}

	client, pdClient, cluster, err := unistore.New("", nil)
	s.Require().Nil(err)
	unistore.BootstrapWithSingleStore(cluster)
	s.cluster = cluster
	store, err := tikv.NewTestTiKVStore(fpClient{Client: &unistoreClientWrapper{client}}, pdClient, nil, nil, 0)
	s.Require().Nil(err)

	s.store = store
}

func (s *testAsyncCommitCommon) tearDownTest() {
	s.store.Close()
}

func (s *testAsyncCommitCommon) putAlphabets(enableAsyncCommit bool) {
	for ch := byte('a'); ch <= byte('z'); ch++ {
		s.putKV([]byte{ch}, []byte{ch}, enableAsyncCommit)
	}
}

func (s *testAsyncCommitCommon) putKV(key, value []byte, enableAsyncCommit bool) (uint64, uint64) {
	txn := s.beginAsyncCommit()
	err := txn.Set(key, value)
	s.Nil(err)
	err = txn.Commit(context.Background())
	s.Nil(err)
	return txn.StartTS(), txn.CommitTS()
}

func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, expectedValue []byte) {
	v, err := txn.Get(context.Background(), key)
	s.Nil(err)
	s.Equal(v, expectedValue)
}

func (s *testAsyncCommitCommon) mustGetLock(key []byte) *txnkv.Lock {
	ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
	s.Nil(err)
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
		Key:     key,
		Version: ver,
	})
	bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
	loc, err := s.store.GetRegionCache().LocateKey(bo, key)
	s.Nil(err)
	resp, err := s.store.SendReq(bo, req, loc.Region, time.Second*10)
	s.Nil(err)
	s.NotNil(resp.Resp)
	keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
	s.NotNil(keyErr)
	lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
	s.Nil(err)
	return lock
}

func (s *testAsyncCommitCommon) mustPointGet(key, expectedValue []byte) {
	snap := s.store.GetSnapshot(math.MaxUint64)
	value, err := snap.Get(context.Background(), key)
	s.Nil(err)
	s.Equal(value, expectedValue)
}

func (s *testAsyncCommitCommon) mustGetFromSnapshot(version uint64, key, expectedValue []byte) {
	snap := s.store.GetSnapshot(version)
	value, err := snap.Get(context.Background(), key)
	s.Nil(err)
	s.Equal(value, expectedValue)
}

func (s *testAsyncCommitCommon) mustGetNoneFromSnapshot(version uint64, key []byte) {
	snap := s.store.GetSnapshot(version)
	_, err := snap.Get(context.Background(), key)
	s.Equal(errors.Cause(err), tikverr.ErrNotExist)
}

func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability() transaction.TxnProbe {
	txn := s.beginAsyncCommit()
	txn.SetCausalConsistency(false)
	return txn
}

func (s *testAsyncCommitCommon) beginAsyncCommit() transaction.TxnProbe {
	txn, err := s.store.Begin()
	s.Nil(err)
	txn.SetEnableAsyncCommit(true)
	return transaction.TxnProbe{KVTxn: txn}
}

func (s *testAsyncCommitCommon) begin() transaction.TxnProbe {
	txn, err := s.store.Begin()
	s.Nil(err)
	return transaction.TxnProbe{KVTxn: txn}
}

func (s *testAsyncCommitCommon) begin1PC() transaction.TxnProbe {
	txn, err := s.store.Begin()
	s.Nil(err)
	txn.SetEnable1PC(true)
	return transaction.TxnProbe{KVTxn: txn}
}

type testAsyncCommitSuite struct {
	testAsyncCommitCommon
	bo *tikv.Backoffer
}

func (s *testAsyncCommitSuite) SetupTest() {
	s.testAsyncCommitCommon.setUpTest()
	s.bo = tikv.NewBackofferWithVars(context.Background(), 5000, nil)
}

func (s *testAsyncCommitSuite) TearDownTest() {
	s.testAsyncCommitCommon.tearDownTest()
}

func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, primaryKey,
	primaryValue []byte, commitPrimary bool) (uint64, uint64) {
	txn, err := s.store.Begin()
	s.Nil(err)
	txn.SetEnableAsyncCommit(true)
	for i, k := range keys {
		if len(values[i]) > 0 {
			err = txn.Set(k, values[i])
		} else {
			err = txn.Delete(k)
		}
		s.Nil(err)
	}
	if len(primaryValue) > 0 {
		err = txn.Set(primaryKey, primaryValue)
	} else {
		err = txn.Delete(primaryKey)
	}
	s.Nil(err)
	txnProbe := transaction.TxnProbe{KVTxn: txn}
	tpc, err := txnProbe.NewCommitter(0)
	s.Nil(err)
	tpc.SetPrimaryKey(primaryKey)
	tpc.SetUseAsyncCommit()

	ctx := context.Background()
	err = tpc.PrewriteAllMutations(ctx)
	s.Nil(err)

	if commitPrimary {
		commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
		s.Nil(err)
		tpc.SetCommitTS(commitTS)
		err = tpc.CommitMutations(ctx)
		s.Nil(err)
	}
	return txn.StartTS(), tpc.GetCommitTS()
}

func (s *testAsyncCommitSuite) TestCheckSecondaries() {
	// This test doesn't support tikv mode.
	if *withTiKV {
		return
	}

	s.putAlphabets(true)

	loc, err := s.store.GetRegionCache().LocateKey(s.bo, []byte("a"))
	s.Nil(err)
	newRegionID, peerID := s.cluster.AllocID(), s.cluster.AllocID()
	s.cluster.Split(loc.Region.GetID(), newRegionID, []byte("e"), []uint64{peerID}, peerID)
	s.store.GetRegionCache().InvalidateCachedRegion(loc.Region)

	// No locks to check, only primary key is locked, should be successful.
	s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, []byte("z"), []byte("z"), false)
	lock := s.mustGetLock([]byte("z"))
	lock.UseAsyncCommit = true
	ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	s.Nil(err)
	var lockutil txnlock.LockProbe
	status := lockutil.NewLockStatus(nil, true, ts)

	resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
	err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
	s.Nil(err)
	currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	s.Nil(err)
	status, err = resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false, nil)
	s.Nil(err)
	s.True(status.IsCommitted())
	s.Equal(status.CommitTS(), ts)

	// One key is committed (i), one key is locked (a). Should get committed.
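	// The mocked client below intercepts CheckSecondaryLocks and ResolveLock.
	// Secondary "a" still holds an async-commit lock while "i" has none; both
	// responses carry commitTs, so the resolver must conclude the transaction
	// is committed at commitTs and resolve the locks with that commit version,
	// which onResolveLock verifies.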
	ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	s.Nil(err)
	commitTs := ts + 10

	gotCheckA := int64(0)
	gotCheckB := int64(0)
	gotResolve := int64(0)
	gotOther := int64(0)
	mock := mockResolveClient{
		Client: s.store.GetTiKVClient(),
		onCheckSecondaries: func(req *kvrpcpb.CheckSecondaryLocksRequest) (*tikvrpc.Response, error) {
			if req.StartVersion != ts {
				return nil, errors.Errorf("Bad start version: %d, expected: %d", req.StartVersion, ts)
			}
			var resp kvrpcpb.CheckSecondaryLocksResponse
			for _, k := range req.Keys {
				if bytes.Equal(k, []byte("a")) {
					atomic.StoreInt64(&gotCheckA, 1)
					resp = kvrpcpb.CheckSecondaryLocksResponse{
						Locks:    []*kvrpcpb.LockInfo{{Key: []byte("a"), PrimaryLock: []byte("z"), LockVersion: ts, UseAsyncCommit: true}},
						CommitTs: commitTs,
					}
				} else if bytes.Equal(k, []byte("i")) {
					atomic.StoreInt64(&gotCheckB, 1)
					resp = kvrpcpb.CheckSecondaryLocksResponse{
						Locks:    []*kvrpcpb.LockInfo{},
						CommitTs: commitTs,
					}
				} else {
					fmt.Printf("Got other key: %s\n", k)
					atomic.StoreInt64(&gotOther, 1)
				}
			}
			return &tikvrpc.Response{Resp: &resp}, nil
		},
		onResolveLock: func(req *kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error) {
			if req.StartVersion != ts {
				return nil, errors.Errorf("Bad start version: %d, expected: %d", req.StartVersion, ts)
			}
			if req.CommitVersion != commitTs {
				return nil, errors.Errorf("Bad commit version: %d, expected: %d", req.CommitVersion, commitTs)
			}
			for _, k := range req.Keys {
				if bytes.Equal(k, []byte("a")) || bytes.Equal(k, []byte("z")) {
					atomic.StoreInt64(&gotResolve, 1)
				} else {
					atomic.StoreInt64(&gotOther, 1)
				}
			}
			resp := kvrpcpb.ResolveLockResponse{}
			return &tikvrpc.Response{Resp: &resp}, nil
		},
	}
	s.store.SetTiKVClient(&mock)

	status = lockutil.NewLockStatus([][]byte{[]byte("a"), []byte("i")}, true, 0)
	lock = &txnkv.Lock{
		Key:            []byte("a"),
		Primary:        []byte("z"),
		TxnID:          ts,
		LockType:       kvrpcpb.Op_Put,
		UseAsyncCommit: true,
		MinCommitTS:    ts + 5,
	}

	_ = s.beginAsyncCommit()
	err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
	s.Nil(err)
	s.Equal(gotCheckA, int64(1))
	s.Equal(gotCheckB, int64(1))
	s.Equal(gotOther, int64(0))
	s.Equal(gotResolve, int64(1))

	// One key has been rolled back (b), one is locked (a). Should be rolled back.
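	// Reuse the same mock but reset the counters and swap onResolveLock so it
	// validates the fresh start and commit versions; onCheckSecondaries is left
	// unchanged, so the same secondaries ("a" and "i") are checked again.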
	ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	s.Nil(err)
	commitTs = ts + 10

	gotCheckA = int64(0)
	gotCheckB = int64(0)
	gotResolve = int64(0)
	gotOther = int64(0)
	mock.onResolveLock = func(req *kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error) {
		if req.StartVersion != ts {
			return nil, errors.Errorf("Bad start version: %d, expected: %d", req.StartVersion, ts)
		}
		if req.CommitVersion != commitTs {
			return nil, errors.Errorf("Bad commit version: %d, expected: %d", req.CommitVersion, commitTs)
		}
		for _, k := range req.Keys {
			if bytes.Equal(k, []byte("a")) || bytes.Equal(k, []byte("z")) {
				atomic.StoreInt64(&gotResolve, 1)
			} else {
				atomic.StoreInt64(&gotOther, 1)
			}
		}
		resp := kvrpcpb.ResolveLockResponse{}
		return &tikvrpc.Response{Resp: &resp}, nil
	}
	lock.TxnID = ts
	lock.MinCommitTS = ts + 5

	err = resolver.ResolveAsyncCommitLock(s.bo, lock, status)
	s.Nil(err)
	s.Equal(gotCheckA, int64(1))
	s.Equal(gotCheckB, int64(1))
	s.Equal(gotResolve, int64(1))
	s.Equal(gotOther, int64(0))
}

func (s *testAsyncCommitSuite) TestRepeatableRead() {
	var sessionID uint64 = 0
	test := func(isPessimistic bool) {
		s.putKV([]byte("k1"), []byte("v1"), true)

		sessionID++
		ctx := context.WithValue(context.Background(), util.SessionID, sessionID)
		txn1 := s.beginAsyncCommit()
		txn1.SetPessimistic(isPessimistic)
		s.mustGetFromTxn(txn1, []byte("k1"), []byte("v1"))
		txn1.Set([]byte("k1"), []byte("v2"))

		for i := 0; i < 20; i++ {
			_, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
			s.Nil(err)
		}

		txn2 := s.beginAsyncCommit()
		s.mustGetFromTxn(txn2, []byte("k1"), []byte("v1"))

		err := txn1.Commit(ctx)
		s.Nil(err)
		// Check txn1 is committed in async commit.
		s.True(txn1.IsAsyncCommit())
		s.mustGetFromTxn(txn2, []byte("k1"), []byte("v1"))
		err = txn2.Rollback()
		s.Nil(err)

		txn3 := s.beginAsyncCommit()
		s.mustGetFromTxn(txn3, []byte("k1"), []byte("v2"))
		err = txn3.Rollback()
		s.Nil(err)
	}

	test(false)
	test(true)
}

// It's just a simple validation of linearizability.
// Extra tests are needed to exercise this feature with control over the TiKV cluster.
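// With causal consistency disabled (see beginAsyncCommitWithLinearizability), the
// committer fetches a latest timestamp from PD before prewrite and uses it as the
// minimum commit TS, so a transaction that commits strictly earlier in real time
// is expected to get a strictly smaller commit TS, as asserted below.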
func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() {
	t1 := s.beginAsyncCommitWithLinearizability()
	t2 := s.beginAsyncCommitWithLinearizability()
	err := t1.Set([]byte("a"), []byte("a1"))
	s.Nil(err)
	err = t2.Set([]byte("b"), []byte("b1"))
	s.Nil(err)
	ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
	// t2 commits earlier than t1.
	err = t2.Commit(ctx)
	s.Nil(err)
	err = t1.Commit(ctx)
	s.Nil(err)
	commitTS1 := t1.CommitTS()
	commitTS2 := t2.CommitTS()
	s.Less(commitTS2, commitTS1)
}

// TestAsyncCommitWithMultiDC tests that async commit can only be enabled in global transactions.
func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC() {
	// It requires setting placement rules to run with TiKV.
	if *withTiKV {
		return
	}

	localTxn := s.beginAsyncCommit()
	err := localTxn.Set([]byte("a"), []byte("a1"))
	localTxn.SetScope("bj")
	s.Nil(err)
	ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
	err = localTxn.Commit(ctx)
	s.Nil(err)
	s.False(localTxn.IsAsyncCommit())

	globalTxn := s.beginAsyncCommit()
	err = globalTxn.Set([]byte("b"), []byte("b1"))
	globalTxn.SetScope(oracle.GlobalTxnScope)
	s.Nil(err)
	err = globalTxn.Commit(ctx)
	s.Nil(err)
	s.True(globalTxn.IsAsyncCommit())
}

func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() {
	keys := [][]byte{[]byte("k0"), []byte("k1")}
	values := [][]byte{[]byte("v00"), []byte("v10")}

	initTest := func() transaction.CommitterProbe {
		t0 := s.begin()
		err := t0.Set(keys[0], values[0])
		s.Nil(err)
		err = t0.Set(keys[1], values[1])
		s.Nil(err)
		err = t0.Commit(context.Background())
		s.Nil(err)

		t1 := s.beginAsyncCommit()
		err = t1.Set(keys[0], []byte("v01"))
		s.Nil(err)
		err = t1.Set(keys[1], []byte("v11"))
		s.Nil(err)

		committer, err := t1.NewCommitter(1)
		s.Nil(err)
		committer.SetLockTTL(1)
		committer.SetUseAsyncCommit()
		return committer
	}
	prewriteKey := func(committer transaction.CommitterProbe, idx int, fallback bool) {
		bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
		loc, err := s.store.GetRegionCache().LocateKey(bo, keys[idx])
		s.Nil(err)
		req := committer.BuildPrewriteRequest(loc.Region.GetID(), loc.Region.GetConfVer(), loc.Region.GetVer(),
			committer.GetMutations().Slice(idx, idx+1), 1)
		if fallback {
			req.Req.(*kvrpcpb.PrewriteRequest).MaxCommitTs = 1
		}
		resp, err := s.store.SendReq(bo, req, loc.Region, time.Second*5)
		s.Nil(err)
		s.NotNil(resp.Resp)
	}
	readKey := func(idx int) {
		t2 := s.begin()
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()
		val, err := t2.Get(ctx, keys[idx])
		s.Nil(err)
		s.Equal(val, values[idx])
	}

	// Case 1: Fallback primary, read primary
	committer := initTest()
	prewriteKey(committer, 0, true)
	prewriteKey(committer, 1, false)
	readKey(0)
	readKey(1)

	// Case 2: Fallback primary, read secondary
	committer = initTest()
	prewriteKey(committer, 0, true)
	prewriteKey(committer, 1, false)
	readKey(1)
	readKey(0)

	// Case 3: Fallback secondary, read primary
	committer = initTest()
	prewriteKey(committer, 0, false)
	prewriteKey(committer, 1, true)
	readKey(0)
	readKey(1)

	// Case 4: Fallback secondary, read secondary
	committer = initTest()
	prewriteKey(committer, 0, false)
	prewriteKey(committer, 1, true)
	readKey(1)
	readKey(0)

	// Case 5: Fallback both, read primary
	committer = initTest()
	prewriteKey(committer, 0, true)
	prewriteKey(committer, 1, true)
	readKey(0)
	readKey(1)

	// Case 6: Fallback both, read secondary
	committer = initTest()
	prewriteKey(committer, 0, true)
	prewriteKey(committer, 1, true)
	readKey(1)
	readKey(0)
}

type mockResolveClient struct {
	tikv.Client
	onResolveLock      func(*kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error)
	onCheckSecondaries func(*kvrpcpb.CheckSecondaryLocksRequest) (*tikvrpc.Response, error)
}

func (m *mockResolveClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
	// Intercept check secondary locks and resolve lock messages if the callback is non-nil.
	// If the callback returns (nil, nil), forward to the inner client.
	if cr, ok := req.Req.(*kvrpcpb.CheckSecondaryLocksRequest); ok && m.onCheckSecondaries != nil {
		result, err := m.onCheckSecondaries(cr)
		if result != nil || err != nil {
			return result, err
		}
	} else if rr, ok := req.Req.(*kvrpcpb.ResolveLockRequest); ok && m.onResolveLock != nil {
		result, err := m.onResolveLock(rr)
		if result != nil || err != nil {
			return result, err
		}
	}
	return m.Client.SendRequest(ctx, addr, req, timeout)
}

// TestPessimisticTxnResolveAsyncCommitLock tests that pessimistic transactions resolve
// non-expired async-commit locks during the prewrite phase. Pessimistic transactions
// resolve locks immediately during the prewrite phase because of the special handling
// of non-pessimistic lock conflicts. However, async-commit locks can't be resolved
// until they expire. This test covers that case.
func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() {
	ctx := context.Background()
	k := []byte("k")

	// Lock the key with an async-commit lock.
	s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false)

	txn, err := s.store.Begin()
	s.Nil(err)
	txn.SetPessimistic(true)
	err = txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS()}, []byte("k1"))
	s.Nil(err)
	txn.Set(k, k)
	err = txn.Commit(context.Background())
	s.Nil(err)
}

func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() {
	// This test doesn't support tikv mode.
	if *withTiKV {
		return
	}

	t1 := s.beginAsyncCommit()
	t1.SetPessimistic(true)
	t1.Set([]byte("a"), []byte("a"))
	t1.Set([]byte("z"), []byte("z"))
	committer, err := t1.NewCommitter(1)
	s.Nil(err)
	committer.SetUseAsyncCommit()
	committer.SetLockTTL(1000)
	committer.SetMaxCommitTS(oracle.ComposeTS(oracle.ExtractPhysical(committer.GetStartTS())+1500, 0))

	committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(0, 1))
	s.True(committer.IsAsyncCommit())

	lock := s.mustGetLock([]byte("a"))
	resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
	for {
		currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
		s.Nil(err)
		status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil)
		s.Nil(err)
		if status.IsRolledBack() {
			break
		}
		time.Sleep(time.Millisecond * 30)
	}

	s.True(committer.IsAsyncCommit())
	committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(1, 2))
	s.False(committer.IsAsyncCommit())
}

func (s *testAsyncCommitSuite) TestAsyncCommitLifecycleHooks() {
	reachedPre := atomic.Bool{}
	reachedPost := atomic.Bool{}

	var wg sync.WaitGroup
	t1 := s.beginAsyncCommit()
	t1.SetBackgroundGoroutineLifecycleHooks(transaction.LifecycleHooks{
		Pre: func() {
			wg.Add(1)
			reachedPre.Store(true)
		},
		Post: func() {
			s.Equal(reachedPre.Load(), true)
			reachedPost.Store(true)
			wg.Done()
		},
	})

	t1.Set([]byte("a"), []byte("a"))
	t1.Set([]byte("z"), []byte("z"))
	s.Nil(t1.Commit(context.Background()))

	s.Equal(reachedPre.Load(), true)
	s.Equal(reachedPost.Load(), false)

	wg.Wait()
	s.Equal(reachedPost.Load(), true)
}
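// exampleAsyncCommitUsage is an illustrative sketch, not exercised by the suite: it
// shows the minimal async-commit flow the tests above build on. Enable async commit
// before writing, commit, and then IsAsyncCommit() reports whether the fast path was
// actually taken (the store may fall back to normal 2PC).
func (s *testAsyncCommitSuite) exampleAsyncCommitUsage() {
	txn := s.beginAsyncCommit()
	s.Nil(txn.Set([]byte("example-k"), []byte("example-v")))
	// Commit returns once the primary key's commit status is decided; the
	// secondaries are committed by a background goroutine afterwards (see
	// TestAsyncCommitLifecycleHooks above).
	s.Nil(txn.Commit(context.Background()))
	if txn.IsAsyncCommit() {
		// The commit TS was derived from min_commit_ts during prewrite
		// instead of requiring a second PD round trip.
		_ = txn.CommitTS()
	}
}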