client-go/internal/apicodec/codec_v2.go

1034 lines
28 KiB
Go

package apicodec
import (
"bytes"
"encoding/binary"
"sync"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/redact"
"go.uber.org/zap"
)
var (
// DefaultKeyspaceID is the keyspaceID of the default keyspace.
DefaultKeyspaceID uint32 = 0
// DefaultKeyspaceName is the name of the default keyspace.
DefaultKeyspaceName = "DEFAULT"
rawModePrefix byte = 'r'
txnModePrefix byte = 'x'
keyspacePrefixLen = 4
// maxKeyspaceID is the maximum value of keyspaceID, its value is uint24Max.
maxKeyspaceID = uint32(0xFFFFFF)
// errKeyOutOfBound happens when key to be decoded lies outside the keyspace's range.
errKeyOutOfBound = errors.New("given key does not belong to the keyspace")
)
func checkV2Key(b []byte) error {
if len(b) < keyspacePrefixLen || (b[0] != rawModePrefix && b[0] != txnModePrefix) {
return errors.Errorf("invalid API V2 key %s", b)
}
return nil
}
// BuildKeyspaceName builds a keyspace name
func BuildKeyspaceName(name string) string {
if name == "" {
return DefaultKeyspaceName
}
return name
}
// codecV2 is used to encode/decode keys and request into APIv2 format.
type codecV2 struct {
reqPool sync.Pool
prefix []byte
endKey []byte
memCodec memCodec
keyspaceMeta *keyspacepb.KeyspaceMeta
}
// NewCodecV2 returns a codec that can be used to encode/decode
// keys and requests to and from APIv2 format.
func NewCodecV2(mode Mode, keyspaceMeta *keyspacepb.KeyspaceMeta) (Codec, error) {
keyspaceID := keyspaceMeta.Id
if keyspaceID > maxKeyspaceID {
return nil, errors.Errorf("keyspaceID %d is out of range, maximum is %d", keyspaceID, maxKeyspaceID)
}
prefix, err := getIDByte(keyspaceID)
if err != nil {
return nil, err
}
codec := &codecV2{
// Region keys in CodecV2 are always encoded in memory comparable form.
memCodec: &memComparableCodec{},
keyspaceMeta: keyspaceMeta,
}
codec.prefix = make([]byte, 4)
codec.endKey = make([]byte, 4)
switch mode {
case ModeRaw:
codec.prefix[0] = rawModePrefix
case ModeTxn:
codec.prefix[0] = txnModePrefix
default:
return nil, errors.Errorf("unknown mode")
}
copy(codec.prefix[1:], prefix)
prefixVal := binary.BigEndian.Uint32(codec.prefix)
binary.BigEndian.PutUint32(codec.endKey, prefixVal+1)
codec.reqPool.New = func() any { return &tikvrpc.Request{} }
return codec, nil
}
func getIDByte(keyspaceID uint32) ([]byte, error) {
// PutUint32 requires 4 bytes to operate, so must use buffer with size 4 here.
b := make([]byte, 4)
// Use BigEndian to put the least significant byte to last array position.
// For example, keyspaceID 1 should result in []byte{0, 0, 1}
binary.BigEndian.PutUint32(b, keyspaceID)
// When keyspaceID can't fit in 3 bytes, first byte of buffer will be non-zero.
// So return error.
if b[0] != 0 {
return nil, errors.Errorf("illegal keyspaceID: %v, keyspaceID must be 3 byte", b)
}
// Remove the first byte to make keyspace ID 3 bytes.
return b[1:], nil
}
func (c *codecV2) GetKeyspace() []byte {
return c.prefix
}
func (c *codecV2) GetKeyspaceID() KeyspaceID {
return KeyspaceID(c.keyspaceMeta.Id)
}
func (c *codecV2) GetKeyspaceMeta() *keyspacepb.KeyspaceMeta {
return c.keyspaceMeta
}
func (c *codecV2) GetAPIVersion() kvrpcpb.APIVersion {
return kvrpcpb.APIVersion_V2
}
// EncodeRequest encodes with the given Codec.
// NOTE: req is reused on retry. MUST encode on cloned request, other than overwrite the original.
func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) {
r := c.reqPool.Get().(*tikvrpc.Request)
*r = *req
setAPICtx(c, r)
req = r
// Encode requests based on command type.
switch req.Type {
// Transaction Request Types.
case tikvrpc.CmdGet:
r := *req.Get()
r.Key = c.EncodeKey(r.Key)
req.Req = &r
case tikvrpc.CmdScan:
r := *req.Scan()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse)
req.Req = &r
case tikvrpc.CmdPrewrite:
r := *req.Prewrite()
r.Mutations = c.encodeMutations(r.Mutations)
r.PrimaryLock = c.EncodeKey(r.PrimaryLock)
r.Secondaries = c.encodeKeys(r.Secondaries)
req.Req = &r
case tikvrpc.CmdCommit:
r := *req.Commit()
r.Keys = c.encodeKeys(r.Keys)
if len(r.PrimaryKey) > 0 {
// Only encode the primary key if it is not empty.
// Otherwise, it means `PrimaryKey` is not set, left it as empty to indicate it is not set in RPC.
r.PrimaryKey = c.EncodeKey(r.PrimaryKey)
}
req.Req = &r
case tikvrpc.CmdCleanup:
r := *req.Cleanup()
r.Key = c.EncodeKey(r.Key)
req.Req = &r
case tikvrpc.CmdBatchGet:
r := *req.BatchGet()
r.Keys = c.encodeKeys(r.Keys)
req.Req = &r
case tikvrpc.CmdBatchRollback:
r := *req.BatchRollback()
r.Keys = c.encodeKeys(r.Keys)
req.Req = &r
case tikvrpc.CmdScanLock:
r := *req.ScanLock()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false)
req.Req = &r
case tikvrpc.CmdResolveLock:
r := *req.ResolveLock()
r.Keys = c.encodeKeys(r.Keys)
req.Req = &r
case tikvrpc.CmdGC:
// TODO: Deprecate Central GC Mode.
case tikvrpc.CmdDeleteRange:
r := *req.DeleteRange()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false)
req.Req = &r
case tikvrpc.CmdPessimisticLock:
r := *req.PessimisticLock()
r.Mutations = c.encodeMutations(r.Mutations)
r.PrimaryLock = c.EncodeKey(r.PrimaryLock)
req.Req = &r
case tikvrpc.CmdPessimisticRollback:
r := *req.PessimisticRollback()
r.Keys = c.encodeKeys(r.Keys)
req.Req = &r
case tikvrpc.CmdTxnHeartBeat:
r := *req.TxnHeartBeat()
r.PrimaryLock = c.EncodeKey(r.PrimaryLock)
req.Req = &r
case tikvrpc.CmdCheckTxnStatus:
r := *req.CheckTxnStatus()
r.PrimaryKey = c.EncodeKey(r.PrimaryKey)
req.Req = &r
case tikvrpc.CmdCheckSecondaryLocks:
r := *req.CheckSecondaryLocks()
r.Keys = c.encodeKeys(r.Keys)
req.Req = &r
// Raw Request Types.
case tikvrpc.CmdRawGet:
r := *req.RawGet()
r.Key = c.EncodeKey(r.Key)
req.Req = &r
case tikvrpc.CmdRawBatchGet:
r := *req.RawBatchGet()
r.Keys = c.encodeKeys(r.Keys)
req.Req = &r
case tikvrpc.CmdRawPut:
r := *req.RawPut()
r.Key = c.EncodeKey(r.Key)
req.Req = &r
case tikvrpc.CmdRawBatchPut:
r := *req.RawBatchPut()
r.Pairs = c.encodeParis(r.Pairs)
req.Req = &r
case tikvrpc.CmdRawDelete:
r := *req.RawDelete()
r.Key = c.EncodeKey(r.Key)
req.Req = &r
case tikvrpc.CmdRawBatchDelete:
r := *req.RawBatchDelete()
r.Keys = c.encodeKeys(r.Keys)
req.Req = &r
case tikvrpc.CmdRawDeleteRange:
r := *req.RawDeleteRange()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false)
req.Req = &r
case tikvrpc.CmdRawScan:
r := *req.RawScan()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse)
req.Req = &r
case tikvrpc.CmdGetKeyTTL:
r := *req.RawGetKeyTTL()
r.Key = c.EncodeKey(r.Key)
req.Req = &r
case tikvrpc.CmdRawCompareAndSwap:
r := *req.RawCompareAndSwap()
r.Key = c.EncodeKey(r.Key)
req.Req = &r
case tikvrpc.CmdRawChecksum:
r := *req.RawChecksum()
r.Ranges = c.encodeKeyRanges(r.Ranges)
req.Req = &r
// TiFlash Requests
case tikvrpc.CmdBatchCop:
r := *req.BatchCop()
r.Regions = c.encodeRegionInfos(r.Regions)
r.TableRegions = c.encodeTableRegions(r.TableRegions)
req.Req = &r
case tikvrpc.CmdMPPTask:
r := *req.DispatchMPPTask()
r.Regions = c.encodeRegionInfos(r.Regions)
r.TableRegions = c.encodeTableRegions(r.TableRegions)
req.Req = &r
// Other requests.
case tikvrpc.CmdUnsafeDestroyRange:
r := *req.UnsafeDestroyRange()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false)
req.Req = &r
case tikvrpc.CmdPhysicalScanLock:
r := *req.PhysicalScanLock()
r.StartKey = c.EncodeKey(r.StartKey)
req.Req = &r
case tikvrpc.CmdStoreSafeTS:
r := *req.StoreSafeTS()
r.KeyRange = c.encodeKeyRange(r.KeyRange)
req.Req = &r
case tikvrpc.CmdCop:
r := *req.Cop()
r.Ranges = c.encodeCopRanges(r.Ranges)
r.Tasks = c.encodeStoreBatchTasks(r.Tasks)
req.Req = &r
case tikvrpc.CmdCopStream:
r := *req.Cop()
r.Ranges = c.encodeCopRanges(r.Ranges)
r.Tasks = c.encodeStoreBatchTasks(r.Tasks)
req.Req = &r
case tikvrpc.CmdMvccGetByKey:
r := *req.MvccGetByKey()
r.Key = c.EncodeKey(r.Key)
req.Req = &r
case tikvrpc.CmdSplitRegion:
r := *req.SplitRegion()
r.SplitKeys = c.encodeKeys(r.SplitKeys)
req.Req = &r
}
return req, nil
}
// DecodeResponse decode the resp with the given codec.
func (c *codecV2) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) {
defer c.reqPool.Put(req)
var err error
// Decode response based on command type.
switch req.Type {
// Transaction KV responses.
// Keys that need to be decoded lies in RegionError, KeyError and LockInfo.
case tikvrpc.CmdGet:
r := resp.Resp.(*kvrpcpb.GetResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
case tikvrpc.CmdScan:
r := resp.Resp.(*kvrpcpb.ScanResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Pairs, err = c.decodePairs(r.Pairs)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
case tikvrpc.CmdPrewrite:
r := resp.Resp.(*kvrpcpb.PrewriteResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Errors, err = c.decodeKeyErrors(r.Errors)
if err != nil {
return nil, err
}
case tikvrpc.CmdCommit:
r := resp.Resp.(*kvrpcpb.CommitResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
case tikvrpc.CmdCleanup:
r := resp.Resp.(*kvrpcpb.CleanupResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
case tikvrpc.CmdBatchGet:
r := resp.Resp.(*kvrpcpb.BatchGetResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Pairs, err = c.decodePairs(r.Pairs)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
case tikvrpc.CmdBatchRollback:
r := resp.Resp.(*kvrpcpb.BatchRollbackResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
case tikvrpc.CmdScanLock:
r := resp.Resp.(*kvrpcpb.ScanLockResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
r.Locks, err = c.decodeLockInfos(r.Locks)
if err != nil {
return nil, err
}
case tikvrpc.CmdResolveLock:
r := resp.Resp.(*kvrpcpb.ResolveLockResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
case tikvrpc.CmdGC:
// TODO: Deprecate Central GC Mode.
r := resp.Resp.(*kvrpcpb.GCResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
case tikvrpc.CmdDeleteRange:
r := resp.Resp.(*kvrpcpb.DeleteRangeResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdPessimisticLock:
r := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Errors, err = c.decodeKeyErrors(r.Errors)
if err != nil {
return nil, err
}
case tikvrpc.CmdPessimisticRollback:
r := resp.Resp.(*kvrpcpb.PessimisticRollbackResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Errors, err = c.decodeKeyErrors(r.Errors)
if err != nil {
return nil, err
}
case tikvrpc.CmdTxnHeartBeat:
r := resp.Resp.(*kvrpcpb.TxnHeartBeatResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
case tikvrpc.CmdCheckTxnStatus:
r := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
r.LockInfo, err = c.decodeLockInfo(r.LockInfo)
if err != nil {
return nil, err
}
case tikvrpc.CmdCheckSecondaryLocks:
r := resp.Resp.(*kvrpcpb.CheckSecondaryLocksResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Error, err = c.decodeKeyError(r.Error)
if err != nil {
return nil, err
}
r.Locks, err = c.decodeLockInfos(r.Locks)
if err != nil {
return nil, err
}
// RawKV Responses.
// Most of these responses does not require treatment aside from Region Error decoding.
// Exceptions are Response with keys attach to them, like RawScan and RawBatchGet,
// which need have their keys decoded.
case tikvrpc.CmdRawGet:
r := resp.Resp.(*kvrpcpb.RawGetResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdRawBatchGet:
r := resp.Resp.(*kvrpcpb.RawBatchGetResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Pairs, err = c.decodePairs(r.Pairs)
if err != nil {
return nil, err
}
case tikvrpc.CmdRawPut:
r := resp.Resp.(*kvrpcpb.RawPutResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdRawBatchPut:
r := resp.Resp.(*kvrpcpb.RawBatchPutResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdRawDelete:
r := resp.Resp.(*kvrpcpb.RawDeleteResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdRawBatchDelete:
r := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdRawDeleteRange:
r := resp.Resp.(*kvrpcpb.RawDeleteRangeResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdRawScan:
r := resp.Resp.(*kvrpcpb.RawScanResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Kvs, err = c.decodePairs(r.Kvs)
if err != nil {
return nil, err
}
case tikvrpc.CmdGetKeyTTL:
r := resp.Resp.(*kvrpcpb.RawGetKeyTTLResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdRawCompareAndSwap:
r := resp.Resp.(*kvrpcpb.RawCASResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdRawChecksum:
r := resp.Resp.(*kvrpcpb.RawChecksumResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
// Other requests.
case tikvrpc.CmdUnsafeDestroyRange:
r := resp.Resp.(*kvrpcpb.UnsafeDestroyRangeResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdPhysicalScanLock:
r := resp.Resp.(*kvrpcpb.PhysicalScanLockResponse)
r.Locks, err = c.decodeLockInfos(r.Locks)
if err != nil {
return nil, err
}
case tikvrpc.CmdCop:
r := resp.Resp.(*coprocessor.Response)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Locked, err = c.decodeLockInfo(r.Locked)
if err != nil {
return nil, err
}
r.Range, err = c.decodeCopRange(r.Range)
if err != nil {
return nil, err
}
case tikvrpc.CmdCopStream:
return nil, errors.New("streaming coprocessor is not supported yet")
case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask:
// There aren't range infos in BatchCop and MPPTask responses.
case tikvrpc.CmdMvccGetByKey:
r := resp.Resp.(*kvrpcpb.MvccGetByKeyResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
case tikvrpc.CmdSplitRegion:
r := resp.Resp.(*kvrpcpb.SplitRegionResponse)
r.RegionError, err = c.decodeRegionError(r.RegionError)
if err != nil {
return nil, err
}
r.Regions, err = c.decodeRegions(r.Regions)
if err != nil {
return nil, err
}
}
return resp, nil
}
func (c *codecV2) EncodeRegionKey(key []byte) []byte {
encodeKey := c.EncodeKey(key)
return c.memCodec.encodeKey(encodeKey)
}
func (c *codecV2) DecodeRegionKey(encodedKey []byte) ([]byte, error) {
memDecoded, err := c.memCodec.decodeKey(encodedKey)
if err != nil {
return nil, err
}
return c.DecodeKey(memDecoded)
}
// EncodeRegionRange first append appropriate prefix to start and end,
// then pass them to memCodec to encode them to appropriate memory format.
func (c *codecV2) EncodeRegionRange(start, end []byte) ([]byte, []byte) {
encodedStart, encodedEnd := c.encodeRange(start, end, false)
encodedStart = c.memCodec.encodeKey(encodedStart)
encodedEnd = c.memCodec.encodeKey(encodedEnd)
return encodedStart, encodedEnd
}
// DecodeRegionRange first decode key from memory compatible format,
// then pass decode them with DecodeRange to map them to correct range.
// Note that empty byte slice/ nil slice requires special treatment.
func (c *codecV2) DecodeRegionRange(encodedStart, encodedEnd []byte) ([]byte, []byte, error) {
var err error
if len(encodedStart) != 0 {
encodedStart, err = c.memCodec.decodeKey(encodedStart)
if err != nil {
return nil, nil, err
}
}
if len(encodedEnd) != 0 {
encodedEnd, err = c.memCodec.decodeKey(encodedEnd)
if err != nil {
return nil, nil, err
}
}
return c.DecodeRange(encodedStart, encodedEnd)
}
func (c *codecV2) EncodeRange(start, end []byte) ([]byte, []byte) {
return c.encodeRange(start, end, false)
}
// encodeRange encodes start and end to correct range in APIv2.
// Note that if end is nil/ empty byte slice, it means no end.
// So we use endKey of the keyspace directly.
func (c *codecV2) encodeRange(start, end []byte, reverse bool) ([]byte, []byte) {
// If reverse, scan from end to start.
// Corresponding start and end encode needs to be reversed.
if reverse {
end, start = c.encodeRange(end, start, false)
return start, end
}
var encodedEnd []byte
if len(end) > 0 {
encodedEnd = c.EncodeKey(end)
} else {
encodedEnd = c.endKey
}
return c.EncodeKey(start), encodedEnd
}
// DecodeRange maps encodedStart and end back to normal start and
// end without APIv2 prefixes.
func (c *codecV2) DecodeRange(encodedStart, encodedEnd []byte) (start []byte, end []byte, err error) {
if bytes.Compare(encodedStart, c.endKey) >= 0 ||
(len(encodedEnd) > 0 && bytes.Compare(encodedEnd, c.prefix) <= 0) {
return nil, nil, errors.WithStack(errKeyOutOfBound)
}
start, end = []byte{}, []byte{}
if bytes.HasPrefix(encodedStart, c.prefix) {
start = encodedStart[len(c.prefix):]
}
if bytes.HasPrefix(encodedEnd, c.prefix) {
end = encodedEnd[len(c.prefix):]
}
return
}
func (c *codecV2) EncodeKey(key []byte) []byte {
return append(c.prefix, key...)
}
func (c *codecV2) DecodeKey(encodedKey []byte) ([]byte, error) {
// If the given key does not start with the correct prefix,
// return out of bound error.
if !bytes.HasPrefix(encodedKey, c.prefix) {
logutil.BgLogger().Warn("key not in keyspace",
zap.String("keyspacePrefix", redact.Key(c.prefix)),
zap.String("key", redact.Key(encodedKey)),
zap.Stack("stack"))
return nil, errKeyOutOfBound
}
return encodedKey[len(c.prefix):], nil
}
func (c *codecV2) encodeKeyRange(keyRange *kvrpcpb.KeyRange) *kvrpcpb.KeyRange {
encodedRange := &kvrpcpb.KeyRange{}
encodedRange.StartKey, encodedRange.EndKey = c.encodeRange(keyRange.StartKey, keyRange.EndKey, false)
return encodedRange
}
func (c *codecV2) encodeKeyRanges(keyRanges []*kvrpcpb.KeyRange) []*kvrpcpb.KeyRange {
encodedRanges := make([]*kvrpcpb.KeyRange, 0, len(keyRanges))
for _, keyRange := range keyRanges {
encodedRanges = append(encodedRanges, c.encodeKeyRange(keyRange))
}
return encodedRanges
}
func (c *codecV2) encodeCopRange(r *coprocessor.KeyRange) *coprocessor.KeyRange {
newRange := &coprocessor.KeyRange{}
newRange.Start, newRange.End = c.encodeRange(r.Start, r.End, false)
return newRange
}
func (c *codecV2) decodeCopRange(r *coprocessor.KeyRange) (*coprocessor.KeyRange, error) {
var err error
if r != nil {
r.Start, r.End, err = c.DecodeRange(r.Start, r.End)
}
if err != nil {
return nil, err
}
return r, nil
}
func (c *codecV2) encodeCopRanges(ranges []*coprocessor.KeyRange) []*coprocessor.KeyRange {
newRanges := make([]*coprocessor.KeyRange, 0, len(ranges))
for _, r := range ranges {
newRanges = append(newRanges, c.encodeCopRange(r))
}
return newRanges
}
func (c *codecV2) decodeRegions(regions []*metapb.Region) ([]*metapb.Region, error) {
var err error
for _, region := range regions {
region.StartKey, region.EndKey, err = c.DecodeRegionRange(region.StartKey, region.EndKey)
if err != nil {
return nil, err
}
}
return regions, nil
}
func (c *codecV2) encodeKeys(keys [][]byte) [][]byte {
var encodedKeys [][]byte
for _, key := range keys {
encodedKeys = append(encodedKeys, c.EncodeKey(key))
}
return encodedKeys
}
func (c *codecV2) encodeParis(pairs []*kvrpcpb.KvPair) []*kvrpcpb.KvPair {
var encodedPairs []*kvrpcpb.KvPair
for _, pair := range pairs {
p := *pair
p.Key = c.EncodeKey(p.Key)
encodedPairs = append(encodedPairs, &p)
}
return encodedPairs
}
func (c *codecV2) decodePairs(encodedPairs []*kvrpcpb.KvPair) ([]*kvrpcpb.KvPair, error) {
var pairs []*kvrpcpb.KvPair
for _, encodedPair := range encodedPairs {
var err error
p := *encodedPair
if p.Error != nil {
p.Error, err = c.decodeKeyError(p.Error)
if err != nil {
return nil, err
}
}
if len(p.Key) > 0 {
p.Key, err = c.DecodeKey(p.Key)
if err != nil {
return nil, err
}
}
pairs = append(pairs, &p)
}
return pairs, nil
}
func (c *codecV2) encodeMutations(mutations []*kvrpcpb.Mutation) []*kvrpcpb.Mutation {
var encodedMutations []*kvrpcpb.Mutation
for _, mutation := range mutations {
m := *mutation
m.Key = c.EncodeKey(m.Key)
encodedMutations = append(encodedMutations, &m)
}
return encodedMutations
}
func (c *codecV2) encodeRegionInfo(info *coprocessor.RegionInfo) *coprocessor.RegionInfo {
i := *info
i.Ranges = c.encodeCopRanges(info.Ranges)
return &i
}
func (c *codecV2) encodeRegionInfos(infos []*coprocessor.RegionInfo) []*coprocessor.RegionInfo {
var encodedInfos []*coprocessor.RegionInfo
for _, info := range infos {
encodedInfos = append(encodedInfos, c.encodeRegionInfo(info))
}
return encodedInfos
}
func (c *codecV2) encodeTableRegions(infos []*coprocessor.TableRegions) []*coprocessor.TableRegions {
var encodedInfos []*coprocessor.TableRegions
for _, info := range infos {
i := *info
i.Regions = c.encodeRegionInfos(info.Regions)
encodedInfos = append(encodedInfos, &i)
}
return encodedInfos
}
func (c *codecV2) encodeStoreBatchTasks(tasks []*coprocessor.StoreBatchTask) []*coprocessor.StoreBatchTask {
var encodedTasks []*coprocessor.StoreBatchTask
for _, task := range tasks {
t := *task
t.Ranges = c.encodeCopRanges(t.Ranges)
encodedTasks = append(encodedTasks, &t)
}
return encodedTasks
}
func (c *codecV2) decodeRegionError(regionError *errorpb.Error) (*errorpb.Error, error) {
if regionError == nil {
return nil, nil
}
var err error
if errInfo := regionError.KeyNotInRegion; errInfo != nil {
errInfo.Key, err = c.DecodeKey(errInfo.Key)
if err != nil {
return nil, err
}
errInfo.StartKey, errInfo.EndKey, err = c.DecodeRegionRange(errInfo.StartKey, errInfo.EndKey)
if err != nil {
return nil, err
}
}
if errInfo := regionError.EpochNotMatch; errInfo != nil {
decodedRegions := make([]*metapb.Region, 0, len(errInfo.CurrentRegions))
for _, meta := range errInfo.CurrentRegions {
meta.StartKey, meta.EndKey, err = c.DecodeRegionRange(meta.StartKey, meta.EndKey)
if err != nil {
// skip out of keyspace range's region
if errors.Is(err, errKeyOutOfBound) {
continue
}
return nil, err
}
decodedRegions = append(decodedRegions, meta)
}
errInfo.CurrentRegions = decodedRegions
}
return regionError, nil
}
func (c *codecV2) decodeKeyError(keyError *kvrpcpb.KeyError) (*kvrpcpb.KeyError, error) {
if keyError == nil {
return nil, nil
}
var err error
if keyError.Locked != nil {
keyError.Locked, err = c.decodeLockInfo(keyError.Locked)
if err != nil {
return nil, err
}
}
if keyError.Conflict != nil {
keyError.Conflict.Key, err = c.DecodeKey(keyError.Conflict.Key)
if err != nil {
return nil, err
}
keyError.Conflict.Primary, err = c.DecodeKey(keyError.Conflict.Primary)
if err != nil {
return nil, err
}
}
if keyError.AlreadyExist != nil {
keyError.AlreadyExist.Key, err = c.DecodeKey(keyError.AlreadyExist.Key)
if err != nil {
return nil, err
}
}
if keyError.Deadlock != nil {
keyError.Deadlock.LockKey, err = c.DecodeKey(keyError.Deadlock.LockKey)
if err != nil {
return nil, err
}
for _, wait := range keyError.Deadlock.WaitChain {
wait.Key, err = c.DecodeKey(wait.Key)
if err != nil {
return nil, err
}
}
}
if keyError.CommitTsExpired != nil {
keyError.CommitTsExpired.Key, err = c.DecodeKey(keyError.CommitTsExpired.Key)
if err != nil {
return nil, err
}
}
if keyError.TxnNotFound != nil {
keyError.TxnNotFound.PrimaryKey, err = c.DecodeKey(keyError.TxnNotFound.PrimaryKey)
if err != nil {
return nil, err
}
}
if keyError.AssertionFailed != nil {
keyError.AssertionFailed.Key, err = c.DecodeKey(keyError.AssertionFailed.Key)
if err != nil {
return nil, err
}
}
if keyError.TxnLockNotFound != nil {
keyError.TxnLockNotFound.Key, err = c.DecodeKey(keyError.TxnLockNotFound.Key)
if err != nil {
return nil, err
}
}
if debugInfo := keyError.DebugInfo; debugInfo != nil {
for _, mvccInfo := range debugInfo.MvccInfo {
if mvccInfo.Key, err = c.DecodeKey(mvccInfo.Key); err != nil {
return nil, err
}
}
}
return keyError, nil
}
func (c *codecV2) decodeKeyErrors(keyErrors []*kvrpcpb.KeyError) ([]*kvrpcpb.KeyError, error) {
var err error
for i := range keyErrors {
keyErrors[i], err = c.decodeKeyError(keyErrors[i])
if err != nil {
return nil, err
}
}
return keyErrors, nil
}
func (c *codecV2) decodeLockInfo(info *kvrpcpb.LockInfo) (*kvrpcpb.LockInfo, error) {
if info == nil {
return nil, nil
}
var err error
info.Key, err = c.DecodeKey(info.Key)
if err != nil {
return nil, err
}
info.PrimaryLock, err = c.DecodeKey(info.PrimaryLock)
if err != nil {
return nil, err
}
for i := range info.Secondaries {
info.Secondaries[i], err = c.DecodeKey(info.Secondaries[i])
if err != nil {
return nil, err
}
}
return info, nil
}
func (c *codecV2) decodeLockInfos(locks []*kvrpcpb.LockInfo) ([]*kvrpcpb.LockInfo, error) {
var err error
for i := range locks {
locks[i], err = c.decodeLockInfo(locks[i])
if err != nil {
return nil, err
}
}
return locks, nil
}
func (c *codecV2) DecodeBucketKeys(keys [][]byte) ([][]byte, error) {
ks := make([][]byte, 0, len(keys))
for i, key := range keys {
var (
k []byte
err error
)
if len(key) > 0 {
k, err = c.memCodec.decodeKey(key)
}
if err != nil {
return nil, err
}
if i == 0 && bytes.Compare(k, c.prefix) < 0 {
ks = append(ks, []byte{})
} else if i == len(keys)-1 && (len(k) == 0 || bytes.Compare(k, c.endKey) >= 0) {
ks = append(ks, []byte{})
} else if bytes.HasPrefix(k, c.prefix) {
raw := k[len(c.prefix):]
if len(raw) == 0 && len(ks) > 0 && len(ks[0]) == 0 {
continue
}
ks = append(ks, raw)
}
}
return ks, nil
}