diff --git a/internal/mockstore/mocktikv/pd.go b/internal/mockstore/mocktikv/pd.go
index 7d6e2368..0e96f124 100644
--- a/internal/mockstore/mocktikv/pd.go
+++ b/internal/mockstore/mocktikv/pd.go
@@ -35,7 +35,6 @@ package mocktikv

 import (
-    "bytes"
     "context"
     "fmt"
     "math"
@@ -232,26 +231,7 @@ func (c *pdClient) ScatterRegions(ctx context.Context, regionsID []uint64, opts
 }

 func (c *pdClient) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...pd.RegionsOption) (*pdpb.SplitRegionsResponse, error) {
-    regionsID := make([]uint64, 0, len(splitKeys))
-    for i, key := range splitKeys {
-        k := NewMvccKey(key)
-        region, _, _ := c.cluster.GetRegionByKey(k)
-        if bytes.Equal(region.GetStartKey(), key) {
-            continue
-        }
-        if i == 0 {
-            regionsID = append(regionsID, region.Id)
-        }
-        newRegionID, newPeerIDs := c.cluster.AllocID(), c.cluster.AllocIDs(len(region.Peers))
-        newRegion := c.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0])
-        regionsID = append(regionsID, newRegion.Id)
-    }
-    response := &pdpb.SplitRegionsResponse{
-        Header:             &pdpb.ResponseHeader{},
-        FinishedPercentage: 100,
-        RegionsId:          regionsID,
-    }
-    return response, nil
+    return nil, nil
 }

 func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
diff --git a/tikv/split_region.go b/tikv/split_region.go
index f2832d70..6186c304 100644
--- a/tikv/split_region.go
+++ b/tikv/split_region.go
@@ -40,13 +40,18 @@ import (
     "fmt"
     "math"

+    "github.com/pingcap/kvproto/pkg/kvrpcpb"
+    "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/kvproto/pkg/pdpb"
     "github.com/pkg/errors"
     tikverr "github.com/tikv/client-go/v2/error"
+    "github.com/tikv/client-go/v2/internal/client"
     "github.com/tikv/client-go/v2/internal/kvrpc"
+    "github.com/tikv/client-go/v2/internal/locate"
     "github.com/tikv/client-go/v2/internal/logutil"
     "github.com/tikv/client-go/v2/internal/retry"
     "github.com/tikv/client-go/v2/kv"
+    "github.com/tikv/client-go/v2/tikvrpc"
     "github.com/tikv/client-go/v2/txnkv/rangetask"
     "github.com/tikv/client-go/v2/util"
     pd "github.com/tikv/pd/client"
@@ -59,7 +64,7 @@ func equalRegionStartKey(key, regionStartKey []byte) bool {
     return bytes.Equal(key, regionStartKey)
 }

-func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool, tableID *int64) (*pdpb.SplitRegionsResponse, error) {
+func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool, tableID *int64) (*tikvrpc.Response, error) {
     // equalRegionStartKey is used to filter split keys.
     // If the split key is equal to the start key of the region, then the key has been split, we need to skip the split key.
     groups, _, err := s.regionCache.GroupKeysByRegion(bo, keys, equalRegionStartKey)
@@ -85,9 +90,9 @@ func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter boo
     }
     if len(batches) == 1 {
         resp := s.batchSendSingleRegion(bo, batches[0], scatter, tableID)
-        return resp, err
+        return resp.Response, resp.Error
     }
-    ch := make(chan *pdpb.SplitRegionsResponse, len(batches))
+    ch := make(chan kvrpc.BatchResult, len(batches))
     for _, batch1 := range batches {
         go func(b kvrpc.Batch) {
             backoffer, cancel := bo.Fork()
@@ -97,38 +102,37 @@ func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter boo
                 select {
                 case ch <- s.batchSendSingleRegion(backoffer, b, scatter, tableID):
                 case <-bo.GetCtx().Done():
-                    resp := &pdpb.SplitRegionsResponse{}
-                    resp.Header.Error = &pdpb.Error{Message: err.Error()}
-                    ch <- resp
+                    ch <- kvrpc.BatchResult{Error: bo.GetCtx().Err()}
                 }
             }, func(r interface{}) {
                 if r != nil {
-                    resp := &pdpb.SplitRegionsResponse{}
-                    resp.Header.Error = &pdpb.Error{Message: err.Error()}
-                    ch <- resp
+                    ch <- kvrpc.BatchResult{Error: errors.Errorf("%v", r)}
                 }
             })
         }(batch1)
     }
-    srResp := &pdpb.SplitRegionsResponse{RegionsId: make([]uint64, len(keys)*2)}
+    srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)}
     for i := 0; i < len(batches); i++ {
         batchResp := <-ch
-        if batchResp.Header.Error != nil {
-            respErr := errors.New(batchResp.Header.Error.GetMessage())
-            logutil.BgLogger().Info("batch split regions failed", zap.Error(respErr))
+        if batchResp.Error != nil {
+            logutil.BgLogger().Info("batch split regions failed", zap.Error(batchResp.Error))
             if err == nil {
-                err = respErr
+                err = batchResp.Error
             }
         }
         // If the split succeeds and the scatter fails, we also need to add the region IDs.
-        srResp.RegionsId = append(srResp.RegionsId, batchResp.RegionsId...)
+        if batchResp.Response != nil {
+            spResp := batchResp.Resp.(*kvrpcpb.SplitRegionResponse)
+            regions := spResp.GetRegions()
+            srResp.Regions = append(srResp.Regions, regions...)
+        }
     }
-    return srResp, err
+    return &tikvrpc.Response{Resp: srResp}, err
 }

-func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch kvrpc.Batch, scatter bool, tableID *int64) *pdpb.SplitRegionsResponse {
+func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch kvrpc.Batch, scatter bool, tableID *int64) kvrpc.BatchResult {
     if val, err := util.EvalFailpoint("mockSplitRegionTimeout"); err == nil {
         if val.(bool) {
             if _, ok := bo.GetCtx().Deadline(); ok {
@@ -137,56 +141,80 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch kvrpc.Batch, scatte
         }
     }

-    resp, err := s.pdClient.SplitRegions(bo.GetCtx(), batch.Keys)
+    req := tikvrpc.NewRequest(tikvrpc.CmdSplitRegion, &kvrpcpb.SplitRegionRequest{
+        SplitKeys: batch.Keys,
+    }, kvrpcpb.Context{
+        Priority: kvrpcpb.CommandPri_Normal,
+    })
+
+    sender := locate.NewRegionRequestSender(s.regionCache, s.GetTiKVClient())
+    resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
+
+    batchResp := kvrpc.BatchResult{Response: resp}
     if err != nil {
-        resp := &pdpb.SplitRegionsResponse{}
-        resp.Header = &pdpb.ResponseHeader{}
-        resp.Header.Error = &pdpb.Error{Message: err.Error()}
-        return resp
+        batchResp.Error = err
+        return batchResp
     }
-    if resp == nil {
-        return &pdpb.SplitRegionsResponse{
-            Header: &pdpb.ResponseHeader{
-                Error: &pdpb.Error{Message: "empty response"},
-            },
+    regionErr, err := resp.GetRegionError()
+    if err != nil {
+        batchResp.Error = err
+        return batchResp
+    }
+    if regionErr != nil {
+        err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
+        if err != nil {
+            batchResp.Error = err
+            return batchResp
         }
-    }
-    regionIDs := resp.GetRegionsId()
-    if len(regionIDs) > 0 {
-        // Divide a region into n, one of them may not need to be scattered,
-        // so n-1 needs to be scattered to other stores.
-        regionIDs = regionIDs[:len(regionIDs)-1]
+        resp, err = s.splitBatchRegionsReq(bo, batch.Keys, scatter, tableID)
+        batchResp.Response = resp
+        batchResp.Error = err
+        return batchResp
     }
+    spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse)
+    regions := spResp.GetRegions()
+    if len(regions) > 0 {
+        // Divide a region into n, one of them may not need to be scattered,
+        // so n-1 needs to be scattered to other stores.
+        spResp.Regions = regions[:len(regions)-1]
+    }
+    var newRegionLeft string
+    if len(spResp.Regions) > 0 {
+        newRegionLeft = logutil.Hex(spResp.Regions[0]).String()
+    }
     logutil.BgLogger().Info("batch split regions complete",
         zap.Uint64("batch region ID", batch.RegionID.GetID()),
         zap.String("first at", kv.StrKey(batch.Keys[0])),
-        zap.Int("new region count", len(regionIDs)))
-    if resp.FinishedPercentage != 100 {
-        err = errors.Errorf("Fail to batch split regions, finishedPercentage : %d, batch region ID : %d",
-            resp.FinishedPercentage, batch.RegionID.GetID())
-        resp.Header.Error = &pdpb.Error{Message: err.Error()}
-    }
+        zap.String("first new region left", newRegionLeft),
+        zap.Int("new region count", len(spResp.Regions)))
+
     if !scatter {
-        return resp
+        return batchResp
     }
-    for i, id := range regionIDs {
-        if err = s.scatterRegion(bo, id, tableID); err == nil {
+
+    for i, r := range spResp.Regions {
+        if err = s.scatterRegion(bo, r.Id, tableID); err == nil {
             logutil.BgLogger().Info("batch split regions, scatter region complete",
                 zap.Uint64("batch region ID", batch.RegionID.GetID()),
-                zap.String("at", kv.StrKey(batch.Keys[i])))
+                zap.String("at", kv.StrKey(batch.Keys[i])),
+                zap.Stringer("new region left", logutil.Hex(r)))
             continue
         }
         logutil.BgLogger().Info("batch split regions, scatter region failed",
             zap.Uint64("batch region ID", batch.RegionID.GetID()),
             zap.String("at", kv.StrKey(batch.Keys[i])),
+            zap.Stringer("new region left", logutil.Hex(r)),
             zap.Error(err))
+        if batchResp.Error == nil {
+            batchResp.Error = err
+        }
         if _, ok := err.(*tikverr.ErrPDServerTimeout); ok {
             break
         }
     }
-    return resp
+    return batchResp
 }

 const (
@@ -199,8 +227,11 @@ func (s *KVStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter
     bo := retry.NewBackofferWithVars(ctx, int(math.Min(float64(len(splitKeys))*splitRegionBackoff, maxSplitRegionsBackoff)), nil)
     resp, err := s.splitBatchRegionsReq(bo, splitKeys, scatter, tableID)
     regionIDs = make([]uint64, 0, len(splitKeys))
-    if resp != nil {
-        regionIDs = append(regionIDs, resp.GetRegionsId()...)
+    if resp != nil && resp.Resp != nil {
+        spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse)
+        for _, r := range spResp.Regions {
+            regionIDs = append(regionIDs, r.Id)
+        }
         logutil.BgLogger().Info("split regions complete",
             zap.Int("region count", len(regionIDs)), zap.Uint64s("region IDs", regionIDs))
     }
     return regionIDs, err
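A few notes and sketches on the change follow.

For the new plumbing to type-check, `kvrpc.BatchResult` has to expose a `Response` field (it is built as `kvrpc.BatchResult{Response: resp}` and read as `batchResp.Response`) while also promoting `Resp` (read as `batchResp.Resp`). The simplest shape consistent with both usages embeds `*tikvrpc.Response`; a minimal sketch of what `internal/kvrpc` presumably declares — this type is not shown in the diff, so treat it as an inference:

```go
package kvrpc

import "github.com/tikv/client-go/v2/tikvrpc"

// BatchResult pairs the raw RPC response for one batch with any error
// hit while producing it. Embedding *tikvrpc.Response means the field
// itself is addressable as batchResp.Response, while the inner proto
// message is promoted and reachable as batchResp.Resp.
type BatchResult struct {
	*tikvrpc.Response
	Error error
}
```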
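The restructured `splitBatchRegionsReq` keeps the existing fan-out pattern but funnels `kvrpc.BatchResult` values instead of PD responses: one goroutine per batch, a channel buffered to `len(batches)` so no sender can block, panics converted into ordinary errors, and the first error recorded while the remaining results are still drained. A stripped-down, self-contained sketch of that pattern (all names here are illustrative, not from the diff):

```go
package main

import (
	"context"
	"fmt"
)

type batchResult struct {
	value string
	err   error
}

// fanOut runs send once per batch and collects every result, returning
// the successful values plus the first error encountered. The channel
// is buffered to len(batches) so every worker can deposit exactly one
// result without blocking, even if the collector errors out early.
func fanOut(ctx context.Context, batches []string, send func(string) (string, error)) ([]string, error) {
	ch := make(chan batchResult, len(batches))
	for _, b := range batches {
		go func(b string) {
			// Convert a panic into an ordinary result, mirroring the
			// util.WithRecovery wrapper in the diff.
			defer func() {
				if r := recover(); r != nil {
					ch <- batchResult{err: fmt.Errorf("%v", r)}
				}
			}()
			v, err := send(b)
			select {
			case ch <- batchResult{value: v, err: err}:
			case <-ctx.Done():
				ch <- batchResult{err: ctx.Err()}
			}
		}(b)
	}
	var firstErr error
	out := make([]string, 0, len(batches))
	for range batches {
		r := <-ch // drain every worker, even after an error
		if r.err != nil {
			if firstErr == nil {
				firstErr = r.err // keep only the first error, as the diff does
			}
			continue
		}
		out = append(out, r.value)
	}
	return out, firstErr
}

func main() {
	vals, err := fanOut(context.Background(), []string{"a", "b", "c"},
		func(b string) (string, error) { return b + "!", nil })
	fmt.Println(vals, err)
}
```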
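One behavioral detail worth calling out in `batchSendSingleRegion`: when TiKV reports a region error, the new code backs off and then re-enters `splitBatchRegionsReq` for the batch's keys rather than resending the same request, because the region boundaries that produced the batch are now stale and the keys must be re-grouped against the refreshed region cache. A toy sketch of that retry-through-rerouting shape — every name below is a stand-in, none of it is client-go API:

```go
package main

import (
	"errors"
	"fmt"
)

type batch struct{ keys []string }

type backoffer struct{ budget int }

func (b *backoffer) backoff() error {
	if b.budget == 0 {
		return errors.New("backoff budget exhausted")
	}
	b.budget--
	return nil
}

var routingVersion = 0 // bumped when the "region cache" refreshes

// partition groups keys using the (possibly stale) routing view.
func partition(keys []string) []batch {
	if routingVersion == 0 {
		return []batch{{keys: keys}} // stale view: one big batch
	}
	out := make([]batch, 0, len(keys))
	for _, k := range keys {
		out = append(out, batch{keys: []string{k}})
	}
	return out
}

// sendOne reports stale=true the first time, simulating a region error
// that also refreshes the routing view.
func sendOne(b batch) (stale bool, err error) {
	if routingVersion == 0 {
		routingVersion++
		return true, nil
	}
	return false, nil
}

// sendPartitioned mirrors splitBatchRegionsReq's region-error path: a
// stale-route error invalidates how the work was partitioned, so the
// retry re-runs partitioning instead of resending the same request.
func sendPartitioned(bo *backoffer, keys []string) error {
	for _, b := range partition(keys) {
		stale, err := sendOne(b)
		if err != nil {
			return err
		}
		if stale {
			if err := bo.backoff(); err != nil {
				return err
			}
			// Re-enter from the top so partition sees fresh routing info.
			return sendPartitioned(bo, b.keys)
		}
	}
	return nil
}

func main() {
	fmt.Println(sendPartitioned(&backoffer{budget: 3}, []string{"a", "b"}))
}
```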
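Call-site behavior is unchanged by the patch: `(*KVStore).SplitRegions` still takes split keys and returns the IDs of the new regions, with only the response plumbing swapped underneath. A hypothetical caller, assuming a `*tikv.KVStore` constructed elsewhere and purely illustrative keys:

```go
package example

import (
	"context"
	"log"

	"github.com/tikv/client-go/v2/tikv"
)

// splitAndScatter splits the keyspace at three illustrative boundaries
// and asks PD to scatter the resulting regions across stores.
func splitAndScatter(ctx context.Context, store *tikv.KVStore) error {
	splitKeys := [][]byte{
		[]byte("user_0001"),
		[]byte("user_5000"),
		[]byte("user_9000"),
	}
	// scatter=true also relocates the new regions; tableID may be nil
	// when the split is not tied to a specific table.
	regionIDs, err := store.SplitRegions(ctx, splitKeys, true, nil)
	if err != nil {
		// Split and scatter are best-effort: some regions may exist
		// even when err is non-nil, so report what was created.
		log.Printf("split regions: got %d region IDs before failing: %v", len(regionIDs), err)
		return err
	}
	log.Printf("split complete, new region IDs: %v", regionIDs)
	return nil
}
```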