memdb: retain old version nodes of ART to satisfy snapshot read (#1503)

ref pingcap/tidb#57425

Signed-off-by: you06 <you1474600@gmail.com>
This commit is contained in:
you06 2024-11-20 11:44:59 +09:00 committed by GitHub
parent 3eb6e787c0
commit 05d115b3e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 207 additions and 18 deletions

View File

@ -15,6 +15,7 @@
package art
import (
"sync/atomic"
"unsafe"
"github.com/tikv/client-go/v2/internal/unionstore/arena"
@ -25,11 +26,23 @@ import (
// reusing blocks reduces the memory pieces.
type nodeArena struct {
arena.MemdbArena
// Free lists, one per node size. When an ART node expands to a higher capacity, the address
// of the node it replaces is stored here so that a later allocation can reuse it.
// Reusing freed nodes reduces memory usage and fragmentation.
freeNode4 []arena.MemdbArenaAddr
freeNode16 []arena.MemdbArenaAddr
freeNode48 []arena.MemdbArenaAddr
// While a snapshot iterator is open, ART must keep the old node versions readable;
// reusing a freed node during that time can produce incorrect snapshot read results.
// To prevent reuse, nodes freed while any snapshot iterator is open are parked in the
// unusedNode* slices below until the snapshot iterators have been closed.
// blockedSnapshotCnt counts the snapshot iterators that are currently open.
blockedSnapshotCnt atomic.Int64
// isUnusedNodeFreeing protects the step that moves the unused nodes into the free lists
// from data races: it is claimed with a compare-and-swap before the move starts.
isUnusedNodeFreeing atomic.Bool
unusedNode4 []arena.MemdbArenaAddr
unusedNode16 []arena.MemdbArenaAddr
unusedNode48 []arena.MemdbArenaAddr
}
type artAllocator struct {
@ -62,7 +75,11 @@ func (f *artAllocator) allocNode4() (arena.MemdbArenaAddr, *node4) {
}
// freeNode4 releases the node4 stored at addr.
//
// When no snapshot iterator is open (blockedSnapshotCnt is zero) the address is pushed
// onto the freeNode4 list and may be reused by a later allocation. Otherwise it is parked
// in unusedNode4, so that ongoing snapshot reads still observe the old node; snapshotDec
// moves parked addresses to the free list later.
//
// The address must be recorded in exactly one of the two lists: pushing it to the free
// list unconditionally would allow reuse while a snapshot still reads the node, and would
// register the same address twice when no snapshot is open.
func (f *artAllocator) freeNode4(addr arena.MemdbArenaAddr) {
	if f.nodeAllocator.blockedSnapshotCnt.Load() == 0 {
		f.nodeAllocator.freeNode4 = append(f.nodeAllocator.freeNode4, addr)
		return
	}
	f.nodeAllocator.unusedNode4 = append(f.nodeAllocator.unusedNode4, addr)
}
func (f *artAllocator) getNode4(addr arena.MemdbArenaAddr) *node4 {
@ -88,7 +105,11 @@ func (f *artAllocator) allocNode16() (arena.MemdbArenaAddr, *node16) {
}
// freeNode16 releases the node16 stored at addr.
//
// When no snapshot iterator is open (blockedSnapshotCnt is zero) the address is pushed
// onto the freeNode16 list and may be reused by a later allocation. Otherwise it is parked
// in unusedNode16, so that ongoing snapshot reads still observe the old node; snapshotDec
// moves parked addresses to the free list later.
//
// The address must be recorded in exactly one of the two lists: pushing it to the free
// list unconditionally would allow reuse while a snapshot still reads the node, and would
// register the same address twice when no snapshot is open.
func (f *artAllocator) freeNode16(addr arena.MemdbArenaAddr) {
	if f.nodeAllocator.blockedSnapshotCnt.Load() == 0 {
		f.nodeAllocator.freeNode16 = append(f.nodeAllocator.freeNode16, addr)
		return
	}
	f.nodeAllocator.unusedNode16 = append(f.nodeAllocator.unusedNode16, addr)
}
func (f *artAllocator) getNode16(addr arena.MemdbArenaAddr) *node16 {
@ -114,7 +135,11 @@ func (f *artAllocator) allocNode48() (arena.MemdbArenaAddr, *node48) {
}
// freeNode48 releases the node48 stored at addr.
//
// When no snapshot iterator is open (blockedSnapshotCnt is zero) the address is pushed
// onto the freeNode48 list and may be reused by a later allocation. Otherwise it is parked
// in unusedNode48, so that ongoing snapshot reads still observe the old node; snapshotDec
// moves parked addresses to the free list later.
//
// The address must be recorded in exactly one of the two lists: pushing it to the free
// list unconditionally would allow reuse while a snapshot still reads the node, and would
// register the same address twice when no snapshot is open.
func (f *artAllocator) freeNode48(addr arena.MemdbArenaAddr) {
	if f.nodeAllocator.blockedSnapshotCnt.Load() == 0 {
		f.nodeAllocator.freeNode48 = append(f.nodeAllocator.freeNode48, addr)
		return
	}
	f.nodeAllocator.unusedNode48 = append(f.nodeAllocator.unusedNode48, addr)
}
func (f *artAllocator) getNode48(addr arena.MemdbArenaAddr) *node48 {
@ -156,3 +181,31 @@ func (f *artAllocator) getLeaf(addr arena.MemdbArenaAddr) *artLeaf {
data := f.nodeAllocator.GetData(addr)
return (*artLeaf)(unsafe.Pointer(&data[0]))
}
// snapshotInc records that one more snapshot iterator is open by atomically
// incrementing blockedSnapshotCnt.
func (f *artAllocator) snapshotInc() {
	openSnapshots := &f.nodeAllocator.blockedSnapshotCnt
	openSnapshots.Add(1)
}
// snapshotDec records that a snapshot iterator has been closed by atomically decrementing
// blockedSnapshotCnt.
//
// When the counter reaches zero, the addresses parked in the unusedNode* slices are appended
// to the matching free lists, which allows them to be reused, and the unused slices are
// truncated to length zero. Snapshot iterators can be closed concurrently, so the move is
// only performed by the caller that wins the compare-and-swap on isUnusedNodeFreeing; any
// other caller returns without touching the lists.
func (f *artAllocator) snapshotDec() {
	if remaining := f.nodeAllocator.blockedSnapshotCnt.Add(-1); remaining != 0 {
		return
	}

	freeing := &f.nodeAllocator.isUnusedNodeFreeing
	if claimed := freeing.CompareAndSwap(false, true); !claimed {
		return
	}

	if parked := f.nodeAllocator.unusedNode4; len(parked) != 0 {
		f.nodeAllocator.freeNode4 = append(f.nodeAllocator.freeNode4, parked...)
		f.nodeAllocator.unusedNode4 = parked[:0]
	}
	if parked := f.nodeAllocator.unusedNode16; len(parked) != 0 {
		f.nodeAllocator.freeNode16 = append(f.nodeAllocator.freeNode16, parked...)
		f.nodeAllocator.unusedNode16 = parked[:0]
	}
	if parked := f.nodeAllocator.unusedNode48; len(parked) != 0 {
		f.nodeAllocator.freeNode48 = append(f.nodeAllocator.freeNode48, parked...)
		f.nodeAllocator.unusedNode48 = parked[:0]
	}

	// Release the claim so that a later close can run the move again.
	freeing.Store(false)
}

View File

@ -36,9 +36,16 @@ func (t *ART) SnapshotGetter() *SnapGetter {
}
}
// SnapshotIter returns an Iterator for a snapshot of MemBuffer.
func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
inner, err := t.Iter(start, end)
func (t *ART) newSnapshotIterator(start, end []byte, desc bool) *SnapIter {
var (
inner *Iterator
err error
)
if desc {
inner, err = t.IterReverse(start, end)
} else {
inner, err = t.Iter(start, end)
}
if err != nil {
panic(err)
}
@ -46,26 +53,21 @@ func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
Iterator: inner,
cp: t.getSnapshot(),
}
it.tree.allocator.snapshotInc()
for !it.setValue() && it.Valid() {
_ = it.Next()
}
return it
}
// SnapshotIter returns a forward Iterator over a snapshot of MemBuffer, covering the
// range given by start and end. It is built by newSnapshotIterator with desc set to false.
func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
	const desc = false
	return t.newSnapshotIterator(start, end, desc)
}
// SnapshotIterReverse returns a reverse Iterator for a snapshot of MemBuffer.
//
// It delegates to newSnapshotIterator with desc set to true, so the reverse iterator is
// created through the same path as the forward one. That path calls snapshotInc on the
// allocator, which is what SnapIter.Close later balances with snapshotDec; building the
// iterator by hand here would skip the increment and leave the counter unbalanced.
func (t *ART) SnapshotIterReverse(k, lowerBound []byte) *SnapIter {
	return t.newSnapshotIterator(k, lowerBound, true)
}
type SnapGetter struct {
@ -112,6 +114,13 @@ func (i *SnapIter) Next() error {
return nil
}
// Close releases the resources of the iterator and the related snapshot version.
// It first closes the embedded Iterator and then calls snapshotDec on the tree's allocator.
// Make sure to call `Close` once the iterator is no longer used.
// NOTE(review): snapshotDec is called unconditionally, so calling Close more than once on
// the same iterator would decrement the counter more than once — confirm callers never do.
func (i *SnapIter) Close() {
i.Iterator.Close()
i.tree.allocator.snapshotDec()
}
func (i *SnapIter) setValue() bool {
if !i.Valid() {
return false

View File

@ -0,0 +1,90 @@
// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package art
import (
"sync"
"testing"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/internal/unionstore/arena"
)
// TestSnapshotIteratorPreventFreeNode checks, for trees holding 4, 16 and 48 keys under a
// common prefix, that a node freed while a snapshot iterator is open is recorded in the
// matching unusedNode* slice, and that the slice is empty again after the iterator is closed.
func TestSnapshotIteratorPreventFreeNode(t *testing.T) {
	// entry builds a fresh two-byte slice {0, b}; it is used for both keys and values.
	entry := func(b int) []byte {
		return []byte{0, byte(b)}
	}

	run := func(size int) {
		tree := New()
		for b := 0; b < size; b++ {
			tree.Set(entry(b), entry(b))
		}

		// Select the unused slice that belongs to the node size under test.
		var parked *[]arena.MemdbArenaAddr
		switch size {
		case 4:
			parked = &tree.allocator.nodeAllocator.unusedNode4
		case 16:
			parked = &tree.allocator.nodeAllocator.unusedNode16
		case 48:
			parked = &tree.allocator.nodeAllocator.unusedNode48
		default:
			panic("unsupported num")
		}

		snap := tree.SnapshotIter(nil, nil)
		require.Equal(t, 0, len(*parked))

		// One more key under the same prefix than the size under test.
		tree.Set(entry(size), entry(size))
		require.Equal(t, 1, len(*parked))

		snap.Close()
		require.Equal(t, 0, len(*parked))
	}

	for _, size := range []int{4, 16, 48} {
		run(size)
	}
}
// TestConcurrentSnapshotIterNoRace opens a snapshot iterator, performs one more write, and
// then closes that iterator in one goroutine while 99 other goroutines each open and close
// their own snapshot iterator. After all goroutines finish, every unusedNode* slice must be
// empty. It runs for trees holding 4, 16 and 48 keys.
func TestConcurrentSnapshotIterNoRace(t *testing.T) {
	run := func(size int) {
		tree := New()
		for b := 0; b < size; b++ {
			tree.Set([]byte{0, byte(b)}, []byte{0, byte(b)})
		}

		const workers = 100
		first := tree.SnapshotIter(nil, nil)
		tree.Set([]byte{0, byte(size)}, []byte{0, byte(size)})

		var wg sync.WaitGroup
		wg.Add(workers)

		// One goroutine closes the iterator that was open during the write ...
		go func() {
			defer wg.Done()
			first.Close()
		}()
		// ... while the remaining ones open and immediately close their own iterators.
		for w := 1; w < workers; w++ {
			go func() {
				defer wg.Done()
				own := tree.SnapshotIter(nil, nil)
				own.Close()
			}()
		}
		wg.Wait()

		require.Empty(t, tree.allocator.nodeAllocator.unusedNode4)
		require.Empty(t, tree.allocator.nodeAllocator.unusedNode16)
		require.Empty(t, tree.allocator.nodeAllocator.unusedNode48)
	}

	for _, size := range []int{4, 16, 48} {
		run(size)
	}
}

View File

@ -1327,3 +1327,40 @@ func TestSelectValueHistory(t *testing.T) {
check(t, newRbtDBWithContext())
check(t, newArtDBWithContext())
}
// TestSnapshotReaderWithWrite checks that a snapshot iterator opened before further writes
// still yields exactly the keys {0, 0} .. {0, num-1}, in order, even though the buffer is
// modified while the iterator is open. It runs against both the RBT and the ART buffers for
// 4, 16 and 48 initial keys.
func TestSnapshotReaderWithWrite(t *testing.T) {
	check := func(db MemBuffer, num int) {
		for i := 0; i < num; i++ {
			db.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
		}
		h := db.Staging()
		defer db.Release(h)

		iter := db.SnapshotIter([]byte{0, 0}, []byte{0, 255})
		// testify expects (expected, actual); keeping that order makes failure output correct.
		assert.Equal(t, []byte{0, 0}, iter.Key())

		// ART: this write frees a node4/node16/node48, which then waits to be reused.
		db.Set([]byte{0, byte(num)}, []byte{0, byte(num)})
		// ART: these writes would reuse the freed node4/node16/node48 if it were available.
		for i := 0; i < num; i++ {
			db.Set([]byte{1, byte(i)}, []byte{1, byte(i)})
		}

		// The snapshot must still see only the keys written before it was opened.
		for i := 0; i < num; i++ {
			assert.True(t, iter.Valid())
			assert.Equal(t, []byte{0, byte(i)}, iter.Key())
			assert.NoError(t, iter.Next())
		}
		assert.False(t, iter.Valid())
		iter.Close()
	}

	check(newRbtDBWithContext(), 4)
	check(newArtDBWithContext(), 4)

	check(newRbtDBWithContext(), 16)
	check(newArtDBWithContext(), 16)

	check(newRbtDBWithContext(), 48)
	check(newArtDBWithContext(), 48)
}