client-go/integration_tests/range_task_test.go

266 lines
6.8 KiB
Go

// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/range_task_test.go
//
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv_test
import (
"bytes"
"context"
"errors"
"sort"
"testing"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/rangetask"
)
// TestRangeTask is the `go test` entry point: it hands a fresh
// testRangeTaskSuite to testify's suite runner.
func TestRangeTask(t *testing.T) {
	suite.Run(t, &testRangeTaskSuite{})
}
// testRangeTaskSuite holds the fixture shared by the range task tests. All
// fields are (re)populated by SetupTest.
type testRangeTaskSuite struct {
	suite.Suite

	// cluster is the mock cluster handle returned by testutils.NewMockTiKV.
	cluster testutils.Cluster
	// store is the test KVStore the runners operate on; TearDownTest closes it.
	store *tikv.KVStore

	// testRanges lists the key ranges handed to RunOnRange.
	testRanges []kv.KeyRange
	// expectedRanges[i] lists the sub-ranges that testRanges[i] is expected to
	// be divided into when each task covers exactly one region.
	expectedRanges [][]kv.KeyRange
}
// makeRange builds a kv.KeyRange whose StartKey and EndKey are the raw bytes
// of the given strings.
func makeRange(startKey string, endKey string) kv.KeyRange {
	var r kv.KeyRange
	r.StartKey = []byte(startKey)
	r.EndKey = []byte(endKey)
	return r
}
// SetupTest builds the fixture used by the tests: a mock TiKV cluster
// bootstrapped with split keys at every single-letter key "a" through "z", a
// test KVStore on top of it, and the table of input ranges (s.testRanges)
// together with the sub-ranges each input is expected to be divided into
// (s.expectedRanges, index-aligned with s.testRanges).
func (s *testRangeTaskSuite) SetupTest() {
	// Split the store at "a" to "z"
	splitKeys := make([][]byte, 0, int('z'-'a')+1)
	for k := byte('a'); k <= byte('z'); k++ {
		splitKeys = append(splitKeys, []byte{k})
	}

	// Calculate all region's ranges:
	// ["", "a"), ["a", "b"), ..., ["y", "z"), ["z", "").
	allRegionRanges := make([]kv.KeyRange, 0, len(splitKeys)+1)
	allRegionRanges = append(allRegionRanges, makeRange("", "a"))
	for i := 0; i < len(splitKeys)-1; i++ {
		allRegionRanges = append(allRegionRanges, kv.KeyRange{
			StartKey: splitKeys[i],
			EndKey:   splitKeys[i+1],
		})
	}
	allRegionRanges = append(allRegionRanges, makeRange("z", ""))

	client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
	s.Require().NoError(err)
	testutils.BootstrapWithMultiRegions(cluster, splitKeys...)
	s.cluster = cluster

	store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
	s.Require().NoError(err)
	s.store = store

	s.testRanges = []kv.KeyRange{
		makeRange("", ""),
		makeRange("", "b"),
		makeRange("b", ""),
		makeRange("b", "x"),
		makeRange("a", "d"),
		makeRange("a\x00", "d\x00"),
		makeRange("a\xff\xff\xff", "c\xff\xff\xff"),
		makeRange("a1", "a2"),
		makeRange("a", "a"),
		makeRange("a3", "a3"),
	}

	// expectedRanges[i] corresponds to testRanges[i]. In allRegionRanges,
	// index 0 is ["", "a") and index n (n >= 1) starts at the n-th letter, so
	// [2:24] spans ["b", "c") through ["w", "x").
	s.expectedRanges = [][]kv.KeyRange{
		allRegionRanges,
		allRegionRanges[:2],
		allRegionRanges[2:],
		allRegionRanges[2:24],
		{
			makeRange("a", "b"),
			makeRange("b", "c"),
			makeRange("c", "d"),
		},
		{
			makeRange("a\x00", "b"),
			makeRange("b", "c"),
			makeRange("c", "d"),
			makeRange("d", "d\x00"),
		},
		{
			makeRange("a\xff\xff\xff", "b"),
			makeRange("b", "c"),
			makeRange("c", "c\xff\xff\xff"),
		},
		{
			makeRange("a1", "a2"),
		},
		// The last two inputs have StartKey == EndKey, so no sub-range is
		// expected for them.
		{},
		{},
	}
}
// TearDownTest closes the store created by SetupTest and requires the close to
// succeed.
func (s *testRangeTaskSuite) TearDownTest() {
	// SetupTest can abort through Require() before it assigns s.store.
	// NOTE(review): testify is expected to still invoke TearDownTest in that
	// case; the guard avoids calling Close on a missing store.
	if s.store == nil {
		return
	}
	err := s.store.Close()
	// Drop the reference so a later teardown cannot close the same store again.
	s.store = nil
	s.Require().NoError(err)
}
// collect drains every range currently queued in c and returns them by value,
// in channel order. The result is always a non-nil slice, even when empty.
//
// It first enqueues a nil sentinel and then reads until that sentinel comes
// back out. Because the send and the receives happen on the same goroutine, c
// must be buffered with at least one free slot, otherwise the send blocks.
func collect(c chan *kv.KeyRange) []kv.KeyRange {
	c <- nil

	collected := []kv.KeyRange{}
	for item := <-c; item != nil; item = <-c {
		collected = append(collected, *item)
	}
	return collected
}
// checkRanges asserts that obtained holds exactly the ranges in expected.
//
// obtained is sorted in place by StartKey before the comparison, so the order
// in which the ranges were collected does not matter (presumably they can
// arrive out of order when the runner uses several workers). expected is
// compared as given and must therefore already be ordered by StartKey.
func (s *testRangeTaskSuite) checkRanges(obtained []kv.KeyRange, expected []kv.KeyRange) {
	sort.Slice(obtained, func(i, j int) bool {
		return bytes.Compare(obtained[i].StartKey, obtained[j].StartKey) < 0
	})
	// testify's Equal takes (expected, actual); keeping that order makes the
	// failure output label the two sides correctly.
	s.Equal(expected, obtained)
}
// batchRanges merges every run of batchSize consecutive ranges into a single
// range that starts at the first range's StartKey and ends at the last range's
// EndKey. The final batch may contain fewer than batchSize ranges. The result
// is a non-nil slice, empty when ranges is empty.
//
// batchSize must be positive: with zero the loop below would never advance.
func batchRanges(ranges []kv.KeyRange, batchSize int) []kv.KeyRange {
	total := len(ranges)
	merged := make([]kv.KeyRange, 0, total)
	for first := 0; first < total; first += batchSize {
		// Index of the last range in this batch, clamped to the input.
		last := first + batchSize - 1
		if last > total-1 {
			last = total - 1
		}
		merged = append(merged, kv.KeyRange{
			StartKey: ranges[first].StartKey,
			EndKey:   ranges[last].EndKey,
		})
	}
	return merged
}
// testRangeTaskImpl runs one runner, created with the given concurrency, over
// every range in s.testRanges for regionsPerTask values 1 through 5. For each
// run it checks that the handler saw exactly the expected (batched) sub-ranges
// and that the runner's completed/failed counters match.
func (s *testRangeTaskSuite) testRangeTaskImpl(concurrency int) {
	s.T().Logf("Test RangeTask, concurrency: %v", concurrency)

	// The handler records every range it is given. The channel is only drained
	// after RunOnRange returns, so its buffer has to hold all ranges of a
	// single run plus the sentinel that collect enqueues.
	ranges := make(chan *kv.KeyRange, 100)
	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
		ranges <- &r
		// Each handler call reports one completed unit, so after a run the
		// runner's counter is compared with the number of expected tasks.
		return rangetask.TaskStat{CompletedRegions: 1}, nil
	}

	runner := rangetask.NewRangeTaskRunner(
		"test-runner",
		s.store,
		concurrency,
		handler,
	)

	for regionsPerTask := 1; regionsPerTask <= 5; regionsPerTask++ {
		for i, r := range s.testRanges {
			runner.SetRegionsPerTask(regionsPerTask)

			// s.expectedRanges holds one range per region; merge them to match
			// the number of regions handed to each task.
			expectedRanges := batchRanges(s.expectedRanges[i], regionsPerTask)

			err := runner.RunOnRange(context.Background(), r.StartKey, r.EndKey)
			s.NoError(err)
			s.checkRanges(collect(ranges), expectedRanges)
			// testify's Equal takes (expected, actual).
			s.Equal(len(expectedRanges), runner.CompletedRegions())
			s.Equal(0, runner.FailedRegions())
		}
	}
}
// TestRangeTask exercises the success path with runner concurrency 1 to 4.
func (s *testRangeTaskSuite) TestRangeTask() {
	for _, workers := range []int{1, 2, 3, 4} {
		s.testRangeTaskImpl(workers)
	}
}
// testRangeTaskErrorImpl checks error propagation. For every test range and
// every sub-range it is expected to be divided into, it builds a fresh runner
// whose handler fails on exactly that sub-range, then asserts that RunOnRange
// returns an error, that not all sub-ranges were counted as completed, and
// that exactly one failure was counted.
//
// Test ranges with no expected sub-ranges produce no iterations here.
func (s *testRangeTaskSuite) testRangeTaskErrorImpl(concurrency int) {
	for i, r := range s.testRanges {
		// Iterate all sub tasks and make each one fail in turn.
		subRanges := s.expectedRanges[i]
		for _, subRange := range subRanges {
			errKey := subRange.StartKey
			s.T().Logf("Test RangeTask Error concurrency: %v, range: [%+q, %+q), errKey: %+q", concurrency, r.StartKey, r.EndKey, errKey)

			// The parameter is named task (not r) so that it does not shadow
			// the outer test range.
			handler := func(ctx context.Context, task kv.KeyRange) (rangetask.TaskStat, error) {
				if bytes.Equal(task.StartKey, errKey) {
					return rangetask.TaskStat{FailedRegions: 1}, errors.New("test error")
				}
				return rangetask.TaskStat{CompletedRegions: 1}, nil
			}

			runner := rangetask.NewRangeTaskRunner(
				"test-error-runner",
				s.store,
				concurrency,
				handler,
			)
			// One region per task, so task start keys line up with subRanges.
			runner.SetRegionsPerTask(1)

			err := runner.RunOnRange(context.Background(), r.StartKey, r.EndKey)
			// RunOnRange returns no error only when all sub tasks are done successfully.
			s.Error(err)
			s.Less(runner.CompletedRegions(), len(subRanges))
			// testify's Equal takes (expected, actual).
			s.Equal(1, runner.FailedRegions())
		}
	}
}
// TestRangeTaskError exercises the failure path with runner concurrency 1 to 4.
func (s *testRangeTaskSuite) TestRangeTaskError() {
	for _, workers := range []int{1, 2, 3, 4} {
		s.testRangeTaskErrorImpl(workers)
	}
}