From 75d3f86f3ba89b9b94a36c82f8540e7c8db8e6a0 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 30 May 2022 13:58:57 +0800 Subject: [PATCH] unionstore: add checkpoint API to support TiDB savepoint feature (#490) Signed-off-by: crazycs520 --- internal/unionstore/memdb.go | 18 +++++++++++++++--- internal/unionstore/memdb_arena.go | 21 +++++++++++---------- internal/unionstore/memdb_snapshot.go | 6 +++--- tikv/unionstore_export.go | 3 +++ 4 files changed, 32 insertions(+), 16 deletions(-) diff --git a/internal/unionstore/memdb.go b/internal/unionstore/memdb.go index 194b8864..69fa6a58 100644 --- a/internal/unionstore/memdb.go +++ b/internal/unionstore/memdb.go @@ -85,14 +85,14 @@ type MemDB struct { vlogInvalid bool dirty bool - stages []memdbCheckpoint + stages []MemDBCheckpoint } func newMemDB() *MemDB { db := new(MemDB) db.allocator.init() db.root = nullAddr - db.stages = make([]memdbCheckpoint, 0, 2) + db.stages = make([]MemDBCheckpoint, 0, 2) db.entrySizeLimit = math.MaxUint64 db.bufferSizeLimit = math.MaxUint64 return db @@ -155,6 +155,18 @@ func (db *MemDB) Cleanup(h int) { db.stages = db.stages[:h-1] } +// Checkpoint returns a checkpoint of MemDB. +func (db *MemDB) Checkpoint() *MemDBCheckpoint { + cp := db.vlog.checkpoint() + return &cp +} + +// RevertToCheckpoint reverts the MemDB to the checkpoint. +func (db *MemDB) RevertToCheckpoint(cp *MemDBCheckpoint) { + db.vlog.revertToCheckpoint(db, cp) + db.vlog.truncate(cp) +} + // Reset resets the MemBuffer to initial states. func (db *MemDB) Reset() { db.root = nullAddr @@ -338,7 +350,7 @@ func (db *MemDB) set(key []byte, value []byte, ops ...kv.FlagsOp) error { } func (db *MemDB) setValue(x memdbNodeAddr, value []byte) { - var activeCp *memdbCheckpoint + var activeCp *MemDBCheckpoint if len(db.stages) > 0 { activeCp = &db.stages[len(db.stages)-1] } diff --git a/internal/unionstore/memdb_arena.go b/internal/unionstore/memdb_arena.go index db83e465..ef7dd906 100644 --- a/internal/unionstore/memdb_arena.go +++ b/internal/unionstore/memdb_arena.go @@ -167,18 +167,19 @@ func (a *memdbArenaBlock) reset() { a.length = 0 } -type memdbCheckpoint struct { +// MemDBCheckpoint is the checkpoint of memory DB. +type MemDBCheckpoint struct { blockSize int blocks int offsetInBlock int } -func (cp *memdbCheckpoint) isSamePosition(other *memdbCheckpoint) bool { +func (cp *MemDBCheckpoint) isSamePosition(other *MemDBCheckpoint) bool { return cp.blocks == other.blocks && cp.offsetInBlock == other.offsetInBlock } -func (a *memdbArena) checkpoint() memdbCheckpoint { - snap := memdbCheckpoint{ +func (a *memdbArena) checkpoint() MemDBCheckpoint { + snap := MemDBCheckpoint{ blockSize: a.blockSize, blocks: len(a.blocks), } @@ -188,7 +189,7 @@ func (a *memdbArena) checkpoint() memdbCheckpoint { return snap } -func (a *memdbArena) truncate(snap *memdbCheckpoint) { +func (a *memdbArena) truncate(snap *MemDBCheckpoint) { for i := snap.blocks; i < len(a.blocks); i++ { a.blocks[i] = memdbArenaBlock{} } @@ -310,7 +311,7 @@ func (l *memdbVlog) getValue(addr memdbArenaAddr) []byte { return block[valueOff:lenOff:lenOff] } -func (l *memdbVlog) getSnapshotValue(addr memdbArenaAddr, snap *memdbCheckpoint) ([]byte, bool) { +func (l *memdbVlog) getSnapshotValue(addr memdbArenaAddr, snap *MemDBCheckpoint) ([]byte, bool) { result := l.selectValueHistory(addr, func(addr memdbArenaAddr) bool { return !l.canModify(snap, addr) }) @@ -332,7 +333,7 @@ func (l *memdbVlog) selectValueHistory(addr memdbArenaAddr, predicate func(memdb return nullAddr } -func (l *memdbVlog) revertToCheckpoint(db *MemDB, cp *memdbCheckpoint) { +func (l *memdbVlog) revertToCheckpoint(db *MemDB, cp *MemDBCheckpoint) { cursor := l.checkpoint() for !cp.isSamePosition(&cursor) { hdrOff := cursor.offsetInBlock - memdbVlogHdrSize @@ -361,7 +362,7 @@ func (l *memdbVlog) revertToCheckpoint(db *MemDB, cp *memdbCheckpoint) { } } -func (l *memdbVlog) inspectKVInLog(db *MemDB, head, tail *memdbCheckpoint, f func([]byte, kv.KeyFlags, []byte)) { +func (l *memdbVlog) inspectKVInLog(db *MemDB, head, tail *MemDBCheckpoint, f func([]byte, kv.KeyFlags, []byte)) { cursor := *tail for !head.isSamePosition(&cursor) { cursorAddr := memdbArenaAddr{idx: uint32(cursor.blocks - 1), off: uint32(cursor.offsetInBlock)} @@ -381,7 +382,7 @@ func (l *memdbVlog) inspectKVInLog(db *MemDB, head, tail *memdbCheckpoint, f fun } } -func (l *memdbVlog) moveBackCursor(cursor *memdbCheckpoint, hdr *memdbVlogHdr) { +func (l *memdbVlog) moveBackCursor(cursor *MemDBCheckpoint, hdr *memdbVlogHdr) { cursor.offsetInBlock -= (memdbVlogHdrSize + int(hdr.valueLen)) if cursor.offsetInBlock == 0 { cursor.blocks-- @@ -391,7 +392,7 @@ func (l *memdbVlog) moveBackCursor(cursor *memdbCheckpoint, hdr *memdbVlogHdr) { } } -func (l *memdbVlog) canModify(cp *memdbCheckpoint, addr memdbArenaAddr) bool { +func (l *memdbVlog) canModify(cp *MemDBCheckpoint, addr memdbArenaAddr) bool { if cp == nil { return true } diff --git a/internal/unionstore/memdb_snapshot.go b/internal/unionstore/memdb_snapshot.go index 6d17bf61..2adedfbd 100644 --- a/internal/unionstore/memdb_snapshot.go +++ b/internal/unionstore/memdb_snapshot.go @@ -60,7 +60,7 @@ func (db *MemDB) SnapshotIter(start, end []byte) Iterator { return it } -func (db *MemDB) getSnapshot() memdbCheckpoint { +func (db *MemDB) getSnapshot() MemDBCheckpoint { if len(db.stages) > 0 { return db.stages[0] } @@ -69,7 +69,7 @@ func (db *MemDB) getSnapshot() memdbCheckpoint { type memdbSnapGetter struct { db *MemDB - cp memdbCheckpoint + cp MemDBCheckpoint } func (snap *memdbSnapGetter) Get(key []byte) ([]byte, error) { @@ -91,7 +91,7 @@ func (snap *memdbSnapGetter) Get(key []byte) ([]byte, error) { type memdbSnapIter struct { *MemdbIterator value []byte - cp memdbCheckpoint + cp MemDBCheckpoint } func (i *memdbSnapIter) Value() []byte { diff --git a/tikv/unionstore_export.go b/tikv/unionstore_export.go index 82ccfc1f..9e905a61 100644 --- a/tikv/unionstore_export.go +++ b/tikv/unionstore_export.go @@ -51,3 +51,6 @@ type Iterator = unionstore.Iterator // When discarding a newly added KV in `Cleanup`, the non-persistent flags will be cleared. // If there are persistent flags associated with key, we will keep this key in node without value. type MemDB = unionstore.MemDB + +// MemDBCheckpoint is the checkpoint of memory DB. +type MemDBCheckpoint = unionstore.MemDBCheckpoint