From 0fdc8e3d6f24e86066394b290116fd69b55049af Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 13 Jul 2021 11:56:49 +0800 Subject: [PATCH] rawkv: move rawkv client to rawkv (#227) Signed-off-by: disksing Co-authored-by: Shirly --- integration_tests/rawkv_test.go | 37 +- rawkv/rawkv.go | 546 ++++++++++++++++++++++++++-- {tikv => rawkv}/rawkv_test.go | 35 +- rawkv/test_prob.go | 53 +++ tikv/rawkv.go | 626 -------------------------------- tikv/test_probe.go | 30 -- 6 files changed, 611 insertions(+), 716 deletions(-) rename {tikv => rawkv}/rawkv_test.go (85%) create mode 100644 rawkv/test_prob.go delete mode 100644 tikv/rawkv.go diff --git a/integration_tests/rawkv_test.go b/integration_tests/rawkv_test.go index 731eddec..c66b2a8d 100644 --- a/integration_tests/rawkv_test.go +++ b/integration_tests/rawkv_test.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/rawkv" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" ) @@ -51,7 +52,7 @@ func TestRawKV(t *testing.T) { type testRawKVSuite struct { suite.Suite cluster testutils.Cluster - client tikv.RawKVClientProbe + client rawkv.ClientProbe bo *tikv.Backoffer } @@ -60,7 +61,7 @@ func (s *testRawKVSuite) SetupTest() { s.Require().Nil(err) unistore.BootstrapWithSingleStore(cluster) s.cluster = cluster - s.client = tikv.RawKVClientProbe{RawKVClient: &tikv.RawKVClient{}} + s.client = rawkv.ClientProbe{Client: &rawkv.Client{}} s.client.SetPDClient(pdClient) s.client.SetRegionCache(tikv.NewRegionCache(pdClient)) s.client.SetRPCClient(client) @@ -72,13 +73,13 @@ func (s *testRawKVSuite) TearDownTest() { } func (s *testRawKVSuite) mustNotExist(key []byte) { - v, err := s.client.Get(key) + v, err := s.client.Get(context.Background(), key) s.Nil(err) s.Nil(v) } func (s *testRawKVSuite) mustBatchNotExist(keys [][]byte) { - values, err := s.client.BatchGet(keys) + values, err := s.client.BatchGet(context.Background(), keys) s.Nil(err) s.NotNil(values) s.Equal(len(keys), len(values)) @@ -88,14 +89,14 @@ func (s *testRawKVSuite) mustBatchNotExist(keys [][]byte) { } func (s *testRawKVSuite) mustGet(key, value []byte) { - v, err := s.client.Get(key) + v, err := s.client.Get(context.Background(), key) s.Nil(err) s.NotNil(v) s.Equal(v, value) } func (s *testRawKVSuite) mustBatchGet(keys, values [][]byte) { - checkValues, err := s.client.BatchGet(keys) + checkValues, err := s.client.BatchGet(context.Background(), keys) s.Nil(err) s.NotNil(checkValues) s.Equal(len(keys), len(checkValues)) @@ -105,27 +106,27 @@ func (s *testRawKVSuite) mustBatchGet(keys, values [][]byte) { } func (s *testRawKVSuite) mustPut(key, value []byte) { - err := s.client.Put(key, value) + err := s.client.Put(context.Background(), key, value) s.Nil(err) } func (s *testRawKVSuite) mustBatchPut(keys, values [][]byte) { - err := s.client.BatchPut(keys, values) + err := s.client.BatchPut(context.Background(), keys, values) s.Nil(err) } func (s *testRawKVSuite) mustDelete(key []byte) { - err := s.client.Delete(key) + err := s.client.Delete(context.Background(), key) s.Nil(err) } func (s *testRawKVSuite) mustBatchDelete(keys [][]byte) { - err := s.client.BatchDelete(keys) + err := s.client.BatchDelete(context.Background(), keys) s.Nil(err) } func (s *testRawKVSuite) mustScan(startKey string, limit int, expect ...string) { - keys, values, err := s.client.Scan([]byte(startKey), nil, limit) + keys, values, err := s.client.Scan(context.Background(), []byte(startKey), nil, limit) s.Nil(err) s.Equal(len(keys)*2, len(expect)) for i := range keys { @@ -135,7 +136,7 @@ func (s *testRawKVSuite) mustScan(startKey string, limit int, expect ...string) } func (s *testRawKVSuite) mustScanRange(startKey string, endKey string, limit int, expect ...string) { - keys, values, err := s.client.Scan([]byte(startKey), []byte(endKey), limit) + keys, values, err := s.client.Scan(context.Background(), []byte(startKey), []byte(endKey), limit) s.Nil(err) s.Equal(len(keys)*2, len(expect)) for i := range keys { @@ -145,7 +146,7 @@ func (s *testRawKVSuite) mustScanRange(startKey string, endKey string, limit int } func (s *testRawKVSuite) mustReverseScan(startKey []byte, limit int, expect ...string) { - keys, values, err := s.client.ReverseScan(startKey, nil, limit) + keys, values, err := s.client.ReverseScan(context.Background(), startKey, nil, limit) s.Nil(err) s.Equal(len(keys)*2, len(expect)) for i := range keys { @@ -155,7 +156,7 @@ func (s *testRawKVSuite) mustReverseScan(startKey []byte, limit int, expect ...s } func (s *testRawKVSuite) mustReverseScanRange(startKey, endKey []byte, limit int, expect ...string) { - keys, values, err := s.client.ReverseScan(startKey, endKey, limit) + keys, values, err := s.client.ReverseScan(context.Background(), startKey, endKey, limit) s.Nil(err) s.Equal(len(keys)*2, len(expect)) for i := range keys { @@ -165,7 +166,7 @@ func (s *testRawKVSuite) mustReverseScanRange(startKey, endKey []byte, limit int } func (s *testRawKVSuite) mustDeleteRange(startKey, endKey []byte, expected map[string]string) { - err := s.client.DeleteRange(startKey, endKey) + err := s.client.DeleteRange(context.Background(), startKey, endKey) s.Nil(err) for keyStr := range expected { @@ -179,7 +180,7 @@ func (s *testRawKVSuite) mustDeleteRange(startKey, endKey []byte, expected map[s } func (s *testRawKVSuite) checkData(expected map[string]string) { - keys, values, err := s.client.Scan([]byte(""), nil, len(expected)+1) + keys, values, err := s.client.Scan(context.Background(), []byte(""), nil, len(expected)+1) s.Nil(err) s.Equal(len(expected), len(keys)) @@ -205,7 +206,7 @@ func (s *testRawKVSuite) TestSimple() { s.mustGet([]byte("key"), []byte("value")) s.mustDelete([]byte("key")) s.mustNotExist([]byte("key")) - err := s.client.Put([]byte("key"), []byte("")) + err := s.client.Put(context.Background(), []byte("key"), []byte("")) s.NotNil(err) } @@ -214,7 +215,7 @@ func (s *testRawKVSuite) TestRawBatch() { size := 0 var testKeys [][]byte var testValues [][]byte - for i := 0; size/(tikv.ConfigProbe{}.GetRawBatchPutSize()) < 4; i++ { + for i := 0; size/(rawkv.ConfigProbe{}.GetRawBatchPutSize()) < 4; i++ { key := fmt.Sprint("key", i) size += len(key) testKeys = append(testKeys, []byte(key)) diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index 896c62c7..a108321a 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -33,78 +33,262 @@ package rawkv import ( + "bytes" "context" + "time" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/tikv/client-go/v2/config" - "github.com/tikv/client-go/v2/tikv" + tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/internal/client" + "github.com/tikv/client-go/v2/internal/kvrpc" + "github.com/tikv/client-go/v2/internal/locate" + "github.com/tikv/client-go/v2/internal/retry" + "github.com/tikv/client-go/v2/metrics" + "github.com/tikv/client-go/v2/tikvrpc" pd "github.com/tikv/pd/client" ) +var ( + // MaxRawKVScanLimit is the maximum scan limit for rawkv Scan. + MaxRawKVScanLimit = 10240 + // ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large. + ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit") +) + +const ( + // rawBatchPutSize is the maximum size limit for rawkv each batch put request. + rawBatchPutSize = 16 * 1024 + // rawBatchPairCount is the maximum limit for rawkv each batch get/delete request. + rawBatchPairCount = 512 +) + // Client is a client of TiKV server which is used as a key-value storage, // only GET/PUT/DELETE commands are supported. type Client struct { - client *tikv.RawKVClient + clusterID uint64 + regionCache *locate.RegionCache + pdClient pd.Client + rpcClient client.Client } // NewClient creates a client with PD cluster addrs. func NewClient(ctx context.Context, pdAddrs []string, security config.Security, opts ...pd.ClientOption) (*Client, error) { - client, err := tikv.NewRawKVClient(pdAddrs, security, opts...) + pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{ + CAPath: security.ClusterSSLCA, + CertPath: security.ClusterSSLCert, + KeyPath: security.ClusterSSLKey, + }, opts...) if err != nil { - return nil, err + return nil, errors.Trace(err) } - return &Client{client: client}, nil + return &Client{ + clusterID: pdCli.GetClusterID(ctx), + regionCache: locate.NewRegionCache(pdCli), + pdClient: pdCli, + rpcClient: client.NewRPCClient(security), + }, nil } // Close closes the client. func (c *Client) Close() error { - return c.client.Close() + if c.pdClient != nil { + c.pdClient.Close() + } + if c.regionCache != nil { + c.regionCache.Close() + } + if c.rpcClient == nil { + return nil + } + return c.rpcClient.Close() } // ClusterID returns the TiKV cluster ID. func (c *Client) ClusterID() uint64 { - return c.client.ClusterID() + return c.clusterID } // Get queries value with the key. When the key does not exist, it returns `nil, nil`. -// TODO: use ctx after moving all rawkv code out. func (c *Client) Get(ctx context.Context, key []byte) ([]byte, error) { - return c.client.Get(key) + start := time.Now() + defer func() { metrics.RawkvCmdHistogramWithGet.Observe(time.Since(start).Seconds()) }() + + req := tikvrpc.NewRequest(tikvrpc.CmdRawGet, &kvrpcpb.RawGetRequest{Key: key}) + resp, _, err := c.sendReq(ctx, key, req, false) + if err != nil { + return nil, errors.Trace(err) + } + if resp.Resp == nil { + return nil, errors.Trace(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawGetResponse) + if cmdResp.GetError() != "" { + return nil, errors.New(cmdResp.GetError()) + } + if len(cmdResp.Value) == 0 { + return nil, nil + } + return cmdResp.Value, nil } +const rawkvMaxBackoff = 20000 + // BatchGet queries values with the keys. -// TODO: use ctx after moving all rawkv code out. func (c *Client) BatchGet(ctx context.Context, keys [][]byte) ([][]byte, error) { - return c.client.BatchGet(keys) + start := time.Now() + defer func() { + metrics.RawkvCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds()) + }() + + bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) + resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchGet) + if err != nil { + return nil, errors.Trace(err) + } + + if resp.Resp == nil { + return nil, errors.Trace(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawBatchGetResponse) + + keyToValue := make(map[string][]byte, len(keys)) + for _, pair := range cmdResp.Pairs { + keyToValue[string(pair.Key)] = pair.Value + } + + values := make([][]byte, len(keys)) + for i, key := range keys { + values[i] = keyToValue[string(key)] + } + return values, nil } // Put stores a key-value pair to TiKV. -// TODO: use ctx after moving all rawkv code out. func (c *Client) Put(ctx context.Context, key, value []byte) error { - return c.client.Put(key, value) + start := time.Now() + defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) }() + metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key))) + metrics.RawkvSizeHistogramWithValue.Observe(float64(len(value))) + + if len(value) == 0 { + return errors.New("empty value is not supported") + } + + req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ + Key: key, + Value: value, + }) + resp, _, err := c.sendReq(ctx, key, req, false) + if err != nil { + return errors.Trace(err) + } + if resp.Resp == nil { + return errors.Trace(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawPutResponse) + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + return nil } // BatchPut stores key-value pairs to TiKV. -// TODO: use ctx after moving all rawkv code out. func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte) error { - return c.client.BatchPut(keys, values) + start := time.Now() + defer func() { + metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) + }() + + if len(keys) != len(values) { + return errors.New("the len of keys is not equal to the len of values") + } + for _, value := range values { + if len(value) == 0 { + return errors.New("empty value is not supported") + } + } + bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) + err := c.sendBatchPut(bo, keys, values) + return errors.Trace(err) } // Delete deletes a key-value pair from TiKV. -// TODO: use ctx after moving all rawkv code out. func (c *Client) Delete(ctx context.Context, key []byte) error { - return c.client.Delete(key) + start := time.Now() + defer func() { metrics.RawkvCmdHistogramWithDelete.Observe(time.Since(start).Seconds()) }() + + req := tikvrpc.NewRequest(tikvrpc.CmdRawDelete, &kvrpcpb.RawDeleteRequest{ + Key: key, + }) + resp, _, err := c.sendReq(ctx, key, req, false) + if err != nil { + return errors.Trace(err) + } + if resp.Resp == nil { + return errors.Trace(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawDeleteResponse) + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + return nil } // BatchDelete deletes key-value pairs from TiKV. -// TODO: use ctx after moving all rawkv code out. func (c *Client) BatchDelete(ctx context.Context, keys [][]byte) error { - return c.client.BatchDelete(keys) + start := time.Now() + defer func() { + metrics.RawkvCmdHistogramWithBatchDelete.Observe(time.Since(start).Seconds()) + }() + + bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) + resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchDelete) + if err != nil { + return errors.Trace(err) + } + if resp.Resp == nil { + return errors.Trace(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse) + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + return nil } // DeleteRange deletes all key-value pairs in a range from TiKV. -// TODO: use ctx after moving all rawkv code out. func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte) error { - return c.client.DeleteRange(startKey, endKey) + start := time.Now() + var err error + defer func() { + var label = "delete_range" + if err != nil { + label += "_error" + } + metrics.TiKVRawkvCmdHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds()) + }() + + // Process each affected region respectively + for !bytes.Equal(startKey, endKey) { + var resp *tikvrpc.Response + var actualEndKey []byte + resp, actualEndKey, err = c.sendDeleteRangeReq(ctx, startKey, endKey) + if err != nil { + return errors.Trace(err) + } + if resp.Resp == nil { + return errors.Trace(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawDeleteRangeResponse) + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + startKey = actualEndKey + } + + return nil } // Scan queries continuous kv pairs in range [startKey, endKey), up to limit pairs. @@ -112,9 +296,38 @@ func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte // If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan // (startKey, endKey], you can write: // `Scan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`. -// TODO: use ctx after moving all rawkv code out. func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) { - return c.client.Scan(startKey, endKey, limit) + start := time.Now() + defer func() { metrics.RawkvCmdHistogramWithRawScan.Observe(time.Since(start).Seconds()) }() + + if limit > MaxRawKVScanLimit { + return nil, nil, errors.Trace(ErrMaxScanLimitExceeded) + } + + for len(keys) < limit && (len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0) { + req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{ + StartKey: startKey, + EndKey: endKey, + Limit: uint32(limit - len(keys)), + }) + resp, loc, err := c.sendReq(ctx, startKey, req, false) + if err != nil { + return nil, nil, errors.Trace(err) + } + if resp.Resp == nil { + return nil, nil, errors.Trace(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse) + for _, pair := range cmdResp.Kvs { + keys = append(keys, pair.Key) + values = append(values, pair.Value) + } + startKey = loc.EndKey + if len(startKey) == 0 { + break + } + } + return } // ReverseScan queries continuous kv pairs in range [endKey, startKey), up to limit pairs. @@ -124,7 +337,290 @@ func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int) ( // (endKey, startKey], you can write: // `ReverseScan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`. // It doesn't support Scanning from "", because locating the last Region is not yet implemented. -// TODO: use ctx after moving all rawkv code out. func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) { - return c.client.ReverseScan(startKey, endKey, limit) + start := time.Now() + defer func() { + metrics.RawkvCmdHistogramWithRawReversScan.Observe(time.Since(start).Seconds()) + }() + + if limit > MaxRawKVScanLimit { + return nil, nil, errors.Trace(ErrMaxScanLimitExceeded) + } + + for len(keys) < limit && bytes.Compare(startKey, endKey) > 0 { + req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{ + StartKey: startKey, + EndKey: endKey, + Limit: uint32(limit - len(keys)), + Reverse: true, + }) + resp, loc, err := c.sendReq(ctx, startKey, req, true) + if err != nil { + return nil, nil, errors.Trace(err) + } + if resp.Resp == nil { + return nil, nil, errors.Trace(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse) + for _, pair := range cmdResp.Kvs { + keys = append(keys, pair.Key) + values = append(values, pair.Value) + } + startKey = loc.StartKey + if len(startKey) == 0 { + break + } + } + return +} + +func (c *Client) sendReq(ctx context.Context, key []byte, req *tikvrpc.Request, reverse bool) (*tikvrpc.Response, *locate.KeyLocation, error) { + bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) + sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) + for { + var loc *locate.KeyLocation + var err error + if reverse { + loc, err = c.regionCache.LocateEndKey(bo, key) + } else { + loc, err = c.regionCache.LocateKey(bo, key) + } + if err != nil { + return nil, nil, errors.Trace(err) + } + resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort) + if err != nil { + return nil, nil, errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return nil, nil, errors.Trace(err) + } + if regionErr != nil { + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return nil, nil, errors.Trace(err) + } + continue + } + return resp, loc, nil + } +} + +func (c *Client) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) + if err != nil { + return nil, errors.Trace(err) + } + + var batches []kvrpc.Batch + for regionID, groupKeys := range groups { + batches = kvrpc.AppendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount) + } + bo, cancel := bo.Fork() + ches := make(chan kvrpc.BatchResult, len(batches)) + for _, batch := range batches { + batch1 := batch + go func() { + singleBatchBackoffer, singleBatchCancel := bo.Fork() + defer singleBatchCancel() + ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType) + }() + } + + var firstError error + var resp *tikvrpc.Response + switch cmdType { + case tikvrpc.CmdRawBatchGet: + resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchGetResponse{}} + case tikvrpc.CmdRawBatchDelete: + resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchDeleteResponse{}} + } + for i := 0; i < len(batches); i++ { + singleResp, ok := <-ches + if ok { + if singleResp.Error != nil { + cancel() + if firstError == nil { + firstError = singleResp.Error + } + } else if cmdType == tikvrpc.CmdRawBatchGet { + cmdResp := singleResp.Resp.(*kvrpcpb.RawBatchGetResponse) + resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs = append(resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs, cmdResp.Pairs...) + } + } + } + + return resp, firstError +} + +func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, cmdType tikvrpc.CmdType) kvrpc.BatchResult { + var req *tikvrpc.Request + switch cmdType { + case tikvrpc.CmdRawBatchGet: + req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchGetRequest{ + Keys: batch.Keys, + }) + case tikvrpc.CmdRawBatchDelete: + req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchDeleteRequest{ + Keys: batch.Keys, + }) + } + + sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) + resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort) + + batchResp := kvrpc.BatchResult{} + if err != nil { + batchResp.Error = errors.Trace(err) + return batchResp + } + regionErr, err := resp.GetRegionError() + if err != nil { + batchResp.Error = errors.Trace(err) + return batchResp + } + if regionErr != nil { + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + batchResp.Error = errors.Trace(err) + return batchResp + } + resp, err = c.sendBatchReq(bo, batch.Keys, cmdType) + batchResp.Response = resp + batchResp.Error = err + return batchResp + } + + switch cmdType { + case tikvrpc.CmdRawBatchGet: + batchResp.Response = resp + case tikvrpc.CmdRawBatchDelete: + if resp.Resp == nil { + batchResp.Error = errors.Trace(tikverr.ErrBodyMissing) + return batchResp + } + cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse) + if cmdResp.GetError() != "" { + batchResp.Error = errors.New(cmdResp.GetError()) + return batchResp + } + batchResp.Response = resp + } + return batchResp +} + +// sendDeleteRangeReq sends a raw delete range request and returns the response and the actual endKey. +// If the given range spans over more than one regions, the actual endKey is the end of the first region. +// We can't use sendReq directly, because we need to know the end of the region before we send the request +// TODO: Is there any better way to avoid duplicating code with func `sendReq` ? +func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey []byte) (*tikvrpc.Response, []byte, error) { + bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) + sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) + for { + loc, err := c.regionCache.LocateKey(bo, startKey) + if err != nil { + return nil, nil, errors.Trace(err) + } + + actualEndKey := endKey + if len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, endKey) < 0 { + actualEndKey = loc.EndKey + } + + req := tikvrpc.NewRequest(tikvrpc.CmdRawDeleteRange, &kvrpcpb.RawDeleteRangeRequest{ + StartKey: startKey, + EndKey: actualEndKey, + }) + + resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort) + if err != nil { + return nil, nil, errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return nil, nil, errors.Trace(err) + } + if regionErr != nil { + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return nil, nil, errors.Trace(err) + } + continue + } + return resp, actualEndKey, nil + } +} + +func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) error { + keyToValue := make(map[string][]byte, len(keys)) + for i, key := range keys { + keyToValue[string(key)] = values[i] + } + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) + if err != nil { + return errors.Trace(err) + } + var batches []kvrpc.Batch + // split the keys by size and RegionVerID + for regionID, groupKeys := range groups { + batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize) + } + bo, cancel := bo.Fork() + ch := make(chan error, len(batches)) + for _, batch := range batches { + batch1 := batch + go func() { + singleBatchBackoffer, singleBatchCancel := bo.Fork() + defer singleBatchCancel() + ch <- c.doBatchPut(singleBatchBackoffer, batch1) + }() + } + + for i := 0; i < len(batches); i++ { + if e := <-ch; e != nil { + cancel() + // catch the first error + if err == nil { + err = e + } + } + } + return errors.Trace(err) +} + +func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch) error { + kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.Keys)) + for i, key := range batch.Keys { + kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.Values[i]}) + } + + req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut, &kvrpcpb.RawBatchPutRequest{Pairs: kvPair}) + + sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) + resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort) + if err != nil { + return errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return errors.Trace(err) + } + if regionErr != nil { + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return errors.Trace(err) + } + // recursive call + return c.sendBatchPut(bo, batch.Keys, batch.Values) + } + + if resp.Resp == nil { + return errors.Trace(tikverr.ErrBodyMissing) + } + cmdResp := resp.Resp.(*kvrpcpb.RawBatchPutResponse) + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + return nil } diff --git a/tikv/rawkv_test.go b/rawkv/rawkv_test.go similarity index 85% rename from tikv/rawkv_test.go rename to rawkv/rawkv_test.go index 597560ea..3c7d7dd4 100644 --- a/tikv/rawkv_test.go +++ b/rawkv/rawkv_test.go @@ -30,7 +30,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package rawkv import ( "context" @@ -38,6 +38,7 @@ import ( "testing" "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" @@ -83,15 +84,15 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() - client := &RawKVClient{ + client := &Client{ clusterID: 0, - regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)), + regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() testKey := []byte("test_key") testValue := []byte("test_value") - err := client.Put(testKey, testValue) + err := client.Put(context.Background(), testKey, testValue) s.Nil(err) // make store2 using store1's addr and store1 offline @@ -102,7 +103,7 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) - getVal, err := client.Get(testKey) + getVal, err := client.Get(context.Background(), testKey) s.Nil(err) s.Equal(getVal, testValue) @@ -112,22 +113,22 @@ func (s *testRawkvSuite) TestUpdateStoreAddr() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() - client := &RawKVClient{ + client := &Client{ clusterID: 0, - regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)), + regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() testKey := []byte("test_key") testValue := []byte("test_value") - err := client.Put(testKey, testValue) + err := client.Put(context.Background(), testKey, testValue) s.Nil(err) // tikv-server reports `StoreNotMatch` And retry store1Addr := s.storeAddr(s.store1) s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) s.cluster.UpdateStoreAddr(s.store2, store1Addr) - getVal, err := client.Get(testKey) + getVal, err := client.Get(context.Background(), testKey) s.Nil(err) s.Equal(getVal, testValue) @@ -137,15 +138,15 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() - client := &RawKVClient{ + client := &Client{ clusterID: 0, - regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)), + regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() testKey := []byte("test_key") testValue := []byte("test_value") - err := client.Put(testKey, testValue) + err := client.Put(context.Background(), testKey, testValue) s.Nil(err) // pre-load store2's address into cache via follower-read. @@ -164,7 +165,7 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) - getVal, err := client.Get(testKey) + getVal, err := client.Get(context.Background(), testKey) s.Nil(err) s.Equal(getVal, testValue) } @@ -173,15 +174,15 @@ func (s *testRawkvSuite) TestReplaceStore() { mvccStore := mocktikv.MustNewMVCCStore() defer mvccStore.Close() - client := &RawKVClient{ + client := &Client{ clusterID: 0, - regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)), + regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), } defer client.Close() testKey := []byte("test_key") testValue := []byte("test_value") - err := client.Put(testKey, testValue) + err := client.Put(context.Background(), testKey, testValue) s.Nil(err) s.cluster.MarkTombstone(s.store1) @@ -192,6 +193,6 @@ func (s *testRawkvSuite) TestReplaceStore() { s.cluster.RemovePeer(s.region1, s.peer1) s.cluster.ChangeLeader(s.region1, peer3) - err = client.Put(testKey, testValue) + err = client.Put(context.Background(), testKey, testValue) s.Nil(err) } diff --git a/rawkv/test_prob.go b/rawkv/test_prob.go new file mode 100644 index 00000000..2073ec15 --- /dev/null +++ b/rawkv/test_prob.go @@ -0,0 +1,53 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package rawkv + +import ( + "github.com/tikv/client-go/v2/internal/client" + "github.com/tikv/client-go/v2/internal/locate" + pd "github.com/tikv/pd/client" +) + +// ClientProbe wraps RawKVClient and exposes internal states for testing purpose. +type ClientProbe struct { + *Client +} + +// GetRegionCache returns the internal region cache container. +func (c ClientProbe) GetRegionCache() *locate.RegionCache { + return c.regionCache +} + +// SetRegionCache resets the internal region cache container. +func (c ClientProbe) SetRegionCache(regionCache *locate.RegionCache) { + c.regionCache = regionCache +} + +// SetPDClient resets the interval PD client. +func (c ClientProbe) SetPDClient(client pd.Client) { + c.pdClient = client +} + +// SetRPCClient resets the internal RPC client. +func (c ClientProbe) SetRPCClient(client client.Client) { + c.rpcClient = client +} + +// ConfigProbe exposes configurations and global variables for testing purpose. +type ConfigProbe struct{} + +// GetRawBatchPutSize returns the raw batch put size config. +func (c ConfigProbe) GetRawBatchPutSize() int { + return rawBatchPutSize +} diff --git a/tikv/rawkv.go b/tikv/rawkv.go deleted file mode 100644 index 2554e406..00000000 --- a/tikv/rawkv.go +++ /dev/null @@ -1,626 +0,0 @@ -// Copyright 2021 TiKV Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -// NOTE: The code in this file is based on code from the -// TiDB project, licensed under the Apache License v 2.0 -// -// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/rawkv.go -// - -// Copyright 2016 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package tikv - -import ( - "bytes" - "context" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/tikv/client-go/v2/config" - tikverr "github.com/tikv/client-go/v2/error" - "github.com/tikv/client-go/v2/internal/client" - "github.com/tikv/client-go/v2/internal/kvrpc" - "github.com/tikv/client-go/v2/internal/locate" - "github.com/tikv/client-go/v2/internal/retry" - "github.com/tikv/client-go/v2/metrics" - "github.com/tikv/client-go/v2/tikvrpc" - pd "github.com/tikv/pd/client" -) - -var ( - // MaxRawKVScanLimit is the maximum scan limit for rawkv Scan. - MaxRawKVScanLimit = 10240 - // ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large. - ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit") -) - -const ( - // rawBatchPutSize is the maximum size limit for rawkv each batch put request. - rawBatchPutSize = 16 * 1024 - // rawBatchPairCount is the maximum limit for rawkv each batch get/delete request. - rawBatchPairCount = 512 -) - -// RawKVClient is a client of TiKV server which is used as a key-value storage, -// only GET/PUT/DELETE commands are supported. -type RawKVClient struct { - clusterID uint64 - regionCache *locate.RegionCache - pdClient pd.Client - rpcClient Client -} - -// NewRawKVClient creates a client with PD cluster addrs. -func NewRawKVClient(pdAddrs []string, security config.Security, opts ...pd.ClientOption) (*RawKVClient, error) { - pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{ - CAPath: security.ClusterSSLCA, - CertPath: security.ClusterSSLCert, - KeyPath: security.ClusterSSLKey, - }, opts...) - if err != nil { - return nil, errors.Trace(err) - } - return &RawKVClient{ - clusterID: pdCli.GetClusterID(context.TODO()), - regionCache: locate.NewRegionCache(pdCli), - pdClient: pdCli, - rpcClient: client.NewRPCClient(security), - }, nil -} - -// Close closes the client. -func (c *RawKVClient) Close() error { - if c.pdClient != nil { - c.pdClient.Close() - } - if c.regionCache != nil { - c.regionCache.Close() - } - if c.rpcClient == nil { - return nil - } - return c.rpcClient.Close() -} - -// ClusterID returns the TiKV cluster ID. -func (c *RawKVClient) ClusterID() uint64 { - return c.clusterID -} - -// Get queries value with the key. When the key does not exist, it returns `nil, nil`. -func (c *RawKVClient) Get(key []byte) ([]byte, error) { - start := time.Now() - defer func() { metrics.RawkvCmdHistogramWithGet.Observe(time.Since(start).Seconds()) }() - - req := tikvrpc.NewRequest(tikvrpc.CmdRawGet, &kvrpcpb.RawGetRequest{Key: key}) - resp, _, err := c.sendReq(key, req, false) - if err != nil { - return nil, errors.Trace(err) - } - if resp.Resp == nil { - return nil, errors.Trace(tikverr.ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.RawGetResponse) - if cmdResp.GetError() != "" { - return nil, errors.New(cmdResp.GetError()) - } - if len(cmdResp.Value) == 0 { - return nil, nil - } - return cmdResp.Value, nil -} - -const rawkvMaxBackoff = 20000 - -// BatchGet queries values with the keys. -func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) { - start := time.Now() - defer func() { - metrics.RawkvCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds()) - }() - - bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) - resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchGet) - if err != nil { - return nil, errors.Trace(err) - } - - if resp.Resp == nil { - return nil, errors.Trace(tikverr.ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.RawBatchGetResponse) - - keyToValue := make(map[string][]byte, len(keys)) - for _, pair := range cmdResp.Pairs { - keyToValue[string(pair.Key)] = pair.Value - } - - values := make([][]byte, len(keys)) - for i, key := range keys { - values[i] = keyToValue[string(key)] - } - return values, nil -} - -// Put stores a key-value pair to TiKV. -func (c *RawKVClient) Put(key, value []byte) error { - start := time.Now() - defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) }() - metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key))) - metrics.RawkvSizeHistogramWithValue.Observe(float64(len(value))) - - if len(value) == 0 { - return errors.New("empty value is not supported") - } - - req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ - Key: key, - Value: value, - }) - resp, _, err := c.sendReq(key, req, false) - if err != nil { - return errors.Trace(err) - } - if resp.Resp == nil { - return errors.Trace(tikverr.ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.RawPutResponse) - if cmdResp.GetError() != "" { - return errors.New(cmdResp.GetError()) - } - return nil -} - -// BatchPut stores key-value pairs to TiKV. -func (c *RawKVClient) BatchPut(keys, values [][]byte) error { - start := time.Now() - defer func() { - metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) - }() - - if len(keys) != len(values) { - return errors.New("the len of keys is not equal to the len of values") - } - for _, value := range values { - if len(value) == 0 { - return errors.New("empty value is not supported") - } - } - bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) - err := c.sendBatchPut(bo, keys, values) - return errors.Trace(err) -} - -// Delete deletes a key-value pair from TiKV. -func (c *RawKVClient) Delete(key []byte) error { - start := time.Now() - defer func() { metrics.RawkvCmdHistogramWithDelete.Observe(time.Since(start).Seconds()) }() - - req := tikvrpc.NewRequest(tikvrpc.CmdRawDelete, &kvrpcpb.RawDeleteRequest{ - Key: key, - }) - resp, _, err := c.sendReq(key, req, false) - if err != nil { - return errors.Trace(err) - } - if resp.Resp == nil { - return errors.Trace(tikverr.ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.RawDeleteResponse) - if cmdResp.GetError() != "" { - return errors.New(cmdResp.GetError()) - } - return nil -} - -// BatchDelete deletes key-value pairs from TiKV -func (c *RawKVClient) BatchDelete(keys [][]byte) error { - start := time.Now() - defer func() { - metrics.RawkvCmdHistogramWithBatchDelete.Observe(time.Since(start).Seconds()) - }() - - bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) - resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchDelete) - if err != nil { - return errors.Trace(err) - } - if resp.Resp == nil { - return errors.Trace(tikverr.ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse) - if cmdResp.GetError() != "" { - return errors.New(cmdResp.GetError()) - } - return nil -} - -// DeleteRange deletes all key-value pairs in a range from TiKV -func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error { - start := time.Now() - var err error - defer func() { - var label = "delete_range" - if err != nil { - label += "_error" - } - metrics.TiKVRawkvCmdHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds()) - }() - - // Process each affected region respectively - for !bytes.Equal(startKey, endKey) { - var resp *tikvrpc.Response - var actualEndKey []byte - resp, actualEndKey, err = c.sendDeleteRangeReq(startKey, endKey) - if err != nil { - return errors.Trace(err) - } - if resp.Resp == nil { - return errors.Trace(tikverr.ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.RawDeleteRangeResponse) - if cmdResp.GetError() != "" { - return errors.New(cmdResp.GetError()) - } - startKey = actualEndKey - } - - return nil -} - -// Scan queries continuous kv pairs in range [startKey, endKey), up to limit pairs. -// If endKey is empty, it means unbounded. -// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan -// (startKey, endKey], you can write: -// `Scan(push(startKey, '\0'), push(endKey, '\0'), limit)`. -func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) { - start := time.Now() - defer func() { metrics.RawkvCmdHistogramWithRawScan.Observe(time.Since(start).Seconds()) }() - - if limit > MaxRawKVScanLimit { - return nil, nil, errors.Trace(ErrMaxScanLimitExceeded) - } - - for len(keys) < limit && (len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0) { - req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{ - StartKey: startKey, - EndKey: endKey, - Limit: uint32(limit - len(keys)), - }) - resp, loc, err := c.sendReq(startKey, req, false) - if err != nil { - return nil, nil, errors.Trace(err) - } - if resp.Resp == nil { - return nil, nil, errors.Trace(tikverr.ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse) - for _, pair := range cmdResp.Kvs { - keys = append(keys, pair.Key) - values = append(values, pair.Value) - } - startKey = loc.EndKey - if len(startKey) == 0 { - break - } - } - return -} - -// ReverseScan queries continuous kv pairs in range [endKey, startKey), up to limit pairs. -// Direction is different from Scan, upper to lower. -// If endKey is empty, it means unbounded. -// If you want to include the startKey or exclude the endKey, push a '\0' to the key. For example, to scan -// (endKey, startKey], you can write: -// `ReverseScan(push(startKey, '\0'), push(endKey, '\0'), limit)`. -// It doesn't support Scanning from "", because locating the last Region is not yet implemented. -func (c *RawKVClient) ReverseScan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) { - start := time.Now() - defer func() { - metrics.RawkvCmdHistogramWithRawReversScan.Observe(time.Since(start).Seconds()) - }() - - if limit > MaxRawKVScanLimit { - return nil, nil, errors.Trace(ErrMaxScanLimitExceeded) - } - - for len(keys) < limit && bytes.Compare(startKey, endKey) > 0 { - req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{ - StartKey: startKey, - EndKey: endKey, - Limit: uint32(limit - len(keys)), - Reverse: true, - }) - resp, loc, err := c.sendReq(startKey, req, true) - if err != nil { - return nil, nil, errors.Trace(err) - } - if resp.Resp == nil { - return nil, nil, errors.Trace(tikverr.ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse) - for _, pair := range cmdResp.Kvs { - keys = append(keys, pair.Key) - values = append(values, pair.Value) - } - startKey = loc.StartKey - if len(startKey) == 0 { - break - } - } - return -} - -func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (*tikvrpc.Response, *locate.KeyLocation, error) { - bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) - sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) - for { - var loc *locate.KeyLocation - var err error - if reverse { - loc, err = c.regionCache.LocateEndKey(bo, key) - } else { - loc, err = c.regionCache.LocateKey(bo, key) - } - if err != nil { - return nil, nil, errors.Trace(err) - } - resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort) - if err != nil { - return nil, nil, errors.Trace(err) - } - regionErr, err := resp.GetRegionError() - if err != nil { - return nil, nil, errors.Trace(err) - } - if regionErr != nil { - err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return nil, nil, errors.Trace(err) - } - continue - } - return resp, loc, nil - } -} - -func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys - groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) - if err != nil { - return nil, errors.Trace(err) - } - - var batches []kvrpc.Batch - for regionID, groupKeys := range groups { - batches = kvrpc.AppendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount) - } - bo, cancel := bo.Fork() - ches := make(chan kvrpc.BatchResult, len(batches)) - for _, batch := range batches { - batch1 := batch - go func() { - singleBatchBackoffer, singleBatchCancel := bo.Fork() - defer singleBatchCancel() - ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType) - }() - } - - var firstError error - var resp *tikvrpc.Response - switch cmdType { - case tikvrpc.CmdRawBatchGet: - resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchGetResponse{}} - case tikvrpc.CmdRawBatchDelete: - resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchDeleteResponse{}} - } - for i := 0; i < len(batches); i++ { - singleResp, ok := <-ches - if ok { - if singleResp.Error != nil { - cancel() - if firstError == nil { - firstError = singleResp.Error - } - } else if cmdType == tikvrpc.CmdRawBatchGet { - cmdResp := singleResp.Resp.(*kvrpcpb.RawBatchGetResponse) - resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs = append(resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs, cmdResp.Pairs...) - } - } - } - - return resp, firstError -} - -func (c *RawKVClient) doBatchReq(bo *Backoffer, batch kvrpc.Batch, cmdType tikvrpc.CmdType) kvrpc.BatchResult { - var req *tikvrpc.Request - switch cmdType { - case tikvrpc.CmdRawBatchGet: - req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchGetRequest{ - Keys: batch.Keys, - }) - case tikvrpc.CmdRawBatchDelete: - req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchDeleteRequest{ - Keys: batch.Keys, - }) - } - - sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) - resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort) - - batchResp := kvrpc.BatchResult{} - if err != nil { - batchResp.Error = errors.Trace(err) - return batchResp - } - regionErr, err := resp.GetRegionError() - if err != nil { - batchResp.Error = errors.Trace(err) - return batchResp - } - if regionErr != nil { - err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - batchResp.Error = errors.Trace(err) - return batchResp - } - resp, err = c.sendBatchReq(bo, batch.Keys, cmdType) - batchResp.Response = resp - batchResp.Error = err - return batchResp - } - - switch cmdType { - case tikvrpc.CmdRawBatchGet: - batchResp.Response = resp - case tikvrpc.CmdRawBatchDelete: - if resp.Resp == nil { - batchResp.Error = errors.Trace(tikverr.ErrBodyMissing) - return batchResp - } - cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse) - if cmdResp.GetError() != "" { - batchResp.Error = errors.New(cmdResp.GetError()) - return batchResp - } - batchResp.Response = resp - } - return batchResp -} - -// sendDeleteRangeReq sends a raw delete range request and returns the response and the actual endKey. -// If the given range spans over more than one regions, the actual endKey is the end of the first region. -// We can't use sendReq directly, because we need to know the end of the region before we send the request -// TODO: Is there any better way to avoid duplicating code with func `sendReq` ? -func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*tikvrpc.Response, []byte, error) { - bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil) - sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) - for { - loc, err := c.regionCache.LocateKey(bo, startKey) - if err != nil { - return nil, nil, errors.Trace(err) - } - - actualEndKey := endKey - if len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, endKey) < 0 { - actualEndKey = loc.EndKey - } - - req := tikvrpc.NewRequest(tikvrpc.CmdRawDeleteRange, &kvrpcpb.RawDeleteRangeRequest{ - StartKey: startKey, - EndKey: actualEndKey, - }) - - resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort) - if err != nil { - return nil, nil, errors.Trace(err) - } - regionErr, err := resp.GetRegionError() - if err != nil { - return nil, nil, errors.Trace(err) - } - if regionErr != nil { - err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return nil, nil, errors.Trace(err) - } - continue - } - return resp, actualEndKey, nil - } -} - -func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error { - keyToValue := make(map[string][]byte, len(keys)) - for i, key := range keys { - keyToValue[string(key)] = values[i] - } - groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) - if err != nil { - return errors.Trace(err) - } - var batches []kvrpc.Batch - // split the keys by size and RegionVerID - for regionID, groupKeys := range groups { - batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize) - } - bo, cancel := bo.Fork() - ch := make(chan error, len(batches)) - for _, batch := range batches { - batch1 := batch - go func() { - singleBatchBackoffer, singleBatchCancel := bo.Fork() - defer singleBatchCancel() - ch <- c.doBatchPut(singleBatchBackoffer, batch1) - }() - } - - for i := 0; i < len(batches); i++ { - if e := <-ch; e != nil { - cancel() - // catch the first error - if err == nil { - err = e - } - } - } - return errors.Trace(err) -} - -func (c *RawKVClient) doBatchPut(bo *Backoffer, batch kvrpc.Batch) error { - kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.Keys)) - for i, key := range batch.Keys { - kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.Values[i]}) - } - - req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut, &kvrpcpb.RawBatchPutRequest{Pairs: kvPair}) - - sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) - resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort) - if err != nil { - return errors.Trace(err) - } - regionErr, err := resp.GetRegionError() - if err != nil { - return errors.Trace(err) - } - if regionErr != nil { - err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return errors.Trace(err) - } - // recursive call - return c.sendBatchPut(bo, batch.Keys, batch.Values) - } - - if resp.Resp == nil { - return errors.Trace(tikverr.ErrBodyMissing) - } - cmdResp := resp.Resp.(*kvrpcpb.RawBatchPutResponse) - if cmdResp.GetError() != "" { - return errors.New(cmdResp.GetError()) - } - return nil -} diff --git a/tikv/test_probe.go b/tikv/test_probe.go index 3e495d7c..c4a662fa 100644 --- a/tikv/test_probe.go +++ b/tikv/test_probe.go @@ -575,33 +575,3 @@ func (c ConfigProbe) StorePreSplitSizeThreshold(v uint32) { func (c ConfigProbe) SetOracleUpdateInterval(v int) { oracleUpdateInterval = v } - -// GetRawBatchPutSize returns the raw batch put size config. -func (c ConfigProbe) GetRawBatchPutSize() int { - return rawBatchPutSize -} - -// RawKVClientProbe wraps RawKVClient and exposes internal states for testing purpose. -type RawKVClientProbe struct { - *RawKVClient -} - -// GetRegionCache returns the internal region cache container. -func (c RawKVClientProbe) GetRegionCache() *locate.RegionCache { - return c.regionCache -} - -// SetRegionCache resets the internal region cache container. -func (c RawKVClientProbe) SetRegionCache(regionCache *locate.RegionCache) { - c.regionCache = regionCache -} - -// SetPDClient resets the interval PD client. -func (c RawKVClientProbe) SetPDClient(client pd.Client) { - c.pdClient = client -} - -// SetRPCClient resets the internal RPC client. -func (c RawKVClientProbe) SetRPCClient(client Client) { - c.rpcClient = client -}