mirror of https://github.com/tikv/client-go.git
txnkv/*: init txnkv/rangetask package (#234)
This commit is contained in:
parent 9a71d1d8e3
commit 0539869024
@@ -85,19 +85,16 @@ func main() {
 	// delete range
 	// Be careful while using this API. This API doesn't keep recent MVCC versions, but will delete all versions of all keys
 	// in the range immediately. Also notice that frequent invocation to this API may cause performance problems to TiKV.
-	task := tikv.NewDeleteRangeTask(client, []byte("b"), []byte("c0"), 1)
-	err = task.Execute(ctx)
+	completedRegions, err := client.DeleteRange(ctx, []byte("b"), []byte("c0"), 1)
 	panicWhenErrNotNil(err)
-	fmt.Println("Delete Range [b,c0) success, the number of affacted regions: ", task.CompletedRegions())
+	fmt.Println("Delete Range [b,c0) success, the number of affacted regions: ", completedRegions)
 
-	task = tikv.NewDeleteRangeTask(client, []byte("d0"), []byte("d0"), 1)
-	task.Execute(ctx)
+	completedRegions, err = client.DeleteRange(ctx, []byte("d0"), []byte("d0"), 1)
 	panicWhenErrNotNil(err)
-	fmt.Println("Delete Range [d0,d0) success, the number of affacted regions: ", task.CompletedRegions())
+	fmt.Println("Delete Range [d0,d0) success, the number of affacted regions: ", completedRegions)
 
-	task = tikv.NewDeleteRangeTask(client, []byte("a"), []byte("e"), 1)
-	task.Execute(ctx)
+	completedRegions, err = client.DeleteRange(ctx, []byte("a"), []byte("e"), 1)
 	panicWhenErrNotNil(err)
-	fmt.Println("Delete Range [a,e) success, the number of affacted regions: ", task.CompletedRegions())
+	fmt.Println("Delete Range [a,e) success, the number of affacted regions: ", completedRegions)
 
 }
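With DeleteRangeTask moved into txnkv/rangetask, the example above now goes through the new KVStore.DeleteRange wrapper instead of building the task by hand. A minimal sketch of the new call shape, assuming `client` is an already-connected *tikv.KVStore (the function and package names below are illustrative, not part of the commit):

package example // illustrative only

import (
	"context"
	"fmt"

	"github.com/tikv/client-go/v2/tikv"
)

// deleteAll removes every MVCC version of every key in [start, end) and
// reports how many regions the underlying requests covered. It simply wraps
// the KVStore.DeleteRange method added by this commit.
func deleteAll(ctx context.Context, client *tikv.KVStore, start, end []byte) (int, error) {
	completedRegions, err := client.DeleteRange(ctx, start, end, 1 /* concurrency */)
	if err != nil {
		return 0, err
	}
	fmt.Printf("Delete Range [%s,%s) success, affected regions: %d\n", start, end, completedRegions)
	return completedRegions, nil
}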
@@ -106,12 +106,10 @@ func (s *testDeleteRangeSuite) checkData(expectedData map[string]string) {
 }
 
 func (s *testDeleteRangeSuite) deleteRange(startKey []byte, endKey []byte) int {
-	task := tikv.NewDeleteRangeTask(s.store, startKey, endKey, 1)
-
-	err := task.Execute(context.Background())
+	completedRegions, err := s.store.DeleteRange(context.Background(), startKey, endKey, 1)
 	s.Nil(err)
 
-	return task.CompletedRegions()
+	return completedRegions
 }
 
 // deleteRangeFromMap deletes all keys in a given range from a map
@@ -156,7 +154,6 @@ func (s *testDeleteRangeSuite) TestDeleteRange() {
 	s.checkData(testData)
 
 	s.mustDeleteRange([]byte("b"), []byte("c0"), testData, 2)
 	s.mustDeleteRange([]byte("c11"), []byte("c12"), testData, 1)
 	s.mustDeleteRange([]byte("d0"), []byte("d0"), testData, 0)
 	s.mustDeleteRange([]byte("d0\x00"), []byte("d1\x00"), testData, 1)
 	s.mustDeleteRange([]byte("c5"), []byte("d5"), testData, 2)
@@ -44,6 +44,7 @@ import (
 	"github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
+	"github.com/tikv/client-go/v2/txnkv/rangetask"
 )
 
 func TestRangeTask(t *testing.T) {
@@ -186,15 +187,15 @@ func (s *testRangeTaskSuite) testRangeTaskImpl(concurrency int) {
 
 	ranges := make(chan *kv.KeyRange, 100)
 
-	handler := func(ctx context.Context, r kv.KeyRange) (tikv.RangeTaskStat, error) {
+	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
 		ranges <- &r
-		stat := tikv.RangeTaskStat{
+		stat := rangetask.TaskStat{
 			CompletedRegions: 1,
 		}
 		return stat, nil
 	}
 
-	runner := tikv.NewRangeTaskRunner("test-runner", s.store, concurrency, handler)
+	runner := rangetask.NewRangeTaskRunner("test-runner", s.store, concurrency, handler)
 
 	for regionsPerTask := 1; regionsPerTask <= 5; regionsPerTask++ {
 		for i, r := range s.testRanges {
@@ -225,18 +226,17 @@ func (s *testRangeTaskSuite) testRangeTaskErrorImpl(concurrency int) {
 		errKey := subRange.StartKey
 		s.T().Logf("Test RangeTask Error concurrency: %v, range: [%+q, %+q), errKey: %+q", concurrency, r.StartKey, r.EndKey, errKey)
 
-		handler := func(ctx context.Context, r kv.KeyRange) (tikv.RangeTaskStat, error) {
-			stat := tikv.RangeTaskStat{CompletedRegions: 0, FailedRegions: 0}
+		handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
+			stat := rangetask.TaskStat{CompletedRegions: 0, FailedRegions: 0}
 			if bytes.Equal(r.StartKey, errKey) {
 				stat.FailedRegions++
 				return stat, errors.New("test error")
-
 			}
 			stat.CompletedRegions++
 			return stat, nil
 		}
 
-		runner := tikv.NewRangeTaskRunner("test-error-runner", s.store, concurrency, handler)
+		runner := rangetask.NewRangeTaskRunner("test-error-runner", s.store, concurrency, handler)
 		runner.SetRegionsPerTask(1)
 		err := runner.RunOnRange(context.Background(), r.StartKey, r.EndKey)
 		// RunOnRange returns no error only when all sub tasks are done successfully.
@@ -25,6 +25,7 @@ import (
 	"github.com/tikv/client-go/v2/internal/retry"
 	"github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/tikvrpc"
+	"github.com/tikv/client-go/v2/txnkv/rangetask"
 	zap "go.uber.org/zap"
 )
 
@@ -51,11 +52,11 @@ func (s *KVStore) GC(ctx context.Context, safepoint uint64) (newSafePoint uint64
 }
 
 func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
-	handler := func(ctx context.Context, r kv.KeyRange) (RangeTaskStat, error) {
+	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
 		return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
 	}
 
-	runner := NewRangeTaskRunner("resolve-locks-runner", s, concurrency, handler)
+	runner := rangetask.NewRangeTaskRunner("resolve-locks-runner", s, concurrency, handler)
 	// Run resolve lock on the whole TiKV cluster. Empty keys means the range is unbounded.
 	err := runner.RunOnRange(ctx, []byte(""), []byte(""))
 	if err != nil {
@@ -67,12 +68,12 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc
 // We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
 const gcScanLockLimit = ResolvedCacheSize / 2
 
-func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (RangeTaskStat, error) {
+func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (rangetask.TaskStat, error) {
 	// for scan lock request, we must return all locks even if they are generated
 	// by the same transaction. because gc worker need to make sure all locks have been
 	// cleaned.
 
-	var stat RangeTaskStat
+	var stat rangetask.TaskStat
 	key := startKey
 	bo := NewGcResolveLockMaxBackoffer(ctx)
 	for {
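resolveLocks now hands its per-range work to the shared runner as a rangetask handler; the empty start and end keys mean the scan covers the whole cluster. The same pattern is usable from outside the tikv package. A rough sketch, assuming an already-connected *tikv.KVStore and a caller-supplied processRange callback (all names here are illustrative):

package example // illustrative only

import (
	"context"

	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
)

// scanWholeCluster runs processRange over every region, mirroring how
// resolveLocks drives the runner above. Each sub-range is reported as one
// completed (or failed) region, the same convention the package's tests use.
func scanWholeCluster(ctx context.Context, store *tikv.KVStore,
	processRange func(ctx context.Context, startKey, endKey []byte) error) error {
	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
		var stat rangetask.TaskStat
		if err := processRange(ctx, r.StartKey, r.EndKey); err != nil {
			stat.FailedRegions++
			return stat, err
		}
		stat.CompletedRegions++
		return stat, nil
	}

	runner := rangetask.NewRangeTaskRunner("scan-runner", store, 4 /* concurrency */, handler)
	// Empty keys mean the range is unbounded, i.e. the whole TiKV cluster.
	return runner.RunOnRange(ctx, []byte(""), []byte(""))
}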
tikv/kv.go
@@ -59,6 +59,7 @@ import (
 	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/oracle/oracles"
 	"github.com/tikv/client-go/v2/tikvrpc"
+	"github.com/tikv/client-go/v2/txnkv/rangetask"
 	"github.com/tikv/client-go/v2/util"
 	pd "github.com/tikv/pd/client"
 	"go.etcd.io/etcd/clientv3"
@@ -283,6 +284,18 @@ func (s *KVStore) BeginWithOption(options StartTSOption) (*KVTxn, error) {
 	return newTiKVTxnWithOptions(s, options)
 }
 
+// DeleteRange delete all versions of all keys in the range[startKey,endKey) immediately.
+// Be careful while using this API. This API doesn't keep recent MVCC versions, but will delete all versions of all keys
+// in the range immediately. Also notice that frequent invocation to this API may cause performance problems to TiKV.
+func (s *KVStore) DeleteRange(ctx context.Context, startKey []byte, endKey []byte, concurrency int) (completedRegions int, err error) {
+	task := rangetask.NewDeleteRangeTask(s, startKey, endKey, concurrency)
+	err = task.Execute(ctx)
+	if err == nil {
+		completedRegions = task.CompletedRegions()
+	}
+	return completedRegions, err
+}
+
 // GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
 // if ts is MaxVersion or > current max committed version, we will use current version for this snapshot.
 func (s *KVStore) GetSnapshot(ts uint64) *KVSnapshot {
@@ -51,6 +51,7 @@ import (
 	"github.com/tikv/client-go/v2/internal/retry"
 	"github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/tikvrpc"
+	"github.com/tikv/client-go/v2/txnkv/rangetask"
 	"github.com/tikv/client-go/v2/util"
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
@@ -352,7 +353,7 @@ func (s *KVStore) WaitScatterRegionFinish(ctx context.Context, regionID uint64,
 
 // CheckRegionInScattering uses to check whether scatter region finished.
 func (s *KVStore) CheckRegionInScattering(regionID uint64) (bool, error) {
-	bo := retry.NewBackofferWithVars(context.Background(), locateRegionMaxBackoff, nil)
+	bo := rangetask.NewLocateRegionBackoffer(context.Background())
 	for {
 		resp, err := s.pdClient.GetOperator(context.Background(), regionID)
 		if err == nil && resp != nil {
@@ -30,27 +30,36 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package tikv
+package rangetask
 
 import (
 	"bytes"
 	"context"
 	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	tikverr "github.com/tikv/client-go/v2/error"
 	"github.com/tikv/client-go/v2/internal/client"
+	"github.com/tikv/client-go/v2/internal/locate"
+	"github.com/tikv/client-go/v2/internal/retry"
 	"github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/tikvrpc"
 )
 
+type storage interface {
+	// GetRegionCache gets the RegionCache.
+	GetRegionCache() *locate.RegionCache
+	// SendReq sends a request to TiKV.
+	SendReq(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error)
+}
+
 // DeleteRangeTask is used to delete all keys in a range. After
 // performing DeleteRange, it keeps how many ranges it affects and
 // if the task was canceled or not.
 type DeleteRangeTask struct {
 	completedRegions int
-	store Storage
+	store storage
 	startKey []byte
 	endKey []byte
 	notifyOnly bool
@@ -60,7 +69,7 @@ type DeleteRangeTask struct {
 // NewDeleteRangeTask creates a DeleteRangeTask. Deleting will be performed when `Execute` method is invoked.
 // Be careful while using this API. This API doesn't keep recent MVCC versions, but will delete all versions of all keys
 // in the range immediately. Also notice that frequent invocation to this API may cause performance problems to TiKV.
-func NewDeleteRangeTask(store Storage, startKey []byte, endKey []byte, concurrency int) *DeleteRangeTask {
+func NewDeleteRangeTask(store storage, startKey []byte, endKey []byte, concurrency int) *DeleteRangeTask {
 	return &DeleteRangeTask{
 		completedRegions: 0,
 		store: store,
@@ -74,7 +83,7 @@ func NewDeleteRangeTask(store Storage, startKey []byte, endKey []byte, concurren
 // NewNotifyDeleteRangeTask creates a task that sends delete range requests to all regions in the range, but with the
 // flag `notifyOnly` set. TiKV will not actually delete the range after receiving request, but it will be replicated via
 // raft. This is used to notify the involved regions before sending UnsafeDestroyRange requests.
-func NewNotifyDeleteRangeTask(store Storage, startKey []byte, endKey []byte, concurrency int) *DeleteRangeTask {
+func NewNotifyDeleteRangeTask(store storage, startKey []byte, endKey []byte, concurrency int) *DeleteRangeTask {
 	task := NewDeleteRangeTask(store, startKey, endKey, concurrency)
 	task.notifyOnly = true
 	return task
@@ -102,9 +111,9 @@ func (t *DeleteRangeTask) Execute(ctx context.Context) error {
 const deleteRangeOneRegionMaxBackoff = 100000
 
 // Execute performs the delete range operation.
-func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (RangeTaskStat, error) {
+func (t *DeleteRangeTask) sendReqOnRange(ctx context.Context, r kv.KeyRange) (TaskStat, error) {
 	startKey, rangeEndKey := r.StartKey, r.EndKey
-	var stat RangeTaskStat
+	var stat TaskStat
 	for {
 		select {
 		case <-ctx.Done():
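For callers that still want the task object itself, for example to use the notify-only variant described above, the moved constructors are now reached through the txnkv/rangetask package. A rough sketch, assuming an already-connected *tikv.KVStore (package and function names below are illustrative):

package example // illustrative only

import (
	"context"

	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
)

// deleteRangeWithTask drives a DeleteRangeTask directly instead of going
// through KVStore.DeleteRange, so the caller keeps the task value around.
func deleteRangeWithTask(ctx context.Context, store *tikv.KVStore, start, end []byte) (int, error) {
	task := rangetask.NewDeleteRangeTask(store, start, end, 2 /* concurrency */)
	if err := task.Execute(ctx); err != nil {
		return 0, err
	}
	return task.CompletedRegions(), nil
}

// notifyDeleteRange only tells the involved regions about the coming deletion
// (the notifyOnly flag is set), so TiKV does not delete any data for these
// requests; per the doc comment above, this is meant to run before
// UnsafeDestroyRange.
func notifyDeleteRange(ctx context.Context, store *tikv.KVStore, start, end []byte) error {
	task := rangetask.NewNotifyDeleteRangeTask(store, start, end, 2)
	return task.Execute(ctx)
}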
@@ -30,7 +30,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package tikv
+package rangetask
 
 import (
 	"bytes"
@@ -55,14 +55,14 @@ const (
 	lblFailedRegions = "failed-regions"
 )
 
-// RangeTaskRunner splits a range into many ranges to process concurrently, and convenient to send requests to all
+// Runner splits a range into many ranges to process concurrently, and convenient to send requests to all
 // regions in the range. Because of merging and splitting, it's possible that multiple requests for disjoint ranges are
 // sent to the same region.
-type RangeTaskRunner struct {
+type Runner struct {
 	name string
-	store Storage
+	store storage
 	concurrency int
-	handler RangeTaskHandler
+	handler TaskHandler
 	statLogInterval time.Duration
 	regionsPerTask int
 
@@ -70,16 +70,16 @@ type RangeTaskRunner struct {
 	failedRegions int32
 }
 
-// RangeTaskStat is used to count Regions that completed or failed to do the task.
-type RangeTaskStat struct {
+// TaskStat is used to count Regions that completed or failed to do the task.
+type TaskStat struct {
 	CompletedRegions int
 	FailedRegions int
 }
 
-// RangeTaskHandler is the type of functions that processes a task of a key range.
+// TaskHandler is the type of functions that processes a task of a key range.
 // The function should calculate Regions that succeeded or failed to the task.
 // Returning error from the handler means the error caused the whole task should be stopped.
-type RangeTaskHandler = func(ctx context.Context, r kv.KeyRange) (RangeTaskStat, error)
+type TaskHandler = func(ctx context.Context, r kv.KeyRange) (TaskStat, error)
 
 // NewRangeTaskRunner creates a RangeTaskRunner.
 //
@@ -88,11 +88,11 @@ type RangeTaskHandler = func(ctx context.Context, r kv.KeyRange) (RangeTaskStat,
 // will be canceled.
 func NewRangeTaskRunner(
 	name string,
-	store Storage,
+	store storage,
 	concurrency int,
-	handler RangeTaskHandler,
-) *RangeTaskRunner {
-	return &RangeTaskRunner{
+	handler TaskHandler,
+) *Runner {
+	return &Runner{
 		name: name,
 		store: store,
 		concurrency: concurrency,
@@ -104,7 +104,7 @@ func NewRangeTaskRunner(
 
 // SetRegionsPerTask sets how many regions is in a divided task. Since regions may split and merge, it's possible that
 // a sub task contains not exactly specified number of regions.
-func (s *RangeTaskRunner) SetRegionsPerTask(regionsPerTask int) {
+func (s *Runner) SetRegionsPerTask(regionsPerTask int) {
 	if regionsPerTask < 1 {
 		panic("RangeTaskRunner: regionsPerTask should be at least 1")
 	}
@@ -113,9 +113,14 @@ func (s *RangeTaskRunner) SetRegionsPerTask(regionsPerTask int) {
 
 const locateRegionMaxBackoff = 20000
 
+// NewLocateRegionBackoffer creates the backoofer for LocateRegion request.
+func NewLocateRegionBackoffer(ctx context.Context) *retry.Backoffer {
+	return retry.NewBackofferWithVars(ctx, locateRegionMaxBackoff, nil)
+}
+
 // RunOnRange runs the task on the given range.
 // Empty startKey or endKey means unbounded.
-func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey, endKey []byte) error {
+func (s *Runner) RunOnRange(ctx context.Context, startKey, endKey []byte) error {
 	s.completedRegions = 0
 	metrics.TiKVRangeTaskStats.WithLabelValues(s.name, lblCompletedRegions).Set(0)
 
@@ -179,7 +184,7 @@ Loop:
 		default:
 		}
 
-		bo := retry.NewBackofferWithVars(ctx, locateRegionMaxBackoff, nil)
+		bo := NewLocateRegionBackoffer(ctx)
 
 		rangeEndKey, err := s.store.GetRegionCache().BatchLoadRegionsFromKey(bo, key, s.regionsPerTask)
 		if err != nil {
@@ -244,7 +249,7 @@ Loop:
 }
 
 // createWorker creates a worker that can process tasks from the given channel.
-func (s *RangeTaskRunner) createWorker(taskCh chan *kv.KeyRange, wg *sync.WaitGroup) *rangeTaskWorker {
+func (s *Runner) createWorker(taskCh chan *kv.KeyRange, wg *sync.WaitGroup) *rangeTaskWorker {
 	return &rangeTaskWorker{
 		name: s.name,
 		store: s.store,
@@ -258,20 +263,20 @@ func (s *RangeTaskRunner) createWorker(taskCh chan *kv.KeyRange, wg *sync.WaitGr
 }
 
 // CompletedRegions returns how many regions has been sent requests.
-func (s *RangeTaskRunner) CompletedRegions() int {
+func (s *Runner) CompletedRegions() int {
 	return int(atomic.LoadInt32(&s.completedRegions))
 }
 
 // FailedRegions returns how many regions has failed to do the task.
-func (s *RangeTaskRunner) FailedRegions() int {
+func (s *Runner) FailedRegions() int {
 	return int(atomic.LoadInt32(&s.failedRegions))
 }
 
 // rangeTaskWorker is used by RangeTaskRunner to process tasks concurrently.
 type rangeTaskWorker struct {
 	name string
-	store Storage
-	handler RangeTaskHandler
+	store storage
+	handler TaskHandler
 	taskCh chan *kv.KeyRange
 	wg *sync.WaitGroup
 
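Putting the renamed pieces together: a handler returns a rangetask.TaskStat for each sub-range, and RunOnRange returns no error only if every sub-task succeeded, which is exactly what the error-path test above exercises. A sketch of the runner's accounting, assuming an already-connected *tikv.KVStore (illustrative names only):

package example // illustrative only

import (
	"context"
	"fmt"

	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
)

// countRegions runs a no-op handler over [start, end) and reports how many
// regions the runner visited. Each sub-range is counted as one completed
// region, the same convention the tests above use.
func countRegions(ctx context.Context, store *tikv.KVStore, start, end []byte) (int, error) {
	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
		// A real handler would do per-range work here and report failures
		// through stat.FailedRegions plus a returned error.
		return rangetask.TaskStat{CompletedRegions: 1}, nil
	}

	runner := rangetask.NewRangeTaskRunner("count-runner", store, 2 /* concurrency */, handler)
	runner.SetRegionsPerTask(1) // one region per sub-task, as in the tests above
	if err := runner.RunOnRange(ctx, start, end); err != nil {
		// RunOnRange returns no error only when all sub tasks are done successfully.
		return runner.CompletedRegions(), err
	}
	fmt.Printf("completed=%d failed=%d\n", runner.CompletedRegions(), runner.FailedRegions())
	return runner.CompletedRegions(), nil
}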