Introduce API v2 (#494)

* introduce raw.Client with api v2

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* migrate rawkv test to api v2

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* git fire: refactoring the interface SplitRegion of CodecClientV2

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* go mod tidy

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix split region issue in CodecClientV2

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* rearrange imports

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* add version info for RegionCache

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* delay request encoding to RPCClient

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* prevent multiple encoding

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* wip: debugging retry logic

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix EpochNotMatch region decoding

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* remove txn request branch

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* copy request before encoding it

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* add gRPCDialOptions for rawkv.Client

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix api breaking change of raw.NewClient

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* address menglong's comments

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* go mod tidy

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* address comments and fix linter

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix api_test

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix TestScan

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix TestScan

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

* fix compatibility

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>

Co-authored-by: Xiaoguang Sun <sunxiaoguang@users.noreply.github.com>
This commit is contained in:
iosmanthus 2022-07-09 14:49:46 +08:00 committed by GitHub
parent 630d55eb12
commit 68cd1bc6c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 916 additions and 223 deletions

View File

@ -3,6 +3,7 @@ module integration_tests
go 1.16
require (
github.com/klauspost/compress v1.15.4 // indirect
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
@ -11,6 +12,7 @@ require (
github.com/pingcap/tidb/parser v0.0.0-20220706093502-562b03368993 // indirect
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df
github.com/tidwall/gjson v1.14.1
github.com/tikv/client-go/v2 v2.0.1-0.20220627063500-947d923945fd
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
go.uber.org/goleak v1.1.12

View File

@ -660,8 +660,9 @@ github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.1 h1:y9FcTHGyrebwfP0ZZqFiaxTaiDnUrGkJkI+f583BL1A=
github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.4 h1:1kn4/7MepF/CHmYub99/nNX8az0IJjfSOU/jbnTVfqQ=
github.com/klauspost/compress v1.15.4/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
@ -1045,6 +1046,12 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:
github.com/tetafro/godot v1.4.11/go.mod h1:LR3CJpxDVGlYOWn3ZZg1PgNZdTUvzsZWu8xaEohUpn8=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo=
github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 h1:jxgmKOscXSjaFEKQGRyY5qOpK8hLqxs2irb/uDJMtwk=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=

View File

@ -0,0 +1,390 @@
package raw_tikv_test
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/stretchr/testify/suite"
"github.com/tidwall/gjson"
"github.com/tikv/client-go/v2/rawkv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
)
func TestApi(t *testing.T) {
if !*withTiKV {
t.Skip("skipping TestApi because with-tikv is not enabled")
}
suite.Run(t, new(apiTestSuite))
}
type apiTestSuite struct {
suite.Suite
client *rawkv.Client
clientForCas *rawkv.Client
pdClient pd.Client
}
func getConfig(url string) (string, error) {
transport := &http.Transport{}
client := http.Client{
Transport: transport,
}
defer transport.CloseIdleConnections()
resp, err := client.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}
func (s *apiTestSuite) getApiVersion(pdCli pd.Client) kvrpcpb.APIVersion {
stores, err := pdCli.GetAllStores(context.Background())
s.Nilf(err, "fail to get store, %v", err)
for _, store := range stores {
resp, err := getConfig(fmt.Sprintf("http://%s/config", store.StatusAddress))
s.Nilf(err, "fail to get config of TiKV store %s: %v", store.StatusAddress, err)
v := gjson.Get(resp, "storage.api-version")
if v.Type == gjson.Null || v.Uint() != 2 {
return kvrpcpb.APIVersion_V1
}
}
return kvrpcpb.APIVersion_V2
}
func (s *apiTestSuite) newRawKVClient(pdCli pd.Client, addrs []string) *rawkv.Client {
version := s.getApiVersion(pdCli)
cli, err := rawkv.NewClientWithOpts(context.Background(), addrs, rawkv.WithAPIVersion(version))
s.Nil(err)
return cli
}
func (s *apiTestSuite) wrapPDClient(pdCli pd.Client, addrs []string) pd.Client {
if s.getApiVersion(pdCli) == kvrpcpb.APIVersion_V2 {
return tikv.NewCodecPDClientV2(pdCli, tikv.ModeRaw)
}
return pdCli
}
func (s *apiTestSuite) SetupTest() {
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Nil(err)
s.pdClient = s.wrapPDClient(pdClient, addrs)
client := s.newRawKVClient(pdClient, addrs)
s.client = client
clientForCas := s.newRawKVClient(pdClient, addrs)
clientForCas.SetAtomicForCAS(true)
s.clientForCas = clientForCas
}
func withPrefix(prefix, key string) string {
return prefix + key
}
func withPrefixes(prefix string, keys []string) []string {
var result []string
for i := range keys {
result = append(result, withPrefix(prefix, keys[i]))
}
return result
}
func (s *apiTestSuite) cleanKeyPrefix(prefix string) {
end := append([]byte(prefix), 127)
err := s.client.DeleteRange(context.Background(), []byte(prefix), end)
s.Nil(err)
ks, _ := s.mustScan(prefix, "", "", 10240)
s.Empty(ks)
}
func (s *apiTestSuite) mustPut(prefix string, key string, value string) {
err := s.client.Put(context.Background(), []byte(withPrefix(prefix, key)), []byte(value))
s.Nil(err)
}
func (s *apiTestSuite) mustGet(prefix string, key string) string {
v, err := s.client.Get(context.Background(), []byte(withPrefix(prefix, key)))
s.Nil(err)
return string(v)
}
func (s *apiTestSuite) mustDelete(prefix string, key string) {
err := s.client.Delete(context.Background(), []byte(withPrefix(prefix, key)))
s.Nil(err)
}
func (s *apiTestSuite) mustScan(prefix string, start string, end string, limit int) ([]string, []string) {
end = withPrefix(prefix, end)
end += string([]byte{127})
ks, vs, err := s.client.Scan(context.Background(), []byte(withPrefix(prefix, start)), []byte(end), limit)
s.Nil(err)
for i := range ks {
ks[i] = bytes.TrimPrefix(ks[i], []byte(prefix))
}
return toStrings(ks), toStrings(vs)
}
func (s *apiTestSuite) mustReverseScan(prefix string, start string, end string, limit int) ([]string, []string) {
ks, vs, err := s.client.ReverseScan(context.Background(), []byte(withPrefix(prefix, start)), []byte(withPrefix(prefix, end)), limit)
s.Nil(err)
return toStrings(ks), toStrings(vs)
}
func (s *apiTestSuite) mustDeleteRange(prefix string, start, end string) {
if end == "" {
end = prefix
}
end += string([]byte{127})
err := s.client.DeleteRange(context.Background(), []byte(withPrefix(prefix, start)), []byte(withPrefix(prefix, end)))
s.Nil(err)
}
func toStrings(data [][]byte) []string {
var ss []string
for _, b := range data {
ss = append(ss, string(b))
}
return ss
}
func toBytes(data []string) [][]byte {
var bs [][]byte
for _, s := range data {
bs = append(bs, []byte(s))
}
return bs
}
func (s *apiTestSuite) mustBatchPut(prefix string, keys []string, values []string) {
s.Equal(len(keys), len(values))
keys = withPrefixes(prefix, keys)
err := s.client.BatchPut(context.Background(), toBytes(keys), toBytes(values))
s.Nil(err)
}
func (s *apiTestSuite) mustBatchGet(prefix string, keys []string) []string {
keys = withPrefixes(prefix, keys)
vs, err := s.client.BatchGet(context.Background(), toBytes(keys))
s.Nil(err)
return toStrings(vs)
}
func (s *apiTestSuite) mustBatchDelete(prefix string, keys []string) {
keys = withPrefixes(prefix, keys)
err := s.client.BatchDelete(context.Background(), toBytes(keys))
s.Nil(err)
}
func (s *apiTestSuite) mustCAS(prefix, key, old, new string) (bool, string) {
var oldValue []byte
if old != "" {
oldValue = []byte(old)
}
oldValue, success, err := s.clientForCas.CompareAndSwap(context.Background(), []byte(withPrefix(prefix, key)), oldValue, []byte(new))
s.Nil(err)
return success, string(oldValue)
}
func (s *apiTestSuite) mustPutWithTTL(prefix, key, value string, ttl uint64) {
err := s.client.PutWithTTL(context.Background(), []byte(withPrefix(prefix, key)), []byte(value), ttl)
s.Nil(err)
}
func (s *apiTestSuite) mustGetKeyTTL(prefix, key string) *uint64 {
ttl, err := s.client.GetKeyTTL(context.Background(), []byte(withPrefix(prefix, key)))
s.Nil(err)
return ttl
}
func (s *apiTestSuite) mustNotExist(prefix string, key string) {
v := s.mustGet(prefix, key)
s.Empty(v)
}
func (s *apiTestSuite) mustSplitRegion(prefix string, splitKeys []string) {
var keys [][]byte
for i := range splitKeys {
keys = append(keys, []byte(withPrefix(prefix, splitKeys[i])))
}
_, err := s.pdClient.SplitRegions(context.Background(), keys)
if err != nil {
s.T().Fatalf("failed to split regions: %v", err)
}
s.Nil(err)
}
func (s *apiTestSuite) TestSimple() {
prefix := "test_simple"
s.cleanKeyPrefix(prefix)
s.mustNotExist(prefix, "key")
s.mustPut(prefix, "key", "value")
v := s.mustGet(prefix, "key")
s.Equal("value", v)
s.mustDelete(prefix, "key")
s.mustNotExist(prefix, "key")
}
func (s *apiTestSuite) TestScan() {
prefix := "test_scan"
s.cleanKeyPrefix(prefix)
var (
keys []string
values []string
)
for i := 0; i < 20480; i++ {
keys = append(keys, fmt.Sprintf("key@%v", i))
values = append(values, fmt.Sprintf("value@%v", i))
}
s.mustBatchPut(prefix, keys, values)
var splitKeys []string
for i := 0; i < 20480; i += 1024 {
splitKeys = append(splitKeys, fmt.Sprintf("key@%v", i))
}
s.mustSplitRegion(prefix, splitKeys)
keys, values = s.mustScan(prefix, keys[0], "", 10240)
s.Equal(10240, len(keys))
s.Equal(10240, len(values))
s.Equal(len(keys), len(values))
for i := range keys {
s.True(strings.HasPrefix(keys[i], "key@"))
s.True(strings.HasPrefix(values[i], "value@"))
}
}
func (s *apiTestSuite) TestReverseScan() {
prefix := "test_reverse_scan"
s.cleanKeyPrefix(prefix)
for i := 0; i < 10; i++ {
s.mustPut(prefix, fmt.Sprintf("key:%v", i), fmt.Sprintf("value:%v", i))
}
keys, values := s.mustReverseScan(prefix, "key:", "", 5)
for i := range keys {
s.Equal(fmt.Sprintf("key:%v", i), keys[len(keys)-1-i])
s.Equal(fmt.Sprintf("value:%v", i), values[len(keys)-1-i])
}
}
func (s *apiTestSuite) TestBatchOp() {
prefix := "test_batch_op"
ks := []string{"k1", "k2"}
s.cleanKeyPrefix(prefix)
s.mustBatchPut(prefix, ks, []string{"v1", "v2"})
vs := s.mustBatchGet(prefix, ks)
s.Equal("v1", vs[0])
s.Equal("v2", vs[1])
s.mustBatchDelete(prefix, ks)
s.mustNotExist(prefix, ks[0])
s.mustNotExist(prefix, ks[1])
}
func (s *apiTestSuite) TestCAS() {
prefix := "test_cas"
s.cleanKeyPrefix(prefix)
success, old := s.mustCAS(prefix, "key", "", "hello world")
s.True(success)
s.Equal("", old)
v := s.mustGet(prefix, "key")
s.Equal("hello world", v)
success, old = s.mustCAS(prefix, "key", "hello", "world")
s.False(success)
s.Equal("hello world", old)
v = s.mustGet(prefix, "key")
s.Equal("hello world", v)
success, old = s.mustCAS(prefix, "key", "hello world", "world")
s.True(success)
s.Equal("hello world", old)
v = s.mustGet(prefix, "key")
s.Equal("world", v)
}
func (s *apiTestSuite) TestTTL() {
prefix := "test_ttl"
var ttl uint64 = 2
s.mustPutWithTTL(prefix, "key", "value", ttl)
time.Sleep(time.Second * time.Duration(ttl/2))
rest := s.mustGetKeyTTL(prefix, "key")
s.NotNil(rest)
s.LessOrEqual(*rest, ttl/2)
time.Sleep(time.Second * time.Duration(ttl/2))
s.mustNotExist(prefix, "key")
rest = s.mustGetKeyTTL(prefix, "key")
s.Nil(rest)
}
func (s *apiTestSuite) TestDeleteRange() {
prefix := "test_delete_range"
s.cleanKeyPrefix(prefix)
var (
keys []string
values []string
)
for i := 0; i < 20480; i++ {
keys = append(keys, fmt.Sprintf("key@%v", i))
values = append(values, fmt.Sprintf("value@%v", i))
}
s.mustBatchPut(prefix, keys, values)
s.mustSplitRegion(prefix, []string{"key@4096"})
s.mustDeleteRange(prefix, "", "")
s.mustNotExist(prefix, "key@0")
s.mustNotExist(prefix, "key@1")
s.mustNotExist(prefix, "key@2")
}
func (s *apiTestSuite) TearDownTest() {
if s.client != nil {
_ = s.client.Close()
}
if s.clientForCas != nil {
_ = s.clientForCas.Close()
}
if s.pdClient != nil {
s.pdClient.Close()
}
}

View File

@ -1,94 +0,0 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raw_tikv_test
import (
"context"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/rawkv"
)
func TestCAS(t *testing.T) {
if !*withTiKV {
t.Skip("skipping TestCAS because with-tikv is not enabled")
}
suite.Run(t, new(casTestSuite))
}
type casTestSuite struct {
suite.Suite
client *rawkv.Client
}
func (s *casTestSuite) SetupTest() {
addrs := strings.Split(*pdAddrs, ",")
client, err := rawkv.NewClient(context.TODO(), addrs, config.DefaultConfig().Security)
require.Nil(s.T(), err)
s.client = client
}
func (s *casTestSuite) mustCompareAndSwap(key, previousValue, newValue []byte) ([]byte, bool) {
previousValue, succeed, err := s.client.
SetAtomicForCAS(true).
CompareAndSwap(context.TODO(), key, previousValue, newValue)
s.Nil(err)
return previousValue, succeed
}
func (s *casTestSuite) mustGet(key []byte) []byte {
v, err := s.client.Get(context.TODO(), key)
s.Nil(err)
return v
}
func (s *casTestSuite) mustPut(key, value []byte) {
err := s.client.Put(context.TODO(), key, value)
s.Nil(err)
}
func (s *casTestSuite) mustDelete(key []byte) {
err := s.client.Delete(context.TODO(), key)
s.Nil(err)
}
func (s *casTestSuite) TestBasic() {
key := []byte("key")
s.mustDelete(key)
initial := []byte("value-0")
prev, succeed := s.mustCompareAndSwap(key, nil, initial)
s.Nil(prev)
s.True(succeed)
s.Equal(initial, s.mustGet(key))
another := []byte("value-1")
s.mustPut(key, another)
prev, succeed = s.mustCompareAndSwap(key, another, initial)
s.True(succeed)
s.Equal(another, prev)
prev, succeed = s.mustCompareAndSwap(key, another, initial)
s.False(succeed)
s.Equal(initial, prev)
}
func (s *casTestSuite) TearDownTest() {
s.client.Close()
}

View File

@ -1,95 +0,0 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raw_tikv_test
import (
"context"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/rawkv"
)
func TestTTL(t *testing.T) {
if !*withTiKV {
t.Skip("skipping TestTTL because with-tikv is not enabled")
}
suite.Run(t, new(ttlTestSuite))
}
type ttlTestSuite struct {
suite.Suite
client *rawkv.Client
}
func (s *ttlTestSuite) SetupTest() {
addrs := strings.Split(*pdAddrs, ",")
client, err := rawkv.NewClient(context.TODO(), addrs, config.DefaultConfig().Security)
require.Nil(s.T(), err)
s.client = client
}
func (s *ttlTestSuite) mustPutWithTTL(key, value []byte, ttl uint64) {
err := s.client.PutWithTTL(context.TODO(), key, value, ttl)
s.Nil(err)
}
func (s *ttlTestSuite) mustNotExist(key []byte) {
v, err := s.client.Get(context.TODO(), key)
s.Nil(err)
s.Nil(v)
}
func (s *ttlTestSuite) mustGetKeyTTL(key []byte) *uint64 {
ttl, err := s.client.GetKeyTTL(context.TODO(), key)
s.Nil(err)
return ttl
}
// TODO: we may mock this feature in unistore.
func (s *ttlTestSuite) TestPutWithTTL() {
key := []byte("test-put-with-ttl")
value := []byte("value")
var ttl uint64 = 1
s.mustPutWithTTL(key, value, ttl)
time.Sleep(time.Second * time.Duration(ttl*2))
s.mustNotExist(key)
}
func (s *ttlTestSuite) TestGetKeyTTL() {
key := []byte("test-get-key-ttl")
value := []byte("value")
var ttl uint64 = 2
s.mustPutWithTTL(key, value, ttl)
time.Sleep(time.Second * time.Duration(ttl/2))
rest := s.mustGetKeyTTL(key)
s.NotNil(rest)
s.LessOrEqual(*rest, ttl/2)
time.Sleep(time.Second * time.Duration(ttl/2))
s.mustNotExist(key)
rest = s.mustGetKeyTTL(key)
s.Nil(rest)
}
func (s *ttlTestSuite) TearDownTest() {
s.client.Close()
}

View File

@ -0,0 +1,210 @@
package client
import (
"bytes"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/tikvrpc"
)
// Mode represents the operation mode of a request.
type Mode int
const (
// ModeRaw represent a raw operation in TiKV
ModeRaw = iota
// ModeTxn represent a transaction operation in TiKV
ModeTxn
)
var (
// APIV2RawKeyPrefix is prefix of raw key in API V2.
APIV2RawKeyPrefix = []byte{'r', 0, 0, 0}
// APIV2RawEndKey is max key of raw key in API V2.
APIV2RawEndKey = []byte{'r', 0, 0, 1}
// APIV2TxnKeyPrefix is prefix of txn key in API V2.
APIV2TxnKeyPrefix = []byte{'x', 0, 0, 0}
// APIV2TxnEndKey is max key of txn key in API V2.
APIV2TxnEndKey = []byte{'x', 0, 0, 1}
)
func getV2Prefix(mode Mode) []byte {
switch mode {
case ModeRaw:
return APIV2RawKeyPrefix
case ModeTxn:
return APIV2TxnKeyPrefix
}
panic("unreachable")
}
func getV2EndKey(mode Mode) []byte {
switch mode {
case ModeRaw:
return APIV2RawEndKey
case ModeTxn:
return APIV2TxnEndKey
}
panic("unreachable")
}
// EncodeV2Key encode a user key into API V2 format.
func EncodeV2Key(mode Mode, key []byte) []byte {
return append(getV2Prefix(mode), key...)
}
// EncodeV2Range encode a range into API V2 format.
func EncodeV2Range(mode Mode, start, end []byte) ([]byte, []byte) {
var b []byte
if len(end) > 0 {
b = EncodeV2Key(mode, end)
} else {
b = getV2EndKey(mode)
}
return EncodeV2Key(mode, start), b
}
// MapV2RangeToV1 maps a range in API V2 format into V1 range.
// This function forbid the user seeing other keyspace.
func MapV2RangeToV1(mode Mode, start []byte, end []byte) ([]byte, []byte) {
var a, b []byte
minKey := getV2Prefix(mode)
if bytes.Compare(start, minKey) < 0 {
a = []byte{}
} else {
a = start[len(minKey):]
}
maxKey := getV2EndKey(mode)
if len(end) == 0 || bytes.Compare(end, maxKey) >= 0 {
b = []byte{}
} else {
b = end[len(maxKey):]
}
return a, b
}
// EncodeV2Keys encodes keys into API V2 format.
func EncodeV2Keys(mode Mode, keys [][]byte) [][]byte {
var ks [][]byte
for _, key := range keys {
ks = append(ks, EncodeV2Key(mode, key))
}
return ks
}
// EncodeV2Pairs encodes pairs into API V2 format.
func EncodeV2Pairs(mode Mode, pairs []*kvrpcpb.KvPair) []*kvrpcpb.KvPair {
var ps []*kvrpcpb.KvPair
for _, pair := range pairs {
p := *pair
p.Key = EncodeV2Key(mode, p.Key)
ps = append(ps, &p)
}
return ps
}
// EncodeRequest encodes req into specified API version format.
func EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) {
if req.GetApiVersion() == kvrpcpb.APIVersion_V1 {
return req, nil
}
newReq := *req
// TODO(iosmanthus): support transaction request types
switch req.Type {
case tikvrpc.CmdRawGet:
r := *req.RawGet()
r.Key = EncodeV2Key(ModeRaw, r.Key)
newReq.Req = &r
case tikvrpc.CmdRawBatchGet:
r := *req.RawBatchGet()
r.Keys = EncodeV2Keys(ModeRaw, r.Keys)
newReq.Req = &r
case tikvrpc.CmdRawPut:
r := *req.RawPut()
r.Key = EncodeV2Key(ModeRaw, r.Key)
newReq.Req = &r
case tikvrpc.CmdRawBatchPut:
r := *req.RawBatchPut()
r.Pairs = EncodeV2Pairs(ModeRaw, r.Pairs)
newReq.Req = &r
case tikvrpc.CmdRawDelete:
r := *req.RawDelete()
r.Key = EncodeV2Key(ModeRaw, r.Key)
newReq.Req = &r
case tikvrpc.CmdRawBatchDelete:
r := *req.RawBatchDelete()
r.Keys = EncodeV2Keys(ModeRaw, r.Keys)
newReq.Req = &r
case tikvrpc.CmdRawDeleteRange:
r := *req.RawDeleteRange()
r.StartKey, r.EndKey = EncodeV2Range(ModeRaw, r.StartKey, r.EndKey)
newReq.Req = &r
case tikvrpc.CmdRawScan:
r := *req.RawScan()
r.StartKey, r.EndKey = EncodeV2Range(ModeRaw, r.StartKey, r.EndKey)
newReq.Req = &r
case tikvrpc.CmdGetKeyTTL:
r := *req.RawGetKeyTTL()
r.Key = EncodeV2Key(ModeRaw, r.Key)
newReq.Req = &r
case tikvrpc.CmdRawCompareAndSwap:
r := *req.RawCompareAndSwap()
r.Key = EncodeV2Key(ModeRaw, r.Key)
newReq.Req = &r
}
return &newReq, nil
}
// DecodeV2Key decodes API V2 encoded key into a normal user key.
func DecodeV2Key(mode Mode, key []byte) ([]byte, error) {
prefix := getV2Prefix(mode)
if !bytes.HasPrefix(key, prefix) {
return nil, errors.Errorf("invalid encoded key prefix: %q", key)
}
return key[len(prefix):], nil
}
// DecodeV2Pairs decodes API V2 encoded pairs into normal user pairs.
func DecodeV2Pairs(mode Mode, pairs []*kvrpcpb.KvPair) ([]*kvrpcpb.KvPair, error) {
var ps []*kvrpcpb.KvPair
for _, pair := range pairs {
var err error
p := *pair
p.Key, err = DecodeV2Key(mode, p.Key)
if err != nil {
return nil, err
}
ps = append(ps, &p)
}
return ps, nil
}
// DecodeResponse decode the resp in specified API version format.
func DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) {
if req.GetApiVersion() == kvrpcpb.APIVersion_V1 {
return resp, nil
}
var err error
switch req.Type {
case tikvrpc.CmdRawBatchGet:
r := resp.Resp.(*kvrpcpb.RawBatchGetResponse)
r.Pairs, err = DecodeV2Pairs(ModeRaw, r.Pairs)
case tikvrpc.CmdRawScan:
r := resp.Resp.(*kvrpcpb.RawScanResponse)
r.Kvs, err = DecodeV2Pairs(ModeRaw, r.Kvs)
}
return resp, err
}

View File

@ -0,0 +1,27 @@
package client
import (
"testing"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikvrpc"
)
func TestEncodeRequest(t *testing.T) {
req := &tikvrpc.Request{
Type: tikvrpc.CmdRawGet,
Req: &kvrpcpb.RawGetRequest{
Key: []byte("key"),
},
}
req.ApiVersion = kvrpcpb.APIVersion_V2
r, err := EncodeRequest(req)
require.Nil(t, err)
require.Equal(t, append(APIV2RawKeyPrefix, []byte("key")...), r.RawGet().Key)
r, err = EncodeRequest(req)
require.Nil(t, err)
require.Equal(t, append(APIV2RawKeyPrefix, []byte("key")...), r.RawGet().Key)
}

View File

@ -121,7 +121,8 @@ type connArray struct {
done chan struct{}
}
func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32, enableBatch bool, dialTimeout time.Duration) (*connArray, error) {
func newConnArray(maxSize uint, addr string, security config.Security,
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, opts []grpc.DialOption) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
@ -129,13 +130,13 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif
done: make(chan struct{}),
dialTimeout: dialTimeout,
}
if err := a.Init(addr, security, idleNotify, enableBatch); err != nil {
if err := a.Init(addr, security, idleNotify, enableBatch, opts...); err != nil {
return nil, err
}
return a, nil
}
func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool) error {
func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, opts ...grpc.DialOption) error {
a.target = addr
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
@ -172,9 +173,8 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
if cfg.TiKVClient.GrpcCompressionType == gzip.Name {
callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))
}
conn, err := grpc.DialContext(
ctx,
addr,
opts = append([]grpc.DialOption{
opt,
grpc.WithInitialWindowSize(GrpcInitialWindowSize),
grpc.WithInitialConnWindowSize(GrpcInitialConnWindowSize),
@ -195,6 +195,12 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
Timeout: time.Duration(keepAliveTimeout) * time.Second,
PermitWithoutStream: true,
}),
}, opts...)
conn, err := grpc.DialContext(
ctx,
addr,
opts...,
)
cancel()
if err != nil {
@ -248,16 +254,29 @@ func (a *connArray) Close() {
close(a.done)
}
type option struct {
gRPCDialOptions []grpc.DialOption
security config.Security
dialTimeout time.Duration
}
// Opt is the option for the client.
type Opt func(*RPCClient)
type Opt func(*option)
// WithSecurity is used to set the security config.
func WithSecurity(security config.Security) Opt {
return func(c *RPCClient) {
return func(c *option) {
c.security = security
}
}
// WithGRPCDialOptions is used to set the grpc.DialOption.
func WithGRPCDialOptions(grpcDialOptions ...grpc.DialOption) Opt {
return func(c *option) {
c.gRPCDialOptions = grpcDialOptions
}
}
// RPCClient is RPC client struct.
// TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV.
// Since we use shared client connection to communicate to the same TiKV, it's possible
@ -265,25 +284,26 @@ func WithSecurity(security config.Security) Opt {
type RPCClient struct {
sync.RWMutex
conns map[string]*connArray
security config.Security
conns map[string]*connArray
option *option
idleNotify uint32
// Periodically check whether there is any connection that is idle and then close and remove these connections.
// Implement background cleanup.
isClosed bool
dialTimeout time.Duration
isClosed bool
}
// NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.
func NewRPCClient(opts ...Opt) *RPCClient {
cli := &RPCClient{
conns: make(map[string]*connArray),
dialTimeout: dialTimeout,
conns: make(map[string]*connArray),
option: &option{
dialTimeout: dialTimeout,
},
}
for _, opt := range opts {
opt(cli)
opt(cli.option)
}
return cli
}
@ -323,7 +343,16 @@ func (c *RPCClient) createConnArray(addr string, enableBatch bool, opts ...func(
for _, opt := range opts {
opt(&client)
}
array, err = newConnArray(client.GrpcConnectionCount, addr, c.security, &c.idleNotify, enableBatch, c.dialTimeout)
array, err = newConnArray(
client.GrpcConnectionCount,
addr,
c.option.security,
&c.idleNotify,
enableBatch,
c.option.dialTimeout,
c.option.gRPCDialOptions)
if err != nil {
return nil, err
}
@ -400,8 +429,7 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time.
counter.(sendReqCounterCacheValue).timeCounter.Add(secs)
}
// SendRequest sends a Request to server and receives Response.
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))
defer span1.Finish()
@ -473,6 +501,19 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return tikvrpc.CallRPC(ctx1, client, req)
}
// SendRequest sends a Request to server and receives Response.
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
req, err := EncodeRequest(req)
if err != nil {
return nil, err
}
resp, err := c.sendRequest(ctx, addr, req, timeout)
if err != nil {
return nil, err
}
return DecodeResponse(req, resp)
}
func (c *RPCClient) getCopStreamResponse(ctx context.Context, client tikvpb.TikvClient, req *tikvrpc.Request, timeout time.Duration, connArray *connArray) (*tikvrpc.Response, error) {
// Coprocessor streaming request.
// Use context to support timeout for grpc streaming client.

View File

@ -60,7 +60,7 @@ func TestPanicInRecvLoop(t *testing.T) {
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
rpcClient := NewRPCClient()
rpcClient.dialTimeout = time.Second / 3
rpcClient.option.dialTimeout = time.Second / 3
// Start batchRecvLoop, and it should panic in `failPendingRequests`.
_, err := rpcClient.getConnArray(addr, true, func(cfg *config.TiKVClient) { cfg.GrpcConnectionCount = 1 })

View File

@ -0,0 +1,110 @@
package locate
import (
"context"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/util/codec"
pd "github.com/tikv/pd/client"
)
// CodecPDClientV2 wraps a PD Client to decode the region meta in API v2 manner.
type CodecPDClientV2 struct {
*CodecPDClient
mode client.Mode
}
// NewCodecPDClientV2 create a CodecPDClientV2.
func NewCodecPDClientV2(client pd.Client, mode client.Mode) *CodecPDClientV2 {
codecClient := NewCodeCPDClient(client)
return &CodecPDClientV2{codecClient, mode}
}
// GetRegion encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *CodecPDClientV2) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
queryKey := client.EncodeV2Key(c.mode, key)
region, err := c.CodecPDClient.GetRegion(ctx, queryKey, opts...)
return c.processRegionResult(region, err)
}
// GetPrevRegion encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *CodecPDClientV2) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
queryKey := client.EncodeV2Key(c.mode, key)
region, err := c.CodecPDClient.GetPrevRegion(ctx, queryKey, opts...)
return c.processRegionResult(region, err)
}
// GetRegionByID encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *CodecPDClientV2) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) {
region, err := c.CodecPDClient.GetRegionByID(ctx, regionID, opts...)
return c.processRegionResult(region, err)
}
// ScanRegions encodes the key before send requests to pd-server and decodes the
// returned StartKey && EndKey from pd-server.
func (c *CodecPDClientV2) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int) ([]*pd.Region, error) {
start, end := client.EncodeV2Range(c.mode, startKey, endKey)
regions, err := c.CodecPDClient.ScanRegions(ctx, start, end, limit)
if err != nil {
return nil, err
}
for i := range regions {
region, _ := c.processRegionResult(regions[i], nil)
regions[i] = region
}
return regions, nil
}
// SplitRegions split regions by given split keys
func (c *CodecPDClientV2) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...pd.RegionsOption) (*pdpb.SplitRegionsResponse, error) {
var keys [][]byte
for i := range splitKeys {
withPrefix := client.EncodeV2Key(c.mode, splitKeys[i])
keys = append(keys, codec.EncodeBytes(nil, withPrefix))
}
return c.CodecPDClient.SplitRegions(ctx, keys, opts...)
}
func (c *CodecPDClientV2) processRegionResult(region *pd.Region, err error) (*pd.Region, error) {
if err != nil {
return nil, err
}
if region != nil {
// TODO(@iosmanthus): enable buckets support.
region.Buckets = nil
region.Meta.StartKey, region.Meta.EndKey =
client.MapV2RangeToV1(c.mode, region.Meta.StartKey, region.Meta.EndKey)
}
return region, nil
}
func (c *CodecPDClientV2) decodeRegionWithShallowCopy(region *metapb.Region) (*metapb.Region, error) {
var err error
newRegion := *region
if len(region.StartKey) > 0 {
_, newRegion.StartKey, err = codec.DecodeBytes(region.StartKey, nil)
}
if err != nil {
return nil, err
}
if len(region.EndKey) > 0 {
_, newRegion.EndKey, err = codec.DecodeBytes(region.EndKey, nil)
}
if err != nil {
return nil, err
}
newRegion.StartKey, newRegion.EndKey = client.MapV2RangeToV1(c.mode, newRegion.StartKey, newRegion.EndKey)
return &newRegion, nil
}

View File

@ -51,6 +51,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/stathat/consistent"
@ -363,6 +364,7 @@ func (r *Region) isValid() bool {
// purposes only.
type RegionCache struct {
pdClient pd.Client
apiVersion kvrpcpb.APIVersion
enableForwarding bool
mu struct {
@ -398,6 +400,14 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c := &RegionCache{
pdClient: pdClient,
}
switch pdClient.(type) {
case *CodecPDClientV2:
c.apiVersion = kvrpcpb.APIVersion_V2
default:
c.apiVersion = kvrpcpb.APIVersion_V1
}
c.mu.regions = make(map[RegionVerID]*Region)
c.mu.latestVersions = make(map[uint64]RegionVerID)
c.mu.sorted = btree.New(btreeDegree)
@ -1753,12 +1763,18 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
newRegions := make([]*Region, 0, len(currentRegions))
// If the region epoch is not ahead of TiKV's, replace region meta in region cache.
for _, meta := range currentRegions {
if _, ok := c.pdClient.(*CodecPDClient); ok {
var err error
var err error
oldMeta := meta
switch c.pdClient.(type) {
case *CodecPDClient:
// Can't modify currentRegions in this function because it can be shared by
// multiple goroutines, refer to https://github.com/pingcap/tidb/pull/16962.
if meta, err = decodeRegionMetaKeyWithShallowCopy(meta); err != nil {
return false, errors.Errorf("newRegion's range key is not encoded: %v, %v", meta, err)
return false, errors.Errorf("newRegion's range key is not encoded: %v, %v", oldMeta, err)
}
case *CodecPDClientV2:
if meta, err = c.pdClient.(*CodecPDClientV2).decodeRegionWithShallowCopy(meta); err != nil {
return false, errors.Errorf("newRegion's range key is not encoded: %v, %v", oldMeta, err)
}
}
// TODO(youjiali1995): new regions inherit old region's buckets now. Maybe we should make EpochNotMatch error

View File

@ -99,6 +99,7 @@ func LoadShuttingDown() uint32 {
// split, so we simply return the error to caller.
type RegionRequestSender struct {
regionCache *RegionCache
apiVersion kvrpcpb.APIVersion
client client.Client
storeAddr string
rpcError error
@ -191,6 +192,7 @@ func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats,
func NewRegionRequestSender(regionCache *RegionCache, client client.Client) *RegionRequestSender {
return &RegionRequestSender{
regionCache: regionCache,
apiVersion: regionCache.apiVersion,
client: client,
}
}
@ -1120,6 +1122,8 @@ func fetchRespInfo(resp *tikvrpc.Response) string {
}
func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCContext, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, retry bool, err error) {
req.ApiVersion = s.apiVersion
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, false, err
}
@ -1449,7 +1453,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
}
if regionErr.GetKeyNotInRegion() != nil {
logutil.BgLogger().Debug("tikv reports `KeyNotInRegion`", zap.Stringer("ctx", ctx))
logutil.BgLogger().Debug("tikv reports `KeyNotInRegion`", zap.Stringer("req", req), zap.Stringer("ctx", ctx))
s.regionCache.InvalidateCachedRegion(ctx.Region)
return false, nil
}

View File

@ -50,6 +50,7 @@ import (
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
"google.golang.org/grpc"
)
var (
@ -120,6 +121,7 @@ func ScanKeyOnly() RawOption {
// Client is a client of TiKV server which is used as a key-value storage,
// only GET/PUT/DELETE commands are supported.
type Client struct {
apiVersion kvrpcpb.APIVersion
clusterID uint64
regionCache *locate.RegionCache
pdClient pd.Client
@ -128,6 +130,44 @@ type Client struct {
atomic bool
}
type option struct {
apiVersion kvrpcpb.APIVersion
security config.Security
gRPCDialOptions []grpc.DialOption
pdOptions []pd.ClientOption
}
// ClientOpt is factory to set the client options.
type ClientOpt func(*option)
// WithPDOptions is used to set the pd.ClientOption
func WithPDOptions(opts ...pd.ClientOption) ClientOpt {
return func(o *option) {
o.pdOptions = append(o.pdOptions, opts...)
}
}
// WithSecurity is used to set the config.Security
func WithSecurity(security config.Security) ClientOpt {
return func(o *option) {
o.security = security
}
}
// WithGRPCDialOptions is used to set the grpc.DialOption.
func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOpt {
return func(o *option) {
o.gRPCDialOptions = append(o.gRPCDialOptions, opts...)
}
}
// WithAPIVersion is used to set the api version.
func WithAPIVersion(apiVersion kvrpcpb.APIVersion) ClientOpt {
return func(o *option) {
o.apiVersion = apiVersion
}
}
// SetAtomicForCAS sets atomic mode for CompareAndSwap
func (c *Client) SetAtomicForCAS(b bool) *Client {
c.atomic = b
@ -142,19 +182,36 @@ func (c *Client) SetColumnFamily(columnFamily string) *Client {
// NewClient creates a client with PD cluster addrs.
func NewClient(ctx context.Context, pdAddrs []string, security config.Security, opts ...pd.ClientOption) (*Client, error) {
return NewClientWithOpts(ctx, pdAddrs, WithSecurity(security), WithPDOptions(opts...))
}
// NewClientWithOpts creates a client with PD cluster addrs and client options.
func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt) (*Client, error) {
opt := &option{}
for _, o := range opts {
o(opt)
}
pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{
CAPath: security.ClusterSSLCA,
CertPath: security.ClusterSSLCert,
KeyPath: security.ClusterSSLKey,
}, opts...)
CAPath: opt.security.ClusterSSLCA,
CertPath: opt.security.ClusterSSLCert,
KeyPath: opt.security.ClusterSSLKey,
}, opt.pdOptions...)
if err != nil {
return nil, errors.WithStack(err)
}
if opt.apiVersion == kvrpcpb.APIVersion_V2 {
pdCli = locate.NewCodecPDClientV2(pdCli, client.ModeRaw)
}
return &Client{
apiVersion: opt.apiVersion,
clusterID: pdCli.GetClusterID(ctx),
regionCache: locate.NewRegionCache(pdCli),
pdClient: pdCli,
rpcClient: client.NewRPCClient(client.WithSecurity(security)),
rpcClient: client.NewRPCClient(client.WithSecurity(opt.security), client.WithGRPCDialOptions(opt.gRPCDialOptions...)),
}, nil
}
@ -790,11 +847,11 @@ func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey
func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls []uint64, opts *rawOptions) error {
keyToValue := make(map[string][]byte, len(keys))
keyTottl := make(map[string]uint64, len(keys))
keyToTTL := make(map[string]uint64, len(keys))
for i, key := range keys {
keyToValue[string(key)] = values[i]
if len(ttls) > 0 {
keyTottl[string(key)] = ttls[i]
keyToTTL[string(key)] = ttls[i]
}
}
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
@ -804,7 +861,7 @@ func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls [
var batches []kvrpc.Batch
// split the keys by size and RegionVerID
for regionID, groupKeys := range groups {
batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, keyTottl, rawBatchPutSize)
batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, keyToTTL, rawBatchPutSize)
}
bo, cancel := bo.Fork()
ch := make(chan error, len(batches))
@ -850,6 +907,7 @@ func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch, opts *rawOpt
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
req.ApiVersion = c.apiVersion
resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
if err != nil {
return err

View File

@ -91,6 +91,23 @@ type RPCRuntimeStats = locate.RPCRuntimeStats
// CodecPDClient wraps a PD Client to decode the encoded keys in region meta.
type CodecPDClient = locate.CodecPDClient
// CodecPDClientV2 wraps a PD Client to decode the region meta in API v2 manner.
type CodecPDClientV2 = locate.CodecPDClientV2
// NewCodecPDClientV2 is a constructor for CodecPDClientV2
var NewCodecPDClientV2 = locate.NewCodecPDClientV2
// Mode represents the operation mode of a request, export client.Mode
type Mode = client.Mode
var (
// ModeRaw represent a raw operation in TiKV, export client.ModeRaw
ModeRaw Mode = client.ModeRaw
// ModeTxn represent a transaction operation in TiKV, export client.ModeTxn
ModeTxn Mode = client.ModeTxn
)
// RecordRegionRequestRuntimeStats records request runtime stats.
func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) {
locate.RecordRegionRequestRuntimeStats(stats, cmd, d)

View File

@ -715,14 +715,14 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
req.RawDeleteRange().Context = ctx
case CmdRawScan:
req.RawScan().Context = ctx
case CmdUnsafeDestroyRange:
req.UnsafeDestroyRange().Context = ctx
case CmdGetKeyTTL:
req.RawGetKeyTTL().Context = ctx
case CmdRawCompareAndSwap:
req.RawCompareAndSwap().Context = ctx
case CmdRawChecksum:
req.RawChecksum().Context = ctx
case CmdUnsafeDestroyRange:
req.UnsafeDestroyRange().Context = ctx
case CmdRegisterLockObserver:
req.RegisterLockObserver().Context = ctx
case CmdCheckLockObserver: