From 05d115b3e88bef1af68b57addd5d1b1799e947da Mon Sep 17 00:00:00 2001
From: you06
Date: Wed, 20 Nov 2024 11:44:59 +0900
Subject: [PATCH] memdb: retain old version nodes of ART to satisfy snapshot
 read (#1503)

ref pingcap/tidb#57425

Signed-off-by: you06
---
 internal/unionstore/art/art_arena.go         | 59 ++++++++++++-
 internal/unionstore/art/art_snapshot.go      | 39 +++++----
 internal/unionstore/art/art_snapshot_test.go | 90 ++++++++++++++++++++
 internal/unionstore/memdb_test.go            | 37 ++++++++
 4 files changed, 207 insertions(+), 18 deletions(-)
 create mode 100644 internal/unionstore/art/art_snapshot_test.go

diff --git a/internal/unionstore/art/art_arena.go b/internal/unionstore/art/art_arena.go
index f196bf0c..d977d91a 100644
--- a/internal/unionstore/art/art_arena.go
+++ b/internal/unionstore/art/art_arena.go
@@ -15,6 +15,7 @@ package art

 import (
+	"sync/atomic"
	"unsafe"

	"github.com/tikv/client-go/v2/internal/unionstore/arena"
 )
@@ -25,11 +26,23 @@ import (
 // reusing blocks reduces the memory pieces.
 type nodeArena struct {
	arena.MemdbArena
+
	// The ART node will expand to a higher capacity, and the address of the freed node will be stored in the free list for reuse.
	// By reusing the freed node, memory usage and fragmentation can be reduced.
	freeNode4  []arena.MemdbArenaAddr
	freeNode16 []arena.MemdbArenaAddr
	freeNode48 []arena.MemdbArenaAddr
+
+	// When there is an ongoing snapshot iterator, ART should keep the old versions available;
+	// reusing the nodes could cause incorrect snapshot read results during this time.
+	// To avoid reuse, freed nodes are stored in the unused slices until the snapshot iterators are closed.
+	// blockedSnapshotCnt counts the ongoing snapshot iterators.
+	blockedSnapshotCnt atomic.Int64
+	// isUnusedNodeFreeing protects the process of freeing unused nodes from data races.
+	isUnusedNodeFreeing atomic.Bool
+	unusedNode4  []arena.MemdbArenaAddr
+	unusedNode16 []arena.MemdbArenaAddr
+	unusedNode48 []arena.MemdbArenaAddr
 }

 type artAllocator struct {
@@ -62,7 +75,11 @@ func (f *artAllocator) allocNode4() (arena.MemdbArenaAddr, *node4) {
 }

 func (f *artAllocator) freeNode4(addr arena.MemdbArenaAddr) {
-	f.nodeAllocator.freeNode4 = append(f.nodeAllocator.freeNode4, addr)
+	if f.nodeAllocator.blockedSnapshotCnt.Load() == 0 {
+		f.nodeAllocator.freeNode4 = append(f.nodeAllocator.freeNode4, addr)
+		return
+	}
+	f.nodeAllocator.unusedNode4 = append(f.nodeAllocator.unusedNode4, addr)
 }

 func (f *artAllocator) getNode4(addr arena.MemdbArenaAddr) *node4 {
@@ -88,7 +105,11 @@ func (f *artAllocator) allocNode16() (arena.MemdbArenaAddr, *node16) {
 }

 func (f *artAllocator) freeNode16(addr arena.MemdbArenaAddr) {
-	f.nodeAllocator.freeNode16 = append(f.nodeAllocator.freeNode16, addr)
+	if f.nodeAllocator.blockedSnapshotCnt.Load() == 0 {
+		f.nodeAllocator.freeNode16 = append(f.nodeAllocator.freeNode16, addr)
+		return
+	}
+	f.nodeAllocator.unusedNode16 = append(f.nodeAllocator.unusedNode16, addr)
 }

 func (f *artAllocator) getNode16(addr arena.MemdbArenaAddr) *node16 {
@@ -114,7 +135,11 @@ func (f *artAllocator) allocNode48() (arena.MemdbArenaAddr, *node48) {
 }

 func (f *artAllocator) freeNode48(addr arena.MemdbArenaAddr) {
-	f.nodeAllocator.freeNode48 = append(f.nodeAllocator.freeNode48, addr)
+	if f.nodeAllocator.blockedSnapshotCnt.Load() == 0 {
+		f.nodeAllocator.freeNode48 = append(f.nodeAllocator.freeNode48, addr)
+		return
+	}
+	f.nodeAllocator.unusedNode48 = append(f.nodeAllocator.unusedNode48, addr)
 }

 func (f *artAllocator) getNode48(addr arena.MemdbArenaAddr) *node48 {
@@ -156,3 +181,31 @@ func (f *artAllocator) getLeaf(addr arena.MemdbArenaAddr) *artLeaf {
	data := f.nodeAllocator.GetData(addr)
	return (*artLeaf)(unsafe.Pointer(&data[0]))
 }
+
+func (f *artAllocator) snapshotInc() {
+	f.nodeAllocator.blockedSnapshotCnt.Add(1)
+}
+
+// snapshotDec moves the unused old-version nodes into the free lists, allowing them to be reused.
+// This function is called when the snapshot iterator is closed, because read iterators can run concurrently.
+func (f *artAllocator) snapshotDec() {
+	if f.nodeAllocator.blockedSnapshotCnt.Add(-1) != 0 {
+		return
+	}
+	if !f.nodeAllocator.isUnusedNodeFreeing.CompareAndSwap(false, true) {
+		return
+	}
+	if len(f.nodeAllocator.unusedNode4) > 0 {
+		f.nodeAllocator.freeNode4 = append(f.nodeAllocator.freeNode4, f.nodeAllocator.unusedNode4...)
+		f.nodeAllocator.unusedNode4 = f.nodeAllocator.unusedNode4[:0]
+	}
+	if len(f.nodeAllocator.unusedNode16) > 0 {
+		f.nodeAllocator.freeNode16 = append(f.nodeAllocator.freeNode16, f.nodeAllocator.unusedNode16...)
+		f.nodeAllocator.unusedNode16 = f.nodeAllocator.unusedNode16[:0]
+	}
+	if len(f.nodeAllocator.unusedNode48) > 0 {
+		f.nodeAllocator.freeNode48 = append(f.nodeAllocator.freeNode48, f.nodeAllocator.unusedNode48...)
+		f.nodeAllocator.unusedNode48 = f.nodeAllocator.unusedNode48[:0]
+	}
+	f.nodeAllocator.isUnusedNodeFreeing.Store(false)
+}
diff --git a/internal/unionstore/art/art_snapshot.go b/internal/unionstore/art/art_snapshot.go
index 6f470dda..39e9dbae 100644
--- a/internal/unionstore/art/art_snapshot.go
+++ b/internal/unionstore/art/art_snapshot.go
@@ -36,9 +36,16 @@ func (t *ART) SnapshotGetter() *SnapGetter {
	}
 }

-// SnapshotIter returns an Iterator for a snapshot of MemBuffer.
-func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
-	inner, err := t.Iter(start, end)
+func (t *ART) newSnapshotIterator(start, end []byte, desc bool) *SnapIter {
+	var (
+		inner *Iterator
+		err   error
+	)
+	if desc {
+		inner, err = t.IterReverse(start, end)
+	} else {
+		inner, err = t.Iter(start, end)
+	}
	if err != nil {
		panic(err)
	}
@@ -46,26 +53,21 @@ func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
		Iterator: inner,
		cp:       t.getSnapshot(),
	}
+	it.tree.allocator.snapshotInc()
	for !it.setValue() && it.Valid() {
		_ = it.Next()
	}
	return it
 }

+// SnapshotIter returns an Iterator for a snapshot of MemBuffer.
+func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
+	return t.newSnapshotIterator(start, end, false)
+}
+
 // SnapshotIterReverse returns a reverse Iterator for a snapshot of MemBuffer.
 func (t *ART) SnapshotIterReverse(k, lowerBound []byte) *SnapIter {
-	inner, err := t.IterReverse(k, lowerBound)
-	if err != nil {
-		panic(err)
-	}
-	it := &SnapIter{
-		Iterator: inner,
-		cp:       t.getSnapshot(),
-	}
-	for !it.setValue() && it.valid {
-		_ = it.Next()
-	}
-	return it
+	return t.newSnapshotIterator(k, lowerBound, true)
 }

 type SnapGetter struct {
@@ -112,6 +114,13 @@ func (i *SnapIter) Next() error {
	return nil
 }

+// Close releases the resources of the iterator and related version.
+// Make sure to call `Close` once the iterator is no longer used.
+func (i *SnapIter) Close() {
+	i.Iterator.Close()
+	i.tree.allocator.snapshotDec()
+}
+
 func (i *SnapIter) setValue() bool {
	if !i.Valid() {
		return false
diff --git a/internal/unionstore/art/art_snapshot_test.go b/internal/unionstore/art/art_snapshot_test.go
new file mode 100644
index 00000000..85a87106
--- /dev/null
+++ b/internal/unionstore/art/art_snapshot_test.go
@@ -0,0 +1,90 @@
+// Copyright 2024 TiKV Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package art
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/internal/unionstore/arena"
+)
+
+func TestSnapshotIteratorPreventFreeNode(t *testing.T) {
+	check := func(num int) {
+		tree := New()
+		for i := 0; i < num; i++ {
+			tree.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
+		}
+		var unusedNodeSlice *[]arena.MemdbArenaAddr
+		switch num {
+		case 4:
+			unusedNodeSlice = &tree.allocator.nodeAllocator.unusedNode4
+		case 16:
+			unusedNodeSlice = &tree.allocator.nodeAllocator.unusedNode16
+		case 48:
+			unusedNodeSlice = &tree.allocator.nodeAllocator.unusedNode48
+		default:
+			panic("unsupported num")
+		}
+		it := tree.SnapshotIter(nil, nil)
+		require.Equal(t, 0, len(*unusedNodeSlice))
+		tree.Set([]byte{0, byte(num)}, []byte{0, byte(num)})
+		require.Equal(t, 1, len(*unusedNodeSlice))
+		it.Close()
+		require.Equal(t, 0, len(*unusedNodeSlice))
+	}
+
+	check(4)
+	check(16)
+	check(48)
+}
+
+func TestConcurrentSnapshotIterNoRace(t *testing.T) {
+	check := func(num int) {
+		tree := New()
+		for i := 0; i < num; i++ {
+			tree.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
+		}
+
+		const concurrency = 100
+		it := tree.SnapshotIter(nil, nil)
+
+		tree.Set([]byte{0, byte(num)}, []byte{0, byte(num)})
+
+		var wg sync.WaitGroup
+		wg.Add(concurrency)
+		go func() {
+			it.Close()
+			wg.Done()
+		}()
+		for i := 1; i < concurrency; i++ {
+			go func(it *SnapIter) {
+				concurrentIt := tree.SnapshotIter(nil, nil)
+				concurrentIt.Close()
+				wg.Done()
+			}(it)
+		}
+		wg.Wait()
+
+		require.Empty(t, tree.allocator.nodeAllocator.unusedNode4)
+		require.Empty(t, tree.allocator.nodeAllocator.unusedNode16)
+		require.Empty(t, tree.allocator.nodeAllocator.unusedNode48)
+	}
+
+	check(4)
+	check(16)
+	check(48)
+}
diff --git a/internal/unionstore/memdb_test.go b/internal/unionstore/memdb_test.go
index 4f1def3e..4721f837 100644
--- a/internal/unionstore/memdb_test.go
+++ b/internal/unionstore/memdb_test.go
@@ -1327,3 +1327,40 @@ func TestSelectValueHistory(t *testing.T) {
	check(t, newRbtDBWithContext())
	check(t, newArtDBWithContext())
 }
+
+func TestSnapshotReaderWithWrite(t *testing.T) {
+	check := func(db MemBuffer, num int) {
+		for i := 0; i < num; i++ {
+			db.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
+		}
+		h := db.Staging()
+		defer db.Release(h)
+
+		iter := db.SnapshotIter([]byte{0, 0}, []byte{0, 255})
+		assert.Equal(t, iter.Key(), []byte{0, 0})
+
+		db.Set([]byte{0, byte(num)}, []byte{0, byte(num)}) // ART: node4/node16/node48 are freed and wait to be reused.
+
+		// ART: reuse the node4/node16/node48
+		for i := 0; i < num; i++ {
+			db.Set([]byte{1, byte(i)}, []byte{1, byte(i)})
+		}
+
+		for i := 0; i < num; i++ {
+			assert.True(t, iter.Valid())
+			assert.Equal(t, iter.Key(), []byte{0, byte(i)})
+			assert.Nil(t, iter.Next())
+		}
+		assert.False(t, iter.Valid())
+		iter.Close()
+	}
+
+	check(newRbtDBWithContext(), 4)
+	check(newArtDBWithContext(), 4)
+
+	check(newRbtDBWithContext(), 16)
+	check(newArtDBWithContext(), 16)
+
+	check(newRbtDBWithContext(), 48)
+	check(newArtDBWithContext(), 48)
+}