mirror of https://github.com/tikv/client-go.git
parent
7f6daad88a
commit
b3d61828e5
|
|
@ -35,7 +35,6 @@
|
|||
package mocktikv
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
|
@ -232,26 +231,7 @@ func (c *pdClient) ScatterRegions(ctx context.Context, regionsID []uint64, opts
|
|||
}
|
||||
|
||||
func (c *pdClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...pd.RegionsOption) (*pdpb.SplitRegionsResponse, error) {
|
||||
regionsID := make([]uint64, 0, len(splitKeys))
|
||||
for i, key := range splitKeys {
|
||||
k := NewMvccKey(key)
|
||||
region, _, _ := c.cluster.GetRegionByKey(k)
|
||||
if bytes.Equal(region.GetStartKey(), key) {
|
||||
continue
|
||||
}
|
||||
if i == 0 {
|
||||
regionsID = append(regionsID, region.Id)
|
||||
}
|
||||
newRegionID, newPeerIDs := c.cluster.AllocID(), c.cluster.AllocIDs(len(region.Peers))
|
||||
newRegion := c.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0])
|
||||
regionsID = append(regionsID, newRegion.Id)
|
||||
}
|
||||
response := &pdpb.SplitRegionsResponse{
|
||||
Header: &pdpb.ResponseHeader{},
|
||||
FinishedPercentage: 100,
|
||||
RegionsId: regionsID,
|
||||
}
|
||||
return response, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
|
||||
|
|
|
|||
|
|
@ -40,13 +40,18 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pingcap/kvproto/pkg/pdpb"
|
||||
"github.com/pkg/errors"
|
||||
tikverr "github.com/tikv/client-go/v2/error"
|
||||
"github.com/tikv/client-go/v2/internal/client"
|
||||
"github.com/tikv/client-go/v2/internal/kvrpc"
|
||||
"github.com/tikv/client-go/v2/internal/locate"
|
||||
"github.com/tikv/client-go/v2/internal/logutil"
|
||||
"github.com/tikv/client-go/v2/internal/retry"
|
||||
"github.com/tikv/client-go/v2/kv"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"github.com/tikv/client-go/v2/txnkv/rangetask"
|
||||
"github.com/tikv/client-go/v2/util"
|
||||
pd "github.com/tikv/pd/client"
|
||||
|
|
@ -59,7 +64,7 @@ func equalRegionStartKey(key, regionStartKey []byte) bool {
|
|||
return bytes.Equal(key, regionStartKey)
|
||||
}
|
||||
|
||||
func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool, tableID *int64) (*pdpb.SplitRegionsResponse, error) {
|
||||
func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool, tableID *int64) (*tikvrpc.Response, error) {
|
||||
// equalRegionStartKey is used to filter split keys.
|
||||
// If the split key is equal to the start key of the region, then the key has been split, we need to skip the split key.
|
||||
groups, _, err := s.regionCache.GroupKeysByRegion(bo, keys, equalRegionStartKey)
|
||||
|
|
@ -85,9 +90,9 @@ func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter boo
|
|||
}
|
||||
if len(batches) == 1 {
|
||||
resp := s.batchSendSingleRegion(bo, batches[0], scatter, tableID)
|
||||
return resp, err
|
||||
return resp.Response, resp.Error
|
||||
}
|
||||
ch := make(chan *pdpb.SplitRegionsResponse, len(batches))
|
||||
ch := make(chan kvrpc.BatchResult, len(batches))
|
||||
for _, batch1 := range batches {
|
||||
go func(b kvrpc.Batch) {
|
||||
backoffer, cancel := bo.Fork()
|
||||
|
|
@ -97,38 +102,37 @@ func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter boo
|
|||
select {
|
||||
case ch <- s.batchSendSingleRegion(backoffer, b, scatter, tableID):
|
||||
case <-bo.GetCtx().Done():
|
||||
resp := &pdpb.SplitRegionsResponse{}
|
||||
resp.Header.Error = &pdpb.Error{Message: err.Error()}
|
||||
ch <- resp
|
||||
ch <- kvrpc.BatchResult{Error: bo.GetCtx().Err()}
|
||||
}
|
||||
}, func(r interface{}) {
|
||||
if r != nil {
|
||||
resp := &pdpb.SplitRegionsResponse{}
|
||||
resp.Header.Error = &pdpb.Error{Message: err.Error()}
|
||||
ch <- resp
|
||||
ch <- kvrpc.BatchResult{Error: errors.Errorf("%v", r)}
|
||||
}
|
||||
})
|
||||
}(batch1)
|
||||
}
|
||||
|
||||
srResp := &pdpb.SplitRegionsResponse{RegionsId: make([]uint64, len(keys)*2)}
|
||||
srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)}
|
||||
for i := 0; i < len(batches); i++ {
|
||||
batchResp := <-ch
|
||||
if batchResp.Header.Error != nil {
|
||||
respErr := errors.New(batchResp.Header.Error.GetMessage())
|
||||
logutil.BgLogger().Info("batch split regions failed", zap.Error(respErr))
|
||||
if batchResp.Error != nil {
|
||||
logutil.BgLogger().Info("batch split regions failed", zap.Error(batchResp.Error))
|
||||
if err == nil {
|
||||
err = respErr
|
||||
err = batchResp.Error
|
||||
}
|
||||
}
|
||||
|
||||
// If the split succeeds and the scatter fails, we also need to add the region IDs.
|
||||
srResp.RegionsId = append(srResp.RegionsId, batchResp.RegionsId...)
|
||||
if batchResp.Response != nil {
|
||||
spResp := batchResp.Resp.(*kvrpcpb.SplitRegionResponse)
|
||||
regions := spResp.GetRegions()
|
||||
srResp.Regions = append(srResp.Regions, regions...)
|
||||
}
|
||||
}
|
||||
return srResp, err
|
||||
return &tikvrpc.Response{Resp: srResp}, err
|
||||
}
|
||||
|
||||
func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch kvrpc.Batch, scatter bool, tableID *int64) *pdpb.SplitRegionsResponse {
|
||||
func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch kvrpc.Batch, scatter bool, tableID *int64) kvrpc.BatchResult {
|
||||
if val, err := util.EvalFailpoint("mockSplitRegionTimeout"); err == nil {
|
||||
if val.(bool) {
|
||||
if _, ok := bo.GetCtx().Deadline(); ok {
|
||||
|
|
@ -137,56 +141,80 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch kvrpc.Batch, scatte
|
|||
}
|
||||
}
|
||||
|
||||
resp, err := s.pdClient.SplitRegions(bo.GetCtx(), batch.Keys)
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdSplitRegion, &kvrpcpb.SplitRegionRequest{
|
||||
SplitKeys: batch.Keys,
|
||||
}, kvrpcpb.Context{
|
||||
Priority: kvrpcpb.CommandPri_Normal,
|
||||
})
|
||||
|
||||
sender := locate.NewRegionRequestSender(s.regionCache, s.GetTiKVClient())
|
||||
resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
|
||||
|
||||
batchResp := kvrpc.BatchResult{Response: resp}
|
||||
if err != nil {
|
||||
resp := &pdpb.SplitRegionsResponse{}
|
||||
resp.Header = &pdpb.ResponseHeader{}
|
||||
resp.Header.Error = &pdpb.Error{Message: err.Error()}
|
||||
return resp
|
||||
batchResp.Error = err
|
||||
return batchResp
|
||||
}
|
||||
if resp == nil {
|
||||
return &pdpb.SplitRegionsResponse{
|
||||
Header: &pdpb.ResponseHeader{
|
||||
Error: &pdpb.Error{Message: "empty response"},
|
||||
},
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
batchResp.Error = err
|
||||
return batchResp
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
batchResp.Error = err
|
||||
return batchResp
|
||||
}
|
||||
}
|
||||
regionIDs := resp.GetRegionsId()
|
||||
if len(regionIDs) > 0 {
|
||||
// Divide a region into n, one of them may not need to be scattered,
|
||||
// so n-1 needs to be scattered to other stores.
|
||||
regionIDs = regionIDs[:len(regionIDs)-1]
|
||||
resp, err = s.splitBatchRegionsReq(bo, batch.Keys, scatter, tableID)
|
||||
batchResp.Response = resp
|
||||
batchResp.Error = err
|
||||
return batchResp
|
||||
}
|
||||
|
||||
spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse)
|
||||
regions := spResp.GetRegions()
|
||||
if len(regions) > 0 {
|
||||
// Divide a region into n, one of them may not need to be scattered,
|
||||
// so n-1 needs to be scattered to other stores.
|
||||
spResp.Regions = regions[:len(regions)-1]
|
||||
}
|
||||
var newRegionLeft string
|
||||
if len(spResp.Regions) > 0 {
|
||||
newRegionLeft = logutil.Hex(spResp.Regions[0]).String()
|
||||
}
|
||||
logutil.BgLogger().Info("batch split regions complete",
|
||||
zap.Uint64("batch region ID", batch.RegionID.GetID()),
|
||||
zap.String("first at", kv.StrKey(batch.Keys[0])),
|
||||
zap.Int("new region count", len(regionIDs)))
|
||||
if resp.FinishedPercentage != 100 {
|
||||
err = errors.Errorf("Fail to batch split regions, finishedPercentage : %d, batch region ID : %d",
|
||||
resp.FinishedPercentage, batch.RegionID.GetID())
|
||||
resp.Header.Error = &pdpb.Error{Message: err.Error()}
|
||||
}
|
||||
zap.String("first new region left", newRegionLeft),
|
||||
zap.Int("new region count", len(spResp.Regions)))
|
||||
|
||||
if !scatter {
|
||||
return resp
|
||||
return batchResp
|
||||
}
|
||||
for i, id := range regionIDs {
|
||||
if err = s.scatterRegion(bo, id, tableID); err == nil {
|
||||
|
||||
for i, r := range spResp.Regions {
|
||||
if err = s.scatterRegion(bo, r.Id, tableID); err == nil {
|
||||
logutil.BgLogger().Info("batch split regions, scatter region complete",
|
||||
zap.Uint64("batch region ID", batch.RegionID.GetID()),
|
||||
zap.String("at", kv.StrKey(batch.Keys[i])))
|
||||
zap.String("at", kv.StrKey(batch.Keys[i])),
|
||||
zap.Stringer("new region left", logutil.Hex(r)))
|
||||
continue
|
||||
}
|
||||
|
||||
logutil.BgLogger().Info("batch split regions, scatter region failed",
|
||||
zap.Uint64("batch region ID", batch.RegionID.GetID()),
|
||||
zap.String("at", kv.StrKey(batch.Keys[i])),
|
||||
zap.Stringer("new region left", logutil.Hex(r)),
|
||||
zap.Error(err))
|
||||
if batchResp.Error == nil {
|
||||
batchResp.Error = err
|
||||
}
|
||||
if _, ok := err.(*tikverr.ErrPDServerTimeout); ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
return resp
|
||||
return batchResp
|
||||
}
|
||||
|
||||
const (
|
||||
|
|
@ -199,8 +227,11 @@ func (s *KVStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter
|
|||
bo := retry.NewBackofferWithVars(ctx, int(math.Min(float64(len(splitKeys))*splitRegionBackoff, maxSplitRegionsBackoff)), nil)
|
||||
resp, err := s.splitBatchRegionsReq(bo, splitKeys, scatter, tableID)
|
||||
regionIDs = make([]uint64, 0, len(splitKeys))
|
||||
if resp != nil {
|
||||
regionIDs = append(regionIDs, resp.GetRegionsId()...)
|
||||
if resp != nil && resp.Resp != nil {
|
||||
spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse)
|
||||
for _, r := range spResp.Regions {
|
||||
regionIDs = append(regionIDs, r.Id)
|
||||
}
|
||||
logutil.BgLogger().Info("split regions complete", zap.Int("region count", len(regionIDs)), zap.Uint64s("region IDs", regionIDs))
|
||||
}
|
||||
return regionIDs, err
|
||||
|
|
|
|||
Loading…
Reference in New Issue