client-go/rawkv/rawkv_test.go

660 lines
20 KiB
Go

// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/rawkv_test.go
//
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rawkv
import (
"bytes"
"context"
"fmt"
"hash/crc64"
"testing"
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
)
func TestRawKV(t *testing.T) {
tikv.EnableFailpoints()
suite.Run(t, new(testRawkvSuite))
}
type testRawkvSuite struct {
suite.Suite
mvccStore mocktikv.MVCCStore
cluster *mocktikv.Cluster
store1 uint64 // store1 is leader
store2 uint64 // store2 is follower
peer1 uint64 // peer1 is leader
peer2 uint64 // peer2 is follower
region1 uint64
bo *retry.Backoffer
}
type key = []byte
type value = []byte
func (s *testRawkvSuite) SetupTest() {
s.mvccStore = mocktikv.MustNewMVCCStore()
s.cluster = mocktikv.NewCluster(s.mvccStore)
storeIDs, peerIDs, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 2)
s.region1 = regionID
s.store1 = storeIDs[0]
s.store2 = storeIDs[1]
s.peer1 = peerIDs[0]
s.peer2 = peerIDs[1]
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
s.Nil(failpoint.Enable("tikvclient/injectReResolveInterval", `return("1s")`))
}
func (s *testRawkvSuite) TearDownTest() {
s.Nil(failpoint.Disable("tikvclient/injectReResolveInterval"))
s.mvccStore.Close()
}
func (s *testRawkvSuite) storeAddr(id uint64) string {
return fmt.Sprintf("store%d", id)
}
func (s *testRawkvSuite) TestReplaceAddrWithNewStore() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
testKey := []byte("test_key")
testValue := []byte("test_value")
err := client.Put(context.Background(), testKey, testValue)
s.Nil(err)
// make store2 using store1's addr and store1 offline
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
s.cluster.RemoveStore(s.store1)
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")
getVal, err := client.Get(context.Background(), testKey)
s.Nil(err)
s.Equal(getVal, testValue)
}
func (s *testRawkvSuite) TestUpdateStoreAddr() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
testKey := []byte("test_key")
testValue := []byte("test_value")
err := client.Put(context.Background(), testKey, testValue)
s.Nil(err)
// tikv-server reports `StoreNotMatch` And retry
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
getVal, err := client.Get(context.Background(), testKey)
s.Nil(err)
s.Equal(getVal, testValue)
}
func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
testKey := []byte("test_key")
testValue := []byte("test_value")
err := client.Put(context.Background(), testKey, testValue)
s.Nil(err)
// pre-load store2's address into cache via follower-read.
loc, err := client.regionCache.LocateKey(s.bo, testKey)
s.Nil(err)
fctx, err := client.regionCache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 0)
s.Nil(err)
s.Equal(fctx.Store.StoreID(), s.store2)
s.Equal(fctx.Addr, "store2")
// make store2 using store1's addr and store1 offline
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
s.cluster.RemoveStore(s.store1)
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")
getVal, err := client.Get(context.Background(), testKey)
s.Nil(err)
s.Equal(getVal, testValue)
}
func (s *testRawkvSuite) TestReplaceStore() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
testKey := []byte("test_key")
testValue := []byte("test_value")
err := client.Put(context.Background(), testKey, testValue)
s.Nil(err)
s.cluster.MarkTombstone(s.store1)
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(s.store1))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.RemovePeer(s.region1, s.peer1)
s.cluster.ChangeLeader(s.region1, peer3)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")
err = client.Put(context.Background(), testKey, testValue)
s.Nil(err)
}
func (s *testRawkvSuite) TestColumnFamilyForClient() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
testKeyCf1, testValueCf1, cf1 := []byte("test_key_cf1"), []byte("test_value_cf1"), "cf1"
testKeyCf2, testValueCf2, cf2 := []byte("test_key_cf2"), []byte("test_value_cf2"), "cf2"
// test put
client.SetColumnFamily(cf1)
err := client.Put(context.Background(), testKeyCf1, testValueCf1)
s.Nil(err)
client.SetColumnFamily(cf2)
err = client.Put(context.Background(), testKeyCf2, testValueCf2)
s.Nil(err)
// make store2 using store1's addr and store1 offline
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
s.cluster.RemoveStore(s.store1)
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")
// test get
client.SetColumnFamily(cf1)
getVal, err := client.Get(context.Background(), testKeyCf1)
s.Nil(err)
s.Equal(getVal, testValueCf1)
getVal, err = client.Get(context.Background(), testKeyCf2)
s.Nil(err)
s.Equal(getVal, []byte(nil))
client.SetColumnFamily(cf2)
getVal, err = client.Get(context.Background(), testKeyCf2)
s.Nil(err)
s.Equal(getVal, testValueCf2)
getVal, err = client.Get(context.Background(), testKeyCf1)
s.Nil(err)
s.Equal(getVal, []byte(nil))
client.SetColumnFamily("")
getVal, err = client.Get(context.Background(), testKeyCf1)
s.Nil(err)
s.Equal(getVal, []byte(nil))
getVal, err = client.Get(context.Background(), testKeyCf2)
s.Nil(err)
s.Equal(getVal, []byte(nil))
// test delete
client.SetColumnFamily(cf1)
err = client.Delete(context.Background(), testKeyCf1)
s.Nil(err)
getVal, err = client.Get(context.Background(), testKeyCf1)
s.Nil(err)
s.Equal(getVal, []byte(nil))
client.SetColumnFamily(cf2)
err = client.Delete(context.Background(), testKeyCf2)
s.Nil(err)
getVal, err = client.Get(context.Background(), testKeyCf2)
s.Nil(err)
s.Equal(getVal, []byte(nil))
}
func (s *testRawkvSuite) TestColumnFamilyForOptions() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
keyInCf1, valueInCf1, cf1 := []byte("db"), []byte("TiDB"), "cf1"
keyInCf2, valueInCf2, cf2 := []byte("kv"), []byte("TiKV"), "cf2"
// test put
err := client.Put(context.Background(), keyInCf1, valueInCf1, SetColumnFamily(cf1))
s.Nil(err)
err = client.Put(context.Background(), keyInCf2, valueInCf2, SetColumnFamily(cf2))
s.Nil(err)
// make store2 using store1's addr and store1 offline
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
s.cluster.RemoveStore(s.store1)
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")
// test get
getVal, err := client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1))
s.Nil(err)
s.Equal(getVal, valueInCf1)
getVal, err = client.Get(context.Background(), keyInCf2, SetColumnFamily(cf1))
s.Nil(err)
s.Equal(getVal, []byte(nil))
getVal, err = client.Get(context.Background(), keyInCf2, SetColumnFamily(cf2))
s.Nil(err)
s.Equal(getVal, valueInCf2)
getVal, err = client.Get(context.Background(), keyInCf1, SetColumnFamily(cf2))
s.Nil(err)
s.Equal(getVal, []byte(nil))
// test delete
err = client.Delete(context.Background(), keyInCf1, SetColumnFamily(cf1))
s.Nil(err)
getVal, err = client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1))
s.Nil(err)
s.Equal(getVal, []byte(nil))
err = client.Delete(context.Background(), keyInCf2, SetColumnFamily(cf2))
s.Nil(err)
getVal, err = client.Get(context.Background(), keyInCf2, SetColumnFamily(cf2))
s.Nil(err)
s.Equal(getVal, []byte(nil))
}
func (s *testRawkvSuite) TestBatch() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
cf := "test_cf"
paris := map[string]string{
"db": "TiDB",
"key2": "value2",
"key1": "value1",
"key3": "value3",
"kv": "TiKV",
}
keys := make([]key, 0)
values := make([]value, 0)
for k, v := range paris {
keys = append(keys, []byte(k))
values = append(values, []byte(v))
}
// test BatchPut
err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf))
s.Nil(err)
// make store2 using store1's addr and store1 offline
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
s.cluster.RemoveStore(s.store1)
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")
// test BatchGet
returnValues, err := client.BatchGet(context.Background(), keys, SetColumnFamily(cf))
s.Nil(err)
s.Equal(len(returnValues), len(paris))
for i, v := range returnValues {
s.True(bytes.Equal(v, []byte(paris[string(keys[i])])))
}
// test BatchDelete
err = client.BatchDelete(context.Background(), keys, SetColumnFamily(cf))
s.Nil(err)
returnValue, err := client.Get(context.Background(), keys[0], SetColumnFamily(cf))
s.Nil(err)
s.Equal(returnValue, []byte(nil))
}
func (s *testRawkvSuite) TestScan() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
cf := "test_cf"
paris := map[string]string{
"db": "TiDB",
"key2": "value2",
"key1": "value1",
"key4": "value4",
"key3": "value3",
"kv": "TiKV",
}
keys := make([]key, 0)
values := make([]value, 0)
for k, v := range paris {
keys = append(keys, []byte(k))
values = append(values, []byte(v))
}
// BatchPut
err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf))
s.Nil(err)
// make store2 using store1's addr and store1 offline
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
s.cluster.RemoveStore(s.store1)
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")
// test scan
startKey, endKey := []byte("key1"), []byte("keyz")
limit := 3
returnKeys, returnValues, err := client.Scan(context.Background(), startKey, endKey, limit, SetColumnFamily(cf))
s.Nil(err)
s.Equal(len(returnKeys), limit)
s.Equal(len(returnValues), limit)
s.True(bytes.Equal(returnKeys[0], []byte("key1")))
s.True(bytes.Equal(returnKeys[1], []byte("key2")))
s.True(bytes.Equal(returnKeys[2], []byte("key3")))
for i, k := range returnKeys {
s.True(bytes.Equal(returnValues[i], []byte(paris[string(k)])))
}
// test ReverseScan with onlyKey
startKey, endKey = []byte("key3"), nil
limit = 10
returnKeys, _, err = client.ReverseScan(
context.Background(),
startKey,
endKey,
limit,
SetColumnFamily(cf),
ScanKeyOnly(),
)
s.Nil(err)
s.Equal(len(returnKeys), 3)
s.True(bytes.Equal(returnKeys[0], []byte("key2")))
s.True(bytes.Equal(returnKeys[1], []byte("key1")))
s.True(bytes.Equal(returnKeys[2], []byte("db")))
}
func (s *testRawkvSuite) TestDeleteRange() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
cf := "test_cf"
paris := map[string]string{
"db": "TiDB",
"key2": "value2",
"key1": "value1",
"key4": "value4",
"key3": "value3",
"kv": "TiKV",
}
keys := make([]key, 0)
values := make([]value, 0)
for k, v := range paris {
keys = append(keys, []byte(k))
values = append(values, []byte(v))
}
// BatchPut
err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf))
s.Nil(err)
// make store2 using store1's addr and store1 offline
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
s.cluster.RemoveStore(s.store1)
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")
// test DeleteRange
startKey, endKey := []byte("key3"), []byte(nil)
err = client.DeleteRange(context.Background(), startKey, endKey, SetColumnFamily(cf))
s.Nil(err)
ks, vs, err := client.Scan(context.Background(), startKey, endKey, 10, SetColumnFamily(cf))
s.Nil(err)
s.Equal(0, len(ks))
s.Equal(0, len(vs))
}
func (s *testRawkvSuite) TestCompareAndSwap() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
cf := "my_cf"
key, value, newValue := []byte("kv"), []byte("TiDB"), []byte("TiKV")
// put
err := client.Put(context.Background(), key, value, SetColumnFamily(cf))
s.Nil(err)
// make store2 using store1's addr and store1 offline
store1Addr := s.storeAddr(s.store1)
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
s.cluster.RemoveStore(s.store1)
s.cluster.ChangeLeader(s.region1, s.peer2)
s.cluster.RemovePeer(s.region1, s.peer1)
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
defer failpoint.Disable("tikvclient/injectLiveness")
// test CompareAndSwap for false atomic
_, _, err = client.CompareAndSwap(
context.Background(),
key,
value,
newValue,
SetColumnFamily(cf))
s.Error(err)
// test CompareAndSwap for swap successfully
client.SetAtomicForCAS(true)
returnValue, swapped, err := client.CompareAndSwap(
context.Background(),
key,
newValue,
newValue,
SetColumnFamily(cf))
s.Nil(err)
s.False(swapped)
s.True(bytes.Equal(value, returnValue))
// test CompareAndSwap for swap successfully
client.SetAtomicForCAS(true)
returnValue, swapped, err = client.CompareAndSwap(
context.Background(),
key,
value,
newValue,
SetColumnFamily(cf))
s.Nil(err)
s.True(swapped)
s.True(bytes.Equal(value, returnValue))
v, err := client.Get(context.Background(), key, SetColumnFamily(cf))
s.Nil(err)
s.Equal(string(v), string(newValue))
}
func (s *testRawkvSuite) TestRawChecksum() {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()
client := &Client{
clusterID: 0,
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
}
defer client.Close()
cf := "CF_DEFAULT"
paris := map[string]string{
"db": "TiDB",
"key2": "value2",
"key1": "value1",
"key4": "value4",
"key3": "value3",
"kv": "TiKV",
}
keys := make([]key, 0)
values := make([]value, 0)
for k, v := range paris {
keys = append(keys, []byte(k))
values = append(values, []byte(v))
}
expectCrc64Xor := uint64(0)
expectTotalKvs := uint64(0)
expectTotalBytes := uint64(0)
digest := crc64.New(crc64.MakeTable(crc64.ECMA))
for i, key := range keys {
digest.Reset()
digest.Write(key)
digest.Write(values[i])
expectCrc64Xor ^= digest.Sum64()
expectTotalKvs++
expectTotalBytes += (uint64)(len(key) + len(values[i]))
}
// BatchPut
err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf))
s.Nil(err)
// test Checksum
startKey, endKey := []byte("db"), []byte(nil)
check, err := client.Checksum(context.Background(), startKey, endKey, SetColumnFamily(cf))
s.Nil(err)
s.Equal(expectCrc64Xor, check.Crc64Xor)
s.Equal(expectTotalKvs, check.TotalKvs)
s.Equal(expectTotalBytes, check.TotalBytes)
}