diff --git a/internal/unionstore/memdb.go b/internal/unionstore/memdb.go index acfc7498..76fe3794 100644 --- a/internal/unionstore/memdb.go +++ b/internal/unionstore/memdb.go @@ -90,13 +90,15 @@ type MemDB struct { skipMutex bool } +const unlimitedSize = math.MaxUint64 + func newMemDB() *MemDB { db := new(MemDB) db.allocator.init() db.root = nullAddr db.stages = make([]MemDBCheckpoint, 0, 2) - db.entrySizeLimit = math.MaxUint64 - db.bufferSizeLimit = math.MaxUint64 + db.entrySizeLimit = unlimitedSize + db.bufferSizeLimit = unlimitedSize db.vlog.memdb = db db.skipMutex = false return db diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index fd73fae7..28fb9f52 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -38,16 +38,16 @@ type PipelinedMemDB struct { // Like MemDB, this RWMutex only used to ensure memdbSnapGetter.Get will not race with // concurrent memdb.Set, memdb.SetWithFlags, memdb.Delete and memdb.UpdateFlags. sync.RWMutex - onFlushing atomic.Bool - errCh chan error - flushFunc FlushFunc - bufferBatchGetter BufferBatchGetter - memDB *MemDB - flushingMemDB *MemDB // the flushingMemDB is not wrapped by a mutex, because there is no data race in it. - len, size int // len and size records the total flushed and onflushing memdb. - generation uint64 - entryLimit, bufferLimit uint64 - flushOption flushOption + onFlushing atomic.Bool + errCh chan error + flushFunc FlushFunc + bufferBatchGetter BufferBatchGetter + memDB *MemDB + flushingMemDB *MemDB // the flushingMemDB is not wrapped by a mutex, because there is no data race in it. + len, size int // len and size records the total flushed and onflushing memdb. + generation uint64 + entryLimit uint64 + flushOption flushOption // prefetchCache is used to cache the result of BatchGet, it's invalidated when Flush. // the values are wrapped by util.Option. // None -> not found @@ -110,7 +110,6 @@ func NewPipelinedMemDB(bufferBatchGetter BufferBatchGetter, flushFunc FlushFunc) generation: 0, // keep entryLimit and bufferLimit same with the memdb's default values. entryLimit: memdb.entrySizeLimit, - bufferLimit: memdb.bufferSizeLimit, flushOption: flushOpt, } } @@ -306,7 +305,8 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) { p.len += p.flushingMemDB.Len() p.size += p.flushingMemDB.Size() p.memDB = newMemDB() - p.memDB.SetEntrySizeLimit(p.entryLimit, p.bufferLimit) + // buffer size is limited by ForceFlushMemSizeThreshold. Do not set bufferLimit + p.memDB.SetEntrySizeLimit(p.entryLimit, unlimitedSize) p.memDB.setSkipMutex(true) p.generation++ go func(generation uint64) { @@ -404,9 +404,10 @@ func (p *PipelinedMemDB) IterReverse([]byte, []byte) (Iterator, error) { } // SetEntrySizeLimit sets the size limit for each entry and total buffer. -func (p *PipelinedMemDB) SetEntrySizeLimit(entryLimit, bufferLimit uint64) { - p.entryLimit, p.bufferLimit = entryLimit, bufferLimit - p.memDB.SetEntrySizeLimit(entryLimit, bufferLimit) +func (p *PipelinedMemDB) SetEntrySizeLimit(entryLimit, _ uint64) { + p.entryLimit = entryLimit + // buffer size is limited by ForceFlushMemSizeThreshold. Do not set bufferLimit. + p.memDB.SetEntrySizeLimit(entryLimit, unlimitedSize) } func (p *PipelinedMemDB) Len() int { diff --git a/internal/unionstore/union_store.go b/internal/unionstore/union_store.go index 61a4870c..643fb7ff 100644 --- a/internal/unionstore/union_store.go +++ b/internal/unionstore/union_store.go @@ -36,7 +36,6 @@ package unionstore import ( "context" - "math" "time" tikverr "github.com/tikv/client-go/v2/error" @@ -155,10 +154,10 @@ func (us *KVUnionStore) UnmarkPresumeKeyNotExists(k []byte) { // SetEntrySizeLimit sets the size limit for each entry and total buffer. func (us *KVUnionStore) SetEntrySizeLimit(entryLimit, bufferLimit uint64) { if entryLimit == 0 { - entryLimit = math.MaxUint64 + entryLimit = unlimitedSize } if bufferLimit == 0 { - bufferLimit = math.MaxUint64 + bufferLimit = unlimitedSize } us.memBuffer.SetEntrySizeLimit(entryLimit, bufferLimit) }