mirror of https://github.com/tikv/client-go.git
add raw key ttl feature (#222)
Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
This commit is contained in:
parent
654864ded8
commit
f3e7dc3042
|
|
@ -2,6 +2,7 @@
|
|||
addr = "127.0.0.1:20160"
|
||||
[storage]
|
||||
reserve-space = "1MB"
|
||||
enable-ttl = true
|
||||
[pd]
|
||||
endpoints = ["127.0.0.1:2379"]
|
||||
[rocksdb]
|
||||
|
|
|
|||
|
|
@ -0,0 +1,94 @@
|
|||
// Copyright 2021 TiKV Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
"github.com/tikv/client-go/v2/rawkv"
|
||||
)
|
||||
|
||||
func TestTTL(t *testing.T) {
|
||||
if !*withTiKV {
|
||||
t.Skip("skipping TestTTL because with-tikv is not enabled")
|
||||
}
|
||||
suite.Run(t, new(ttlTestSuite))
|
||||
}
|
||||
|
||||
type ttlTestSuite struct {
|
||||
suite.Suite
|
||||
client *rawkv.Client
|
||||
}
|
||||
|
||||
func (s *ttlTestSuite) SetupTest() {
|
||||
addrs := strings.Split(*pdAddrs, ",")
|
||||
client, err := rawkv.NewClient(context.TODO(), addrs, config.DefaultConfig().Security)
|
||||
require.Nil(s.T(), err)
|
||||
s.client = client
|
||||
}
|
||||
|
||||
func (s *ttlTestSuite) mustPutWithTTL(key, value []byte, ttl uint64) {
|
||||
err := s.client.PutWithTTL(context.TODO(), key, value, ttl)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
func (s *ttlTestSuite) mustNotExist(key []byte) {
|
||||
v, err := s.client.Get(context.TODO(), key)
|
||||
s.Nil(err)
|
||||
s.Nil(v)
|
||||
}
|
||||
|
||||
func (s *ttlTestSuite) mustGetKeyTTL(key []byte) *uint64 {
|
||||
ttl, err := s.client.GetKeyTTL(context.TODO(), key)
|
||||
s.Nil(err)
|
||||
return ttl
|
||||
}
|
||||
|
||||
// TODO: we may mock this feature in unistore.
|
||||
func (s *ttlTestSuite) TestPutWithTTL() {
|
||||
key := []byte("test-put-with-ttl")
|
||||
value := []byte("value")
|
||||
var ttl uint64 = 1
|
||||
s.mustPutWithTTL(key, value, ttl)
|
||||
time.Sleep(time.Second * time.Duration(ttl*2))
|
||||
s.mustNotExist(key)
|
||||
}
|
||||
|
||||
func (s *ttlTestSuite) TestGetKeyTTL() {
|
||||
key := []byte("test-get-key-ttl")
|
||||
value := []byte("value")
|
||||
var ttl uint64 = 2
|
||||
s.mustPutWithTTL(key, value, ttl)
|
||||
time.Sleep(time.Second * time.Duration(ttl/2))
|
||||
|
||||
rest := s.mustGetKeyTTL(key)
|
||||
s.NotNil(rest)
|
||||
s.LessOrEqual(*rest, ttl/2)
|
||||
|
||||
time.Sleep(time.Second * time.Duration(ttl/2))
|
||||
s.mustNotExist(key)
|
||||
|
||||
rest = s.mustGetKeyTTL(key)
|
||||
s.Nil(rest)
|
||||
}
|
||||
|
||||
func (s *ttlTestSuite) TearDownTest() {
|
||||
s.client.Close()
|
||||
}
|
||||
|
|
@ -172,8 +172,8 @@ func (c *Client) BatchGet(ctx context.Context, keys [][]byte) ([][]byte, error)
|
|||
return values, nil
|
||||
}
|
||||
|
||||
// Put stores a key-value pair to TiKV.
|
||||
func (c *Client) Put(ctx context.Context, key, value []byte) error {
|
||||
// PutWithTTL stores a key-value pair to TiKV with a time-to-live duration.
|
||||
func (c *Client) PutWithTTL(ctx context.Context, key, value []byte, ttl uint64) error {
|
||||
start := time.Now()
|
||||
defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) }()
|
||||
metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key)))
|
||||
|
|
@ -186,6 +186,7 @@ func (c *Client) Put(ctx context.Context, key, value []byte) error {
|
|||
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
|
||||
Key: key,
|
||||
Value: value,
|
||||
Ttl: ttl,
|
||||
ForCas: c.atomic,
|
||||
})
|
||||
resp, _, err := c.sendReq(ctx, key, req, false)
|
||||
|
|
@ -202,6 +203,40 @@ func (c *Client) Put(ctx context.Context, key, value []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetKeyTTL get the TTL of a raw key from TiKV if key exists
|
||||
func (c *Client) GetKeyTTL(ctx context.Context, key []byte) (*uint64, error) {
|
||||
var ttl uint64
|
||||
metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key)))
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdGetKeyTTL, &kvrpcpb.RawGetKeyTTLRequest{
|
||||
Key: key,
|
||||
})
|
||||
resp, _, err := c.sendReq(ctx, key, req, false)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
return nil, errors.Trace(tikverr.ErrBodyMissing)
|
||||
}
|
||||
|
||||
cmdResp := resp.Resp.(*kvrpcpb.RawGetKeyTTLResponse)
|
||||
if cmdResp.GetError() != "" {
|
||||
return nil, errors.New(cmdResp.GetError())
|
||||
}
|
||||
|
||||
if cmdResp.GetNotFound() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
ttl = cmdResp.GetTtl()
|
||||
return &ttl, nil
|
||||
}
|
||||
|
||||
// Put stores a key-value pair to TiKV.
|
||||
func (c *Client) Put(ctx context.Context, key, value []byte) error {
|
||||
return c.PutWithTTL(ctx, key, value, 0)
|
||||
}
|
||||
|
||||
// BatchPut stores key-value pairs to TiKV.
|
||||
func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte) error {
|
||||
start := time.Now()
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ const (
|
|||
CmdRawBatchDelete
|
||||
CmdRawDeleteRange
|
||||
CmdRawScan
|
||||
CmdGetKeyTTL
|
||||
CmdRawCompareAndSwap
|
||||
|
||||
CmdUnsafeDestroyRange
|
||||
|
|
@ -362,6 +363,11 @@ func (req *Request) UnsafeDestroyRange() *kvrpcpb.UnsafeDestroyRangeRequest {
|
|||
return req.Req.(*kvrpcpb.UnsafeDestroyRangeRequest)
|
||||
}
|
||||
|
||||
// RawGetKeyTTL returns RawGetKeyTTLRequest in request.
|
||||
func (req *Request) RawGetKeyTTL() *kvrpcpb.RawGetKeyTTLRequest {
|
||||
return req.Req.(*kvrpcpb.RawGetKeyTTLRequest)
|
||||
}
|
||||
|
||||
// RawCompareAndSwap returns RawCASRequest in request.
|
||||
func (req *Request) RawCompareAndSwap() *kvrpcpb.RawCASRequest {
|
||||
return req.Req.(*kvrpcpb.RawCASRequest)
|
||||
|
|
@ -683,6 +689,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
|
|||
req.RawScan().Context = ctx
|
||||
case CmdUnsafeDestroyRange:
|
||||
req.UnsafeDestroyRange().Context = ctx
|
||||
case CmdGetKeyTTL:
|
||||
req.RawGetKeyTTL().Context = ctx
|
||||
case CmdRawCompareAndSwap:
|
||||
req.RawCompareAndSwap().Context = ctx
|
||||
case CmdRegisterLockObserver:
|
||||
|
|
@ -815,6 +823,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
|
|||
p = &kvrpcpb.UnsafeDestroyRangeResponse{
|
||||
RegionError: e,
|
||||
}
|
||||
case CmdGetKeyTTL:
|
||||
p = &kvrpcpb.RawGetKeyTTLResponse{
|
||||
RegionError: e,
|
||||
}
|
||||
case CmdRawCompareAndSwap:
|
||||
p = &kvrpcpb.RawCASResponse{
|
||||
RegionError: e,
|
||||
|
|
@ -931,6 +943,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
|
|||
resp.Resp, err = client.RawScan(ctx, req.RawScan())
|
||||
case CmdUnsafeDestroyRange:
|
||||
resp.Resp, err = client.UnsafeDestroyRange(ctx, req.UnsafeDestroyRange())
|
||||
case CmdGetKeyTTL:
|
||||
resp.Resp, err = client.RawGetKeyTTL(ctx, req.RawGetKeyTTL())
|
||||
case CmdRawCompareAndSwap:
|
||||
resp.Resp, err = client.RawCompareAndSwap(ctx, req.RawCompareAndSwap())
|
||||
case CmdRegisterLockObserver:
|
||||
|
|
|
|||
Loading…
Reference in New Issue