[RawKV] support checksum (#519)

Signed-off-by: haojinming <jinming.hao@pingcap.com>

Co-authored-by: iosmanthus <dengliming@pingcap.com>
This commit is contained in:
haojinming 2022-06-22 16:08:35 +08:00 committed by GitHub
parent 98a4e2776e
commit 681fb6e537
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 209 additions and 0 deletions

View File

@ -296,6 +296,7 @@ type RawKV interface {
RawBatchDelete(cf string, keys [][]byte) RawBatchDelete(cf string, keys [][]byte)
RawDeleteRange(cf string, startKey, endKey []byte) RawDeleteRange(cf string, startKey, endKey []byte)
RawCompareAndSwap(cf string, key, expectedValue, newvalue []byte) ([]byte, bool, error) RawCompareAndSwap(cf string, key, expectedValue, newvalue []byte) ([]byte, bool, error)
RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error)
} }
// MVCCDebugger is for debugging. // MVCCDebugger is for debugging.

View File

@ -36,6 +36,7 @@ package mocktikv
import ( import (
"bytes" "bytes"
"hash/crc64"
"math" "math"
"sync" "sync"
@ -1778,6 +1779,44 @@ func (mvcc *MVCCLevelDB) RawReverseScan(cf string, startKey, endKey []byte, limi
return pairs return pairs
} }
// RawChecksum implements the RawKV interface.
//
// It iterates the raw key/value pairs of column family cf starting at startKey
// and stops at the first key that is >= endKey; an empty endKey means there is
// no upper bound. For every visited pair a CRC64 (ECMA polynomial) is computed
// over the key bytes followed by the value bytes.
//
// It returns, in order: the XOR of all per-pair CRC64 values, the number of
// pairs visited, and the sum of len(key)+len(value) over those pairs. When
// getDB yields no database for cf, all three are zero and the error is nil.
// An iterator error discards the partial result and is returned as-is.
func (mvcc *MVCCLevelDB) RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error) {
	mvcc.mu.Lock()
	defer mvcc.mu.Unlock()

	db := mvcc.getDB(cf)
	if db == nil {
		return 0, 0, 0, nil
	}

	iter := db.NewIterator(&util.Range{
		Start: startKey,
	}, nil)
	// The iterator must be released once we are done with it, on every return
	// path, otherwise the resources it pins are never given back.
	defer iter.Release()

	var crc64Xor, totalKvs, totalBytes uint64
	digest := crc64.New(crc64.MakeTable(crc64.ECMA))
	for iter.Next() {
		key := iter.Key()
		// The range only carries a lower bound, so the exclusive upper bound
		// is enforced here.
		if len(endKey) > 0 && bytes.Compare(key, endKey) >= 0 {
			break
		}
		value := iter.Value()

		// One digest per pair; XOR makes the combined result independent of
		// iteration order.
		digest.Reset()
		digest.Write(key)
		digest.Write(value)
		crc64Xor ^= digest.Sum64()
		totalKvs++
		totalBytes += uint64(len(key) + len(value))
	}
	// Next reports false both when the range is exhausted and when iteration
	// fails, so the accumulated error has to be inspected after the loop;
	// otherwise a failed scan would be reported as a successful partial checksum.
	if err := iter.Error(); err != nil {
		return 0, 0, 0, err
	}
	return crc64Xor, totalKvs, totalBytes, nil
}
// RawDeleteRange implements the RawKV interface. // RawDeleteRange implements the RawKV interface.
func (mvcc *MVCCLevelDB) RawDeleteRange(cf string, startKey, endKey []byte) { func (mvcc *MVCCLevelDB) RawDeleteRange(cf string, startKey, endKey []byte) {
tikverr.Log(mvcc.doRawDeleteRange(cf, startKey, endKey)) tikverr.Log(mvcc.doRawDeleteRange(cf, startKey, endKey))

View File

@ -596,6 +596,48 @@ func (h kvHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScan
} }
} }
// handleKvRawChecksum serves a RawChecksumRequest against the handler's store.
//
// If the store does not implement RawKV, the response carries a RegionError
// with the message "not implemented". Otherwise every range in req.Ranges is
// checksummed through RawKV.RawChecksum on the "CF_DEFAULT" column family and
// the per-range results are merged: checksums are XORed, pair counts and byte
// counts are added. The first store error aborts the request and is reported as
// a RegionError carrying the error text.
//
// NOTE(review): req.Algorithm is not consulted here; the store call decides how
// the checksum is computed.
func (h kvHandler) handleKvRawChecksum(req *kvrpcpb.RawChecksumRequest) *kvrpcpb.RawChecksumResponse {
	store, supported := h.mvccStore.(RawKV)
	if !supported {
		return &kvrpcpb.RawChecksumResponse{
			RegionError: &errorpb.Error{Message: "not implemented"},
		}
	}

	var checksum, kvCount, byteCount uint64
	for _, keyRange := range req.Ranges {
		// Use the requested end key only when it is set and sorts before the
		// handler's own end key (or the handler has none); in every other case
		// fall back to the handler's end key.
		limit := keyRange.EndKey
		if len(limit) == 0 || (len(h.endKey) > 0 && bytes.Compare(limit, h.endKey) >= 0) {
			limit = h.endKey
		}

		partChecksum, partKvs, partBytes, err := store.RawChecksum("CF_DEFAULT", keyRange.StartKey, limit)
		if err != nil {
			return &kvrpcpb.RawChecksumResponse{
				RegionError: &errorpb.Error{Message: err.Error()},
			}
		}

		checksum ^= partChecksum
		kvCount += partKvs
		byteCount += partBytes
	}

	return &kvrpcpb.RawChecksumResponse{
		Checksum:   checksum,
		TotalKvs:   kvCount,
		TotalBytes: byteCount,
	}
}
func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse { func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
keys := req.GetSplitKeys() keys := req.GetSplitKeys()
resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)} resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
@ -933,6 +975,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil return resp, nil
} }
resp.Resp = kvHandler{session}.HandleKvRawCompareAndSwap(r) resp.Resp = kvHandler{session}.HandleKvRawCompareAndSwap(r)
case tikvrpc.CmdRawChecksum:
	r := req.RawChecksum()
	if err := session.checkRequest(reqCtx, r.Size()); err != nil {
		// The region error must travel in the response type that belongs to
		// this command: the raw client type-asserts the body to
		// *kvrpcpb.RawChecksumResponse, so a RawScanResponse here would not
		// match what callers expect for a checksum request.
		resp.Resp = &kvrpcpb.RawChecksumResponse{RegionError: err}
		return resp, nil
	}
	resp.Resp = kvHandler{session}.handleKvRawChecksum(r)
case tikvrpc.CmdUnsafeDestroyRange: case tikvrpc.CmdUnsafeDestroyRange:
panic("unimplemented") panic("unimplemented")
case tikvrpc.CmdRegisterLockObserver: case tikvrpc.CmdRegisterLockObserver:

View File

@ -53,6 +53,7 @@ var (
RawkvCmdHistogramWithRawReversScan prometheus.Observer RawkvCmdHistogramWithRawReversScan prometheus.Observer
RawkvSizeHistogramWithKey prometheus.Observer RawkvSizeHistogramWithKey prometheus.Observer
RawkvSizeHistogramWithValue prometheus.Observer RawkvSizeHistogramWithValue prometheus.Observer
RawkvCmdHistogramWithRawChecksum prometheus.Observer
BackoffHistogramRPC prometheus.Observer BackoffHistogramRPC prometheus.Observer
BackoffHistogramLock prometheus.Observer BackoffHistogramLock prometheus.Observer
@ -152,6 +153,7 @@ func initShortcuts() {
RawkvCmdHistogramWithRawReversScan = TiKVRawkvCmdHistogram.WithLabelValues("raw_reverse_scan") RawkvCmdHistogramWithRawReversScan = TiKVRawkvCmdHistogram.WithLabelValues("raw_reverse_scan")
RawkvSizeHistogramWithKey = TiKVRawkvSizeHistogram.WithLabelValues("key") RawkvSizeHistogramWithKey = TiKVRawkvSizeHistogram.WithLabelValues("key")
RawkvSizeHistogramWithValue = TiKVRawkvSizeHistogram.WithLabelValues("value") RawkvSizeHistogramWithValue = TiKVRawkvSizeHistogram.WithLabelValues("value")
// Checksum latency is a per-command duration, so it is registered on the
// command histogram like the other raw commands, not on the key/value size
// histogram.
RawkvCmdHistogramWithRawChecksum = TiKVRawkvCmdHistogram.WithLabelValues("raw_checksum")
BackoffHistogramRPC = TiKVBackoffHistogram.WithLabelValues("tikvRPC") BackoffHistogramRPC = TiKVBackoffHistogram.WithLabelValues("tikvRPC")
BackoffHistogramLock = TiKVBackoffHistogram.WithLabelValues("txnLock") BackoffHistogramLock = TiKVBackoffHistogram.WithLabelValues("txnLock")

View File

@ -74,6 +74,16 @@ type rawOptions struct {
KeyOnly bool KeyOnly bool
} }
// RawChecksum represents the checksum result of raw kv pairs in TiKV cluster.
// It is the result type of Client.Checksum, which merges the responses of the
// individual checksum requests it sends into one value.
type RawChecksum struct {
// Crc64Xor is the checksum result with crc64 algorithm. Client.Checksum
// combines the checksum of each response into this field with XOR.
Crc64Xor uint64
// TotalKvs is the total number of kv pairs, summed over all responses.
TotalKvs uint64
// TotalBytes is the total bytes of kv pairs, including prefix in APIV2,
// summed over all responses.
TotalBytes uint64
}
// RawOption represents possible options that can be controlled by the user // RawOption represents possible options that can be controlled by the user
// to tweak the API behavior. // to tweak the API behavior.
// //
@ -504,6 +514,44 @@ func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit
return return
} }
// Checksum computes the checksum of the continuous kv pairs in range
// [startKey, endKey). If endKey is empty, the range is unbounded.
// If you want to exclude the startKey or include the endKey, push a '\0' to the
// key. For example, to checksum (startKey, endKey], you can write:
// `Checksum(ctx, push(startKey, '\0'), push(endKey, '\0'))`.
//
// The range is processed one request at a time: each request starts at the
// current cursor, and the cursor then advances to the EndKey of the location
// the request was sent to, until that EndKey is empty or reaches endKey.
// Per-request checksums are combined with XOR; kv and byte counts are added.
// On any failure the zero RawChecksum is returned together with the error.
//
// NOTE(review): options is accepted but not read anywhere in this function.
func (c *Client) Checksum(ctx context.Context, startKey, endKey []byte, options ...RawOption,
) (check RawChecksum, err error) {
	began := time.Now()
	defer func() {
		metrics.RawkvCmdHistogramWithRawChecksum.Observe(time.Since(began).Seconds())
	}()

	cursor := startKey
	for {
		// Stop once a bounded range has been fully covered.
		if len(endKey) > 0 && bytes.Compare(cursor, endKey) >= 0 {
			break
		}

		keyRange := &kvrpcpb.KeyRange{
			StartKey: cursor,
			EndKey:   endKey,
		}
		req := tikvrpc.NewRequest(tikvrpc.CmdRawChecksum, &kvrpcpb.RawChecksumRequest{
			Algorithm: kvrpcpb.ChecksumAlgorithm_Crc64_Xor,
			Ranges:    []*kvrpcpb.KeyRange{keyRange},
		})

		resp, loc, sendErr := c.sendReq(ctx, cursor, req, false)
		if sendErr != nil {
			return RawChecksum{}, sendErr
		}
		if resp.Resp == nil {
			return RawChecksum{}, errors.WithStack(tikverr.ErrBodyMissing)
		}

		part := resp.Resp.(*kvrpcpb.RawChecksumResponse)
		check.Crc64Xor ^= part.GetChecksum()
		check.TotalKvs += part.GetTotalKvs()
		check.TotalBytes += part.GetTotalBytes()

		// An empty EndKey on the location means there is nothing after it.
		cursor = loc.EndKey
		if len(cursor) == 0 {
			break
		}
	}
	return check, nil
}
// CompareAndSwap results in an atomic compare-and-set operation for the given key while SetAtomicForCAS(true) // CompareAndSwap results in an atomic compare-and-set operation for the given key while SetAtomicForCAS(true)
// If the value retrieved is equal to previousValue, newValue is written. // If the value retrieved is equal to previousValue, newValue is written.
// It returns the previous value and whether the value is successfully swapped. // It returns the previous value and whether the value is successfully swapped.

