client-go/integration_tests/snapshot_test.go

430 lines
13 KiB
Go

// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/snapshot_test.go
//
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv_test
import (
"context"
"fmt"
"math"
"sync"
"testing"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)
func TestSnapshot(t *testing.T) {
suite.Run(t, new(testSnapshotSuite))
}
type testSnapshotSuite struct {
suite.Suite
store tikv.StoreProbe
prefix string
rowNums []int
}
func (s *testSnapshotSuite) SetupSuite() {
s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())}
s.prefix = fmt.Sprintf("snapshot_%d", time.Now().Unix())
s.rowNums = append(s.rowNums, 1, 100, 191)
}
func (s *testSnapshotSuite) TearDownSuite() {
txn := s.beginTxn()
scanner, err := txn.Iter(encodeKey(s.prefix, ""), nil)
s.Nil(err)
s.NotNil(scanner)
for scanner.Valid() {
k := scanner.Key()
err = txn.Delete(k)
s.Nil(err)
scanner.Next()
}
err = txn.Commit(context.Background())
s.Nil(err)
err = s.store.Close()
s.Nil(err)
}
func (s *testSnapshotSuite) beginTxn() transaction.TxnProbe {
txn, err := s.store.Begin()
s.Require().Nil(err)
return txn
}
func (s *testSnapshotSuite) checkAll(keys [][]byte) {
txn := s.beginTxn()
snapshot := txn.GetSnapshot()
m, err := snapshot.BatchGet(context.Background(), keys)
s.Nil(err)
scan, err := txn.Iter(encodeKey(s.prefix, ""), nil)
s.Nil(err)
cnt := 0
for scan.Valid() {
cnt++
k := scan.Key()
v := scan.Value()
v2, ok := m[string(k)]
s.True(ok, fmt.Sprintf("key: %q", k))
s.Equal(v, v2)
scan.Next()
}
err = txn.Commit(context.Background())
s.Nil(err)
s.Len(m, cnt)
}
func (s *testSnapshotSuite) deleteKeys(keys [][]byte) {
txn := s.beginTxn()
for _, k := range keys {
err := txn.Delete(k)
s.Nil(err)
}
err := txn.Commit(context.Background())
s.Nil(err)
}
func (s *testSnapshotSuite) TestBatchGet() {
for _, rowNum := range s.rowNums {
s.T().Logf("test BatchGet, length=%v", rowNum)
txn := s.beginTxn()
for i := 0; i < rowNum; i++ {
k := encodeKey(s.prefix, s08d("key", i))
err := txn.Set(k, valueBytes(i))
s.Nil(err)
}
err := txn.Commit(context.Background())
s.Nil(err)
keys := makeKeys(rowNum, s.prefix)
s.checkAll(keys)
s.deleteKeys(keys)
}
}
func (s *testSnapshotSuite) TestSnapshotCache() {
txn := s.beginTxn()
s.Nil(txn.Set([]byte("x"), []byte("x")))
s.Nil(txn.Delete([]byte("y"))) // delete should also be cached
s.Nil(txn.Set([]byte("a"), []byte("a")))
s.Nil(txn.Delete([]byte("b")))
s.Nil(txn.Commit(context.Background()))
txn = s.beginTxn()
snapshot := txn.GetSnapshot()
// generate cache by BatchGet
_, err := snapshot.BatchGet(context.Background(), [][]byte{[]byte("x"), []byte("y")})
s.Nil(err)
// generate cache by Get
_, err = snapshot.Get(context.Background(), []byte("a"))
s.Nil(err)
_, err = snapshot.Get(context.Background(), []byte("b"))
s.True(error.IsErrNotFound(err))
s.Nil(failpoint.Enable("tikvclient/snapshot-get-cache-fail", `return(true)`))
ctx := context.WithValue(context.Background(), "TestSnapshotCache", true)
// check cache from BatchGet
value, err := snapshot.Get(ctx, []byte("x"))
s.Nil(err)
s.Equal([]byte("x"), value)
_, err = snapshot.Get(ctx, []byte("y"))
s.True(error.IsErrNotFound(err))
// check cache from Get
value, err = snapshot.Get(ctx, []byte("a"))
s.Nil(err)
s.Equal([]byte("a"), value)
_, err = snapshot.Get(ctx, []byte("b"))
s.True(error.IsErrNotFound(err))
s.Nil(failpoint.Disable("tikvclient/snapshot-get-cache-fail"))
}
func (s *testSnapshotSuite) TestBatchGetNotExist() {
for _, rowNum := range s.rowNums {
s.T().Logf("test BatchGetNotExist, length=%v", rowNum)
txn := s.beginTxn()
for i := 0; i < rowNum; i++ {
k := encodeKey(s.prefix, s08d("key", i))
err := txn.Set(k, valueBytes(i))
s.Nil(err)
}
err := txn.Commit(context.Background())
s.Nil(err)
keys := makeKeys(rowNum, s.prefix)
keys = append(keys, []byte("noSuchKey"))
s.checkAll(keys)
s.deleteKeys(keys)
}
}
func makeKeys(rowNum int, prefix string) [][]byte {
keys := make([][]byte, 0, rowNum)
for i := 0; i < rowNum; i++ {
k := encodeKey(prefix, s08d("key", i))
keys = append(keys, k)
}
return keys
}
func (s *testSnapshotSuite) TestSkipLargeTxnLock() {
x := []byte("x_key_TestSkipLargeTxnLock")
y := []byte("y_key_TestSkipLargeTxnLock")
txn := s.beginTxn()
s.Nil(txn.Set(x, []byte("x")))
s.Nil(txn.Set(y, []byte("y")))
ctx := context.Background()
committer, err := txn.NewCommitter(0)
s.Nil(err)
committer.SetLockTTL(3000)
s.Nil(committer.PrewriteAllMutations(ctx))
txn1 := s.beginTxn()
// txn1 is not blocked by txn in the large txn protocol.
_, err = txn1.Get(ctx, x)
s.True(error.IsErrNotFound(err))
res, err := toTiDBTxn(&txn1).BatchGet(ctx, toTiDBKeys([][]byte{x, y, []byte("z")}))
s.Nil(err)
s.Len(res, 0)
// Commit txn, check the final commit ts is pushed.
committer.SetCommitTS(txn.StartTS() + 1)
s.Nil(committer.CommitMutations(ctx))
status, err := s.store.GetLockResolver().GetTxnStatus(txn.StartTS(), 0, x)
s.Nil(err)
s.True(status.IsCommitted())
s.Greater(status.CommitTS(), txn1.StartTS())
}
func (s *testSnapshotSuite) TestPointGetSkipTxnLock() {
x := []byte("x_key_TestPointGetSkipTxnLock")
y := []byte("y_key_TestPointGetSkipTxnLock")
txn := s.beginTxn()
s.Nil(txn.Set(x, []byte("x")))
s.Nil(txn.Set(y, []byte("y")))
ctx := context.Background()
committer, err := txn.NewCommitter(0)
s.Nil(err)
committer.SetLockTTL(3000)
s.Nil(committer.PrewriteAllMutations(ctx))
snapshot := s.store.GetSnapshot(math.MaxUint64)
start := time.Now()
s.Equal(committer.GetPrimaryKey(), x)
// Point get secondary key. Shouldn't be blocked by the lock and read old data.
_, err = snapshot.Get(ctx, y)
s.True(error.IsErrNotFound(err))
s.Less(time.Since(start), 500*time.Millisecond)
// Commit the primary key
committer.SetCommitTS(txn.StartTS() + 1)
committer.CommitMutations(ctx)
snapshot = s.store.GetSnapshot(math.MaxUint64)
start = time.Now()
// Point get secondary key. Should read committed data.
value, err := snapshot.Get(ctx, y)
s.Nil(err)
s.Equal(value, []byte("y"))
s.Less(time.Since(start), 500*time.Millisecond)
}
func (s *testSnapshotSuite) TestSnapshotThreadSafe() {
txn := s.beginTxn()
key := []byte("key_test_snapshot_threadsafe")
s.Nil(txn.Set(key, []byte("x")))
ctx := context.Background()
err := txn.Commit(context.Background())
s.Nil(err)
snapshot := s.store.GetSnapshot(math.MaxUint64)
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
for i := 0; i < 30; i++ {
_, err := snapshot.Get(ctx, key)
s.Nil(err)
_, err = snapshot.BatchGet(ctx, [][]byte{key, []byte("key_not_exist")})
s.Nil(err)
}
wg.Done()
}()
}
wg.Wait()
}
func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
reqStats := tikv.NewRegionRequestRuntimeStats()
reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second)
reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond)
snapshot := s.store.GetSnapshot(0)
runtimeStats := &txnkv.SnapshotRuntimeStats{}
snapshot.SetRuntimeStats(runtimeStats)
snapshot.MergeRegionRequestStats(reqStats)
snapshot.MergeRegionRequestStats(reqStats)
bo := tikv.NewBackofferWithVars(context.Background(), 2000, nil)
err := bo.BackoffWithMaxSleepTxnLockFast(5, errors.New("test"))
s.Nil(err)
snapshot.RecordBackoffInfo(bo)
snapshot.RecordBackoffInfo(bo)
expect := "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:10ms}"
s.Equal(expect, snapshot.FormatStats())
s.Equal(int64(4), runtimeStats.GetCmdRPCCount(tikvrpc.CmdGet))
detail := &kvrpcpb.ExecDetailsV2{
TimeDetail: &kvrpcpb.TimeDetail{
WaitWallTimeMs: 100,
ProcessWallTimeMs: 100,
},
ScanDetailV2: &kvrpcpb.ScanDetailV2{
ProcessedVersions: 10,
ProcessedVersionsSize: 10,
TotalVersions: 15,
GetSnapshotNanos: 500,
RocksdbBlockReadCount: 20,
RocksdbBlockReadByte: 15,
RocksdbDeleteSkippedCount: 5,
RocksdbKeySkippedCount: 1,
RocksdbBlockCacheHitCount: 10,
},
}
snapshot.MergeExecDetail(detail)
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:10ms}, " +
"time_detail: {total_process_time: 100ms, total_wait_time: 100ms}, " +
"scan_detail: {total_process_keys: 10, total_process_keys_size: 10, total_keys: 15, get_snapshot_time: 500ns, " +
"rocksdb: {delete_skipped_count: 5, key_skipped_count: 1, block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}"
s.Equal(expect, snapshot.FormatStats())
snapshot.MergeExecDetail(detail)
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:10ms}, " +
"time_detail: {total_process_time: 200ms, total_wait_time: 200ms}, " +
"scan_detail: {total_process_keys: 20, total_process_keys_size: 20, total_keys: 30, get_snapshot_time: 1µs, " +
"rocksdb: {delete_skipped_count: 10, key_skipped_count: 2, block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
s.Equal(expect, snapshot.FormatStats())
snapshot.GetResolveLockDetail().ResolveLockTime = int64(time.Second)
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:10ms}, " +
"time_detail: {total_process_time: 200ms, total_wait_time: 200ms}, " +
"resolve_lock_time:1s, " +
"scan_detail: {total_process_keys: 20, total_process_keys_size: 20, total_keys: 30, get_snapshot_time: 1µs, " +
"rocksdb: {delete_skipped_count: 10, key_skipped_count: 2, block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
s.Equal(expect, snapshot.FormatStats())
}
func (s *testSnapshotSuite) TestRCRead() {
for _, rowNum := range s.rowNums {
s.T().Logf("test RC Read, length=%v", rowNum)
txn := s.beginTxn()
keys := makeKeys(rowNum, s.prefix)
for i, k := range keys {
err := txn.Set(k, valueBytes(i))
s.Nil(err)
}
err := txn.Commit(context.Background())
s.Nil(err)
key0 := encodeKey(s.prefix, s08d("key", 0))
txn1 := s.beginTxn()
txn1.Set(key0, valueBytes(1))
committer1, err := txn1.NewCommitter(1)
s.Nil(err)
err = committer1.PrewriteAllMutations(context.Background())
s.Nil(err)
var meetLocks []*txnkv.Lock
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
resolver.SetMeetLockCallback(func(locks []*txnkv.Lock) {
meetLocks = append(meetLocks, locks...)
})
// RC read
txn2 := s.beginTxn()
snapshot := txn2.GetSnapshot()
snapshot.SetIsolationLevel(txnsnapshot.RC)
// get
v, err := snapshot.Get(context.Background(), key0)
s.Nil(err)
s.Equal(len(meetLocks), 0)
s.Equal(v, valueBytes(0))
// batch get
m, err := snapshot.BatchGet(context.Background(), keys)
s.Nil(err)
s.Equal(len(meetLocks), 0)
s.Equal(len(m), rowNum)
for i, k := range keys {
s.Equal(m[string(k)], valueBytes(i))
}
committer1.Cleanup(context.Background())
s.deleteKeys(keys)
}
}
func (s *testSnapshotSuite) TestSnapshotCacheBypassMaxUint64() {
txn := s.beginTxn()
s.Nil(txn.Set([]byte("x"), []byte("x")))
s.Nil(txn.Set([]byte("y"), []byte("y")))
s.Nil(txn.Set([]byte("z"), []byte("z")))
s.Nil(txn.Commit(context.Background()))
// cache version < math.MaxUint64
startTS, err := s.store.GetTimestampWithRetry(tikv.NewNoopBackoff(context.Background()), oracle.GlobalTxnScope)
s.Nil(err)
snapshot := s.store.GetSnapshot(startTS)
snapshot.Get(context.Background(), []byte("x"))
snapshot.BatchGet(context.Background(), [][]byte{[]byte("y"), []byte("z")})
s.Equal(snapshot.SnapCache(), map[string][]byte{
"x": []byte("x"),
"y": []byte("y"),
"z": []byte("z"),
})
// not cache version == math.MaxUint64
snapshot = s.store.GetSnapshot(math.MaxUint64)
snapshot.Get(context.Background(), []byte("x"))
snapshot.BatchGet(context.Background(), [][]byte{[]byte("y"), []byte("z")})
s.Empty(snapshot.SnapCache())
}