mirror of https://github.com/tikv/client-go.git
rawkv: move rawkv client to rawkv (#227)
Signed-off-by: disksing <i@disksing.com> Co-authored-by: Shirly <AndreMouche@126.com>
This commit is contained in:
parent
5ef645fed2
commit
0fdc8e3d6f
|
|
@ -40,6 +40,7 @@ import (
|
|||
|
||||
"github.com/pingcap/tidb/store/mockstore/unistore"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/tikv/client-go/v2/rawkv"
|
||||
"github.com/tikv/client-go/v2/testutils"
|
||||
"github.com/tikv/client-go/v2/tikv"
|
||||
)
|
||||
|
|
@ -51,7 +52,7 @@ func TestRawKV(t *testing.T) {
|
|||
type testRawKVSuite struct {
|
||||
suite.Suite
|
||||
cluster testutils.Cluster
|
||||
client tikv.RawKVClientProbe
|
||||
client rawkv.ClientProbe
|
||||
bo *tikv.Backoffer
|
||||
}
|
||||
|
||||
|
|
@ -60,7 +61,7 @@ func (s *testRawKVSuite) SetupTest() {
|
|||
s.Require().Nil(err)
|
||||
unistore.BootstrapWithSingleStore(cluster)
|
||||
s.cluster = cluster
|
||||
s.client = tikv.RawKVClientProbe{RawKVClient: &tikv.RawKVClient{}}
|
||||
s.client = rawkv.ClientProbe{Client: &rawkv.Client{}}
|
||||
s.client.SetPDClient(pdClient)
|
||||
s.client.SetRegionCache(tikv.NewRegionCache(pdClient))
|
||||
s.client.SetRPCClient(client)
|
||||
|
|
@ -72,13 +73,13 @@ func (s *testRawKVSuite) TearDownTest() {
|
|||
}
|
||||
|
||||
func (s *testRawKVSuite) mustNotExist(key []byte) {
|
||||
v, err := s.client.Get(key)
|
||||
v, err := s.client.Get(context.Background(), key)
|
||||
s.Nil(err)
|
||||
s.Nil(v)
|
||||
}
|
||||
|
||||
func (s *testRawKVSuite) mustBatchNotExist(keys [][]byte) {
|
||||
values, err := s.client.BatchGet(keys)
|
||||
values, err := s.client.BatchGet(context.Background(), keys)
|
||||
s.Nil(err)
|
||||
s.NotNil(values)
|
||||
s.Equal(len(keys), len(values))
|
||||
|
|
@ -88,14 +89,14 @@ func (s *testRawKVSuite) mustBatchNotExist(keys [][]byte) {
|
|||
}
|
||||
|
||||
func (s *testRawKVSuite) mustGet(key, value []byte) {
|
||||
v, err := s.client.Get(key)
|
||||
v, err := s.client.Get(context.Background(), key)
|
||||
s.Nil(err)
|
||||
s.NotNil(v)
|
||||
s.Equal(v, value)
|
||||
}
|
||||
|
||||
func (s *testRawKVSuite) mustBatchGet(keys, values [][]byte) {
|
||||
checkValues, err := s.client.BatchGet(keys)
|
||||
checkValues, err := s.client.BatchGet(context.Background(), keys)
|
||||
s.Nil(err)
|
||||
s.NotNil(checkValues)
|
||||
s.Equal(len(keys), len(checkValues))
|
||||
|
|
@ -105,27 +106,27 @@ func (s *testRawKVSuite) mustBatchGet(keys, values [][]byte) {
|
|||
}
|
||||
|
||||
func (s *testRawKVSuite) mustPut(key, value []byte) {
|
||||
err := s.client.Put(key, value)
|
||||
err := s.client.Put(context.Background(), key, value)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
func (s *testRawKVSuite) mustBatchPut(keys, values [][]byte) {
|
||||
err := s.client.BatchPut(keys, values)
|
||||
err := s.client.BatchPut(context.Background(), keys, values)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
func (s *testRawKVSuite) mustDelete(key []byte) {
|
||||
err := s.client.Delete(key)
|
||||
err := s.client.Delete(context.Background(), key)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
func (s *testRawKVSuite) mustBatchDelete(keys [][]byte) {
|
||||
err := s.client.BatchDelete(keys)
|
||||
err := s.client.BatchDelete(context.Background(), keys)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
func (s *testRawKVSuite) mustScan(startKey string, limit int, expect ...string) {
|
||||
keys, values, err := s.client.Scan([]byte(startKey), nil, limit)
|
||||
keys, values, err := s.client.Scan(context.Background(), []byte(startKey), nil, limit)
|
||||
s.Nil(err)
|
||||
s.Equal(len(keys)*2, len(expect))
|
||||
for i := range keys {
|
||||
|
|
@ -135,7 +136,7 @@ func (s *testRawKVSuite) mustScan(startKey string, limit int, expect ...string)
|
|||
}
|
||||
|
||||
func (s *testRawKVSuite) mustScanRange(startKey string, endKey string, limit int, expect ...string) {
|
||||
keys, values, err := s.client.Scan([]byte(startKey), []byte(endKey), limit)
|
||||
keys, values, err := s.client.Scan(context.Background(), []byte(startKey), []byte(endKey), limit)
|
||||
s.Nil(err)
|
||||
s.Equal(len(keys)*2, len(expect))
|
||||
for i := range keys {
|
||||
|
|
@ -145,7 +146,7 @@ func (s *testRawKVSuite) mustScanRange(startKey string, endKey string, limit int
|
|||
}
|
||||
|
||||
func (s *testRawKVSuite) mustReverseScan(startKey []byte, limit int, expect ...string) {
|
||||
keys, values, err := s.client.ReverseScan(startKey, nil, limit)
|
||||
keys, values, err := s.client.ReverseScan(context.Background(), startKey, nil, limit)
|
||||
s.Nil(err)
|
||||
s.Equal(len(keys)*2, len(expect))
|
||||
for i := range keys {
|
||||
|
|
@ -155,7 +156,7 @@ func (s *testRawKVSuite) mustReverseScan(startKey []byte, limit int, expect ...s
|
|||
}
|
||||
|
||||
func (s *testRawKVSuite) mustReverseScanRange(startKey, endKey []byte, limit int, expect ...string) {
|
||||
keys, values, err := s.client.ReverseScan(startKey, endKey, limit)
|
||||
keys, values, err := s.client.ReverseScan(context.Background(), startKey, endKey, limit)
|
||||
s.Nil(err)
|
||||
s.Equal(len(keys)*2, len(expect))
|
||||
for i := range keys {
|
||||
|
|
@ -165,7 +166,7 @@ func (s *testRawKVSuite) mustReverseScanRange(startKey, endKey []byte, limit int
|
|||
}
|
||||
|
||||
func (s *testRawKVSuite) mustDeleteRange(startKey, endKey []byte, expected map[string]string) {
|
||||
err := s.client.DeleteRange(startKey, endKey)
|
||||
err := s.client.DeleteRange(context.Background(), startKey, endKey)
|
||||
s.Nil(err)
|
||||
|
||||
for keyStr := range expected {
|
||||
|
|
@ -179,7 +180,7 @@ func (s *testRawKVSuite) mustDeleteRange(startKey, endKey []byte, expected map[s
|
|||
}
|
||||
|
||||
func (s *testRawKVSuite) checkData(expected map[string]string) {
|
||||
keys, values, err := s.client.Scan([]byte(""), nil, len(expected)+1)
|
||||
keys, values, err := s.client.Scan(context.Background(), []byte(""), nil, len(expected)+1)
|
||||
s.Nil(err)
|
||||
|
||||
s.Equal(len(expected), len(keys))
|
||||
|
|
@ -205,7 +206,7 @@ func (s *testRawKVSuite) TestSimple() {
|
|||
s.mustGet([]byte("key"), []byte("value"))
|
||||
s.mustDelete([]byte("key"))
|
||||
s.mustNotExist([]byte("key"))
|
||||
err := s.client.Put([]byte("key"), []byte(""))
|
||||
err := s.client.Put(context.Background(), []byte("key"), []byte(""))
|
||||
s.NotNil(err)
|
||||
}
|
||||
|
||||
|
|
@ -214,7 +215,7 @@ func (s *testRawKVSuite) TestRawBatch() {
|
|||
size := 0
|
||||
var testKeys [][]byte
|
||||
var testValues [][]byte
|
||||
for i := 0; size/(tikv.ConfigProbe{}.GetRawBatchPutSize()) < 4; i++ {
|
||||
for i := 0; size/(rawkv.ConfigProbe{}.GetRawBatchPutSize()) < 4; i++ {
|
||||
key := fmt.Sprint("key", i)
|
||||
size += len(key)
|
||||
testKeys = append(testKeys, []byte(key))
|
||||
|
|
|
|||
546
rawkv/rawkv.go
546
rawkv/rawkv.go
|
|
@ -33,78 +33,262 @@
|
|||
package rawkv
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
"github.com/tikv/client-go/v2/tikv"
|
||||
tikverr "github.com/tikv/client-go/v2/error"
|
||||
"github.com/tikv/client-go/v2/internal/client"
|
||||
"github.com/tikv/client-go/v2/internal/kvrpc"
|
||||
"github.com/tikv/client-go/v2/internal/locate"
|
||||
"github.com/tikv/client-go/v2/internal/retry"
|
||||
"github.com/tikv/client-go/v2/metrics"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
pd "github.com/tikv/pd/client"
|
||||
)
|
||||
|
||||
var (
|
||||
// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
|
||||
MaxRawKVScanLimit = 10240
|
||||
// ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large.
|
||||
ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit")
|
||||
)
|
||||
|
||||
const (
|
||||
// rawBatchPutSize is the maximum size limit for rawkv each batch put request.
|
||||
rawBatchPutSize = 16 * 1024
|
||||
// rawBatchPairCount is the maximum limit for rawkv each batch get/delete request.
|
||||
rawBatchPairCount = 512
|
||||
)
|
||||
|
||||
// Client is a client of TiKV server which is used as a key-value storage,
|
||||
// only GET/PUT/DELETE commands are supported.
|
||||
type Client struct {
|
||||
client *tikv.RawKVClient
|
||||
clusterID uint64
|
||||
regionCache *locate.RegionCache
|
||||
pdClient pd.Client
|
||||
rpcClient client.Client
|
||||
}
|
||||
|
||||
// NewClient creates a client with PD cluster addrs.
|
||||
func NewClient(ctx context.Context, pdAddrs []string, security config.Security, opts ...pd.ClientOption) (*Client, error) {
|
||||
client, err := tikv.NewRawKVClient(pdAddrs, security, opts...)
|
||||
pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{
|
||||
CAPath: security.ClusterSSLCA,
|
||||
CertPath: security.ClusterSSLCert,
|
||||
KeyPath: security.ClusterSSLKey,
|
||||
}, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return &Client{client: client}, nil
|
||||
return &Client{
|
||||
clusterID: pdCli.GetClusterID(ctx),
|
||||
regionCache: locate.NewRegionCache(pdCli),
|
||||
pdClient: pdCli,
|
||||
rpcClient: client.NewRPCClient(security),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the client.
|
||||
func (c *Client) Close() error {
|
||||
return c.client.Close()
|
||||
if c.pdClient != nil {
|
||||
c.pdClient.Close()
|
||||
}
|
||||
if c.regionCache != nil {
|
||||
c.regionCache.Close()
|
||||
}
|
||||
if c.rpcClient == nil {
|
||||
return nil
|
||||
}
|
||||
return c.rpcClient.Close()
|
||||
}
|
||||
|
||||
// ClusterID returns the TiKV cluster ID.
|
||||
func (c *Client) ClusterID() uint64 {
|
||||
return c.client.ClusterID()
|
||||
return c.clusterID
|
||||
}
|
||||
|
||||
// Get queries value with the key. When the key does not exist, it returns `nil, nil`.
|
||||
// TODO: use ctx after moving all rawkv code out.
|
||||
func (c *Client) Get(ctx context.Context, key []byte) ([]byte, error) {
|
||||
return c.client.Get(key)
|
||||
start := time.Now()
|
||||
defer func() { metrics.RawkvCmdHistogramWithGet.Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawGet, &kvrpcpb.RawGetRequest{Key: key})
|
||||
resp, _, err := c.sendReq(ctx, key, req, false)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return nil, errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawGetResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return nil, errors.New(cmdResp.GetError())
|
||||
}
|
||||
if len(cmdResp.Value) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return cmdResp.Value, nil
|
||||
}
|
||||
|
||||
const rawkvMaxBackoff = 20000
|
||||
|
||||
// BatchGet queries values with the keys.
|
||||
// TODO: use ctx after moving all rawkv code out.
|
||||
func (c *Client) BatchGet(ctx context.Context, keys [][]byte) ([][]byte, error) {
|
||||
return c.client.BatchGet(keys)
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
metrics.RawkvCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
|
||||
resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchGet)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
if resp.Resp == nil {
|
||||
return nil, errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawBatchGetResponse)
|
||||
|
||||
keyToValue := make(map[string][]byte, len(keys))
|
||||
for _, pair := range cmdResp.Pairs {
|
||||
keyToValue[string(pair.Key)] = pair.Value
|
||||
}
|
||||
|
||||
values := make([][]byte, len(keys))
|
||||
for i, key := range keys {
|
||||
values[i] = keyToValue[string(key)]
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Put stores a key-value pair to TiKV.
|
||||
// TODO: use ctx after moving all rawkv code out.
|
||||
func (c *Client) Put(ctx context.Context, key, value []byte) error {
|
||||
return c.client.Put(key, value)
|
||||
start := time.Now()
|
||||
defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) }()
|
||||
metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key)))
|
||||
metrics.RawkvSizeHistogramWithValue.Observe(float64(len(value)))
|
||||
|
||||
if len(value) == 0 {
|
||||
return errors.New("empty value is not supported")
|
||||
}
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
|
||||
Key: key,
|
||||
Value: value,
|
||||
})
|
||||
resp, _, err := c.sendReq(ctx, key, req, false)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawPutResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchPut stores key-value pairs to TiKV.
|
||||
// TODO: use ctx after moving all rawkv code out.
|
||||
func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte) error {
|
||||
return c.client.BatchPut(keys, values)
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
if len(keys) != len(values) {
|
||||
return errors.New("the len of keys is not equal to the len of values")
|
||||
}
|
||||
for _, value := range values {
|
||||
if len(value) == 0 {
|
||||
return errors.New("empty value is not supported")
|
||||
}
|
||||
}
|
||||
bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
|
||||
err := c.sendBatchPut(bo, keys, values)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// Delete deletes a key-value pair from TiKV.
|
||||
// TODO: use ctx after moving all rawkv code out.
|
||||
func (c *Client) Delete(ctx context.Context, key []byte) error {
|
||||
return c.client.Delete(key)
|
||||
start := time.Now()
|
||||
defer func() { metrics.RawkvCmdHistogramWithDelete.Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawDelete, &kvrpcpb.RawDeleteRequest{
|
||||
Key: key,
|
||||
})
|
||||
resp, _, err := c.sendReq(ctx, key, req, false)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawDeleteResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchDelete deletes key-value pairs from TiKV.
|
||||
// TODO: use ctx after moving all rawkv code out.
|
||||
func (c *Client) BatchDelete(ctx context.Context, keys [][]byte) error {
|
||||
return c.client.BatchDelete(keys)
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
metrics.RawkvCmdHistogramWithBatchDelete.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
|
||||
resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchDelete)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteRange deletes all key-value pairs in a range from TiKV.
|
||||
// TODO: use ctx after moving all rawkv code out.
|
||||
func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte) error {
|
||||
return c.client.DeleteRange(startKey, endKey)
|
||||
start := time.Now()
|
||||
var err error
|
||||
defer func() {
|
||||
var label = "delete_range"
|
||||
if err != nil {
|
||||
label += "_error"
|
||||
}
|
||||
metrics.TiKVRawkvCmdHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
// Process each affected region respectively
|
||||
for !bytes.Equal(startKey, endKey) {
|
||||
var resp *tikvrpc.Response
|
||||
var actualEndKey []byte
|
||||
resp, actualEndKey, err = c.sendDeleteRangeReq(ctx, startKey, endKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawDeleteRangeResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
startKey = actualEndKey
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Scan queries continuous kv pairs in range [startKey, endKey), up to limit pairs.
|
||||
|
|
@ -112,9 +296,38 @@ func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte
|
|||
// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan
|
||||
// (startKey, endKey], you can write:
|
||||
// `Scan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`.
|
||||
// TODO: use ctx after moving all rawkv code out.
|
||||
func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
|
||||
return c.client.Scan(startKey, endKey, limit)
|
||||
start := time.Now()
|
||||
defer func() { metrics.RawkvCmdHistogramWithRawScan.Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
if limit > MaxRawKVScanLimit {
|
||||
return nil, nil, errors.Trace(ErrMaxScanLimitExceeded)
|
||||
}
|
||||
|
||||
for len(keys) < limit && (len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0) {
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{
|
||||
StartKey: startKey,
|
||||
EndKey: endKey,
|
||||
Limit: uint32(limit - len(keys)),
|
||||
})
|
||||
resp, loc, err := c.sendReq(ctx, startKey, req, false)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return nil, nil, errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse)
|
||||
for _, pair := range cmdResp.Kvs {
|
||||
keys = append(keys, pair.Key)
|
||||
values = append(values, pair.Value)
|
||||
}
|
||||
startKey = loc.EndKey
|
||||
if len(startKey) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ReverseScan queries continuous kv pairs in range [endKey, startKey), up to limit pairs.
|
||||
|
|
@ -124,7 +337,290 @@ func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int) (
|
|||
// (endKey, startKey], you can write:
|
||||
// `ReverseScan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`.
|
||||
// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
|
||||
// TODO: use ctx after moving all rawkv code out.
|
||||
func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
|
||||
return c.client.ReverseScan(startKey, endKey, limit)
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
metrics.RawkvCmdHistogramWithRawReversScan.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
if limit > MaxRawKVScanLimit {
|
||||
return nil, nil, errors.Trace(ErrMaxScanLimitExceeded)
|
||||
}
|
||||
|
||||
for len(keys) < limit && bytes.Compare(startKey, endKey) > 0 {
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{
|
||||
StartKey: startKey,
|
||||
EndKey: endKey,
|
||||
Limit: uint32(limit - len(keys)),
|
||||
Reverse: true,
|
||||
})
|
||||
resp, loc, err := c.sendReq(ctx, startKey, req, true)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return nil, nil, errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse)
|
||||
for _, pair := range cmdResp.Kvs {
|
||||
keys = append(keys, pair.Key)
|
||||
values = append(values, pair.Value)
|
||||
}
|
||||
startKey = loc.StartKey
|
||||
if len(startKey) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Client) sendReq(ctx context.Context, key []byte, req *tikvrpc.Request, reverse bool) (*tikvrpc.Response, *locate.KeyLocation, error) {
|
||||
bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
|
||||
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||
for {
|
||||
var loc *locate.KeyLocation
|
||||
var err error
|
||||
if reverse {
|
||||
loc, err = c.regionCache.LocateEndKey(bo, key)
|
||||
} else {
|
||||
loc, err = c.regionCache.LocateKey(bo, key)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return resp, loc, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys
|
||||
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
var batches []kvrpc.Batch
|
||||
for regionID, groupKeys := range groups {
|
||||
batches = kvrpc.AppendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
|
||||
}
|
||||
bo, cancel := bo.Fork()
|
||||
ches := make(chan kvrpc.BatchResult, len(batches))
|
||||
for _, batch := range batches {
|
||||
batch1 := batch
|
||||
go func() {
|
||||
singleBatchBackoffer, singleBatchCancel := bo.Fork()
|
||||
defer singleBatchCancel()
|
||||
ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType)
|
||||
}()
|
||||
}
|
||||
|
||||
var firstError error
|
||||
var resp *tikvrpc.Response
|
||||
switch cmdType {
|
||||
case tikvrpc.CmdRawBatchGet:
|
||||
resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchGetResponse{}}
|
||||
case tikvrpc.CmdRawBatchDelete:
|
||||
resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchDeleteResponse{}}
|
||||
}
|
||||
for i := 0; i < len(batches); i++ {
|
||||
singleResp, ok := <-ches
|
||||
if ok {
|
||||
if singleResp.Error != nil {
|
||||
cancel()
|
||||
if firstError == nil {
|
||||
firstError = singleResp.Error
|
||||
}
|
||||
} else if cmdType == tikvrpc.CmdRawBatchGet {
|
||||
cmdResp := singleResp.Resp.(*kvrpcpb.RawBatchGetResponse)
|
||||
resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs = append(resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs, cmdResp.Pairs...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return resp, firstError
|
||||
}
|
||||
|
||||
func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, cmdType tikvrpc.CmdType) kvrpc.BatchResult {
|
||||
var req *tikvrpc.Request
|
||||
switch cmdType {
|
||||
case tikvrpc.CmdRawBatchGet:
|
||||
req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchGetRequest{
|
||||
Keys: batch.Keys,
|
||||
})
|
||||
case tikvrpc.CmdRawBatchDelete:
|
||||
req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchDeleteRequest{
|
||||
Keys: batch.Keys,
|
||||
})
|
||||
}
|
||||
|
||||
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||
resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
|
||||
|
||||
batchResp := kvrpc.BatchResult{}
|
||||
if err != nil {
|
||||
batchResp.Error = errors.Trace(err)
|
||||
return batchResp
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
batchResp.Error = errors.Trace(err)
|
||||
return batchResp
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
batchResp.Error = errors.Trace(err)
|
||||
return batchResp
|
||||
}
|
||||
resp, err = c.sendBatchReq(bo, batch.Keys, cmdType)
|
||||
batchResp.Response = resp
|
||||
batchResp.Error = err
|
||||
return batchResp
|
||||
}
|
||||
|
||||
switch cmdType {
|
||||
case tikvrpc.CmdRawBatchGet:
|
||||
batchResp.Response = resp
|
||||
case tikvrpc.CmdRawBatchDelete:
|
||||
if resp.Resp == nil {
|
||||
batchResp.Error = errors.Trace(tikverr.ErrBodyMissing)
|
||||
return batchResp
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
batchResp.Error = errors.New(cmdResp.GetError())
|
||||
return batchResp
|
||||
}
|
||||
batchResp.Response = resp
|
||||
}
|
||||
return batchResp
|
||||
}
|
||||
|
||||
// sendDeleteRangeReq sends a raw delete range request and returns the response and the actual endKey.
|
||||
// If the given range spans over more than one regions, the actual endKey is the end of the first region.
|
||||
// We can't use sendReq directly, because we need to know the end of the region before we send the request
|
||||
// TODO: Is there any better way to avoid duplicating code with func `sendReq` ?
|
||||
func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey []byte) (*tikvrpc.Response, []byte, error) {
|
||||
bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
|
||||
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||
for {
|
||||
loc, err := c.regionCache.LocateKey(bo, startKey)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
actualEndKey := endKey
|
||||
if len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, endKey) < 0 {
|
||||
actualEndKey = loc.EndKey
|
||||
}
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawDeleteRange, &kvrpcpb.RawDeleteRangeRequest{
|
||||
StartKey: startKey,
|
||||
EndKey: actualEndKey,
|
||||
})
|
||||
|
||||
resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return resp, actualEndKey, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) error {
|
||||
keyToValue := make(map[string][]byte, len(keys))
|
||||
for i, key := range keys {
|
||||
keyToValue[string(key)] = values[i]
|
||||
}
|
||||
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
var batches []kvrpc.Batch
|
||||
// split the keys by size and RegionVerID
|
||||
for regionID, groupKeys := range groups {
|
||||
batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize)
|
||||
}
|
||||
bo, cancel := bo.Fork()
|
||||
ch := make(chan error, len(batches))
|
||||
for _, batch := range batches {
|
||||
batch1 := batch
|
||||
go func() {
|
||||
singleBatchBackoffer, singleBatchCancel := bo.Fork()
|
||||
defer singleBatchCancel()
|
||||
ch <- c.doBatchPut(singleBatchBackoffer, batch1)
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < len(batches); i++ {
|
||||
if e := <-ch; e != nil {
|
||||
cancel()
|
||||
// catch the first error
|
||||
if err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
}
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch) error {
|
||||
kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.Keys))
|
||||
for i, key := range batch.Keys {
|
||||
kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.Values[i]})
|
||||
}
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut, &kvrpcpb.RawBatchPutRequest{Pairs: kvPair})
|
||||
|
||||
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||
resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// recursive call
|
||||
return c.sendBatchPut(bo, batch.Keys, batch.Values)
|
||||
}
|
||||
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawBatchPutResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv
|
||||
package rawkv
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -38,6 +38,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/tikv/client-go/v2/internal/locate"
|
||||
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
|
||||
"github.com/tikv/client-go/v2/internal/retry"
|
||||
"github.com/tikv/client-go/v2/kv"
|
||||
|
|
@ -83,15 +84,15 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() {
|
|||
mvccStore := mocktikv.MustNewMVCCStore()
|
||||
defer mvccStore.Close()
|
||||
|
||||
client := &RawKVClient{
|
||||
client := &Client{
|
||||
clusterID: 0,
|
||||
regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)),
|
||||
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
|
||||
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
|
||||
}
|
||||
defer client.Close()
|
||||
testKey := []byte("test_key")
|
||||
testValue := []byte("test_value")
|
||||
err := client.Put(testKey, testValue)
|
||||
err := client.Put(context.Background(), testKey, testValue)
|
||||
s.Nil(err)
|
||||
|
||||
// make store2 using store1's addr and store1 offline
|
||||
|
|
@ -102,7 +103,7 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
getVal, err := client.Get(testKey)
|
||||
getVal, err := client.Get(context.Background(), testKey)
|
||||
|
||||
s.Nil(err)
|
||||
s.Equal(getVal, testValue)
|
||||
|
|
@ -112,22 +113,22 @@ func (s *testRawkvSuite) TestUpdateStoreAddr() {
|
|||
mvccStore := mocktikv.MustNewMVCCStore()
|
||||
defer mvccStore.Close()
|
||||
|
||||
client := &RawKVClient{
|
||||
client := &Client{
|
||||
clusterID: 0,
|
||||
regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)),
|
||||
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
|
||||
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
|
||||
}
|
||||
defer client.Close()
|
||||
testKey := []byte("test_key")
|
||||
testValue := []byte("test_value")
|
||||
err := client.Put(testKey, testValue)
|
||||
err := client.Put(context.Background(), testKey, testValue)
|
||||
s.Nil(err)
|
||||
// tikv-server reports `StoreNotMatch` And retry
|
||||
store1Addr := s.storeAddr(s.store1)
|
||||
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2))
|
||||
s.cluster.UpdateStoreAddr(s.store2, store1Addr)
|
||||
|
||||
getVal, err := client.Get(testKey)
|
||||
getVal, err := client.Get(context.Background(), testKey)
|
||||
|
||||
s.Nil(err)
|
||||
s.Equal(getVal, testValue)
|
||||
|
|
@ -137,15 +138,15 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() {
|
|||
mvccStore := mocktikv.MustNewMVCCStore()
|
||||
defer mvccStore.Close()
|
||||
|
||||
client := &RawKVClient{
|
||||
client := &Client{
|
||||
clusterID: 0,
|
||||
regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)),
|
||||
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
|
||||
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
|
||||
}
|
||||
defer client.Close()
|
||||
testKey := []byte("test_key")
|
||||
testValue := []byte("test_value")
|
||||
err := client.Put(testKey, testValue)
|
||||
err := client.Put(context.Background(), testKey, testValue)
|
||||
s.Nil(err)
|
||||
|
||||
// pre-load store2's address into cache via follower-read.
|
||||
|
|
@ -164,7 +165,7 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
getVal, err := client.Get(testKey)
|
||||
getVal, err := client.Get(context.Background(), testKey)
|
||||
s.Nil(err)
|
||||
s.Equal(getVal, testValue)
|
||||
}
|
||||
|
|
@ -173,15 +174,15 @@ func (s *testRawkvSuite) TestReplaceStore() {
|
|||
mvccStore := mocktikv.MustNewMVCCStore()
|
||||
defer mvccStore.Close()
|
||||
|
||||
client := &RawKVClient{
|
||||
client := &Client{
|
||||
clusterID: 0,
|
||||
regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)),
|
||||
regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
|
||||
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil),
|
||||
}
|
||||
defer client.Close()
|
||||
testKey := []byte("test_key")
|
||||
testValue := []byte("test_value")
|
||||
err := client.Put(testKey, testValue)
|
||||
err := client.Put(context.Background(), testKey, testValue)
|
||||
s.Nil(err)
|
||||
|
||||
s.cluster.MarkTombstone(s.store1)
|
||||
|
|
@ -192,6 +193,6 @@ func (s *testRawkvSuite) TestReplaceStore() {
|
|||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
s.cluster.ChangeLeader(s.region1, peer3)
|
||||
|
||||
err = client.Put(testKey, testValue)
|
||||
err = client.Put(context.Background(), testKey, testValue)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
// Copyright 2021 TiKV Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package rawkv
|
||||
|
||||
import (
|
||||
"github.com/tikv/client-go/v2/internal/client"
|
||||
"github.com/tikv/client-go/v2/internal/locate"
|
||||
pd "github.com/tikv/pd/client"
|
||||
)
|
||||
|
||||
// ClientProbe wraps RawKVClient and exposes internal states for testing purpose.
|
||||
type ClientProbe struct {
|
||||
*Client
|
||||
}
|
||||
|
||||
// GetRegionCache returns the internal region cache container.
|
||||
func (c ClientProbe) GetRegionCache() *locate.RegionCache {
|
||||
return c.regionCache
|
||||
}
|
||||
|
||||
// SetRegionCache resets the internal region cache container.
|
||||
func (c ClientProbe) SetRegionCache(regionCache *locate.RegionCache) {
|
||||
c.regionCache = regionCache
|
||||
}
|
||||
|
||||
// SetPDClient resets the interval PD client.
|
||||
func (c ClientProbe) SetPDClient(client pd.Client) {
|
||||
c.pdClient = client
|
||||
}
|
||||
|
||||
// SetRPCClient resets the internal RPC client.
|
||||
func (c ClientProbe) SetRPCClient(client client.Client) {
|
||||
c.rpcClient = client
|
||||
}
|
||||
|
||||
// ConfigProbe exposes configurations and global variables for testing purpose.
|
||||
type ConfigProbe struct{}
|
||||
|
||||
// GetRawBatchPutSize returns the raw batch put size config.
|
||||
func (c ConfigProbe) GetRawBatchPutSize() int {
|
||||
return rawBatchPutSize
|
||||
}
|
||||
626
tikv/rawkv.go
626
tikv/rawkv.go
|
|
@ -1,626 +0,0 @@
|
|||
// Copyright 2021 TiKV Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// NOTE: The code in this file is based on code from the
|
||||
// TiDB project, licensed under the Apache License v 2.0
|
||||
//
|
||||
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/rawkv.go
|
||||
//
|
||||
|
||||
// Copyright 2016 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
tikverr "github.com/tikv/client-go/v2/error"
|
||||
"github.com/tikv/client-go/v2/internal/client"
|
||||
"github.com/tikv/client-go/v2/internal/kvrpc"
|
||||
"github.com/tikv/client-go/v2/internal/locate"
|
||||
"github.com/tikv/client-go/v2/internal/retry"
|
||||
"github.com/tikv/client-go/v2/metrics"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
pd "github.com/tikv/pd/client"
|
||||
)
|
||||
|
||||
var (
|
||||
// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
|
||||
MaxRawKVScanLimit = 10240
|
||||
// ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large.
|
||||
ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit")
|
||||
)
|
||||
|
||||
const (
|
||||
// rawBatchPutSize is the maximum size limit for rawkv each batch put request.
|
||||
rawBatchPutSize = 16 * 1024
|
||||
// rawBatchPairCount is the maximum limit for rawkv each batch get/delete request.
|
||||
rawBatchPairCount = 512
|
||||
)
|
||||
|
||||
// RawKVClient is a client of TiKV server which is used as a key-value storage,
|
||||
// only GET/PUT/DELETE commands are supported.
|
||||
type RawKVClient struct {
|
||||
clusterID uint64
|
||||
regionCache *locate.RegionCache
|
||||
pdClient pd.Client
|
||||
rpcClient Client
|
||||
}
|
||||
|
||||
// NewRawKVClient creates a client with PD cluster addrs.
|
||||
func NewRawKVClient(pdAddrs []string, security config.Security, opts ...pd.ClientOption) (*RawKVClient, error) {
|
||||
pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{
|
||||
CAPath: security.ClusterSSLCA,
|
||||
CertPath: security.ClusterSSLCert,
|
||||
KeyPath: security.ClusterSSLKey,
|
||||
}, opts...)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return &RawKVClient{
|
||||
clusterID: pdCli.GetClusterID(context.TODO()),
|
||||
regionCache: locate.NewRegionCache(pdCli),
|
||||
pdClient: pdCli,
|
||||
rpcClient: client.NewRPCClient(security),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the client.
|
||||
func (c *RawKVClient) Close() error {
|
||||
if c.pdClient != nil {
|
||||
c.pdClient.Close()
|
||||
}
|
||||
if c.regionCache != nil {
|
||||
c.regionCache.Close()
|
||||
}
|
||||
if c.rpcClient == nil {
|
||||
return nil
|
||||
}
|
||||
return c.rpcClient.Close()
|
||||
}
|
||||
|
||||
// ClusterID returns the TiKV cluster ID.
|
||||
func (c *RawKVClient) ClusterID() uint64 {
|
||||
return c.clusterID
|
||||
}
|
||||
|
||||
// Get queries value with the key. When the key does not exist, it returns `nil, nil`.
|
||||
func (c *RawKVClient) Get(key []byte) ([]byte, error) {
|
||||
start := time.Now()
|
||||
defer func() { metrics.RawkvCmdHistogramWithGet.Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawGet, &kvrpcpb.RawGetRequest{Key: key})
|
||||
resp, _, err := c.sendReq(key, req, false)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return nil, errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawGetResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return nil, errors.New(cmdResp.GetError())
|
||||
}
|
||||
if len(cmdResp.Value) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return cmdResp.Value, nil
|
||||
}
|
||||
|
||||
const rawkvMaxBackoff = 20000
|
||||
|
||||
// BatchGet queries values with the keys.
|
||||
func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
metrics.RawkvCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil)
|
||||
resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchGet)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
if resp.Resp == nil {
|
||||
return nil, errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawBatchGetResponse)
|
||||
|
||||
keyToValue := make(map[string][]byte, len(keys))
|
||||
for _, pair := range cmdResp.Pairs {
|
||||
keyToValue[string(pair.Key)] = pair.Value
|
||||
}
|
||||
|
||||
values := make([][]byte, len(keys))
|
||||
for i, key := range keys {
|
||||
values[i] = keyToValue[string(key)]
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// Put stores a key-value pair to TiKV.
|
||||
func (c *RawKVClient) Put(key, value []byte) error {
|
||||
start := time.Now()
|
||||
defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) }()
|
||||
metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key)))
|
||||
metrics.RawkvSizeHistogramWithValue.Observe(float64(len(value)))
|
||||
|
||||
if len(value) == 0 {
|
||||
return errors.New("empty value is not supported")
|
||||
}
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
|
||||
Key: key,
|
||||
Value: value,
|
||||
})
|
||||
resp, _, err := c.sendReq(key, req, false)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawPutResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchPut stores key-value pairs to TiKV.
|
||||
func (c *RawKVClient) BatchPut(keys, values [][]byte) error {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
if len(keys) != len(values) {
|
||||
return errors.New("the len of keys is not equal to the len of values")
|
||||
}
|
||||
for _, value := range values {
|
||||
if len(value) == 0 {
|
||||
return errors.New("empty value is not supported")
|
||||
}
|
||||
}
|
||||
bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil)
|
||||
err := c.sendBatchPut(bo, keys, values)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// Delete deletes a key-value pair from TiKV.
|
||||
func (c *RawKVClient) Delete(key []byte) error {
|
||||
start := time.Now()
|
||||
defer func() { metrics.RawkvCmdHistogramWithDelete.Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawDelete, &kvrpcpb.RawDeleteRequest{
|
||||
Key: key,
|
||||
})
|
||||
resp, _, err := c.sendReq(key, req, false)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawDeleteResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchDelete deletes key-value pairs from TiKV
|
||||
func (c *RawKVClient) BatchDelete(keys [][]byte) error {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
metrics.RawkvCmdHistogramWithBatchDelete.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil)
|
||||
resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchDelete)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteRange deletes all key-value pairs in a range from TiKV
|
||||
func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {
|
||||
start := time.Now()
|
||||
var err error
|
||||
defer func() {
|
||||
var label = "delete_range"
|
||||
if err != nil {
|
||||
label += "_error"
|
||||
}
|
||||
metrics.TiKVRawkvCmdHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
// Process each affected region respectively
|
||||
for !bytes.Equal(startKey, endKey) {
|
||||
var resp *tikvrpc.Response
|
||||
var actualEndKey []byte
|
||||
resp, actualEndKey, err = c.sendDeleteRangeReq(startKey, endKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawDeleteRangeResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
startKey = actualEndKey
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Scan queries continuous kv pairs in range [startKey, endKey), up to limit pairs.
|
||||
// If endKey is empty, it means unbounded.
|
||||
// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan
|
||||
// (startKey, endKey], you can write:
|
||||
// `Scan(push(startKey, '\0'), push(endKey, '\0'), limit)`.
|
||||
func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
|
||||
start := time.Now()
|
||||
defer func() { metrics.RawkvCmdHistogramWithRawScan.Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
if limit > MaxRawKVScanLimit {
|
||||
return nil, nil, errors.Trace(ErrMaxScanLimitExceeded)
|
||||
}
|
||||
|
||||
for len(keys) < limit && (len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0) {
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{
|
||||
StartKey: startKey,
|
||||
EndKey: endKey,
|
||||
Limit: uint32(limit - len(keys)),
|
||||
})
|
||||
resp, loc, err := c.sendReq(startKey, req, false)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return nil, nil, errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse)
|
||||
for _, pair := range cmdResp.Kvs {
|
||||
keys = append(keys, pair.Key)
|
||||
values = append(values, pair.Value)
|
||||
}
|
||||
startKey = loc.EndKey
|
||||
if len(startKey) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ReverseScan queries continuous kv pairs in range [endKey, startKey), up to limit pairs.
|
||||
// Direction is different from Scan, upper to lower.
|
||||
// If endKey is empty, it means unbounded.
|
||||
// If you want to include the startKey or exclude the endKey, push a '\0' to the key. For example, to scan
|
||||
// (endKey, startKey], you can write:
|
||||
// `ReverseScan(push(startKey, '\0'), push(endKey, '\0'), limit)`.
|
||||
// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
|
||||
func (c *RawKVClient) ReverseScan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
metrics.RawkvCmdHistogramWithRawReversScan.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
if limit > MaxRawKVScanLimit {
|
||||
return nil, nil, errors.Trace(ErrMaxScanLimitExceeded)
|
||||
}
|
||||
|
||||
for len(keys) < limit && bytes.Compare(startKey, endKey) > 0 {
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{
|
||||
StartKey: startKey,
|
||||
EndKey: endKey,
|
||||
Limit: uint32(limit - len(keys)),
|
||||
Reverse: true,
|
||||
})
|
||||
resp, loc, err := c.sendReq(startKey, req, true)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return nil, nil, errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse)
|
||||
for _, pair := range cmdResp.Kvs {
|
||||
keys = append(keys, pair.Key)
|
||||
values = append(values, pair.Value)
|
||||
}
|
||||
startKey = loc.StartKey
|
||||
if len(startKey) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (*tikvrpc.Response, *locate.KeyLocation, error) {
|
||||
bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil)
|
||||
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||
for {
|
||||
var loc *locate.KeyLocation
|
||||
var err error
|
||||
if reverse {
|
||||
loc, err = c.regionCache.LocateEndKey(bo, key)
|
||||
} else {
|
||||
loc, err = c.regionCache.LocateKey(bo, key)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return resp, loc, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys
|
||||
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
var batches []kvrpc.Batch
|
||||
for regionID, groupKeys := range groups {
|
||||
batches = kvrpc.AppendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
|
||||
}
|
||||
bo, cancel := bo.Fork()
|
||||
ches := make(chan kvrpc.BatchResult, len(batches))
|
||||
for _, batch := range batches {
|
||||
batch1 := batch
|
||||
go func() {
|
||||
singleBatchBackoffer, singleBatchCancel := bo.Fork()
|
||||
defer singleBatchCancel()
|
||||
ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType)
|
||||
}()
|
||||
}
|
||||
|
||||
var firstError error
|
||||
var resp *tikvrpc.Response
|
||||
switch cmdType {
|
||||
case tikvrpc.CmdRawBatchGet:
|
||||
resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchGetResponse{}}
|
||||
case tikvrpc.CmdRawBatchDelete:
|
||||
resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchDeleteResponse{}}
|
||||
}
|
||||
for i := 0; i < len(batches); i++ {
|
||||
singleResp, ok := <-ches
|
||||
if ok {
|
||||
if singleResp.Error != nil {
|
||||
cancel()
|
||||
if firstError == nil {
|
||||
firstError = singleResp.Error
|
||||
}
|
||||
} else if cmdType == tikvrpc.CmdRawBatchGet {
|
||||
cmdResp := singleResp.Resp.(*kvrpcpb.RawBatchGetResponse)
|
||||
resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs = append(resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs, cmdResp.Pairs...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return resp, firstError
|
||||
}
|
||||
|
||||
func (c *RawKVClient) doBatchReq(bo *Backoffer, batch kvrpc.Batch, cmdType tikvrpc.CmdType) kvrpc.BatchResult {
|
||||
var req *tikvrpc.Request
|
||||
switch cmdType {
|
||||
case tikvrpc.CmdRawBatchGet:
|
||||
req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchGetRequest{
|
||||
Keys: batch.Keys,
|
||||
})
|
||||
case tikvrpc.CmdRawBatchDelete:
|
||||
req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchDeleteRequest{
|
||||
Keys: batch.Keys,
|
||||
})
|
||||
}
|
||||
|
||||
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||
resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
|
||||
|
||||
batchResp := kvrpc.BatchResult{}
|
||||
if err != nil {
|
||||
batchResp.Error = errors.Trace(err)
|
||||
return batchResp
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
batchResp.Error = errors.Trace(err)
|
||||
return batchResp
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
batchResp.Error = errors.Trace(err)
|
||||
return batchResp
|
||||
}
|
||||
resp, err = c.sendBatchReq(bo, batch.Keys, cmdType)
|
||||
batchResp.Response = resp
|
||||
batchResp.Error = err
|
||||
return batchResp
|
||||
}
|
||||
|
||||
switch cmdType {
|
||||
case tikvrpc.CmdRawBatchGet:
|
||||
batchResp.Response = resp
|
||||
case tikvrpc.CmdRawBatchDelete:
|
||||
if resp.Resp == nil {
|
||||
batchResp.Error = errors.Trace(tikverr.ErrBodyMissing)
|
||||
return batchResp
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
batchResp.Error = errors.New(cmdResp.GetError())
|
||||
return batchResp
|
||||
}
|
||||
batchResp.Response = resp
|
||||
}
|
||||
return batchResp
|
||||
}
|
||||
|
||||
// sendDeleteRangeReq sends a raw delete range request and returns the response and the actual endKey.
|
||||
// If the given range spans over more than one regions, the actual endKey is the end of the first region.
|
||||
// We can't use sendReq directly, because we need to know the end of the region before we send the request
|
||||
// TODO: Is there any better way to avoid duplicating code with func `sendReq` ?
|
||||
func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*tikvrpc.Response, []byte, error) {
|
||||
bo := retry.NewBackofferWithVars(context.Background(), rawkvMaxBackoff, nil)
|
||||
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||
for {
|
||||
loc, err := c.regionCache.LocateKey(bo, startKey)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
actualEndKey := endKey
|
||||
if len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, endKey) < 0 {
|
||||
actualEndKey = loc.EndKey
|
||||
}
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawDeleteRange, &kvrpcpb.RawDeleteRangeRequest{
|
||||
StartKey: startKey,
|
||||
EndKey: actualEndKey,
|
||||
})
|
||||
|
||||
resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return resp, actualEndKey, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error {
|
||||
keyToValue := make(map[string][]byte, len(keys))
|
||||
for i, key := range keys {
|
||||
keyToValue[string(key)] = values[i]
|
||||
}
|
||||
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
var batches []kvrpc.Batch
|
||||
// split the keys by size and RegionVerID
|
||||
for regionID, groupKeys := range groups {
|
||||
batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize)
|
||||
}
|
||||
bo, cancel := bo.Fork()
|
||||
ch := make(chan error, len(batches))
|
||||
for _, batch := range batches {
|
||||
batch1 := batch
|
||||
go func() {
|
||||
singleBatchBackoffer, singleBatchCancel := bo.Fork()
|
||||
defer singleBatchCancel()
|
||||
ch <- c.doBatchPut(singleBatchBackoffer, batch1)
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < len(batches); i++ {
|
||||
if e := <-ch; e != nil {
|
||||
cancel()
|
||||
// catch the first error
|
||||
if err == nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
}
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (c *RawKVClient) doBatchPut(bo *Backoffer, batch kvrpc.Batch) error {
|
||||
kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.Keys))
|
||||
for i, key := range batch.Keys {
|
||||
kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.Values[i]})
|
||||
}
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut, &kvrpcpb.RawBatchPutRequest{Pairs: kvPair})
|
||||
|
||||
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||
resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// recursive call
|
||||
return c.sendBatchPut(bo, batch.Keys, batch.Values)
|
||||
}
|
||||
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawBatchPutResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -575,33 +575,3 @@ func (c ConfigProbe) StorePreSplitSizeThreshold(v uint32) {
|
|||
func (c ConfigProbe) SetOracleUpdateInterval(v int) {
|
||||
oracleUpdateInterval = v
|
||||
}
|
||||
|
||||
// GetRawBatchPutSize returns the raw batch put size config.
|
||||
func (c ConfigProbe) GetRawBatchPutSize() int {
|
||||
return rawBatchPutSize
|
||||
}
|
||||
|
||||
// RawKVClientProbe wraps RawKVClient and exposes internal states for testing purpose.
|
||||
type RawKVClientProbe struct {
|
||||
*RawKVClient
|
||||
}
|
||||
|
||||
// GetRegionCache returns the internal region cache container.
|
||||
func (c RawKVClientProbe) GetRegionCache() *locate.RegionCache {
|
||||
return c.regionCache
|
||||
}
|
||||
|
||||
// SetRegionCache resets the internal region cache container.
|
||||
func (c RawKVClientProbe) SetRegionCache(regionCache *locate.RegionCache) {
|
||||
c.regionCache = regionCache
|
||||
}
|
||||
|
||||
// SetPDClient resets the interval PD client.
|
||||
func (c RawKVClientProbe) SetPDClient(client pd.Client) {
|
||||
c.pdClient = client
|
||||
}
|
||||
|
||||
// SetRPCClient resets the internal RPC client.
|
||||
func (c RawKVClientProbe) SetRPCClient(client Client) {
|
||||
c.rpcClient = client
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue