mirror of https://github.com/tikv/client-go.git
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/rawkv.go
//

// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rawkv

import (
	"bytes"
	"context"
	"sync/atomic"
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pkg/errors"
	"github.com/tikv/client-go/v2/config"
	"github.com/tikv/client-go/v2/config/retry"
	tikverr "github.com/tikv/client-go/v2/error"
	"github.com/tikv/client-go/v2/internal/client"
	"github.com/tikv/client-go/v2/internal/kvrpc"
	"github.com/tikv/client-go/v2/internal/locate"
	"github.com/tikv/client-go/v2/metrics"
	"github.com/tikv/client-go/v2/oracle"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/tikvrpc"
	pd "github.com/tikv/pd/client"
	"github.com/tikv/pd/client/opt"
	"github.com/tikv/pd/client/pkg/caller"
	"google.golang.org/grpc"
)

var (
	// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
	MaxRawKVScanLimit = 10240
	// ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is too large.
	ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit")
)

const (
	// rawBatchPutSize is the maximum size limit for each rawkv batch put request.
	rawBatchPutSize = 16 * 1024
	// rawBatchPairCount is the maximum key count limit for each rawkv batch get/delete request.
	rawBatchPairCount = 512
	componentName     = caller.Component("rawkv-client-go")
)

type rawOptions struct {
	// ColumnFamily field is used to manipulate kv pairs in the specified column family.
	ColumnFamily string

	// This field is used for Scan()/ReverseScan().
	KeyOnly bool
}

// RawChecksum represents the checksum result of raw kv pairs in TiKV cluster.
type RawChecksum struct {
	// Crc64Xor is the checksum result with crc64 algorithm
	Crc64Xor uint64
	// TotalKvs is the total number of kvpairs
	TotalKvs uint64
	// TotalBytes is the total bytes of kvpairs, including prefix in APIV2
	TotalBytes uint64
}

// RawOption represents possible options that can be controlled by the user
// to tweak the API behavior.
//
// Available options are:
// - SetColumnFamily
// - ScanKeyOnly
type RawOption interface {
	apply(opts *rawOptions)
}

type rawOptionFunc func(opts *rawOptions)

func (f rawOptionFunc) apply(opts *rawOptions) {
	f(opts)
}

// SetColumnFamily is a RawOption to manipulate only the k-v pairs in the specified column family.
func SetColumnFamily(cf string) RawOption {
	return rawOptionFunc(func(opts *rawOptions) {
		opts.ColumnFamily = cf
	})
}

// ScanKeyOnly is a RawOption that tells the scanner to return only the
// keys and omit the values.
// It only takes effect in Scan() and ReverseScan().
func ScanKeyOnly() RawOption {
	return rawOptionFunc(func(opts *rawOptions) {
		opts.KeyOnly = true
	})
}

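// exampleScanKeysOnlySketch is an illustrative sketch added for documentation purposes; it is not
// part of the upstream file. It shows how RawOption values such as SetColumnFamily and ScanKeyOnly
// are passed at a call site; the column family name, key range and limit are placeholders.
func exampleScanKeysOnlySketch(ctx context.Context, c *Client) ([][]byte, error) {
	// Only keys are returned (values are omitted), and the scan is restricted to the
	// "default" column family.
	keys, _, err := c.Scan(ctx, []byte("a"), []byte("z"), 256,
		SetColumnFamily("default"),
		ScanKeyOnly(),
	)
	return keys, err
}
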
// Client is a client of TiKV server which is used as a key-value storage,
// only GET/PUT/DELETE commands are supported.
type Client struct {
	apiVersion  kvrpcpb.APIVersion
	clusterID   uint64
	regionCache *locate.RegionCache
	pdClient    pd.Client
	rpcClient   client.Client
	cf          string
	atomic      bool
}

type option struct {
	apiVersion      kvrpcpb.APIVersion
	security        config.Security
	gRPCDialOptions []grpc.DialOption
	pdOptions       []opt.ClientOption
	keyspace        string
}

// ClientOpt is a factory to set the client options.
type ClientOpt func(*option)

// WithPDOptions is used to set the opt.ClientOption
func WithPDOptions(opts ...opt.ClientOption) ClientOpt {
	return func(o *option) {
		o.pdOptions = append(o.pdOptions, opts...)
	}
}

// WithSecurity is used to set the config.Security
func WithSecurity(security config.Security) ClientOpt {
	return func(o *option) {
		o.security = security
	}
}

// WithGRPCDialOptions is used to set the grpc.DialOption.
func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOpt {
	return func(o *option) {
		o.gRPCDialOptions = append(o.gRPCDialOptions, opts...)
	}
}

// WithAPIVersion is used to set the api version.
func WithAPIVersion(apiVersion kvrpcpb.APIVersion) ClientOpt {
	return func(o *option) {
		o.apiVersion = apiVersion
	}
}

// WithKeyspace is used to set the keyspace name.
func WithKeyspace(name string) ClientOpt {
	return func(o *option) {
		o.keyspace = name
	}
}

// SetAtomicForCAS sets atomic mode for CompareAndSwap
func (c *Client) SetAtomicForCAS(b bool) *Client {
	c.atomic = b
	return c
}

// SetColumnFamily sets columnFamily for client
func (c *Client) SetColumnFamily(columnFamily string) *Client {
	c.cf = columnFamily
	return c
}

// NewClient creates a client with PD cluster addrs.
func NewClient(ctx context.Context, pdAddrs []string, security config.Security, opts ...opt.ClientOption) (*Client, error) {
	return NewClientWithOpts(ctx, pdAddrs, WithSecurity(security), WithPDOptions(opts...))
}

// NewClientWithOpts creates a client with PD cluster addrs and client options.
func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt) (*Client, error) {
	opt := &option{}
	for _, o := range opts {
		o(opt)
	}

	// Use an unwrapped PDClient to obtain keyspace meta.
	pdCli, err := pd.NewClientWithContext(ctx, componentName, pdAddrs, pd.SecurityOption{
		CAPath:   opt.security.ClusterSSLCA,
		CertPath: opt.security.ClusterSSLCert,
		KeyPath:  opt.security.ClusterSSLKey,
	}, opt.pdOptions...)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// Build a CodecPDClient
	var codecCli *tikv.CodecPDClient

	switch opt.apiVersion {
	case kvrpcpb.APIVersion_V1, kvrpcpb.APIVersion_V1TTL:
		codecCli = locate.NewCodecPDClient(tikv.ModeRaw, pdCli)
	case kvrpcpb.APIVersion_V2:
		codecCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeRaw, pdCli, opt.keyspace)
		if err != nil {
			return nil, err
		}
	default:
		return nil, errors.Errorf("unknown api version: %d", opt.apiVersion)
	}

	pdCli = codecCli

	rpcCli := client.NewRPCClient(
		client.WithSecurity(opt.security),
		client.WithGRPCDialOptions(opt.gRPCDialOptions...),
		client.WithCodec(codecCli.GetCodec()),
	)

	return &Client{
		apiVersion:  opt.apiVersion,
		clusterID:   pdCli.GetClusterID(ctx),
		regionCache: locate.NewRegionCache(pdCli),
		pdClient:    pdCli.WithCallerComponent(componentName),
		rpcClient:   rpcCli,
	}, nil
}

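// exampleNewClientSketch is an illustrative sketch added for documentation purposes; it is not
// part of the upstream file. The PD endpoint and keyspace name are placeholders. It shows a
// typical way to build an API v2 client bound to a keyspace; callers should Close() the returned
// client when they are done with it.
func exampleNewClientSketch(ctx context.Context) (*Client, error) {
	return NewClientWithOpts(ctx,
		[]string{"127.0.0.1:2379"}, // placeholder PD address
		WithAPIVersion(kvrpcpb.APIVersion_V2),
		WithKeyspace("example_keyspace"),
	)
}
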
// Close closes the client.
func (c *Client) Close() error {
	if c.pdClient != nil {
		c.pdClient.Close()
	}
	if c.regionCache != nil {
		c.regionCache.Close()
	}
	if c.rpcClient == nil {
		return nil
	}
	return c.rpcClient.Close()
}

// ClusterID returns the TiKV cluster ID.
func (c *Client) ClusterID() uint64 {
	return c.clusterID
}

// Get queries value with the key. When the key does not exist, it returns `nil, nil`.
func (c *Client) Get(ctx context.Context, key []byte, options ...RawOption) ([]byte, error) {
	start := time.Now()
	defer func() { metrics.RawkvCmdHistogramWithGet.Observe(time.Since(start).Seconds()) }()

	opts := c.getRawKVOptions(options...)
	req := tikvrpc.NewRequest(
		tikvrpc.CmdRawGet,
		&kvrpcpb.RawGetRequest{
			Key: key,
			Cf:  c.getColumnFamily(opts),
		})
	resp, _, err := c.sendReq(ctx, key, req, false)
	if err != nil {
		return nil, err
	}
	if resp.Resp == nil {
		return nil, errors.WithStack(tikverr.ErrBodyMissing)
	}
	cmdResp := resp.Resp.(*kvrpcpb.RawGetResponse)
	if cmdResp.GetError() != "" {
		return nil, errors.New(cmdResp.GetError())
	}
	if cmdResp.NotFound {
		return nil, nil
	}
	return convertNilToEmptySlice(cmdResp.Value), nil
}

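// exampleGetSketch is an illustrative sketch added for documentation purposes; it is not part of
// the upstream file. It shows how a caller distinguishes a missing key (nil value) from an
// existing key with an empty value ([]byte{}); the key literal is a placeholder.
func exampleGetSketch(ctx context.Context, c *Client) (bool, []byte, error) {
	value, err := c.Get(ctx, []byte("example-key"))
	if err != nil {
		return false, nil, err
	}
	// Get returns (nil, nil) for a missing key; an existing key with an empty value yields []byte{}.
	return value != nil, value, nil
}
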
const rawkvMaxBackoff = 20000

// BatchGet queries values with the keys.
func (c *Client) BatchGet(ctx context.Context, keys [][]byte, options ...RawOption) ([][]byte, error) {
	start := time.Now()
	defer func() {
		metrics.RawkvCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds())
	}()

	opts := c.getRawKVOptions(options...)
	bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
	resp, err := c.sendBatchReq(bo, keys, opts, tikvrpc.CmdRawBatchGet)
	if err != nil {
		return nil, err
	}

	if resp.Resp == nil {
		return nil, errors.WithStack(tikverr.ErrBodyMissing)
	}
	cmdResp := resp.Resp.(*kvrpcpb.RawBatchGetResponse)

	keyToValue := make(map[string][]byte, len(keys))
	for _, pair := range cmdResp.Pairs {
		keyToValue[string(pair.Key)] = pair.Value
	}

	values := make([][]byte, len(keys))
	for i, key := range keys {
		v, ok := keyToValue[string(key)]
		if ok {
			v = convertNilToEmptySlice(v)
		}
		values[i] = v
	}
	return values, nil
}

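// exampleBatchGetSketch is an illustrative sketch added for documentation purposes; it is not
// part of the upstream file. It shows that the returned slice is aligned with the input keys,
// with nil entries for keys that do not exist; the key literals are placeholders.
func exampleBatchGetSketch(ctx context.Context, c *Client) (int, error) {
	keys := [][]byte{[]byte("k1"), []byte("k2"), []byte("k3")}
	values, err := c.BatchGet(ctx, keys)
	if err != nil {
		return 0, err
	}
	missing := 0
	for _, v := range values {
		if v == nil { // a nil entry means the corresponding key was not found
			missing++
		}
	}
	return missing, nil
}
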
// PutWithTTL stores a key-value pair to TiKV with a time-to-live duration.
func (c *Client) PutWithTTL(ctx context.Context, key, value []byte, ttl uint64, options ...RawOption) error {
	start := time.Now()
	defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) }()
	metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key)))
	metrics.RawkvSizeHistogramWithValue.Observe(float64(len(value)))

	opts := c.getRawKVOptions(options...)
	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:    key,
		Value:  value,
		Ttl:    ttl,
		Cf:     c.getColumnFamily(opts),
		ForCas: c.atomic,
	})
	resp, _, err := c.sendReq(ctx, key, req, false)
	if err != nil {
		return err
	}
	if resp.Resp == nil {
		return errors.WithStack(tikverr.ErrBodyMissing)
	}
	cmdResp := resp.Resp.(*kvrpcpb.RawPutResponse)
	if cmdResp.GetError() != "" {
		return errors.New(cmdResp.GetError())
	}
	return nil
}

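// examplePutWithTTLSketch is an illustrative sketch added for documentation purposes; it is not
// part of the upstream file. It writes a key with a TTL, here assumed to be in seconds, so TiKV
// may expire it after roughly one hour. Note that TTLs only take effect when the cluster is
// configured for them (e.g. API V1TTL or V2); the key and value literals are placeholders.
func examplePutWithTTLSketch(ctx context.Context, c *Client) error {
	const oneHourInSeconds = 3600
	return c.PutWithTTL(ctx, []byte("session-id"), []byte("session-data"), oneHourInSeconds)
}
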
// GetKeyTTL gets the TTL of a raw key from TiKV if the key exists.
func (c *Client) GetKeyTTL(ctx context.Context, key []byte, options ...RawOption) (*uint64, error) {
	var ttl uint64
	metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key)))

	opts := c.getRawKVOptions(options...)
	req := tikvrpc.NewRequest(tikvrpc.CmdGetKeyTTL, &kvrpcpb.RawGetKeyTTLRequest{
		Key: key,
		Cf:  c.getColumnFamily(opts),
	})
	resp, _, err := c.sendReq(ctx, key, req, false)

	if err != nil {
		return nil, err
	}
	if resp.Resp == nil {
		return nil, errors.WithStack(tikverr.ErrBodyMissing)
	}

	cmdResp := resp.Resp.(*kvrpcpb.RawGetKeyTTLResponse)
	if cmdResp.GetError() != "" {
		return nil, errors.New(cmdResp.GetError())
	}

	if cmdResp.GetNotFound() {
		return nil, nil
	}

	ttl = cmdResp.GetTtl()
	return &ttl, nil
}

// GetPDClient returns the PD client.
func (c *Client) GetPDClient() pd.Client {
	return c.pdClient
}

// Put stores a key-value pair to TiKV.
func (c *Client) Put(ctx context.Context, key, value []byte, options ...RawOption) error {
	return c.PutWithTTL(ctx, key, value, 0, options...)
}

// BatchPut stores key-value pairs to TiKV.
func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte, options ...RawOption) error {
	return c.BatchPutWithTTL(ctx, keys, values, nil, options...)
}

// BatchPutWithTTL stores key-value pairs to TiKV with time-to-live durations.
func (c *Client) BatchPutWithTTL(ctx context.Context, keys, values [][]byte, ttls []uint64, options ...RawOption) error {
	start := time.Now()
	defer func() {
		metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds())
	}()

	if len(keys) != len(values) {
		return errors.New("the len of keys is not equal to the len of values")
	}
	if len(ttls) > 0 && len(keys) != len(ttls) {
		return errors.New("the len of ttls is not equal to the len of values")
	}
	bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
	opts := c.getRawKVOptions(options...)
	err := c.sendBatchPut(bo, keys, values, ttls, opts)
	return err
}

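// exampleBatchPutWithTTLSketch is an illustrative sketch added for documentation purposes; it is
// not part of the upstream file. It writes two keys with per-key TTLs; the ttls slice must be
// either empty or the same length as keys. All literals are placeholders.
func exampleBatchPutWithTTLSketch(ctx context.Context, c *Client) error {
	keys := [][]byte{[]byte("k1"), []byte("k2")}
	values := [][]byte{[]byte("v1"), []byte("v2")}
	ttls := []uint64{60, 120} // per-key TTLs, aligned with keys
	return c.BatchPutWithTTL(ctx, keys, values, ttls)
}
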
// Delete deletes a key-value pair from TiKV.
func (c *Client) Delete(ctx context.Context, key []byte, options ...RawOption) error {
	start := time.Now()
	defer func() { metrics.RawkvCmdHistogramWithDelete.Observe(time.Since(start).Seconds()) }()

	opts := c.getRawKVOptions(options...)
	req := tikvrpc.NewRequest(tikvrpc.CmdRawDelete, &kvrpcpb.RawDeleteRequest{
		Key:    key,
		Cf:     c.getColumnFamily(opts),
		ForCas: c.atomic,
	})
	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
	resp, _, err := c.sendReq(ctx, key, req, false)
	if err != nil {
		return err
	}
	if resp.Resp == nil {
		return errors.WithStack(tikverr.ErrBodyMissing)
	}
	cmdResp := resp.Resp.(*kvrpcpb.RawDeleteResponse)
	if cmdResp.GetError() != "" {
		return errors.New(cmdResp.GetError())
	}
	return nil
}

// BatchDelete deletes key-value pairs from TiKV.
func (c *Client) BatchDelete(ctx context.Context, keys [][]byte, options ...RawOption) error {
	start := time.Now()
	defer func() {
		metrics.RawkvCmdHistogramWithBatchDelete.Observe(time.Since(start).Seconds())
	}()

	bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
	opts := c.getRawKVOptions(options...)
	resp, err := c.sendBatchReq(bo, keys, opts, tikvrpc.CmdRawBatchDelete)
	if err != nil {
		return err
	}
	if resp.Resp == nil {
		return errors.WithStack(tikverr.ErrBodyMissing)
	}
	cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse)
	if cmdResp.GetError() != "" {
		return errors.New(cmdResp.GetError())
	}
	return nil
}

// DeleteRange deletes all key-value pairs in the [startKey, endKey) range from TiKV.
func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte, options ...RawOption) error {
	start := time.Now()
	var err error
	defer func() {
		var label = "delete_range"
		if err != nil {
			label += "_error"
		}
		metrics.TiKVRawkvCmdHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())
	}()

	// Process each affected region separately.
	for !bytes.Equal(startKey, endKey) {
		opts := c.getRawKVOptions(options...)
		var resp *tikvrpc.Response
		var actualEndKey []byte
		resp, actualEndKey, err = c.sendDeleteRangeReq(ctx, startKey, endKey, opts)
		if err != nil {
			return err
		}
		if resp.Resp == nil {
			return errors.WithStack(tikverr.ErrBodyMissing)
		}
		cmdResp := resp.Resp.(*kvrpcpb.RawDeleteRangeResponse)
		if cmdResp.GetError() != "" {
			return errors.New(cmdResp.GetError())
		}
		startKey = actualEndKey
	}

	return nil
}

// Scan queries continuous kv pairs in range [startKey, endKey), up to limit pairs.
// The returned keys are in lexicographical order.
// If endKey is empty, it means unbounded.
// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan
// (startKey, endKey], you can write:
// `Scan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`.
func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int, options ...RawOption,
) (keys [][]byte, values [][]byte, err error) {
	start := time.Now()
	defer func() { metrics.RawkvCmdHistogramWithRawScan.Observe(time.Since(start).Seconds()) }()

	if limit > MaxRawKVScanLimit {
		return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded)
	}

	opts := c.getRawKVOptions(options...)

	for len(keys) < limit && (len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0) {
		req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{
			StartKey: startKey,
			EndKey:   endKey,
			Limit:    uint32(limit - len(keys)),
			KeyOnly:  opts.KeyOnly,
			Cf:       c.getColumnFamily(opts),
		})
		resp, loc, err := c.sendReq(ctx, startKey, req, false)
		if err != nil {
			return nil, nil, err
		}
		if resp.Resp == nil {
			return nil, nil, errors.WithStack(tikverr.ErrBodyMissing)
		}
		cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse)
		for _, pair := range cmdResp.Kvs {
			keys = append(keys, pair.Key)
			values = append(values, convertNilToEmptySlice(pair.Value))
		}
		startKey = loc.EndKey
		if len(startKey) == 0 {
			break
		}
	}
	return
}

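// exampleScanExclusiveStartSketch is an illustrative sketch added for documentation purposes; it
// is not part of the upstream file. It demonstrates the "push a '\0'" trick described in the Scan
// comment above: appending a zero byte to startKey turns the inclusive lower bound into an
// exclusive one. The limit is a placeholder.
func exampleScanExclusiveStartSketch(ctx context.Context, c *Client, startKey, endKey []byte) ([][]byte, [][]byte, error) {
	// Copy before appending so the caller's slice is not modified.
	exclusiveStart := append(append([]byte{}, startKey...), 0)
	return c.Scan(ctx, exclusiveStart, endKey, 128)
}
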
// ReverseScan queries continuous kv pairs in range [endKey, startKey),
// from startKey(upperBound) to endKey(lowerBound), up to limit pairs.
// The returned keys are in reversed lexicographical order.
// If endKey is empty, it means unbounded.
// If you want to include the startKey or exclude the endKey, push a '\0' to the key. For example, to scan
// (endKey, startKey], you can write:
// `ReverseScan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`.
// It doesn't support scanning from "", because locating the last Region is not yet implemented.
func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit int, options ...RawOption) (keys [][]byte, values [][]byte, err error) {
	start := time.Now()
	defer func() {
		metrics.RawkvCmdHistogramWithRawReversScan.Observe(time.Since(start).Seconds())
	}()

	if limit > MaxRawKVScanLimit {
		return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded)
	}

	opts := c.getRawKVOptions(options...)

	for len(keys) < limit && bytes.Compare(startKey, endKey) > 0 {
		req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{
			StartKey: startKey,
			EndKey:   endKey,
			Limit:    uint32(limit - len(keys)),
			Reverse:  true,
			KeyOnly:  opts.KeyOnly,
			Cf:       c.getColumnFamily(opts),
		})
		resp, loc, err := c.sendReq(ctx, startKey, req, true)
		if err != nil {
			return nil, nil, err
		}
		if resp.Resp == nil {
			return nil, nil, errors.WithStack(tikverr.ErrBodyMissing)
		}
		cmdResp := resp.Resp.(*kvrpcpb.RawScanResponse)
		for _, pair := range cmdResp.Kvs {
			keys = append(keys, pair.Key)
			values = append(values, convertNilToEmptySlice(pair.Value))
		}
		startKey = loc.StartKey
		if len(startKey) == 0 {
			break
		}
	}
	return
}

// Checksum does the checksum of continuous kv pairs in range [startKey, endKey).
// If endKey is empty, it means unbounded.
// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to checksum
// (startKey, endKey], you can write:
// `Checksum(ctx, push(startKey, '\0'), push(endKey, '\0'))`.
func (c *Client) Checksum(ctx context.Context, startKey, endKey []byte, options ...RawOption,
) (check RawChecksum, err error) {

	start := time.Now()
	defer func() { metrics.RawkvCmdHistogramWithRawChecksum.Observe(time.Since(start).Seconds()) }()

	for len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0 {
		req := tikvrpc.NewRequest(tikvrpc.CmdRawChecksum, &kvrpcpb.RawChecksumRequest{
			Algorithm: kvrpcpb.ChecksumAlgorithm_Crc64_Xor,
			Ranges: []*kvrpcpb.KeyRange{{
				StartKey: startKey,
				EndKey:   endKey,
			}},
		})
		resp, loc, err := c.sendReq(ctx, startKey, req, false)
		if err != nil {
			return RawChecksum{0, 0, 0}, err
		}
		if resp.Resp == nil {
			return RawChecksum{0, 0, 0}, errors.WithStack(tikverr.ErrBodyMissing)
		}
		cmdResp := resp.Resp.(*kvrpcpb.RawChecksumResponse)
		check.Crc64Xor ^= cmdResp.GetChecksum()
		check.TotalKvs += cmdResp.GetTotalKvs()
		check.TotalBytes += cmdResp.GetTotalBytes()
		startKey = loc.EndKey
		if len(startKey) == 0 {
			break
		}
	}
	return
}

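// exampleChecksumSketch is an illustrative sketch added for documentation purposes; it is not
// part of the upstream file. It computes the CRC64-XOR checksum over the key range ["a", "z")
// and returns the aggregated counters; the range is a placeholder.
func exampleChecksumSketch(ctx context.Context, c *Client) (RawChecksum, error) {
	return c.Checksum(ctx, []byte("a"), []byte("z"))
}
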
// CompareAndSwap performs an atomic compare-and-swap operation on the given key while the client
// is in atomic mode (SetAtomicForCAS(true)).
// If the value retrieved is equal to previousValue, newValue is written.
// It returns the previous value and whether the value is successfully swapped.
//
// If SetAtomicForCAS(false), it will return an error because
// CAS operations require the client to operate in atomic mode.
//
// NOTE: This feature is experimental. It depends on the single-row transaction mechanism of TiKV, which conflicts
// with the normal write operation in rawkv mode. If multiple clients exist, it's up to the clients to sync the atomic mode flag.
// If some clients write in atomic mode but the others don't, the linearizability of TiKV will be violated.
func (c *Client) CompareAndSwap(ctx context.Context, key, previousValue, newValue []byte, options ...RawOption) ([]byte, bool, error) {
	if !c.atomic {
		return nil, false, errors.New("using CompareAndSwap without enable atomic mode")
	}

	opts := c.getRawKVOptions(options...)
	reqArgs := kvrpcpb.RawCASRequest{
		Key:   key,
		Value: newValue,
		Cf:    c.getColumnFamily(opts),
	}
	if previousValue == nil {
		reqArgs.PreviousNotExist = true
	} else {
		reqArgs.PreviousValue = previousValue
	}

	req := tikvrpc.NewRequest(tikvrpc.CmdRawCompareAndSwap, &reqArgs)
	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
	resp, _, err := c.sendReq(ctx, key, req, false)
	if err != nil {
		return nil, false, err
	}
	if resp.Resp == nil {
		return nil, false, errors.WithStack(tikverr.ErrBodyMissing)
	}

	cmdResp := resp.Resp.(*kvrpcpb.RawCASResponse)
	if cmdResp.GetError() != "" {
		return nil, false, errors.New(cmdResp.GetError())
	}

	if cmdResp.PreviousNotExist {
		return nil, cmdResp.Succeed, nil
	}
	return convertNilToEmptySlice(cmdResp.PreviousValue), cmdResp.Succeed, nil
}

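// exampleCompareAndSwapSketch is an illustrative sketch added for documentation purposes; it is
// not part of the upstream file. Atomic mode must be enabled first, otherwise CompareAndSwap
// returns an error; passing nil as previousValue means "only succeed if the key does not exist".
// The key and value literals are placeholders.
func exampleCompareAndSwapSketch(ctx context.Context, c *Client) (bool, error) {
	// CompareAndSwap requires atomic mode; without it the call returns an error.
	c.SetAtomicForCAS(true)
	// Create the key only if it is currently absent.
	_, swapped, err := c.CompareAndSwap(ctx, []byte("counter"), nil, []byte("0"))
	return swapped, err
}
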
func (c *Client) sendReq(ctx context.Context, key []byte, req *tikvrpc.Request, reverse bool) (*tikvrpc.Response, *locate.KeyLocation, error) {
	bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
	sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient, oracle.NoopReadTSValidator{})
	for {
		var loc *locate.KeyLocation
		var err error
		if reverse {
			loc, err = c.regionCache.LocateEndKey(bo, key)
		} else {
			loc, err = c.regionCache.LocateKey(bo, key)
		}
		if err != nil {
			return nil, nil, err
		}
		resp, _, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
		if err != nil {
			return nil, nil, err
		}
		regionErr, err := resp.GetRegionError()
		if err != nil {
			return nil, nil, err
		}
		if regionErr != nil {
			err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
			if err != nil {
				return nil, nil, err
			}
			continue
		}
		return resp, loc, nil
	}
}

func (c *Client) sendBatchReq(bo *retry.Backoffer, keys [][]byte, options *rawOptions, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys
	groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
	if err != nil {
		return nil, err
	}

	var batches []kvrpc.Batch
	for regionID, groupKeys := range groups {
		batches = kvrpc.AppendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
	}
	forkedBo, cancel := bo.Fork()
	ches := make(chan kvrpc.BatchResult, len(batches))
	var lastForkedBo atomic.Pointer[retry.Backoffer]
	for _, batch := range batches {
		batch1 := batch
		go func() {
			singleBatchBackoffer, singleBatchCancel := forkedBo.Fork()
			defer singleBatchCancel()
			batchResult := c.doBatchReq(singleBatchBackoffer, batch1, options, cmdType)
			lastForkedBo.Store(singleBatchBackoffer)
			ches <- batchResult
		}()
	}

	var firstError error
	var resp *tikvrpc.Response
	switch cmdType {
	case tikvrpc.CmdRawBatchGet:
		resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchGetResponse{}}
	case tikvrpc.CmdRawBatchDelete:
		resp = &tikvrpc.Response{Resp: &kvrpcpb.RawBatchDeleteResponse{}}
	}
	for range batches {
		if singleResp, ok := <-ches; ok {
			if singleResp.Error != nil {
				if firstError == nil {
					firstError = errors.WithStack(singleResp.Error)
					cancel()
				}
			} else if cmdType == tikvrpc.CmdRawBatchGet {
				cmdResp := singleResp.Resp.(*kvrpcpb.RawBatchGetResponse)
				resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs = append(resp.Resp.(*kvrpcpb.RawBatchGetResponse).Pairs, cmdResp.Pairs...)
			}
		}
	}
	bo.UpdateUsingForked(lastForkedBo.Load())

	if firstError == nil {
		cancel()
	}
	return resp, firstError
}

func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, options *rawOptions, cmdType tikvrpc.CmdType) kvrpc.BatchResult {
	var req *tikvrpc.Request
	switch cmdType {
	case tikvrpc.CmdRawBatchGet:
		req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchGetRequest{
			Keys: batch.Keys,
			Cf:   c.getColumnFamily(options),
		})
	case tikvrpc.CmdRawBatchDelete:
		req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchDeleteRequest{
			Keys:   batch.Keys,
			Cf:     c.getColumnFamily(options),
			ForCas: c.atomic,
		})
	}

	sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient, oracle.NoopReadTSValidator{})
	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
	resp, _, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)

	batchResp := kvrpc.BatchResult{}
	if err != nil {
		batchResp.Error = err
		return batchResp
	}
	regionErr, err := resp.GetRegionError()
	if err != nil {
		batchResp.Error = err
		return batchResp
	}
	if regionErr != nil {
		err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
		if err != nil {
			batchResp.Error = err
			return batchResp
		}
		resp, err = c.sendBatchReq(bo, batch.Keys, options, cmdType)
		batchResp.Response = resp
		batchResp.Error = err
		return batchResp
	}

	switch cmdType {
	case tikvrpc.CmdRawBatchGet:
		batchResp.Response = resp
	case tikvrpc.CmdRawBatchDelete:
		if resp.Resp == nil {
			batchResp.Error = errors.WithStack(tikverr.ErrBodyMissing)
			return batchResp
		}
		cmdResp := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse)
		if cmdResp.GetError() != "" {
			batchResp.Error = errors.New(cmdResp.GetError())
			return batchResp
		}
		batchResp.Response = resp
	}
	return batchResp
}

// sendDeleteRangeReq sends a raw delete range request and returns the response and the actual endKey.
// If the given range spans over more than one region, the actual endKey is the end of the first region.
// We can't use sendReq directly, because we need to know the end of the region before we send the request.
// TODO: Is there any better way to avoid duplicating code with func `sendReq`?
func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey []byte, opts *rawOptions) (*tikvrpc.Response, []byte, error) {
	bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil)
	sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient, oracle.NoopReadTSValidator{})
	for {
		loc, err := c.regionCache.LocateKey(bo, startKey)
		if err != nil {
			return nil, nil, err
		}

		actualEndKey := endKey
		if len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, endKey) < 0 {
			actualEndKey = loc.EndKey
		}

		req := tikvrpc.NewRequest(tikvrpc.CmdRawDeleteRange, &kvrpcpb.RawDeleteRangeRequest{
			StartKey: startKey,
			EndKey:   actualEndKey,
			Cf:       c.getColumnFamily(opts),
		})

		req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
		resp, _, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
		if err != nil {
			return nil, nil, err
		}
		regionErr, err := resp.GetRegionError()
		if err != nil {
			return nil, nil, err
		}
		if regionErr != nil {
			err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
			if err != nil {
				return nil, nil, err
			}
			continue
		}
		return resp, actualEndKey, nil
	}
}

func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls []uint64, opts *rawOptions) error {
	keyToValue := make(map[string][]byte, len(keys))
	keyToTTL := make(map[string]uint64, len(keys))
	for i, key := range keys {
		keyToValue[string(key)] = values[i]
		if len(ttls) > 0 {
			keyToTTL[string(key)] = ttls[i]
		}
	}
	groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
	if err != nil {
		return err
	}
	var batches []kvrpc.Batch
	// split the keys by size and RegionVerID
	for regionID, groupKeys := range groups {
		batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, keyToTTL, rawBatchPutSize)
	}
	newBo, cancel := bo.Fork()
	ch := make(chan error, len(batches))
	var lastForkedBo atomic.Pointer[retry.Backoffer]
	for _, batch := range batches {
		batch1 := batch
		go func() {
			singleBatchBackoffer, singleBatchCancel := newBo.Fork()
			defer singleBatchCancel()
			e := c.doBatchPut(singleBatchBackoffer, batch1, opts)
			lastForkedBo.Store(singleBatchBackoffer)
			ch <- e
		}()
	}

	for range batches {
		if e := <-ch; e != nil {
			// catch the first error
			if err == nil {
				err = errors.WithStack(e)
				cancel()
			}
		}
	}
	bo.UpdateUsingForked(lastForkedBo.Load())

	if err == nil {
		cancel()
	}
	return err
}

func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch, opts *rawOptions) error {
	kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.Keys))
	for i, key := range batch.Keys {
		kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.Values[i]})
	}

	var ttl uint64
	if len(batch.TTLs) > 0 {
		ttl = batch.TTLs[0]
	}
	req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut,
		&kvrpcpb.RawBatchPutRequest{
			Pairs:  kvPair,
			Cf:     c.getColumnFamily(opts),
			ForCas: c.atomic,
			Ttls:   batch.TTLs,
			Ttl:    ttl,
		})

	sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient, oracle.NoopReadTSValidator{})
	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
	req.ApiVersion = c.apiVersion
	resp, _, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
	if err != nil {
		return err
	}
	regionErr, err := resp.GetRegionError()
	if err != nil {
		return err
	}
	if regionErr != nil {
		err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
		if err != nil {
			return err
		}
		// recursive call
		return c.sendBatchPut(bo, batch.Keys, batch.Values, batch.TTLs, opts)
	}

	if resp.Resp == nil {
		return errors.WithStack(tikverr.ErrBodyMissing)
	}
	cmdResp := resp.Resp.(*kvrpcpb.RawBatchPutResponse)
	if cmdResp.GetError() != "" {
		return errors.New(cmdResp.GetError())
	}
	return nil
}

func (c *Client) getColumnFamily(options *rawOptions) string {
	if options.ColumnFamily == "" {
		return c.cf
	}
	return options.ColumnFamily
}

func (c *Client) getRawKVOptions(options ...RawOption) *rawOptions {
	opts := rawOptions{}
	for _, op := range options {
		op.apply(&opts)
	}
	return &opts
}

// convertNilToEmptySlice is used to convert the value of an existing key returned from TiKV.
// It converts nil to `[]byte{}` to indicate an empty value and to distinguish it from "not found",
// which is necessary when putting an empty value is permitted.
// Also note that gRPC will always transfer an empty byte slice as nil.
func convertNilToEmptySlice(value []byte) []byte {
	if value == nil {
		return []byte{}
	}
	return value
}