Let pipelined memdb ignore bufferLimit (#1392)

* fix: pipelined memdb ignores bufferLimit

Signed-off-by: ekexium <eke@fastmail.com>

* refactor: introduce unlimitedSize

Signed-off-by: ekexium <eke@fastmail.com>

* refactor: unlimitedSize

Signed-off-by: ekexium <eke@fastmail.com>

---------

Signed-off-by: ekexium <eke@fastmail.com>
Co-authored-by: cfzjywxk <lsswxrxr@163.com>
This commit is contained in:
ekexium 2024-07-29 12:13:17 +08:00 committed by GitHub
parent 88ce38492f
commit fce0abfd82
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 22 additions and 20 deletions

View File

@@ -90,13 +90,15 @@ type MemDB struct {
skipMutex bool skipMutex bool
} }
const unlimitedSize = math.MaxUint64
func newMemDB() *MemDB { func newMemDB() *MemDB {
db := new(MemDB) db := new(MemDB)
db.allocator.init() db.allocator.init()
db.root = nullAddr db.root = nullAddr
db.stages = make([]MemDBCheckpoint, 0, 2) db.stages = make([]MemDBCheckpoint, 0, 2)
db.entrySizeLimit = math.MaxUint64 db.entrySizeLimit = unlimitedSize
db.bufferSizeLimit = math.MaxUint64 db.bufferSizeLimit = unlimitedSize
db.vlog.memdb = db db.vlog.memdb = db
db.skipMutex = false db.skipMutex = false
return db return db

View File

@@ -38,16 +38,16 @@ type PipelinedMemDB struct {
// Like MemDB, this RWMutex only used to ensure memdbSnapGetter.Get will not race with // Like MemDB, this RWMutex only used to ensure memdbSnapGetter.Get will not race with
// concurrent memdb.Set, memdb.SetWithFlags, memdb.Delete and memdb.UpdateFlags. // concurrent memdb.Set, memdb.SetWithFlags, memdb.Delete and memdb.UpdateFlags.
sync.RWMutex sync.RWMutex
onFlushing atomic.Bool onFlushing atomic.Bool
errCh chan error errCh chan error
flushFunc FlushFunc flushFunc FlushFunc
bufferBatchGetter BufferBatchGetter bufferBatchGetter BufferBatchGetter
memDB *MemDB memDB *MemDB
flushingMemDB *MemDB // the flushingMemDB is not wrapped by a mutex, because there is no data race in it. flushingMemDB *MemDB // the flushingMemDB is not wrapped by a mutex, because there is no data race in it.
len, size int // len and size records the total flushed and onflushing memdb. len, size int // len and size records the total flushed and onflushing memdb.
generation uint64 generation uint64
entryLimit, bufferLimit uint64 entryLimit uint64
flushOption flushOption flushOption flushOption
// prefetchCache is used to cache the result of BatchGet, it's invalidated when Flush. // prefetchCache is used to cache the result of BatchGet, it's invalidated when Flush.
// the values are wrapped by util.Option. // the values are wrapped by util.Option.
// None -> not found // None -> not found
@@ -110,7 +110,6 @@ func NewPipelinedMemDB(bufferBatchGetter BufferBatchGetter, flushFunc FlushFunc)
generation: 0, generation: 0,
// keep entryLimit and bufferLimit same with the memdb's default values. // keep entryLimit and bufferLimit same with the memdb's default values.
entryLimit: memdb.entrySizeLimit, entryLimit: memdb.entrySizeLimit,
bufferLimit: memdb.bufferSizeLimit,
flushOption: flushOpt, flushOption: flushOpt,
} }
} }
@@ -306,7 +305,8 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) {
p.len += p.flushingMemDB.Len() p.len += p.flushingMemDB.Len()
p.size += p.flushingMemDB.Size() p.size += p.flushingMemDB.Size()
p.memDB = newMemDB() p.memDB = newMemDB()
p.memDB.SetEntrySizeLimit(p.entryLimit, p.bufferLimit) // buffer size is limited by ForceFlushMemSizeThreshold. Do not set bufferLimit
p.memDB.SetEntrySizeLimit(p.entryLimit, unlimitedSize)
p.memDB.setSkipMutex(true) p.memDB.setSkipMutex(true)
p.generation++ p.generation++
go func(generation uint64) { go func(generation uint64) {
@@ -404,9 +404,10 @@ func (p *PipelinedMemDB) IterReverse([]byte, []byte) (Iterator, error) {
} }
// SetEntrySizeLimit sets the size limit for each entry and total buffer. // SetEntrySizeLimit sets the size limit for each entry and total buffer.
func (p *PipelinedMemDB) SetEntrySizeLimit(entryLimit, bufferLimit uint64) { func (p *PipelinedMemDB) SetEntrySizeLimit(entryLimit, _ uint64) {
p.entryLimit, p.bufferLimit = entryLimit, bufferLimit p.entryLimit = entryLimit
p.memDB.SetEntrySizeLimit(entryLimit, bufferLimit) // buffer size is limited by ForceFlushMemSizeThreshold. Do not set bufferLimit.
p.memDB.SetEntrySizeLimit(entryLimit, unlimitedSize)
} }
func (p *PipelinedMemDB) Len() int { func (p *PipelinedMemDB) Len() int {

View File

@@ -36,7 +36,6 @@ package unionstore
import ( import (
"context" "context"
"math"
"time" "time"
tikverr "github.com/tikv/client-go/v2/error" tikverr "github.com/tikv/client-go/v2/error"
@@ -155,10 +154,10 @@ func (us *KVUnionStore) UnmarkPresumeKeyNotExists(k []byte) {
// SetEntrySizeLimit sets the size limit for each entry and total buffer. // SetEntrySizeLimit sets the size limit for each entry and total buffer.
func (us *KVUnionStore) SetEntrySizeLimit(entryLimit, bufferLimit uint64) { func (us *KVUnionStore) SetEntrySizeLimit(entryLimit, bufferLimit uint64) {
if entryLimit == 0 { if entryLimit == 0 {
entryLimit = math.MaxUint64 entryLimit = unlimitedSize
} }
if bufferLimit == 0 { if bufferLimit == 0 {
bufferLimit = math.MaxUint64 bufferLimit = unlimitedSize
} }
us.memBuffer.SetEntrySizeLimit(entryLimit, bufferLimit) us.memBuffer.SetEntrySizeLimit(entryLimit, bufferLimit)
} }