mirror of https://github.com/tikv/client-go.git
463 lines
15 KiB
Go
463 lines
15 KiB
Go
// Copyright 2024 TiKV Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package unionstore
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/stretchr/testify/require"
|
|
tikverr "github.com/tikv/client-go/v2/error"
|
|
"github.com/tikv/client-go/v2/util"
|
|
)
|
|
|
|
func emptyBufferBatchGetter(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func TestPipelinedFlushTrigger(t *testing.T) {
|
|
// because memdb's memory usage is hard to control, we use a cargo-culted value here.
|
|
avgKeySize := int(MinFlushMemSize/MinFlushKeys) / 3
|
|
|
|
// block the flush goroutine for checking the flushingMemDB status.
|
|
blockCh := make(chan struct{})
|
|
// Will not flush when keys number >= MinFlushKeys and size < MinFlushMemSize
|
|
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
|
|
<-blockCh
|
|
return nil
|
|
})
|
|
for i := 0; i < MinFlushKeys; i++ {
|
|
key := []byte(strconv.Itoa(i))
|
|
value := make([]byte, avgKeySize-len(key)-1)
|
|
// (key + value) * MinFlushKeys < MinFlushMemSize
|
|
memdb.Set(key, value)
|
|
flushed, err := memdb.Flush(false)
|
|
require.False(t, flushed)
|
|
require.Nil(t, err)
|
|
require.False(t, memdb.OnFlushing())
|
|
}
|
|
require.Equal(t, memdb.memDB.Len(), MinFlushKeys)
|
|
require.Less(t, memdb.memDB.Mem(), MinFlushMemSize)
|
|
|
|
// Will not flush when keys number < MinFlushKeys and size >= MinFlushMemSize
|
|
avgKeySize = int(MinFlushMemSize/MinFlushKeys) / 2
|
|
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
|
|
<-blockCh
|
|
return nil
|
|
})
|
|
for i := 0; i < MinFlushKeys-1; i++ {
|
|
key := []byte(strconv.Itoa(i))
|
|
value := make([]byte, avgKeySize*2-len(key)+1)
|
|
// (key + value) * (MinFLushKeys - 1) > MinFlushMemSize
|
|
memdb.Set(key, value)
|
|
flushed, err := memdb.Flush(false)
|
|
require.False(t, flushed)
|
|
require.Nil(t, err)
|
|
require.False(t, memdb.OnFlushing())
|
|
}
|
|
require.Less(t, memdb.memDB.Len(), MinFlushKeys)
|
|
require.Greater(t, memdb.memDB.Mem(), MinFlushMemSize)
|
|
require.Less(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold)
|
|
|
|
// Flush when keys number >= MinFlushKeys and mem size >= MinFlushMemSize
|
|
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
|
|
<-blockCh
|
|
return nil
|
|
})
|
|
for i := 0; i < MinFlushKeys; i++ {
|
|
key := []byte(strconv.Itoa(i))
|
|
value := make([]byte, avgKeySize*2-len(key)+1) // (key + value) * MinFLushKeys > MinFlushKeys
|
|
memdb.Set(key, value)
|
|
flushed, err := memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
if i == MinFlushKeys-1 {
|
|
require.True(t, flushed)
|
|
require.True(t, memdb.OnFlushing())
|
|
} else {
|
|
require.False(t, flushed)
|
|
require.False(t, memdb.OnFlushing())
|
|
}
|
|
}
|
|
require.Equal(t, memdb.memDB.Len(), 0)
|
|
require.Equal(t, memdb.memDB.Size(), 0)
|
|
// the flushingMemDB length and size should be added to the total length and size.
|
|
require.Equal(t, memdb.Len(), MinFlushKeys)
|
|
require.Equal(t, memdb.Size(), memdb.flushingMemDB.Size())
|
|
close(blockCh)
|
|
require.Nil(t, memdb.FlushWait())
|
|
}
|
|
|
|
func TestPipelinedFlushSkip(t *testing.T) {
|
|
blockCh := make(chan struct{})
|
|
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
|
|
<-blockCh
|
|
return nil
|
|
})
|
|
for i := 0; i < MinFlushKeys; i++ {
|
|
key := []byte(strconv.Itoa(i))
|
|
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
|
|
memdb.Set(key, value)
|
|
}
|
|
flushed, err := memdb.Flush(false)
|
|
require.True(t, flushed)
|
|
require.Nil(t, err)
|
|
require.True(t, memdb.OnFlushing())
|
|
require.Equal(t, memdb.memDB.Len(), 0)
|
|
require.Equal(t, memdb.memDB.Size(), 0)
|
|
for i := 0; i < MinFlushKeys; i++ {
|
|
key := []byte(strconv.Itoa(MinFlushKeys + i))
|
|
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
|
|
memdb.Set(key, value)
|
|
}
|
|
flushed, err = memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
// flush is skipped because there is an ongoing flush.
|
|
require.False(t, flushed)
|
|
require.Equal(t, memdb.memDB.Len(), MinFlushKeys)
|
|
close(blockCh)
|
|
require.Nil(t, memdb.FlushWait())
|
|
// can flush when the ongoing flush is done.
|
|
flushed, err = memdb.Flush(false)
|
|
require.True(t, flushed)
|
|
require.Nil(t, err)
|
|
require.Equal(t, memdb.memDB.Len(), 0)
|
|
require.Equal(t, memdb.len, 2*MinFlushKeys)
|
|
require.Nil(t, memdb.FlushWait())
|
|
}
|
|
|
|
func TestPipelinedFlushBlock(t *testing.T) {
|
|
blockCh := make(chan struct{})
|
|
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
|
|
<-blockCh
|
|
return nil
|
|
})
|
|
for i := 0; i < MinFlushKeys; i++ {
|
|
key := []byte(strconv.Itoa(i))
|
|
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
|
|
memdb.Set(key, value)
|
|
}
|
|
flushed, err := memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
require.True(t, flushed)
|
|
require.True(t, memdb.OnFlushing())
|
|
require.Equal(t, memdb.memDB.Len(), 0)
|
|
require.Equal(t, memdb.memDB.Size(), 0)
|
|
|
|
// When size of memdb is greater than ForceFlushMemSizeThreshold, Flush will be blocked.
|
|
for i := 0; i < MinFlushKeys-1; i++ {
|
|
key := []byte(strconv.Itoa(MinFlushKeys + i))
|
|
value := make([]byte, int(ForceFlushMemSizeThreshold/(MinFlushKeys-1))-len(key)+1)
|
|
memdb.Set(key, value)
|
|
}
|
|
require.Greater(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold)
|
|
flushReturned := make(chan struct{})
|
|
oneSec := time.After(time.Second)
|
|
go func() {
|
|
flushed, err := memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
require.True(t, flushed)
|
|
close(flushReturned)
|
|
}()
|
|
select {
|
|
case <-flushReturned:
|
|
require.Fail(t, "Flush should be blocked")
|
|
case <-oneSec:
|
|
}
|
|
require.True(t, memdb.OnFlushing())
|
|
blockCh <- struct{}{} // first flush done
|
|
<-flushReturned // second flush start
|
|
require.True(t, memdb.OnFlushing())
|
|
close(blockCh)
|
|
require.Nil(t, memdb.FlushWait())
|
|
}
|
|
|
|
func TestPipelinedFlushGet(t *testing.T) {
|
|
blockCh := make(chan struct{})
|
|
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
|
|
<-blockCh
|
|
return nil
|
|
})
|
|
memdb.Set([]byte("key"), []byte("value"))
|
|
for i := 0; i < MinFlushKeys; i++ {
|
|
key := []byte(strconv.Itoa(i))
|
|
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
|
|
memdb.Set(key, value)
|
|
}
|
|
value, err := memdb.Get(context.Background(), []byte("key"))
|
|
require.Nil(t, err)
|
|
require.Equal(t, value, []byte("value"))
|
|
flushed, err := memdb.Flush(false)
|
|
require.True(t, flushed)
|
|
require.Nil(t, err)
|
|
require.True(t, memdb.OnFlushing())
|
|
|
|
// The key is in flushingMemDB memdb instead of current mutable memdb.
|
|
_, err = memdb.memDB.Get(context.Background(), []byte("key"))
|
|
require.True(t, tikverr.IsErrNotFound(err))
|
|
// But we still can get the value by PipelinedMemDB.Get.
|
|
value, err = memdb.Get(context.Background(), []byte("key"))
|
|
require.Nil(t, err)
|
|
require.Equal(t, value, []byte("value"))
|
|
|
|
// finish the first flush
|
|
blockCh <- struct{}{}
|
|
for i := 0; i < MinFlushKeys; i++ {
|
|
key := []byte(strconv.Itoa(i))
|
|
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
|
|
memdb.Set(key, value)
|
|
}
|
|
flushed, err = memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
require.True(t, flushed)
|
|
require.True(t, memdb.OnFlushing())
|
|
|
|
// now the key is guaranteed to be flushed into stores, though PipelinedMemDB.Get does not see it, snapshot get should get it.
|
|
_, err = memdb.Get(context.Background(), []byte("key"))
|
|
require.True(t, tikverr.IsErrNotFound(err))
|
|
close(blockCh)
|
|
require.Nil(t, memdb.FlushWait())
|
|
}
|
|
|
|
func TestPipelinedFlushSize(t *testing.T) {
|
|
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
|
|
return nil
|
|
})
|
|
size := 0
|
|
keys := 0
|
|
for i := 0; i < MinFlushKeys; i++ {
|
|
key := []byte(strconv.Itoa(i))
|
|
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
|
|
keys++
|
|
size += len(key) + len(value)
|
|
memdb.Set(key, value)
|
|
require.Equal(t, memdb.Len(), keys)
|
|
require.Equal(t, memdb.Size(), size)
|
|
}
|
|
// keys & size should be accumulated into PipelinedMemDB.
|
|
flushed, err := memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
require.True(t, flushed)
|
|
require.Equal(t, memdb.memDB.Len(), 0)
|
|
require.Equal(t, memdb.memDB.Size(), 0)
|
|
require.Equal(t, memdb.Len(), keys)
|
|
require.Equal(t, memdb.Size(), size)
|
|
|
|
for i := 0; i < MinFlushKeys; i++ {
|
|
key := []byte(strconv.Itoa(MinFlushKeys + i))
|
|
value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
|
|
keys++
|
|
size += len(key) + len(value)
|
|
memdb.Set(key, value)
|
|
require.Equal(t, memdb.Len(), keys)
|
|
require.Equal(t, memdb.Size(), size)
|
|
}
|
|
require.Equal(t, memdb.Len(), keys)
|
|
require.Equal(t, memdb.Size(), size)
|
|
// with final flush, keys & size should not be changed.
|
|
flushed, err = memdb.Flush(true)
|
|
require.Nil(t, err)
|
|
require.True(t, flushed)
|
|
require.Equal(t, memdb.Len(), keys)
|
|
require.Equal(t, memdb.Size(), size)
|
|
require.Nil(t, memdb.FlushWait())
|
|
}
|
|
|
|
func TestPipelinedFlushGeneration(t *testing.T) {
|
|
generationCh := make(chan uint64)
|
|
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(generation uint64, db *MemDB) error {
|
|
generationCh <- generation
|
|
return nil
|
|
})
|
|
for i := 0; i < 100; i++ {
|
|
memdb.Set([]byte{uint8(i)}, []byte{uint8(i)})
|
|
memdb.Flush(true)
|
|
// generation start from 1
|
|
require.Equal(t, <-generationCh, uint64(i+1))
|
|
}
|
|
require.Nil(t, memdb.FlushWait())
|
|
}
|
|
|
|
func TestErrorIterator(t *testing.T) {
|
|
iteratorToErr := func(iter Iterator) {
|
|
for iter.Valid() {
|
|
err := iter.Next()
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
t.Log("iterator does not return error")
|
|
t.Fail()
|
|
}
|
|
|
|
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
|
|
return nil
|
|
})
|
|
iteratorToErr(memdb.SnapshotIter(nil, nil))
|
|
iteratorToErr(memdb.SnapshotIterReverse(nil, nil))
|
|
}
|
|
|
|
func TestPipelinedAdjustFlushCondition(t *testing.T) {
|
|
util.EnableFailpoints()
|
|
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
|
|
memdb.Set([]byte("key"), []byte("value"))
|
|
flushed, err := memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
require.False(t, flushed)
|
|
|
|
// can flush even only 1 key
|
|
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(1)`))
|
|
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
|
|
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
|
|
memdb.Set([]byte("key"), []byte("value"))
|
|
flushed, err = memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
require.True(t, flushed)
|
|
require.Nil(t, memdb.FlushWait())
|
|
|
|
// need 2 keys to flush
|
|
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(2)`))
|
|
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
|
|
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
|
|
memdb.Set([]byte("key"), []byte("value"))
|
|
flushed, err = memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
require.False(t, flushed)
|
|
require.Nil(t, memdb.FlushWait())
|
|
|
|
// need 2 keys to flush, but force threshold reached
|
|
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(2)`))
|
|
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
|
|
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBForceFlushSizeThreshold", `return(2)`))
|
|
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
|
|
memdb.Set([]byte("key"), []byte("value"))
|
|
flushed, err = memdb.Flush(false)
|
|
require.Nil(t, err)
|
|
require.True(t, flushed)
|
|
|
|
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushKeys"))
|
|
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushSize"))
|
|
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBForceFlushSizeThreshold"))
|
|
require.Nil(t, memdb.FlushWait())
|
|
}
|
|
|
|
func TestMemBufferBatchGetCache(t *testing.T) {
|
|
util.EnableFailpoints()
|
|
flushDone := make(chan struct{})
|
|
var remoteMutex sync.RWMutex
|
|
remoteBuffer := make(map[string][]byte, 16)
|
|
pipelinedMemdb := NewPipelinedMemDB(func(_ context.Context, keys [][]byte) (map[string][]byte, error) {
|
|
remoteMutex.RLock()
|
|
defer remoteMutex.RUnlock()
|
|
m := make(map[string][]byte, len(keys))
|
|
for _, k := range keys {
|
|
if val, ok := remoteBuffer[string(k)]; ok {
|
|
m[string(k)] = val
|
|
}
|
|
}
|
|
return m, nil
|
|
}, func(_ uint64, db *MemDB) error {
|
|
remoteMutex.Lock()
|
|
defer remoteMutex.Unlock()
|
|
for it, _ := db.Iter(nil, nil); it.Valid(); it.Next() {
|
|
remoteBuffer[string(it.Key())] = it.Value()
|
|
}
|
|
flushDone <- struct{}{}
|
|
return nil
|
|
})
|
|
|
|
mustGetFromCache := func(key []byte) ([]byte, error) {
|
|
require.NotNil(t, pipelinedMemdb.batchGetCache)
|
|
v, ok := pipelinedMemdb.batchGetCache[string(key)]
|
|
require.True(t, ok)
|
|
inner := v.Inner()
|
|
if inner == nil {
|
|
return nil, tikverr.ErrNotExist
|
|
}
|
|
return *inner, nil
|
|
}
|
|
mustNotExistInCache := func(key []byte) {
|
|
require.NotNil(t, pipelinedMemdb.batchGetCache)
|
|
_, ok := pipelinedMemdb.batchGetCache[string(key)]
|
|
require.False(t, ok)
|
|
}
|
|
mustFlush := func() {
|
|
flushed, err := pipelinedMemdb.Flush(true)
|
|
require.Nil(t, err)
|
|
require.True(t, flushed)
|
|
<-flushDone
|
|
}
|
|
|
|
err := pipelinedMemdb.Set([]byte("k1"), []byte("v11"))
|
|
require.Nil(t, err)
|
|
mustFlush()
|
|
err = pipelinedMemdb.Set([]byte("k2"), []byte("v21"))
|
|
require.Nil(t, err)
|
|
mustFlush()
|
|
// memdb: [], flushing memdb: [k2 -> v21], remoteBuffer: [k1 -> v11, k2 -> v21]
|
|
_, err = pipelinedMemdb.GetLocal(context.Background(), []byte("k1"))
|
|
require.Error(t, err)
|
|
require.True(t, tikverr.IsErrNotFound(err))
|
|
v, err := pipelinedMemdb.GetLocal(context.Background(), []byte("k2"))
|
|
require.Nil(t, err)
|
|
require.Equal(t, v, []byte("v21"))
|
|
// batch get caches the result
|
|
// cache: [k1 -> v11, k2 -> v21]
|
|
m, err := pipelinedMemdb.BatchGet(context.Background(), [][]byte{[]byte("k1"), []byte("k2")})
|
|
require.Nil(t, err)
|
|
require.Equal(t, m, map[string][]byte{"k1": []byte("v11"), "k2": []byte("v21")})
|
|
v, err = mustGetFromCache([]byte("k1"))
|
|
require.Nil(t, err)
|
|
require.Equal(t, v, []byte("v11"))
|
|
v, err = mustGetFromCache([]byte("k2"))
|
|
require.Nil(t, err)
|
|
require.Equal(t, v, []byte("v21"))
|
|
mustNotExistInCache([]byte("k3"))
|
|
// cache: [k1 -> v11, k2 -> v21, k3 -> not exist]
|
|
m, err = pipelinedMemdb.BatchGet(context.Background(), [][]byte{[]byte("k3")})
|
|
require.Nil(t, err)
|
|
require.Len(t, m, 0)
|
|
_, err = mustGetFromCache([]byte("k3"))
|
|
require.Error(t, err)
|
|
require.True(t, tikverr.IsErrNotFound(err))
|
|
|
|
// memdb: [], flushing memdb: [k1 -> [], k3 -> []], remoteBuffer: [k1 -> [], k2 -> v21, k3 -> []]
|
|
pipelinedMemdb.Delete([]byte("k1"))
|
|
pipelinedMemdb.Set([]byte("k2"), []byte("v22"))
|
|
pipelinedMemdb.Delete([]byte("k3"))
|
|
mustFlush()
|
|
require.Nil(t, pipelinedMemdb.batchGetCache)
|
|
// cache: [k1 -> [], k2 -> v22, k3 -> [], k4 -> not exist]
|
|
m, err = pipelinedMemdb.BatchGet(context.Background(), [][]byte{[]byte("k1"), []byte("k2"), []byte("k3"), []byte("k4")})
|
|
require.Nil(t, err)
|
|
require.Equal(t, m, map[string][]byte{"k1": {}, "k2": []byte("v22"), "k3": {}})
|
|
v, err = mustGetFromCache([]byte("k1"))
|
|
require.Nil(t, err)
|
|
require.Len(t, v, 0)
|
|
v, err = mustGetFromCache([]byte("k2"))
|
|
require.Nil(t, err)
|
|
require.Equal(t, v, []byte("v22"))
|
|
v, err = mustGetFromCache([]byte("k3"))
|
|
require.Nil(t, err)
|
|
require.Len(t, v, 0)
|
|
_, err = mustGetFromCache([]byte("k4"))
|
|
require.Error(t, err)
|
|
require.True(t, tikverr.IsErrNotFound(err))
|
|
require.Nil(t, pipelinedMemdb.FlushWait())
|
|
}
|