refine keyspace request encoding (#742)

This commit is contained in:
iosmanthus 2023-03-21 16:14:47 +08:00 committed by GitHub
parent 884a634378
commit 2e8d7a2d8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 46 deletions

View File

@ -86,7 +86,7 @@ func DecodeKey(encoded []byte, version kvrpcpb.APIVersion) ([]byte, []byte, erro
return nil, nil, errors.Errorf("unsupported api version %s", version.String()) return nil, nil, errors.Errorf("unsupported api version %s", version.String())
} }
func attachAPICtx(c Codec, req *tikvrpc.Request) (*tikvrpc.Request, error) { func attachAPICtx(c Codec, req *tikvrpc.Request) *tikvrpc.Request {
// Shallow copy the request to avoid concurrent modification. // Shallow copy the request to avoid concurrent modification.
r := *req r := *req
@ -97,9 +97,13 @@ func attachAPICtx(c Codec, req *tikvrpc.Request) (*tikvrpc.Request, error) {
switch r.Type { switch r.Type {
case tikvrpc.CmdMPPTask: case tikvrpc.CmdMPPTask:
mpp := *r.DispatchMPPTask() mpp := *r.DispatchMPPTask()
mpp.Meta.KeyspaceId = ctx.KeyspaceId // Shallow copy the meta to avoid concurrent modification.
mpp.Meta.ApiVersion = ctx.ApiVersion meta := *mpp.Meta
meta.KeyspaceId = ctx.KeyspaceId
meta.ApiVersion = ctx.ApiVersion
mpp.Meta = &meta
r.Req = &mpp r.Req = &mpp
case tikvrpc.CmdCompact: case tikvrpc.CmdCompact:
compact := *r.Compact() compact := *r.Compact()
compact.KeyspaceId = ctx.KeyspaceId compact.KeyspaceId = ctx.KeyspaceId
@ -109,5 +113,5 @@ func attachAPICtx(c Codec, req *tikvrpc.Request) (*tikvrpc.Request, error) {
tikvrpc.AttachContext(&r, ctx) tikvrpc.AttachContext(&r, ctx)
return &r, nil return &r
} }

View File

@ -35,7 +35,7 @@ func (c *codecV1) GetKeyspaceID() KeyspaceID {
} }
func (c *codecV1) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) { func (c *codecV1) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) {
return attachAPICtx(c, req) return attachAPICtx(c, req), nil
} }
func (c *codecV1) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) { func (c *codecV1) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) {

View File

@ -109,170 +109,167 @@ func (c *codecV2) GetAPIVersion() kvrpcpb.APIVersion {
// EncodeRequest encodes with the given Codec. // EncodeRequest encodes with the given Codec.
// NOTE: req is reused on retry. MUST encode on cloned request, other than overwrite the original. // NOTE: req is reused on retry. MUST encode on cloned request, other than overwrite the original.
func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) { func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) {
newReq, err := attachAPICtx(c, req) // attachAPICtx will shallow copy the request.
if err != nil { req = attachAPICtx(c, req)
return nil, err
}
// Encode requests based on command type. // Encode requests based on command type.
switch req.Type { switch req.Type {
// Transaction Request Types. // Transaction Request Types.
case tikvrpc.CmdGet: case tikvrpc.CmdGet:
r := *req.Get() r := *req.Get()
r.Key = c.EncodeKey(r.Key) r.Key = c.EncodeKey(r.Key)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdScan: case tikvrpc.CmdScan:
r := *req.Scan() r := *req.Scan()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse) r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdPrewrite: case tikvrpc.CmdPrewrite:
r := *req.Prewrite() r := *req.Prewrite()
r.Mutations = c.encodeMutations(r.Mutations) r.Mutations = c.encodeMutations(r.Mutations)
r.PrimaryLock = c.EncodeKey(r.PrimaryLock) r.PrimaryLock = c.EncodeKey(r.PrimaryLock)
r.Secondaries = c.encodeKeys(r.Secondaries) r.Secondaries = c.encodeKeys(r.Secondaries)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdCommit: case tikvrpc.CmdCommit:
r := *req.Commit() r := *req.Commit()
r.Keys = c.encodeKeys(r.Keys) r.Keys = c.encodeKeys(r.Keys)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdCleanup: case tikvrpc.CmdCleanup:
r := *req.Cleanup() r := *req.Cleanup()
r.Key = c.EncodeKey(r.Key) r.Key = c.EncodeKey(r.Key)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdBatchGet: case tikvrpc.CmdBatchGet:
r := *req.BatchGet() r := *req.BatchGet()
r.Keys = c.encodeKeys(r.Keys) r.Keys = c.encodeKeys(r.Keys)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdBatchRollback: case tikvrpc.CmdBatchRollback:
r := *req.BatchRollback() r := *req.BatchRollback()
r.Keys = c.encodeKeys(r.Keys) r.Keys = c.encodeKeys(r.Keys)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdScanLock: case tikvrpc.CmdScanLock:
r := *req.ScanLock() r := *req.ScanLock()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdResolveLock: case tikvrpc.CmdResolveLock:
r := *req.ResolveLock() r := *req.ResolveLock()
r.Keys = c.encodeKeys(r.Keys) r.Keys = c.encodeKeys(r.Keys)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdGC: case tikvrpc.CmdGC:
// TODO: Deprecate Central GC Mode. // TODO: Deprecate Central GC Mode.
case tikvrpc.CmdDeleteRange: case tikvrpc.CmdDeleteRange:
r := *req.DeleteRange() r := *req.DeleteRange()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdPessimisticLock: case tikvrpc.CmdPessimisticLock:
r := *req.PessimisticLock() r := *req.PessimisticLock()
r.Mutations = c.encodeMutations(r.Mutations) r.Mutations = c.encodeMutations(r.Mutations)
r.PrimaryLock = c.EncodeKey(r.PrimaryLock) r.PrimaryLock = c.EncodeKey(r.PrimaryLock)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdPessimisticRollback: case tikvrpc.CmdPessimisticRollback:
r := *req.PessimisticRollback() r := *req.PessimisticRollback()
r.Keys = c.encodeKeys(r.Keys) r.Keys = c.encodeKeys(r.Keys)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdTxnHeartBeat: case tikvrpc.CmdTxnHeartBeat:
r := *req.TxnHeartBeat() r := *req.TxnHeartBeat()
r.PrimaryLock = c.EncodeKey(r.PrimaryLock) r.PrimaryLock = c.EncodeKey(r.PrimaryLock)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdCheckTxnStatus: case tikvrpc.CmdCheckTxnStatus:
r := *req.CheckTxnStatus() r := *req.CheckTxnStatus()
r.PrimaryKey = c.EncodeKey(r.PrimaryKey) r.PrimaryKey = c.EncodeKey(r.PrimaryKey)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdCheckSecondaryLocks: case tikvrpc.CmdCheckSecondaryLocks:
r := *req.CheckSecondaryLocks() r := *req.CheckSecondaryLocks()
r.Keys = c.encodeKeys(r.Keys) r.Keys = c.encodeKeys(r.Keys)
newReq.Req = &r req.Req = &r
// Raw Request Types. // Raw Request Types.
case tikvrpc.CmdRawGet: case tikvrpc.CmdRawGet:
r := *req.RawGet() r := *req.RawGet()
r.Key = c.EncodeKey(r.Key) r.Key = c.EncodeKey(r.Key)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdRawBatchGet: case tikvrpc.CmdRawBatchGet:
r := *req.RawBatchGet() r := *req.RawBatchGet()
r.Keys = c.encodeKeys(r.Keys) r.Keys = c.encodeKeys(r.Keys)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdRawPut: case tikvrpc.CmdRawPut:
r := *req.RawPut() r := *req.RawPut()
r.Key = c.EncodeKey(r.Key) r.Key = c.EncodeKey(r.Key)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdRawBatchPut: case tikvrpc.CmdRawBatchPut:
r := *req.RawBatchPut() r := *req.RawBatchPut()
r.Pairs = c.encodeParis(r.Pairs) r.Pairs = c.encodeParis(r.Pairs)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdRawDelete: case tikvrpc.CmdRawDelete:
r := *req.RawDelete() r := *req.RawDelete()
r.Key = c.EncodeKey(r.Key) r.Key = c.EncodeKey(r.Key)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdRawBatchDelete: case tikvrpc.CmdRawBatchDelete:
r := *req.RawBatchDelete() r := *req.RawBatchDelete()
r.Keys = c.encodeKeys(r.Keys) r.Keys = c.encodeKeys(r.Keys)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdRawDeleteRange: case tikvrpc.CmdRawDeleteRange:
r := *req.RawDeleteRange() r := *req.RawDeleteRange()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdRawScan: case tikvrpc.CmdRawScan:
r := *req.RawScan() r := *req.RawScan()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse) r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, r.Reverse)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdGetKeyTTL: case tikvrpc.CmdGetKeyTTL:
r := *req.RawGetKeyTTL() r := *req.RawGetKeyTTL()
r.Key = c.EncodeKey(r.Key) r.Key = c.EncodeKey(r.Key)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdRawCompareAndSwap: case tikvrpc.CmdRawCompareAndSwap:
r := *req.RawCompareAndSwap() r := *req.RawCompareAndSwap()
r.Key = c.EncodeKey(r.Key) r.Key = c.EncodeKey(r.Key)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdRawChecksum: case tikvrpc.CmdRawChecksum:
r := *req.RawChecksum() r := *req.RawChecksum()
r.Ranges = c.encodeKeyRanges(r.Ranges) r.Ranges = c.encodeKeyRanges(r.Ranges)
newReq.Req = &r req.Req = &r
// TiFlash Requests // TiFlash Requests
case tikvrpc.CmdBatchCop: case tikvrpc.CmdBatchCop:
r := *req.BatchCop() r := *req.BatchCop()
r.Regions = c.encodeRegionInfos(r.Regions) r.Regions = c.encodeRegionInfos(r.Regions)
r.TableRegions = c.encodeTableRegions(r.TableRegions) r.TableRegions = c.encodeTableRegions(r.TableRegions)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdMPPTask: case tikvrpc.CmdMPPTask:
r := *req.DispatchMPPTask() r := *req.DispatchMPPTask()
r.Meta.KeyspaceId = uint32(c.GetKeyspaceID())
r.Regions = c.encodeRegionInfos(r.Regions) r.Regions = c.encodeRegionInfos(r.Regions)
r.TableRegions = c.encodeTableRegions(r.TableRegions) r.TableRegions = c.encodeTableRegions(r.TableRegions)
newReq.Req = &r req.Req = &r
// Other requests. // Other requests.
case tikvrpc.CmdUnsafeDestroyRange: case tikvrpc.CmdUnsafeDestroyRange:
r := *req.UnsafeDestroyRange() r := *req.UnsafeDestroyRange()
r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false) r.StartKey, r.EndKey = c.encodeRange(r.StartKey, r.EndKey, false)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdPhysicalScanLock: case tikvrpc.CmdPhysicalScanLock:
r := *req.PhysicalScanLock() r := *req.PhysicalScanLock()
r.StartKey = c.EncodeKey(r.StartKey) r.StartKey = c.EncodeKey(r.StartKey)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdStoreSafeTS: case tikvrpc.CmdStoreSafeTS:
r := *req.StoreSafeTS() r := *req.StoreSafeTS()
r.KeyRange = c.encodeKeyRange(r.KeyRange) r.KeyRange = c.encodeKeyRange(r.KeyRange)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdCop: case tikvrpc.CmdCop:
r := *req.Cop() r := *req.Cop()
r.Ranges = c.encodeCopRanges(r.Ranges) r.Ranges = c.encodeCopRanges(r.Ranges)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdCopStream: case tikvrpc.CmdCopStream:
r := *req.Cop() r := *req.Cop()
r.Ranges = c.encodeCopRanges(r.Ranges) r.Ranges = c.encodeCopRanges(r.Ranges)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdMvccGetByKey: case tikvrpc.CmdMvccGetByKey:
r := *req.MvccGetByKey() r := *req.MvccGetByKey()
r.Key = c.EncodeRegionKey(r.Key) r.Key = c.EncodeRegionKey(r.Key)
newReq.Req = &r req.Req = &r
case tikvrpc.CmdSplitRegion: case tikvrpc.CmdSplitRegion:
r := *req.SplitRegion() r := *req.SplitRegion()
r.SplitKeys = c.encodeKeys(r.SplitKeys) r.SplitKeys = c.encodeKeys(r.SplitKeys)
newReq.Req = &r req.Req = &r
} }
return newReq, nil return req, nil
} }
// DecodeResponse decode the resp with the given codec. // DecodeResponse decode the resp with the given codec.

View File

@ -4,9 +4,11 @@ import (
"math" "math"
"testing" "testing"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc"
) )
@ -270,3 +272,24 @@ func (suite *testCodecV2Suite) TestDecodeEpochNotMatch() {
func (suite *testCodecV2Suite) TestGetKeyspaceID() { func (suite *testCodecV2Suite) TestGetKeyspaceID() {
suite.Equal(KeyspaceID(testKeyspaceID), suite.codec.GetKeyspaceID()) suite.Equal(KeyspaceID(testKeyspaceID), suite.codec.GetKeyspaceID())
} }
func (suite *testCodecV2Suite) TestEncodeMPPRequest() {
req, err := suite.codec.EncodeRequest(&tikvrpc.Request{
Type: tikvrpc.CmdMPPTask,
Req: &mpp.DispatchTaskRequest{
Meta: &mpp.TaskMeta{},
Regions: []*coprocessor.RegionInfo{
{
Ranges: []*coprocessor.KeyRange{{Start: []byte("a"), End: []byte("b")}},
},
},
},
})
suite.Nil(err)
task, ok := req.Req.(*mpp.DispatchTaskRequest)
suite.True(ok)
suite.Equal(task.Meta.KeyspaceId, testKeyspaceID)
suite.Equal(task.Meta.ApiVersion, kvrpcpb.APIVersion_V2)
suite.Equal(task.Regions[0].Ranges[0].Start, suite.codec.EncodeKey([]byte("a")))
suite.Equal(task.Regions[0].Ranges[0].End, suite.codec.EncodeKey([]byte("b")))
}