txn: implement staging for PipelinedMemDB (#1230)

* feat: implement staging for PipelinedMemDB

Signed-off-by: ekexium <eke@fastmail.com>

* comment: explain staging methods

Signed-off-by: ekexium <eke@fastmail.com>

---------

Signed-off-by: ekexium <eke@fastmail.com>
This commit is contained in:
ekexium 2024-03-18 14:25:05 +08:00 committed by GitHub
parent 56418bf28f
commit 73c0712c01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 20 additions and 6 deletions

View File

@ -278,8 +278,14 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) {
if p.flushFunc == nil {
return false, errors.New("flushFunc is not provided")
}
// invalidate the batch get cache whether the flush is really triggered.
p.batchGetCache = nil
if len(p.memDB.stages) > 0 {
return false, errors.New("there are stages unreleased when Flush is called")
}
if !force && !p.needFlush() {
return false, nil
}
@ -446,19 +452,27 @@ func (p *PipelinedMemDB) SnapshotGetter() Getter {
panic("SnapshotGetter is not supported for PipelinedMemDB")
}
// Staging is not supported for PipelinedMemDB, it returns 0 handle.
// NOTE about the Staging()/Cleanup()/Release() methods:
// Its correctness is guaranteed by that no stage can exist when a Flush() is called.
// We guarantee in TiDB side that every stage lives inside a flush batch,
// which means the modifications all goes to the mutable memdb.
// Then the staging of the whole PipelinedMemDB can be directly implemented by its mutable memdb.
//
// Checkpoint()/RevertToCheckpoint() is not supported for PipelinedMemDB.
// Staging implements MemBuffer interface.
func (p *PipelinedMemDB) Staging() int {
panic("Staging is not supported for PipelinedMemDB")
return p.memDB.Staging()
}
// Cleanup implements MemBuffer interface.
func (p *PipelinedMemDB) Cleanup(int) {
panic("Cleanup is not supported for PipelinedMemDB")
func (p *PipelinedMemDB) Cleanup(h int) {
p.memDB.Cleanup(h)
}
// Release implements MemBuffer interface.
func (p *PipelinedMemDB) Release(int) {
panic("Release is not supported for PipelinedMemDB")
func (p *PipelinedMemDB) Release(h int) {
p.memDB.Release(h)
}
// Checkpoint implements MemBuffer interface.