rawkv: Added back `ScanOption` from `v1` but with a more standard Golang pattern (#402)

* rawkv: Added back `ScanOption` from `v1` but with a more standard Golang pattern

This enables specifying:
- KeyOnly
- Cf

#### Notes

This could also be used to remove the `ReverseScan` version and `ScanReverse` option could be used. It could be argue that `ReverseScan` is more discoverable. Happy to remove if it's decided that `ScanReverse` is discoverable enough.

Signed-off-by: Matthieu Vachon <matt@streamingfast.io>

* Fixed linting issues

Signed-off-by: Matthieu Vachon <matt@streamingfast.io>

Co-authored-by: Matthieu Vachon <matt@streamingfast.io>
This commit is contained in:
Matthieu Vachon 2022-01-06 21:06:34 -05:00 committed by GitHub
parent 13298f12fb
commit c24a5ba915
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 53 additions and 2 deletions

View File

@ -66,6 +66,43 @@ const (
rawBatchPairCount = 512
)
type scanOptions struct {
KeyOnly bool
ColumnFamily string
}
// ScanOption represents possible scan options that can be cotrolled by the user
// to tweak the scanning behavior.
//
// Available options are:
// - ScanKeyOnly
// - ScanColumnFamily
type ScanOption interface {
apply(opts *scanOptions)
}
type scanOptionFunc func(opts *scanOptions)
func (f scanOptionFunc) apply(opts *scanOptions) {
f(opts)
}
// ScanKeyOnly is a ScanOption that tells the scanner to only returns
// keys and omit the values.
func ScanKeyOnly() ScanOption {
return scanOptionFunc(func(opts *scanOptions) {
opts.KeyOnly = true
})
}
// ScanColumnFamily is a ScanOption that tells the scanner to only returns
// the following column family elements.
func ScanColumnFamily(columnfamily string) ScanOption {
return scanOptionFunc(func(opts *scanOptions) {
opts.ColumnFamily = columnfamily
})
}
// Client is a client of TiKV server which is used as a key-value storage,
// only GET/PUT/DELETE commands are supported.
type Client struct {
@ -351,7 +388,7 @@ func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte
// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan
// (startKey, endKey], you can write:
// `Scan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`.
func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int, options ...ScanOption) (keys [][]byte, values [][]byte, err error) {
start := time.Now()
defer func() { metrics.RawkvCmdHistogramWithRawScan.Observe(time.Since(start).Seconds()) }()
@ -359,11 +396,18 @@ func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int) (
return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded)
}
opts := scanOptions{}
for _, opt := range options {
opt.apply(&opts)
}
for len(keys) < limit && (len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0) {
req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{
StartKey: startKey,
EndKey: endKey,
Limit: uint32(limit - len(keys)),
KeyOnly: opts.KeyOnly,
Cf: opts.ColumnFamily,
})
resp, loc, err := c.sendReq(ctx, startKey, req, false)
if err != nil {
@ -392,7 +436,7 @@ func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int) (
// (endKey, startKey], you can write:
// `ReverseScan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`.
// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit int, options ...ScanOption) (keys [][]byte, values [][]byte, err error) {
start := time.Now()
defer func() {
metrics.RawkvCmdHistogramWithRawReversScan.Observe(time.Since(start).Seconds())
@ -402,12 +446,19 @@ func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit
return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded)
}
opts := scanOptions{}
for _, opt := range options {
opt.apply(&opts)
}
for len(keys) < limit && bytes.Compare(startKey, endKey) > 0 {
req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{
StartKey: startKey,
EndKey: endKey,
Limit: uint32(limit - len(keys)),
Reverse: true,
KeyOnly: opts.KeyOnly,
Cf: opts.ColumnFamily,
})
resp, loc, err := c.sendReq(ctx, startKey, req, true)
if err != nil {