package apicodec import ( "bytes" "encoding/binary" "sync" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util/redact" "go.uber.org/zap" ) var ( // DefaultKeyspaceID is the keyspaceID of the default keyspace. DefaultKeyspaceID uint32 = 0 // DefaultKeyspaceName is the name of the default keyspace. DefaultKeyspaceName = "DEFAULT" rawModePrefix byte = 'r' txnModePrefix byte = 'x' keyspacePrefixLen = 4 // maxKeyspaceID is the maximum value of keyspaceID, its value is uint24Max. maxKeyspaceID = uint32(0xFFFFFF) // errKeyOutOfBound happens when key to be decoded lies outside the keyspace's range. errKeyOutOfBound = errors.New("given key does not belong to the keyspace") ) func checkV2Key(b []byte) error { if len(b) < keyspacePrefixLen || (b[0] != rawModePrefix && b[0] != txnModePrefix) { return errors.Errorf("invalid API V2 key %s", b) } return nil } // BuildKeyspaceName builds a keyspace name func BuildKeyspaceName(name string) string { if name == "" { return DefaultKeyspaceName } return name } // codecV2 is used to encode/decode keys and request into APIv2 format. type codecV2 struct { reqPool sync.Pool prefix []byte endKey []byte memCodec memCodec keyspaceMeta *keyspacepb.KeyspaceMeta } // NewCodecV2 returns a codec that can be used to encode/decode // keys and requests to and from APIv2 format. func NewCodecV2(mode Mode, keyspaceMeta *keyspacepb.KeyspaceMeta) (Codec, error) { keyspaceID := keyspaceMeta.Id if keyspaceID > maxKeyspaceID { return nil, errors.Errorf("keyspaceID %d is out of range, maximum is %d", keyspaceID, maxKeyspaceID) } prefix, err := getIDByte(keyspaceID) if err != nil { return nil, err } codec := &codecV2{ // Region keys in CodecV2 are always encoded in memory comparable form. memCodec: &memComparableCodec{}, keyspaceMeta: keyspaceMeta, } codec.prefix = make([]byte, 4) codec.endKey = make([]byte, 4) switch mode { case ModeRaw: codec.prefix[0] = rawModePrefix case ModeTxn: codec.prefix[0] = txnModePrefix default: return nil, errors.Errorf("unknown mode") } copy(codec.prefix[1:], prefix) prefixVal := binary.BigEndian.Uint32(codec.prefix) binary.BigEndian.PutUint32(codec.endKey, prefixVal+1) codec.reqPool.New = func() any { return &tikvrpc.Request{} } return codec, nil } func getIDByte(keyspaceID uint32) ([]byte, error) { // PutUint32 requires 4 bytes to operate, so must use buffer with size 4 here. b := make([]byte, 4) // Use BigEndian to put the least significant byte to last array position. // For example, keyspaceID 1 should result in []byte{0, 0, 1} binary.BigEndian.PutUint32(b, keyspaceID) // When keyspaceID can't fit in 3 bytes, first byte of buffer will be non-zero. // So return error. if b[0] != 0 { return nil, errors.Errorf("illegal keyspaceID: %v, keyspaceID must be 3 byte", b) } // Remove the first byte to make keyspace ID 3 bytes. return b[1:], nil } func (c *codecV2) GetKeyspace() []byte { return c.prefix } func (c *codecV2) GetKeyspaceID() KeyspaceID { return KeyspaceID(c.keyspaceMeta.Id) } func (c *codecV2) GetKeyspaceMeta() *keyspacepb.KeyspaceMeta { return c.keyspaceMeta } func (c *codecV2) GetAPIVersion() kvrpcpb.APIVersion { return kvrpcpb.APIVersion_V2 } // EncodeRequest encodes with the given Codec. // NOTE: req is reused on retry. MUST encode on cloned request, other than overwrite the original. func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) { r := c.reqPool.Get().(*tikvrpc.Request) *r = *req setAPICtx(c, r) req = r // Encode requests based on command type. switch req.Type { // Transaction Request Types. case tikvrpc.CmdGet: r := *req.Get() r.Key = c.EncodeKey(r.Key) req.Req = &r case tikvrpc.CmdScan: r := *req.Scan() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse) req.Req = &r case tikvrpc.CmdPrewrite: r := *req.Prewrite() r.Mutations = c.encodeMutations(r.Mutations) r.PrimaryLock = c.EncodeKey(r.PrimaryLock) r.Secondaries = c.encodeKeys(r.Secondaries) req.Req = &r case tikvrpc.CmdCommit: r := *req.Commit() r.Keys = c.encodeKeys(r.Keys) if len(r.PrimaryKey) > 0 { // Only encode the primary key if it is not empty. // Otherwise, it means `PrimaryKey` is not set, left it as empty to indicate it is not set in RPC. r.PrimaryKey = c.EncodeKey(r.PrimaryKey) } req.Req = &r case tikvrpc.CmdCleanup: r := *req.Cleanup() r.Key = c.EncodeKey(r.Key) req.Req = &r case tikvrpc.CmdBatchGet: r := *req.BatchGet() r.Keys = c.encodeKeys(r.Keys) req.Req = &r case tikvrpc.CmdBatchRollback: r := *req.BatchRollback() r.Keys = c.encodeKeys(r.Keys) req.Req = &r case tikvrpc.CmdScanLock: r := *req.ScanLock() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) req.Req = &r case tikvrpc.CmdResolveLock: r := *req.ResolveLock() r.Keys = c.encodeKeys(r.Keys) req.Req = &r case tikvrpc.CmdGC: // TODO: Deprecate Central GC Mode. case tikvrpc.CmdDeleteRange: r := *req.DeleteRange() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) req.Req = &r case tikvrpc.CmdPessimisticLock: r := *req.PessimisticLock() r.Mutations = c.encodeMutations(r.Mutations) r.PrimaryLock = c.EncodeKey(r.PrimaryLock) req.Req = &r case tikvrpc.CmdPessimisticRollback: r := *req.PessimisticRollback() r.Keys = c.encodeKeys(r.Keys) req.Req = &r case tikvrpc.CmdTxnHeartBeat: r := *req.TxnHeartBeat() r.PrimaryLock = c.EncodeKey(r.PrimaryLock) req.Req = &r case tikvrpc.CmdCheckTxnStatus: r := *req.CheckTxnStatus() r.PrimaryKey = c.EncodeKey(r.PrimaryKey) req.Req = &r case tikvrpc.CmdCheckSecondaryLocks: r := *req.CheckSecondaryLocks() r.Keys = c.encodeKeys(r.Keys) req.Req = &r // Raw Request Types. case tikvrpc.CmdRawGet: r := *req.RawGet() r.Key = c.EncodeKey(r.Key) req.Req = &r case tikvrpc.CmdRawBatchGet: r := *req.RawBatchGet() r.Keys = c.encodeKeys(r.Keys) req.Req = &r case tikvrpc.CmdRawPut: r := *req.RawPut() r.Key = c.EncodeKey(r.Key) req.Req = &r case tikvrpc.CmdRawBatchPut: r := *req.RawBatchPut() r.Pairs = c.encodeParis(r.Pairs) req.Req = &r case tikvrpc.CmdRawDelete: r := *req.RawDelete() r.Key = c.EncodeKey(r.Key) req.Req = &r case tikvrpc.CmdRawBatchDelete: r := *req.RawBatchDelete() r.Keys = c.encodeKeys(r.Keys) req.Req = &r case tikvrpc.CmdRawDeleteRange: r := *req.RawDeleteRange() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) req.Req = &r case tikvrpc.CmdRawScan: r := *req.RawScan() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse) req.Req = &r case tikvrpc.CmdGetKeyTTL: r := *req.RawGetKeyTTL() r.Key = c.EncodeKey(r.Key) req.Req = &r case tikvrpc.CmdRawCompareAndSwap: r := *req.RawCompareAndSwap() r.Key = c.EncodeKey(r.Key) req.Req = &r case tikvrpc.CmdRawChecksum: r := *req.RawChecksum() r.Ranges = c.encodeKeyRanges(r.Ranges) req.Req = &r // TiFlash Requests case tikvrpc.CmdBatchCop: r := *req.BatchCop() r.Regions = c.encodeRegionInfos(r.Regions) r.TableRegions = c.encodeTableRegions(r.TableRegions) req.Req = &r case tikvrpc.CmdMPPTask: r := *req.DispatchMPPTask() r.Regions = c.encodeRegionInfos(r.Regions) r.TableRegions = c.encodeTableRegions(r.TableRegions) req.Req = &r // Other requests. case tikvrpc.CmdUnsafeDestroyRange: r := *req.UnsafeDestroyRange() r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) req.Req = &r case tikvrpc.CmdPhysicalScanLock: r := *req.PhysicalScanLock() r.StartKey = c.EncodeKey(r.StartKey) req.Req = &r case tikvrpc.CmdStoreSafeTS: r := *req.StoreSafeTS() r.KeyRange = c.encodeKeyRange(r.KeyRange) req.Req = &r case tikvrpc.CmdCop: r := *req.Cop() r.Ranges = c.encodeCopRanges(r.Ranges) r.Tasks = c.encodeStoreBatchTasks(r.Tasks) req.Req = &r case tikvrpc.CmdCopStream: r := *req.Cop() r.Ranges = c.encodeCopRanges(r.Ranges) r.Tasks = c.encodeStoreBatchTasks(r.Tasks) req.Req = &r case tikvrpc.CmdMvccGetByKey: r := *req.MvccGetByKey() r.Key = c.EncodeKey(r.Key) req.Req = &r case tikvrpc.CmdSplitRegion: r := *req.SplitRegion() r.SplitKeys = c.encodeKeys(r.SplitKeys) req.Req = &r } return req, nil } // DecodeResponse decode the resp with the given codec. func (c *codecV2) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) { defer c.reqPool.Put(req) var err error // Decode response based on command type. switch req.Type { // Transaction KV responses. // Keys that need to be decoded lies in RegionError, KeyError and LockInfo. case tikvrpc.CmdGet: r := resp.Resp.(*kvrpcpb.GetResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } case tikvrpc.CmdScan: r := resp.Resp.(*kvrpcpb.ScanResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Pairs, err = c.decodePairs(r.Pairs) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } case tikvrpc.CmdPrewrite: r := resp.Resp.(*kvrpcpb.PrewriteResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Errors, err = c.decodeKeyErrors(r.Errors) if err != nil { return nil, err } case tikvrpc.CmdCommit: r := resp.Resp.(*kvrpcpb.CommitResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } case tikvrpc.CmdCleanup: r := resp.Resp.(*kvrpcpb.CleanupResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } case tikvrpc.CmdBatchGet: r := resp.Resp.(*kvrpcpb.BatchGetResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Pairs, err = c.decodePairs(r.Pairs) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } case tikvrpc.CmdBatchRollback: r := resp.Resp.(*kvrpcpb.BatchRollbackResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } case tikvrpc.CmdScanLock: r := resp.Resp.(*kvrpcpb.ScanLockResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } r.Locks, err = c.decodeLockInfos(r.Locks) if err != nil { return nil, err } case tikvrpc.CmdResolveLock: r := resp.Resp.(*kvrpcpb.ResolveLockResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } case tikvrpc.CmdGC: // TODO: Deprecate Central GC Mode. r := resp.Resp.(*kvrpcpb.GCResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } case tikvrpc.CmdDeleteRange: r := resp.Resp.(*kvrpcpb.DeleteRangeResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdPessimisticLock: r := resp.Resp.(*kvrpcpb.PessimisticLockResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Errors, err = c.decodeKeyErrors(r.Errors) if err != nil { return nil, err } case tikvrpc.CmdPessimisticRollback: r := resp.Resp.(*kvrpcpb.PessimisticRollbackResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Errors, err = c.decodeKeyErrors(r.Errors) if err != nil { return nil, err } case tikvrpc.CmdTxnHeartBeat: r := resp.Resp.(*kvrpcpb.TxnHeartBeatResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } case tikvrpc.CmdCheckTxnStatus: r := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } r.LockInfo, err = c.decodeLockInfo(r.LockInfo) if err != nil { return nil, err } case tikvrpc.CmdCheckSecondaryLocks: r := resp.Resp.(*kvrpcpb.CheckSecondaryLocksResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Error, err = c.decodeKeyError(r.Error) if err != nil { return nil, err } r.Locks, err = c.decodeLockInfos(r.Locks) if err != nil { return nil, err } // RawKV Responses. // Most of these responses does not require treatment aside from Region Error decoding. // Exceptions are Response with keys attach to them, like RawScan and RawBatchGet, // which need have their keys decoded. case tikvrpc.CmdRawGet: r := resp.Resp.(*kvrpcpb.RawGetResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdRawBatchGet: r := resp.Resp.(*kvrpcpb.RawBatchGetResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Pairs, err = c.decodePairs(r.Pairs) if err != nil { return nil, err } case tikvrpc.CmdRawPut: r := resp.Resp.(*kvrpcpb.RawPutResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdRawBatchPut: r := resp.Resp.(*kvrpcpb.RawBatchPutResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdRawDelete: r := resp.Resp.(*kvrpcpb.RawDeleteResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdRawBatchDelete: r := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdRawDeleteRange: r := resp.Resp.(*kvrpcpb.RawDeleteRangeResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdRawScan: r := resp.Resp.(*kvrpcpb.RawScanResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Kvs, err = c.decodePairs(r.Kvs) if err != nil { return nil, err } case tikvrpc.CmdGetKeyTTL: r := resp.Resp.(*kvrpcpb.RawGetKeyTTLResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdRawCompareAndSwap: r := resp.Resp.(*kvrpcpb.RawCASResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdRawChecksum: r := resp.Resp.(*kvrpcpb.RawChecksumResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } // Other requests. case tikvrpc.CmdUnsafeDestroyRange: r := resp.Resp.(*kvrpcpb.UnsafeDestroyRangeResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdPhysicalScanLock: r := resp.Resp.(*kvrpcpb.PhysicalScanLockResponse) r.Locks, err = c.decodeLockInfos(r.Locks) if err != nil { return nil, err } case tikvrpc.CmdCop: r := resp.Resp.(*coprocessor.Response) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Locked, err = c.decodeLockInfo(r.Locked) if err != nil { return nil, err } r.Range, err = c.decodeCopRange(r.Range) if err != nil { return nil, err } case tikvrpc.CmdCopStream: return nil, errors.New("streaming coprocessor is not supported yet") case tikvrpc.CmdBatchCop, tikvrpc.CmdMPPTask: // There aren't range infos in BatchCop and MPPTask responses. case tikvrpc.CmdMvccGetByKey: r := resp.Resp.(*kvrpcpb.MvccGetByKeyResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } case tikvrpc.CmdSplitRegion: r := resp.Resp.(*kvrpcpb.SplitRegionResponse) r.RegionError, err = c.decodeRegionError(r.RegionError) if err != nil { return nil, err } r.Regions, err = c.decodeRegions(r.Regions) if err != nil { return nil, err } } return resp, nil } func (c *codecV2) EncodeRegionKey(key []byte) []byte { encodeKey := c.EncodeKey(key) return c.memCodec.encodeKey(encodeKey) } func (c *codecV2) DecodeRegionKey(encodedKey []byte) ([]byte, error) { memDecoded, err := c.memCodec.decodeKey(encodedKey) if err != nil { return nil, err } return c.DecodeKey(memDecoded) } // EncodeRegionRange first append appropriate prefix to start and end, // then pass them to memCodec to encode them to appropriate memory format. func (c *codecV2) EncodeRegionRange(start, end []byte) ([]byte, []byte) { encodedStart, encodedEnd := c.encodeRange(start, end, false) encodedStart = c.memCodec.encodeKey(encodedStart) encodedEnd = c.memCodec.encodeKey(encodedEnd) return encodedStart, encodedEnd } // DecodeRegionRange first decode key from memory compatible format, // then pass decode them with DecodeRange to map them to correct range. // Note that empty byte slice/ nil slice requires special treatment. func (c *codecV2) DecodeRegionRange(encodedStart, encodedEnd []byte) ([]byte, []byte, error) { var err error if len(encodedStart) != 0 { encodedStart, err = c.memCodec.decodeKey(encodedStart) if err != nil { return nil, nil, err } } if len(encodedEnd) != 0 { encodedEnd, err = c.memCodec.decodeKey(encodedEnd) if err != nil { return nil, nil, err } } return c.DecodeRange(encodedStart, encodedEnd) } func (c *codecV2) EncodeRange(start, end []byte) ([]byte, []byte) { return c.encodeRange(start, end, false) } // encodeRange encodes start and end to correct range in APIv2. // Note that if end is nil/ empty byte slice, it means no end. // So we use endKey of the keyspace directly. func (c *codecV2) encodeRange(start, end []byte, reverse bool) ([]byte, []byte) { // If reverse, scan from end to start. // Corresponding start and end encode needs to be reversed. if reverse { end, start = c.encodeRange(end, start, false) return start, end } var encodedEnd []byte if len(end) > 0 { encodedEnd = c.EncodeKey(end) } else { encodedEnd = c.endKey } return c.EncodeKey(start), encodedEnd } // DecodeRange maps encodedStart and end back to normal start and // end without APIv2 prefixes. func (c *codecV2) DecodeRange(encodedStart, encodedEnd []byte) (start []byte, end []byte, err error) { if bytes.Compare(encodedStart, c.endKey) >= 0 || (len(encodedEnd) > 0 && bytes.Compare(encodedEnd, c.prefix) <= 0) { return nil, nil, errors.WithStack(errKeyOutOfBound) } start, end = []byte{}, []byte{} if bytes.HasPrefix(encodedStart, c.prefix) { start = encodedStart[len(c.prefix):] } if bytes.HasPrefix(encodedEnd, c.prefix) { end = encodedEnd[len(c.prefix):] } return } func (c *codecV2) EncodeKey(key []byte) []byte { return append(c.prefix, key...) } func (c *codecV2) DecodeKey(encodedKey []byte) ([]byte, error) { // If the given key does not start with the correct prefix, // return out of bound error. if !bytes.HasPrefix(encodedKey, c.prefix) { logutil.BgLogger().Warn("key not in keyspace", zap.String("keyspacePrefix", redact.Key(c.prefix)), zap.String("key", redact.Key(encodedKey)), zap.Stack("stack")) return nil, errKeyOutOfBound } return encodedKey[len(c.prefix):], nil } func (c *codecV2) encodeKeyRange(keyRange *kvrpcpb.KeyRange) *kvrpcpb.KeyRange { encodedRange := &kvrpcpb.KeyRange{} encodedRange.StartKey, encodedRange.EndKey = c.encodeRange(keyRange.StartKey, keyRange.EndKey, false) return encodedRange } func (c *codecV2) encodeKeyRanges(keyRanges []*kvrpcpb.KeyRange) []*kvrpcpb.KeyRange { encodedRanges := make([]*kvrpcpb.KeyRange, 0, len(keyRanges)) for _, keyRange := range keyRanges { encodedRanges = append(encodedRanges, c.encodeKeyRange(keyRange)) } return encodedRanges } func (c *codecV2) encodeCopRange(r *coprocessor.KeyRange) *coprocessor.KeyRange { newRange := &coprocessor.KeyRange{} newRange.Start, newRange.End = c.encodeRange(r.Start, r.End, false) return newRange } func (c *codecV2) decodeCopRange(r *coprocessor.KeyRange) (*coprocessor.KeyRange, error) { var err error if r != nil { r.Start, r.End, err = c.DecodeRange(r.Start, r.End) } if err != nil { return nil, err } return r, nil } func (c *codecV2) encodeCopRanges(ranges []*coprocessor.KeyRange) []*coprocessor.KeyRange { newRanges := make([]*coprocessor.KeyRange, 0, len(ranges)) for _, r := range ranges { newRanges = append(newRanges, c.encodeCopRange(r)) } return newRanges } func (c *codecV2) decodeRegions(regions []*metapb.Region) ([]*metapb.Region, error) { var err error for _, region := range regions { region.StartKey, region.EndKey, err = c.DecodeRegionRange(region.StartKey, region.EndKey) if err != nil { return nil, err } } return regions, nil } func (c *codecV2) encodeKeys(keys [][]byte) [][]byte { var encodedKeys [][]byte for _, key := range keys { encodedKeys = append(encodedKeys, c.EncodeKey(key)) } return encodedKeys } func (c *codecV2) encodeParis(pairs []*kvrpcpb.KvPair) []*kvrpcpb.KvPair { var encodedPairs []*kvrpcpb.KvPair for _, pair := range pairs { p := *pair p.Key = c.EncodeKey(p.Key) encodedPairs = append(encodedPairs, &p) } return encodedPairs } func (c *codecV2) decodePairs(encodedPairs []*kvrpcpb.KvPair) ([]*kvrpcpb.KvPair, error) { var pairs []*kvrpcpb.KvPair for _, encodedPair := range encodedPairs { var err error p := *encodedPair if p.Error != nil { p.Error, err = c.decodeKeyError(p.Error) if err != nil { return nil, err } } if len(p.Key) > 0 { p.Key, err = c.DecodeKey(p.Key) if err != nil { return nil, err } } pairs = append(pairs, &p) } return pairs, nil } func (c *codecV2) encodeMutations(mutations []*kvrpcpb.Mutation) []*kvrpcpb.Mutation { var encodedMutations []*kvrpcpb.Mutation for _, mutation := range mutations { m := *mutation m.Key = c.EncodeKey(m.Key) encodedMutations = append(encodedMutations, &m) } return encodedMutations } func (c *codecV2) encodeRegionInfo(info *coprocessor.RegionInfo) *coprocessor.RegionInfo { i := *info i.Ranges = c.encodeCopRanges(info.Ranges) return &i } func (c *codecV2) encodeRegionInfos(infos []*coprocessor.RegionInfo) []*coprocessor.RegionInfo { var encodedInfos []*coprocessor.RegionInfo for _, info := range infos { encodedInfos = append(encodedInfos, c.encodeRegionInfo(info)) } return encodedInfos } func (c *codecV2) encodeTableRegions(infos []*coprocessor.TableRegions) []*coprocessor.TableRegions { var encodedInfos []*coprocessor.TableRegions for _, info := range infos { i := *info i.Regions = c.encodeRegionInfos(info.Regions) encodedInfos = append(encodedInfos, &i) } return encodedInfos } func (c *codecV2) encodeStoreBatchTasks(tasks []*coprocessor.StoreBatchTask) []*coprocessor.StoreBatchTask { var encodedTasks []*coprocessor.StoreBatchTask for _, task := range tasks { t := *task t.Ranges = c.encodeCopRanges(t.Ranges) encodedTasks = append(encodedTasks, &t) } return encodedTasks } func (c *codecV2) decodeRegionError(regionError *errorpb.Error) (*errorpb.Error, error) { if regionError == nil { return nil, nil } var err error if errInfo := regionError.KeyNotInRegion; errInfo != nil { errInfo.Key, err = c.DecodeKey(errInfo.Key) if err != nil { return nil, err } errInfo.StartKey, errInfo.EndKey, err = c.DecodeRegionRange(errInfo.StartKey, errInfo.EndKey) if err != nil { return nil, err } } if errInfo := regionError.EpochNotMatch; errInfo != nil { decodedRegions := make([]*metapb.Region, 0, len(errInfo.CurrentRegions)) for _, meta := range errInfo.CurrentRegions { meta.StartKey, meta.EndKey, err = c.DecodeRegionRange(meta.StartKey, meta.EndKey) if err != nil { // skip out of keyspace range's region if errors.Is(err, errKeyOutOfBound) { continue } return nil, err } decodedRegions = append(decodedRegions, meta) } errInfo.CurrentRegions = decodedRegions } return regionError, nil } func (c *codecV2) decodeKeyError(keyError *kvrpcpb.KeyError) (*kvrpcpb.KeyError, error) { if keyError == nil { return nil, nil } var err error if keyError.Locked != nil { keyError.Locked, err = c.decodeLockInfo(keyError.Locked) if err != nil { return nil, err } } if keyError.Conflict != nil { keyError.Conflict.Key, err = c.DecodeKey(keyError.Conflict.Key) if err != nil { return nil, err } keyError.Conflict.Primary, err = c.DecodeKey(keyError.Conflict.Primary) if err != nil { return nil, err } } if keyError.AlreadyExist != nil { keyError.AlreadyExist.Key, err = c.DecodeKey(keyError.AlreadyExist.Key) if err != nil { return nil, err } } if keyError.Deadlock != nil { keyError.Deadlock.LockKey, err = c.DecodeKey(keyError.Deadlock.LockKey) if err != nil { return nil, err } for _, wait := range keyError.Deadlock.WaitChain { wait.Key, err = c.DecodeKey(wait.Key) if err != nil { return nil, err } } } if keyError.CommitTsExpired != nil { keyError.CommitTsExpired.Key, err = c.DecodeKey(keyError.CommitTsExpired.Key) if err != nil { return nil, err } } if keyError.TxnNotFound != nil { keyError.TxnNotFound.PrimaryKey, err = c.DecodeKey(keyError.TxnNotFound.PrimaryKey) if err != nil { return nil, err } } if keyError.AssertionFailed != nil { keyError.AssertionFailed.Key, err = c.DecodeKey(keyError.AssertionFailed.Key) if err != nil { return nil, err } } if keyError.TxnLockNotFound != nil { keyError.TxnLockNotFound.Key, err = c.DecodeKey(keyError.TxnLockNotFound.Key) if err != nil { return nil, err } } if debugInfo := keyError.DebugInfo; debugInfo != nil { for _, mvccInfo := range debugInfo.MvccInfo { if mvccInfo.Key, err = c.DecodeKey(mvccInfo.Key); err != nil { return nil, err } } } return keyError, nil } func (c *codecV2) decodeKeyErrors(keyErrors []*kvrpcpb.KeyError) ([]*kvrpcpb.KeyError, error) { var err error for i := range keyErrors { keyErrors[i], err = c.decodeKeyError(keyErrors[i]) if err != nil { return nil, err } } return keyErrors, nil } func (c *codecV2) decodeLockInfo(info *kvrpcpb.LockInfo) (*kvrpcpb.LockInfo, error) { if info == nil { return nil, nil } var err error info.Key, err = c.DecodeKey(info.Key) if err != nil { return nil, err } info.PrimaryLock, err = c.DecodeKey(info.PrimaryLock) if err != nil { return nil, err } for i := range info.Secondaries { info.Secondaries[i], err = c.DecodeKey(info.Secondaries[i]) if err != nil { return nil, err } } return info, nil } func (c *codecV2) decodeLockInfos(locks []*kvrpcpb.LockInfo) ([]*kvrpcpb.LockInfo, error) { var err error for i := range locks { locks[i], err = c.decodeLockInfo(locks[i]) if err != nil { return nil, err } } return locks, nil } func (c *codecV2) DecodeBucketKeys(keys [][]byte) ([][]byte, error) { ks := make([][]byte, 0, len(keys)) for i, key := range keys { var ( k []byte err error ) if len(key) > 0 { k, err = c.memCodec.decodeKey(key) } if err != nil { return nil, err } if i == 0 && bytes.Compare(k, c.prefix) < 0 { ks = append(ks, []byte{}) } else if i == len(keys)-1 && (len(k) == 0 || bytes.Compare(k, c.endKey) >= 0) { ks = append(ks, []byte{}) } else if bytes.HasPrefix(k, c.prefix) { raw := k[len(c.prefix):] if len(raw) == 0 && len(ks) > 0 && len(ks[0]) == 0 { continue } ks = append(ks, raw) } } return ks, nil }