From 4f806a5442981d2cc98e0ff940f0e5dc1c86576b Mon Sep 17 00:00:00 2001 From: daimashusheng <89769163+daimashusheng@users.noreply.github.com> Date: Sun, 26 Sep 2021 19:50:08 +0800 Subject: [PATCH] batchput add ttl to every key (#298) Signed-off-by: tangjk --- go.mod | 2 +- go.sum | 4 ++-- integration_tests/go.mod | 2 +- integration_tests/go.sum | 4 ++-- integration_tests/raw/rawkv_test.go | 2 +- internal/kvrpc/batch.go | 11 ++++++++--- rawkv/rawkv.go | 20 ++++++++++++++------ 7 files changed, 29 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 5696a7bf..382e6d75 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3 github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 + github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307 github.com/prometheus/client_golang v1.5.1 diff --git a/go.sum b/go.sum index ab24d3ad..4817af59 100644 --- a/go.sum +++ b/go.sum @@ -290,8 +290,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 h1:4EUpHzPFHwleKkVALyMqQbQcNziPZvU+vhUT9Wzj93E= -github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad h1:suBPTeuY6yVF7xvTGeTQ9+tiGzufnORJpCRwzbdN2sc= +github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index dc05c176..1e13e2b9 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,7 +6,7 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd - github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 + github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad github.com/pingcap/parser v0.0.0-20210728060616-75cff0c906d2 github.com/pingcap/tidb v1.1.0-beta.0.20210729073017-a27d306e65a0 github.com/stretchr/testify v1.7.0 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 356b436a..d0583895 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -421,8 +421,8 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 h1:4EUpHzPFHwleKkVALyMqQbQcNziPZvU+vhUT9Wzj93E= -github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad h1:suBPTeuY6yVF7xvTGeTQ9+tiGzufnORJpCRwzbdN2sc= +github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/integration_tests/raw/rawkv_test.go b/integration_tests/raw/rawkv_test.go index 07e45b1a..06a02153 100644 --- a/integration_tests/raw/rawkv_test.go +++ b/integration_tests/raw/rawkv_test.go @@ -113,7 +113,7 @@ func (s *testRawKVSuite) mustPut(key, value []byte) { } func (s *testRawKVSuite) mustBatchPut(keys, values [][]byte) { - err := s.client.BatchPut(context.Background(), keys, values) + err := s.client.BatchPut(context.Background(), keys, values, nil) s.Nil(err) } diff --git a/internal/kvrpc/batch.go b/internal/kvrpc/batch.go index caa061df..2c90af40 100644 --- a/internal/kvrpc/batch.go +++ b/internal/kvrpc/batch.go @@ -24,6 +24,7 @@ type Batch struct { RegionID locate.RegionVerID Keys [][]byte Values [][]byte + TTLs []uint64 } // BatchResult wraps a Batch request's server response or an error. @@ -34,25 +35,29 @@ type BatchResult struct { // AppendBatches divides the mutation to be requested into Batches so that the size of each batch is // approximately the same as the given limit. -func AppendBatches(batches []Batch, regionID locate.RegionVerID, groupKeys [][]byte, keyToValue map[string][]byte, limit int) []Batch { +func AppendBatches(batches []Batch, regionID locate.RegionVerID, groupKeys [][]byte, keyToValue map[string][]byte, keyToTTL map[string]uint64, limit int) []Batch { var start, size int var keys, values [][]byte + var ttls []uint64 for start = 0; start < len(groupKeys); start++ { if size >= limit { - batches = append(batches, Batch{RegionID: regionID, Keys: keys, Values: values}) + batches = append(batches, Batch{RegionID: regionID, Keys: keys, Values: values, TTLs: ttls}) keys = make([][]byte, 0) values = make([][]byte, 0) + ttls = make([]uint64, 0) size = 0 } key := groupKeys[start] value := keyToValue[string(key)] + ttl := keyToTTL[string(key)] keys = append(keys, key) values = append(values, value) + ttls = append(ttls, ttl) size += len(key) size += len(value) } if len(keys) != 0 { - batches = append(batches, Batch{RegionID: regionID, Keys: keys, Values: values}) + batches = append(batches, Batch{RegionID: regionID, Keys: keys, Values: values, TTLs: ttls}) } return batches } diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index 50c853d5..c26e210a 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -240,7 +240,7 @@ func (c *Client) Put(ctx context.Context, key, value []byte) error { } // BatchPut stores key-value pairs to TiKV. -func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte) error { +func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte, ttls []uint64) error { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) @@ -249,13 +249,16 @@ func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte) error { if len(keys) != len(values) { return errors.New("the len of keys is not equal to the len of values") } + if len(ttls) > 0 && len(keys) != len(ttls) { + return errors.New("the len of ttls is not equal to the len of values") + } for _, value := range values { if len(value) == 0 { return errors.New("empty value is not supported") } } bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) - err := c.sendBatchPut(bo, keys, values) + err := c.sendBatchPut(bo, keys, values, ttls) return errors.Trace(err) } @@ -652,10 +655,14 @@ func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey } } -func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) error { +func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls []uint64) error { keyToValue := make(map[string][]byte, len(keys)) + keyTottl := make(map[string]uint64, len(keys)) for i, key := range keys { keyToValue[string(key)] = values[i] + if len(ttls) > 0 { + keyTottl[string(key)] = ttls[i] + } } groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { @@ -664,7 +671,7 @@ func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) error var batches []kvrpc.Batch // split the keys by size and RegionVerID for regionID, groupKeys := range groups { - batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize) + batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, keyTottl, rawBatchPutSize) } bo, cancel := bo.Fork() ch := make(chan error, len(batches)) @@ -695,7 +702,8 @@ func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch) error { kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.Values[i]}) } - req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut, &kvrpcpb.RawBatchPutRequest{Pairs: kvPair, ForCas: c.atomic}) + req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut, + &kvrpcpb.RawBatchPutRequest{Pairs: kvPair, ForCas: c.atomic, Ttls: batch.TTLs}) sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) @@ -713,7 +721,7 @@ func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch) error { return errors.Trace(err) } // recursive call - return c.sendBatchPut(bo, batch.Keys, batch.Values) + return c.sendBatchPut(bo, batch.Keys, batch.Values, batch.TTLs) } if resp.Resp == nil {