From 58f3322fc39a38ba7db0f49c4cd22e5a70c0b2f8 Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 25 Sep 2024 15:03:02 +0800 Subject: [PATCH] membuffer: implement snapshot get and iterator for ART (#1467) ref pingcap/tidb#55287 Signed-off-by: you06 --- internal/unionstore/art/art.go | 51 +++++++++-- internal/unionstore/art/art_iterator.go | 14 ++- internal/unionstore/art/art_snapshot.go | 115 ++++++++++++++++++++---- internal/unionstore/memdb.go | 6 +- internal/unionstore/memdb_art.go | 4 +- internal/unionstore/memdb_rbt.go | 3 +- internal/unionstore/memdb_test.go | 33 ++++--- 7 files changed, 178 insertions(+), 48 deletions(-) diff --git a/internal/unionstore/art/art.go b/internal/unionstore/art/art.go index d54a2dc9..7410003b 100644 --- a/internal/unionstore/art/art.go +++ b/internal/unionstore/art/art.go @@ -423,6 +423,11 @@ func (t *ART) InspectNode(addr arena.MemdbArenaAddr) (*artLeaf, arena.MemdbArena return lf, lf.vAddr } +// IsStaging returns whether the MemBuffer is in staging status. +func (t *ART) IsStaging() bool { + return len(t.stages) > 0 +} + // Checkpoint returns a checkpoint of ART. func (t *ART) Checkpoint() *arena.MemDBCheckpoint { cp := t.allocator.vlogAllocator.Checkpoint() @@ -471,7 +476,7 @@ func (t *ART) Cleanup(h int) { return } if h < len(t.stages) { - panic(fmt.Sprintf("cannot cleanup staging buffer, h=%v, len(db.stages)=%v", h, len(t.stages))) + panic(fmt.Sprintf("cannot cleanup staging buffer, h=%v, len(tree.stages)=%v", h, len(t.stages))) } cp := &t.stages[h-1] @@ -501,7 +506,8 @@ func (t *ART) Reset() { // DiscardValues releases the memory used by all values. // NOTE: any operation need value will panic after this function. func (t *ART) DiscardValues() { - panic("unimplemented") + t.vlogInvalid = true + t.allocator.vlogAllocator.Reset() } // InspectStage used to inspect the value updates in the given stage. @@ -514,16 +520,35 @@ func (t *ART) InspectStage(handle int, f func([]byte, kv.KeyFlags, []byte)) { // SelectValueHistory select the latest value which makes `predicate` returns true from the modification history. func (t *ART) SelectValueHistory(key []byte, predicate func(value []byte) bool) ([]byte, error) { - panic("unimplemented") + _, x := t.search(key) + if x == nil { + return nil, tikverr.ErrNotExist + } + if x.vAddr.IsNull() { + // A flags only key, act as value not exists + return nil, tikverr.ErrNotExist + } + result := t.allocator.vlogAllocator.SelectValueHistory(x.vAddr, func(addr arena.MemdbArenaAddr) bool { + return predicate(t.allocator.vlogAllocator.GetValue(addr)) + }) + if result.IsNull() { + return nil, nil + } + return t.allocator.vlogAllocator.GetValue(result), nil + } -func (t *ART) SetMemoryFootprintChangeHook(fn func(uint64)) { - panic("unimplemented") +func (t *ART) SetMemoryFootprintChangeHook(hook func(uint64)) { + innerHook := func() { + hook(t.allocator.nodeAllocator.Capacity() + t.allocator.vlogAllocator.Capacity()) + } + t.allocator.nodeAllocator.SetMemChangeHook(innerHook) + t.allocator.vlogAllocator.SetMemChangeHook(innerHook) } // MemHookSet implements the MemBuffer interface. func (t *ART) MemHookSet() bool { - panic("unimplemented") + return t.allocator.nodeAllocator.MemHookSet() } // GetKeyByHandle returns key by handle. @@ -544,10 +569,24 @@ func (t *ART) GetValueByHandle(handle arena.MemKeyHandle) ([]byte, bool) { return t.allocator.vlogAllocator.GetValue(lf.vAddr), true } +// GetEntrySizeLimit gets the size limit for each entry and total buffer. +func (t *ART) GetEntrySizeLimit() (uint64, uint64) { + return t.entrySizeLimit, t.bufferSizeLimit +} + func (t *ART) SetEntrySizeLimit(entryLimit, bufferLimit uint64) { t.entrySizeLimit, t.bufferSizeLimit = entryLimit, bufferLimit } +// RemoveFromBuffer is a test function, not support yet. func (t *ART) RemoveFromBuffer(key []byte) { panic("unimplemented") } + +func (t *ART) GetCacheHitCount() uint64 { + return 0 +} + +func (t *ART) GetCacheMissCount() uint64 { + return 0 +} diff --git a/internal/unionstore/art/art_iterator.go b/internal/unionstore/art/art_iterator.go index 3c844753..ad0c8521 100644 --- a/internal/unionstore/art/art_iterator.go +++ b/internal/unionstore/art/art_iterator.go @@ -50,7 +50,10 @@ func (t *ART) iter(lowerBound, upperBound []byte, reverse, includeFlags bool) (* inner: &baseIter{ allocator: &t.allocator, }, - currAddr: arena.BadAddr, // the default value of currAddr is not equal to any valid address + // the default value of currAddr is not equal to any valid address + // arena.BadAddr's idx is maxuint32 - 1, which is impossible in common cases, + // this avoids the initial value of currAddr equals to endAddr. + currAddr: arena.BadAddr, endAddr: arena.NullAddr, } it.init(lowerBound, upperBound) @@ -84,6 +87,15 @@ func (it *Iterator) Value() []byte { return it.tree.allocator.vlogAllocator.GetValue(it.currLeaf.vAddr) } +// HasValue returns false if it is flags only. +func (it *Iterator) HasValue() bool { + return !it.isFlagsOnly() +} + +func (it *Iterator) isFlagsOnly() bool { + return it.currLeaf != nil && it.currLeaf.vAddr.IsNull() +} + func (it *Iterator) Next() error { if !it.valid { // iterate is finished diff --git a/internal/unionstore/art/art_snapshot.go b/internal/unionstore/art/art_snapshot.go index 8b714d9b..1aab70a4 100644 --- a/internal/unionstore/art/art_snapshot.go +++ b/internal/unionstore/art/art_snapshot.go @@ -14,30 +14,111 @@ package art -import "context" +import ( + "context" -func (*ART) SnapshotGetter() *SnapshotGetter { - panic("unimplemented") + tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/internal/unionstore/arena" +) + +func (t *ART) getSnapshot() arena.MemDBCheckpoint { + if len(t.stages) > 0 { + return t.stages[0] + } + return t.checkpoint() } -func (*ART) SnapshotIter([]byte, []byte) *SnapshotIter { - panic("unimplemented") +// SnapshotGetter returns a Getter for a snapshot of MemBuffer. +func (t *ART) SnapshotGetter() *SnapGetter { + return &SnapGetter{ + tree: t, + cp: t.getSnapshot(), + } } -func (*ART) SnapshotIterReverse([]byte, []byte) *SnapshotIter { - panic("unimplemented") +// SnapshotIter returns an Iterator for a snapshot of MemBuffer. +func (t *ART) SnapshotIter(start, end []byte) *SnapIter { + inner, err := t.Iter(start, end) + if err != nil { + panic(err) + } + it := &SnapIter{ + Iterator: inner, + cp: t.getSnapshot(), + } + for !it.setValue() && it.Valid() { + _ = it.Next() + } + return it } -type SnapshotGetter struct{} - -func (s *SnapshotGetter) Get(context.Context, []byte) ([]byte, error) { - panic("unimplemented") +// SnapshotIterReverse returns a reverse Iterator for a snapshot of MemBuffer. +func (t *ART) SnapshotIterReverse(k, lowerBound []byte) *SnapIter { + inner, err := t.IterReverse(k, lowerBound) + if err != nil { + panic(err) + } + it := &SnapIter{ + Iterator: inner, + cp: t.getSnapshot(), + } + for !it.setValue() && it.valid { + _ = it.Next() + } + return it } -type SnapshotIter struct{} +type SnapGetter struct { + tree *ART + cp arena.MemDBCheckpoint +} -func (i *SnapshotIter) Valid() bool { panic("unimplemented") } -func (i *SnapshotIter) Key() []byte { panic("unimplemented") } -func (i *SnapshotIter) Value() []byte { panic("unimplemented") } -func (i *SnapshotIter) Next() error { panic("unimplemented") } -func (i *SnapshotIter) Close() { panic("unimplemented") } +func (snap *SnapGetter) Get(ctx context.Context, key []byte) ([]byte, error) { + addr, lf := snap.tree.search(key) + if addr.IsNull() { + return nil, tikverr.ErrNotExist + } + if lf.vAddr.IsNull() { + // A flags only key, act as value not exists + return nil, tikverr.ErrNotExist + } + v, ok := snap.tree.allocator.vlogAllocator.GetSnapshotValue(lf.vAddr, &snap.cp) + if !ok { + return nil, tikverr.ErrNotExist + } + return v, nil +} + +type SnapIter struct { + *Iterator + value []byte + cp arena.MemDBCheckpoint +} + +func (i *SnapIter) Value() []byte { + return i.value +} + +func (i *SnapIter) Next() error { + i.value = nil + for i.Valid() { + if err := i.Iterator.Next(); err != nil { + return err + } + if i.setValue() { + return nil + } + } + return nil +} + +func (i *SnapIter) setValue() bool { + if !i.Valid() { + return false + } + if v, ok := i.tree.allocator.vlogAllocator.GetSnapshotValue(i.currLeaf.vAddr, &i.cp); ok { + i.value = v + return true + } + return false +} diff --git a/internal/unionstore/memdb.go b/internal/unionstore/memdb.go index 8284c202..57b7407d 100644 --- a/internal/unionstore/memdb.go +++ b/internal/unionstore/memdb.go @@ -49,7 +49,7 @@ type MemDBCheckpoint = arena.MemDBCheckpoint type MemKeyHandle = arena.MemKeyHandle -type MemDB = rbtDBWithContext +type MemDB = artDBWithContext -var NewMemDB = newRbtDBWithContext -var NewMemDBWithContext = newRbtDBWithContext +var NewMemDB = newArtDBWithContext +var NewMemDBWithContext = newArtDBWithContext diff --git a/internal/unionstore/memdb_art.go b/internal/unionstore/memdb_art.go index 07ace8f1..c7c1b21d 100644 --- a/internal/unionstore/memdb_art.go +++ b/internal/unionstore/memdb_art.go @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//nolint:unused package unionstore import ( @@ -36,7 +35,6 @@ type artDBWithContext struct { skipMutex bool } -//nolint:unused func newArtDBWithContext() *artDBWithContext { return &artDBWithContext{ART: art.New()} } @@ -118,7 +116,7 @@ func (db *artDBWithContext) FlushWait() error { return nil } // GetMemDB implements the MemBuffer interface. func (db *artDBWithContext) GetMemDB() *MemDB { - panic("unimplemented") + return db } // BatchGet returns the values for given keys from the MemBuffer. diff --git a/internal/unionstore/memdb_rbt.go b/internal/unionstore/memdb_rbt.go index d4f6d6f6..c805f499 100644 --- a/internal/unionstore/memdb_rbt.go +++ b/internal/unionstore/memdb_rbt.go @@ -42,6 +42,7 @@ func newRbtDBWithContext() *rbtDBWithContext { } } +//nolint:unused func (db *rbtDBWithContext) setSkipMutex(skip bool) { db.skipMutex = skip } @@ -125,7 +126,7 @@ func (db *rbtDBWithContext) FlushWait() error { return nil } // GetMemDB implements the MemBuffer interface. func (db *rbtDBWithContext) GetMemDB() *MemDB { - return db + return nil } // BatchGet returns the values for given keys from the MemBuffer. diff --git a/internal/unionstore/memdb_test.go b/internal/unionstore/memdb_test.go index ac6f6d6a..3f6c8c2b 100644 --- a/internal/unionstore/memdb_test.go +++ b/internal/unionstore/memdb_test.go @@ -995,6 +995,7 @@ func testUnsetTemporaryFlag(t *testing.T, buffer MemBuffer) { func TestSnapshotGetIter(t *testing.T) { testSnapshotGetIter(t, newRbtDBWithContext()) + testSnapshotGetIter(t, newArtDBWithContext()) } func testSnapshotGetIter(t *testing.T, db MemBuffer) { @@ -1089,21 +1090,19 @@ func testIterNoResult(t *testing.T, buffer MemBuffer) { assert := assert.New(t) assert.Nil(buffer.Set([]byte{1, 1}, []byte{1, 1})) - // Test lower bound and upper bound seek same position - iter, err := buffer.Iter([]byte{1, 0, 0}, []byte{1, 0, 1}) - assert.Nil(err) - assert.False(iter.Valid()) - iter, err = buffer.IterReverse([]byte{1, 0, 1}, []byte{1, 0, 0}) - assert.Nil(err) - assert.False(iter.Valid()) - // Test lower bound >= upper bound - iter, err = buffer.Iter([]byte{1, 0, 1}, []byte{1, 0, 0}) - assert.Nil(err) - assert.False(iter.Valid()) - iter, err = buffer.IterReverse([]byte{1, 0, 0}, []byte{1, 0, 1}) - assert.Nil(err) - assert.False(iter.Valid()) - iter, err = buffer.Iter([]byte{1, 1}, []byte{1, 1}) - assert.Nil(err) - assert.False(iter.Valid()) + + checkFn := func(lowerBound, upperBound []byte) { + iter, err := buffer.Iter(lowerBound, upperBound) + assert.Nil(err) + assert.False(iter.Valid()) + iter, err = buffer.IterReverse(upperBound, lowerBound) + assert.Nil(err) + assert.False(iter.Valid()) + } + + // Test lower bound and upper bound seek to the same position + checkFn([]byte{1, 1}, []byte{1, 1}) + checkFn([]byte{1, 0, 0}, []byte{1, 0, 1}) + // Test lower bound > upper bound + checkFn([]byte{1, 0, 1}, []byte{1, 0, 0}) }