View File

@ -38,6 +38,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"hash/crc64"
"testing" "testing"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
@ -571,3 +572,56 @@ func (s *testRawkvSuite) TestCompareAndSwap() {
s.Nil(err) s.Nil(err)
s.Equal(string(v), string(newValue)) s.Equal(string(v), string(newValue))
} }
// TestRawChecksum writes a fixed set of kv pairs through BatchPut and verifies
// that Client.Checksum over ["db", unbounded) reports the same CRC64-XOR, pair
// count and byte count as a local computation over the same pairs. "db" is the
// smallest key in the fixture, so the checked range covers every pair.
func (s *testRawkvSuite) TestRawChecksum() {
	store := mocktikv.MustNewMVCCStore()
	defer store.Close()

	client := &Client{
		clusterID:   0,
		regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
		rpcClient:   mocktikv.NewRPCClient(s.cluster, store, nil),
	}
	defer client.Close()

	cf := "CF_DEFAULT"
	fixtures := map[string]string{
		"db":   "TiDB",
		"key2": "value2",
		"key1": "value1",
		"key4": "value4",
		"key3": "value3",
		"kv":   "TiKV",
	}

	// Build the batch and the expected result in one pass. XOR and addition
	// are order-independent, so the random map iteration order does not matter.
	keys := make([]key, 0, len(fixtures))
	values := make([]value, 0, len(fixtures))
	var wantCrc64Xor, wantTotalKvs, wantTotalBytes uint64
	hasher := crc64.New(crc64.MakeTable(crc64.ECMA))
	for k, v := range fixtures {
		keys = append(keys, []byte(k))
		values = append(values, []byte(v))

		hasher.Reset()
		hasher.Write([]byte(k))
		hasher.Write([]byte(v))
		wantCrc64Xor ^= hasher.Sum64()
		wantTotalKvs++
		wantTotalBytes += uint64(len(k) + len(v))
	}

	// BatchPut
	putErr := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf))
	s.Nil(putErr)

	// test Checksum
	got, sumErr := client.Checksum(context.Background(), []byte("db"), []byte(nil), SetColumnFamily(cf))
	s.Nil(sumErr)
	s.Equal(wantCrc64Xor, got.Crc64Xor)
	s.Equal(wantTotalKvs, got.TotalKvs)
	s.Equal(wantTotalBytes, got.TotalBytes)
}

View File

@ -83,6 +83,7 @@ const (
CmdRawScan CmdRawScan
CmdGetKeyTTL CmdGetKeyTTL
CmdRawCompareAndSwap CmdRawCompareAndSwap
CmdRawChecksum
CmdUnsafeDestroyRange CmdUnsafeDestroyRange
@ -156,6 +157,8 @@ func (t CmdType) String() string {
return "RawDeleteRange" return "RawDeleteRange"
case CmdRawScan: case CmdRawScan:
return "RawScan" return "RawScan"
case CmdRawChecksum:
return "RawChecksum"
case CmdUnsafeDestroyRange: case CmdUnsafeDestroyRange:
return "UnsafeDestroyRange" return "UnsafeDestroyRange"
case CmdRegisterLockObserver: case CmdRegisterLockObserver:
@ -388,6 +391,11 @@ func (req *Request) RawCompareAndSwap() *kvrpcpb.RawCASRequest {
return req.Req.(*kvrpcpb.RawCASRequest) return req.Req.(*kvrpcpb.RawCASRequest)
} }
// RawChecksum returns RawChecksumRequest in request.
// The type assertion on req.Req is unchecked, so this panics when the request
// body is not a *kvrpcpb.RawChecksumRequest.
func (req *Request) RawChecksum() *kvrpcpb.RawChecksumRequest {
return req.Req.(*kvrpcpb.RawChecksumRequest)
}
// RegisterLockObserver returns RegisterLockObserverRequest in request. // RegisterLockObserver returns RegisterLockObserverRequest in request.
func (req *Request) RegisterLockObserver() *kvrpcpb.RegisterLockObserverRequest { func (req *Request) RegisterLockObserver() *kvrpcpb.RegisterLockObserverRequest {
return req.Req.(*kvrpcpb.RegisterLockObserverRequest) return req.Req.(*kvrpcpb.RegisterLockObserverRequest)
@ -713,6 +721,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
req.RawGetKeyTTL().Context = ctx req.RawGetKeyTTL().Context = ctx
case CmdRawCompareAndSwap: case CmdRawCompareAndSwap:
req.RawCompareAndSwap().Context = ctx req.RawCompareAndSwap().Context = ctx
case CmdRawChecksum:
req.RawChecksum().Context = ctx
case CmdRegisterLockObserver: case CmdRegisterLockObserver:
req.RegisterLockObserver().Context = ctx req.RegisterLockObserver().Context = ctx
case CmdCheckLockObserver: case CmdCheckLockObserver:
@ -851,6 +861,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
p = &kvrpcpb.RawCASResponse{ p = &kvrpcpb.RawCASResponse{
RegionError: e, RegionError: e,
} }
case CmdRawChecksum:
p = &kvrpcpb.RawChecksumResponse{
RegionError: e,
}
case CmdCop: case CmdCop:
p = &coprocessor.Response{ p = &coprocessor.Response{
RegionError: e, RegionError: e,
@ -967,6 +981,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Resp, err = client.RawGetKeyTTL(ctx, req.RawGetKeyTTL()) resp.Resp, err = client.RawGetKeyTTL(ctx, req.RawGetKeyTTL())
case CmdRawCompareAndSwap: case CmdRawCompareAndSwap:
resp.Resp, err = client.RawCompareAndSwap(ctx, req.RawCompareAndSwap()) resp.Resp, err = client.RawCompareAndSwap(ctx, req.RawCompareAndSwap())
case CmdRawChecksum:
resp.Resp, err = client.RawChecksum(ctx, req.RawChecksum())
case CmdRegisterLockObserver: case CmdRegisterLockObserver:
resp.Resp, err = client.RegisterLockObserver(ctx, req.RegisterLockObserver()) resp.Resp, err = client.RegisterLockObserver(ctx, req.RegisterLockObserver())
case CmdCheckLockObserver: case CmdCheckLockObserver: