mirror of https://github.com/tikv/client-go.git
266 lines
6.8 KiB
Go
266 lines
6.8 KiB
Go
// Copyright 2021 TiKV Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// NOTE: The code in this file is based on code from the
|
|
// TiDB project, licensed under the Apache License v 2.0
|
|
//
|
|
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/range_task_test.go
|
|
//
|
|
|
|
// Copyright 2019 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package tikv_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"sort"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/suite"
|
|
"github.com/tikv/client-go/v2/kv"
|
|
"github.com/tikv/client-go/v2/testutils"
|
|
"github.com/tikv/client-go/v2/tikv"
|
|
"github.com/tikv/client-go/v2/txnkv/rangetask"
|
|
)
|
|
|
|
func TestRangeTask(t *testing.T) {
|
|
suite.Run(t, new(testRangeTaskSuite))
|
|
}
|
|
|
|
type testRangeTaskSuite struct {
|
|
suite.Suite
|
|
cluster testutils.Cluster
|
|
store *tikv.KVStore
|
|
|
|
testRanges []kv.KeyRange
|
|
expectedRanges [][]kv.KeyRange
|
|
}
|
|
|
|
func makeRange(startKey string, endKey string) kv.KeyRange {
|
|
return kv.KeyRange{
|
|
StartKey: []byte(startKey),
|
|
EndKey: []byte(endKey),
|
|
}
|
|
}
|
|
|
|
func (s *testRangeTaskSuite) SetupTest() {
|
|
// Split the store at "a" to "z"
|
|
splitKeys := make([][]byte, 0)
|
|
for k := byte('a'); k <= byte('z'); k++ {
|
|
splitKeys = append(splitKeys, []byte{k})
|
|
}
|
|
|
|
// Calculate all region's ranges
|
|
allRegionRanges := []kv.KeyRange{makeRange("", "a")}
|
|
for i := 0; i < len(splitKeys)-1; i++ {
|
|
allRegionRanges = append(allRegionRanges, kv.KeyRange{
|
|
StartKey: splitKeys[i],
|
|
EndKey: splitKeys[i+1],
|
|
})
|
|
}
|
|
allRegionRanges = append(allRegionRanges, makeRange("z", ""))
|
|
|
|
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
|
|
s.Require().Nil(err)
|
|
testutils.BootstrapWithMultiRegions(cluster, splitKeys...)
|
|
s.cluster = cluster
|
|
|
|
store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
|
|
s.Require().Nil(err)
|
|
|
|
s.store = store
|
|
|
|
s.testRanges = []kv.KeyRange{
|
|
makeRange("", ""),
|
|
makeRange("", "b"),
|
|
makeRange("b", ""),
|
|
makeRange("b", "x"),
|
|
makeRange("a", "d"),
|
|
makeRange("a\x00", "d\x00"),
|
|
makeRange("a\xff\xff\xff", "c\xff\xff\xff"),
|
|
makeRange("a1", "a2"),
|
|
makeRange("a", "a"),
|
|
makeRange("a3", "a3"),
|
|
}
|
|
|
|
s.expectedRanges = [][]kv.KeyRange{
|
|
allRegionRanges,
|
|
allRegionRanges[:2],
|
|
allRegionRanges[2:],
|
|
allRegionRanges[2:24],
|
|
{
|
|
makeRange("a", "b"),
|
|
makeRange("b", "c"),
|
|
makeRange("c", "d"),
|
|
},
|
|
{
|
|
makeRange("a\x00", "b"),
|
|
makeRange("b", "c"),
|
|
makeRange("c", "d"),
|
|
makeRange("d", "d\x00"),
|
|
},
|
|
{
|
|
makeRange("a\xff\xff\xff", "b"),
|
|
makeRange("b", "c"),
|
|
makeRange("c", "c\xff\xff\xff"),
|
|
},
|
|
{
|
|
makeRange("a1", "a2"),
|
|
},
|
|
{},
|
|
{},
|
|
}
|
|
}
|
|
|
|
func (s *testRangeTaskSuite) TearDownTest() {
|
|
err := s.store.Close()
|
|
s.Require().Nil(err)
|
|
}
|
|
|
|
func collect(c chan *kv.KeyRange) []kv.KeyRange {
|
|
c <- nil
|
|
ranges := make([]kv.KeyRange, 0)
|
|
|
|
for {
|
|
r := <-c
|
|
if r == nil {
|
|
break
|
|
}
|
|
|
|
ranges = append(ranges, *r)
|
|
}
|
|
return ranges
|
|
}
|
|
|
|
func (s *testRangeTaskSuite) checkRanges(obtained []kv.KeyRange, expected []kv.KeyRange) {
|
|
sort.Slice(obtained, func(i, j int) bool {
|
|
return bytes.Compare(obtained[i].StartKey, obtained[j].StartKey) < 0
|
|
})
|
|
|
|
s.Equal(obtained, expected)
|
|
}
|
|
|
|
func batchRanges(ranges []kv.KeyRange, batchSize int) []kv.KeyRange {
|
|
result := make([]kv.KeyRange, 0, len(ranges))
|
|
|
|
for i := 0; i < len(ranges); i += batchSize {
|
|
lastRange := i + batchSize - 1
|
|
if lastRange >= len(ranges) {
|
|
lastRange = len(ranges) - 1
|
|
}
|
|
|
|
result = append(result, kv.KeyRange{
|
|
StartKey: ranges[i].StartKey,
|
|
EndKey: ranges[lastRange].EndKey,
|
|
})
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (s *testRangeTaskSuite) testRangeTaskImpl(concurrency int) {
|
|
s.T().Logf("Test RangeTask, concurrency: %v", concurrency)
|
|
|
|
ranges := make(chan *kv.KeyRange, 100)
|
|
|
|
handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
|
|
ranges <- &r
|
|
stat := rangetask.TaskStat{
|
|
CompletedRegions: 1,
|
|
}
|
|
return stat, nil
|
|
}
|
|
|
|
runner := rangetask.NewRangeTaskRunner(
|
|
"test-runner",
|
|
s.store,
|
|
concurrency,
|
|
handler,
|
|
)
|
|
|
|
for regionsPerTask := 1; regionsPerTask <= 5; regionsPerTask++ {
|
|
for i, r := range s.testRanges {
|
|
runner.SetRegionsPerTask(regionsPerTask)
|
|
|
|
expectedRanges := batchRanges(s.expectedRanges[i], regionsPerTask)
|
|
|
|
err := runner.RunOnRange(context.Background(), r.StartKey, r.EndKey)
|
|
s.Nil(err)
|
|
s.checkRanges(collect(ranges), expectedRanges)
|
|
s.Equal(runner.CompletedRegions(), len(expectedRanges))
|
|
s.Equal(runner.FailedRegions(), 0)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *testRangeTaskSuite) TestRangeTask() {
|
|
for concurrency := 1; concurrency < 5; concurrency++ {
|
|
s.testRangeTaskImpl(concurrency)
|
|
}
|
|
}
|
|
|
|
func (s *testRangeTaskSuite) testRangeTaskErrorImpl(concurrency int) {
|
|
for i, r := range s.testRanges {
|
|
// Iterate all sub tasks and make it an error
|
|
subRanges := s.expectedRanges[i]
|
|
for _, subRange := range subRanges {
|
|
errKey := subRange.StartKey
|
|
s.T().Logf("Test RangeTask Error concurrency: %v, range: [%+q, %+q), errKey: %+q", concurrency, r.StartKey, r.EndKey, errKey)
|
|
|
|
handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
|
|
stat := rangetask.TaskStat{CompletedRegions: 0, FailedRegions: 0}
|
|
if bytes.Equal(r.StartKey, errKey) {
|
|
stat.FailedRegions++
|
|
return stat, errors.New("test error")
|
|
}
|
|
stat.CompletedRegions++
|
|
return stat, nil
|
|
}
|
|
|
|
runner := rangetask.NewRangeTaskRunner(
|
|
"test-error-runner",
|
|
s.store,
|
|
concurrency,
|
|
handler,
|
|
)
|
|
runner.SetRegionsPerTask(1)
|
|
err := runner.RunOnRange(context.Background(), r.StartKey, r.EndKey)
|
|
// RunOnRange returns no error only when all sub tasks are done successfully.
|
|
s.NotNil(err)
|
|
s.Less(runner.CompletedRegions(), len(subRanges))
|
|
s.Equal(runner.FailedRegions(), 1)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *testRangeTaskSuite) TestRangeTaskError() {
|
|
for concurrency := 1; concurrency < 5; concurrency++ {
|
|
s.testRangeTaskErrorImpl(concurrency)
|
|
}
|
|
}
|