*: reduce overhead of codec client (#1555)

Signed-off-by: zyguan <zhongyangguan@gmail.com>
This commit is contained in:
zyguan 2025-01-17 11:47:01 +08:00 committed by GitHub
parent e9d868c0a1
commit 20764920b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 164 additions and 18 deletions

View File

@ -91,10 +91,7 @@ func DecodeKey(encoded []byte, version kvrpcpb.APIVersion) ([]byte, []byte, erro
return nil, nil, errors.Errorf("unsupported api version %s", version.String())
}
func attachAPICtx(c Codec, req *tikvrpc.Request) *tikvrpc.Request {
// Shallow copy the request to avoid concurrent modification.
r := *req
func setAPICtx(c Codec, r *tikvrpc.Request) {
r.Context.ApiVersion = c.GetAPIVersion()
r.Context.KeyspaceId = uint32(c.GetKeyspaceID())
@ -114,8 +111,4 @@ func attachAPICtx(c Codec, req *tikvrpc.Request) *tikvrpc.Request {
compact.ApiVersion = r.Context.ApiVersion
r.Req = &compact
}
tikvrpc.AttachContext(&r, r.Context)
return &r
}

View File

@ -1,6 +1,8 @@
package apicodec
import (
"sync"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/keyspacepb"
@ -9,19 +11,24 @@ import (
)
type codecV1 struct {
reqPool sync.Pool
memCodec memCodec
}
// NewCodecV1 returns a codec that can be used to encode/decode
// keys and requests to and from APIv1 format.
func NewCodecV1(mode Mode) Codec {
var codec *codecV1
switch mode {
case ModeRaw:
return &codecV1{memCodec: &defaultMemCodec{}}
codec = &codecV1{memCodec: &defaultMemCodec{}}
case ModeTxn:
return &codecV1{memCodec: &memComparableCodec{}}
codec = &codecV1{memCodec: &memComparableCodec{}}
default:
panic("unknown mode")
}
panic("unknown mode")
codec.reqPool.New = func() any { return &tikvrpc.Request{} }
return codec
}
func (c *codecV1) GetAPIVersion() kvrpcpb.APIVersion {
@ -37,10 +44,14 @@ func (c *codecV1) GetKeyspaceID() KeyspaceID {
}
func (c *codecV1) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) {
return attachAPICtx(c, req), nil
r := c.reqPool.Get().(*tikvrpc.Request)
*r = *req
setAPICtx(c, r)
return r, nil
}
func (c *codecV1) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) {
defer c.reqPool.Put(req)
regionError, err := resp.GetRegionError()
// If GetRegionError returns error, it means the response does not contain region error to decode,
// therefore we skip decoding and return the response as is.

View File

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"sync"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/errorpb"
@ -50,6 +51,7 @@ func BuildKeyspaceName(name string) string {
// codecV2 is used to encode/decode keys and request into APIv2 format.
type codecV2 struct {
reqPool sync.Pool
prefix []byte
endKey []byte
memCodec memCodec
@ -85,6 +87,7 @@ func NewCodecV2(mode Mode, keyspaceMeta *keyspacepb.KeyspaceMeta) (Codec, error)
copy(codec.prefix[1:], prefix)
prefixVal := binary.BigEndian.Uint32(codec.prefix)
binary.BigEndian.PutUint32(codec.endKey, prefixVal+1)
codec.reqPool.New = func() any { return &tikvrpc.Request{} }
return codec, nil
}
@ -122,8 +125,10 @@ func (c *codecV2) GetAPIVersion() kvrpcpb.APIVersion {
// EncodeRequest encodes with the given Codec.
// NOTE: req is reused on retry. MUST encode on cloned request, other than overwrite the original.
func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) {
// attachAPICtx will shallow copy the request.
req = attachAPICtx(c, req)
r := c.reqPool.Get().(*tikvrpc.Request)
*r = *req
setAPICtx(c, r)
req = r
// Encode requests based on command type.
switch req.Type {
// Transaction Request Types.
@ -289,6 +294,7 @@ func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error)
// DecodeResponse decode the resp with the given codec.
func (c *codecV2) DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error) {
defer c.reqPool.Put(req)
var err error
// Decode response based on command type.
switch req.Type {

View File

@ -651,6 +651,8 @@ func (c *RPCClient) updateSendReqHistogramAndExecStats(req *tikvrpc.Request, res
}
func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
tikvrpc.AttachContext(req, req.Context)
var spanRPC opentracing.Span
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
spanRPC = span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))

View File

@ -87,7 +87,7 @@ func (r reqCollapse) tryCollapseRequest(ctx context.Context, addr string, req *t
return
}
canCollapse = true
key := strconv.FormatUint(resolveLock.Context.RegionId, 10) + "-" + strconv.FormatUint(resolveLock.StartVersion, 10)
key := strconv.FormatUint(req.Context.RegionId, 10) + "-" + strconv.FormatUint(resolveLock.StartVersion, 10)
resp, err = r.collapse(ctx, key, &resolveRegionSf, addr, req, timeout)
return
default:

View File

@ -182,7 +182,7 @@ func TestCollapseResolveLock(t *testing.T) {
CommitVersion: commitTS,
Keys: keys,
})
tikvrpc.SetContext(req, region, nil)
tikvrpc.SetContextNoAttach(req, region, nil)
return req
}
buildBatchResolveLockReq := func(regionID uint64, txnInfos []*kvrpcpb.TxnInfo) *tikvrpc.Request {
@ -190,7 +190,7 @@ func TestCollapseResolveLock(t *testing.T) {
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{
TxnInfos: txnInfos,
})
tikvrpc.SetContext(req, region, nil)
tikvrpc.SetContextNoAttach(req, region, nil)
return req
}

View File

@ -860,7 +860,8 @@ func (s *RegionRequestSender) SendReqCtx(
if req.InputRequestSource != "" && s.replicaSelector != nil {
patchRequestSource(req, s.replicaSelector.replicaType())
}
if err := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); err != nil {
// RPCClient.SendRequest will attach `req.Context` thus skip attaching here to reduce overhead.
if err := tikvrpc.SetContextNoAttach(req, rpcCtx.Meta, rpcCtx.Peer); err != nil {
return nil, nil, retryTimes, err
}
if s.replicaSelector != nil {

View File

@ -132,6 +132,7 @@ func (f *fnClient) CloseAddrVer(addr string, ver uint64) error {
func (f *fnClient) SetEventListener(listener client.ClientEventListener) {}
func (f *fnClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
tikvrpc.AttachContext(req, req.Context)
return f.fn(ctx, addr, req, timeout)
}

View File

@ -733,6 +733,8 @@ func (c *RPCClient) checkArgs(ctx context.Context, addr string) (*Session, error
// SendRequest sends a request to mock cluster.
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
tikvrpc.AttachContext(req, req.Context)
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("RPCClient.SendRequest", opentracing.ChildOf(span.Context()))
defer span1.Finish()

View File

@ -59,6 +59,8 @@ func (c *CodecClient) SendRequest(ctx context.Context, addr string, req *tikvrpc
if err != nil {
return nil, err
}
// TODO(zyguan): since unistore does not attach context yet, here we attach the context manually to make integration tests pass.
tikvrpc.AttachContext(req, req.Context)
resp, err := c.Client.SendRequest(ctx, addr, req, timeout)
if err != nil {
return nil, err

View File

@ -382,3 +382,94 @@ func patchCmdCtx(req *Request, cmd CmdType, ctx *kvrpcpb.Context) bool {
}
return true
}
func isValidReqType(cmd CmdType) bool {
switch cmd {
case CmdGet:
return true
case CmdScan:
return true
case CmdPrewrite:
return true
case CmdPessimisticLock:
return true
case CmdPessimisticRollback:
return true
case CmdCommit:
return true
case CmdCleanup:
return true
case CmdBatchGet:
return true
case CmdBatchRollback:
return true
case CmdScanLock:
return true
case CmdResolveLock:
return true
case CmdGC:
return true
case CmdDeleteRange:
return true
case CmdRawGet:
return true
case CmdRawBatchGet:
return true
case CmdRawPut:
return true
case CmdRawBatchPut:
return true
case CmdRawDelete:
return true
case CmdRawBatchDelete:
return true
case CmdRawDeleteRange:
return true
case CmdRawScan:
return true
case CmdRawGetKeyTTL:
return true
case CmdRawCompareAndSwap:
return true
case CmdRawChecksum:
return true
case CmdUnsafeDestroyRange:
return true
case CmdRegisterLockObserver:
return true
case CmdCheckLockObserver:
return true
case CmdRemoveLockObserver:
return true
case CmdPhysicalScanLock:
return true
case CmdCop:
return true
case CmdBatchCop:
return true
case CmdMvccGetByKey:
return true
case CmdMvccGetByStartTs:
return true
case CmdSplitRegion:
return true
case CmdTxnHeartBeat:
return true
case CmdCheckTxnStatus:
return true
case CmdCheckSecondaryLocks:
return true
case CmdFlashbackToVersion:
return true
case CmdPrepareFlashbackToVersion:
return true
case CmdFlush:
return true
case CmdBufferBatchGet:
return true
case CmdCopStream, CmdMPPTask, CmdMPPConn, CmdMPPCancel, CmdMPPAlive, CmdEmpty:
return true
default:
return false
}
}

View File

@ -97,3 +97,25 @@ cat <<EOF >> $output
return true
}
EOF
cat <<EOF >> $output
func isValidReqType(cmd CmdType) bool {
switch cmd {
EOF
for cmd in "${cmds[@]}"; do
cat <<EOF >> $output
case Cmd${cmd}:
return true
EOF
done
cat <<EOF >> $output
case CmdCopStream, CmdMPPTask, CmdMPPConn, CmdMPPCancel, CmdMPPAlive, CmdEmpty:
return true
default:
return false
}
}
EOF

View File

@ -804,6 +804,8 @@ func AttachContext(req *Request, rpcCtx kvrpcpb.Context) bool {
}
// SetContext set the Context field for the given req to the specified ctx.
//
// Deprecated: use SetContextNoAttach instead, RPCClient will call AttachContext(req, req.Context).
func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
if region != nil {
req.Context.RegionId = region.Id
@ -818,6 +820,19 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
return nil
}
// SetContextNoAttach likes SetContext, but it doesn't attach the context to the underlying request.
func SetContextNoAttach(req *Request, region *metapb.Region, peer *metapb.Peer) error {
if !isValidReqType(req.Type) {
return errors.Errorf("invalid request type %v", req.Type)
}
if region != nil {
req.Context.RegionId = region.Id
req.Context.RegionEpoch = region.RegionEpoch
}
req.Context.Peer = peer
return nil
}
// GenRegionErrorResp returns corresponding Response with specified RegionError
// according to the given req.
func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {