diff --git a/go.mod b/go.mod index 89e70c22..18299503 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20230206112125-0561adc37543 + github.com/pingcap/kvproto v0.0.0-20230216153817-c6df78cc9dea github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index 42fd9b44..72408251 100644 --- a/go.sum +++ b/go.sum @@ -145,8 +145,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20230206112125-0561adc37543 h1:QcC52K9hhsP6eVmQBnSMI/b8TiOUVztbaeduTXspmeQ= -github.com/pingcap/kvproto v0.0.0-20230206112125-0561adc37543/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230216153817-c6df78cc9dea h1:Qt8xe4CWgA/pPfYLHwCl8Mz0g7Mbnbhx4l0gVf9eH1w= +github.com/pingcap/kvproto v0.0.0-20230216153817-c6df78cc9dea/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 321ac59a..d5c58bd2 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,7 +6,7 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20230206112125-0561adc37543 + github.com/pingcap/kvproto v0.0.0-20230216153817-c6df78cc9dea github.com/pingcap/tidb v1.1.0-beta.0.20230207083958-f1d450ff7aa4 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.1 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index d09cd84c..ed3cc338 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -334,8 +334,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZLmhahmvHm7n9DUxGRQT00208= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20230206112125-0561adc37543 h1:QcC52K9hhsP6eVmQBnSMI/b8TiOUVztbaeduTXspmeQ= -github.com/pingcap/kvproto v0.0.0-20230206112125-0561adc37543/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= +github.com/pingcap/kvproto v0.0.0-20230216153817-c6df78cc9dea h1:Qt8xe4CWgA/pPfYLHwCl8Mz0g7Mbnbhx4l0gVf9eH1w= +github.com/pingcap/kvproto v0.0.0-20230216153817-c6df78cc9dea/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c h1:crhkw6DD+07Bg1wYhW5Piw+kYNKZqFQqfC2puUf6gMI= github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/internal/apicodec/codec.go b/internal/apicodec/codec.go index 3eae9acf..abd51c34 100644 --- a/internal/apicodec/codec.go +++ b/internal/apicodec/codec.go @@ -23,15 +23,15 @@ const ( ) const ( - // NulSpaceID is a special keyspace id that represents no keyspace exist. - NulSpaceID KeyspaceID = 0xffffffff + // NullspaceID is a special keyspace id that represents no keyspace exist. + NullspaceID KeyspaceID = 0xffffffff ) // ParseKeyspaceID retrieves the keyspaceID from the given keyspace-encoded key. // It returns error if the given key is not in proper api-v2 format. func ParseKeyspaceID(b []byte) (KeyspaceID, error) { if err := checkV2Key(b); err != nil { - return NulSpaceID, err + return NullspaceID, err } buf := append([]byte{}, b[:keyspacePrefixLen]...) @@ -85,3 +85,26 @@ func DecodeKey(encoded []byte, version kvrpcpb.APIVersion) ([]byte, []byte, erro } return nil, nil, errors.Errorf("unsupported api version %s", version.String()) } + +func attachAPICtx(c Codec, req *tikvrpc.Request) (*tikvrpc.Request, error) { + // Shallow copy the request to avoid concurrent modification. + r := *req + + ctx := &r.Context + ctx.ApiVersion = c.GetAPIVersion() + ctx.KeyspaceId = uint32(c.GetKeyspaceID()) + + switch r.Type { + case tikvrpc.CmdMPPTask: + mpp := *r.DispatchMPPTask() + mpp.Meta.KeyspaceId = ctx.KeyspaceId + r.Req = &mpp + } + + err := tikvrpc.AttachContext(&r, ctx) + if err != nil { + return nil, err + } + + return &r, nil +} diff --git a/internal/apicodec/codec_test.go b/internal/apicodec/codec_test.go index a42773d5..668e2a97 100644 --- a/internal/apicodec/codec_test.go +++ b/internal/apicodec/codec_test.go @@ -18,11 +18,11 @@ func TestParseKeyspaceID(t *testing.T) { id, err = ParseKeyspaceID([]byte{'t', 0, 0}) assert.NotNil(t, err) - assert.Equal(t, NulSpaceID, id) + assert.Equal(t, NullspaceID, id) id, err = ParseKeyspaceID([]byte{'t', 0, 0, 1, 1, 2, 3}) assert.NotNil(t, err) - assert.Equal(t, NulSpaceID, id) + assert.Equal(t, NullspaceID, id) } func TestDecodeKey(t *testing.T) { diff --git a/internal/apicodec/codec_v1.go b/internal/apicodec/codec_v1.go index 18fd5b6c..d2434c2c 100644 --- a/internal/apicodec/codec_v1.go +++ b/internal/apicodec/codec_v1.go @@ -31,11 +31,11 @@ func (c *codecV1) GetKeyspace() []byte { } func (c *codecV1) GetKeyspaceID() KeyspaceID { - return NulSpaceID + return NullspaceID } func (c *codecV1) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) { - return req, nil + return attachAPICtx(c, req) } func (c *codecV1) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) { diff --git a/internal/apicodec/codec_v2.go b/internal/apicodec/codec_v2.go index ea83a336..48005aa9 100644 --- a/internal/apicodec/codec_v2.go +++ b/internal/apicodec/codec_v2.go @@ -109,7 +109,10 @@ func (c *codecV2) GetAPIVersion() kvrpcpb.APIVersion { // EncodeRequest encodes with the given Codec. // NOTE: req is reused on retry. MUST encode on cloned request, other than overwrite the original. func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) { - newReq := *req + newReq, err := attachAPICtx(c, req) + if err != nil { + return nil, err + } // Encode requests based on command type. switch req.Type { // Transaction Request Types. @@ -233,6 +236,7 @@ func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) newReq.Req = &r case tikvrpc.CmdMPPTask: r := *req.DispatchMPPTask() + r.Meta.KeyspaceId = uint32(c.GetKeyspaceID()) r.Regions = c.encodeRegionInfos(r.Regions) r.TableRegions = c.encodeTableRegions(r.TableRegions) newReq.Req = &r @@ -268,7 +272,7 @@ func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) newReq.Req = &r } - return &newReq, nil + return newReq, nil } // DecodeResponse decode the resp with the given codec. diff --git a/internal/client/client.go b/internal/client/client.go index 23b690b7..168638d1 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -640,11 +640,13 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R // SendRequest sends a Request to server and receives Response. func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + // In unit test, the option or codec may be nil. Here should skip the encode/decode process. if c.option == nil || c.option.codec == nil { return c.sendRequest(ctx, addr, req, timeout) } - req, err := c.option.codec.EncodeRequest(req) + codec := c.option.codec + req, err := codec.EncodeRequest(req) if err != nil { return nil, err } @@ -652,7 +654,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R if err != nil { return nil, err } - return c.option.codec.DecodeResponse(req, resp) + return codec.DecodeResponse(req, resp) } func (c *RPCClient) getCopStreamResponse(ctx context.Context, client tikvpb.TikvClient, req *tikvrpc.Request, timeout time.Duration, connArray *connArray) (*tikvrpc.Response, error) { diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 1ae15e72..2876eb57 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -283,6 +283,7 @@ type replicaSelector struct { // | reachable +-----+-----+ all proxies are tried ^ // +------------+tryNewProxy+-------------------------+ // +-----------+ + type selectorState interface { next(*retry.Backoffer, *replicaSelector) (*RPCContext, error) onSendSuccess(*replicaSelector) @@ -1153,8 +1154,6 @@ func fetchRespInfo(resp *tikvrpc.Response) string { } func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCContext, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, retry bool, err error) { - req.ApiVersion = s.apiVersion - if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil { return nil, false, err } diff --git a/tikv/region.go b/tikv/region.go index 188096e6..97845aa7 100644 --- a/tikv/region.go +++ b/tikv/region.go @@ -119,7 +119,7 @@ var DefaultKeyspaceName = apicodec.DefaultKeyspaceName // Mode represents the operation mode of a request, export client.Mode type Mode = apicodec.Mode -var ( +const ( // ModeRaw represent a raw operation in TiKV, export client.ModeRaw ModeRaw Mode = apicodec.ModeRaw @@ -127,6 +127,14 @@ var ( ModeTxn Mode = apicodec.ModeTxn ) +// KeyspaceID denotes the target keyspace of the request. +type KeyspaceID = apicodec.KeyspaceID + +const ( + // NullspaceID is a special keyspace id that represents no keyspace exist + NullspaceID KeyspaceID = apicodec.NullspaceID +) + // RecordRegionRequestRuntimeStats records request runtime stats. func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { locate.RecordRegionRequestRuntimeStats(stats, cmd, d) diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 3bc28657..7e7290f5 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -689,15 +689,8 @@ type MPPStreamResponse struct { Lease } -// SetContext set the Context field for the given req to the specified ctx. -func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { - ctx := &req.Context - if region != nil { - ctx.RegionId = region.Id - ctx.RegionEpoch = region.RegionEpoch - } - ctx.Peer = peer - +// AttachContext sets the request context to the request. +func AttachContext(req *Request, ctx *kvrpcpb.Context) error { switch req.Type { case CmdGet: req.Get().Context = ctx @@ -763,8 +756,12 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.Cop().Context = ctx case CmdBatchCop: req.BatchCop().Context = ctx + // Dispatching MPP tasks don't need a region context, because it's a request for store but not region. case CmdMPPTask: - // Dispatching MPP tasks don't need a region context, because it's a request for store but not region. + case CmdMPPConn: + case CmdMPPCancel: + case CmdMPPAlive: + case CmdMvccGetByKey: req.MvccGetByKey().Context = ctx case CmdMvccGetByStartTs: @@ -789,6 +786,17 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { return nil } +// SetContext set the Context field for the given req to the specified ctx. +func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { + ctx := &req.Context + if region != nil { + ctx.RegionId = region.Id + ctx.RegionEpoch = region.RegionEpoch + } + ctx.Peer = peer + return AttachContext(req, ctx) +} + // GenRegionErrorResp returns corresponding Response with specified RegionError // according to the given req. func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {