add rawkv/ReverseScan (#13)

Signed-off-by: kamijin_fanta <kamijin@live.jp>
This commit is contained in:
kamijin_fanta 2019-04-19 01:24:11 +09:00 committed by disksing
parent 2130e26d4f
commit 112f5cb76e
2 changed files with 67 additions and 0 deletions

View File

@ -302,6 +302,53 @@ func (c *Client) Scan(startKey, endKey []byte, limit int) (keys [][]byte, values
return
}
// ReverseScan queries continuous kv pairs in range [endKey, startKey), up to limit pairs.
// Direction is different from Scan, upper to lower.
// If endKey is empty, it means unbounded.
// If you want to include the startKey or exclude the endKey, append a '\0' to the key. For example, to scan
// (endKey, startKey], you can write:
// `ReverseScan(append(startKey, '\0'), append(endKey, '\0'), limit)`.
// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
func (c *Client) ReverseScan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
start := time.Now()
defer func() {
metrics.RawkvCmdHistogram.WithLabelValues("raw_reverse_scan").Observe(time.Since(start).Seconds())
}()
if limit > config.MaxRawKVScanLimit {
return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded)
}
for len(keys) < limit {
req := &rpc.Request{
Type: rpc.CmdRawScan,
RawScan: &kvrpcpb.RawScanRequest{
StartKey: startKey,
EndKey: endKey,
Limit: uint32(limit - len(keys)),
Reverse: true,
},
}
resp, loc, err := c.sendReq(startKey, req)
if err != nil {
return nil, nil, err
}
cmdResp := resp.RawScan
if cmdResp == nil {
return nil, nil, errors.WithStack(rpc.ErrBodyMissing)
}
for _, pair := range cmdResp.Kvs {
keys = append(keys, pair.Key)
values = append(values, pair.Value)
}
startKey = loc.EndKey
if len(startKey) == 0 {
break
}
}
return
}
func (c *Client) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *locate.KeyLocation, error) {
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)

View File

@ -129,6 +129,26 @@ func (s *testRawKVSuite) mustScanRange(c *C, startKey string, endKey string, lim
}
}
func (s *testRawKVSuite) mustReverseScan(c *C, startKey []byte, limit int, expect ...string) {
keys, values, err := s.client.ReverseScan(startKey, nil, limit)
c.Assert(err, IsNil)
c.Assert(len(keys)*2, Equals, len(expect))
for i := range keys {
c.Assert(string(keys[i]), Equals, expect[i*2])
c.Assert(string(values[i]), Equals, expect[i*2+1])
}
}
func (s *testRawKVSuite) mustReverseScanRange(c *C, startKey, endKey []byte, limit int, expect ...string) {
keys, values, err := s.client.ReverseScan(startKey, endKey, limit)
c.Assert(err, IsNil)
c.Assert(len(keys)*2, Equals, len(expect))
for i := range keys {
c.Assert(string(keys[i]), Equals, expect[i*2])
c.Assert(string(values[i]), Equals, expect[i*2+1])
}
}
func (s *testRawKVSuite) mustDeleteRange(c *C, startKey, endKey []byte, expected map[string]string) {
err := s.client.DeleteRange(startKey, endKey)
c.Assert(err, IsNil)