// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package unionstore
import (
"context"
"strconv"
"sync"
"testing"
"time"
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/util"
)
func emptyBufferBatchGetter(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
return nil, nil
}
// TestPipelinedFlushTrigger verifies the auto-flush condition of a
// PipelinedMemDB: a flush is triggered only when BOTH the key count reaches
// MinFlushKeys AND the buffered memory reaches MinFlushMemSize. Each of the
// three sub-cases below exercises one side of that conjunction.
func TestPipelinedFlushTrigger(t *testing.T) {
	// because memdb's memory usage is hard to control, we use a cargo-culted value here.
	avgKeySize := int(MinFlushMemSize/MinFlushKeys) / 3
	// block the flush goroutine for checking the flushingMemDB status.
	blockCh := make(chan struct{})

	// Case 1: will not flush when keys number >= MinFlushKeys but size < MinFlushMemSize.
	memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
		<-blockCh
		return nil
	})
	for i := 0; i < MinFlushKeys; i++ {
		key := []byte(strconv.Itoa(i))
		value := make([]byte, avgKeySize-len(key)-1)
		// (key + value) * MinFlushKeys < MinFlushMemSize
		memdb.Set(key, value)
		flushed, err := memdb.Flush(false)
		require.False(t, flushed)
		require.Nil(t, err)
		require.False(t, memdb.OnFlushing())
	}
	require.Equal(t, memdb.memDB.Len(), MinFlushKeys)
	require.Less(t, memdb.memDB.Mem(), MinFlushMemSize)

	// Case 2: will not flush when keys number < MinFlushKeys and size >= MinFlushMemSize
	// (but below the force-flush threshold).
	avgKeySize = int(MinFlushMemSize/MinFlushKeys) / 2
	memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
		<-blockCh
		return nil
	})
	for i := 0; i < MinFlushKeys-1; i++ {
		key := []byte(strconv.Itoa(i))
		value := make([]byte, avgKeySize*2-len(key)+1)
		// (key + value) * (MinFlushKeys - 1) > MinFlushMemSize
		memdb.Set(key, value)
		flushed, err := memdb.Flush(false)
		require.False(t, flushed)
		require.Nil(t, err)
		require.False(t, memdb.OnFlushing())
	}
	require.Less(t, memdb.memDB.Len(), MinFlushKeys)
	require.Greater(t, memdb.memDB.Mem(), MinFlushMemSize)
	require.Less(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold)

	// Case 3: flush when keys number >= MinFlushKeys and mem size >= MinFlushMemSize.
	memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
		<-blockCh
		return nil
	})
	for i := 0; i < MinFlushKeys; i++ {
		key := []byte(strconv.Itoa(i))
		value := make([]byte, avgKeySize*2-len(key)+1) // (key + value) * MinFlushKeys > MinFlushMemSize
		memdb.Set(key, value)
		flushed, err := memdb.Flush(false)
		require.Nil(t, err)
		if i == MinFlushKeys-1 {
			// The last Set pushes both thresholds over the line.
			require.True(t, flushed)
			require.True(t, memdb.OnFlushing())
		} else {
			require.False(t, flushed)
			require.False(t, memdb.OnFlushing())
		}
	}
	// The mutable memdb is drained; its content moved to flushingMemDB.
	require.Equal(t, memdb.memDB.Len(), 0)
	require.Equal(t, memdb.memDB.Size(), 0)
	// the flushingMemDB length and size should be added to the total length and size.
	require.Equal(t, memdb.Len(), MinFlushKeys)
	require.Equal(t, memdb.Size(), memdb.flushingMemDB.Size())
	// Unblock the flush goroutine and wait for it to finish cleanly.
	close(blockCh)
	require.Nil(t, memdb.FlushWait())
}
// TestPipelinedFlushSkip verifies that a Flush call is skipped (returns
// flushed == false without error) while another flush is still in progress,
// and that flushing resumes normally once the ongoing flush completes.
func TestPipelinedFlushSkip(t *testing.T) {
	// blockCh keeps the first flush goroutine parked so a second Flush
	// observes an ongoing flush.
	blockCh := make(chan struct{})
	memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
		<-blockCh
		return nil
	})
	// Write enough keys/bytes to satisfy both flush thresholds.
	for i := 0; i < MinFlushKeys; i++ {
		key := []byte(strconv.Itoa(i))
		value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
		memdb.Set(key, value)
	}
	flushed, err := memdb.Flush(false)
	require.True(t, flushed)
	require.Nil(t, err)
	require.True(t, memdb.OnFlushing())
	require.Equal(t, memdb.memDB.Len(), 0)
	require.Equal(t, memdb.memDB.Size(), 0)
	// Write a second batch (distinct keys) while the first flush is blocked.
	for i := 0; i < MinFlushKeys; i++ {
		key := []byte(strconv.Itoa(MinFlushKeys + i))
		value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
		memdb.Set(key, value)
	}
	flushed, err = memdb.Flush(false)
	require.Nil(t, err)
	// flush is skipped because there is an ongoing flush.
	require.False(t, flushed)
	require.Equal(t, memdb.memDB.Len(), MinFlushKeys)
	close(blockCh)
	require.Nil(t, memdb.FlushWait())
	// can flush when the ongoing flush is done.
	flushed, err = memdb.Flush(false)
	require.True(t, flushed)
	require.Nil(t, err)
	require.Equal(t, memdb.memDB.Len(), 0)
	// Both batches are accounted for in the cumulative key count.
	require.Equal(t, memdb.len, 2*MinFlushKeys)
	require.Nil(t, memdb.FlushWait())
}
// TestPipelinedFlushBlock verifies that Flush blocks (instead of skipping)
// when the mutable memdb exceeds ForceFlushMemSizeThreshold while another
// flush is still in progress, and only proceeds once the first flush is done.
func TestPipelinedFlushBlock(t *testing.T) {
	// blockCh parks the flush callback so the first flush stays "ongoing".
	blockCh := make(chan struct{})
	memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
		<-blockCh
		return nil
	})
	for i := 0; i < MinFlushKeys; i++ {
		key := []byte(strconv.Itoa(i))
		value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
		memdb.Set(key, value)
	}
	flushed, err := memdb.Flush(false)
	require.Nil(t, err)
	require.True(t, flushed)
	require.True(t, memdb.OnFlushing())
	require.Equal(t, memdb.memDB.Len(), 0)
	require.Equal(t, memdb.memDB.Size(), 0)
	// When size of memdb is greater than ForceFlushMemSizeThreshold, Flush will be blocked.
	for i := 0; i < MinFlushKeys-1; i++ {
		key := []byte(strconv.Itoa(MinFlushKeys + i))
		value := make([]byte, int(ForceFlushMemSizeThreshold/(MinFlushKeys-1))-len(key)+1)
		memdb.Set(key, value)
	}
	require.Greater(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold)
	// Run the second Flush in a goroutine: it must not return while the first
	// flush is still blocked. One second is the grace period to detect an
	// (incorrect) early return.
	flushReturned := make(chan struct{})
	oneSec := time.After(time.Second)
	go func() {
		flushed, err := memdb.Flush(false)
		require.Nil(t, err)
		require.True(t, flushed)
		close(flushReturned)
	}()
	select {
	case <-flushReturned:
		require.Fail(t, "Flush should be blocked")
	case <-oneSec:
	}
	require.True(t, memdb.OnFlushing())
	blockCh <- struct{}{} // first flush done
	<-flushReturned       // second flush start
	require.True(t, memdb.OnFlushing())
	close(blockCh)
	require.Nil(t, memdb.FlushWait())
}
// TestPipelinedFlushGet verifies read semantics across flush generations:
// a key moved into the flushing memdb is still visible via PipelinedMemDB.Get,
// but once its flush generation completes the local buffers no longer serve
// it (it is then the store snapshot's responsibility).
func TestPipelinedFlushGet(t *testing.T) {
	// blockCh parks the flush callback so we can inspect the flushing state.
	blockCh := make(chan struct{})
	memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
		<-blockCh
		return nil
	})
	memdb.Set([]byte("key"), []byte("value"))
	// Pad the buffer past both flush thresholds.
	for i := 0; i < MinFlushKeys; i++ {
		key := []byte(strconv.Itoa(i))
		value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
		memdb.Set(key, value)
	}
	// Before any flush, the key is served from the mutable memdb.
	value, err := memdb.Get(context.Background(), []byte("key"))
	require.Nil(t, err)
	require.Equal(t, value, []byte("value"))
	flushed, err := memdb.Flush(false)
	require.True(t, flushed)
	require.Nil(t, err)
	require.True(t, memdb.OnFlushing())
	// The key is in flushingMemDB memdb instead of current mutable memdb.
	_, err = memdb.memDB.Get(context.Background(), []byte("key"))
	require.True(t, tikverr.IsErrNotFound(err))
	// But we still can get the value by PipelinedMemDB.Get.
	value, err = memdb.Get(context.Background(), []byte("key"))
	require.Nil(t, err)
	require.Equal(t, value, []byte("value"))
	// finish the first flush
	blockCh <- struct{}{}
	// Write and flush a second generation (without "key").
	for i := 0; i < MinFlushKeys; i++ {
		key := []byte(strconv.Itoa(i))
		value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
		memdb.Set(key, value)
	}
	flushed, err = memdb.Flush(false)
	require.Nil(t, err)
	require.True(t, flushed)
	require.True(t, memdb.OnFlushing())
	// now the key is guaranteed to be flushed into stores, though PipelinedMemDB.Get does not see it, snapshot get should get it.
	_, err = memdb.Get(context.Background(), []byte("key"))
	require.True(t, tikverr.IsErrNotFound(err))
	close(blockCh)
	require.Nil(t, memdb.FlushWait())
}
// TestPipelinedFlushSize checks that Len and Size of a PipelinedMemDB are
// cumulative across flushes: a flush drains the mutable memdb, but the totals
// reported to callers must keep counting every key and byte ever written.
func TestPipelinedFlushSize(t *testing.T) {
	memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
		return nil
	})
	totalKeys, totalSize := 0, 0
	// writeBatch inserts count sequential keys starting at base, updating the
	// expected totals and verifying them after every insertion.
	writeBatch := func(base, count int) {
		for j := 0; j < count; j++ {
			key := []byte(strconv.Itoa(base + j))
			value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
			totalKeys++
			totalSize += len(key) + len(value)
			memdb.Set(key, value)
			require.Equal(t, memdb.Len(), totalKeys)
			require.Equal(t, memdb.Size(), totalSize)
		}
	}
	writeBatch(0, MinFlushKeys)
	// keys & size should be accumulated into PipelinedMemDB.
	flushed, err := memdb.Flush(false)
	require.Nil(t, err)
	require.True(t, flushed)
	require.Equal(t, memdb.memDB.Len(), 0)
	require.Equal(t, memdb.memDB.Size(), 0)
	require.Equal(t, memdb.Len(), totalKeys)
	require.Equal(t, memdb.Size(), totalSize)
	writeBatch(MinFlushKeys, MinFlushKeys)
	require.Equal(t, memdb.Len(), totalKeys)
	require.Equal(t, memdb.Size(), totalSize)
	// with final flush, keys & size should not be changed.
	flushed, err = memdb.Flush(true)
	require.Nil(t, err)
	require.True(t, flushed)
	require.Equal(t, memdb.Len(), totalKeys)
	require.Equal(t, memdb.Size(), totalSize)
	require.Nil(t, memdb.FlushWait())
}
// TestPipelinedFlushGeneration checks that each flush is tagged with a
// monotonically increasing generation number, starting from 1.
func TestPipelinedFlushGeneration(t *testing.T) {
	generationCh := make(chan uint64)
	memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(generation uint64, db *MemDB) error {
		generationCh <- generation
		return nil
	})
	for i := 0; i < 100; i++ {
		memdb.Set([]byte{uint8(i)}, []byte{uint8(i)})
		// Force a flush regardless of the size/keys thresholds. The return
		// values were previously ignored; check them so a failed or skipped
		// flush is reported instead of deadlocking on the channel below.
		flushed, err := memdb.Flush(true)
		require.Nil(t, err)
		require.True(t, flushed)
		// generation starts from 1
		require.Equal(t, <-generationCh, uint64(i+1))
	}
	require.Nil(t, memdb.FlushWait())
}
// TestErrorIterator ensures that snapshot iterators of a PipelinedMemDB
// surface an error from Next rather than iterating silently forever.
func TestErrorIterator(t *testing.T) {
	// iteratorToErr drains the iterator and passes only if Next eventually
	// returns an error.
	iteratorToErr := func(iter Iterator) {
		for iter.Valid() {
			err := iter.Next()
			if err != nil {
				return
			}
		}
		// t.Error is the idiomatic equivalent of t.Log followed by t.Fail.
		t.Error("iterator does not return error")
	}
	memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
		return nil
	})
	iteratorToErr(memdb.SnapshotIter(nil, nil))
	iteratorToErr(memdb.SnapshotIterReverse(nil, nil))
}
// TestPipelinedAdjustFlushCondition verifies that the flush thresholds
// (min keys, min size, force-flush size) can be tuned through failpoints and
// that Flush honors the adjusted values.
func TestPipelinedAdjustFlushCondition(t *testing.T) {
	util.EnableFailpoints()
	// Baseline: with default thresholds, a single small key does not flush.
	memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
	memdb.Set([]byte("key"), []byte("value"))
	flushed, err := memdb.Flush(false)
	require.Nil(t, err)
	require.False(t, flushed)
	// can flush even only 1 key
	require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(1)`))
	require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
	memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
	memdb.Set([]byte("key"), []byte("value"))
	flushed, err = memdb.Flush(false)
	require.Nil(t, err)
	require.True(t, flushed)
	require.Nil(t, memdb.FlushWait())
	// need 2 keys to flush
	require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(2)`))
	require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
	memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
	memdb.Set([]byte("key"), []byte("value"))
	flushed, err = memdb.Flush(false)
	require.Nil(t, err)
	require.False(t, flushed)
	require.Nil(t, memdb.FlushWait())
	// need 2 keys to flush, but force threshold reached
	require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(2)`))
	require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
	require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBForceFlushSizeThreshold", `return(2)`))
	memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
	memdb.Set([]byte("key"), []byte("value"))
	flushed, err = memdb.Flush(false)
	require.Nil(t, err)
	require.True(t, flushed)
	// Restore all failpoints so later tests see default thresholds.
	require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushKeys"))
	require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushSize"))
	require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBForceFlushSizeThreshold"))
	require.Nil(t, memdb.FlushWait())
}
// TestMemBufferBatchGetCache verifies the BatchGet result cache of a
// PipelinedMemDB: BatchGet populates batchGetCache (including negative
// entries for missing keys), cache hits are served without re-fetching, and
// a flush invalidates the whole cache.
func TestMemBufferBatchGetCache(t *testing.T) {
	util.EnableFailpoints()
	flushDone := make(chan struct{})
	// remoteBuffer simulates the remote store; the flush callback copies the
	// flushed memdb into it under remoteMutex.
	var remoteMutex sync.RWMutex
	remoteBuffer := make(map[string][]byte, 16)
	pipelinedMemdb := NewPipelinedMemDB(func(_ context.Context, keys [][]byte) (map[string][]byte, error) {
		remoteMutex.RLock()
		defer remoteMutex.RUnlock()
		m := make(map[string][]byte, len(keys))
		for _, k := range keys {
			if val, ok := remoteBuffer[string(k)]; ok {
				m[string(k)] = val
			}
		}
		return m, nil
	}, func(_ uint64, db *MemDB) error {
		remoteMutex.Lock()
		defer remoteMutex.Unlock()
		// NOTE(review): Iter's error and it.Next()'s error are discarded here;
		// presumably a plain memdb iteration cannot fail — confirm.
		for it, _ := db.Iter(nil, nil); it.Valid(); it.Next() {
			remoteBuffer[string(it.Key())] = it.Value()
		}
		flushDone <- struct{}{}
		return nil
	})
	// mustGetFromCache asserts the key has a cache entry and returns its
	// value, mapping a negative entry to ErrNotExist.
	mustGetFromCache := func(key []byte) ([]byte, error) {
		require.NotNil(t, pipelinedMemdb.batchGetCache)
		v, ok := pipelinedMemdb.batchGetCache[string(key)]
		require.True(t, ok)
		inner := v.Inner()
		if inner == nil {
			return nil, tikverr.ErrNotExist
		}
		return *inner, nil
	}
	// mustNotExistInCache asserts the key has no cache entry at all (distinct
	// from a cached "not exist" entry).
	mustNotExistInCache := func(key []byte) {
		require.NotNil(t, pipelinedMemdb.batchGetCache)
		_, ok := pipelinedMemdb.batchGetCache[string(key)]
		require.False(t, ok)
	}
	// mustFlush force-flushes and waits for the callback to finish.
	mustFlush := func() {
		flushed, err := pipelinedMemdb.Flush(true)
		require.Nil(t, err)
		require.True(t, flushed)
		<-flushDone
	}
	err := pipelinedMemdb.Set([]byte("k1"), []byte("v11"))
	require.Nil(t, err)
	mustFlush()
	err = pipelinedMemdb.Set([]byte("k2"), []byte("v21"))
	require.Nil(t, err)
	mustFlush()
	// memdb: [], flushing memdb: [k2 -> v21], remoteBuffer: [k1 -> v11, k2 -> v21]
	// GetLocal only sees local buffers: k1 is already remote-only.
	_, err = pipelinedMemdb.GetLocal(context.Background(), []byte("k1"))
	require.Error(t, err)
	require.True(t, tikverr.IsErrNotFound(err))
	v, err := pipelinedMemdb.GetLocal(context.Background(), []byte("k2"))
	require.Nil(t, err)
	require.Equal(t, v, []byte("v21"))
	// batch get caches the result
	// cache: [k1 -> v11, k2 -> v21]
	m, err := pipelinedMemdb.BatchGet(context.Background(), [][]byte{[]byte("k1"), []byte("k2")})
	require.Nil(t, err)
	require.Equal(t, m, map[string][]byte{"k1": []byte("v11"), "k2": []byte("v21")})
	v, err = mustGetFromCache([]byte("k1"))
	require.Nil(t, err)
	require.Equal(t, v, []byte("v11"))
	v, err = mustGetFromCache([]byte("k2"))
	require.Nil(t, err)
	require.Equal(t, v, []byte("v21"))
	mustNotExistInCache([]byte("k3"))
	// A miss is cached as a negative entry.
	// cache: [k1 -> v11, k2 -> v21, k3 -> not exist]
	m, err = pipelinedMemdb.BatchGet(context.Background(), [][]byte{[]byte("k3")})
	require.Nil(t, err)
	require.Len(t, m, 0)
	_, err = mustGetFromCache([]byte("k3"))
	require.Error(t, err)
	require.True(t, tikverr.IsErrNotFound(err))
	// memdb: [], flushing memdb: [k1 -> [], k2 -> v22, k3 -> []], remoteBuffer: [k1 -> [], k2 -> v22, k3 -> []]
	pipelinedMemdb.Delete([]byte("k1"))
	pipelinedMemdb.Set([]byte("k2"), []byte("v22"))
	pipelinedMemdb.Delete([]byte("k3"))
	mustFlush()
	// Flushing invalidates the whole BatchGet cache.
	require.Nil(t, pipelinedMemdb.batchGetCache)
	// cache: [k1 -> [], k2 -> v22, k3 -> [], k4 -> not exist]
	m, err = pipelinedMemdb.BatchGet(context.Background(), [][]byte{[]byte("k1"), []byte("k2"), []byte("k3"), []byte("k4")})
	require.Nil(t, err)
	require.Equal(t, m, map[string][]byte{"k1": {}, "k2": []byte("v22"), "k3": {}})
	v, err = mustGetFromCache([]byte("k1"))
	require.Nil(t, err)
	require.Len(t, v, 0)
	v, err = mustGetFromCache([]byte("k2"))
	require.Nil(t, err)
	require.Equal(t, v, []byte("v22"))
	v, err = mustGetFromCache([]byte("k3"))
	require.Nil(t, err)
	require.Len(t, v, 0)
	_, err = mustGetFromCache([]byte("k4"))
	require.Error(t, err)
	require.True(t, tikverr.IsErrNotFound(err))
	require.Nil(t, pipelinedMemdb.FlushWait())
}