Skip to content

Commit

Permalink
memdb: retain old version nodes of ART to satisfy snapshot read (tikv#1503)
Browse files Browse the repository at this point in the history

ref pingcap/tidb#57425

Signed-off-by: you06 <you1474600@gmail.com>
  • Loading branch information
you06 authored and rleungx committed Nov 20, 2024
1 parent 3eb6e78 commit ef3badd
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 18 deletions.
59 changes: 56 additions & 3 deletions internal/unionstore/art/art_arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package art

import (
"sync/atomic"
"unsafe"

"github.com/tikv/client-go/v2/internal/unionstore/arena"
Expand All @@ -25,11 +26,23 @@ import (
// reusing blocks reduces the memory pieces.
// nodeArena owns the memory for ART nodes and tracks freed nodes for reuse.
type nodeArena struct {
	arena.MemdbArena

	// When an ART node grows to a higher capacity, the address of the freed
	// smaller node is pushed onto the matching free list so later allocations
	// can reuse it. Reusing freed nodes reduces memory usage and fragmentation.
	freeNode4 []arena.MemdbArenaAddr
	freeNode16 []arena.MemdbArenaAddr
	freeNode48 []arena.MemdbArenaAddr

	// While a snapshot iterator is ongoing, ART must keep old node versions
	// readable; reusing a freed node during that window could produce an
	// incorrect snapshot read result. Freed nodes are therefore parked in the
	// unused* slices until every snapshot iterator is closed, at which point
	// they are moved into the free lists.
	// blockedSnapshotCnt counts the ongoing snapshot iterators.
	blockedSnapshotCnt atomic.Int64
	// isUnusedNodeFreeing protects the flush of the unused* slices into the
	// free lists from running concurrently (data race guard).
	isUnusedNodeFreeing atomic.Bool
	unusedNode4 []arena.MemdbArenaAddr
	unusedNode16 []arena.MemdbArenaAddr
	unusedNode48 []arena.MemdbArenaAddr
}

type artAllocator struct {
Expand Down Expand Up @@ -62,7 +75,11 @@ func (f *artAllocator) allocNode4() (arena.MemdbArenaAddr, *node4) {
}

// freeNode4 releases the node4 at addr. The node is recycled immediately
// unless a snapshot iterator is ongoing, in which case reuse is deferred
// until the last iterator closes.
func (f *artAllocator) freeNode4(addr arena.MemdbArenaAddr) {
	na := &f.nodeAllocator
	if na.blockedSnapshotCnt.Load() != 0 {
		// A snapshot reader may still observe this node; park it for later.
		na.unusedNode4 = append(na.unusedNode4, addr)
		return
	}
	na.freeNode4 = append(na.freeNode4, addr)
}

func (f *artAllocator) getNode4(addr arena.MemdbArenaAddr) *node4 {
Expand All @@ -88,7 +105,11 @@ func (f *artAllocator) allocNode16() (arena.MemdbArenaAddr, *node16) {
}

// freeNode16 releases the node16 at addr. The node is recycled immediately
// unless a snapshot iterator is ongoing, in which case reuse is deferred
// until the last iterator closes.
func (f *artAllocator) freeNode16(addr arena.MemdbArenaAddr) {
	na := &f.nodeAllocator
	if na.blockedSnapshotCnt.Load() != 0 {
		// A snapshot reader may still observe this node; park it for later.
		na.unusedNode16 = append(na.unusedNode16, addr)
		return
	}
	na.freeNode16 = append(na.freeNode16, addr)
}

func (f *artAllocator) getNode16(addr arena.MemdbArenaAddr) *node16 {
Expand All @@ -114,7 +135,11 @@ func (f *artAllocator) allocNode48() (arena.MemdbArenaAddr, *node48) {
}

// freeNode48 releases the node48 at addr. The node is recycled immediately
// unless a snapshot iterator is ongoing, in which case reuse is deferred
// until the last iterator closes.
func (f *artAllocator) freeNode48(addr arena.MemdbArenaAddr) {
	na := &f.nodeAllocator
	if na.blockedSnapshotCnt.Load() != 0 {
		// A snapshot reader may still observe this node; park it for later.
		na.unusedNode48 = append(na.unusedNode48, addr)
		return
	}
	na.freeNode48 = append(na.freeNode48, addr)
}

func (f *artAllocator) getNode48(addr arena.MemdbArenaAddr) *node48 {
Expand Down Expand Up @@ -156,3 +181,31 @@ func (f *artAllocator) getLeaf(addr arena.MemdbArenaAddr) *artLeaf {
data := f.nodeAllocator.GetData(addr)
return (*artLeaf)(unsafe.Pointer(&data[0]))
}

// snapshotInc registers an ongoing snapshot iterator, preventing freed nodes
// from being reused until the matching snapshotDec is called.
func (f *artAllocator) snapshotInc() {
	f.nodeAllocator.blockedSnapshotCnt.Add(1)
}

// snapshotDec unregisters an ongoing snapshot iterator. When the last one
// closes, the parked old-version nodes are moved into the free lists so they
// can be reused. It is called from SnapIter.Close; snapshot iterators can run
// concurrently.
func (f *artAllocator) snapshotDec() {
	if f.nodeAllocator.blockedSnapshotCnt.Add(-1) != 0 {
		// Other snapshot iterators are still open; keep the nodes parked.
		return
	}
	// Only one goroutine flushes the unused lists at a time; a loser of this
	// CAS skips the flush (the nodes are flushed on a later close instead).
	// NOTE(review): between the counter check above and this CAS, a new
	// snapshot iterator could start while the flush proceeds — confirm
	// callers tolerate nodes becoming reusable in that window.
	if !f.nodeAllocator.isUnusedNodeFreeing.CompareAndSwap(false, true) {
		return
	}
	if len(f.nodeAllocator.unusedNode4) > 0 {
		f.nodeAllocator.freeNode4 = append(f.nodeAllocator.freeNode4, f.nodeAllocator.unusedNode4...)
		f.nodeAllocator.unusedNode4 = f.nodeAllocator.unusedNode4[:0]
	}
	if len(f.nodeAllocator.unusedNode16) > 0 {
		f.nodeAllocator.freeNode16 = append(f.nodeAllocator.freeNode16, f.nodeAllocator.unusedNode16...)
		f.nodeAllocator.unusedNode16 = f.nodeAllocator.unusedNode16[:0]
	}
	if len(f.nodeAllocator.unusedNode48) > 0 {
		f.nodeAllocator.freeNode48 = append(f.nodeAllocator.freeNode48, f.nodeAllocator.unusedNode48...)
		f.nodeAllocator.unusedNode48 = f.nodeAllocator.unusedNode48[:0]
	}
	f.nodeAllocator.isUnusedNodeFreeing.Store(false)
}
39 changes: 24 additions & 15 deletions internal/unionstore/art/art_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,36 +36,38 @@ func (t *ART) SnapshotGetter() *SnapGetter {
}
}

// SnapshotIter returns an Iterator for a snapshot of MemBuffer.
func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
inner, err := t.Iter(start, end)
func (t *ART) newSnapshotIterator(start, end []byte, desc bool) *SnapIter {
var (
inner *Iterator
err error
)
if desc {
inner, err = t.IterReverse(start, end)
} else {
inner, err = t.Iter(start, end)
}
if err != nil {
panic(err)
}
it := &SnapIter{
Iterator: inner,
cp: t.getSnapshot(),
}
it.tree.allocator.snapshotInc()
for !it.setValue() && it.Valid() {
_ = it.Next()
}
return it
}

// SnapshotIter returns an Iterator for a snapshot of MemBuffer.
// The caller must call Close on the returned iterator when done, so the
// allocator can resume reusing freed nodes.
func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
	return t.newSnapshotIterator(start, end, false)
}

// SnapshotIterReverse returns a reverse Iterator for a snapshot of MemBuffer,
// iterating from k down to lowerBound. The caller must call Close on the
// returned iterator when done.
//
// Note: this span previously contained diff residue (the removed old body
// interleaved with the new delegating call); it is collapsed here to the
// final implementation.
func (t *ART) SnapshotIterReverse(k, lowerBound []byte) *SnapIter {
	return t.newSnapshotIterator(k, lowerBound, true)
}

type SnapGetter struct {
Expand Down Expand Up @@ -112,6 +114,13 @@ func (i *SnapIter) Next() error {
return nil
}

// Close releases the resources of the iterator and related version.
// Make sure to call `Close` after the iterator is not used.
// NOTE(review): Close is not idempotent — a double Close would decrement the
// allocator's snapshot counter twice; confirm callers close exactly once.
func (i *SnapIter) Close() {
	i.Iterator.Close()
	// Allow parked old-version nodes to be reused once no snapshot needs them.
	i.tree.allocator.snapshotDec()
}

func (i *SnapIter) setValue() bool {
if !i.Valid() {
return false
Expand Down
90 changes: 90 additions & 0 deletions internal/unionstore/art/art_snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package art

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/internal/unionstore/arena"
)

// TestSnapshotIteratorPreventFreeNode verifies that while a snapshot iterator
// is open, a node freed by a grow is parked in the matching unused list
// instead of the free list, and that closing the iterator drains the list.
func TestSnapshotIteratorPreventFreeNode(t *testing.T) {
	run := func(capacity int) {
		tree := New()
		for i := 0; i < capacity; i++ {
			tree.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
		}
		allocator := &tree.allocator.nodeAllocator
		var parked *[]arena.MemdbArenaAddr
		switch capacity {
		case 4:
			parked = &allocator.unusedNode4
		case 16:
			parked = &allocator.unusedNode16
		case 48:
			parked = &allocator.unusedNode48
		default:
			panic("unsupported num")
		}
		it := tree.SnapshotIter(nil, nil)
		require.Len(t, *parked, 0)
		// One more key forces the node to grow, freeing the old version.
		tree.Set([]byte{0, byte(capacity)}, []byte{0, byte(capacity)})
		require.Len(t, *parked, 1)
		it.Close()
		require.Len(t, *parked, 0)
	}

	run(4)
	run(16)
	run(48)
}

// TestConcurrentSnapshotIterNoRace opens and closes snapshot iterators from
// many goroutines at once (run with -race) and checks that, after all
// iterators close, every parked node has been flushed back to the free lists.
//
// Fix: the inner goroutine previously received `it *SnapIter` as a parameter
// but never used it; the dead parameter is removed.
func TestConcurrentSnapshotIterNoRace(t *testing.T) {
	check := func(num int) {
		tree := New()
		for i := 0; i < num; i++ {
			tree.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
		}

		const concurrency = 100
		it := tree.SnapshotIter(nil, nil)

		// Force a node grow so a freed node is parked while `it` is open.
		tree.Set([]byte{0, byte(num)}, []byte{0, byte(num)})

		var wg sync.WaitGroup
		wg.Add(concurrency)
		go func() {
			it.Close()
			wg.Done()
		}()
		for i := 1; i < concurrency; i++ {
			go func() {
				concurrentIt := tree.SnapshotIter(nil, nil)
				concurrentIt.Close()
				wg.Done()
			}()
		}
		wg.Wait()

		require.Empty(t, tree.allocator.nodeAllocator.unusedNode4)
		require.Empty(t, tree.allocator.nodeAllocator.unusedNode16)
		require.Empty(t, tree.allocator.nodeAllocator.unusedNode48)
	}

	check(4)
	check(16)
	check(48)
}
37 changes: 37 additions & 0 deletions internal/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,3 +1327,40 @@ func TestSelectValueHistory(t *testing.T) {
check(t, newRbtDBWithContext())
check(t, newArtDBWithContext())
}

// TestSnapshotReaderWithWrite checks that a snapshot iterator keeps returning
// the snapshot-time contents even though later writes grow (and free) the
// underlying nodes, for both the RBT and ART memdb implementations.
//
// Fix: testify's assert.Equal signature is (t, expected, actual); the
// original calls had the arguments swapped, which produces misleading
// failure messages. Argument order is corrected throughout.
func TestSnapshotReaderWithWrite(t *testing.T) {
	check := func(db MemBuffer, num int) {
		for i := 0; i < num; i++ {
			db.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
		}
		h := db.Staging()
		defer db.Release(h)

		iter := db.SnapshotIter([]byte{0, 0}, []byte{0, 255})
		assert.Equal(t, []byte{0, 0}, iter.Key())

		db.Set([]byte{0, byte(num)}, []byte{0, byte(num)}) // ART: node4/node16/node48 is freed and wait to be reused.

		// ART: reuse the node4/node16/node48
		for i := 0; i < num; i++ {
			db.Set([]byte{1, byte(i)}, []byte{1, byte(i)})
		}

		for i := 0; i < num; i++ {
			assert.True(t, iter.Valid())
			assert.Equal(t, []byte{0, byte(i)}, iter.Key())
			assert.Nil(t, iter.Next())
		}
		assert.False(t, iter.Valid())
		iter.Close()
	}

	check(newRbtDBWithContext(), 4)
	check(newArtDBWithContext(), 4)

	check(newRbtDBWithContext(), 16)
	check(newArtDBWithContext(), 16)

	check(newRbtDBWithContext(), 48)
	check(newArtDBWithContext(), 48)
}

0 comments on commit ef3badd

Please sign in to comment.