// Copyright 2021 TiKV Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // NOTE: The code in this file is based on code from the // TiDB project, licensed under the Apache License v 2.0 // // https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/rawkv_test.go // // Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rawkv import ( "bytes" "context" "fmt" "hash/crc64" "testing" "github.com/pingcap/failpoint" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config/retry" "github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikv" ) func TestRawKV(t *testing.T) { tikv.EnableFailpoints() suite.Run(t, new(testRawkvSuite)) } type testRawkvSuite struct { suite.Suite mvccStore mocktikv.MVCCStore cluster *mocktikv.Cluster store1 uint64 // store1 is leader store2 uint64 // store2 is follower peer1 uint64 // peer1 is leader peer2 uint64 // peer2 is follower region1 uint64 bo *retry.Backoffer } type key = []byte type value = []byte func (s *testRawkvSuite) SetupTest() { s.mvccStore = mocktikv.MustNewMVCCStore() s.cluster = mocktikv.NewCluster(s.mvccStore) storeIDs, peerIDs, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 2) s.region1 = regionID s.store1 = storeIDs[0] s.store2 = storeIDs[1] s.peer1 = peerIDs[0] s.peer2 = peerIDs[1] s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil) s.Nil(failpoint.Enable("tikvclient/injectReResolveInterval", `return("1s")`)) } func (s *testRawkvSuite) TearDownTest() { s.Nil(failpoint.Disable("tikvclient/injectReResolveInterval")) s.mvccStore.Close() } func (s *testRawkvSuite) storeAddr(id uint64) string { return fmt.Sprintf("store%d", id) } func (s *testRawkvSuite) TestReplaceAddrWithNewStore() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() testKey := []byte("test_key") testValue := []byte("test_value") err := client.Put(context.Background(), testKey, testValue) s.Nil(err) // make store2 using store1's addr and store1 offline store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) s.cluster.RemoveStore(s.store1) s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) defer failpoint.Disable("tikvclient/injectLiveness") getVal, err := client.Get(context.Background(), testKey) s.Nil(err) s.Equal(getVal, testValue) } func (s *testRawkvSuite) TestUpdateStoreAddr() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() testKey := []byte("test_key") testValue := []byte("test_value") err := client.Put(context.Background(), testKey, testValue) s.Nil(err) // tikv-server reports `StoreNotMatch` And retry store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) getVal, err := client.Get(context.Background(), testKey) s.Nil(err) s.Equal(getVal, testValue) } func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() testKey := []byte("test_key") testValue := []byte("test_value") err := client.Put(context.Background(), testKey, testValue) s.Nil(err) // pre-load store2's address into cache via follower-read. loc, err := client.regionCache.LocateKey(s.bo, testKey) s.Nil(err) fctx, err := client.regionCache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 0) s.Nil(err) s.Equal(fctx.Store.StoreID(), s.store2) s.Equal(fctx.Addr, "store2") // make store2 using store1's addr and store1 offline store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) s.cluster.RemoveStore(s.store1) s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) defer failpoint.Disable("tikvclient/injectLiveness") getVal, err := client.Get(context.Background(), testKey) s.Nil(err) s.Equal(getVal, testValue) } func (s *testRawkvSuite) TestReplaceStore() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() testKey := []byte("test_key") testValue := []byte("test_value") err := client.Put(context.Background(), testKey, testValue) s.Nil(err) s.cluster.MarkTombstone(s.store1) store3 := s.cluster.AllocID() peer3 := s.cluster.AllocID() s.cluster.AddStore(store3, s.storeAddr(s.store1)) s.cluster.AddPeer(s.region1, store3, peer3) s.cluster.RemovePeer(s.region1, s.peer1) s.cluster.ChangeLeader(s.region1, peer3) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) defer failpoint.Disable("tikvclient/injectLiveness") err = client.Put(context.Background(), testKey, testValue) s.Nil(err) } func (s *testRawkvSuite) TestColumnFamilyForClient() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() testKeyCf1, testValueCf1, cf1 := []byte("test_key_cf1"), []byte("test_value_cf1"), "cf1" testKeyCf2, testValueCf2, cf2 := []byte("test_key_cf2"), []byte("test_value_cf2"), "cf2" // test put client.SetColumnFamily(cf1) err := client.Put(context.Background(), testKeyCf1, testValueCf1) s.Nil(err) client.SetColumnFamily(cf2) err = client.Put(context.Background(), testKeyCf2, testValueCf2) s.Nil(err) // make store2 using store1's addr and store1 offline store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) s.cluster.RemoveStore(s.store1) s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) defer failpoint.Disable("tikvclient/injectLiveness") // test get client.SetColumnFamily(cf1) getVal, err := client.Get(context.Background(), testKeyCf1) s.Nil(err) s.Equal(getVal, testValueCf1) getVal, err = client.Get(context.Background(), testKeyCf2) s.Nil(err) s.Equal(getVal, []byte(nil)) client.SetColumnFamily(cf2) getVal, err = client.Get(context.Background(), testKeyCf2) s.Nil(err) s.Equal(getVal, testValueCf2) getVal, err = client.Get(context.Background(), testKeyCf1) s.Nil(err) s.Equal(getVal, []byte(nil)) client.SetColumnFamily("") getVal, err = client.Get(context.Background(), testKeyCf1) s.Nil(err) s.Equal(getVal, []byte(nil)) getVal, err = client.Get(context.Background(), testKeyCf2) s.Nil(err) s.Equal(getVal, []byte(nil)) // test delete client.SetColumnFamily(cf1) err = client.Delete(context.Background(), testKeyCf1) s.Nil(err) getVal, err = client.Get(context.Background(), testKeyCf1) s.Nil(err) s.Equal(getVal, []byte(nil)) client.SetColumnFamily(cf2) err = client.Delete(context.Background(), testKeyCf2) s.Nil(err) getVal, err = client.Get(context.Background(), testKeyCf2) s.Nil(err) s.Equal(getVal, []byte(nil)) } func (s *testRawkvSuite) TestColumnFamilyForOptions() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() keyInCf1, valueInCf1, cf1 := []byte("db"), []byte("TiDB"), "cf1" keyInCf2, valueInCf2, cf2 := []byte("kv"), []byte("TiKV"), "cf2" // test put err := client.Put(context.Background(), keyInCf1, valueInCf1, SetColumnFamily(cf1)) s.Nil(err) err = client.Put(context.Background(), keyInCf2, valueInCf2, SetColumnFamily(cf2)) s.Nil(err) // make store2 using store1's addr and store1 offline store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) s.cluster.RemoveStore(s.store1) s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) defer failpoint.Disable("tikvclient/injectLiveness") // test get getVal, err := client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1)) s.Nil(err) s.Equal(getVal, valueInCf1) getVal, err = client.Get(context.Background(), keyInCf2, SetColumnFamily(cf1)) s.Nil(err) s.Equal(getVal, []byte(nil)) getVal, err = client.Get(context.Background(), keyInCf2, SetColumnFamily(cf2)) s.Nil(err) s.Equal(getVal, valueInCf2) getVal, err = client.Get(context.Background(), keyInCf1, SetColumnFamily(cf2)) s.Nil(err) s.Equal(getVal, []byte(nil)) // test delete err = client.Delete(context.Background(), keyInCf1, SetColumnFamily(cf1)) s.Nil(err) getVal, err = client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1)) s.Nil(err) s.Equal(getVal, []byte(nil)) err = client.Delete(context.Background(), keyInCf2, SetColumnFamily(cf2)) s.Nil(err) getVal, err = client.Get(context.Background(), keyInCf2, SetColumnFamily(cf2)) s.Nil(err) s.Equal(getVal, []byte(nil)) } func (s *testRawkvSuite) TestBatch() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() cf := "test_cf" paris := map[string]string{ "db": "TiDB", "key2": "value2", "key1": "value1", "key3": "value3", "kv": "TiKV", } keys := make([]key, 0) values := make([]value, 0) for k, v := range paris { keys = append(keys, []byte(k)) values = append(values, []byte(v)) } // test BatchPut err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf)) s.Nil(err) // make store2 using store1's addr and store1 offline store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) s.cluster.RemoveStore(s.store1) s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) defer failpoint.Disable("tikvclient/injectLiveness") // test BatchGet returnValues, err := client.BatchGet(context.Background(), keys, SetColumnFamily(cf)) s.Nil(err) s.Equal(len(returnValues), len(paris)) for i, v := range returnValues { s.True(bytes.Equal(v, []byte(paris[string(keys[i])]))) } // test BatchDelete err = client.BatchDelete(context.Background(), keys, SetColumnFamily(cf)) s.Nil(err) returnValue, err := client.Get(context.Background(), keys[0], SetColumnFamily(cf)) s.Nil(err) s.Equal(returnValue, []byte(nil)) } func (s *testRawkvSuite) TestScan() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() cf := "test_cf" paris := map[string]string{ "db": "TiDB", "key2": "value2", "key1": "value1", "key4": "value4", "key3": "value3", "kv": "TiKV", } keys := make([]key, 0) values := make([]value, 0) for k, v := range paris { keys = append(keys, []byte(k)) values = append(values, []byte(v)) } // BatchPut err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf)) s.Nil(err) // make store2 using store1's addr and store1 offline store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) s.cluster.RemoveStore(s.store1) s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) defer failpoint.Disable("tikvclient/injectLiveness") // test scan startKey, endKey := []byte("key1"), []byte("keyz") limit := 3 returnKeys, returnValues, err := client.Scan(context.Background(), startKey, endKey, limit, SetColumnFamily(cf)) s.Nil(err) s.Equal(len(returnKeys), limit) s.Equal(len(returnValues), limit) s.True(bytes.Equal(returnKeys[0], []byte("key1"))) s.True(bytes.Equal(returnKeys[1], []byte("key2"))) s.True(bytes.Equal(returnKeys[2], []byte("key3"))) for i, k := range returnKeys { s.True(bytes.Equal(returnValues[i], []byte(paris[string(k)]))) } // test ReverseScan with onlyKey startKey, endKey = []byte("key3"), nil limit = 10 returnKeys, _, err = client.ReverseScan( context.Background(), startKey, endKey, limit, SetColumnFamily(cf), ScanKeyOnly(), ) s.Nil(err) s.Equal(len(returnKeys), 3) s.True(bytes.Equal(returnKeys[0], []byte("key2"))) s.True(bytes.Equal(returnKeys[1], []byte("key1"))) s.True(bytes.Equal(returnKeys[2], []byte("db"))) } func (s *testRawkvSuite) TestDeleteRange() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() cf := "test_cf" paris := map[string]string{ "db": "TiDB", "key2": "value2", "key1": "value1", "key4": "value4", "key3": "value3", "kv": "TiKV", } keys := make([]key, 0) values := make([]value, 0) for k, v := range paris { keys = append(keys, []byte(k)) values = append(values, []byte(v)) } // BatchPut err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf)) s.Nil(err) // make store2 using store1's addr and store1 offline store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) s.cluster.RemoveStore(s.store1) s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) defer failpoint.Disable("tikvclient/injectLiveness") // test DeleteRange startKey, endKey := []byte("key3"), []byte(nil) err = client.DeleteRange(context.Background(), startKey, endKey, SetColumnFamily(cf)) s.Nil(err) ks, vs, err := client.Scan(context.Background(), startKey, endKey, 10, SetColumnFamily(cf)) s.Nil(err) s.Equal(0, len(ks)) s.Equal(0, len(vs)) } func (s *testRawkvSuite) TestCompareAndSwap() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() cf := "my_cf" key, value, newValue := []byte("kv"), []byte("TiDB"), []byte("TiKV") // put err := client.Put(context.Background(), key, value, SetColumnFamily(cf)) s.Nil(err) // make store2 using store1's addr and store1 offline store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) s.cluster.RemoveStore(s.store1) s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) defer failpoint.Disable("tikvclient/injectLiveness") // test CompareAndSwap for false atomic _, _, err = client.CompareAndSwap( context.Background(), key, value, newValue, SetColumnFamily(cf)) s.Error(err) // test CompareAndSwap for swap successfully client.SetAtomicForCAS(true) returnValue, swapped, err := client.CompareAndSwap( context.Background(), key, newValue, newValue, SetColumnFamily(cf)) s.Nil(err) s.False(swapped) s.True(bytes.Equal(value, returnValue)) // test CompareAndSwap for swap successfully client.SetAtomicForCAS(true) returnValue, swapped, err = client.CompareAndSwap( context.Background(), key, value, newValue, SetColumnFamily(cf)) s.Nil(err) s.True(swapped) s.True(bytes.Equal(value, returnValue)) v, err := client.Get(context.Background(), key, SetColumnFamily(cf)) s.Nil(err) s.Equal(string(v), string(newValue)) } func (s *testRawkvSuite) TestRawChecksum() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() client := &Client{ clusterID: 0, regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() cf := "CF_DEFAULT" paris := map[string]string{ "db": "TiDB", "key2": "value2", "key1": "value1", "key4": "value4", "key3": "value3", "kv": "TiKV", } keys := make([]key, 0) values := make([]value, 0) for k, v := range paris { keys = append(keys, []byte(k)) values = append(values, []byte(v)) } expectCrc64Xor := uint64(0) expectTotalKvs := uint64(0) expectTotalBytes := uint64(0) digest := crc64.New(crc64.MakeTable(crc64.ECMA)) for i, key := range keys { digest.Reset() digest.Write(key) digest.Write(values[i]) expectCrc64Xor ^= digest.Sum64() expectTotalKvs++ expectTotalBytes += (uint64)(len(key) + len(values[i])) } // BatchPut err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf)) s.Nil(err) // test Checksum startKey, endKey := []byte("db"), []byte(nil) check, err := client.Checksum(context.Background(), startKey, endKey, SetColumnFamily(cf)) s.Nil(err) s.Equal(expectCrc64Xor, check.Crc64Xor) s.Equal(expectTotalKvs, check.TotalKvs) s.Equal(expectTotalBytes, check.TotalBytes) }