diff --git a/integration_tests/tikv.toml b/integration_tests/tikv.toml index 8dc0b812..863c54ad 100644 --- a/integration_tests/tikv.toml +++ b/integration_tests/tikv.toml @@ -2,6 +2,7 @@ addr = "127.0.0.1:20160" [storage] reserve-space = "1MB" +enable-ttl = true [pd] endpoints = ["127.0.0.1:2379"] [rocksdb] diff --git a/integration_tests/ttl_test.go b/integration_tests/ttl_test.go new file mode 100644 index 00000000..70db771b --- /dev/null +++ b/integration_tests/ttl_test.go @@ -0,0 +1,94 @@ +// Copyright 2021 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv_test + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/config" + "github.com/tikv/client-go/v2/rawkv" +) + +func TestTTL(t *testing.T) { + if !*withTiKV { + t.Skip("skipping TestTTL because with-tikv is not enabled") + } + suite.Run(t, new(ttlTestSuite)) +} + +type ttlTestSuite struct { + suite.Suite + client *rawkv.Client +} + +func (s *ttlTestSuite) SetupTest() { + addrs := strings.Split(*pdAddrs, ",") + client, err := rawkv.NewClient(context.TODO(), addrs, config.DefaultConfig().Security) + require.Nil(s.T(), err) + s.client = client +} + +func (s *ttlTestSuite) mustPutWithTTL(key, value []byte, ttl uint64) { + err := s.client.PutWithTTL(context.TODO(), key, value, ttl) + s.Nil(err) +} + +func (s *ttlTestSuite) mustNotExist(key []byte) { + v, err := s.client.Get(context.TODO(), key) + s.Nil(err) + s.Nil(v) +} + +func (s *ttlTestSuite) mustGetKeyTTL(key []byte) *uint64 { + ttl, err := s.client.GetKeyTTL(context.TODO(), key) + s.Nil(err) + return ttl +} + +// TODO: we may mock this feature in unistore. +func (s *ttlTestSuite) TestPutWithTTL() { + key := []byte("test-put-with-ttl") + value := []byte("value") + var ttl uint64 = 1 + s.mustPutWithTTL(key, value, ttl) + time.Sleep(time.Second * time.Duration(ttl*2)) + s.mustNotExist(key) +} + +func (s *ttlTestSuite) TestGetKeyTTL() { + key := []byte("test-get-key-ttl") + value := []byte("value") + var ttl uint64 = 2 + s.mustPutWithTTL(key, value, ttl) + time.Sleep(time.Second * time.Duration(ttl/2)) + + rest := s.mustGetKeyTTL(key) + s.NotNil(rest) + s.LessOrEqual(*rest, ttl/2) + + time.Sleep(time.Second * time.Duration(ttl/2)) + s.mustNotExist(key) + + rest = s.mustGetKeyTTL(key) + s.Nil(rest) +} + +func (s *ttlTestSuite) TearDownTest() { + s.client.Close() +} diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index e93218d5..6a4ff8be 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -172,8 +172,8 @@ func (c *Client) BatchGet(ctx context.Context, keys [][]byte) ([][]byte, error) return values, nil } -// Put stores a key-value pair to TiKV. -func (c *Client) Put(ctx context.Context, key, value []byte) error { +// PutWithTTL stores a key-value pair to TiKV with a time-to-live duration. +func (c *Client) PutWithTTL(ctx context.Context, key, value []byte, ttl uint64) error { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) }() metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key))) @@ -186,6 +186,7 @@ func (c *Client) Put(ctx context.Context, key, value []byte) error { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: key, Value: value, + Ttl: ttl, ForCas: c.atomic, }) resp, _, err := c.sendReq(ctx, key, req, false) @@ -202,6 +203,40 @@ func (c *Client) Put(ctx context.Context, key, value []byte) error { return nil } +// GetKeyTTL get the TTL of a raw key from TiKV if key exists +func (c *Client) GetKeyTTL(ctx context.Context, key []byte) (*uint64, error) { + var ttl uint64 + metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key))) + req := tikvrpc.NewRequest(tikvrpc.CmdGetKeyTTL, &kvrpcpb.RawGetKeyTTLRequest{ + Key: key, + }) + resp, _, err := c.sendReq(ctx, key, req, false) + + if err != nil { + return nil, errors.Trace(err) + } + if resp.Resp == nil { + return nil, errors.Trace(tikverr.ErrBodyMissing) + } + + cmdResp := resp.Resp.(*kvrpcpb.RawGetKeyTTLResponse) + if cmdResp.GetError() != "" { + return nil, errors.New(cmdResp.GetError()) + } + + if cmdResp.GetNotFound() { + return nil, nil + } + + ttl = cmdResp.GetTtl() + return &ttl, nil +} + +// Put stores a key-value pair to TiKV. +func (c *Client) Put(ctx context.Context, key, value []byte) error { + return c.PutWithTTL(ctx, key, value, 0) +} + // BatchPut stores key-value pairs to TiKV. func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte) error { start := time.Now() diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index bce40c03..fdcfb755 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -79,6 +79,7 @@ const ( CmdRawBatchDelete CmdRawDeleteRange CmdRawScan + CmdGetKeyTTL CmdRawCompareAndSwap CmdUnsafeDestroyRange @@ -362,6 +363,11 @@ func (req *Request) UnsafeDestroyRange() *kvrpcpb.UnsafeDestroyRangeRequest { return req.Req.(*kvrpcpb.UnsafeDestroyRangeRequest) } +// RawGetKeyTTL returns RawGetKeyTTLRequest in request. +func (req *Request) RawGetKeyTTL() *kvrpcpb.RawGetKeyTTLRequest { + return req.Req.(*kvrpcpb.RawGetKeyTTLRequest) +} + // RawCompareAndSwap returns RawCASRequest in request. func (req *Request) RawCompareAndSwap() *kvrpcpb.RawCASRequest { return req.Req.(*kvrpcpb.RawCASRequest) @@ -683,6 +689,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.RawScan().Context = ctx case CmdUnsafeDestroyRange: req.UnsafeDestroyRange().Context = ctx + case CmdGetKeyTTL: + req.RawGetKeyTTL().Context = ctx case CmdRawCompareAndSwap: req.RawCompareAndSwap().Context = ctx case CmdRegisterLockObserver: @@ -815,6 +823,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { p = &kvrpcpb.UnsafeDestroyRangeResponse{ RegionError: e, } + case CmdGetKeyTTL: + p = &kvrpcpb.RawGetKeyTTLResponse{ + RegionError: e, + } case CmdRawCompareAndSwap: p = &kvrpcpb.RawCASResponse{ RegionError: e, @@ -931,6 +943,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp, err = client.RawScan(ctx, req.RawScan()) case CmdUnsafeDestroyRange: resp.Resp, err = client.UnsafeDestroyRange(ctx, req.UnsafeDestroyRange()) + case CmdGetKeyTTL: + resp.Resp, err = client.RawGetKeyTTL(ctx, req.RawGetKeyTTL()) case CmdRawCompareAndSwap: resp.Resp, err = client.RawCompareAndSwap(ctx, req.RawCompareAndSwap()) case CmdRegisterLockObserver: