From 681fb6e537b5b114f07cbc9eb0d6564742a470aa Mon Sep 17 00:00:00 2001 From: haojinming Date: Wed, 22 Jun 2022 16:08:35 +0800 Subject: [PATCH] [RawKV] support checksum (#519) Signed-off-by: haojinming Co-authored-by: iosmanthus --- internal/mockstore/mocktikv/mvcc.go | 1 + internal/mockstore/mocktikv/mvcc_leveldb.go | 39 +++++++++++++++ internal/mockstore/mocktikv/rpc.go | 49 +++++++++++++++++++ metrics/shortcuts.go | 2 + rawkv/rawkv.go | 48 ++++++++++++++++++ rawkv/rawkv_test.go | 54 +++++++++++++++++++++ tikvrpc/tikvrpc.go | 16 ++++++ 7 files changed, 209 insertions(+) diff --git a/internal/mockstore/mocktikv/mvcc.go b/internal/mockstore/mocktikv/mvcc.go index e0a7ee65..c279efb2 100644 --- a/internal/mockstore/mocktikv/mvcc.go +++ b/internal/mockstore/mocktikv/mvcc.go @@ -296,6 +296,7 @@ type RawKV interface { RawBatchDelete(cf string, keys [][]byte) RawDeleteRange(cf string, startKey, endKey []byte) RawCompareAndSwap(cf string, key, expectedValue, newvalue []byte) ([]byte, bool, error) + RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error) } // MVCCDebugger is for debugging. diff --git a/internal/mockstore/mocktikv/mvcc_leveldb.go b/internal/mockstore/mocktikv/mvcc_leveldb.go index cd343801..aa17fde3 100644 --- a/internal/mockstore/mocktikv/mvcc_leveldb.go +++ b/internal/mockstore/mocktikv/mvcc_leveldb.go @@ -36,6 +36,7 @@ package mocktikv import ( "bytes" + "hash/crc64" "math" "sync" @@ -1778,6 +1779,44 @@ func (mvcc *MVCCLevelDB) RawReverseScan(cf string, startKey, endKey []byte, limi return pairs } +// RawChecksum implements the RawKV interface. 
+func (mvcc *MVCCLevelDB) RawChecksum(cf string, startKey, endKey []byte) (uint64, uint64, uint64, error) {
+	mvcc.mu.Lock()
+	defer mvcc.mu.Unlock()
+
+	db := mvcc.getDB(cf)
+	if db == nil {
+		return 0, 0, 0, nil // no store for cf: report an empty checksum
+	}
+
+	iter := db.NewIterator(&util.Range{Start: startKey}, nil)
+	defer iter.Release() // the iterator must be released on every return path
+
+	crc64Xor := uint64(0)
+	totalKvs := uint64(0)
+	totalBytes := uint64(0)
+	digest := crc64.New(crc64.MakeTable(crc64.ECMA))
+	// Each pair is hashed on its own and XOR-ed into the running checksum.
+	for iter.Next() {
+		key := iter.Key()
+		value := iter.Value()
+		if len(endKey) > 0 && bytes.Compare(key, endKey) >= 0 {
+			break // endKey is exclusive; an empty endKey means unbounded
+		}
+		digest.Reset()
+		digest.Write(key)
+		digest.Write(value)
+		crc64Xor ^= digest.Sum64()
+		totalKvs++
+		totalBytes += (uint64)(len(key) + len(value))
+	}
+	// Check for an iteration error once the loop is over, so a failure that stops Next is not lost.
+	if err := iter.Error(); err != nil {
+		return 0, 0, 0, err
+	}
+	return crc64Xor, totalKvs, totalBytes, nil
+}
+
 // RawDeleteRange implements the RawKV interface.
 func (mvcc *MVCCLevelDB) RawDeleteRange(cf string, startKey, endKey []byte) {
 	tikverr.Log(mvcc.doRawDeleteRange(cf, startKey, endKey))
diff --git a/internal/mockstore/mocktikv/rpc.go b/internal/mockstore/mocktikv/rpc.go
index f17d00d6..7b5b0cc1 100644
--- a/internal/mockstore/mocktikv/rpc.go
+++ b/internal/mockstore/mocktikv/rpc.go
@@ -596,6 +596,48 @@ func (h kvHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScan
 	}
 }
 
+// handleKvRawChecksum folds the checksum, pair count and byte count of every
+// requested range into a single RawChecksumResponse.
+// Any failure is reported through the response's RegionError.
+func (h kvHandler) handleKvRawChecksum(req *kvrpcpb.RawChecksumRequest) *kvrpcpb.RawChecksumResponse {
+	rawKV, ok := h.mvccStore.(RawKV)
+	if !ok {
+		return &kvrpcpb.RawChecksumResponse{
+			RegionError: &errorpb.Error{
+				Message: "not implemented",
+			},
+		}
+	}
+
+	crc64Xor := uint64(0)
+	totalKvs := uint64(0)
+	totalBytes := uint64(0)
+	for _, r := range req.Ranges {
+		// Use the tighter of the handler's end key and the range's end key;
+		// an empty key means unbounded.
+		upperBound := h.endKey
+		if len(r.EndKey) > 0 && (len(upperBound) == 0 || bytes.Compare(r.EndKey, upperBound) < 0) {
+			upperBound = r.EndKey
+		}
+		rangeCrc64Xor, rangeKvs, rangeBytes, err := rawKV.RawChecksum("CF_DEFAULT", r.StartKey, upperBound)
+		if err != nil {
+			return &kvrpcpb.RawChecksumResponse{
+				RegionError: &errorpb.Error{
+					Message: err.Error(),
+				},
+			}
+		}
+		crc64Xor ^= rangeCrc64Xor
+		totalKvs += rangeKvs
+		totalBytes += rangeBytes
+	}
+	return &kvrpcpb.RawChecksumResponse{
+		Checksum:   crc64Xor,
+		TotalKvs:   totalKvs,
+		TotalBytes: totalBytes,
+	}
+}
+
 func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
 	keys := req.GetSplitKeys()
 	resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
@@ -933,6 +975,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
 			return resp, nil
 		}
 		resp.Resp = kvHandler{session}.HandleKvRawCompareAndSwap(r)
+	case tikvrpc.CmdRawChecksum:
+		r := req.RawChecksum()
+		if err := session.checkRequest(reqCtx, r.Size()); err != nil {
+			resp.Resp = &kvrpcpb.RawChecksumResponse{RegionError: err} // response type must match the command
+			return resp, nil
+		}
+		resp.Resp = kvHandler{session}.handleKvRawChecksum(r)
 	case tikvrpc.CmdUnsafeDestroyRange:
 		panic("unimplemented")
 	case tikvrpc.CmdRegisterLockObserver:
diff --git a/metrics/shortcuts.go b/metrics/shortcuts.go
index f7762618..8ff83856 100644
--- a/metrics/shortcuts.go
+++ b/metrics/shortcuts.go
@@ -53,6 +53,7 @@ var (
 	RawkvCmdHistogramWithRawReversScan prometheus.Observer
 	RawkvSizeHistogramWithKey prometheus.Observer
 	RawkvSizeHistogramWithValue prometheus.Observer
+	RawkvCmdHistogramWithRawChecksum prometheus.Observer
 
 	BackoffHistogramRPC prometheus.Observer
 	BackoffHistogramLock prometheus.Observer
@@ -152,6 +153,7 @@ func initShortcuts() {
 	RawkvCmdHistogramWithRawReversScan = TiKVRawkvCmdHistogram.WithLabelValues("raw_reverse_scan")
 	RawkvSizeHistogramWithKey = TiKVRawkvSizeHistogram.WithLabelValues("key")
 	RawkvSizeHistogramWithValue = TiKVRawkvSizeHistogram.WithLabelValues("value")
+	RawkvCmdHistogramWithRawChecksum = TiKVRawkvCmdHistogram.WithLabelValues("raw_checksum")
 
 	BackoffHistogramRPC = TiKVBackoffHistogram.WithLabelValues("tikvRPC")
 	BackoffHistogramLock =
TiKVBackoffHistogram.WithLabelValues("txnLock") diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index a93c07f1..26117c6f 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -74,6 +74,16 @@ type rawOptions struct { KeyOnly bool } +// RawChecksum represents the checksum result of raw kv pairs in a TiKV cluster. +type RawChecksum struct { + // Crc64Xor is the checksum computed with the Crc64_Xor algorithm + Crc64Xor uint64 + // TotalKvs is the total number of kv pairs + TotalKvs uint64 + // TotalBytes is the total size in bytes of the kv pairs, including prefix in APIV2 + TotalBytes uint64 +} + // RawOption represents possible options that can be cotrolled by the user // to tweak the API behavior. // @@ -504,6 +514,44 @@ func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit return } +// Checksum computes the checksum of the contiguous kv pairs in range [startKey, endKey). +// If endKey is empty, it means unbounded. The options are currently not used. +// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to checksum +// (startKey, endKey], you can write: +// `Checksum(ctx, push(startKey, '\0'), push(endKey, '\0'))`. 
+func (c *Client) Checksum(ctx context.Context, startKey, endKey []byte, options ...RawOption,
+) (check RawChecksum, err error) {
+	began := time.Now()
+	defer func() {
+		metrics.RawkvCmdHistogramWithRawChecksum.Observe(time.Since(began).Seconds())
+	}()
+
+	// Walk the range one located region at a time, folding each partial result into check.
+	for cursor := startKey; len(endKey) == 0 || bytes.Compare(cursor, endKey) < 0; {
+		keyRange := &kvrpcpb.KeyRange{StartKey: cursor, EndKey: endKey}
+		req := tikvrpc.NewRequest(tikvrpc.CmdRawChecksum, &kvrpcpb.RawChecksumRequest{
+			Algorithm: kvrpcpb.ChecksumAlgorithm_Crc64_Xor,
+			Ranges:    []*kvrpcpb.KeyRange{keyRange},
+		})
+		resp, loc, sendErr := c.sendReq(ctx, cursor, req, false)
+		if sendErr != nil {
+			return RawChecksum{}, sendErr
+		}
+		if resp.Resp == nil {
+			return RawChecksum{}, errors.WithStack(tikverr.ErrBodyMissing)
+		}
+		partial := resp.Resp.(*kvrpcpb.RawChecksumResponse)
+		check.Crc64Xor ^= partial.GetChecksum()
+		check.TotalKvs += partial.GetTotalKvs()
+		check.TotalBytes += partial.GetTotalBytes()
+		// Continue from the located region's end key; stop when it is empty.
+		if cursor = loc.EndKey; len(cursor) == 0 {
+			break
+		}
+	}
+	return check, nil
+}
+
 // CompareAndSwap results in an atomic compare-and-set operation for the given key while SetAtomicForCAS(true)
 // If the value retrieved is equal to previousValue, newValue is written.
 // It returns the previous value and whether the value is successfully swapped.
diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go
index fb83f605..f5eefaa2 100644
--- a/rawkv/rawkv_test.go
+++ b/rawkv/rawkv_test.go
@@ -38,6 +38,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"hash/crc64"
 	"testing"
 
 	"github.com/stretchr/testify/suite"
@@ -571,3 +572,56 @@ func (s *testRawkvSuite) TestCompareAndSwap() {
 	s.Nil(err)
 	s.Equal(string(v), string(newValue))
 }
+
+func (s *testRawkvSuite) TestRawChecksum() {
+	store := mocktikv.MustNewMVCCStore()
+	defer store.Close()
+
+	client := &Client{
+		clusterID:   0,
+		regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)),
+		rpcClient:   mocktikv.NewRPCClient(s.cluster, store, nil),
+	}
+	defer client.Close()
+
+	const cf = "CF_DEFAULT"
+	pairs := map[string]string{
+		"db":   "TiDB",
+		"key2": "value2",
+		"key1": "value1",
+		"key4": "value4",
+		"key3": "value3",
+		"kv":   "TiKV",
+	}
+
+	// Compute the expected result locally while collecting the batch to write.
+	var (
+		keys             = make([]key, 0, len(pairs))
+		values           = make([]value, 0, len(pairs))
+		expectCrc64Xor   uint64
+		expectTotalKvs   uint64
+		expectTotalBytes uint64
+	)
+	digest := crc64.New(crc64.MakeTable(crc64.ECMA))
+	for k, v := range pairs {
+		keys = append(keys, []byte(k))
+		values = append(values, []byte(v))
+		digest.Reset()
+		digest.Write([]byte(k))
+		digest.Write([]byte(v))
+		expectCrc64Xor ^= digest.Sum64()
+		expectTotalKvs++
+		expectTotalBytes += uint64(len(k) + len(v))
+	}
+
+	ctx := context.Background()
+	s.Nil(client.BatchPut(ctx, keys, values, SetColumnFamily(cf)))
+
+	// "db" is the smallest key written and the end key is empty (unbounded),
+	// so the checksum is expected to cover every pair.
+	got, err := client.Checksum(ctx, []byte("db"), nil, SetColumnFamily(cf))
+	s.Nil(err)
+	s.Equal(expectCrc64Xor, got.Crc64Xor)
+	s.Equal(expectTotalKvs, got.TotalKvs)
+	s.Equal(expectTotalBytes, got.TotalBytes)
+}
diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go
index ccae4ab1..3bcc6d54 100644
--- a/tikvrpc/tikvrpc.go
+++ b/tikvrpc/tikvrpc.go
@@ -83,6 +83,7 @@ const (
 	CmdRawScan
 	CmdGetKeyTTL
 	CmdRawCompareAndSwap
+
CmdRawChecksum CmdUnsafeDestroyRange @@ -156,6 +157,8 @@ func (t CmdType) String() string { return "RawDeleteRange" case CmdRawScan: return "RawScan" + case CmdRawChecksum: + return "RawChecksum" case CmdUnsafeDestroyRange: return "UnsafeDestroyRange" case CmdRegisterLockObserver: @@ -388,6 +391,11 @@ func (req *Request) RawCompareAndSwap() *kvrpcpb.RawCASRequest { return req.Req.(*kvrpcpb.RawCASRequest) } +// RawChecksum returns the RawChecksumRequest held in the request; it panics if the request holds another type. +func (req *Request) RawChecksum() *kvrpcpb.RawChecksumRequest { + return req.Req.(*kvrpcpb.RawChecksumRequest) +} + // RegisterLockObserver returns RegisterLockObserverRequest in request. func (req *Request) RegisterLockObserver() *kvrpcpb.RegisterLockObserverRequest { return req.Req.(*kvrpcpb.RegisterLockObserverRequest) @@ -713,6 +721,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.RawGetKeyTTL().Context = ctx case CmdRawCompareAndSwap: req.RawCompareAndSwap().Context = ctx + case CmdRawChecksum: + req.RawChecksum().Context = ctx case CmdRegisterLockObserver: req.RegisterLockObserver().Context = ctx case CmdCheckLockObserver: @@ -851,6 +861,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { p = &kvrpcpb.RawCASResponse{ RegionError: e, } + case CmdRawChecksum: + p = &kvrpcpb.RawChecksumResponse{ + RegionError: e, + } case CmdCop: p = &coprocessor.Response{ RegionError: e, @@ -967,6 +981,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp, err = client.RawGetKeyTTL(ctx, req.RawGetKeyTTL()) case CmdRawCompareAndSwap: resp.Resp, err = client.RawCompareAndSwap(ctx, req.RawCompareAndSwap()) + case CmdRawChecksum: + resp.Resp, err = client.RawChecksum(ctx, req.RawChecksum()) case CmdRegisterLockObserver: resp.Resp, err = client.RegisterLockObserver(ctx, req.RegisterLockObserver()) case CmdCheckLockObserver: