diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index e9150cb7..09742e5d 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -302,6 +302,53 @@ func (c *Client) Scan(startKey, endKey []byte, limit int) (keys [][]byte, values return } +// ReverseScan queries continuous kv pairs in range [endKey, startKey), up to limit pairs. +// Direction is different from Scan, upper to lower. +// If endKey is empty, it means unbounded. +// If you want to include the startKey or exclude the endKey, append a '\0' to the key. For example, to scan +// (endKey, startKey], you can write: +// `ReverseScan(append(startKey, '\0'), append(endKey, '\0'), limit)`. +// It doesn't support Scanning from "", because locating the last Region is not yet implemented. +func (c *Client) ReverseScan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) { + start := time.Now() + defer func() { + metrics.RawkvCmdHistogram.WithLabelValues("raw_reverse_scan").Observe(time.Since(start).Seconds()) + }() + + if limit > config.MaxRawKVScanLimit { + return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded) + } + + for len(keys) < limit { + req := &rpc.Request{ + Type: rpc.CmdRawScan, + RawScan: &kvrpcpb.RawScanRequest{ + StartKey: startKey, + EndKey: endKey, + Limit: uint32(limit - len(keys)), + Reverse: true, + }, + } + resp, loc, err := c.sendReq(startKey, req) + if err != nil { + return nil, nil, err + } + cmdResp := resp.RawScan + if cmdResp == nil { + return nil, nil, errors.WithStack(rpc.ErrBodyMissing) + } + for _, pair := range cmdResp.Kvs { + keys = append(keys, pair.Key) + values = append(values, pair.Value) + } + startKey = loc.EndKey + if len(startKey) == 0 { + break + } + } + return +} + func (c *Client) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *locate.KeyLocation, error) { bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff) sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient) diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go index af3f4770..22aafb69 100644 --- a/rawkv/rawkv_test.go +++ b/rawkv/rawkv_test.go @@ -129,6 +129,26 @@ func (s *testRawKVSuite) mustScanRange(c *C, startKey string, endKey string, lim } } +func (s *testRawKVSuite) mustReverseScan(c *C, startKey []byte, limit int, expect ...string) { + keys, values, err := s.client.ReverseScan(startKey, nil, limit) + c.Assert(err, IsNil) + c.Assert(len(keys)*2, Equals, len(expect)) + for i := range keys { + c.Assert(string(keys[i]), Equals, expect[i*2]) + c.Assert(string(values[i]), Equals, expect[i*2+1]) + } +} + +func (s *testRawKVSuite) mustReverseScanRange(c *C, startKey, endKey []byte, limit int, expect ...string) { + keys, values, err := s.client.ReverseScan(startKey, endKey, limit) + c.Assert(err, IsNil) + c.Assert(len(keys)*2, Equals, len(expect)) + for i := range keys { + c.Assert(string(keys[i]), Equals, expect[i*2]) + c.Assert(string(values[i]), Equals, expect[i*2+1]) + } +} + func (s *testRawKVSuite) mustDeleteRange(c *C, startKey, endKey []byte, expected map[string]string) { err := s.client.DeleteRange(startKey, endKey) c.Assert(err, IsNil)