mirror of https://github.com/tikv/client-go.git
234 lines
6.6 KiB
Go
234 lines
6.6 KiB
Go
package apicodec
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/kvproto/pkg/errorpb"
|
|
"github.com/pingcap/kvproto/pkg/keyspacepb"
|
|
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
|
"github.com/tikv/client-go/v2/tikvrpc"
|
|
)
|
|
|
|
type codecV1 struct {
|
|
reqPool sync.Pool
|
|
memCodec memCodec
|
|
}
|
|
|
|
// NewCodecV1 returns a codec that can be used to encode/decode
|
|
// keys and requests to and from APIv1 format.
|
|
func NewCodecV1(mode Mode) Codec {
|
|
var codec *codecV1
|
|
switch mode {
|
|
case ModeRaw:
|
|
codec = &codecV1{memCodec: &defaultMemCodec{}}
|
|
case ModeTxn:
|
|
codec = &codecV1{memCodec: &memComparableCodec{}}
|
|
default:
|
|
panic("unknown mode")
|
|
}
|
|
codec.reqPool.New = func() any { return &tikvrpc.Request{} }
|
|
return codec
|
|
}
|
|
|
|
func (c *codecV1) GetAPIVersion() kvrpcpb.APIVersion {
|
|
return kvrpcpb.APIVersion_V1
|
|
}
|
|
|
|
func (c *codecV1) GetKeyspace() []byte {
|
|
return nil
|
|
}
|
|
|
|
func (c *codecV1) GetKeyspaceID() KeyspaceID {
|
|
return NullspaceID
|
|
}
|
|
|
|
func (c *codecV1) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) {
|
|
r := c.reqPool.Get().(*tikvrpc.Request)
|
|
*r = *req
|
|
setAPICtx(c, r)
|
|
return r, nil
|
|
}
|
|
|
|
func (c *codecV1) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) {
|
|
defer c.reqPool.Put(req)
|
|
regionError, err := resp.GetRegionError()
|
|
// If GetRegionError returns error, it means the response does not contain region error to decode,
|
|
// therefore we skip decoding and return the response as is.
|
|
if err != nil {
|
|
return resp, nil
|
|
}
|
|
decodeRegionError, err := c.decodeRegionError(regionError)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
switch req.Type {
|
|
case tikvrpc.CmdGet:
|
|
r := resp.Resp.(*kvrpcpb.GetResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdScan:
|
|
r := resp.Resp.(*kvrpcpb.ScanResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdPrewrite:
|
|
r := resp.Resp.(*kvrpcpb.PrewriteResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdCommit:
|
|
r := resp.Resp.(*kvrpcpb.CommitResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdCleanup:
|
|
r := resp.Resp.(*kvrpcpb.CleanupResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdBatchGet:
|
|
r := resp.Resp.(*kvrpcpb.BatchGetResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdBatchRollback:
|
|
r := resp.Resp.(*kvrpcpb.BatchRollbackResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdScanLock:
|
|
r := resp.Resp.(*kvrpcpb.ScanLockResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdResolveLock:
|
|
r := resp.Resp.(*kvrpcpb.ResolveLockResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdGC:
|
|
r := resp.Resp.(*kvrpcpb.GCResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdDeleteRange:
|
|
r := resp.Resp.(*kvrpcpb.DeleteRangeResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdPessimisticLock:
|
|
r := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdPessimisticRollback:
|
|
r := resp.Resp.(*kvrpcpb.PessimisticRollbackResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdTxnHeartBeat:
|
|
r := resp.Resp.(*kvrpcpb.TxnHeartBeatResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdCheckTxnStatus:
|
|
r := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdCheckSecondaryLocks:
|
|
r := resp.Resp.(*kvrpcpb.CheckSecondaryLocksResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawGet:
|
|
r := resp.Resp.(*kvrpcpb.RawGetResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawBatchGet:
|
|
r := resp.Resp.(*kvrpcpb.RawBatchGetResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawPut:
|
|
r := resp.Resp.(*kvrpcpb.RawPutResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawBatchPut:
|
|
r := resp.Resp.(*kvrpcpb.RawBatchPutResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawDelete:
|
|
r := resp.Resp.(*kvrpcpb.RawDeleteResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawBatchDelete:
|
|
r := resp.Resp.(*kvrpcpb.RawBatchDeleteResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawDeleteRange:
|
|
r := resp.Resp.(*kvrpcpb.RawDeleteRangeResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawScan:
|
|
r := resp.Resp.(*kvrpcpb.RawScanResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdGetKeyTTL:
|
|
r := resp.Resp.(*kvrpcpb.RawGetKeyTTLResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawCompareAndSwap:
|
|
r := resp.Resp.(*kvrpcpb.RawCASResponse)
|
|
r.RegionError = decodeRegionError
|
|
case tikvrpc.CmdRawChecksum:
|
|
r := resp.Resp.(*kvrpcpb.RawChecksumResponse)
|
|
r.RegionError = decodeRegionError
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (c *codecV1) EncodeRegionKey(key []byte) []byte {
|
|
return c.memCodec.encodeKey(key)
|
|
}
|
|
|
|
func (c *codecV1) DecodeRegionKey(encodedKey []byte) ([]byte, error) {
|
|
if len(encodedKey) == 0 {
|
|
return encodedKey, nil
|
|
}
|
|
return c.memCodec.decodeKey(encodedKey)
|
|
}
|
|
|
|
func (c *codecV1) EncodeRegionRange(start, end []byte) ([]byte, []byte) {
|
|
if len(end) > 0 {
|
|
return c.EncodeRegionKey(start), c.EncodeRegionKey(end)
|
|
}
|
|
return c.EncodeRegionKey(start), end
|
|
}
|
|
|
|
func (c *codecV1) DecodeRegionRange(encodedStart, encodedEnd []byte) ([]byte, []byte, error) {
|
|
start, err := c.DecodeRegionKey(encodedStart)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
end, err := c.DecodeRegionKey(encodedEnd)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return start, end, nil
|
|
}
|
|
|
|
func (c *codecV1) decodeRegionError(regionError *errorpb.Error) (*errorpb.Error, error) {
|
|
if regionError == nil {
|
|
return nil, nil
|
|
}
|
|
var err error
|
|
if errInfo := regionError.KeyNotInRegion; errInfo != nil {
|
|
errInfo.StartKey, errInfo.EndKey, err = c.DecodeRegionRange(errInfo.StartKey, errInfo.EndKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if errInfo := regionError.EpochNotMatch; errInfo != nil {
|
|
for _, meta := range errInfo.CurrentRegions {
|
|
meta.StartKey, meta.EndKey, err = c.DecodeRegionRange(meta.StartKey, meta.EndKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
return regionError, nil
|
|
}
|
|
|
|
func (c *codecV1) EncodeKey(key []byte) []byte {
|
|
return key
|
|
}
|
|
|
|
func (c *codecV1) EncodeRange(start, end []byte) ([]byte, []byte) {
|
|
return start, end
|
|
}
|
|
|
|
func (c *codecV1) DecodeRange(start, end []byte) ([]byte, []byte, error) {
|
|
return start, end, nil
|
|
}
|
|
|
|
func (c *codecV1) DecodeKey(key []byte) ([]byte, error) {
|
|
return key, nil
|
|
}
|
|
|
|
func (c *codecV1) DecodeBucketKeys(keys [][]byte) ([][]byte, error) {
|
|
ks := make([][]byte, 0, len(keys))
|
|
for _, key := range keys {
|
|
k, err := c.DecodeRegionKey(key)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
ks = append(ks, k)
|
|
}
|
|
return ks, nil
|
|
}
|
|
|
|
func (c *codecV1) GetKeyspaceMeta() *keyspacepb.KeyspaceMeta {
|
|
return nil
|
|
}
|