*: Support kv_prepare_flashback_to_version cmd (#592)

Signed-off-by: Hangjie Mo <mohangjie1995@gmail.com>

Signed-off-by: Hangjie Mo <mohangjie1995@gmail.com>
This commit is contained in:
Hangjie Mo 2022-09-29 19:13:18 +08:00 committed by GitHub
parent 33efe476e0
commit 51f3bd3944
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 32 additions and 7 deletions

2
go.mod
View File

@ -13,7 +13,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0

4
go.sum
View File

@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21 h1:HfAWnlVF7P1nNJvXP4ew1Lcnng/BnAVQ40AsUHKR5EA=
github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E=
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

View File

@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c
github.com/pingcap/tidb v1.1.0-beta.0.20220902042024-0482b2e83ed2
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.0

View File

@ -301,8 +301,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21 h1:HfAWnlVF7P1nNJvXP4ew1Lcnng/BnAVQ40AsUHKR5EA=
github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E=
github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8=

View File

@ -474,6 +474,10 @@ func (s *mockTikvGrpcServer) KvFlashbackToVersion(context.Context, *kvrpcpb.Flas
return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvPrepareFlashbackToVersion(context.Context, *kvrpcpb.PrepareFlashbackToVersionRequest) (*kvrpcpb.PrepareFlashbackToVersionResponse, error) {
return nil, errors.New("unreachable")
}
func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() {
// prepare a mock tikv grpc server
addr := "localhost:56341"

View File

@ -73,6 +73,7 @@ const (
CmdCheckTxnStatus
CmdCheckSecondaryLocks
CmdFlashbackToVersion
CmdPrepareFlashbackToVersion
CmdRawGet CmdType = 256 + iota
CmdRawBatchGet
@ -206,6 +207,8 @@ func (t CmdType) String() string {
return "LockWaitInfo"
case CmdFlashbackToVersion:
return "FlashbackToVersion"
case CmdPrepareFlashbackToVersion:
return "PrepareFlashbackToVersion"
}
return "Unknown"
}
@ -519,6 +522,11 @@ func (req *Request) FlashbackToVersion() *kvrpcpb.FlashbackToVersionRequest {
return req.Req.(*kvrpcpb.FlashbackToVersionRequest)
}
// PrepareFlashbackToVersion returns PrepareFlashbackToVersion in request.
func (req *Request) PrepareFlashbackToVersion() *kvrpcpb.PrepareFlashbackToVersionRequest {
return req.Req.(*kvrpcpb.PrepareFlashbackToVersionRequest)
}
// ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request {
switch req.Type {
@ -576,6 +584,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_TxnHeartBeat{TxnHeartBeat: req.TxnHeartBeat()}}
case CmdFlashbackToVersion:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_FlashbackToVersion{FlashbackToVersion: req.FlashbackToVersion()}}
case CmdPrepareFlashbackToVersion:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PrepareFlashbackToVersion{PrepareFlashbackToVersion: req.PrepareFlashbackToVersion()}}
}
return nil
}
@ -615,6 +625,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res
return &Response{Resp: res.DeleteRange}, nil
case *tikvpb.BatchCommandsResponse_Response_FlashbackToVersion:
return &Response{Resp: res.FlashbackToVersion}, nil
case *tikvpb.BatchCommandsResponse_Response_PrepareFlashbackToVersion:
return &Response{Resp: res.PrepareFlashbackToVersion}, nil
case *tikvpb.BatchCommandsResponse_Response_RawGet:
return &Response{Resp: res.RawGet}, nil
case *tikvpb.BatchCommandsResponse_Response_RawBatchGet:
@ -767,6 +779,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
req.CheckSecondaryLocks().Context = ctx
case CmdFlashbackToVersion:
req.FlashbackToVersion().Context = ctx
case CmdPrepareFlashbackToVersion:
req.PrepareFlashbackToVersion().Context = ctx
default:
return errors.Errorf("invalid request type %v", req.Type)
}
@ -918,6 +932,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
p = &kvrpcpb.FlashbackToVersionResponse{
RegionError: e,
}
case CmdPrepareFlashbackToVersion:
p = &kvrpcpb.PrepareFlashbackToVersionResponse{
RegionError: e,
}
default:
return nil, errors.Errorf("invalid request type %v", req.Type)
}
@ -1074,6 +1092,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Resp, err = client.Compact(ctx, req.Compact())
case CmdFlashbackToVersion:
resp.Resp, err = client.KvFlashbackToVersion(ctx, req.FlashbackToVersion())
case CmdPrepareFlashbackToVersion:
resp.Resp, err = client.KvPrepareFlashbackToVersion(ctx, req.PrepareFlashbackToVersion())
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
@ -1236,7 +1256,8 @@ func (req *Request) IsTxnWriteRequest() bool {
req.Type == CmdCleanup ||
req.Type == CmdTxnHeartBeat ||
req.Type == CmdResolveLock ||
req.Type == CmdFlashbackToVersion {
req.Type == CmdFlashbackToVersion ||
req.Type == CmdPrepareFlashbackToVersion {
return true
}
return false