Use Mem() instead of Size() to estimate pipelined-memdb size (#1286)

* fix: use Mem() instead of Size() to evaluate pipelined-memdb size, for better memory control

Signed-off-by: ekexium <eke@fastmail.com>

* test: fix TestPipelinedFlushTrigger

Signed-off-by: ekexium <eke@fastmail.com>

---------

Signed-off-by: ekexium <eke@fastmail.com>
This commit is contained in:
ekexium 2024-04-15 16:53:35 +08:00 committed by GitHub
parent a23f6cac0c
commit 4183ab10fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 66 additions and 39 deletions

View File

@ -62,32 +62,32 @@ const (
// small batch can lead to poor performance and resource waste in random write workload. // small batch can lead to poor performance and resource waste in random write workload.
// 10K batch size is large enough to get good performance with random write workloads in tests. // 10K batch size is large enough to get good performance with random write workloads in tests.
MinFlushKeys = 10000 MinFlushKeys = 10000
// MinFlushSize is the minimum size of MemDB to trigger flush. // MinFlushMemSize is the minimum size of MemDB to trigger flush.
MinFlushSize = 16 * 1024 * 1024 // 16MB MinFlushMemSize uint64 = 16 * 1024 * 1024 // 16MB
// ForceFlushSizeThreshold is the threshold to force flush MemDB, which controls the max memory consumption of PipelinedMemDB. // ForceFlushMemSizeThreshold is the threshold to force flush MemDB, which controls the max memory consumption of PipelinedMemDB.
ForceFlushSizeThreshold = 128 * 1024 * 1024 // 128MB ForceFlushMemSizeThreshold uint64 = 128 * 1024 * 1024 // 128MB
) )
type flushOption struct { type flushOption struct {
MinFlushKeys int MinFlushKeys uint64
MinFlushSize int MinFlushMemSize uint64
ForceFlushSizeThreshold int ForceFlushMemSizeThreshold uint64
} }
func newFlushOption() flushOption { func newFlushOption() flushOption {
opt := flushOption{ opt := flushOption{
MinFlushKeys: MinFlushKeys, MinFlushKeys: MinFlushKeys,
MinFlushSize: MinFlushSize, MinFlushMemSize: MinFlushMemSize,
ForceFlushSizeThreshold: ForceFlushSizeThreshold, ForceFlushMemSizeThreshold: ForceFlushMemSizeThreshold,
} }
if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushKeys"); err == nil && val != nil { if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushKeys"); err == nil && val != nil {
opt.MinFlushKeys = val.(int) opt.MinFlushKeys = uint64(val.(int))
} }
if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushSize"); err == nil && val != nil { if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushSize"); err == nil && val != nil {
opt.MinFlushSize = val.(int) opt.MinFlushMemSize = uint64(val.(int))
} }
if val, err := util.EvalFailpoint("pipelinedMemDBForceFlushSizeThreshold"); err == nil && val != nil { if val, err := util.EvalFailpoint("pipelinedMemDBForceFlushSizeThreshold"); err == nil && val != nil {
opt.ForceFlushSizeThreshold = val.(int) opt.ForceFlushMemSizeThreshold = uint64(val.(int))
} }
return opt return opt
} }
@ -323,15 +323,33 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) {
} }
func (p *PipelinedMemDB) needFlush() bool { func (p *PipelinedMemDB) needFlush() bool {
size := p.memDB.Size() size := p.memDB.Mem()
// size < MinFlushSize, do not flush. // mem size < MinFlushMemSize, do not flush.
// MinFlushSize <= size < ForceFlushSizeThreshold && keys < MinFlushKeys, do not flush. // MinFlushMemSize <= mem size < ForceFlushMemSizeThreshold && keys < MinFlushKeys, do not flush.
// MinFlushSize <= size < ForceFlushSizeThreshold && keys >= MinFlushKeys, flush. // MinFlushMemSize <= mem size < ForceFlushMemSizeThreshold && keys >= MinFlushKeys, flush.
// size >= ForceFlushSizeThreshold, flush. // mem size >= ForceFlushMemSizeThreshold, flush.
if size < p.flushOption.MinFlushSize || (p.memDB.Len() < p.flushOption.MinFlushKeys && size < p.flushOption.ForceFlushSizeThreshold) {
/*
              Keys
                ^
                |             |
                |             |
                |             |
                |             |             Flush
                |             |
   MinKey(10k)  |             +-------------------+
                |             |                   |
                |     No      |     No Flush      |   Flush
                |    Flush    |                   |
                +-----------------------------------------> Size
                0       MinSize(16MB)       Force(128MB)
*/
if size < p.flushOption.MinFlushMemSize ||
(uint64(p.memDB.Len()) < p.flushOption.MinFlushKeys &&
size < p.flushOption.ForceFlushMemSizeThreshold) {
return false return false
} }
if p.onFlushing.Load() && size < p.flushOption.ForceFlushSizeThreshold { if p.onFlushing.Load() && size < p.flushOption.ForceFlushMemSizeThreshold {
return false return false
} }
return true return true
@ -391,7 +409,11 @@ func (p *PipelinedMemDB) Len() int {
} }
func (p *PipelinedMemDB) Size() int { func (p *PipelinedMemDB) Size() int {
return p.memDB.Size() + p.size size := p.size
if p.memDB != nil {
size += p.memDB.Size()
}
return size
} }
func (p *PipelinedMemDB) OnFlushing() bool { func (p *PipelinedMemDB) OnFlushing() bool {

View File

@ -32,18 +32,20 @@ func emptyBufferBatchGetter(ctx context.Context, keys [][]byte) (map[string][]by
} }
func TestPipelinedFlushTrigger(t *testing.T) { func TestPipelinedFlushTrigger(t *testing.T) {
avgKeySize := MinFlushSize / MinFlushKeys // because memdb's memory usage is hard to control, we use a cargo-culted value here.
avgKeySize := int(MinFlushMemSize/MinFlushKeys) / 3
// block the flush goroutine for checking the flushingMemDB status. // block the flush goroutine for checking the flushingMemDB status.
blockCh := make(chan struct{}) blockCh := make(chan struct{})
// Will not flush when keys number >= MinFlushKeys and size < MinFlushSize // Will not flush when keys number >= MinFlushKeys and size < MinFlushMemSize
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
<-blockCh <-blockCh
return nil return nil
}) })
for i := 0; i < MinFlushKeys; i++ { for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i)) key := []byte(strconv.Itoa(i))
value := make([]byte, avgKeySize-len(key)-1) // (key + value) * MinFLushKeys < MinFlushKeys value := make([]byte, avgKeySize-len(key)-1)
// (key + value) * MinFlushKeys < MinFlushMemSize
memdb.Set(key, value) memdb.Set(key, value)
flushed, err := memdb.Flush(false) flushed, err := memdb.Flush(false)
require.False(t, flushed) require.False(t, flushed)
@ -51,16 +53,18 @@ func TestPipelinedFlushTrigger(t *testing.T) {
require.False(t, memdb.OnFlushing()) require.False(t, memdb.OnFlushing())
} }
require.Equal(t, memdb.memDB.Len(), MinFlushKeys) require.Equal(t, memdb.memDB.Len(), MinFlushKeys)
require.Less(t, memdb.memDB.Size(), MinFlushSize) require.Less(t, memdb.memDB.Mem(), MinFlushMemSize)
// Will not flush when keys number < MinFlushKeys and size >= MinFlushSize // Will not flush when keys number < MinFlushKeys and size >= MinFlushMemSize
avgKeySize = int(MinFlushMemSize/MinFlushKeys) / 2
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
<-blockCh <-blockCh
return nil return nil
}) })
for i := 0; i < MinFlushKeys-1; i++ { for i := 0; i < MinFlushKeys-1; i++ {
key := []byte(strconv.Itoa(i)) key := []byte(strconv.Itoa(i))
value := make([]byte, avgKeySize-len(key)+1) // (key + value) * (MinFLushKeys - 1) > MinFlushKeys value := make([]byte, avgKeySize-len(key)+1)
// (key + value) * (MinFlushKeys - 1) > MinFlushMemSize
memdb.Set(key, value) memdb.Set(key, value)
flushed, err := memdb.Flush(false) flushed, err := memdb.Flush(false)
require.False(t, flushed) require.False(t, flushed)
@ -68,9 +72,10 @@ func TestPipelinedFlushTrigger(t *testing.T) {
require.False(t, memdb.OnFlushing()) require.False(t, memdb.OnFlushing())
} }
require.Less(t, memdb.memDB.Len(), MinFlushKeys) require.Less(t, memdb.memDB.Len(), MinFlushKeys)
require.Greater(t, memdb.memDB.Size(), MinFlushSize) require.Greater(t, memdb.memDB.Mem(), MinFlushMemSize)
require.Less(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold)
// Flush when keys number >= MinFlushKeys and size >= MinFlushSize // Flush when keys number >= MinFlushKeys and mem size >= MinFlushMemSize
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error {
<-blockCh <-blockCh
return nil return nil
@ -106,7 +111,7 @@ func TestPipelinedFlushSkip(t *testing.T) {
}) })
for i := 0; i < MinFlushKeys; i++ { for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i)) key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value) memdb.Set(key, value)
} }
flushed, err := memdb.Flush(false) flushed, err := memdb.Flush(false)
@ -117,7 +122,7 @@ func TestPipelinedFlushSkip(t *testing.T) {
require.Equal(t, memdb.memDB.Size(), 0) require.Equal(t, memdb.memDB.Size(), 0)
for i := 0; i < MinFlushKeys; i++ { for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(MinFlushKeys + i)) key := []byte(strconv.Itoa(MinFlushKeys + i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value) memdb.Set(key, value)
} }
flushed, err = memdb.Flush(false) flushed, err = memdb.Flush(false)
@ -144,7 +149,7 @@ func TestPipelinedFlushBlock(t *testing.T) {
}) })
for i := 0; i < MinFlushKeys; i++ { for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i)) key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value) memdb.Set(key, value)
} }
flushed, err := memdb.Flush(false) flushed, err := memdb.Flush(false)
@ -154,13 +159,13 @@ func TestPipelinedFlushBlock(t *testing.T) {
require.Equal(t, memdb.memDB.Len(), 0) require.Equal(t, memdb.memDB.Len(), 0)
require.Equal(t, memdb.memDB.Size(), 0) require.Equal(t, memdb.memDB.Size(), 0)
// When size of memdb is greater than ForceFlushSizeThreshold, Flush will be blocked. // When size of memdb is greater than ForceFlushMemSizeThreshold, Flush will be blocked.
for i := 0; i < MinFlushKeys-1; i++ { for i := 0; i < MinFlushKeys-1; i++ {
key := []byte(strconv.Itoa(MinFlushKeys + i)) key := []byte(strconv.Itoa(MinFlushKeys + i))
value := make([]byte, ForceFlushSizeThreshold/(MinFlushKeys-1)-len(key)+1) value := make([]byte, int(ForceFlushMemSizeThreshold/(MinFlushKeys-1))-len(key)+1)
memdb.Set(key, value) memdb.Set(key, value)
} }
require.Greater(t, memdb.memDB.Size(), ForceFlushSizeThreshold) require.Greater(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold)
flushReturned := make(chan struct{}) flushReturned := make(chan struct{})
oneSec := time.After(time.Second) oneSec := time.After(time.Second)
go func() { go func() {
@ -191,7 +196,7 @@ func TestPipelinedFlushGet(t *testing.T) {
memdb.Set([]byte("key"), []byte("value")) memdb.Set([]byte("key"), []byte("value"))
for i := 0; i < MinFlushKeys; i++ { for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i)) key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value) memdb.Set(key, value)
} }
value, err := memdb.Get(context.Background(), []byte("key")) value, err := memdb.Get(context.Background(), []byte("key"))
@ -214,7 +219,7 @@ func TestPipelinedFlushGet(t *testing.T) {
blockCh <- struct{}{} blockCh <- struct{}{}
for i := 0; i < MinFlushKeys; i++ { for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i)) key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
memdb.Set(key, value) memdb.Set(key, value)
} }
flushed, err = memdb.Flush(false) flushed, err = memdb.Flush(false)
@ -237,7 +242,7 @@ func TestPipelinedFlushSize(t *testing.T) {
keys := 0 keys := 0
for i := 0; i < MinFlushKeys; i++ { for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(i)) key := []byte(strconv.Itoa(i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
keys++ keys++
size += len(key) + len(value) size += len(key) + len(value)
memdb.Set(key, value) memdb.Set(key, value)
@ -255,7 +260,7 @@ func TestPipelinedFlushSize(t *testing.T) {
for i := 0; i < MinFlushKeys; i++ { for i := 0; i < MinFlushKeys; i++ {
key := []byte(strconv.Itoa(MinFlushKeys + i)) key := []byte(strconv.Itoa(MinFlushKeys + i))
value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1)
keys++ keys++
size += len(key) + len(value) size += len(key) + len(value)
memdb.Set(key, value) memdb.Set(key, value)