From 4183ab10fab6522557553301b6050a8c9d48a43f Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 15 Apr 2024 16:53:35 +0800 Subject: [PATCH] Use Mem() instead of Size() to estimate pipelined-memdb size (#1286) * fix: use Mem() instead of Size() to evaluate pipelined-memdb size, for better memory control Signed-off-by: ekexium * test: fix TestPipelinedFlushTrigger Signed-off-by: ekexium --------- Signed-off-by: ekexium --- internal/unionstore/pipelined_memdb.go | 64 ++++++++++++++------- internal/unionstore/pipelined_memdb_test.go | 41 +++++++------ 2 files changed, 66 insertions(+), 39 deletions(-) diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index 69b39ba2..ffdf804f 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -62,32 +62,32 @@ const ( // small batch can lead to poor performance and resource waste in random write workload. // 10K batch size is large enough to get good performance with random write workloads in tests. MinFlushKeys = 10000 - // MinFlushSize is the minimum size of MemDB to trigger flush. - MinFlushSize = 16 * 1024 * 1024 // 16MB - // ForceFlushSizeThreshold is the threshold to force flush MemDB, which controls the max memory consumption of PipelinedMemDB. - ForceFlushSizeThreshold = 128 * 1024 * 1024 // 128MB + // MinFlushMemSize is the minimum size of MemDB to trigger flush. + MinFlushMemSize uint64 = 16 * 1024 * 1024 // 16MB + // ForceFlushMemSizeThreshold is the threshold to force flush MemDB, which controls the max memory consumption of PipelinedMemDB. + ForceFlushMemSizeThreshold uint64 = 128 * 1024 * 1024 // 128MB ) type flushOption struct { - MinFlushKeys int - MinFlushSize int - ForceFlushSizeThreshold int + MinFlushKeys uint64 + MinFlushMemSize uint64 + ForceFlushMemSizeThreshold uint64 } func newFlushOption() flushOption { opt := flushOption{ - MinFlushKeys: MinFlushKeys, - MinFlushSize: MinFlushSize, - ForceFlushSizeThreshold: ForceFlushSizeThreshold, + MinFlushKeys: MinFlushKeys, + MinFlushMemSize: MinFlushMemSize, + ForceFlushMemSizeThreshold: ForceFlushMemSizeThreshold, } if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushKeys"); err == nil && val != nil { - opt.MinFlushKeys = val.(int) + opt.MinFlushKeys = uint64(val.(int)) } if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushSize"); err == nil && val != nil { - opt.MinFlushSize = val.(int) + opt.MinFlushMemSize = uint64(val.(int)) } if val, err := util.EvalFailpoint("pipelinedMemDBForceFlushSizeThreshold"); err == nil && val != nil { - opt.ForceFlushSizeThreshold = val.(int) + opt.ForceFlushMemSizeThreshold = uint64(val.(int)) } return opt } @@ -323,15 +323,33 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) { } func (p *PipelinedMemDB) needFlush() bool { - size := p.memDB.Size() - // size < MinFlushSize, do not flush. - // MinFlushSize <= size < ForceFlushSizeThreshold && keys < MinFlushKeys, do not flush. - // MinFlushSize <= size < ForceFlushSizeThreshold && keys >= MinFlushKeys, flush. - // size >= ForceFlushSizeThreshold, flush. - if size < p.flushOption.MinFlushSize || (p.memDB.Len() < p.flushOption.MinFlushKeys && size < p.flushOption.ForceFlushSizeThreshold) { + size := p.memDB.Mem() + // mem size < MinFlushMemSize, do not flush. + // MinFlushMemSize <= mem size < ForceFlushMemSizeThreshold && keys < MinFlushKeys, do not flush. + // MinFlushMemSize <= mem size < ForceFlushMemSizeThreshold && keys >= MinFlushKeys, flush. + // mem size >= ForceFlushMemSizeThreshold, flush. + + /* + Keys + ^ + | | + | | + | | + | | Flush + | | + MinKey(10k) | +------------+ + | | | + | No | No Flush | Flush + | Flush | | + +-----------------------------------------> Size + 0 MinSize(16MB) Force(128MB) + */ + if size < p.flushOption.MinFlushMemSize || + (uint64(p.memDB.Len()) < p.flushOption.MinFlushKeys && + size < p.flushOption.ForceFlushMemSizeThreshold) { return false } - if p.onFlushing.Load() && size < p.flushOption.ForceFlushSizeThreshold { + if p.onFlushing.Load() && size < p.flushOption.ForceFlushMemSizeThreshold { return false } return true @@ -391,7 +409,11 @@ func (p *PipelinedMemDB) Len() int { } func (p *PipelinedMemDB) Size() int { - return p.memDB.Size() + p.size + size := p.size + if p.memDB != nil { + size += p.memDB.Size() + } + return size } func (p *PipelinedMemDB) OnFlushing() bool { diff --git a/internal/unionstore/pipelined_memdb_test.go b/internal/unionstore/pipelined_memdb_test.go index cf85aa52..745489f5 100644 --- a/internal/unionstore/pipelined_memdb_test.go +++ b/internal/unionstore/pipelined_memdb_test.go @@ -32,18 +32,20 @@ func emptyBufferBatchGetter(ctx context.Context, keys [][]byte) (map[string][]by } func TestPipelinedFlushTrigger(t *testing.T) { - avgKeySize := MinFlushSize / MinFlushKeys + // because memdb's memory usage is hard to control, we use a cargo-culted value here. + avgKeySize := int(MinFlushMemSize/MinFlushKeys) / 3 // block the flush goroutine for checking the flushingMemDB status. blockCh := make(chan struct{}) - // Will not flush when keys number >= MinFlushKeys and size < MinFlushSize + // Will not flush when keys number >= MinFlushKeys and size < MinFlushMemSize memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { <-blockCh return nil }) for i := 0; i < MinFlushKeys; i++ { key := []byte(strconv.Itoa(i)) - value := make([]byte, avgKeySize-len(key)-1) // (key + value) * MinFLushKeys < MinFlushKeys + value := make([]byte, avgKeySize-len(key)-1) + // (key + value) * MinFlushKeys < MinFlushMemSize memdb.Set(key, value) flushed, err := memdb.Flush(false) require.False(t, flushed) @@ -51,16 +53,18 @@ func TestPipelinedFlushTrigger(t *testing.T) { require.False(t, memdb.OnFlushing()) } require.Equal(t, memdb.memDB.Len(), MinFlushKeys) - require.Less(t, memdb.memDB.Size(), MinFlushSize) + require.Less(t, memdb.memDB.Mem(), MinFlushMemSize) - // Will not flush when keys number < MinFlushKeys and size >= MinFlushSize + // Will not flush when keys number < MinFlushKeys and size >= MinFlushMemSize + avgKeySize = int(MinFlushMemSize/MinFlushKeys) / 2 memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { <-blockCh return nil }) for i := 0; i < MinFlushKeys-1; i++ { key := []byte(strconv.Itoa(i)) - value := make([]byte, avgKeySize-len(key)+1) // (key + value) * (MinFLushKeys - 1) > MinFlushKeys + value := make([]byte, avgKeySize-len(key)+1) + // (key + value) * (MinFLushKeys - 1) > MinFlushMemSize memdb.Set(key, value) flushed, err := memdb.Flush(false) require.False(t, flushed) @@ -68,9 +72,10 @@ func TestPipelinedFlushTrigger(t *testing.T) { require.False(t, memdb.OnFlushing()) } require.Less(t, memdb.memDB.Len(), MinFlushKeys) - require.Greater(t, memdb.memDB.Size(), MinFlushSize) + require.Greater(t, memdb.memDB.Mem(), MinFlushMemSize) + require.Less(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold) - // Flush when keys number >= MinFlushKeys and size >= MinFlushSize + // Flush when keys number >= MinFlushKeys and mem size >= MinFlushMemSize memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { <-blockCh return nil @@ -106,7 +111,7 @@ func TestPipelinedFlushSkip(t *testing.T) { }) for i := 0; i < MinFlushKeys; i++ { key := []byte(strconv.Itoa(i)) - value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) + value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1) memdb.Set(key, value) } flushed, err := memdb.Flush(false) @@ -117,7 +122,7 @@ func TestPipelinedFlushSkip(t *testing.T) { require.Equal(t, memdb.memDB.Size(), 0) for i := 0; i < MinFlushKeys; i++ { key := []byte(strconv.Itoa(MinFlushKeys + i)) - value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) + value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1) memdb.Set(key, value) } flushed, err = memdb.Flush(false) @@ -144,7 +149,7 @@ func TestPipelinedFlushBlock(t *testing.T) { }) for i := 0; i < MinFlushKeys; i++ { key := []byte(strconv.Itoa(i)) - value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) + value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1) memdb.Set(key, value) } flushed, err := memdb.Flush(false) @@ -154,13 +159,13 @@ func TestPipelinedFlushBlock(t *testing.T) { require.Equal(t, memdb.memDB.Len(), 0) require.Equal(t, memdb.memDB.Size(), 0) - // When size of memdb is greater than ForceFlushSizeThreshold, Flush will be blocked. + // When size of memdb is greater than ForceFlushMemSizeThreshold, Flush will be blocked. for i := 0; i < MinFlushKeys-1; i++ { key := []byte(strconv.Itoa(MinFlushKeys + i)) - value := make([]byte, ForceFlushSizeThreshold/(MinFlushKeys-1)-len(key)+1) + value := make([]byte, int(ForceFlushMemSizeThreshold/(MinFlushKeys-1))-len(key)+1) memdb.Set(key, value) } - require.Greater(t, memdb.memDB.Size(), ForceFlushSizeThreshold) + require.Greater(t, memdb.memDB.Mem(), ForceFlushMemSizeThreshold) flushReturned := make(chan struct{}) oneSec := time.After(time.Second) go func() { @@ -191,7 +196,7 @@ func TestPipelinedFlushGet(t *testing.T) { memdb.Set([]byte("key"), []byte("value")) for i := 0; i < MinFlushKeys; i++ { key := []byte(strconv.Itoa(i)) - value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) + value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1) memdb.Set(key, value) } value, err := memdb.Get(context.Background(), []byte("key")) @@ -214,7 +219,7 @@ func TestPipelinedFlushGet(t *testing.T) { blockCh <- struct{}{} for i := 0; i < MinFlushKeys; i++ { key := []byte(strconv.Itoa(i)) - value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) + value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1) memdb.Set(key, value) } flushed, err = memdb.Flush(false) @@ -237,7 +242,7 @@ func TestPipelinedFlushSize(t *testing.T) { keys := 0 for i := 0; i < MinFlushKeys; i++ { key := []byte(strconv.Itoa(i)) - value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) + value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1) keys++ size += len(key) + len(value) memdb.Set(key, value) @@ -255,7 +260,7 @@ func TestPipelinedFlushSize(t *testing.T) { for i := 0; i < MinFlushKeys; i++ { key := []byte(strconv.Itoa(MinFlushKeys + i)) - value := make([]byte, MinFlushSize/MinFlushKeys-len(key)+1) + value := make([]byte, int(MinFlushMemSize/MinFlushKeys)-len(key)+1) keys++ size += len(key) + len(value) memdb.Set(key, value)