mirror of https://github.com/tikv/client-go.git
refactor decoding buckets with new interface method: DecodeBucketKeys (#760)
Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
This commit is contained in:
parent
6a92aeec92
commit
4ac424aea1
|
|
@ -57,8 +57,8 @@ type Codec interface {
|
||||||
EncodeRegionKey(key []byte) []byte
|
EncodeRegionKey(key []byte) []byte
|
||||||
// DecodeRegionKey decode region's key
|
// DecodeRegionKey decode region's key
|
||||||
DecodeRegionKey(encodedKey []byte) ([]byte, error)
|
DecodeRegionKey(encodedKey []byte) ([]byte, error)
|
||||||
// DecodeBucketKey decode region bucket's key
|
// DecodeBucketKeys decode region bucket's key
|
||||||
DecodeBucketKey(encodedKey []byte) ([]byte, error)
|
DecodeBucketKeys(keys [][]byte) ([][]byte, error)
|
||||||
// EncodeRegionRange encode region's start and end.
|
// EncodeRegionRange encode region's start and end.
|
||||||
EncodeRegionRange(start, end []byte) ([]byte, []byte)
|
EncodeRegionRange(start, end []byte) ([]byte, []byte)
|
||||||
// DecodeRegionRange decode region's start and end.
|
// DecodeRegionRange decode region's start and end.
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package apicodec
|
package apicodec
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/pingcap/errors"
|
||||||
"github.com/pingcap/kvproto/pkg/errorpb"
|
"github.com/pingcap/kvproto/pkg/errorpb"
|
||||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||||
"github.com/tikv/client-go/v2/tikvrpc"
|
"github.com/tikv/client-go/v2/tikvrpc"
|
||||||
|
|
@ -203,6 +204,14 @@ func (c *codecV1) DecodeKey(key []byte) ([]byte, error) {
|
||||||
return key, nil
|
return key, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *codecV1) DecodeBucketKey(key []byte) ([]byte, error) {
|
func (c *codecV1) DecodeBucketKeys(keys [][]byte) ([][]byte, error) {
|
||||||
return c.DecodeRegionKey(key)
|
ks := make([][]byte, 0, len(keys))
|
||||||
|
for _, key := range keys {
|
||||||
|
k, err := c.DecodeRegionKey(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Trace(err)
|
||||||
|
}
|
||||||
|
ks = append(ks, k)
|
||||||
|
}
|
||||||
|
return ks, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,22 +2,7 @@ package apicodec
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/tikv/client-go/v2/util/codec"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestV1DecodeBucketKey(t *testing.T) {
|
func TestV1DecodeBucketKey(t *testing.T) {
|
||||||
c := NewCodecV1(ModeTxn)
|
|
||||||
raw := []byte("test")
|
|
||||||
encoded := codec.EncodeBytes(nil, raw)
|
|
||||||
key, err := c.DecodeBucketKey(encoded)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.Equal(t, raw, key)
|
|
||||||
|
|
||||||
raw = []byte{}
|
|
||||||
encoded = codec.EncodeBytes(nil, raw)
|
|
||||||
key, err = c.DecodeBucketKey(encoded)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.Equal(t, raw, key)
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -954,15 +954,31 @@ func (c *codecV2) decodeLockInfos(locks []*kvrpcpb.LockInfo) ([]*kvrpcpb.LockInf
|
||||||
return locks, nil
|
return locks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *codecV2) DecodeBucketKey(encodedKey []byte) ([]byte, error) {
|
func (c *codecV2) DecodeBucketKeys(keys [][]byte) ([][]byte, error) {
|
||||||
key, err := c.memCodec.decodeKey(encodedKey)
|
ks := make([][]byte, 0, len(keys))
|
||||||
if err != nil {
|
for i, key := range keys {
|
||||||
return nil, err
|
var (
|
||||||
}
|
k []byte
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if len(key) > 0 {
|
||||||
|
k, err = c.memCodec.decodeKey(key)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if bytes.Compare(key, c.endKey) >= 0 || bytes.Compare(key, c.prefix) < 0 {
|
if i == 0 && bytes.Compare(k, c.prefix) < 0 {
|
||||||
return []byte{}, nil
|
ks = append(ks, []byte{})
|
||||||
|
} else if i == len(keys)-1 && (len(k) == 0 || bytes.Compare(k, c.endKey) >= 0) {
|
||||||
|
ks = append(ks, []byte{})
|
||||||
|
} else if bytes.HasPrefix(k, c.prefix) {
|
||||||
|
raw := k[len(c.prefix):]
|
||||||
|
if len(raw) == 0 && len(ks) > 0 && len(ks[0]) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ks = append(ks, raw)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return ks, nil
|
||||||
return key[len(c.prefix):], nil
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -296,22 +296,71 @@ func (suite *testCodecV2Suite) TestEncodeMPPRequest() {
|
||||||
suite.Equal(task.Regions[0].Ranges[0].End, suite.codec.EncodeKey([]byte("b")))
|
suite.Equal(task.Regions[0].Ranges[0].End, suite.codec.EncodeKey([]byte("b")))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *testCodecV2Suite) TestDecodeBucketKey() {
|
func (suite *testCodecV2Suite) TestDecodeBucketKeys() {
|
||||||
raw := []byte("a")
|
encodeWithPrefix := func(prefix, key []byte) []byte {
|
||||||
key := suite.codec.EncodeRegionKey(raw)
|
return codec.EncodeBytes(nil, append(prefix, key...))
|
||||||
bucketKey, err := suite.codec.DecodeBucketKey(key)
|
}
|
||||||
suite.Nil(err)
|
|
||||||
suite.Equal(raw, bucketKey)
|
|
||||||
|
|
||||||
raw = keyspaceEndKey
|
bucketKeys := [][]byte{
|
||||||
key = codec.EncodeBytes([]byte{}, raw)
|
encodeWithPrefix(prevKeyspacePrefix, []byte("a")),
|
||||||
bucketKey, err = suite.codec.DecodeBucketKey(key)
|
encodeWithPrefix(prevKeyspacePrefix, []byte("b")),
|
||||||
|
encodeWithPrefix(prevKeyspacePrefix, []byte("c")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("a")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("b")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("c")),
|
||||||
|
encodeWithPrefix(keyspaceEndKey, []byte("")),
|
||||||
|
encodeWithPrefix(keyspaceEndKey, []byte("a")),
|
||||||
|
encodeWithPrefix(keyspaceEndKey, []byte("b")),
|
||||||
|
encodeWithPrefix(keyspaceEndKey, []byte("c")),
|
||||||
|
}
|
||||||
|
keys, err := suite.codec.DecodeBucketKeys(bucketKeys)
|
||||||
suite.Nil(err)
|
suite.Nil(err)
|
||||||
suite.Empty(bucketKey)
|
suite.Equal([][]byte{
|
||||||
|
{},
|
||||||
|
[]byte("a"),
|
||||||
|
[]byte("b"),
|
||||||
|
[]byte("c"),
|
||||||
|
{},
|
||||||
|
}, keys)
|
||||||
|
|
||||||
raw = append(prevKeyspacePrefix, []byte("a")...)
|
bucketKeys = [][]byte{
|
||||||
key = codec.EncodeBytes([]byte{}, raw)
|
encodeWithPrefix(prevKeyspacePrefix, []byte("a")),
|
||||||
bucketKey, err = suite.codec.DecodeBucketKey(key)
|
encodeWithPrefix(prevKeyspacePrefix, []byte("b")),
|
||||||
|
encodeWithPrefix(prevKeyspacePrefix, []byte("c")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("a")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("b")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("c")),
|
||||||
|
{},
|
||||||
|
}
|
||||||
|
keys, err = suite.codec.DecodeBucketKeys(bucketKeys)
|
||||||
suite.Nil(err)
|
suite.Nil(err)
|
||||||
suite.Empty(bucketKey)
|
suite.Equal([][]byte{
|
||||||
|
{},
|
||||||
|
[]byte("a"),
|
||||||
|
[]byte("b"),
|
||||||
|
[]byte("c"),
|
||||||
|
{},
|
||||||
|
}, keys)
|
||||||
|
|
||||||
|
bucketKeys = [][]byte{
|
||||||
|
{},
|
||||||
|
encodeWithPrefix(prevKeyspacePrefix, []byte("a")),
|
||||||
|
encodeWithPrefix(prevKeyspacePrefix, []byte("b")),
|
||||||
|
encodeWithPrefix(prevKeyspacePrefix, []byte("c")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("a")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("b")),
|
||||||
|
suite.codec.EncodeRegionKey([]byte("c")),
|
||||||
|
{},
|
||||||
|
}
|
||||||
|
keys, err = suite.codec.DecodeBucketKeys(bucketKeys)
|
||||||
|
suite.Nil(err)
|
||||||
|
suite.Equal([][]byte{
|
||||||
|
{},
|
||||||
|
[]byte("a"),
|
||||||
|
[]byte("b"),
|
||||||
|
[]byte("c"),
|
||||||
|
{},
|
||||||
|
}, keys)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -163,16 +163,7 @@ func (c *CodecPDClient) decodeRegionKeyInPlace(r *pd.Region) error {
|
||||||
r.Meta.StartKey = decodedStart
|
r.Meta.StartKey = decodedStart
|
||||||
r.Meta.EndKey = decodedEnd
|
r.Meta.EndKey = decodedEnd
|
||||||
if r.Buckets != nil {
|
if r.Buckets != nil {
|
||||||
for i, k := range r.Buckets.Keys {
|
r.Buckets.Keys, err = c.codec.DecodeBucketKeys(r.Buckets.Keys)
|
||||||
if len(k) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
decoded, err := c.codec.DecodeBucketKey(k)
|
|
||||||
if err != nil {
|
|
||||||
return errors.WithStack(err)
|
|
||||||
}
|
|
||||||
r.Buckets.Keys[i] = decoded
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue