mirror of https://github.com/tikv/client-go.git
add code lint (#10)
* add code lint Signed-off-by: disksing <i@disksing.com>
This commit is contained in:
parent
ea8b88134e
commit
2130e26d4f
4
Makefile
4
Makefile
|
|
@ -3,3 +3,7 @@ default:
|
||||||
|
|
||||||
test:
|
test:
|
||||||
GO111MODULE=on go test ./...
|
GO111MODULE=on go test ./...
|
||||||
|
|
||||||
|
check:
|
||||||
|
GO111MODULE=off go get golang.org/x/lint/golint
|
||||||
|
GO111MODULE=on golint `go list ./...`
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
cli, err := rawkv.NewRawKVClient([]string{"127.0.0.1:2379"}, config.Security{})
|
cli, err := rawkv.NewClient([]string{"127.0.0.1:2379"}, config.Security{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/tikv/client-go/txnkv"
|
"github.com/tikv/client-go/txnkv"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// KV represents a Key-Value pair.
|
||||||
type KV struct {
|
type KV struct {
|
||||||
K, V []byte
|
K, V []byte
|
||||||
}
|
}
|
||||||
|
|
@ -33,14 +34,14 @@ func (kv KV) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
client *txnkv.TxnClient
|
client *txnkv.Client
|
||||||
pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address")
|
pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Init initializes information.
|
// Init initializes information.
|
||||||
func initStore() {
|
func initStore() {
|
||||||
var err error
|
var err error
|
||||||
client, err = txnkv.NewTxnClient([]string{*pdAddr}, config.Security{})
|
client, err = txnkv.NewClient([]string{*pdAddr}, config.Security{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,11 +15,6 @@ package metrics
|
||||||
|
|
||||||
import "github.com/prometheus/client_golang/prometheus"
|
import "github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
const (
|
|
||||||
LabelBatchRecvLoop = "batch-recv-loop"
|
|
||||||
LabelBatchSendLoop = "batch-send-loop"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Client metrics.
|
// Client metrics.
|
||||||
// TODO: Create new grafana page for the metrics.
|
// TODO: Create new grafana page for the metrics.
|
||||||
var (
|
var (
|
||||||
|
|
|
||||||
|
|
@ -1,18 +0,0 @@
|
||||||
// Copyright 2018 PingCAP, Inc.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package rawkv
|
|
||||||
|
|
||||||
import "github.com/pkg/errors"
|
|
||||||
|
|
||||||
var ErrBodyMissing = errors.New("response body is missing")
|
|
||||||
|
|
@ -33,17 +33,17 @@ var (
|
||||||
ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit")
|
ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit")
|
||||||
)
|
)
|
||||||
|
|
||||||
// RawKVClient is a client of TiKV server which is used as a key-value storage,
|
// Client is a rawkv client of TiKV server which is used as a key-value storage,
|
||||||
// only GET/PUT/DELETE commands are supported.
|
// only GET/PUT/DELETE commands are supported.
|
||||||
type RawKVClient struct {
|
type Client struct {
|
||||||
clusterID uint64
|
clusterID uint64
|
||||||
regionCache *locate.RegionCache
|
regionCache *locate.RegionCache
|
||||||
pdClient pd.Client
|
pdClient pd.Client
|
||||||
rpcClient rpc.Client
|
rpcClient rpc.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRawKVClient creates a client with PD cluster addrs.
|
// NewClient creates a client with PD cluster addrs.
|
||||||
func NewRawKVClient(pdAddrs []string, security config.Security) (*RawKVClient, error) {
|
func NewClient(pdAddrs []string, security config.Security) (*Client, error) {
|
||||||
pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{
|
pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{
|
||||||
CAPath: security.SSLCA,
|
CAPath: security.SSLCA,
|
||||||
CertPath: security.SSLCert,
|
CertPath: security.SSLCert,
|
||||||
|
|
@ -52,7 +52,7 @@ func NewRawKVClient(pdAddrs []string, security config.Security) (*RawKVClient, e
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &RawKVClient{
|
return &Client{
|
||||||
clusterID: pdCli.GetClusterID(context.TODO()),
|
clusterID: pdCli.GetClusterID(context.TODO()),
|
||||||
regionCache: locate.NewRegionCache(pdCli),
|
regionCache: locate.NewRegionCache(pdCli),
|
||||||
pdClient: pdCli,
|
pdClient: pdCli,
|
||||||
|
|
@ -61,18 +61,18 @@ func NewRawKVClient(pdAddrs []string, security config.Security) (*RawKVClient, e
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the client.
|
// Close closes the client.
|
||||||
func (c *RawKVClient) Close() error {
|
func (c *Client) Close() error {
|
||||||
c.pdClient.Close()
|
c.pdClient.Close()
|
||||||
return c.rpcClient.Close()
|
return c.rpcClient.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClusterID returns the TiKV cluster ID.
|
// ClusterID returns the TiKV cluster ID.
|
||||||
func (c *RawKVClient) ClusterID() uint64 {
|
func (c *Client) ClusterID() uint64 {
|
||||||
return c.clusterID
|
return c.clusterID
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get queries value with the key. When the key does not exist, it returns `nil, nil`.
|
// Get queries value with the key. When the key does not exist, it returns `nil, nil`.
|
||||||
func (c *RawKVClient) Get(key []byte) ([]byte, error) {
|
func (c *Client) Get(key []byte) ([]byte, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
|
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
|
||||||
|
|
||||||
|
|
@ -88,7 +88,7 @@ func (c *RawKVClient) Get(key []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
cmdResp := resp.RawGet
|
cmdResp := resp.RawGet
|
||||||
if cmdResp == nil {
|
if cmdResp == nil {
|
||||||
return nil, errors.WithStack(ErrBodyMissing)
|
return nil, errors.WithStack(rpc.ErrBodyMissing)
|
||||||
}
|
}
|
||||||
if cmdResp.GetError() != "" {
|
if cmdResp.GetError() != "" {
|
||||||
return nil, errors.New(cmdResp.GetError())
|
return nil, errors.New(cmdResp.GetError())
|
||||||
|
|
@ -100,7 +100,7 @@ func (c *RawKVClient) Get(key []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// BatchGet queries values with the keys.
|
// BatchGet queries values with the keys.
|
||||||
func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) {
|
func (c *Client) BatchGet(keys [][]byte) ([][]byte, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
metrics.RawkvCmdHistogram.WithLabelValues("batch_get").Observe(time.Since(start).Seconds())
|
metrics.RawkvCmdHistogram.WithLabelValues("batch_get").Observe(time.Since(start).Seconds())
|
||||||
|
|
@ -114,7 +114,7 @@ func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) {
|
||||||
|
|
||||||
cmdResp := resp.RawBatchGet
|
cmdResp := resp.RawBatchGet
|
||||||
if cmdResp == nil {
|
if cmdResp == nil {
|
||||||
return nil, errors.WithStack(ErrBodyMissing)
|
return nil, errors.WithStack(rpc.ErrBodyMissing)
|
||||||
}
|
}
|
||||||
|
|
||||||
keyToValue := make(map[string][]byte, len(keys))
|
keyToValue := make(map[string][]byte, len(keys))
|
||||||
|
|
@ -130,7 +130,7 @@ func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores a key-value pair to TiKV.
|
// Put stores a key-value pair to TiKV.
|
||||||
func (c *RawKVClient) Put(key, value []byte) error {
|
func (c *Client) Put(key, value []byte) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) }()
|
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) }()
|
||||||
metrics.RawkvSizeHistogram.WithLabelValues("key").Observe(float64(len(key)))
|
metrics.RawkvSizeHistogram.WithLabelValues("key").Observe(float64(len(key)))
|
||||||
|
|
@ -153,7 +153,7 @@ func (c *RawKVClient) Put(key, value []byte) error {
|
||||||
}
|
}
|
||||||
cmdResp := resp.RawPut
|
cmdResp := resp.RawPut
|
||||||
if cmdResp == nil {
|
if cmdResp == nil {
|
||||||
return errors.WithStack(ErrBodyMissing)
|
return errors.WithStack(rpc.ErrBodyMissing)
|
||||||
}
|
}
|
||||||
if cmdResp.GetError() != "" {
|
if cmdResp.GetError() != "" {
|
||||||
return errors.New(cmdResp.GetError())
|
return errors.New(cmdResp.GetError())
|
||||||
|
|
@ -162,7 +162,7 @@ func (c *RawKVClient) Put(key, value []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// BatchPut stores key-value pairs to TiKV.
|
// BatchPut stores key-value pairs to TiKV.
|
||||||
func (c *RawKVClient) BatchPut(keys, values [][]byte) error {
|
func (c *Client) BatchPut(keys, values [][]byte) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
metrics.RawkvCmdHistogram.WithLabelValues("batch_put").Observe(time.Since(start).Seconds())
|
metrics.RawkvCmdHistogram.WithLabelValues("batch_put").Observe(time.Since(start).Seconds())
|
||||||
|
|
@ -181,7 +181,7 @@ func (c *RawKVClient) BatchPut(keys, values [][]byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes a key-value pair from TiKV.
|
// Delete deletes a key-value pair from TiKV.
|
||||||
func (c *RawKVClient) Delete(key []byte) error {
|
func (c *Client) Delete(key []byte) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) }()
|
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) }()
|
||||||
|
|
||||||
|
|
@ -197,7 +197,7 @@ func (c *RawKVClient) Delete(key []byte) error {
|
||||||
}
|
}
|
||||||
cmdResp := resp.RawDelete
|
cmdResp := resp.RawDelete
|
||||||
if cmdResp == nil {
|
if cmdResp == nil {
|
||||||
return errors.WithStack(ErrBodyMissing)
|
return errors.WithStack(rpc.ErrBodyMissing)
|
||||||
}
|
}
|
||||||
if cmdResp.GetError() != "" {
|
if cmdResp.GetError() != "" {
|
||||||
return errors.New(cmdResp.GetError())
|
return errors.New(cmdResp.GetError())
|
||||||
|
|
@ -206,7 +206,7 @@ func (c *RawKVClient) Delete(key []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// BatchDelete deletes key-value pairs from TiKV
|
// BatchDelete deletes key-value pairs from TiKV
|
||||||
func (c *RawKVClient) BatchDelete(keys [][]byte) error {
|
func (c *Client) BatchDelete(keys [][]byte) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
metrics.RawkvCmdHistogram.WithLabelValues("batch_delete").Observe(time.Since(start).Seconds())
|
metrics.RawkvCmdHistogram.WithLabelValues("batch_delete").Observe(time.Since(start).Seconds())
|
||||||
|
|
@ -219,7 +219,7 @@ func (c *RawKVClient) BatchDelete(keys [][]byte) error {
|
||||||
}
|
}
|
||||||
cmdResp := resp.RawBatchDelete
|
cmdResp := resp.RawBatchDelete
|
||||||
if cmdResp == nil {
|
if cmdResp == nil {
|
||||||
return errors.WithStack(ErrBodyMissing)
|
return errors.WithStack(rpc.ErrBodyMissing)
|
||||||
}
|
}
|
||||||
if cmdResp.GetError() != "" {
|
if cmdResp.GetError() != "" {
|
||||||
return errors.New(cmdResp.GetError())
|
return errors.New(cmdResp.GetError())
|
||||||
|
|
@ -228,7 +228,7 @@ func (c *RawKVClient) BatchDelete(keys [][]byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteRange deletes all key-value pairs in a range from TiKV
|
// DeleteRange deletes all key-value pairs in a range from TiKV
|
||||||
func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {
|
func (c *Client) DeleteRange(startKey []byte, endKey []byte) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
@ -249,7 +249,7 @@ func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {
|
||||||
}
|
}
|
||||||
cmdResp := resp.RawDeleteRange
|
cmdResp := resp.RawDeleteRange
|
||||||
if cmdResp == nil {
|
if cmdResp == nil {
|
||||||
return errors.WithStack(ErrBodyMissing)
|
return errors.WithStack(rpc.ErrBodyMissing)
|
||||||
}
|
}
|
||||||
if cmdResp.GetError() != "" {
|
if cmdResp.GetError() != "" {
|
||||||
return errors.New(cmdResp.GetError())
|
return errors.New(cmdResp.GetError())
|
||||||
|
|
@ -265,7 +265,7 @@ func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {
|
||||||
// If you want to exclude the startKey or include the endKey, append a '\0' to the key. For example, to scan
|
// If you want to exclude the startKey or include the endKey, append a '\0' to the key. For example, to scan
|
||||||
// (startKey, endKey], you can write:
|
// (startKey, endKey], you can write:
|
||||||
// `Scan(append(startKey, '\0'), append(endKey, '\0'), limit)`.
|
// `Scan(append(startKey, '\0'), append(endKey, '\0'), limit)`.
|
||||||
func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
|
func (c *Client) Scan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }()
|
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }()
|
||||||
|
|
||||||
|
|
@ -288,7 +288,7 @@ func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, v
|
||||||
}
|
}
|
||||||
cmdResp := resp.RawScan
|
cmdResp := resp.RawScan
|
||||||
if cmdResp == nil {
|
if cmdResp == nil {
|
||||||
return nil, nil, errors.WithStack(ErrBodyMissing)
|
return nil, nil, errors.WithStack(rpc.ErrBodyMissing)
|
||||||
}
|
}
|
||||||
for _, pair := range cmdResp.Kvs {
|
for _, pair := range cmdResp.Kvs {
|
||||||
keys = append(keys, pair.Key)
|
keys = append(keys, pair.Key)
|
||||||
|
|
@ -302,7 +302,7 @@ func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, v
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RawKVClient) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *locate.KeyLocation, error) {
|
func (c *Client) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *locate.KeyLocation, error) {
|
||||||
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
|
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
|
||||||
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||||
for {
|
for {
|
||||||
|
|
@ -329,7 +329,7 @@ func (c *RawKVClient) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *loc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RawKVClient) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType rpc.CmdType) (*rpc.Response, error) { // split the keys
|
func (c *Client) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType rpc.CmdType) (*rpc.Response, error) { // split the keys
|
||||||
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
|
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -376,7 +376,7 @@ func (c *RawKVClient) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType r
|
||||||
return resp, firstError
|
return resp, firstError
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.CmdType) singleBatchResp {
|
func (c *Client) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.CmdType) singleBatchResp {
|
||||||
var req *rpc.Request
|
var req *rpc.Request
|
||||||
switch cmdType {
|
switch cmdType {
|
||||||
case rpc.CmdRawBatchGet:
|
case rpc.CmdRawBatchGet:
|
||||||
|
|
@ -426,7 +426,7 @@ func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.C
|
||||||
case rpc.CmdRawBatchDelete:
|
case rpc.CmdRawBatchDelete:
|
||||||
cmdResp := resp.RawBatchDelete
|
cmdResp := resp.RawBatchDelete
|
||||||
if cmdResp == nil {
|
if cmdResp == nil {
|
||||||
batchResp.err = errors.WithStack(ErrBodyMissing)
|
batchResp.err = errors.WithStack(rpc.ErrBodyMissing)
|
||||||
return batchResp
|
return batchResp
|
||||||
}
|
}
|
||||||
if cmdResp.GetError() != "" {
|
if cmdResp.GetError() != "" {
|
||||||
|
|
@ -442,7 +442,7 @@ func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.C
|
||||||
// If the given range spans over more than one regions, the actual endKey is the end of the first region.
|
// If the given range spans over more than one regions, the actual endKey is the end of the first region.
|
||||||
// We can't use sendReq directly, because we need to know the end of the region before we send the request
|
// We can't use sendReq directly, because we need to know the end of the region before we send the request
|
||||||
// TODO: Is there any better way to avoid duplicating code with func `sendReq` ?
|
// TODO: Is there any better way to avoid duplicating code with func `sendReq` ?
|
||||||
func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.Response, []byte, error) {
|
func (c *Client) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.Response, []byte, error) {
|
||||||
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
|
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
|
||||||
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||||
for {
|
for {
|
||||||
|
|
@ -483,7 +483,7 @@ func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.R
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RawKVClient) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) error {
|
func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) error {
|
||||||
keyToValue := make(map[string][]byte)
|
keyToValue := make(map[string][]byte)
|
||||||
for i, key := range keys {
|
for i, key := range keys {
|
||||||
keyToValue[string(key)] = values[i]
|
keyToValue[string(key)] = values[i]
|
||||||
|
|
@ -560,7 +560,7 @@ func appendBatches(batches []batch, regionID locate.RegionVerID, groupKeys [][]b
|
||||||
return batches
|
return batches
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RawKVClient) doBatchPut(bo *retry.Backoffer, batch batch) error {
|
func (c *Client) doBatchPut(bo *retry.Backoffer, batch batch) error {
|
||||||
kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.keys))
|
kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.keys))
|
||||||
for i, key := range batch.keys {
|
for i, key := range batch.keys {
|
||||||
kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.values[i]})
|
kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.values[i]})
|
||||||
|
|
@ -593,7 +593,7 @@ func (c *RawKVClient) doBatchPut(bo *retry.Backoffer, batch batch) error {
|
||||||
|
|
||||||
cmdResp := resp.RawBatchPut
|
cmdResp := resp.RawBatchPut
|
||||||
if cmdResp == nil {
|
if cmdResp == nil {
|
||||||
return errors.WithStack(ErrBodyMissing)
|
return errors.WithStack(rpc.ErrBodyMissing)
|
||||||
}
|
}
|
||||||
if cmdResp.GetError() != "" {
|
if cmdResp.GetError() != "" {
|
||||||
return errors.New(cmdResp.GetError())
|
return errors.New(cmdResp.GetError())
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ func TestT(t *testing.T) {
|
||||||
|
|
||||||
type testRawKVSuite struct {
|
type testRawKVSuite struct {
|
||||||
cluster *mocktikv.Cluster
|
cluster *mocktikv.Cluster
|
||||||
client *RawKVClient
|
client *Client
|
||||||
bo *retry.Backoffer
|
bo *retry.Backoffer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -43,7 +43,7 @@ func (s *testRawKVSuite) SetUpTest(c *C) {
|
||||||
mocktikv.BootstrapWithSingleStore(s.cluster)
|
mocktikv.BootstrapWithSingleStore(s.cluster)
|
||||||
pdClient := mocktikv.NewPDClient(s.cluster)
|
pdClient := mocktikv.NewPDClient(s.cluster)
|
||||||
mvccStore := mocktikv.MustNewMVCCStore()
|
mvccStore := mocktikv.MustNewMVCCStore()
|
||||||
s.client = &RawKVClient{
|
s.client = &Client{
|
||||||
clusterID: 0,
|
clusterID: 0,
|
||||||
regionCache: locate.NewRegionCache(pdClient),
|
regionCache: locate.NewRegionCache(pdClient),
|
||||||
pdClient: pdClient,
|
pdClient: pdClient,
|
||||||
|
|
|
||||||
|
|
@ -196,7 +196,7 @@ func (b *Backoffer) Backoff(typ BackoffType, err error) error {
|
||||||
b.totalSleep += f(b.ctx)
|
b.totalSleep += f(b.ctx)
|
||||||
b.types = append(b.types, typ)
|
b.types = append(b.types, typ)
|
||||||
|
|
||||||
var startTs interface{} = ""
|
var startTs interface{}
|
||||||
if ts := b.ctx.Value(txnStartKey); ts != nil {
|
if ts := b.ctx.Value(txnStartKey); ts != nil {
|
||||||
startTs = ts
|
startTs = ts
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,25 +21,29 @@ import (
|
||||||
"github.com/tikv/client-go/txnkv/store"
|
"github.com/tikv/client-go/txnkv/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TxnClient struct {
|
// Client is a transactional client of TiKV server.
|
||||||
|
type Client struct {
|
||||||
tikvStore *store.TiKVStore
|
tikvStore *store.TiKVStore
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTxnClient(pdAddrs []string, security config.Security) (*TxnClient, error) {
|
// NewClient creates a client with PD addresses.
|
||||||
|
func NewClient(pdAddrs []string, security config.Security) (*Client, error) {
|
||||||
tikvStore, err := store.NewStore(pdAddrs, security)
|
tikvStore, err := store.NewStore(pdAddrs, security)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &TxnClient{
|
return &Client{
|
||||||
tikvStore: tikvStore,
|
tikvStore: tikvStore,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TxnClient) Close() error {
|
// Close stop the client.
|
||||||
|
func (c *Client) Close() error {
|
||||||
return c.tikvStore.Close()
|
return c.tikvStore.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TxnClient) Begin() (*Transaction, error) {
|
// Begin creates a transaction for read/write.
|
||||||
|
func (c *Client) Begin() (*Transaction, error) {
|
||||||
ts, err := c.GetTS()
|
ts, err := c.GetTS()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -47,10 +51,12 @@ func (c *TxnClient) Begin() (*Transaction, error) {
|
||||||
return c.BeginWithTS(ts), nil
|
return c.BeginWithTS(ts), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TxnClient) BeginWithTS(ts uint64) *Transaction {
|
// BeginWithTS creates a transaction which is normally readonly.
|
||||||
|
func (c *Client) BeginWithTS(ts uint64) *Transaction {
|
||||||
return newTransaction(c.tikvStore, ts)
|
return newTransaction(c.tikvStore, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *TxnClient) GetTS() (uint64, error) {
|
// GetTS returns a latest timestamp.
|
||||||
|
func (c *Client) GetTS() (uint64, error) {
|
||||||
return c.tikvStore.GetTimestampWithRetry(retry.NewBackoffer(context.TODO(), retry.TsoMaxBackoff))
|
return c.tikvStore.GetTimestampWithRetry(retry.NewBackoffer(context.TODO(), retry.TsoMaxBackoff))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ type Retriever interface {
|
||||||
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
|
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
|
||||||
// If such entry is not found, it returns an invalid Iterator with no error.
|
// If such entry is not found, it returns an invalid Iterator with no error.
|
||||||
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
|
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
|
||||||
// The Iterator must be Closed after use.
|
// The Iterator must be closed after use.
|
||||||
Iter(k key.Key, upperBound key.Key) (Iterator, error)
|
Iter(k key.Key, upperBound key.Key) (Iterator, error)
|
||||||
|
|
||||||
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
|
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
|
||||||
|
|
|
||||||
19
txnkv/txn.go
19
txnkv/txn.go
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/tikv/client-go/txnkv/store"
|
"github.com/tikv/client-go/txnkv/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Transaction is a key-value transaction.
|
||||||
type Transaction struct {
|
type Transaction struct {
|
||||||
tikvStore *store.TiKVStore
|
tikvStore *store.TiKVStore
|
||||||
snapshot *store.TiKVSnapshot
|
snapshot *store.TiKVSnapshot
|
||||||
|
|
@ -77,6 +78,7 @@ func (txn *Transaction) Get(k key.Key) ([]byte, error) {
|
||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BatchGet gets a batch of values from TiKV server.
|
||||||
func (txn *Transaction) BatchGet(keys []key.Key) (map[string][]byte, error) {
|
func (txn *Transaction) BatchGet(keys []key.Key) (map[string][]byte, error) {
|
||||||
if txn.IsReadOnly() {
|
if txn.IsReadOnly() {
|
||||||
return txn.snapshot.BatchGet(keys)
|
return txn.snapshot.BatchGet(keys)
|
||||||
|
|
@ -109,6 +111,7 @@ func (txn *Transaction) BatchGet(keys []key.Key) (map[string][]byte, error) {
|
||||||
return storageValues, nil
|
return storageValues, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set sets the value for key k as v into kv store.
|
||||||
func (txn *Transaction) Set(k key.Key, v []byte) error {
|
func (txn *Transaction) Set(k key.Key, v []byte) error {
|
||||||
txn.setCnt++
|
txn.setCnt++
|
||||||
return txn.us.Set(k, v)
|
return txn.us.Set(k, v)
|
||||||
|
|
@ -118,6 +121,10 @@ func (txn *Transaction) String() string {
|
||||||
return fmt.Sprintf("txn-%d", txn.startTS)
|
return fmt.Sprintf("txn-%d", txn.startTS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
|
||||||
|
// If such entry is not found, it returns an invalid Iterator with no error.
|
||||||
|
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
|
||||||
|
// The Iterator must be closed after use.
|
||||||
func (txn *Transaction) Iter(k key.Key, upperBound key.Key) (kv.Iterator, error) {
|
func (txn *Transaction) Iter(k key.Key, upperBound key.Key) (kv.Iterator, error) {
|
||||||
metrics.TxnCmdCounter.WithLabelValues("seek").Inc()
|
metrics.TxnCmdCounter.WithLabelValues("seek").Inc()
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
@ -137,15 +144,19 @@ func (txn *Transaction) IterReverse(k key.Key) (kv.Iterator, error) {
|
||||||
return txn.us.IterReverse(k)
|
return txn.us.IterReverse(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsReadOnly returns if there are pending key-value to commit in the transaction.
|
||||||
func (txn *Transaction) IsReadOnly() bool {
|
func (txn *Transaction) IsReadOnly() bool {
|
||||||
return txn.us.GetMemBuffer().Len() == 0 && len(txn.lockKeys) == 0
|
return txn.us.GetMemBuffer().Len() == 0 && len(txn.lockKeys) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete removes the entry for key k from kv store.
|
||||||
func (txn *Transaction) Delete(k key.Key) error {
|
func (txn *Transaction) Delete(k key.Key) error {
|
||||||
metrics.TxnCmdCounter.WithLabelValues("delete").Inc()
|
metrics.TxnCmdCounter.WithLabelValues("delete").Inc()
|
||||||
return txn.us.Delete(k)
|
return txn.us.Delete(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetOption sets an option with a value, when val is nil, uses the default
|
||||||
|
// value of this option.
|
||||||
func (txn *Transaction) SetOption(opt kv.Option, val interface{}) {
|
func (txn *Transaction) SetOption(opt kv.Option, val interface{}) {
|
||||||
txn.us.SetOption(opt, val)
|
txn.us.SetOption(opt, val)
|
||||||
switch opt {
|
switch opt {
|
||||||
|
|
@ -160,6 +171,7 @@ func (txn *Transaction) SetOption(opt kv.Option, val interface{}) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DelOption deletes an option.
|
||||||
func (txn *Transaction) DelOption(opt kv.Option) {
|
func (txn *Transaction) DelOption(opt kv.Option) {
|
||||||
txn.us.DelOption(opt)
|
txn.us.DelOption(opt)
|
||||||
}
|
}
|
||||||
|
|
@ -168,6 +180,7 @@ func (txn *Transaction) close() {
|
||||||
txn.valid = false
|
txn.valid = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Commit commits the transaction operations to KV store.
|
||||||
func (txn *Transaction) Commit(ctx context.Context) error {
|
func (txn *Transaction) Commit(ctx context.Context) error {
|
||||||
if !txn.valid {
|
if !txn.valid {
|
||||||
return kv.ErrInvalidTxn
|
return kv.ErrInvalidTxn
|
||||||
|
|
@ -246,6 +259,7 @@ func (txn *Transaction) Commit(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Rollback undoes the transaction operations to KV store.
|
||||||
func (txn *Transaction) Rollback() error {
|
func (txn *Transaction) Rollback() error {
|
||||||
if !txn.valid {
|
if !txn.valid {
|
||||||
return kv.ErrInvalidTxn
|
return kv.ErrInvalidTxn
|
||||||
|
|
@ -257,6 +271,7 @@ func (txn *Transaction) Rollback() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LockKeys tries to lock the entries with the keys in KV store.
|
||||||
func (txn *Transaction) LockKeys(keys ...key.Key) error {
|
func (txn *Transaction) LockKeys(keys ...key.Key) error {
|
||||||
metrics.TxnCmdCounter.WithLabelValues("lock_keys").Inc()
|
metrics.TxnCmdCounter.WithLabelValues("lock_keys").Inc()
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
|
|
@ -265,14 +280,18 @@ func (txn *Transaction) LockKeys(keys ...key.Key) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Valid returns if the transaction is valid.
|
||||||
|
// A transaction becomes invalid after commit or rollback.
|
||||||
func (txn *Transaction) Valid() bool {
|
func (txn *Transaction) Valid() bool {
|
||||||
return txn.valid
|
return txn.valid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Len returns the count of key-value pairs in the transaction's memory buffer.
|
||||||
func (txn *Transaction) Len() int {
|
func (txn *Transaction) Len() int {
|
||||||
return txn.us.Len()
|
return txn.us.Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Size returns the length (in bytes) of the transaction's memory buffer.
|
||||||
func (txn *Transaction) Size() int {
|
func (txn *Transaction) Size() int {
|
||||||
return txn.us.Size()
|
return txn.us.Size()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue